Library
Module
Module type
Parameter
Class
Class type
Lock-free single-producer, single-consumer queue.
Warning: This queue does not include safety mechanisms to prevent misuse. If consumer-only functions are called concurrently by multiple domains, the queue may enter an unexpected state, due to data races and a lack of linearizability. The same goes for producer-only functions.
val create : size_exponent:int -> 'a t
create ~size_exponent
creates a new, initially empty single-producer single-consumer queue with a maximum size of 2^size_exponent.
🐌 This is a linear-time operation in 2^size_exponent
.
val of_list_exn : size_exponent:int -> 'a list -> 'a t
of_list_exn ~size_exponent list
creates a new queue from a list.
val length : 'a t -> int
length
returns the length of the queue. This method linearizes only when called from either the consumer or producer domain. Otherwise, it is safe to call but provides only an *indication* of the size of the structure.
Raised when push_exn
is applied to a full queue.
val push_exn : 'a t -> 'a -> unit
push_exn queue elt
adds the element elt
at the end of the queue
. This method can be used by at most one domain at a time.
val try_push : 'a t -> 'a -> bool
try_push queue elt
tries to add the element elt
at the end of the queue
. If the queue
is full, false
is returned. This method can be used by at most one domain at a time.
val pop_exn : 'a t -> 'a
pop_exn queue
removes and returns the first element in queue
. This method can be used by at most one domain at a time.
val pop_opt : 'a t -> 'a option
pop_opt queue
removes and returns Some
of the first element of the queue
, or None
if the queue is empty. This method can be used by at most one domain at a time.
val peek_exn : 'a t -> 'a
peek_exn queue
returns the first element in queue
without removing it. This method can be used by at most one domain at a time.
val peek_opt : 'a t -> 'a option
peek_opt queue
returns Some
of the first element in queue
, or None
if the queue is empty. This method can be used by at most one domain at a time.
val drop_exn : 'a t -> unit
drop_exn queue
removes the first element of the queue
.
# open Saturn.Single_prod_single_cons_queue
# let t : int t = create ~size_exponent:2
val t : int t = <abstr>
# push_exn t 1
- : unit = ()
# push_exn t 2
- : unit = ()
# try_push t 3
- : bool = true
# try_push t 4
- : bool = true
# try_push t 5
- : bool = false
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# drop_exn t
- : unit = ()
# pop_exn t
- : int = 3
# pop_opt t
- : int option = Some 4
# pop_exn t
Exception: Saturn__Spsc_queue.Empty.
Note: The barrier (shown commented out in the code below) is intended solely to make the results more interesting by increasing the likelihood of parallelism. Spawning a domain is a costly operation, especially compared to the relatively small amount of work being performed here. In practice, using a barrier in this manner is unnecessary.
# open Saturn.Single_prod_single_cons_queue
# let t : int t = create ~size_exponent:5
val t : int t = <abstr>
# let nwork = 5
val nwork : int = 5
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>
# let consumer_work () =
(* Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done; *)
let rec loop n =
if n < 1 then ()
else
(Domain.cpu_relax ();
match pop_opt t with
| Some p -> Format.printf "Popped %d\n%!" p; loop (n-1)
| None -> loop n)
in
loop nwork
val consumer_work : unit -> unit = <fun>
# let producer_work () =
(* Atomic.decr barrier;
while Atomic.get barrier <> 0 do Domain.cpu_relax () done; *)
for i = 1 to nwork do
Domain.cpu_relax ();
try_push t i |> ignore;
Format.printf "Pushed %d\n%!" i
done
val producer_work : unit -> unit = <fun>
# let consumer = Domain.spawn consumer_work
val consumer : unit Domain.t = <abstr>
# let producer = Domain.spawn producer_work
Pushed 1
Popped 1
Pushed 2
Popped 2
Pushed 3
Popped 3
Pushed 4
Popped 4
Popped 5
Pushed 5
val producer : unit Domain.t = <abstr>
# Domain.join consumer
- : unit = ()
# Domain.join producer
- : unit = ()