A way to limit the number of concurrent computations.
A throttle is essentially a pipe to which one can feed jobs.
A throttle schedules asynchronous jobs so that at any point in time no more than max_concurrent_jobs jobs are running. A job f is considered to be running from the time f () is executed until the deferred returned by f () becomes determined, or f () raises. The throttle initiates jobs first-come first-served.
One can use create_with to create a throttle with "resources" that one would like to make available to concurrent jobs and to guarantee that different jobs access different resources.
A throttle is killed if one of its jobs raises an exception and the throttle has continue_on_error = false. A throttle can also be killed explicitly. If a throttle is killed, then all jobs in it that haven't yet started are aborted, i.e. they will not start and will become determined with `Aborted. Jobs that had already started will continue, and return `Ok or `Raised as usual when they finish. Jobs enqueued into a killed throttle will be immediately aborted.
We use a phantom type to distinguish between throttles, which have max_concurrent_jobs >= 1, and sequencers, which have max_concurrent_jobs = 1. All operations are available on both. We make the distinction because it is sometimes useful to know from the type of a throttle that it is a sequencer and that at most one job can be running at a time.
val sexp_of_t : ('a -> Sexplib.Sexp.t) -> 'a t -> Sexplib.Sexp.t
include Core_kernel.Invariant.S1 with type 'a t := 'a t
val invariant : 'a Base__.Invariant_intf.inv -> 'a t Base__.Invariant_intf.inv
val create : continue_on_error:bool -> max_concurrent_jobs:int -> unit t
create ~continue_on_error ~max_concurrent_jobs returns a throttle that will run up to max_concurrent_jobs jobs concurrently.
If some job raises an exception, then the throttle will be killed, unless continue_on_error is true.
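As an illustration of the concurrency limit, here is a minimal sketch that pushes several jobs through a throttle and records the highest number of jobs seen running at once. It assumes the core and async opam packages are installed; peak_concurrency is a hypothetical helper name, and the calls to Throttle.enqueue, Clock_ns.after and Thread_safe.block_on_async_exn are used as they exist in recent Async releases, which may differ slightly from the version documented here.

```ocaml
open Core
open Async

(* Hypothetical helper: runs [n] jobs through a throttle and returns the
   highest number of jobs observed running at the same time. *)
let peak_concurrency ~max_concurrent_jobs ~n =
  let throttle = Throttle.create ~continue_on_error:false ~max_concurrent_jobs in
  let running = ref 0 in
  let peak = ref 0 in
  let job () =
    incr running;
    peak := Int.max !peak !running;
    (* Stay "running" for a short while so that jobs overlap in time. *)
    Clock_ns.after (Time_ns.Span.of_ms 1.) >>| fun () -> decr running
  in
  Deferred.all_unit (List.init n ~f:(fun _ -> Throttle.enqueue throttle job))
  >>| fun () -> !peak

let () =
  let peak =
    Thread_safe.block_on_async_exn (fun () ->
      peak_concurrency ~max_concurrent_jobs:3 ~n:10)
  in
  (* The recorded peak should never exceed [max_concurrent_jobs]. *)
  Core.printf "peak concurrency: %d\n" peak
```

The job counts itself as running from the moment it is called until its deferred becomes determined, which mirrors the definition of "running" used by the throttle.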
val create_with : continue_on_error:bool -> 'a list -> 'a t
create_with ~continue_on_error job_resources returns a throttle that will run up to List.length job_resources jobs concurrently, and will ensure that all running jobs are supplied distinct elements of job_resources.
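The following sketch shows one way create_with might be used. The resources are placeholder strings standing in for something like connections, and run_with_resources is a hypothetical helper; each job asserts that the resource it receives is not held by another running job. It assumes the core and async packages, and that Throttle.enqueue passes the resource to the job as its argument.

```ocaml
open Core
open Async

(* Hypothetical resources: plain strings standing in for e.g. connections. *)
let resources = [ "conn-a"; "conn-b" ]

(* Runs [n] jobs and returns, in enqueue order, the resource each job was
   given. Every job checks that its resource is not currently in use. *)
let run_with_resources ~n =
  let throttle = Throttle.create_with ~continue_on_error:false resources in
  let in_use = String.Hash_set.create () in
  let job resource =
    (* This would fail if two running jobs were handed the same resource. *)
    assert (not (Hash_set.mem in_use resource));
    Hash_set.add in_use resource;
    Clock_ns.after (Time_ns.Span.of_ms 1.)
    >>| fun () ->
    Hash_set.remove in_use resource;
    resource
  in
  Deferred.all (List.init n ~f:(fun _ -> Throttle.enqueue throttle job))

let () =
  let used =
    Thread_safe.block_on_async_exn (fun () -> run_with_resources ~n:6)
  in
  Core.printf "resources handed out: %s\n" (String.concat ~sep:" " used)
```

Because the throttle has two resources, at most two jobs run at a time, and the order in which resources are handed out is not something this sketch relies on.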
type 'a outcome = [
  | `Ok of 'a
  | `Aborted
  | `Raised of exn
]
include sig ... end
val sexp_of_outcome : ('a -> Sexplib.Sexp.t) -> 'a outcome -> Sexplib.Sexp.t
enqueue t job schedules job to be run as soon as possible. Jobs are guaranteed to be started in the order they are enqueued and to not be started during the call to enqueue. If t is dead, then job will be immediately aborted (for enqueue this will send an exception to the monitor in effect).
monad_sequence_how ~how ~f returns a function that behaves like f, except that it uses a throttle to limit the number of invocations that can be running concurrently. The throttle has continue_on_error = false.
num_jobs_running t returns the number of jobs that t is currently running. It is guaranteed that if num_jobs_running t < max_concurrent_jobs t then num_jobs_waiting_to_start t = 0. That is, the throttle always uses its maximum concurrency if possible.
capacity_available t becomes determined the next time that t has fewer than max_concurrent_jobs t jobs running, and hence an enqueued job would start immediately.
kill t kills t, which aborts all enqueued jobs that haven't started and all jobs enqueued in the future. kill also initiates the at_kill clean functions.
If t has already been killed, then calling kill t has no effect.
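To make the effect of kill on jobs in different states concrete, here is a small sketch with one running job, one waiting job, and one job enqueued after the kill. It assumes the core and async packages and uses Throttle.enqueue', the variant of enqueue that reports an outcome instead of raising; kill_demo and describe are hypothetical names introduced for the example.

```ocaml
open Core
open Async

(* Hypothetical helper: renders an outcome as a short string. *)
let describe = function
  | `Ok s -> "ok:" ^ s
  | `Aborted -> "aborted"
  | `Raised _ -> "raised"

let kill_demo () =
  let throttle = Throttle.create ~continue_on_error:false ~max_concurrent_jobs:1 in
  let started = Ivar.create () in
  let release = Ivar.create () in
  (* The first job signals that it has started, then waits on [release]. *)
  let first =
    Throttle.enqueue' throttle (fun () ->
      Ivar.fill_if_empty started ();
      Ivar.read release >>| fun () -> "first")
  in
  (* The second job has to wait, since only one job may run at a time. *)
  let second = Throttle.enqueue' throttle (fun () -> return "second") in
  Ivar.read started
  >>= fun () ->
  (* Kill while [first] is running and [second] has not started. *)
  Throttle.kill throttle;
  (* Enqueueing into a killed throttle aborts the job immediately. *)
  let third = Throttle.enqueue' throttle (fun () -> return "third") in
  Ivar.fill_if_empty release ();
  Deferred.all [ first; second; third ] >>| List.map ~f:describe

let () =
  let outcomes = Thread_safe.block_on_async_exn kill_demo in
  (* Prints: ok:first aborted aborted *)
  Core.printf "%s\n" (String.concat ~sep:" " outcomes)
```

The started ivar is only there to make sure the kill happens after the first job has begun, since jobs are never started during the call that enqueues them.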
at_kill t clean runs clean on each resource when t is killed, either by kill or an unhandled exception. clean is called on each resource as it becomes available, i.e. immediately if the resource isn't currently in use, otherwise when the job currently using the resource finishes. If a call to clean fails, the exception is raised to the monitor in effect when at_kill was called.
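A minimal sketch of at_kill on a throttle built with create_with follows. The resource names and the at_kill_demo helper are hypothetical, and the example assumes that Throttle.cleaned is available to wait until all clean functions have finished; if that function is missing in the version at hand, another synchronization mechanism would be needed.

```ocaml
open Core
open Async

let at_kill_demo () =
  (* Hypothetical resources; no job is using them when the throttle dies. *)
  let throttle =
    Throttle.create_with ~continue_on_error:false [ "res-1"; "res-2" ]
  in
  let cleaned_up = ref [] in
  (* Register a clean function; it is called once per resource after a kill. *)
  Throttle.at_kill throttle (fun resource ->
    cleaned_up := resource :: !cleaned_up;
    return ());
  Throttle.kill throttle;
  (* Assumption: [cleaned] becomes determined once cleanup has completed. *)
  Throttle.cleaned throttle >>| fun () -> !cleaned_up

let () =
  let cleaned_up = Thread_safe.block_on_async_exn at_kill_demo in
  Core.printf "cleaned %d resources\n" (List.length cleaned_up)
```

Since neither resource is in use when kill is called, both are cleaned right away; a resource held by a running job would instead be cleaned when that job finishes.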