package nomad

  1. Overview
  2. Docs

Source file ws.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
open Riot
open Atacama.Handler
include Atacama.Handler.Default

open Riot.Logger.Make (struct
  let namespace = [ "nomad"; "ws" ]
end)

let ( let* ) = Result.bind

type state = {
  upgrade_opts : Trail.Sock.upgrade_opts;
  handler : Trail.Sock.t;
  req : Trail.Request.t;
  buffer : Bytestring.t;
  conn : Atacama.Connection.t;
}

type error = [ `Unknown_opcode of int ]

let pp_err _fmt _ = ()

let make ~upgrade_opts ~handler ~req ~conn () =
  { upgrade_opts; handler; req; buffer = Bytestring.empty; conn }

let handshake (req : Trail.Request.t) conn state =
  let[@warning "-8"] (Some client_key) =
    Http.Header.get req.headers "sec-websocket-key"
  in
  let concatenated_key = client_key ^ "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" in
  let hashed_key =
    Digestif.SHA1.(digest_string concatenated_key |> to_raw_string)
  in
  let server_key = Base64.encode_string hashed_key in

  let res =
    Trail.Response.(
      make `Switching_protocols
        ~headers:
          [
            ("upgrade", "websocket");
            ("connection", "Upgrade");
            ("sec-websocket-accept", server_key);
          ]
        ())
  in

  Adapter.send conn req res;
  state

let handle_connection conn state =
  debug (fun f -> f "switched to ws");
  info (fun f -> f "Request: %a" Trail.Request.pp state.req);
  match Trail.Sock.init state.handler conn with
  | `continue (conn, handler) -> Continue { state with conn; handler }
  | `error (conn, reason) -> Error ({ state with conn }, reason)

let rec send_frames state conn frames return =
  match frames with
  | [] -> return
  | frame :: frames -> (
      let data = Trail.Frame.Response.serialize frame in
      match Atacama.Connection.send conn data with
      | Ok _n -> send_frames state conn frames return
      | Error `Eof ->
          error (fun f -> f "ws.error: end of file");
          `halt (Close state)
      | Error ((`Closed | `Timeout | `Process_down | `Unix_error _ | _) as err)
        ->
          error (fun f -> f "ws.error: %a" IO.pp_err err);
          `halt (Close state))

let handle_data data conn state =
  let data = Bytestring.(to_string (state.buffer ^ data)) in
  trace (fun f -> f "handling data: %d bytes" (String.length data));
  Stream.unfold Trail.Frame.Request.deserialize data
  |> Stream.reduce_while (Continue state) @@ fun frame state ->
     match (frame, state) with
     | `ok frame, Continue state -> (
         trace (fun f -> f "handling frame: %a" Trail.Frame.pp frame);
         match[@warning "-8"]
           Trail.Sock.handle_frame state.handler frame conn
         with
         | `push (frames, handler) ->
             let state = { state with handler } in
             send_frames state conn frames (`continue (Continue state))
         | `continue (conn, handler) ->
             `continue (Continue { state with conn; handler })
         | `close conn -> `halt (Close { state with conn })
         | `error (conn, reason) -> `halt (Error ({ state with conn }, reason)))
     | `more buffer, Continue state -> `halt (Continue { state with buffer })
     | `error reason, Continue state -> `halt (Error (state, reason))
     | _, _ -> failwith "Unexpected_frame_parsing_error"

let handle_message msg conn state =
  match Trail.Sock.handle_message state.handler msg conn with
  | `continue (conn, handler) -> Continue { state with conn; handler }
  | `error (conn, reason) -> Error ({ state with conn }, reason)
  | `push (frames, handler) -> (
      let state = { state with handler } in
      match send_frames state conn frames (`continue (Continue state)) with
      | `continue cont -> cont
      | `halt res -> res)