package async_rpc_kernel

  1. Overview
  2. Docs
Legend:
Library
Module
Module type
Parameter
Class
Class type
val sexp_of_t : t -> Sexplib0.Sexp.t
module Heartbeat_config : sig ... end
module Client_implementations : sig ... end
val create : ?implementations:'s Async_rpc_kernel__.Implementations.t -> ?protocol_version_headers:Protocol_version_header.Pair.t -> connection_state:(t -> 's) -> ?handshake_timeout:Core.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?max_metadata_size:Core.Byte_units.t -> ?description:Core.Info.t -> ?time_source:Async_kernel.Synchronous_time_source.t -> ?identification:Core.Bigstring.t -> Async_rpc_kernel__.Transport.t -> (t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t

Initiate an Rpc connection on the given transport. implementations should be the bag of implementations that the calling side implements; it defaults to Implementations.null (i.e., "I implement no RPCs").

protocol_version_headers should contain the pre-shared protocol version headers, if applicable. The client and server must agree on whether these were shared and, if so, what they contained, or there may be a protocol error.

connection_state will be called once, before create's result is determined, on the same connection that create returns. Its output will be provided to the implementations when queries arrive.

WARNING: If specifying a custom heartbeat_config, make sure that both ends of the Rpc connection use compatible settings for timeout and send frequency. Otherwise, your Rpc connections might close unexpectedly.

max_metadata_size will limit how many bytes of metadata this peer can send along with each query. It defaults to 1k. User-provided metadata exceeding that size will be truncated. WARNING: setting this value too high allows this connection to send large amounts of data to the callee, unnoticed, which can severely degrade performance.

description can be used to give some extra information about the connection, which will then show up in error messages and the connection's sexp. If you have lots of connections in your program, this can be useful for distinguishing them.

time_source can be given to define the time_source for which the heartbeating events will be scheduled. Defaults to wall-clock.

identification can be used to send an additional information to the peer. This is intended to be used for identifying the identity of the /process/ as opposed to the identity of the user. We use a bigstring to leave the option for clients to interpret as structured data of their choosing.

val contains_magic_prefix : bool Core.Bin_prot.Type_class.reader

As of Feb 2017, the RPC protocol started to contain a magic number so that one can identify RPC communication. The bool returned by contains_magic_prefix says whether this magic number was observed.

val description : t -> Core.Info.t
val add_heartbeat_callback : t -> (unit -> unit) -> unit

After add_heartbeat_callback t f, f () will be called after every subsequent heartbeat received by t.

val reset_heartbeat_timeout : t -> Core.Time_ns.Span.t -> unit

Changes the heartbeat timeout and restarts the timer by setting last_seen_alive to the current time.

val last_seen_alive : t -> Core.Time_ns.t

The last time either any message has been received or reset_heartbeat_timeout was called.

val close : ?streaming_responses_flush_timeout:Core.Time_ns.Span.t -> ?reason:Core.Info.t -> t -> unit Async_kernel.Deferred.t

close starts closing the connection's transport, and returns a deferred that becomes determined when its close completes. It is ok to call close multiple times on the same t; calls subsequent to the initial call will have no effect, but will return the same deferred as the original call.

Before closing the underlying transport's writer, close waits for all streaming responses to be Pipe.upstream_flushed with a timeout of streaming_responses_flush_timeout.

The reason for closing the connection will be passed to callers of close_reason.

val close_finished : t -> unit Async_kernel.Deferred.t

close_finished becomes determined after the close of the connection's transport completes, i.e. the same deferred that close returns. close_finished differs from close in that it does not have the side effect of initiating a close.

val close_reason : t -> on_close:[ `started | `finished ] -> Core.Info.t Async_kernel.Deferred.t

close_reason ~on_close t becomes determined when close starts or finishes based on on_close, but additionally returns the reason that the connection was closed.

val is_closed : t -> bool

is_closed t returns true iff close t has been called. close may be called internally upon errors or timeouts.

val bytes_to_write : t -> int

bytes_to_write and flushed just call the similarly named function on the Transport.Writer.t within a connection.

val bytes_written : t -> Core.Int63.t

bytes_written just calls the similarly named functions on the Transport.Writer.t within a connection.

val bytes_read : t -> Core.Int63.t

bytes_read just calls the similarly named function on the Transport.Reader.t within a connection.

val flushed : t -> unit Async_kernel.Deferred.t

Peer menu will become determined before any other messages are received. The menu is sent automatically on creation of a connection. If the peer is using an older version, the value is immediately determined to be None. If the connection is closed before the menu is received, an error is returned.

It is expected that one will call Versioned_rpc.Connection_with_menu.create instead of this function and that will request the menu via rpc if it gets None.

val peer_menu' : t -> Menu.t option Rpc_result.t Async_kernel.Deferred.t

Like peer_menu but returns an rpc result

val my_menu : t -> Menu.t option
val peer_identification : t -> Core.Bigstring.t option Async_kernel.Deferred.t

Peer identification will become determined before any other messages are received. If the peer is using an older version, the peer id is immediately determined to be None. If the connection is closed before the menu is received, None is returned.

val with_close : ?implementations:'s Async_rpc_kernel__.Implementations.t -> ?protocol_version_headers:Protocol_version_header.Pair.t -> ?handshake_timeout:Core.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> ?time_source:Async_kernel.Synchronous_time_source.t -> connection_state:(t -> 's) -> Async_rpc_kernel__.Transport.t -> dispatch_queries:(t -> 'a Async_kernel.Deferred.t) -> on_handshake_error: [ `Raise | `Call of Core.Exn.t -> 'a Async_kernel.Deferred.t ] -> 'a Async_kernel.Deferred.t

with_close tries to create a t using the given transport. If a handshake error is the result, it calls on_handshake_error, for which the default behavior is to raise an exception. If no error results, dispatch_queries is called on t.

After dispatch_queries returns, if server is None, the t will be closed and the deferred returned by dispatch_queries will be determined immediately. Otherwise, we'll wait until the other side closes the connection and then close t and determine the deferred returned by dispatch_queries.

When the deferred returned by with_close becomes determined, Transport.close has finished.

NOTE: Because this connection is closed when the Deferred.t returned by dispatch_queries is determined, you should be careful when using this with Pipe_rpc. For example, simply returning the pipe when you get it will close the pipe immediately. You should instead either use the pipe inside dispatch_queries and not determine its result until you are done with the pipe, or use a different function like create.

val server_with_close : ?handshake_timeout:Core.Time_ns.Span.t -> ?heartbeat_config:Heartbeat_config.t -> ?description:Core.Info.t -> ?time_source:Async_kernel.Synchronous_time_source.t -> Async_rpc_kernel__.Transport.t -> implementations:'s Async_rpc_kernel__.Implementations.t -> connection_state:(t -> 's) -> on_handshake_error: [ `Raise | `Ignore | `Call of Core.Exn.t -> unit Async_kernel.Deferred.t ] -> unit Async_kernel.Deferred.t

Runs with_close but dispatches no queries. The implementations are required because this function doesn't let you dispatch any queries (i.e., act as a client), it would be pointless to call it if you didn't want to act as a server.

val compute_metadata : t -> Async_rpc_kernel__.Description.t -> Core.Int63.t -> Rpc_metadata.t option
type response_with_determinable_status =
  1. | Pipe_eof
  2. | Expert_indeterminate
type response_handler_action =
  1. | Keep
  2. | Wait of unit Async_kernel.Deferred.t
  3. | Expert_remove_and_wait of unit Async_kernel.Deferred.t
type response_handler = Bin_prot.Nat0.t Async_rpc_kernel__.Protocol.Response.needs_length -> read_buffer:Core.Bigstring.t -> read_buffer_pos_ref:int Core.ref -> response_handler_action
val sexp_of_t_hum_writer : t -> Core.Sexp.t
module Dispatch_error : sig ... end
val dispatch : t -> kind:Tracing_event.Sent_response_kind.t Tracing_event.Kind.t -> response_handler:response_handler option -> bin_writer_query:'a Core.Bin_prot.Type_class.writer -> query:'a Async_rpc_kernel__.Protocol.Query.needs_length -> (unit, Dispatch_error.t) Core.Result.t
val dispatch_bigstring : t -> tag:Async_rpc_kernel__.Protocol.Rpc_tag.t -> version:int -> metadata:unit -> Core.Bigstring.t -> pos:int -> len:int -> response_handler:response_handler option -> (unit, Dispatch_error.t) Core.Result.t
val schedule_dispatch_bigstring : t -> tag:Async_rpc_kernel__.Protocol.Rpc_tag.t -> version:int -> metadata:unit -> Core.Bigstring.t -> pos:int -> len:int -> response_handler:response_handler option -> (unit Async_kernel.Deferred.t, Dispatch_error.t) Core.Result.t
val schedule_dispatch_bigstring_with_metadata : t -> tag:Async_rpc_kernel__.Protocol.Rpc_tag.t -> version:int -> metadata:string option -> Core.Bigstring.t -> pos:int -> len:int -> response_handler:response_handler option -> (unit Async_kernel.Deferred.t, Dispatch_error.t) Core.Result.t
val default_handshake_timeout : Core.Time_ns.Span.t
val events : t -> (Tracing_event.t -> unit) Bus.Read_only.t

Allows getting information from the RPC that may be used for tracing or metrics. The interface is not yet stable.

val default_handshake_header : Protocol_version_header.t

The header that would be sent at the beginning of a connection. This can be used to pre-share this part of the handshake (see the protocol_version_headers argument to create).

val set_metadata_hooks : t -> when_sending: (Async_rpc_kernel__.Description.t -> query_id:Core.Int63.t -> Rpc_metadata.t option) -> on_receive: (Async_rpc_kernel__.Description.t -> query_id:Core.Int63.t -> Rpc_metadata.t option -> Async_kernel.Execution_context.t -> Async_kernel.Execution_context.t) -> [ `Ok | `Already_set ]

Allows some extra information to be passed between clients and servers (e.g. for tracing). The when_sending function is called to compute the metadata that is sent along with rpc queries. It is called in the same async context as the dispatch function. The on_receive function is called before an rpc implementation to modify the execution context for that implementation. Metadata is not sent if the server is running an old version of the Async_rpc protocol. Handlers should not change the monitor on the execution context (this will have no effect on where errors are sent for rpc implementations).

The passed query_id may be used to correlate with a listener on the events bus.

val have_metadata_hooks_been_set : t -> bool

True if future calls to set_metadata_hooks will return `Already_set.

module For_testing : sig ... end
OCaml

Innovation. Community. Security.