package shuttle_http

  1. Overview
  2. Docs

Source file body.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
open! Core
open! Async

module Stream = struct
  type t =
    { encoding : [ `Chunked | `Fixed of int ]
    ; reader : string Pipe.Reader.t
    ; mutable read_started : bool
    }
  [@@deriving sexp_of]

  let of_pipe encoding reader = { encoding; reader; read_started = false }
  let close t = Pipe.close_read t.reader
  let encoding t = t.encoding

  let iter t ~f =
    if t.read_started then raise_s [%message "Only one consumer can read from a stream"];
    t.read_started <- true;
    Pipe.iter t.reader ~f
  ;;

  let iter_without_pushback t ~f =
    if t.read_started then raise_s [%message "Only one consumer can read from a stream"];
    t.read_started <- true;
    Pipe.iter_without_pushback t.reader ~f
  ;;

  let fold t ~init ~f =
    if t.read_started then raise_s [%message "Only one consumer can read from a stream"];
    t.read_started <- true;
    Pipe.fold t.reader ~init ~f
  ;;

  let fold_without_pushback t ~init ~f =
    if t.read_started then raise_s [%message "Only one consumer can read from a stream"];
    t.read_started <- true;
    Pipe.fold_without_pushback t.reader ~init ~f
  ;;

  let read_started t = t.read_started

  let drain t =
    if t.read_started
    then raise_s [%message "Cannot drain a body that's currently being read"];
    Pipe.drain t.reader
  ;;

  let to_string t =
    if t.read_started
    then raise_s [%message "to_string: Only one consumer can read from a stream"];
    t.read_started <- true;
    let%map rope =
      Pipe.fold_without_pushback t.reader ~init:Rope.empty ~f:(fun rope str ->
        Rope.(rope ^ of_string str))
    in
    Rope.to_string rope
  ;;

  let closed t = Pipe.closed t.reader
end

type t =
  | Empty
  | Fixed of string
  | Stream of Stream.t
[@@deriving sexp_of]

let string x = Fixed x
let empty = Empty
let of_pipe encoding reader = Stream { Stream.encoding; reader; read_started = false }
let stream stream = Stream stream

let to_stream = function
  | Empty -> Stream.of_pipe (`Fixed 0) (Pipe.empty ())
  | Fixed x -> Stream.of_pipe (`Fixed (String.length x)) (Pipe.singleton x)
  | Stream x -> x
;;

let to_string = function
  | Empty -> return ""
  | Fixed s -> return s
  | Stream x -> Stream.to_string x
;;