Streaming.Source
Module with definitions for sources.
Elements are pulled from a source when needed. A source can have an internal state that will be lazily initialized when (and if) a consumer requests elements. The internal state will be safely disposed when the source runs out of elements, when the consumer terminates, or if an exception is raised at any point in the streaming pipeline.
Sources are a great way to define decoupled producers that can be consumed with Stream.from.
Sources are "single shot" and will have their input exhausted by most operations. Consider buffering sources if you need to reuse their input.
Implementing your own custom sources enables access to many useful operations. The most flexible way to create a source is with the Source.make function.
The following example creates a source that counts down from n to 1:
    let countdown n =
      let init () = n in
      let pull i =
        if i = 0 then None
        else Some (i, i - 1) in
      Source.make ~init ~pull ()
Alternatively, existing list/array/seq/string sources, or others listed below, can be used.
generate ~len f generates a source of length len mapping each index to an element with f.
iterate x f is an infinite source where the first item is calculated by applying f to x, the second item by applying the function on the previous result and so on.
unfold seed next is a finite source created from a seed state and a function that produces elements and an updated state.
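The descriptions of generate and unfold can be illustrated with the standard library's Seq module. This is only a sketch: the helpers below are hypothetical stand-ins built on Seq.unfold, not the Streaming implementation, and they assume that generate indexes its elements from 0. Note that Seq.unfold takes the step function first and the seed second, the reverse of the argument order described above.

```ocaml
(* Hypothetical helpers that mirror the descriptions above using the
   standard library's Seq.unfold. They are not Streaming's own code. *)

(* unfold seed next: keep calling next until it returns None. *)
let unfold seed next = Seq.unfold next seed

(* generate ~len f: indices 0 .. len - 1 mapped through f
   (starting at index 0 is an assumption of this sketch). *)
let generate ~len f =
  unfold 0 (fun i -> if i >= len then None else Some (f i, i + 1))

(* The state is the current counter; the sequence ends when it hits 0. *)
let countdown n =
  unfold n (fun i -> if i = 0 then None else Some (i, i - 1))

let () =
  assert (List.of_seq (countdown 3) = [3; 2; 1]);
  assert (List.of_seq (generate ~len:4 (fun i -> i * i)) = [0; 1; 4; 9])
```

Unlike sources, Seq values can be traversed more than once, so this sketch shows only how elements are produced from a state, not the "single shot" behavior.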
val make :
init:(unit -> 's) ->
pull:('s -> ('a * 's) option) ->
?stop:('s -> unit) ->
unit ->
'a t
make ~init ~pull ~stop () is a source created from the init, pull and stop functions. This function is similar to unfold, but adds lazy state initialization and an optional state termination function.
Note: For better performance, it is recommended that the pull function caches the termination condition in case it is expensive.
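The lifecycle implied by the init, pull and stop arguments can be pictured with a small standalone model. The source type and the each consumer below are assumptions written for this sketch, not the Streaming library's implementation. The model only shows that init runs when a consumer starts pulling and that stop runs once the elements run out.

```ocaml
(* Toy model of a pull-based source. This is an assumption made for
   illustration, not the Streaming library's actual definition. *)
type 'a source =
  | Source : {
      init : unit -> 's;              (* builds the state on first use *)
      pull : 's -> ('a * 's) option;  (* next element plus new state *)
      stop : 's -> unit;              (* releases the state *)
    } -> 'a source

(* Hypothetical consumer: initializes the state only when called and
   stops it after the last element has been pulled. *)
let each f (Source src) =
  let rec loop s =
    match src.pull s with
    | None -> src.stop s
    | Some (x, s') -> f x; loop s'
  in
  loop (src.init ())

(* Records lifecycle events so their order can be inspected. *)
let events : string list ref = ref []
let record e = events := e :: !events

let countdown n =
  Source {
    init = (fun () -> record "init"; n);
    pull = (fun i -> if i = 0 then None else Some (i, i - 1));
    stop = (fun _ -> record "stop");
  }

let source = countdown 3

(* Creating the source has not run init yet. *)
let () = assert (!events = [])

let () = each (fun i -> record (string_of_int i)) source
let () = assert (List.rev !events = ["init"; "3"; "2"; "1"; "stop"])
```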
zip_with f src1 src2 is a source that pulls elements from src1 and src2 one by one, combining them with f.
zip src1 src2 is a source of pairs with elements pulled from src1 and src2 one by one.
Equivalent to zip_with (fun x y -> (x, y)) src1 src2.
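One way to picture zip_with is as a source whose state is the pair of both input states. The sketch below uses a toy source type and hypothetical of_list and to_list helpers, all of which are assumptions of this example rather than the library's code. It also assumes that the combined source ends as soon as either input runs out.

```ocaml
(* Toy model of a pull-based source; an assumption for illustration. *)
type 'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

(* Hypothetical helper: a source over the elements of a list. *)
let of_list xs =
  Source {
    init = (fun () -> xs);
    pull = (function [] -> None | x :: rest -> Some (x, rest));
    stop = ignore;
  }

(* Hypothetical helper: drain a source into a list. *)
let to_list (Source src) =
  let rec go acc s =
    match src.pull s with
    | None -> src.stop s; List.rev acc
    | Some (x, s') -> go (x :: acc) s'
  in
  go [] (src.init ())

(* The combined state is a pair of states; one element is pulled from
   each side per step, and the result ends when either side ends. *)
let zip_with f (Source a) (Source b) =
  Source {
    init = (fun () -> (a.init (), b.init ()));
    pull = (fun (sa, sb) ->
      match a.pull sa with
      | None -> None
      | Some (x, sa') ->
        (match b.pull sb with
         | None -> None
         | Some (y, sb') -> Some (f x y, (sa', sb'))));
    stop = (fun (sa, sb) -> a.stop sa; b.stop sb);
  }

let zip a b = zip_with (fun x y -> (x, y)) a b

let sums = to_list (zip_with ( + ) (of_list [1; 2; 3]) (of_list [10; 20]))
let pairs = to_list (zip (of_list ["a"; "b"]) (of_list [1; 2; 3]))
```

In this model, sums holds two elements and pairs holds two pairs, because the shorter input determines the length.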
Note: Instead of applying the transformation functions at the source, consider using Stream.from or defining your computation as a Flow to make it reusable.
A source with all elements transformed with a mapping function.
A source that includes only the elements that satisfy a predicate.
Similar to take but takes the last n elements.
Take first elements from the source that satisfy a predicate and discard the rest.
Similar to drop but drops the last n elements.
Drop first elements from the source that satisfy a predicate and keep the rest.
Many consumers are available in the Sink module. You can consume any source using a sink with:
    let source = Source.count 10 in
    source
    |> Stream.from
    |> Stream.into Sink.last
Alternatively, use the source consumers below for simple operations.
fold step init source reduces the values of source with the step function, starting with init.
If the step function raises an exception, the source will be properly terminated.
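The termination guarantee for fold can be sketched with a toy source type and the standard library's Fun.protect. Both the type and this fold are assumptions made for illustration; the library may implement the guarantee differently. The point of the sketch is that stop runs whether the step function returns normally or raises.

```ocaml
(* Toy model of a pull-based source; an assumption for illustration. *)
type 'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

(* Hypothetical fold: tracks the latest state so that stop can be
   called on it even when step raises an exception. *)
let fold step init (Source src) =
  let state = ref (src.init ()) in
  let rec loop acc =
    match src.pull !state with
    | None -> acc
    | Some (x, s') ->
      state := s';
      loop (step acc x)
  in
  Fun.protect ~finally:(fun () -> src.stop !state) (fun () -> loop init)

(* Counts how many times the source state has been stopped. *)
let stops = ref 0

let one_to_three =
  Source {
    init = (fun () -> 1);
    pull = (fun i -> if i > 3 then None else Some (i, i + 1));
    stop = (fun _ -> incr stops);
  }

(* Normal run: the elements 1, 2 and 3 are summed, then stop is called. *)
let total = fold ( + ) 0 one_to_three

(* Failing run: step raises, stop is still called, and the exception
   reaches the caller. *)
let failed =
  match fold (fun _ _ -> failwith "boom") 0 one_to_three with
  | _ -> false
  | exception Failure _ -> true
```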
each f src applies an effectful function f to all elements in src.
dispose source forces the termination of the source state. This function is useful in situations when a leftover source is produced in Stream.run.
Note: If the source is not already initialized, calling this function will first initialize its state before it is terminated.
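The note about uninitialized sources can be pictured with a small standalone model. The source type and this dispose are assumptions written for the sketch, not the library's code. It covers only the uninitialized case, where the state has to be created before it can be handed to stop.

```ocaml
(* Toy model of a pull-based source; an assumption for illustration. *)
type 'a source =
  | Source : {
      init : unit -> 's;
      pull : 's -> ('a * 's) option;
      stop : 's -> unit;
    } -> 'a source

(* Hypothetical dispose for a source that has not been started:
   the state is built first, then immediately terminated. *)
let dispose (Source src) = src.stop (src.init ())

(* Records lifecycle events so their order can be inspected. *)
let events : string list ref = ref []
let record e = events := e :: !events

let numbers =
  Source {
    init = (fun () -> record "init"; 0);
    pull = (fun i -> Some (i, i + 1));
    stop = (fun _ -> record "stop");
  }

let () = dispose numbers

(* No element was pulled, but init and stop both ran, in that order. *)
let () = assert (List.rev !events = ["init"; "stop"])
```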