Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
(* body.ml *)
open! Core
open! Async

(** A body backed by an Async pipe of string chunks. *)
module Stream = struct
  (** [encoding] is either [`Chunked] or [`Fixed n]; [reader] is the pipe the
      chunks are read from; [read_started] is flipped to [true] by {!iter}. *)
  type t =
    { encoding : [ `Chunked | `Fixed of int ]
    ; reader : string Pipe.Reader.t
    ; mutable read_started : bool
    }
  [@@deriving sexp_of]

  (** [of_pipe encoding reader] wraps [reader] in a stream whose
      [read_started] flag is initially [false]. *)
  let of_pipe encoding reader = { encoding; reader; read_started = false }

  (** [close t] calls [Pipe.close_read] on the underlying reader. *)
  let close { reader; _ } = Pipe.close_read reader

  (** [encoding t] returns the encoding the stream was created with. *)
  let encoding { encoding; _ } = encoding

  (** [iter t ~f] marks the stream as being read and hands every chunk of the
      underlying pipe to [f] through [Pipe.iter].

      Raises (via [raise_s]) if {!iter} was already called on [t]. *)
  let iter t ~f =
    if t.read_started
    then raise_s [%message "Only one consumer can read from a stream"]
    else (
      (* Set the flag before reading so that any later call is rejected. *)
      t.read_started <- true;
      Pipe.iter t.reader ~f)
  ;;

  (** [read_started t] is [true] once {!iter} has been called on [t]. *)
  let read_started { read_started; _ } = read_started

  (** [drain t] runs [Pipe.drain] on the underlying reader. It does not touch
      the [read_started] flag.

      Raises (via [raise_s]) if {!iter} was already called on [t]. *)
  let drain t =
    if t.read_started
    then raise_s [%message "Cannot drain a body that's currently being read"]
    else Pipe.drain t.reader
  ;;

  (** [closed t] is [Pipe.closed] applied to the underlying reader. *)
  let closed { reader; _ } = Pipe.closed reader
end

(** A body: nothing at all, a complete in-memory string, or a {!Stream.t}. *)
type t =
  | Empty
  | Fixed of string
  | Stream of Stream.t
[@@deriving sexp_of]

(** [string payload] is a body holding [payload] in full. *)
let string payload = Fixed payload

(** The body with no content. *)
let empty = Empty

(** [of_pipe encoding reader] is a streaming body over [reader]; the wrapped
    stream starts with [read_started = false]. *)
let of_pipe encoding reader = Stream (Stream.of_pipe encoding reader)

(** [stream s] wraps an already-built {!Stream.t} as a body. *)
let stream s = Stream s

(** [to_stream body] views any body as a {!Stream.t}.

    - [Empty] becomes a [`Fixed 0] stream over [Pipe.empty ()].
    - [Fixed s] becomes a [`Fixed (String.length s)] stream over
      [Pipe.singleton s].
    - [Stream s] is returned as is, not copied. *)
let to_stream body =
  match body with
  | Stream existing -> existing
  | Empty -> Stream.of_pipe (`Fixed 0) (Pipe.empty ())
  | Fixed contents ->
    Stream.of_pipe (`Fixed (String.length contents)) (Pipe.singleton contents)
;;