package moonpool
Pools of threads supported by a pool of domains
Install
dune-project
Dependency
Authors
Maintainers
Sources
moonpool-0.6.tbz
sha256=3efd095c82a37bba8c7ab6a2532aee3c445ebe1ecaed84ef3ffb560bc65e7633
sha512=e4bcab82e6638299c2d0beb1dbf204f7b43379a5387418c6edff85b9bf90c13ad1bdd8eb44b69cd421268d1bc45bcf918bcf77e1c924348211ac27d6643aac78
doc/src/moonpool.fib/fiber.ml.html
Source file fiber.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
module A = Atomic module FM = Handle.Map module Int_map = Map.Make (Int) type 'a callback = 'a Exn_bt.result -> unit (** Callbacks that are called when a fiber is done. *) type cancel_callback = Exn_bt.t -> unit let prom_of_fut : 'a Fut.t -> 'a Fut.promise = Fut.Private_.unsafe_promise_of_fut module Private_ = struct type 'a t = { id: Handle.t; (** unique identifier for this fiber *) state: 'a state A.t; (** Current state in the lifetime of the fiber *) res: 'a Fut.t; runner: Runner.t; ls: Task_local_storage.t; } and 'a state = | Alive of { children: children; on_cancel: cancel_callback Int_map.t; cancel_id: int; } | Terminating_or_done of 'a Exn_bt.result A.t and children = any FM.t and any = Any : _ t -> any [@@unboxed] (** Key to access the current fiber. *) let k_current_fiber : any option Task_local_storage.key = Task_local_storage.new_key ~init:(fun () -> None) () let[@inline] get_cur () : any option = Task_local_storage.get k_current_fiber let[@inline] is_closed (self : _ t) = match A.get self.state with | Alive _ -> false | Terminating_or_done _ -> true end include Private_ let create_ ~ls ~runner () : 'a t = let id = Handle.generate_fresh () in let res, _promise = Fut.make () in { state = A.make @@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; id; res; runner; ls; } let create_done_ ~res () : _ t = let id = Handle.generate_fresh () in { state = A.make @@ Alive { children = FM.empty; on_cancel = Int_map.empty; cancel_id = 0 }; id; res; runner = Runner.dummy; ls = Task_local_storage.dummy; } let[@inline] return x = create_done_ ~res:(Fut.return x) () let[@inline] fail ebt = create_done_ ~res:(Fut.fail_exn_bt ebt) () let[@inline] res self = self.res let[@inline] peek self = Fut.peek self.res let[@inline] is_done self = Fut.is_done self.res let[@inline] is_success self = Fut.is_success self.res let[@inline] is_cancelled self = Fut.is_failed self.res let[@inline] on_result (self : _ t) f = Fut.on_result self.res f let[@inline] await self = Fut.await self.res let[@inline] wait_block self = Fut.wait_block self.res let[@inline] wait_block_exn self = Fut.wait_block_exn self.res (** Resolve [promise] once [children] are all done *) let resolve_once_children_are_done_ ~children ~promise (res : 'a Exn_bt.result A.t) : unit = let n_children = FM.cardinal children in if n_children > 0 then ( (* wait for all children to be done *) let n_waiting = A.make (FM.cardinal children) in let on_child_finish (r : _ result) = (* make sure the parent fails if any child fails *) (match r with | Ok _ -> () | Error ebt -> A.set res (Error ebt)); (* if we're the last to finish, resolve the parent fiber's [res] *) if A.fetch_and_add n_waiting (-1) = 1 then ( let res = A.get res in Fut.fulfill promise res ) in FM.iter (fun _ (Any f) -> Fut.on_result f.res on_child_finish) children ) else Fut.fulfill promise @@ A.get res let rec resolve_as_failed_ : type a. a t -> Exn_bt.t -> unit = fun self ebt -> let promise = prom_of_fut self.res in while match A.get self.state with | Alive { children; cancel_id = _; on_cancel } as old -> let new_st = Terminating_or_done (A.make @@ Error ebt) in if A.compare_and_set self.state old new_st then ( (* here, unlike in {!resolve_fiber}, we immediately cancel children *) cancel_children_ ~children ebt; Int_map.iter (fun _ cb -> cb ebt) on_cancel; resolve_once_children_are_done_ ~children ~promise (A.make @@ Error ebt); false ) else true | Terminating_or_done _ -> false do () done (** Cancel eagerly all children *) and cancel_children_ ebt ~children : unit = FM.iter (fun _ (Any f) -> resolve_as_failed_ f ebt) children type cancel_handle = int let add_on_cancel (self : _ t) cb : cancel_handle = let h = ref 0 in while match A.get self.state with | Alive { children; cancel_id; on_cancel } as old -> let new_st = Alive { children; cancel_id = cancel_id + 1; on_cancel = Int_map.add cancel_id cb on_cancel; } in if A.compare_and_set self.state old new_st then ( h := cancel_id; false ) else true | Terminating_or_done r -> (match A.get r with | Error ebt -> cb ebt | Ok _ -> ()); false do () done; !h let remove_on_cancel (self : _ t) h = while match A.get self.state with | Alive ({ on_cancel; _ } as alive) as old -> let new_st = Alive { alive with on_cancel = Int_map.remove h on_cancel } in not (A.compare_and_set self.state old new_st) | Terminating_or_done _ -> false do () done let with_on_cancel (self : _ t) cb (k : unit -> 'a) : 'a = let h = add_on_cancel self cb in Fun.protect k ~finally:(fun () -> remove_on_cancel self h) (** Successfully resolve the fiber *) let resolve_ok_ (self : 'a t) (r : 'a) : unit = let r = A.make @@ Ok r in let promise = prom_of_fut self.res in while match A.get self.state with | Alive { children; _ } as old -> let new_st = Terminating_or_done r in if A.compare_and_set self.state old new_st then ( resolve_once_children_are_done_ ~children ~promise r; false ) else true | Terminating_or_done _ -> false do () done let remove_child_ (self : _ t) (child : _ t) = while match A.get self.state with | Alive ({ children; _ } as alive) as old -> let new_st = Alive { alive with children = FM.remove child.id children } in not (A.compare_and_set self.state old new_st) | _ -> false do () done (** Add a child to [self]. @param protected if true, the child's failure will not affect [self]. *) let add_child_ ~protect (self : _ t) (child : _ t) = while match A.get self.state with | Alive ({ children; _ } as alive) as old -> let new_st = Alive { alive with children = FM.add child.id (Any child) children } in if A.compare_and_set self.state old new_st then ( (* make sure to remove [child] from [self.children] once it's done; fail [self] is [child] failed and [protect=false] *) Fut.on_result child.res (function | Ok _ -> remove_child_ self child | Error ebt -> (* child failed, we must fail too *) remove_child_ self child; if not protect then resolve_as_failed_ self ebt); false ) else true | Terminating_or_done r -> (match A.get r with | Error ebt -> (* cancel child immediately *) resolve_as_failed_ child ebt | Ok _ -> ()); false do () done let spawn_ ~ls ~parent ~runner (f : unit -> 'a) : 'a t = (match parent with | Some p when is_closed p -> failwith "spawn: nursery is closed" | _ -> ()); let fib = create_ ~ls ~runner () in let run () = (* make sure the fiber is accessible from inside itself *) Task_local_storage.set k_current_fiber (Some (Any fib)); try let res = f () in resolve_ok_ fib res with exn -> let bt = Printexc.get_raw_backtrace () in let ebt = Exn_bt.make exn bt in resolve_as_failed_ fib ebt in Runner.run_async ~ls runner run; fib let spawn_top ~on f : _ t = let ls = Task_local_storage.Direct.create () in spawn_ ~ls ~runner:on ~parent:None f let spawn ?on ?(protect = true) f : _ t = (* spawn [f()] with a copy of our local storage *) let (Any p) = match get_cur () with | None -> failwith "Fiber.spawn: must be run from within another fiber." | Some p -> p in let ls = Task_local_storage.Direct.copy p.ls in let runner = match on with | Some r -> r | None -> p.runner in let child = spawn_ ~ls ~parent:(Some p) ~runner f in add_child_ ~protect p child; child let[@inline] spawn_ignore ?protect f : unit = ignore (spawn ?protect f : _ t) let[@inline] self () : any = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.self: must be run from inside a fiber." | Some f -> f let with_on_self_cancel cb (k : unit -> 'a) : 'a = let (Any self) = self () in let h = add_on_cancel self cb in Fun.protect k ~finally:(fun () -> remove_on_cancel self h) module Suspend_ = Moonpool.Private.Suspend_ let check_if_cancelled_ (self : _ t) = match A.get self.state with | Terminating_or_done r -> (match A.get r with | Error ebt -> Exn_bt.raise ebt | _ -> ()) | _ -> () let check_if_cancelled () = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.check_if_cancelled: must be run from inside a fiber." | Some (Any self) -> check_if_cancelled_ self let yield () : unit = match Task_local_storage.get k_current_fiber with | None -> failwith "Fiber.yield: must be run from inside a fiber." | Some (Any self) -> check_if_cancelled_ self; Suspend_.yield (); check_if_cancelled_ self
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>