Streaming abstractions that combine, transform and reduce large amounts of sequential data efficiently, in constant space and without leaking resources.
Streaming uses composable stream producers (sources), consumers (sinks) and transformers (flows). The central model that abstracts over them is a Stream.
The following features are provided:
Stream | Push-based streams with excellent overall performance, safe and lazy resource management and a multitude of operations. Streams can be built with comprehensions.
Source | Pull-based producers of values with good performance, safe and lazy resource management and zipping operations.
Sink | Consumers of values with excellent performance, safe and lazy resource management and flexible composition operations. Sinks are like first-class folds with early termination!
Flow | Transformers of values that form composable streaming pipelines. Excellent for defining source- and sink-independent value transformations.
For more information on each module consult the entrypoint module Streaming.
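As a quick sketch of how these pieces fit together (a minimal example assuming the library is installed and opened, using only operations that appear in the examples on this page), a stream can be transformed with a flow and reduced with a sink:

open Streaming

(* A complete pipeline: a producer, a reusable transformer and a consumer,
   tied together by Stream. Sums the squares of 1..9. *)
let sum_of_squares =
  Stream.range 1 10                          (* producer *)
  |> Stream.via (Flow.map (fun n -> n * n))  (* transformer *)
  |> Stream.into Sink.sum                    (* consumer *)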
The library can be installed with OPAM: opam install streaming. You can run opam info streaming to make sure that the library is installed.
Add "@opam/streaming": "0.1"
to dependencies in your package.json
file and install the dependencies with esy install
. Run esy ls-modules
to make sure that the library is installed for your project.
To start using Streaming in your dune project, add it to libraries in the dune file.
(executable
(public_name myexe)
(libraries streaming))
Open the entrypoint module in your code to start using Streams:
open Streaming
# #require "streaming";;
# open Streaming;;
# Stream.(stdin |> stdout);;
We're streaming!<Enter>
We're streaming!
<Ctrl+d>
- : unit = ()
That's it! Scroll down to see some examples or jump into the API documentation.
STDIN
# Stream.stdin
|> Stream.filter ((<>) "")
|> Stream.map (fun line -> "You wrote: " ^ line)
|> Stream.each print_endline
Build a stream with a comprehension and collect the results into a list.
# let items =
let* n = Stream.range 1 3 in
let* c = Stream.of_list ['x'; 'y'] in
yield (n, c)
# Stream.to_list items
- : (int * char) list = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')]
Compute the arithmetic mean in a single iteration of the input.
# let mean =
let open Sink.Syntax in
let+ total = Sink.sum
and+ count = Sink.length in
total / count
# Stream.(iota 20 |> into mean)
- : int = 9
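Because mean is an ordinary value, the same sink can be reused with any other input. A small follow-up sketch (the list below is made up for the demonstration; 108 / 6 = 18):

# Stream.(of_list [4; 8; 15; 16; 23; 42] |> into mean)
- : int = 18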
Input and output resources involved in stream processing have the following two properties: they are initialised lazily and terminated promptly. For example, a stream created with Streaming.Stream.file will only open the file for reading when the stream is being consumed. Similarly, a sink for a file will only open the file when elements are being written into it. Sinks are initialized before sources, when the computation begins. The following examples demonstrate these two properties in practice.
(* Create a source that must not be initialized. *)
# let bomb () =
Source.make
~init:(fun () -> failwith "Boom!")
~pull:(fun () -> None)
()
val bomb : unit -> 'a source = <fun>
(* Feed it into a stream that terminates early. *)
# bomb ()
|> Stream.from
|> Stream.take 0
|> Stream.to_list
- : 'a list = []
And... nothing! As you can see, our "bomb" was never detonated. This is because in streams, sinks (in our case to_list combined with take) are checked for "fullness" before sources are initialized.
In the next example let's look at how streams behave in the presence of exceptions.
# let im_a_source_i_must_not_leak () =
Source.make
~init:(fun () -> `Dangerous_input)
~pull:(fun st -> Some ("always blue", st))
~stop:(fun `Dangerous_input ->
print_endline "Stopping input... Phew!")
()
val im_a_source_i_must_not_leak : unit -> string source = <fun>
# let im_a_sink_i_must_not_leak () =
Sink.make
~init:(fun () -> `Dangerous_output)
~push:(fun `Dangerous_output x -> `Dangerous_output)
~stop:(fun `Dangerous_output ->
print_endline "Stopping output... That was close!")
()
val im_a_sink_i_must_not_leak : unit -> ('a, unit) sink = <fun>
# im_a_source_i_must_not_leak ()
|> Stream.from
|> Stream.map (fun x -> failwith "Boom!")
|> Stream.into (im_a_sink_i_must_not_leak ())
Stopping input... Phew!
Stopping output... That was close!
Exception: Failure "Boom!".
Our termination functions ran just before the world exploded!
The termination functions in streaming are always guaranteed to be called. What is currently not well specified is the state they will be called with. It is possible for a source, for example, to stream lines from multiple files, while seamlessly opening and closing them as the input is read. In normal termination conditions, streaming will correctly call all termination functions with correct states.
The same is not true in situations when exceptions are raised. Currently, when there is an exception, streaming will call the termination function on the first instance of the state, even though it might have changed.
This is not a difficult problem to solve, but a correct implementation has a high performance cost.
In the future, streaming might expose safer stream management functions to help with these situations. For now, it is recommended that sources and sinks implement their stop functions in a way that allows them to close all allocated resources when given only the first state. This can be achieved by aggregating the intermediate states or by using refs so that the initial state always points to the latest state.
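For illustration only, here is a minimal sketch (not part of the library; the helper name lines_of_files is hypothetical) of a source that streams lines from several files and keeps its current channel behind a ref, so that its stop function can release the latest resource even if it is only handed the initial state:

open Streaming

(* Hypothetical source that reads lines from several files in order.
   The state is a single ref shared by all intermediate states, so [stop]
   can close whatever channel is currently open, even when it is called
   with the initial state. *)
let lines_of_files paths =
  let state = ref (paths, None) in
  let open_next () =
    match !state with
    | path :: rest, None -> state := (rest, Some (open_in path)); true
    | _ -> false
  in
  let rec pull st =
    match !st with
    | _, Some ic ->
      (match input_line ic with
       | line -> Some (line, st)
       | exception End_of_file ->
         close_in ic;
         st := (fst !st, None);
         pull st)
    | _ :: _, None -> if open_next () then pull st else None
    | [], None -> None
  in
  Source.make
    ~init:(fun () -> state)
    ~pull
    ~stop:(fun st ->
      match !st with
      | _, Some ic -> close_in ic
      | _ -> ())
    ()

With this shape, even if the stop function is invoked with the very first state after an exception, it still closes the channel that is actually open.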
streaming is a general-purpose streaming library with abstractions meant to be used as a drop-in replacement for concrete sequential data structures such as lists. It is always a good idea to use a streaming model if you need sequential access to data. For very small collections that do not need to be processed multiple times, using lists is adequate. In all other situations, streams are significantly better in terms of performance and composition.
Streaming abstractions are an excellent choice for stateful producers and consumers that require precise resource management. Consuming elements from a file or a database handle with streams is significantly safer. All models in streaming are lazy (they will only initialise resources when needed) and support prompt termination (the resources will be terminated immediately when they are no longer needed). This is guaranteed even when streaming pipelines raise exceptions.
Finally, streaming encourages the implementation of small, decoupled sources, sinks and flows that can be reused in a wide spectrum of situations.
In short, it is fast (see for yourself). The Streaming.Stream module was designed to improve on the most efficient iteration model currently available in OCaml, the so-called "internal iterator" with the type ('a -> unit) -> unit. This iterator corresponds to the "iter" functions commonly found in the standard library.
Streams are a variant of internal iterators that add support for resource safety and early termination, and avoid the need for mutation and exceptions in the combinators, while providing a similar performance profile.
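To make the comparison concrete, here is a small sketch in plain OCaml (not part of the library) of the internal-iterator model mentioned above:

(* An internal iterator: the producer drives the loop and simply calls
   the consumer function once per element. *)
type 'a iter = ('a -> unit) -> unit

(* A list exposed as an internal iterator, mirroring List.iter. *)
let iter_of_list (l : 'a list) : 'a iter = fun f -> List.iter f l

(* Consuming it: fast, but there is no built-in way to stop early or to
   manage resources without resorting to mutation or exceptions, which is
   exactly what streams add on top of this model. *)
let () = iter_of_list [1; 2; 3] (fun x -> Printf.printf "%d\n" x)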
Both sources and streams produce values. The main difference is flow control: with sources, consumers of the elements are in charge of control; while with streams, it's the producers who drive the computation. This means that sources should be used for situations where the elements are requested on-demand, while streams are best suited for "reactive" inputs.
In general, streams offer better performance than sources for the most common operations (including concatenation) and offer integration with sinks and flows. On the other hand, sources are easier to create and support zipping.
It is recommended to use sources to define decoupled producers that can be consumed with streams. Any source can become a stream, but the opposite is not so easy.
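For example (a sketch using only operations already shown on this page; the counting source is made up for the demonstration), a pull-based producer built with Source.make can be consumed as a push-based stream via Stream.from:

open Streaming

(* A small pull-based source that counts from 0 to 4. *)
let counter =
  Source.make
    ~init:(fun () -> 0)
    ~pull:(fun n -> if n < 5 then Some (n, n + 1) else None)
    ()

(* Any source can become a stream and then use the stream operations. *)
let () =
  Stream.from counter
  |> Stream.take 3
  |> Stream.each (fun n -> print_endline (string_of_int n))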
Even though the core types in streaming are very simple, stable and have many nice properties, they might change in the future to support new functionality such as backpressure and concurrency. To avoid breaking changes, the types will remain abstract until the library reaches the 1.0 milestone.
If you have a use-case that requires access to the internal type, please open an issue.
What if there was no space and time?
let answer () =
let x = "84726982693273833265833289698432737883857070736773697" ^
"8843268658465327079823265327769657873787170857632657883876982" in
let open Format in let (%^) = printf in
let open Streaming in let open Stream in
let (%)=fun f g x->f (g x) and
(|/-)=(%^)"\027[1m\027[41m";"\\|/-"in let (//) x = (|/-)
|> of_string
|> interpose '\b' |> append '\b'
|> cycle ~times:(Random.int 10)
|> append x
|> each (fun c -> "%c%!" %^ (Unix.sleepf 0.003; c)) in
from @@ (let pull i =
if i >= 114 then None
else Some (String.sub x i 2, i + 2)
in Source.make ~init:(fun () -> 0) ~pull ())
|> via (Flow.map (char_of_int%int_of_string))
|> into (Sink.each (//)); (%^) "\027[0m"
If you forget to open Streaming you might accidentally use the Stream module from the standard library, which results in Unbound value and type mismatch errors. In that case, use fully qualified names such as Streaming.Stream.take or Streaming.Stream.drop.

This library is based on ideas found in other libraries and research projects such as: Haskell's Pipes and Foldl libraries, Scala's ZIO Streams, Clojure's Transducers and the Iteratees streaming model by Oleg Kiselyov.