package sel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file sel.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
(**************************************************************************)
(*                                                                        *)
(*                                 SEL                                    *)
(*                                                                        *)
(*                   Copyright INRIA and contributors                     *)
(*       (see version control and README file for authors & dates)        *)
(*                                                                        *)
(**************************************************************************)
(*                                                                        *)
(*   This file is distributed under the terms of the MIT License.         *)
(*   See LICENSE file.                                                    *)
(*                                                                        *)
(**************************************************************************)

type 'a res = ('a,exn) result

(* Events in progress *)
type _ in_progress =
  | Line : Bytes.t * Buffer.t -> string in_progress
  | Bytes : int * Bytes.t -> Bytes.t in_progress

(* Reified function composition *)
type _ fcomp =
  | FNil : 'a -> 'a fcomp
  | FCons : 'a in_progress * ('a res -> 'b fcomp) -> 'b fcomp

let rec map_fcomp f = function
  | FNil x -> FNil (f x)
  | FCons(pending,g) -> FCons(pending,(fun x -> map_fcomp f (g x)))

let finish_with (k : 'a res -> 'b) x = FNil(k x)
let (--) = fun x xs -> FCons(x,xs)
(* locally one does let (--?) x y = err k x y in  as a cons with error handling *)
let err : type a b c. (c res -> b) -> a in_progress -> (a -> b fcomp) -> b fcomp =
  fun k x xs ->
    let xs = function
      | Ok v -> xs v
      | Error e -> FNil(k (Error e)) in
    FCons(x,xs)

(* Read events can be composed of many read operations *)
type 'a system_event =
  | ReadInProgress of Unix.file_descr * 'a fcomp
  | WaitProcess of int * (Unix.process_status -> 'a)
let pp_system_event _ fmt = function
  | ReadInProgress(_,_) -> Stdlib.Format.fprintf fmt "ReadInProgress"
  | WaitProcess(pid,_) -> Stdlib.Format.fprintf fmt "WaitProcess %d" pid

type 'a queue_event =
  | WaitQueue1 : 'b Queue.t * ('b -> 'a) -> 'a queue_event
  | WaitQueue2 : 'b Queue.t * 'c Queue.t * ('b -> 'c -> 'a) -> 'a queue_event

let pp_queue_event _ fmt = function
  | WaitQueue1 _ -> Stdlib.Format.fprintf fmt "WaitQueue1"
  | WaitQueue2 _ -> Stdlib.Format.fprintf fmt "WaitQueue2"

type 'a task_event = 'a [@@deriving show]

module Event = struct

type cancellation_handle = bool ref [@@deriving show]

type ('a,'b) with_attributes = {
  name : string option ;
  recurring : 'a;
  priority : int;
  it : 'b;
  cancelled : cancellation_handle;
}
[@@deriving show]

let cmp_priority { priority = t1; _} { priority = t2; _} = t1 - t2

let make_event it =
  let cancelled = ref false in
  { name = None; recurring = false; priority = 0; cancelled; it }

type 'a event_ =
  | SystemEvent of 'a system_event
  | QueueEvent of 'a queue_event
  | Task of 'a task_event
  [@@deriving show]

let map_system_event f = function
  | WaitProcess(pid,k) -> WaitProcess(pid, (fun x -> f (k x)))
  | ReadInProgress(fd,fcomp) -> ReadInProgress(fd,map_fcomp f fcomp)
  
let map_queue_event f = function
  | WaitQueue1(q1,k) -> WaitQueue1(q1,(fun x -> f (k x)))
  | WaitQueue2(q1,q2,k) -> WaitQueue2(q1,q2,(fun x y -> f (k x y)))

let map_task_event f x = f x

let map_with_attributes f { name; recurring; priority; cancelled; it } =
  { name; recurring; priority; cancelled; it = f it }

let map f = function
  | Task e -> Task (map_task_event f e)
  | SystemEvent e -> SystemEvent (map_system_event f e)
  | QueueEvent e -> QueueEvent(map_queue_event f e)

      
type 'a t = (bool,'a event_) with_attributes
[@@deriving show]
  
let map f e = map_with_attributes (map f) e

let name name it = { it with name = Some name }
let recurring x = { x with recurring = true }
let is_recurring { recurring; _ } = recurring <> None
let at_priority priority x = { x with priority }

let get_cancellation_handle { cancelled; _ } = cancelled
let cancel x = x := true

end

open Event

type ('a,'b) has_finished =
  | Yes of 'a
  | No  of 'b (* can make_event a step *)

let mkReadInProgress fd = function
  | FCons _ as f -> No (ReadInProgress(fd,f))
  | FNil x -> Yes x

let one_line () = Line (Bytes.make 1 '0', Buffer.create 40)
let some_bytes n ?(buff=Bytes.create n) () = Bytes(n,buff)

module On = struct

  type 'a res = ('a,exn) result

let line fd k : 'a Event.t =
  make_event @@ SystemEvent (ReadInProgress(fd, one_line () -- finish_with k))

let bytes fd n k : 'a Event.t =
  make_event @@ SystemEvent (ReadInProgress(fd, some_bytes n () -- finish_with k))

let death_of ~pid k : 'a Event.t =
  make_event @@ SystemEvent (WaitProcess(pid,k))

let an_ocaml_value (k : 'a res -> 'b) : 'b fcomp =
  let (--?) x y = err k x y in
  some_bytes Marshal.header_size ()
  --? (fun buff -> let data_size = Marshal.data_size buff 0 in
  some_bytes data_size ~buff:(Bytes.extend buff 0 data_size) ()
  --? (fun buff -> let value = Marshal.from_bytes buff 0 in
  finish_with k (Ok value)))
;;

let ocaml_value fd k : 'a Event.t =
  make_event @@ SystemEvent (ReadInProgress(fd, an_ocaml_value k))

let parse_content_length_or err k s =
  try
    let input = Scanf.Scanning.from_string (String.uppercase_ascii s) in
    Scanf.bscanf input "CONTENT-LENGTH: %d" k
  with
    (Scanf.Scan_failure _ | Failure _ | End_of_file | Invalid_argument _) as e ->
      err (Error e)

let an_httpcle (k : Bytes.t res -> 'b) : 'b fcomp  =
  let (--?) x y = err k x y in
  one_line ()
  --? (parse_content_length_or (finish_with k) (fun length ->
  one_line ()
  --? (fun _discard ->
  some_bytes length ()
  --? (fun buff ->
  finish_with k (Ok buff)))))

let httpcle fd k : 'a Event.t =
  make_event @@ SystemEvent (ReadInProgress(fd, an_httpcle k))

let queue q1 k : 'a Event.t =
  make_event @@ QueueEvent (WaitQueue1(q1,k))

let queues q1 q2 k : 'a Event.t =
  make_event @@ QueueEvent (WaitQueue2(q1,q2,k))

end

let now task : 'a Event.t =
  make_event @@ Task task

let cons_recurring e l =
  match e.recurring with
  | None -> l
  | Some x -> { e with it = x } :: l

let rec pull_ready f yes no min_priority = function
  | [] -> List.rev yes, List.rev no, min_priority
  | { it; cancelled; _ } as e :: rest ->
      match f cancelled it with
      | Yes y -> pull_ready f ({ e with it = y } :: yes) (cons_recurring e no) (min min_priority e.priority) rest
      | No x  -> pull_ready f yes ({ e with it = x } :: no) min_priority rest 

let pull_ready f l = pull_ready f [] [] max_int l

let cast_to_recurring : (bool, 'a) with_attributes -> 'b -> ('b option, 'b) with_attributes =
  fun e it ->
  { it;
    recurring = if e.recurring then Some it else None;
    name = e.name;
    priority = e.priority;
    cancelled = e.cancelled;
  }
let cast_to_not_recurring : ('b, 'a) with_attributes -> ('c option, 'a) with_attributes =
  fun e ->
  { it = e.it;
    recurring = None;
    name = e.name;
    priority = e.priority;
    cancelled = e.cancelled;
  }
  
let rec partition_events sys queue task = function
  | [] -> List.rev sys, List.rev queue, List.rev task
  | { it = SystemEvent x; _ } as e :: rest -> partition_events    (cast_to_recurring e x :: sys) queue task rest
  | { it = QueueEvent x; _ } as e :: rest -> partition_events sys (cast_to_recurring e x :: queue) task rest
  | { it = Task x; _ } as e :: rest -> partition_events sys queue (cast_to_recurring e x :: task) rest

let partition_events l = partition_events [] [] [] l

let advance_queue _ = function
  | WaitQueue1(q1,_) as x when Queue.is_empty q1 ->  No x
  | WaitQueue1(q1,k) -> Yes(k (Queue.pop q1))
  | WaitQueue2(q1,q2,_) as x when Queue.is_empty q1 || Queue.is_empty q2 -> No x
  | WaitQueue2(q1,q2,k) -> Yes(k (Queue.pop q1) (Queue.pop q2))

let advance_system ~ready_fds _ = function
  | WaitProcess(pid,k) as x ->
      let rc, code = Unix.waitpid [Unix.WNOHANG] pid in
      if rc == 0 then No x
      else Yes (k code)
  | ReadInProgress(_, FNil _) -> assert false
  | ReadInProgress(fd,_) as x when not (List.mem fd ready_fds) -> No x
  | ReadInProgress(fd, FCons(Line (buff,acc) as line,rest)) -> begin try
      let n = Unix.read fd buff 0 1 in
      if n = 0 then begin
        Buffer.clear acc;
        mkReadInProgress fd (rest (Error End_of_file))
      end else
        let c = Bytes.get buff 0 in
        if c != '\n' then begin
          Buffer.add_char acc c; 
          mkReadInProgress fd (FCons(line,rest))
        end else begin
          let one_line = Buffer.contents acc in
          Buffer.clear acc;
          mkReadInProgress fd (rest (Ok one_line))
        end
      with Unix.Unix_error _ as e ->
        Buffer.clear acc;
        mkReadInProgress fd (rest (Error e))
      end
  | ReadInProgress(fd,FCons(Bytes(n,buff),rest)) -> begin try
      let m = Unix.read fd buff (Bytes.length buff - n) n in
      if m = 0 then
        mkReadInProgress fd (rest (Error End_of_file))
      else
        if m != n then
          mkReadInProgress fd (FCons(Bytes(n-m,buff),rest))
        else
          mkReadInProgress fd (rest (Ok buff))
      with Unix.Unix_error _ as e -> mkReadInProgress fd (rest (Error e))
      end

let rec map_filter f = function
  | [] -> []
  | x :: xs ->
      match f x with
      | None -> map_filter f xs
      | Some x -> x :: map_filter f xs

(* For fairness reasons, even if there are immediately ready events we
   give a shot to system events with 0 wait, otherwise we wait until a
   system event is ready. We never sleep forever, since process death events
   do not wakeup select: we anyway wake up 10 times per second *)
let check_for_system_events l =
  let fds = map_filter (function { it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None) l in
  let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
  let ready, waiting, min_prio  = pull_ready (advance_system ~ready_fds) l in
  ready, waiting, min_prio

let check_for_queue_events l =
  pull_ready advance_queue l

let next_deadline delta = Unix.gettimeofday() +. delta

module Sorted : sig

  type 'a list
  type 'a list_ =
    | Nil
    | Cons of 'a * int * 'a list

  val look : 'a list -> 'a list_
  val cons : 'a -> int -> 'a list -> 'a list
  val cons_opt : ('a * int) option -> 'a list -> 'a list
  val nil : 'a list
  val length : 'a list -> int
  val filter : ('a -> bool) -> 'a list -> 'a list
  val for_all : ('a -> bool) -> 'a list -> bool
  val append : 'a list -> 'a list -> 'a list
  val concat3 : ('a -> int) -> 'a List.t -> 'a List.t -> 'a list -> 'a list
  val sort : ('a -> int) -> 'a List.t -> 'a list
  val pp_list : (Format.formatter -> 'a -> unit) -> Format.formatter -> 'a list -> unit

end = struct
  module L = struct type 'a t = 'a list [@@deriving show] end

  type 'a list = ('a * int) L.t
  [@@deriving show]

  type 'a list_ =
    | Nil
    | Cons of 'a * int * ('a * int) L.t

  let nil = []

  let length = List.length

  let on_fst f = (); fun (x,_) -> f x

  let filter f l = List.filter (on_fst f) l

  let for_all f l = List.for_all (on_fst f) l

  let look = function
    | [] -> Nil
    | (x,p) :: xs -> Cons(x,p,xs)

  let rec cons x p = function
    | [] -> [x,p]
    | (_,q) :: _ as l when p < q -> (x,p) :: l
    | (y,q) :: l -> (y,q) :: cons x p l
  let cons_opt = function
    | Some(x,p) -> cons x p
    | None -> fun x -> x

  let cmp (_,p1) (_,p2) = p1 - p2

  let append l1 l2 = List.sort cmp (l1 @ l2)
    
  let add_prio f l = List.map (fun x -> x, f x) l

  let concat3 f l1 l2 l3 = List.sort cmp (add_prio f l1 @ add_prio f l2 @ l3)

  let sort f l = add_prio f l |> List.sort cmp

end

let priority_sort l = List.stable_sort cmp_priority l

module Todo = struct

type 'a t = {
  system : ('a system_event option,'a system_event) with_attributes list;
  queue  : ('a queue_event option,'a queue_event)  with_attributes list;
  tasks  : ('a task_event option,'a task_event)   with_attributes Sorted.list;
  ready  : ('a option,'a) with_attributes Sorted.list;
}
[@@deriving show]
let empty = { system = []; queue = [] ; tasks = Sorted.nil; ready = Sorted.nil }

let prune_cancelled { system; queue; tasks; ready } =
  let not_cancelled { cancelled; _ } = !cancelled = false in
  let system = List.filter not_cancelled system in
  let queue  = List.filter not_cancelled queue in
  let tasks  = Sorted.filter not_cancelled tasks in
  let ready  = Sorted.filter not_cancelled ready in
  { system; queue; tasks; ready }


let size todo =
  let { system; queue; tasks; ready } = prune_cancelled todo in
  List.(length system + length queue + Sorted.length tasks + Sorted.length ready )

let is_empty todo =
  let { system; queue; tasks; ready } = prune_cancelled todo in
  system = [] && queue = [] && tasks = Sorted.nil && ready = Sorted.nil

let add { system; queue; tasks; ready } l =
  let new_sys, new_queue, new_tasks = partition_events l in
  { 
    system = system @ new_sys;
    queue = queue @ new_queue;
    tasks = Sorted.append tasks (Sorted.sort (fun x -> x.priority) new_tasks);
    ready;
  }

let only_recurring_events todo =
  let { system; queue; tasks; ready } = prune_cancelled todo in
  List.for_all is_recurring system &&
  List.for_all is_recurring queue &&
  Sorted.for_all is_recurring tasks &&
  ready = Sorted.nil
  
end
  
(* This is blocking wait (modulo a deadline). We check for system events
   (io, process death) or a queue (in case some thread puts a token there). *)
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
  if Unix.gettimeofday () > deadline then [], [], sys, queue, max_int
  else
    let ready_fds, _, _ = Unix.select fds [] [] 0.1 in
    let ready_sys, waiting_sys, min_prio_sys = pull_ready (advance_system ~ready_fds) sys in
    let ready_queue, waiting_queue, min_prio_queue = pull_ready advance_queue queue in
    if ready_sys <> [] || ready_queue <> [] then ready_sys, ready_queue, waiting_sys, waiting_queue, min min_prio_queue min_prio_sys
    else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

let wait_return_sorted (l1,l2,o3,todo) =
  priority_sort l1 |> List.map (fun x -> x.it),
  priority_sort l2 |> List.map (fun x -> x.it),
  (Option.map (fun (x,_) -> x.it) o3),
  todo

let cons_recurring_sorted e p l =
  match e.recurring with
  | None -> l
  | Some x -> Sorted.cons { e with it = x } p l
  

let pull_ready_sorted min_prio l =
  match Sorted.look l with
  | Sorted.Nil -> None, Sorted.nil
  | Sorted.Cons(_,p,_) when min_prio <= p -> None, l
  | Sorted.Cons(x,p,l) -> Some(x,p), cons_recurring_sorted x p l

let wait ?(deadline=max_float) todo =
  let open Todo in
  let { system; queue; tasks; ready } as todo = prune_cancelled todo in
  assert(ready = Sorted.nil); (* mixing with pop *)
  if is_empty todo then
    [], [], None, todo
  else
    let ready_sys, waiting_sys, min_prio_sys = check_for_system_events system in
    let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
    if tasks = Sorted.nil && ready_queue = [] && ready_sys = [] then
      let fds = map_filter (function { it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None) waiting_sys in
      let ready_sys, ready_queue, waiting_sys, waiting_queue, _ =
        wait_for_system_or_queue_events ~deadline (fds,waiting_sys) waiting_queue in
      ready_sys, ready_queue, None,
      { system = waiting_sys; queue = waiting_queue; tasks = Sorted.nil; ready = Sorted.nil }
    else
      let min_prio = min min_prio_sys min_prio_queue in
      let ready_task, waiting_tasks = pull_ready_sorted min_prio tasks in
      ready_sys, ready_queue, ready_task,
      { system = waiting_sys; queue = waiting_queue; tasks = waiting_tasks; ready = Sorted.nil }


let pop_return (ready_sys,ready_queue,ready_task, todo) =
  let open Todo in
  let ready_sys = List.map cast_to_not_recurring ready_sys in
  let ready_queue = List.map cast_to_not_recurring ready_queue in
  let ready_task = Sorted.cons_opt ready_task Sorted.nil in
  let ready = Sorted.concat3 (fun x -> x.priority) ready_sys ready_queue ready_task in
  match Sorted.look ready with
  | Sorted.Cons(x,_,ready) -> Some x.it, { todo with ready }
  | Sorted.Nil -> None, todo

let pop l =
  match pop_return @@ wait l with
  | None, _ -> raise @@ Failure "nothing to pop"
  | Some x, t -> x, t
let pop_opt l = pop_return @@ wait l

let pop_timeout ~stop_after_being_idle_for:delta l = 
  let deadline = next_deadline delta in
  pop_return @@ wait ~deadline l

let wait_timeout ~stop_after_being_idle_for:delta l =
  let deadline = next_deadline delta in
  wait_return_sorted @@ wait ~deadline l

let wait l = wait_return_sorted @@ wait l
OCaml

Innovation. Community. Security.