package sel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file sel.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
(**************************************************************************)
(*                                                                        *)
(*                                 SEL                                    *)
(*                                                                        *)
(*                   Copyright INRIA and contributors                     *)
(*       (see version control and README file for authors & dates)        *)
(*                                                                        *)
(**************************************************************************)
(*                                                                        *)
(*   This file is distributed under the terms of the MIT License.         *)
(*   See LICENSE file.                                                    *)
(*                                                                        *)
(**************************************************************************)

open Events
open Event

(* [drop_event_type payload event] rebuilds [event] around [payload] by means
   of [WithAttributes.map]: the payload previously carried by [event] is
   ignored (presumably every other attribute is left as it is, to be
   confirmed against [WithAttributes.map]). *)
let drop_event_type : 'b -> 'a WithAttributes.t -> 'b WithAttributes.t =
  fun payload event -> event |> WithAttributes.map (fun _ -> payload)
  
(* Splits the events into 3 lists: system, queue and task.
   System and queue events are paired with their priority; a task is a
   triple made of its optional binary function (lifted so that it works on
   whole events rather than on their payloads), the event and its priority.
   Each list is accumulated by prepending, hence it comes out in the reverse
   of the input order. *)
let partition_events l =
  let classify (sys, queue, task) ({ WithAttributes.it; priority; _ } as e) =
    match it with
    | SystemEvent x ->
        (drop_event_type x e, priority) :: sys, queue, task
    | QueueEvent x ->
        sys, (drop_event_type x e, priority) :: queue, task
    | Task (u, x) ->
        (* the optional function of the task works on payloads: lift it *)
        let on_events f a b = f a.WithAttributes.it b.WithAttributes.it in
        let u = Option.map on_events u in
        sys, queue, (u, drop_event_type x e, priority) :: task
  in
  List.fold_left classify ([], [], []) l

(* [next_deadline delta] is the absolute time, on the clock of
   [Unix.gettimeofday], that lies [delta] seconds after the current instant. *)
let next_deadline delta =
  let now = Unix.gettimeofday () in
  now +. delta

module Todo = struct

  (* The events one is still waiting for, grouped by kind *)
  type 'a t = {
    (* The pop API may need to postpone ready events *)
    ready : 'a WithAttributes.t Sorted.t;
    (* The three queues of events *)
    system : ('a system_event) WithAttributes.t Sorted.t;
    queue  : ('a queue_event) WithAttributes.t Sorted.t;
    tasks  : ('a task_event) WithAttributes.t Sorted.t;
  }
  [@@deriving show]

  (* The todo holding no event at all *)
  let empty = {
    ready = Sorted.nil;
    system = Sorted.nil;
    queue = Sorted.nil;
    tasks = Sorted.nil;
  }

  (* Drops, from each of the four components, the events whose [cancelled]
     flag is set *)
  let prune_cancelled todo =
    let alive event = not !(event.WithAttributes.cancelled) in
    {
      system = Sorted.filter alive todo.system;
      queue = Sorted.filter alive todo.queue;
      tasks = Sorted.filter alive todo.tasks;
      ready = Sorted.filter alive todo.ready;
    }

  (* Number of events, cancelled ones excluded *)
  let size todo =
    let live = prune_cancelled todo in
    List.fold_left ( + ) 0 [
      Sorted.length live.system;
      Sorted.length live.queue;
      Sorted.length live.tasks;
      Sorted.length live.ready;
    ]

  (* Whether no event is left once the cancelled ones are dropped *)
  let is_empty todo =
    let live = prune_cancelled todo in
    Sorted.is_nil live.system
    && Sorted.is_nil live.queue
    && Sorted.is_nil live.tasks
    && Sorted.is_nil live.ready

  (* In order to preserve insertion order we tag each event with a tick *)
  let tick = ref 0

  (* [add todo l] stamps each event of [l], in list order, with a fresh
     tick (stored in the [insertion] field of its priority) and then files
     it in the component matching its kind. The [ready] component is left
     alone. *)
  let add { system; queue; tasks; ready } l =
    let stamp ({ WithAttributes.priority; _ } as event) =
      incr tick;
      let priority = { priority with insertion = !tick } in
      { event with priority } in
    let stamped = List.map stamp l in
    let new_sys, new_queue, new_tasks = partition_events stamped in
    let system = Sorted.append system (Sorted.of_list new_sys) in
    let queue = Sorted.append queue (Sorted.of_list new_queue) in
    let tasks = Sorted.append_uniq tasks new_tasks in
    { ready; system; queue; tasks }

end

(* Like List.filter, but the outcome is a triple: the events that are ready,
   the ones that are not, and the minimum (user) priority of the ready ones
   ([Sorted.max_priority] if none is).
   Readiness is decided by [~advance], called on the threaded state [st], the
   cancellation flag and the payload of each event: it can also make an
   event advance whilst not being ready yet, in which case the event is kept
   among the non ready ones with its new payload. *)
let pull_ready ~advance st l =
  let rec scan ready best pending st events =
    match Sorted.look events with
    | Sorted.Nil -> ready, pending, best
    | Sorted.Cons ((({ WithAttributes.it; cancelled; priority; _ } as e), _), rest) ->
        begin match advance st cancelled it with
        | st, Yes result ->
            let best = Sorted.min_user best priority in
            let e = drop_event_type result e in
            scan (Sorted.cons e e.priority ready) best pending st rest
        | st, (Advanced next | No next) ->
            (* not ready (yet): keep waiting on the possibly updated payload *)
            let e = { e with it = next } in
            scan ready best (Sorted.cons e e.priority pending) st rest
        end
  in
  scan Sorted.nil Sorted.max_priority Sorted.nil st l
  
(* Type of the functions polling one kind of waiting events. Given the
   waiting events (payload 'a) such a function returns a triple: the events
   that became ready (payload 'b), the events that are still waiting, and a
   priority. The checkers defined in this file obtain that priority from
   pull_ready, i.e. it is the minimum one among the ready events. *)
type ('a,'b) ev_checker =
  'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority

(* The file descriptors of the events of [l] that are in the
   [ReadInProgress] state; any other event contributes nothing. *)
let file_descriptors_of l =
  let read_fd { WithAttributes.it; _ } =
    match it with
    | ReadInProgress (fd, _) -> Some fd
    | _ -> None
  in
  Sorted.map_filter read_fd l

(* [filter_file_descriptor fds event] holds when [event] is a read in
   progress on a file descriptor that belongs to [fds]. *)
let filter_file_descriptor fds event =
  match event.WithAttributes.it with
  | ReadInProgress (fd, _) -> List.mem fd fds
  | _ -> false

(* For fairness reasons, even if there are immediately ready events, we give
   a shot to system events: they are polled with a select whose timeout is 0,
   hence this function never blocks. (The blocking wait, instead, never
   sleeps forever: since process death events do not wake up select, it
   wakes up 10 times per second anyway.) *)
(* Only the waiting events whose priority passes the [Sorted.le_user] test
   against [min_prio_task_queue] (the lowest priority among the ready queue,
   task or system events) are polled; the other ones are handed back with
   the events still waiting. After advancing once, polling goes on as long
   as select reports ready file descriptors, each further round looking only
   at the events reading from one of them.
   The result is that, when reading 'n' bytes, it is no longer necessary to
   interleave up to 'n' ready tasks.
*)
let check_for_system_events min_prio_task_queue : ('a system_event,'a) ev_checker = fun waiting ->
  let rec poll ready_so_far skipped best pending =
    let fds = file_descriptors_of pending in
    let readable, _, _ = Unix.select fds [] [] 0.0 in
    let just_ready, pending, just_best =
      pull_ready ~advance:advance_system readable pending in
    let ready_so_far = Sorted.append just_ready ready_so_far in
    let best = Sorted.min_user just_best best in
    match readable with
    | [] ->
        (* no descriptor has data: stop polling *)
        ready_so_far, Sorted.append pending skipped, best
    | _ :: _ ->
        (* go on only with the events reading from a descriptor having data *)
        let pending, not_readable =
          Sorted.partition (filter_file_descriptor readable) pending in
        let skipped = Sorted.concat [not_readable; skipped] in
        poll ready_so_far skipped best pending
  in
  let important_enough prio = Sorted.le_user prio min_prio_task_queue in
  let candidates, skipped = Sorted.partition_priority important_enough waiting in
  poll Sorted.nil skipped Sorted.max_priority candidates

(* Polls the waiting queue events: they are advanced by [advance_queue]
   (which needs no state, hence the unit) and split by [pull_ready] into the
   ready ones and the ones still waiting, together with the minimum priority
   of the former. *)
let check_for_queue_events : ('a queue_event,'a) ev_checker =
  fun waiting -> pull_ready ~advance:advance_queue () waiting

(* This is blocking wait (modulo a deadline). We check for system events
   (io, process death) or a queue (in case some thread puts a token there).

   [fds] are the file descriptors handed to select, [sys] and [queue] the
   waiting system and queue events. The result is the tuple
     ready system events, ready queue events,
     waiting system events, waiting queue events, priority
   where the priority combines, via [Sorted.min_priority], the minimum
   priorities of the events found ready. If the deadline expires first, both
   ready components are [Sorted.nil], the waiting ones are returned as they
   are and the priority is [Sorted.max_priority].

   Each round sleeps in select for at most 0.1s (process death events do not
   wake up select, hence we poll 10 times per second) and never for longer
   than the time left before [deadline], so that a deadline closer than 0.1s
   is not overshot by a full sleeping slice. *)
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
  let now = Unix.gettimeofday () in
  if now > deadline then Sorted.nil, Sorted.nil, sys, queue, Sorted.max_priority
  else
    (* Here [now <= deadline], hence the timeout is never negative (a
       negative timeout would make select wait with no time limit) *)
    let remaining = deadline -. now in
    let timeout = if remaining < 0.1 then remaining else 0.1 in
    let ready_fds, _, _ = Unix.select fds [] [] timeout in
    let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:advance_system ready_fds sys in
    let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue () queue in
    if not(Sorted.is_nil ready_sys) || not(Sorted.is_nil ready_queue)
    then
      let min_prio =  Sorted.min_priority min_prio_queue min_prio_sys in
      (* Something is ready: before returning, give the system events that
         are still waiting one more (non blocking) chance *)
      let new_ready_sys, waiting_sys, min_prio_new_ready_sys = check_for_system_events min_prio waiting_sys in
      Sorted.append new_ready_sys ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_new_ready_sys min_prio
    else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

(* Entry point of the blocking wait: the file descriptors to be watched are
   computed once from [sys] and handed, together with [sys] itself, to the
   recursive worker defined above under the same name. *)
let wait_for_system_or_queue_events ~deadline sys queue =
  wait_for_system_or_queue_events ~deadline (file_descriptors_of sys, sys) queue

(* [pull_tasks min_prio l] detaches from the front of [l] the longest run of
   items whose priority passes the [Sorted.le_user] test against [min_prio].
   It returns the detached items and what is left of [l], the latter starting
   at the first item failing the test. *)
let rec pull_tasks min_prio l =
  match Sorted.look l with
  | Sorted.Nil -> Sorted.nil, Sorted.nil
  | Sorted.Cons ((task, prio), rest) ->
      if Sorted.le_user prio min_prio then
        let pulled, left = pull_tasks min_prio rest in
        Sorted.cons task prio pulled, left
      else
        Sorted.nil, l
  
(* Splits [ready] in two: the events to keep, i.e. the ones whose user
   priority passes the [Sorted.le_user] test against [p] (assumed to be the
   minimum), and the ones to postpone, i.e. all the others. *)
let postpone p ready =
  let keep event = Sorted.le_user event.WithAttributes.priority p in
  Sorted.partition keep ready
        
(* [wait ?deadline todo] computes the events of [todo] that can be handled
   now. It returns them as a list, together with the todo made of what is
   left (events still waiting, plus the ready ones that got postponed).
   [deadline] (default [max_float]) is only passed on to
   [wait_for_system_or_queue_events], i.e. it bounds the blocking wait done
   when nothing is ready: if it expires, the returned list is empty. *)
let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
  let open Todo in
  (* cancelled events are dropped before anything else *)
  let { system; queue; tasks; ready } as todo = prune_cancelled todo in
  if is_empty todo then
    [], todo
  else
    (* NOTE(review): presumably the minimum priority among the events that
       were already ready (postponed by a previous call) - confirm against
       Sorted.min *)
    let min_prio, ready = Sorted.min ready in
    let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
    let min_prio = Sorted.min_priority min_prio min_prio_queue in
    (* non blocking poll of the system events, restricted by min_prio *)
    let ready_sys, waiting_sys, min_prio_sys = check_for_system_events min_prio system in
    if Sorted.is_nil ready_sys &&
       Sorted.is_nil ready_queue &&
       Sorted.is_nil ready &&
       Sorted.is_nil tasks
    then
      (* nothing is ready and there is no task to run: block (modulo the
         deadline) until a system or queue event shows up *)
      let ready_sys, ready_queue, waiting_sys, waiting_queue, min_prio =
        wait_for_system_or_queue_events ~deadline waiting_sys waiting_queue in
      let ready_sys, postponed_sys = postpone min_prio ready_sys in
      let ready_queue, postponed_queue = postpone min_prio ready_queue in
      let postponed = Sorted.append postponed_sys postponed_queue in
      let ready = Sorted.to_list (Sorted.append ready_sys ready_queue) in
      (* the old ready component is known to be nil in this branch, hence
         the postponed events are all there is to put in the new one *)
      ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }
    else
      (* something can be handled right away: hand out what passes the test
         against the overall minimum priority and postpone the rest *)
      let min_prio = Sorted.min_priority min_prio min_prio_sys in
      let ready_old, postponed_ready = pull_tasks min_prio ready in
      let ready_tasks, tasks = pull_tasks min_prio tasks in
      let ready_sys, postponed_sys = postpone min_prio ready_sys in
      let ready_queue, postponed_queue = postpone min_prio ready_queue in
      let postponed = Sorted.concat [postponed_sys; postponed_queue; postponed_ready] in
      let ready = Sorted.to_list (Sorted.concat [ready_sys; ready_queue; ready_tasks; ready_old]) in
      ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed }

(* Turns the outcome of the internal wait into the outcome of a pop: the
   payload of the first ready event (if any) is handed out, while the other
   ready events are put, each with its own priority, in the [ready]
   component of the todo, so that they are not lost. *)
let pop_return (ready, todo) =
  match ready with
  | [] -> None, todo
  | first :: others ->
      let with_priority event = event, event.WithAttributes.priority in
      let others = Sorted.of_list (List.map with_priority others) in
      let ready = Sorted.append todo.Todo.ready others in
      Some first.WithAttributes.it, { todo with Todo.ready }

(* [pop l] waits for a ready event and returns its payload together with the
   updated todo. Raises [Failure "nothing to pop"] if no event is ready. *)
let pop l =
  let popped, todo = pop_return (wait l) in
  match popped with
  | Some payload -> payload, todo
  | None -> raise (Failure "nothing to pop")
(* Like [pop], but [None] is returned when no event is ready *)
let pop_opt l = wait l |> pop_return

(* Like [pop_opt], but the wait is given a deadline set [delta] seconds
   after the moment of the call *)
let pop_timeout ~stop_after_being_idle_for:delta l =
  wait ~deadline:(next_deadline delta) l |> pop_return

(* Turns the outcome of the internal wait into the public one: each ready
   event is replaced by its payload, the todo is passed along unchanged *)
let wait_return (l,todo) =
  let payload { WithAttributes.it; _ } = it in
  List.map payload l, todo

(* Like the public [wait], but the wait is given a deadline set [delta]
   seconds after the moment of the call *)
let wait_timeout ~stop_after_being_idle_for:delta l =
  wait ~deadline:(next_deadline delta) l |> wait_return

(* The public [wait]: it shadows the internal one, runs it with no deadline
   and hands out the payloads of the ready events *)
let wait l = wait l |> wait_return

include Events