package eio

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
(* A pool is a sequence of cells containing either available slots or consumers waiting for them.
   A slot may or may not contain an actual resource.

   To use a resource:

   1. Get the next "suspend" cell. If it contains a resource slot, use it.
   2. If no slot is ready and we're below capacity, create a new slot and add it (to the next resume cell).
   3. Either way, wait for the cell to be resumed with a slot.
   4. Once you have a slot, ensure it contains a resource, creating one if not.
   5. When done, add the slot back (in the next resume cell).
*)

(* Import these directly because we copy this file for the dscheck tests. *)
module Fiber_context = Eio__core.Private.Fiber_context
module Suspend = Eio__core.Private.Suspend

(* A slot is a mutable cell that is passed around between the pool and its users.
   It holds [Some resource] once a resource has been allocated for it, and [None]
   while it is empty (freshly created, or its previous resource was disposed and
   has not yet been replaced). *)
type 'a slot = 'a option ref

module Cell = struct
  (* State machine for a single cell. Two sequences of transitions are possible,
     depending on which side acts on the cell first:

     A. Suspender first : In_transition -> Request      The suspender waits for a resource
        A.1. Resumer    : Request -> Finished           The resumer then provides a resource
        A.2. Suspender  : Request -> Finished           The suspender cancels instead
     B. Resumer first   : In_transition -> Resource     The resumer leaves a spare resource
        B.1. Suspender  : Resource -> Finished          The suspender takes it without waiting
  *)

  type 'a t =
    | In_transition
    | Request of ('a slot -> unit)
    | Resource of 'a slot
    | Finished

  (* Every cell starts out untouched by either side. *)
  let init = In_transition

  (* Consumed by [Cells.Make]; presumably the log2 of the number of cells per
     segment -- TODO confirm against the [Cells] interface. *)
  let segment_order = 2

  (* [dump f cell] prints the name of [cell]'s state on [f], omitting any payload. *)
  let dump f cell =
    let label =
      match cell with
      | In_transition -> "In_transition"
      | Request _ -> "Request"
      | Resource _ -> "Resource"
      | Finished -> "Finished"
    in
    Fmt.string f label
end

(* The queue of cells, obtained by applying the [Cells.Make] functor to [Cell].
   This file uses [Q.make], [Q.next_suspend], [Q.next_resume] and [Q.cancel_cell]. *)
module Q = Cells.Make(Cell)

(* A pool of reusable resources of type ['a], holding at most [max_slots] slots. *)
type 'a t = {
  slots : int Atomic.t;        (* Total resources, available and in use *)
  max_slots : int;             (* Upper bound on [slots]; [create] rejects values <= 0 *)
  alloc : unit -> 'a;          (* Makes a new resource, for an empty slot or to replace an invalid one *)
  validate : 'a -> bool;       (* Checked before a pooled resource is reused; [false] causes it to be replaced *)
  dispose : 'a -> unit;        (* Called on a resource that failed [validate], or that was made for one-off use *)
  q : 'a Q.t;                  (* Cells holding either available slots or consumers waiting for one *)
}

(* [create ?validate ?dispose max_size alloc] is a new pool that will never hold
   more than [max_size] slots. No resource is allocated here: the slot counter
   starts at zero and [alloc] is only stored for later use.
   [validate] defaults to accepting every resource and [dispose] to doing nothing.
   @raise Invalid_argument if [max_size] is not strictly positive. *)
let create ?(validate=Fun.const true) ?(dispose=ignore) max_size alloc =
  if max_size < 1 then invalid_arg "Pool.create: max_size is <= 0";
  let slots = Atomic.make 0 in
  let q = Q.make () in
  { slots; max_slots = max_size; alloc; validate; dispose; q }

(* [add t x] adds [x] to the queue of available slots.

   It takes the next resume cell and then either stores [x] there for a future
   consumer or, if a consumer is already waiting in that cell, hands [x] straight
   to it. If the cell turns out to have been abandoned by its consumer, [x] is
   offered to the next resume cell instead, so [x] is never dropped. *)
let rec add t x =
  let cell = Q.next_resume t.q in
  (* [aux] re-examines the *same* cell; calling [add] again moves on to a new one. *)
  let rec aux () =
    match Atomic.get cell with
    (* No consumer yet: leave the slot here. If the CAS fails, the state changed
       under us, so look at the cell again. *)
    | In_transition -> if not (Atomic.compare_and_set cell In_transition (Resource x)) then aux ()
    | Finished -> add t x         (* The consumer cancelled. Get another cell and retry. *)
    | Request r as prev ->
      (* The CAS must be given [prev], the very value that was read: rebuilding
         [Request r] would create a different block and the comparison
         (physical equality, per the stdlib [Atomic] documentation) would fail. *)
      if Atomic.compare_and_set cell prev Finished then (
        r x               (* We had a consumer waiting. Give it to them. *)
      ) else add t x      (* Consumer cancelled; retry with another cell. *)
    | Resource _ -> assert false  (* Can't happen; only a resumer can set this, and we're the resumer. *)
  in
  aux ()

(* [cancel segment cell] tries to withdraw the request previously stored in [cell].
   The cell is unconditionally marked [Finished]; the state it held just before
   tells us who won:
   - [Request]: the request was still pending, so the cancellation succeeds.
     [segment] is notified of the cancelled cell and the result is [true].
   - [Finished]: a resumer already took the request, so the result is [false]
     and the caller must let the resumption go ahead.
   Must only be called after the cell has been moved to [Request]. *)
let cancel segment cell =
  let previous = Atomic.exchange cell Cell.Finished in
  match previous with
  | Cell.Request _ ->
    Q.cancel_cell segment;
    true
  | Cell.Finished ->
    false
  | Cell.In_transition | Cell.Resource _ ->
    (* Neither state is reachable once the cell has been a [Request]. *)
    assert false

(* [maybe_add_slot t current] adds one more (empty) slot to [t] unless it is full.
   [current] is the caller's latest reading of [t.slots]. The counter is bumped
   with a compare-and-set; if that fails because of a concurrent update, the
   counter is read again and the decision is retaken from scratch.
   Nothing happens once the count has reached [t.max_slots]. *)
let rec maybe_add_slot t current =
  match current < t.max_slots with
  | false -> ()
  | true ->
    let claimed = Atomic.compare_and_set t.slots current (succ current) in
    if claimed then add t (ref None)
    else maybe_add_slot t (Atomic.get t.slots)

(* [run_with t f slot] runs [f] on the resource belonging to [slot] and then
   gives [slot] back to [t], whether [f] returned or raised.

   Before calling [f], the slot is brought into a usable state:
   - an empty slot gets a freshly allocated resource;
   - a resource rejected by [t.validate] is removed from the slot, disposed,
     and replaced by a freshly allocated one;
   - a resource accepted by [t.validate] is used as it is.

   An exception raised while preparing the resource is treated like one raised
   by [f]: the slot is still returned to the pool, and the exception is re-raised
   with its original backtrace. *)
let run_with t f slot =
  (* Allocate a resource and record it in [slot]. *)
  let fill () =
    let fresh = t.alloc () in
    slot := Some fresh;
    fresh
  in
  let resource () =
    match !slot with
    | None -> fill ()
    | Some existing ->
      if t.validate existing then existing
      else (
        (* Empty the slot first so it never keeps a disposed resource,
           even if [dispose] or [alloc] raises. *)
        slot := None;
        t.dispose existing;
        fill ()
      )
  in
  match f (resource ()) with
  | result ->
    add t slot;
    result
  | exception ex ->
    let bt = Printexc.get_raw_backtrace () in
    add t slot;
    Printexc.raise_with_backtrace ex bt

(* [run_new_and_dispose t f] runs [f] on a one-off resource that never enters the pool.
   The resource is obtained from [t.alloc] and passed to [t.dispose] as soon as [f]
   has finished, whether it returned or raised. The result of [f] is returned; an
   exception from [f] is re-raised with its original backtrace after disposal. *)
let run_new_and_dispose t f =
  let resource = t.alloc () in
  let outcome =
    try Ok (f resource)
    with ex -> Error (ex, Printexc.get_raw_backtrace ())
  in
  t.dispose resource;
  match outcome with
  | Ok result -> result
  | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt

(* [use t ?never_block f] runs [f] with a resource from [t] and returns its result
   (or re-raises its exception). On the pooled paths the slot is handed back to [t]
   afterwards by [run_with].

   The caller takes the next suspend cell and then:
   - if a slot is already waiting there, uses it immediately;
   - otherwise, if the pool is below capacity, asks for one more slot to be added,
     and in any case suspends until some slot is delivered to its cell;
   - except that with [~never_block:true] and the pool at capacity it does not
     suspend: it uses a slot that arrived in the meantime if there is one, and
     otherwise runs [f] on a temporary resource that is disposed afterwards
     (see [run_new_and_dispose]) and never added to the pool.

   While suspended, the request can be cancelled; in that case no slot is taken.
   NOTE(review): how the [Error ex] passed to [enqueue] surfaces to the caller is
   determined by [Suspend.enter_unchecked], which is defined outside this file. *)
let use t ?(never_block=false) f =
  let segment, cell = Q.next_suspend t.q in
  match Atomic.get cell with
  (* Per the transitions listed in [Cell], both of these states are only reached
     after the suspender has acted on the cell, which we have not done yet. *)
  | Finished | Request _ -> assert false
  | Resource slot ->
    Atomic.set cell Finished;   (* Allow value to be GC'd *)
    run_with t f slot
  | In_transition ->
    let current = Atomic.get t.slots in
    match current < t.max_slots with
    | false when never_block -> (
      (* We are at capacity, but cannot block.
         Create a new resource to run f but don't add it to the pool. *)
      (* Marking the cell [Finished] makes a later [add] on this cell move on to
         another one. The exchange also tells us whether a slot was stored here
         after our first read, in which case we use it rather than allocating. *)
      match Atomic.exchange cell Finished with
      | Resource slot -> run_with t f slot
      | _ -> run_new_and_dispose t f
    )
    | can_add ->
      (* Create a slot if not at capacity. *)
      (* The new slot is queued through [add], i.e. into the next *resume* cell;
         nothing here assumes that this is our own cell, so we still wait below. *)
      if can_add then maybe_add_slot t current;
      (* No item is available right now. Start waiting *)
      let slot =
        Suspend.enter_unchecked "Pool.acquire" (fun ctx enqueue ->
            (* [r] is the callback a resumer invokes (from [add]) to deliver a slot. *)
            let r x = enqueue (Ok x) in
            if Atomic.compare_and_set cell In_transition (Request r) then (
              (* Our request is now visible to resumers.
                 [get_error] presumably reports a cancellation that is already
                 pending for this fiber -- confirm against [Fiber_context]. *)
              match Fiber_context.get_error ctx with
              | Some ex ->
                if cancel segment cell then enqueue (Error ex);
                (* else being resumed *)
              | None ->
                (* Register how to withdraw the request if we are cancelled later. *)
                Fiber_context.set_cancel_fn ctx (fun ex ->
                    if cancel segment cell then enqueue (Error ex)
                    (* else being resumed *)
                  )
            ) else (
              (* The CAS failed, so the cell was no longer [In_transition]: a resumer
                 stored a slot in it first. Take that slot and resume straight away. *)
              match Atomic.exchange cell Finished with
              | Resource x -> enqueue (Ok x)
              | _ -> assert false
            );
          )
      in
      (* assert (Atomic.get cell = Finished); *)
      run_with t f slot
OCaml

Innovation. Community. Security.