Module with definitions for sources.
Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.
Sources are a great way to define decoupled producers that can be consumed with Stream.from.
Sources are "single shot" and will have their input exhausted by most operations. Consider buffering sources if you need to reuse their input.
type 'a t = 'a source
The type for sources that produce elements of type 'a.
Implementing your own custom sources enables access to many useful operations. The most flexible way to create a source is with the Source.make function.
The following example creates a source that counts down to zero:
let countdown n =
  let init () = n in
  let pull i =
    if i = 0 then None
    else Some (i, i - 1) in
  Source.make ~init ~pull ()
Alternatively, existing list/array/seq/string sources, or others listed below, can be used.
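As a usage sketch, the countdown source can be drained with the fold consumer documented further down. The example assumes the library is exposed through a top-level Streaming module, which is an assumption about the package layout rather than something stated on this page.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

(* Produces n, n - 1, ..., 1 and then stops. *)
let countdown n =
  let init () = n in
  let pull i =
    if i = 0 then None
    else Some (i, i - 1) in
  Source.make ~init ~pull ()

(* fold conses each element onto the accumulator, so reverse at the end. *)
let collected =
  countdown 3
  |> Source.fold (fun acc x -> x :: acc) []
  |> List.rev
```

With this definition a negative n would never reach the i = 0 stop condition, so a guard such as i <= 0 may be preferable in real code.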
val empty : 'a t
empty is an empty source.
val single : 'a -> 'a t
single a is a source with a single element a.
val list : 'a list -> 'a t
list items is a source with all elements from the items list.
val array : 'a array -> 'a t
array items is a source with all elements from the items array.
val string : string -> char t
string str is a source with all characters from the str string.
val bytes : bytes -> char t
bytes b is a source with all characters from the b bytes.
val generate : len:int -> (int -> 'a) -> 'a t
generate ~len f generates a source of length len, mapping each index to an element with f.
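A minimal sketch of generate, assuming indices start at 0 (as with Array.init) and that the library is opened through a top-level Streaming module:

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

(* Squares of the indices 0, 1, 2, 3 and 4 (0-based indexing is assumed). *)
let squares = Source.generate ~len:5 (fun i -> i * i)

(* Consume the source once by summing its elements. *)
let sum_of_squares = Source.fold (+) 0 squares
```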
val count : int -> int t
count n is an infinite source with integers starting from n.
val iterate : 'a -> ('a -> 'a) -> 'a t
iterate x f is an infinite source where the first item is calculated by applying f to x, the second item by applying the function on the previous result and so on.
val unfold : 's -> ('s -> ('a * 's) option) -> 'a t
unfold seed next is a source created from a seed state and a function that produces elements and an updated state. The source ends when next returns None, so it is finite only if that eventually happens.
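As an illustration of unfold, the sketch below threads a pair of integers as the state to produce Fibonacci numbers up to a limit. The helper name fibs_up_to is hypothetical, and the top-level Streaming module is an assumption about the package layout.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

(* The state is the pair (current, next); stop once current exceeds limit. *)
let fibs_up_to limit =
  Source.unfold (0, 1) (fun (a, b) ->
    if a > limit then None
    else Some (a, (b, a + b)))

(* Collect the elements in production order. *)
let fibs =
  fibs_up_to 20
  |> Source.fold (fun acc x -> x :: acc) []
  |> List.rev
```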
val make :
init:(unit -> 's) ->
pull:('s -> ('a * 's) option) ->
?stop:('s -> unit) ->
unit ->
'a t
make ~init ~pull ~stop () is a source created from the init, pull and stop functions. This function is similar to unfold but with lazy state initialization and a state termination function.
Note: For better performance, it is recommended that the pull function caches the termination condition in case it is expensive.
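The sketch below tries to make the lifecycle of a source built with make observable: init should not run until a consumer pulls, and stop should run once the source is exhausted. The range helper and the two flags are hypothetical names used only for illustration, and the top-level Streaming module is assumed.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

(* Flags used only to observe when the callbacks run. *)
let initialized = ref false
let stopped = ref false

(* Integers from lo to hi, inclusive. *)
let range lo hi =
  let init () = initialized := true; lo in
  let pull i = if i > hi then None else Some (i, i + 1) in
  let stop _ = stopped := true in
  Source.make ~init ~pull ~stop ()

let src = range 1 4

(* Constructing the source should not have run init yet. *)
let initialized_before_use = !initialized

(* Consuming the source runs init, then pull until None, then stop. *)
let total = Source.fold (+) 0 src
```

In a real source, init would typically open a file or connection and stop would close it.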
zip_with f src1 src2 is a source that pulls elements from src1 and src2 one by one, combining them with f.
zip src1 src2 is a source of pairs with elements pulled from src1 and src2 one by one.
Equivalent to zip_with (fun x y -> (x, y)) src1 src2.
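A small sketch of zip_with on two sources of equal length. The variable names are illustrative, and the top-level Streaming module is an assumption.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

let prices = Source.list [10; 20; 30]
let quantities = Source.list [1; 2; 3]

(* Multiply the elements pairwise, then collect in production order. *)
let line_totals =
  Source.zip_with (fun p q -> p * q) prices quantities
  |> Source.fold (fun acc x -> x :: acc) []
  |> List.rev
```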
Note: Instead of applying the transformation functions at the source, consider using Stream.from or defining your computation as a Flow to make it reusable.
A source that includes only the elements that satisfy a predicate.
Similar to take but takes the last n elements.
Takes the first elements from the source that satisfy a predicate and discards the rest.
Similar to drop but drops the last n elements.
Drops the first elements from the source that satisfy a predicate and keeps the rest.
Many consumers are available in the Sink module. You can consume any source using a sink with:
let source = Source.count 10 in
source
|> Stream.from
|> Stream.into Sink.last
Alternatively use the source consumers below for simple operations.
val fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r
fold step init source reduces the values of source with the step function, starting with init.
If the step function raises an exception, the source will be properly terminated.
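For instance, fold can count the elements matching a condition. The sketch below counts vowels in a string source. The is_vowel helper is hypothetical, and the top-level Streaming module is assumed.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

(* Lowercase ASCII vowels only; enough for this illustration. *)
let is_vowel = function
  | 'a' | 'e' | 'i' | 'o' | 'u' -> true
  | _ -> false

(* The accumulator is the running count of vowels seen so far. *)
let vowel_count =
  Source.string "streaming"
  |> Source.fold (fun n c -> if is_vowel c then n + 1 else n) 0
```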
val len : 'a t -> int
len src is the count of elements in src.
val each : ('a -> unit) -> 'a t -> unit
each f src applies an effectful function f to all elements in src.
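A brief sketch of len and each, using a Buffer as the side-effect target. Because sources are described as single shot, a fresh source is built for each consumer. The top-level Streaming module is assumed.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

let buf = Buffer.create 16

(* Append every element to the buffer as a side effect. *)
let () =
  Source.array [| "a"; "b"; "c" |]
  |> Source.each (Buffer.add_string buf)

let joined = Buffer.contents buf

(* Count the elements of a second, freshly built source. *)
let n = Source.len (Source.array [| "a"; "b"; "c" |])
```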
val dispose : 'a t -> unit
dispose source forces the termination of the source state. This function is useful in situations when a leftover source is produced in Stream.run.
Note: If the source is not already initialized, calling this function will first initialize its state before it is terminated.
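The behaviour described in the note can be sketched with two counters that record how often the init and stop callbacks run. The names opened, closed and resource_source are hypothetical, and the top-level Streaming module is assumed.

```ocaml
open Streaming (* assumption: Source is exposed under the Streaming module *)

let opened = ref 0
let closed = ref 0

(* A source whose callbacks only record that they were called. *)
let resource_source =
  Source.make
    ~init:(fun () -> incr opened; 0)
    ~pull:(fun i -> if i >= 3 then None else Some (i, i + 1))
    ~stop:(fun _ -> incr closed)
    ()

(* The source was never consumed, so per the note above dispose should
   initialize the state first and then terminate it. *)
let () = Source.dispose resource_source
```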