package dream-pure

  1. Overview
  2. Docs
Internal: shared HTTP types for Dream (server) and Hyper (client)

Install

dune-project
 Dependency

Authors

Maintainers

Sources

dream-1.0.0-alpha4.tar.gz
sha256=a143b3694d67c0089ea16ce4585971d6333f05001abcadcede6696b06ca6af10
md5=20aaa93b13c210324e9dcceeba3c3b21

doc/src/dream-pure/stream.ml.html

Source file stream.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
(* This file is part of Dream, released under the MIT license. See LICENSE.md
   for details, or visit https://github.com/aantron/dream.

   Copyright 2021 Anton Bachin *)



(** Byte buffer used for every stream payload: a one-dimensional Bigarray of
    chars in C layout. Elsewhere in this file, values built with Bigstringaf
    are passed wherever a [buffer] is expected. *)
type buffer =
  (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t

(** Alias for Lwt promises; the promise-returning helpers at the end of this
    file produce values of this type. *)
type 'a promise =
  'a Lwt.t

(** A callback-style read operation. The caller passes one callback per kind
    of event the stream can produce:

    - [data buffer offset length binary fin] for a chunk of payload (argument
      names as used by the implementations in this file; NOTE(review): the
      two booleans look like WebSocket-style "binary" and "final" flags —
      confirm against callers),
    - [flush ()] for a flush request,
    - [ping buffer offset length] and [pong buffer offset length] for ping
      and pong payloads,
    - [close code] when the stream has been closed with [code],
    - [exn the_exn] when the stream has been aborted with [the_exn].

    The readers defined in this file invoke at most one of these callbacks
    per read. *)
type read =
  data:(buffer -> int -> int -> bool -> bool -> unit) ->
  flush:(unit -> unit) ->
  ping:(buffer -> int -> int -> unit) ->
  pong:(buffer -> int -> int -> unit) ->
  close:(int -> unit) ->
  exn:(exn -> unit) ->
    unit

(** The continuation part shared by all write operations. [close code] is
    called if the stream turns out to be closed, [exn the_exn] if it was
    aborted, and the final positional callback once the write has gone
    through (in [pipe], right after the waiting reader's callback has run). *)
type write =
  close:(int -> unit) ->
  exn:(exn -> unit) ->
  (unit -> unit) ->
    unit

(** The reading half of a stream. *)
type reader = {
  read : read;
  (* Requests the next event; see the [read] type for the callbacks. *)
  close : int -> unit;
  (* Closes the reading half with the given integer code. *)
  abort : exn -> unit;
  (* Aborts the reading half with the given exception. *)
}

(** The writing half of a stream. Each write-like field takes its payload
    arguments first and then the [write] continuations. *)
type writer = {
  data : buffer -> int -> int -> bool -> bool -> write;
  (* [data buffer offset length binary fin], mirroring the reader's [data]
     callback. *)
  flush : write;
  (* Sends a flush event. *)
  ping : buffer -> int -> int -> write;
  (* [ping buffer offset length]. *)
  pong : buffer -> int -> int -> write;
  (* [pong buffer offset length]. *)
  close : int -> unit;
  (* Closes the writing half with the given integer code. *)
  abort : exn -> unit;
  (* Aborts the writing half with the given exception. *)
}

(** A stream is a pair of a reading half and a writing half. The two halves
    are independent values; [null], [empty] and [string] in this file combine
    a reader with a writer that rejects every write. *)
type stream = {
  reader : reader;
  writer : writer;
}

(** [stream reader writer] packs a reading half and a writing half into one
    [stream] record. Neither half is invoked. *)
let stream reader writer : stream =
  { reader = reader; writer = writer }

(** Reading half for streams that cannot be read. [read] raises
    [Failure "read from a non-readable stream"] without calling any of its
    callbacks; [close] and [abort] do nothing. *)
let no_reader : reader =
  let refuse_read ~data:_ ~flush:_ ~ping:_ ~pong:_ ~close:_ ~exn:_ =
    failwith "read from a non-readable stream"
  in
  { read = refuse_read; close = ignore; abort = ignore }

(** Writing half for read-only streams. Every write-like operation raises
    [Failure] with a message naming the operation, once it has received all
    of its arguments; none of the continuations is called. [close] and
    [abort] do nothing. *)
let no_writer : writer =
  (* Shared tail of all four operations: swallow the continuations and fail
     with the operation-specific message. *)
  let reject message ~close:_ ~exn:_ _ok =
    failwith message
  in
  {
    data =
      (fun _buffer _offset _length _binary _fin ->
        reject "write to a read-only stream");
    flush =
      reject "flush of a read-only stream";
    ping =
      (fun _buffer _offset _length ->
        reject "ping on a read-only stream");
    pong =
      (fun _buffer _offset _length ->
        reject "pong on a read-only stream");
    close = ignore;
    abort = ignore;
  }

(** [reader ~read ~close ~abort] builds a [reader] record from its three
    operations. *)
let reader ~read ~close ~abort : reader =
  { read = read; close = close; abort = abort }

(** A stream that can be neither read nor written: it combines [no_reader]
    with [no_writer], so reads and writes raise [Failure]. *)
let null : stream =
  stream no_reader no_writer

(** Reading half of an empty stream: every [read] immediately reports close
    code [1000] through its [~close] callback. [close] and [abort] do
    nothing. *)
let empty_reader : reader =
  let read_end_of_stream ~data:_ ~flush:_ ~ping:_ ~pong:_ ~close ~exn:_ =
    close 1000
  in
  { read = read_end_of_stream; close = ignore; abort = ignore }

(** A read-only stream with no content: [empty_reader] paired with
    [no_writer]. *)
let empty : stream =
  stream empty_reader no_writer

(* TODO This shows the awkwardness in string-to-string body reading. *)
(** [string_reader the_string] builds a reader that hands [the_string] to the
    [~data] callback as one chunk (with both boolean flags set to [true]) on
    the first [read], and reports close code [1000] on every later [read].
    After [abort exn], every [read] reports [exn] through [~exn] instead.
    Closing the reader before the first [read] drops the string, so the next
    [read] reports close code [1000]. *)
let string_reader the_string =
  (* The payload lives inside the state cell so that it becomes unreachable,
     and therefore collectable, as soon as it has been delivered or the
     reader has been closed or aborted. *)
  let state = ref (`Pending the_string) in

  let read ~data ~flush:_ ~ping:_ ~pong:_ ~close ~exn =
    match !state with
    | `Failed the_exn ->
      exn the_exn
    | `Drained ->
      close 1000
    | `Pending payload ->
      (* Forget the payload before calling out, as the callback may re-enter
         this reader. *)
      state := `Drained;
      let length = String.length payload in
      let chunk = Bigstringaf.of_string ~off:0 ~len:length payload in
      data chunk 0 length true true
  in

  let close _code =
    match !state with
    | `Failed _ ->
      (* An abort is sticky: closing afterwards does not clear it. *)
      ()
    | `Pending _ | `Drained ->
      state := `Drained
  in

  let abort exn =
    state := `Failed exn
  in

  reader ~read ~close ~abort

(** [string the_string] is a read-only stream whose content is [the_string].
    The empty string yields the shared [empty] stream; any other string gets
    a fresh [string_reader]. Writing to the result raises, as the writer is
    [no_writer]. *)
let string the_string =
  match the_string with
  | "" ->
    empty
  | _ ->
    { reader = string_reader the_string; writer = no_writer }

(** [read stream ~data ~flush ~ping ~pong ~close ~exn] forwards the read
    request, with all six callbacks, to the reading half of [stream]. *)
let read stream ~data ~flush ~ping ~pong ~close ~exn =
  let source = stream.reader in
  source.read ~data ~flush ~ping ~pong ~close ~exn

(** [close stream code] closes both halves of [stream] with [code]: first the
    reading half, then the writing half. *)
let close stream code =
  let {reader; writer} = stream in
  reader.close code;
  writer.close code

(** [abort stream exn] aborts both halves of [stream] with [exn]: first the
    reading half, then the writing half. *)
let abort stream exn =
  let {reader; writer} = stream in
  reader.abort exn;
  writer.abort exn

(** [write stream buffer offset length binary fin ~close ~exn ok] passes a
    data write, together with its continuations, to the writing half of
    [stream]. *)
let write stream buffer offset length binary fin ~close ~exn ok =
  let sink = stream.writer in
  sink.data buffer offset length binary fin ~close ~exn ok

(** [flush stream ~close ~exn ok] passes a flush request, together with its
    continuations, to the writing half of [stream]. *)
let flush stream ~close ~exn ok =
  let sink = stream.writer in
  sink.flush ~close ~exn ok

(** [ping stream buffer offset length ~close ~exn ok] passes a ping, together
    with its continuations, to the writing half of [stream]. *)
let ping stream buffer offset length ~close ~exn ok =
  let sink = stream.writer in
  sink.ping buffer offset length ~close ~exn ok

(** [pong stream buffer offset length ~close ~exn ok] passes a pong, together
    with its continuations, to the writing half of [stream]. *)
let pong stream buffer offset length ~close ~exn ok =
  let sink = stream.writer in
  sink.pong buffer offset length ~close ~exn ok

(* TODO Restore "double write" checks by adding a state showing that a writer
   is already queued, and add tests for this. This should be done after ping
   and pong get their separate queues. *)
(** Mutable state shared by the two halves created by the [pipe] function:
    the current state plus one slot per callback of at most one pending
    reader and at most one pending writer. *)
type pipe = {
  (* [`Idle]: no reader is waiting (a writer may be queued in the write
     slots below). [`Reader_waiting]: a read is pending and the read slots
     hold its callbacks. [`Closed code] and [`Aborted exn] are final: the
     [pipe] function never leaves them once entered. *)
  mutable state : [
    | `Idle
    | `Reader_waiting
    | `Closed of int
    | `Aborted of exn
  ];

  (* Callbacks of the pending read; reset to no-op placeholders by
     [clean_up_reader_fields]. *)
  mutable read_data_callback : buffer -> int -> int -> bool -> bool -> unit;
  mutable read_flush_callback : unit -> unit;
  mutable read_ping_callback : buffer -> int -> int -> unit;
  mutable read_pong_callback : buffer -> int -> int -> unit;
  mutable read_close_callback : int -> unit;
  mutable read_abort_callback : exn -> unit;

  (* Callbacks of the queued write; reset to [ignore] by
     [clean_up_writer_fields]. In [pipe], [write_ok_callback] is a closure
     that retries the queued write operation. *)
  mutable write_ok_callback : unit -> unit;
  mutable write_close_callback : int -> unit;
  mutable write_abort_callback : exn -> unit;
}

(** No-op placeholder stored in a pipe's data-callback slot while no read is
    pending: takes the five arguments of a data callback, ignores them, and
    returns unit. *)
let dummy_read_data_callback _chunk _start _count _is_binary _is_final =
  () [@coverage off]

(** No-op placeholder stored in a pipe's ping and pong callback slots while
    no read is pending: takes the three arguments of such a callback, ignores
    them, and returns unit. *)
let dummy_ping_pong_callback _payload _start _count =
  () [@coverage off]

(** [clean_up_reader_fields pipe] overwrites all six reader callback slots of
    [pipe] with no-op placeholders, so the pipe no longer references the
    callbacks of a finished read. The pipe's state is not touched. *)
let clean_up_reader_fields pipe =
  (* Termination callbacks. *)
  pipe.read_abort_callback <- ignore;
  pipe.read_close_callback <- ignore;
  (* Event callbacks. *)
  pipe.read_pong_callback <- dummy_ping_pong_callback;
  pipe.read_ping_callback <- dummy_ping_pong_callback;
  pipe.read_flush_callback <- ignore;
  pipe.read_data_callback <- dummy_read_data_callback

(** [clean_up_writer_fields pipe] overwrites the three writer callback slots
    of [pipe] with [ignore], so the pipe no longer references the callbacks
    of a queued write. The pipe's state is not touched. *)
let clean_up_writer_fields pipe =
  pipe.write_abort_callback <- ignore;
  pipe.write_close_callback <- ignore;
  pipe.write_ok_callback <- ignore

(** [pipe ()] creates a connected [(reader, writer)] pair backed by one
    shared, mutable [pipe] record. Nothing is buffered: a write is delivered
    directly to the callbacks of a waiting read, and a write that arrives
    while no read is waiting is parked until the next read. [close] and
    [abort] are shared by both halves.

    @raise Failure from the reader's [read] if a previous read is still
    waiting. *)
let pipe () =
  (* Fresh pipe: idle, with every callback slot holding a no-op. *)
  let internal = {
    state = `Idle;

    read_data_callback = dummy_read_data_callback;
    read_flush_callback = ignore;
    read_ping_callback = dummy_ping_pong_callback;
    read_pong_callback = dummy_ping_pong_callback;
    read_close_callback = ignore;
    read_abort_callback = ignore;

    write_ok_callback = ignore;
    write_close_callback = ignore;
    write_abort_callback = ignore;
  } in

  let read ~data ~flush ~ping ~pong ~close ~exn =
    match internal.state with
    | `Idle ->
      (* Register this read, then wake the parked writer, if any. The parked
         [write_ok_callback] re-runs the write operation, which now finds the
         pipe in [`Reader_waiting] and delivers to the callbacks stored here.
         With no parked writer the slot holds [ignore] and the read simply
         stays pending. The writer slots are cleared before the call so the
         callback runs against a clean pipe. *)
      internal.state <- `Reader_waiting;
      internal.read_data_callback <- data;
      internal.read_flush_callback <- flush;
      internal.read_ping_callback <- ping;
      internal.read_pong_callback <- pong;
      internal.read_close_callback <- close;
      internal.read_abort_callback <- exn;
      let write_ok_callback = internal.write_ok_callback in
      clean_up_writer_fields internal;
      write_ok_callback ()
    | `Reader_waiting ->
      (* Only one read may be pending at a time. *)
      raise (Failure "stream read: the previous read has not completed")
    | `Closed code ->
      close code
    | `Aborted the_exn ->
      exn the_exn
  in

  (* The four write operations below share one shape:
     - [`Idle]: no reader yet, so park a closure that retries this very
       operation, together with the writer's [close] and [exn] callbacks.
       Any previously parked write is overwritten.
     - [`Reader_waiting]: go back to [`Idle], take the matching reader
       callback, clear the reader slots, deliver the event, then call [ok].
     - [`Closed] / [`Aborted]: report through [close] / [exn]. *)
  let rec data buffer offset length binary fin ~close ~exn ok =
    match internal.state with
    | `Idle ->
      internal.write_ok_callback <- (fun () ->
        data buffer offset length binary fin ~close ~exn ok);
      internal.write_close_callback <- close;
      internal.write_abort_callback <- exn
    | `Reader_waiting ->
      (* State and slots are reset before the callback runs, so the callback
         may itself start the next read. *)
      internal.state <- `Idle;
      let read_data_callback = internal.read_data_callback in
      clean_up_reader_fields internal;
      read_data_callback buffer offset length binary fin;
      ok ()
    | `Closed code ->
      close code
    | `Aborted the_exn ->
      exn the_exn
  in

  let rec flush ~close ~exn ok =
    match internal.state with
    | `Idle ->
      internal.write_ok_callback <- (fun () ->
        flush ~close ~exn ok);
      internal.write_close_callback <- close;
      internal.write_abort_callback <- exn
    | `Reader_waiting ->
      internal.state <- `Idle;
      let read_flush_callback = internal.read_flush_callback in
      clean_up_reader_fields internal;
      read_flush_callback ();
      ok ()
    | `Closed code ->
      close code
    | `Aborted the_exn ->
      exn the_exn
  in

  let rec ping buffer offset length ~close ~exn ok =
    match internal.state with
    | `Idle ->
      internal.write_ok_callback <- (fun () ->
        ping buffer offset length ~close ~exn ok);
      internal.write_close_callback <- close;
      internal.write_abort_callback <- exn
    | `Reader_waiting ->
      internal.state <- `Idle;
      let read_ping_callback = internal.read_ping_callback in
      clean_up_reader_fields internal;
      read_ping_callback buffer offset length;
      ok ()
    | `Closed code ->
      close code
    | `Aborted the_exn ->
      exn the_exn
  in

  let rec pong buffer offset length ~close ~exn ok =
    match internal.state with
    | `Idle ->
      internal.write_ok_callback <- (fun () ->
        pong buffer offset length ~close ~exn ok);
      internal.write_close_callback <- close;
      internal.write_abort_callback <- exn
    | `Reader_waiting ->
      internal.state <- `Idle;
      let read_pong_callback = internal.read_pong_callback in
      clean_up_reader_fields internal;
      read_pong_callback buffer offset length;
      ok ()
    | `Closed code ->
      close code
    | `Aborted the_exn ->
      exn the_exn
  in

  (* Closing notifies whichever side is currently pending: the parked
     writer's [close] callback when idle (a no-op if nothing is parked), or
     the waiting reader's. Closing an already closed or aborted pipe does
     nothing, so the first close or abort wins. *)
  let close code =
    match internal.state with
    | `Idle ->
      internal.state <- `Closed code;
      let write_close_callback = internal.write_close_callback in
      clean_up_writer_fields internal;
      write_close_callback code
    | `Reader_waiting ->
      internal.state <- `Closed code;
      let read_close_callback = internal.read_close_callback in
      clean_up_reader_fields internal;
      read_close_callback code
    | `Closed _code ->
      ()
    | `Aborted _the_exn ->
      ()
  in

  (* Same structure as [close], but records the exception and uses the
     pending side's abort callback. *)
  let abort exn =
    match internal.state with
    | `Idle ->
      internal.state <- `Aborted exn;
      let write_abort_callback = internal.write_abort_callback in
      clean_up_writer_fields internal;
      write_abort_callback exn
    | `Reader_waiting ->
      internal.state <- `Aborted exn;
      let read_abort_callback = internal.read_abort_callback in
      clean_up_reader_fields internal;
      read_abort_callback exn
    | `Closed _code ->
      ()
    | `Aborted _the_exn ->
      ()
  in

  (* Both halves carry the same [close] and [abort] closures. *)
  let reader = {
    read;
    close;
    abort;
  }
  and writer = {
    data;
    flush;
    ping;
    pong;
    close;
    abort;
  } in

  (reader, writer)

(** [forward reader stream] repeatedly reads from [reader] and writes every
    event (data, flush, ping, pong) to the writing half of [stream], reading
    again each time a write completes. If the writer reports that it is
    closed or aborted, [reader] is closed or aborted in turn; if [reader]
    reports close or abort, the writing half of [stream] is closed or aborted.
    The loop ends as soon as either side reports close or abort. *)
let forward (reader : reader) stream =
  let sink = stream.writer in
  (* A failing write is propagated back to the source. *)
  let source_close = reader.close
  and source_abort = reader.abort in

  let rec pump () =
    reader.read
      ~data:(fun buffer offset length binary fin ->
        sink.data
          buffer offset length binary fin
          ~close:source_close ~exn:source_abort pump)
      ~flush:(fun () ->
        sink.flush ~close:source_close ~exn:source_abort pump)
      ~ping:(fun buffer offset length ->
        sink.ping
          buffer offset length ~close:source_close ~exn:source_abort pump)
      ~pong:(fun buffer offset length ->
        sink.pong
          buffer offset length ~close:source_close ~exn:source_abort pump)
      (* The end of the source is propagated forward to the sink. *)
      ~close:sink.close
      ~exn:sink.abort
  in

  pump ()

(** [read_convenience stream] returns a promise for the next data chunk of
    [stream]:

    - resolved with [Some chunk] (the chunk copied into a fresh string) when
      data arrives,
    - resolved with [None] when the stream reports that it is closed,
    - rejected with the exception when the stream reports an abort.

    Flush and pong events are skipped by reading again. A ping is answered
    by writing a pong with the same payload to the writing half of [stream]
    and then reading again; if that write reports close or abort, the promise
    is resolved or rejected accordingly. *)
let read_convenience stream =
  let promise, resolver = Lwt.wait () in
  let close _code = Lwt.wakeup_later resolver None in
  let abort exn = Lwt.wakeup_later_exn resolver exn in

  let rec loop () =
    stream.reader.read
      ~data:(fun buffer offset length _binary _fin ->
        (* Copy the chunk straight into a string; this avoids creating an
           intermediate Bigarray view just to convert it. *)
        Bigstringaf.substring buffer ~off:offset ~len:length
        |> Option.some
        |> Lwt.wakeup_later resolver)

      ~flush:loop

      ~ping:(fun buffer offset length ->
        stream.writer.pong buffer offset length ~close ~exn:abort loop)

      ~pong:(fun _buffer _offset _length ->
        loop ())

      ~close

      ~exn:abort
  in
  loop ();

  promise

(* TODO It's probably best to protect "wakeups" of the promise to prevent
   Invalid_argument from Lwt. *)
(** [read_until_close stream] reads [stream] until it reports that it is
    closed and returns a promise for the concatenation of all data chunks
    received up to that point. The promise is rejected with the exception if
    the stream reports an abort.

    Flush and pong events are skipped. A ping is answered by writing a pong
    with the same payload to the writing half of [stream]; if that write
    reports close or abort, the promise is resolved with the data gathered
    so far, or rejected, respectively. *)
let read_until_close stream =
  let promise, resolver = Lwt.wait () in
  let length = ref 0 in
  let buffer = ref (Bigstringaf.create 4096) in

  (* Makes sure the accumulation buffer can hold [required] bytes, moving the
     bytes gathered so far into a buffer of twice the required size when it
     cannot. *)
  let ensure_capacity required =
    if required > Bigstringaf.length !buffer then begin
      let new_buffer = Bigstringaf.create (required * 2) in
      Bigstringaf.blit
        !buffer ~src_off:0 new_buffer ~dst_off:0 ~len:!length;
      buffer := new_buffer
    end
  in

  let close _code =
    (* Copy only the filled prefix, directly into a string; no intermediate
       Bigarray view is needed. *)
    Bigstringaf.substring !buffer ~off:0 ~len:!length
    |> Lwt.wakeup_later resolver
  in
  let abort exn = Lwt.wakeup_later_exn resolver exn in

  let rec loop () =
    stream.reader.read
      ~data:(fun chunk offset chunk_length _binary _fin ->
        let new_length = !length + chunk_length in
        ensure_capacity new_length;

        Bigstringaf.blit
          chunk ~src_off:offset !buffer ~dst_off:!length ~len:chunk_length;
        length := new_length;

        loop ())

      ~flush:loop

      ~ping:(fun buffer offset length ->
        stream.writer.pong buffer offset length ~close ~exn:abort loop)

      ~pong:(fun _buffer _offset _length ->
        loop ())

      ~close

      ~exn:abort
  in
  loop ();

  promise
OCaml

Innovation. Community. Security.