package sel
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file sel.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
(**************************************************************************)
(*                                                                        *)
(*                                 SEL                                    *)
(*                                                                        *)
(*                   Copyright INRIA and contributors                     *)
(*       (see version control and README file for authors & dates)        *)
(*                                                                        *)
(**************************************************************************)
(*                                                                        *)
(*   This file is distributed under the terms of the MIT License.         *)
(*   See LICENSE file.                                                    *)
(*                                                                        *)
(**************************************************************************)

(* Outcome handed to user continuations: a value or the exception that
   prevented it from being produced. *)
type 'a res = ('a,exn) result

(* Events in progress.
   - Line (one_byte_buffer, accumulator) yields a string;
   - Bytes (missing, buffer) yields the buffer once [missing] more bytes
     have been stored at its end. *)
type _ in_progress =
  | Line : Bytes.t * Buffer.t -> string in_progress
  | Bytes : int * Bytes.t -> Bytes.t in_progress

(* Reified function composition: either a final value (FNil) or a pending
   read together with the continuation consuming its outcome (FCons). *)
type _ fcomp =
  | FNil : 'a -> 'a fcomp
  | FCons : 'a in_progress * ('a res -> 'b fcomp) -> 'b fcomp

(* Post-compose [f] with the final value of a chain of reads. *)
let rec map_fcomp f = function
  | FNil x -> FNil (f x)
  | FCons(pending,g) -> FCons(pending,(fun x -> map_fcomp f (g x)))

(* Last link of a chain: feed the outcome [x] to the continuation [k]. *)
let finish_with (k : 'a res -> 'b) x = FNil(k x)

(* Plain cons: the continuation [xs] receives the raw outcome, errors
   included. *)
let (--) = fun x xs -> FCons(x,xs)

(* Cons with error handling. Locally one does
     let (--?) x y = err k x y in
   so that the continuation only sees successful outcomes: on [Error e]
   the chain is cut short and [k (Error e)] becomes the final value. *)
let err : type a b c. (c res -> b) -> a in_progress -> (a -> b fcomp) -> b fcomp =
  fun k x xs ->
    let xs = function
      | Ok v -> xs v
      | Error e -> FNil(k (Error e)) in
    FCons(x,xs)

(* Read events can be composed of many read operations *)
type 'a system_event =
  | ReadInProgress of Unix.file_descr * 'a fcomp
  | WaitProcess of int * (Unix.process_status -> 'a)

(* Hand written printer: only the constructor (and the pid) is printed. *)
let pp_system_event _ fmt = function
  | ReadInProgress(_,_) -> Stdlib.Format.fprintf fmt "ReadInProgress"
  | WaitProcess(pid,_) -> Stdlib.Format.fprintf fmt "WaitProcess %d" pid

(* Events fired when one queue (or two queues at once) hold an item. *)
type 'a queue_event =
  | WaitQueue1 : 'b Queue.t * ('b -> 'a) -> 'a queue_event
  | WaitQueue2 : 'b Queue.t * 'c Queue.t * ('b -> 'c -> 'a) -> 'a queue_event

(* Hand written printer: only the constructor is printed. *)
let pp_queue_event _ fmt = function
  | WaitQueue1 _ -> Stdlib.Format.fprintf fmt "WaitQueue1"
  | WaitQueue2 _ -> Stdlib.Format.fprintf fmt "WaitQueue2"

(* A task is an already available value. *)
type 'a task_event = 'a
[@@deriving show]

module Event = struct

(* Shared flag: once set to true the event is considered cancelled. *)
type cancellation_handle = bool ref
[@@deriving show]

(* Payload [it] decorated with scheduling attributes. The type of
   [recurring] is a parameter: a bool in the public event type, an option
   carrying the event to re-arm once events are sorted by kind. *)
type ('a,'b) with_attributes = {
  name : string option ;
  recurring : 'a;
  priority : int;
  it : 'b;
  cancelled : cancellation_handle;
}
[@@deriving show]

(* Order two events by priority. Int.compare is used rather than a
   subtraction, since the latter overflows (and flips sign) when the two
   priorities are far apart, e.g. max_int and -1. *)
let cmp_priority { priority = t1; _} { priority = t2; _} = Int.compare t1 t2

(* Default attributes: no name, not recurring, priority 0, not cancelled. *)
let make_event it =
  let cancelled = ref false in
  { name = None; recurring = false; priority = 0; cancelled; it }

type 'a event_ =
  | SystemEvent of 'a system_event
  | QueueEvent of 'a queue_event
  | Task of 'a task_event
[@@deriving show]

(* The map_* functions below post-compose [f] with the value an event
   eventually produces. *)
let map_system_event f = function
  | WaitProcess(pid,k) -> WaitProcess(pid, (fun x -> f (k x)))
  | ReadInProgress(fd,fcomp) -> ReadInProgress(fd,map_fcomp f fcomp)

let map_queue_event f = function
  | WaitQueue1(q1,k) -> WaitQueue1(q1,(fun x -> f (k x)))
  | WaitQueue2(q1,q2,k) -> WaitQueue2(q1,q2,(fun x y -> f (k x y)))

let map_task_event f x = f x

let map_with_attributes f { name; recurring; priority; cancelled; it } =
  { name; recurring; priority; cancelled; it = f it }

let map f = function
  | Task e -> Task (map_task_event f e)
  | SystemEvent e -> SystemEvent (map_system_event f e)
  | QueueEvent e -> QueueEvent(map_queue_event f e)

type 'a t = (bool,'a event_) with_attributes
[@@deriving show]

(* Shadows the previous [map]: this one works on a whole event. *)
let map f e = map_with_attributes (map f) e

let name name it = { it with name = Some name }

let recurring x = { x with recurring = true }

(* Works on the internal representation, where [recurring] is an option. *)
let is_recurring { recurring; _ } = Option.is_some recurring

let at_priority priority x = { x with priority }

let get_cancellation_handle { cancelled; _ } = cancelled
let cancel x = x := true

end

open Event

(* Result of trying to advance an event. *)
type ('a,'b) has_finished =
  | Yes of 'a
  | No of 'b (* not finished: what is left to be done *)

(* A chain of reads is finished when it has been reduced to FNil. *)
let mkReadInProgress fd = function
  | FCons _ as f -> No (ReadInProgress(fd,f))
  | FNil x -> Yes x

(* A pending read of a line: one byte scratch buffer and an accumulator. *)
let one_line () = Line (Bytes.make 1 '0', Buffer.create 40)

(* A pending read of [n] bytes to be stored in the last [n] positions of
   [buff] (a fresh buffer of size [n] by default). *)
let some_bytes n ?(buff=Bytes.create n) () = Bytes(n,buff)

module On = struct

  type 'a res = ('a,exn) result

  (* [k] receives the line read from [fd] (without the newline) or an error. *)
  let line fd k : 'a Event.t =
    make_event @@ SystemEvent (ReadInProgress(fd, one_line () -- finish_with k))

  (* [k] receives a buffer of [n] bytes read from [fd] or an error. *)
  let bytes fd n k : 'a Event.t =
    make_event @@ SystemEvent (ReadInProgress(fd, some_bytes n () -- finish_with k))

  (* [k] receives the status of process [pid] once waitpid reports it. *)
  let death_of ~pid k : 'a Event.t =
    make_event @@ SystemEvent (WaitProcess(pid,k))

  (* Reads a marshalled value: first the header, then as many bytes as
     the header announces. A header or a payload rejected by Marshal
     (Failure) is delivered to [k] as an Error, like any read error,
     rather than escaping as an exception from whoever advances the read. *)
  let an_ocaml_value (k : 'a res -> 'b) : 'b fcomp =
    let (--?) x y = err k x y in
    some_bytes Marshal.header_size ()
    --? (fun header ->
      match Marshal.data_size header 0 with
      | exception (Failure _ as e) -> finish_with k (Error e)
      | data_size ->
        (* the header is kept: from_bytes needs it in front of the data *)
        some_bytes data_size ~buff:(Bytes.extend header 0 data_size) ()
        --? (fun buff ->
          match Marshal.from_bytes buff 0 with
          | exception (Failure _ as e) -> finish_with k (Error e)
          | value -> finish_with k (Ok value)))

  let ocaml_value fd k : 'a Event.t =
    make_event @@ SystemEvent (ReadInProgress(fd, an_ocaml_value k))

  (* Parses [s] (case insensitively, since it is upper cased first) as a
     content length header and passes the number to [k]. Any of the listed
     exceptions, raised by the scanner or by [k], is given to [err]. *)
  let parse_content_length_or err k s =
    try
      let input = Scanf.Scanning.from_string (String.uppercase_ascii s) in
      Scanf.bscanf input "CONTENT-LENGTH: %d" k
    with
      (Scanf.Scan_failure _ | Failure _ | End_of_file | Invalid_argument _) as e ->
        err (Error e)

  (* Reads the content length line, discards the following line, then
     reads as many bytes as announced. *)
  let an_httpcle (k : Bytes.t res -> 'b) : 'b fcomp =
    let (--?) x y = err k x y in
    one_line ()
    --? (parse_content_length_or (finish_with k) (fun length ->
    one_line ()
    --? (fun _discard ->
    some_bytes length ()
    --? (fun buff ->
    finish_with k (Ok buff)))))

  let httpcle fd k : 'a Event.t =
    make_event @@ SystemEvent (ReadInProgress(fd, an_httpcle k))

  (* [k] receives the item popped from [q1]. *)
  let queue q1 k : 'a Event.t =
    make_event @@ QueueEvent (WaitQueue1(q1,k))

  (* [k] receives one item from each queue, once both are non empty. *)
  let queues q1 q2 k : 'a Event.t =
    make_event @@ QueueEvent (WaitQueue2(q1,q2,k))

end

(* An event that is immediately ready. *)
let now task : 'a Event.t =
  make_event @@ Task task

(* Put a recurring event back in [l], re-armed with its original payload. *)
let cons_recurring e l =
  match e.recurring with
  | None -> l
  | Some x -> { e with it = x } :: l

(* Tries to advance each event with [f]. Returns the events that finished,
   the ones still waiting (advanced ones in their new state, plus finished
   recurring ones re-armed) and the smallest priority among the finished
   ones (the initial value of [min_priority] if none finished). *)
let rec pull_ready f yes no min_priority = function
  | [] -> List.rev yes, List.rev no, min_priority
  | ({ it; cancelled; _ } as e) :: rest ->
      match f cancelled it with
      | Yes y ->
          pull_ready f
            ({ e with it = y } :: yes)
            (cons_recurring e no)
            (min min_priority e.priority)
            rest
      | No x ->
          pull_ready f yes ({ e with it = x } :: no) min_priority rest

let pull_ready f l = pull_ready f [] [] max_int l

(* From the public representation (recurring is a bool) to the internal
   one (recurring holds the payload to re-arm the event with). *)
let cast_to_recurring : (bool, 'a) with_attributes -> 'b -> ('b option, 'b) with_attributes =
  fun e it ->
    {
      it;
      recurring = if e.recurring then Some it else None;
      name = e.name;
      priority = e.priority;
      cancelled = e.cancelled;
    }

(* Forget whether the event was recurring. *)
let cast_to_not_recurring : ('b, 'a) with_attributes -> ('c option, 'a) with_attributes =
  fun e ->
    {
      it = e.it;
      recurring = None;
      name = e.name;
      priority = e.priority;
      cancelled = e.cancelled;
    }

(* Splits events by kind, preserving their relative order. *)
let rec partition_events sys queue task = function
  | [] -> List.rev sys, List.rev queue, List.rev task
  | ({ it = SystemEvent x; _ } as e) :: rest ->
      partition_events (cast_to_recurring e x :: sys) queue task rest
  | ({ it = QueueEvent x; _ } as e) :: rest ->
      partition_events sys (cast_to_recurring e x :: queue) task rest
  | ({ it = Task x; _ } as e) :: rest ->
      partition_events sys queue (cast_to_recurring e x :: task) rest

let partition_events l = partition_events [] [] [] l

(* A queue event is ready when all its queues hold an item; items are
   popped only in that case. The first argument (the cancellation handle
   passed by pull_ready) is ignored. *)
let advance_queue _ = function
  | WaitQueue1(q1,_) as x when Queue.is_empty q1 -> No x
  | WaitQueue1(q1,k) -> Yes(k (Queue.pop q1))
  | WaitQueue2(q1,q2,_) as x when Queue.is_empty q1 || Queue.is_empty q2 -> No x
  | WaitQueue2(q1,q2,k) -> Yes(k (Queue.pop q1) (Queue.pop q2))
(* Tries to advance a system event. [ready_fds] are the file descriptors
   select reported as readable; the second argument (the cancellation
   handle passed by pull_ready) is ignored.
   - WaitProcess: non blocking waitpid, finished if a pid is returned;
   - Line: reads a single byte, finished on newline (not included);
   - Bytes(n,buff): reads at most [n] bytes at the end of [buff].
   End of file and Unix errors are passed to the continuation as Error. *)
let advance_system ~ready_fds _ = function
  | WaitProcess(pid,k) as x ->
      let rc, code = Unix.waitpid [Unix.WNOHANG] pid in
      if rc = 0 then No x
      else Yes (k code)
  | ReadInProgress(_, FNil _) -> assert false
  | ReadInProgress(fd, FCons(Bytes(0,buff),rest)) ->
      (* Nothing left to read: succeed right away. Reading 0 bytes makes
         Unix.read return 0, which would be mistaken for an end of file. *)
      mkReadInProgress fd (rest (Ok buff))
  | ReadInProgress(fd,_) as x when not (List.mem fd ready_fds) -> No x
  | ReadInProgress(fd, FCons(Line (buff,acc) as line,rest)) -> begin try
      let n = Unix.read fd buff 0 1 in
      if n = 0 then begin
        Buffer.clear acc;
        mkReadInProgress fd (rest (Error End_of_file))
      end else
        let c = Bytes.get buff 0 in
        if c <> '\n' then begin
          Buffer.add_char acc c;
          mkReadInProgress fd (FCons(line,rest))
        end else begin
          let one_line = Buffer.contents acc in
          Buffer.clear acc;
          mkReadInProgress fd (rest (Ok one_line))
        end
      with Unix.Unix_error _ as e ->
        Buffer.clear acc;
        mkReadInProgress fd (rest (Error e))
      end
  | ReadInProgress(fd,FCons(Bytes(n,buff),rest)) -> begin try
      (* the [n] missing bytes go in the last [n] positions of [buff] *)
      let m = Unix.read fd buff (Bytes.length buff - n) n in
      if m = 0 then mkReadInProgress fd (rest (Error End_of_file))
      else if m <> n then mkReadInProgress fd (FCons(Bytes(n-m,buff),rest))
      else mkReadInProgress fd (rest (Ok buff))
      with Unix.Unix_error _ as e -> mkReadInProgress fd (rest (Error e))
      end

(* Kept under its historical name; the standard library function is tail
   recursive, the hand written version it replaces was not. *)
let map_filter f l = List.filter_map f l

(* For fairness reasons, even if there are immediately ready events we
   give a shot to system events with 0 wait, otherwise we wait until a
   system event is ready. We never sleep forever, since process death events
   do not wake up select: we wake up 10 times per second anyway. *)
let check_for_system_events l =
  let fds =
    map_filter
      (function { it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None)
      l in
  let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
  pull_ready (advance_system ~ready_fds) l

let check_for_queue_events l =
  pull_ready advance_queue l

(* Absolute time, [delta] seconds from now. *)
let next_deadline delta = Unix.gettimeofday() +. delta

(* Lists of items paired with an integer priority, kept in increasing
   order of priority. *)
module Sorted : sig

  type 'a list
  type 'a list_ =
    | Nil
    | Cons of 'a * int * 'a list
  val look : 'a list -> 'a list_
  val cons : 'a -> int -> 'a list -> 'a list
  val cons_opt : ('a * int) option -> 'a list -> 'a list
  val nil : 'a list
  val length : 'a list -> int
  val filter : ('a -> bool) -> 'a list -> 'a list
  val for_all : ('a -> bool) -> 'a list -> bool
  val append : 'a list -> 'a list -> 'a list
  val concat3 : ('a -> int) -> 'a List.t -> 'a List.t -> 'a list -> 'a list
  val sort : ('a -> int) -> 'a List.t -> 'a list
  val pp_list : (Format.formatter -> 'a -> unit) -> Format.formatter -> 'a list -> unit

end = struct

  module L = struct
    type 'a t = 'a list
    [@@deriving show]
  end

  type 'a list = ('a * int) L.t
  [@@deriving show]

  type 'a list_ =
    | Nil
    | Cons of 'a * int * ('a * int) L.t

  let nil = []
  let length = List.length
  let on_fst f = (); fun (x,_) -> f x
  let filter f l = List.filter (on_fst f) l
  let for_all f l = List.for_all (on_fst f) l

  let look = function
    | [] -> Nil
    | (x,p) :: xs -> Cons(x,p,xs)

  (* Inserts before the first item of strictly greater priority, hence
     after the items of the same priority. *)
  let rec cons x p = function
    | [] -> [x,p]
    | (_,q) :: _ as l when p < q -> (x,p) :: l
    | (y,q) :: l -> (y,q) :: cons x p l

  let cons_opt = function
    | Some(x,p) -> cons x p
    | None -> fun x -> x

  (* Int.compare rather than a subtraction, which overflows (and flips
     sign) when the two priorities are far apart. *)
  let cmp (_,p1) (_,p2) = Int.compare p1 p2

  (* stable_sort: items of equal priority keep their relative order. *)
  let append l1 l2 = List.stable_sort cmp (l1 @ l2)
  let add_prio f l = List.map (fun x -> x, f x) l
  let concat3 f l1 l2 l3 =
    List.stable_sort cmp (add_prio f l1 @ add_prio f l2 @ l3)
  let sort f l = add_prio f l |> List.stable_sort cmp

end

let priority_sort l = List.stable_sort cmp_priority l

module Todo = struct

  (* Events still to be processed, by kind. [ready] holds events that are
     already computed but not yet handed to the user (filled by pop). *)
  type 'a t = {
    system : ('a system_event option,'a system_event) with_attributes list;
    queue  : ('a queue_event option,'a queue_event) with_attributes list;
    tasks  : ('a task_event option,'a task_event) with_attributes Sorted.list;
    ready  : ('a option,'a) with_attributes Sorted.list;
  }
  [@@deriving show]

  let empty = { system = []; queue = [] ; tasks = Sorted.nil; ready = Sorted.nil }

  (* Drops the events whose cancellation handle has been set. *)
  let prune_cancelled { system; queue; tasks; ready } =
    let not_cancelled { cancelled; _ } = not !cancelled in
    let system = List.filter not_cancelled system in
    let queue = List.filter not_cancelled queue in
    let tasks = Sorted.filter not_cancelled tasks in
    let ready = Sorted.filter not_cancelled ready in
    { system; queue; tasks; ready }

  (* Number of events that are not cancelled. *)
  let size todo =
    let { system; queue; tasks; ready } = prune_cancelled todo in
    List.length system + List.length queue
    + Sorted.length tasks + Sorted.length ready

  (* True if no event is left, cancelled ones apart. *)
  let is_empty todo =
    let { system; queue; tasks; ready } = prune_cancelled todo in
    system = [] && queue = [] && tasks = Sorted.nil && ready = Sorted.nil

  (* New system and queue events go after the existing ones, new tasks
     are merged by priority; [ready] is left alone. *)
  let add { system; queue; tasks; ready } l =
    let new_sys, new_queue, new_tasks = partition_events l in
    {
      system = system @ new_sys;
      queue = queue @ new_queue;
      tasks = Sorted.append tasks (Sorted.sort (fun x -> x.priority) new_tasks);
      ready;
    }

  (* True if every event that is not cancelled is recurring and nothing
     is sitting in [ready]. *)
  let only_recurring_events todo =
    let { system; queue; tasks; ready } = prune_cancelled todo in
    List.for_all is_recurring system &&
    List.for_all is_recurring queue &&
    Sorted.for_all is_recurring tasks &&
    ready = Sorted.nil

end

(* This is a blocking wait (modulo a deadline). We check for system events
   (io, process death) or a queue (in case some thread puts a token there).
   Returns the ready system and queue events, the waiting ones and the
   smallest priority among the ready ones. Each round sleeps in select for
   at most 0.1 seconds, and never past the deadline. *)
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
  let now = Unix.gettimeofday () in
  if now > deadline then [], [], sys, queue, max_int
  else
    let timeout = Float.min 0.1 (deadline -. now) in
    let ready_fds, _, _ = Unix.select fds [] [] timeout in
    let ready_sys, waiting_sys, min_prio_sys =
      pull_ready (advance_system ~ready_fds) sys in
    let ready_queue, waiting_queue, min_prio_queue =
      pull_ready advance_queue queue in
    if ready_sys <> [] || ready_queue <> [] then
      ready_sys, ready_queue, waiting_sys, waiting_queue,
      min min_prio_queue min_prio_sys
    else
      (* no queue event fired, hence [queue] can be passed unchanged;
         [waiting_sys] records the progress made by partial reads *)
      wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

(* Shapes the outcome of the internal wait for the users of wait: payloads
   only, system and queue events sorted by priority. *)
let wait_return_sorted (l1,l2,o3,todo) =
  priority_sort l1 |> List.map (fun x -> x.it),
  priority_sort l2 |> List.map (fun x -> x.it),
  (Option.map (fun (x,_) -> x.it) o3),
  todo

(* Put a recurring task back in [l], at priority [p]. *)
let cons_recurring_sorted e p l =
  match e.recurring with
  | None -> l
  | Some x -> Sorted.cons { e with it = x } p l

(* Takes the first task, unless its priority is not strictly smaller than
   [min_prio] (the smallest priority of the ready system/queue events). *)
let pull_ready_sorted min_prio l =
  match Sorted.look l with
  | Sorted.Nil -> None, Sorted.nil
  | Sorted.Cons(_,p,_) when min_prio <= p -> None, l
  | Sorted.Cons(x,p,l) -> Some(x,p), cons_recurring_sorted x p l

(* Internal wait: returns the ready system events, the ready queue events,
   at most one ready task (with its priority) and what is left to do.
   It blocks, up to the deadline, only if there is no task and no system
   or queue event is immediately ready. *)
let wait ?(deadline=max_float) todo =
  let open Todo in
  let { system; queue; tasks; ready } as todo = prune_cancelled todo in
  (* The pop functions below drain [ready] before calling this function,
     hence a non empty [ready] means wait is being mixed with pop. *)
  assert(ready = Sorted.nil);
  if is_empty todo then [], [], None, todo
  else
    let ready_sys, waiting_sys, min_prio_sys =
      check_for_system_events system in
    let ready_queue, waiting_queue, min_prio_queue =
      check_for_queue_events queue in
    if tasks = Sorted.nil && ready_queue = [] && ready_sys = [] then
      let fds =
        map_filter
          (function { it = ReadInProgress(fd,_); _ } -> Some fd | _ -> None)
          waiting_sys in
      let ready_sys, ready_queue, waiting_sys, waiting_queue, _ =
        wait_for_system_or_queue_events ~deadline (fds,waiting_sys) waiting_queue in
      ready_sys, ready_queue, None,
      { system = waiting_sys; queue = waiting_queue;
        tasks = Sorted.nil; ready = Sorted.nil }
    else
      let min_prio = min min_prio_sys min_prio_queue in
      let ready_task, waiting_tasks = pull_ready_sorted min_prio tasks in
      ready_sys, ready_queue, ready_task,
      { system = waiting_sys; queue = waiting_queue;
        tasks = waiting_tasks; ready = Sorted.nil }

(* Merges what the internal wait found ready, hands out the event with the
   smallest priority and stores the others in the [ready] field. *)
let pop_return (ready_sys,ready_queue,ready_task, todo) =
  let open Todo in
  let ready_sys = List.map cast_to_not_recurring ready_sys in
  let ready_queue = List.map cast_to_not_recurring ready_queue in
  let ready_task = Sorted.cons_opt ready_task Sorted.nil in
  let ready =
    Sorted.concat3 (fun x -> x.priority) ready_sys ready_queue ready_task in
  match Sorted.look ready with
  | Sorted.Cons(x,_,ready) -> Some x.it, { todo with ready }
  | Sorted.Nil -> None, todo

(* Common part of the pop functions. Events left in [ready] by a previous
   pop are handed out first, one per call; [waiter] (the internal wait) is
   called only once [ready] is empty. Without this, a pop that found more
   than one ready event made the next pop trip on the assertion in wait. *)
let pop_with waiter todo =
  let open Todo in
  let todo = prune_cancelled todo in
  match Sorted.look todo.ready with
  | Sorted.Cons(x,_,ready) -> Some x.it, { todo with ready }
  | Sorted.Nil -> pop_return (waiter todo)

(* Raises Failure if there is nothing to pop. *)
let pop l =
  match pop_with (fun todo -> wait todo) l with
  | None, _ -> failwith "nothing to pop"
  | Some x, t -> x, t

let pop_opt l = pop_with (fun todo -> wait todo) l

let pop_timeout ~stop_after_being_idle_for:delta l =
  let deadline = next_deadline delta in
  pop_with (fun todo -> wait ~deadline todo) l

let wait_timeout ~stop_after_being_idle_for:delta l =
  let deadline = next_deadline delta in
  wait_return_sorted @@ wait ~deadline l

(* Public wait: shadows the internal one. *)
let wait l = wait_return_sorted @@ wait l