Module with definitions for sinks.
Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can acquire resources that are guaranteed to be terminated when the sink is filled.
Sinks are a great way to define decoupled consumers that can be filled with Stream.into.
Sinks are independent from sources and streams. You can think of them as packed arguments for folding functions with early termination. Formally, they can also be interpreted as Moore machines.
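To make the "packed fold arguments with early termination" reading concrete, here is a minimal self-contained sketch. It is a simplified model, not the library's implementation: the `sink` type, the `Sink` constructor, the `into` driver and `sum_until_10` are all hypothetical names defined only for this illustration. The output is computed from the state alone, which is what makes the Moore machine interpretation fit.

```ocaml
(* A simplified model of a sink (an assumption, not the library's type):
   a fold packed together with an early-termination test. The state type
   'acc is hidden from the consumer of the sink. *)
type ('a, 'r) sink =
  | Sink : {
      init : unit -> 'acc;       (* create the initial state *)
      push : 'acc -> 'a -> 'acc; (* state transition for each input *)
      full : 'acc -> bool;       (* should consumption stop? *)
      stop : 'acc -> 'r;         (* output extracted from the state *)
    } -> ('a, 'r) sink

(* Hypothetical driver: feed list elements until the sink reports full. *)
let into : type a r. (a, r) sink -> a list -> r =
  fun sink xs ->
    match sink with
    | Sink { init; push; full; stop } ->
      let rec loop acc = function
        | x :: rest when not (full acc) -> loop (push acc x) rest
        | _ -> stop acc
      in
      loop (init ()) xs

(* Counts how many elements were actually pushed. *)
let pushes = ref 0

(* Sums inputs and becomes full once the running total reaches 10. *)
let sum_until_10 =
  Sink {
    init = (fun () -> 0);
    push = (fun acc x -> incr pushes; acc + x);
    full = (fun acc -> acc >= 10);
    stop = (fun acc -> acc);
  }

let result = into sum_until_10 [4; 5; 6; 7]
```

In this model the last element, 7, is never pushed, because the sink reports full after consuming 4, 5 and 6.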
type ('a, 'b) t = ('a, 'b) sink
Type for sinks that consume elements of type 'a
and, once done, produce a value of type 'b
.
Implementing custom sinks is useful to create a collection of reusable streaming consumers for your application.
The following example demonstrates a sink that consumes all elements into a list:
let list_sink =
  let init () = [] in
  let push acc x = x :: acc in
  let stop acc = List.rev acc in
  Sink.make ~init ~push ~stop ()
Alternatively, existing list/array/string/queue sinks, or others listed below, can be used.
val fill : 'r -> ('a, 'r) t
fill result uses result to fill the sink. This sink will not consume any input and will immediately produce result when used.
val fold : ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t
fold f init
is a sink that reduces all input elements with the stepping function f
starting with the accumulator value init
.
val fold_while : ('r -> bool) -> ('r -> 'a -> 'r) -> 'r -> ('a, 'r) t
fold_while full f init
is similar to fold
but can terminate early if full
returns true
.
val make :
init:(unit -> 'acc) ->
push:('acc -> 'a -> 'acc) ->
?full:('acc -> bool) ->
stop:('acc -> 'r) ->
unit ->
('a, 'r) t
Creates a sink from an init function that initializes a state value, a push function that updates that state, and a stop function that produces the final result value. Optionally, a full function can be passed to decide when the sink should terminate early.
Note: The calls to full should be cheap as this function will be called to avoid allocation of unnecessary resources. If the computation required to decide if the sink is full is expensive, consider caching it whenever possible.
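One way to keep full cheap is to carry the information it needs inside the state, so that the test is a constant-time comparison instead of a recomputation. The sketch below is self-contained and uses only the standard library; `limit`, `run` and `taken` are hypothetical names, and `run` is a stand-in driver that exists only for this illustration. The four functions have the shapes expected by the `make` signature above.

```ocaml
(* Collect at most [limit] elements into a list. *)
let limit = 3

(* The state pairs the list with its length, so [full] never has to
   call List.length on the accumulated elements. *)
let init () = (0, [])
let push (n, acc) x = (n + 1, x :: acc)
let full (n, _) = n >= limit
let stop (_, acc) = List.rev acc

(* Stand-in driver for this sketch: stop feeding once [full] holds. *)
let rec run state = function
  | x :: rest when not (full state) -> run (push state x) rest
  | _ -> stop state

let taken = run (init ()) [1; 2; 3; 4; 5]
```

A version whose full called List.length on the accumulator would do work proportional to the number of collected elements on every check.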
val full : ('a, unit) t
A full sink that will not consume any input and will not produce any results.
val is_empty : ('a, bool) t
is_empty
is true
if the sink did not consume any elements and false
otherwise.
val each : ('a -> unit) -> ('a, unit) t
Applies an effectful action to all input elements producing nothing.
val len : ('a, int) t
Consumes and counts all input elements.
val first : ('a, 'a option) t
The first input element, or None
if the sink did not receive enough input.
Equivalent to nth 0
.
val last : ('a, 'a option) t
The last input element, or None
if the sink did not receive enough input.
val nth : int -> ('a, 'a option) t
The n-th input element, or None
if the sink did not receive enough input.
val drain : ('a, unit) t
Consumes all elements producing nothing. Useful for triggering actions in effectful streams.
val contains : where:('a -> bool) -> ('a, bool) t
contains ~where:pred is true if at least one input element satisfies pred and false otherwise.
val find : where:('a -> bool) -> ('a, 'a option) t
find ~where:pred
finds the first element that satisfies pred
returning None
if there is no such element.
val index : where:('a -> bool) -> ('a, int option) t
Similar to find
but returns the index of the element that satisfies the predicate.
val minimum : by:('a -> 'a -> bool) -> ('a, 'a option) t
Finds the minimum element in the sequence, using the given predicate as the comparison between the input elements.
val maximum : by:('a -> 'a -> bool) -> ('a, 'a option) t
Finds the maximum element in the sequence, using the given predicate as the comparison between the input elements.
val all : where:('a -> bool) -> ('a, bool) t
all ~where:pred is true if all input elements satisfy pred. Will stop consuming elements when the first element that does not satisfy pred is found. Results in true for empty input.
val any : where:('a -> bool) -> ('a, bool) t
any ~where:pred
is true
if at least one input element satisfies pred
. Will stop consuming elements when such an element is found. Results in false
for empty input.
val list : ('a, 'a list) t
Puts all input elements into a list.
val array : ('a, 'a array) t
Puts all input elements into an array.
val string : (string, string) t
Consumes and concatenates strings.
val bytes : (bytes, bytes) t
Consumes and concatenates bytes.
val print : (string, unit) t
Prints all input string elements to standard output as lines.
val file : string -> (string, unit) t
file path
is a sink that writes input strings as lines into a file located at path
.
val stdout : (string, unit) t
A sink that writes input strings as lines to STDOUT.
val stderr : (string, unit) t
A sink that writes input strings as lines to STDERR.
val sum : (int, int) t
Adds all input integer values.
val product : (int, int) t
Product of input integer values. Stops if any input element is 0
.
val mean : (float, float) t
Computes a numerically stable arithmetic mean of all input elements.
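A common way to compute a mean without accumulating a large running sum is the incremental update mean_n = mean_(n-1) + (x_n - mean_(n-1)) / n. The sketch below illustrates that technique with the standard library only; whether this sink uses exactly this update is an assumption, and `stable_mean` is a hypothetical name.

```ocaml
(* Incremental mean: the state is (count, current mean). Each step moves
   the mean towards the new element by 1/n of the distance.
   Returning 0.0 for empty input is a choice made for this sketch. *)
let stable_mean xs =
  let step (n, mean) x =
    let n = n + 1 in
    (n, mean +. (x -. mean) /. float_of_int n)
  in
  snd (List.fold_left step (0, 0.0) xs)

let m = stable_mean [1.0; 2.0; 3.0; 4.0]
```

Because the state never holds the full sum, intermediate values stay close in magnitude to the inputs.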
zip left right
computes both left
and right
at the same time with the same input being sent to both sinks. The results of both sinks are produced.
zip_left left right
similar to zip
, but only produces the result of the left
sink.
zip_right left right is similar to zip, but only produces the result of the right sink.
zip_with f left right
similar to zip
, but applies an aggregation function to results produced by left
and right
.
left <&> right
is an operator version of zip left right
.
left <& right
is an operator version of zip_left left right
.
left &> right
is an operator version of zip_right left right
.
unzip left right
is a sink that receives pairs 'a * 'b
, sending the first element into left
and the second into right
. Both sinks are computed at the same time and their results returned as an output pair.
The sink becomes full when either left or right gets full.
unzip_left left right
is similar to unzip
, but only produces the result of the left
sink.
If right
terminates first, left
will be forced to terminate.
unzip_right left right is similar to unzip, but only produces the result of the right sink.
If left
terminates first, right
will be forced to terminate.
unzip_with f left right
similar to unzip
, but applies an aggregation function to results produced by left
and right
.
left <*> right
is an operator version of unzip left right
.
left <* right
is an operator version of unzip_left left right
.
left *> right
is an operator version of unzip_right left right
.
distribute left right
is similar to zip
but distributes the consumed elements over left
and right
alternating in a round robin fashion.
Type for race
result values.
race left right
runs both left
and right
sinks at the same time producing the result for the one that fills first.
If the sink is terminated prematurely, before either left or right is filled, Both of their values are produced.
Examples
let sink = Sink.(race (find ~where:(fun x -> x > 10)) (nth 8)) in
let result = Stream.of_list [1; 9; 0; 8; 30; 4] |> Stream.into sink in
assert (result = Sink.Left (Some 30))
left <|> right
is the operator version of race left right
.
seq left right
runs left
and then right
sequentially producing both of their results.
If the resulting sink is stopped before right
was started, it will be forced to initialize and terminate.
seq_left left right
is similar to seq
, but only produces the result of the left
sink.
seq_right left right
is similar to seq
, but only produces the result of the right
sink.
left <+> right
is an operator version of seq left right
.
left <+ right
is an operator version of seq_left left right
.
left +> right
is an operator version of seq_right left right
.
map f sink is the sink sink with its result transformed by f.
premap f sink
is a sink that premaps the input values.
Examples
If sink
consumes integers, but we have an input with strings, we can provide a conversion from strings to integers to premap
:
let sink = Sink.(premap int_of_string sum) in
let result = Stream.of_list ["1"; "2"; "3"] |> Stream.into sink in
assert (result = 6)
prefilter predicate sink is a sink that filters the input values for sink.
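Both input adapters can be pictured as wrappers around a sink's stepping function: premap converts each element before it is pushed, and prefilter skips elements that fail the predicate. The sketch below models this on plain fold step functions with the standard library only. The local `premap` and `prefilter` are simplified stand-ins written for this illustration, not the library's definitions.

```ocaml
(* Convert each input with [f] before handing it to [step]. *)
let premap f step = fun acc x -> step acc (f x)

(* Only hand inputs that satisfy [pred] to [step]. *)
let prefilter pred step = fun acc x -> if pred x then step acc x else acc

(* Parse strings to integers, keep the even ones, and add them up. *)
let sum_of_even_strings =
  List.fold_left
    (premap int_of_string (prefilter (fun n -> n mod 2 = 0) ( + )))
    0
    ["1"; "2"; "3"; "4"]
```

Here only 2 and 4 reach the addition step.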
val dispose : ('a, 'r) t -> 'r
Close the sink and produce the currently accumulated result. Any internal state will be terminated.
In addition to using the sinks and operations defined above, it is possible to create sinks with a convenient (let)
notation.
A common example of a composed sink is the sink that computes the arithmetic mean:
let mean =
  let open Sink.Syntax in
  let+ total = Sink.sum
  and+ count = Sink.len in
  total / count
The resulting sink has type (int, int) sink
and will only consume the input once!
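The single pass follows from how binding operators desugar: `and+` pairs two consumers so that each element is pushed to both, and `let+` maps over the paired result. The self-contained sketch below shows this on a simplified fold type, assuming OCaml 4.08 or later. Everything in it (`fold`, `Fold`, `map`, `zip`, `run`, and the local `sum`, `len` and `mean`) is a hypothetical model for illustration and not the contents of the Syntax module.

```ocaml
(* Simplified model: initial state, step function, result extraction. *)
type ('a, 'r) fold =
  | Fold : 'acc * ('acc -> 'a -> 'acc) * ('acc -> 'r) -> ('a, 'r) fold

(* Transform the final result. *)
let map : type a r s. (r -> s) -> (a, r) fold -> (a, s) fold =
  fun f fold ->
    match fold with
    | Fold (init, step, stop) -> Fold (init, step, (fun acc -> f (stop acc)))

(* Run two folds side by side, pushing every input to both states. *)
let zip : type a r s. (a, r) fold -> (a, s) fold -> (a, r * s) fold =
  fun left right ->
    match left, right with
    | Fold (i1, f1, s1), Fold (i2, f2, s2) ->
      Fold ((i1, i2),
            (fun (a1, a2) x -> (f1 a1 x, f2 a2 x)),
            (fun (a1, a2) -> (s1 a1, s2 a2)))

(* Binding operators: let+ is map with flipped arguments, and+ is zip. *)
let ( let+ ) x f = map f x
let ( and+ ) = zip

let sum = Fold (0, ( + ), Fun.id)
let len = Fold (0, (fun n _ -> n + 1), Fun.id)

(* Same shape as the documented example, on the model type. *)
let mean =
  let+ total = sum
  and+ count = len in
  total / count

(* The input list is traversed exactly once. *)
let run : type a r. (a, r) fold -> a list -> r =
  fun fold xs ->
    match fold with
    | Fold (init, step, stop) -> stop (List.fold_left step init xs)

let average = run mean [2; 4; 6; 8]
```

As with the integer version above, the division is integer division, and in this model it raises Division_by_zero on empty input.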
module Syntax : sig ... end
Module with syntax definitions for sinks.
module Functor : sig ... end
Module that implements the "Functor" interface.
module Applicative : sig ... end
Module that implements the "Applicative" interface.