package fiber

  1. Overview
  2. Docs

Source file pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
open Stdune
open Core
open Core.O

(** Messages exchanged through the pool's mvar.

    - [Done] is written by [stop] and is turned into [None] by [stream].
    - [Task f] is written by [task] and is turned into [Some f] by [stream];
      [f] is a thunk producing a [unit] fiber. *)
type mvar =
  | Done
  | Task of (unit -> unit t)

(** Whether the pool still accepts tasks.

    A pool is created [Open]; [stop] switches it to [Closed]. Submitting via
    [task] while [Closed] raises a code error. *)
type status =
  | Open
  | Closed

(** A task pool.

    - [mvar] is the channel [task] and [stop] write to, and [stream] reads
      from.
    - [status] is mutable: it starts as [Open] and is set to [Closed] by
      [stop]. *)
type t =
  { mvar : mvar Mvar.t
  ; mutable status : status
  }

(** [running t] reports whether [t] is still open.

    Written in continuation-passing style: [k] receives [true] when the
    status is [Open] and [false] when it is [Closed]. *)
let running t k =
  let is_open =
    match t.status with
    | Open -> true
    | Closed -> false
  in
  k is_open

(** [create ()] builds a new pool with a fresh mvar and status [Open]. *)
let create () =
  let mvar = Mvar.create () in
  { mvar; status = Open }

(** [task t ~f] submits the thunk [f] to the pool [t].

    When [t] is [Open], [Task f] is written to the pool's mvar and the
    continuation [k] is handed to [Mvar.write].

    Raises a code error (via [Code_error.raise]) when [t] is [Closed]. *)
let task t ~f k =
  match t.status with
  | Open ->
    let job = Task f in
    Mvar.write t.mvar job k
  | Closed ->
    Code_error.raise "pool is closed. new tasks may not be submitted" []

(** [stream t] exposes the tasks submitted to [t] as an input stream.

    Each pull reads one message from the pool's mvar: [Task f] becomes
    [Some f] and [Done] becomes [None] (presumably the end-of-stream marker
    for [Stream.In] -- confirm against its documentation). *)
let stream t =
  let to_option = function
    | Done -> None
    | Task job -> Some job
  in
  let pull () =
    let+ msg = Mvar.read t.mvar in
    to_option msg
  in
  Stream.In.create pull

(** [stop t] closes the pool [t].

    On an [Open] pool the status is first set to [Closed], then [Done] is
    written to the mvar with [k] as the continuation. On an already [Closed]
    pool nothing is written and [k ()] is called directly, so repeated calls
    write [Done] at most once. *)
let stop t k =
  match t.status with
  | Open ->
    (* Flip the status before writing so later [task] calls are rejected. *)
    let () = t.status <- Closed in
    Mvar.write t.mvar Done k
  | Closed -> k ()

(** [run t] consumes the pool's task stream, invoking every task thunk
    through [Stream.In.parallel_iter]. *)
let run t =
  let tasks = stream t in
  Stream.In.parallel_iter tasks ~f:(fun job -> job ())