package async_rpc_kernel

  1. Overview
  2. Docs
Legend:
Library
Module
Module type
Parameter
Class
Class type
type ('query, 'response, 'error) t
module Id : sig ... end
module Metadata : sig ... end
val create : ?client_pushes_back:unit -> name:string -> version:int -> bin_query:'query Core.Bin_prot.Type_class.t -> bin_response:'response Core.Bin_prot.Type_class.t -> bin_error:'error Core.Bin_prot.Type_class.t -> unit -> ('query, 'response, 'error) t
val bin_query : ('query, _, _) t -> 'query Core.Bin_prot.Type_class.t
val bin_response : (_, 'response, _) t -> 'response Core.Bin_prot.Type_class.t
val bin_error : (_, _, 'error) t -> 'error Core.Bin_prot.Type_class.t
val shapes : (_, _, _) t -> Rpc_shapes.t
val implement : ?on_exception:On_exception.t -> ('query, 'response, 'error) t -> ('connection_state -> 'query -> ('response Async_kernel.Pipe.Reader.t, 'error) Core.Result.t Async_kernel.Deferred.t) -> 'connection_state Implementation.t

The pipe returned by the implementation function will be closed automatically when either the connection to the client is closed or the client closes their pipe.

As described in create, elements in the returned pipe will be transferred to the underlying connection, waiting after each batch of elements in the pipe until those bytes have been flushed to the connection. Slow connections or client_pushes_back can cause elements to buffer in the pipe if the server doesn't respect pushback on the pipe.

val implement_with_auth : ?on_exception:On_exception.t -> ('query, 'response, 'error) t -> ('connection_state -> 'query -> ('response Async_kernel.Pipe.Reader.t, 'error) Core.Result.t Or_not_authorized.t Async_kernel.Deferred.t) -> 'connection_state Implementation.t
module Direct_stream_writer : sig ... end

A Direct_stream_writer.t is a simple object for responding to a Pipe_rpc or State_rpc query.

val implement_direct : ?on_exception:On_exception.t -> ('query, 'response, 'error) t -> ('connection_state -> 'query -> 'response Direct_stream_writer.t -> (unit, 'error) Core.Result.t Async_kernel.Deferred.t) -> 'connection_state Implementation.t

Similar to implement, but you are given the writer instead of providing a writer and the writer is a Direct_stream_writer.t instead of a Pipe.Writer.t.

The main advantage of this interface is that it consumes far less memory per open query.

Though the implementation function is given a writer immediately, the result of the client's call to dispatch will not be determined until after the implementation function returns. Elements written before the function returns will be queued up to be written after the function returns.

val implement_direct_with_auth : ?on_exception:On_exception.t -> ('query, 'response, 'error) t -> ('connection_state -> 'query -> 'response Direct_stream_writer.t -> (unit, 'error) Core.Result.t Or_not_authorized.t Async_kernel.Deferred.t) -> 'connection_state Implementation.t
val dispatch : ('query, 'response, 'error) t -> Connection.t -> 'query -> ('response Async_kernel.Pipe.Reader.t * Metadata.t, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t

This has (..., 'error) Result.t as its return type to represent the possibility of the call itself being somehow erroneous (but understood - the outer Or_error.t encompasses failures of that nature). Note that this cannot be done simply by making 'response a result type, since ('response Pipe.Reader.t, 'error) Result.t is distinct from ('response, 'error) Result.t Pipe.Reader.t.

Note that the pipe will be closed if either of:

  • The implementer closes the pipe
  • The Connection.t is closed, either intentionally or due to a network error or other failure

This means that it's possible for the pipe returned from dispatch to close before all the data that the server wanted to send has been received. If it's important to ensure that you got all the values the server intended to send, you should call close_reason with the provided Metadata.t to check why the pipe was closed.

Closing the pipe has the effect of calling abort.

val dispatch' : ('query, 'response, 'error) t -> Connection.t -> 'query -> ('response Async_kernel.Pipe.Reader.t * Metadata.t, 'error) Core.Result.t Rpc_result.t Async_kernel.Deferred.t

Like dispatch but gives an Rpc_error.t instead of an Error.t.

val dispatch_exn : ('query, 'response, 'error) t -> Connection.t -> 'query -> ('response Async_kernel.Pipe.Reader.t * Metadata.t) Async_kernel.Deferred.t
module Pipe_message = Pipe_message
module Pipe_response = Pipe_response
val dispatch_iter : ('query, 'response, 'error) t -> Connection.t -> 'query -> f:('response Pipe_message.t -> Pipe_response.t) -> (Id.t, 'error) Core.Result.t Core.Or_error.t Async_kernel.Deferred.t

Calling dispatch_iter t conn query ~f is similar to calling dispatch t conn query and then iterating over the result pipe with f. The main advantage it offers is that its memory usage is much lower, making it more suitable for situations where many queries are open at once.

f may be fed any number of Update _ messages, followed by a single Closed _ message.

f can cause the connection to stop reading messages off of its underlying Reader.t by returning Wait _. This is the same as what happens when a client stops reading from the pipe returned by dispatch when the Pipe_rpc.t has client_pushes_back set.

When successful, dispatch_iter returns an Id.t after the subscription is started. This may be fed to abort with the same Pipe_rpc.t and Connection.t as the call to dispatch_iter to cancel the subscription, which will close the pipe on the implementation side. Calling it with a different Pipe_rpc.t or Connection.t has undefined behavior.

module Expert : sig ... end
val abort : (_, _, _) t -> Connection.t -> Id.t -> unit

abort rpc connection id given an RPC and the id returned as part of a call to dispatch, abort requests that the other side of the connection stop sending updates.

If you are using dispatch rather than dispatch_iter, you are encouraged to close the pipe you receive rather than calling abort -- both of these have the same effect.

close_reason metadata will be determined sometime after the pipe associated with metadata is closed. Its value will indicate what caused the pipe to be closed.

val client_pushes_back : (_, _, _) t -> bool
val name : (_, _, _) t -> string
val version : (_, _, _) t -> int
val description : (_, _, _) t -> Description.t
val query_type_id : ('query, _, _) t -> 'query Core.Type_equal.Id.t
val response_type_id : (_, 'response, _) t -> 'response Core.Type_equal.Id.t
val error_type_id : (_, _, 'error) t -> 'error Core.Type_equal.Id.t
OCaml

Innovation. Community. Security.