Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file ocsipersist.ml
(* FIX: the log file is never reopened *)

open Ocsidbmtypes
open Lwt.Infix

module type TABLE = Ocsipersist_lib.Sigs.TABLE

(** Log section used by every message emitted from this file. *)
let section = Lwt_log.Section.make "ocsigen:ocsipersist:dbm"

(** Raised when the answer received from the Ocsidbm process is not one of
    the answers expected for the request that was sent. *)
exception Ocsipersist_error

(** Name of the socket file, looked up inside [!Config.directory]. *)
let socketname = "socket"

module Config = Ocsipersist_settings

module Aux = struct
  (** Direct binding to the runtime primitive ["caml_sys_exit"].
      NOTE(review): presumably used instead of [Stdlib.exit] so that the
      intermediate forked process terminates without running [at_exit]
      handlers -- confirm. *)
  external sys_exit : int -> 'a = "caml_sys_exit"
end

(** Client side of the protocol spoken with the Ocsidbm process over a
    Unix-domain socket. *)
module Db = struct
  (** [try_connect sname] connects to the Unix-domain socket [sname] and
      returns the connected descriptor.

      If the first attempt fails for any reason, a new Ocsidbm process is
      launched (double fork, [stderr] appended to [!Config.error_log_path],
      [stdout] sent to [/dev/null], [stdin] closed), and after a pause of
      1.1 seconds one more connection attempt is made; the failure of that
      second attempt is propagated to the caller.

      A socket whose [connect] fails is closed before the error is handled,
      so repeated attempts do not leak file descriptors. *)
  let try_connect sname =
    let connect () =
      let socket = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
      Lwt.catch
        (fun () ->
          Lwt_unix.connect socket (Unix.ADDR_UNIX sname) >>= fun () ->
          Lwt.return socket)
        (fun e ->
          (* Best-effort close: the connection error is the one worth
             reporting, so a failure of [close] itself is ignored. *)
          Lwt.catch
            (fun () -> Lwt_unix.close socket)
            (fun _ -> Lwt.return_unit)
          >>= fun () -> Lwt.fail e)
    in
    Lwt.catch connect (fun _ ->
        Lwt_log.ign_warning_f ~section
          "Launching a new Ocsidbm process: %s on directory %s."
          !Config.ocsidbm !Config.directory;
        let param = [|!Config.ocsidbm; !Config.directory|] in
        (* Runs in the grandchild: set up the standard descriptors, then
           replace the process image with Ocsidbm. *)
        let child () =
          let log =
            Unix.openfile !Config.error_log_path
              [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_APPEND]
              0o640
          in
          Unix.dup2 log Unix.stderr;
          Unix.close log;
          let devnull = Unix.openfile "/dev/null" [Unix.O_WRONLY] 0 in
          Unix.dup2 devnull Unix.stdout;
          Unix.close devnull;
          Unix.close Unix.stdin;
          Unix.execvp !Config.ocsidbm param
        in
        let pid = Lwt_unix.fork () in
        if pid = 0
        then
          (* double fork *)
          if Lwt_unix.fork () = 0 then child () else Aux.sys_exit 0
        else
          (* Parent: reap the intermediate child, give the new process some
             time to create its socket, then try once more. *)
          Lwt_unix.waitpid [] pid >>= fun _ ->
          Lwt_unix.sleep 1.1 >>= fun () -> connect ())

  (** [get_indescr i] calls {!try_connect} on the socket located in
      [!Config.directory]. On failure it waits 2.1 seconds and tries again,
      up to [i] additional times; when [i] reaches [0] the error is logged
      and the promise fails with the last exception. *)
  let rec get_indescr i =
    Lwt.catch
      (fun () -> try_connect (!Config.directory ^ "/" ^ socketname))
      (fun e ->
        if i = 0
        then (
          Lwt_log.ign_error_f ~section
            "Cannot connect to Ocsidbm. Will continue without persistent \
             session support. Error message is: %s .Have a look at the logs \
             to see if there is an error message from the Ocsidbm process."
            (match e with
            | Unix.Unix_error (a, b, c) ->
                Printf.sprintf "%a in %s(%s)"
                  (fun () -> Unix.error_message)
                  a b c
            | _ -> Printexc.to_string e);
          Lwt.fail e)
        else Lwt_unix.sleep 2.1 >>= fun () -> get_indescr (i - 1))

  (** [send v] writes the request [v] on [!Config.outch], flushes it, and
      reads one answer from [!Config.inch].

      The promise of the latest request is kept in [previous]; a new request
      first waits for it (its failure, if any, is ignored) before touching
      the channels. *)
  let send =
    let previous = ref (Lwt.return Ok) in
    fun v ->
      Lwt.catch (fun () -> !previous) (fun _ -> Lwt.return Ok) >>= fun _ ->
      !Config.inch >>= fun inch ->
      !Config.outch >>= fun outch ->
      (previous :=
         Lwt_io.write_value outch v >>= fun () ->
         Lwt_io.flush outch >>= fun () -> Lwt_io.read_value inch);
      !previous

  (** [get (store, name)] returns the string bound to [name] in [store].
      Fails with [Not_found] on a [Dbm_not_found] answer, with [e] on
      [Error e], and with {!Ocsipersist_error} on any other answer. *)
  let get (store, name) =
    send (Get (store, name)) >>= function
    | Value v -> Lwt.return v
    | Dbm_not_found -> Lwt.fail Not_found
    | Error e -> Lwt.fail e
    | _ -> Lwt.fail Ocsipersist_error

  (** [remove (store, name)] sends a [Remove] request. Fails with [e] on
      [Error e] and with {!Ocsipersist_error} on any answer other than
      [Ok]. *)
  let remove (store, name) =
    send (Remove (store, name)) >>= function
    | Ok -> Lwt.return ()
    | Error e -> Lwt.fail e
    | _ -> Lwt.fail Ocsipersist_error

  (** [replace (store, name) value] sends a [Replace] request. Fails with
      [e] on [Error e] and with {!Ocsipersist_error} on any answer other
      than [Ok]. *)
  let replace (store, name) value =
    send (Replace (store, name, value)) >>= function
    | Ok -> Lwt.return ()
    | Error e -> Lwt.fail e
    | _ -> Lwt.fail Ocsipersist_error

  (** Same as {!replace} but sends [Replace_if_exists]; additionally fails
      with [Not_found] on a [Dbm_not_found] answer. *)
  let replace_if_exists (store, name) value =
    send (Replace_if_exists (store, name, value)) >>= function
    | Ok -> Lwt.return ()
    | Dbm_not_found -> Lwt.fail Not_found
    | Error e -> Lwt.fail e
    | _ -> Lwt.fail Ocsipersist_error

  (** [firstkey store] starts an enumeration of the keys of [store]:
      [Some k] on a [Key k] answer, failure with [e] on [Error e], and
      [None] on any other answer. *)
  let firstkey store =
    send (Firstkey store) >>= function
    | Key k -> Lwt.return (Some k)
    | Error e -> Lwt.fail e
    | _ -> Lwt.return None

  (** [nextkey store] continues the enumeration started by {!firstkey},
      with the same interpretation of the answers. *)
  let nextkey store =
    send (Nextkey store) >>= function
    | Key k -> Lwt.return (Some k)
    | Error e -> Lwt.fail e
    | _ -> Lwt.return None

  (** [length store] returns the unmarshalled payload of the [Value] answer
      to a [Length] request, [0] on [Dbm_not_found]. Fails with [e] on
      [Error e] and with {!Ocsipersist_error} on any other answer. *)
  let length store =
    send (Length store) >>= function
    | Value v -> Lwt.return (Marshal.from_string v 0)
    | Dbm_not_found -> Lwt.return 0
    | Error e -> Lwt.fail e
    | _ -> Lwt.fail Ocsipersist_error
end

(** Persistent variables: values are marshalled and stored under a
    [(store, name)] pair. *)
module Store = struct
  type store = string

  type 'a t = store * string
  (** Type of persistent data *)

  (** A store is identified by its name; nothing is opened here. *)
  let open_store name = Lwt.return name

  (** [make_persistent_lazy_lwt ~store ~name ~default] returns the handle
      [(store, name)]. When no value is bound yet ([Db.get] fails with
      [Not_found]), [default ()] is evaluated and its marshalled result is
      stored first; any other failure is propagated. *)
  let make_persistent_lazy_lwt ~store ~name ~default =
    let pvname = store, name in
    Lwt.catch
      (fun () -> Db.get pvname >>= fun _ -> Lwt.return ())
      (function
        | Not_found ->
            default () >>= fun def ->
            Db.replace pvname (Marshal.to_string def [])
        | e -> Lwt.fail e)
    >>= fun () -> Lwt.return pvname

  (** Same as {!make_persistent_lazy_lwt} for a [default] that is a plain
      function; an exception it raises becomes a failed promise
      ([Lwt.wrap]). *)
  let make_persistent_lazy ~store ~name ~default =
    let default () = Lwt.wrap default in
    make_persistent_lazy_lwt ~store ~name ~default

  (** Same as {!make_persistent_lazy} for an already computed [default]. *)
  let make_persistent ~store ~name ~default =
    make_persistent_lazy ~store ~name ~default:(fun () -> default)

  (** [get pvname] fetches the stored string and unmarshals it. The result
      type is not checked against what was stored ([Marshal.from_string]). *)
  let get (pvname : 'a t) : 'a =
    Db.get pvname >>= fun r -> Lwt.return (Marshal.from_string r 0)

  (** [set pvname v] stores the marshalled representation of [v]. *)
  let set pvname v =
    let data = Marshal.to_string v [] in
    Db.replace pvname data
end

type store = Store.store
type 'a variable = 'a Store.t

module Functorial = struct
  type internal = string

  (** How keys and values are converted to and from the strings stored in
      the database. *)
  module type COLUMN = sig
    type t

    val column_type : string
    val encode : t -> string
    val decode : string -> t
  end

  module Table
      (T : sig
        val name : string
      end)
      (Key : COLUMN)
      (Value : COLUMN) :
    Ocsipersist_lib.Sigs.TABLE
      with type key = Key.t
       and type value = Value.t = struct
    type key = Key.t
    type value = Value.t

    let name = T.name

    (** [find key] returns the decoded value bound to [key]; fails with
        [Not_found] when there is none (see [Db.get]). *)
    let find key = Lwt.map Value.decode @@ Db.get (name, Key.encode key)

    (** [add key value] binds [key] to [value], replacing any previous
        binding ([Db.replace]). *)
    let add key value = Db.replace (name, Key.encode key) (Value.encode value)

    (** Like {!add}, but fails with [Not_found] when [key] is not bound. *)
    let replace_if_exists key value =
      Db.replace_if_exists (name, Key.encode key) (Value.encode value)

    let remove key = Db.remove (name, Key.encode key)

    (** [fold ?count ?gt ?geq ?lt ?leq f beg] folds [f] over the bindings
        whose key satisfies every given bound (compared with the
        polymorphic comparison operators on decoded keys), visiting at most
        [count] bindings.

        Keys are enumerated with [Db.firstkey] / [Db.nextkey]. A key outside
        the requested range is skipped rather than ending the traversal:
        a DBM database enumerates its keys in an unspecified order, so keys
        that are in range may still follow one that is not. *)
    let fold ?count ?gt ?geq ?lt ?leq f beg =
      let holds cmp bound k =
        match bound with None -> true | Some b -> cmp k b
      in
      let in_range k =
        holds ( > ) gt k
        && holds ( >= ) geq k
        && holds ( < ) lt k
        && holds ( <= ) leq k
      in
      let limit_reached visited =
        match count with None -> false | Some c -> visited >= c
      in
      (* [visited] counts the bindings passed to [f] so far. *)
      let rec aux nextkey visited acc =
        if limit_reached visited
        then Lwt.return acc
        else
          nextkey name >>= function
          | None -> Lwt.return acc
          | Some raw ->
              let k = Key.decode raw in
              if in_range k
              then
                find k >>= fun v ->
                f k v acc >>= fun acc ->
                aux Db.nextkey (Int64.succ visited) acc
              else aux Db.nextkey visited acc
      in
      aux Db.firstkey 0L beg

    (** [iter] is {!fold} with a [unit] accumulator. *)
    let iter ?count ?gt ?geq ?lt ?leq f =
      fold ?count ?gt ?geq ?lt ?leq (fun k v () -> f k v) ()

    (** Not available with this backend: always raises [Failure]. *)
    let iter_batch ?count:_ ?gt:_ ?geq:_ ?lt:_ ?leq:_ _ =
      failwith "Ocsipersist.iter_batch not implemented for DBM"

    (** Not available with this backend: always raises [Failure]. *)
    let iter_block ?count:_ ?gt:_ ?geq:_ ?lt:_ ?leq:_ _ =
      failwith
        "iter_block not implemented for DBM. Please use Ocsipersist with sqlite"

    (** [modify_opt key f] applies [f] to [Some v] when [key] is bound to
        [v], or to [None] when it is not. A [None] result removes the
        binding; [Some new_value] is written with {!replace_if_exists}.
        Errors other than [Not_found] raised by the lookup are propagated
        unchanged.

        NOTE(review): because the write goes through {!replace_if_exists},
        the promise fails with [Not_found] when [key] was unbound and [f]
        returns [Some _] -- confirm whether an insertion is expected in
        that case. *)
    let modify_opt key f =
      Lwt.catch
        (fun () -> find key >>= fun v -> Lwt.return_some v)
        (function Not_found -> Lwt.return_none | e -> Lwt.fail e)
      >>= fun old_value ->
      match f old_value with
      | None -> remove key
      | Some new_value -> replace_if_exists key new_value

    let length () =
      (* for DBM the result may be less than the actual length *)
      Db.length name

    module Variable = Ocsipersist_lib.Variable (struct
        type k = key
        type v = value

        let find = find
        let add = add
      end)
  end

  module Column = struct
    (** Strings are stored as they are. *)
    module String : COLUMN with type t = string = struct
      type t = string

      let column_type = "_"
      let encode s = s
      let decode s = s
    end

    (** Floats are stored in their [string_of_float] representation.
        NOTE(review): [string_of_float] keeps 12 significant digits, so the
        round trip is not exact for every float; changing the encoding
        would no longer match keys that are already stored. *)
    module Float : COLUMN with type t = float = struct
      type t = float

      let column_type = "_"
      let encode = string_of_float
      let decode = float_of_string
    end

    (** Any type, stored through the standard [Marshal] module; decoding is
        not type-checked. *)
    module Marshal (C : sig
        type t
      end) : COLUMN with type t = C.t = struct
      type t = C.t

      let column_type = "_"
      let encode v = Marshal.to_string v []
      let decode v = Marshal.from_string v 0
    end
  end
end

module Polymorphic = Ocsipersist_lib.Polymorphic (Functorial)
module Ref = Ocsipersist_lib.Ref (Store)

type 'value table = 'value Polymorphic.table

(* iterator: with a separate connexion:
exception Exn1
let iter_table f table =
let first = Marshal.to_string (Firstkey table) [] in
let firstl = String.length first in
let next = Marshal.to_string (Nextkey table) [] in
let nextl = String.length next in
(Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 >>=
(fun socket ->
Lwt_unix.connect
(Lwt_unix.Plain socket)
(Unix.ADDR_UNIX (Config.directory^"/"^socketname)) >>=
(fun () -> return (Lwt_unix.Plain socket)) >>=
(fun indescr ->
let inch = Lwt_unix.in_channel_of_descr indescr in
let nextkey next nextl =
Lwt_unix.write indescr next 0 nextl >>=
(fun l2 -> if l2 <> nextl
then Lwt.fail Ocsipersist_error
else (Lwt_unix.input_line inch >>=
fun answ -> return (Marshal.from_string answ 0)))
in
let rec aux n l =
nextkey n l >>=
(function
| End -> return ()
| Key k -> find table k >>= f k
| Error e -> Lwt.fail e
| _ -> Lwt.fail Ocsipersist_error) >>=
(fun () -> aux next nextl)
in
catch
(fun () ->
aux first firstl >>=
(fun () -> Unix.close socket; return ()))
(fun e -> Unix.close socket; Lwt.fail e))))
*)

(** [init ()] connects to the Ocsidbm process ([Db.get_indescr 2]) and
    installs the resulting input and output channels in
    [Ocsipersist_settings.inch] and [Ocsipersist_settings.outch].

    When [!Ocsipersist_settings.delay_loading] is set, the channels are
    stored as promises depending on the pending connection and [init]
    returns immediately. Otherwise [init] blocks in [Lwt_main.run] until
    the connection is established (an exception is raised if it fails) and
    logs the completion. *)
let init () =
  let delayed = !Ocsipersist_settings.delay_loading in
  Lwt_log.ign_warning ~section
    (if delayed
     then "Asynchronuous initialization (may fail later)"
     else "Initializing ...");
  let indescr = Db.get_indescr 2 in
  (* Both channels are built on top of the same file descriptor. *)
  let as_input fd = Lwt_io.of_fd ~mode:Lwt_io.input fd in
  let as_output fd = Lwt_io.of_fd ~mode:Lwt_io.output fd in
  match delayed with
  | true ->
      Ocsipersist_settings.inch := Lwt.map as_input indescr;
      Ocsipersist_settings.outch := Lwt.map as_output indescr
  | false ->
      let fd = Lwt_main.run indescr in
      Ocsipersist_settings.inch := Lwt.return (as_input fd);
      Ocsipersist_settings.outch := Lwt.return (as_output fd);
      Lwt_log.ign_warning ~section "...Initialization complete"