(* Source file event_generator.ml *)
open Core
open Poly
open Core_profiler
open Core_profiler_disabled
module Time_ns = Time_ns_unix
(* Mutable bookkeeping kept for each group while messages are replayed.
   [create] initialises both counters to 0. *)
type group_state =
(* Bumped by one by [iter_events] for every [Group_reset] message addressed
   to the group. *)
{ mutable current_session : int
(* Bumped by one by [at_group_point] each time one of the group's points is
   recorded; its value before the bump becomes that point's [at_index].
   The [Group_reset] handling in this file does not reset it. *)
; mutable session_at_count : int
}
(* What was observed the last time a given group point was recorded by
   [at_group_point].  [create] starts every point with [session = -1],
   [time = Time_ns.epoch], [value = -1] and [at_index = 0]. *)
type point_state =
(* Time passed to the most recent [at_group_point] call for this point. *)
{ mutable time : Time_ns.t
(* The group's [current_session] at that moment. *)
; mutable session : int
(* The group's [session_at_count] at that moment (before it was bumped). *)
; mutable at_index : int
(* Last value recorded for the point; left unchanged when [at_group_point]
   is called with [None] (as the [Timer] branch of [iter_events] does). *)
; mutable value : int
}
(* An event generator: the inputs given to [create] plus the per-group and
   per-point state that [iter_events] mutates while replaying [buffer]. *)
type t =
(* Header describing every probe id; looked up for each message. *)
{ id_map : Reader.Header.t
(* Epoch handed to [Reader.iter_short_messages]. *)
; epoch : Profiler_epoch.t
(* One entry per [Group] item of [id_map]. *)
; group_state : (group_state, read) Id_table.t
(* One entry per [Group_point] item of [id_map]. *)
; point_state : (point_state, read) Id_table.t
(* For each point id, the interests that a message on that id may trigger
   (for a path interest, the id is the path's last point). *)
; interests : (Probe_id.t Interest.Raw.t array, read) Id_table.t
(* The messages to replay, as a read-only, no-seek view. *)
; buffer : (read, Iobuf.no_seek) Iobuf.t
}
(* Payload of a [Timer_path] event, built by [iter_events] when a [Timer]
   message completes a [Group_path] interest. *)
type timer_path =
(* The path interest that matched. *)
{ interest : Probe_id.t Interest.Raw.t
(* Time of the message that completed the path. *)
; time : Time_ns.t
(* [time] minus the time last recorded for the path's first point. *)
; time_delta : Time_ns.Span.t
}
[@@deriving sexp, compare]
(* Payload of a [Probe_path] event, built by [iter_events] when a [Probe]
   message completes a [Group_path] interest. *)
type probe_path =
(* The path interest that matched. *)
{ interest : Probe_id.t Interest.Raw.t
(* Time of the message that completed the path. *)
; time : Time_ns.t
(* [time] minus the time last recorded for the path's first point. *)
; time_delta : Time_ns.Span.t
(* Value carried by the message that completed the path. *)
; value : int
(* [value] minus the value last recorded for the path's first point. *)
; delta : int
}
[@@deriving sexp, compare]
(* Events handed to the callback of [iter_events]. *)
type event =
(* A timer message matched a [Single] or [Group_point] interest:
   the interest and the message time. *)
| Timer of Probe_id.t Interest.Raw.t * Time_ns.t
(* A probe message matched a [Single] or [Group_point] interest:
   the interest, the message time and the message value. *)
| Probe of Probe_id.t Interest.Raw.t * Time_ns.t * int
(* A timer message completed a [Group_path] interest. *)
| Timer_path of timer_path
(* A probe message completed a [Group_path] interest. *)
| Probe_path of probe_path
[@@deriving sexp, compare]
(* [event_time event] is the timestamp carried by [event], whichever
   constructor it was built with. *)
let event_time event =
  match event with
  | Timer (_, at)
  | Probe (_, at, _)
  | Timer_path { time = at; _ }
  | Probe_path { time = at; _ } -> at
(* [create epoch id_map interests buffer] builds an event generator that
   will replay the messages in [buffer] and report the events selected by
   [interests].

   - [epoch] and [id_map] are stored and later passed to
     [Reader.iter_short_messages].
   - [interests] is indexed by the point id whose message can trigger each
     interest: the id itself for [Single] and [Group_point], and the path's
     last point for [Group_path].
   - [buffer] is kept as a read-only, no-seek view.

   Raises if an interest refers to a point id that has no entry in the
   lookup table ([Id_table.find_exn] / [Id_table.set_exn]). *)
let create epoch id_map interests buffer =
  (* Start with an empty interest set for every non-group id. *)
  let interests_lookup =
    Reader.Header.create_table id_map ~groups:false Interest.Raw.I.Set.empty
  in
  List.iter interests ~f:(fun (interest : Probe_id.t Interest.Raw.t) ->
    let point =
      match interest with
      | Single id -> id
      | Group_point (_grp, id) -> id
      | Group_path (_grp, path) -> path.last
    in
    Id_table.find_exn interests_lookup point
    |> Fn.flip Set.add interest
    |> Id_table.set_exn interests_lookup point
  );
  (* Freeze the sets into arrays for iteration in [iter_events]. *)
  let interests_lookup =
    Id_table.map ~f:(fun _id set -> Set.to_array set) interests_lookup
  in
  { id_map
  ; epoch
    (* Only groups get a [group_state]; the callback receives the id and the
       header item, and the item decides whether an entry is created. *)
  ; group_state = Id_table.filter_map id_map ~f:(fun _id header_item ->
      match header_item with
      | Reader.Header.Item.Group _ ->
        Some
          { current_session = 0
          ; session_at_count = 0
          }
      | Single _
      | Group_point _ -> None
    )
    (* Only group points get a [point_state].  [session = -1] cannot equal
       any group's [current_session], which starts at 0 and only grows. *)
  ; point_state = Id_table.filter_map id_map ~f:(fun _id header_item ->
      match header_item with
      | Reader.Header.Item.Group_point _ ->
        Some
          { session = -1
          ; time = Time_ns.epoch
          ; value = -1
          ; at_index = 0
          }
      | Single _
      | Group _ -> None
    )
  ; interests = Id_table.read_only interests_lookup
  ; buffer = Iobuf.no_seek (Iobuf.read_only buffer)
  }
(* [at_group_point t ~point_id ~group_id time value] records that the point
   [point_id] of group [group_id] was hit at [time].

   The point is stamped with the group's current session and with the
   group's running hit counter, which is then advanced by one.  The stored
   value is overwritten only when [value] is [Some _].

   Raises (via [Id_table.find_exn]) if either id has no state in [t]. *)
let at_group_point t ~point_id ~group_id time value =
  let group = Id_table.find_exn t.group_state group_id in
  let point = Id_table.find_exn t.point_state point_id in
  let index = group.session_at_count in
  point.session <- group.current_session;
  point.at_index <- index;
  point.time <- time;
  (match value with
   | Some v -> point.value <- v
   | None -> ());
  group.session_at_count <- index + 1
(* [test_path t group_state path] decides whether the points of [path]
   preceding its last one have all been recorded, in order, during the
   group's current session.

   The check walks backwards: first through [path.rest_rev], then
   [path.first].  A running bound starts at the group's [session_at_count]
   and is replaced by each accepted point's [at_index].  A point is
   accepted when it was recorded in the current session and its [at_index]
   is
   - exactly [bound - 1] for a [Direct_point], or
   - anything strictly below [bound] for a [Point].

   Raises (via [Id_table.find_exn]) if a point has no state in [t]. *)
let test_path t group_state (path : Probe_id.t Path.t) =
  let session = group_state.current_session in
  (* [Some at_index] when [point] is acceptable under [bound], else [None]. *)
  let hit_before point bound =
    let id, must_be_adjacent =
      match point with
      | Path.Direct_point id -> id, true
      | Point id -> id, false
    in
    let state = Id_table.find_exn t.point_state id in
    let at_index = state.at_index in
    let index_ok =
      if must_be_adjacent
      then at_index = bound - 1
      else at_index < bound
    in
    if state.session = session && index_ok
    then Some at_index
    else None
  in
  let rec walk remaining bound =
    match remaining with
    | [] -> Option.is_some (hit_before path.first bound)
    | point :: remaining ->
      (match hit_before point bound with
       | None -> false
       | Some at_index -> walk remaining at_index)
  in
  walk path.Path.rest_rev group_state.session_at_count
(* [iter_events t ~f] replays every short message in [t.buffer] and calls
   [f] once for each (message, interest) pair that produces an event:

   - a [Timer] / [Probe] message on an id with a [Single] or [Group_point]
     interest yields [Timer] / [Probe];
   - a [Timer] / [Probe] message on the last point of a [Group_path]
     interest yields [Timer_path] / [Probe_path] when [test_path] accepts
     the path, with deltas taken against the state last recorded for the
     path's first point;
   - a [Group_reset] message yields nothing and starts a new session for
     the group.

   For group points, the point's state is updated only after the interests
   for the message have been processed, so path tests see the state left by
   earlier messages.

   Raises if a message's id is missing from [t.id_map], or if a [Timer] /
   [Probe] message arrives for an id with no entry in [t.interests]
   ([Option.value_exn]). *)
let iter_events t ~f =
  Reader.iter_short_messages t.buffer t.epoch t.id_map ~f:(fun message ->
    let id = Reader.Short_message.id message in
    let header_item = Id_table.find_exn t.id_map id in
    let group_point_parent =
      match header_item with
      | Group_point { parent; _ } -> Some parent
      | Single _
      | Group _ -> None
    in
    (* Lazy because it is only meaningful (and only forced) for path
       interests, whose last point is a group point. *)
    let group_state = lazy (
      Option.value_exn group_point_parent
      |> Id_table.find_exn t.group_state
    )
    in
    (* No-op for ids that are not group points. *)
    let at_group_point' time value =
      Option.iter group_point_parent ~f:(fun parent ->
        at_group_point t ~point_id:id ~group_id:parent time value
      )
    in
    let interests = Id_table.find t.interests id in
    match message with
    | Timer (id, time) ->
      Array.iter (Option.value_exn interests) ~f:(fun interest ->
        match interest with
        | Single id2
        | Group_point (_, id2) ->
          assert (id = id2);
          f (Timer (interest, time))
        | Group_path (_gp, path) ->
          assert (id = Path.last path);
          if test_path t (Lazy.force group_state) path
          then begin
            let first_point_state = Id_table.find_exn t.point_state (Path.first path) in
            let time_delta = Time_ns.diff time first_point_state.time in
            f (Timer_path { interest; time; time_delta })
          end
      );
      at_group_point' time None
    | Probe (id, time, value) ->
      Array.iter (Option.value_exn interests) ~f:(fun interest ->
        match interest with
        | Single id2
        | Group_point (_, id2) ->
          assert (id = id2);
          f (Probe (interest, time, value))
        | Group_path (_gp, path) ->
          assert (id = Path.last path);
          if test_path t (Lazy.force group_state) path
          then begin
            let first_point_state = Id_table.find_exn t.point_state (Path.first path) in
            let time_delta = Time_ns.diff time first_point_state.time in
            let delta = value - first_point_state.value in
            f (Probe_path { interest; time; time_delta; delta; value })
          end
      );
      at_group_point' time (Some value)
    | Group_reset (id, _) ->
      let group_state = Id_table.find_exn t.group_state id in
      group_state.current_session <- group_state.current_session + 1
  )
(* Inline tests for path matching in [iter_events].  Each case writes probe
   messages for a single group "group" (id 0) with points "a".."g"
   (ids 1..7), replays them and compares the emitted events. *)
let%test_module "iter_group_events" = (module struct
  module Protocol = Core_profiler.Protocol

  let to_id = Probe_id.of_int_exn
  let to_time_delta = Time_ns.Span.of_int_sec
  let to_time n = Profiler_epoch.add Protocol.Writer.epoch (to_time_delta n)

  (* Header shared by every case: written once through the protocol writer
     and parsed back; the writer buffer is reset afterwards. *)
  let header =
    let open Protocol in
    protect
      ~finally:Buffer.Unsafe_internals.reset
      ~f:(fun () ->
        Writer.Unsafe_internals.write_epoch ();
        Writer.write_new_group (to_id 0) "group" (Probe_type.Probe Profiler_units.Seconds);
        List.iter
          [ ("a", 1); ("b", 2); ("c", 3); ("d", 4); ("e", 5); ("f", 6); ("g", 7) ]
          ~f:(fun (name, id) ->
            Writer.write_new_group_point ~id:(to_id id) ~group_id:(to_id 0) name [||]
          );
        Writer.Unsafe_internals.write_end_of_header ();
        Buffer.get_header_chunk ()
        |> Reader.consume_header
        |> snd
      )

  let name_map = Util.Name_map.of_id_map header
  let header_group = Map.find_exn name_map.groups "group"

  (* Parses a path written with point names and resolves the names to ids
     within the test group. *)
  let to_path s =
    Path.string_t_of_string s
    |> Option.value_exn
    |> Fn.flip Path.lookup_ids header_group

  let to_path_int s = Interest.Raw.Group_path (to_id 0, to_path s)

  (* [run_case ats interests] writes one message per character of [ats] and
     returns the events generated for [interests], in order.  The character
     at position [n] is written with time [to_time n] and value [n]:
     'a'..'g' hit the corresponding point, 'r' resets the group and ' '
     writes nothing (but still consumes a position). *)
  let run_case ats interests =
    protect
      ~finally:Protocol.Buffer.Unsafe_internals.reset
      ~f:(fun () ->
        let at id n =
          Protocol.Writer.write_probe_at (to_id id) (to_time n) n
        in
        String.to_list ats
        |> List.iteri ~f:(fun n c ->
          match c with
          | 'a' -> at 1 n
          | 'b' -> at 2 n
          | 'c' -> at 3 n
          | 'd' -> at 4 n
          | 'e' -> at 5 n
          | 'f' -> at 6 n
          | 'g' -> at 7 n
          | 'r' -> Protocol.Writer.write_group_reset (to_id 0) (to_time n)
          | ' ' -> ()
          | _ -> failwith "Bad test case"
        );
        let buffer =
          match Protocol.Buffer.get_chunks () with
          | [x] -> x
          | _ -> failwith "expected one chunk"
        in
        let ev_gen = create Protocol.Writer.epoch header interests buffer in
        let events_rev = ref [] in
        iter_events ev_gen ~f:(fun x -> events_rev := x :: !events_rev);
        List.rev !events_rev
      )

  (* Expected [Probe_path] for a path completed at position [value] whose
     first point was hit [delta] positions earlier. *)
  let to_event interest value delta =
    Probe_path
      { interest
      ; time = to_time value; time_delta = to_time_delta delta
      ; value; delta
      }

  let%test_unit "multiple simultaneous events" =
    [%test_eq: event list]
      (run_case "abc" [ to_path_int "a..c"; to_path_int "b,c" ])
      [ to_event (to_path_int "b,c") 2 1; to_event (to_path_int "a..c") 2 2 ]

  let%test_unit "reset" =
    [%test_eq: event list] (run_case "aaa r ccc" [ to_path_int "a..c" ]) []

  let%test_unit "directness" =
    [%test_eq: event list]
      (run_case "cd d dc r c d ced r ced" [ to_path_int "c,d" ])
      [ to_event (to_path_int "c,d") 1 1; to_event (to_path_int "c,d") 14 4 ]

  let%test_unit "repeated" =
    let p = to_path_int "a,a" in
    [%test_eq: event list]
      (run_case "aaaaa r a" [ p ])
      [ to_event p 1 1; to_event p 2 1; to_event p 3 1; to_event p 4 1 ]
end)