package tezt

  1. Overview
  2. Docs

Source file echo.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2020-2022 Nomadic Labs <contact@nomadic-labs.com>           *)
(* Copyright (c) 2020 Metastate AG <hello@metastate.dev>                     *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Base

(** An in-memory pipe: strings given to [push] are queued and later served, in
    order, to readers of [lwt_channel]. *)
type t = {
  queue : string ref Queue.t;
      (** Data pushed but not yet read. [push] never enqueues an empty string.
          Each item is a [ref] so that a partially read item can be replaced
          in place by its unread remainder. *)
  mutable lwt_channel : Lwt_io.input_channel option;
      (** Input channel backed by [queue]. [create] sets it to [Some _] before
          returning the echo. *)
  mutable closed : bool;
      (** Set to [true] by [close]. Once closed, a read that finds [queue]
          empty returns 0 bytes instead of waiting. *)
  mutable pending : int list;
      (** Notification identifiers registered by reads that are waiting for
          data. [wake_up] signals all of them and resets the list. *)
}

(** [wake_up echo] signals every notification registered in [echo.pending].

    The list is reset {e before} any notification is sent, so identifiers
    registered from now on are kept for the next call. *)
let wake_up echo =
  let waiting = echo.pending in
  echo.pending <- [] ;
  List.iter Lwt_unix.send_notification waiting

(** [push echo string] appends [string] to the data readable from [echo] and
    wakes up waiting readers. Pushing the empty string does nothing. *)
let push echo string =
  match string with
  | "" ->
      (* Skipped on purpose: readers rely on queued strings never being
         empty. *)
      ()
  | _ ->
      Queue.add (ref string) echo.queue ;
      wake_up echo

(** [close echo] marks [echo] as closed and wakes up waiting readers so that
    they can observe it. Closing an already closed echo does nothing. *)
let close echo =
  if echo.closed then ()
  else (
    echo.closed <- true ;
    wake_up echo)

(** [create ()] returns a new, open echo with an empty queue.

    The echo comes with an Lwt input channel (see [get_lwt_channel]) whose
    read function serves the queued strings in order. When the queue is empty,
    a read returns 0 bytes if the echo is closed, and otherwise waits until
    [wake_up] is called (which [push] and [close] do) before trying again. *)
let create () =
  let echo =
    {queue = Queue.create (); lwt_channel = None; closed = false; pending = []}
  in
  (* Register a one-shot notification in [echo.pending] and return a promise
     that is fulfilled when that notification is sent. *)
  let wait_for_wake_up () =
    let waiter, wakener = Lwt.task () in
    let id =
      Lwt_unix.make_notification ~once:true (fun () ->
          Lwt.wakeup_later wakener ())
    in
    echo.pending <- id :: echo.pending ;
    waiter
  in
  (* Read function of the channel: copy at most [len] bytes into [bytes],
     starting at offset [ofs], and return how many bytes were copied. *)
  let rec read bytes ofs len =
    match Queue.peek_opt echo.queue with
    | None ->
        if echo.closed then return 0
        else
          (* Nothing to read, for now: wait, then look at the queue again. *)
          let* () = wait_for_wake_up () in
          read bytes ofs len
    | Some item ->
        (* Note: queued strings are never empty, so [available > 0]. *)
        let data = !item in
        let available = String.length data in
        (* Serve the whole item if it fits in the request, else only the
           first [len] bytes of it. *)
        let count = if available <= len then available else len in
        (* use [Lwt_bytes.blit_from_string] once available *)
        Lwt_bytes.blit_from_bytes (Bytes.of_string data) 0 bytes ofs count ;
        (if available <= len then
           (* The item was served in full: remove it from the queue. *)
           ignore (Queue.take_opt echo.queue : string ref option)
         else
           (* The item was served only partially: keep the unread remainder
              at the front of the queue. *)
           item := String.sub data len (available - len)) ;
        return count
  in
  echo.lwt_channel <- Some (Lwt_io.(make ~mode:input) read) ;
  echo

(** [get_lwt_channel echo] returns the Lwt input channel that reads the data
    pushed to [echo]. *)
let get_lwt_channel {lwt_channel; _} =
  match lwt_channel with
  | Some channel -> channel
  | None ->
      (* Impossible: [create] replaces the initial [None] by [Some ...]
         before it returns the echo. *)
      assert false