Source file implementation.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
open Core
open Async_kernel
open Protocol
open Implementation_types.Implementation
module Expert = struct
module Responder = struct
type t = Expert.Responder.t =
{ query_id : Query_id.t
; writer : Transport.Writer.t
; mutable responded : bool
}
[@@deriving sexp_of]
let create query_id writer = { query_id; writer; responded = false }
end
type implementation_result = Expert.implementation_result =
| Replied
| Delayed_response of unit Deferred.t
end
module F = struct
type ('a, 'b) result_mode = ('a, 'b) F.result_mode =
| Blocking : ('a, 'a) result_mode
| Deferred : ('a, 'a Deferred.t) result_mode
type ('connection_state, 'query, 'init, 'update) streaming_impl =
('connection_state, 'query, 'init, 'update) F.streaming_impl =
| Pipe of
('connection_state
-> 'query
-> ('init * 'update Pipe.Reader.t, 'init) Result.t Deferred.t)
| Direct of
('connection_state
-> 'query
-> 'update Implementation_types.Direct_stream_writer.t
-> ('init, 'init) Result.t Deferred.t)
type 'connection_state t = 'connection_state F.t =
| One_way :
'msg Bin_prot.Type_class.reader * ('connection_state -> 'msg -> unit)
-> 'connection_state t
| One_way_expert :
('connection_state -> Bigstring.t -> pos:int -> len:int -> unit)
-> 'connection_state t
| Rpc :
'query Bin_prot.Type_class.reader
* 'response Bin_prot.Type_class.writer
* ('connection_state -> 'query -> 'result)
* ('response, 'result) result_mode
-> 'connection_state t
| Rpc_expert :
('connection_state
-> Expert.Responder.t
-> Bigstring.t
-> pos:int
-> len:int
-> 'result)
* (Expert.implementation_result, 'result) result_mode
-> 'connection_state t
| Streaming_rpc :
'query Bin_prot.Type_class.reader
* 'init Bin_prot.Type_class.writer
* 'update Bin_prot.Type_class.writer
* ('connection_state, 'query, 'init, 'update) streaming_impl
-> 'connection_state t
let sexp_of_t _ = function
| One_way_expert _ | One_way _ -> [%message "one-way"]
| Rpc_expert _ | Rpc _ -> [%message "rpc"]
| Streaming_rpc _ -> [%message "streaming-rpc"]
;;
let lift t ~f =
match t with
| One_way (bin_msg, impl) -> One_way (bin_msg, fun state str -> impl (f state) str)
| One_way_expert impl ->
One_way_expert (fun state buf ~pos ~len -> impl (f state) buf ~pos ~len)
| Rpc (bin_query, bin_response, impl, result_mode) ->
Rpc (bin_query, bin_response, (fun state q -> impl (f state) q), result_mode)
| Rpc_expert (impl, result_mode) ->
Rpc_expert
((fun state resp buf ~pos ~len -> impl (f state) resp buf ~pos ~len), result_mode)
| Streaming_rpc (bin_q, bin_i, bin_u, impl) ->
let impl =
match impl with
| Pipe impl -> Pipe (fun state q -> impl (f state) q)
| Direct impl -> Direct (fun state q w -> impl (f state) q w)
in
Streaming_rpc (bin_q, bin_i, bin_u, impl)
;;
end
type nonrec 'connection_state t = 'connection_state t =
{ tag : Rpc_tag.t
; version : int
; f : 'connection_state F.t
; shapes : Sexp.t Lazy.t
; on_exception : On_exception.t
}
[@@deriving sexp_of]
let description t = { Description.name = Rpc_tag.to_string t.tag; version = t.version }
let lift t ~f = { t with f = F.lift ~f t.f }
let update_on_exception t ~f = { t with on_exception = f t.on_exception }