package fuseau

  1. Overview
  2. Docs

Source file chan.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
(** A callback from [unit] to [unit]. Values of this type are stored in the
    [receivers] and [senders] queues of a channel and are called to signal
    that the state of the channel has changed. *)
type waker = unit -> unit

(** [suspend ~before_suspend] performs the [Effects.Suspend] effect, handing
    [before_suspend] over to whichever handler is installed for it. In this
    file [before_suspend] is always a function receiving a [~wakeup]
    callback, which it stores in one of the channel's waker queues. *)
let[@inline] suspend ~before_suspend =
  Effect.perform (Effects.Suspend { before_suspend })

(** A channel carrying values of type ['a]: a buffer of items plus the
    callbacks of the parties waiting on it. *)
type 'a t = {
  max_size: int;
      (** capacity bound: the channel counts as full once [q] holds at least
          this many items *)
  q: 'a Queue.t;  (** buffered items, consumed in FIFO order *)
  receivers: waker Queue.t;
      (** callbacks to run after an item has been pushed, or on [close] *)
  senders: waker Queue.t;
      (** callbacks to run after an item has been popped, or on [close] *)
  mutable closed: bool;  (** set to [true] by [close] *)
}

(** [create ?max_size ()] returns a fresh, open channel with no buffered item
    and no registered waker.
    @param max_size number of buffered items at which the channel counts as
    full (default: [max_int]). *)
let create ?(max_size = max_int) () : _ t =
  let q = Queue.create () in
  let receivers = Queue.create () in
  let senders = Queue.create () in
  { max_size; q; receivers; senders; closed = false }

(** Raised by operations on a channel whose [closed] flag is set: by [send]
    and [try_send] regardless of the buffer's content, and by [receive_exn]
    and [try_receive] once no buffered item remains. *)
exception Closed

(** Number of items currently buffered in the channel. *)
let[@inline] size { q; _ } = Queue.length q

(** [true] iff no item is currently buffered in the channel. *)
let[@inline] is_empty { q; _ } = Queue.is_empty q

(** [true] iff the buffer holds at least [max_size] items. *)
let[@inline] is_full_ { q; max_size; _ } : bool = Queue.length q >= max_size

(** [close self] marks the channel as closed, then wakes every registered
    sender followed by every registered receiver so that they can observe
    the closed state.

    Each waker is removed from its queue before being called, so it runs at
    most once: a later drain of [senders] or [receivers] (for instance while
    the remaining buffered items of a closed channel are being received)
    will not call it again, and the channel does not keep the closure
    alive. *)
let close self : unit =
  self.closed <- true;
  (* Move the pending wakers into a private queue before calling them:
     [Queue.transfer] empties [pending], so the channel's queue is never
     iterated while a waker might be pushing onto it. *)
  let wake_all (pending : waker Queue.t) : unit =
    let detached = Queue.create () in
    Queue.transfer pending detached;
    Queue.iter (fun wake -> wake ()) detached
  in
  wake_all self.senders;
  wake_all self.receivers

(** [wakeup_receivers self] empties the [receivers] queue, calling each
    waker in FIFO order right after removing it. Wakers pushed while the
    drain is running are called as well. *)
let wakeup_receivers self =
  let rec drain () =
    match Queue.take_opt self.receivers with
    | None -> ()
    | Some wake ->
      wake ();
      drain ()
  in
  drain ()

(** [send self x] appends [x] to the channel's buffer and then wakes all
    registered receivers. While the channel is full, the caller registers
    its wakeup callback in [senders] and suspends; the closed flag and the
    capacity are checked again after each resumption.
    @raise Closed if the channel is closed on entry or after a resumption. *)
let send self x : unit =
  let rec attempt () =
    if self.closed then raise Closed;
    if is_full_ self then (
      (* no room: park until a waker stored in [senders] is called *)
      suspend ~before_suspend:(fun ~wakeup -> Queue.push wakeup self.senders);
      attempt ()
    ) else (
      Queue.push x self.q;
      wakeup_receivers self
    )
  in
  attempt ()

(** [try_send self x] attempts to push [x] without suspending.
    @return [true] if [x] was buffered (registered receivers are then
    woken), [false] if the channel was full and [x] was not buffered.
    @raise Closed if the channel is closed. *)
let try_send self x : bool =
  if self.closed then raise Closed;
  let has_room = not (is_full_ self) in
  if has_room then (
    Queue.push x self.q;
    wakeup_receivers self
  );
  has_room

(** [on_send_ready self cb] arranges for [cb] to be called when a send
    attempt on [self] is worth making.

    - If the channel is not full, [cb] is called immediately.
    - If the channel is closed, [cb] is also called immediately: a send
      attempt would not block (it raises [Closed]), and nothing would ever
      drain [senders] to call [cb] once [close] has already run.
    - Otherwise [cb] is queued in [senders] and is called the next time the
      senders are woken. *)
let on_send_ready self cb =
  if self.closed || not (is_full_ self) then
    cb ()
  else
    Queue.push cb self.senders

(** [wakeup_senders self] empties the [senders] queue, calling each waker in
    FIFO order right after removing it. Wakers pushed while the drain is
    running are called as well. *)
let wakeup_senders self =
  let rec drain () =
    match Queue.take_opt self.senders with
    | None -> ()
    | Some wake ->
      wake ();
      drain ()
  in
  drain ()

(** [receive_exn self] removes and returns the oldest buffered item, waking
    all registered senders first. When the buffer is empty and the channel
    is still open, the caller registers its wakeup callback in [receivers],
    suspends, and retries after being resumed.
    @raise Closed if the buffer is empty and the channel is closed. *)
let rec receive_exn (self : 'a t) : 'a =
  match Queue.take_opt self.q with
  | Some item ->
    wakeup_senders self;
    item
  | None ->
    if self.closed then raise Closed;
    (* nothing buffered yet: park until a waker stored in [receivers] runs *)
    suspend ~before_suspend:(fun ~wakeup -> Queue.push wakeup self.receivers);
    receive_exn self

(** [receive self] behaves like [receive_exn self] but reports a closed,
    drained channel as [None] instead of raising [Closed].
    @return [Some x] for a received item [x], [None] if [Closed] was raised. *)
let receive (self : 'a t) : 'a option =
  match receive_exn self with
  | item -> Some item
  | exception Closed -> None

(** [try_receive self] attempts to take the oldest buffered item without
    suspending.
    @return [Some x] if an item was buffered (registered senders are then
    woken), [None] if the buffer is empty and the channel is still open.
    @raise Closed if the buffer is empty and the channel is closed. *)
let try_receive (self : 'a t) : 'a option =
  match Queue.take_opt self.q with
  | Some item ->
    wakeup_senders self;
    Some item
  | None ->
    if self.closed then raise Closed;
    None

(** [on_receive_ready self cb] arranges for [cb] to be called when a receive
    attempt on [self] is worth making.

    - If an item is buffered, [cb] is called immediately.
    - If the channel is closed, [cb] is also called immediately: a receive
      attempt would not block (it raises [Closed] once the buffer is
      empty), and nothing would ever drain [receivers] to call [cb] once
      [close] has already run and no further item can be sent.
    - Otherwise [cb] is queued in [receivers] and is called the next time
      the receivers are woken. *)
let on_receive_ready (self : _ t) cb : unit =
  if self.closed || not (Queue.is_empty self.q) then
    cb ()
  else
    Queue.push cb self.receivers

(** [ev_send c x] is an event that sends [x] on channel [c].

    - [poll ()] attempts the send with [try_send]: it returns [Some (Ok ())]
      if [x] was buffered and [None] if the channel is full. If the channel
      is closed, it returns [Some (Error ebt)] where [ebt] wraps the
      [Closed] exception, mirroring how [ev_receive] reports a closed
      channel, instead of letting [Closed] escape from [poll].
    - [wait cb] registers [cb] through [on_send_ready] and returns
      [Cancel_handle.dummy]. *)
let ev_send c x : unit Event.t =
  let poll () =
    match try_send c x with
    | true -> Some (Ok ())
    | false -> None
    | exception (Closed as exn) ->
      let ebt = Exn_bt.get exn in
      Some (Error ebt)
  in
  let wait cb =
    on_send_ready c cb;
    Cancel_handle.dummy
  in
  { poll; wait }

(** [ev_receive self] is an event that receives one item from [self].

    - [poll ()] attempts the receive with [try_receive]: it returns
      [Some (Ok x)] for a received item [x], [None] if nothing is buffered
      yet, and [Some (Error ebt)] where [ebt] wraps the [Closed] exception
      if [try_receive] raised it.
    - [wait cb] registers [cb] through [on_receive_ready] and returns
      [Cancel_handle.dummy]. *)
let ev_receive (self : 'a t) : 'a Event.t =
  let poll () =
    try
      match try_receive self with
      | None -> None
      | Some item -> Some (Ok item)
    with Closed as exn -> Some (Error (Exn_bt.get exn))
  in
  let wait cb =
    let () = on_receive_ready self cb in
    Cancel_handle.dummy
  in
  { poll; wait }