Source file moonpool_dpool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
(** Unbounded blocking FIFO queue, protected by a mutex and a condition
    variable. *)
module Bb_queue = struct
  type 'a t = {
    mutex: Mutex.t;
    cond: Condition.t;
    q: 'a Queue.t;
  }

  (** [create ()] is a new, empty queue. *)
  let create () : _ t =
    let mutex = Mutex.create () in
    let cond = Condition.create () in
    { mutex; cond; q = Queue.create () }

  (** [push self x] appends [x] at the end of the queue. Waiting consumers
      are woken up only when the queue goes from empty to non-empty. *)
  let push (self : _ t) x : unit =
    Mutex.lock self.mutex;
    let wake_waiters = Queue.is_empty self.q in
    Queue.add x self.q;
    if wake_waiters then Condition.broadcast self.cond;
    Mutex.unlock self.mutex

  (** [pop self] removes and returns the oldest element, blocking the
      calling thread for as long as the queue is empty. *)
  let pop (self : 'a t) : 'a =
    (* Must be called with [self.mutex] held; [Condition.wait] releases the
       mutex while sleeping and re-acquires it before returning. *)
    let rec wait_for_item () =
      match Queue.take_opt self.q with
      | Some x -> x
      | None ->
        Condition.wait self.cond self.mutex;
        wait_for_item ()
    in
    Mutex.lock self.mutex;
    let x = wait_for_item () in
    Mutex.unlock self.mutex;
    x
end
(** A mutable value guarded by a mutex. *)
module Lock = struct
  type 'a t = {
    mutex: Mutex.t;
    mutable content: 'a;
  }

  (** [create content] wraps [content] together with a fresh mutex. *)
  let create content : _ t =
    let mutex = Mutex.create () in
    { mutex; content }

  (** [with_ self f] calls [f] on the current content while holding the
      mutex. The mutex is released whether [f] returns or raises; in the
      latter case the exception is re-raised. *)
  let[@inline never] with_ (self : _ t) f =
    Mutex.lock self.mutex;
    match f self.content with
    | exception exn ->
      Mutex.unlock self.mutex;
      raise exn
    | res ->
      Mutex.unlock self.mutex;
      res

  (** [update_map l f] computes [f content = (content', y)] under the lock,
      stores [content'] as the new content, and returns [y]. *)
  let[@inline] update_map l f =
    let apply cur =
      let next, result = f cur in
      l.content <- next;
      result
    in
    with_ l apply

  (** [get l] reads the current content under the lock. *)
  let get l = with_ l (fun cur -> cur)
end
(** Handle on a domain, as exposed by the project's [Domain_] module. *)
type domain = Domain_.t

(** Messages delivered to a worker through its queue. *)
type event =
| Run of (unit -> unit) (** Run this function *)
| Decr (** Decrease count *)

(** State of one active worker: its event queue and a refcount. *)
type worker_state = {
q: event Bb_queue.t;
th_count: int Atomic.t; (** Number of threads on this *)
}
(** Table of (optional) workers, one slot per index.

    Workers are started and stopped on demand. Each slot holds the state of
    the currently active worker (its work queue and its thread refcount), if
    any, and separately the domain itself, if any: the domain is kept in its
    own option because it might outlive its state. Every slot is protected by
    its own lock. *)
let domains_ : (worker_state option * Domain_.t option) Lock.t array =
  (* [Array.init] (rather than [Array.make]) so each slot gets a distinct
     lock. *)
  let fresh_slot _idx = Lock.create (None, None) in
  Array.init (max 1 (Domain_.recommended_number ())) fresh_slot
(** Main work loop for a domain worker.
[work_ idx st] is the body of the worker occupying slot [idx] of [domains_],
with [st] as its state.
A domain worker does two things:
- run functions it's asked to (mainly, to start new threads inside it)
- decrease the refcount when one of these threads stops. The thread will
notify the domain that it's exiting, so the domain can know how many
threads are still using it. If all threads exit, the domain polls a bit
(in case new threads are created really shortly after, which happens with
a [Pool.with_] or [Pool.create() … Pool.shutdown()] in a tight loop), and
if nothing happens it tries to stop to free resources. *)
let work_ idx (st : worker_state) : unit =
(* NOTE(review): presumably prevents this worker from handling signals;
confirm against [Signals_]. *)
Signals_.ignore_signals_ ();
(* Processes events until the refcount reaches 0 and is still 0 at the end
of the polling window below. *)
let main_loop () =
let continue = ref true in
while !continue do
(* [Bb_queue.pop] blocks until an event is available. *)
match Bb_queue.pop st.q with
(* Best effort: anything raised by [f] is discarded so that the worker keeps
serving events. *)
| Run f -> (try f () with _ -> ())
| Decr ->
(* [fetch_and_add] returns the previous value, so [1] means the refcount has
just dropped to 0. *)
if Atomic.fetch_and_add st.th_count (-1) = 1 then (
continue := false;
(* Grace period: poll the refcount up to 50 times, sleeping 1ms before each
check; resume the event loop as soon as it is positive again. *)
try
for _n_attempt = 1 to 50 do
Thread.delay 0.001;
if Atomic.get st.th_count > 0 then (
continue := true;
raise Exit
)
done
with Exit -> ()
)
done
in
(* Each iteration runs [main_loop] to completion, then decides, under the
slot's lock, whether this worker really stops. *)
while
main_loop ();
let is_alive =
Lock.update_map domains_.(idx) (function
(* As far as this file shows, only the branch below ever resets the state of
a slot to [None], so it must still be present here. *)
| None, _ -> assert false
| Some _st', dom ->
(* The state stored in the slot must be this very worker's state. *)
assert (st == _st');
(* [run_on] increments the refcount while holding this same lock, so this
check cannot miss a concurrent registration. *)
if Atomic.get st.th_count > 0 then
(Some st, dom), true
else
(* Drop the state but keep the domain handle: [run_on] joins it before
spawning a replacement worker for this slot. *)
(None, dom), false)
in
is_alive
do
()
done;
()
(* Module initialisation: slot 0 is served by a plain thread of the main
   domain instead of a freshly spawned domain. Its refcount starts at 1. *)
let () =
  assert (Domain_.is_main_domain ());
  let main_state = { q = Bb_queue.create (); th_count = Atomic.make 1 } in
  let (_ : Thread.t) = Thread.create (fun () -> work_ 0 main_state) () in
  domains_.(0) <- Lock.create (Some main_state, None)
(** [max_number_of_domains ()] is the number of worker slots, i.e. the length
of [domains_]. Valid indices for [run_on], [decr_on] and [run_on_and_wait]
are [0 .. max_number_of_domains () - 1]. *)
let[@inline] max_number_of_domains () : int = Array.length domains_
(** [run_on i f] schedules [f] to be run by the worker of slot [i], without
    waiting for it.

    If the slot has an active worker, its refcount is incremented and the
    worker is reused. Otherwise the previous domain of the slot, if any, is
    joined, and a new domain is spawned with a refcount of 1. Either way the
    refcount accounts for this call; it is only decremented through
    [decr_on]. *)
let run_on (i : int) (f : unit -> unit) : unit =
  assert (i < Array.length domains_);
  (* Slot transition, executed while holding the slot's lock: returns the
     new slot content along with the worker state to send [f] to. *)
  let acquire_worker (slot : worker_state option * Domain_.t option) =
    match slot with
    | Some active, _ ->
      Atomic.incr active.th_count;
      (slot, active)
    | None, dying_dom ->
      (match dying_dom with
      | Some d -> Domain_.join d
      | None -> ());
      let fresh = { th_count = Atomic.make 1; q = Bb_queue.create () } in
      let worker : domain = Domain_.spawn (fun () -> work_ i fresh) in
      ((Some fresh, Some worker), fresh)
  in
  let w : worker_state = Lock.update_map domains_.(i) acquire_worker in
  Bb_queue.push w.q (Run f)
(** [decr_on i] asks the worker of slot [i] to decrement its refcount, by
    sending it a [Decr] event. Does nothing if the slot has no active
    worker. *)
let decr_on (i : int) : unit =
  assert (i < Array.length domains_);
  let state, _dom = Lock.get domains_.(i) in
  Option.iter (fun (st : worker_state) -> Bb_queue.push st.q Decr) state
(** [run_on_and_wait i f] runs [f ()] on the worker of slot [i] (via
    [run_on]) and blocks the calling thread until its outcome is available.

    Returns the value returned by [f ()]. If [f ()] raises, the exception is
    re-raised in the calling thread, with the backtrace captured on the
    worker.

    Since this goes through [run_on], the refcount of slot [i] accounts for
    this call and is only decremented by [decr_on]. *)
let run_on_and_wait (i : int) (f : unit -> 'a) : 'a =
  let reply : ('a, exn * Printexc.raw_backtrace) result Bb_queue.t =
    Bb_queue.create ()
  in
  run_on i (fun () ->
      (* Always send something back: the worker discards exceptions raised by
         the functions it runs, so a failing [f] that pushed nothing would
         leave the caller blocked on [pop] forever. *)
      let outcome =
        match f () with
        | x -> Ok x
        | exception e -> Error (e, Printexc.get_raw_backtrace ())
      in
      Bb_queue.push reply outcome);
  match Bb_queue.pop reply with
  | Ok x -> x
  | Error (e, bt) -> Printexc.raise_with_backtrace e bt