package capnp-rpc-net
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
Cap'n Proto is a capability-based RPC system with bindings for many languages
Install
dune-project
Dependency
Authors
Maintainers
Sources
capnp-rpc-2.1.1.tbz
sha256=6e9675034c8eac5873ed511f9b968db5223278145bb02ac4a970053a53970a48
sha512=2e2eb8389071bdad3ceef1d15200bf28987f13319f754f4d1603828d0d79202b4de90a6eb294f12ee088c7e3b73755286fbe7076b8fd3d0b29644221e0e7e080
doc/src/capnp-rpc-net/endpoint.ml.html
Source file endpoint.ml
open Eio.Std

(** Prometheus metrics exported by this module, all registered under the
    [capnp] namespace and [net] subsystem. *)
module Metrics = struct
  open Prometheus

  let namespace = "capnp"

  let subsystem = "net"

  (** Gauge incremented when the outer [run_writer] starts and decremented
      when it finishes. *)
  let connections =
    let help = "Number of live capnp-rpc connections" in
    Gauge.v ~help ~namespace ~subsystem "connections"

  (** Counter incremented by [recv] for each complete frame decoded. *)
  let messages_inbound_received_total =
    let help = "Total number of messages received" in
    Counter.v ~help ~namespace ~subsystem "messages_inbound_received_total"

  (** Counter incremented by [send] for each message queued on the writer. *)
  let messages_outbound_enqueued_total =
    let help = "Total number of messages enqueued to be transmitted" in
    Counter.v ~help ~namespace ~subsystem "messages_outbound_enqueued_total"
end

module Write = Eio.Buf_write

let src = Logs.Src.create "endpoint" ~doc:"Send and receive Cap'n'Proto messages"
module Log = (val Logs.src_log src: Logs.LOG)

(** Compression mode passed to every Capnp codec call in this module. *)
let compression = `None

(** When [true], [send] also writes each outgoing message to a file via
    [dump_msg]. *)
let record_sent_messages = false

type flow = Eio.Flow.two_way_ty r

(** An endpoint: a flow plus the buffers used to frame messages over it. *)
type t = {
  flow : flow;                            (** Underlying two-way flow. *)
  writer : Write.t;                       (** Buffer of outgoing bytes, drained by [run_writer]. *)
  decoder : Capnp.Codecs.FramedStream.t;  (** Accumulates received bytes into frames. *)
  peer_id : Auth.Digest.t;                (** Value supplied to [of_flow]. *)
  recv_buf : Cstruct.t;                   (** Scratch buffer for each [single_read]. *)
}

(** [peer_id t] is the peer ID that was given to [of_flow]. *)
let peer_id t = t.peer_id

(** [of_flow ~peer_id flow] wraps [flow] as an endpoint with a fresh, empty
    decoder, a 4096-byte write buffer and a 4096-byte receive buffer. *)
let of_flow ~peer_id flow =
  let decoder = Capnp.Codecs.FramedStream.empty compression in
  let flow = (flow :> flow) in
  let writer = Write.create 4096 in
  let recv_buf = Cstruct.create 4096 in
  { flow; writer; decoder; peer_id; recv_buf }

(** [dump_msg data] writes [data] to [/tmp/msg-N.capnp], where [N] is a
    counter that starts at 0 and is incremented on every call.
    Only called from [send] when [record_sent_messages] is [true]. *)
let dump_msg =
  let next = ref 0 in
  fun data ->
    let name = Fmt.str "/tmp/msg-%d.capnp" !next in
    Log.info (fun f -> f "Saved message as %S" name);
    incr next;
    (* [with_open_bin] closes the channel even if the write raises, so a
       failed dump does not leak a file descriptor. *)
    Out_channel.with_open_bin name (fun ch ->
        Out_channel.output_string ch data)

(** [send t msg] serialises [msg] into [t.writer] and bumps the
    outbound-enqueued counter. The bytes are actually written to the flow by
    [run_writer]. *)
let send t msg =
  Log.debug (fun f ->
      let module M = Capnp_rpc.Private.Schema.MessageWrapper.Message in
      f "queue_send: %d/%d allocated bytes in %d segs"
        (M.total_size msg)
        (M.total_alloc_size msg)
        (M.num_segments msg));
  Capnp.Codecs.serialize_iter_copyless ~compression msg
    ~f:(fun x len -> Write.string t.writer x ~len);
  Prometheus.Counter.inc_one Metrics.messages_outbound_enqueued_total;
  if record_sent_messages then
    dump_msg (Capnp.Codecs.serialize ~compression msg)

(** [recv ~tags t] returns the next complete message read from [t].

    If the decoder has no complete frame yet, it yields, unpauses the writer,
    reads more bytes from the flow and retries.

    @param tags log tags attached to every log message emitted here.
    @return [Ok msg] with a read-only message, or [Error `Closed] on
            end-of-stream or a connection reset.
    @raise Failure if the decoder reports an unsupported frame. *)
let rec recv ~tags t =
  match Capnp.Codecs.FramedStream.get_next_frame t.decoder with
  | Ok msg ->
    Prometheus.Counter.inc_one Metrics.messages_inbound_received_total;
    (* We often want to send multiple response messages while processing a
       batch of requests, so pause the writer to collect them. We'll unpause
       on the next [single_read]. *)
    Write.pause t.writer;
    Ok (Capnp.BytesMessage.Message.readonly msg)
  | Error Capnp.Codecs.FramingError.Unsupported ->
    failwith "Unsupported Cap'n'Proto frame received"
  | Error Capnp.Codecs.FramingError.Incomplete ->
    Log.debug (fun f -> f ~tags "Incomplete; waiting for more data...");
    (* We probably scheduled one or more application fibers to run while
       handling the last batch of messages. Give them a chance to run now
       while the writer is paused, because they might want to send more
       messages immediately. *)
    Fiber.yield ();
    Write.unpause t.writer;
    begin match Eio.Flow.single_read t.flow t.recv_buf with
      | got ->
        Log.debug (fun f -> f ~tags "Read %d bytes" got);
        Capnp.Codecs.FramedStream.add_fragment t.decoder
          (Cstruct.to_string t.recv_buf ~len:got);
        recv ~tags t
      | exception End_of_file ->
        Log.info (fun f -> f ~tags "Received end-of-stream");
        Error `Closed
      | exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
        Log.info (fun f -> f ~tags "Receive failed: %a" Eio.Exn.pp ex);
        Error `Closed
    end

(** [disconnect t] shuts down both directions of [t.flow], ignoring a
    connection-reset error. *)
let disconnect t =
  try
    Eio.Flow.shutdown t.flow `All
  with Eio.Io (Eio.Net.E Connection_reset _, _) ->
    (* TCP connection already shut down, so TLS shutdown failed. Ignore. *)
    ()

(** [shutdown_send t] unpauses and then closes [t.writer]. Closing makes the
    writer loop see [End_of_file] and return. *)
let shutdown_send t =
  Write.unpause t.writer;
  Write.close t.writer

(* Inner writer loop: repeatedly waits for a batch of buffered data and
   writes it to the flow. Returns when the writer is closed or a send fails;
   send failures are logged rather than raised (except cancellation, which
   [Eio.Fiber.check] re-raises). *)
let rec run_writer ~tags t =
  match Write.await_batch t.writer with
  | exception End_of_file -> ()      (* Due to [shutdown_send] closing it. *)
  | bufs ->
    match Eio.Flow.single_write t.flow bufs with
    | n ->
      Write.shift t.writer n;
      run_writer ~tags t
    | exception (Eio.Io (Eio.Net.E Connection_reset _, _) as ex) ->
      Log.info (fun f -> f ~tags "Send failed: %a" Eio.Exn.pp ex)
    | exception ex ->
      Eio.Fiber.check ();
      Log.warn (fun f ->
          f ~tags "Error sending messages: %a (will shutdown connection)"
            Fmt.exn ex)

(** [run_writer ~tags t] runs the writer loop for [t] until it finishes.

    The [connections] gauge is incremented for the duration of the call. When
    the loop ends, normally or by exception, the gauge is decremented and the
    flow is disconnected; an exception is then re-raised with its original
    backtrace.

    @param tags log tags attached to every log message emitted by the loop. *)
let run_writer ~tags t =
  let cleanup () =
    Prometheus.Gauge.dec_one Metrics.connections;
    disconnect t      (* The listen fiber will read end-of-stream soon *)
  in
  Prometheus.Gauge.inc_one Metrics.connections;
  match run_writer ~tags t with
  | () -> cleanup ()
  | exception ex ->
    let bt = Printexc.get_raw_backtrace () in
    cleanup ();
    Printexc.raise_with_backtrace ex bt
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>