package async_extended

  1. Overview
  2. Docs
type 'a t

Erlang style mailboxes built on top of async streams

include sig ... end
val sexp_of_t : ('a -> Ppx_sexp_conv_lib.Sexp.t) -> 'a t -> Ppx_sexp_conv_lib.Sexp.t
module Filter : sig ... end
val create : ?to_sexp:('a -> Async.Sexp.t) -> (unit -> 'a option Async.Deferred.t) -> 'a t
val of_stream : ?to_sexp:('a -> Async.Sexp.t) -> 'a Async.Stream.t -> 'a t
val of_pipe : ?to_sexp:('a -> Async.Sexp.t) -> 'a Async.Pipe.Reader.t -> 'a t
val receive : ?debug:string -> ?timeout:unit Async.Deferred.t -> ?swallow:bool -> 'a t -> filter:('a, 'b) Filter.t -> postcond:('b list -> bool) -> 'b list Async.Deferred.t

receive t ~filter ~postcond apply postcond to the messages picked by filter. Return if postcond returns true, otherwise keep trying until timeout becomes determined, which will raise an exception.

If this returns successfully, the remaining data in the mailbox will be:

  • the list of messages that did not pass the filter, in the order received, iff swallow is false.
  • the list of messages that did not pass the filter AND arrived after the last message that did pass the filter, in the order received, iff swallow is true OR if no messages passed the filter.
val peek : 'a t -> ('a, 'b) Filter.t -> 'b list
val zero : ?debug:string -> 'a t -> ('a, 'b) Filter.t -> unit

zero t f asserts that there are exactly zero matching messages.

val one : ?debug:string -> ?timeout:unit Async.Deferred.t -> ?swallow:bool -> 'a t -> ('a, 'b) Filter.t -> 'b Async.Deferred.t

one t f run receive, asserting that there is exactly one matching message.

val two : ?debug:string -> ?timeout:unit Async.Deferred.t -> ?swallow:bool -> 'a t -> ('a, 'b) Filter.t -> ('b * 'b) Async.Deferred.t

two t f run receive, asserting that there are exactly two matching messages.

val many : ?debug:string -> ?timeout:unit Async.Deferred.t -> ?swallow:bool -> 'a t -> int -> ('a, 'b) Filter.t -> 'b list Async.Deferred.t

many t n f run receive, asserting that there are exactly n matching messages.

val not_empty : ?debug:string -> ?timeout:unit Async.Deferred.t -> 'a t -> ('a, 'b) Filter.t -> 'b list Async.Deferred.t

not_empty t f run receive, asserting that there is at least one matching message.

val clear : ?to_remove:('a -> bool) -> 'a t -> unit

clear t wipes out all previously received messages matching the to_remove predicate. If to_remove is not provided, wipes out all the messages.

Immediately after calling clear t, zero t f succeeds for any f.

val check_clear : _ t -> unit Core.Or_error.t

check_clear t - Ok if the mailbox is empty, descriptive error if the mailbox has any messages

val filter : 'a t -> ('a, _) Filter.t -> unit

filter t f removes all elements currently in t that satisfy f. Future arrivals are unaffected.

OCaml

Innovation. Community. Security.