package picos_lwt
Install
    
    dune-project
 Dependency
Authors
Maintainers
Sources
sha256=3f5a08199cf65c2dae2f7d68f3877178f1da8eabf5376e15114e5a8958087dfa
    
    
  sha512=ad24910c47ce614268c4268874bb918da7f8b5f03b3ad706bbf30323635262e94ddab6be24eaebbca706bfa82c0a517d4272b396459e020c185942125c9bdb7b
    
    
  Description
Published: 03 Nov 2024
README
API reference · Benchmarks · Stdlib Benchmarks
Picos — Interoperable effects based concurrency
Picos is a systems programming interface between effects based schedulers and concurrent abstractions.
Picos is designed to enable an open ecosystem of interoperable and interchangeable elements of effects based cooperative concurrent programming models such as
- schedulers that multiplex large numbers of user level fibers to run on a small number of system level threads,
- mechanisms for managing fibers and for structuring concurrency,
- communication and synchronization primitives, such as mutexes and condition variables, message queues, STMs, and more, and
- integrations with low level asynchronous IO systems
by decoupling such elements from each other.
Picos comes with a reference manual and many sample libraries.
⚠️ Please note that Picos is still considered experimental and unstable.
Introduction
Picos addresses the incompatibility of effects based schedulers at a fundamental level by introducing an interface to decouple schedulers and other concurrent abstractions that need services from a scheduler.
The core abstractions of Picos are
- Trigger— the ability to await for a signal,
- Computation— a cancelable computation, and
- Fiber— an independent thread of execution,
that are implemented partially by the Picos interface in terms of the effects
- Trigger.Await— to suspend and resume a fiber,
- Computation.Cancel_after— to cancel a computation after given period of time,
- Fiber.Current— to obtain the current fiber,
- Fiber.Yield— to request rescheduling, and
- Fiber.Spawn— to start a new fiber.
The partial implementation of the abstractions and the effects define a contract between schedulers and other concurrent abstractions. By handling the Picos effects according to the contract a scheduler becomes Picos compatible, which allows any abstractions written against the Picos interface, i.e. Implemented in Picos, to be used with the scheduler.
Understanding cancelation
A central idea or goal of Picos is to provide a collection of building blocks for parallelism safe cancelation that allows the implementation of both blocking abstractions as well as the implementation of abstractions for structuring fibers for cancelation or managing the propagation and scope of cancelation.
While cancelation, which is essentially a kind of asynchronous exception or signal, is not necessarily recommended as a general control mechanism, the ability to cancel fibers in case of errors is crucial for the implementation of practical concurrent programming models.
Consider the following characteristic example:
Mutex.protect mutex begin fun () ->
  while true do
    Condition.wait condition mutex
  done
endAssume that a fiber executing the above code might be canceled, at any point, by another fiber running in parallel. This could be necessary, for example, due to an error that requires the application to be shut down. How could that be done while ensuring both safety and liveness?
- For safety, cancelation should not leave the program in an invalid state or cause the program to leak memory. In this case, Condition.waitmust exit with the mutex locked, even in case of cancelation, and, asMutex.protectexits, the ownership of the mutex must be transferred to the next fiber, if any, waiting in queue for the mutex. No references to unused objects may be left in the mutex or the condition variable.
- For liveness, cancelation should ensure that the fiber will eventually continue after cancelation. In this case, cancelation could be triggered during the Mutex.lockoperation insideMutex.protector theCondition.waitoperation, when the fiber might be in a suspended state, and cancelation should then allow the fiber to continue.
The set of abstractions, Trigger, Computation, and Fiber, work together to support cancelation. Briefly, a fiber corresponds to an independent thread of execution and every fiber is associated with a computation at all times. When a fiber creates a trigger in order to await for a signal, it ask the scheduler to suspend the fiber on the trigger. Assuming the fiber has not forbidden the propagation of cancelation, which is required, for example, in the implementation of Condition.wait to lock the mutex upon exit, the scheduler must also attach the trigger to the computation associated with the fiber. If the computation is then canceled before the trigger is otherwise signaled, the trigger will be signaled by the cancelation of the computation, and the fiber will be resumed by the scheduler as canceled.
This cancelable suspension protocol and its partial implementation designed around the first-order Trigger.Await effect creates a clear separation between schedulers and user code running in fibers and is designed to handle the possibility of a trigger being signaled or a computation being canceled at any point during the suspension of a fiber. Schedulers are given maximal freedom to decide which fiber to resume next. As an example, a scheduler could give priority to canceled fibers — going as far as moving a fiber already in the ready queue of the scheduler to the front of the queue at the point of cancelation — based on the assumption that user code promptly cancels external requests and frees critical resources.
Trigger
A trigger provides the ability to await for a signal and is perhaps the best established and least controversial element of the Picos interface.
Here is an extract from the signature of the Trigger module:
type t
val create : unit -> t
val await : t -> (exn * Printexc.raw_backtrace) option
val signal : t -> unit
val on_signal : (* for schedulers *)The idea is that a fiber may create a trigger, insert it into some shared data structure, and then call await to ask the scheduler to suspend the fiber until something signals the trigger. When await returns an exception with a backtrace it means that the fiber has been canceled.
As an example, let's consider the implementation of an Ivar or incremental or single-assignment variable:
type 'a t
val create : unit -> 'a t
val try_fill : 'a t -> 'a -> bool
val read : 'a t -> 'aAn Ivar is created as empty and can be filled with a value once. An attempt to read an Ivar blocks until the Ivar is filled.
Using Trigger and Atomic, we can represent an Ivar as follows:
type 'a state =
  | Filled of 'a
  | Empty of Trigger.t list
type 'a t = 'a state Atomic.tThe try_fill operation is then fairly straightforward to implement:
let rec try_fill t value =
  match Atomic.get t with
  | Filled _ -> false
  | Empty triggers as before ->
    let after = Filled value in
    if Atomic.compare_and_set t before after then
      begin
        List.iter Trigger.signal triggers; (* ! *)
        true
      end
    else
      try_fill t valueThe interesting detail above is that after successfully filling an Ivar, the triggers are signaled. This allows the await inside the read operation to return:
let rec read t =
  match Atomic.get t with
  | Filled value -> value
  | Empty triggers as before ->
    let trigger = Trigger.create () in
    let after = Empty (trigger :: triggers) in
    if Atomic.compare_and_set t before after then
      match Trigger.await trigger with
      | None -> read t
      | Some (exn, bt) ->
        cleanup t trigger; (* ! *)
        Printexc.raise_with_backtrace exn bt
    else
      read tAn important detail above is that when await returns an exception with a backtrace, meaning that the fiber has been canceled, the cleanup operation (which is omitted) is called to remove the trigger from the Ivar to avoid potentially accumulating unbounded numbers of triggers in an empty Ivar.
As simple as it is, the design of Trigger is far from arbitrary:
- First of all, Triggerhas single-assignment semantics. After being signaled, a trigger takes a constant amount of space and does not point to any other heap object. This makes it easier to reason about the behavior and can also help to avoid leaks or optimize data structures containing triggers, because it is safe to hold bounded amounts of signaled triggers.
- The Triggerabstraction is essentially first-order, which provides a clear separation between a scheduler and programs, or fibers, running on a scheduler. Theawaitoperation performs theAwaiteffect, which passes the trigger to the scheduler. The scheduler then attaches its own callback to the trigger usingon_signal. This way a scheduler does not call arbitrary user specified code in theAwaiteffect handler.
- Separating the creation of a trigger from the awaitoperation allows one to easily insert a trigger into any number of places and allows the trigger to be potentially concurrently signaled before theAwaiteffect is performed in which case the effect can be skipped entirely.
- No value is propagated with a trigger. This makes triggers simpler and makes it less likely for one to e.g. accidentally drop such a value. In many cases, like with the Ivar, there is already a data structure through which values can be propagated.
- The signaloperation gives no indication of whether a fiber will then be resumed as canceled or not. This gives maximal flexibility for the scheduler and also makes it clear that cancelation must be handled based on the return value ofawait.
Computation
A Computation basically holds the status, i.e. running, returned, or canceled, of some sort of computation and allows anyone with access to the computation to attach triggers to it to be signaled in case the computation stops running.
Here is an extract from the signature of the Computation module:
type 'a t
val create : unit -> 'a t
val try_attach : 'a t -> Trigger.t -> bool
val detach : 'a t -> Trigger.t -> unit
val try_return : 'a t -> 'a -> bool
val try_cancel : 'a t -> exn -> Printexc.raw_backtrace -> bool
val check : 'a t -> unit
val await : 'a t -> 'aA Computation directly provides a superset of the functionality of the Ivar we sketched in the previous section:
type 'a t = 'a Computation.t
let create : unit -> 'a t = Computation.create
let try_fill : 'a t -> 'a -> bool =
  Computation.try_return
let read : 'a t -> 'a = Computation.awaitHowever, what really makes the Computation useful is the ability to momentarily attach triggers to it. A Computation essentially implements a specialized lock-free bag of triggers, which allows one to implement dynamic completion propagation networks.
The Computation abstraction is also designed with both simplicity and flexibility in mind:
- Similarly to Trigger,Computationhas single-assignment semantics, which makes it easier to reason about.
- Unlike a typical cancelation context of a structured concurrency model, Computationis unopinionated in that it does not impose a specific hierarchical structure.
- Anyone may ask to be notified when a Computationis completed by attaching triggers to it and anyone may complete aComputation. This makesComputationan omnidirectional communication primitive.
Interestingly, and unintentionally, it turns out that, given the ability to complete two (or more) computations atomically, Computation is essentially expressive enough to implement the event abstraction of Concurrent ML. The same features that make Computation suitable for implementing more or less arbitrary dynamic completion propagation networks make it suitable for implementing Concurrent ML style abstractions.
Fiber
A fiber corresponds to an independent thread of execution. Technically an effects based scheduler creates a fiber, effectively giving it an identity, as it runs some function under its handler. The Fiber abstraction provides a way to share a proxy identity, and a bit of state, between a scheduler and other concurrent abstractions.
Here is an extract from the signature of the Fiber module:
type t
val current : unit -> t
val create : forbid:bool -> 'a Computation.t -> t
val spawn : t -> (t -> unit) -> unit
val get_computation : t -> Computation.packed
val set_computation : t -> Computation.packed -> unit
val has_forbidden : t -> bool
val exchange : t -> forbid:bool -> bool
module FLS : sig (* ... *) endFibers are where all of the low level bits and pieces of Picos come together, which makes it difficult to give both meaningful and concise examples, but let's implement a slightly simplistic structured concurrency mechanism:
type t (* represents a scope *)
val run : (t -> unit) -> unit
val fork : t -> (unit -> unit) -> unitThe idea here is that run creates a "scope" and waits until all of the fibers forked into the scope have finished. In case any fiber raises an unhandled exception, or the main fiber that created the scope is canceled, all of the fibers are canceled and an exception is raised. To keep things slightly simpler, only the first exception is kept.
A scope can be represented by a simple record type:
type t = {
  count : int Atomic.t;
  inner : unit Computation.t;
  ended : Trigger.t;
}The idea is that after a fiber is finished, we decrement the count and if it becomes zero, we finish the computation and signal the main fiber that the scope has ended:
let decr t =
  let n = Atomic.fetch_and_add t.count (-1) in
  if n = 1 then begin
    Computation.finish t.inner;
    Trigger.signal t.ended
  endWhen forking a fiber, we increment the count unless it already was zero, in which case we raise an error:
let rec incr t =
  let n = Atomic.get t.count in
  if n = 0 then invalid_arg "ended";
  if not (Atomic.compare_and_set t.count n (n + 1))
  then incr tThe fork operation is now relatively straightforward to implement:
let fork t action =
  incr t;
  try
    let main _ =
      match action () with
      | () -> decr t
      | exception exn ->
          let bt = Printexc.get_raw_backtrace () in
          Computation.cancel t.inner exn bt;
          decr t
    in
    let fiber =
      Fiber.create ~forbid:false t.inner
    in
    Fiber.spawn fiber main
  with canceled_exn ->
    decr t;
    raise canceled_exnThe above fork first increments the count and then tries to spawn a fiber. The Picos interface specifies that when Fiber.spawn returns normally, the action, main, must be called by the scheduler. This allows us to ensure that the increment is always matched with a decrement.
Setting up a scope is the most complex operation:
let run body =
  let count = Atomic.make 1 in
  let inner = Computation.create () in
  let ended = Trigger.create () in
  let t = { count; inner; ended } in
  let fiber = Fiber.current () in
  let (Packed outer) =
    Fiber.get_computation fiber
  in
  let canceler =
    Computation.attach_canceler
      ~from:outer
      ~into:t.inner
  in
  match
    Fiber.set_computation fiber (Packed t.inner);
    body t
  with
  | () -> join t outer canceler fiber
  | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      Computation.cancel t.inner exn bt;
      join t outer canceler fiber;
      Printexc.raise_with_backtrace exn btThe Computation.attach_canceler operation attaches a special trigger to propagate cancelation from one computation into another. After the body exits, join
let join t outer canceler fiber =
  decr t;
  Fiber.set_computation fiber (Packed outer);
  let forbid = Fiber.exchange fiber ~forbid:true in
  Trigger.await t.ended |> ignore;
  Fiber.set fiber ~forbid;
  Computation.detach outer canceler;
  Computation.check t.inner;
  Fiber.check fiberis called to wait for the scoped fibers and restore the state of the main fiber. An important detail is that propagation of cancelation is forbidden by setting the forbid flag to true before the call of Trigger.await. This is necessary to ensure that join does not exit, due to the fiber being canceled, before all of the child fibers have actually finished. Finally, join checks the inner computation and the fiber, which means that an exception will be raised in case either was canceled.
The design of Fiber includes several key features:
- The low level design allows one to both avoid unnecessary overheads, such as allocating a Computation.tfor every fiber, when implementing simple abstractions and also to implement more complex behaviors that might prove difficult given e.g. a higher level design with a built-in notion of hierarchy.
- As Fiber.tstores theforbidflag and theComputation.tassociated with the fiber one need not pass those as arguments through the program. This allows various concurrent abstractions to be given traditional interfaces, which would otherwise need to be complicated.
- Effects are relatively expensive. The cost of performing effects can be amortized by obtaining the Fiber.tonce and then manipulating it multiple times.
- A Fiber.talso provides an identity for the fiber. It has so far proven to be sufficient for most purposes. Fiber local storage, which we do not cover here, can be used to implement, for example, a unique integer id for fibers.
Assumptions
Now, consider the Ivar abstraction presented earlier as an example of the use of the Trigger abstraction. That Ivar implementation, as well as the Computation based implementation, works exactly as desired inside the scope abstraction presented in the previous section. In particular, a blocked Ivar.read can be canceled, either when another fiber in a scope raises an unhandled exception or when the main fiber of the scope is canceled, which allows the fiber to continue by raising an exception after cleaning up. In fact, Picos comes with a number of libraries that all would work quite nicely with the examples presented here.
For example, a library provides an operation to run a block with a timeout on the current fiber. One could use it with Ivar.read to implement a read operation with a timeout:
let read_in ~seconds ivar =
  Control.terminate_after ~seconds @@ fun () ->
  Ivar.read ivarThis interoperability is not accidental. For example, the scope abstraction basically assumes that one does not use Fiber.set_computation, in an arbitrary unscoped manner inside the scoped fibers. An idea with the Picos interface actually is that it is not supposed to be used by applications at all and most higher level libraries should be built on top of libraries that do not directly expose elements of the Picos interface.
Perhaps more interestingly, there are obviously limits to what can be achieved in an "interoperable" manner. Imagine an operation like
val at_exit : (unit -> unit) -> unitthat would allow one to run an action just before a fiber exits. One could, of course, use a custom spawn function that would support such cleanup, but then at_exit could only be used on fibers spawned through that particular spawn function.
The effects
As mentioned previously, the Picos interface is implemented partially in terms of five effects:
type _ Effect.t +=
  | Await : Trigger.t -> (exn * Printexc.raw_backtrace) option Effect.t
  | Cancel_after : {
      seconds : float;
      exn: exn;
      bt : Printexc.raw_backtrace;
      computation : 'a Computation.t;
    }
      -> unit Effect.t
  | Current : t Effect.t
  | Yield : unit Effect.t
  | Spawn : {
      fiber : Fiber.t;
      main : (Fiber.t -> unit);
    }
      -> unit Effect.tA scheduler must handle those effects as specified in the Picos documentation.
The Picos interface does not, in particular, dictate which ready fibers a scheduler must run next and on which domains. Picos also does not require that a fiber should stay on the domain on which it was spawned. Abstractions implemented against the Picos interface should not assume any particular scheduling.
Picos actually comes with a randomized multithreaded scheduler, that, after handling any of the effects, picks the next ready fiber randomly. It has proven to be useful for testing that abstractions implemented in Picos do not make invalid scheduling assumptions.
When a concurrent abstraction requires a particular scheduling, it should primarily be achieved through the use of synchronization abstractions like when programming with traditional threads. Application programs may, of course, pick specific schedulers.
Status and results
We have an experimental design and implementation of the core Picos interface as illustrated in the previous section. We have also created several Picos compatible sample schedulers. A scheduler, in this context, just multiplexes fibers to run on one or more system level threads. We have also created some sample higher-level scheduler agnostic libraries Implemented in Picos. These libraries include a library for resource management, a library for structured concurrency, a library of synchronization primitives, and an asynchronous I/O library. The synchronization library and the I/O library intentionally mimic libraries that come with the OCaml distribution. All of the libraries work with all of the schedulers and all of these elements are interoperable and entirely opt-in.
What is worth explicitly noting is that all of these schedulers and libraries are small, independent, and highly modular pieces of code. They all crucially depend on and are decoupled from each other via the core Picos interface library. A basic single threaded scheduler implementation requires only about 100 lines of code (LOC). A more complex parallel scheduler might require a couple of hundred LOC. The scheduler agnostic libraries are similarly small.
Here is an example of a concurrent echo server using the scheduler agnostic libraries provided as samples:
let run_server server_fd =
  Unix.listen server_fd 8;
  Flock.join_after begin fun () ->
    while true do
      let@ client_fd = instantiate Unix.close @@ fun () ->
        Unix.accept ~cloexec:true server_fd |> fst
      in
      Flock.fork begin fun () ->
        let@ client_fd = move client_fd in
        Unix.set_nonblock client_fd;
        let bs = Bytes.create 100 in
        let n =
          Unix.read client_fd bs 0 (Bytes.length bs)
        in
        Unix.write client_fd bs 0 n |> ignore
      end
    done
  endThe Unix module is provided by the I/O library. The operations on file descriptors on that module, such as accept, read, and write, use the Picos interface to suspend fibers allowing other fibers to run while waiting for I/O. The Flock module comes from the structured concurrency library. A call of join_after returns only after all the fibers forked into the flock have terminated. If the main fiber of the flock is canceled, or any fiber within the flock raises an unhandled exception, all the fibers within the flock will be canceled and an exception will be raised on the main fiber of the flock. The let@, finally, and move operations come from the resource management library and allow dealing with resources in a leak-free manner. The responsibility to close the client_fd socket is moved from the main server fiber to a fiber forked to handle that client.
We should emphasize that the above is just an example. The Picos interface should be both expressive and efficient enough to support practical implementations of many different kinds of concurrent programming models. Also, as described previously, the Picos interface does not, for example, internally implement structured concurrency. However, the abstractions provided by Picos are designed to allow structured and unstructured concurrency to be Implemented in Picos as libraries that will then work with any Picos compatible scheduler and with other concurrent abstractions.
Finally, an interesting demonstration that Picos really fundamentally is an interface is a prototype Picos compatible direct style interface to Lwt. The implementation uses shallow effect handlers and defers all scheduling decisions to Lwt. Running a program with the scheduler returns a Lwt promise.
Future work
As mentioned previously, Picos is still an ongoing project and the design is considered experimental. We hope that Picos soon matures to serve the needs of both the commercial users of OCaml and the community at large.
Previous sections already touched a couple of updates currently in development, such as the support for finalizing resources stored in FLS and the development of Concurrent ML style abstractions. We also have ongoing work to formalize aspects of the Picos interface.
One potential change we will be investigating is whether the Computation abstraction should be simplified to only support cancelation.
The implementation of some operations, such as Fiber.current to retrieve the current fiber proxy identity, do not strictly need to be effects. Performing an effect is relatively expensive and we will likely design a mechanism to store a reference to the current fiber in some sort of local storage, which could significantly improve the performance of certain abstractions, such as checked mutexes, that need to access the current fiber.
We also plan to develop a minimalist library for spawning threads over domains, much like Moonpool, in a cooperative manner for schedulers and other libraries.
We also plan to make Domainslib Picos compatible, which will require developing a more efficient non-effects based interface for spawning fibers, and investigate making Eio Picos compatible.
We also plan to design and implement asynchronous IO libraries for Picos using various system call interface for asynchronous IO such as io_uring.
Finally, Picos is supposed to be an open ecosystem. If you have feedback or would like to work on something mentioned above, let us know.
Motivation
There are already several concrete effects-based concurrent programming libraries and models being developed. Here is a list of some such publicly available projects:*
- Affect — "Composable concurrency primitives with OCaml effects handlers (unreleased)",
- Domainslib — "Nested-parallel programming",
- Eio — "Effects-Based Parallel IO for OCaml",
- Fuseau — "Lightweight fiber library for OCaml 5",
- Miou — "A simple scheduler for OCaml 5",
- Moonpool — "Commodity thread pools for OCaml 5", and
- Riot — "An actor-model multi-core scheduler for OCaml 5".
All of the above libraries are mutually incompatible with each other with the exception that Domainslib, Eio, and Moonpool implement an earlier interoperability proposal called domain-local-await or DLA, which allows a concurrent programming library like Kcas* to work on all of those. Unfortunately, DLA, by itself, is known to be insufficient and the design has not been universally accepted.
By introducing a scheduler interface and key libraries, such as an IO library, implemented on top of the interface, we hope that the scarce resources of the OCaml community are not further divided into mutually incompatible ecosystems built on top of such mutually incompatible concurrent programming libraries, while, simultaneously, making it possible to experiment with many kinds of concurrent programming models.
It should be technically* possible for all the previously mentioned libraries, except Miou, to
- be made Picos compatible, i.e. to handle the Picos effects, and
- have their elements implemented in Picos, i.e. to make them usable on other Picos-compatible schedulers.
Please read the reference manual for further information.