package moonpool

  1. Overview
  2. Docs

Source file chan.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
module A = Atomic_

type 'a or_error = 'a Fut.or_error
type 'a waiter = 'a Fut.promise

let[@inline] list_is_empty_ = function
  | [] -> true
  | _ :: _ -> false

(** Simple functional queue *)
module Q : sig
  type 'a t

  val return : 'a -> 'a t
  val is_empty : _ t -> bool

  exception Empty

  val pop_exn : 'a t -> 'a * 'a t
  val push : 'a t -> 'a -> 'a t
  val iter : ('a -> unit) -> 'a t -> unit
end = struct
  type 'a t = {
    hd: 'a list;
    tl: 'a list;
  }
  (** Queue containing elements of type 'a.

      invariant: if hd=[], then tl=[] *)

  let[@inline] return x : _ t = { hd = [ x ]; tl = [] }

  let[@inline] make_ hd tl =
    match hd with
    | [] -> { hd = List.rev tl; tl = [] }
    | _ :: _ -> { hd; tl }

  let[@inline] is_empty self = list_is_empty_ self.hd
  let[@inline] push self x : _ t = make_ self.hd (x :: self.tl)

  let iter f (self : _ t) : unit =
    List.iter f self.hd;
    List.iter f self.tl

  exception Empty

  let pop_exn self =
    match self.hd with
    | [] ->
      assert (list_is_empty_ self.tl);
      raise Empty
    | x :: hd' ->
      let self' = make_ hd' self.tl in
      x, self'
end

exception Closed

type 'a state =
  | Empty
  | St_closed
  | Elems of 'a Q.t
  | Waiters of 'a waiter Q.t

type 'a t = { st: 'a state A.t } [@@unboxed]

let create () : _ t = { st = A.make Empty }

(** Produce a state from a queue of waiters *)
let[@inline] mk_st_waiters_ ws : _ state =
  if Q.is_empty ws then
    Empty
  else
    Waiters ws

(** Produce a state from a queue of elements *)
let[@inline] mk_st_elems_ q : _ state =
  if Q.is_empty q then
    Empty
  else
    Elems q

let push (self : _ t) x : unit =
  while
    let old_st = A.get self.st in
    match old_st with
    | St_closed -> raise Closed
    | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x)))
    | Waiters ws ->
      (* awake first waiter and give it [x] *)
      let w, ws' = Q.pop_exn ws in
      let new_st = mk_st_waiters_ ws' in
      if A.compare_and_set self.st old_st new_st then (
        Fut.fulfill w (Ok x);
        false
      ) else
        true
    | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x)))
  do
    Domain_.relax ()
  done

let try_pop (type elt) self : elt option =
  let module M = struct
    exception Found of elt
  end in
  try
    (* a bit of spinning *)
    for _retry = 1 to 10 do
      let old_st = A.get self.st in
      match old_st with
      | Elems q ->
        let x, q' = Q.pop_exn q in
        let new_st = mk_st_elems_ q' in
        if A.compare_and_set self.st old_st new_st then
          raise_notrace (M.Found x)
        else
          Domain_.relax ()
      | _ -> raise_notrace Exit
    done;
    None
  with
  | M.Found x -> Some x
  | Exit -> None

let pop (type elt) (self : _ t) : elt Fut.t =
  let module M = struct
    exception Ret of elt
    exception Fut of elt Fut.t
  end in
  try
    while
      let old_st = A.get self.st in
      (match old_st with
      | St_closed ->
        let bt = Printexc.get_callstack 10 in
        raise_notrace (M.Fut (Fut.fail Closed bt))
      | Elems q ->
        let x, q' = Q.pop_exn q in
        let new_st = mk_st_elems_ q' in
        if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x)
      | Empty ->
        let fut, promise = Fut.make () in
        let new_st = Waiters (Q.return promise) in
        if A.compare_and_set self.st old_st new_st then
          raise_notrace (M.Fut fut)
      | Waiters ws ->
        let fut, promise = Fut.make () in
        (* add new promise at the end of the queue of waiters *)
        let new_st = Waiters (Q.push ws promise) in
        if A.compare_and_set self.st old_st new_st then
          raise_notrace (M.Fut fut));
      true
    do
      Domain_.relax ()
    done;
    (* never reached *)
    assert false
  with
  | M.Ret x -> Fut.return x
  | M.Fut f -> f

let pop_block_exn (self : 'a t) : 'a =
  match try_pop self with
  | Some x -> x
  | None -> Fut.wait_block_exn @@ pop self

let close (self : _ t) : unit =
  while
    let old_st = A.get self.st in
    match old_st with
    | St_closed -> false (* exit *)
    | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed)
    | Waiters ws ->
      if A.compare_and_set self.st old_st St_closed then (
        (* fail all waiters with [Closed]. *)
        let bt = Printexc.get_callstack 10 in
        Q.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws;
        false
      ) else
        true
  do
    Domain_.relax ()
  done

[@@@ifge 5.0]

let pop_await self =
  match try_pop self with
  | Some x -> x
  | None -> Fut.await @@ pop self

[@@@endif]