package dream-httpaf
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
Internal: shared http/af stack for Dream (server) and Hyper (client)
Install
dune-project
Dependency
Authors
Maintainers
Sources
dream-1.0.0-alpha6.tar.gz
sha256=8d3b6344c0e175aca628b3d5bb8ee58265e8c1074fc2d40d63f136fef83daf90
doc/src/dream-httpaf/websocket.ml.html
Source file websocket.ml
(* This file is part of Dream, released under the MIT license. See LICENSE.md
   for details, or visit https://github.com/aantron/dream.

   Copyright 2022 Anton Bachin *)



module Websocketaf = Dream_websocketaf.Websocketaf

module Stream = Dream_pure.Stream



(* [websocket_handler stream socket] connects a Dream [stream] to a
   websocket/af [socket] in both directions.

   - Incoming: frames reported by websocket/af are queued, and a [Stream]
     reader built from [read] below is forwarded into [stream].
   - Outgoing: [outgoing_loop] repeatedly reads from [stream] and schedules
     data, ping and pong frames on [socket], flushing after at least 4096
     bytes have been scheduled or when the stream requests a flush.

   Returns the websocket/af input handlers record containing [frame] and
   [eof]. *)
let websocket_handler stream socket =

  (* Queue of received frames. There doesn't appear to be a nice way to achieve
     backpressure with the current API of websocket/af, so that will have to be
     added later. The user-facing API of Dream does support backpressure. *)
  let frames, push_frame = Lwt_stream.create () in

  (* Kind of the most recent Text or Binary frame, so that Continuation frames
     can be tagged with the kind of the message they belong to. Despite the
     name, this holds a Text/Binary tag rather than a boolean. *)
  let message_is_binary = ref `Binary in

  (* Frame reader called by websocket/af on each frame received. There is no
     good way to truly throttle this, hence this frame reader pushes frame
     objects into the above frame queue for the reader to take from later. See
     https://github.com/anmonteiro/websocketaf/issues/34. *)
  let frame ~opcode ~is_fin ~len:_ payload =
    match opcode with
    | `Connection_close ->
      push_frame (Some (`Close, payload))
    | `Ping ->
      push_frame (Some (`Ping, payload))
    | `Pong ->
      push_frame (Some (`Pong, payload))
    | `Other _ ->
      push_frame (Some (`Other, payload))
    | `Text ->
      message_is_binary := `Text;
      push_frame (Some (`Data (`Text, is_fin), payload))
    | `Binary ->
      message_is_binary := `Binary;
      push_frame (Some (`Data (`Binary, is_fin), payload))
    | `Continuation ->
      push_frame (Some (`Data (!message_is_binary, is_fin), payload))
  in

  (* End of input: terminate the frame queue. *)
  let eof () =
    push_frame None
  in

  (* The reader retrieves the next frame. If it is a data frame, it keeps a
     reference to the payload across multiple reader calls, until the payload
     is exhausted. *)
  let closed = ref false in
  let close_code = ref 1005 in
  let current_payload = ref None in

  (* Used to convert the separate on_eof payload reading callback into a FIN
     bit on the last chunk read. See
     https://github.com/anmonteiro/websocketaf/issues/35. *)
  let last_chunk = ref None in
  (* TODO Review per-chunk allocations, including current_payload contents. *)

  (* For control frames, the payload can be at most 125 bytes long. We assume
     that the first chunk will contain the whole payload, and discard any other
     chunks that may be reported by websocket/af. *)
  let first_chunk_received = ref false in
  let first_chunk = ref Bigstringaf.empty in
  let first_chunk_offset = ref 0 in
  let first_chunk_length = ref 0 in

  (* Reads [payload] to its end, remembering only the first chunk, then resets
     the first-chunk state and calls [continuation buffer offset length] with
     that chunk. If no chunk was reported, the continuation receives the empty
     buffer with offset 0 and length 0. *)
  let rec drain_payload payload continuation =
    Websocketaf.Payload.schedule_read
      payload
      ~on_read:(fun buffer ~off ~len ->
        if not !first_chunk_received then begin
          first_chunk := buffer;
          first_chunk_offset := off;
          first_chunk_length := len;
          first_chunk_received := true
        end
        else
          (* TODO How to integrate this thing with logging? *)
          (* websocket_log.warning (fun log ->
            log "Received fragmented control frame"); *)
          ();
        drain_payload payload continuation)
      ~on_eof:(fun () ->
        let payload = !first_chunk in
        let offset = !first_chunk_offset in
        let length = !first_chunk_length in
        first_chunk_received := false;
        first_chunk := Bigstringaf.empty;
        first_chunk_offset := 0;
        first_chunk_length := 0;
        continuation payload offset length)
  in

  (* Reader callback given to [Stream.reader]. Exactly one of the supplied
     callbacks is eventually invoked per call, except that unknown frames and
     empty non-final data frames are skipped by reading again.

     TODO Can this be canceled by a user's close? i.e. will that eventually
     cause a call to eof above? *)
  let rec read ~data ~flush ~ping ~pong ~close ~exn =
    if !closed then
      close !close_code
    else
      match !current_payload with
      | None ->
        (* No data payload in progress: wait for the next queued frame. *)
        Lwt.on_success (Lwt_stream.get frames) begin function
        | None ->
          (* Frame queue ended, i.e. [eof] was called. *)
          if not !closed then begin
            closed := true;
            close_code := 1005
          end;
          Websocketaf.Wsd.close socket;
          close !close_code

        | Some (`Close, payload) ->
          drain_payload payload @@ fun buffer offset length ->
          (* The close code, when present, is the first two payload bytes in
             big-endian order. 1005 stands for "no code present". *)
          let code =
            if length < 2 then
              1005
            else
              let high_byte = Char.code buffer.{offset}
              and low_byte = Char.code buffer.{offset + 1} in
              high_byte lsl 8 lor low_byte
          in
          if not !closed then
            close_code := code;
          close !close_code

        | Some (`Ping, payload) ->
          drain_payload payload @@ ping

        | Some (`Pong, payload) ->
          drain_payload payload @@ pong

        | Some (`Other, payload) ->
          drain_payload payload @@ fun _buffer _offset length ->
          ignore length; (* TODO log instead *)
          (* websocket_log.warning (fun log ->
            log "Unknown frame type with length %i" length); *)
          read ~data ~flush ~ping ~pong ~close ~exn

        | Some (`Data properties, payload) ->
          current_payload := Some (properties, payload);
          read ~data ~flush ~ping ~pong ~close ~exn
        end

      | Some ((binary, fin), payload) ->
        (* Chunks are delivered one step late: each chunk is held in
           [last_chunk] until either another chunk arrives (then the held one
           is emitted without FIN) or the payload ends (then the held one is
           emitted with the FIN bit of the frame). *)
        Websocketaf.Payload.schedule_read
          payload
          ~on_read:(fun buffer ~off ~len ->
            match !last_chunk with
            | None ->
              last_chunk := Some (buffer, off, len);
              read ~data ~flush ~ping ~pong ~close ~exn
            | Some (last_buffer, last_offset, last_length) ->
              last_chunk := Some (buffer, off, len);
              let binary = binary = `Binary in
              data last_buffer last_offset last_length binary false)
          ~on_eof:(fun () ->
            current_payload := None;
            match !last_chunk with
            | None ->
              (* The frame had no payload at all. If it carries FIN (an empty
                 message, or an empty final fragment), the consumer still has
                 to learn that the message ended, so deliver an empty chunk
                 with FIN set. Otherwise there is nothing to report and the
                 next frame is read.
                 NOTE(review): assumes websocket/af reports an empty payload
                 through on_eof only; confirm against websocket/af. *)
              if fin then
                data Bigstringaf.empty 0 0 (binary = `Binary) true
              else
                read ~data ~flush ~ping ~pong ~close ~exn
            | Some (last_buffer, last_offset, last_length) ->
              last_chunk := None;
              let binary = binary = `Binary in
              data last_buffer last_offset last_length binary fin)
  in

  let bytes_since_flush = ref 0 in

  (* Resets the flush counter, then either reports closure through [close] or
     asks websocket/af to call [ok] once the output has been flushed. *)
  let flush ~close ok =
    bytes_since_flush := 0;
    if !closed then
      close !close_code
    else
      Websocketaf.Wsd.flushed socket ok
  in

  (* Sends a Close frame, unless the connection is already marked closed.

     Codes 1005, 1006 and 1015 are reserved by RFC 6455 section 7.4.1 and must
     not be set in a Close frame. They only describe a closure locally; in
     particular, 1005 is what [read] uses for "the peer sent no code". For
     these values, a Close frame without a status code is sent instead. *)
  let close code =
    if not !closed then begin
      (* TODO Really need to work out the "close handshake" and how it is
         exposed in the Stream API. *)
      (* closed := true; *)
      match code with
      | 1005 | 1006 | 1015 ->
        Websocketaf.Wsd.close socket
      | _ ->
        Websocketaf.Wsd.close ~code:(`Other code) socket
    end
  in

  (* Aborting closes without a status code; see [close] above. *)
  let abort _exn = close 1005 in

  let reader = Stream.reader ~read ~close ~abort in
  Stream.forward reader stream;

  (* Writer side: pulls one event at a time from [stream], writes it to
     [socket], and loops. The loop stops once [closed] is set, or when the
     stream reports close or an exception. *)
  let rec outgoing_loop () =
    Stream.read
      stream
      ~data:(fun buffer offset length binary fin ->
        let kind = if binary then `Binary else `Text in
        if !closed then
          close !close_code
        else begin
          Websocketaf.Wsd.schedule
            socket ~is_fin:fin ~kind buffer ~off:offset ~len:length;
          bytes_since_flush := !bytes_since_flush + length;
          (* Flush once at least 4096 bytes have been scheduled since the last
             flush, continuing the loop only after the flush completes. *)
          if !bytes_since_flush >= 4096 then
            flush ~close outgoing_loop
          else
            outgoing_loop ()
        end)
      ~flush:(fun () -> flush ~close outgoing_loop)
      ~ping:(fun buffer offset length ->
        if length > 125 then
          raise (Failure "Ping payload cannot exceed 125 bytes");
        if !closed then
          close !close_code
        else begin
          if length = 0 then
            Websocketaf.Wsd.send_ping socket
          else
            Websocketaf.Wsd.send_ping
              ~application_data:{Faraday.buffer; off = offset; len = length}
              socket;
          outgoing_loop ()
        end)
      ~pong:(fun buffer offset length ->
        (* TODO Is there any way for the peer to send a ping payload with more
           than 125 bytes, forcing a too-large pong and an exception? *)
        if length > 125 then
          raise (Failure "Pong payload cannot exceed 125 bytes");
        if !closed then
          close !close_code
        else begin
          if length = 0 then
            Websocketaf.Wsd.send_pong socket
          else
            Websocketaf.Wsd.send_pong
              ~application_data:{Faraday.buffer; off = offset; len = length}
              socket;
          outgoing_loop ()
        end)
      ~close
      ~exn:abort
  in
  outgoing_loop ();

  Websocketaf.Websocket_connection.{frame; eof}
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>