Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
ws.ml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103open Riot open Atacama.Handler include Atacama.Handler.Default open Riot.Logger.Make (struct let namespace = [ "nomad"; "ws" ] end) let ( let* ) = Result.bind type state = { upgrade_opts : Trail.Sock.upgrade_opts; handler : Trail.Sock.t; req : Trail.Request.t; buffer : Bytestring.t; conn : Atacama.Connection.t; } type error = [ `Unknown_opcode of int ] let pp_err _fmt _ = () let make ~upgrade_opts ~handler ~req ~conn () = { upgrade_opts; handler; req; buffer = Bytestring.empty; conn } let handshake (req : Trail.Request.t) conn state = let[@warning "-8"] (Some client_key) = Http.Header.get req.headers "sec-websocket-key" in let concatenated_key = client_key ^ "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" in let hashed_key = Digestif.SHA1.(digest_string concatenated_key |> to_raw_string) in let server_key = Base64.encode_string hashed_key in let res = Trail.Response.( make `Switching_protocols ~headers: [ ("upgrade", "websocket"); ("connection", "Upgrade"); ("sec-websocket-accept", server_key); ] ()) in Adapter.send conn req res; state let handle_connection conn state = debug (fun f -> f "switched to ws"); info (fun f -> f "Request: %a" Trail.Request.pp state.req); match Trail.Sock.init state.handler conn with | `continue (conn, handler) -> Continue { state with conn; handler } | `error (conn, reason) -> Error ({ state with conn }, reason) let rec send_frames state conn frames return = match frames with | [] -> return | frame :: frames -> ( let data = Trail.Frame.Response.serialize frame in match Atacama.Connection.send conn data with | Ok _n -> send_frames state conn frames return | Error `Eof -> error (fun f -> f "ws.error: end of file"); `halt (Close state) | Error ((`Closed | `Timeout | `Process_down | `Unix_error _ | _) as err) -> error (fun f -> f "ws.error: %a" IO.pp_err err); `halt (Close state)) let handle_data data conn state = let data = Bytestring.(to_string (state.buffer ^ data)) in trace (fun f -> f "handling data: %d bytes" (String.length data)); Stream.unfold Trail.Frame.Request.deserialize data |> Stream.reduce_while (Continue state) @@ fun frame state -> match (frame, state) with | `ok frame, Continue state -> ( trace (fun f -> f "handling frame: %a" Trail.Frame.pp frame); match[@warning "-8"] Trail.Sock.handle_frame state.handler frame conn with | `push (frames, handler) -> let state = { state with handler } in send_frames state conn frames (`continue (Continue state)) | `continue (conn, handler) -> `continue (Continue { state with conn; handler }) | `close conn -> `halt (Close { state with conn }) | `error (conn, reason) -> `halt (Error ({ state with conn }, reason))) | `more buffer, Continue state -> `halt (Continue { state with buffer }) | `error reason, Continue state -> `halt (Error (state, reason)) | _, _ -> failwith "Unexpected_frame_parsing_error" let handle_message msg conn state = match Trail.Sock.handle_message state.handler msg conn with | `continue (conn, handler) -> Continue { state with conn; handler } | `error (conn, reason) -> Error ({ state with conn }, reason) | `push (frames, handler) -> ( let state = { state with handler } in match send_frames state conn frames (`continue (Continue state)) with | `continue cont -> cont | `halt res -> res)