
Merkle tree and parallelism.

Miou allows you to perform tasks in parallel. This short tutorial will introduce 2 concepts offered by Miou:

  • how to launch and obtain the results of parallel tasks
  • how to optimise Miou according to the type of application.

The first problem relates more generally to parallel programming, while the second relates more specifically to Miou and its round-robin scheduler.

We would also like to make it clear that "not everything is parallel". In the case of parallelism, ensuring that 2 tasks can agree on a specific memory state requires synchronisation mechanisms which can be costly.

The key question is not whether or not you can parallelise, but at what level you should parallelise and whether it's worth it.

Definitions of terms.

The pool of domains.

Miou has a very basic design as far as parallelisation is concerned: it will allocate several domains (depending on your number of cores) and put them on standby for tasks. So, a call to Miou.call corresponds to sending a task to these domains. Finally, one of these domains will take charge of the task and run it in parallel with dom0, which runs your main code.

This is called a domain pool. It's a fairly basic design that has the advantage of taking care of domain allocation and management (as well as synchronisation) while allowing the user to run a task in parallel.
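As a minimal sketch of this design (assuming the miou package is installed): we start the scheduler, send one task to the pool of domains with Miou.call, and wait for its result on dom0.

```ocaml
(* Run the Miou scheduler, send one task to the pool of domains and
   wait for its result on dom0. *)
let () =
  Miou.run @@ fun () ->
  let prm = Miou.call @@ fun () -> 21 * 2 in
  match Miou.await prm with
  | Ok v -> Format.printf "%d\n%!" v
  | Error exn -> raise exn
```

Miou.await returns a result value: either the value produced by the task, or the exception it raised.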

Fork & Join.

A recurring pattern in parallel programming is fork and join, a form of "divide and conquer". It is often possible to divide a long task into several smaller tasks and then implement a way of merging the results of these smaller tasks to obtain the expected final result.

This is perhaps the simplest pattern to use and one that can be applied to a whole host of basic problems. Miou encourages the user to adopt this pattern via the Miou.parallel function.
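For instance, a minimal fork and join over a list of integers might look like this (a sketch, assuming Miou.parallel returns a list of results, one per input):

```ocaml
(* Square each element of a list in parallel; Miou.parallel both forks
   the tasks and joins their results. *)
let () =
  Miou.run @@ fun () ->
  let results = Miou.parallel (fun x -> x * x) [ 1; 2; 3; 4 ] in
  let squares =
    List.map (function Ok v -> v | Error exn -> raise exn) results
  in
  List.iter (Format.printf "%d ") squares;
  Format.printf "\n%!"
```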

The dom0.

Miou allocates and makes available domains which expect tasks to run in parallel with dom0. dom0 is the main domain on which your program runs - it is allocated by the system.

dom0 is different from the other domains in that it is the domain that decides when your program terminates. The other domains have neither the possibility nor the ability to stop your program. They can, however, be stopped if dom0 so wishes.

In this sense, dom0 cannot receive tasks from other domains. A parallel task (launched by Miou.call or Miou.parallel) will never run on dom0. However, dom0 can launch concurrent tasks (via Miou.call_cc).
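To illustrate this, here is a sketch (assuming Miou.await_exn, which awaits a promise and re-raises its exception): a concurrent task runs on the calling domain, here dom0, while a parallel task never does.

```ocaml
(* Compare the domain running a concurrent task (Miou.call_cc) with the
   domain running a parallel task (Miou.call). *)
let () =
  Miou.run @@ fun () ->
  let a = Miou.call_cc @@ fun () -> Domain.self () in
  let b = Miou.call @@ fun () -> Domain.self () in
  let a = Miou.await_exn a and b = Miou.await_exn b in
  (* the parallel task did not run on dom0 *)
  assert (Domain.self () <> b);
  ignore a
```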

A Merkle tree.

Our objective is quite simple:

  • obtain the hash of a file
  • obtain the hash of a folder which corresponds to the composition of the hashes of the sub-folders and files in that folder

Let's start by obtaining the hash of a file:

module Hash = Digestif.SHA1

let hash_of_blob filename =
  let ic = open_in filename in
  let ln = in_channel_length ic in
  let rec go buf ctx =
    match input ic buf 0 (Bytes.length buf) with
    | 0 | (exception End_of_file) -> Hash.get ctx
    | len ->
        let ctx = Hash.feed_bytes ctx buf ~len in
        go buf ctx
  in
  let ctx = Hash.empty in
  let str = Fmt.str "blob %d\000" ln in
  let ctx = Hash.feed_string ctx str in
  let res = go (Bytes.create 0x1000) ctx in
  close_in ic; res

This code basically calculates the hash of a file "à la Git" and returns the result. The calculation for a directory is a little more complex, and consists of using Sys.readdir, sorting the result and calculating the hash for all the items in the directory, then serialising (merging?) these hashes in a certain form and calculating the hash of this serialisation:

let ( / ) = Filename.concat

let rec hash_of_tree filename =
  let entries = Sys.readdir filename in
  let entries =
    List.map
      (fun v ->
        let filename = filename / v in
        if Sys.is_directory filename then (`Dir, filename)
        else (`Normal, filename))
      (List.sort String.compare (Array.to_list entries))
      (* sort and recognize if it's a file or a directory. *)
  in
  hash_of_entries entries

and hash_of_entries entries =
  let entries =
    (* compute the hash of all items *)
    List.map
      (function
        | `Dir, filename ->
            let name = Filename.basename filename in
            let hash = hash_of_tree filename in
            Fmt.str "40000 %s\000%s" name (Hash.to_raw_string hash)
        | `Normal, filename ->
            let name = Filename.basename filename in
            let hash = hash_of_blob filename in
            Fmt.str "100644 %s\000%s" name (Hash.to_raw_string hash))
      entries
  in
  let ctx = Hash.empty in
  let len = List.fold_left (fun acc str -> acc + String.length str) 0 entries in
  (* serialisation *)
  let str = Fmt.str "tree %d\000" len in
  let ctx = Hash.feed_string ctx str in
  let ctx =
    List.fold_left (fun ctx str -> Hash.feed_string ctx str) ctx entries
  in
  Hash.get ctx

Let's run!

Finally, we just need a final function to handle the user's argument, such as:

let () =
  match Sys.argv with
  | [| _; filename |] when Sys.file_exists filename ->
      if Sys.is_directory filename then
        let hash = hash_of_tree filename in
        Format.printf "%a\n%!" Hash.pp hash
      else
        let hash = hash_of_blob filename in
        Format.printf "%a\n%!" Hash.pp hash
  | [| _; filename |] ->
      Format.eprintf "%s: %s not found\n%!" Sys.argv.(0) filename
  | _ -> Format.eprintf "%s <filename>\n%!" Sys.argv.(0)

We can compile the program with:

$ ocamlfind opt -linkpkg -package fmt,digestif,digestif.c main.ml
$ ./a.out $PWD
aa146b5524a5c7ed221efda5382beeabcbc58d54

Parallelisation.

First iteration.

If you followed our explanation of patterns for parallel programming, you can already see where we might use Miou.call rather than executing the code sequentially. We're talking, of course, about the List.map in our hash_of_entries function. We could return promises (fork) rather than the results themselves for each item. Finally, we could use Miou.await_all (join) as our synchronisation point. So let's make the change accordingly:

and hash_of_entries entries =
  let entries =
    List.rev_map
      (function
        | `Dir, filename ->
            Miou.call @@ fun () -> (* Add a [call] here! *)
            let name = Filename.basename filename in
            let hash = hash_of_tree filename in
            Fmt.str "40000 %s\000%s" name (Hash.to_raw_string hash)
        | `Normal, filename ->
            Miou.call @@ fun () -> (* Add a [call] here! *)
            let name = Filename.basename filename in
            let hash = hash_of_blob filename in
            Fmt.str "100644 %s\000%s" name (Hash.to_raw_string hash))
      entries
  in
  let entries =
    List.rev_map
      (function Ok str -> str | Error exn -> raise exn)
      (Miou.await_all entries)
  in
  let ctx = Hash.empty in
  let len = List.fold_left (fun acc str -> acc + String.length str) 0 entries in
  let str = Fmt.str "tree %d\000" len in
  let ctx = Hash.feed_string ctx str in
  let ctx =
    List.fold_left (fun ctx str -> Hash.feed_string ctx str) ctx entries
  in
  Hash.get ctx

Note the double use of List.rev_map instead of List.map: rev_map is tail-recursive, and applying it twice restores the original order of the entries.

Second iteration.

We have just applied our "fork and join" pattern: we divide the calculation of the hashes of our entries using Miou.call and finish by 1) waiting for the results of all these tasks (Miou.await_all) and 2) merging the results to obtain a final hash.

However, Miou is quite simplistic when it comes to assigning parallel tasks to domains - it does so randomly! So it may happen that one domain ends up with more tasks than another.

If we want to have a fair distribution of tasks across domains, we should use Miou.parallel:

and perform = function
  | `Dir, filename ->
      let name = Filename.basename filename in
      let hash = hash_of_tree filename in
      Fmt.str "40000 %s\000%s" name (Hash.to_raw_string hash)
  | `Normal, filename ->
      let name = Filename.basename filename in
      let hash = hash_of_blob filename in
      Fmt.str "100644 %s\000%s" name (Hash.to_raw_string hash)

and hash_of_entries entries =
  (* Miou.parallel returns a result list: unwrap it. *)
  let entries =
    List.map
      (function Ok str -> str | Error exn -> raise exn)
      (Miou.parallel perform entries)
  in
  let ctx = Hash.empty in
  let len = List.fold_left (fun acc str -> acc + String.length str) 0 entries in
  let str = Fmt.str "tree %d\000" len in
  let ctx = Hash.feed_string ctx str in
  let ctx =
    List.fold_left (fun ctx str -> Hash.feed_string ctx str) ctx entries
  in
  Hash.get ctx

Note that Miou.parallel both starts the tasks and waits for them; the use of Miou.await_all is therefore no longer necessary.

Third iteration.

There is one final problem with our code. If you've read our definitions carefully, you'll see that dom0 does nothing but wait. This isn't always a problem because dom0 could be busy doing something other than waiting, but in this specific application, all we're doing is trying to calculate the hashes.

So it can be interesting to involve dom0 in the work. As mentioned above, even if dom0 cannot be assigned parallel tasks by the other domains, it can still run tasks concurrently.

As far as our application is concerned, we are essentially doing 2 things, one of which may take some time:

  1. we 'traverse' the structure of our folders (by going into sub-folders and sub-sub-folders)
  2. we calculate the file hash

The task that takes the longest is the one we should parallelise (to spread the time across domains). The other task can be assigned to our dom0! In this way, the traversal can be handled by dom0 and the rest in parallel:

and perform = function
  | `Dir, filename ->
      Miou.call_cc @@ fun () ->
      let name = Filename.basename filename in
      let hash = hash_of_tree filename in
      Fmt.str "40000 %s\000%s" name (Hash.to_raw_string hash)
  | `Normal, filename ->
      Miou.call @@ fun () ->
      let name = Filename.basename filename in
      let hash = hash_of_blob filename in
      Fmt.str "100644 %s\000%s" name (Hash.to_raw_string hash)

and hash_of_entries entries =
  let entries = List.rev_map perform entries in
  let entries =
    List.rev_map
      (function Ok str -> str | Error exn -> raise exn)
      (Miou.await_all entries)
  in
  let ctx = Hash.empty in
  let len = List.fold_left (fun acc str -> acc + String.length str) 0 entries in
  let str = Fmt.str "tree %d\000" len in
  let ctx = Hash.feed_string ctx str in
  let ctx =
    List.fold_left (fun ctx str -> Hash.feed_string ctx str) ctx entries
  in
  Hash.get ctx

As you can see, we've just returned to the previous code with the subtlety of using Miou.call_cc as soon as we need to traverse a directory.

It was also mentioned that the assignment of a task to a domain is random and that one domain may end up with more tasks than another. However, for a large number of tasks, we can consider that the distribution evens out - and that, roughly speaking, the tasks are assigned equitably.

Miou and large computations.

Pure and impure tasks.

The code we have just developed involves fairly limited system interaction (reading a folder or file) which does not, in this case, involve the use of Miou_unix.

In this regard, the tasks in our application can be considered pure in the sense that they do not emit a suspension effect that is necessary when waiting for a system event. With a little hindsight, this means that we can roughly calculate the time it will take our tasks to execute.

It can then be interesting to prioritise tasks in relation to what they depend on. For example, we could have 2 Miou.awaits, one for a task calculating the hash of a 1KB file, another for a task calculating the hash of a 1GB file. In this particular case, it would be more interesting to try the first Miou.await rather than the second (which would take longer).

This is task prioritisation and can be very useful for 'large computations'. However, Miou is more concerned with the development of system and network applications for which availability is more important (even if it means wasting time waiting for our second task rather than the first).

Involvement in performance.

The availability of tasks means that Miou may often attempt a Miou.await on a promise whose associated task has not yet finished. We then waste some of our time observing, only to do nothing - and move on to the next task.

This problem can be solved in 2 ways:

  1. don't use Miou and prefer a scheduler that implements task prioritisation (which may have other implications/flaws)
  2. modify Miou's behaviour

Quanta.

In truth, in our example, the real bottleneck slowing down our code is the number of quanta. In the case of our dom0, this limitation means that our domain launches a task (1 quantum), then spends 1 quantum waiting for another task (which has certainly not finished), before finally being rescheduled to launch a new task (1 quantum).

In other words, the launch of all our tasks is intermittently disrupted by unnecessary awaits... We definitely wanted to launch lots of tasks and only then wait for all of them (Miou.await_all). But Miou makes sure that all tasks (whether they are waiting and/or launching other tasks) consume the same number of quanta.

One way around this would be to allow a task, particularly the one we run on dom0, to consume enough quanta (and launch all our sub-tasks) without being interrupted by a Miou.await in the name of fairness.

Tuning & results.

We're going to compare the results and to do this, we're simply going to try and calculate the hash of our ".opam/5.0.0":

dinosaure@turbine:~$ hyperfine --warmup=1 \
  'MIOU_QUANTA=1000000 ./miou_p.out ~/.opam/5.0.0' \
  './miou_s.out ~/.opam/5.0.0'
Benchmark 1: MIOU_QUANTA=1000000 ./miou_p.out ~/.opam/5.0.0
  Time (mean ± σ):     894.8 ms ±  25.3 ms    [User: 5394.3 ms, System: 1176.7 ms]
  Range (min … max):   852.1 ms … 939.8 ms    10 runs
 
Benchmark 2: ./miou_s.out ~/.opam/5.0.0
  Time (mean ± σ):      2.720 s ±  0.013 s    [User: 2.382 s, System: 0.334 s]
  Range (min … max):    2.702 s …  2.742 s    10 runs
 
Summary
  MIOU_QUANTA=1000000 ./miou_p.out ~/.opam/5.0.0 ran
    3.04 ± 0.09 times faster than ./miou_s.out ~/.opam/5.0.0

The first version, "miou_p", is the one that uses Miou.call. The second, "miou_s", corresponds to our first code (without parallelisation). The parallelised version is at least 3 times faster!

Note the use of MIOU_QUANTA, which allows you to specify the number of quanta that a task can consume. In this case, to avoid wasting time (checking "awaits" unnecessarily), we allow a task to consume up to 1000000 quanta (and therefore produce 1000000 effects - which amounts to launching 1000000 tasks at once without interruption).
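The same setting can be chosen programmatically. This is a sketch assuming Miou.run exposes an optional ?quanta parameter (matching the MIOU_QUANTA variable), and it reuses the hash_of_tree function and Hash module defined earlier:

```ocaml
(* Let each task consume up to 1_000_000 quanta before being
   rescheduled, instead of the default. *)
let () =
  Miou.run ~quanta:1_000_000 @@ fun () ->
  let hash = hash_of_tree Sys.argv.(1) in
  Format.printf "%a\n%!" Hash.pp hash
```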

Conclusion.

This tutorial shows the subtleties that Miou can have with regard to parallel tasks. The second part also shows that Miou is not a scheduler that looks for the optimal order in which to execute tasks (taking into account, for instance, the intermittency of system events). Miou is more interested in the availability of tasks to be synchronised with system events than in prioritising tasks.

In this case, the example is not a system application: its interactions are (quite rightly) not notified to Miou - reading a file or folder is rarely blocking. Nonetheless, knowing the facts, it is still possible to obtain a high-performance application. The Miou.run function can be configured so that tasks can consume more than one quantum.

This small example shows that the task execution management policy has an impact on the way an application is written. There is no one way of writing a scheduler and it is perhaps fairer to inform the user of the implications that our implementation may have in the development of applications (as we do in this tutorial) than to oversell a product.
