package opentelemetry-client-cohttp-lwt
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
Collector client for OpenTelemetry, using cohttp and Lwt.
Install
dune-project
Dependency
Authors
Maintainers
Sources
opentelemetry-0.13.tbz
sha256=e29a0aa7168357ebbed0f50b1ba9374bc277b280935531e77d90183c732b98f6
sha512=2fd9dcf03695be7b7888c5fb3d0dbe2acdcfbb7c99dfa7b2ff2fc0bb626c4e35ec8d2be71632e440b9df1cc79529c5258ca98876a373a41cff48e4b1757c5767
doc/src/opentelemetry-client-cohttp-lwt/opentelemetry_client_cohttp_lwt.ml.html
Source file opentelemetry_client_cohttp_lwt.ml
(* https://github.com/open-telemetry/oteps/blob/main/text/0035-opentelemetry-protocol.md
   https://github.com/open-telemetry/oteps/blob/main/text/0099-otlp-http.md *)

module OT = Opentelemetry
module Config = Config
module Signal = Opentelemetry_client.Signal
module Batch = Opentelemetry_client.Batch
open Opentelemetry
open Common_

let set_headers = Config.Env.set_headers

let get_headers = Config.Env.get_headers

external reraise : exn -> 'a = "%reraise"
(** This is equivalent to [Lwt.reraise]. We inline it here so we don't force
    to use Lwt's latest version *)

(** Set to [true] (e.g. by [Backend.signal_emit_gc_metrics]) to request a GC
    metrics sample at the next opportunity. *)
let needs_gc_metrics = Atomic.make false

(** Time of the last GC metrics sample. *)
let last_gc_metrics = Atomic.make (Mtime_clock.now ())

(** Maximum time between two GC metrics samples. *)
let timeout_gc_metrics = Mtime.Span.(20 * s)

(* side channel for GC, appended to {!E_metrics}'s data *)
let gc_metrics = ref []

(* capture current GC metrics if {!needs_gc_metrics} is true,
   or it has been a long time since the last GC metrics collection,
   and push them into {!gc_metrics} for later collection *)
let sample_gc_metrics_if_needed () =
  let now = Mtime_clock.now () in
  (* consume the request flag atomically so only one caller samples for it *)
  let alarm = Atomic.compare_and_set needs_gc_metrics true false in
  let timeout () =
    let elapsed = Mtime.span now (Atomic.get last_gc_metrics) in
    Mtime.Span.compare elapsed timeout_gc_metrics > 0
  in
  if alarm || timeout () then (
    Atomic.set last_gc_metrics now;
    let l =
      OT.Metrics.make_resource_metrics
        ~attrs:(Opentelemetry.GC_metrics.get_runtime_attributes ())
      @@ Opentelemetry.GC_metrics.get_metrics ()
    in
    gc_metrics := l :: !gc_metrics
  )

type error =
  [ `Status of int * Opentelemetry.Proto.Status.status
  | `Failure of string
  | `Sysbreak
  ]

(** Number of failed export attempts (incremented when an HTTP export
    returns an error). Reported as the "otel.export.errors" metric. *)
let n_errors = Atomic.make 0

(** Number of times a batch rejected pushed items ([`Dropped]).
    Reported as the "otel.export.dropped" metric. *)
let n_dropped = Atomic.make 0

(** Print a human-readable description of an export error on stderr. *)
let report_err_ = function
  | `Sysbreak -> Printf.eprintf "opentelemetry: ctrl-c captured, stopping\n%!"
  | `Failure msg ->
    Format.eprintf "@[<2>opentelemetry: export failed: %s@]@." msg
  | `Status (code, { Opentelemetry.Proto.Status.code = scode; message; details })
    ->
    let pp_details out l =
      List.iter
        (fun s -> Format.fprintf out "%S;@ " (Bytes.unsafe_to_string s))
        l
    in
    Format.eprintf
      "@[<2>opentelemetry: export failed with@ http code=%d@ status \
       {@[code=%ld;@ message=%S;@ details=[@[%a@]]@]}@]@."
      code scode
      (Bytes.unsafe_to_string message)
      pp_details details

module Httpc : sig
  type t

  val create : unit -> t

  val send :
    t ->
    url:string ->
    decode:[ `Dec of Pbrt.Decoder.t -> 'a | `Ret of 'a ] ->
    string ->
    ('a, error) result Lwt.t
  (** [send c ~url ~decode body] POSTs [body] (protobuf) to [url].
      On a non-error HTTP status, returns [x] for [`Ret x], or the result of
      applying [f] to the response body for [`Dec f]. On an HTTP error status,
      tries to decode a [Status] from the response body. Transport and
      decoding exceptions are turned into [`Failure _]. *)

  val cleanup : t -> unit
end = struct
  open Opentelemetry.Proto
  open Lwt.Syntax
  module Httpc = Cohttp_lwt_unix.Client

  (* no state is kept: every request goes through [Cohttp_lwt_unix.Client] *)
  type t = unit

  let create () : t = ()

  let cleanup _self = ()

  (* send the content to the remote endpoint/path *)
  let send (_self : t) ~url ~decode (bod : string) : ('a, error) result Lwt.t =
    let uri = Uri.of_string url in

    let open Cohttp in
    let headers = Header.(add_list (init ()) (Config.Env.get_headers ())) in
    let headers =
      Header.(add headers "Content-Type" "application/x-protobuf")
    in

    let body = Cohttp_lwt.Body.of_string bod in

    (* capture transport exceptions as values so they can be reported *)
    let* r =
      try%lwt
        let+ r = Httpc.post ~headers ~body uri in
        Ok r
      with e -> Lwt.return @@ Error e
    in
    match r with
    | Error e ->
      let err =
        `Failure
          (spf "sending signals via http POST to %S\nfailed with:\n%s" url
             (Printexc.to_string e))
      in
      Lwt.return @@ Error err
    | Ok (resp, body) ->
      let* body = Cohttp_lwt.Body.to_string body in
      let code = Response.status resp |> Code.code_of_status in
      if not (Code.is_error code) then (
        match decode with
        | `Ret x -> Lwt.return @@ Ok x
        | `Dec f ->
          let dec = Pbrt.Decoder.of_string body in
          let r =
            try Ok (f dec)
            with e ->
              let bt = Printexc.get_backtrace () in
              Error
                (`Failure
                  (spf "decoding failed with:\n%s\n%s" (Printexc.to_string e) bt))
          in
          Lwt.return r
      ) else (
        (* on HTTP errors the body is expected to hold a protobuf [Status] *)
        let dec = Pbrt.Decoder.of_string body in

        let r =
          try
            let status = Status.decode_pb_status dec in
            Error (`Status (code, status))
          with e ->
            let bt = Printexc.get_backtrace () in
            Error
              (`Failure
                (spf
                   "httpc: decoding of status (url=%S, code=%d) failed with:\n\
                    %s\n\
                    status: %S\n\
                    %s"
                   url code (Printexc.to_string e) body bt))
        in
        Lwt.return r
      )
end

(** An emitter. This is used by {!Backend} below to forward traces/metrics/…
    from the program to whatever collector client we have. *)
module type EMITTER = sig
  open Opentelemetry.Proto

  val push_trace : Trace.resource_spans list -> unit

  val push_metrics : Metrics.resource_metrics list -> unit

  val push_logs : Logs.resource_logs list -> unit

  val set_on_tick_callbacks : (unit -> unit) AList.t -> unit

  val tick : unit -> unit

  val cleanup : on_done:(unit -> unit) -> unit -> unit
end

(* make an emitter.

   exceptions inside should be caught, see
   https://opentelemetry.io/docs/reference/specification/error-handling/ *)
let mk_emitter ~stop ~(config : Config.t) () : (module EMITTER) =
  let open Proto in
  let open Lwt.Syntax in
  (* local helpers *)
  let open struct
    let timeout =
      if config.batch_timeout_ms > 0 then
        Some Mtime.Span.(config.batch_timeout_ms * ms)
      else
        None

    let batch_traces : Trace.resource_spans Batch.t =
      Batch.make ?batch:config.batch_traces ?timeout ()

    let batch_metrics : Metrics.resource_metrics Batch.t =
      Batch.make ?batch:config.batch_metrics ?timeout ()

    let batch_logs : Logs.resource_logs Batch.t =
      Batch.make ?batch:config.batch_logs ?timeout ()

    let on_tick_cbs_ = Atomic.make (AList.make ())

    let set_on_tick_callbacks = Atomic.set on_tick_cbs_

    (* POST already-encoded [data] to [url]; errors are counted in
       {!n_errors} and reported on stderr, never raised. *)
    let send_http_ (httpc : Httpc.t) ~url data : unit Lwt.t =
      let* r = Httpc.send httpc ~url ~decode:(`Ret ()) data in
      match r with
      | Ok () -> Lwt.return ()
      | Error `Sysbreak ->
        Printf.eprintf "ctrl-c captured, stopping\n%!";
        Atomic.set stop true;
        Lwt.return ()
      | Error err ->
        (* TODO: log error _via_ otel? *)
        Atomic.incr n_errors;
        report_err_ err;
        (* avoid crazy error loop *)
        Lwt_unix.sleep 3.

    let send_metrics_http client (l : Metrics.resource_metrics list) =
      Signal.Encode.metrics l |> send_http_ client ~url:config.url_metrics

    let send_traces_http client (l : Trace.resource_spans list) =
      Signal.Encode.traces l |> send_http_ client ~url:config.url_traces

    let send_logs_http client (l : Logs.resource_logs list) =
      Signal.Encode.logs l |> send_http_ client ~url:config.url_logs

    (* emit metrics, if the batch is full or timeout lapsed *)
    let emit_metrics_maybe ~now ?force httpc : bool Lwt.t =
      match Batch.pop_if_ready ?force ~now batch_metrics with
      | None -> Lwt.return false
      | Some l ->
        (* piggy-back any pending GC metrics on this batch *)
        let batch = !gc_metrics @ l in
        gc_metrics := [];
        let+ () = send_metrics_http httpc batch in
        true

    let emit_traces_maybe ~now ?force httpc : bool Lwt.t =
      match Batch.pop_if_ready ?force ~now batch_traces with
      | None -> Lwt.return false
      | Some l ->
        let+ () = send_traces_http httpc l in
        true

    let emit_logs_maybe ~now ?force httpc : bool Lwt.t =
      match Batch.pop_if_ready ?force ~now batch_logs with
      | None -> Lwt.return false
      | Some l ->
        let+ () = send_logs_http httpc l in
        true

    (* run [f ()], printing (not propagating) any exception it raises *)
    let[@inline] guard_exn_ where f =
      try f ()
      with e ->
        let bt = Printexc.get_backtrace () in
        Printf.eprintf
          "opentelemetry-cohttp-lwt: uncaught exception in %s: %s\n%s\n%!"
          where (Printexc.to_string e) bt

    (* flush all three batches regardless of size/timeout *)
    let emit_all_force (httpc : Httpc.t) : unit Lwt.t =
      let now = Mtime_clock.now () in
      let+ (_ : bool) = emit_traces_maybe ~now ~force:true httpc
      and+ (_ : bool) = emit_logs_maybe ~now ~force:true httpc
      and+ (_ : bool) = emit_metrics_maybe ~now ~force:true httpc in
      ()

    (* thread that calls [tick()] regularly, to help enforce timeouts *)
    let setup_ticker_thread ~tick ~finally () =
      let rec tick_thread () =
        if Atomic.get stop then (
          finally ();
          Lwt.return ()
        ) else
          let* () = Lwt_unix.sleep 0.5 in
          let* () = tick () in
          tick_thread ()
      in
      Lwt.async tick_thread
  end in
  let httpc = Httpc.create () in

  let module M = struct
    (* we make sure that this is thread-safe, even though we don't have a
       background thread. There can still be a ticker thread, and there
       can also be several user threads that produce spans and call
       the emit functions. *)

    (* push [e] into batch [b]. When the batch rejects the items, count them
       in {!n_dropped} (not {!n_errors}) so they show up in the
       "otel.export.dropped" metric. *)
    let push_to_batch b e =
      match Batch.push b e with
      | `Ok -> ()
      | `Dropped -> Atomic.incr n_dropped

    let push_trace e =
      let@ () = guard_exn_ "push trace" in
      push_to_batch batch_traces e;
      let now = Mtime_clock.now () in
      Lwt.async (fun () ->
          let+ (_ : bool) = emit_traces_maybe ~now httpc in
          ())

    let push_metrics e =
      let@ () = guard_exn_ "push metrics" in
      sample_gc_metrics_if_needed ();
      push_to_batch batch_metrics e;
      let now = Mtime_clock.now () in
      Lwt.async (fun () ->
          let+ (_ : bool) = emit_metrics_maybe ~now httpc in
          ())

    let push_logs e =
      let@ () = guard_exn_ "push logs" in
      push_to_batch batch_logs e;
      let now = Mtime_clock.now () in
      Lwt.async (fun () ->
          let+ (_ : bool) = emit_logs_maybe ~now httpc in
          ())

    let set_on_tick_callbacks = set_on_tick_callbacks

    (* run on-tick callbacks (isolating their exceptions), then emit every
       batch that is ready *)
    let tick_ () =
      if Config.Env.get_debug () then
        Printf.eprintf "tick (from %d)\n%!" (tid ());
      sample_gc_metrics_if_needed ();
      List.iter
        (fun f ->
          try f ()
          with e ->
            Printf.eprintf "on tick callback raised: %s\n%!"
              (Printexc.to_string e))
        (AList.get @@ Atomic.get on_tick_cbs_);
      let now = Mtime_clock.now () in
      let+ (_ : bool) = emit_traces_maybe ~now httpc
      and+ (_ : bool) = emit_logs_maybe ~now httpc
      and+ (_ : bool) = emit_metrics_maybe ~now httpc in
      ()

    (* NOTE(review): the ticker only stops when [stop] is set; [cleanup]
       below does not set it. *)
    let () = setup_ticker_thread ~tick:tick_ ~finally:ignore ()

    (* if called in a blocking context: work in the background *)
    let tick () = Lwt.async tick_

    (* force-flush all batches in the background, then call [on_done] *)
    let cleanup ~on_done () =
      if Config.Env.get_debug () then
        Printf.eprintf "opentelemetry: exiting…\n%!";
      Lwt.async (fun () ->
          let* () = emit_all_force httpc in
          Httpc.cleanup httpc;
          on_done ();
          Lwt.return ())
  end in
  (module M)

module Backend
    (Arg : sig
      val stop : bool Atomic.t

      val config : Config.t
    end)
    () : Opentelemetry.Collector.BACKEND = struct
  include (val mk_emitter ~stop:Arg.stop ~config:Arg.config ())

  open Opentelemetry.Proto
  open Opentelemetry.Collector

  let send_trace : Trace.resource_spans list sender =
    {
      send =
        (fun l ~ret ->
          (if Config.Env.get_debug () then
            let@ () = Lock.with_lock in
            Format.eprintf "send spans %a@."
              (Format.pp_print_list Trace.pp_resource_spans)
              l);
          push_trace l;
          ret ());
    }

  let last_sent_metrics = Atomic.make (Mtime_clock.now ())

  (* send metrics from time to time *)
  let timeout_sent_metrics = Mtime.Span.(5 * s)

  let signal_emit_gc_metrics () =
    if Config.Env.get_debug () then
      Printf.eprintf "opentelemetry: emit GC metrics requested\n%!";
    Atomic.set needs_gc_metrics true

  (* exporter self-metrics ({!n_dropped}, {!n_errors}), produced at most once
     every [timeout_sent_metrics]; empty list otherwise *)
  let additional_metrics () : Metrics.resource_metrics list =
    (* add exporter metrics to the lot? *)
    let last_emit = Atomic.get last_sent_metrics in
    let now = Mtime_clock.now () in
    let add_own_metrics =
      let elapsed = Mtime.span last_emit now in
      Mtime.Span.compare elapsed timeout_sent_metrics > 0
    in

    (* there is a possible race condition here, as several threads might update
       metrics at the same time. But that's harmless. *)
    if add_own_metrics then (
      Atomic.set last_sent_metrics now;
      let open OT.Metrics in
      [
        make_resource_metrics
          [
            sum ~name:"otel.export.dropped" ~is_monotonic:true
              [
                int
                  ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
                  ~now:(Mtime.to_uint64_ns now) (Atomic.get n_dropped);
              ];
            sum ~name:"otel.export.errors" ~is_monotonic:true
              [
                int
                  ~start_time_unix_nano:(Mtime.to_uint64_ns last_emit)
                  ~now:(Mtime.to_uint64_ns now) (Atomic.get n_errors);
              ];
          ];
      ]
    ) else
      []

  let send_metrics : Metrics.resource_metrics list sender =
    {
      send =
        (fun m ~ret ->
          (if Config.Env.get_debug () then
            let@ () = Lock.with_lock in
            Format.eprintf "send metrics %a@."
              (Format.pp_print_list Metrics.pp_resource_metrics)
              m);

          let m = List.rev_append (additional_metrics ()) m in
          push_metrics m;
          ret ());
    }

  let send_logs : Logs.resource_logs list sender =
    {
      send =
        (fun m ~ret ->
          (if Config.Env.get_debug () then
            let@ () = Lock.with_lock in
            Format.eprintf "send logs %a@."
              (Format.pp_print_list Logs.pp_resource_logs)
              m);

          push_logs m;
          ret ());
    }
end

(** Build a backend; [stop] defaults to a fresh [false] flag and [config] to
    [Config.make ()]. *)
let create_backend ?(stop = Atomic.make false) ?(config = Config.make ()) () =
  let module B =
    Backend
      (struct
        let stop = stop

        let config = config
      end)
      ()
  in
  (module B : OT.Collector.BACKEND)

let setup_ ?stop ?config () : unit =
  let backend = create_backend ?stop ?config () in
  OT.Collector.set_backend backend;
  ()

(** Install the backend globally, unless [enable] is [false]. *)
let setup ?stop ?config ?(enable = true) () =
  if enable then setup_ ?stop ?config ()

(** Remove the global backend; the returned promise resolves once the
    backend's [on_done] callback has been invoked. *)
let remove_backend () : unit Lwt.t =
  let done_fut, done_u = Lwt.wait () in
  OT.Collector.remove_backend
    ~on_done:(fun () -> Lwt.wakeup_later done_u ())
    ();
  done_fut

(** [with_setup () f] installs the backend, runs [f ()], and removes the
    backend afterwards whether [f] succeeds or raises (re-raising the
    exception). When [enable] is [false], it simply runs [f ()]. *)
let with_setup ?stop ?(config = Config.make ()) ?(enable = true) () f :
    _ Lwt.t =
  if enable then (
    let open Lwt.Syntax in
    setup_ ?stop ~config ();

    Lwt.catch
      (fun () ->
        let* res = f () in
        let+ () = remove_backend () in
        res)
      (fun exn ->
        let* () = remove_backend () in
        reraise exn)
  ) else
    f ()
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>