Streaming abstractions that combine, transform and reduce large amounts of sequential data efficiently, in constant space and without leaking resources.
Sources are decoupled producers of values.
Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.
Sources are a great way to define decoupled producers that can be consumed with Stream.from. To learn more about how to create sources, see "Creating a source".
The following example creates a source that counts down to zero:
let countdown n =
  let init () = n in
  let pull i =
    if i = 0 then None
    else Some (i, i - 1) in
  Source.make ~init ~pull ()
It can be consumed with:
# Stream.(from (countdown 3) |> into Sink.sum)
- : int = 6
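As described above, a source's internal state is disposed of safely when the stream terminates. A sketch of a source that holds a real resource, assuming Source.make also accepts an optional ~stop callback for cleanup (the name is an assumption; check the Source module signature):

```ocaml
(* Sketch: a source that pulls lines from a file. The channel is opened
   lazily in ~init, and the assumed ~stop callback closes it when the
   source is exhausted, the consumer terminates, or an exception is
   raised in the pipeline. *)
let file_lines path =
  let init () = open_in path in
  let pull ic =
    match input_line ic with
    | line -> Some (line, ic)
    | exception End_of_file -> None
  in
  let stop ic = close_in ic in
  Source.make ~init ~pull ~stop ()
```

Because initialization happens in init, no file handle is opened unless a consumer actually requests elements.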
module Source : sig ... end
Module with definitions for sources.
Sinks are decoupled consumers of values.
Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can acquire resources that are guaranteed to be released when the sink is filled.
Sinks are a great way to define decoupled consumers that can be filled with Stream.into. To learn more about how to create sinks, see "Creating a sink".
The following example demonstrates a sink that consumes all elements into a list:
let list_sink =
  let init () = [] in
  let push acc x = x :: acc in
  let stop acc = List.rev acc in
  Sink.make ~init ~push ~stop ()
It can be used with:
# Stream.(iota 5 |> into list_sink)
- : int list = [0; 1; 2; 3; 4]
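A sink's internal state can also hold a resource, released in ~stop when the sink is filled. A sketch using the same ~init/~push/~stop shape as the list sink above (the file-writing behavior is illustrative, not part of the library):

```ocaml
(* Sketch: a sink that writes each string element to a file as a line.
   The output channel is the internal state; ~stop closes it so the
   resource is released when the sink is filled. *)
let file_sink path =
  let init () = open_out path in
  let push oc line =
    output_string oc line;
    output_char oc '\n';
    oc
  in
  let stop oc = close_out oc in
  Sink.make ~init ~push ~stop ()
```

Note that push returns the (possibly updated) state, just as in the list sink, so stateless resources like channels are simply threaded through unchanged.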
Type for sinks that consume elements of type 'a and, once done, produce a value of type 'b.
module Sink : sig ... end
Module with definitions for sinks.
Flows are decoupled transformers of values.
Flows define streaming transformation, filtering, or grouping operations that are fully disconnected from input and output. Their implementation intercepts an internal folding function and modifies the input one value at a time.
Flows are a great way to define decoupled transformations that can be used with Stream.via.
A flow can be applied to a stream with Stream.via:
# Stream.range 10 100
  |> Stream.via (Flow.map (fun x -> x + 1))
  |> Stream.into Sink.sum
- : int = 4995
Flows can also be composed to form a pipeline:
# let flow = Flow.(map (fun x -> x + 1) >> filter (fun x -> x mod 2 = 0)) in
  Stream.range 10 100
  |> Stream.via flow
  |> Stream.into Sink.sum
- : int = 2475
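Because composed flows are ordinary values, a pipeline can be named once and reused across streams. A sketch, assuming the Flow module also provides a take combinator alongside map and filter (an assumption; check the Flow signature):

```ocaml
(* Sketch: a reusable flow that squares elements, keeps the even
   results, and stops after the first five of them. Flow.take is
   assumed to exist; map and filter are shown in the examples above. *)
let first_even_squares =
  Flow.(map (fun x -> x * x)
        >> filter (fun x -> x mod 2 = 0)
        >> take 5)
```

Defining pipelines this way keeps the transformation logic independent of any particular source or sink.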
module Flow : sig ... end
Module with definitions for flows.
Streams combine sources, sinks and flows into a flexible streaming toolkit.
Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.
Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) or in case of exceptions in the streaming pipeline.
Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source, use Stream.from; to consume a stream with a sink, use Stream.into; and to transform stream elements with a flow, use Stream.via. For more sophisticated pipelines that might have source leftovers, Stream.run can be used.
A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:
# Stream.stdin |> Stream.stdout;;
hello<Enter>
hello
world<Enter>
world
<Ctrl+d>
- : unit = ()
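The three abstractions compose into a single pipeline: produce with a source, transform with a flow, and reduce with a sink. A sketch reusing the countdown source defined earlier on this page:

```ocaml
(* Sketch: stream the countdown source through a filtering flow into a
   summing sink. countdown 10 yields 10, 9, ..., 1; keeping the even
   values sums 10 + 8 + 6 + 4 + 2 = 30. *)
let total =
  Stream.from (countdown 10)
  |> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
  |> Stream.into Sink.sum
```

Each stage is decoupled: the same source could feed a different sink, and the same flow could be reused in another pipeline.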
module Stream : sig ... end
Module with definitions for streams.