Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
(** A pool that accepts tasks through an [Mvar] and runs them as they are
    read back out. Once stopped, the pool refuses new tasks. *)

open Stdune
open Core
open Core.O

(** Messages sent through the pool's [Mvar]: either a task thunk or the
    [Done] marker written by [stop]. The [t] in [unit -> unit t] is whatever
    [t] the opens above bring into scope (presumably [Core.t] -- confirm). *)
type mvar =
  | Done
  | Task of (unit -> unit t)

(** Whether the pool still accepts tasks. *)
type status =
  | Open
  | Closed

(** A pool: the [Mvar] carrying its messages plus its current status. *)
type t =
  { mvar : mvar Mvar.t
  ; mutable status : status
  }

(** [running t k] passes [true] to the continuation [k] when [t] is [Open]
    and [false] when it is [Closed]. *)
let running t k =
  let is_open =
    match t.status with
    | Open -> true
    | Closed -> false
  in
  k is_open

(** [create ()] builds a pool with a freshly created [Mvar], in the [Open]
    state. *)
let create () =
  let mvar = Mvar.create () in
  { mvar; status = Open }

(** [task t ~f k] writes [Task f] to the pool's [Mvar], handing [k] to
    [Mvar.write].

    Raises through [Code_error.raise] when [t] is [Closed]. *)
let task t ~f k =
  match t.status with
  | Open -> Mvar.write t.mvar (Task f) k
  | Closed ->
    Code_error.raise "pool is closed. new tasks may not be submitted" []

(** [stream t] builds a [Stream.In] whose producer reads one message from the
    pool's [Mvar] per call: a [Task] yields [Some] of its thunk, [Done]
    yields [None]. *)
let stream t =
  let next () =
    let+ msg = Mvar.read t.mvar in
    match msg with
    | Task job -> Some job
    | Done -> None
  in
  Stream.In.create next

(** [stop t k] closes the pool. If [t] is already [Closed] it only calls
    [k ()]; otherwise it marks [t] as [Closed] and then writes [Done] to the
    [Mvar], handing [k] to [Mvar.write]. *)
let stop t k =
  match t.status with
  | Open ->
    (* Flip the status before writing so that later [task] calls are rejected. *)
    t.status <- Closed;
    Mvar.write t.mvar Done k
  | Closed -> k ()

(** [run t] feeds every task thunk produced by [stream t] to
    [Stream.In.parallel_iter], calling each thunk with [()]. *)
let run t =
  let tasks = stream t in
  Stream.In.parallel_iter tasks ~f:(fun job -> job ())