Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
trace_tef.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
open Trace_core open Trace_private_util module A = Trace_core.Internal_.Atomic_ module Mock_ = struct let enabled = ref false let now = ref 0 let[@inline never] now_us () : float = let x = !now in incr now; float_of_int x end let counter = Mtime_clock.counter () (** Now, in microseconds *) let[@inline] now_us () : float = if !Mock_.enabled then Mock_.now_us () else ( let t = Mtime_clock.count counter in Mtime.Span.to_float_ns t /. 1e3 ) let on_tracing_error = ref (fun s -> Printf.eprintf "trace-tef error: %s\n%!" s) type event = | E_tick | E_message of { tid: int; msg: string; time_us: float; data: (string * user_data) list; } | E_define_span of { tid: int; name: string; time_us: float; id: span; fun_name: string option; data: (string * user_data) list; } | E_exit_span of { id: span; time_us: float; } | E_add_data of { id: span; data: (string * user_data) list; } | E_enter_manual_span of { tid: int; name: string; time_us: float; id: int; flavor: [ `Sync | `Async ] option; fun_name: string option; data: (string * user_data) list; } | E_exit_manual_span of { tid: int; name: string; time_us: float; flavor: [ `Sync | `Async ] option; data: (string * user_data) list; id: int; } | E_counter of { name: string; tid: int; time_us: float; n: float; } | E_name_process of { name: string } | E_name_thread of { tid: int; name: string; } module Span_tbl = Hashtbl.Make (struct include Int64 let hash : t -> int = Hashtbl.hash end) type span_info = { tid: int; name: string; start_us: float; mutable data: (string * user_data) list; } (** key used to carry a unique "id" for all spans in an async context *) let key_async_id : int Meta_map.Key.t = Meta_map.Key.create () let key_async_data : (string * [ `Sync | `Async ] option) Meta_map.Key.t = Meta_map.Key.create () let key_data : (string * user_data) list ref Meta_map.Key.t = Meta_map.Key.create () (** Writer: knows how to write entries to a file in TEF format *) module Writer = struct type t = { oc: out_channel; mutable first: bool; (** first event? *) buf: Buffer.t; (** Buffer to write into *) must_close: bool; (** Do we have to close the underlying channel [oc]? *) pid: int; } (** A writer to a [out_channel]. It writes JSON entries in an array and closes the array at the end. *) let create ~out () : t = let oc, must_close = match out with | `Stdout -> stdout, false | `Stderr -> stderr, false | `File path -> open_out path, true in let pid = if !Mock_.enabled then 2 else Unix.getpid () in output_char oc '['; { oc; first = true; pid; must_close; buf = Buffer.create 2_048 } let close (self : t) : unit = output_char self.oc ']'; flush self.oc; if self.must_close then close_out self.oc let with_ ~out f = let writer = create ~out () in Fun.protect ~finally:(fun () -> close writer) (fun () -> f writer) let[@inline] flush (self : t) : unit = flush self.oc (** Emit "," if we need, and get the buffer ready *) let emit_sep_and_start_ (self : t) = Buffer.reset self.buf; if self.first then self.first <- false else Buffer.add_string self.buf ",\n" let char = Buffer.add_char let raw_string = Buffer.add_string let str_val (buf : Buffer.t) (s : string) = char buf '"'; let encode_char c = match c with | '"' -> raw_string buf {|\"|} | '\\' -> raw_string buf {|\\|} | '\n' -> raw_string buf {|\n|} | '\b' -> raw_string buf {|\b|} | '\r' -> raw_string buf {|\r|} | '\t' -> raw_string buf {|\t|} | _ when Char.code c <= 0x1f -> raw_string buf {|\u00|}; Printf.bprintf buf "%02x" (Char.code c) | c -> char buf c in String.iter encode_char s; char buf '"' let pp_user_data_ (out : Buffer.t) : [< user_data ] -> unit = function | `None -> raw_string out "null" | `Int i -> Printf.bprintf out "%d" i | `Bool b -> Printf.bprintf out "%b" b | `String s -> str_val out s | `Float f -> Printf.bprintf out "%g" f (* emit args, if not empty. [ppv] is used to print values. *) let emit_args_o_ ppv (out : Buffer.t) args : unit = if args <> [] then ( Printf.bprintf out {json|,"args": {|json}; List.iteri (fun i (n, value) -> if i > 0 then raw_string out ","; Printf.bprintf out {json|"%s":%a|json} n ppv value) args; char out '}' ) let emit_duration_event ~tid ~name ~start ~end_ ~args (self : t) : unit = let dur = end_ -. start in let ts = start in emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"cat":"","tid": %d,"dur": %.2f,"ts": %.2f,"name":%a,"ph":"X"%a}|json} self.pid tid dur ts str_val name (emit_args_o_ pp_user_data_) args; Buffer.output_buffer self.oc self.buf let emit_manual_begin ~tid ~name ~id ~ts ~args ~flavor (self : t) : unit = emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} self.pid id tid ts str_val name (match flavor with | None | Some `Async -> 'b' | Some `Sync -> 'B') (emit_args_o_ pp_user_data_) args; Buffer.output_buffer self.oc self.buf let emit_manual_end ~tid ~name ~id ~ts ~flavor ~args (self : t) : unit = emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"cat":"trace","id":%d,"tid": %d,"ts": %.2f,"name":%a,"ph":"%c"%a}|json} self.pid id tid ts str_val name (match flavor with | None | Some `Async -> 'e' | Some `Sync -> 'E') (emit_args_o_ pp_user_data_) args; Buffer.output_buffer self.oc self.buf let emit_instant_event ~tid ~name ~ts ~args (self : t) : unit = emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"cat":"","tid": %d,"ts": %.2f,"name":%a,"ph":"I"%a}|json} self.pid tid ts str_val name (emit_args_o_ pp_user_data_) args; Buffer.output_buffer self.oc self.buf let emit_name_thread ~tid ~name (self : t) : unit = emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"tid": %d,"name":"thread_name","ph":"M"%a}|json} self.pid tid (emit_args_o_ pp_user_data_) [ "name", `String name ]; Buffer.output_buffer self.oc self.buf let emit_name_process ~name (self : t) : unit = emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"name":"process_name","ph":"M"%a}|json} self.pid (emit_args_o_ pp_user_data_) [ "name", `String name ]; Buffer.output_buffer self.oc self.buf let emit_counter ~name ~tid ~ts (self : t) f : unit = emit_sep_and_start_ self; Printf.bprintf self.buf {json|{"pid":%d,"tid":%d,"ts":%.2f,"name":"c","ph":"C"%a}|json} self.pid tid ts (emit_args_o_ pp_user_data_) [ name, `Float f ]; Buffer.output_buffer self.oc self.buf end (** Background thread, takes events from the queue, puts them in context using local state, and writes fully resolved TEF events to [out]. *) let bg_thread ~out (events : event B_queue.t) : unit = (* open a writer to [out] *) Writer.with_ ~out @@ fun writer -> (* local state, to keep track of span information and implicit stack context *) let spans : span_info Span_tbl.t = Span_tbl.create 32 in (* add function name, if provided, to the metadata *) let add_fun_name_ fun_name data : _ list = match fun_name with | None -> data | Some f -> ("function", `String f) :: data in (* how to deal with an event *) let handle_ev (ev : event) : unit = match ev with | E_tick -> Writer.flush writer | E_message { tid; msg; time_us; data } -> Writer.emit_instant_event ~tid ~name:msg ~ts:time_us ~args:data writer | E_define_span { tid; name; id; time_us; fun_name; data } -> let data = add_fun_name_ fun_name data in let info = { tid; name; start_us = time_us; data } in (* save the span so we find it at exit *) Span_tbl.add spans id info | E_exit_span { id; time_us = stop_us } -> (match Span_tbl.find_opt spans id with | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id) | Some { tid; name; start_us; data } -> Span_tbl.remove spans id; Writer.emit_duration_event ~tid ~name ~start:start_us ~end_:stop_us ~args:data writer) | E_add_data { id; data } -> (match Span_tbl.find_opt spans id with | None -> !on_tracing_error (Printf.sprintf "cannot find span %Ld" id) | Some info -> info.data <- List.rev_append data info.data) | E_enter_manual_span { tid; time_us; name; id; data; fun_name; flavor } -> let data = add_fun_name_ fun_name data in Writer.emit_manual_begin ~tid ~name ~id ~ts:time_us ~args:data ~flavor writer | E_exit_manual_span { tid; time_us; name; id; flavor; data } -> Writer.emit_manual_end ~tid ~name ~id ~ts:time_us ~flavor ~args:data writer | E_counter { tid; name; time_us; n } -> Writer.emit_counter ~name ~tid ~ts:time_us writer n | E_name_process { name } -> Writer.emit_name_process ~name writer | E_name_thread { tid; name } -> Writer.emit_name_thread ~tid ~name writer in try while true do (* get all the events in the incoming blocking queue, in one single critical section. *) let local = B_queue.pop_all events in List.iter handle_ev local done with B_queue.Closed -> (* write a message about us closing *) Writer.emit_instant_event ~name:"tef-worker.exit" ~tid:(Thread.id @@ Thread.self ()) ~ts:(now_us ()) ~args:[] writer; (* warn if app didn't close all spans *) if Span_tbl.length spans > 0 then Printf.eprintf "trace-tef: warning: %d spans were not closed\n%!" (Span_tbl.length spans); () (** Thread that simply regularly "ticks", sending events to the background thread so it has a chance to write to the file *) let tick_thread events : unit = try while true do Thread.delay 0.5; B_queue.push events E_tick done with B_queue.Closed -> () type output = [ `Stdout | `Stderr | `File of string ] let collector ~out () : collector = let module M = struct let active = A.make true (** generator for span ids *) let span_id_gen_ = A.make 0 (* queue of messages to write *) let events : event B_queue.t = B_queue.create () (** writer thread. It receives events and writes them to [oc]. *) let t_write : Thread.t = Thread.create (fun () -> bg_thread ~out events) () (** ticker thread, regularly sends a message to the writer thread. no need to join it. *) let _t_tick : Thread.t = Thread.create (fun () -> tick_thread events) () let shutdown () = if A.exchange active false then ( B_queue.close events; (* wait for writer thread to be done. The writer thread will exit after processing remaining events because the queue is now closed *) Thread.join t_write ) let get_tid_ () : int = if !Mock_.enabled then 3 else Thread.id (Thread.self ()) let[@inline] enter_span_ ~fun_name ~data name : span = let span = Int64.of_int (A.fetch_and_add span_id_gen_ 1) in let tid = get_tid_ () in let time_us = now_us () in B_queue.push events (E_define_span { tid; name; time_us; id = span; fun_name; data }); span let enter_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : span = enter_span_ ~fun_name ~data name let exit_span span : unit = let time_us = now_us () in B_queue.push events (E_exit_span { id = span; time_us }) (* re-raise exception with its backtrace *) external reraise : exn -> 'a = "%reraise" let with_span ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name f = let span = enter_span_ ~fun_name ~data name in try let x = f span in exit_span span; x with exn -> exit_span span; reraise exn let add_data_to_span span data = if data <> [] then B_queue.push events (E_add_data { id = span; data }) let enter_manual_span ~(parent : explicit_span option) ~flavor ~__FUNCTION__:fun_name ~__FILE__:_ ~__LINE__:_ ~data name : explicit_span = (* get the id, or make a new one *) let id = match parent with | Some m -> Meta_map.find_exn key_async_id m.meta | None -> A.fetch_and_add span_id_gen_ 1 in let time_us = now_us () in B_queue.push events (E_enter_manual_span { id; time_us; tid = get_tid_ (); data; name; fun_name; flavor }); { span = 0L; meta = Meta_map.( empty |> add key_async_id id |> add key_async_data (name, flavor)); } let exit_manual_span (es : explicit_span) : unit = let id = Meta_map.find_exn key_async_id es.meta in let name, flavor = Meta_map.find_exn key_async_data es.meta in let data = try !(Meta_map.find_exn key_data es.meta) with Not_found -> [] in let time_us = now_us () in let tid = get_tid_ () in B_queue.push events (E_exit_manual_span { tid; id; name; time_us; data; flavor }) let add_data_to_manual_span (es : explicit_span) data = if data <> [] then ( let data_ref, add = try Meta_map.find_exn key_data es.meta, false with Not_found -> ref [], true in let new_data = List.rev_append data !data_ref in data_ref := new_data; if add then es.meta <- Meta_map.add key_data data_ref es.meta ) let message ?span:_ ~data msg : unit = let time_us = now_us () in let tid = get_tid_ () in B_queue.push events (E_message { tid; time_us; msg; data }) let counter_float ~data:_ name f = let time_us = now_us () in let tid = get_tid_ () in B_queue.push events (E_counter { name; n = f; time_us; tid }) let counter_int ~data name i = counter_float ~data name (float_of_int i) let name_process name : unit = B_queue.push events (E_name_process { name }) let name_thread name : unit = let tid = get_tid_ () in B_queue.push events (E_name_thread { tid; name }) end in (module M) let setup ?(out = `Env) () = match out with | `Stderr -> Trace_core.setup_collector @@ collector ~out:`Stderr () | `Stdout -> Trace_core.setup_collector @@ collector ~out:`Stdout () | `File path -> Trace_core.setup_collector @@ collector ~out:(`File path) () | `Env -> (match Sys.getenv_opt "TRACE" with | Some ("1" | "true") -> let path = "trace.json" in let c = collector ~out:(`File path) () in Trace_core.setup_collector c | Some "stdout" -> Trace_core.setup_collector @@ collector ~out:`Stdout () | Some "stderr" -> Trace_core.setup_collector @@ collector ~out:`Stderr () | Some path -> let c = collector ~out:(`File path) () in Trace_core.setup_collector c | None -> ()) let with_setup ?out () f = setup ?out (); Fun.protect ~finally:Trace_core.shutdown f module Internal_ = struct let mock_all_ () = Mock_.enabled := true let on_tracing_error = on_tracing_error end