package ocluster-worker

  1. Overview
  2. Docs

Source file log_data.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
open Lwt.Infix

(** Upper bound, in bytes, on the amount of log data handed out by a single
    call to [stream]. *)
let max_chunk_size = 10_240L

(** An in-memory log: the bytes accumulated so far together with the state
    used to wake readers waiting for more. *)
type t = {
  data : Buffer.t;
  (** Every byte written to the log so far. *)
  mutable cond : [ `Running of unit Lwt_condition.t | `Finished ];
  (** [`Running c] while the log is open; [c] is broadcast on each write and
      on close. [`Finished] once the log has been closed. *)
}

(** [create ()] returns a new, empty log in the [`Running] state with a fresh
    condition variable. The buffer starts with a capacity of 10240 bytes. *)
let create () =
  let data = Buffer.create 10240 in
  let cond = `Running (Lwt_condition.create ()) in
  { data; cond }

(** [stream t ~start] resolves to [(chunk, next)], where [chunk] is the log
    data beginning at byte offset [start] (at most [max_chunk_size] bytes)
    and [next] is the offset immediately after [chunk].

    A negative [start] is interpreted relative to the current end of the log
    and clamped to offset 0. If no data is available at [start], the call
    waits on the log's condition and retries while the log is running; once
    the log is finished it resolves to [("", start)].

    @raise Failure if [start] lies beyond the current end of the log. The
    exception is raised directly rather than as a rejected promise. *)
let rec stream t ~start =
  let total = Int64.of_int (Buffer.length t.data) in
  (* Turn an end-relative (negative) offset into an absolute one. *)
  let offset =
    if start >= 0L then start else max 0L (Int64.add total start)
  in
  let remaining = Int64.sub total offset in
  if remaining < 0L then Fmt.failwith "Start value out of range!"
  else if remaining > 0L then begin
    (* Data is available: hand out at most one chunk of it. *)
    let size = min remaining max_chunk_size in
    let piece = Buffer.sub t.data (Int64.to_int offset) (Int64.to_int size) in
    Lwt.return (piece, Int64.add offset size)
  end else
    (* Nothing to read at this offset: wait for more, or report the end. *)
    match t.cond with
    | `Finished -> Lwt.return ("", offset)
    | `Running cond ->
      Lwt_condition.wait cond >>= fun () ->
      stream t ~start:offset

(** [write t data] appends [data] to the log and broadcasts on the log's
    condition so that waiting readers are woken.

    @raise Failure if [t] has already been closed. *)
let write t data =
  match t.cond with
  | `Finished ->
    Fmt.failwith "Attempt to write to log after close: %S" data
  | `Running waiters ->
    Buffer.add_string t.data data;
    Lwt_condition.broadcast waiters ()

(** [copy_from_stream t src] repeatedly reads up to 4096 bytes from [src] and
    writes each block to [t]. It resolves once a read yields the empty
    string. The log is not closed by this function. *)
let copy_from_stream t src =
  let rec pump () =
    Lwt_io.read ~count:4096 src >>= fun block ->
    if String.equal block "" then Lwt.return_unit
    else begin
      write t block;
      pump ()
    end
  in
  pump ()

(** [close t] marks the log as [`Finished] and then broadcasts on the previous
    condition so that waiting readers are woken and can observe the new
    state.

    @raise Failure if [t] has already been closed. *)
let close t =
  match t.cond with
  | `Finished ->
    Fmt.failwith "Log already closed!"
  | `Running waiters ->
    (* Update the state before waking readers. *)
    t.cond <- `Finished;
    Lwt_condition.broadcast waiters ()

(** [info t fmt ...] formats a message according to [fmt], with the ["@."]
    directive appended to the format, and writes the resulting string to [t]
    using [write]. *)
let info t fmt =
  let emit line = write t line in
  Fmt.kstr emit (fmt ^^ "@.")