Legend:
Library
Module
Module type
Parameter
Class
Class type
Basic communication and synchronization primitives for Picos.
The optional padded argument taken by several constructor functions, e.g. Mutex.create and Condition.create, defaults to false. When explicitly specified as ~padded:true, the object is allocated in a way that avoids false sharing. For relatively long-lived objects this can improve performance and make performance more stable, at the cost of using more memory. It is not recommended to use ~padded:true for short-lived objects.
Here is an example of a simple bounded (blocking) queue using a mutex and condition variables:
(* A simple bounded, blocking FIFO queue protected by a single mutex and two
   condition variables: [not_empty] is waited on by consumers and [not_full]
   is waited on by producers. *)
module Bounded_queue : sig
  type 'a t
  (* A bounded queue holding elements of type ['a]. *)

  val create : ?capacity:int -> unit -> 'a t
  (* [create ?capacity ()] returns a new empty queue that holds at most
     [capacity] elements.  [capacity] defaults to [Int.max_int].  Note that
     with [~capacity:0] the queue is always full, so every [push] blocks.

     Raises [Invalid_argument] if [capacity] is negative. *)

  val push : 'a t -> 'a -> unit
  (* [push t x] appends [x] to [t], blocking for as long as [t] is full. *)

  val pop : 'a t -> 'a
  (* [pop t] removes and returns the oldest element of [t], blocking for as
     long as [t] is empty. *)
end = struct
  type 'a t = {
    mutex : Mutex.t;  (* guards [queue] *)
    queue : 'a Queue.t;
    capacity : int;  (* maximum number of queued elements *)
    not_empty : Condition.t;  (* notified when the queue stops being empty *)
    not_full : Condition.t;  (* notified when the queue stops being full *)
  }

  let create ?(capacity = Int.max_int) () =
    if capacity < 0 then
      invalid_arg "capacity cannot be negative"
    else {
      mutex = Mutex.create ();
      queue = Queue.create ();
      capacity;
      not_empty = Condition.create ();
      not_full = Condition.create ();
    }

  let push t x =
    let was_empty =
      Mutex.protect t.mutex @@ fun () ->
      (* Re-check the predicate after every wakeup. *)
      while t.capacity <= Queue.length t.queue do
        Condition.wait t.not_full t.mutex
      done;
      Queue.push x t.queue;
      Queue.length t.queue = 1
    in
    (* Notify outside of the critical section, and only on the
       empty -> non-empty transition.  Because later pushes onto a non-empty
       queue do not notify, waking just one consumer here could leave other
       consumers blocked even though more elements arrive before the woken
       consumer runs.  Waking all of them is safe: each one re-checks the
       queue under the mutex and goes back to waiting if it finds it empty. *)
    if was_empty then Condition.broadcast t.not_empty

  let pop t =
    let elem, was_full =
      Mutex.protect t.mutex @@ fun () ->
      (* Re-check the predicate after every wakeup. *)
      while Queue.length t.queue = 0 do
        Condition.wait t.not_empty t.mutex
      done;
      (* Must be sampled before the element is removed. *)
      let was_full = Queue.length t.queue = t.capacity in
      Queue.pop t.queue, was_full
    in
    (* Same reasoning as in [push]: the notification happens only on the
       full -> non-full transition, so all blocked producers must be woken to
       avoid leaving some of them asleep while there is room in the queue. *)
    if was_full then Condition.broadcast t.not_full;
    elem
end
The above is definitely neither the fastest nor the most scalable bounded queue, but we can now demonstrate it with the cooperative Picos_fifos scheduler:
# Picos_fifos.run ~forbid:false @@ fun () ->
(* A queue that holds at most three elements. *)
let bq = Bounded_queue.create ~capacity:3 () in
Bundle.join_after begin fun bundle ->
(* Consumer: loops forever, printing each element it pops. *)
Bundle.fork bundle begin fun () ->
while true do
Printf.printf "Popped %d\n%!" (Bounded_queue.pop bq)
done
end;
(* Producer: pushes 1 through 5; a push blocks while the queue is full. *)
for i=1 to 5 do
Printf.printf "Pushing %d\n%!" i;
Bounded_queue.push bq i
done;
Printf.printf "All done?\n%!";
(* Presumably gives the consumer a chance to run and drain the remaining
   elements before it is canceled -- see the printed output. *)
Control.yield ();
(* Cancels the consumer, which would otherwise loop forever. *)
Bundle.terminate bundle
end;
(* The queue is still usable after the consumer has been canceled. *)
Printf.printf "Pushing %d\n%!" 101;
Bounded_queue.push bq 101;
Printf.printf "Popped %d\n%!" (Bounded_queue.pop bq)
Pushing 1
Pushing 2
Pushing 3
Pushing 4
Popped 1
Popped 2
Popped 3
Pushing 5
All done?
Popped 4
Popped 5
Pushing 101
Popped 101
- : unit = ()
Notice how the producer was able to push three elements to the queue, after which the fourth push blocked and the consumer was started. Also, after canceling the consumer, the queue could still be used just fine.