Basic communication and synchronization primitives for Picos.
This library provides a conventional set of communication and synchronization primitives for concurrent programming with any Picos-compatible scheduler.
For the examples we open some modules:
open Picos_std_structured
open Picos_std_sync
module Mutex : sig ... end
A mutual-exclusion lock or mutex.
module Condition : sig ... end
A condition variable.
module Lazy : sig ... end
A lazy suspension.
module Latch : sig ... end
A dynamic single-use countdown latch.
module Ivar : sig ... end
An incremental or single-assignment poisonable variable.
module Stream : sig ... end
A lock-free, poisonable, many-to-many stream.
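As a quick orientation before the longer example, here is a minimal sketch of two of the smaller primitives, Ivar and Latch, working together. It assumes the Picos_mux_fifo scheduler and the Flock API from Picos_std_structured are available; the names answer, finished, and result are illustrative only.

```ocaml
open Picos_std_structured
open Picos_std_sync

(* Sketch: one fiber fills a single-assignment variable, and a latch
   with an initial count of 2 tells the parent when both forked
   fibers have finished. *)
let result =
  Picos_mux_fifo.run @@ fun () ->
  let answer = Ivar.create () in
  let finished = Latch.create 2 in
  Flock.join_after begin fun () ->
    Flock.fork begin fun () ->
      Ivar.fill answer 42;
      Latch.decr finished
    end;
    Flock.fork begin fun () ->
      Latch.decr finished
    end;
    (* Blocks until the count reaches zero. *)
    Latch.await finished;
    (* The ivar has been filled by now, so this returns immediately. *)
    Ivar.read answer
  end
```

The latch is used here purely as a completion signal; the ivar carries the value.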
Here is an example of a simple bounded (blocking) queue using a mutex and condition variables:
module Bounded_q : sig
  type 'a t
  val create : capacity:int -> 'a t
  val push : 'a t -> 'a -> unit
  val pop : 'a t -> 'a
end = struct
  type 'a t = {
    mutex : Mutex.t;
    queue : 'a Queue.t;
    capacity : int;
    not_empty : Condition.t;
    not_full : Condition.t;
  }

  let create ~capacity =
    if capacity < 0 then
      invalid_arg "negative capacity"
    else {
      mutex = Mutex.create ();
      queue = Queue.create ();
      capacity;
      not_empty = Condition.create ();
      not_full = Condition.create ();
    }

  let is_full_unsafe t =
    t.capacity <= Queue.length t.queue

  let push t x =
    let was_empty =
      Mutex.protect t.mutex @@ fun () ->
      while is_full_unsafe t do
        Condition.wait t.not_full t.mutex
      done;
      Queue.push x t.queue;
      Queue.length t.queue = 1
    in
    if was_empty then
      Condition.broadcast t.not_empty

  let pop t =
    let elem, was_full =
      Mutex.protect t.mutex @@ fun () ->
      while Queue.length t.queue = 0 do
        Condition.wait
          t.not_empty t.mutex
      done;
      let was_full = is_full_unsafe t in
      Queue.pop t.queue, was_full
    in
    if was_full then
      Condition.broadcast t.not_full;
    elem
end
The above is neither the fastest nor the most scalable bounded queue, but we can now demonstrate it with the cooperative Picos_mux_fifo scheduler:
# Picos_mux_fifo.run @@ fun () ->
  let bq =
    Bounded_q.create ~capacity:3
  in
  Flock.join_after ~on_return:`Terminate begin fun () ->
    Flock.fork begin fun () ->
      while true do
        Printf.printf "Popped %d\n%!"
          (Bounded_q.pop bq)
      done
    end;
    for i=1 to 5 do
      Printf.printf "Pushing %d\n%!" i;
      Bounded_q.push bq i
    done;
    Printf.printf "All done?\n%!";
    Control.yield ();
  end;
  Printf.printf "Pushing %d\n%!" 101;
  Bounded_q.push bq 101;
  Printf.printf "Popped %d\n%!"
    (Bounded_q.pop bq)
Pushing 1
Pushing 2
Pushing 3
Pushing 4
Popped 1
Popped 2
Popped 3
Pushing 5
All done?
Popped 4
Popped 5
Pushing 101
Popped 101
- : unit = ()
Notice how the producer was able to push three elements to the queue after which the fourth push blocked and the consumer was started. Also, after canceling the consumer, the queue could still be used just fine.
The optional padded argument taken by several constructor functions, e.g. Latch.create, Mutex.create, Condition.create, Semaphore.Counting.make, and Semaphore.Binary.make, defaults to false. When explicitly specified as ~padded:true the object is allocated in a way that avoids false sharing. For relatively long-lived objects this can improve performance and make performance more stable at the cost of using more memory. It is not recommended to use ~padded:true for short-lived objects.
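To make the trade-off concrete, here is a small sketch in which a long-lived mutex is created with ~padded:true. The counter type and the make_counter and bump functions are hypothetical names introduced only for this illustration, and the example again assumes the Picos_mux_fifo scheduler.

```ocaml
open Picos_std_sync

(* Hypothetical long-lived shared counter. Its mutex is expected to
   live for the whole program, so padding it may be worthwhile. *)
type counter = { mutex : Mutex.t; mutable count : int }

let make_counter () =
  { mutex = Mutex.create ~padded:true (); count = 0 }

(* Increment the counter under the lock. *)
let bump c =
  Mutex.protect c.mutex @@ fun () -> c.count <- c.count + 1

let total =
  Picos_mux_fifo.run @@ fun () ->
  let c = make_counter () in
  for _ = 1 to 3 do
    bump c
  done;
  c.count
```

A mutex created inside a short function call and discarded right after would normally keep the default, unpadded allocation.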
The primitives provided by this library are generally optimized for low contention scenarios and size. Generally speaking, for best performance and scalability, you should try to avoid high contention scenarios by architecting your program to distribute processing such that sequential bottlenecks are avoided. If high contention is unavoidable then other communication and synchronization primitive implementations may provide better performance.
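One common way to keep contention low is to have each fiber accumulate privately and take the shared lock only once to merge its result. The following is a sketch of that idea, not a recommendation from the library itself; sum_batched and its parameters are hypothetical names, and the example assumes the Picos_mux_fifo scheduler and the Flock API from Picos_std_structured.

```ocaml
open Picos_std_structured
open Picos_std_sync

(* Each worker sums 1..per_worker into a private accumulator and
   acquires the mutex once, instead of once per addition. *)
let sum_batched ~workers ~per_worker =
  let mutex = Mutex.create () in
  let total = ref 0 in
  Flock.join_after begin fun () ->
    for _ = 1 to workers do
      Flock.fork begin fun () ->
        let local = ref 0 in
        for i = 1 to per_worker do
          local := !local + i
        done;
        (* Single short critical section per worker. *)
        Mutex.protect mutex (fun () -> total := !total + !local)
      end
    done
  end;
  !total

let batched_result =
  Picos_mux_fifo.run @@ fun () ->
  sum_batched ~workers:4 ~per_worker:10
```

With four workers each summing 1 to 10, the mutex is acquired four times in total rather than forty.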