package obatcher

A Framework for building Batched Concurrent Data Structures

Sources

obatcher-1.0.tbz
sha256=bad8af8223b14bd6d582e34eba90048d632f2ac611183e120d0faaeaefb7549e
sha512=cc8ede53c694abbb4aabb6f898e57057c6f2726411eb9f64b056652a0de7adf85432d55df5833d4555ccf03e681869ac0af218119c94f8577008ebd9e0601779

Description

Published: 13 Sep 2024

README

obatcher

An OCaml design framework for building batch-parallel "services". Based on the paper "Concurrent structures made easy". While the paper focuses on batched data structures, the same mechanism generalizes to any kind of "service", where a "service" is any modular software component that we interact with through an API.

Description

At its core, obatcher is an approach to designing efficient concurrent services. The key observation is that "processing a batch of a priori known operations in parallel is easier than optimising performance for a stream of arbitrary asynchronous concurrent requests". However, designing services whose APIs expect users to pass in an explicit batch (e.g. an array or list) of operations is unergonomic and forces systems to be designed around this pattern. obatcher solves this by wrapping explicitly batched services and using the scheduler to implicitly batch operations under the hood before passing them to the service. From the client's perspective, interfacing with the batched service looks like interfacing with any other service that takes plain atomic requests.

Benefits of batch-parallel service design

Picos scheduler agnostic

obatcher depends on scheduler primitives to perform its transformation on services. As a consequence, it would ordinarily suffer from portability issues across different schedulers. To account for this, obatcher is built on top of picos, which provides the low-level building blocks for writing schedulers. Because obatcher is implemented with the same picos primitives, any picos scheduler is compatible with obatcher.

Thread-safe

A defining invariant of batched services is that only a single batch of operations runs at any time. To guarantee this, obatcher adds an efficient lock-free queue in front of the service to collect operations into batches before submitting them to the service. This design takes inspiration from flat combining, and the research shows that this synchronization method scales better than coarse-grained locking.
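
To make the mechanism concrete, here is a simplified sketch of such a flat-combining-style front end. This is not obatcher's actual implementation (which suspends waiting fibers with picos computations rather than running everything from the combiner); the module and its names are illustrative only.

module Combiner_sketch (S : sig
  type op
  val run : op array -> unit
end) = struct
  type t = {
    pending : S.op list Atomic.t;  (* lock-free Treiber-stack of waiting ops *)
    busy : bool Atomic.t;          (* is a batch currently running? *)
  }

  let make () = { pending = Atomic.make []; busy = Atomic.make false }

  (* push one operation onto the pending list, retrying on contention *)
  let rec enqueue t op =
    let old = Atomic.get t.pending in
    if not (Atomic.compare_and_set t.pending old (op :: old)) then
      enqueue t op

  (* the first fiber to flip [busy] becomes the combiner and runs
     everyone's pending operations as a single batch *)
  let rec try_combine t =
    if Atomic.compare_and_set t.busy false true then begin
      let batch = Atomic.exchange t.pending [] in
      if batch <> [] then S.run (Array.of_list batch);
      Atomic.set t.busy false;
      (* operations may have arrived while the batch was running *)
      if Atomic.get t.pending <> [] then try_combine t
    end

  let exec t op =
    enqueue t op;
    try_combine t
end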

Batch optimization & Incremental parallelism

The benefit of taking a batch of operations as input is that it enables a whole slew of optimizations based on the spread of operations in the batch. Furthermore, pre-analysis of the operations can inform how to add parallelism, which can then be evolved over time rather than having to guarantee safety across all operations to the service up front.

Easy to test and reason about

Because services only handle a single batch at any time, it is fairly easy to design tests that are reproducible simply by fixing the input batch (see the test sketch after the counter example below).

Example usage

Concretely, the obatcher library consists of two things: a signature for services and a functor over that signature. The service signature is simple but crucially expects users to implement their services to handle batches of operations. Services may then perform optimizations based on the spread of operations in the batch, and furthermore leverage parallelism via primitives provided by the underlying picos scheduler to speed up processing.

module type Service = sig
  type t
  type cfg

  type 'a op
  type wrapped_op =
    | Mk : 'a op * 'a Picos.Computation.t -> wrapped_op
        (** [wrapped_op] binds the operation on the service with its
        corresponding suspended continuation to run after its
        completion.  *)

  val init : ?cfg:cfg -> unit -> t
  val run : t -> wrapped_op array -> unit
end

A simple example of a counter that abides by this signature is

open Picos

module BatchedCounter = struct
  type t = int ref
  type cfg = unit

  let init ?cfg:_ () = ref 0

  type _ op = Incr : unit op | Decr : unit op | Get : int op
  type wrapped_op = Mk : 'a op * 'a Computation.t -> wrapped_op

  let run (t : t) (ops : wrapped_op array) =
    Array.iter
      (function
        | Mk (Incr, comp) -> incr t; Computation.return comp ()
        | Mk (Decr, comp) -> decr t; Computation.return comp ()
        | Mk (Get, comp) -> Computation.return comp !t)
      ops
end
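
As noted above, the single-batch invariant makes testing straightforward: fixing the input batch fixes the behaviour. Here is a minimal sketch of a reproducible test against this counter, using plain asserts (we assume, as is the case in picos, that Computation.await returns immediately on an already-completed computation):

let test_fixed_batch () =
  let open BatchedCounter in
  let t = init () in
  let get = Computation.create () in
  run t
    [| Mk (Incr, Computation.create ());
       Mk (Decr, Computation.create ());
       Mk (Incr, Computation.create ());
       Mk (Get, get) |];
  (* two increments and one decrement leave the counter at 1 *)
  assert (!t = 1);
  (* with the in-order [run] above, Get observes the counter after the
     operations that precede it in the batch *)
  assert (Computation.await get = 1)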

The issue with this counter is that users now need to form explicit batches of requests. This becomes incredibly complicated as software grows, and it is also a serious balancing act between optimizing for latency and optimizing for throughput. For that reason, we provide a functor that composes over your batched service to make batching work implicitly, returning you to interfacing with the service through individual operations.

module Make : functor (S : Service) -> sig
  type t
  val init : ?cfg:S.cfg -> unit -> t

  val exec : t -> 'a S.op -> 'a
  (** [exec t op] is the API call for a singular operation on the
      service, with operations being automatically batched before
      being passed to the service *)
end

include Obatcher.Make (BatchedCounter)

let incr t = exec t Incr
let decr t = exec t Decr
let get t = exec t Get
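
To try this out, run the program under any picos scheduler. A minimal sketch, assuming picos_mux's single-threaded FIFO scheduler (Picos_mux_fifo.run) and the Flock fork/join helpers from picos_std; exact module names may differ between picos versions:

open Picos_std_structured

let () =
  Picos_mux_fifo.run @@ fun () ->
  let t = init () in
  (* fork ten fibers that all hit the counter concurrently; requests
     arriving while a batch is in flight form the next batch *)
  Flock.join_after (fun () ->
    for _ = 1 to 10 do
      Flock.fork (fun () -> incr t)
    done);
  assert (get t = 10)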

Now, running your program in a Picos scheduler, you have a thread-safe concurrent counter which synchronizes parallel requests and batches them before submitting them to the counter. Our batched counter is not very smart: it just processes requests in order, like a flat combiner. To demonstrate the kinds of optimizations we could do, we first observe that sequential consistency is preserved even if operations are reordered. In particular, we can declare that all Get requests are processed at the beginning of the batch. Incr and Decr are commutative, which means we can use a parallel-for-reduce to gather the total change to the counter. As such, we now have:

  let run (t : t) (ops : wrapped_op array) =
    let len = Array.length ops in
    let start = !t in
    let delta =
      Utils.parallel_for_reduce
        ~n_fibers:(Domain.recommended_domain_count () - 1)
        ~start:0 ~finish:(len - 1)
        ~body:(fun i ->
          match ops.(i) with
          | Mk (Incr, comp) ->
              Computation.return comp ();
              1
          | Mk (Decr, comp) ->
              Computation.return comp ();
              -1
          | Mk (Get, comp) ->
              Computation.return comp start;
              0)
        ( + ) 0
    in
    t := (start + delta)
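
Utils.parallel_for_reduce above comes from obatcher's utilities. For intuition, here is a minimal sketch of such a combinator, assuming the Run.all helper from picos_std's structured concurrency; the chunking strategy is illustrative only:

let parallel_for_reduce ~n_fibers ~start ~finish ~body op init =
  let n_fibers = max 1 n_fibers in
  if finish < start then init
  else begin
    let n = finish - start + 1 in
    let chunk = (n + n_fibers - 1) / n_fibers in
    let partials = Array.make n_fibers init in
    (* each fiber folds [body] over its own chunk of the index range *)
    let work k () =
      let lo = start + (k * chunk) in
      let hi = min finish (lo + chunk - 1) in
      let acc = ref init in
      for i = lo to hi do
        acc := op !acc (body i)
      done;
      partials.(k) <- !acc
    in
    (* run one fiber per chunk; the scheduler multiplexes them over
       the available domains *)
    Picos_std_structured.Run.all (List.init n_fibers work);
    Array.fold_left op init partials
  end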

Batching in the wild

Databases commonly use batching to service many small IO requests. Handled one at a time, such requests leave the IO bandwidth of the system under-utilized, with many cycles wasted on per-request latency. Waiting to collect a batch of requests and sending them in one go effectively amortizes the cost of each individual request.

Nagle's algorithm is the network equivalent of request batching. The algorithm solves the small-packet problem by batching packets before sending them out, and is widely used in TCP/IP networking.

With Spectre and Meltdown mitigations, the cost of each syscall context switch has increased. The new asynchronous interface in Linux, io_uring, uses batch processing to help applications reduce the overhead of system calls. io_uring allows applications to queue up multiple IO requests for the kernel to perform and submit them in one go with a single system call. See examples/uring for an example of how we can use obatcher to wrap io_uring and get implicit batching.

obatcher is based on "batch parallel data structures" (BPDS). Unlike typical concurrent structures that use locks or careful ordering to prevent data races, a BPDS specifies that only one batch runs at a time. Synchronization of parallel operations is performed upon entry to the BPDS; requests made while the BPDS is busy are collected to form the batch that runs next.

Results

Our approach here is new and, unfortunately, has few benchmarks. However, obatcher is designed almost identically to the framework described in "Concurrent structures made easy", and the results in the paper show that batched structures scale much better than those where threads race for mutual exclusion.

Dependencies (6)

  1. picos_mux
  2. picos_aux
  3. picos_std
  4. picos >= "0.5.0"
  5. ocaml >= "5.0.0"
  6. dune >= "3.9"

Dev Dependencies (6)

  1. odoc with-doc
  2. logs with-test
  3. alcotest with-test
  4. qcheck-alcotest with-test
  5. qcheck with-test
  6. containers with-test

Used by

None

Conflicts

None
