Page
Library
Module
Module type
Parameter
Class
Class type
Source
Streaming.SinkSourceModule with defintions for sinks.
Sinks are streaming abstractions that consume values and produce an aggregated value as a result. The result value is extracted from an internal state that is built incrementally. The internal state can aquire resources that are guaranteed to be terminated when the sink is filled.
Sinks are a great way to define decoupled consumers that can be filled with Stream.into.
Sinks are independent from sources and streams. You can think of them as packed arguments for folding functions with early termination. Formally, they can also be interpreted as Moore machine.
Type for sinks that consume elements of type 'a and, once done, produce a value of type 'b.
Implementing custom sinks is useful to create a collection of reusable streaming consumers for your application.
The following example demonstrates a sink that consumes all elements into a list:
let list_sink =
let init () = [] in
let push acc x = x :: acc in
let stop acc = List.rev acc in
Sink.make ~init ~push ~stop ()Alternatively, existing list/array/string/queue sinks, or others listed below, can be used.
fill result use result to fill the sink. This sink will not consume any input and will immediately produce result when used.
fold f init is a sink that reduces all input elements with the stepping function f starting with the accumulator value init.
fold_while full f init is similar to fold but can terminate early if full returns true.
val make :
init:(unit -> 'acc) ->
push:('acc -> 'a -> 'acc) ->
?full:('acc -> bool) ->
stop:('acc -> 'r) ->
unit ->
('a, 'r) tCreates a sink from a function that initializes a state value, a stepping function to update that state and a stop function that produces the final result value. Optionally a full function can be passed to decide when the sink should terminate early.
Note: The calls to full should be cheap as this function will be called to avoid allocation of unnecessary resources. If the computation required to decide if the sink is full is expensive, consider caching it whenever possible.
A full sink that will not consume any input and will not produce any results.
is_empty is true if the sink did not consume any elements and false otherwise.
Applies an effectful action to all input elements producing nothing.
The first input element, or None if the sink did not receive enough input.
Equivalent to nth 0.
The last input element, or None if the sink did not receive enough input.
The n-th input element, or None if the sink did not receive enough input.
Consumes all elements producing nothing. Useful for triggering actions in effectful streams.
contains ~where:pred finds the first element that satisfies pred returning None if there is no such element.
find ~where:pred finds the first element that satisfies pred returning None if there is no such element.
Similar to find but returns the index of the element that satisfies the predicate.
Finds the minimum element in the sequence, using the given predicate as as the comparison between the input elements.
Finds the maximum element in the sequence, using the given predicate as as the comparison between the input elements.
all ~where:pred is true if all input element satisfy pred. Will stop consuming elements when the first element that does not satisfy pred is found. Results in true for empty input.
any ~where:pred is true if at least one input element satisfies pred. Will stop consuming elements when such an element is found. Results in false for empty input.
file path is a sink that writes input strings as lines into a file located at path.
Computes a numerically stable arithmetic mean of all input elements.
zip left right computes both left and right at the same time with the same input being sent to both sinks. The results of both sinks are produced.
zip_left left right similar to zip, but only produces the result of the left sink.
zip_left left right similar to zip, but only produces the result of the right sink.
zip_with f left right similar to zip, but applies an aggregation function to results produced by left and right.
left <&> right is an operator version of zip left right.
left <& right is an operator version of zip_left left right.
left &> right is an operator version of zip_right left right.
unzip left right is a sink that receives pairs 'a * 'b, sending the first element into left and the second into right. Both sinks are computed at the same time and their results returned as an output pair.
The sink becomes full when either left or right get full.
unzip_left left right is similar to unzip, but only produces the result of the left sink.
If right terminates first, left will be forced to terminate.
unzip_left left right is similar to unzip, but only produces the result of the right sink.
If left terminates first, right will be forced to terminate.
unzip_with f left right similar to unzip, but applies an aggregation function to results produced by left and right.
left <*> right is an operator version of unzip left right.
left <* right is an operator version of unzip_left left right.
left *> right is an operator version of unzip_right left right.
distribute left right is similar to zip but distributes the consumed elements over left and right alternating in a round robin fashion.
race left right runs both left and right sinks at the same time producing the result for the one that fills first.
If the sink is terminated prematurely, before either left or right are filled, Both of their values are produced.
Examples
let sink = Sink.(race (find ~where:(fun x -> x > 10)) (nth 8)) in
let result = Stream.of_list [1; 9; 0; 8; 30; 4] |> Stream.into sink in
assert (result = Sink.Left (Some 30))left <|> right is the operator version of race left right.
seq left right runs left and then right sequentially producing both of their results.
If the resulting sink is stopped before right was started, it will be forced to initialize and terminate.
seq_left left right is similar to seq, but only produces the result of the left sink.
seq_right left right is similar to seq, but only produces the result of the right sink.
left <+> right is an operator version of seq left right.
left <+ right is an operator version of seq_left left right.
left +> right is an operator version of seq_right left right.
map f sink is a sink sink with the result transformed with f.
f <@> sink is the operator version of map f sink.
premap f sink is a sink that premaps the input values.
Examples
If sink consumes integers, but we have an input with strings, we can provide a conversion from strings to integers to premap:
let sink = Sink.(premap int_of_string sum) in
let result = Stream.of_list ["1"; "2"; "3"] |> Stream.into sink in
assert (result = 6)prefilter predicate sink is a sink that filter the input value for sink.
Close the sink and produce the currently accumulated result. Any internal state will be terminated.
In addition to using the sinks and operations defined above, it is possible to create sinks with a convenient (let) notation.
A common example of a composed sink is the sink that computes the arithmetic mean:
let mean =
let open Sink.Syntax in
let+ total = Sink.sum
and+ count = Sink.len in
total / countThe resulting sink has type (int, int) sink and will only consume the input once!
Module that implements the "Applicative" interface.