Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file bus.ml
open! Import
open Std_internal

(* Write-state of a bus.  [write] only proceeds from [Ok_to_write]; it moves the bus to
   [Write_in_progress] while callbacks run, and [close] moves it to [Closed]. *)
module State = struct
  type t =
    | Closed
    | Write_in_progress
    | Ok_to_write
  [@@deriving sexp_of]

  (* [true] only for [Closed]. *)
  let is_closed = function
    | Closed -> true
    | Write_in_progress | Ok_to_write -> false
  ;;
end

(* A witness of the shape of a callback: how many arguments it takes before returning
   [unit]. *)
module Callback_arity = struct
  type _ t =
    | Arity1 : ('a -> unit) t
    | Arity2 : ('a -> 'b -> unit) t
    | Arity3 : ('a -> 'b -> 'c -> unit) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit) t
  [@@deriving sexp_of]
end

(* Policy applied by [subscribe_exn] once [write] has been called on the bus. *)
module On_subscription_after_first_write = struct
  type t =
    | Allow
    | Allow_and_send_last_value
    | Raise
  [@@deriving sexp_of]

  (* Both [Allow] variants permit late subscription; only [Raise] forbids it. *)
  let allow_subscription_after_first_write = function
    | Allow | Allow_and_send_last_value -> true
    | Raise -> false
  ;;
end

(* Mutable storage for the arguments of the most recent write.  The cell starts out empty
   ([None]); the first [setN] allocates the tuple and later [setN] calls overwrite its
   fields in place.  [send] applies a callback to the stored arguments, and does nothing
   if no value has been stored. *)
module Last_value : sig
  type 'callback t

  val create : 'callback Callback_arity.t -> 'callback t
  val set1 : ('a -> unit) t -> 'a -> unit
  val set2 : ('a -> 'b -> unit) t -> 'a -> 'b -> unit
  val set3 : ('a -> 'b -> 'c -> unit) t -> 'a -> 'b -> 'c -> unit
  val set4 : ('a -> 'b -> 'c -> 'd -> unit) t -> 'a -> 'b -> 'c -> 'd -> unit
  val send : 'callback t -> 'callback -> unit
end = struct
  type _ tuple =
    | Tuple1 : { mutable arg1 : 'a } -> ('a -> unit) tuple
    | Tuple2 :
        { mutable arg1 : 'a
        ; mutable arg2 : 'b
        }
        -> ('a -> 'b -> unit) tuple
    | Tuple3 :
        { mutable arg1 : 'a
        ; mutable arg2 : 'b
        ; mutable arg3 : 'c
        }
        -> ('a -> 'b -> 'c -> unit) tuple
    | Tuple4 :
        { mutable arg1 : 'a
        ; mutable arg2 : 'b
        ; mutable arg3 : 'c
        ; mutable arg4 : 'd
        }
        -> ('a -> 'b -> 'c -> 'd -> unit) tuple

  type 'callback t = 'callback tuple option ref

  (* The arity argument is only used to fix the type of the result. *)
  let create (type callback) (_arity : callback Callback_arity.t) : callback t = ref None

  let set1 t v1 =
    match !t with
    | None -> t := Some (Tuple1 { arg1 = v1 })
    | Some (Tuple1 slot) -> slot.arg1 <- v1
  ;;

  let set2 t v1 v2 =
    match !t with
    | None -> t := Some (Tuple2 { arg1 = v1; arg2 = v2 })
    | Some (Tuple2 slot) ->
      slot.arg1 <- v1;
      slot.arg2 <- v2
  ;;

  let set3 t v1 v2 v3 =
    match !t with
    | None -> t := Some (Tuple3 { arg1 = v1; arg2 = v2; arg3 = v3 })
    | Some (Tuple3 slot) ->
      slot.arg1 <- v1;
      slot.arg2 <- v2;
      slot.arg3 <- v3
  ;;

  let set4 t v1 v2 v3 v4 =
    match !t with
    | None -> t := Some (Tuple4 { arg1 = v1; arg2 = v2; arg3 = v3; arg4 = v4 })
    | Some (Tuple4 slot) ->
      slot.arg1 <- v1;
      slot.arg2 <- v2;
      slot.arg3 <- v3;
      slot.arg4 <- v4
  ;;

  let send (type callback) (t : callback t) (callback : callback) : unit =
    match !t with
    | None -> ()
    | Some (Tuple1 { arg1 }) -> callback arg1
    | Some (Tuple2 { arg1; arg2 }) -> callback arg1 arg2
    | Some (Tuple3 { arg1; arg2; arg3 }) -> callback arg1 arg2 arg3
    | Some (Tuple4 { arg1; arg2; arg3; arg4 }) -> callback arg1 arg2 arg3 arg4
  ;;
end

module Subscriber_id = Unique_id.Int63 ()

(* One subscription to a bus: the callback plus its per-subscriber options and the source
   position it was subscribed from. *)
module Subscriber = struct
  type 'callback t =
    { id : Subscriber_id.t
    ; callback : 'callback
    ; extract_exn : bool
    ; on_callback_raise : (Error.t -> unit) option
    ; on_close : (unit -> unit) option
    ; subscribed_from : Source_code_position.t
    }
  [@@deriving fields]

  (* The callback and [on_close] are not rendered.  [id] is omitted when running inline
     tests, and [extract_exn] is only shown when it is [true]. *)
  let sexp_of_t
        _
        { callback = _; id; extract_exn; on_callback_raise; on_close = _; subscribed_from }
    : Sexp.t
    =
    List
      [ Atom "Bus.Subscriber.t"
      ; [%message
          ""
            ~id:
              (if am_running_inline_test then None else Some id
               : Subscriber_id.t sexp_option)
            (on_callback_raise : (Error.t -> unit) sexp_option)
            ~extract_exn:(if extract_exn then Some true else None : bool sexp_option)
            (subscribed_from : Source_code_position.t)]
      ]
  ;;

  (* Only the [callback] field is checked, using the supplied invariant. *)
  let invariant invariant_a t =
    Invariant.invariant [%here] t [%sexp_of: _ t] (fun () ->
      let check f = Invariant.check_field t f in
      Fields.iter
        ~id:ignore
        ~callback:(check invariant_a)
        ~extract_exn:ignore
        ~on_callback_raise:ignore
        ~on_close:ignore
        ~subscribed_from:ignore)
  ;;

  (* Every subscriber gets a fresh [Subscriber_id.t]. *)
  let create subscribed_from ~callback ~extract_exn ~on_callback_raise ~on_close =
    { id = Subscriber_id.create ()
    ; callback
    ; extract_exn
    ; on_callback_raise
    ; on_close
    ; subscribed_from
    }
  ;;
end

(* [callbacks] and [callback_raised] are derived from [subscribers]; [update_write]
   recomputes both whenever the subscriber map changes. *)
type ('callback, 'phantom) t =
  { name : Info.t option
  ; callback_arity : 'callback Callback_arity.t
  ; created_from : Source_code_position.t
  ; on_subscription_after_first_write : On_subscription_after_first_write.t
  ; last_value : 'callback Last_value.t option
  ; mutable state : State.t
  ; mutable write_ever_called : bool
  ; mutable subscribers : 'callback Subscriber.t Subscriber_id.Map.t
  ; mutable callbacks : 'callback array
  ; mutable callback_raised : int -> exn -> unit
  ; on_callback_raise : Error.t -> unit
  }
[@@deriving fields]

(* Function-valued and derived fields are left out of the rendering. *)
let sexp_of_t
      _
      _
      { callback_arity
      ; callbacks = _
      ; callback_raised = _
      ; created_from
      ; last_value = _
      ; name
      ; on_callback_raise = _
      ; on_subscription_after_first_write
      ; state
      ; subscribers
      ; write_ever_called
      }
  =
  [%message
    ""
      (name : Info.t sexp_option)
      (callback_arity : _ Callback_arity.t)
      (created_from : Source_code_position.t)
      (on_subscription_after_first_write : On_subscription_after_first_write.t)
      (state : State.t)
      (write_ever_called : bool)
      (subscribers : _ Subscriber.t Subscriber_id.Map.t)]
;;

type ('callback, 'phantom) bus = ('callback, 'phantom) t [@@deriving sexp_of]

(* Coerces the phantom parameter to [read]; the value itself is unchanged. *)
let read_only t = (t :> (_, read) t)

(* Checks every subscriber with [Subscriber.invariant]; the invariant supplied for the
   phantom parameter is ignored. *)
let invariant invariant_a _ t =
  Invariant.invariant [%here] t [%sexp_of: (_, _) t] (fun () ->
    let check f = Invariant.check_field t f in
    Fields.iter
      ~name:ignore
      ~callbacks:ignore
      ~callback_raised:ignore
      ~callback_arity:ignore
      ~created_from:ignore
      ~on_subscription_after_first_write:ignore
      ~last_value:ignore
      ~state:ignore
      ~write_ever_called:ignore
      ~subscribers:
        (check (fun subscribers ->
           Map.iteri subscribers ~f:(fun ~key:_ ~data:callback ->
             Subscriber.invariant invariant_a callback)))
      ~on_callback_raise:ignore)
;;

let is_closed t = State.is_closed t.state
let num_subscribers t = Map.length t.subscribers

module Read_write = struct
  type 'callback t = ('callback, read_write) bus [@@deriving sexp_of]

  let invariant invariant_a t = invariant invariant_a ignore t
end

module Read_only = struct
  type 'callback t = ('callback, read) bus [@@deriving sexp_of]

  let invariant invariant_a t = invariant invariant_a ignore t
end

(* Failure path of [write]: raises with a message that depends on why the bus is not
   writable.  Callers only reach this when the state is not [Ok_to_write].  Marked
   [@inline never] so that this code is not copied into the inlined [write] functions. *)
let[@inline never] start_write_failing t =
  match t.state with
  | Closed -> failwiths "[Bus.write] called on closed bus" t [%sexp_of: (_, _) t]
  | Write_in_progress ->
    failwiths
      "[Bus.write] called from callback on the same bus"
      t
      [%sexp_of: (_, _) t]
  | Ok_to_write -> assert false
;;

(* Ends a write.  If a callback closed the bus during the write, the state stays
   [Closed]; otherwise the bus becomes writable again. *)
let[@inline always] finish_write t =
  match t.state with
  | Closed -> ()
  | Ok_to_write -> assert false
  | Write_in_progress -> t.state <- Ok_to_write
;;

(* Closing is idempotent.  On the first call the state becomes [Closed], each
   subscriber's [on_close] (if any) is run, and the subscriber map is emptied. *)
let close t =
  match t.state with
  | Closed -> ()
  | Ok_to_write | Write_in_progress ->
    t.state <- Closed;
    Map.iter t.subscribers ~f:(fun subscriber ->
      match subscriber.on_close with
      | None -> ()
      | Some on_close -> on_close ());
    t.subscribers <- Subscriber_id.Map.empty
;;

(* General-purpose write loops, one per arity.  The index is advanced before the callback
   is invoked, so when a callback raises, [callback_raised] receives the index of the
   failing callback plus one. *)
let write_non_optimized t callbacks callback_raised a1 =
  let count = Array.length callbacks in
  let next = ref 0 in
  while !next < count do
    try
      let callback = Array.unsafe_get callbacks !next in
      incr next;
      callback a1
    with
    | exn -> callback_raised !next exn
  done;
  finish_write t
;;

let write2_non_optimized t callbacks callback_raised a1 a2 =
  let count = Array.length callbacks in
  let next = ref 0 in
  while !next < count do
    try
      let callback = Array.unsafe_get callbacks !next in
      incr next;
      callback a1 a2
    with
    | exn -> callback_raised !next exn
  done;
  finish_write t
;;

let write3_non_optimized t callbacks callback_raised a1 a2 a3 =
  let count = Array.length callbacks in
  let next = ref 0 in
  while !next < count do
    try
      let callback = Array.unsafe_get callbacks !next in
      incr next;
      callback a1 a2 a3
    with
    | exn -> callback_raised !next exn
  done;
  finish_write t
;;

let write4_non_optimized t callbacks callback_raised a1 a2 a3 a4 =
  let count = Array.length callbacks in
  let next = ref 0 in
  while !next < count do
    try
      let callback = Array.unsafe_get callbacks !next in
      incr next;
      callback a1 a2 a3 a4
    with
    | exn -> callback_raised !next exn
  done;
  finish_write t
;;

(* Rebuilds [t.callbacks] and [t.callback_raised] from the current subscriber map. *)
let update_write (type callback) (t : (callback, _) t) =
  (* Computing [callbacks] takes time proportional to the number of callbacks, which we
     have decided is OK because we expect subscription/unsubscription to be rare, and the
     number of callbacks to be few.  We could do constant-time update of the set of
     subscribers using a custom bag-like thing with a pair of arrays, one of the
     callbacks and one of the subscribers.  We've decided not to introduce that
     complexity until the performance benefit warrants it. *)
  let subscribers = Array.of_list (Map.data t.subscribers) in
  let callbacks = Array.map subscribers ~f:Subscriber.callback in
  (* Hand an error to the bus-level handler.  If that handler itself raises, the bus is
     closed and the exception is re-raised. *)
  let report_to_bus error =
    try t.on_callback_raise error with
    | exn ->
      close t;
      raise exn
  in
  let callback_raised i exn =
    let backtrace = Backtrace.Exn.most_recent () in
    (* [i] was incremented before the callback was called, so we have to subtract one
       here.  We do this here, rather than at the call site, because there are multiple
       call sites due to the optimizations needed to keep this zero-alloc. *)
    let subscriber = subscribers.(i - 1) in
    let error =
      [%of_sexp: Error.t]
        [%message
          "Bus subscriber raised"
            (exn : exn)
            (backtrace : Backtrace.t)
            (subscriber : _ Subscriber.t)]
    in
    match subscriber.on_callback_raise with
    | None -> report_to_bus error
    | Some subscriber_handler ->
      (* With [extract_exn], the subscriber's handler sees the bare exception instead of
         the annotated error. *)
      let error = if subscriber.extract_exn then Error.of_exn exn else error in
      (try subscriber_handler error with
       | exn ->
         let backtrace = Backtrace.Exn.most_recent () in
         let original_error = error in
         report_to_bus
           ([%of_sexp: Error.t]
              [%message
                "Bus subscriber's [on_callback_raise] raised"
                  (exn : exn)
                  (backtrace : Backtrace.t)
                  (original_error : Error.t)]))
  in
  t.callbacks <- callbacks;
  t.callback_raised <- callback_raised
;;

(* The [write_N] functions are written to minimise registers live across function calls
   (these have to be spilled).  They are also annotated for partial inlining (the
   one-callback case becomes inlined whereas the >1-callback-case requires a further
   direct call). *)
let[@inline always] write t a1 =
  (* Snapshot [callbacks] and [callback_raised] now just in case one of the callbacks
     calls [update_write], above.  ([callbacks] is mutable but is never mutated; it is
     replaced by a fresh array whenever it is changed.) *)
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write_non_optimized [@inlined never]) t callbacks callback_raised a1;
    (* Record the arguments only after the callbacks have run. *)
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set1 last_value a1)
;;

(* Same as [write], for two-argument callbacks. *)
let[@inline always] write2 t a1 a2 =
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 a2 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write2_non_optimized [@inlined never]) t callbacks callback_raised a1 a2;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set2 last_value a1 a2)
;;

(* Same as [write], for three-argument callbacks. *)
let[@inline always] write3 t a1 a2 a3 =
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 a2 a3 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write3_non_optimized [@inlined never]) t callbacks callback_raised a1 a2 a3;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set3 last_value a1 a2 a3)
;;

(* Same as [write], for four-argument callbacks. *)
let[@inline always] write4 t a1 a2 a3 a4 =
  let callbacks = t.callbacks in
  let callback_raised = t.callback_raised in
  let len = Array.length callbacks in
  t.write_ever_called <- true;
  match t.state with
  | Closed | Write_in_progress -> start_write_failing t
  | Ok_to_write ->
    t.state <- Write_in_progress;
    if len = 1
    then (
      (try (Array.unsafe_get callbacks 0) a1 a2 a3 a4 with
       | exn -> callback_raised 1 exn);
      finish_write t)
    else (write4_non_optimized [@inlined never]) t callbacks callback_raised a1 a2 a3 a4;
    (match t.last_value with
     | None -> ()
     | Some last_value -> Last_value.set4 last_value a1 a2 a3 a4)
;;

let allow_subscription_after_first_write t =
  On_subscription_after_first_write.allow_subscription_after_first_write
    t.on_subscription_after_first_write
;;

(* Builds an empty, writable bus.  A [Last_value.t] is allocated only under the
   [Allow_and_send_last_value] policy.  The placeholder [callback_raised] is replaced by
   the [update_write] call before the bus is returned. *)
let create
      ?name
      created_from
      callback_arity
      ~(on_subscription_after_first_write : On_subscription_after_first_write.t)
      ~on_callback_raise
  =
  let last_value =
    match on_subscription_after_first_write with
    | Allow_and_send_last_value -> Some (Last_value.create callback_arity)
    | Allow | Raise -> None
  in
  let t =
    { name
    ; callback_arity
    ; created_from
    ; on_callback_raise
    ; on_subscription_after_first_write
    ; last_value
    ; subscribers = Subscriber_id.Map.empty
    ; callbacks = [||]
    ; callback_raised = (fun _ _ -> assert false)
    ; state = Ok_to_write
    ; write_ever_called = false
    }
  in
  update_write t;
  t
;;

(* Subscription is possible if the policy allows it after a write, or if no write has
   happened yet. *)
let can_subscribe t = allow_subscription_after_first_write t || not t.write_ever_called

(* Registers [f] as a new subscriber and returns the subscription.  Raises if
   [can_subscribe t] is false.  When the bus keeps a last value, [f] is immediately
   applied to it (this is a no-op if nothing has been stored yet). *)
let subscribe_exn
      ?(extract_exn = false)
      ?on_callback_raise
      ?on_close
      t
      subscribed_from
      ~f
  =
  if not (can_subscribe t)
  then
    failwiths
      "Bus.subscribe_exn called after first write"
      [%sexp ~~(subscribed_from : Source_code_position.t), { bus = (t : (_, _) t) }]
      [%sexp_of: Sexp.t];
  let subscriber =
    Subscriber.create subscribed_from ~callback:f ~extract_exn ~on_callback_raise ~on_close
  in
  t.subscribers <- Map.set t.subscribers ~key:subscriber.id ~data:subscriber;
  update_write t;
  Option.iter t.last_value ~f:(fun last_value -> Last_value.send last_value f);
  subscriber
;;

(* Like [subscribe_exn] with default options, but discards the subscription. *)
let iter_exn t subscribed_from ~f =
  if not (can_subscribe t)
  then failwiths "Bus.iter_exn called after first write" t [%sexp_of: (_, _) t];
  ignore (subscribe_exn t subscribed_from ~f : _ Subscriber.t)
;;

(* Relates a callback type to the type of a fold step function over the same arguments
   and to the accumulator type. *)
module Fold_arity = struct
  type (_, _, _) t =
    | Arity1 : ('a -> unit, 's -> 'a -> 's, 's) t
    | Arity2 : ('a -> 'b -> unit, 's -> 'a -> 'b -> 's, 's) t
    | Arity3 : ('a -> 'b -> 'c -> unit, 's -> 'a -> 'b -> 'c -> 's, 's) t
    | Arity4 : ('a -> 'b -> 'c -> 'd -> unit, 's -> 'a -> 'b -> 'c -> 'd -> 's, 's) t
  [@@deriving sexp_of]
end

(* Subscribes a callback that threads an accumulator, starting at [init], through [f] on
   every write.  The accumulator lives in a local reference and is not returned. *)
let fold_exn
      (type c f s)
      (t : (c, _) t)
      subscribed_from
      (fold_arity : (c, f, s) Fold_arity.t)
      ~(init : s)
      ~(f : f)
  =
  let acc = ref init in
  if not (can_subscribe t)
  then failwiths "Bus.fold_exn called after first write" t [%sexp_of: (_, _) t];
  let module A = Fold_arity in
  iter_exn
    t
    subscribed_from
    ~f:
      (match fold_arity with
       | A.Arity1 -> fun a1 -> acc := f !acc a1
       | A.Arity2 -> fun a1 a2 -> acc := f !acc a1 a2
       | A.Arity3 -> fun a1 a2 a3 -> acc := f !acc a1 a2 a3
       | A.Arity4 -> fun a1 a2 a3 a4 -> acc := f !acc a1 a2 a3 a4)
;;

(* Removes the subscription from the subscriber map and rebuilds the callback array. *)
let unsubscribe t (subscription : _ Subscriber.t) =
  t.subscribers <- Map.remove t.subscribers subscription.id;
  update_write t
;;

let%test_module _ =
  (module struct
    (* Subscribes [callback] to [bus], runs [write ()], and checks that neither the minor
       nor the major word counter moved while it ran. *)
    let assert_no_allocation bus callback write =
      let reader = read_only bus in
      ignore (subscribe_exn reader [%here] ~f:callback);
      let minor_before = Gc.minor_words () in
      let major_before = Gc.major_words () in
      write ();
      let minor_after = Gc.minor_words () in
      let major_after = Gc.major_words () in
      [%test_result: int] (minor_after - minor_before) ~expect:0;
      [%test_result: int] (major_after - major_before) ~expect:0
    ;;

    (* This test only works when [write] is properly inlined.  It does not guarantee that
       [write] never allocates in any situation.  For example, if this test is moved to
       another library and run with X_LIBRARY_INLINING=false, it fails. *)
    let%test_unit "write doesn't allocate when inlined" =
      let create created_from arity =
        create
          created_from
          arity
          ~on_subscription_after_first_write:Raise
          ~on_callback_raise:Error.raise
      in
      let bus1 = create [%here] Arity1 in
      let bus2 = create [%here] Arity2 in
      let bus3 = create [%here] Arity3 in
      let bus4 = create [%here] Arity4 in
      assert_no_allocation bus1 (fun () -> ()) (fun () -> write bus1 ());
      assert_no_allocation bus2 (fun () () -> ()) (fun () -> write2 bus2 () ());
      assert_no_allocation bus3 (fun () () () -> ()) (fun () -> write3 bus3 () () ());
      assert_no_allocation bus4 (fun () () () () -> ()) (fun () ->
        write4 bus4 () () () ())
    ;;
  end)
;;