package grpc-eio

  1. Overview
  2. Docs

Source file connection.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
(** [grpc_recv_streaming body message_buffer_writer] drains the H2 [body]
    into [message_buffer_writer].

    Every chunk delivered by the body reader is appended to a single
    accumulation buffer; [Grpc.Message.extract_all] is then run over that
    buffer with [Seq.write message_buffer_writer] as its callback, and a new
    read is scheduled. When the body reports end-of-input the writer is
    closed with [Seq.close_writer].

    The function itself only schedules the first read and returns; all
    further work happens inside the reader callbacks. *)
let grpc_recv_streaming body message_buffer_writer =
  (* Bytes received so far; shared across all read callbacks. *)
  let pending = Grpc.Buffer.v () in
  let rec await_chunk () =
    H2.Body.Reader.schedule_read body ~on_read:handle_chunk ~on_eof:finish
  and handle_chunk chunk ~off ~len =
    Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:chunk ~dst:pending
      ~length:len;
    (* presumably forwards each complete message found in [pending] —
       TODO confirm against [Grpc.Message.extract_all] *)
    Grpc.Message.extract_all (Seq.write message_buffer_writer) pending;
    (* Re-arm the reader so the next chunk (or EOF) is delivered. *)
    await_chunk ()
  and finish () = Seq.close_writer message_buffer_writer in
  await_chunk ()

(** [grpc_send_streaming_client body encoder_stream] sends every element of
    [encoder_stream] on the request [body] and then closes [body].

    Each element is wrapped with [Grpc.Message.make] before being written.
    The writer is flushed after every message, exactly as
    [grpc_send_streaming] does on the server side, so that a message is made
    available to the connection as soon as it is produced instead of
    lingering in the body writer's buffer until a later write or the final
    [close]. This matters for bidirectional streams where the peer replies
    to individual messages. *)
let grpc_send_streaming_client body encoder_stream =
  Seq.iter
    (fun encoder ->
      let payload = Grpc.Message.make encoder in
      H2.Body.Writer.write_string body payload;
      (* Nothing to do once the bytes are flushed; the callback is a no-op. *)
      H2.Body.Writer.flush body (fun () -> ()))
    encoder_stream;
  H2.Body.Writer.close body

(** [grpc_send_streaming request encoder_stream status_promise] answers
    [request] with a streaming response.

    Steps, in order:
    - respond [`OK] with a ["content-type"] of ["application/grpc+proto"],
      flushing the headers immediately;
    - for each element of [encoder_stream], wrap it with [Grpc.Message.make],
      write it to the response body and flush;
    - wait for [status_promise] to resolve;
    - schedule trailers carrying ["grpc-status"] (the numeric status code)
      and, when the status has a message, ["grpc-message"];
    - close the response body. *)
let grpc_send_streaming request encoder_stream status_promise =
  let response =
    H2.Response.create
      ~headers:
        (H2.Headers.of_list [ ("content-type", "application/grpc+proto") ])
      `OK
  in
  let writer =
    H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request
      response
  in
  let send_message input =
    H2.Body.Writer.write_string writer (Grpc.Message.make input);
    H2.Body.Writer.flush writer (fun () -> ())
  in
  Seq.iter send_message encoder_stream;
  (* The final status is only known once the handler has finished. *)
  let status = Eio.Promise.await status_promise in
  let status_header =
    ( "grpc-status",
      string_of_int (Grpc.Status.int_of_code (Grpc.Status.code status)) )
  in
  (* The message trailer is emitted only when a message is present. *)
  let message_headers =
    match Grpc.Status.message status with
    | Some message -> [ ("grpc-message", message) ]
    | None -> []
  in
  (* Trailers are scheduled before the body is closed, as in the original. *)
  H2.Reqd.schedule_trailers request
    (H2.Headers.of_list (status_header :: message_headers));
  H2.Body.Writer.close writer