Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file durable_state_rpc.ml
open Core_kernel
open Async_kernel
open Async_rpc_kernel

(* Events pushed to the client on the reader handed out by the [create*] functions. *)
module Update = struct
  type ('state, 'update, 'error, 'metadata) t =
    | Attempting_new_connection (* written just before every dispatch attempt *)
    | Connection_success of 'metadata (* dispatch succeeded; carries its third component *)
    | Lost_connection (* the pipe obtained from dispatch stopped being forwarded *)
    | Failed_to_connect of Error.t (* [Durable.with_] returned an error *)
    | Rpc_error of 'error (* dispatch itself answered with an error *)
    | Update of 'update (* one element read from the pipe obtained from dispatch *)
    | State of 'state (* state returned by a successful dispatch *)
end

(* Everything needed to keep one subscription going. *)
type ('state, 'update, 'error, 'metadata, 'connection) t =
  { (* Writing end of the pipe whose reader is given to the client. *)
    updates_writer : ('state, 'update, 'error, 'metadata) Update.t Pipe.Writer.t
  ; (* Durable connection on which [dispatch] is run. *)
    connection : 'connection Durable.t
  ; (* How long to wait after a failed attempt before trying again. *)
    resubscribe_delay : Time_ns.Span.t
  ; (* Starts the subscription on a connection.  On success it yields the state, a
       pipe of updates and some metadata. *)
    dispatch :
      'connection
      -> ('state * 'update Pipe.Reader.t * 'metadata, 'error) Result.t Or_error.t
           Deferred.t
  }

(* The subscription counts as active for as long as [t.updates_writer] is open.  It
   only becomes inactive when the client closes the reader returned by [create*]. *)
let subscription_active t = t.updates_writer |> Pipe.is_closed |> not

(* Push [update] to the client without waiting for pushback.  Does nothing once the
   subscription is no longer active. *)
let write t update =
  match subscription_active t with
  | true -> Pipe.write_without_pushback t.updates_writer update
  | false -> ()
;;

(* Make one dispatch attempt over the durable connection, after announcing it with
   [Attempting_new_connection].  The two layers of failure are merged into a single
   error: [`Failed_to_connect] when [Durable.with_] fails, [`Rpc_error] when the
   dispatch answers with an error. *)
let try_to_get_fresh_pipe t =
  write t Update.Attempting_new_connection;
  Durable.with_ t.connection ~f:t.dispatch
  >>| function
  | Error connect_error -> Error (`Failed_to_connect connect_error)
  | Ok (Error rpc_error) -> Error (`Rpc_error rpc_error)
  | Ok (Ok fresh) -> Ok fresh
;;

(* Keep attempting to subscribe until an attempt succeeds or the subscription stops
   being active.  Each failure is reported to the client and followed by a pause of
   [t.resubscribe_delay].  A success reports [Connection_success] and then [State],
   and returns the update pipe as [`Ok pipe]. *)
let rec subscribe t =
  if subscription_active t
  then (
    try_to_get_fresh_pipe t
    >>= function
    | Ok (initial_state, update_pipe, metadata) ->
      write t (Update.Connection_success metadata);
      write t (Update.State initial_state);
      return (`Ok update_pipe)
    | Error failure ->
      let notice =
        match failure with
        | `Failed_to_connect e -> Update.Failed_to_connect e
        | `Rpc_error e -> Update.Rpc_error e
      in
      write t notice;
      Clock_ns.after t.resubscribe_delay >>= fun () -> subscribe t)
  else return `Subscription_no_longer_active
;;

(* Once [deferred_pipe] is determined, forward every element of the pipe to the client
   wrapped in [Update].  When forwarding stops, report [Lost_connection] and start
   over with a fresh call to [subscribe]. *)
let rec handle_update_pipe t deferred_pipe =
  Deferred.upon deferred_pipe (function
    | `Subscription_no_longer_active -> ()
    | `Ok update_pipe ->
      (* [Pipe.transfer] is determined when [update_pipe] is closed (as when we lose
         our connection), or when [t.updates_writer] is closed (as when the client
         closes the reader returned by [create*]). *)
      let forwarding =
        Pipe.transfer update_pipe t.updates_writer ~f:(fun update ->
          Update.Update update)
      in
      Deferred.upon forwarding (fun () ->
        write t Update.Lost_connection;
        handle_update_pipe t (subscribe t)))
;;

module Expert = struct
  (* Build the client-facing pipe and the subscription record, without starting
     anything.  [resubscribe_delay] is converted from [Time.Span.t] to
     [Time_ns.Span.t] by way of seconds. *)
  let create_internal connection ~dispatch ~resubscribe_delay =
    let updates_reader, updates_writer = Pipe.create () in
    ( updates_reader
    , { updates_writer
      ; connection
      ; resubscribe_delay = resubscribe_delay |> Time.Span.to_sec |> Time_ns.Span.of_sec
      ; dispatch
      } )
  ;;

  (* Start subscribing in the background and return the reader straight away.
     Failures are only visible as events on the reader. *)
  let create connection ~dispatch ~resubscribe_delay =
    let reader, subscription = create_internal connection ~dispatch ~resubscribe_delay in
    subscribe subscription |> handle_update_pipe subscription;
    reader
  ;;

  (* Make a single first attempt and report its outcome to the caller:
     - connection failure: [Error e]
     - error answered by the dispatch: [Ok (Error e)]
     - success: [Ok (Ok reader)], after which the subscription is maintained in the
       same way as with [create]. *)
  let create_or_fail connection ~dispatch ~resubscribe_delay =
    let reader, subscription = create_internal connection ~dispatch ~resubscribe_delay in
    try_to_get_fresh_pipe subscription
    >>| function
    | Error (`Failed_to_connect e) -> Error e
    | Error (`Rpc_error e) -> Ok (Error e)
    | Ok (new_state, fresh_pipe, id) ->
      write subscription (Update.Connection_success id);
      write subscription (Update.State new_state);
      handle_update_pipe subscription (return (`Ok fresh_pipe));
      Ok (Ok reader)
  ;;
end

(* [Expert.create] specialised to dispatching [rpc] with [query] through
   [Rpc.State_rpc.dispatch]. *)
let create connection rpc ~query ~resubscribe_delay =
  Expert.create connection ~resubscribe_delay ~dispatch:(fun conn ->
    Rpc.State_rpc.dispatch rpc conn query)
;;

(* [Expert.create] specialised to a [Versioned_rpc.Both_convert.State_rpc.S] module,
   dispatched with [dispatch_multi]. *)
let create_versioned
      (type query)
      (type state)
      (type update)
      (type error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned =
    (val rpc_module : Versioned_rpc.Both_convert.State_rpc.S
      with type caller_query = query
       and type caller_state = state
       and type caller_update = update
       and type caller_error = error)
  in
  let dispatch conn = Versioned.dispatch_multi conn query in
  Expert.create connection ~dispatch ~resubscribe_delay
;;

(* Same as [create_versioned], for a [Versioned_rpc.Caller_converts.State_rpc.S]
   module. *)
let create_versioned'
      (type query)
      (type state)
      (type update)
      (type error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned =
    (val rpc_module : Versioned_rpc.Caller_converts.State_rpc.S
      with type query = query
       and type state = state
       and type update = update
       and type error = error)
  in
  let dispatch conn = Versioned.dispatch_multi conn query in
  Expert.create connection ~dispatch ~resubscribe_delay
;;

(* [Expert.create_or_fail] specialised to dispatching [rpc] with [query] through
   [Rpc.State_rpc.dispatch]. *)
let create_or_fail connection rpc ~query ~resubscribe_delay =
  Expert.create_or_fail connection ~resubscribe_delay ~dispatch:(fun conn ->
    Rpc.State_rpc.dispatch rpc conn query)
;;

(* [Expert.create_or_fail] specialised to a [Versioned_rpc.Both_convert.State_rpc.S]
   module, dispatched with [dispatch_multi]. *)
let create_or_fail_versioned
      (type query)
      (type state)
      (type update)
      (type error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned =
    (val rpc_module : Versioned_rpc.Both_convert.State_rpc.S
      with type caller_query = query
       and type caller_state = state
       and type caller_update = update
       and type caller_error = error)
  in
  let dispatch conn = Versioned.dispatch_multi conn query in
  Expert.create_or_fail connection ~dispatch ~resubscribe_delay
;;

(* Same as [create_or_fail_versioned], for a
   [Versioned_rpc.Caller_converts.State_rpc.S] module. *)
let create_or_fail_versioned'
      (type query)
      (type state)
      (type update)
      (type error)
      connection
      rpc_module
      ~(query : query)
      ~resubscribe_delay
  =
  let module Versioned =
    (val rpc_module : Versioned_rpc.Caller_converts.State_rpc.S
      with type query = query
       and type state = state
       and type update = update
       and type error = error)
  in
  let dispatch conn = Versioned.dispatch_multi conn query in
  Expert.create_or_fail connection ~dispatch ~resubscribe_delay
;;