package picos

  1. Overview
  2. Docs

Basic structured concurrency primitives for Picos.

This library essentially provides one user level interface for structuring fibers with any Picos compatible scheduler. This library is both meant to serve as an example of what can be done and to also provide practical means for programming with fibers. Hopefully there will be many more libraries implemented in Picos like this providing different approaches, patterns, and idioms for structuring concurrent programs.

Modules

module Finally : sig ... end

Syntax for avoiding resource leaks.

module Control : sig ... end

Basic control operations and exceptions for structured concurrency.

module Bundle : sig ... end

A dynamic bundle of fibers guaranteed to be joined at the end.

module Run : sig ... end

Operations for running fibers in specific patterns.

Examples

First we open some modules for convenience:

open Picos_structured.Finally
open Picos_structured
open Picos_stdio
open Picos_sync

A simple echo server and clients

Here is an example of a simple TCP echo server and two clients:

# Picos_fifos.run ~forbid:false @@ fun () ->
  let max_size = 100 in

  (* We let the system pick the port *)
  let loopback_0 = Unix.(ADDR_INET (inet_addr_loopback, 0)) in
  let server_addr = ref loopback_0 in
  let mutex = Mutex.create () in
  let condition = Condition.create () in

  let server () =
    let@ socket =
      finally Unix.close @@ fun () ->
      Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
    in
    Unix.set_nonblock socket;
    Unix.bind socket loopback_0;
    Mutex.protect mutex begin fun () ->
      server_addr := Unix.getsockname socket
    end;
    Condition.signal condition;
    Unix.listen socket 8;

    Bundle.join_after begin fun bundle ->
      while true do
        let^ client =
          finally Unix.close @@ fun () ->
          Unix.accept ~cloexec:true socket |> fst
        in

        (* Fork a new fiber for each client *)
        Bundle.fork bundle begin fun () ->
          let@ client = move client in
          Unix.set_nonblock client;

          let bytes = Bytes.create max_size in
          let n = Unix.read client bytes 0 (Bytes.length bytes) in

          Unix.write client bytes 0 n |> ignore
        end
      done
    end
  in

  let client () =
    let@ socket =
      finally Unix.close @@ fun () ->
      Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0
    in
    Unix.set_nonblock socket;
    Unix.connect socket !server_addr;

    let msg = "Hello!" in
    Unix.write_substring socket msg 0 (String.length msg) |> ignore;

    let bytes = Bytes.create max_size in
    let n = Unix.read socket bytes 0 (Bytes.length bytes) in

    Printf.printf "Received: %s\n%!" (Bytes.sub_string bytes 0 n)
  in

  Bundle.join_after begin fun bundle ->
    (* Star server *)
    Bundle.fork bundle server;

    (* Wait until server addr has been determined *)
    Mutex.protect mutex begin fun () ->
      while !server_addr == loopback_0 do
        Condition.wait condition mutex
      done
    end;

    (* Start some clients and wait until they are done *)
    Bundle.join_after begin fun bundle ->
      for _=1 to 5 do
        Bundle.fork bundle client
      done
    end;

    (* Stop server *)
    Bundle.terminate bundle
  end
Received: Hello!
Received: Hello!
Received: Hello!
Received: Hello!
Received: Hello!
- : unit = ()

As an exercise, you might want to refactor the server to avoid moving the file descriptors and use a recursive accept loop instead.

OCaml

Innovation. Community. Security.