Module Flux
Flux is a library that provides an interface for manipulating streams with the Miou scheduler. We strongly recommend that you familiarise yourself with Miou and read our tutorial.
Sources.
Sources are decoupled producers of values.
Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed of when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.
Sources are a great way to define decoupled producers that can be consumed with Stream.from. Sources are single-shot and will have their input exhausted by most operations. Consider buffering sources if you need to reuse their input.
The following example creates a source that counts down to zero:
# let countdown n =
    (* [init] builds the internal state lazily, on the first pull. *)
    let init () = n
    (* [pull] returns the next element and the next state, or [None] when exhausted. *)
    and pull = function
      | 0 -> None
      | n -> Some (n, n - 1)
    (* [stop] disposes of the state; there is nothing to release here. *)
    and stop = Fun.id in
    Flux.Source.Source { init; pull; stop }
  ;;
# Stream.(from (countdown 3) |> into Sink.sum) ;;
- : int = 6
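The internal state is not limited to integers. As a further sketch in the same Source record style (the name of_list is hypothetical, and the result assumes the same semantics as the countdown example above), a source can carry the remaining elements of a list as its state; a real-world state could equally be a resource, such as a file handle, released in stop:
# let of_list lst =
    (* The state is the list of elements not yet produced. *)
    let init () = lst
    and pull = function
      | [] -> None
      | x :: rest -> Some (x, rest)
    and stop = Fun.id in
    Flux.Source.Source { init; pull; stop }
  ;;
# Stream.(from (of_list [ 1; 2; 3; 4 ]) |> into Sink.sum) ;;
- : int = 10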
Sinks.
Sinks are decoupled consumers of values.
Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can acquire resources that are guaranteed to be released when the sink is filled.
Sinks are a great way to define decoupled consumers that can be filled with Stream.into.
The following example demonstrates a sink that consumes all elements into a list:
let list =
  (* Start from an empty accumulator. *)
  let init () = []
  (* Prepend each incoming element. *)
  and push acc x = x :: acc
  (* Produce the final result from the accumulated state. *)
  and stop acc = List.rev acc
  (* This sink is never full, so it consumes the whole stream. *)
  and full _ = false in
  Flux.Stream.Sink { init; push; full; stop }
Sinks are independent from sources and streams. You can think of them as packed arguments for folding functions with early termination.
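As a quick usage sketch, the list sink above can be filled from any stream with Stream.into (the expected result assumes that Stream.range excludes its upper bound, as the results later on this page suggest):
# Stream.range 1 5 |> Stream.into list ;;
- : int list = [1; 2; 3; 4]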
Flows.
Flows are decoupled transformers of values.
Flows define streaming transformation, filtering or grouping operations that are fully disconnected from input and output. Their implementation intercepts an internal folding function and modifies the input one value at a time.
Flows are a great way to define decoupled transformations that can be used with Stream.via.
A flow can be applied to a stream with Stream.via:
# Stream.range 10 100
  |> Stream.via (Flow.map (fun x -> x + 1))
  |> Stream.into Sink.sum ;;
- : int = 4995
Flows can also be composed to form a pipeline:
# let a = Flow.map (fun x -> x + 1) in
  let b = Flow.filter (fun x -> x mod 2 = 0) in
  Stream.range 10 100
  |> Stream.via Flow.(a >> b)
  |> Stream.into Sink.sum ;;
- : int = 2475
A flow is a stream transformer that consumes values of type 'a and produces values of type 'b.
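Since a composed flow is an ordinary value, it can be named and applied later with Stream.via. Here is a minimal sketch reusing only combinators from the examples above (the name incr_twice is hypothetical; both stages are maps, so the result does not depend on the order in which >> chains them):
# let incr_twice = Flow.(map (fun x -> x + 1) >> map (fun x -> x + 1)) in
  Stream.range 1 10 |> Stream.via incr_twice |> Stream.into Sink.sum ;;
- : int = 63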
Streams.
Streams combine sources, sinks and flows into a flexible streaming toolkit.
A stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.
Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) or in the case of exceptions in the streaming pipeline.
Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source, use Stream.from; to consume a stream with a sink, use Stream.into; and to transform stream elements with a flow, use Stream.via. For more sophisticated pipelines that might have source leftovers, Stream.run can be used.
A stream is parameterised by the type 'a of its elements.
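Putting the pieces together, here is a minimal sketch of a full source-to-sink pipeline, reusing the countdown source defined above and only combinators shown on this page:
# Stream.from (countdown 10)
  |> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
  |> Stream.into Sink.sum ;;
- : int = 30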