package opentelemetry

  1. Overview
  2. Docs
Instrumentation for https://opentelemetry.io

Install

dune-project
 Dependency

Authors

Maintainers

Sources

opentelemetry-0.12.tbz
sha256=ca92e7395495f73b46316607c514ce0dbe5fab129dddd9a17b353f835dcbf77d
sha512=ea2afd07c8db955364681f90388959db97d7b7f5b0836bc4045eca929968d6d77905e3aa66802226c0791c2552d0e281bdf2dbfe7ed90e9877ce3cedc343823f

doc/src/opentelemetry.client/batch.ml.html

Source file batch.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
type 'a t = {
  mutable size: int;
  mutable q: 'a list;
      (** The queue is a FIFO represented as a list in reverse order *)
  batch: int;  (** Minimum size to batch before popping *)
  high_watermark: int;  (** Size above which we start dropping signals *)
  timeout: Mtime.span option;
  mutable start: Mtime.t;
  mutex: Mutex.t;
}

(* Mutex.protect was added in OCaml 5.1, but we want support back to 4.08.
   cannot inline, otherwise flambda might move code around. (as per Stdlib) *)
let[@inline never] protect_mutex m f =
  Mutex.lock m;
  Fun.protect f ~finally:(fun () -> Mutex.unlock m)

let default_high_watermark batch_size =
  if batch_size = 1 then
    100
  else
    batch_size * 10

let make ?(batch = 1) ?high_watermark ?now ?timeout () : _ t =
  let high_watermark =
    match high_watermark with
    | Some x -> x
    | None -> default_high_watermark batch
  in
  let start =
    match now with
    | Some x -> x
    | None -> Mtime_clock.now ()
  in
  let mutex = Mutex.create () in
  assert (batch > 0);
  { size = 0; q = []; start; batch; timeout; high_watermark; mutex }

let timeout_expired_ ~now self : bool =
  match self.timeout with
  | Some t ->
    let elapsed = Mtime.span now self.start in
    Mtime.Span.compare elapsed t >= 0
  | None -> false

(* Big enough to send a batch *)
let is_full_ self : bool = self.size >= self.batch

let ready_to_pop ~force ~now self =
  self.size > 0 && (force || is_full_ self || timeout_expired_ ~now self)

let pop_if_ready ?(force = false) ~now (self : _ t) : _ list option =
  let rev_batch_opt =
    protect_mutex self.mutex @@ fun () ->
    if ready_to_pop ~force ~now self then (
      assert (self.q <> []);
      let batch = self.q in
      self.q <- [];
      self.size <- 0;
      Some batch
    ) else
      None
  in
  match rev_batch_opt with
  | None -> None
  | Some batch ->
    (* Reverse the list to retrieve the FIFO order. *)
    Some (List.rev batch)

let rec push_unprotected (self : _ t) ~(elems : _ list) : unit =
  match elems with
  | [] -> ()
  | x :: xs ->
    self.q <- x :: self.q;
    self.size <- 1 + self.size;
    push_unprotected self ~elems:xs

let push (self : _ t) elems : [ `Dropped | `Ok ] =
  protect_mutex self.mutex @@ fun () ->
  if self.size >= self.high_watermark then
    (* drop this to prevent queue from growing too fast *)
    `Dropped
  else (
    if self.size = 0 && Option.is_some self.timeout then
      (* current batch starts now *)
      self.start <- Mtime_clock.now ();

    (* add to queue *)
    push_unprotected self ~elems;
    `Ok
  )