package xapi-stdext-threads
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
Xapi's standard library extension, Threads
Install
dune-project
Dependency
Authors
Maintainers
Sources
xapi-stdext-4.20.0.tbz
sha256=ec93bcab02739d32f8ef09c30db57101bba7d58bb9a503a57dbfc9d81644b4e6
sha512=30db713ad51a9b1dc806bbb88aecec19ed22598a3c23da26aa80cbba431ba18b67e7f6e3834f1a848e75bfb9beeee2a171ddf0547f8c08f36c119a804aeaaf68
doc/src/xapi-stdext-threads/threadext.ml.html
Source file threadext.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112(* * Copyright (C) 2006-2009 Citrix Systems Inc. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as published * by the Free Software Foundation; version 2.1 only. with the special * exception on linking described in file LICENSE. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. *) module M = Mutex module Mutex = struct (** execute the function f with the mutex hold *) let execute lock f = Mutex.lock lock; Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> Mutex.unlock lock) end (** Parallel List.iter. Remembers all exceptions and returns an association list mapping input x to an exception. Applications of x which succeed will be missing from the returned list. *) let thread_iter_all_exns f xs = let exns = ref [] in let m = M.create () in List.iter Thread.join (List.map (fun x -> Thread.create (fun () -> try f x with e -> Mutex.execute m (fun () -> exns := (x, e) :: !exns) ) () ) xs); !exns (** Parallel List.iter. Remembers one exception (at random) and throws it in the error case. *) let thread_iter f xs = match thread_iter_all_exns f xs with | [] -> () | (_, e) :: _ -> raise e module Delay = struct (* Concrete type is the ends of a pipe *) type t = { (* A pipe is used to wake up a thread blocked in wait: *) mutable pipe_out: Unix.file_descr option; mutable pipe_in: Unix.file_descr option; (* Indicates that a signal arrived before a wait: *) mutable signalled: bool; m: M.t } let make () = { pipe_out = None; pipe_in = None; signalled = false; m = M.create () } exception Pre_signalled let wait (x: t) (seconds: float) = let finally = Xapi_stdext_pervasives.Pervasiveext.finally in let to_close = ref [ ] in let close' fd = if List.mem fd !to_close then Unix.close fd; to_close := List.filter (fun x -> fd <> x) !to_close in finally (fun () -> try let pipe_out = Mutex.execute x.m (fun () -> if x.signalled then begin x.signalled <- false; raise Pre_signalled; end; let pipe_out, pipe_in = Unix.pipe () in (* these will be unconditionally closed on exit *) to_close := [ pipe_out; pipe_in ]; x.pipe_out <- Some pipe_out; x.pipe_in <- Some pipe_in; x.signalled <- false; pipe_out) in let r, _, _ = Unix.select [ pipe_out ] [] [] seconds in (* flush the single byte from the pipe *) if r <> [] then ignore(Unix.read pipe_out (Bytes.create 1) 0 1); (* return true if we waited the full length of time, false if we were woken *) r = [] with Pre_signalled -> false ) (fun () -> Mutex.execute x.m (fun () -> x.pipe_out <- None; x.pipe_in <- None; List.iter close' !to_close) ) let signal (x: t) = Mutex.execute x.m (fun () -> match x.pipe_in with | Some fd -> ignore(Unix.write fd (Bytes.of_string "X") 0 1) | None -> x.signalled <- true (* If the wait hasn't happened yet then store up the signal *) ) end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>