package nomad

  1. Overview
  2. Docs

Source file http2.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
open Riot
open Atacama.Handler
include Atacama.Handler.Default

module Logger = Logger.Make (struct
  let namespace = [ "nomad"; "http2" ]
end)

let ( let* ) = Result.bind

module H2_stream = struct
  type Message.t += Frame of Frame.t

  let rec loop conn =
    match receive_any () with
    | Frame frame ->
        Logger.debug (fun f -> f "frame: %a" Frame.pp frame);
        loop conn
    | _ -> loop conn

  let init stream_id conn =
    Logger.debug (fun f ->
        f "Stream %a stream_id=%d initialized" Pid.pp (self ()) stream_id);
    loop conn

  let start_link ~stream_id ~conn =
    let pid = spawn_link (fun () -> init stream_id conn) in
    Result.Ok pid

  let send_frame ~frame pid = send pid (Frame frame)
end

type settings = {
  header_table_size : int;
  initial_window_size : int;
  max_frame_size : int;
}

let default_settings =
  {
    header_table_size = 4_096;
    initial_window_size = 65_535;
    max_frame_size = 16_384;
  }

type state = {
  request : Http.Request.t;
  handler : Handler.t;
  buffer : Bytestring.t;
  conn : Atacama.Connection.t;
  settings : settings;
  streams : (Frame.stream_id, Pid.t) Hashtbl.t;
}

type error =
  [ `protocol_error of
    [ `continuation_frame_with_zero_stream_id
    | `data_frame_with_invalid_padding_length
    | `data_frame_with_zero_stream_id
    | `headers_frame_with_invalid_padding_length
    | `headers_frame_with_zero_stream_id
    | `invalid_payload_size_in_goaway_frame
    | `invalid_payload_size_in_ping_frame
    | `invalid_payload_size_in_priority_frame
    | `invalid_payload_size_in_rst_stream
    | `invalid_settings_frame_with_stream_id of Frame.stream_id
    | `invalid_stream_id_in_goaway_frame
    | `invalid_stream_id_in_ping_frame
    | `invalid_window_update_frame
    | `invalid_window_update_size_increment
    | `priority_frame_with_zero_stream_id
    | `push_promise_frame_received
    | `rst_stream_frame_with_zero_stream_id ]
  | `settings_error of
    [ `frame_size_too_large of int
    | `frame_size_too_small of int
    | `invalid_enable_push_value
    | `window_size_too_large of int ]
  | `could_not_initialize_connection ]

let err_to_str (err : error) =
  match err with
  | `could_not_initialize_connection -> "Could not initialize HTTP/2 connection"
  | `protocol_error (`invalid_settings_frame_with_stream_id sid) ->
      Format.sprintf "Protocol error: invalid SETTINGS frame with stream_id=%d"
        sid
  | `protocol_error `continuation_frame_with_zero_stream_id ->
      "Protocol error: invalid CONTINUATION frame with stream_id=0"
  | `protocol_error `headers_frame_with_zero_stream_id ->
      "Protocol error: invalid HEADERS frame with stream_id=0"
  | `protocol_error `data_frame_with_zero_stream_id ->
      "Protocol error: invalid DATA frame with stream_id=0"
  | `protocol_error `data_frame_with_invalid_padding_length ->
      "Protocol error: DATA frame with invalid padding length"
  | `settings_error (`frame_size_too_small size) ->
      Format.sprintf "SETTINGS error: max frame size of %d is too small " size
  | `settings_error (`frame_size_too_large size) ->
      Format.sprintf "SETTINGS error: max frame size of %d is too large " size
  | `settings_error (`window_size_too_large size) ->
      Format.sprintf "SETTINGS error: initial window size of %d is too large "
        size
  | `settings_error `invalid_enable_push_value ->
      "Protocol error: invalid `enable_push` frame settings value"
  | err -> Marshal.to_string err []

let pp_err fmt err = Format.fprintf fmt "%s" (err_to_str err)

let make ?(settings = default_settings) ~handler ~conn () =
  {
    request = Http.Request.make "";
    handler;
    buffer = Bytestring.empty;
    conn;
    settings;
    streams = Hashtbl.create 128;
  }

let handshake req conn state =
  let res =
    Trail.Response.(
      make `Switching_protocols
        ~headers:[ ("upgrade", "h2c"); ("connection", "Upgrade") ]
        ())
  in
  Adapter.send conn req res;
  state

let handle_connection conn state =
  Logger.debug (fun f -> f "switched to http2");
  let frame = Frame.serialize Frame.empty_settings in
  match Atacama.Connection.send conn frame with
  | Ok () ->
      Logger.debug (fun f -> f "sent %d bytes" (Bytestring.length frame));
      Continue state
  | _ -> Error (state, `could_not_initialize_connection)

let handle_frame frame conn state =
  let stream_id = Frame.stream_id frame in
  let stream_pid =
    match Hashtbl.find_opt state.streams stream_id with
    | Some pid -> pid
    | None ->
        (* FIXME(@leostera): T_T *)
        let[@warning "-8"] Result.(Ok pid) =
          H2_stream.start_link ~stream_id ~conn
        in
        Hashtbl.add state.streams stream_id pid;
        pid
  in
  H2_stream.send_frame stream_pid ~frame;
  `continue (Continue state)

let handle_data data conn state =
  let data = Bytestring.to_string state.buffer ^ Bytestring.to_string data in
  data
  |> Stream.unfold
       (Frame.deserialize ~max_frame_size:state.settings.max_frame_size)
  |> Stream.reduce_while (Continue state) @@ fun frame state ->
     match (frame, state) with
     | `ok frame, Continue state -> handle_frame frame conn state
     | `more buffer, Continue state -> `halt (Continue { state with buffer })
     | `error reason, Continue state -> `halt (Error (state, reason))
     | _, _ -> failwith "Unexpected_frame_parsing_error"