Page
Library
Module
Module type
Parameter
Class
Class type
Source
Saturn.Queue_unsafeOptimized Michael-Scott lock-free multi-producer multi-consumer queue.
All functions are lockfree. It is the recommended starting point when needing FIFO structure. It is inspired by Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms.
If you need a length function, you can use the bounded queue Saturn.Bounded_queue instead with maximun capacity (default value). However, this adds a general overhead to the operation.
val create : unit -> 'a tcreate () returns a new queue, initially empty.
val of_list : 'a list -> 'a tof_list list creates a new queue from a list.
🐌 This is a linear-time operation.
# open Saturn.Queue
# let t : int t = of_list [1;2;3;4]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# pop_opt t
- : int option = Some 2val is_empty : 'a t -> boolis_empty q returns true if q is empty and false otherwise.
val peek_exn : 'a t -> 'apeek_exn queue returns the first element of the queue without removing it.
val peek_opt : 'a t -> 'a optionpeek_opt queue returns Some of the first element of the queue without removing it, or None if the queue is empty.
val pop_exn : 'a t -> 'apop_exn queue removes and returns the first element of the queue.
val pop_opt : 'a t -> 'a optionpop_opt q removes and returns the first element in queue q, or returns None if the queue is empty.
val drop_exn : 'a t -> unitdrop_exn queue removes the top element of the queue.
val push : 'a t -> 'a -> unitpush q v adds the element v at the end of the queue q.
An example top-level session:
# open Saturn.Queue
# let t : int t = of_list [1;2;3]
val t : int t = <abstr>
# push t 42
- : unit = ()
# pop_exn t
- : int = 1
# peek_opt t
- : int option = Some 2
# drop_exn t
- : unit = ()
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = Some 42
# pop_exn t
Exception: Saturn__Michael_scott_queue.Empty.Note: The barrier is used in this example solely to make the results more interesting by increasing the likelihood of parallelism. Spawning a domain is a costly operation, especially compared to the relatively small amount of work being performed here. In practice, using a barrier in this manner is unnecessary.
# open Saturn.Queue
# let t : string t = create ()
val t : string t = <abstr>
# Random.self_init ()
- : unit = ()
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>
# let work id =
Atomic.decr barrier;
while Atomic.get barrier <> 0 do
Domain.cpu_relax ()
done;
for _ = 1 to 4 do
Domain.cpu_relax ();
if Random.bool () then push t id
else
match pop_opt t with
| None -> Format.printf "Domain %s sees an empty queue.\n%!" id
| Some v -> Format.printf "Domain %s pops values pushed by %s.\n%!" id v
done
val work : string -> unit = <fun>
# let domainA = Domain.spawn (fun () -> work "A")
val domainA : unit Domain.t = <abstr>
# let domainB = Domain.spawn (fun () -> work "B")
Domain B pops values pushed by B.
Domain A pops values pushed by A.
Domain B pops values pushed by A.
Domain B pops values pushed by A.
val domainB : unit Domain.t = <abstr>
# Domain.join domainA; Domain.join domainB
- : unit = ()