package flux

Flux, a streaming library.

Flux is a library that draws heavily on Conduit (in Haskell). The aim is to define a pipeline that:

  1. consumes elements 'a from a producer (Flux.source)
  2. transforms the producer's elements into other elements (Flux.flow)
  3. manipulates these elements to generate a result 'r (Flux.sink)

     'a source
  -> ('a, 'b) flow
  -> ('b, 'c) flow
  -> ('c, 'r) sink

A fundamental idea behind Flux is that it is a pull-based library, meaning that the pipeline only runs when the consumer asks for elements from a source. This differs from a push-based approach, where the arrival of a new element always triggers the execution of the pipeline.

In other words, in a pull-based approach, the consumer can decide at any point to stop consuming a source. In a push-based approach, the producer drives the execution and the consumer cannot say stop.

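(* push-based: every incoming request triggers the pipeline *)
let push_based_pipeline pipeline = on_http_requests @@ fun req -> pipeline req

(* pull-based: we ask for the next request ourselves and may decide to stop *)
let rec pull_based_pipeline stop acc pipeline = match next () with
  | Some req ->
    let acc = pipeline req acc in
    if stop acc then acc else pull_based_pipeline stop acc pipeline
  | None -> acc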

This approach allows us to exercise fairly fine-grained control over sources (particularly if they are associated with resources that we need to release, such as file descriptors). In particular, resources can be associated with a finally in OCaml terms using Fun.protect or in Miou terms using Miou.Ownership (and thus properly handle the case of cancelling a task in possession of such a resource).
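
As a minimal sketch of this idea, independent of Flux, the following models a source as a record with two functions, pull and close. This record type and the helper names are assumptions made for illustration and are not Flux's API. It shows how Fun.protect releases the resource even though the consumer stops before the source is exhausted.

```ocaml
(* Hypothetical pull-based source (not Flux's actual type): [pull] yields the
   next element on demand, [close] releases the underlying resource. *)
type 'a source = { pull : unit -> 'a option; close : unit -> unit }

(* A source backed by a list; [closed] records whether [close] was called. *)
let of_list closed lst =
  let rest = ref lst in
  let pull () =
    match !rest with
    | [] -> None
    | x :: tl -> rest := tl; Some x
  in
  { pull; close = (fun () -> closed := true) }

(* Sum elements until [limit] is reached, then stop pulling. [Fun.protect]
   runs [close] on normal return and when an exception escapes. *)
let sum_until limit src =
  Fun.protect ~finally:src.close @@ fun () ->
  let rec go acc =
    if acc >= limit then acc
    else
      match src.pull () with
      | None -> acc
      | Some x -> go (acc + x)
  in
  go 0
```

Here the consumer decides when to stop (once the sum reaches limit), and the remaining elements are never pulled.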

A simple example.

Let's take a fairly simple example to show the composition proposed by Flux:

let only_error = String.starts_with ~prefix:"[error]"

let () =
  Miou_unix.run @@ fun () ->
  let open Flux in
  let from = Source.file ~filename:"data.log" 0x7ff in
  let split_on_newline = Flow.split_on_char '\n' in
  let filter_on_error = Flow.filter only_error in
  let add_newline = Flow.map (fun x -> x ^ "\n") in
  let via = Flow.(split_on_newline << filter_on_error << add_newline) in
  let into = Sink.file ~filename:"error.log" in
  let (), leftover = Stream.run ~from ~via ~into in
  Option.iter Source.dispose leftover

In this fairly simple example, we use a file called "data.log" as our source. We want to stream its content line by line (using split_on_char), keep only the lines that report an error (using filter and only_error), add the '\n' back (since splitting removed it), and write the result to a file called "error.log".

The actual execution of our pipeline (from, via, and into) is done with Flux.Stream.run. It is possible (but not the case here) that our pipeline does not consume our entire "data.log" file. A leftover may be returned and can be properly released using Flux.Source.dispose.

What is interesting in our example is the composition of our Flux.flows using the Flux.Flow.(<<) operator. This operator allows us to chain several transformations so that we start with a simple series of strings and end up with lines that notify us of errors.

A sink.

Before introducing you to flow composition, we will look at the Flux.sink type, which allows you to fold the elements of a stream into a single result. For example, suppose we would like to obtain the SHA256 hash of a stream of strings. Using digestif, we could write:

let sha256sum =
  let open Digestif in
  let init = Fun.const SHA256.empty
  and push ctx str = SHA256.feed_string ctx str
  and full = Fun.const false
  and stop = SHA256.get in
  Flux.Sink { init; push; full; stop }

let is_regular filename =
  Sys.file_exists filename && not (Sys.is_directory filename)

let () =
  Miou_unix.run @@ fun () ->
  let via = Flux.Flow.identity in
  let into = sha256sum in
  let from, filename =
    match Sys.argv with
    | [| _; filename |] when is_regular filename ->
        (Flux.Source.file ~filename 0x7ff, filename)
    | [| _ |] -> (Flux.Source.in_channel stdin, "-")
    | _ -> Fmt.failwith "%s [<filename>]" Sys.executable_name
  in
  let hash, leftover = Flux.Stream.run ~from ~via ~into in
  Option.iter Flux.Source.dispose leftover;
  Fmt.pr "%a  %s\n%!" Digestif.SHA256.pp hash filename

A Flux.sink therefore needs four functions:

  1. a function to initialise an internal state init (of type 's)
  2. a function push that receives elements (of type 'a) one by one
  3. a function full that signals when our sink is full
  4. a final function stop that constructs the result (of type 'r) according to the internal state 's
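
To make the role of these four functions concrete, here is a minimal sketch that does not depend on Flux. The sink type below is a hypothetical stand-in for Flux.sink (its internal state 's is hidden), and run_list is an assumed helper that drives a sink from a plain list rather than from a Flux.source.

```ocaml
(* Hypothetical stand-in for a sink: the state type 's is existential. *)
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

(* Drive a sink from a list: initialise the state, push elements while the
   sink is not full, then build the result with [stop]. *)
let run_list (Sink k) lst =
  let rec go acc = function
    | x :: rest when not (k.full acc) -> go (k.push acc x) rest
    | _ -> k.stop acc
  in
  go (k.init ()) lst

(* A sink that is never full and sums the integers it receives. *)
let sum =
  Sink
    { init = (fun () -> 0); push = ( + ); full = (fun _ -> false); stop = Fun.id }
```

For instance, run_list sum [1; 2; 3] calls init once, push three times, and stop once.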

The above example clearly shows that the source is also completely decoupled from our sha256sum process. Whether the source comes from a file or stdin makes no difference to our pipeline. More generally, any source that outputs strings (such as a list of strings, an array, a socket, etc.) can be used with sha256sum.

A Flux extension exists and implements various hash algorithms. These sinks are available with the fluxt.hash library and the Flux_hash module.

A full sink.

A Flux.sink can respond during pipeline execution that it is full and thus stop consuming the source being used. The advantage of such a mechanism is that it allows us to build a result without having to consume our entire source. In this case, we could reimplement the head command in this way:

let n = ref 10
let filename = ref None
let usage = Fmt.str "%s [-n <NUM>] [FILE]" Sys.executable_name

let is_regular filename =
  Sys.file_exists filename && not (Sys.is_directory filename)

let anon str = if is_regular str then filename := Some str

let args =
  [ ("-n", Arg.Set_int n, "Print the first NUM lines instead of the first 10") ]

let () =
  Miou_unix.run @@ fun () ->
  Arg.parse args anon usage;
  let from =
    match !filename with
    | Some filename -> Flux.Source.file ~filename 0x7ff
    | None -> Flux.Source.in_channel stdin
  in
  let via = Flux.Flow.split_on_char '\n' in
  let into = Flux.Sink.buffer !n in
  let lines, leftover = Flux.Stream.run ~from ~via ~into in
  Option.iter Flux.Source.dispose leftover;
  let from = Flux.Source.array lines in
  let via = Flux.Flow.map (fun x -> x ^ "\n") in
  let into = Flux.Sink.out_channel stdout in
  let (), leftover = Flux.Stream.run ~from ~via ~into in
  Option.iter Flux.Source.dispose leftover

In this example, if the source contains more lines than we would like to keep, our first leftover will contain the rest of our source (and we apply Flux.Source.dispose to it in order to correctly free the file descriptor used). The implementation of Flux.Sink.buffer signals, via its full function, when our result is full, that is, when our array has been completely filled:

let buffer len =
  if len < 0 then invalid_arg "buffer: negative buffer size";
  if len = 0 then Flux.Sink.fill [||]
  else
    let buf = Array.make len None in
    let init () = 0
    and push idx x = Array.set buf idx (Some x); idx + 1
    and full idx = idx = len (* our array is completely filled! *)
    and stop len = Array.init len (fun idx -> Option.get buf.(idx)) in
    Flux.Sink { init; push; full; stop }
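
The relationship between full and the leftover can be sketched without Flux. In the model below, the sink type is a hypothetical stand-in for Flux.sink, the source is a plain list, and the leftover is simply the unconsumed tail of that list (in Flux it is an optional source).

```ocaml
(* Hypothetical stand-in for a sink, with a hidden state type 's. *)
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

(* Keep at most [n] elements; the sink is full once [n] were received. *)
let take n =
  Sink
    {
      init = (fun () -> (0, []));
      push = (fun (len, acc) x -> (len + 1, x :: acc));
      full = (fun (len, _) -> len >= n);
      stop = (fun (_, acc) -> List.rev acc);
    }

(* Feed a list to a sink and return the result together with the elements
   that were not consumed because the sink became full. *)
let run_list (Sink k) lst =
  let rec go acc = function
    | x :: rest when not (k.full acc) -> go (k.push acc x) rest
    | leftover -> (k.stop acc, leftover)
  in
  go (k.init ()) lst
```

With take 2 and four input elements, the last two elements are never pushed and come back as the leftover.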

So now we know the basics about the Flux.sink type. We can now look at the Flux.flow type and its composition.

A flow.

As we saw in our first example, the Flux.flow type has the advantage of combining well with other flows, allowing us to chain several transformations so that our elements initially have a type 'a and are transformed into elements of type 'b.

type ('a, 'b) flow

val compose : ('a, 'b) flow -> ('b, 'c) flow -> ('a, 'c) flow

Specifically, a Flux.flow is a particular composition between a sub-Flux.sink and a new sink performing the transformation. In other words, we must implement a Flux.sink (as we did before) that will have to push the transformed elements to our sub-sink. Let us take the example of a simple transformation that adds 1 to the numbers received.

let succ =
  let flow (Flux.Sink k) =
    let init () = k.init ()
    and push acc n = k.push acc (n + 1)
    and full acc = k.full acc
    and stop acc = k.stop acc in
    Flux.Sink { init; push; full; stop } in
  { Flux.flow }

let () =
  let open Flux in
  let from = Source.list [0; 1; 2] in
  let into = Sink.list in
  let lst, _ = Stream.run ~from ~via:succ ~into in
  assert (lst = [1;2;3])

As we can see, the + 1 transformation appears in our push function. The result of this transformation is pushed to our sub-sink k (using k.push), and we keep the internal state of our sub-sink (in the example, this is our acc value) throughout our flow. We can, of course, extend the internal state to much more than acc (and have a state that ultimately wraps acc).

This example helps us understand what a flow essentially is: a cascade of Flux.sinks.

A flow is simply a new Flux.sink that performs a transformation (in the push function) and repushes the result of this transformation into its sub-sink k. The init function must essentially initialise the internal state of the sub-sink k as well as possibly an internal state of our current Flux.sink. The stop function simply returns what the sub-sink k has to return.
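
The following sketch models this "cascade of sinks" without Flux. The sink and flow types are hypothetical stand-ins (a flow is modelled as a function from the downstream sink to a new sink), and map, compose and run are assumed helpers written for this illustration only.

```ocaml
(* Hypothetical stand-in for a sink, with a hidden state type 's. *)
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

(* Hypothetical stand-in for a flow: it wraps the downstream sink (of 'b)
   into a new sink (of 'a), whatever the final result type 'r is. *)
type ('a, 'b) flow = { flow : 'r. ('b, 'r) sink -> ('a, 'r) sink }

(* Transform each element in [push] and re-push it into the sub-sink [k]. *)
let map f =
  let flow (Sink k) =
    Sink
      {
        init = k.init;
        push = (fun acc x -> k.push acc (f x));
        full = k.full;
        stop = k.stop;
      }
  in
  { flow }

(* [compose f g] applies [f] first, then [g]: the sink is wrapped by [g]
   and the result is wrapped by [f]. *)
let compose f g = { flow = (fun sink -> f.flow (g.flow sink)) }

(* A sink collecting every element into a list. *)
let to_list =
  Sink
    {
      init = (fun () -> []);
      push = (fun acc x -> x :: acc);
      full = (fun _ -> false);
      stop = List.rev;
    }

let run_list (Sink k) lst =
  let rec go acc = function
    | x :: rest when not (k.full acc) -> go (k.push acc x) rest
    | _ -> k.stop acc
  in
  go (k.init ()) lst

let run ~via ~into lst = run_list (via.flow into) lst
```

Composing map succ with map string_of_int yields a single sink whose push adds 1, converts to a string, and finally pushes into to_list.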

Composition & full.

There is one last function that a Flux.flow must define: the full function. This function is interesting because even though by default we would like to know if our sub-sink k is full (and we should systematically check this), a Flux.flow can preempt the result of full and consider itself full even if the sub-sink k is not.

In the example of our head command, we unfortunately had to store the first lines of our stream in an array (our first Stream.run) and then turn this result into a source in order to display it (via our second Stream.run).

It should be noted that what allows us to keep the first n lines of a stream is Flux.Sink.buffer. Since a Flux.flow is just a composition of sinks, could we instead define a Flux.flow that buffers the first n lines and then displays them directly?

let buffer len =
  if len < 0 then invalid_arg "buffer: negative buffer size";
  let flow (Flux.Sink k) =
    let init () = (k.init (), 0)
    and push (acc, idx) x =
      if idx = len then (acc, idx) else (k.push acc x, idx + 1)
    and full (acc, idx) = k.full acc || idx = len
    and stop (acc, _) = k.stop acc in
    Flux.Sink { init; push; full; stop }
  in
  { Flux.flow }

let () =
  Miou_unix.run @@ fun () ->
  Arg.parse args anon usage;
  let open Flux in
  let from =
    match !filename with
    | Some filename -> Source.file ~filename 0x7ff
    | None -> Source.in_channel stdin
  in
  let via = Flow.(split_on_char '\n' << buffer !n << map (fun x -> x ^ "\n")) in
  let into = Flux.Sink.out_channel stdout in
  let (), leftover = Flux.Stream.run ~from ~via ~into in
  Option.iter Flux.Source.dispose leftover

What is interesting in this example is the full function, which not only checks that our sub-sink k is not full, but also that our current Flux.sink is not full according to the number of elements we would like to keep: k.full acc || idx = len. This is how a Flux.flow can preempt its fullness on top of its sub-sink k.

We have just optimised the memory consumption of our head command! We now only need a single Flux.Stream.run and to compose our flows together using the Flux.Flow.(<<) operator.

As we can see, what is fundamentally interesting about Flux.flows is the ability to define a series of flows in order to transform one type of content into another. The first advantage of the Flux.flow type is that it can keep an internal state in order to perform this transformation. The other advantage of the Flux.flow type is that the arrival of a new element can be decoupled from the production of a transformed element: in other words, a new element in our Flux.source does not necessarily imply the appearance of a new transformed element.

This is particularly the case for Flux.Flow.split_on_char, for example, which requires us to buffer the content and can only emit an element once the desired character appears (in our example, '\n').
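
To illustrate this decoupling, here is a sketch of a line-splitting flow on a small model that does not depend on Flux. The sink and flow types are hypothetical stand-ins, and lines is an assumed name; the actual Flux.Flow.split_on_char may be implemented and behave differently (for example regarding a trailing partial line).

```ocaml
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

type ('a, 'b) flow = { flow : 'r. ('b, 'r) sink -> ('a, 'r) sink }

(* Emit complete lines; the trailing partial line is kept in the state. *)
let lines =
  let flow (Sink k) =
    (* Push lines downstream, checking fullness before each push. *)
    let rec push_all acc = function
      | line :: rest when not (k.full acc) -> push_all (k.push acc line) rest
      | _ -> acc
    in
    let push (acc, pending) chunk =
      match List.rev (String.split_on_char '\n' (pending ^ chunk)) with
      | [] -> (acc, pending) (* unreachable: split_on_char never returns [] *)
      | last :: complete -> (push_all acc (List.rev complete), last)
    in
    let stop (acc, pending) =
      (* Flush the last partial line, if any, before stopping. *)
      if pending <> "" && not (k.full acc) then k.stop (k.push acc pending)
      else k.stop acc
    in
    Sink
      {
        init = (fun () -> (k.init (), ""));
        push;
        full = (fun (acc, _) -> k.full acc);
        stop;
      }
  in
  { flow }

let to_list =
  Sink
    {
      init = (fun () -> []);
      push = (fun acc x -> x :: acc);
      full = (fun _ -> false);
      stop = List.rev;
    }

let run ~via ~into lst =
  let go (Sink k) =
    let rec loop acc = function
      | x :: rest when not (k.full acc) -> loop (k.push acc x) rest
      | _ -> k.stop acc
    in
    loop (k.init ()) lst
  in
  go (via.flow into)
```

With the input chunks "ab\ncd\n", "e" and "f\n", the first chunk produces two lines, the second produces none, and the third completes the line "ef".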

Composition and multiple elements.

A Flux.flow may generate several elements for a single received element. In this case, these elements must be sent to our sub-sink k. However, adding even a single element can fill up our sub-sink k. This is another aspect of Flux.sink composition: systematically checking that the sub-sink k is not full after pushing a new element. Let's take the example of a Flux.flow that repeats elements:

let repeat n =
  if n < 0 then invalid_arg "repeat: negative repeat number";
  let flow (Flux.Sink k) =
    let init () = k.init () in
    let rec push acc x =
      assert (not (full acc));
      let rec go acc = function
        | 0 -> acc
        | _ when k.full acc -> acc
        | n -> go (k.push acc x) (n - 1)
      in
      go acc n
    and full acc = k.full acc in
    let stop acc = k.stop acc in
    Flux.Sink { init; push; full; stop }
  in
  { Flux.flow }

There is indeed a pre-condition for our push function, which is that it is only called if we know that the current Flux.sink is not full (and the latter should not be full if the sub-sink k is not full either).

Then, all subsequent calls to k.push after the first k.push must check that our sub-sink k is not full. In this case, we can add several elements to our sub-sink k safely.
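
The effect of this check can be observed on a small model that does not depend on Flux. The sink and flow types below are hypothetical stand-ins, take is an assumed bounded sink, and repeat mirrors the flow above (without the assertion).

```ocaml
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

type ('a, 'b) flow = { flow : 'r. ('b, 'r) sink -> ('a, 'r) sink }

(* Push each element [n] times, re-checking [k.full] before every push. *)
let repeat n =
  if n < 0 then invalid_arg "repeat: negative repeat number";
  let flow (Sink k) =
    let push acc x =
      let rec go acc = function
        | 0 -> acc
        | _ when k.full acc -> acc
        | n -> go (k.push acc x) (n - 1)
      in
      go acc n
    in
    Sink { init = k.init; push; full = k.full; stop = k.stop }
  in
  { flow }

(* A bounded sink that accepts at most [n] elements. *)
let take n =
  Sink
    {
      init = (fun () -> (0, []));
      push = (fun (len, acc) x -> (len + 1, x :: acc));
      full = (fun (len, _) -> len >= n);
      stop = (fun (_, acc) -> List.rev acc);
    }

let run ~via ~into lst =
  let go (Sink k) =
    let rec loop acc = function
      | x :: rest when not (k.full acc) -> loop (k.push acc x) rest
      | _ -> k.stop acc
    in
    loop (k.init ()) lst
  in
  go (via.flow into)
```

Repeating each element twice into a sink bounded to three elements stops in the middle of the second element: the sub-sink never receives a fourth push.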

Since a Flux.flow is a cascade of Flux.sinks, and more specifically a cascade of pushes, the full function must be composed with the others in such a way that if at least one of the Flux.sinks is full, the Flux.flow resulting from the composition of several flows is also full: this is why, in the case of buffer, we used the || operator (if the current Flux.sink is full or if sub-sink k is full, the Flux.flow is full).

Concrete example with zlib.

Flux offers ready-made implementations for compressing and decompressing streams. Built on decompress, the library fluxt.zl and its module Flux_zl offer two Flux.flows:

val deflate : cfg -> (Bstr.t, string) Flux.flow
val inflate : (Bstr.t, string) Flux.flow

Both flows accept bigstrings and emit strings (deflated or inflated). The choice of bigstrings as input is debatable, since it is usually more convenient to have a source that transmits strings rather than bigstrings.

In this case, this is where we can take advantage of flow composition and, in particular, use Flux.Flow.bstr, which allows us to transform strings from a source into bigstrings that can then be processed by our deflation/inflation process. But for the sake of this tutorial, we will try to re-implement Flux.Flow.bstr.

Here is the most straightforward way to implement such a flow:

let to_bigstring =
  let flow (Flux.Sink k) =
    let init () = k.init ()
    and push acc str = k.push acc (Bstr.of_string str)
    and full acc = k.full acc
    and stop acc = k.stop acc in
    Flux.Sink { init; push; full; stop } in
  { Flux.flow }

One could even reduce this implementation to:

let to_bigstring = Flux.Flow.map Bstr.of_string

The disadvantage is that a new bigstring is allocated for each element, which means that malloc(3) is called each time, and this can slow down the entire pipeline. Another strategy would be to allocate only one bigstring and reuse it for each element:

let to_bigstring =
  let flow (Flux.Sink k) =
    let init () = (k.init (), Bstr.create 0x7ff, 0)
    and push (acc, bstr, dst_off) str =
      let rec go acc src_off dst_off =
        if src_off = String.length str
        then (acc, bstr, dst_off)
        else
          let rem_bstr = Bstr.length bstr - dst_off
          and rem_str = String.length str - src_off in
          let len = Int.min rem_bstr rem_str in
          Bstr.blit_from_string str ~src_off bstr ~dst_off ~len;
          if dst_off + len = Bstr.length bstr
          then let acc = k.push acc bstr in
               if k.full acc then (acc, bstr, 0)
               else go acc (src_off + len) 0
          else (acc, bstr, dst_off + len) in
      go acc 0 dst_off
    and full (acc, _, _) = k.full acc
    and stop (acc, bstr, dst_off) =
      if dst_off > 0 && not (k.full acc)
      then let bstr = Bstr.sub bstr ~off:0 ~len:dst_off in
           k.stop (k.push acc bstr)
      else k.stop acc in
    Flux.Sink { init; push; full; stop } in
  { Flux.flow }

This code is much better because it only allocates a single bigstring and reuses it once our sub-sink's k.push has consumed it. Note that our go function can attempt to pass the bigstring to our sub-sink k, but its recursion (and possibly pushing the bigstring again) is protected by k.full (in order to know if our sub-sink k is not full for the rest of the process).
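
The re-chunking logic itself can be checked in isolation. The sketch below is a simplified model that uses the standard Bytes module instead of Bstr and a plain callback instead of a sub-sink; rechunk and emit are assumed names, and the fullness check on the sub-sink is deliberately left out. As in the flow above, the same buffer is handed over each time, so the callback must consume (or copy) its content before returning.

```ocaml
(* Re-chunk strings into blocks of [size] bytes using one reusable buffer.
   [emit buf len] is called each time the buffer is full, and once more at
   the end with the remaining [len] bytes, if any. *)
let rechunk ~size emit strs =
  if size <= 0 then invalid_arg "rechunk: size must be positive";
  let buf = Bytes.create size in
  let feed dst_off str =
    let rec go src_off dst_off =
      if src_off = String.length str then dst_off
      else begin
        (* Copy as much as fits in the buffer or remains in the string. *)
        let len = min (size - dst_off) (String.length str - src_off) in
        Bytes.blit_string str src_off buf dst_off len;
        if dst_off + len = size then begin
          emit buf size;
          go (src_off + len) 0
        end
        else go (src_off + len) (dst_off + len)
      end
    in
    go 0 dst_off
  in
  let dst_off = List.fold_left feed 0 strs in
  if dst_off > 0 then emit buf dst_off
```

A caller that wants to keep the blocks has to copy them out of the shared buffer, for example with Bytes.sub_string buf 0 len.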

Now we can compose!

let () =
  Miou_unix.run @@ fun () ->
  let zl =
    match Sys.argv with
    | [| _; "-d" |] -> Flux_zl.(deflate (config ()))
    | _ -> Flux_zl.inflate
  in
  let via = Flux.Flow.(to_bigstring << zl) in
  let from = Flux.Source.in_channel stdin in
  let into = Flux.Sink.out_channel stdout in
  let (), leftover = Flux.Stream.run ~from ~via ~into in
  Option.iter Flux.Source.dispose leftover

This gives us a program capable of deflating/inflating content (in zlib format). We can test our program in this way:

$ ./zpipe -d < sample.txt > sample.z
$ ./zpipe < sample.z > out.txt
$ diff sample.txt out.txt
$ echo $?
0

A source.

There is one last element we haven't seen yet: Flux.sources. Until now, our code has been fairly sequential and our sources have more or less corresponded to files (or stdin). However, there may be times when we want a source from a more complicated location.

This is where Miou comes in. There may be sources that require asynchronous mechanics. Take, for example, a file that we want to obtain using the HTTP protocol. In this case, it is possible to create a source from a task (according to Miou's terminology) in order to retrieve the elements of our source and process these elements at the same time.

Among all the sources that Flux can offer you, there is one that is very interesting: the Flux.Source.with_task source.

It consists of creating a task (in Miou terms) that can be executed cooperatively or in parallel. A bounded queue will be transmitted to this task, and the objective will be to fill this bounded queue.

For more information on the bounded queue, please refer to the Flux.Bqueue module documentation, which provides some useful examples using Miou.

Fetch through HTTP.

To demonstrate the benefits of this source, we will attempt to download content using httpcats, a library that enables communication with an HTTP server. The objective is simply to download the content and save it to a file.

let ( let@ ) finally fn = Fun.protect ~finally fn
let ( >>= ) = Result.bind

let uri = ref None
let output = ref None
let anon str = uri := Some str
let usage = Fmt.str "%s -o <file> <uri>" Sys.executable_name

let is_redirection resp = Httpcats.Status.is_redirection resp.Httpcats.status
let or_failwith fn = function Ok v -> v | Error err -> failwith (fn err)

let args =
  [ ("-o", Arg.String (fun str -> output := Some str), "The output file") ]

let () =
  Miou_unix.run @@ fun () ->
  Arg.parse args anon usage;
  match (!uri, !output) with
  | None, _ | _, None -> Fmt.epr "%s\n%!" usage; exit 1
  | Some uri, Some output ->
      let rng = Mirage_crypto_rng_miou_unix.(initialize (module Pfortuna)) in
      let@ () = fun () -> Mirage_crypto_rng_miou_unix.kill rng in
      let from =
        Flux.Source.with_task ~size:0x7ff @@ fun q ->
        let fn _ _ resp () str =
          if not (is_redirection resp) then
            match str with
            | Some str -> Flux.Bqueue.put q str
            | None -> Flux.Bqueue.close q
        in
        Httpcats.request ~uri ~fn ()
        >>= (fun (_, ()) -> Ok ())
        |> or_failwith (Fmt.str "%a" Httpcats.pp_error)
      in
      let via = Flux.Flow.identity in
      let into = Flux.Sink.file ~filename:output in
      let (), leftover = Flux.Stream.run ~from ~via ~into in
      Option.iter Flux.Source.dispose leftover

This code contains several elements related to httpcats (notably rng); we recommend reading its documentation for details. What is noteworthy here is the use of Flux.Source.with_task. Inside the function, we perform our HTTP request, and httpcats calls the fn function, which transmits what httpcats has successfully decoded into our queue.

It is important to understand that at this stage, the request has not yet been actually made! We are just describing a source. The source will then be actually executed when we use Flux.Stream.run.
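
This distinction between describing and executing a source can be sketched without Flux or Miou. In the model below, a source is a hypothetical record holding a start function, which is only called by run; the on_start callback stands for a side effect such as launching the HTTP request. None of these names belong to Flux's API.

```ocaml
(* Hypothetical source: a description whose [start] function is called only
   when the source is actually run. *)
type 'a source = { start : unit -> unit -> 'a option }

let of_list ~on_start lst =
  let start () =
    on_start ();
    (* The side effect (e.g. launching a request) happens here, not when
       [of_list] is called. *)
    let rest = ref lst in
    fun () ->
      match !rest with
      | [] -> None
      | x :: tl -> rest := tl; Some x
  in
  { start }

(* Running the source starts it, then pulls every element. *)
let run src =
  let pull = src.start () in
  let rec go acc =
    match pull () with
    | None -> List.rev acc
    | Some x -> go (x :: acc)
  in
  go []
```

Building the source with of_list has no observable effect; on_start is called only once run is applied to it.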

We do not wish to transform the content (Flux.Flow.identity) and decide to save our source in a file using Flux.Sink.file. All we need to do is run our program in this way:

$ ./fetch.exe -o index.tar.gz https://opam.ocaml.org/index.tar.gz
$ file index.tar.gz
index.tar.gz: gzip compressed data, from Unix

It should be noted that Flux.Source.dispose is particularly important here. This is because the source we are manipulating is a task ('a Miou.t), and one of Miou's fundamental rules is to never forget its tasks. We must therefore call our source's stop function to ensure that our task has been completed successfully.

Parallelism.

If you have taken the time to look at the Flux.Bqueue module, you will have noticed that it is a domain-safe data structure, meaning that it can be shared between multiple domains. It is fairly simple to parallelise this code, since we use the bounded queue to transfer elements from our source to the rest of our pipeline.

In addition, Flux.Source.with_task can launch a task in parallel with the domain in which we are located; we simply need to specify the ~parallel:true option! Voilà!

Resources.

If you are familiar with Miou, you have probably seen the Miou.Ownership module. The purpose of this module is to associate a finaliser with a value so that if a task is cancelled or ends abnormally (for example, due to an exception being raised), Miou ensures that the finaliser will be executed.

This is particularly useful if you want to ensure that your file descriptors, for example, are closed at the end of a task.

This is why Flux offers Flux.Source.resource, which associates a finaliser with a given resource via Miou. This resource can be used to generate elements and will be released in all cases:

  1. the normal case where our pipeline has executed correctly
  2. the case where the execution of our pipeline has raised an exception
  3. the case where we attempt to Miou.cancel the task that executes our pipeline

let () =
  Miou_unix.run @@ fun () ->
  let socket = Miou_unix.tcpv4 () in
  let finally = Miou_unix.close in
  let buf = Bytes.create 0x7ff in
  let pull socket =
    match Miou_unix.read socket buf with
    | 0 -> None
    | len -> Some (Bytes.sub_string buf 0 len) in
  let from = Flux.Source.resource ~finally pull socket in
  ...

Composition of sinks.

There is one last composition that we have not yet seen, which consists of producing two different results for the same source (transformed or not).

For example, we would like to download a file using httpcats but also display a download bar to make the display a little fancier.

There is a library for creating download bars called progress. Once again, for more details on how to use this library, we recommend reading its documentation. The idea with Flux is very simple: we will create another Flux.sink that will display the bar according to the bytes transmitted by httpcats.

However, in order to display a bar, we need to know the size of the document we want to download. This information is only available via httpcats, and we need to find a way to transfer it to our Flux.sink, which will display the download bar.

Waiting state and Flux.

This is where we need to introduce a concept that is more related to Miou than to Flux, namely a write-once variable, or Miou.Computation.t. This value allows us to transmit a single piece of information from one task to another:

val create : unit -> 'a t
val try_return : 'a t -> 'a -> bool
val await_exn : 'a t -> 'a

We will therefore use this variable in our source to transmit the size of the document we are attempting to download:

let set_length length (resp : Httpcats.response) =
  let hdrs = resp.Httpcats.headers in
  let content_length = Httpcats.Headers.get hdrs "content-length" in
  let value = Option.bind content_length int_of_string_opt in
  ignore (Miou.Computation.try_return length value)

...

  let length = Miou.Computation.create () in
  let from =
    Flux.Source.with_task ~parallel:true ~size:0x7ff @@ fun q ->
    let fn _ _ resp () str =
      if not (is_redirection resp) then
        let () = set_length length resp in (* here, we transfer the length *)
        match str with
        | Some str -> Flux.Bqueue.put q str
        | None -> Flux.Bqueue.close q
    in
    Httpcats.request ~uri ~fn ()
    >>= (fun (_, ()) -> Ok ())
    |> or_failwith (Fmt.str "%a" Httpcats.pp_error)

Progress and Sink.

On the consumption side, we will need to define a process that, at the beginning, does not know how to display a download bar (since we do not yet know the size of the document). As soon as we receive the first chunk of the response, we will be able to obtain the size of the document we are downloading (because if we receive a chunk of the document, it means we have necessarily received the headers from the HTTP response).

let progress length =
  let open Progress in
  let config = Config.v ~ppf:Fmt.stdout () in
  let fn state str =
    match state with
    | Some (reporter, display) ->
        reporter (String.length str);
        Some (reporter, display)
    | None ->
        let length = Miou.Computation.await_exn length in
        let line = line length in
        let display = Multi.line line |> Display.start ~config in
        let[@warning "-8"] Reporter.[ reporter ] = Display.reporters display in
        reporter (String.length str);
        Some (reporter, display)
  in
  Flux.Sink.fold fn None

The line function in the code above is a function that constructs how we want to display the bar. It may happen that we do not know the size of the document, in which case we display a spinner.

let sizes = [| "B"; "KiB"; "MiB"; "GiB"; "TiB"; "PiB"; "EiB"; "ZiB"; "YiB" |]

let bytes_to_size ?(decimals = 2) ppf = function
  | 0 -> Fmt.string ppf "0 byte"
  | n ->
      let n = float_of_int n in
      let i = Float.floor (Float.log n /. Float.log 1024.) in
      let r = n /. Float.pow 1024. i in
      Fmt.pf ppf "%.*f %s" decimals r sizes.(int_of_float i)

let line total =
  let open Progress.Line in
  let style = if Fmt.utf_8 Fmt.stdout then `UTF8 else `ASCII in
  match total with
  | Some total ->
      let width = `Fixed 30 in
      let metric = bytes_to_size ~decimals:2 in
      list [ bar ~style ~width total; bytes; constf " / %a" metric total ]
  | None ->
      let frames = [ "⠋"; "⠙"; "⠹"; "⠸"; "⠼"; "⠴"; "⠦"; "⠧"; "⠇"; "⠏" ] in
      let spin = spinner ~frames () in
      list [ spin; bytes; bytes_per_sec ]

Composition!

Flux offers binding operators that allow you to compose multiple sinks together. In this case, we would like to save the document to a file and display our download bar. We can then use Flux.Sink.Syntax.(let+) and Flux.Sink.Syntax.(and+) to combine multiple sinks and manipulate their results:

  let into =
    let open Flux.Sink.Syntax in
    let+ () = Flux.Sink.file ~filename:output
    and+ reporter_and_display = progress length in
    let display = Option.map snd reporter_and_display in
    Option.iter Progress.Display.finalise display
  in

There you go! If you test this program, you will see a download bar!

https://raw.githubusercontent.com/robur-coop/flux/refs/heads/main/assets/fetch.gif

The most attentive user of Miou and this tutorial might imagine that the actual application of saving the response in a file and displaying the download bar could be done in parallel. And they would be right!

The operator Flux.Sink.Syntax.(and+) corresponds to the Flux.Sink.zip function, but there is also the Flux.Sink.both function, which executes sinks in tasks that run in parallel (via Miou.call). However, we would like to warn users that Flux.Sink.both cannot be used with let-binding, as Miou's structured concurrency paradigm does not apply well in a chain of and+.
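
The way and+ and let+ combine sinks can be sketched on a small model that does not depend on Flux. The sink type is a hypothetical stand-in, and the semantics chosen for zip here (each element goes to both sinks, and the pair is full only when both are full) is an assumption of this sketch, not necessarily that of Flux.Sink.zip.

```ocaml
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

(* Feed every element to both sinks and return both results. *)
let zip (Sink l) (Sink r) =
  Sink
    {
      init = (fun () -> (l.init (), r.init ()));
      push =
        (fun (a, b) x ->
          ( (if l.full a then a else l.push a x),
            if r.full b then b else r.push b x ));
      full = (fun (a, b) -> l.full a && r.full b);
      stop = (fun (a, b) -> (l.stop a, r.stop b));
    }

(* Post-process the result of a sink. *)
let map_result f (Sink k) =
  Sink
    { init = k.init; push = k.push; full = k.full; stop = (fun s -> f (k.stop s)) }

let ( let+ ) sink f = map_result f sink
let ( and+ ) = zip

let sum =
  Sink
    { init = (fun () -> 0); push = ( + ); full = (fun _ -> false); stop = Fun.id }

let count =
  Sink
    {
      init = (fun () -> 0);
      push = (fun n _ -> n + 1);
      full = (fun _ -> false);
      stop = Fun.id;
    }

(* Two results computed in a single pass over the same elements. *)
let mean =
  let+ total = sum
  and+ n = count in
  if n = 0 then 0. else float_of_int total /. float_of_int n

let run_list (Sink k) lst =
  let rec go acc = function
    | x :: rest when not (k.full acc) -> go (k.push acc x) rest
    | _ -> k.stop acc
  in
  go (k.init ()) lst
```

The mean sink consumes its input once, while sum and count each see every element.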

Flux, Miou and awaiting state.

One final note is necessary regarding the awaiting state that can be induced by Miou.Computation.await_exn and the actual execution of the Flux.sinks.

We could modify our progress function to directly attempt to obtain the size of the document we want to download. However, this implies that the creation of our progress Flux.sink depends on our length variable, which is only filled once the source begins to emit.

It has been mentioned that when you use Flux.Source.with_task, the code that must be executed by the task is only executed during Flux.Stream.run (and not before). The problem is that Flux.Stream.run needs the value into, which comes from the composition of our two sinks, but one of them, progress, could only be created once the length variable is filled.

So, if we wanted to create our progress Flux.sink based on the length variable, we would end up with a deadlock because length would never be filled since the Flux.source would never be consumed because the into value does not exist yet.

This is why we perform our Miou.Computation.await_exn during the actual execution (in the fn function used by Flux.Sink.fold) of our Flux.sink progress (and not before).
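
The ordering problem can be illustrated with a single-threaded model that depends on neither Flux nor Miou. Everything below is an assumption made for the sketch: the sink type is a stand-in, the write-once variable is a plain reference, and await raises an exception where the real Miou.Computation.await_exn would block forever.

```ocaml
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 's;
      push : 's -> 'a -> 's;
      full : 's -> bool;
      stop : 's -> 'r;
    }
      -> ('a, 'r) sink

(* Stand-in for awaiting a write-once variable: an unfilled variable raises
   instead of blocking forever. *)
let await cell =
  match !cell with Some v -> v | None -> failwith "would block forever"

(* Deferred: the variable is read on the first push, once the source runs. *)
let progress length =
  Sink
    {
      init = (fun () -> None);
      push =
        (fun state chunk ->
          let total, seen =
            match state with Some s -> s | None -> (await length, 0)
          in
          Some (total, seen + String.length chunk));
      full = (fun _ -> false);
      stop = Fun.id;
    }

(* Eager: the variable is read while building the sink, before any run. *)
let progress_eager length =
  let total = await length in
  Sink
    {
      init = (fun () -> 0);
      push = (fun seen chunk -> seen + String.length chunk);
      full = (fun _ -> false);
      stop = (fun seen -> Some (total, seen));
    }

(* Model of the source: it fills [length] when it emits its first chunk. *)
let run_chunks length (Sink k) chunks =
  let total = List.fold_left (fun n c -> n + String.length c) 0 chunks in
  let rec go acc = function
    | x :: rest when not (k.full acc) ->
        if !length = None then length := Some total;
        go (k.push acc x) rest
    | _ -> k.stop acc
  in
  go (k.init ()) chunks
```

Building progress_eager before the run fails in this model, whereas progress only reads the variable once the first chunk has been emitted.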