package rpc_parallel

type worker
type worker_state = Worker_state.t
type connection_state = Connection_state.t
val create_rpc : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Deferred.t) -> bin_input:'query Core.Bin_prot.Type_class.t -> bin_output:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response) Parallel.Function.t

create_rpc ?name ~f ~bin_input ~bin_output () creates an Rpc.Rpc.t, named name if specified, and uses f as the implementation of this Rpc. It returns a _function, a type-safe Rpc protocol.
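As an illustration of the call shape, here is a minimal sketch of registering a plain Rpc inside a functor over this signature. It assumes the signature is reachable as Rpc_parallel.Creator and that both state types are unit; Sum_functions, sum_to and sum_impl are hypothetical names chosen for the example.

```ocaml
open Core
open Async

(* Pure helper (hypothetical): sum of 0 .. n-1. *)
let sum_to n = List.fold (List.init n ~f:Fn.id) ~init:0 ~f:( + )

(* Assumption: this signature is exported as Rpc_parallel.Creator.
   Both state types are unit to keep the sketch short. *)
module Sum_functions
    (C : Rpc_parallel.Creator
           with type worker_state := unit
            and type connection_state := unit) =
struct
  (* Runs on the worker for every query it receives. *)
  let sum_impl ~worker_state:() ~conn_state:() n = return (sum_to n)

  (* The resulting function value carries the query and response types. *)
  let sum : (C.worker, int, int) Rpc_parallel.Function.t =
    C.create_rpc ~name:"sum" ~f:sum_impl ~bin_input:Int.bin_t ~bin_output:Int.bin_t ()
end
```

Keeping the computation in a pure helper makes it testable without spawning a worker.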

val create_pipe : ?name:string -> ?client_pushes_back:unit -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Pipe.Reader.t Async.Deferred.t) -> bin_input:'query Core.Bin_prot.Type_class.t -> bin_output:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response Async.Pipe.Reader.t) Parallel.Function.t

create_pipe ?name ~f ~bin_input ~bin_output () creates an Rpc.Pipe_rpc.t, named name if specified. The implementation of this Rpc calls f with the query; f returns a Pipe.Reader.t whose elements are streamed back to the caller.

Note that aborted is not exposed; the pipe is closed when the Rpc is aborted.
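A minimal sketch of a pipe function matching the signature of create_pipe above, where f returns a reader. It assumes the signature is reachable as Rpc_parallel.Creator and that both state types are unit; Count_functions, numbers and count_impl are hypothetical names.

```ocaml
open Core
open Async

(* Pure helper (hypothetical): the elements to stream, 0 .. n-1. *)
let numbers n = List.init n ~f:Fn.id

(* Assumption: this signature is exported as Rpc_parallel.Creator. *)
module Count_functions
    (C : Rpc_parallel.Creator
           with type worker_state := unit
            and type connection_state := unit) =
struct
  (* The implementation only has to return a reader; Pipe.of_list yields
     a reader that is closed once every element has been consumed. *)
  let count_impl ~worker_state:() ~conn_state:() n = return (Pipe.of_list (numbers n))

  let count : (C.worker, int, int Pipe.Reader.t) Rpc_parallel.Function.t =
    C.create_pipe ~name:"count" ~f:count_impl ~bin_input:Int.bin_t ~bin_output:Int.bin_t ()
end
```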

val create_direct_pipe : ?name:string -> ?client_pushes_back:unit -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Async.Deferred.t) -> bin_input:'query Core.Bin_prot.Type_class.t -> bin_output:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response) Parallel.Function.Direct_pipe.t

create_direct_pipe ?name ~f ~bin_input ~bin_output () will create an Rpc.Pipe_rpc.t with name if specified.

As per the documentation at lib/async_rpc_kernel/src/rpc.mli:

Though the implementation function is given a writer immediately, the result of the client's call to dispatch will not be determined until after the implementation function returns. Elements written before the function returns will be queued up to be written after the function returns.
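The queuing behaviour described above means an implementation may write everything before it returns. A minimal sketch under the same assumptions as elsewhere on this page (signature reachable as Rpc_parallel.Creator, unit state types); Countdown_functions, countdown and countdown_impl are hypothetical names.

```ocaml
open Core
open Async

(* Pure helper (hypothetical): n, n-1, ..., 1. *)
let countdown n = List.init n ~f:(fun i -> n - i)

(* Assumption: this signature is exported as Rpc_parallel.Creator. *)
module Countdown_functions
    (C : Rpc_parallel.Creator
           with type worker_state := unit
            and type connection_state := unit) =
struct
  module Writer = Rpc.Pipe_rpc.Direct_stream_writer

  (* Elements written here are queued and sent after the function returns. *)
  let countdown_impl ~worker_state:() ~conn_state:() n writer =
    List.iter (countdown n) ~f:(fun x ->
      match Writer.write_without_pushback writer x with
      | `Ok | `Closed -> ());
    (* Closing the writer ends the stream on the client side. *)
    Writer.close writer;
    return ()

  let countdown_fn =
    C.create_direct_pipe
      ~name:"countdown"
      ~f:countdown_impl
      ~bin_input:Int.bin_t
      ~bin_output:Int.bin_t
      ()
end
```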

val create_one_way : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> unit) -> bin_input:'query Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, unit) Parallel.Function.t

create_one_way ?name ~f ~bin_input () creates an Rpc.One_way.t, named name if specified, and uses f as the implementation.
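A minimal sketch of a one-way function that records messages in the worker state. It assumes the signature is reachable as Rpc_parallel.Creator and, purely for illustration, that the worker state is a string Queue.t; Record_functions, normalize and record_impl are hypothetical names.

```ocaml
open Core
open Async

(* Pure helper (hypothetical): trim whitespace and lowercase. *)
let normalize msg = String.lowercase (String.strip msg)

(* Assumption: this signature is exported as Rpc_parallel.Creator.
   The worker state is a queue of received messages (illustrative only). *)
module Record_functions
    (C : Rpc_parallel.Creator
           with type worker_state := string Queue.t
            and type connection_state := unit) =
struct
  (* One-way implementations return unit: no response is sent back. *)
  let record_impl ~worker_state ~conn_state:() msg =
    Queue.enqueue worker_state (normalize msg)

  let record : (C.worker, string, unit) Rpc_parallel.Function.t =
    C.create_one_way ~name:"record" ~f:record_impl ~bin_input:String.bin_t ()
end
```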

val create_reverse_pipe : ?name:string -> ?client_pushes_back:unit -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'update Async.Pipe.Reader.t -> 'response Async.Deferred.t) -> bin_query:'query Core.Bin_prot.Type_class.t -> bin_update:'update Core.Bin_prot.Type_class.t -> bin_response:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query * 'update Async.Pipe.Reader.t, 'response) Parallel.Function.t

create_reverse_pipe ?name ~f ~bin_query ~bin_update ~bin_response () generates a function allowing you to send a query and a pipe of updates to a worker. The worker will send back a response. It is up to you whether to send a response before or after finishing with the pipe; Rpc_parallel doesn't care.
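A minimal sketch of a reverse pipe whose response is sent only after the update pipe has been drained, which is one of the two orderings the text above allows. It assumes the signature is reachable as Rpc_parallel.Creator and unit state types; Total_functions, add_update and total_impl are hypothetical names.

```ocaml
open Core
open Async

(* Pure fold step (hypothetical): add one update to the running total. *)
let add_update total update = total + update

(* Assumption: this signature is exported as Rpc_parallel.Creator. *)
module Total_functions
    (C : Rpc_parallel.Creator
           with type worker_state := unit
            and type connection_state := unit) =
struct
  (* The query is the starting total; the deferred result becomes
     determined once the client closes its side of the pipe. *)
  let total_impl ~worker_state:() ~conn_state:() start updates =
    Pipe.fold_without_pushback updates ~init:start ~f:add_update

  let total : (C.worker, int * int Pipe.Reader.t, int) Rpc_parallel.Function.t =
    C.create_reverse_pipe
      ~name:"total"
      ~f:total_impl
      ~bin_query:Int.bin_t
      ~bin_update:Int.bin_t
      ~bin_response:Int.bin_t
      ()
end
```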

val create_reverse_direct_pipe : ?name:string -> ?client_pushes_back:unit -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'update Async.Pipe.Reader.t -> 'response Async.Deferred.t) -> bin_query:'query Core.Bin_prot.Type_class.t -> bin_update:'update Core.Bin_prot.Type_class.t -> bin_response:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query * ('update Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Core.Or_error.t Async.Deferred.t), 'response) Parallel.Function.t

create_reverse_direct_pipe ?name ~f ~bin_query ~bin_update ~bin_response () generates a function allowing you to send a query and a direct stream of updates to a worker. The worker will send back a response. It is up to you whether to send a response before or after finishing with the pipe; Rpc_parallel doesn't care.

val create_state : ?name:string -> ?client_pushes_back:unit -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> ('state * 'update Async.Pipe.Reader.t) Async.Deferred.t) -> bin_query:'query Core.Bin_prot.Type_class.t -> bin_state:'state Core.Bin_prot.Type_class.t -> bin_update:'update Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'state * 'update Async.Pipe.Reader.t) Parallel.Function.t

create_state ?name ~f ~bin_query ~bin_state ~bin_update () will create an Rpc.State_rpc.t with name if specified.

val of_async_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Deferred.t) -> ('query, 'response) Async.Rpc.Rpc.t -> (worker, 'query, 'response) Parallel.Function.t

of_async_rpc ~f rpc is the analog to create_rpc but instead of creating an Rpc protocol, it uses the supplied one.
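A minimal sketch of reusing an existing protocol definition. The Rpc.Rpc.t is taken as an argument here so that the example does not depend on how it was constructed; it assumes the signature is reachable as Rpc_parallel.Creator and unit state types, and Double_functions, double and make_double are hypothetical names.

```ocaml
open Core
open Async

(* Pure helper (hypothetical). *)
let double n = 2 * n

(* Assumption: this signature is exported as Rpc_parallel.Creator. *)
module Double_functions
    (C : Rpc_parallel.Creator
           with type worker_state := unit
            and type connection_state := unit) =
struct
  let double_impl ~worker_state:() ~conn_state:() n = return (double n)

  (* [rpc] is a protocol defined elsewhere, for example one shared with
     code that does not use Rpc_parallel; its name and bin_io types are
     reused instead of being created afresh. *)
  let make_double (rpc : (int, int) Rpc.Rpc.t)
    : (C.worker, int, int) Rpc_parallel.Function.t
    =
    C.of_async_rpc ~f:double_impl rpc
end
```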

val of_async_pipe_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Pipe.Reader.t Async.Deferred.t) -> ('query, 'response, Core.Error.t) Async.Rpc.Pipe_rpc.t -> (worker, 'query, 'response Async.Pipe.Reader.t) Parallel.Function.t

of_async_pipe_rpc ~f rpc is the analog to create_pipe but instead of creating a Pipe rpc protocol, it uses the supplied one.

Note that aborted is not exposed; the pipe is closed when the Rpc is aborted.

val of_async_direct_pipe_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Async.Deferred.t) -> ('query, 'response, Core.Error.t) Async.Rpc.Pipe_rpc.t -> (worker, 'query, 'response) Parallel.Function.Direct_pipe.t

of_async_direct_pipe_rpc ~f rpc is the analog to create_direct_pipe but instead of creating a Pipe rpc protocol, it uses the supplied one.

val of_async_one_way_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> unit) -> 'query Async.Rpc.One_way.t -> (worker, 'query, unit) Parallel.Function.t

of_async_one_way_rpc ~f rpc is the analog to create_one_way but instead of creating a One_way rpc protocol, it uses the supplied one.

val of_async_state_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> ('state * 'update Async.Pipe.Reader.t) Async.Deferred.t) -> ('query, 'state, 'update, Core.Error.t) Async.Rpc.State_rpc.t -> (worker, 'query, 'state * 'update Async.Pipe.Reader.t) Parallel.Function.t

of_async_state_rpc ~f rpc is the analog to create_state but instead of creating a State rpc protocol, it uses the supplied one.
