package fiber

Concurrency library

This module implements "structured concurrency".

  • Alert (unstable): The API of this library is not stable and may change without notice.

Generals

type 'a t

Type of fiber. A fiber represents a suspended computation. Note that using the same fiber twice will execute it twice, which is probably not what you want. To share the result of a fiber, use an Ivar.t.

type 'a fiber := 'a t
val return : 'a -> 'a t

Create a fiber that has already terminated.

val of_thunk : (unit -> 'a t) -> 'a t

Converts a thunk to a fiber, making sure the thunk runs in the context of the fiber (rather than applied in the current context).

Equivalent to (>>=) (return ()), but more explicit.
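A minimal sketch of the two points above, assuming the fiber library documented here is linked. The iter stub is an illustrative placeholder, not part of the library: nothing in the example waits on an ivar, so run is assumed never to call it. The sketch shows that of_thunk defers the side effect until the fiber is executed, and that using the same fiber twice executes it twice.

```ocaml
(* Hypothetical scheduler stub: no fiber below blocks, so [run] is
   assumed never to invoke it. *)
let iter () = failwith "no pending events"

let counter = ref 0

(* Building the fiber does not run the thunk; the increment is deferred
   until the fiber is executed. *)
let bump : int Fiber.t =
  Fiber.of_thunk (fun () ->
    incr counter;
    Fiber.return !counter)

(* Still 0 here: the thunk has not run yet. *)
let before_run = !counter

(* [both] uses [bump] twice, so the thunk runs twice. *)
let first, second = Fiber.run (Fiber.both bump bump) ~iter
```

If both results had to come from a single execution, the result would need to be shared through an Ivar.t instead, as noted above.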

val never : 'a t

Fiber that never completes.

module O : sig ... end
val map : 'a t -> f:('a -> 'b) -> 'b t
val bind : 'a t -> f:('a -> 'b t) -> 'b t

Joining

The following combinators are helpers to combine the result of several fibers into one. Note that they do not introduce parallelism.

val both : 'a t -> 'b t -> ('a * 'b) t
val all : 'a t list -> 'a list t

Execute a list of fibers in sequence. We use the short name to conform with the Applicative interface.

val sequential_map : 'a list -> f:('a -> 'b t) -> 'b list t
val sequential_iter : 'a list -> f:('a -> unit t) -> unit t
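As a rough illustration of the joining combinators, the sketch below records the order in which elements are visited. It assumes the fiber library is linked; iter is a hypothetical stub that is not expected to be called because no fiber blocks, and visit is an illustrative helper.

```ocaml
(* Hypothetical scheduler stub: no fiber below blocks. *)
let iter () = failwith "no pending events"

let visited = ref []

(* Illustrative helper: remember the element, then return ten times it. *)
let visit x =
  Fiber.of_thunk (fun () ->
    visited := x :: !visited;
    Fiber.return (x * 10))

(* Elements are processed one after the other, left to right. *)
let results = Fiber.run (Fiber.sequential_map [ 1; 2; 3 ] ~f:visit) ~iter

let order = List.rev !visited

(* [all] joins already-built fibers into a single list result. *)
let joined = Fiber.run (Fiber.all [ Fiber.return "a"; Fiber.return "b" ]) ~iter
```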

Forking + joining

The following functions fork two or more fibers and then join their results. The execution of the various fibers might be interleaved. However, once the combining fiber has terminated, it is guaranteed that no forked fibers are still running.

val fork_and_join : (unit -> 'a t) -> (unit -> 'b t) -> ('a * 'b) t

Start two fibers and wait for their result. Note that this function combines both successes and errors: if one of the computations fails, we let the other one run to completion, to give it a chance to raise its errors too. All other parallel execution combinators have the same error semantics.
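The error semantics described above can be sketched as follows, assuming the fiber library is linked. The example uses collect_errors (documented under "Error handling" on this page) to observe the errors; iter is a hypothetical stub that is not expected to be called because no fiber blocks.

```ocaml
(* Hypothetical scheduler stub: no fiber below blocks. *)
let iter () = failwith "no pending events"

(* Success case: both results are paired. *)
let pair =
  Fiber.run
    (Fiber.fork_and_join
       (fun () -> Fiber.return 1)
       (fun () -> Fiber.return "one"))
    ~iter

(* Failure case: both computations raise, and both errors are reported
   because the second fiber still runs after the first one fails. *)
let failure_count =
  let both_fail () =
    Fiber.fork_and_join
      (fun () -> failwith "left")
      (fun () -> failwith "right")
  in
  match Fiber.run (Fiber.collect_errors both_fail) ~iter with
  | Ok _ -> 0
  | Error exns -> List.length exns
```

The sketch only counts the collected errors, since nothing above specifies the order in which they are reported.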

val fork_and_join_unit : (unit -> unit t) -> (unit -> 'a t) -> 'a t

Same as fork_and_join, but assumes the first fiber returns unit.

val parallel_map : 'a list -> f:('a -> 'b t) -> 'b list t

Map a list in parallel.
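A short usage sketch, assuming the fiber library is linked and that results are returned in the order of the input list. iter is a hypothetical stub that is not expected to be called because no fiber blocks.

```ocaml
(* Hypothetical scheduler stub: no fiber below blocks. *)
let iter () = failwith "no pending events"

(* Each element gets its own fiber; the results are joined into a list. *)
let squares =
  Fiber.run
    (Fiber.parallel_map [ 1; 2; 3 ] ~f:(fun x -> Fiber.return (x * x)))
    ~iter
```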

val all_concurrently : 'a t list -> 'a list t

Like all but executes the fibers concurrently.

val all_concurrently_unit : unit t list -> unit t

Like all_concurrently, but specialized for unit fibers. The advantage is that it doesn't allocate a return list.

val parallel_iter : 'a list -> f:('a -> unit t) -> unit t

Iterate over a list in parallel.

val parallel_iter_set : (module Stdune.Set.S with type elt = 'a and type t = 's) -> 's -> f:('a -> unit t) -> unit t
val sequential_iter_seq : 'a Stdune.Seq.t -> f:('a -> unit t) -> unit t
val record_metrics : 'a t -> tag:string -> 'a t

Returns a fiber which wraps the given fiber with calls to Metrics.Timer.start/stop.

Note: This measures the wall-clock time between when the fiber starts and when it finishes, including any time during which it is inactive.

module Make_map_traversals (Map : Stdune.Map.S) : sig ... end

Provide efficient parallel iter/map functions for maps.

Local storage

module Var : sig ... end

Variables local to a fiber

Error handling

val with_error_handler : (unit -> 'a t) -> on_error:(Stdune.Exn_with_backtrace.t -> Stdune.Nothing.t t) -> 'a t

with_error_handler f ~on_error calls on_error for every exception raised during the execution of f. This includes exceptions raised when calling f () or during the execution of fibers after f () has returned. Exceptions raised by on_error are passed on to the parent error handler.

It is guaranteed that after the fiber has returned a value, on_error will never be called.
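A minimal sketch of a handler that observes an exception and then passes it on, assuming the fiber and stdune libraries are linked. Because on_error must return a Stdune.Nothing.t fiber, it cannot produce a value; here it re-raises with Stdune.Exn_with_backtrace.reraise, which is assumed to be available with type t -> 'a. The counter and the iter stub are illustrative, and collect_errors plays the role of the parent error handler.

```ocaml
(* Hypothetical scheduler stub: no fiber below blocks. *)
let iter () = failwith "no pending events"

let handled = ref 0

(* Count the exception, then forward it to the parent error handler. *)
let guarded () =
  Fiber.with_error_handler
    (fun () -> failwith "boom")
    ~on_error:(fun exn ->
      incr handled;
      Stdune.Exn_with_backtrace.reraise exn)

(* The re-raised exception reaches the enclosing [collect_errors]. *)
let propagated =
  match Fiber.run (Fiber.collect_errors guarded) ~iter with
  | Ok () -> 0
  | Error exns -> List.length exns
```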

val map_reduce_errors : (module Stdune.Monoid with type t = 'a) -> on_error:(Stdune.Exn_with_backtrace.t -> 'a t) -> (unit -> 'b t) -> ('b, 'a) Stdune.result t
val collect_errors : (unit -> 'a t) -> ('a, Stdune.Exn_with_backtrace.t list) Stdune.Result.t t

collect_errors f is: fold_errors f ~init:[] ~on_error:(fun e l -> e :: l)

val finalize : (unit -> 'a t) -> finally:(unit -> unit t) -> 'a t

finalize f ~finally runs finally after f () has terminated, whether it fails or succeeds.
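The following sketch exercises both the success and the failure path, assuming the fiber library is linked. The cleanups counter and the iter stub are illustrative; iter is not expected to be called because no fiber blocks.

```ocaml
(* Hypothetical scheduler stub: no fiber below blocks. *)
let iter () = failwith "no pending events"

let cleanups = ref 0

(* Illustrative finalizer: count how many times it runs. *)
let finally () =
  incr cleanups;
  Fiber.return ()

(* Success path: the value is returned and [finally] runs once. *)
let ok = Fiber.run (Fiber.finalize (fun () -> Fiber.return 42) ~finally) ~iter

(* Failure path: [finally] still runs, and the error is reported to the
   enclosing [collect_errors]. *)
let failed =
  match
    Fiber.run
      (Fiber.collect_errors (fun () ->
         Fiber.finalize (fun () -> failwith "boom") ~finally))
      ~iter
  with
  | Ok () -> false
  | Error _ -> true
```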

val reraise_all : Stdune.Exn_with_backtrace.t list -> 'a t

reraise_all exns re-raises all exns to the current error handler.

Synchronization

module Ivar : sig ... end

Write once variables

module Mvar : sig ... end

Mailbox variables

module Svar : sig ... end

State variables

module Mutex : sig ... end
module Throttle : sig ... end

Limit the number of jobs

val repeat_while : f:('a -> 'a option t) -> init:'a -> unit t
module Stream : sig ... end

Destructive streams that can be composed to pipelines.

module Pool : sig ... end

Running fibers

type fill =
  | Fill : 'a Ivar.t * 'a -> fill
val run : 'a t -> iter:(unit -> fill Stdune.Nonempty_list.t) -> 'a

run t ~iter runs a fiber until it terminates. iter is used to implement the scheduler: it should block waiting for an event and return at least one ivar to fill.
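A toy sketch of how iter drives a blocked fiber, assuming the fiber and stdune libraries are linked. The contents of Ivar are elided on this page, so Fiber.Ivar.create and Fiber.Ivar.read are assumptions here, as is constructing the non-empty list with list syntax under a local open of Stdune.Nonempty_list. A real scheduler would block on external events; this one returns a fixed fill immediately.

```ocaml
(* Assumed API: [Ivar.create : unit -> 'a Ivar.t]. *)
let ivar : int Fiber.Ivar.t = Fiber.Ivar.create ()

let iter_calls = ref 0

(* Toy scheduler: instead of waiting for an event, immediately report
   that [ivar] should be filled with 42. *)
let iter () =
  incr iter_calls;
  Stdune.Nonempty_list.[ Fiber.Fill (ivar, 42) ]

(* Reading the empty ivar blocks the fiber (assumed API:
   [Ivar.read : 'a Ivar.t -> 'a Fiber.t]), so [run] calls [iter],
   fills the ivar, and resumes the reader. *)
let value = Fiber.run (Fiber.Ivar.read ivar) ~iter
```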

module Scheduler : sig ... end

Advanced fiber execution

module Expert : sig ... end

This module offers no safety protections. It is only needed for maximizing performance in certain situations.
