Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file bus.ml
open!Core_kernelmoduleState=structtypet=|Closed|Write_in_progress|Ok_to_write[@@derivingsexp_of]letis_closed=function|Closed->true|Write_in_progress->false|Ok_to_write->false;;endmoduleCallback_arity=structtype_t=|Arity1:('a->unit)t|Arity2:('a->'b->unit)t|Arity3:('a->'b->'c->unit)t|Arity4:('a->'b->'c->'d->unit)t|Arity5:('a->'b->'c->'d->'e->unit)t[@@derivingsexp_of]endmoduleOn_subscription_after_first_write=structtypet=|Allow|Allow_and_send_last_value|Raise[@@derivingsexp_of]letallow_subscription_after_first_write=function|Allow->true|Allow_and_send_last_value->true|Raise->false;;endmoduleLast_value:sigtype'callbacktvalcreate:'callbackCallback_arity.t->'callbacktvalset1:('a->unit)t->'a->unitvalset2:('a->'b->unit)t->'a->'b->unitvalset3:('a->'b->'c->unit)t->'a->'b->'c->unitvalset4:('a->'b->'c->'d->unit)t->'a->'b->'c->'d->unitvalset5:('a->'b->'c->'d->'e->unit)t->'a->'b->'c->'d->'e->unitvalsend:'callbackt->'callback->unitend=structtype_tuple=|Tuple1:{mutablearg1:'a}->('a->unit)tuple|Tuple2:{mutablearg1:'a;mutablearg2:'b}->('a->'b->unit)tuple|Tuple3:{mutablearg1:'a;mutablearg2:'b;mutablearg3:'c}->('a->'b->'c->unit)tuple|Tuple4:{mutablearg1:'a;mutablearg2:'b;mutablearg3:'c;mutablearg4:'d}->('a->'b->'c->'d->unit)tuple|Tuple5:{mutablearg1:'a;mutablearg2:'b;mutablearg3:'c;mutablearg4:'d;mutablearg5:'e}->('a->'b->'c->'d->'e->unit)tupletype'callbackt='callbacktupleoptionrefletcreate(typecallback)(_arity:callbackCallback_arity.t):callbackt=refNoneletset1ta=match!twith|None->t:=Some(Tuple1{arg1=a})|Some(Tuple1args)->args.arg1<-a;;letset2tab=match!twith|None->t:=Some(Tuple2{arg1=a;arg2=b})|Some(Tuple2args)->args.arg1<-a;args.arg2<-b;;letset3tabc=match!twith|None->t:=Some(Tuple3{arg1=a;arg2=b;arg3=c})|Some(Tuple3args)->args.arg1<-a;args.arg2<-b;args.arg3<-c;;letset4tabcd=match!twith|None->t:=Some(Tuple4{arg1=a;arg2=b;arg3=c;arg4=d})|Some(Tuple4args)->args.arg1<-a;args.arg2<-b;args.arg3<-c;args.arg4<-d;;letset5targ1arg2arg3arg4arg5=match!twith|None->t:=Some(Tuple5{arg1;arg2;arg3;arg4;arg5})|Some(Tuple5args)->args.arg1<-arg1;args.arg2<-arg2;args.arg3<-arg3;args.arg4<-arg4;args.arg5<-arg5;;letsend(typecallback)(t:callbackt)(callback:callback):unit=match!twith|None->()|Some(Tuple1{arg1})->callbackarg1|Some(Tuple2{arg1;arg2})->callbackarg1arg2|Some(Tuple3{arg1;arg2;arg3})->callbackarg1arg2arg3|Some(Tuple4{arg1;arg2;arg3;arg4})->callbackarg1arg2arg3arg4|Some(Tuple5{arg1;arg2;arg3;arg4;arg5})->callbackarg1arg2arg3arg4arg5;;endmoduleBus_id=Unique_id.Int63()moduleSubscriber=structtype'callbackt={bus_id:Bus_id.t;callback:'callback;extract_exn:bool;(* [subscribers_index] is the index of this subscriber in the bus's [subscribers]
array. [-1] indicates that this subscriber is not subscribed. *)mutablesubscribers_index:int;on_callback_raise:(Error.t->unit)option;on_close:(unit->unit)option;subscribed_from:Source_code_position.t}[@@derivingfields]letis_subscribedt~to_=t.subscribers_index>=0&&Bus_id.equalt.bus_idto_letsexp_of_t_{callback=_;bus_id=_;extract_exn;subscribers_index;on_callback_raise;on_close=_;subscribed_from}:Sexp.t=List[Atom"Bus.Subscriber.t";[%message""~subscribers_index:(ifam_running_inline_testthenNoneelseSomesubscribers_index:(intoption[@sexp.option]))(on_callback_raise:((Error.t->unit)option[@sexp.option]))~extract_exn:(ifextract_exnthenSometrueelseNone:(booloption[@sexp.option]))(subscribed_from:Source_code_position.t)]];;letinvariantinvariant_at=Invariant.invariant[%here]t[%sexp_of:_t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~bus_id:ignore~callback:(checkinvariant_a)~extract_exn:ignore~subscribers_index:ignore~on_callback_raise:ignore~on_close:ignore~subscribed_from:ignore);;letcreatesubscribed_from~callback~bus_id~extract_exn~subscribers_index~on_callback_raise~on_close={bus_id;callback;extract_exn;subscribers_index;on_callback_raise;on_close;subscribed_from};;endtype('callback,'phantom)t={bus_id:Bus_id.t;name:Info.toption;callback_arity:'callbackCallback_arity.t;created_from:Source_code_position.t;on_subscription_after_first_write:On_subscription_after_first_write.t;on_callback_raise:Error.t->unit;last_value:'callbackLast_value.toption;mutablestate:State.t;mutablewrite_ever_called:bool;mutablenum_subscribers:int;(* [subscribers] contains all subscribers to the bus, in a contiguous prefix from
index [0] to [num_subscribers - 1]. *)mutablesubscribers:'callbackSubscriber.tOption_array.t;(* [callbacks] holds the callbacks of the corresponding entries of [subscribers]. *)mutablecallbacks:'callbackOption_array.t;mutableunsubscribes_during_write:'callbackSubscriber.tlist}[@@derivingfields]letsexp_of_t__{bus_id=_;callback_arity;callbacks=_;created_from;last_value=_;name;num_subscribers;on_subscription_after_first_write;on_callback_raise=_;state;subscribers;write_ever_called;unsubscribes_during_write=_}=letsubscribers=Array.initnum_subscribers~f:(funi->Option_array.get_some_exnsubscribersi)in[%message""(name:(Info.toption[@sexp.option]))(callback_arity:_Callback_arity.t)(created_from:Source_code_position.t)(on_subscription_after_first_write:On_subscription_after_first_write.t)(state:State.t)(write_ever_called:bool)(subscribers:_Subscriber.tArray.t)];;type('callback,'phantom)bus=('callback,'phantom)t[@@derivingsexp_of]letread_onlyt=(t:>(_,read)t)letinvariantinvariant_a_t=Invariant.invariant[%here]t[%sexp_of:(_,_)t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~bus_id:ignore~name:ignore~callbacks:(check(funcallbacks->assert(Option_array.lengthcallbacks=Option_array.lengtht.subscribers);fori=0toOption_array.lengthcallbacks-1doifi<t.num_subscriberstheninvariant_a(Option_array.get_some_exncallbacksi)elseassert(Option_array.is_nonecallbacksi)done))~callback_arity:ignore~created_from:ignore~num_subscribers:(check(funnum_subscribers->assert(num_subscribers>=0)))~on_subscription_after_first_write:ignore~on_callback_raise:ignore~last_value:ignore~state:ignore~write_ever_called:ignore~subscribers:(check(funsubscribers->fori=0toOption_array.lengthsubscribers-1doifi<t.num_subscribersthen(letsubscriber=Option_array.get_some_exnsubscribersiinSubscriber.invariantinvariant_asubscriber;assert(i=subscriber.subscribers_index))elseassert(Option_array.is_nonesubscribersi)done))~unsubscribes_during_write:ignore);;letis_closedt=State.is_closedt.statemoduleRead_write=structtype'callbackt=('callback,read_write)bus[@@derivingsexp_of]letinvariantinvariant_at=invariantinvariant_aignoretendmoduleRead_only=structtype'callbackt=('callback,read)bus[@@derivingsexp_of]letinvariantinvariant_at=invariantinvariant_aignoretendlet[@cold]start_write_failingt=matcht.statewith|Closed->failwiths~here:[%here]"[Bus.write] called on closed bus"t[%sexp_of:(_,_)t]|Write_in_progress->failwiths~here:[%here]"[Bus.write] called from callback on the same bus"t[%sexp_of:(_,_)t]|Ok_to_write->assertfalse;;letcapacityt=Option_array.lengtht.subscribersletmaybe_shrink_capacityt=ift.num_subscribers*4<=capacitytthen(letdesired_capacity=t.num_subscribersinletcopy_and_shrinkarray=letnew_array=Option_array.create~len:desired_capacityinOption_array.blit~src:array~src_pos:0~dst:new_array~dst_pos:0~len:t.num_subscribers;new_arrayint.subscribers<-copy_and_shrinkt.subscribers;t.callbacks<-copy_and_shrinkt.callbacks);;letadd_subscribert(subscriber:_Subscriber.t)~at_subscribers_index=subscriber.subscribers_index<-at_subscribers_index;Option_array.set_somet.subscribersat_subscribers_indexsubscriber;Option_array.set_somet.callbacksat_subscribers_indexsubscriber.callback;;letremove_subscribert(subscriber:_Subscriber.t)=letsubscribers_index=subscriber.subscribers_indexinsubscriber.subscribers_index<--1;Option_array.set_nonet.subscriberssubscribers_index;Option_array.set_nonet.callbackssubscribers_index;;letunsubscribe_assuming_valid_subscribert(subscriber:_Subscriber.t)=letsubscriber_index=subscriber.subscribers_indexinletlast_subscriber_index=t.num_subscribers-1inremove_subscribertsubscriber;ifsubscriber_index<last_subscriber_indexthen(letlast_subscriber=Option_array.get_some_exnt.subscriberslast_subscriber_indexinremove_subscribertlast_subscriber;add_subscribertlast_subscriber~at_subscribers_index:subscriber_index);t.num_subscribers<-t.num_subscribers-1;maybe_shrink_capacityt;;letunsubscribetsubscriber=ifSubscriber.is_subscribedsubscriber~to_:t.bus_idthen(matcht.statewith|Write_in_progress->t.unsubscribes_during_write<-subscriber::t.unsubscribes_during_write|Closed->(* This can happen if during [write], [unsubscribe] is called after [close]. We
don't do anything here because all subscribers will be unsubscribed after the
[write] finishes. *)()|Ok_to_write->unsubscribe_assuming_valid_subscribertsubscriber);;let[@cold]unsubscribe_after_finish_writet=List.itert.unsubscribes_during_write~f:(unsubscribe_assuming_valid_subscribert);t.unsubscribes_during_write<-[];;let[@cold]unsubscribe_allt=assert(is_closedt);fori=0tot.num_subscribers-1doletsubscriber=Option_array.get_some_exnt.subscribersiinOption.itersubscriber.on_close~f:(funon_close->on_close());remove_subscribertsubscriberdone;t.num_subscribers<-0;maybe_shrink_capacityt;;let[@inlinealways]finish_writet=ifnot(List.is_emptyt.unsubscribes_during_write)thenunsubscribe_after_finish_writet;matcht.statewith|Closed->unsubscribe_allt|Ok_to_write->assertfalse|Write_in_progress->t.state<-Ok_to_write;;let[@cold]closet=matcht.statewith|Closed->()|Write_in_progress->t.state<-Closed|Ok_to_write->t.state<-Closed;unsubscribe_allt;;letcall_on_callback_raiseterror=tryt.on_callback_raiseerrorwith|exn->closet;raiseexn;;letcallback_raisedtiexn=letbacktrace=Backtrace.Exn.most_recent()in(* [i] was incremented before the callback was called, so we have to subtract one
here. We do this here, rather than at the call site, because there are multiple
call sites due to the optimizations needed to keep this zero-alloc. *)letsubscriber=Option_array.get_some_exnt.subscribers(i-1)inleterror=[%message"Bus subscriber raised"(exn:exn)(backtrace:Backtrace.t)(subscriber:_Subscriber.t)]|>[%of_sexp:Error.t]inmatchsubscriber.on_callback_raisewith|None->call_on_callback_raiseterror|Somef->leterror=ifsubscriber.extract_exnthenError.of_exnexnelseerrorin(tryferrorwith|exn->letbacktrace=Backtrace.Exn.most_recent()incall_on_callback_raiset(letoriginal_error=errorin[%message"Bus subscriber's [on_callback_raise] raised"(exn:exn)(backtrace:Backtrace.t)(original_error:Error.t)]|>[%of_sexp:Error.t]));;let[@inlinealways]unsafe_get_callbackai=(* We considered using [Option_array.get_some_exn] and
[Option_array.unsafe_get_some_exn] here, but both are significantly slower. Check
the write benchmarks in [bench_bus.ml] before changing this. *)Option_array.unsafe_get_some_assuming_someai;;letwrite_non_optimizedtcallbacksa1=letlen=t.num_subscribersinleti=ref0inwhile!i<lendotryletcallback=unsafe_get_callbackcallbacks!iinincri;callbacka1with|exn->callback_raisedt!iexndone;finish_writet;;letwrite2_non_optimizedtcallbacksa1a2=letlen=t.num_subscribersinleti=ref0inwhile!i<lendotryletcallback=unsafe_get_callbackcallbacks!iinincri;callbacka1a2with|exn->callback_raisedt!iexndone;finish_writet;;letwrite3_non_optimizedtcallbacksa1a2a3=letlen=t.num_subscribersinleti=ref0inwhile!i<lendotryletcallback=unsafe_get_callbackcallbacks!iinincri;callbacka1a2a3with|exn->callback_raisedt!iexndone;finish_writet;;letwrite4_non_optimizedtcallbacksa1a2a3a4=letlen=t.num_subscribersinleti=ref0inwhile!i<lendotryletcallback=unsafe_get_callbackcallbacks!iinincri;callbacka1a2a3a4with|exn->callback_raisedt!iexndone;finish_writet;;letwrite5_non_optimizedtcallbacksa1a2a3a4a5=letlen=t.num_subscribersinleti=ref0inwhile!i<lendotryletcallback=unsafe_get_callbackcallbacks!iinincri;callbacka1a2a3a4a5with|exn->callback_raisedt!iexndone;finish_writet;;(* The [write_N] functions are written to minimise registers live across function calls
(these have to be spilled). They are also annotated for partial inlining (the
one-callback case becomes inlined whereas the >1-callback-case requires a further
direct call). *)let[@inlinealways]writeta1=letcallbacks=t.callbacksint.write_ever_called<-true;matcht.statewith|Closed|Write_in_progress->start_write_failingt|Ok_to_write->t.state<-Write_in_progress;ift.num_subscribers=1then((try(unsafe_get_callbackcallbacks0)a1with|exn->callback_raisedt1exn);finish_writet)else(write_non_optimized[@inlinednever])tcallbacksa1;(matcht.last_valuewith|None->()|Somelast_value->Last_value.set1last_valuea1);;let[@inlinealways]write2ta1a2=letcallbacks=t.callbacksint.write_ever_called<-true;matcht.statewith|Closed|Write_in_progress->start_write_failingt|Ok_to_write->t.state<-Write_in_progress;ift.num_subscribers=1then((try(unsafe_get_callbackcallbacks0)a1a2with|exn->callback_raisedt1exn);finish_writet)else(write2_non_optimized[@inlinednever])tcallbacksa1a2;(matcht.last_valuewith|None->()|Somelast_value->Last_value.set2last_valuea1a2);;let[@inlinealways]write3ta1a2a3=letcallbacks=t.callbacksint.write_ever_called<-true;matcht.statewith|Closed|Write_in_progress->start_write_failingt|Ok_to_write->t.state<-Write_in_progress;ift.num_subscribers=1then((try(unsafe_get_callbackcallbacks0)a1a2a3with|exn->callback_raisedt1exn);finish_writet)else(write3_non_optimized[@inlinednever])tcallbacksa1a2a3;(matcht.last_valuewith|None->()|Somelast_value->Last_value.set3last_valuea1a2a3);;let[@inlinealways]write4ta1a2a3a4=letcallbacks=t.callbacksint.write_ever_called<-true;matcht.statewith|Closed|Write_in_progress->start_write_failingt|Ok_to_write->t.state<-Write_in_progress;ift.num_subscribers=1then((try(unsafe_get_callbackcallbacks0)a1a2a3a4with|exn->callback_raisedt1exn);finish_writet)else(write4_non_optimized[@inlinednever])tcallbacksa1a2a3a4;(matcht.last_valuewith|None->()|Somelast_value->Last_value.set4last_valuea1a2a3a4);;let[@inlinealways]write5ta1a2a3a4a5=letcallbacks=t.callbacksint.write_ever_called<-true;matcht.statewith|Closed|Write_in_progress->start_write_failingt|Ok_to_write->t.state<-Write_in_progress;ift.num_subscribers=1then((try(unsafe_get_callbackcallbacks0)a1a2a3a4a5with|exn->callback_raisedt1exn);finish_writet)else(write5_non_optimized[@inlinednever])tcallbacksa1a2a3a4a5;(matcht.last_valuewith|None->()|Somelast_value->Last_value.set5last_valuea1a2a3a4a5);;letallow_subscription_after_first_writet=On_subscription_after_first_write.allow_subscription_after_first_writet.on_subscription_after_first_write;;letcreate?namecreated_fromcallback_arity~(on_subscription_after_first_write:On_subscription_after_first_write.t)~on_callback_raise=letlast_value=matchon_subscription_after_first_writewith|Allow_and_send_last_value->Some(Last_value.createcallback_arity)|Allow->None|Raise->Nonein{bus_id=Bus_id.create();name;callback_arity;created_from;num_subscribers=0;on_subscription_after_first_write;on_callback_raise;last_value;subscribers=Option_array.create~len:0;callbacks=Option_array.create~len:0;state=Ok_to_write;write_ever_called=false;unsubscribes_during_write=[]};;letcan_subscribet=allow_subscription_after_first_writet||nott.write_ever_calledletenlarge_capacityt=letcapacity=capacitytinletnew_capacity=Int.max1(capacity*2)inletcopy_and_doublearray=letnew_array=Option_array.create~len:new_capacityinOption_array.blit~src:array~src_pos:0~dst:new_array~dst_pos:0~len:capacity;new_arrayint.subscribers<-copy_and_doublet.subscribers;t.callbacks<-copy_and_doublet.callbacks;;letsubscribe_exn?(extract_exn=false)?on_callback_raise?on_closetsubscribed_from~f:callback=ifnot(can_subscribet)thenfailwiths~here:[%here]"Bus.subscribe_exn called after first write"[%sexp~~(subscribed_from:Source_code_position.t),{bus=(t:(_,_)t)}][%sexp_of:Sexp.t];matcht.statewith|Closed->(* Anything that satisfies the return type will do. Since the subscriber is never
stored in the arrays, the [on_close] callback will never be called. *)Subscriber.createsubscribed_from~bus_id:t.bus_id~callback~extract_exn~subscribers_index:(-1)~on_callback_raise~on_close|Ok_to_write|Write_in_progress->(* The code below side effects [t], which potentially could interfere with a write in
progress. However, the side effects don't change the prefix of [t.callbacks] that
write uses; they only change [t.callbacks] beyond that prefix. And all writes
extract [t.num_subscribers] at the start, so that they will not see any subsequent
changes to it. *)letsubscriber=Subscriber.createsubscribed_from~bus_id:t.bus_id~callback~extract_exn~subscribers_index:t.num_subscribers~on_callback_raise~on_closeinifcapacityt=t.num_subscribersthenenlarge_capacityt;add_subscribertsubscriber~at_subscribers_index:t.num_subscribers;t.num_subscribers<-t.num_subscribers+1;(matcht.last_valuewith|None->()|Somelast_value->Last_value.sendlast_valuecallback);subscriber;;letiter_exntsubscribed_from~f=ifnot(can_subscribet)thenfailwiths~here:[%here]"Bus.iter_exn called after first write"t[%sexp_of:(_,_)t];ignore(subscribe_exntsubscribed_from~f:_Subscriber.t);;moduleFold_arity=structtype(_,_,_)t=|Arity1:('a->unit,'s->'a->'s,'s)t|Arity2:('a->'b->unit,'s->'a->'b->'s,'s)t|Arity3:('a->'b->'c->unit,'s->'a->'b->'c->'s,'s)t|Arity4:('a->'b->'c->'d->unit,'s->'a->'b->'c->'d->'s,'s)t|Arity5:('a->'b->'c->'d->'e->unit,'s->'a->'b->'c->'d->'e->'s,'s)t[@@derivingsexp_of]endletfold_exn(typecfs)(t:(c,_)t)subscribed_from(fold_arity:(c,f,s)Fold_arity.t)~(init:s)~(f:f)=letstate=refinitinifnot(can_subscribet)thenfailwiths~here:[%here]"Bus.fold_exn called after first write"t[%sexp_of:(_,_)t];letmoduleA=Fold_arityiniter_exntsubscribed_from~f:(matchfold_aritywith|Arity1->funa1->state:=f!statea1|Arity2->funa1a2->state:=f!statea1a2|Arity3->funa1a2a3->state:=f!statea1a2a3|Arity4->funa1a2a3a4->state:=f!statea1a2a3a4|Arity5->funa1a2a3a4a5->state:=f!statea1a2a3a4a5);;let%test_module_=(modulestructletassert_no_allocationbuscallbackwrite=letbus_r=read_onlybusinignore(subscribe_exnbus_r[%here]~f:callback:_Subscriber.t);letstarting_minor_words=Gc.minor_words()inletstarting_major_words=Gc.major_words()inwrite();letending_minor_words=Gc.minor_words()inletending_major_words=Gc.major_words()in[%test_result:int](ending_minor_words-starting_minor_words)~expect:0;[%test_result:int](ending_major_words-starting_major_words)~expect:0;;(* This test only works when [write] is properly inlined. It does not guarantee that
[write] never allocates in any situation. For example, if this test is moved to
another library and run with X_LIBRARY_INLINING=false, it fails. *)let%test_unit"write doesn't allocate when inlined"=letcreatecreated_fromarity=createcreated_fromarity~on_subscription_after_first_write:Raise~on_callback_raise:Error.raiseinletbus1=create[%here]Arity1inletbus2=create[%here]Arity2inletbus3=create[%here]Arity3inletbus4=create[%here]Arity4inletbus5=create[%here]Arity5inassert_no_allocationbus1(fun()->())(fun()->writebus1());assert_no_allocationbus2(fun()()->())(fun()->write2bus2()());assert_no_allocationbus3(fun()()()->())(fun()->write3bus3()()());assert_no_allocationbus4(fun()()()()->())(fun()->write4bus4()()()());assert_no_allocationbus5(fun()()()()()->())(fun()->write5bus5()()()()());;end);;