Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file bus.ml
(* Implementation of a bus: a set of subscriber callbacks of a fixed arity (one to four
   arguments) that are each invoked, in turn, on every value written to the bus. *)

open! Import
open Std_internal

(* Write/close status of a bus. *)
module State = struct
  type t =
    | Closed
    | Write_in_progress
    | Ok_to_write
  [@@deriving sexp_of]

  let is_closed = function
    | Closed -> true
    | Write_in_progress -> false
    | Ok_to_write -> false
  ;;
end

(* GADT witness tying the callback type of a bus to its number of arguments. *)
module Callback_arity = struct
  type _ t =
    | Arity1 : ('a -> unit) t
    | Arity2 : ('a -> 'b -> unit) t
    | Arity3 : ('a -> 'b -> 'c -> unit) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit) t
  [@@deriving sexp_of]
end

(* Policy consulted by [can_subscribe] once [write] has been called on a bus. *)
module On_subscription_after_first_write = struct
  type t =
    | Allow
    | Allow_and_send_last_value
    | Raise
  [@@deriving sexp_of]

  let allow_subscription_after_first_write = function
    | Allow -> true
    | Allow_and_send_last_value -> true
    | Raise -> false
  ;;
end

(* Mutable holder for the arguments of the most recent write, so that they can be replayed
   to a callback with [send].  It starts out empty; [send] on an empty holder does
   nothing. *)
module Last_value : sig
  type 'callback t

  val create : 'callback Callback_arity.t -> 'callback t
  val set1 : ('a -> unit) t -> 'a -> unit
  val set2 : ('a -> 'b -> unit) t -> 'a -> 'b -> unit
  val set3 : ('a -> 'b -> 'c -> unit) t -> 'a -> 'b -> 'c -> unit
  val set4 : ('a -> 'b -> 'c -> 'd -> unit) t -> 'a -> 'b -> 'c -> 'd -> unit
  val send : 'callback t -> 'callback -> unit
end = struct
  (* The fields are mutable so that, after the first [setN], later calls overwrite the
     existing record in place instead of building a new one. *)
  type _ tuple =
    | Tuple1 : { mutable arg1 : 'a } -> ('a -> unit) tuple
    | Tuple2 : { mutable arg1 : 'a; mutable arg2 : 'b } -> ('a -> 'b -> unit) tuple
    | Tuple3 :
        { mutable arg1 : 'a
        ; mutable arg2 : 'b
        ; mutable arg3 : 'c
        }
        -> ('a -> 'b -> 'c -> unit) tuple
    | Tuple4 :
        { mutable arg1 : 'a
        ; mutable arg2 : 'b
        ; mutable arg3 : 'c
        ; mutable arg4 : 'd
        }
        -> ('a -> 'b -> 'c -> 'd -> unit) tuple

  type 'callback t = 'callback tuple option ref

  (* The arity argument is used only to fix the type of the result. *)
  let create (type callback) (_arity : callback Callback_arity.t) : callback t = ref None

  let set1 t a =
    match !t with
    | None -> t := Some (Tuple1 { arg1 = a })
    | Some (Tuple1 args) -> args.arg1 <- a
  ;;

  let set2 t a b =
    match !t with
    | None -> t := Some (Tuple2 { arg1 = a; arg2 = b })
    | Some (Tuple2 args) ->
      args.arg1 <- a;
      args.arg2 <- b
  ;;

  let set3 t a b c =
    match !t with
    | None -> t := Some (Tuple3 { arg1 = a; arg2 = b; arg3 = c })
    | Some (Tuple3 args) ->
      args.arg1 <- a;
      args.arg2 <- b;
      args.arg3 <- c
  ;;

  let set4 t a b c d =
    match !t with
    | None -> t := Some (Tuple4 { arg1 = a; arg2 = b; arg3 = c; arg4 = d })
    | Some (Tuple4 args) ->
      args.arg1 <- a;
      args.arg2 <- b;
      args.arg3 <- c;
      args.arg4 <- d
  ;;

  (* Applies [callback] to the stored arguments, if any have been stored. *)
  let send (type callback) (t : callback t) (callback : callback) : unit =
    match !t with
    | None -> ()
    | Some (Tuple1 { arg1 }) -> callback arg1
    | Some (Tuple2 { arg1; arg2 }) -> callback arg1 arg2
    | Some (Tuple3 { arg1; arg2; arg3 }) -> callback arg1 arg2 arg3
    | Some (Tuple4 { arg1; arg2; arg3; arg4 }) -> callback arg1 arg2 arg3 arg4
  ;;
end

module Subscriber_id = Unique_id.Int63 ()

(* One subscription: the callback together with its per-subscriber error and close
   handlers and the source position it was subscribed from. *)
module Subscriber = struct
  type 'callback t =
    { id : Subscriber_id.t
    ; callback : 'callback
    ; extract_exn : bool
    ; on_callback_raise : (Error.t -> unit) option
    ; on_close : (unit -> unit) option
    ; subscribed_from : Source_code_position.t
    }
  [@@deriving fields]

  (* Hand-written so that the [callback] and [on_close] closures are omitted, the [id] is
     omitted when running inline tests, and [extract_exn] is shown only when true. *)
  let sexp_of_t
        _
        { callback = _; id; extract_exn; on_callback_raise; on_close = _; subscribed_from }
    : Sexp.t
    =
    List
      [ Atom "Bus.Subscriber.t"
      ; [%message
          ""
            ~id:
              (if am_running_inline_test then None else Some id
               : Subscriber_id.t sexp_option)
            (on_callback_raise : (Error.t -> unit) sexp_option)
            ~extract_exn:(if extract_exn then Some true else None : bool sexp_option)
            (subscribed_from : Source_code_position.t)]
      ]
  ;;

  let invariant invariant_a t =
    Invariant.invariant [%here] t [%sexp_of: _ t] (fun () ->
      let check f = Invariant.check_field t f in
      Fields.iter
        ~id:ignore
        ~callback:(check invariant_a)
        ~extract_exn:ignore
        ~on_callback_raise:ignore
        ~on_close:ignore
        ~subscribed_from:ignore)
  ;;

  (* Builds a subscriber with a freshly generated [id]. *)
  let create subscribed_from ~callback ~extract_exn ~on_callback_raise ~on_close =
    { id = Subscriber_id.create ()
    ; callback
    ; extract_exn
    ; on_callback_raise
    ; on_close
    ; subscribed_from
    }
  ;;
end

(* [callbacks] and [callback_raised] are derived from [subscribers] by [update_write] and
   are what the [write] functions actually read. *)
type ('callback, 'phantom) t =
  { name : Info.t option
  ; callback_arity : 'callback Callback_arity.t
  ; created_from : Source_code_position.t
  ; on_subscription_after_first_write : On_subscription_after_first_write.t
  ; last_value : 'callback Last_value.t option
  ; mutable state : State.t
  ; mutable write_ever_called : bool
  ; mutable subscribers : 'callback Subscriber.t Subscriber_id.Map.t
  ; mutable callbacks : 'callback array
  ; mutable callback_raised : int -> exn -> unit
  ; on_callback_raise : Error.t -> unit
  }
[@@deriving fields]

(* Hand-written so that the closure-valued fields, the derived [callbacks] array and
   [last_value] are left out of the rendering. *)
let sexp_of_t
      _
      _
      { callback_arity
      ; callbacks = _
      ; callback_raised = _
      ; created_from
      ; last_value = _
      ; name
      ; on_callback_raise = _
      ; on_subscription_after_first_write
      ; state
      ; subscribers
      ; write_ever_called
      }
  =
  [%message
    ""
      (name : Info.t sexp_option)
      (callback_arity : _ Callback_arity.t)
      (created_from : Source_code_position.t)
      (on_subscription_after_first_write : On_subscription_after_first_write.t)
      (state : State.t)
      (write_ever_called : bool)
      (subscribers : _ Subscriber.t Subscriber_id.Map.t)]
;;

type ('callback, 'phantom) bus = ('callback, 'phantom) t [@@deriving sexp_of]

(* Coerces the phantom parameter to [read]; the underlying bus value is shared. *)
let read_only t = (t :> (_, read) t)

(* Checks every subscriber with [Subscriber.invariant]; the second argument (the
   invariant for the phantom parameter) is ignored. *)
let invariant invariant_a _ t =
  Invariant.invariant [%here] t [%sexp_of: (_, _) t] (fun () ->
    let check f = Invariant.check_field t f in
    Fields.iter
      ~name:ignore
      ~callbacks:ignore
      ~callback_raised:ignore
      ~callback_arity:ignore
      ~created_from:ignore
      ~on_subscription_after_first_write:ignore
      ~last_value:ignore
      ~state:ignore
      ~write_ever_called:ignore
      ~subscribers:
        (check (fun subscribers ->
           Map.iteri subscribers ~f:(fun ~key:_ ~data:callback ->
             Subscriber.invariant invariant_a callback)))
      ~on_callback_raise:ignore)
;;

let is_closed t = State.is_closed t.state
let num_subscribers t = Map.length t.subscribers

module Read_write = struct
  type 'callback t = ('callback, read_write) bus [@@deriving sexp_of]

  let invariant invariant_a t = invariant invariant_a ignore t
end

module Read_only = struct
  type 'callback t = ('callback, read) bus [@@deriving sexp_of]

  let invariant invariant_a t = invariant invariant_a ignore t
end

(* Failure path of the [write] functions, kept out of line.  It is only called when the
   state is [Closed] or [Write_in_progress]. *)
let[@inline never] start_write_failing t =
  match t.state with
  | Closed -> failwiths "[Bus.write] called on closed bus" t [%sexp_of: (_, _) t]
  | Write_in_progress ->
    failwiths "[Bus.write] called from callback on the same bus" t [%sexp_of: (_, _) t]
  | Ok_to_write -> assert false
;;

(* Ends a write.  The state may have become [Closed] during the write (see [close]), in
   which case it is left as is. *)
let[@inline always] finish_write t =
  match t.state with
  | Closed -> ()
  | Ok_to_write -> assert false
  | Write_in_progress -> t.state <- Ok_to_write
;;

(* Marks the bus closed, runs each subscriber's [on_close] (if any) and then drops all
   subscribers.  Does nothing if the bus is already closed. *)
let close t =
  match t.state with
  | Closed -> ()
  | Ok_to_write | Write_in_progress ->
    t.state <- Closed;
    Map.iter t.subscribers ~f:(fun subscriber ->
      Option.iter subscriber.on_close ~f:(fun on_close -> on_close ()));
    t.subscribers <- Subscriber_id.Map.empty
;;

(* General write loops, one per arity, used by the [write] functions whenever the number
   of callbacks is not exactly one.  [i] is incremented before the callback runs, so on
   an exception [callback_raised] receives the index of the failing callback plus one. *)
let write_non_optimized t callbacks callback_raised a1 =
  let len = Array.length callbacks in
  let i = ref 0 in
  while !i < len do
    try
      let callback = Array.unsafe_get callbacks !i in
      incr i;
      callback a1
    with
    | exn -> callback_raised !i exn
  done;
  finish_write t
;;

let write2_non_optimized t callbacks callback_raised a1 a2 =
  let len = Array.length callbacks in
  let i = ref 0 in
  while !i < len do
    try
      let callback = Array.unsafe_get callbacks !i in
      incr i;
      callback a1 a2
    with
    | exn -> callback_raised !i exn
  done;
  finish_write t
;;

let write3_non_optimized t callbacks callback_raised a1 a2 a3 =
  let len = Array.length callbacks in
  let i = ref 0 in
  while !i < len do
    try
      let callback = Array.unsafe_get callbacks !i in
      incr i;
      callback a1 a2 a3
    with
    | exn -> callback_raised !i exn
  done;
  finish_write t
;;

let write4_non_optimized t callbacks callback_raised a1 a2 a3 a4 =
  let len = Array.length callbacks in
  let i = ref 0 in
  while !i < len do
    try
      let callback = Array.unsafe_get callbacks !i in
      incr i;
      callback a1 a2 a3 a4
    with
    | exn -> callback_raised !i exn
  done;
  finish_write t
;;

(* Recomputes [t.callbacks] and [t.callback_raised] from [t.subscribers].  Called by
   [create], [subscribe_exn] and [unsubscribe]. *)
let update_write (type callback) (t : (callback, _) t) =
  (* Computing [callbacks] takes time proportional to the number of callbacks, which we
     have decided is OK because we expect subscription/unsubscription to be rare, and the
     number of callbacks to be few.  We could do constant-time update of the set of
     subscribers using a custom bag-like thing with a pair of arrays, one of the
     callbacks and one of the subscribers.  We've decided not to introduce that complexity
     until the performance benefit warrants it. *)
  let subscribers = t.subscribers |> Map.data |> Array.of_list in
  let callbacks = subscribers |> Array.map ~f:Subscriber.callback in
  (* If the bus-level handler itself raises, the bus is closed and the exception is
     re-raised. *)
  let call_on_callback_raise error =
    try t.on_callback_raise error with
    | exn ->
      close t;
      raise exn
  in
  let callback_raised i exn =
    let backtrace = Backtrace.Exn.most_recent () in
    (* [i] was incremented before the callback was called, so we have to subtract one
       here.  We do this here, rather than at the call site, because there are multiple
       call sites due to the optimizations needed to keep this zero-alloc. *)
    let subscriber = subscribers.(i - 1) in
    let error =
      [%message
        "Bus subscriber raised"
          (exn : exn)
          (backtrace : Backtrace.t)
          (subscriber : _ Subscriber.t)]
      |> [%of_sexp: Error.t]
    in
    match subscriber.on_callback_raise with
    | None -> call_on_callback_raise error
    | Some f ->
      (* With [extract_exn], the subscriber's handler receives just the exception instead
         of the annotated error built above. *)
      let error = if subscriber.extract_exn then Error.of_exn exn else error in
      (try f error with
       | exn ->
         let backtrace = Backtrace.Exn.most_recent () in
         call_on_callback_raise
           (let original_error = error in
            [%message
              "Bus subscriber's [on_callback_raise] raised"
                (exn : exn)
                (backtrace : Backtrace.t)
                (original_error : Error.t)]
            |> [%of_sexp: Error.t]))
  in
  t.callbacks <- callbacks;
  t.callback_raised <- callback_raised
;;

(* The [write_N] functions are written to minimise registers live across function calls
   (these have to be spilled).  They are also annotated for partial inlining (the
   one-callback case becomes inlined whereas the >1-callback-case requires a further
   direct call).

   Each one sets [write_ever_called] before looking at the state, fails via
   [start_write_failing] unless the state is [Ok_to_write], runs the callbacks, and
   finally records the arguments in [last_value] when the bus has one. *)
let[@inline always] write t a1 =
  (* Snapshot [callbacks] and [callback_raised] now just in case one of the callbacks
     calls [update_write], above.  ([callbacks] is mutable but is never mutated; it is
     replaced by a fresh array whenever it is changed.) *)
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (* [callback_raised] expects the callback's index plus one, hence [1]. *)
      (try (Array.unsafe_get callbacks 0) a1 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write_non_optimized [@inlined never]) t callbacks callback_raised a1;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set1 last_value a1)
;;

let[@inline always] write2 t a1 a2 =
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 a2 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write2_non_optimized [@inlined never]) t callbacks callback_raised a1 a2;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set2 last_value a1 a2)
;;

let[@inline always] write3 t a1 a2 a3 =
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 a2 a3 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write3_non_optimized [@inlined never]) t callbacks callback_raised a1 a2 a3;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set3 last_value a1 a2 a3)
;;

let[@inline always] write4 t a1 a2 a3 a4 =
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 a2 a3 a4 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write4_non_optimized [@inlined never]) t callbacks callback_raised a1 a2 a3 a4;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set4 last_value a1 a2 a3 a4)
;;

let allow_subscription_after_first_write t =
  On_subscription_after_first_write.allow_subscription_after_first_write
    t.on_subscription_after_first_write
;;

(* Creates an open bus with no subscribers.  A [Last_value.t] is allocated only under
   [Allow_and_send_last_value]. *)
let create
      ?name
      created_from
      callback_arity
      ~(on_subscription_after_first_write : On_subscription_after_first_write.t)
      ~on_callback_raise
  =
  let last_value =
    match on_subscription_after_first_write with
    | Allow_and_send_last_value -> Some (Last_value.create callback_arity)
    | Allow -> None
    | Raise -> None
  in
  let t =
    { name
    ; callback_arity
    ; created_from
    ; on_callback_raise
    ; on_subscription_after_first_write
    ; last_value
    ; subscribers = Subscriber_id.Map.empty
    ; callbacks = [||]
    ; (* Placeholder only: [update_write] below installs the real handler. *)
      callback_raised = (fun _ _ -> assert false)
    ; state = Ok_to_write
    ; write_ever_called = false
    }
  in
  update_write t;
  t
;;

let can_subscribe t = allow_subscription_after_first_write t || not t.write_ever_called

(* Adds [f] as a subscriber and returns the new [Subscriber.t].  Raises if
   [can_subscribe t] is false.  When the bus keeps a last value, [f] is immediately
   called on it (if one has been recorded). *)
let subscribe_exn ?(extract_exn = false) ?on_callback_raise ?on_close t subscribed_from ~f
  =
  if not (can_subscribe t)
  then
    failwiths
      "Bus.subscribe_exn called after first write"
      [%sexp ~~(subscribed_from : Source_code_position.t), { bus = (t : (_, _) t) }]
      [%sexp_of: Sexp.t];
  let subscriber =
    Subscriber.create subscribed_from ~callback:f ~extract_exn ~on_callback_raise ~on_close
  in
  t.subscribers <- Map.set t.subscribers ~key:subscriber.id ~data:subscriber;
  update_write t;
  (match t.last_value with
   | None -> ()
   | Some last_value -> Last_value.send last_value f);
  subscriber
;;

(* Like [subscribe_exn] but discards the subscriber, so the subscription cannot be
   passed to [unsubscribe].  The check is repeated here so that the error message names
   [iter_exn]. *)
let iter_exn t subscribed_from ~f =
  if not (can_subscribe t)
  then failwiths "Bus.iter_exn called after first write" t [%sexp_of: (_, _) t];
  ignore (subscribe_exn t subscribed_from ~f : _ Subscriber.t)
;;

(* GADT witness relating a bus callback type, the matching fold-function type, and the
   accumulator type. *)
module Fold_arity = struct
  type (_, _, _) t =
    | Arity1 : ('a -> unit, 's -> 'a -> 's, 's) t
    | Arity2 : ('a -> 'b -> unit, 's -> 'a -> 'b -> 's, 's) t
    | Arity3 : ('a -> 'b -> 'c -> unit, 's -> 'a -> 'b -> 'c -> 's, 's) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 's -> 'a -> 'b -> 'c -> 'd -> 's, 's) t
  [@@deriving sexp_of]
end

(* Subscribes a callback that threads an accumulator, starting at [init], through [f] on
   every write.  The accumulator lives in a local ref and is not returned. *)
let fold_exn
      (type c f s)
      (t : (c, _) t)
      subscribed_from
      (fold_arity : (c, f, s) Fold_arity.t)
      ~(init : s)
      ~(f : f)
  =
  let state = ref init in
  if not (can_subscribe t)
  then failwiths "Bus.fold_exn called after first write" t [%sexp_of: (_, _) t];
  let module A = Fold_arity in
  iter_exn
    t
    subscribed_from
    ~f:
      (match fold_arity with
       | A.Arity1 -> fun a1 -> state := f !state a1
       | A.Arity2 -> fun a1 a2 -> state := f !state a1 a2
       | A.Arity3 -> fun a1 a2 a3 -> state := f !state a1 a2 a3
       | A.Arity4 -> fun a1 a2 a3 a4 -> state := f !state a1 a2 a3 a4)
;;

(* Removes the subscriber by id and rebuilds the callback array.  The subscriber's
   [on_close] is not called here. *)
let unsubscribe t (subscription : _ Subscriber.t) =
  t.subscribers <- Map.remove t.subscribers subscription.id;
  update_write t
;;

let%test_module _ =
  (module struct
    (* Subscribes [callback] to [bus], then checks that running [write ()] leaves both
       the minor and major allocation counters unchanged. *)
    let assert_no_allocation bus callback write =
      let bus_r = read_only bus in
      ignore (subscribe_exn bus_r [%here] ~f:callback);
      let starting_minor_words = Gc.minor_words () in
      let starting_major_words = Gc.major_words () in
      write ();
      let ending_minor_words = Gc.minor_words () in
      let ending_major_words = Gc.major_words () in
      [%test_result: int] (ending_minor_words - starting_minor_words) ~expect:0;
      [%test_result: int] (ending_major_words - starting_major_words) ~expect:0
    ;;

    (* This test only works when [write] is properly inlined.  It does not guarantee that
       [write] never allocates in any situation.  For example, if this test is moved to
       another library and run with X_LIBRARY_INLINING=false, it fails. *)
    let%test_unit "write doesn't allocate when inlined" =
      let create created_from arity =
        create
          created_from
          arity
          ~on_subscription_after_first_write:Raise
          ~on_callback_raise:Error.raise
      in
      let bus1 = create [%here] Arity1 in
      let bus2 = create [%here] Arity2 in
      let bus3 = create [%here] Arity3 in
      let bus4 = create [%here] Arity4 in
      assert_no_allocation bus1 (fun () -> ()) (fun () -> write bus1 ());
      assert_no_allocation bus2 (fun () () -> ()) (fun () -> write2 bus2 () ());
      assert_no_allocation bus3 (fun () () () -> ()) (fun () -> write3 bus3 () () ());
      assert_no_allocation bus4 (fun () () () () -> ()) (fun () -> write4 bus4 () () () ())
    ;;
  end)
;;