Legend:
Library
Module
Module type
Parameter
Class
Class type
Basic structured concurrency primitives for Picos.
This library essentially provides one user level interface for structuring fibers with any Picos compatible scheduler. This library is meant both to serve as an example of what can be done and to provide practical means for programming with fibers. Hopefully there will be many more libraries implemented in Picos like this, providing different approaches, patterns, and idioms for structuring concurrent programs.
Operations for running fibers in specific patterns.
Examples
First we open some modules for convenience:
open Picos_structured.Finally
open Picos_structured
open Picos_stdio
open Picos_sync
A simple echo server and clients
Here is an example of a simple TCP echo server and five clients:
# Picos_fifos.run ~forbid:false @@ fun () ->
(* Everything below runs inside the function passed to Picos_fifos.run. *)
(* Size of the read buffers used by both the server and the clients *)
let max_size = 100 in
(* We let the system pick the port *)
let loopback_0 = Unix.(ADDR_INET (inet_addr_loopback, 0)) in
(* Holds loopback_0 until the server stores the address it actually bound *)
let server_addr = ref loopback_0 in
(* mutex guards server_addr; condition is signaled after the address is stored *)
let mutex = Mutex.create () in
let condition = Condition.create () in
(* Server: binds, publishes its address, then accepts clients in a loop,
   forking one fiber per accepted client *)
let server () =
(* finally pairs the new socket with Unix.close; presumably let@ closes it
   when this scope is left -- confirm against Picos_structured.Finally *)
let@ socket =
finally Unix.close @@ fun () ->
Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
in
Unix.set_nonblock socket;
Unix.bind socket loopback_0;
(* Publish the bound address (with the system-chosen port) under the mutex *)
Mutex.protect mutex begin fun () ->
server_addr := Unix.getsockname socket
end;
(* Wake the main fiber, which waits on this condition for the address *)
Condition.signal condition;
Unix.listen socket 8;
Bundle.join_after begin fun bundle ->
(* The loop has no exit condition of its own; the main fiber stops the
   server with Bundle.terminate at the end of the example *)
while true do
(* let^ binds the accepted descriptor in a form that can be handed over
   with move inside the forked fiber below *)
let^ client =
finally Unix.close @@ fun () ->
Unix.accept ~cloexec:true socket |> fst
in
(* Fork a new fiber for each client *)
Bundle.fork bundle begin fun () ->
(* NOTE(review): move presumably transfers responsibility for closing the
   descriptor to this fiber -- confirm against Picos_structured.Finally *)
let@ client = move client in
Unix.set_nonblock client;
(* Echo: one read of at most max_size bytes, written back once; the byte
   count returned by write is ignored *)
let bytes = Bytes.create max_size in
let n = Unix.read client bytes 0 (Bytes.length bytes) in
Unix.write client bytes 0 n |> ignore
end
done
end
in
(* Client: connects to the published server address, sends one message, and
   prints whatever a single read returns *)
let client () =
let@ socket =
finally Unix.close @@ fun () ->
Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
in
Unix.set_nonblock socket;
Unix.connect socket !server_addr;
let msg = "Hello!" in
Unix.write_substring socket msg 0 (String.length msg) |> ignore;
let bytes = Bytes.create max_size in
let n = Unix.read socket bytes 0 (Bytes.length bytes) in
Printf.printf "Received: %s\n%!" (Bytes.sub_string bytes 0 n)
in
Bundle.join_after begin fun bundle ->
(* Start server *)
Bundle.fork bundle server;
(* Wait until server addr has been determined *)
Mutex.protect mutex begin fun () ->
(* Physical equality: true only while server_addr still holds the initial
   loopback_0 value itself, i.e. the server has not stored its address yet *)
while !server_addr == loopback_0 do
Condition.wait condition mutex
done
end;
(* Start some clients and wait until they are done *)
(* This inner bundle shadows the outer one for the duration of the block *)
Bundle.join_after begin fun bundle ->
for _=1 to 5 do
Bundle.fork bundle client
done
end;
(* Stop server *)
(* Here bundle is the outer bundle again, the one the server was forked into *)
Bundle.terminate bundle
end
Received: Hello!
Received: Hello!
Received: Hello!
Received: Hello!
Received: Hello!
- : unit = ()
As an exercise, you might want to refactor the server to avoid moving the file descriptors and use a recursive accept loop instead.