Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file persistent_connection_kernel_intf.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137(** An actively maintained connection to some service that eagerly and repeatedly attempts
to reconnect whenever the underlying connection is lost, until a new one can be
established. *)open!Coreopen!Async_kernel(** The address of a service to which one can connect. E.g. [Host_and_port.t] is a
reasonable choice when making a TCP connection.
*)moduletypeAddress=sigtypet[@@derivingequal,sexp_of]endmoduletypeClosable=sig(** a connection type *)typet(** [close t] closes the connection. The returned deferred becomes determined once any
resources needed to maintain the connection have been released. *)valclose:t->unitDeferred.t(** [is_closed t] returns true if [close] has ever been called (even if the returned
deferred has not yet been fulfilled).
Note that some modules implementing [Closable] may call close internally upon
noticing that the connection was closed by the other side. The interface of such a
module ought to say that this is the case. *)valis_closed:t->bool(** [close_finished t] becomes determined at the same time as the result of the first
call to [close]. [close_finished] differs from [close] in that it does not have the
side effect of initiating a close. *)valclose_finished:t->unitDeferred.tendmoduletypeS=sigtypet[@@derivingsexp_of](** A connection, perhaps embellished with additional information upon connection. *)typeconnmoduleEvent:sigtype'addresst=|Attempting_to_connect|Obtained_addressof'address|Failed_to_connectofError.t|Connectedofconn|Disconnected[@@derivingsexp_of]vallog_level:_t->[`Info|`Debug|`Error]end(** [create ~server_name ~on_event ~retry_delay get_address] returns a persistent
connection to a server whose host and port are obtained via [get_address] every time
we try to connect. For example, [get_address] might look up a server's host and port
in catalog at a particular path to which multiple redundant copies of a service are
publishing their location. If one copy dies, we get the address of the another one
when looking up the address afterwards.
All connection events (see the type above) are passed to the [on_event] callback, if
given. When this callback becomes determined, we move on to the next step in our
connection attempt (e.g. we won't actually attempt to connect until [on_event
Attempting_to_connect] is finished). Note that [on_event Disconnected] will only be
called once [on_event (Connected conn)] finishes even if the connection goes down
during that callback.
[`Failed_to_connect error] and [`Obtained_address addr] events are only reported if
they are distinct from the most recent event of the same type that has taken place
since the most recent [`Attempting_to_connect] event.
Connection is by default retried after [Time.Span.randomize
~percent:(Percent.of_mult 0.3) (retry_delay ())]. The default for [retry_delay] is
[const (sec 10.)]. Note that what this retry delay actually throttles is the delay
between two connection attempts, so when a long-lived connection dies, connection is
usually immediately retried, and if that failed, wait for another retry delay and
retry.
The [random_state] and [time_source] arguments are there to make persistent
connection code more deterministically testable. They default to
[`State Random.State.default] and [Time_source.wall_clock ()], respectively.
If random_state is set to [`Non_random], retry_delay will be used directly.
*)valcreate:server_name:string->?on_event:('addressEvent.t->unitDeferred.t)->?retry_delay:(unit->Time_ns.Span.t)->?random_state:[`Non_random|`StateofRandom.State.t]->?time_source:Time_source.t->connect:('address->connOr_error.tDeferred.t)->address:(moduleAddresswithtypet='address)->(unit->'addressOr_error.tDeferred.t)->t(** [connected] returns the first available connection from the time it is called. When
currently connected, the returned deferred is already determined. If [closed] has
been called, then the returned deferred is never determined. *)valconnected:t->connDeferred.t(** [event] returns a bus which is written to whenever an event happens.
Since the ['address] used in create is not exposed as a parameter of the
[t] type, we replace it with (). *)valevent_bus:t->(unitEvent.t->unit)Bus.Read_only.t(** [connected_or_failed_to_connect] is immediately determined as [Ok _] if [t] is
already connected. Otherwise it becomes determined the next time [t] becomes
connected or fails to connect or when [t] is closed. *)valconnected_or_failed_to_connect:t->connOr_error.tDeferred.t(** The current connection, if any. *)valcurrent_connection:t->connoptionvalserver_name:t->string(** [close t] closes the current connection and stops it from trying to reconnect. After
the deferred it returns becomes determined, the last connection has been closed and
no others will be attempted.
Note: no [close] calls are ever generated internally in response to the connection
being closed by the other side.
*)includeClosablewithtypet:=t(** [close_when_current_connection_is_closed t] causes the persistent connection to not
reconnect if the current connection closes or if it is not currently connected. It
does not close any active connection. *)valclose_when_current_connection_is_closed:t->unitendmoduletypePersistent_connection_kernel=sigmoduletypeAddress=AddressmoduletypeClosable=ClosablemoduletypeS=SmoduleMake(Conn:Closable):Swithtypeconn=Conn.tend