Module with definitions for streams.
Stream is a purely functional abstraction for incremental, push-based, sequential processing of elements. Streams can be easily and efficiently transformed and concatenated.
Stream operations do not leak resources. This is guaranteed in the presence of early termination (when not all stream elements are consumed) and in case of exceptions in the streaming pipeline.
Streams are built to be compatible with sources, sinks and flows. To create a stream that produces all elements from a source use Stream.from, to consume a stream with a sink use Stream.into, and to transform stream elements with a flow use Stream.via. For more sophisticated pipelines that might have source leftovers, Stream.run can be used.
A simple echo program that loops over standard input and prints every line to standard output until Ctrl-D is hit:
# Stream.stdin |> Stream.stdout;;
hello<Enter>
hello
world<Enter>
world
<Ctrl+d>
- : unit = ()
type 'a t = 'a stream
Type for streams with elements of type 'a.
val empty : 'a t
Empty stream with no elements.
val single : 'a -> 'a t
single a is a stream with a single element a.
val double : 'a -> 'a -> 'a t
double a b is a stream with two elements: a and b.
val triple : 'a -> 'a -> 'a -> 'a t
triple a b c is a stream with three elements: a, b and c.
val count : int -> int t
count n is an infinite stream of integers starting from n.
val range : ?by:int -> int -> int -> int t
range ~by:step n m is a sequence of integers starting from n to m (excluding m) incremented by step. The range is open on the right side.
val iota : int -> int t
iota n is range ~by:1 0 n, that is a range from 0 to n incremented by 1.
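The right-open behaviour of range and iota described above can be sketched as follows. This is an illustrative example only; it assumes the library is reachable through a top-level Streaming module, which is not stated on this page.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* Every second integer from 0 up to, but excluding, 10. *)
let evens = Stream.range ~by:2 0 10 |> Stream.to_list
(* evens = [0; 2; 4; 6; 8] *)

(* [iota n] counts from 0 up to n - 1. *)
let first_three = Stream.iota 3 |> Stream.to_list
(* first_three = [0; 1; 2] *)
```

Note that the upper bound never appears in the result, even when the step lands on it exactly.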
val (-<) : int -> int -> int t
n -< m is range n m.
val (--) : int -> int -> int t
n -- m is range n (m - 1).
val generate : len:int -> (int -> 'a) -> 'a t
generate ~len f generates a stream of length len, mapping each index to an element with f.
val repeat : ?times:int -> 'a -> 'a t
repeat ~times:n x produces a stream by repeating x n times. If times is omitted, x is repeated ad infinitum.
val repeatedly : ?times:int -> (unit -> 'a) -> 'a t
repeatedly ~times:n f produces a stream by repeatedly calling f () n times. If times is omitted, f is called ad infinitum.
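The difference between repeat and repeatedly shows up when the producing function is stateful. The sketch below is illustrative; the Streaming top-level module and the counter helper are assumptions made for the example.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* [repeat] yields the same value every time. *)
let dashes = Stream.repeat ~times:3 "-" |> Stream.to_list
(* dashes = ["-"; "-"; "-"] *)

(* [repeatedly] calls the function once per element, so a stateful
   function (here a hypothetical counter) yields a new value each time. *)
let ticks =
  let counter = ref 0 in
  Stream.repeatedly ~times:3 (fun () -> incr counter; !counter)
  |> Stream.to_list
(* ticks = [1; 2; 3] *)
```

Both calls are bounded with ~times here; without it the streams are infinite and to_list would not terminate.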
val iterate : 'a -> ('a -> 'a) -> 'a t
iterate x f is an infinite stream where the first item is calculated by applying f to x, the second item by applying the function on the previous result and so on.
val unfold : 's -> ('s -> ('a * 's) option) -> 'a t
unfold seed next is a stream created from a seed state and a function that produces elements and an updated state. The stream will terminate when next produces None.
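As an illustration of the seed and step function, the following sketch produces the powers of two not exceeding 100. It assumes the library is reachable through a top-level Streaming module.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* The state is the current power of two. Each step emits the state
   and doubles it; returning [None] ends the stream. *)
let powers_of_two =
  Stream.unfold 1 (fun s -> if s > 100 then None else Some (s, s * 2))
  |> Stream.to_list
(* powers_of_two = [1; 2; 4; 8; 16; 32; 64] *)
```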
val yield : 'a -> 'a t
yield x is a stream with a single element x.
val of_list : 'a list -> 'a t
of_list items is a stream with all elements in the list items.
val to_list : 'a t -> 'a list
to_list stream converts stream into a list.
val of_array : 'a array -> 'a t
of_array items is a stream with all elements in the array items.
val to_array : 'a t -> 'a array
to_array stream converts stream into an array.
val of_string : string -> char t
of_string string is a stream with all characters in string.
val to_string : char t -> string
to_string stream converts stream of characters into a string.
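The conversion functions compose freely, since each one either builds a stream from a container or collects a stream into one. A brief sketch, assuming the library is reachable through a top-level Streaming module:

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* A string becomes a stream of characters... *)
let chars = Stream.of_string "abc" |> Stream.to_list
(* chars = ['a'; 'b'; 'c'] *)

(* ...and a stream of characters can be collected back into a string. *)
let word = Stream.of_list ['o'; 'k'] |> Stream.to_string
(* word = "ok" *)

(* Arrays and lists can be bridged through a stream. *)
let as_array = Stream.of_list [1; 2; 3] |> Stream.to_array
(* as_array = [|1; 2; 3|] *)
```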
A stream that includes only the elements that satisfy a predicate.
Take first elements from the stream that satisfy a predicate and discard the rest.
Drop first elements from the stream that satisfy a predicate and keep the rest.
concat stream1 stream2 is a stream that exhausts all elements from stream1 and then all elements from stream2.
Examples
# let stream1 = Stream.of_list ['a'; 'b'; 'c'] in
let stream2 = Stream.of_list ['d'; 'e'; 'f'] in
Stream.to_list (Stream.concat stream1 stream2)
- : char list = ['a'; 'b'; 'c'; 'd'; 'e'; 'f']
stream1 ++ stream2 is the infix operator version of concat stream1 stream2.
flat_map f stream is a stream concatenated from sub-streams produced by applying f to all elements of stream.
let duplicated =
  [1; 2; 3]
  |> Stream.of_list
  |> Stream.flat_map (fun x -> Stream.of_list [x; x])
  |> Stream.to_list in
assert (duplicated = [1; 1; 2; 2; 3; 3])
cycle ~times:n stream produces a stream by repeating all elements from stream n times. If times is omitted, stream is repeated ad infinitum.
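A bounded use of cycle might look like the sketch below. It assumes the library is reachable through a top-level Streaming module, and it always passes ~times so that collecting into a list terminates.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* Repeat the whole stream twice, preserving element order. *)
let twice =
  Stream.of_list [1; 2; 3]
  |> Stream.cycle ~times:2
  |> Stream.to_list
(* twice = [1; 2; 3; 1; 2; 3] *)
```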
Operations that traverse the stream computing a single result value.
If the stream is infinite and the consumer accumulates the elements, the processing will not terminate, potentially resulting in a memory leak.
val len : 'a t -> int
len stream counts the number of elements in stream.
Will exhaust the stream during processing.
Examples
# Stream.len (Stream.of_list ['a'; 'b'; 'c']);;
- : int = 3
val each : ('a -> unit) -> 'a t -> unit
each f stream applies an effectful function f to all elements of stream.
val fold : ('r -> 'a -> 'r) -> 'r -> 'a t -> 'r
fold step init stream reduces the values of stream with the step function, starting with init.
If the step function raises an exception, the stream will be properly terminated.
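The argument order of fold (step function, initial value, then the stream) can be seen in this short sketch. It assumes the library is reachable through a top-level Streaming module.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* Sum the elements: the accumulator starts at 0. *)
let total = Stream.fold (+) 0 (Stream.of_list [1; 2; 3; 4])
(* total = 10 *)

(* The accumulator may have a different type from the elements.
   Consing onto a list reverses the order of the stream. *)
let reversed =
  Stream.fold (fun acc x -> x :: acc) [] (Stream.of_list [1; 2; 3])
(* reversed = [3; 2; 1] *)
```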
val is_empty : 'a t -> bool
is_empty stream is true if the stream has no elements and false otherwise. This operation consumes the first element of the stream.
val first : 'a t -> 'a option
Return the first element in the stream.
val last : 'a t -> 'a option
Return the last element in the stream, in linear time.
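The option results of first and last distinguish an empty stream from a non-empty one, as the following sketch shows. It assumes the library is reachable through a top-level Streaming module.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

let numbers = Stream.of_list [1; 2; 3]

let head = Stream.first numbers       (* Some 1 *)
let final = Stream.last numbers       (* Some 3, after a full traversal *)

(* On an empty stream both consumers have nothing to return. *)
let nothing : int option = Stream.first Stream.empty   (* None *)
let no_elements = Stream.is_empty Stream.empty         (* true *)
```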
val drain : 'a t -> unit
val of_file : string -> string t
of_file path is a stream of lines read from the file located at path.
The file is opened lazily only when the stream is consumed and will be closed even if the stream processing terminates with an exception.
val to_file : string -> string t -> unit
to_file path stream writes lines from stream into the file located at path.
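A round trip through a file can be sketched as follows. This is an illustration under assumptions: the library is taken to be reachable through a top-level Streaming module, and the temporary file name is chosen only for the example.

```ocaml
(* Assumption: the Stream module is exposed under [Streaming]. *)
open Streaming

(* A scratch file created just for this illustration. *)
let path = Filename.temp_file "stream_demo" ".txt"

(* Write three lines; the call returns once the stream is exhausted. *)
let () = Stream.to_file path (Stream.of_list ["first"; "second"; "third"])

(* The file is opened only when [Stream.len] starts consuming the stream. *)
let line_count = Stream.len (Stream.of_file path)

let () =
  Printf.printf "%d lines read back\n" line_count;
  Sys.remove path
```

Constructing the stream with of_file does not touch the file system; only the consumer (len in this sketch) triggers the read.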
val stdin : string t
The stream that reads lines from the standard input channel.
val stdout : string t -> unit
Writes the lines of a stream to the standard output channel.
val stderr : string t -> unit
Writes the lines of a stream to the standard error channel.
Integration adaptors for sources, sinks and flows.
from source is a stream created from a source.
Examples
# Stream.len (Stream.from (Source.list [0; 1; 2]))
- : int = 3
into sink stream is the result value produced by streaming all elements of stream into sink.
Examples
# Stream.into Sink.sum (Stream.of_list [0; 1; 2])
- : int = 3
fill sink stream is similar to into but, in addition to the result value produced by sink, will optionally return a leftover stream with elements that were not consumed by sink.
via flow stream is a stream produced by transforming all elements of stream via flow.
Examples
Stream.count 100
|> Stream.via (Flow.filter (fun x -> x mod 2 = 0))
|> Stream.via (Flow.take 50)
|> Stream.into Sink.sum
Fuses sources, sinks and flows and produces a result and a leftover.
let (r, leftover) = Stream.run ~from:source ~via:flow ~into:sink
Streams elements from source into sink via a stream transformer flow. In addition to the result value r produced by sink, a leftover source is returned if source was not exhausted.
Warning: If a leftover source is produced, it is required to either consume it or manually dispose its resources. Not doing so might lead to resource leaks.
Examples
# let (x, leftover) =
let source = Source.list ["1"; "2"; "3"] in
let flow = Flow.map int_of_string in
Stream.run ~from:source ~via:flow ~into:Sink.first
val x : int option = Some 1
val leftover : string source option = Some <abstr>
# match leftover with
| Some source -> Source.dispose source
| None -> print_endline "No leftover"
- : unit = ()
Streams can be constructed with the let-binding syntax which is similar to list comprehensions. The following example demonstrates this feature:
open Stream.Syntax
let items =
let* n = Stream.range 1 3 in
let* c = Stream.of_list ['x'; 'y'] in
yield (n, c) in
assert (Stream.to_list items = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')])
module Syntax : sig ... end
Module with syntax definitions for streams.