package trace-fuchsia

  1. Overview
  2. Docs

Source file collector_fuchsia.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
open Common_
open Types
open Trace_core

type t = {
  active: bool A.t;
  pid: int;
  buf_chain: Buf_chain.t;
  exporter: Exporter.t;
  trace_id_gen: Types.Trace_id.Gen.t;
}
(** Subscriber state *)

open struct
  (** Write the buffers that are ready *)
  let[@inline] write_ready_ (self : t) =
    if Buf_chain.has_ready self.buf_chain then
      Buf_chain.pop_ready self.buf_chain ~f:self.exporter.write_bufs

  (* TODO: nice to have, can we make it optional?
  let print_non_closed_spans_warning spans =
    let module Str_set = Set.Make (String) in
    let spans = Span_tbl.to_list spans in
    if spans <> [] then (
      !on_tracing_error
      @@ Printf.sprintf "warning: %d spans were not closed" (List.length spans);
      let names =
        List.fold_left
          (fun set (_, span) -> Str_set.add span.name set)
          Str_set.empty spans
      in
      Str_set.iter
        (fun name ->
          !on_tracing_error @@ Printf.sprintf "  span %S was not closed" name)
        names;
      flush stderr
    )
    *)
end

let close (self : t) : unit =
  if A.exchange self.active false then (
    Buf_chain.ready_all_non_empty self.buf_chain;
    write_ready_ self;
    self.exporter.close () (* TODO: print_non_closed_spans_warning self.spans *)
  )

let[@inline] active self = A.get self.active

let flush (self : t) : unit =
  Buf_chain.ready_all_non_empty self.buf_chain;
  write_ready_ self;
  self.exporter.flush ()

let create ?(buf_pool = Buf_pool.create ()) ~pid ~exporter () : t =
  let buf_chain = Buf_chain.create ~sharded:true ~buf_pool () in
  {
    active = A.make true;
    buf_chain;
    exporter;
    pid;
    trace_id_gen = Types.Trace_id.Gen.create ();
  }

open struct
  let new_trace_id self = Types.Trace_id.Gen.gen self.trace_id_gen

  let init (self : t) =
    Writer.Metadata.Magic_record.encode self.buf_chain;
    Writer.Metadata.Initialization_record.(
      encode self.buf_chain ~ticks_per_secs:default_ticks_per_sec ());
    Writer.Metadata.Provider_info.encode self.buf_chain ~id:0
      ~name:"ocaml-trace" ();
    (* make sure we write these immediately so they're not out of order *)
    Buf_chain.ready_all_non_empty self.buf_chain;

    write_ready_ self

  let shutdown (self : t) = close self

  (* add function name, if provided, to the metadata *)
  let add_fun_name_ fun_name data : _ list =
    match fun_name with
    | None -> data
    | Some f -> ("function", `String f) :: data

  let rec flavor_of_params = function
    | [] -> `Sync
    | Core_ext.Extension_span_flavor f :: _ -> f
    | _ :: tl -> flavor_of_params tl

  let enter_span (self : t) ~__FUNCTION__ ~__FILE__ ~__LINE__ ~level:_ ~params
      ~data ~parent name : span =
    let flavor = flavor_of_params params in
    let time_ns = Trace_util.Mock_.now_ns () in
    let tid = Trace_util.Mock_.get_tid () in

    match flavor with
    | `Sync ->
      Span_fuchsia_sync
        {
          __FUNCTION__;
          name;
          pid = self.pid;
          tid;
          args = data;
          start_ns = time_ns;
        }
    | `Async ->
      let data = add_fun_name_ __FUNCTION__ data in
      let trace_id =
        match parent with
        | P_some (Span_fuchsia_async sp) -> sp.trace_id
        | _ -> new_trace_id self
      in

      Writer.(
        Event.Async_begin.encode self.buf_chain ~name
          ~args:(args_of_user_data data)
          ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
          ~time_ns ~async_id:trace_id ());
      write_ready_ self;

      Span_fuchsia_async { pid = self.pid; tid; trace_id; name; args = data }

  let exit_span (self : t) sp =
    let end_time_ns = Trace_util.Mock_.now_ns () in

    match sp with
    | Span_fuchsia_sync { __FUNCTION__; name; tid; pid; args = data; start_ns }
      ->
      let data = add_fun_name_ __FUNCTION__ data in
      Writer.(
        Event.Duration_complete.encode self.buf_chain ~name
          ~t_ref:(Thread_ref.inline ~pid ~tid)
          ~time_ns:start_ns ~end_time_ns ~args:(args_of_user_data data) ());
      write_ready_ self
    | Span_fuchsia_async { name; tid; pid; trace_id; args = data } ->
      Writer.(
        Event.Async_end.encode self.buf_chain ~name
          ~args:(args_of_user_data data)
          ~t_ref:(Thread_ref.inline ~pid ~tid)
          ~time_ns:end_time_ns ~async_id:trace_id ());
      write_ready_ self
    | _ -> ()

  let add_data_to_span _st sp data =
    match sp with
    | Span_fuchsia_sync sp -> sp.args <- List.rev_append data sp.args
    | Span_fuchsia_async sp -> sp.args <- List.rev_append data sp.args
    | _ -> ()

  let message (self : t) ~level:_ ~params:_ ~data ~span:_ msg : unit =
    let time_ns = Trace_util.Mock_.now_ns () in
    let tid = Trace_util.Mock_.get_tid () in
    Writer.(
      Event.Instant.encode self.buf_chain
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~name:msg ~time_ns ~args:(args_of_user_data data) ());
    write_ready_ self

  let counter_float_ (self : t) ~data name n : unit =
    let tid = Trace_util.Mock_.get_tid () in
    let time_ns = Trace_util.Mock_.now_ns () in
    Writer.(
      Event.Counter.encode self.buf_chain
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~name ~time_ns
        ~args:((name, A_float n) :: args_of_user_data data)
        ());
    write_ready_ self

  let counter_int_ self ~data name n =
    let tid = Trace_util.Mock_.get_tid () in
    let time_ns = Trace_util.Mock_.now_ns () in
    Writer.(
      Event.Counter.encode self.buf_chain
        ~t_ref:(Thread_ref.inline ~pid:self.pid ~tid)
        ~name ~time_ns
        ~args:((name, A_int n) :: args_of_user_data data)
        ());
    write_ready_ self

  let metric self ~level:_ ~params:_ ~data name m =
    match m with
    | Core_ext.Metric_int i -> counter_int_ self ~data name i
    | Core_ext.Metric_float n -> counter_float_ self ~data name n
    | _ -> ()

  let name_process_ (self : t) name : unit =
    Writer.Kernel_object.(
      encode self.buf_chain ~name ~ty:ty_process ~kid:self.pid ~args:[] ());
    write_ready_ self

  let name_thread_ (self : t) ~tid name : unit =
    Writer.Kernel_object.(
      encode self.buf_chain ~name ~ty:ty_thread ~kid:tid
        ~args:[ "process", A_kid (Int64.of_int self.pid) ]
        ());
    write_ready_ self

  let extension (self : t) ~level:_ ev =
    match ev with
    | Core_ext.Extension_set_thread_name name ->
      let tid = Trace_util.Mock_.get_tid () in
      name_thread_ self ~tid name
    | Core_ext.Extension_set_process_name name -> name_process_ self name
    | _ -> ()
end

let callbacks : t Collector.Callbacks.t =
  Collector.Callbacks.make ~init ~shutdown ~enter_span ~exit_span
    ~add_data_to_span ~message ~metric ~extension ()

let collector (self : t) : Collector.t = Collector.C_some (self, callbacks)