Streaming.Stream
Module with definitions for streams.
Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.
Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) and in case of exceptions in the streaming pipeline.
Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source, use Stream.from; to consume a stream with a sink, use Stream.into; and to transform stream elements with a flow, use Stream.via. For more sophisticated pipelines that might have source leftovers, Stream.run can be used.
A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:
# Stream.stdin |> Stream.stdout;;
hello<Enter>
hello
world<Enter>
world
<Ctrl+d>
- : unit = ()
range ~by:step n m is a sequence of integers from n up to m (excluding m), incremented by step. The range is open on the right side.
iota n is range ~by:1 0 n, that is a range from 0 to n incremented by 1.
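The half-open behaviour of range and its relation to iota can be checked with a short sketch. It assumes the streaming package is installed and that its top-level module is opened with open Streaming; the names evens and first_five are illustrative only.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

(* Steps by 2 from 0; the upper bound 10 is excluded. *)
let evens = Stream.to_list (Stream.range ~by:2 0 10)

(* iota 5 is the same as range ~by:1 0 5. *)
let first_five = Stream.to_list (Stream.iota 5)
```

Here evens should be [0; 2; 4; 6; 8] and first_five should be [0; 1; 2; 3; 4].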
generate ~len f generates a stream of length len, mapping each index to an element with f.
repeat ~times:n x produces a stream by repeating x n times. If times is omitted, x is repeated ad infinitum.
repeatedly ~times:n f produces a stream by repeatedly calling f () n times. If times is omitted, f is called ad infinitum.
iterate x f is an infinite source where the first item is calculated by applying f to x, the second item by applying the function on the previous result and so on.
unfold seed next is a stream created from a seed state and a function that produces elements and an updated state. The stream will terminate when next produces None.
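As an illustration of unfold, the sketch below builds a finite countdown. It assumes that next returns the pair in the order (element, updated state), as Seq.unfold in the standard library does; countdown is a hypothetical helper, not part of the library.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

(* Hypothetical helper: yields n, n - 1, ..., 1 and then stops. *)
let countdown n =
  Stream.unfold n (fun state ->
    if state <= 0 then None              (* None terminates the stream *)
    else Some (state, state - 1))        (* assumed order: (element, next state) *)

let result = Stream.to_list (countdown 3)
```

Under that assumption, result is [3; 2; 1].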
of_array items is a stream with all elements in the array items.
A stream with all elements transformed with a mapping function.
A stream that includes only the elements that satisfy a predicate.
Take first elements from the stream that satisfy a predicate and discard the rest.
Drop first elements from the stream that satisfy a predicate and keep the rest.
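The four transformations described above can be compared side by side. The function names map, filter, take_while and drop_while do not appear in the text above and are assumed here to be the names under which the library exposes these operations; numbers is a hypothetical helper that builds a fresh stream for each pipeline.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

(* Hypothetical helper: a fresh stream for every pipeline. *)
let numbers () = Stream.of_list [1; 2; 3; 4; 5; 6]

(* Transform every element. *)
let squares = numbers () |> Stream.map (fun x -> x * x) |> Stream.to_list

(* Keep only the elements that satisfy the predicate. *)
let evens = numbers () |> Stream.filter (fun x -> x mod 2 = 0) |> Stream.to_list

(* Keep the leading elements below 4, discard everything after. *)
let prefix = numbers () |> Stream.take_while (fun x -> x < 4) |> Stream.to_list

(* Discard the leading elements below 4, keep everything after. *)
let suffix = numbers () |> Stream.drop_while (fun x -> x < 4) |> Stream.to_list
```

Note that take_while and drop_while only look at the leading run of matching elements, whereas filter inspects every element.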
concat stream1 stream2 is a stream that exhausts all elements from stream1 and then all elements from stream2.
Examples
# let stream1 = Stream.of_list ['a'; 'b'; 'c'] in
let stream2 = Stream.of_list ['d'; 'e'; 'f'] in
Stream.to_list (Stream.concat stream1 stream2)
- : char list = ['a'; 'b'; 'c'; 'd'; 'e'; 'f']
stream1 ++ stream2 is the infix operator version of concat stream1 stream2.
prepend x stream adds the element x to the beginning of stream.
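A brief sketch of the infix operator and prepend, assuming the streaming package is installed. The local open Stream.( ... ) is used only to bring ++ into scope; joined and with_zero are illustrative names.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

(* Infix concatenation: all of the left stream, then all of the right. *)
let joined = Stream.(to_list (of_list [1; 2] ++ of_list [3; 4]))

(* prepend puts a single element in front of an existing stream. *)
let with_zero = Stream.to_list (Stream.prepend 0 (Stream.of_list [1; 2]))
```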
flat_map f stream is a stream concatenated from sub-streams produced by applying f to all elements of stream.
let duplicated =
[1; 2; 3]
|> Stream.of_list
|> Stream.flat_map (fun x -> Stream.of_list [x; x])
|> Stream.to_list in
assert (duplicated = [1; 1; 2; 2; 3; 3])
cycle ~times:n stream produces a stream by repeating all elements from stream n times. If times is omitted, the elements of stream are repeated ad infinitum.
partition n partitions the stream into sub-streams of size n.
Operations that traverse the stream, computing a single result value.
If the stream is infinite and the consumer accumulates the elements, the processing will not terminate, potentially resulting in a memory leak.
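One way to keep a consumer from running forever on an infinite stream is to bound the stream before consuming it. The sketch below uses Flow.take together with Stream.via, in the same way as the via example on this page, and assumes the streaming package is installed; total is an illustrative name.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

let total =
  Stream.repeat 1                  (* infinite, because times is omitted *)
  |> Stream.via (Flow.take 5)      (* bound the stream to five elements *)
  |> Stream.into Sink.sum          (* the consumer now terminates *)
```

With the bound in place, total is 5. Without Flow.take, the sum would never complete.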
len stream counts the number of elements in stream.
Will exhaust the stream during processing.
Examples
# Stream.len (Stream.of_list ['a'; 'b'; 'c']);;
- : int = 3
each f stream applies an effectful function f to all elements of stream.
fold step init stream reduces the values of stream with the step function, starting with init.
If the step function raises an exception, the stream will be properly terminated.
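A minimal sketch of fold, assuming the streaming package is installed and that step receives the accumulator first and the element second. Both step functions below are symmetric in their arguments, so the results do not depend on that ordering assumption.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

(* Sum of all elements, starting from 0. *)
let sum = Stream.fold (fun acc x -> acc + x) 0 (Stream.of_list [1; 2; 3; 4])

(* Largest element, starting from the smallest representable int. *)
let largest = Stream.fold (fun acc x -> max acc x) min_int (Stream.of_list [3; 9; 4])
```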
is_empty stream is true if the stream has no elements and false otherwise. This operation consumes the first element of the stream.
of_file path is a stream of lines read from the file located at path.
The file is opened lazily only when the stream is consumed and will be closed even if the stream processing terminates with an exception.
to_file path stream writes lines from stream into the file located at path.
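The following sketch reads a file back as a stream of lines. It assumes the streaming package is installed and that of_file yields each line without its trailing newline; the temporary file, its contents and the name lines are made up for the example and written with the standard library.

```ocaml
(* Assumption: the streaming package is available, e.g. via opam. *)
open Streaming

(* Create a small temporary file with three lines (illustrative data). *)
let path = Filename.temp_file "streaming_example" ".txt"

let () =
  let oc = open_out path in
  output_string oc "alpha\nbeta\ngamma\n";
  close_out oc

(* The file is opened only when to_list starts consuming the stream. *)
let lines = Stream.to_list (Stream.of_file path)

(* Clean up the temporary file. *)
let () = Sys.remove path
```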
Integration adaptors for sources, sinks and flows.
from source is a stream created from a source.
Examples
# Stream.len (Stream.from (Source.list [0; 1; 2]))
- : int = 3
into sink stream is the result value produced by streaming all elements of stream into sink.
Examples
# Stream.into Sink.sum (Stream.of_list [0; 1; 2])
- : int = 3
fill sink stream is similar to into but, in addition to the result value produced by sink, will optionally return a leftover stream with elements that were not consumed by sink.
via flow stream is a stream produced by transforming all elements of stream via flow.
Examples
Stream.count 100
|> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
|> Stream.via (Flow.take 50)
|> Stream.into Sink.sum
Fuses sources, sinks and flows and produces a result and a leftover.
let (r, leftover) = Stream.run ~from:source ~via:flow ~into:sink
Streams elements from source into sink via a stream transformer flow. In addition to the result value r produced by sink, a leftover source is returned if source was not exhausted.
Warning: If a leftover source is produced, it is required to either consume it or manually dispose of its resources. Not doing so might lead to resource leaks.
Examples
# let (x, leftover) =
let source = Source.list ["1"; "2"; "3"] in
let flow = Flow.map int_of_string in
Stream.run ~from:source ~via:flow ~into:Sink.first
val x : int option = Some 1
val leftover : string source option = Some <abstr>
# match leftover with
| Some source -> Source.dispose source
| None -> print_endline "No leftover"
- : unit = ()
Streams can be constructed with the let-binding syntax, which is similar to list comprehensions. The following example demonstrates this feature:
open Stream.Syntax
let items =
let* n = Stream.range 1 3 in
let* c = Stream.of_list ['x'; 'y'] in
yield (n, c) in
assert (Stream.to_list items = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')])