package moonpool
Pools of threads supported by a pool of domains
Install
dune-project
Dependency
Authors
Maintainers
Sources
moonpool-0.6.tbz
sha256=3efd095c82a37bba8c7ab6a2532aee3c445ebe1ecaed84ef3ffb560bc65e7633
sha512=e4bcab82e6638299c2d0beb1dbf204f7b43379a5387418c6edff85b9bf90c13ad1bdd8eb44b69cd421268d1bc45bcf918bcf77e1c924348211ac27d6643aac78
doc/src/moonpool.private/ws_deque_.ml.html
Source file ws_deque_.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
module A = Atomic_

(* Terminology:
   - bottom: the end where the owning thread pushes and pops. Only one
     thread may operate on this end.
   - top: the end where other threads steal (it holds the older values).
     The [top] index only ever grows.

   New elements are always added at the bottom end. *)

(** Fixed-capacity circular array holding [2 ^ log_size] slots. *)
module CA : sig
  type 'a t

  val create : dummy:'a -> unit -> 'a t
  val size : 'a t -> int
  val get : 'a t -> int -> 'a
  val set : 'a t -> int -> 'a -> unit
end = struct
  (** With [log_size = 8] the array holds 256 slots. *)
  let log_size = 8

  (* Number of slots, and the bit mask that maps any index onto a slot. *)
  let capacity = 1 lsl log_size
  let mask = capacity - 1

  type 'a t = { arr: 'a array } [@@unboxed]

  let[@inline] size (_ : _ t) = capacity
  let create ~dummy () : _ t = { arr = Array.make capacity dummy }

  (* Indices are wrapped with [mask], which is why the unchecked array
     accessors below always stay in bounds. *)
  let[@inline] get (ca : 'a t) (idx : int) : 'a =
    Array.unsafe_get ca.arr (idx land mask)

  let[@inline] set (ca : 'a t) (idx : int) (v : 'a) : unit =
    Array.unsafe_set ca.arr (idx land mask) v
end

type 'a t = {
  top: int A.t;  (** Index that stealers consume from *)
  bottom: int A.t;  (** Index the owning thread pushes to / pops from *)
  mutable top_cached: int;  (** Most recent value of [top] seen by the owner *)
  arr: 'a CA.t;  (** Backing circular array *)
}

(** [create ~dummy ()] makes an empty deque whose slots initially all hold
    [dummy]. *)
let create ~dummy () : _ t =
  let top = A.make 0 in
  let arr = CA.create ~dummy () in
  (* [bottom] is allocated after the array so that it ends up far from
     [top], to avoid false sharing *)
  let bottom = A.make 0 in
  { top; top_cached = 0; bottom; arr }

(** [size q] is [bottom - top] as read at the time of the call, clamped so
    that it is never negative. *)
let[@inline] size (self : _ t) : int =
  let n = A.get self.bottom - A.get self.top in
  if n < 0 then
    0
  else
    n

(** Raised internally by {!push} when no slot is available. *)
exception Full

(** [push q x] stores [x] at the bottom of [q]. Returns [true] if [x] was
    stored, [false] if the deque was full (in which case [q] is left as it
    was, apart from the refreshed [top_cached]). *)
let push (self : 'a t) (x : 'a) : bool =
  let b = A.get self.bottom in
  let limit = CA.size self.arr - 1 in
  match
    (* Section 2.3: [b - top_cached] over-approximates the size, so the
       shared [top] is only read when that estimate reaches the limit. *)
    if b - self.top_cached >= limit then (
      (* reading the real [top] might entail contention *)
      let t = A.get self.top in
      self.top_cached <- t;
      if b - t >= limit then raise_notrace Full
    )
  with
  | () ->
    CA.set self.arr b x;
    A.set self.bottom (b + 1);
    true
  | exception Full -> false

(** [pop q] removes and returns the element at the bottom of [q], or
    returns [None] if there is none or if a stealer took the last element
    first. *)
let pop (self : 'a t) : 'a option =
  (* reserve the bottom slot before looking at [top] *)
  let b = A.get self.bottom - 1 in
  A.set self.bottom b;

  let t = A.get self.top in
  self.top_cached <- t;

  let remaining = b - t in
  if remaining > 0 then
    (* more than one element: [top] does not need to change *)
    Some (CA.get self.arr b)
  else if remaining < 0 then (
    (* nothing to pop: put [bottom] back on [top], i.e. the empty state *)
    A.set self.bottom t;
    None
  ) else (
    (* exactly one element: we may be racing against stealers, so the CAS
       on [top] decides who gets it *)
    let won = A.compare_and_set self.top t (t + 1) in
    let res =
      if won then
        Some (CA.get self.arr b)
      else
        None
    in
    (* win or lose, the deque is now empty at [t + 1] *)
    A.set self.bottom (t + 1);
    res
  )

(** [steal q] tries to take the element at the top of [q]. Returns [None]
    if [q] looks empty or if the compare-and-set on [top] fails. *)
let steal (self : 'a t) : 'a option =
  (* [top_cached] is deliberately left alone: we are in another thread *)
  let t = A.get self.top in
  let b = A.get self.bottom in
  if b - t > 0 then (
    (* read the candidate first; it is ours only if the CAS succeeds *)
    let candidate = CA.get self.arr t in
    if A.compare_and_set self.top t (t + 1) then
      Some candidate
    else
      None
  ) else
    None
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>