package async_rpc_kernel

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file implementation_types.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
open Core
open Async_kernel
open Protocol

(** The types of the [Implementation] and [Implementations] modules, which have
    a dependency cycle: [Implementation] -> [Direct_stream_writer] ->
    [Implementations] -> [Implementation]. *)

(** Identifier module produced by applying the generative [Unique_id.Int63] functor. It
    is re-exported as [Direct_stream_writer.Id] further down and provides the type of the
    [id] field of a direct stream writer. *)
module Direct_stream_writer_id = Unique_id.Int63 ()

(** Exception-handling configuration; a value of this type is stored in the
    [on_exception] field of [Implementation.t]. Only [sexp_of] is derived, so values can
    be printed but not parsed back. *)
module On_exception = struct
  type t =
    { (* Optional function taking an [exn]. [@sexp.omit_nil] drops the field from the
         derived sexp when it converts to the empty list, i.e. when it is [None].
         NOTE(review): presumably invoked with the exception raised by an
         implementation -- confirm against the code that consumes this record. *)
      callback : (exn -> unit) option [@sexp.omit_nil]
    (* NOTE(review): going by the name, decides whether the connection is closed when an
       exception prevents a return value from being produced -- confirm with the
       consumer of this record; nothing in this file reads it. *)
    ; close_connection_if_no_return_value : bool
    }
  [@@deriving sexp_of]
end

(** Type definitions describing a single RPC implementation.

    The module is bound to itself ([end = Implementation]). This recursive-module idiom
    works because the signature contains only type definitions, and it lets the types
    here mention [Direct_stream_writer], which is defined later in the same recursive
    group. *)
module rec Implementation : sig
  module Expert : sig
    (** State passed to expert-level handlers: the id of a query, a transport writer and
        a mutable [responded] flag.
        NOTE(review): presumably [responded] is set once a response for [query_id] has
        been written to [writer]; the code doing so is not in this file. *)
    module Responder : sig
      type t =
        { query_id : Query_id.t
        ; writer : Transport.Writer.t
        ; mutable responded : bool
        }
    end

    (** Outcome reported by an expert implementation: [Replied], or [Delayed_response]
        carrying a [unit Deferred.t].
        NOTE(review): presumably the deferred becomes determined once the delayed
        response has been sent -- confirm with the dispatch code. *)
    type implementation_result =
      | Replied
      | Delayed_response of unit Deferred.t
  end

  module F : sig
    (** Witness relating a response type (first parameter) to the type an implementation
        function returns (second parameter): with [Blocking] the function returns the
        [Or_not_authorized.t] value directly, with [Deferred] it returns it inside a
        [Deferred.t]. *)
    type (_, _) result_mode =
      | Blocking : ('a, 'a Or_not_authorized.t) result_mode
      | Deferred : ('a, 'a Or_not_authorized.t Deferred.t) result_mode

    (** The two shapes a streaming implementation can take. Both receive the connection
        state and the query, and return a deferred [Or_not_authorized.t] wrapping a
        [Result.t] whose [Error] case carries an ['init].
        - [Pipe]: the [Ok] case carries the initial value together with a pipe reader of
          updates.
        - [Direct]: the function additionally receives an ['update Direct_stream_writer.t]
          and the [Ok] case carries only the initial value. *)
    type ('connection_state, 'query, 'init, 'update) streaming_impl =
      | Pipe of
          ('connection_state
           -> 'query
           -> ('init * 'update Pipe.Reader.t, 'init) Result.t Or_not_authorized.t
                Deferred.t)
      | Direct of
          ('connection_state
           -> 'query
           -> 'update Direct_stream_writer.t
           -> ('init, 'init) Result.t Or_not_authorized.t Deferred.t)

    (** A streaming implementation bundled with the bin_prot reader for its query and the
        bin_prot writers for its initial value and its updates. *)
    type ('connection_state, 'query, 'init, 'update) streaming_rpc =
      { bin_query_reader : 'query Bin_prot.Type_class.reader
      ; bin_init_writer : 'init Bin_prot.Type_class.writer
      ; bin_update_writer : 'update Bin_prot.Type_class.writer
      (* 'init can be an error or an initial state *)
      ; impl : ('connection_state, 'query, 'init, 'update) streaming_impl
      }

    (** An implementation function packaged with whatever is needed to decode its input
        and encode its output. The message type variables (['msg], ['query],
        ['response], ['result], ['init], ['update]) do not appear in the resulting type,
        so only ['connection_state] remains visible from outside. *)
    type 'connection_state t =
      (* A bin_prot reader for the message and a handler whose authorized result is
         [unit], i.e. no response value is produced. *)
      | One_way :
          'msg Bin_prot.Type_class.reader
          * ('connection_state -> 'msg -> unit Or_not_authorized.t Deferred.t)
          -> 'connection_state t
      (* Like [One_way] but without a reader: the handler is given a [Bigstring.t] plus
         [~pos] and [~len] (presumably delimiting the still-encoded message -- confirm
         with the caller). *)
      | One_way_expert :
          ('connection_state
           -> Bigstring.t
           -> pos:int
           -> len:int
           -> unit Or_not_authorized.t Deferred.t)
          -> 'connection_state t
      (* Query reader, response writer, handler, and the [result_mode] witness that ties
         the handler's return type ['result] to ['response]. *)
      | Rpc :
          'query Bin_prot.Type_class.reader
          * 'response Bin_prot.Type_class.writer
          * ('connection_state -> 'query -> 'result)
          * ('response, 'result) result_mode
          -> 'connection_state t
      (* Handler that receives an [Expert.Responder.t] and a [Bigstring.t] with [~pos]
         and [~len]; the [result_mode] witness fixes its return type to an
         [Expert.implementation_result], either directly or in a [Deferred.t]. *)
      | Rpc_expert :
          ('connection_state
           -> Expert.Responder.t
           -> Bigstring.t
           -> pos:int
           -> len:int
           -> 'result)
          * (Expert.implementation_result, 'result) result_mode
          -> 'connection_state t
      (* Wraps a [streaming_rpc] record, hiding its query/init/update types. *)
      | Streaming_rpc :
          ('connection_state, 'query, 'init, 'update) streaming_rpc
          -> 'connection_state t
  end

  (** An implementation function [f] together with the RPC's tag and version, its lazily
      computed [Rpc_shapes.t], and its [On_exception.t] configuration. *)
  type 'connection_state t =
    { tag : Rpc_tag.t
    ; version : int
    ; f : 'connection_state F.t
    ; shapes : Rpc_shapes.t Lazy.t
    ; on_exception : On_exception.t
    }
end =
  Implementation

(** Type definitions for a collection of implementations and for an instance of that
    collection tied to one connection. The signature contains only types, so the module
    is bound to itself ([end = Implementations]). *)
and Implementations : sig
  (** Policy stored in the [on_unknown_rpc] field of [t].
      NOTE(review): going by the name, it is consulted for RPCs with no entry in the
      [implementations] table -- confirm against the dispatch code.

      [`Raise], [`Continue] and [`Close_connection] carry no data. [`Call] carries a
      function of the connection state, RPC tag and version that returns either
      [`Close_connection] or [`Continue]. [`Expert] carries a function that additionally
      receives an [Implementation.Expert.Responder.t] and a [Bigstring.t] with [~pos] and
      [~len], and returns a [unit Deferred.t]. *)
  type 'connection_state on_unknown_rpc =
    [ `Raise
    | `Continue
    | `Close_connection
    | `Call of
        'connection_state
        -> rpc_tag:string
        -> version:int
        -> [ `Close_connection | `Continue ]
    | `Expert of
        'connection_state
        -> rpc_tag:string
        -> version:int
        -> Implementation.Expert.Responder.t
        -> Bigstring.t
        -> pos:int
        -> len:int
        -> unit Deferred.t
    ]

  (** A [Description.Table.t] of implementations plus the unknown-RPC policy. *)
  type 'connection_state t =
    { implementations : 'connection_state Implementation.t Description.Table.t
    ; on_unknown_rpc : 'connection_state on_unknown_rpc
    }

  (** Alias for [t], so that [Instance] -- which defines its own [t] -- can still name
      this type. *)
  type 'connection_state implementations = 'connection_state t

  module rec Instance : sig
    (** One end of a streaming response with its element type hidden: either a pipe
        reader or a direct stream writer. *)
    type streaming_response =
      | Pipe : _ Pipe.Reader.t -> streaming_response
      | Direct : _ Direct_stream_writer.t -> streaming_response

    (** The fields of an instance, with the connection-state type ['a] exposed. *)
    type 'a unpacked =
      { implementations : 'a implementations
      ; writer : Transport.Writer.t
      (* Streaming responses keyed by the id of their query. NOTE(review): presumably
         holds only responses that are still open, as the name says -- the code that
         adds and removes entries is not in this file. *)
      ; open_streaming_responses : (Query_id.t, streaming_response) Hashtbl.t
      ; mutable stopped : bool
      ; connection_state : 'a
      ; connection_description : Info.t
      (* NOTE(review): presumably determined, with the reason, once closing of the
         connection has begun -- confirm with the code that creates the instance. *)
      ; connection_close_started : Info.t Deferred.t
      (* NOTE(review): looks like a one-entry cache of the most recently dispatched
         implementation and its description -- confirm with the dispatch code. *)
      ; mutable
        last_dispatched_implementation :
          (Description.t * 'a Implementation.t) option
      (* A packed [t]; NOTE(review): presumably [T] applied to this very record. *)
      ; packed_self : t
      }

    (** An [unpacked] instance with its connection-state type hidden, which is what lets
        [Direct_stream_writer.t] hold an instance without a connection-state type
        parameter. *)
    and t = T : _ unpacked -> t
  end
end =
  Implementations

(** Type definitions for direct stream writers and for groups of them. The signature
    contains only types and a module alias, so the module is bound to itself
    ([end = Direct_stream_writer]). *)
and Direct_stream_writer : sig
  (** A response held in the queue of a writer that is in the [Not_started] state.
      NOTE(review): [Normal] carries a typed value; [Expert] carries a [string],
      presumably an already-serialized message -- confirm with the code that drains the
      queue. *)
  module Pending_response : sig
    type 'a t =
      | Normal of 'a
      | Expert of string
  end

  (** Whether the writer has started. Before it has, it owns a queue of pending
      responses; [Started] carries no data. *)
  module State : sig
    type 'a t =
      | Not_started of 'a Pending_response.t Queue.t
      | Started
  end

  module Id = Direct_stream_writer_id

  (** A direct stream writer for values of type ['a]. *)
  type 'a t =
    { id : Id.t
    ; mutable state : 'a State.t
    (* NOTE(review): presumably filled when the writer is closed -- the filling code is
       not in this file. *)
    ; closed : unit Ivar.t
    (* The packed instance this writer belongs to (connection-state type hidden). *)
    ; instance : Implementations.Instance.t
    ; query_id : Query_id.t
    ; stream_writer : 'a Cached_bin_writer.t
    (* One entry per group; see [group_entry]. *)
    ; groups : 'a group_entry Bag.t
    }

  (** A group paired with a bag element whose payload is a writer; the element type
      matches the elements of that group's [components] bag. *)
  and 'a group_entry =
    { group : 'a Direct_stream_writer.Group.t
    ; element_in_group : 'a t Bag.Elt.t
    }

  (** A set of direct stream writers sharing one buffer. [Group] defines its own [t], so
      inside the signature the writer type is referred to as [direct_stream_writer]; the
      trailing [with type ... :=] constraint then substitutes the enclosing ['a t] for
      that name and removes it from the signature. *)
  module Group : sig
    type 'a direct_stream_writer = 'a t

    type 'a t =
      { (* [components] is only tracked separately from [components_by_id] so we can iterate
           over its elements more quickly than we could iterate over the values of
           [components_by_id]. *)
        mutable components : 'a direct_stream_writer Bag.t
      ; components_by_id : 'a component Id.Table.t
      (* NOTE(review): a reference to a [Bigstring.t], presumably so the buffer can be
         replaced (for example by a larger one) -- confirm with the writing code. *)
      ; buffer : Bigstring.t ref
      }

    (** The two bag elements that tie one writer to one group: the writer's element in
        a bag of writers (the type of [components]) and the group's entry in a bag of
        group entries (the type of a writer's [groups]).
        NOTE(review): presumably kept so both can be removed when the writer leaves the
        group. *)
    and 'a component =
      { writer_element_in_group : 'a direct_stream_writer Bag.Elt.t
      ; group_element_in_writer : 'a group_entry Bag.Elt.t
      }
  end
  with type 'a direct_stream_writer := 'a t
end =
  Direct_stream_writer

(** Type of the [stream_writer] field of a direct stream writer: a bin_prot writer for
    ['a] stored alongside a constant, already bin_protted message prefix and a mutable
    length. The signature contains only a type, so the module is bound to itself
    ([end = Cached_bin_writer]). *)
and Cached_bin_writer : sig
  type 'a t =
    { header_prefix : string (* Bin_protted constant prefix of the message *)
    (* NOTE(review): mutable, so presumably updated for each message with the length of
       its data -- confirm with the code that writes messages. *)
    ; mutable data_len : Nat0.t
    ; bin_writer : 'a Bin_prot.Type_class.writer
    }
end =
  Cached_bin_writer