Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
server_connection.ml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307(*---------------------------------------------------------------------------- Copyright (c) 2017 Inhabited Type LLC. Copyright (c) 2025 Robur Cooperative All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. Neither the name of the author nor the names of his contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ----------------------------------------------------------------------------*) open Httpun_types module Queue = struct include Queue let peek_exn = peek let peek t = if is_empty t then None else Some (peek_exn t) end module Reader = Parse.Reader module Writer = Serialize.Writer type request_handler = Reqd.t -> unit type error = [ `Bad_gateway | `Bad_request | `Internal_server_error | `Exn of exn ] type error_handler = ?request:Request.t -> error -> (Headers.t -> Body.Writer.t) -> unit type t = { reader : Reader.request; writer : Writer.t; response_body_buffer : Bstr.t; request_handler : request_handler; error_handler : error_handler; request_queue : Reqd.t Queue.t; (* invariant: If [request_queue] is not empty, then the head of the queue has already had [request_handler] called on it. *) mutable is_errored : bool; (* if there is a parse or connection error, we invoke the [error_handler] and set [is_errored] to indicate we should not close the writer yet. *) mutable wakeup_reader : Optional_thunk.t; } let is_closed t = Reader.is_closed t.reader && Writer.is_closed t.writer let is_active t = not (Queue.is_empty t.request_queue) let current_reqd_exn t = Queue.peek_exn t.request_queue let yield_reader t k = if is_closed t then failwith "yield_reader on closed conn" else if Optional_thunk.is_some t.wakeup_reader then failwith "yield_reader: only one callback can be registered at a time" else t.wakeup_reader <- Optional_thunk.some k let wakeup_reader t = let f = t.wakeup_reader in t.wakeup_reader <- Optional_thunk.none; Optional_thunk.call_if_some f let yield_writer t k = if Writer.is_closed t.writer then k () else Writer.on_wakeup t.writer k let wakeup_writer t = Writer.wakeup t.writer let default_error_handler ?request:_ error handle = let message = match error with | `Exn exn -> Printexc.to_string exn | (#Status.client_error | #Status.server_error) as error -> Status.to_string error in let body = handle Headers.empty in Body.Writer.write_string body message; Body.Writer.close body let create ?(config = Config.default) ?(error_handler = default_error_handler) request_handler = let { Config.response_buffer_size; response_body_buffer_size; _ } = config in let writer = Writer.create ~buffer_size:response_buffer_size () in let request_queue = Queue.create () in let response_body_buffer = Bstr.create response_body_buffer_size in let handler request request_body = let reqd = Reqd.create error_handler request request_body writer response_body_buffer in Queue.push reqd request_queue in { reader = Reader.request handler; writer; response_body_buffer; request_handler; error_handler; request_queue; is_errored = false; wakeup_reader = Optional_thunk.none; } let shutdown_reader t = if is_active t then Reqd.close_request_body (current_reqd_exn t); Reader.force_close t.reader; wakeup_reader t let shutdown_writer t = if is_active t then ( let reqd = current_reqd_exn t in (* XXX(dpatti): I'm not sure I understand why we close the *request* body here. Maybe we can write a test such that removing this line causes it to fail? *) Reqd.close_request_body reqd; Reqd.flush_response_body reqd); Writer.close t.writer; wakeup_writer t let error_code t = if is_active t then Reqd.error_code (current_reqd_exn t) else None let shutdown t = shutdown_reader t; shutdown_writer t let set_error_and_handle ?request t error = if is_active t then ( assert (request = None); let reqd = current_reqd_exn t in Reqd.report_error reqd error) else ( t.is_errored <- true; let status = match (error :> [ error | Status.standard ]) with | `Exn _ -> `Internal_server_error | #Status.standard as status -> status in shutdown_reader t; let writer = t.writer in t.error_handler ?request error (fun headers -> let response = Response.create ~headers status in Writer.write_response writer response; let encoding = (* If we haven't parsed the request method, just use GET as a standard placeholder. The method is only used for edge cases, like HEAD or CONNECT. *) let request_method = match request with None -> `GET | Some request -> request.meth in match Response.body_length ~request_method response with | (`Fixed _ | `Close_delimited) as encoding -> encoding | `Chunked -> (* XXX(dpatti): Because we pass the writer's faraday directly to the new body, we don't write the chunked encoding. A client won't be able to interpret this. *) `Close_delimited | `Error (`Bad_gateway | `Internal_server_error) -> failwith "H1.Server_connection.error_handler: invalid response body \ length" in Body.Writer.of_faraday (Writer.faraday writer) writer ~encoding ) ) let report_exn t exn = set_error_and_handle t (`Exn exn) let advance_request_queue t = ignore (Queue.take t.request_queue); if not (Queue.is_empty t.request_queue) then t.request_handler (Queue.peek_exn t.request_queue) let rec _next_read_operation t = if not (is_active t) then ( (* If the request queue is empty, there is no connection error, and the reader is closed, then we can assume that no more user code will be able to write. *) if Reader.is_closed t.reader && not t.is_errored then shutdown_writer t; Reader.next t.reader) else let reqd = current_reqd_exn t in match Reqd.input_state reqd with | Waiting -> _yield_reader t | Ready -> Reader.next t.reader | Complete -> _final_read_operation_for t reqd | Upgraded -> `Upgrade and _final_read_operation_for t reqd = if not (Reqd.persistent_connection reqd) then ( shutdown_reader t; Reader.next t.reader; ) else ( match Reqd.output_state reqd with | Waiting | Ready -> _yield_reader t | Upgraded -> (* If the input state is not [Upgraded], the output state cannot be either. *) assert false | Complete -> advance_request_queue t; _next_read_operation t; ) and _yield_reader t = (* XXX(dpatti): This is a way in which the reader and writer are not parallel -- we tell the writer when it needs to yield but the reader is always asking for more data. This is the only branch in either operation function that does not return `(Reader|Writer).next`, which means there are surprising states you can get into. For example, we ask the runtime to yield but then raise when it tries to because the reader is closed. I think this can be avoided if we allow this module to tell the reader when it should yield/resume, then we'd just do an inlined `Reader.next` call instead. I put this function here to describe why this is subtle. *) if Reader.is_closed t.reader then Reader.next t.reader else `Yield ;; let next_read_operation t = match _next_read_operation t with | `Error (`Parse _) -> set_error_and_handle t `Bad_request; `Close | `Error (`Bad_request request) -> set_error_and_handle ~request t `Bad_request; `Close | (`Read | `Yield | `Close | `Upgrade) as operation -> operation let rec read_with_more t bs ~off ~len more = let call_handler = Queue.is_empty t.request_queue in let consumed = Reader.read_with_more t.reader bs ~off ~len more in if is_active t then ( let reqd = current_reqd_exn t in if call_handler then t.request_handler reqd; Reqd.flush_request_body reqd); (* Keep consuming input as long as progress is made and data is available, in case multiple requests were received at once. *) if consumed > 0 && consumed < len then let off = off + consumed and len = len - consumed in consumed + read_with_more t bs ~off ~len more else consumed let read t bs ~off ~len = read_with_more t bs ~off ~len Incomplete let read_eof t bs ~off ~len = read_with_more t bs ~off ~len Complete let rec _next_write_operation t = if not (is_active t) then Writer.next t.writer else ( let reqd = current_reqd_exn t in match Reqd.output_state reqd with | Waiting -> (* XXX(dpatti): I don't think we should need to call this, but it is necessary in the case of a streaming, non-chunked body so that you can set the appropriate flag. *) Reqd.flush_response_body reqd; Writer.next t.writer | Ready -> Reqd.flush_response_body reqd; Writer.next t.writer | Complete -> _final_write_operation_for t reqd | Upgraded -> wakeup_reader t; (* Even in the Upgrade case, we're still responsible for writing the response header, so we might have work to do. *) if Writer.has_pending_output t.writer then Writer.next t.writer else `Upgrade) and _final_write_operation_for t reqd = let next = if not (Reqd.persistent_connection reqd) then ( shutdown_writer t; Writer.next t.writer) else match Reqd.input_state reqd with | Waiting -> `Yield | Ready -> Writer.next t.writer; | Upgraded -> `Upgrade | Complete -> advance_request_queue t; _next_write_operation t in wakeup_reader t; next let next_write_operation t = _next_write_operation t let report_write_result t result = Writer.report_result t.writer result