Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
(* input_channel0.ml *)
open Core
open Async_kernel
open Async_unix
open! Async_kernel_require_explicit_time_source

(** A buffered input channel on top of an Async file descriptor.

    - [fd]: descriptor that data is read from.
    - [is_closed]: set to [true] the first time {!close} is called.
    - [closed]: filled once the [Fd.close] started by {!close} has finished.
    - [buf]: buffer that {!refill} / {!refill_with_timeout} read into.
    - [time_source]: clock used for the timeout in {!refill_with_timeout}. *)
type t =
  { fd : Fd.t
  ; mutable is_closed : bool
  ; closed : unit Ivar.t
  ; buf : Bytebuffer.t
  ; time_source : Time_source.t
  }
[@@deriving sexp_of]

(** [create ?buf_len ?time_source fd] builds a channel reading from [fd].

    [buf_len] defaults to 64 KiB; [time_source] defaults to the wall clock,
    and a supplied time source is used through its read-only view.

    Raises (via [raise_s]) when [buf_len] is not strictly positive. Note that
    the error text says "negative" although zero is rejected as well; the
    message is kept as-is because callers may match on it. *)
let create ?buf_len ?time_source fd =
  (* The read path uses [read_assume_fd_is_nonblocking], so request
     non-blocking mode on the descriptor up front. *)
  Fd.with_file_descr_exn fd ignore ~nonblocking:true;
  let time_source =
    match time_source with
    | None -> Time_source.wall_clock ()
    | Some t -> Time_source.read_only t
  in
  let buf_len =
    match buf_len with
    | None -> 64 * 1024
    | Some buf_len ->
      if buf_len > 0
      then buf_len
      else
        raise_s
          [%message "Reader.create got negative buf_len" (buf_len : int) (fd : Fd.t)]
  in
  { fd
  ; is_closed = false
  ; closed = Ivar.create ()
  ; buf = Bytebuffer.create buf_len
  ; time_source
  }
;;

(** [consume t n] drops [n] bytes from the front of the channel's buffer. *)
let consume t n = Bytebuffer.drop t.buf n

(** [is_closed t] is [true] once {!close} has been called on [t]. *)
let is_closed t = t.is_closed

(** [closed t] becomes determined after the file descriptor has been closed
    by {!close}. *)
let closed t = Ivar.read t.closed

(** [close t] closes the underlying file descriptor. Only the first call has
    an effect; every call returns the same deferred as {!closed}. *)
let close t =
  if not t.is_closed
  then (
    t.is_closed <- true;
    Fd.close t.fd >>> fun () -> Ivar.fill t.closed ());
  closed t
;;

(** Raised (inside the Async monitor) by {!refill_with_timeout} when the
    timeout fires before the descriptor becomes readable. *)
exception Timeout

(* Make room in the buffer before a read: compact it, and if there is still
   no writable space, ask the buffer to make space for at least one byte. *)
let prepare_for_read t =
  Bytebuffer.compact t.buf;
  if Bytebuffer.available_to_write t.buf = 0 then Bytebuffer.ensure_space t.buf 1
;;

(* Perform one non-blocking read into the buffer and classify the outcome:
   - [`Ok]: at least one byte was read;
   - [`Eof]: the read returned 0, or failed with an errno that this module
     treats as end-of-stream (connection reset, network down, ...);
   - [`Would_block]: EAGAIN / EWOULDBLOCK / EINTR, the caller should wait for
     readiness and retry;
   - [`Error e]: any other errno; the caller decides when to raise, so that it
     can release resources (e.g. a timeout event) first. *)
let read_once t =
  let result =
    Bytebuffer.read_assume_fd_is_nonblocking t.buf (Fd.file_descr_exn t.fd)
  in
  if Unix.Syscall_result.Int.is_ok result
  then (
    match Unix.Syscall_result.Int.ok_exn result with
    | 0 -> `Eof
    | n ->
      assert (n > 0);
      `Ok)
  else (
    match Unix.Syscall_result.Int.error_exn result with
    | EAGAIN | EWOULDBLOCK | EINTR -> `Would_block
    | EPIPE
    | ECONNRESET
    | EHOSTUNREACH
    | ENETDOWN
    | ENETRESET
    | ENETUNREACH
    | ETIMEDOUT -> `Eof
    | error -> `Error error)
;;

(** [refill_with_timeout t span] tries to read more data into [t]'s buffer.

    Returns [`Ok] when bytes were read and [`Eof] on end of stream, on one of
    the "connection gone" errnos, or when the descriptor is reported closed.
    If the first read would block, waits for readability for at most [span]
    (measured on [t]'s time source).

    Raises {!Timeout} if the timeout fires first, [Unix.Unix_error] for any
    other read error, and an error via [raise_s] if the descriptor is
    reported as bad (the channel is closed first in that case). *)
let refill_with_timeout t span =
  prepare_for_read t;
  match read_once t with
  | `Ok -> return `Ok
  | `Eof -> return `Eof
  | `Error error -> raise (Unix.Unix_error (error, "read", ""))
  | `Would_block ->
    (* The timer is only created once we know we actually have to wait. *)
    let event = Time_source.Event.after t.time_source span in
    let interrupt =
      match%bind Time_source.Event.fired event with
      | Time_source.Event.Fired.Aborted () -> Deferred.never ()
      | Time_source.Event.Fired.Happened () -> Deferred.unit
    in
    let rec loop () =
      Fd.interruptible_ready_to ~interrupt t.fd `Read
      >>= function
      | `Interrupted ->
        (* [interrupt] only becomes determined when the event happened, so
           aborting it here is expected to report [Previously_happened]. *)
        (match Time_source.Event.abort event () with
         | Time_source.Event.Abort_result.Previously_happened () -> raise Timeout
         | Ok | Previously_aborted () ->
           raise_s
             [%message
               "Input_channel.refill_with_timeout bug. Timeout event can't be aborted \
                if Fd is interrupted"])
      | `Ready ->
        (match read_once t with
         | `Would_block -> loop ()
         | `Ok ->
           Time_source.Event.abort_if_possible event ();
           return `Ok
         | `Eof ->
           Time_source.Event.abort_if_possible event ();
           return `Eof
         | `Error error ->
           (* Cancel the timer before raising. *)
           Time_source.Event.abort_if_possible event ();
           raise (Unix.Unix_error (error, "read", "")))
      | `Closed ->
        Time_source.Event.abort_if_possible event ();
        return `Eof
      | `Bad_fd ->
        let%bind () = close t in
        Time_source.Event.abort_if_possible event ();
        raise_s
          [%message
            "Shuttle.Input_channel.refill_with_timeout: bad file descriptor"
              ~fd:(t.fd : Fd.t)]
    in
    loop ()
;;

(** [refill t] tries to read more data into [t]'s buffer, waiting for the
    descriptor to become readable for as long as necessary.

    Returns [`Ok] when bytes were read and [`Eof] on end of stream, on one of
    the "connection gone" errnos, or when the descriptor is reported closed.

    Raises [Unix.Unix_error] for any other read error, and an error via
    [raise_s] if the descriptor is reported as bad. *)
let refill t =
  prepare_for_read t;
  let rec wait_and_read () =
    Fd.ready_to t.fd `Read
    >>= function
    | `Ready ->
      (match read_once t with
       | `Would_block -> wait_and_read ()
       | `Ok -> return `Ok
       | `Eof -> return `Eof
       | `Error error -> raise (Unix.Unix_error (error, "read", "")))
    | `Closed -> return `Eof
    | `Bad_fd ->
      raise_s
        [%message "Shuttle.Input_channel.read: bad file descriptor" ~fd:(t.fd : Fd.t)]
  in
  match read_once t with
  | `Ok -> return `Ok
  | `Eof -> return `Eof
  | `Would_block -> wait_and_read ()
  | `Error error -> raise (Unix.Unix_error (error, "read", ""))
;;

(** [view t] returns the result of [Bytebuffer.unsafe_peek] on [t]'s buffer,
    i.e. a view of the data currently buffered, without consuming it. *)
let view t = Bytebuffer.unsafe_peek t.buf

(** [transfer t writer] repeatedly refills [t] and writes each buffered chunk
    (as a string) to [writer], respecting the pipe's pushback.

    The returned deferred becomes determined when [t] reaches end of stream or
    when [writer] is closed, whichever happens first. Once [writer] is closed
    no further reads are attempted and nothing more is written to it. *)
let transfer t writer =
  let finished = Ivar.create () in
  upon (Pipe.closed writer) (fun () -> Ivar.fill_if_empty finished ());
  let rec loop () =
    (* Stop before touching the fd again: when the consumer has gone away the
       caller may already have closed [t], and reading would raise. *)
    if Pipe.is_closed writer
    then Ivar.fill_if_empty finished ()
    else (
      refill t
      >>> function
      | `Eof -> Ivar.fill_if_empty finished ()
      | `Ok ->
        (* The pipe may have been closed while we were waiting for data;
           [Pipe.write] raises on a closed pipe, so leave the data buffered. *)
        if Pipe.is_closed writer
        then Ivar.fill_if_empty finished ()
        else (
          let payload = Bytebuffer.to_string t.buf in
          Bytebuffer.drop t.buf (String.length payload);
          Pipe.write writer payload >>> fun () -> loop ()))
  in
  loop ();
  Ivar.read finished
;;

(** [pipe t] returns a pipe reader fed with the contents of [t]. When the
    transfer finishes, [t] is closed and then the pipe is closed. *)
let pipe t =
  let reader, writer = Pipe.create () in
  (transfer t writer >>> fun () -> close t >>> fun () -> Pipe.close writer);
  reader
;;

(** [drain t] reads and discards everything from [t] via {!pipe}. *)
let drain t = Pipe.drain (pipe t)