package moonpool

  1. Overview
  2. Docs

Source file worker_loop_.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
open Types_

type fiber = Picos.Fiber.t

type task_full =
  | T_start of {
      fiber: fiber;
      f: unit -> unit;
    }
  | T_resume : {
      fiber: fiber;
      k: unit -> unit;
    }
      -> task_full

exception No_more_tasks

type 'st ops = {
  schedule: 'st -> task_full -> unit;
  get_next_task: 'st -> task_full;  (** @raise No_more_tasks *)
  on_exn: 'st -> Exn_bt.t -> unit;
  runner: 'st -> Runner.t;
  before_start: 'st -> unit;
  cleanup: 'st -> unit;
}

(** A dummy task. *)
let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber }

let[@inline] discontinue k exn =
  let bt = Printexc.get_raw_backtrace () in
  Effect.Deep.discontinue_with_backtrace k exn bt

let[@inline] raise_with_bt exn =
  let bt = Printexc.get_raw_backtrace () in
  Printexc.raise_with_backtrace exn bt

let with_handler (type st) ~(ops : st ops) (self : st) : (unit -> unit) -> unit
    =
  let current =
    Some
      (fun k ->
        match get_current_fiber_exn () with
        | fiber -> Effect.Deep.continue k fiber
        | exception exn -> discontinue k exn)
  and yield =
    Some
      (fun k ->
        let fiber = get_current_fiber_exn () in
        match
          let k () = Effect.Deep.continue k () in
          ops.schedule self @@ T_resume { fiber; k }
        with
        | () -> ()
        | exception exn -> discontinue k exn)
  and reschedule trigger fiber k : unit =
    ignore (Picos.Fiber.unsuspend fiber trigger : bool);
    let k () = Picos.Fiber.resume fiber k in
    let task = T_resume { fiber; k } in
    ops.schedule self task
  in
  let effc (type a) :
      a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
    | Picos.Fiber.Current -> current
    | Picos.Fiber.Yield -> yield
    | Picos.Fiber.Spawn r ->
      Some
        (fun k ->
          match
            let f () = r.main r.fiber in
            let task = T_start { fiber = r.fiber; f } in
            ops.schedule self task
          with
          | unit -> Effect.Deep.continue k unit
          | exception exn -> discontinue k exn)
    | Picos.Trigger.Await trigger ->
      Some
        (fun k ->
          let fiber = get_current_fiber_exn () in
          (* when triggers is signaled, reschedule task *)
          if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then
            (* trigger was already signaled, reschedule task now *)
            reschedule trigger fiber k)
    | Picos.Computation.Cancel_after _r ->
      Some
        (fun k ->
          (* not implemented *)
          let exn = Failure "Moonpool: cancel_after is not supported." in
          discontinue k exn)
    | _ -> None
  in
  let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
  fun f -> Effect.Deep.match_with f () handler

module type FINE_GRAINED_ARGS = sig
  type st

  val ops : st ops
  val st : st
end

module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
  open Args

  let cur_fiber : fiber ref = ref _dummy_fiber
  let runner = ops.runner st

  type state =
    | New
    | Ready
    | Torn_down

  let state = ref New

  let run_task (task : task_full) : unit =
    let fiber =
      match task with
      | T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
    in

    cur_fiber := fiber;
    TLS.set k_cur_fiber fiber;

    (* let _ctx = before_task runner in *)

    (* run the task now, catching errors, handling effects *)
    assert (task != _dummy_task);
    (try
       match task with
       | T_start { fiber = _; f } -> with_handler ~ops st f
       | T_resume { fiber = _; k } ->
         (* this is already in an effect handler *)
         k ()
     with e ->
       let bt = Printexc.get_raw_backtrace () in
       let ebt = Exn_bt.make e bt in
       ops.on_exn st ebt);

    (* after_task runner _ctx; *)
    cur_fiber := _dummy_fiber;
    TLS.set k_cur_fiber _dummy_fiber

  let setup ~block_signals () : unit =
    if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
    state := Ready;

    if block_signals then Signals_.ignore_signals_ ();

    TLS.set Runner.For_runner_implementors.k_cur_runner runner;

    ops.before_start st

  let run ?(max_tasks = max_int) () : unit =
    if !state <> Ready then invalid_arg "worker_loop.run: not setup";

    let continue = ref true in
    let n_tasks = ref 0 in
    while !continue && !n_tasks < max_tasks do
      match ops.get_next_task st with
      | task ->
        incr n_tasks;
        run_task task
      | exception No_more_tasks -> continue := false
    done

  let teardown () =
    if !state <> Torn_down then (
      state := Torn_down;
      cur_fiber := _dummy_fiber;
      ops.cleanup st
    )
end

let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
  let module FG =
    Fine_grained
      (struct
        type nonrec st = st

        let ops = ops
        let st = self
      end)
      ()
  in
  FG.setup ~block_signals ();
  try
    FG.run ();
    FG.teardown ()
  with exn ->
    let bt = Printexc.get_raw_backtrace () in
    FG.teardown ();
    Printexc.raise_with_backtrace exn bt