package async_durable


This module helps processes maintain State RPC subscriptions that automatically recover from lost connections. It tells the client when the subscription has been lost or recovered, so the client can choose how to handle a lost connection.

module Update : sig ... end
val create : ?time_source:Async_kernel.Time_source.t -> Async_rpc_kernel.Rpc.Connection.t Async_durable__.Durable.t -> ('query, 'state, 'update, 'error) Async_rpc_kernel.Rpc.State_rpc.t -> query:'query -> resubscribe_delay:Core.Time_float.Span.t -> ('state, 'update, 'error, Async_rpc_kernel.Rpc.State_rpc.Metadata.t) Update.t Async_kernel.Pipe.Reader.t

create immediately dispatches the supplied Rpc.State_rpc.t with query over the Rpc.Connection.t Async_durable.t. If a connection attempt fails or the subscription closes, it waits resubscribe_delay and dispatches again to create a new subscription. The pipe returned by create contains every response that arrives over the internal subscription, as well as updates about the state of the subscription itself (for example, that the connection was lost or re-established).

Closing the returned pipe will permanently close the subscription.

It is guaranteed that every Connection_success message will be immediately followed by a State message.
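As a rough illustration of how a client might consume this stream, the sketch below folds a sequence of messages into a client-side view. It uses only the standard library: update_msg is a local stand-in for Update.t, and only Connection_success and State are named on this page, so the remaining constructor names, the view record, and the sample message list are assumptions made for the example.

```ocaml
(* Local stand-in for Update.t. Only Connection_success and State are named
   in the text above; the other constructor names are assumptions. *)
type ('state, 'update, 'error, 'metadata) update_msg =
  | Attempting_new_connection
  | Connection_success of 'metadata
  | Lost_connection
  | Failed_to_connect of string (* a string stands in for an error value *)
  | Rpc_error of 'error
  | Update of 'update
  | State of 'state

(* Client-side view: whether we are subscribed, and the last known total. *)
type view = { connected : bool; total : int option }

let initial = { connected = false; total = None }

(* Fold one message into the view. In this example 'state is an int snapshot
   and 'update is an int delta applied to that snapshot. *)
let step view msg =
  match msg with
  | Attempting_new_connection -> view
  | Connection_success _ -> { view with connected = true }
  | Lost_connection | Failed_to_connect _ | Rpc_error _ ->
    (* Keep the stale total, but flag that it may be out of date. *)
    { view with connected = false }
  | State snapshot -> { view with total = Some snapshot }
  | Update delta ->
    { view with total = Option.map (fun t -> t + delta) view.total }

(* A hypothetical message sequence spanning one lost connection. *)
let messages =
  [ Attempting_new_connection; Connection_success (); State 10; Update 5
  ; Lost_connection
  ; Attempting_new_connection; Connection_success (); State 100; Update 1 ]

let final = List.fold_left step initial messages
```

Because every Connection_success is followed by a State message, the fresh snapshot replaces whatever total was held before the disconnect, so the client never applies new deltas to stale state. With the real library, a fold like this would be driven by reading the returned pipe (for instance with Pipe.fold_without_pushback) rather than by a list.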

val create_or_fail : ?time_source:Async_kernel.Time_source.t -> Async_rpc_kernel.Rpc.Connection.t Async_durable__.Durable.t -> ('query, 'state, 'update, 'error) Async_rpc_kernel.Rpc.State_rpc.t -> query:'query -> resubscribe_delay:Core.Time_float.Span.t -> (('state, 'update, 'error, Async_rpc_kernel.Rpc.State_rpc.Metadata.t) Update.t Async_kernel.Pipe.Reader.t, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t

create_or_fail returns Error e if the initial attempt to dispatch the supplied Rpc.State_rpc.t does not succeed, or Ok (Error e) if the initial dispatch returns a server-side RPC error. Otherwise it returns the pipe as Ok (Ok pipe).
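The nested result can be easy to misread, so here is a minimal sketch of telling the three cases apart once the deferred is determined. It uses the standard library's result type for both layers, with a string standing in for the Or_error.t error; the outcome type and classify function are hypothetical names introduced for illustration.

```ocaml
(* The outer layer models Or_error.t (a string stands in for the error),
   the inner layer models the server-side 'error. *)
type 'error outcome =
  | Subscribed                 (* Ok (Ok pipe): the initial dispatch worked *)
  | Server_rejected of 'error  (* Ok (Error e): server-side RPC error *)
  | Dispatch_failed of string  (* Error e: the dispatch itself failed *)

let classify result =
  match result with
  | Error e -> Dispatch_failed e
  | Ok (Error rpc_error) -> Server_rejected rpc_error
  | Ok (Ok _pipe) -> Subscribed
```

A caller would typically treat Dispatch_failed and Server_rejected differently: the first points at connectivity, the second at the query or the server's handling of it.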

create' and create_or_fail' are like create and create_or_fail, but allow specifying a custom dispatch function. This is useful for clients using babel, where the Rpc.t is not usually exposed.

val create' : ?time_source:Async_kernel.Time_source.t -> 'connection Async_durable__.Durable.t -> dispatch: ('connection -> ('state * 'update Async_kernel.Pipe.Reader.t * 'metadata, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t) -> resubscribe_delay:Core.Time_float.Span.t -> ('state, 'update, 'error, 'metadata) Update.t Async_kernel.Pipe.Reader.t
val create_or_fail' : ?time_source:Async_kernel.Time_source.t -> 'connection Async_durable__.Durable.t -> dispatch: ('connection -> ('state * 'update Async_kernel.Pipe.Reader.t * 'metadata, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t) -> resubscribe_delay:Core.Time_float.Span.t -> (('state, 'update, 'error, 'metadata) Update.t Async_kernel.Pipe.Reader.t, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t
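To illustrate the shape of the dispatch argument, the sketch below wraps a client whose underlying RPC value is hidden. Everything in it is hypothetical: Hidden_client, its connect and subscribe functions, and the "prices" topic are invented for the example, and a plain value and a list stand in for the Deferred.t and Pipe.Reader.t that a real dispatch function returns.

```ocaml
(* Hypothetical wrapped client that hides the underlying RPC value and only
   exposes a subscribe function. *)
module Hidden_client : sig
  type connection

  val connect : host:string -> connection

  val subscribe
    :  connection
    -> topic:string
    -> ((int * int list * string, string) result, string) result
end = struct
  type connection = { host : string }

  let connect ~host = { host }

  (* Returns (initial state, updates, metadata), or a server-side error. *)
  let subscribe conn ~topic =
    if String.equal topic ""
    then Ok (Error "empty topic")
    else Ok (Ok (0, [ 1; 2 ], conn.host ^ "/" ^ topic))
end

(* The query is baked into the closure, so [dispatch] needs only a connection,
   which is the shape expected by the ~dispatch argument. *)
let dispatch conn = Hidden_client.subscribe conn ~topic:"prices"

let first_attempt = dispatch (Hidden_client.connect ~host:"example")
```

The point of the design is that the resubscription logic only ever needs a function from a connection to an initial state, an update stream, and metadata, so it does not have to know which RPC or query is behind it.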

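create_versioned, create_or_fail_versioned, create_versioned' and create_or_fail_versioned' work like create and create_or_fail, but for Caller_converts and Both_convert versioned State RPCs. They take a Versioned_rpc.Connection_with_menu.t connection, and each update is delivered as 'update Core.Or_error.t.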

val create_versioned : ?time_source:Async_kernel.Time_source.t -> Async_rpc_kernel.Versioned_rpc.Connection_with_menu.t Async_durable__.Durable.t -> (module Async_rpc_kernel.Versioned_rpc.Both_convert.State_rpc.S with type caller_error = 'error and type caller_query = 'query and type caller_state = 'state and type caller_update = 'update) -> query:'query -> resubscribe_delay:Core.Time_float.Span.t -> ('state, 'update Core.Or_error.t, 'error, Async_rpc_kernel.Rpc.State_rpc.Metadata.t) Update.t Async_kernel.Pipe.Reader.t
val create_versioned' : ?time_source:Async_kernel.Time_source.t -> Async_rpc_kernel.Versioned_rpc.Connection_with_menu.t Async_durable__.Durable.t -> (module Async_rpc_kernel.Versioned_rpc.Caller_converts.State_rpc.S with type error = 'error and type query = 'query and type state = 'state and type update = 'update) -> query:'query -> resubscribe_delay:Core.Time_float.Span.t -> ('state, 'update Core.Or_error.t, 'error, Async_rpc_kernel.Rpc.State_rpc.Metadata.t) Update.t Async_kernel.Pipe.Reader.t
val create_or_fail_versioned : ?time_source:Async_kernel.Time_source.t -> Async_rpc_kernel.Versioned_rpc.Connection_with_menu.t Async_durable__.Durable.t -> (module Async_rpc_kernel.Versioned_rpc.Both_convert.State_rpc.S with type caller_error = 'error and type caller_query = 'query and type caller_state = 'state and type caller_update = 'update) -> query:'query -> resubscribe_delay:Core.Time_float.Span.t -> (('state, 'update Core.Or_error.t, 'error, Async_rpc_kernel.Rpc.State_rpc.Metadata.t) Update.t Async_kernel.Pipe.Reader.t, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t
val create_or_fail_versioned' : ?time_source:Async_kernel.Time_source.t -> Async_rpc_kernel.Versioned_rpc.Connection_with_menu.t Async_durable__.Durable.t -> (module Async_rpc_kernel.Versioned_rpc.Caller_converts.State_rpc.S with type error = 'error and type query = 'query and type state = 'state and type update = 'update) -> query:'query -> resubscribe_delay:Core.Time_float.Span.t -> (('state, 'update Core.Or_error.t, 'error, Async_rpc_kernel.Rpc.State_rpc.Metadata.t) Update.t Async_kernel.Pipe.Reader.t, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t
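In the versioned signatures each update has type 'update Core.Or_error.t, presumably because converting an individual update to the caller's type can fail. The sketch below shows one possible way to handle that, using the standard library's result type with a string standing in for the error; apply_updates and its skip-and-record policy are assumptions for illustration, not behavior prescribed by the library.

```ocaml
(* Each update arrives wrapped: Ok v when it could be used as-is, Error msg
   otherwise. Failed updates are skipped and their messages recorded. *)
let apply_updates ~init updates =
  List.fold_left
    (fun (total, failures) update ->
      match update with
      | Ok delta -> (total + delta, failures)
      | Error msg -> (total, msg :: failures))
    (init, [])
    updates

(* Hypothetical stream: a snapshot of 10 followed by three updates. *)
let total, failures =
  apply_updates ~init:10 [ Ok 1; Error "unknown field"; Ok 2 ]
```

Whether skipping a failed update is acceptable depends on the application: if updates are deltas, a skipped one leaves the local state out of step with the server, so some clients may prefer to treat it as fatal.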