Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file oBus_connection.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667(*
* oBus_connection.ml
* ------------------
* Copyright : (c) 2008, Jeremie Dimino <jeremie@dimino.org>
* Licence : BSD3
*
* This file is a part of obus, an ocaml implementation of D-Bus.
*)letsection=Lwt_log.Section.make"obus(connection)"openLwt_reactopenLwt.Infix(* +-----------------------------------------------------------------+
| Exceptions |
+-----------------------------------------------------------------+ *)exceptionConnection_closedexceptionConnection_lostexceptionTransport_errorofexnlet()=Printexc.register_printer(function|Connection_closed->Some"D-Bus connection closed"|Connection_lost->Some"D-Bus connection lost"|Transport_errorexn->Some(Printf.sprintf"D-Bus transport failure: %s"(Printexc.to_stringexn))|_->None)(* +-----------------------------------------------------------------+
| Types |
+-----------------------------------------------------------------+ *)moduleSerial_map=Map.Make(structtypet=OBus_message.serialletcompare:int32->int32->int=compareend)moduleInt_map=Map.Make(structtypet=intletcompare:int->int->int=compareend)typefilter=OBus_message.t->OBus_message.toption(* Type of message filters *)(* Connection are wrapped into object in order to make them
comparable. In the code, wrapped connection are simply referred has
"connection" and internal connection details are referred as
"active". *)(* Type of active connections *)typeactive_connection={mutablename:OBus_name.bus;(* The name of the connection in case the endpoint is a message bus,
or [""] if not. *)transport:OBus_transport.t;(* The transport used for messages *)mutableon_disconnect:exn->unitLwt.t;(* [on_disconnect] is called the connection is closed
prematurely. This happen on transport errors. *)guid:OBus_address.guidoption;(* Guid of the connection. It may is [Some guid] if this is the
client-side part of a peer-to-peer connection and the connection
is shared. *)down:(unitLwt.t*unitLwt.u)optionsignal;set_down:(unitLwt.t*unitLwt.u)option->unit;(* Waiting thread used to make the connection to stop dispatching
messages. *)state:[`Up|`Down]signal;abort_recv_wakener:OBus_message.tLwt.u;abort_send_wakener:unitLwt.u;abort_recv_waiter:OBus_message.tLwt.t;abort_send_waiter:unitLwt.t;(* Waiting threads wakeup when the connection is closed or
aborted. It is used to make the dispatcher/writer to exit. *)mutablenext_serial:OBus_message.serial;(* The first available serial, incremented for each message *)mutableoutgoing_mutex:Lwt_mutex.t;(* Mutex used to serialise message sending *)incoming_filters:filterLwt_sequence.t;outgoing_filters:filterLwt_sequence.t;mutablereply_waiters:OBus_message.tLwt.uSerial_map.t;(* Mapping serial -> thread waiting for a reply *)mutabledata:exnInt_map.t;(* Set of locally stored values *)wrapper:t;(* The wrapper containing the connection *)}(* State of a connection *)andconnection_state=|Activeofactive_connection(* The connection is currently active *)|Closed(* The connection has been closed gracefully *)|Killed(* The connection has been killed after an error happened *)(* Connections are packed into objects to make them comparable *)andt=<state:connection_state;(* Get the connection state *)set_state:connection_state->unit;(* Sets the state of the connection *)get:active_connection;(* Returns the connection if it is active, and fail otherwise *)active:boolsignal;(* Signal holding the current connection state. *)>letcompare:t->t->int=Pervasives.compare(* +-----------------------------------------------------------------+
| Guids |
+-----------------------------------------------------------------+ *)(* Mapping from server guid to connection. *)moduleGuid_map=Map.Make(structtypet=OBus_address.guidletcompare=Pervasives.compareend)letguid_connection_map=refGuid_map.empty(* +-----------------------------------------------------------------+
| Filters |
+-----------------------------------------------------------------+ *)(* Apply a list of filter on a message, logging failure *)letapply_filterstypmessagefilters=tryLwt_sequence.fold_l(funfiltermessage->matchmessagewith|Somemessage->filtermessage|None->None)filters(Somemessage)withexn->ignore(Lwt_log.error_f~section~exn"an %s filter failed with"typ);None(* +-----------------------------------------------------------------+
| Connection closing |
+-----------------------------------------------------------------+ *)letcleanupactive~is_crash=beginmatchactive.guidwith|Someguid->guid_connection_map:=Guid_map.removeguid!guid_connection_map|None->()end;(* This make the dispatcher to exit if it is waiting on
[get_message] *)Lwt.wakeup_exnactive.abort_recv_wakenerConnection_closed;beginmatchS.valueactive.downwith|Some(waiter,wakener)->Lwt.wakeup_exnwakenerConnection_closed|None->()end;(* Wakeup all reply handlers so they will not wait forever *)Serial_map.iter(fun_wakener->Lwt.wakeup_exnwakenerConnection_closed)active.reply_waiters;(* If the connection is closed normally, flush it *)let%lwt()=ifnotis_crashthenLwt_mutex.with_lockactive.outgoing_mutexLwt.returnelsebeginLwt.wakeup_exnactive.abort_send_wakenerConnection_closed;Lwt.return()endin(* Shutdown the transport *)try%lwtOBus_transport.shutdownactive.transportwithexn->Lwt_log.error~section~exn"failed to abort/shutdown the transport"letcloseconnection=matchconnection#statewith|Killed|Closed->Lwt.return()|Activeactive->connection#set_stateClosed;cleanupactive~is_crash:falseletkillconnectionexn=matchconnection#statewith|Killed|Closed->Lwt.return()|Activeactive->connection#set_stateKilled;let%lwt()=cleanupactive~is_crash:trueintry%lwtactive.on_disconnectexnwithexn->Lwt_log.error~section~exn"the error handler failed with"(* +-----------------------------------------------------------------+
| Sending messages |
+-----------------------------------------------------------------+ *)(* Send a message, maybe adding a reply waiter and return
[return_thread] *)letsend_message_backendconnectiongen_serialreply_waiter_optmessage=letactive=connection#getinLwt_mutex.with_lockactive.outgoing_mutex(fun()->letsend_it,closed=matchconnection#statewith|Active_->(true,false)|Closed->(* Flush the connection if closed gracefully *)(true,true)|Killed->(false,true)inifsend_itthenbeginletmessage=ifgen_serialthen{messagewithOBus_message.serial=active.next_serial}elsemessageinmatchapply_filters"outgoing"messageactive.outgoing_filterswith|None->let%lwt()=Lwt_log.debug~section"outgoing message dropped by filters"inLwt.fail(Failure"message dropped by filters")|Somemessage->ifnotclosedthenbeginmatchreply_waiter_optwith|Some(waiter,wakener)->active.reply_waiters<-Serial_map.add(OBus_message.serialmessage)wakeneractive.reply_waiters;Lwt.on_cancelwaiter(fun()->matchconnection#statewith|Killed|Closed->()|Activeactive->active.reply_waiters<-Serial_map.remove(OBus_message.serialmessage)active.reply_waiters)|None->()end;try%lwtlet%lwt()=Lwt.choose[active.abort_send_waiter;(* Do not cancel a thread while it is marshaling message: *)Lwt.protected(OBus_transport.sendactive.transportmessage)]in(* Everything went OK, continue with a new serial *)ifgen_serialthenactive.next_serial<-Int32.succactive.next_serial;Lwt.return()with|OBus_wire.Data_error_asexn->(* The message can not be marshaled for some
reason. This is not a fatal error. *)Lwt.failexn|Lwt.Canceled->(* Message sending have been canceled by the
user. This is not a fatal error either. *)Lwt.failLwt.Canceled|exn->(* All other errors are considered as fatal. They
are fatal because it is possible that a
message has been partially sent on the
connection, so the message stream is broken *)let%lwt()=killconnectionexninLwt.failexnendelsematchconnection#statewith|Killed|Closed->Lwt.failConnection_closed|Active_->Lwt.return())letsend_messageconnectionmessage=send_message_backendconnectiontrueNonemessageletsend_message_with_replyconnectionmessage=let(waiter,wakener)asv=Lwt.task()inlet%lwt()=send_message_backendconnectiontrue(Somev)messageinwaiterletsend_message_keep_serialconnectionmessage=send_message_backendconnectionfalseNonemessageletsend_message_keep_serial_with_replyconnectionmessage=let(waiter,wakener)asv=Lwt.task()inlet%lwt()=send_message_backendconnectionfalse(Somev)messageinwaiter(* +-----------------------------------------------------------------+
| Helpers for calling methods |
+-----------------------------------------------------------------+ *)letmethod_call_with_message~connection?destination~path?interface~member~i_args~o_argsargs=leti_msg=OBus_message.method_call?destination~path?interface~member(OBus_value.C.make_sequencei_argsargs)inlet%lwto_msg=send_message_with_replyconnectioni_msginmatcho_msgwith|{OBus_message.typ=OBus_message.Method_return_;body}->begintryLwt.return(o_msg,OBus_value.C.cast_sequenceo_argsbody)withOBus_value.C.Signature_mismatch->Lwt.fail(OBus_message.invalid_replyi_msg(OBus_value.C.type_sequenceo_args)o_msg)end|{OBus_message.typ=OBus_message.Error(_,error_name);OBus_message.body=OBus_value.V.Basic(OBus_value.V.Stringmessage)::_}->Lwt.fail(OBus_error.makeerror_namemessage)|{OBus_message.typ=OBus_message.Error(_,error_name)}->Lwt.fail(OBus_error.makeerror_name"")|_->assertfalseletmethod_call~connection?destination~path?interface~member~i_args~o_argsargs=method_call_with_message~connection?destination~path?interface~member~i_args~o_argsargs>|=sndletmethod_call_no_reply~connection?destination~path?interface~member~i_argsargs=send_messageconnection(OBus_message.method_call~flags:{OBus_message.default_flagswithOBus_message.no_reply_expected=true}?destination~path?interface~member(OBus_value.C.make_sequencei_argsargs))(* +-----------------------------------------------------------------+
| Reading/dispatching |
+-----------------------------------------------------------------+ *)letdispatch_messageactivemessage=letopenOBus_messageinmatchmessagewith(* For method return and errors, we lookup at the reply waiters. If
one is find then it get the reply, if none, then the reply is
dropped. *)|{typ=Method_return(reply_serial)}|{typ=Error(reply_serial,_)}->beginmatchtrySome(Serial_map.findreply_serialactive.reply_waiters)withNot_found->Nonewith|Somew->active.reply_waiters<-Serial_map.removereply_serialactive.reply_waiters;Lwt.wakeupwmessage;Lwt.return()|None->Lwt_log.debug_f~section"reply to message with serial %ld dropped%s"reply_serial(matchmessagewith|{typ=Error(_,error_name)}->Printf.sprintf", the reply is the error: %S: %S"error_name(matchmessage.bodywith|OBus_value.V.Basic(OBus_value.V.Stringx)::_->x|_->"")|_->"")end(* Handling of the special "org.freedesktop.DBus.Peer" interface *)|{typ=Method_call(_,"org.freedesktop.DBus.Peer",member);body;sender;serial}->begintry%lwtlet%lwtbody=matchmember,bodywith|"Ping",[]->Lwt.return[]|"GetMachineId",[]->begintry%lwtlet%lwtuuid=Lazy.forceOBus_info.machine_uuidinLwt.return[OBus_value.V.basic_string(OBus_uuid.to_stringuuid)]withexn->ifOBus_error.nameexn=OBus_error.ocamlthenLwt.fail(OBus_error.Failed(Printf.sprintf"Cannot read the machine uuid file (%s)"OBus_config.machine_uuid_file))elseLwt.failexnend|_->Lwt.fail(OBus_error.Unknown_method(Printf.sprintf"Method %S with signature %S on interface \"org.freedesktop.DBus.Peer\" does not exist"member(OBus_value.string_of_signature(OBus_value.V.type_of_sequencebody))))insend_messageactive.wrapper{flags={no_reply_expected=true;no_auto_start=true};serial=0l;typ=Method_returnserial;destination=sender;sender="";body=body;}withexn->letname,msg=OBus_error.castexninsend_messageactive.wrapper{flags={no_reply_expected=true;no_auto_start=true};serial=0l;typ=Error(serial,name);destination=sender;sender="";body=[OBus_value.V.basic_stringmsg];}end|_->(* Other messages are handled by specifics modules *)Lwt.return()letrecdispatch_foreveractive=let%lwt()=(* Wait for the connection to become up *)matchS.valueactive.downwith|Some(waiter,wakener)->waiter|None->Lwt.return()inlet%lwtmessage=try%lwtLwt.choose[OBus_transport.recvactive.transport;active.abort_recv_waiter]withexn->let%lwt()=killactive.wrapper(Transport_errorexn)inLwt.failexninmatchapply_filters"incoming"messageactive.incoming_filterswith|None->let%lwt()=Lwt_log.debug~section"incoming message dropped by filters"indispatch_foreveractive|Somemessage->(* The internal dispatcher accepts only messages destined to
the current connection: *)ifactive.name=""||OBus_message.destinationmessage=active.namethenignore((try%lwtdispatch_messageactivemessagewithexn->Lwt_log.error~section~exn"message dispatching failed with")[%lwt.finallyOBus_value.V.sequence_close(OBus_message.bodymessage)]);dispatch_foreveractive(* +-----------------------------------------------------------------+
| Connection creation |
+-----------------------------------------------------------------+ *)classconnection()=letactive,set_active=S.createfalseinobject(self)methodactive=activevalmutablestate=Closedmethodstate=statemethodset_statenew_state=state<-new_state;matchstatewith|Closed|Killed->set_activefalse|Active_->set_activetruemethodget=matchstatewith|Closed|Killed->raiseConnection_closed|Activeactive->activeendletof_transport?switch?guid?(up=true)transport=Lwt_switch.checkswitch;letmake()=letabort_recv_waiter,abort_recv_wakener=Lwt.wait()andabort_send_waiter,abort_send_wakener=Lwt.wait()andconnection=newconnection()anddown,set_down=S.create(ifupthenNoneelseSome(Lwt.wait()))inletstate=S.map(functionNone->`Up|Some_->`Down)downinletactive={name="";transport;on_disconnect=(funexn->Lwt.return());guid;down;set_down;state;abort_recv_waiter;abort_send_waiter;abort_recv_wakener=abort_recv_wakener;abort_send_wakener=abort_send_wakener;outgoing_mutex=Lwt_mutex.create();next_serial=1l;incoming_filters=Lwt_sequence.create();outgoing_filters=Lwt_sequence.create();reply_waiters=Serial_map.empty;data=Int_map.empty;wrapper=connection;}inconnection#set_state(Activeactive);(* Start the dispatcher *)ignore(dispatch_foreveractive);Lwt_switch.add_hookswitch(fun()->closeconnection);connectioninmatchguidwith|None->make()|Someguid->matchtrySome(Guid_map.findguid!guid_connection_map)withNot_found->Nonewith|Someconnection->Lwt_switch.add_hookswitch(fun()->closeconnection);connection|None->letconnection=make()inguid_connection_map:=Guid_map.addguidconnection!guid_connection_map;connection(* Capabilities turned on by default: *)letcapabilities=[`Unix_fd]letof_addresses?switch?(shared=true)addresses=Lwt_switch.checkswitch;matchsharedwith|false->let%lwtguid,transport=OBus_transport.of_addresses~capabilitiesaddressesinLwt.return(of_transport?switchtransport)|true->(* Try to find a guid that we already have *)letguids=OBus_util.filter_mapOBus_address.guidaddressesinmatchOBus_util.find_map(funguid->trySome(Guid_map.findguid!guid_connection_map)withNot_found->None)guidswith|Someconnection->Lwt_switch.add_hookswitch(fun()->closeconnection);Lwt.returnconnection|None->(* We ask again a shared connection even if we know that
there is no other connection to a server with the same
guid, because during the authentication another
thread can add a new connection. *)let%lwtguid,transport=OBus_transport.of_addresses~capabilitiesaddressesinLwt.return(of_transport?switch~guidtransport)letloopback()=of_transport(OBus_transport.loopback())(* +-----------------------------------------------------------------+
| Local storage |
+-----------------------------------------------------------------+ *)type'akey={key_id:int;key_make:'a->exn;key_cast:exn->'a;}letnext_key_id=ref0letnew_key(typet)()=letkey_id=!next_key_idinnext_key_id:=key_id+1;letmoduleM=structexceptionEoftendin{key_id=key_id;key_make=(funx->M.Ex);key_cast=(functionM.Ex->x|_->assertfalse);}letgetconnectionkey=letactive=connection#getintryletcell=Int_map.findkey.key_idactive.datainSome(key.key_castcell)withNot_found->Noneletsetconnectionkeyvalue=letactive=connection#getinmatchvaluewith|Somex->active.data<-Int_map.addkey.key_id(key.key_makex)active.data|None->active.data<-Int_map.removekey.key_idactive.data(* +-----------------------------------------------------------------+
| Other |
+-----------------------------------------------------------------+ *)letnameconnection=connection#get.nameletset_nameconnectionname=connection#get.name<-nameletactiveconnection=connection#activeletguidconnection=connection#get.guidlettransportconnection=connection#get.transportletcan_send_basic_typeconnection=function|OBus_value.T.Unix_fd->List.mem`Unix_fd(OBus_transport.capabilitiesconnection#get.transport)|_->trueletreccan_send_single_typeconnection=function|OBus_value.T.Basict->can_send_basic_typeconnectiont|OBus_value.T.Arrayt->can_send_single_typeconnectiont|OBus_value.T.Dict(tk,tv)->can_send_basic_typeconnectiontk&&can_send_single_typeconnectiontv|OBus_value.T.Structuretl->List.for_all(can_send_single_typeconnection)tl|OBus_value.T.Variant->trueletcan_send_sequence_typeconnectiontl=List.for_all(can_send_single_typeconnection)tlletset_on_disconnectconnectionf=matchconnection#statewith|Closed|Killed->()|Activeactive->active.on_disconnect<-fletstateconnection=connection#get.stateletset_upconnection=letactive=connection#getinmatchS.valueactive.downwith|None->()|Some(waiter,wakener)->active.set_downNone;Lwt.wakeupwakener()letset_downconnection=letactive=connection#getinmatchS.valueactive.downwith|Some_->()|None->active.set_down(Some(Lwt.wait()))letincoming_filtersconnection=connection#get.incoming_filtersletoutgoing_filtersconnection=connection#get.outgoing_filters