Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file netServiceTcp.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216openIplocatoropenEndpointopenLwt.InfixopenAperomoduleNetServiceTcp=structmoduleTcpConfig=structopenLwt_unixtypet={locator:TcpLocator.t;backlog:int;max_connections:int;socket_options:(Lwt_unix.file_descr->unit)list;svc_id:int;buf_size:int}letreuseaddrreuse=funsock->setsockoptsockSO_REUSEADDRreuselettcp_nodelaynodelay=funsock->setsockoptsockTCP_NODELAYnodelayletsndbufsize=funsock->setsockopt_intsockSO_SNDBUFsizeletrcvbufsize=funsock->setsockopt_intsockSO_SNDBUFsizeletmake?(backlog=10)?(max_connections=8192)?(buf_size=65536)?(socket_options=[reuseaddrtrue;tcp_nodelaytrue])?(svc_id=0)locator={locator;backlog;max_connections;socket_options;svc_id;buf_size}letbacklogc=c.backlogletlocatorc=c.locatorletsocket_optionsc=c.socket_optionsletmax_connectiosnc=c.max_connectionsletsvc_idc=c.svc_idletbuf_sizec=c.buf_sizeend(* module type S = NetService.S *)moduleConfig=TcpConfigopenNetServicetype'aio_init=TxSession.t->'aLwt.ttype'aio_service=TxSession.t->'a->'aLwt.ttype'asvc_state=[`Runof'a|`CloseSession|`StopService]typeconfig=Config.tmoduleConnectionMap=Map.Make(Id)type'at={socket:Lwt_unix.file_descr;waiter:unitLwt.t;notifier:unitLwt.u;max_connections:Int64.t;mutableconnections_count:Int64.t;mutableconnections:(Lwt_unix.file_descrConnectionMap.t);config:Config.t;mutableio_svc:'aio_service;mutableio_init:'aio_init}letmtu=Unlimitedletcreate_server_socketconfig=letopenLwt_unixinletsock=socketPF_INETSOCK_STREAM0in(Config.socket_optionsconfig)|>List.iter(funsetopt->setoptsock;);letsaddr=IpEndpoint.to_sockaddr@@TcpLocator.endpoint(Config.locatorconfig)inlet_=bindsocksaddrinlet_=listensock(Config.backlogconfig)insockletregister_connectionsvcsock=matchsvc.connections_countwith|countwhencount<svc.max_connections->letsid=Id.next_id()inletconnections=svc.connectionsinsvc.connections<-(ConnectionMap.addsidsockconnections);svc.connections_count<-(Int64.addcountInt64.one);Lwt.return@@Result.oksid|_->Lwt.return@@Result.fail(`ResourceLimitViolation(`Msg"Too many connections"))letunregister_connectionsvcsid=matchConnectionMap.find_optsidsvc.connectionswith|Somesock->svc.connections<-(ConnectionMap.removesidsvc.connections);svc.connections_count<-(Int64.subsvc.connections_countInt64.one);let%lwt_=Net.safe_closesockinLwt.return@@Result.ok()|None->Lwt.return@@Result.fail@@`InvalidSession(`Msg"Unknown tx-session id")letclose_sessionssvc=svc.connections_count<-0L;letconnections=svc.connectionsinsvc.connections<-ConnectionMap.empty;Lwt.join@@ConnectionMap.fold(fun_sockxs->(Net.safe_closesock)::xs)connections[]letyield_period=42letmake_connection_context(sock:Lwt_unix.file_descr)(svc:'at)(sid:Id.t)(io_init:'aio_init)(io_svc:'aio_service)=let%lwt_=Logs_lwt.debug(funm->m"Serving tx-session with Id: %s"(Id.to_stringsid))inlet(wait_close,notifier)=Lwt.wait()inlet(wait_remote_close,notify_remote_close)=Lwt.wait()inlets:'asvc_state=`CloseSessioninletclose_session()=Lwt.wakeupnotifiers;Lwt.return_unitinletsctx=TxSession.make~close:(close_session)~wait_on_close:(wait_remote_close)~mtu:Unlimitedsidsockinlet%lwtinit=io_initsctxinletserve=fun()->letmio_svc=io_svcsctxinletrecloopcaccu=letcontinue=mio_svcaccu>>=funaccu->(ifc=0thenLwt.pause()elseLwt.return_unit)>>=fun()->Lwt.return(`Runaccu)inLwt.choose[continue;wait_close]>>=function|`Runaccu->loop((c+1)modyield_period)accu|_->Logs_lwt.debug(funm->m"Closing tx-session %s "(Id.to_stringsid))>>=fun_->unregister_connectionsvcsid>>=fun_->Lwt.returnaccuinLwt.catch(fun()->loop1init>>=fun_->Lwt.return_unit)(fune->Logs_lwt.warn(funm->m"Closing tx-session %s because of %s %s"(Id.to_stringsid)(Printexc.to_stringe)(Printexc.get_backtrace()))>>=fun_->Lwt.wakeup_laternotify_remote_closetrue;unregister_connectionsvcsid>>=fun_->Lwt.return_unit)inLwt.return(sctx,serve)letserve_connection(sock:Lwt_unix.file_descr)(svc:'at)(sid:Id.t)(io_init:'aio_init)(io_svc:'aio_service)=let%lwt(_,serve)=make_connection_contextsocksvcsidio_initio_svcinserve()letmakeconfig=letsocket=create_server_socketconfiginlet(waiter,notifier)=Lwt.wait()in{socket;waiter;notifier;max_connections=Int64.of_int(Config.max_connectiosnconfig);connections_count=Int64.zero;connections=(ConnectionMap.empty);config;io_svc=(fun__->raise(Exception(`IOError(`Msg("Uninitialized io service")))));io_init=(fun_->raise(Exception(`IOError(`Msg("Uninitialized io service")))))}letstart(svc:'at)io_initio_svc=let%lwt_=Logs_lwt.debug(funm->m"Starting TcpService with svc-id %d "(Config.svc_idsvc.config))inlet%lwt_=Logs_lwt.info(funm->m"TcpService listening on port %s"(TcpLocator.to_string@@Config.locatorsvc.config))insvc.io_svc<-io_svc;svc.io_init<-io_init;letstop=svc.waiter>|=fun()->`Stopinletrecaccept_connectionsvc=Lwt.try_bind(fun()->let%lwt_=Logs_lwt.debug(funm->m"TcpService ready to accept connection")inletaccept=Lwt_unix.acceptsvc.socket>|=(funv->`Acceptv)inLwt.choose[accept;stop]>|=(function|`Stop->`Stop|`Accept_asa->a))(function|`Accept(sock,_)->(match%lwtregister_connectionsvcsockwith|Oksid->let_=serve_connectionsocksvcsidio_initio_svcinaccept_connectionsvc|Errore->let%lwt_=Logs_lwt.warn(funm->m"%s"@@show_errore)(* @AC: Perhaps we should wait for some connection to be closed instead of going back
waiting for a connection. That would be more robust against DoS attack. *)inaccept_connectionsvc)|`Stop->let%lwt_=Logs_lwt.debug(funm->m"Stopping svc...")inlet%lwt_=Net.safe_closesvc.socketinlet%lwt_=Logs_lwt.debug(funm->m"Closing tx-sessions...")inlet_=close_sessionssvcinLwt.return_unit)(function|Lwt.Canceled->Lwt.return_unit|exn->Logs_lwt.debug(funm->m"Error while accepting tx-session: %s"(Printexc.to_stringexn)))inaccept_connectionsvcletstopsvc=Lwt.return@@Lwt.wakeupsvc.notifier()letsocketsvc=svc.socketletconfigsvc=svc.configletestablish_tcp_sessionsvctcp_locator=letopenLwt_unixinletopenLwt.Infixinletsock=socketPF_INETSOCK_STREAM0inletsaddr=IpEndpoint.to_sockaddr@@TcpLocator.endpointtcp_locatorinletpsid=connectsocksaddr>>=fun()->register_connectionsvcsockinmatch%lwtpsidwith|Oksid->let%lwt(txs,serve)=make_connection_contextsocksvcsidsvc.io_initsvc.io_svcinlet_=serve()inLwt.returntxs|Errore->Lwt.fail@@Exceptioneletestablish_session(svc:'at)locator=matchlocatorwith|Locator.Locator.TcpLocatortcplocator->establish_tcp_sessionsvctcplocator|_->Lwt.fail_with"Invalid Locator"end