package sihl-facade

  1. Overview
  2. Docs

Source file queue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
open Sihl_contract.Queue
open Sihl_core.Container

let to_sexp { name; max_tries; retry_delay; _ } =
  let open Sexplib0.Sexp_conv in
  let open Sexplib0.Sexp in
  List
    [ List [ Atom "name"; sexp_of_string name ]
    ; List [ Atom "max_tries"; sexp_of_int max_tries ]
    ; List
        [ Atom "retry_delay"
        ; sexp_of_string (Sihl_core.Time.show_duration retry_delay)
        ]
    ]
;;

let pp fmt t = Sexplib0.Sexp.pp_hum fmt (to_sexp t)
let default_tries = 5
let default_retry_delay = Sihl_core.Time.OneMinute

let create ~name ~input_to_string ~string_to_input ~handle ?failed () =
  let failed =
    failed |> Option.value ~default:(fun _ -> Lwt_result.return ())
  in
  { name
  ; input_to_string
  ; string_to_input
  ; handle
  ; failed
  ; max_tries = default_tries
  ; retry_delay = default_retry_delay
  }
;;

let set_max_tries max_tries job = { job with max_tries }
let set_retry_delay retry_delay job = { job with retry_delay }

(* Service *)
let instance : (module Sig) option ref = ref None

let dispatch job ?delay input =
  let module Service = (val unpack name instance : Sig) in
  Service.dispatch job ?delay input
;;

let register_jobs jobs =
  let module Service = (val unpack name instance : Sig) in
  Service.register_jobs jobs
;;

let lifecycle () =
  let module Service = (val unpack name instance : Sig) in
  Service.lifecycle
;;

let register ?(jobs = []) implementation =
  let module Service = (val implementation : Sig) in
  instance := Some implementation;
  Service.register ~jobs ()
;;