Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file websocket_async.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291(*
* Copyright (c) 2012-2016 Vincent Bernardoff <vb@luminar.eu.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
*)includeWebsocketopenCoreopenAsyncopenCohttpmoduleAsync_IO=IO(Cohttp_async.Io)openAsync_IOmoduleRequest_async=Request.Make(Cohttp_async.Io)moduleResponse_async=Response.Make(Cohttp_async.Io)letset_tcp_nodelaywriter=letsocket=Socket.of_fd(Writer.fdwriter)Socket.Type.tcpinSocket.setoptsocketSocket.Opt.nodelaytrueletclient?log?(name="websocket.client")?(extra_headers=Header.init())?(random_string=Rng.init())?initialized~app_to_ws~ws_to_app~net_to_ws~ws_to_neturi=letdrain_handshakerw=letnonce=random_string16|>B64.encode~pad:trueinletheaders=Header.add_listextra_headers["Upgrade","websocket";"Connection","Upgrade";"Sec-WebSocket-Key",nonce;"Sec-WebSocket-Version","13"]inletreq=Request.make~headersuriinOption.iterlog~f:(funlog->Log.debuglog"%s"Sexp.(to_string_humRequest.(sexp_of_treq)));Request_async.write(fun_->Deferred.unit)reqw>>=fun()->Response_async.readr>>=function|`Eof->raiseEnd_of_file|`Invalids->failwiths|`Okresponse->Option.iterlog~f:(funlog->Log.debuglog"%s"Sexp.(to_string_humResponse.(sexp_of_tresponse)));letstatus=Response.statusresponseinletheaders=Response.headersresponseinifCode.(is_error@@code_of_statusstatus)thenReader.contentsr>>=funmsg->Option.iterlog~f:(funlog->Log.errorlog"%s"msg);failwith@@"HTTP Error "^Code.(string_of_statusstatus)elseifResponse.versionresponse<>`HTTP_1_1thenfailwith"HTTP version error"elseifstatus<>`Switching_protocolsthenfailwith@@"status error "^Code.(string_of_statusstatus)elseifHeader.(getheaders"upgrade")|>Option.map~f:String.lowercase<>Some"websocket"thenfailwith"upgrade error"elseifnot@@upgrade_presentheadersthenfailwith"update not present"elseifHeader.getheaders"sec-websocket-accept"<>Some(nonce^websocket_uuid|>b64_encoded_sha1sum)thenfailwith"accept error"elseDeferred.unitinletrun()=drain_handshakenet_to_wsws_to_net>>=fun()->Option.iterinitialized~f:(funivar->Ivar.fillivar());letread_frame=make_read_frame~mode:(Clientrandom_string)net_to_wsws_to_netinletbuf=Buffer.create128inletrecforward_frames_to_appws_to_app=read_frame()>>=funfr->beginifnot@@Pipe.is_closedws_to_appthenPipe.writews_to_appfrelseDeferred.unitend>>=fun()->forward_frames_to_appws_to_appinletforward_frames_to_netws_to_netapp_to_ws=Writer.transferws_to_netapp_to_wsbeginfunfr->Buffer.clearbuf;write_frame_to_buf~mode:(Clientrandom_string)buffr;letcontents=Buffer.contentsbufinWriter.writews_to_netcontentsendinDeferred.any_unit[forward_frames_to_appws_to_app;forward_frames_to_netws_to_netapp_to_ws;Deferred.all_unitPipe.[closedapp_to_ws;closedws_to_app;];]inletfinally_f=lazybeginPipe.close_readapp_to_ws;Pipe.closews_to_appendinMonitor.try_with_or_error~namerun>>|funres->Lazy.forcefinally_f;resletclient_ez?opcode?log?(name="websocket.client_ez")?extra_headers?heartbeat?random_stringurinet_to_wsws_to_net=letapp_to_ws,reactor_write=Pipe.create()inletto_reactor_write,client_write=Pipe.create()inletclient_read,ws_to_app=Pipe.create()inletinitialized=Ivar.create()inletinitialized_d=Ivar.readinitializedinletlast_pong=ref@@Time_ns.epochinletcleanup=lazybeginPipe.closews_to_app;Pipe.close_readapp_to_ws;Pipe.close_readto_reactor_write;Pipe.closeclient_writeendinletsend_pingwspan=letnow=Time_ns.now()inOption.iterlog~f:(funlog->Log.debuglog"-> PING");Pipe.writew@@Frame.create~opcode:Frame.Opcode.Ping~content:(Time_ns.to_string_fix_proto`Utcnow)()>>|fun()->lettime_since_last_pong=Time_ns.diffnow!last_ponginif!last_pong>Time_ns.epoch&&Time_ns.Span.(time_since_last_pong>span+span)thenLazy.forcecleanupinletreactwfr=letopenFrameinOption.iterlog~f:(funlog->Log.debuglog"<- %s"Frame.(showfr));matchfr.opcodewith|Opcode.Ping->Pipe.writew@@Frame.create~opcode:Opcode.Pong()>>|fun()->None|Opcode.Close->(* Immediately echo and pass this last message to the user *)(ifString.lengthfr.content>=2thenPipe.writew@@Frame.create~opcode:Opcode.Close~content:(String.subfr.content~pos:0~len:2)()elsePipe.writew@@Frame.close1000)>>|fun()->Pipe.closew;None|Opcode.Pong->last_pong:=Time_ns.now();returnNone|Opcode.Text|Opcode.Binary->return@@Somefr.content|_->Pipe.writew@@Frame.close1002>>|fun()->Pipe.closew;Noneinletclient_read=Pipe.filter_map'client_read~f:(reactreactor_write)inletreact()=initialized_d>>=fun()->Pipe.transferto_reactor_writereactor_write~f:(funcontent->Frame.create?opcode~content())in(* Run send_ping every heartbeat when heartbeat is set. *)don't_wait_forbeginmatchheartbeatwith|None->Deferred.unit|Somespan->initialized_d>>|fun()->Clock_ns.run_at_intervals'~continue_on_error:false~stop:(Pipe.closedreactor_write)span(fun()->send_pingreactor_writespan)end;don't_wait_forbeginMonitor.protect~finally:(fun()->Lazy.forcecleanup;Deferred.unit)beginfun()->Deferred.any_unit[(client~name?extra_headers?log?random_string~initialized~app_to_ws~ws_to_app~net_to_ws~ws_to_neturi|>Deferred.ignore);react();Deferred.all_unitPipe.[closedclient_read;closedclient_write;]]endend;client_read,client_writeletserver?log?(name="websocket.server")?(check_request=fun_->Deferred.returntrue)?(select_protocol=fun_->None)~reader~writer~app_to_ws~ws_to_app()=lethandshakerw=(Request_async.readr>>|function|`Okr->r|`Eof->(* Remote endpoint closed connection. No further action
necessary here. *)Option.iterlog~f:beginfunlog->Log.infolog"Remote endpoint closed connection"end;raiseEnd_of_file|`Invalidreason->Option.iterlog~f:beginfunlog->Log.infolog"Invalid input from remote endpoint: %s"reasonend;failwithreason)>>=funrequest->begincheck_requestrequest>>=function|true->Deferred.unit|false->letbody="403 Forbidden"inletresponse=Cohttp.Response.make~status:`Forbidden()~encoding:(Cohttp.Transfer.Fixed(String.lengthbody|>Int64.of_int))inletopenResponse_asyncinwrite~flush:true(funw->write_bodywbody)responsew>>=fun()->raiseExitend>>=fun()->letmeth=Request.methrequestinletversion=Request.versionrequestinletheaders=Request.headersrequestinifnotbeginversion=`HTTP_1_1&&meth=`GET&&Option.map(Header.getheaders"upgrade")~f:String.lowercase=Some"websocket"&&upgrade_presentheadersendthenfailwith"Protocol error";letkey=Option.value_exn~message:"missing sec-websocket-key"(Header.getheaders"sec-websocket-key")inlethash=key^websocket_uuid|>b64_encoded_sha1suminletsubprotocol=Option.value_map(Header.getheaders"sec-websocket-protocol")~default:[]~f:beginfunp->Option.value_map(select_protocolp)~default:[]~f:beginfunselected->["Sec-WebSocket-Protocol",selected]endendinletresponse_headers=("Upgrade","websocket")::("Connection","Upgrade")::("Sec-WebSocket-Accept",hash)::subprotocolinletresponse=Response.make~status:`Switching_protocols~encoding:Transfer.Unknown~headers:(Header.of_listresponse_headers)()inResponse_async.write(fun_->Deferred.unit)responsewinMonitor.try_with_or_error~name~extract_exn:true(fun()->handshakereaderwriter)|>Deferred.Or_error.bind~f:beginfun()->set_tcp_nodelaywriter;letread_frame=make_read_frame~mode:Serverreaderwriterinletrecloop()=read_frame()>>=Pipe.writews_to_app>>=loopinlettransfer_end=letbuf=Buffer.create128inPipe.transferapp_to_wsWriter.(pipewriter)~f:beginfunfr->Buffer.clearbuf;write_frame_to_buf~mode:Serverbuffr;Buffer.contentsbufendinMonitor.protect~finally:beginfun()->Pipe.closews_to_app;Pipe.close_readapp_to_ws;Deferred.unitendbeginfun()->Deferred.any[transfer_end;loop();Pipe.closedws_to_app;Pipe.closedapp_to_ws]end>>=Deferred.Or_error.returnend