package dolmen_loop

  1. Overview
  2. Docs

Source file pipeline.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239

(* This file is free software, part of dolmen. See file "LICENSE" for more information *)

(* Raised by the SIGINT handler installed when [Make] is applied,
   i.e. on user interruption. *)
exception Sigint
(* Raised by the SIGALRM handler installed when [Make] is applied, once the
   real-time timer armed by [Make.setup_alarm] expires. *)
exception Out_of_time
(* Raised by [Make.check], run as the Gc alarm created by
   [Make.setup_alarm], when the major heap grows beyond the size limit. *)
exception Out_of_space

module Make(State : State.S) = struct

  (* GC alarm for time/space limits *)
  (* ************************************************************************ *)

  (* [check size_limit ()] raises [Out_of_space] when the major heap, as
     reported by [Gc.quick_stat], occupies more than [size_limit] bytes.
     It is meant to be called by the Gc alarm created in [setup_alarm].
     TODO: take into account the minor heap size
     TODO: should we only consider the live words ? *)
  let check size_limit = function () ->
    let heap_size = (Gc.quick_stat ()).Gc.heap_words in
    (* [Sys.word_size] is expressed in bits, hence the division by 8 to
       obtain a size in bytes. *)
    let s = float heap_size *. float Sys.word_size /. 8. in
    if s > size_limit then
      raise Out_of_space

  (* There are two kinds of limits we want to enforce:
     - a size limit: we use the Gc's alarm function to enforce the limit
       on the size of the RAM used
     - a time limit: the Gc alarm is not reliable to enforce this, so instead
       we use the Unix timer facilities
     [setup_alarm t s] arms a real-time timer of [t] seconds and a Gc alarm
     checking the size limit [s]; an [infinity] value disables the
     corresponding limit. The Gc alarm (if any) is returned so that
     [delete_alarm] can remove it later.
     The timer's non-zero [it_interval] re-arms it: once it has expired,
     SIGALRM keeps being delivered every 0.01s until [delete_alarm] is
     called (presumably so that a swallowed [Out_of_time] is raised again
     -- TODO confirm).
     TODO: this does not work on windows.
     TODO: allow to use the time limit only for some passes *)
  let setup_alarm t s =
    if t <> infinity then
      ignore (Unix.setitimer Unix.ITIMER_REAL
                Unix.{it_value = t; it_interval = 0.01 });
    if s <> infinity then (Some (Gc.create_alarm (check s)))
    else None

  (* Disarm the limits set up by [setup_alarm]: stop the real-time timer and
     delete the Gc alarm, if any. *)
  let delete_alarm alarm =
    (* It is always safe to reset the timer here,
       even if none was armed before. *)
    ignore (Unix.setitimer Unix.ITIMER_REAL
              Unix.{it_value = 0.; it_interval = 0. });
    match alarm with None -> () | Some alarm -> Gc.delete_alarm alarm

  (* The Unix timer works by sending a Sys.sigalrm, so in order to use it,
     we catch it and raise the Out_of_time exception.
     NOTE: like the SIGINT handler below, this handler is (re-)installed
     each time this functor is applied. *)
  let () =
    Sys.set_signal Sys.sigalrm (
      Sys.Signal_handle (fun _ ->
          raise Out_of_time)
    )

  (* We also want to catch user interruptions: SIGINT is turned into the
     [Sigint] exception. *)
  let () =
    Sys.set_signal Sys.sigint (
      Sys.Signal_handle (fun _ ->
          raise Sigint)
    )

  (* Pipeline and execution *)
  (* ************************************************************************ *)

  (* How [Fix] combines the state from before an expansion (first argument)
     with the state obtained after processing all the generated values
     (second argument). *)
  type 'st merge = 'st -> 'st -> 'st

  (* Result of an operator used with [Cont]: either stop early with a final
     value ([`Done]), or go on with the rest of the pipeline ([`Continue]). *)
  type ('a, 'b) cont = [ `Done of 'a | `Continue of 'b ]

  (* Result of an operator used with [Fix]: either the input needs no
     expansion ([`Ok]), or it expands into a generator of new inputs,
     together with the function used to merge the resulting state. *)
  type ('st, 'a) fix = [ `Ok | `Gen of 'st merge * ('st -> 'st * 'a option) ]

  (* Exception continuation: receives the state, the backtrace and the
     exception of a failure. Its result type is fully polymorphic, so it
     cannot return normally: it has to raise. *)
  type 'st k_exn = { k : 'a. 'st -> Printexc.raw_backtrace -> exn -> 'a; }

  (* A named operator, threading a state of type ['st] while transforming a
     value of type ['a] into a value of type ['b]. Evaluation in this module
     only uses [f]. *)
  type ('st, 'a, 'b) op = {
    name : string;
    f : 'st -> 'a -> 'st * 'b;
  }

  (* Type for pipelines, i.e. a series of transformations to
     apply to the input. An ('st, 'a, 'b) t is a pipeline that
     takes an input of type ['a] and returns a value of type
     ['b], threading a state of type ['st] along the way. *)
  type (_, _, _) t =
    (* The end of the pipeline, the identity/reflexive constructor *)
    | End :
        ('st, 'a, 'a) t
    (* Apply a single function and then proceed with the rest of the pipeline *)
    | Map :
        ('st, 'a, 'c) op * ('st, 'c, 'b) t -> ('st, 'a, 'b) t
    (* Allow early exiting from the pipeline: when the operator returns
       [`Done res], [res] is the final result and the rest is skipped. *)
    | Cont :
        ('st, 'a, ('b, 'c) cont) op * ('st, 'c, 'b) t -> ('st, 'a, 'b) t
    (* Concatenate two pipelines. Not tail recursive. *)
    | Concat :
        ('st, 'a, 'b) t * ('st, 'b, 'c) t -> ('st, 'a, 'c) t
    (* Fixpoint expansion: the operator may expand its input into a
       generator of new inputs, each of which goes through the whole [Fix]
       node again (see [eval]). *)
    | Fix :
        ('st, 'a, ('st, 'a) fix) op * ('st, 'a, unit) t -> ('st, 'a, unit) t

  (* Creating operators. *)

  (* Build an operator from a state-passing function [f]. *)
  let op ?(name="") f = { name; f; }

  (* Build an operator from a function [f] that does not touch the state. *)
  let apply ?name f = op ?name (fun st x -> st, f x)

  (* Build an operator that runs [f] for its side effects and passes its
     input through unchanged. *)
  let iter_ ?name f = op ?name (fun st x -> f x; st, x)

  (* Build an operator for [Cont]: when [test st x] holds (always, by
     default), [f] is applied and the pipeline continues with its result;
     otherwise the pipeline stops early with [x] as its result. *)
  let f_map ?name ?(test=(fun _ _ -> true)) f =
    op ?name (fun st x ->
        if test st x then begin
          let st', y = f st x in
          st', `Continue y
        end else
          st, `Done x
      )

  (* Creating pipelines. *)

  (* The empty pipeline. *)
  let _end = End
  (* [op @>>> t]: apply [op], then run [t] on its result. *)
  let (@>>>) op t = Map(op, t)
  (* [op @>|> t]: apply [op], then either stop or run [t] (see [Cont]). *)
  let (@>|>) op t = Cont(op, t)
  (* [t @||| t']: run [t], then [t'] on its result. *)
  let (@|||) t t' = Concat (t, t')

  (* [fix op t]: fixpoint expansion (see [Fix]). *)
  let fix op t = Fix(op, t)

  (* Eval an operator: apply [op.f], routing any exception it raises,
     together with its backtrace and the input state, to [exn.k]. *)
  let eval_op ~exn op st x =
    try op.f st x
    with e ->
      let bt = Printexc.get_raw_backtrace () in
      exn.k st bt e

  (* Eval a pipeline into the corresponding function *)
  let rec eval : type st a b.
    exn:st k_exn -> (st, a, b) t -> st -> a -> st * b =
    fun ~exn pipe st x ->
    match pipe with
    | End -> st, x
    | Map (op, t) ->
      let st', y = eval_op ~exn op st x in
      eval ~exn t st' y
    | Cont (op, t) ->
      let st', y = eval_op ~exn op st x in
      begin match y with
        | `Continue res -> eval ~exn t st' res
        | `Done res -> st', res
      end
    | Concat (t, t') ->
      let st', y = eval ~exn t st x in
      eval ~exn t' st' y
    | Fix (op, t) ->
      let st', y = eval_op ~exn op st x in
      begin match y with
        (* No expansion: run the rest of the pipeline on the original input *)
        | `Ok -> eval ~exn t st' x
        (* Expansion: feed every generated value through the whole [Fix]
           node ([pipe]), then merge the state from before the expansion
           ([st]) with the final one. *)
        | `Gen (merge, g) ->
          let st'' = eval_gen_fold ~exn pipe st' g in
          let st''' = merge st st'' in
          st''', ()
      end

  (* Pull values from the generator [g] until it returns [None], running
     [pipe] on each of them, and return the final state. Exceptions raised
     by [g] itself are routed to [exn.k]. *)
  and eval_gen_fold : type st a.
    exn: st k_exn -> (st, a, unit) t -> st -> (st -> st * a option) -> st =
    fun ~exn pipe st g ->
    match g st with
    | st, None -> st
    | st, Some x ->
      let st', () = eval ~exn pipe st x in
      eval_gen_fold ~exn pipe st' g
    | exception e ->
      let bt = Printexc.get_raw_backtrace () in
      exn.k st bt e

  (* Aux function to eval a pipeline on the next value of a generator:
     returns [`Done st] when the generator is exhausted, and
     [`Continue (st, ())] once the pipeline has processed one value.
     Exceptions raised by [g] are routed to [exn.k]. *)
  let run_aux ~exn pipe g st =
    match g st with
    | st, None -> `Done st
    | st, Some x -> `Continue (eval ~exn pipe st x)
    | exception e ->
      let bt = Printexc.get_raw_backtrace () in
      exn.k st bt e

  (* Effectively run a pipeline on all values that come from a generator.
     Time/size limits are read from the state and armed anew for each value
     pulled from [g]. They apply to the complete evaluation of that value
     (so all values obtained by expanding it count toward the same limit).
     After each value, [finally] is called with [None] on success, or with
     the backtrace and exception on failure, and computes the state used to
     continue. *)
  let rec run :
    type a.
    finally:(State.t -> (Printexc.raw_backtrace * exn) option -> State.t) ->
    (State.t -> State.t * a option) -> State.t -> (State.t, a, unit) t -> State.t
    = fun ~finally g st pipe ->
      (* Local exception used by [exn] to jump to the handler below with the
         state at the point of failure. A fresh constructor is created at
         each call, so it cannot be mixed up with the one of another run. *)
      let exception Exn of State.t * Printexc.raw_backtrace * exn in
      let time = State.get State.time_limit st in
      let size = State.get State.size_limit st in
      let al = setup_alarm time size in
      let exn = { k = fun st bt e ->
          (* delete alarm as soon as possible *)
          let () = delete_alarm al in
          (* go to the correct handler *)
          raise (Exn (st, bt, e));
        }
      in
      (* Common recovery path of the two exception cases below: disarm the
         limits, flush the standard formatters in case the exception was
         raised in the middle of printing, then let [finally] compute the
         state from which to continue. Exceptions raised by [finally] here
         are not caught and escape [run]. *)
      let recover st bt e =
        let () = delete_alarm al in
        Format.pp_print_flush Format.std_formatter ();
        Format.pp_print_flush Format.err_formatter ();
        finally st (Some (bt, e))
      in
      begin
        match run_aux ~exn pipe g st with

        (* End of the run, yay ! *)
        | `Done st ->
          let () = delete_alarm al in
          st

        (* Regular case, we finished running the pipeline on one input
           value, let's get to the next one. *)
        | `Continue (st', ()) ->
          let () = delete_alarm al in
          (* Best effort: if [finally] fails here, keep going from [st'].
             NOTE(review): this catch-all also swallows a [Sigint] raised
             while [finally] runs. *)
          let st'' = try finally st' None with _ -> st' in
          run ~finally g st'' pipe

        (* "Normal" exception case: the exn was raised by an operator (or by
           the generator), caught, then re-raised by the [exn] continuation
           passed to run_aux, together with an up-to-date state. *)
        | exception Exn (st, bt, e) ->
          let st' = recover st bt e in
          run ~finally g st' pipe

        (* Fallback for exceptions that did not go through the [exn]
           continuation. This can realistically occur for asynchronous
           exceptions, or if some operator was not properly wrapped. In this
           error case, we use the (possibly outdated) state this run started
           from, but this should not happen often, and should not matter for
           asynchronous exceptions. *)
        | exception e ->
          (* Grab the backtrace before any other code can overwrite it. *)
          let bt = Printexc.get_raw_backtrace () in
          let st' = recover st bt e in
          run ~finally g st' pipe
      end

end