package shared-block-ring

Module Make.Consumer

include S.RING with type disk := B.t with type position = Producer.position with type item = Item.t
type t
type item = Item.t
type error = [
  | `Retry
  | `Suspended
  | `Msg of string
]
type 'a result = ('a, error) Result.result
val pp_error : Format.formatter -> error -> unit
val open_error : 'a result -> ('a, [> error ]) Result.result
val error_to_msg : 'a result -> ('a, S.msg) Result.result
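
As a rough illustration of how a caller might branch on the three error cases, the sketch below redeclares the error type locally so that it compiles without the library. The helper name describe and the wording of its messages are assumptions made for this example, not part of the package.

```ocaml
(* Local copy of the documented error type, so the sketch is self-contained. *)
type error = [ `Retry | `Suspended | `Msg of string ]

(* Hypothetical helper: turn each error case into a short log message. *)
let describe : error -> string = function
  | `Retry -> "retry: try the operation again later"
  | `Suspended -> "suspended: the ring is suspended"
  | `Msg m -> "failure: " ^ m

let () =
  List.iter
    (fun e -> print_endline (describe e))
    [ `Retry; `Suspended; `Msg "disk read failed" ]
```

In real code pp_error would normally be used for printing; the explicit match is shown only to make the three cases visible.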
val attach : ?queue:string -> ?client:string -> disk:B.t -> unit -> t result Lwt.t

attach ?queue ?client ~disk () attaches to a previously-created shared ring on top of the block device disk.

val detach : t -> unit Lwt.t

detach t frees all resources associated with t. Attempts to use t after a detach will result in an `Error _.

val state : t -> [ `Running | `Suspended ] result Lwt.t

state t queries the current state of the ring. If the result is `Suspended then the producer has acknowledged the suspend request and will no longer produce items. Clients which support suspend/resume should arrange to call this function periodically.

val debug_info : t -> (string * string) list result Lwt.t

debug_info t returns a list of key=value pairs which may be useful for debugging. Nothing should be assumed about the keys or the values; they should only be printed or logged.

type position = Producer.position

The position within a stream.

val sexp_of_position : position -> Sexplib0.Sexp.t
include S.COMPARABLE with type t := position
val compare : position -> position -> [ `LessThan | `Equal | `GreaterThan ]

Compare two positions.
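
Because compare returns a polymorphic variant rather than an int, it cannot be passed directly to functions such as List.sort. The sketch below shows one possible adapter. Here position is modelled as an int and compare is a local stand-in with the documented result shape; the real position type is abstract, and to_int is a hypothetical helper.

```ocaml
(* Stand-in for the abstract position type (assumption: the real type is opaque). *)
type position = int

(* Local stand-in with the same result shape as the documented compare. *)
let compare (a : position) (b : position) =
  if a < b then `LessThan else if a > b then `GreaterThan else `Equal

(* Hypothetical adapter: map the variant onto the int convention used by List.sort. *)
let to_int = function `LessThan -> -1 | `Equal -> 0 | `GreaterThan -> 1

let sorted = List.sort (fun a b -> to_int (compare a b)) [ 3; 1; 2 ]

let () = List.iter (Printf.printf "%d ") sorted
```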

val advance : t:t -> position:position -> unit -> unit result Lwt.t

advance t position exposes the item associated with position to the Consumer so it can be popped.

val suspend : t -> unit result Lwt.t

suspend t signals that the producer should stop pushing items. Note this function returns before the producer has acknowledged. The result `Retry means that a previous call to resume has not been acknowledged; the client should retry.

val resume : t -> unit result Lwt.t

resume t signals that a producer may again start pushing items. This call does not wait for an acknowledgement from the producer. Note it is not an error to resume an already-resumed queue. The result `Retry means that a previous call to suspend has not been acknowledged; the client should retry.
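
Both suspend and resume can return `Retry while the previous request is still unacknowledged, so callers typically wrap them in a bounded retry loop. The following is a minimal synchronous sketch of that pattern. It deliberately leaves out Lwt so that it runs with the standard library alone; retry and fake_suspend are hypothetical names, and fake_suspend only imitates a call that returns `Retry twice before succeeding.

```ocaml
type error = [ `Retry | `Suspended | `Msg of string ]

(* Hypothetical helper: call [op] until it stops returning `Retry, giving up
   after [max_attempts] tries. A real client would sleep between attempts. *)
let rec retry ~max_attempts (op : unit -> ('a, error) result) =
  match op () with
  | Error `Retry when max_attempts > 1 ->
    retry ~max_attempts:(max_attempts - 1) op
  | r -> r

(* Fake operation standing in for suspend: `Retry twice, then success. *)
let fake_suspend : unit -> (unit, error) result =
  let calls = ref 0 in
  fun () ->
    incr calls;
    if !calls < 3 then Error `Retry else Ok ()

let () =
  match retry ~max_attempts:5 fake_suspend with
  | Ok () -> print_endline "suspend requested"
  | Error `Retry -> print_endline "still pending after 5 attempts"
  | Error `Suspended -> print_endline "ring is suspended"
  | Error (`Msg m) -> print_endline ("failure: " ^ m)
```

Since suspend returns before the producer has acknowledged, a client that needs to know the producer has actually stopped would still poll state afterwards.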

val pop : t:t -> ?from:position -> unit -> (position * item) result Lwt.t

pop t ?from () returns a pair (position, item) where item is the next item on the ring after from. Repeated calls to pop will return the same item. To indicate that the item has been processed, call advance position. `Retry means there is no item available at the moment and the client should try again later.
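
The pop-then-advance protocol can be sketched with a small in-memory stand-in. Everything below is an assumption made for illustration: the record type t, the use of int for position, and the synchronous (non-Lwt) pop and advance only imitate the documented behaviour and are not the library's implementation.

```ocaml
type error = [ `Retry | `Suspended | `Msg of string ]
type position = int

(* In-memory stand-in: the items plus the index of the first unconsumed one. *)
type t = { items : string array; mutable consumed : int }

(* Returns the next item after [from] (or the first unconsumed item) without
   consuming it; `Retry when nothing is available. *)
let pop ~t ?from () : (position * string, error) result =
  let next = match from with Some p -> p + 1 | None -> t.consumed in
  if next < Array.length t.items then Ok (next, t.items.(next))
  else Error `Retry

(* Marks everything up to and including [position] as processed. *)
let advance ~t ~position () : (unit, error) result =
  t.consumed <- position + 1;
  Ok ()

(* Hypothetical helper: process items one at a time until `Retry. *)
let rec drain t acc =
  match pop ~t () with
  | Ok (position, item) ->
    (match advance ~t ~position () with
     | Ok () -> drain t (item :: acc)
     | Error e -> Error e)
  | Error `Retry -> Ok (List.rev acc) (* nothing more available right now *)
  | Error e -> Error e

let () =
  let t = { items = [| "a"; "b"; "c" |]; consumed = 0 } in
  (* Without an intervening advance, pop keeps returning the same pair. *)
  assert (pop ~t () = pop ~t ());
  match drain t [] with
  | Ok items -> print_endline (String.concat "," items)
  | Error _ -> print_endline "error"
```

Advancing only after an item has been handled means that a consumer which stops between pop and advance will see the same item again on its next pop.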

val fold : f:(item -> 'a -> 'a) -> t:t -> ?from:position -> init:'a -> unit -> (position * 'a) result Lwt.t

fold f t ?from init () folds f across all the values that can be immediately popped from the ring. If any of the fold operations fail then the whole operation fails. The successful result includes the final position, which can be passed to advance to consume all the items at once.
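
A batch consumer might combine fold with a single advance, as in the sketch below. As before, this is a synchronous in-memory stand-in written only for illustration: the record type t, the int position, and the bodies of fold and advance are assumptions that mirror the documented signatures without Lwt.

```ocaml
type error = [ `Retry | `Suspended | `Msg of string ]
type position = int

(* In-memory stand-in: the items plus the index of the first unconsumed one. *)
type t = { items : int array; mutable consumed : int }

(* Folds [f] over every item currently available after [from] (or after the
   consumed point) and returns the position of the last item visited. *)
let fold ~f ~t ?from ~init () : (position * 'a, error) result =
  let start = match from with Some p -> p + 1 | None -> t.consumed in
  let acc = ref init and last = ref (start - 1) in
  for i = start to Array.length t.items - 1 do
    acc := f t.items.(i) !acc;
    last := i
  done;
  Ok (!last, !acc)

(* Marks everything up to and including [position] as processed. *)
let advance ~t ~position () : (unit, error) result =
  t.consumed <- position + 1;
  Ok ()

let () =
  let t = { items = [| 1; 2; 3 |]; consumed = 0 } in
  match fold ~f:(fun item sum -> item + sum) ~t ~init:0 () with
  | Ok (position, sum) ->
    Printf.printf "sum = %d\n" sum;
    (* One advance consumes the whole batch that was just folded. *)
    (match advance ~t ~position () with
     | Ok () -> ()
     | Error _ -> prerr_endline "advance failed")
  | Error _ -> prerr_endline "fold failed"
```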