Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file client.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742openCoreopenAsyncopenCommonletdisconnect_message="Disconnected from Redis: see server logs for detail"(** Represents a subscriber to a pub/sub message where deserialization and result type are
specific to the individual subscriber. *)typesubscriber=|Subscriber:{writer:'aPipe.Writer.t;consume:(read,Iobuf.seek)Iobuf.t->subscription:string->'a}->subscriber(** This is a table from pub/sub channel to a list of subscribers. Each channel can have
multiple subscribers, but that is represented by a single redis subscription that is
fanned out.
If the list of subscribers for a channel is empty, that means we issued an unsubscribe
command to redis.
*)typesubscription_table=subscriberlistString.Table.ttype'at={pending_response:(moduleResponse_intf.S)Queue.t;reader:Reader.t;writer:Writer.t;mutableinvalidations:[`All|`Keyof'a]Pipe.Writer.tlist;subscriptions:subscription_table;pattern_subscriptions:subscription_table}moduleMake(Key:Bulk_io_intf.S)(Field:Bulk_io_intf.S)(Value:Bulk_io_intf.S)=structmoduleKey_parser=Parse_bulk.Make(Key)moduleField_parser=Parse_bulk.Make(Field)moduleValue_parser=Parse_bulk.Make(Value)moduleField_value_map_parser=Parse_bulk.Make_map(Field_parser)(Value_parser)letwrite_array_headerwriterlen=Writer.write_charwriter'*';Writer.writewriter(itoalen);write_crlfwriter;;letwrite_array_el(typew)writer(moduleIO:Bulk_io_intf.Swithtypet=w)?(prefix="")el=letlen=IO.Redis_bulk_io.lengthelinWriter.write_charwriter'$';Writer.writewriter(len+String.lengthprefix|>itoa);write_crlfwriter;Writer.writewriterprefix;IO.Redis_bulk_io.write~lenwriterel;write_crlfwriter;;letwith_writertf=ifWriter.is_closedt.writerthenDeferred.Or_error.error_stringdisconnect_messageelseft.writer;;letcommand_key(typer)t?result_of_empty_inputcmdsargs(moduleR:Response_intf.Swithtypet=r)=matchresult_of_empty_inputwith|SomeresultwhenList.is_emptyargs->returnresult|_->with_writert(funwriter->Queue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+List.lengthargs);List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);List.iterargs~f:(funarg->write_array_elwriter(moduleKey)arg);Ivar.readR.this);;letcommand_keys_args(typera)t?result_of_empty_inputcmdskey_argsargs(moduleArg:Bulk_io_intf.Swithtypet=a)(moduleR:Response_intf.Swithtypet=r)=matchresult_of_empty_inputwith|SomeresultwhenList.is_emptykey_args||List.is_emptyargs->returnresult|_->with_writert(funwriter->Queue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+List.lengthkey_args+List.lengthargs);List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);List.iterkey_args~f:(funarg->write_array_elwriter(moduleKey)arg);List.iterargs~f:(funarg->write_array_elwriter(moduleArg)arg);Ivar.readR.this);;letcommand_keys_valuest?result_of_empty_inputcmdskey_argsvalue_argsresponse=command_keys_argst?result_of_empty_inputcmdskey_argsvalue_args(moduleValue)response;;letcommand_keys_fieldst?result_of_empty_inputcmdskey_argsfield_argsresponse=command_keys_argst?result_of_empty_inputcmdskey_argsfield_args(moduleField)response;;letcommand_keys_string_argst?result_of_empty_inputcmdskey_argsargsresponse=command_keys_argst?result_of_empty_inputcmdskey_argsargs(moduleBulk_io.String)response;;letcommand_keys_fields_and_values(typer)t?result_of_empty_inputcmdskey_argsfields_and_value_args(moduleR:Response_intf.Swithtypet=r)=matchresult_of_empty_inputwith|SomeresultwhenList.is_emptykey_args||List.is_emptyfields_and_value_args->returnresult|_->with_writert(funwriter->Queue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+List.lengthkey_args+(List.lengthfields_and_value_args*2));List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);List.iterkey_args~f:(funarg->write_array_elwriter(moduleKey)arg);List.iterfields_and_value_args~f:(fun(field,value)->write_array_elwriter(moduleField)field;write_array_elwriter(moduleValue)value);Ivar.readR.this);;letcommand_string(typer)tcmds(moduleR:Response_intf.Swithtypet=r)=command_keytcmds[](moduleR);;letcommand_kv(typer)t?result_of_empty_inputcmdsalistargs(moduleR:Response_intf.Swithtypet=r)=matchresult_of_empty_inputwith|SomeresultwhenList.is_emptyalist->returnresult|_->with_writert(funwriter->Queue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+(List.lengthalist*2)+List.lengthargs);List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);List.iteralist~f:(fun(key,value)->write_array_elwriter(moduleKey)key;write_array_elwriter(moduleValue)value);List.iterargs~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);Ivar.readR.this);;letcommand_key_scores_values(typer)t?result_of_empty_inputcmdskeyalist(moduleR:Response_intf.Swithtypet=r)=matchresult_of_empty_inputwith|SomeresultwhenList.is_emptyalist->returnresult|_->with_writert(funwriter->Queue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+1+(List.lengthalist*2));List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);write_array_elwriter(moduleKey)key;List.iteralist~f:(fun(`Scorescore,value)->write_array_elwriter(moduleBulk_io.Float)score;write_array_elwriter(moduleValue)value);Ivar.readR.this);;letcommand_key_range(typer)tcmdskey~min_index~max_index(moduleR:Response_intf.Swithtypet=r)=with_writert(funwriter->Queue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+3);List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);write_array_elwriter(moduleKey)key;write_array_elwriter(moduleBulk_io.Int)min_index;write_array_elwriter(moduleBulk_io.Int)max_index;Ivar.readR.this);;letcommand_key_bounded_range(typers)tcmdskey~min~max~infinity_min~infinity_max~incl_prefix(moduleR:Response_intf.Swithtypet=r)(moduleValue:Bulk_io.Swithtypet=s)=with_writert(funwriter->letwrite_boundboundinfinite_symbol=matchboundwith|Unbounded->write_array_elwriter(moduleBulk_io.String)infinite_symbol|Inclvalue->write_array_elwriter(moduleValue)value~prefix:incl_prefix|Exclvalue->write_array_elwriter(moduleValue)value~prefix:"("inQueue.enqueuet.pending_response(moduleR);write_array_headerwriter(List.lengthcmds+3);List.itercmds~f:(funcmd->write_array_elwriter(moduleBulk_io.String)cmd);write_array_elwriter(moduleKey)key;write_boundmininfinity_min;write_boundmaxinfinity_max;Ivar.readR.this);;letcommand_key_score_range(typer)tcmdskey~min_score:min~max_score:max(moduleR:Response_intf.Swithtypet=r)=(* https://redis.io/commands/zrangebyscore/#exclusive-intervals-and-infinity *)command_key_bounded_rangetcmdskey~min~max~infinity_min:"-inf"~infinity_max:"+inf"~incl_prefix:""(moduleR)(moduleBulk_io.Float);;letcommand_key_lex_range(typer)tcmdskey~min~max(moduleR:Response_intf.Swithtypet=r)=(* https://redis.io/commands/zrangebylex/#how-to-specify-intervals *)command_key_bounded_rangetcmdskey~min~max~infinity_min:"-"~infinity_max:"+"~incl_prefix:"["(moduleR)(moduleValue);;(** Handle invalidation PUSH messages *)letinvalidationtdata=letwas_changed,invalidations=List.foldt.invalidations~init:(false,[])~f:(fun(was_changed,invalidations)invalidation->ifPipe.is_closedinvalidationthentrue,invalidationselse(Pipe.write_without_pushbackinvalidationdata;was_changed,invalidation::invalidations))inifwas_changedthen(t.invalidations<-invalidations;ifList.is_emptyinvalidationsthendon't_wait_for(Deferred.ignore_m(command_stringt["CLIENT";"TRACKING";"OFF"](Response.create_ok()))));;lethandle_messagesubscriptionsbuf=Resp3.expect_charbuf'$';letsubscription=Resp3.blob_stringbufinmatchHashtbl.findsubscriptionssubscriptionwith|None->raise_s[%message[%here]"BUG: Received a message that was not subscribed to"(subscription:string)]|Somewriters->letlo=Iobuf.Expert.lobufinList.iteriwriters~f:(funi(Subscriber{writer;consume})->ifi<>0thenIobuf.Expert.set_lobuflo;letpayload=consume(buf:>(read,Iobuf.seek)Iobuf.t)~subscriptioninPipe.write_without_pushback_if_openwriterpayload);;(** Read RESP3 out-of-band push messages *)letread_pushtbuf=letlen=Int.of_string(Resp3.simple_stringbuf)inResp3.expect_charbuf'$';letpushed=Resp3.blob_stringbufinmatchlen,pushedwith|2,"invalidate"->(* As of Redis 6.0.8
- When using BCAST the invalidation array can be larger than size 1, which is not
documented in the protocol.
- The invalidation messages are decoupled from the atomicity guarantees inside
Redis, {{: https://github.com/redis/redis/issues/7563 } which arguably should not
be the case}. For example: If I invalidate 3 keys using MSET the client should
ideally receive 1 invalidation message with 3 keys, but instead receives 3
invalidation message each with one key. *)(matchResp3.peek_charbufwith|'*'->letkeys=Key_parser.listbuf|>Or_error.ok_exninList.iterkeys~f:(funkey->invalidationt(`Keykey))|'_'->Iobuf.advancebuf1;Resp3.expect_crlfbuf;invalidationt`All|unexpected->raise(Resp3.Protocol_error(sprintf"Expected an invalidation message but observed '%c'"unexpected)))|3,("subscribe"|"psubscribe"|"unsubscribe"|"punsubscribe")->(* Intentionally ignored, see comments for the [subscribe] command *)()|3,"message"->handle_messaget.subscriptionsbuf|4,"pmessage"->handle_messaget.pattern_subscriptionsbuf|len,_->raise_s[%message"Received a PUSH message type which is not implemented"(len:int)(pushed:string)];;(** Read messages coming from the Redis to the client *)letreadt=Reader.read_one_iobuf_at_a_timet.reader~handle_chunk:(funbuf->(* We will receive a callback when new data from the server is available.
When reading RESP3 there's no way to know the full length of the message until we
have parsed the whole thing. This is a flaw in the protocol. There's also no
guarantee that our buffer contains a full message, so we need to handle the case
where the buffer contains an incomplete message. We do this by raising and
handling the exception [Need_more_data] which causes message parsing to wait for
more data before continuing.
Because parsing must start from the beginning of a message and there's no way to
consistently know how much more data is needed to obtain a complete message, one
can imagine a pathological case where the cost of parsing a large message that
arrives in many pieces becomes quadratic.
We can make an improvement to this behavior: The presence of the characters
"\r\n" at the end of the buffer is necessary (but not sufficient) for the buffer
to end in a complete RESP3 message. If these characters are not found then it
must be true that more data will arrive from the server. We can use this as an
inexpensive check to defer parsing that would fail without more data. *)ifResp3.ends_in_crlfbufthen(trywhilenot(Iobuf.is_emptybuf)domatchIobuf.Unsafe.Peek.charbuf~pos:0with|'>'->Iobuf.advancebuf1;read_pushtbuf|_->ifQueue.is_emptyt.pending_responsethenraise_s[%message[%here]"Received a response when none was expected"(buf:(read_write,Iobuf.seek)Iobuf.Window.Hexdump.t)]else(letresponse=Queue.peek_exnt.pending_responseinletmoduleR=(valresponse:Response_intf.S)inIvar.fillR.this(R.parsebuf);(* Parsing this message succeeded. It is now safe to dispose of the
pending response and mark that this portion of the buffer has been
consumed. *)Iobuf.narrow_lobuf;ignore(Queue.dequeue_exnt.pending_response:(moduleResponse_intf.S)))done;return`Continuewith|Need_more_data->Iobuf.rewindbuf;return`Continue|exn->return(`Stopexn))else(* There is an incomplete message at the end of the buffer *)return`Continue);;letcloset=let%bind()=Writer.closet.writerinlet%map()=Reader.closet.readerinList.itert.invalidations~f:Pipe.close;t.invalidations<-[];List.iter(Hashtbl.datat.subscriptions@Hashtbl.datat.pattern_subscriptions)~f:(funsubscribers->List.itersubscribers~f:(fun(Subscriber{writer;consume=_})->Pipe.closewriter));;letclose_finishedt=let%bind()=Writer.close_finishedt.writerinReader.close_finishedt.reader;;lethas_close_startedt=Writer.is_closedt.writer||Reader.is_closedt.readerletcreate?on_disconnect?auth~where_to_connect()=let%bind.Deferred.Or_error_socket,reader,writer=(* Tcp.connect will raise if the connection attempt times out, but we'd prefer to
return an Error. *)Monitor.try_with_or_error(fun()->Tcp.connectwhere_to_connect)inletpending_response=Queue.create()inlett={pending_response;reader;writer;invalidations=[];subscriptions=String.Table.create();pattern_subscriptions=String.Table.create()}inWriter.set_raise_when_consumer_leaveswriterfalse;don't_wait_for(let%bindreason=readtinletreason=matchreasonwith|`Eof|`Eof_with_unconsumed_data_->Error.of_stringdisconnect_message|`Stoppedexn->Error.of_exnexninlet%map()=closetinQueue.itert.pending_response~f:(funresponse->letmoduleR=(valresponse:Response_intf.S)inIvar.fillR.this(Errorreason));Queue.cleart.pending_response;Option.iteron_disconnect~f:(funf->f()));(* Tell the session that we will be speaking RESP3 and authenticate if need be *)letcmds=["HELLO";"3"](* When protover (i.e. 2/3) is used, we can also pass [AUTH] and [SETNAME] to [HELLO]. *)@Option.value_mapauth~default:[]~f:(fun{Auth.username;password}->["AUTH";username;password])inlet%map.Deferred.Or_error(_:Resp3.tString.Map.t)=command_stringtcmds(Response.create_string_map())int;;letwith_?on_disconnect?auth~where_to_connectf=let%bind.Deferred.Or_errorconn=create?on_disconnect?auth~where_to_connect()inMonitor.protect~finally:(fun()->closeconn)(fun()->fconn)|>Deferred.ok;;letget_leader_addresssentinel~leader_name=match%bindcommand_stringsentinel["SENTINEL";"GET-MASTER-ADDR-BY-NAME";leader_name](Response.create_host_and_port())with|Errore->Deferred.Or_error.faile|Okleader->Tcp.Where_to_connect.of_host_and_portleader|>Deferred.Or_error.return;;letis_leaderconn=match%bind.Deferred.Or_errorcommand_stringconn["ROLE"](Response.create_role())with|(Sentinel_|Replica_)asrole->Deferred.Or_error.error_s[%message"Not the leader"(role:Role.t)]|Leader_->Deferred.Or_error.ok_unit;;letcreate_using_sentinel?on_disconnect?sentinel_auth?auth~leader_name~where_to_connect()=(* Sentinel requires two connection steps:
1. Connect to the sentinel and ask for the leader address
2. Connect to the proposed leader and confirm that it is a leader
Read more here:
https://redis.io/docs/reference/sentinel-clients/#redis-service-discovery-via-sentinel
If all sentinels fail to connect or return a leader, then the client should return
an error. The leader node will disconnect from the client on failover, so the
client does not need to poll or listen to a subscription event to determine when to
disconnect from a stale leader. *)Deferred.Or_error.find_map_okwhere_to_connect~f:(funsentinel_addr->let%bind.Deferred.Or_errorleader_addr=with_~where_to_connect:sentinel_addr?auth:sentinel_auth(funsentinel_conn->get_leader_addresssentinel_conn~leader_name|>Deferred.Or_error.tag_s~tag:[%message"Failed to determine leader"~leader_name(sentinel_addr:[<Socket.Address.t]Tcp.Where_to_connect.t)])|>Deferred.Or_error.tag_s~tag:[%message"Failed to connect to sentinel"]|>Deferred.map~f:Or_error.joininlet%bind.Deferred.Or_errorleader_conn=create?on_disconnect?auth~where_to_connect:leader_addr()|>Deferred.Or_error.tag_s~tag:[%message"Failed to connect to leader"~leader_name(leader_addr:Tcp.Where_to_connect.inet)(sentinel_addr:[<Socket.Address.t]Tcp.Where_to_connect.t)]inmatch%bindis_leaderleader_connwith|Ok()->Deferred.Or_error.returnleader_conn|Errorerror->let%bind()=closeleader_conninDeferred.Or_error.fail(Error.tag_serror~tag:[%message"Failed to verify leader"~leader_name(leader_addr:Tcp.Where_to_connect.inet)(sentinel_addr:[<Socket.Address.t]Tcp.Where_to_connect.t)]));;letclient_trackingt?(bcast=false)()=letcommands=matchbcastwith|false->["CLIENT";"TRACKING";"ON";"NOLOOP"]|true->["CLIENT";"TRACKING";"ON";"NOLOOP";"BCAST"]inletreader,writer=Pipe.create()inletwas_empty=List.is_emptyt.invalidationsint.invalidations<-writer::t.invalidations;let%map.Deferred.Or_error()=ifwas_emptythencommand_stringtcommands(Response.create_ok())elseDeferred.Or_error.return()inreader;;letadd_subscribert(lookup:subscription_table)~unsubscribe_command(Subscriber{writer;_}assubscriber)~channel=Hashtbl.add_multilookup~key:channel~data:subscriber;don't_wait_for@@let%bind()=Pipe.closedwriterin(* We know for a fact that [Hashtbl.find lookup channel] exists and has
this entry because the only way that entry would be removed is if the
list is empty, and that list is only empty if all subscribers have been
removed. The only code that removes this subscriber is the one below.
*)letremaining_subscribers=Hashtbl.update_and_returnlookupchannel~f:(function|None->raise_s[%message"BUG: [Redis.Client.add_subscriber] could not find entry for channel in \
the subscription table when cleaning up a closed subscriber"(channel:string)]|Somesubscribers->List.filtersubscribers~f:(funsubscriber_in_list->not(phys_equalsubscriber_in_listsubscriber)))inifList.is_emptyremaining_subscribersthenDeferred.ignore_m(* Ignore the return value because:
- A failure probably means the connection was closed
- There's nothing we can do about an unsubscription failure in the first place
*)(with_writert(funwriter->write_array_headerwriter2;write_array_elwriter(moduleBulk_io.String)unsubscribe_command;let(moduleR)=Response.create_unsubscription~channel~on_success:(fun()->Hashtbl.removelookupchannel)inQueue.enqueuet.pending_response(moduleR);write_array_elwriter(moduleBulk_io.String)channel;Ivar.readR.this))elsereturn();;letsubscribe_impltchannels~command~unsubscribe_command~lookup~consume=(* Subscription command replies are unusual: Redis will respond using a separate push
message for each channel subscribed to, but unlike normal push messages, each is
expected to be in pipelined sequence like a normal command.
To deal with this we listen for push messages of "subscribe" and ignore them. The
receive loop continues and will treat the following message fragment as a normal
in-band protocol message. This command expects the same number of responses of this
shape as channels specified to the command or an error, in which case it will
dequeue whatever other responses it may be expecting. *)letsubscription_reader,subscription_writer=Pipe.create()inletsubscriber=Subscriber{consume;writer=subscription_writer}inmatchList.filterchannels~f:(funchannel->matchHashtbl.find_multilookupchannelwith|_::_->(* If the list is ever empty, then we know for a fact that an unsubscribe
has been issued or we have never subscribed, so we have to resubscribe. *)add_subscribertlookupsubscriber~unsubscribe_command~channel;false|[]->true)with|[]->Deferred.Or_error.returnsubscription_reader|new_channels->let%map.Deferred.Or_error()=with_writert(funwriter->write_array_headerwriter(List.lengthchannels+1);write_array_elwriter(moduleBulk_io.String)command;List.mapnew_channels~f:(funchannel->letr=Response.create_subscription~channel~on_success:(fun()->(* Subscriber registration must be added synchronously and immediately
after a subscription succeeds, as opposed to waiting for the Response
to be determined. This is necessary because the receive loop processes
a buffer without yielding that may contain multiple messages, and a
message destined for a new subscriber could be within that buffer
following the subscription success. *)add_subscribertlookupsubscriber~unsubscribe_command~channel)inlet(moduleR)=rinQueue.enqueuet.pending_response(moduleR);write_array_elwriter(moduleBulk_io.String)channel;channel,r)|>List.fold~init:Deferred.Or_error.ok_unit~f:(funacc(_channel,r)->match%bind.Deferredaccwith|Errorerror->(* If there was an error, dequeue the next subscription request, as there will never be a response. *)ignore(Queue.dequeue_exnt.pending_response:(moduleResponse_intf.S));Deferred.Or_error.failerror|Ok()->let(moduleR)=rinlet%map.Deferred.Or_error_=Ivar.readR.thisin()))insubscription_reader;;letsubscribe_rawt=function|`Literalsubscriptions->subscribe_impltsubscriptions~command:"SUBSCRIBE"~unsubscribe_command:"UNSUBSCRIBE"~lookup:t.subscriptions|`Patternsubscriptions->subscribe_impltsubscriptions~command:"PSUBSCRIBE"~unsubscribe_command:"PUNSUBSCRIBE"~lookup:t.pattern_subscriptions;;end