package shared-block-ring

  1. Overview
  2. Docs

Source file journal.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
open Result
open Lwt

(** A resettable one-shot alarm.

    [reset] arms the alarm with a deadline measured against
    [Clock.elapsed_ns]; [next] blocks until the alarm has fired.  The
    waiting itself is done by a background [countdown] thread that sleeps
    with [Time.sleep_ns]. *)
module Alarm(Time: S.TIME)(Clock: S.CLOCK) = struct
  type t = {
    (* Deadline on the [Clock.elapsed_ns] timeline; [Int64.max_int] while
       no deadline is armed. *)
    mutable wake_up_at: int64;
    (* The countdown thread most recently started by [reset], if any. *)
    mutable thread: unit Lwt.t option;
    (* Created by [create]; not used by any function visible in this
       module. *)
    m: Lwt_mutex.t;
    (* Signalled by [countdown] when the deadline has passed. *)
    c: unit Lwt_condition.t;
    (* Set when the alarm has fired and [next] has not yet consumed it. *)
    mutable wake_up: bool;
  }

  (** [create ()] returns an alarm with no deadline armed and no
      countdown thread running. *)
  let create () = {
    wake_up_at = Int64.max_int;
    thread = None;
    m = Lwt_mutex.create ();
    c = Lwt_condition.create ();
    wake_up = false;
  }

  (** [next t] returns once the alarm has fired, clearing the fired flag
      so that the following call waits for the next firing. *)
  let rec next t =
    if not t.wake_up then begin
      Lwt_condition.wait t.c >>= fun () -> next t
    end else begin
      t.wake_up <- false;
      return ()
    end

  (** [countdown t] sleeps until [t.wake_up_at] has passed, then marks the
      alarm as fired, disarms it and signals [t.c].  The deadline is read
      again after every sleep, so a deadline that moved further away
      while sleeping is honoured by sleeping again. *)
  let rec countdown t =
    let current = Clock.elapsed_ns () in
    let remaining_ns = Int64.sub t.wake_up_at current in
    if remaining_ns >= 0L then begin
      Time.sleep_ns remaining_ns >>= fun () -> countdown t
    end else begin
      (* Deadline passed: disarm, record the firing, then wake a waiter. *)
      t.wake_up_at <- Int64.max_int;
      t.thread <- None;
      t.wake_up <- true;
      Lwt_condition.signal t.c ();
      return ()
    end

  (** [reset t for_how_long] arms the alarm to fire [for_how_long]
      nanoseconds from now (the value is added to [Clock.elapsed_ns ()]).
      [for_how_long] must not be negative; this is checked with [assert]. *)
  let reset t for_how_long =
    assert (for_how_long >= 0L);
    let current = Clock.elapsed_ns () in
    let previous_deadline = t.wake_up_at in
    let deadline = Int64.add current for_how_long in
    t.wake_up_at <- deadline;
    let start_countdown () = t.thread <- Some (countdown t) in
    match t.thread with
    | None -> start_countdown ()
    | Some running ->
      (* An earlier deadline needs a fresh, shorter sleep.  A later one
         needs nothing: the running countdown re-reads the deadline when
         its current sleep ends. *)
      if deadline < previous_deadline then begin
        Lwt.cancel running;
        start_countdown ()
      end
end

module Make
  (Log: S.LOG)
  (Block: S.BLOCK)
  (Time: S.TIME)
  (Clock: S.CLOCK)
  (Op: S.CSTRUCTABLE) = struct

  open Log

  module R = Ring.Make(Log)(Block)(Op)
  open R

  module Alarm = Alarm(Time)(Clock)

  (** Errors reported by this module: a single message string. *)
  type error = [ `Msg of string ]

  (** [pp_error fmt e] prints the message carried by [e] on [fmt]. *)
  let pp_error fmt (`Msg message) =
    Format.pp_print_string fmt message
    [@coverage off]

  (** [error_to_msg r] rebuilds [r] constructor by constructor.  The value
      is unchanged; rebuilding it lets the type checker give the result
      its own polymorphic-variant error type, independent of the
      argument's. *)
  let error_to_msg r =
    match r with
    | Ok value -> Ok value
    | Error (`Msg message) -> Error (`Msg message)
    [@coverage off]

  (** [open_error r] rebuilds [r] constructor by constructor, exactly like
      [error_to_msg].  The value is unchanged; only the inferred error
      type of the result may differ from the argument's. *)
  let open_error r =
    match r with
    | Ok value -> Ok value
    | Error (`Msg message) -> Error (`Msg message)
    [@coverage off]

  (** Result type used by this module: a value or an [error]. *)
  type 'a result = ('a, error) Result.result [@coverage off]

  (** Handle returned by [push] for an item that has been written to the
      journal. *)
  type waiter = {
    (* As built by [push]: resets the alarm with a zero delay, so the
       background replay does not wait for the flush interval. *)
    flush: unit -> unit;
    (* As built by [push]: returns once the consumed position recorded by
       [replay] is equal to or greater than the position of this item. *)
    sync: unit -> unit Lwt.t
  }

  (** State of a running journal. *)
  type t = {
    (* Producer side of the ring, used by [push]. *)
    p: Producer.t;
    (* Consumer side of the ring, used by [replay] and detached by
       [shutdown]. *)
    c: Consumer.t;
    (* The block device passed as [~disk] when attaching to the ring. *)
    filename: Block.t;
    (* Broadcast when an item is pushed, when a replay completes, when a
       shutdown is requested and when the shutdown has completed. *)
    cvar: unit Lwt_condition.t;
    (* Cleared at the start of [replay]; set by [push], and by [perform]
       when it schedules a retry. *)
    mutable data_available: bool;
    (* Set by [shutdown]; makes [push] fail and the background loop stop. *)
    mutable please_shutdown: bool;
    (* Set by the background loop once it has observed [please_shutdown]. *)
    mutable shutdown_complete: bool;
    (* Position up to which [replay] has most recently advanced the
       consumer; [None] until the first successful replay. *)
    mutable consumed: Consumer.position option;
    (* Callback supplied to [start]; applied to the batch of items read
       from the journal. *)
    perform: Op.t list -> (unit, error) Result.result Lwt.t;
    (* Timer the background loop waits on before each replay. *)
    alarm: Alarm.t;
    (* Held for the duration of each [push]. *)
    m: Lwt_mutex.t;
    (* Delay given to [Alarm.reset] after each successful push. *)
    flush_interval: int64;
    (* Delay given to [Alarm.reset] when [perform] schedules a retry. *)
    retry_interval: int64;
    (* Internally handle Error `Retry by sleeping on the cvar.
       All other errors are fatal. *)
    bind: 'a 'b 'c 'd.
      (unit -> (('a, [< R.Consumer.error] as 'd) Result.result Lwt.t))
      -> ('a -> ('b, [> R.Consumer.error] as 'c) Result.result Lwt.t)
      -> ('b, [> R.Consumer.error] as 'c) Result.result Lwt.t
  }

  (** [perform t items ()] applies the user callback [t.perform] to
      [items].

      An exception raised by the callback (synchronously or as a rejected
      promise) is converted to [Error (`Msg _)] using
      [Printexc.to_string].  On any failure the journal is marked as
      having data available, the alarm is re-armed with
      [t.retry_interval], the failure is logged, and the error is
      returned.

      The trailing [unit] argument lets a partial application be passed
      to [t.bind], which expects a thunk. *)
  let perform t items () =
    Lwt.catch
      (fun () -> t.perform items)
      (fun e -> return (Error (`Msg (Printexc.to_string e))))
    >>= function
    | Ok x -> return (Ok x)
    | Error (`Msg x) ->
      (* Schedule a retry for every failure, whether the callback raised
         or returned an error.  [replay] clears [data_available] before
         calling us, and the background loop only replays when that flag
         is set, so without this the unconsumed items would not be looked
         at again until the next push. *)
      t.data_available <- true;
      Alarm.reset t.alarm t.retry_interval;
      error "Failed to process journal item: %s" x
      >>= fun () ->
      return (Error (`Msg x))

  (** [replay t ()] reads every item currently in the journal, hands the
      whole batch to [perform], and on success advances the consumer past
      the batch, records the new position in [t.consumed] and broadcasts
      on [t.cvar].

      [t.data_available] is cleared before reading.  Steps are chained
      with [t.bind], so an error from the fold, from [perform] or from
      the advance ends the replay with that error. *)
  let replay t () =
    let ( >>|= ) = t.bind in
    t.data_available <- false;
    let prepend item acc = item :: acc in
    Consumer.fold ~f:prepend ~t:t.c ~init:[]
    >>|= fun (position, reversed) ->
    (* The fold accumulated by prepending, so reverse to get back the
       original order before applying the items. *)
    let batch = List.rev reversed in
    info "There are %d items in the journal to replay" (List.length batch)
    >>= fun () ->
    perform t batch
    >>|= fun () ->
    Consumer.advance ~t:t.c ~position
    >>|= fun () ->
    t.consumed <- Some position;
    (* Anyone blocked on the condition variable (a pusher waiting in the
       `Retry loop, or a waiter in sync) can now re-check its condition. *)
    Lwt_condition.broadcast t.cvar ();
    return (Ok ())

  (** [start ?name ?client ?flush_interval ?retry_interval filename perform]
      attaches to the ring stored on [filename] (creating it first if the
      initial attach fails with a message error), replays whatever is
      already in the journal, and starts a background thread that replays
      newly pushed items.

      - [name] and [client] are passed as [~queue] and [~client] to the
        ring attach functions.
      - [flush_interval] (default [0L]) is the delay given to the alarm
        after each push; [retry_interval] (default [Duration.of_sec 5])
        is the delay given to it when [perform] schedules a retry.  Both
        are added to [Clock.elapsed_ns ()] by [Alarm.reset].
      - [perform] is the callback applied to each batch of items.

      Returns [Ok t] once the initial replay has finished, whether or not
      that replay succeeded, or an error if the ring could not be created
      or attached. *)
  let start ?(name="unknown journal") ?(client="unknown") ?(flush_interval=0L) ?(retry_interval=(Duration.of_sec 5)) filename perform =
    (* Setup-time bind: there is no background thread yet, so `Retry and
       `Suspended are turned into message errors instead of being waited
       on. *)
    let (>>|=) fn f = fn () >>= function
    | Error `Retry -> return (Error (`Msg "start: received `Retry"))
    | Error `Suspended -> return (Error (`Msg "start: received `Suspended"))
    | Error (`Msg m) -> return (Error (`Msg m))
    | Error x -> return (Error x)
    | Ok x -> f x in
    (* If the ring doesn't exist, create it *)
    (* NOTE(review): any message error from the probing attach is taken to
       mean that the ring is missing, and a consumer returned by a
       successful probe is dropped without Consumer.detach - confirm both
       are intended. *)
    ( fun () -> Consumer.attach ~queue:name ~client ~disk:filename ()
      >>= function
      | Error (`Msg _) ->
        Producer.create ~disk:filename
        >>|= fun () ->
        return (Ok ())
      | _ ->
        return (Ok ()) ) >>|= fun () ->
    Consumer.attach ~queue:name ~client ~disk:filename
    >>|= fun c ->
    Producer.attach ~queue:name ~client ~disk:filename
    >>|= fun p ->
    let please_shutdown = false in
    let shutdown_complete = false in
    let cvar = Lwt_condition.create () in
    let consumed = None in
    let m = Lwt_mutex.create () in
    let data_available = true in
    let alarm = Alarm.create () in
    (* Run-time bind, stored in [t.bind]: on `Retry it asks for an
       immediate replay, waits for a broadcast on [cvar] and then runs
       [fn] again. *)
    let rec bind fn f = fn () >>= function
      | Error `Suspended -> return (Error (`Msg "Ring is suspended"))
      | Error (`Msg x) -> return (Error (`Msg x))
      | Error `Retry ->
        (* If we're out of space then allow the journal to replay
           immediately. *)
        Alarm.reset alarm 0L;
        Lwt_condition.wait cvar
        >>= fun () ->
        bind fn f
      | Ok x -> f x in
    let t = { p; c; filename; please_shutdown; shutdown_complete; cvar;
              consumed; perform; m; data_available; bind; alarm;
              flush_interval; retry_interval } in
    (* Initial replay of items already in the journal; its result is
       deliberately ignored. *)
    replay t ()
    >>= fun _ ->
    (* Run a background thread processing items from the journal *)
    let (_: (unit, R.Consumer.error) Result.result Lwt.t) =
      let rec forever () =
        (* Sleep until there is something to replay or a shutdown has been
           requested. *)
        ( if t.data_available || t.please_shutdown
          then return ()
          else Lwt_condition.wait t.cvar )
        >>= fun () ->
        if t.please_shutdown then begin
          t.shutdown_complete <- true;
          Lwt_condition.broadcast t.cvar ();
          return (Ok ())
        end else begin
          (* This allows us to wait for batching *)
          (* NOTE(review): a shutdown requested while blocked here is only
             noticed after the alarm fires, because [shutdown] broadcasts
             on [t.cvar] and not on the alarm - confirm this delay is
             acceptable. *)
          Alarm.next alarm
          >>= fun () ->
          (* If we fail to process an item, we can't make progress
             but we can keep trying and keep responding to shutdown
             requests. *)
          replay t ()
          >>= fun _ ->
          forever ()
        end in
      forever () in
    return (Ok t)
  (** Public [start]: forwards every argument to the [start] defined just
      above and passes its result through [R.Consumer.error_to_msg]. *)
  let start ?name ?client ?flush_interval ?retry_interval filename perform =
    let started =
      start ?name ?client ?flush_interval ?retry_interval filename perform in
    Lwt.map R.Consumer.error_to_msg started

  (** [shutdown t] asks the background thread to stop, waits until it has
      set [t.shutdown_complete], and then detaches the consumer.  Once
      [t.please_shutdown] is set, new calls to [push] fail. *)
  let shutdown t =
    t.please_shutdown <- true;
    (* Wake the background loop in case it is idle on the condition. *)
    Lwt_condition.broadcast t.cvar ();
    let rec await_completion () =
      if not t.shutdown_complete then begin
        Lwt_condition.wait t.cvar >>= await_completion
      end else
        return () in
    await_completion ()
    >>= fun () ->
    Consumer.detach t.c

  (** [push t item] appends [item] to the journal and returns a [waiter]
      for it.

      Fails immediately with a message error if a shutdown has been
      requested.  Ring operations are chained with [t.bind].  After the
      item is written, [t.data_available] is set, [t.cvar] is broadcast
      and the alarm is re-armed with [t.flush_interval]. *)
  let push t item =
    let ( >>|= ) = t.bind in
    if not t.please_shutdown then begin
      Producer.push ~t:t.p ~item
      >>|= fun position ->
      Producer.advance ~t:t.p ~position
      >>|= fun () ->
      t.data_available <- true;
      Lwt_condition.broadcast t.cvar ();
      (* If the ring is becoming full, we want to flush.
         Otherwise we reset the alarm timer. *)
      Alarm.reset t.alarm t.flush_interval;
      (* Some clients want to know when the item has been processed,
         i.e. when the consumed position has reached [position]. *)
      let has_consumed () =
        match t.consumed with
        | Some reached ->
          begin match Consumer.compare reached position with
          | `LessThan -> false
          | `GreaterThan | `Equal -> true
          end
        | None -> false in
      let rec sync () =
        if has_consumed ()
        then return ()
        else Lwt_condition.wait t.cvar >>= sync in
      let flush () = Alarm.reset t.alarm 0L in
      return (Ok { sync; flush })
    end else
      return (Error (`Msg "journal shutdown in progress"))
  (** Public [push]: runs the [push] defined just above while holding
      [t.m], so that pushes are performed one at a time, and passes the
      result through [R.Consumer.error_to_msg]. *)
  let push t op =
    let locked_push () = Lwt.map R.Consumer.error_to_msg (push t op) in
    Lwt_mutex.with_lock t.m locked_push
end