package async_rpc_kernel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file implementation.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
open Core
open Async_kernel
open Protocol
open Implementation_types.Implementation

(* Re-exports the types of the [Expert] module that is already in scope, repeating their
   definitions so that the record fields and constructors can be used directly from
   here. *)
module Expert = struct
  module Responder = struct
    (* Per-query responder state: a query id, a transport writer and a mutable
       [responded] flag.  NOTE(review): presumably [responded] records whether a
       response has already been written for [query_id] -- nothing in this module sets
       it, so confirm against the code that uses responders. *)
    type t = Expert.Responder.t =
      { query_id : Query_id.t
      ; writer : Transport.Writer.t
      ; mutable responded : bool (* [create] initialises this to [false] *)
      }
    [@@deriving sexp_of]

    (* [create query_id writer] builds a responder whose [responded] flag is [false]. *)
    let create query_id writer = { query_id; writer; responded = false }
  end

  (* What an expert implementation reports back: either [Replied], or
     [Delayed_response d] carrying a [unit Deferred.t].  NOTE(review): presumably [d]
     becomes determined once the delayed response has been sent -- confirm with the
     dispatch code. *)
  type implementation_result = Expert.implementation_result =
    | Replied
    | Delayed_response of unit Deferred.t
end

(* Re-exports the implementation-function types of the [F] module that is already in
   scope (definitions are repeated so the constructors are usable from here) and adds
   [sexp_of_t] and [lift]. *)
module F = struct
  (* Relates a response type ['a] to what the implementation function returns:
     the response itself for [Blocking], or ['a Deferred.t] for [Deferred]. *)
  type ('a, 'b) result_mode = ('a, 'b) F.result_mode =
    | Blocking : ('a, 'a) result_mode
    | Deferred : ('a, 'a Deferred.t) result_mode

  (* The two shapes a streaming implementation can take.  [Pipe] produces, on success,
     an ['init] together with a pipe reader of ['update]s.  [Direct] is additionally
     handed a [Direct_stream_writer.t] and produces only an ['init].  In both cases the
     [Error] side of the result also carries an ['init]. *)
  type ('connection_state, 'query, 'init, 'update) streaming_impl =
    ('connection_state, 'query, 'init, 'update) F.streaming_impl =
    | Pipe of
        ('connection_state
         -> 'query
         -> ('init * 'update Pipe.Reader.t, 'init) Result.t Deferred.t)
    | Direct of
        ('connection_state
         -> 'query
         -> 'update Implementation_types.Direct_stream_writer.t
         -> ('init, 'init) Result.t Deferred.t)

  (* An implementation function over connection state ['connection_state].  The message,
     query, response and result types are existentially hidden inside each
     constructor. *)
  type 'connection_state t = 'connection_state F.t =
    | One_way :
        'msg Bin_prot.Type_class.reader * ('connection_state -> 'msg -> unit)
        -> 'connection_state t
    | One_way_expert :
        ('connection_state -> Bigstring.t -> pos:int -> len:int -> unit)
        -> 'connection_state t
    | Rpc :
        'query Bin_prot.Type_class.reader
        * 'response Bin_prot.Type_class.writer
        * ('connection_state -> 'query -> 'result)
        * ('response, 'result) result_mode
        -> 'connection_state t
    | Rpc_expert :
        ('connection_state
         -> Expert.Responder.t
         -> Bigstring.t
         -> pos:int
         -> len:int
         -> 'result)
        * (Expert.implementation_result, 'result) result_mode
        -> 'connection_state t
    | Streaming_rpc :
        'query Bin_prot.Type_class.reader
    (* 'init can be an error or an initial state *)
        * 'init Bin_prot.Type_class.writer
        * 'update Bin_prot.Type_class.writer
        * ('connection_state, 'query, 'init, 'update) streaming_impl
        -> 'connection_state t

  (* Renders only the broad kind of the implementation ("one-way", "rpc" or
     "streaming-rpc").  The connection-state converter is ignored because the payload
     consists of functions and bin-prot type classes. *)
  let sexp_of_t _sexp_of_connection_state t =
    match t with
    | One_way _ | One_way_expert _ -> [%message "one-way"]
    | Rpc _ | Rpc_expert _ -> [%message "rpc"]
    | Streaming_rpc _ -> [%message "streaming-rpc"]
  ;;

  (* [lift t ~f] re-targets [t] to a different connection-state type.  Every wrapped
     callback applies [f] to the state it receives and then forwards to the original
     callback; readers, writers and result modes are passed through unchanged.  The
     wrappers stay fully eta-expanded, so [f] runs once per complete invocation. *)
  let lift t ~f =
    match t with
    | One_way (bin_msg, handler) ->
      One_way (bin_msg, fun conn msg -> handler (f conn) msg)
    | One_way_expert handler ->
      One_way_expert (fun conn buf ~pos ~len -> handler (f conn) buf ~pos ~len)
    | Rpc (bin_query, bin_response, handler, mode) ->
      let lifted conn query = handler (f conn) query in
      Rpc (bin_query, bin_response, lifted, mode)
    | Rpc_expert (handler, mode) ->
      let lifted conn responder buf ~pos ~len =
        handler (f conn) responder buf ~pos ~len
      in
      Rpc_expert (lifted, mode)
    | Streaming_rpc (bin_query, bin_init, bin_update, Pipe handler) ->
      let lifted = Pipe (fun conn query -> handler (f conn) query) in
      Streaming_rpc (bin_query, bin_init, bin_update, lifted)
    | Streaming_rpc (bin_query, bin_init, bin_update, Direct handler) ->
      let lifted = Direct (fun conn query writer -> handler (f conn) query writer) in
      Streaming_rpc (bin_query, bin_init, bin_update, lifted)
  ;;
end

(* An RPC implementation parameterised by its connection-state type.  This re-exports
   the [t] that is already in scope ([nonrec] makes the right-hand [t] refer to that
   earlier type), repeating the record definition so the fields can be used directly. *)
type nonrec 'connection_state t = 'connection_state t =
  { tag : Rpc_tag.t (* [description] reports this, via [Rpc_tag.to_string], as the name *)
  ; version : int (* [description] reports this as the version *)
  ; f : 'connection_state F.t (* the implementation function; rewritten by [lift] *)
  ; shapes : Sexp.t Lazy.t
    (* NOTE(review): presumably a lazily computed description of the RPC's type shapes;
       not used anywhere in this file -- confirm where it is produced and consumed *)
  ; on_exception : On_exception.t (* replaced by [update_on_exception] *)
  }
[@@deriving sexp_of]

(* [description t] is the [Description.t] made of [t]'s tag, rendered with
   [Rpc_tag.to_string], and [t]'s version. *)
let description t =
  let name = Rpc_tag.to_string t.tag in
  { Description.name; version = t.version }
;;
(* [lift t ~f] is [t] with its implementation function passed through [F.lift ~f]; all
   other fields are kept as they are. *)
let lift t ~f =
  let lifted = F.lift t.f ~f in
  { t with f = lifted }
;;
(* [update_on_exception t ~f] is [t] with its [on_exception] field replaced by
   [f t.on_exception]; all other fields are kept as they are. *)
let update_on_exception t ~f =
  let on_exception = f t.on_exception in
  { t with on_exception }
;;