package sel
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file sel.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221(**************************************************************************) (* *) (* SEL *) (* *) (* Copyright INRIA and contributors *) (* (see version control and README file for authors & dates) *) (* *) (**************************************************************************) (* *) (* This file is distributed under the terms of the MIT License. *) (* See LICENSE file. 
*)
(*                                                                        *)
(**************************************************************************)

open Events
open Event

(* [drop_event_type b e] rebuilds [e] through [WithAttributes.map] with a
   mapping function that ignores the previous payload and returns [b]. *)
let drop_event_type : 'b -> 'a WithAttributes.t -> 'b WithAttributes.t =
  fun payload event -> WithAttributes.map (fun _ -> payload) event

(* Splits the events into 3 lists: system, queue and task.
   Every element is paired with the priority of the event it comes from.
   Events are consed onto the accumulators while walking [l] left to right,
   so each resulting list is in the reverse of the input order. *)
let partition_events l =
  let dispatch (sys, queue, task) event =
    let prio = event.WithAttributes.priority in
    match event with
    | { WithAttributes.it = SystemEvent x; _ } ->
      ((drop_event_type x event, prio) :: sys, queue, task)
    | { WithAttributes.it = QueueEvent x; _ } ->
      (sys, (drop_event_type x event, prio) :: queue, task)
    | { WithAttributes.it = Task x; _ } ->
      (sys, queue, (drop_event_type x event, prio) :: task)
  in
  List.fold_left dispatch ([], [], []) l

(* Absolute time (as given by [Unix.gettimeofday]) that is [delta] after
   the current instant. *)
let next_deadline delta =
  let now = Unix.gettimeofday () in
  now +. delta

module Todo = struct

  type 'a t = {
    (* The pop API may need to postpone ready events *)
    ready : 'a WithAttributes.t Sorted.t;
    (* The three queues of events *)
    system : ('a system_event) WithAttributes.t Sorted.t;
    queue : ('a queue_event) WithAttributes.t Sorted.t;
    tasks : ('a task_event) WithAttributes.t Sorted.t;
  }
  [@@deriving show]

  (* A todo in which all four collections are [Sorted.nil]. *)
  let empty =
    { ready = Sorted.nil;
      system = Sorted.nil;
      queue = Sorted.nil;
      tasks = Sorted.nil;
    }

  (* Filters out of the four collections every event whose [cancelled]
     flag is currently set. *)
  let prune_cancelled { system; queue; tasks; ready } =
    let keep_live events =
      Sorted.filter
        (fun { WithAttributes.cancelled; _ } -> not !cancelled)
        events
    in
    let system = keep_live system in
    let queue = keep_live queue in
    let tasks = keep_live tasks in
    let ready = keep_live ready in
    { system; queue; tasks; ready }

  (* Number of events left in the four collections once cancelled ones
     have been pruned. *)
  let size todo =
    let live = prune_cancelled todo in
    Sorted.length live.system
    + Sorted.length live.queue
    + Sorted.length live.tasks
    + Sorted.length live.ready

  (* True when, after pruning cancelled events, the four collections are
     all nil. *)
  let is_empty todo =
    let live = prune_cancelled todo in
    Sorted.is_nil live.system
    && Sorted.is_nil live.queue
    && Sorted.is_nil live.tasks
    && Sorted.is_nil live.ready

  (* In order to preserve insertion order we tag each event with a tick *)
  let tick = ref 0

  (* [add todo l] stamps the events of [l], in list order, with successive
     values of the [tick] counter (stored in the [insertion] field of their
     priority), splits them by kind and appends them to the matching
     collection of [todo]. The [ready] collection is left as it is. *)
  let add { system; queue; tasks; ready } l =
    let stamp ({ WithAttributes.priority; _ } as e) =
      (* the counter is bumped before being read *)
      incr tick;
      let priority = { priority with insertion = !tick } in
      { e with priority }
    in
    let new_sys, new_queue, new_tasks = partition_events (List.map stamp l) in
    let merge existing fresh = Sorted.append existing (Sorted.of_list fresh) in
    {
      ready;
      system = merge system new_sys;
      queue = merge queue new_queue;
      tasks = merge tasks new_tasks;
    }

end

(* Like List.filter but also returns the minimum priority of ready events.
   Moreover ~advance can make the event advance (whilst not being ready yet).

   [advance] receives the [cancelled] flag and the payload of each event:
   - on [Yes y] the event, with payload [y], goes to the first result;
   - on [No x] the event, with payload [x], goes to the second result.
   The third result starts at [Sorted.max_priority] and is lowered with
   [min] for every ready event. *)
let pull_ready ~advance l =
  let rec scan ready waiting lowest remaining =
    match Sorted.look remaining with
    | Sorted.Nil -> (ready, waiting, lowest)
    | Sorted.Cons (({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) ->
      begin match advance cancelled it with
        | Yes y ->
          let lowest = min lowest priority in
          let promoted = drop_event_type y e in
          scan (Sorted.cons promoted promoted.priority ready) waiting lowest rest
        | No x ->
          scan ready (Sorted.cons { e with it = x } priority waiting) lowest rest
      end
  in
  scan Sorted.nil Sorted.nil Sorted.max_priority l

(* Shape of the non-blocking checkers: given the waiting events, return the
   newly ready events, the events still waiting and a priority (the third
   component of [pull_ready] in the checkers defined in this file). *)
type ('a,'b) ev_checker =
  'a WithAttributes.t Sorted.t ->
    'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority

(* File descriptors of the events whose payload is [ReadInProgress];
   any other event is skipped. *)
let file_descriptors_of l =
  let fd_of_event = function
    | { WithAttributes.it = ReadInProgress (fd, _); _ } -> Some fd
    | _ -> None
  in
  Sorted.map_filter fd_of_event l

(* For fairness reasons, even if there are immediately ready events we
   give a shot to system events with 0 wait, otherwise we wait until a
   system event is ready.
We never sleep forever, since process death events do not wake up
   select: we anyway wake up 10 times per second *)

(* Non-blocking check of system events: [Unix.select] is called with a 0.0
   timeout on the descriptors of [waiting], then [pull_ready] splits the
   events according to [advance_system ~ready_fds].
   NOTE(review): [Unix.select] can raise [Unix.Unix_error] (e.g. [EINTR]);
   this is not caught here - confirm callers are fine with that. *)
let check_for_system_events : ('a system_event,'a) ev_checker = fun waiting ->
  let fds = file_descriptors_of waiting in
  let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
  pull_ready ~advance:(advance_system ~ready_fds) waiting

(* Non-blocking check of queue events: [pull_ready] driven by
   [advance_queue]. *)
let check_for_queue_events : ('a queue_event,'a) ev_checker = fun waiting ->
  pull_ready ~advance:advance_queue waiting

(* This is a blocking wait (modulo a deadline). We check for system events
   (io, process death) or a queue (in case some thread puts a token there).

   Each round selects on [fds] for at most 0.1 seconds, then tries to
   advance both the system and the queue events. The result is
   [ready_sys, ready_queue, waiting_sys, waiting_queue, min_priority].
   Once [Unix.gettimeofday ()] is past [deadline] both ready collections
   are nil and the priority is [Sorted.max_priority].

   Emptiness is tested with [Sorted.is_nil], as everywhere else in this
   file, rather than with polymorphic [<>] against [Sorted.nil]: the test
   then does not depend on the representation of [Sorted.t] nor on
   structural comparison of event payloads.

   NOTE(review): when nothing is ready the recursion continues with [queue]
   and not with the queue returned by [pull_ready]; presumably
   [advance_queue] leaves a non-ready queue event unchanged - TODO confirm. *)
let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
  if Unix.gettimeofday () > deadline then
    (Sorted.nil, Sorted.nil, sys, queue, Sorted.max_priority)
  else
    let ready_fds, _, _ = Unix.select fds [] [] 0.1 in
    let ready_sys, waiting_sys, min_prio_sys =
      pull_ready ~advance:(advance_system ~ready_fds) sys in
    let ready_queue, waiting_queue, min_prio_queue =
      pull_ready ~advance:advance_queue queue in
    let something_ready =
      not (Sorted.is_nil ready_sys) || not (Sorted.is_nil ready_queue) in
    if something_ready then
      (ready_sys, ready_queue, waiting_sys, waiting_queue,
       Sorted.min_priority min_prio_queue min_prio_sys)
    else
      wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

(* Entry point of the blocking wait: the file descriptors are computed once
   from [sys] and reused for every round of the loop above. *)
let wait_for_system_or_queue_events ~deadline sys queue =
  let fds = file_descriptors_of sys in
  wait_for_system_or_queue_events ~deadline (fds,sys) queue

(* [pull_tasks min_prio l] takes from the front of [l] the events whose
   priority [p] satisfies [Sorted.le_user p min_prio], stopping at the first
   one that does not. Returns the events taken and the remaining ones. *)
let rec pull_tasks min_prio l =
  match Sorted.look l with
  | Sorted.Nil -> Sorted.nil, Sorted.nil
  | Sorted.Cons((x,p),l) when Sorted.le_user p min_prio ->
    let tasks, l = pull_tasks min_prio l in
    Sorted.cons x p tasks, l
  | _ -> Sorted.nil, l

(* Keep only events with a user priority equal to the given one (assumed
   to be the minimum). Returns the events kept and the postponed ones. *)
let postpone p ready =
  let same_user_prio { WithAttributes.priority = q; _} = Sorted.eq_user p q in
  Sorted.partition same_user_prio ready

(* [wait ?deadline todo] returns the events that are ready together with
   the updated todo.

   - Cancelled events are pruned first; if nothing is left the result is
     [[], todo].
   - System and queue events are then checked without blocking.
   - If nothing at all is ready (no system or queue event, no postponed
     ready event, no task) we block until an event is ready or [deadline]
     (default [max_float]) is passed.
   - Otherwise the minimum priority among the postponed ready events and
     the newly ready system and queue events is computed, and the tasks
     admitted by [pull_tasks] for that priority are added.
   In both cases only events accepted by [postpone] for the minimum
   priority are returned; the others are stored in the [ready] field of
   the resulting todo. *)
let wait ?(deadline=max_float) todo : 'a WithAttributes.t list * 'a Todo.t =
  let open Todo in
  let { system; queue; tasks; ready } as todo = prune_cancelled todo in
  if is_empty todo then ([], todo)
  else
    let ready_sys, waiting_sys, min_prio_sys = check_for_system_events system in
    let ready_queue, waiting_queue, min_prio_queue = check_for_queue_events queue in
    if Sorted.is_nil ready_sys && Sorted.is_nil ready_queue
       && Sorted.is_nil ready && Sorted.is_nil tasks then
      (* nothing to hand out right now: block (up to the deadline) *)
      let ready_sys, ready_queue, waiting_sys, waiting_queue, min_prio =
        wait_for_system_or_queue_events ~deadline waiting_sys waiting_queue in
      let ready_sys, postponed_sys = postpone min_prio ready_sys in
      let ready_queue, postponed_queue = postpone min_prio ready_queue in
      let postponed = Sorted.append postponed_sys postponed_queue in
      let ready = Sorted.to_list (Sorted.append ready_sys ready_queue) in
      (ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed })
    else
      let min_prio, ready = Sorted.min ready in
      let min_prio = Sorted.min_priority min_prio min_prio_sys in
      let min_prio = Sorted.min_priority min_prio min_prio_queue in
      let ready_tasks, tasks = pull_tasks min_prio tasks in
      let ready_sys, postponed_sys = postpone min_prio ready_sys in
      let ready_queue, postponed_queue = postpone min_prio ready_queue in
      let ready, postponed_ready = postpone min_prio ready in
      let postponed = Sorted.concat [postponed_sys; postponed_queue; postponed_ready] in
      let ready = Sorted.to_list (Sorted.concat [ready_sys; ready_queue; ready_tasks; ready]) in
      (ready, { system = waiting_sys; queue = waiting_queue; tasks; ready = postponed })

(* Turns the result of [wait] into the result of a pop: the payload of the
   first ready event (if any) is returned, and the other ready events are
   appended, with their priority, to the [ready] field of the todo. *)
let pop_return (ready, todo) =
  match ready with
  | { WithAttributes.it; _} :: rest ->
    let rest_w_prio = List.map (fun x -> x, x.WithAttributes.priority) rest in
    let ready = Sorted.append todo.Todo.ready (Sorted.of_list rest_w_prio) in
    let todo = { todo with Todo.ready } in
    (Some it, todo)
  | [] -> (None, todo)

(* Pops one ready event.
   @raise Failure "nothing to pop" when [wait] returns no ready event. *)
let pop l =
  match pop_return @@ wait l with
  | None, _ -> raise (Failure "nothing to pop")
  | Some x, t -> (x, t)

(* Same as [pop] but returns [None] instead of raising. *)
let pop_opt l = pop_return @@ wait l

(* Same as [pop_opt], with the deadline of the wait set to [delta] after
   the time of the call (see [next_deadline]). *)
let pop_timeout ~stop_after_being_idle_for:delta l =
  let deadline = next_deadline delta in
  pop_return @@ wait ~deadline l

(* Keeps only the payload ([it]) of each ready event. *)
let wait_return (l,todo) =
  (List.map (fun x -> x.WithAttributes.it) l, todo)

(* Same as the final [wait] below, with the deadline set to [delta] after
   the time of the call. *)
let wait_timeout ~stop_after_being_idle_for:delta l =
  let deadline = next_deadline delta in
  wait_return @@ wait ~deadline l

(* Variant of [wait] returning payloads only. It shadows the [wait] defined
   above, which the functions before this point keep referring to. *)
let wait l =
  wait_return @@ wait l

include Events