Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file event_generator.ml
open Core
open Poly
open Core_profiler
open Core_profiler_disabled

(* (As per Path.readme,) This does not have the full power of regular expressions.
   Notably, the same point may not appear in a path twice, except for when its
   second appearance is the last point in the path. *)

(* The [current_session] of all groups is initialised to 0; and the session of
   all points is initialised to -1.
   To save memory and avoid allocations, junk placeholder values are stored in
   [point_state.at_index] (0), [point_state.value] (-1) and [point_state.time]
   ([Time_ns.epoch]) at startup.
   The mis-match in [session] between group and point prevents the junk values in
   [at_index], [value] and [time] from ever being used. *)
type group_state =
  { mutable current_session : int
  (* The number of calls to [at] (on points in this group) in this session *)
  ; mutable session_at_count : int
  }

type point_state =
  { mutable time : Time_ns.t
  ; mutable session : int
  (* [at_index]: when this point was marked in this session
     (i.e., set to [group_state.session_at_count] when marked) *)
  ; mutable at_index : int
  ; mutable value : int
  }

type t =
  { id_map : Reader.Header.t
  ; epoch : Profiler_epoch.t
  (* Read only tables, but the cells are mutable records. *)
  ; group_state : (group_state, read) Id_table.t
  ; point_state : (point_state, read) Id_table.t
  (* Interests at this point, or interests in paths ending at this point. *)
  ; interests : (Probe_id.t Interest.Raw.t array, read) Id_table.t
  ; buffer : (read, Iobuf.no_seek) Iobuf.t
  }

(* A matched path whose last point is a timer: [time] is the time of the last
   point and [time_delta] the time elapsed since the first point of the path. *)
type timer_path =
  { interest : Probe_id.t Interest.Raw.t
  ; time : Time_ns.t
  ; time_delta : Time_ns.Span.t
  }
[@@deriving sexp, compare]

(* A matched path whose last point is a probe: as [timer_path], plus the [value]
   recorded at the last point and its difference ([delta]) from the value stored
   for the first point of the path. *)
type probe_path =
  { interest : Probe_id.t Interest.Raw.t
  ; time : Time_ns.t
  ; time_delta : Time_ns.Span.t
  ; value : int
  ; delta : int
  }
[@@deriving sexp, compare]

type event =
  | Timer of Probe_id.t Interest.Raw.t * Time_ns.t
  | Probe of Probe_id.t Interest.Raw.t * Time_ns.t * int
  | Timer_path of timer_path
  | Probe_path of probe_path
[@@deriving sexp, compare]

(* [event_time ev] is the timestamp carried by [ev], whichever variant it is. *)
let event_time = function
  | Timer (_, time) -> time
  | Probe (_, time, _) -> time
  | Timer_path { time; _ } -> time
  | Probe_path { time; _ } -> time

(* [create epoch id_map interests buffer] builds an event generator.

   Each interest is indexed under the id at which it can fire: the id itself for
   [Single] and [Group_point], and the last point of the path for [Group_path].
   A fresh [group_state] is allocated for every [Group] header item and a fresh
   [point_state] for every [Group_point] header item.

   Raises (via [Id_table.find_exn] / [Id_table.set_exn]) if an interest refers to
   an id that is not present in the table created from [id_map]. *)
let create epoch id_map interests buffer =
  let interests_lookup =
    Reader.Header.create_table id_map ~groups:false Interest.Raw.I.Set.empty
  in
  List.iter interests ~f:(fun (interest : Probe_id.t Interest.Raw.t) ->
    let point =
      match interest with
      | Single id -> id
      | Group_point (_grp, id) -> id
      | Group_path (_grp, path) -> path.last
    in
    Id_table.find_exn interests_lookup point
    |> Fn.flip Set.add interest
    |> Id_table.set_exn interests_lookup point);
  let interests_lookup =
    Id_table.map ~f:(fun _id set -> Set.to_array set) interests_lookup
  in
  { id_map
  ; epoch
  ; group_state =
      Id_table.filter_map id_map ~f:(fun _id header_item ->
        match header_item with
        | Reader.Header.Item.Group _ ->
          Some { current_session = 0; session_at_count = 0 }
        | Single _ | Group_point _ -> None)
  ; point_state =
      Id_table.filter_map id_map ~f:(fun _id header_item ->
        match header_item with
        | Reader.Header.Item.Group_point _ ->
          (* [session = -1] never equals a group's [current_session] (which starts
             at 0 and only increases), so the other fields are never read before
             the point is first marked. *)
          Some { session = -1; time = Time_ns.epoch; value = -1; at_index = 0 }
        | Single _ | Group _ -> None)
  ; interests = Id_table.read_only interests_lookup
  ; buffer = Iobuf.no_seek (Iobuf.read_only buffer)
  }

(* [at_group_point t ~point_id ~group_id time value] records that the point
   [point_id] of group [group_id] was marked at [time]: the point is stamped with
   the group's current session and the next [at_index], and the group's
   [session_at_count] is incremented.  [point_state.value] is only overwritten
   when [value] is [Some _]. *)
let at_group_point t ~point_id ~group_id time value =
  let group_state = Id_table.find_exn t.group_state group_id in
  let point_state = Id_table.find_exn t.point_state point_id in
  point_state.session <- group_state.current_session;
  point_state.at_index <- group_state.session_at_count;
  point_state.time <- time;
  Option.iter value ~f:(fun value -> point_state.value <- value);
  group_state.session_at_count <- group_state.session_at_count + 1

(* [test_path t group_state path] is [true] iff every point of [path] other than
   the last has been marked in the group's current session, in path order, with
   [Direct_point] edges having been marked immediately before their successor.

   The last point of [path] itself is not looked up: the walk starts from
   [group_state.session_at_count], i.e. the index the next marked point will
   receive. *)
let test_path t group_state (path : Probe_id.t Path.t) =
  let current_session = group_state.current_session in
  (* [last_at_index] is the [at_index] of the previous point that was considered.
     It's used to check whether a point was marked before the previous point
     (it walks from the last point to the first), and whether an edge was direct. *)
  let test point last_at_index =
    match point with
    | Path.Direct_point id ->
      let point_state = Id_table.find_exn t.point_state id in
      let at_index = point_state.at_index in
      if point_state.session = current_session && at_index = last_at_index - 1
      then `Point_ok at_index
      else `No_match
    | Point id ->
      let point_state = Id_table.find_exn t.point_state id in
      let at_index = point_state.at_index in
      if point_state.session = current_session && at_index < last_at_index
      then `Point_ok at_index
      else `No_match
  in
  (* Note that we walk from the last point to the first. *)
  let rec loop points last_at_index =
    match points with
    | [] ->
      begin
        match test path.first last_at_index with
        | `Point_ok _ -> true
        | `No_match -> false
      end
    | pt :: points ->
      begin
        match test pt last_at_index with
        | `Point_ok at_index -> loop points at_index
        | `No_match -> false
      end
  in
  loop path.Path.rest_rev group_state.session_at_count

(* [iter_events t ~f] walks the short messages in [t.buffer] (via
   [Reader.iter_short_messages]) and calls [f] once for every event that matches
   an interest registered at the message's id.

   For [Timer] and [Probe] messages the interests are evaluated BEFORE the point
   itself is recorded with [at_group_point], so a path such as "a,a" compares
   against the previous marking of its last point.

   Raises if a [Timer]/[Probe] message arrives for an id that has no entry in
   [t.interests] ([Option.value_exn]). *)
let iter_events t ~f =
  Reader.iter_short_messages t.buffer t.epoch t.id_map ~f:(fun message ->
    let id = Reader.Short_message.id message in
    let header_item = Id_table.find_exn t.id_map id in
    (* This variable represents both /whether/ this is a group point, and if so,
       /what/ its group id is *)
    let group_point_parent =
      match header_item with
      | Group_point { parent; _ } -> Some parent
      | Single _ | Group _ -> None
    in
    (* Lazy: only forced for [Group_path] interests, which are only registered on
       group points, where [group_point_parent] is [Some _]. *)
    let group_state =
      lazy (Option.value_exn group_point_parent |> Id_table.find_exn t.group_state)
    in
    let at_group_point' time value =
      Option.iter group_point_parent ~f:(fun parent ->
        at_group_point t ~point_id:id ~group_id:parent time value)
    in
    let interests = Id_table.find t.interests id in
    match message with
    | Timer (id, time) ->
      Array.iter (Option.value_exn interests) ~f:(fun interest ->
        match interest with
        | Single id2 | Group_point (_, id2) ->
          assert (id = id2);
          f (Timer (interest, time))
        | Group_path (_gp, path) ->
          assert (id = Path.last path);
          if test_path t (Lazy.force group_state) path
          then begin
            let first_point_state =
              Id_table.find_exn t.point_state (Path.first path)
            in
            let time_delta = Time_ns.diff time first_point_state.time in
            f (Timer_path { interest; time; time_delta })
          end);
      at_group_point' time None
    | Probe (id, time, value) ->
      Array.iter (Option.value_exn interests) ~f:(fun interest ->
        match interest with
        | Single id2 | Group_point (_, id2) ->
          assert (id = id2);
          f (Probe (interest, time, value))
        | Group_path (_gp, path) ->
          assert (id = Path.last path);
          if test_path t (Lazy.force group_state) path
          then begin
            let first_point_state =
              Id_table.find_exn t.point_state (Path.first path)
            in
            let time_delta = Time_ns.diff time first_point_state.time in
            let delta = value - first_point_state.value in
            f (Probe_path { interest; time; time_delta; delta; value })
          end);
      at_group_point' time (Some value)
    | Group_reset (id, _) ->
      let group_state = Id_table.find_exn t.group_state id in
      group_state.current_session <- group_state.current_session + 1;
      (* [session_at_count] is documented as a per-session counter, so restart it
         with the new session.  [test_path] only compares [at_index] values that
         belong to the same session, so this does not change which events are
         emitted. *)
      group_state.session_at_count <- 0)

let%test_module "iter_group_events" =
  (module struct
    module Protocol = Core_profiler.Protocol

    let to_id = Probe_id.of_int_exn
    let to_time_delta = Time_ns.Span.of_int_sec
    let to_time n = Profiler_epoch.add Protocol.Writer.epoch (to_time_delta n)

    (* A header describing one group (id 0) with points "a".."g" (ids 1..7). *)
    let header =
      let open Protocol in
      protect ~finally:Buffer.Unsafe_internals.reset ~f:(fun () ->
        Writer.Unsafe_internals.write_epoch ();
        Writer.write_new_group
          (to_id 0)
          "group"
          (Probe_type.Probe Profiler_units.Seconds);
        List.iter
          [ "a", 1; "b", 2; "c", 3; "d", 4; "e", 5; "f", 6; "g", 7 ]
          ~f:(fun (name, id) ->
            Writer.write_new_group_point ~id:(to_id id) ~group_id:(to_id 0) name [||]);
        Writer.Unsafe_internals.write_end_of_header ();
        Buffer.get_header_chunk () |> Reader.consume_header |> snd)

    let name_map = Util.Name_map.of_id_map header
    let header_group = Map.find_exn name_map.groups "group"

    let to_path s =
      Path.string_t_of_string s
      |> Option.value_exn
      |> Fn.flip Path.lookup_ids header_group

    let to_path_int s = Interest.Raw.Group_path (to_id 0, to_path s)

    (* [run_case ats interests] writes one message per character of [ats] and
       returns the events generated for [interests].  The character at position
       [n] is written with time [to_time n] and value [n]: 'a'..'g' mark the
       corresponding point, 'r' resets the group and ' ' writes nothing (but
       still consumes position [n]). *)
    let run_case ats interests =
      protect ~finally:Protocol.Buffer.Unsafe_internals.reset ~f:(fun () ->
        let at id n = Protocol.Writer.write_probe_at (to_id id) (to_time n) n in
        String.to_list ats
        |> List.iteri ~f:(fun n c ->
          match c with
          | 'a' -> at 1 n
          | 'b' -> at 2 n
          | 'c' -> at 3 n
          | 'd' -> at 4 n
          | 'e' -> at 5 n
          | 'f' -> at 6 n
          | 'g' -> at 7 n
          | 'r' -> Protocol.Writer.write_group_reset (to_id 0) (to_time n)
          | ' ' -> ()
          | _ -> failwith "Bad test case");
        let buffer =
          match Protocol.Buffer.get_chunks () with
          | [ x ] -> x
          | _ -> failwith "expected one chunk"
        in
        let ev_gen = create Protocol.Writer.epoch header interests buffer in
        let events_rev = ref [] in
        iter_events ev_gen ~f:(fun x -> events_rev := x :: !events_rev);
        List.rev !events_rev)

    let to_event interest value delta =
      Probe_path
        { interest
        ; time = to_time value
        ; time_delta = to_time_delta delta
        ; value
        ; delta
        }

    let%test_unit "multiple simultaneous events" =
      [%test_eq: event list]
        (run_case "abc" [ to_path_int "a..c"; to_path_int "b,c" ])
        [ to_event (to_path_int "b,c") 2 1; to_event (to_path_int "a..c") 2 2 ]

    let%test_unit "reset" =
      [%test_eq: event list] (run_case "aaa r ccc" [ to_path_int "a..c" ]) []

    (* The run of three spaces after the first reset is significant: every
       character (spaces included) consumes one position, so the second-session
       [c] is written at n = 10 and the [d] that directly follows it at n = 14,
       which is what the expected [14 4] event encodes. *)
    let%test_unit "directness" =
      [%test_eq: event list]
        (run_case "cd d dc r c   d ced r ced" [ to_path_int "c,d" ])
        [ to_event (to_path_int "c,d") 1 1; to_event (to_path_int "c,d") 14 4 ]

    let%test_unit "repeated" =
      let p = to_path_int "a,a" in
      [%test_eq: event list]
        (run_case "aaaaa r a" [ p ])
        [ to_event p 1 1; to_event p 2 1; to_event p 3 1; to_event p 4 1 ]

    (* Legacy versions of the tests above, kept for reference (older test syntax
       and older path-string syntax):

     * TEST_UNIT "multiple simultaneous events" =
     *   <:test_eq< event list >>
     *     (run_case "abc" [ to_path_int "a,c"; to_path_int "b.c" ])
     *     [ to_event (to_path_int "b.c") 2 1; to_event (to_path_int "a,c") 2 2 ]
     *
     * TEST_UNIT "reset" =
     *   <:test_eq< event list >> (run_case "aaa r ccc" [ to_path_int "a,c" ]) []
     *
     * TEST_UNIT "directness" =
     *   <:test_eq< event list >>
     *     (run_case "cd d dc r c   d ced r ced" [ to_path_int "c.d" ])
     *     [ to_event (to_path_int "c.d") 1 1; to_event (to_path_int "c.d") 14 4 ]
     *
     * TEST_UNIT "repeated" =
     *   let p = to_path_int "a.a" in
     *   <:test_eq< event list >>
     *     (run_case "aaaaa r a" [ p ])
     *     [ to_event p 1 1; to_event p 2 1; to_event p 3 1; to_event p 4 1 ] *)
  end)