package streamable

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file versioned_state_rpc.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
open! Core
open! Async_kernel
open! Import
include Versioned_state_rpc_intf

module Caller_converts = struct
  module type S = Caller_converts

  module Make (Model : sig
    val name : string

    type query
    type state
    type update
  end) =
  struct
    let name = Model.name

    type dispatch_fun =
      Rpc.Connection.t
      -> Model.query
      -> (Model.state * Model.update Or_error.t Pipe.Reader.t) Or_error.t
         Deferred.Or_error.t

    let registry : dispatch_fun Callers_rpc_version_table.t =
      Callers_rpc_version_table.create ~rpc_name:name
    ;;

    let dispatch_multi' conn_with_menu query =
      let conn = Versioned_rpc.Connection_with_menu.connection conn_with_menu in
      let menu = Versioned_rpc.Connection_with_menu.menu conn_with_menu in
      match Callers_rpc_version_table.lookup_most_recent registry ~callee_menu:menu with
      | Error e -> return (Error e)
      | Ok dispatch -> dispatch conn query
    ;;

    let dispatch_multi conn_with_menu query =
      dispatch_multi' conn_with_menu query |> Deferred.map ~f:Or_error.join
    ;;

    module Register (Version : sig
      type query [@@deriving bin_io]
      type state

      module State : Main.S_rpc with type t = state

      type update

      module Update : Main.S_rpc with type t = update

      val version : int
      val query_of_model : Model.query -> query
      val model_of_state : state -> Model.state
      val model_of_update : update -> Model.update
      val client_pushes_back : bool
    end) =
    struct
      (* introduce [rpc] *)
      include State_rpc.Make (struct
        let name = name

        include Version
      end)

      let version = Version.version

      let dispatch' conn query =
        let open Deferred.Or_error.Let_syntax in
        let query = Version.query_of_model query in
        let%bind server_response = State_rpc.dispatch' rpc conn query in
        Or_error.map server_response ~f:(fun (state, updates) ->
          let state = Version.model_of_state state in
          let updates =
            Pipe.map updates ~f:(fun update ->
              Or_error.try_with (fun () -> Version.model_of_update update))
          in
          state, updates)
        |> return
      ;;

      let () = Callers_rpc_version_table.add_exn registry ~version dispatch'
    end
  end
end

module Callee_converts = struct
  module type S = Callee_converts

  module Make (Model : sig
    val name : string

    type query
    type state
    type update
  end) =
  struct
    let name = Model.name

    type implementation =
      { implement :
          's.
          ?on_exception:Rpc.On_exception.t
          -> ('s
              -> version:int
              -> Model.query
              -> (Model.state * Model.update Pipe.Reader.t) Deferred.Or_error.t)
          -> 's Rpc.Implementation.t
      }

    let registry : implementation Callers_rpc_version_table.t =
      Callers_rpc_version_table.create ~rpc_name:name
    ;;

    let implement_multi ?on_exception f =
      List.map (Callers_rpc_version_table.data registry) ~f:(fun { implement } ->
        implement ?on_exception f)
    ;;

    module Register (Version : sig
      type query [@@deriving bin_io]
      type state

      module State : Main.S_rpc with type t = state

      type update

      module Update : Main.S_rpc with type t = update

      val version : int
      val model_of_query : query -> Model.query
      val state_of_model : Model.state -> state
      val update_of_model : Model.update -> update
      val client_pushes_back : bool
    end) =
    struct
      (* introduce [rpc] *)
      include State_rpc.Make (struct
        let name = name

        include Version
      end)

      let version = Version.version

      let implement
        (type s)
        ?on_exception
        (f :
          s
          -> version:int
          -> Model.query
          -> (Model.state * Model.update Pipe.Reader.t) Deferred.Or_error.t)
        =
        State_rpc.implement ?on_exception rpc (fun conn_state query ->
          let open Deferred.Or_error.Let_syntax in
          let query = Version.model_of_query query in
          let%bind state, updates = f ~version conn_state query in
          let state = Version.state_of_model state in
          let updates = Pipe.map updates ~f:Version.update_of_model in
          return (state, updates))
      ;;

      let () = Callers_rpc_version_table.add_exn registry ~version { implement }
    end
  end
end

module Both_convert = struct
  module type S = Both_convert

  module Make (Model : sig
    val name : string

    module Caller : sig
      type query
      type state
      type update
    end

    module Callee : sig
      type query
      type state
      type update
    end
  end) =
  struct
    let name = Model.name

    module Caller = Caller_converts.Make (struct
      let name = name

      include Model.Caller
    end)

    module Callee = Callee_converts.Make (struct
      let name = name

      include Model.Callee
    end)

    module Register (Version : sig
      val version : int

      type query [@@deriving bin_io]
      type state

      module State : Main.S_rpc with type t = state

      type update

      module Update : Main.S_rpc with type t = update

      val query_of_caller_model : Model.Caller.query -> query
      val callee_model_of_query : query -> Model.Callee.query
      val state_of_callee_model : Model.Callee.state -> state
      val caller_model_of_state : state -> Model.Caller.state
      val update_of_callee_model : Model.Callee.update -> update
      val caller_model_of_update : update -> Model.Caller.update
      val client_pushes_back : bool
    end) =
    struct
      include Callee.Register (struct
        include Version

        let model_of_query = callee_model_of_query
        let state_of_model = state_of_callee_model
        let update_of_model = update_of_callee_model
      end)

      include Caller.Register (struct
        include Version

        let query_of_model = query_of_caller_model
        let model_of_state = caller_model_of_state
        let model_of_update = caller_model_of_update
      end)
    end

    let dispatch_multi = Caller.dispatch_multi
    let dispatch_multi' = Caller.dispatch_multi'
    let implement_multi = Callee.implement_multi
  end
end
OCaml

Innovation. Community. Security.