Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file threadext.ml
(*
 * Copyright (C) 2006-2009 Citrix Systems Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *)

(** Extension of the standard [Mutex] module. *)
module Mutex = struct
  include Mutex

  (** [execute lock f] locks [lock], runs [f ()] and unlocks [lock] through
      [Pervasiveext.finally] (presumably on both normal and exceptional
      exit of [f] -- confirm against [Pervasiveext]). Returns whatever
      [finally] returns for [f]. *)
  let execute lock f =
    Mutex.lock lock;
    Xapi_stdext_pervasives.Pervasiveext.finally f (fun () -> Mutex.unlock lock)
end

(** Timed callbacks. A watcher thread is started on demand and is woken
    through a pipe whenever a new callback is registered. *)
module Alarm = struct
  type t = {
    token : Mutex.t;  (** protects [queue] and [notifier] *)
    mutable queue : (float * (unit -> unit)) list;
        (** (absolute time, callback), kept sorted by time by [register] *)
    mutable notifier : (Unix.file_descr * Unix.file_descr) option;
        (** (read end, write end) of the wake-up pipe while a watcher runs *)
  }

  let create () = { token = Mutex.create (); queue = []; notifier = None }

  (** Alarm used by [register] when no [?alarm] is given. *)
  let global_alarm = create ()

  (** Body of the watcher thread. Must only be started once
      [alarm.notifier] has been set (see [register]). Each round it:
      - drains any pending wake-up bytes from the pipe;
      - under [alarm.token], starts a helper thread for every callback whose
        time is not after [Unix.time ()] and drops it from the queue;
      - if the queue is now empty, closes the pipe, clears the notifier and
        terminates the thread; otherwise waits for the earliest remaining
        time (or a wake-up byte) and loops. *)
  let rec watch alarm =
    match alarm.notifier with
    | None -> assert false
    | Some (pipe_in, pipe_out) -> (
        while Thread.wait_timed_read pipe_in 0. do
          ignore (Unix.read pipe_in (Bytes.create 1) 0 1)
        done;
        let next =
          Mutex.execute alarm.token (fun () ->
              let now = Unix.time () in
              let nqueue =
                List.filter
                  (fun (clock, callback) ->
                    (* Create helper thread in case callback could block us *)
                    clock > now
                    || (ignore (Thread.create callback ());
                        false))
                  alarm.queue
              in
              alarm.queue <- nqueue;
              match nqueue with
              | [] ->
                  Unix.close pipe_out;
                  Unix.close pipe_in;
                  alarm.notifier <- None;
                  None
              | (c, _) :: _ -> Some c)
        in
        match next with
        | None -> Thread.exit ()
        | Some c ->
            let now = Unix.time () in
            if c > now then ignore (Thread.wait_timed_read pipe_in (c -. now));
            watch alarm)

  (** [register ?alarm time callback] queues [callback] to be run once
      [Unix.time ()] reaches [time]. Wakes the watcher thread if one is
      running, otherwise creates the pipe and starts one. *)
  let register ?(alarm = global_alarm) time callback =
    Mutex.execute alarm.token (fun () ->
        let nqueue = (time, callback) :: alarm.queue in
        alarm.queue <-
          List.sort (fun x1 x2 -> compare (fst x1) (fst x2)) nqueue;
        match alarm.notifier with
        | Some (_, pipe_out) ->
            ignore (Unix.write pipe_out (Bytes.of_string "X") 0 1)
        | None ->
            let pipe_in, pipe_out = Unix.pipe () in
            alarm.notifier <- Some (pipe_in, pipe_out);
            ignore (Thread.create watch alarm))
end

(** Threads with an optional scheduling policy: a thread is either started
    immediately or parked in a priority queue until the policy, a deadline
    or an explicit [join] lets it run. *)
module Thread = struct
  type t =
    | Running of Thread.t
    | Pending of pthread

  (** (deadline, sequence number, suspended creation of the real thread) *)
  and pthread = float * int * Thread.t lazy_t

  type schedule = Now | Timeout of float | Indefinite

  type policy =
    | AlwaysRun
    | MaxCapacity of int * float option
    | WaitCondition of (unit -> schedule)

  (* Sequence number handed to the next pending thread. *)
  let count = ref 0

  module PQueue = Set.Make (struct
    type t = pthread

    (* Order on (deadline, sequence number) only. The third component is a
       lazy thunk, which the polymorphic [compare] cannot be relied on to
       handle; it is never needed to tell two queue entries apart. *)
    let compare ((d1, c1, _) : t) ((d2, c2, _) : t) =
      match compare (d1 : float) d2 with
      | 0 -> compare (c1 : int) c2
      | n -> n
  end)

  let running = ref 0

  let pqueue = ref PQueue.empty

  (* This info can be deduced from pqueue, but having a specific int val
     allows us to inspect it at lower cost and without taking a lock *)
  let pending = ref 0

  let running_threads () = !running

  let pending_threads () = !pending

  let scheduler_token = Mutex.create ()

  let policy = ref AlwaysRun

  (** Removes [t] from the pending queue if it is still there and forces its
      lazy thread creation if that has not happened yet.
      Should be protected by scheduler_token. *)
  let run_thread ((_, _, pt) as t) =
    (* Might have been run by another scheduling policy *)
    if PQueue.mem t !pqueue then (
      pqueue := PQueue.remove t !pqueue;
      decr pending);
    if not (Lazy.is_val pt) then (
      ignore (Lazy.force pt);
      incr running)

  (* Placeholder pivot used while no real pending thread is tracked. *)
  let fake_pivot = (max_float, 0, lazy (Thread.create ignore ()))

  let pivot = ref fake_pivot

  let pre_pivot = ref max_int

  (** Starts as many pending threads as the current policy allows.
      Should be protected by scheduler_token. This can be triggered either
      because a thread finishes running and hence possibly frees a running
      slot, or because the scheduling policy has been updated and hence more
      opportunities appear. *)
  let rec run_pendings () =
    if not (PQueue.is_empty !pqueue) then
      let now = Unix.time () in
      let ((c, _, _) as t) = PQueue.min_elt !pqueue in
      (* Just in case policy has been changed *)
      let to_run =
        match !policy with
        | AlwaysRun -> true
        | MaxCapacity (max_threads, _) -> c <= now || !running < max_threads
        | WaitCondition f -> f () = Now
      in
      if to_run then (
        run_thread t;
        run_pendings ())
      else
        (* extra logic to avoid starvation or wrongly programmed deadlock *)
        let timeouts, exist, indefs = PQueue.split !pivot !pqueue in
        if
          (not exist)
          || PQueue.cardinal timeouts >= !pre_pivot
             && (run_thread !pivot;
                 true)
        then
          pivot :=
            (if PQueue.is_empty indefs then fake_pivot
             else PQueue.min_elt indefs);
        pre_pivot := PQueue.cardinal timeouts

  (** Scheduler-aware exit: releases the caller's running slot, lets pending
      threads run, then terminates the calling thread. This is the function
      [create] installs as the finaliser of every thread body; the [exit]
      exported at the end of this module is the plain standard one. *)
  let exit () =
    Mutex.execute scheduler_token (fun () ->
        decr running;
        run_pendings ());
    Thread.exit ()

  (** Replaces the scheduling policy and re-evaluates the pending queue. *)
  let set_policy p =
    Mutex.execute scheduler_token (fun () ->
        policy := p;
        run_pendings ())

  (** [create ?schedule f x] runs [f x] in a new thread, either at once
      (returning [Running _]) or later (returning [Pending _]).
      The effective wait is derived from [schedule] ([Now] = 0,
      [Timeout t] = [t], [Indefinite] = [max_float]) and then possibly
      shortened by the current policy. A pending thread with a finite
      deadline is also registered with [Alarm] so it is started when the
      deadline passes. *)
  let create ?(schedule = Indefinite) f x =
    let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
    let f' x = finally (fun () -> f x) exit in
    Mutex.execute scheduler_token (fun () ->
        run_pendings ();
        let timeout =
          match schedule with
          | Now -> 0.
          | Timeout t -> t
          | Indefinite -> max_float
        in
        let timeout =
          if timeout = 0. then 0.
          else
            match !policy with
            | AlwaysRun -> 0.
            | MaxCapacity (max_threads, max_wait_opt) ->
                if !running < max_threads && PQueue.is_empty !pqueue then 0.
                else (
                  match max_wait_opt with
                  | None -> timeout
                  | Some t -> min timeout t)
            | WaitCondition f -> (
                match f () with
                | Now -> 0.
                | Timeout t -> min t timeout
                | Indefinite -> timeout)
        in
        if timeout <= 0. then (
          let t = Thread.create f' x in
          incr running;
          Running t)
        else
          let deadline =
            if timeout < max_float then timeout +. Unix.time () else max_float
          in
          let pt = lazy (Thread.create f' x) in
          incr count;
          if !count = max_int then count := 0;
          let t = (deadline, !count, pt) in
          pqueue := PQueue.add t !pqueue;
          incr pending;
          if deadline < max_float then
            Alarm.register deadline (fun () ->
                Mutex.execute scheduler_token (fun () -> run_thread t));
          (* It's fine that a pended thread might get scheduled later on so
             that the information held in 't' becomes meaningless. This is
             comparable to the case that a Thread.t finishes running and its
             thread id still exists. *)
          Pending t)

  let self () =
    (* When we get here, the thread must be running *)
    Running (Thread.self ())

  let id = function
    | Running t -> Thread.id t
    | Pending (_, id, _) ->
        (* Pending threads have a negative id to avoid overlapping with
           running thread ids *)
        -id

  (** Waits for the thread to finish; a thread that is still pending is
      started first. *)
  let join = function
    | Running t -> Thread.join t
    | Pending ((_, _, pt) as t) ->
        if not (Lazy.is_val pt) then (
          (* Give priority to those to be joined *)
          Mutex.execute scheduler_token (fun () -> run_thread t);
          assert (Lazy.is_val pt));
        Thread.join (Lazy.force pt)

  (** Kills a started thread through the standard [Thread.kill]; a thread
      that has not been started yet is simply dropped from the queue. *)
  let kill = function
    | Running t ->
        (* Not implemented in stdlib *)
        Thread.kill t
    | Pending ((_, _, pt) as t) ->
        if Lazy.is_val pt then Thread.kill (Lazy.force pt)
        else
          Mutex.execute scheduler_token (fun () ->
              (* Just in case something happens before we grab the lock *)
              if Lazy.is_val pt then Thread.kill (Lazy.force pt)
              else (
                pqueue := PQueue.remove t !pqueue;
                decr pending))

  (* Plain re-exports of the standard Thread functions. Note that [exit]
     below shadows the scheduler-aware [exit] defined earlier. *)
  let delay = Thread.delay

  let exit = Thread.exit

  let wait_read = Thread.wait_read

  let wait_write = Thread.wait_write

  let wait_timed_read = Thread.wait_timed_read

  let wait_timed_write = Thread.wait_timed_write

  let wait_pid = Thread.wait_pid

  let select = Thread.select

  let yield = Thread.yield

  let sigmask = Thread.sigmask

  let wait_signal = Thread.wait_signal
end

(** create thread loops which periodically applies a function *)
module Thread_loop : functor
  (Tr : sig
     type t

     val delay : unit -> float
   end)
  -> sig
  val start : Tr.t -> (unit -> unit) -> unit

  val stop : Tr.t -> unit

  val update : Tr.t -> (unit -> unit) -> unit
end =
functor
  (Tr : sig
     type t

     val delay : unit -> float
   end)
  ->
  struct
    exception Done_loop

    (* Per reference: (mutex guarding the exit flag, loop thread, exit flag).
       A reference may temporarily have several bindings (see [update]);
       the most recent one is the one [stop] acts on. *)
    let ref_table : (Tr.t, Mutex.t * Thread.t * bool ref) Hashtbl.t =
      Hashtbl.create 1

    (* Guards every access to [ref_table], which is touched from the
       caller's thread and from the clean-up threads. *)
    let table_m = Mutex.create ()

    (* Drop exactly the binding of [xref] that belongs to [tid], keeping any
       other bindings in their original order. [Hashtbl.remove] alone would
       drop the most recent binding, which after an [update] is the entry of
       the new loop rather than the finished one. Threads are matched by id
       instead of structural equality on the thread handle. *)
    let forget xref tid =
      Mutex.execute table_m (fun () ->
          let survivors =
            List.filter
              (fun (_, t, _) -> Thread.id t <> Thread.id tid)
              (Hashtbl.find_all ref_table xref)
          in
          while Hashtbl.mem ref_table xref do
            Hashtbl.remove ref_table xref
          done;
          (* find_all lists the newest binding first; re-add oldest first *)
          List.iter (Hashtbl.add ref_table xref) (List.rev survivors))

    (** Create a thread which periodically applies a function to the
        reference specified, and exits cleanly when removed *)
    let start xref fn =
      let mut = Mutex.create () in
      let exit_var = ref false in
      (* create thread which periodically applies the function *)
      let tid =
        Thread.create
          (fun () ->
            try
              while true do
                Thread.delay (Tr.delay ());
                Mutex.execute mut (fun () ->
                    if !exit_var then raise Done_loop;
                    fn ())
              done
            with Done_loop -> ())
          ()
      in
      (* Register before returning, so that a [stop] or [update] issued
         right after [start] is guaranteed to find the entry. *)
      Mutex.execute table_m (fun () ->
          Hashtbl.add ref_table xref (mut, tid, exit_var));
      (* create thread to clean the reference table up safely once the
         delay thread has finished *)
      ignore
        (Thread.create
           (fun () ->
             Thread.join tid;
             forget xref tid)
           ())

    (** Ask the most recently started loop for [xref] to stop; it exits the
        next time it wakes up. Does nothing if no loop is registered. *)
    let stop xref =
      let entry =
        Mutex.execute table_m (fun () ->
            try Some (Hashtbl.find ref_table xref) with Not_found -> None)
      in
      match entry with
      | Some (mut, _, exit_ref) -> Mutex.execute mut (fun () -> exit_ref := true)
      | None -> ()

    (** Replace a thread with another one *)
    let update xref fn =
      stop xref;
      start xref fn
  end

(** Parallel List.iter. Remembers all exceptions and returns an association
    list mapping input x to an exception.
    Applications of x which succeed will be missing from the returned list. *)
let thread_iter_all_exns f xs =
  let exns = ref [] in
  let m = Mutex.create () in
  List.iter Thread.join
    (List.map
       (fun x ->
         Thread.create
           (fun () ->
             try f x
             with e -> Mutex.execute m (fun () -> exns := (x, e) :: !exns))
           ())
       xs);
  !exns

(** Parallel List.iter. Remembers one exception (at random) and throws it in
    the error case. *)
let thread_iter f xs =
  match thread_iter_all_exns f xs with [] -> () | (_, e) :: _ -> raise e

(** Interruptible sleep: [wait] blocks for a number of seconds unless
    [signal] wakes it up earlier. *)
module Delay = struct
  (* Concrete type is the ends of a pipe *)
  type t = {
    (* A pipe is used to wake up a thread blocked in wait.
       NB: [pipe_out] holds the end that [wait] reads from and [pipe_in]
       the end that [signal] writes to. *)
    mutable pipe_out : Unix.file_descr option;
    mutable pipe_in : Unix.file_descr option;
    (* Indicates that a signal arrived before a wait: *)
    mutable signalled : bool;
    m : Mutex.t;
  }

  let make () =
    { pipe_out = None; pipe_in = None; signalled = false; m = Mutex.create () }

  exception Pre_signalled

  (** [wait x seconds] returns [true] if the full time elapsed and [false]
      if it was woken by [signal] (including a signal stored before the
      call). The pipe created for the wait is closed on the way out. *)
  let wait (x : t) (seconds : float) =
    let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
    let to_close = ref [] in
    let close' fd =
      if List.mem fd !to_close then Unix.close fd;
      to_close := List.filter (fun x -> fd <> x) !to_close
    in
    finally
      (fun () ->
        try
          let pipe_out =
            Mutex.execute x.m (fun () ->
                if x.signalled then (
                  x.signalled <- false;
                  raise Pre_signalled);
                let pipe_out, pipe_in = Unix.pipe () in
                (* these will be unconditionally closed on exit *)
                to_close := [ pipe_out; pipe_in ];
                x.pipe_out <- Some pipe_out;
                x.pipe_in <- Some pipe_in;
                x.signalled <- false;
                pipe_out)
          in
          let r, _, _ = Unix.select [ pipe_out ] [] [] seconds in
          (* flush the single byte from the pipe *)
          if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1);
          (* return true if we waited the full length of time, false if we
             were woken *)
          r = []
        with Pre_signalled -> false)
      (fun () ->
        Mutex.execute x.m (fun () ->
            x.pipe_out <- None;
            x.pipe_in <- None;
            List.iter close' !to_close))

  (** Wakes up a thread blocked in [wait] on [x]; if none is waiting, the
      signal is remembered for the next [wait]. *)
  let signal (x : t) =
    Mutex.execute x.m (fun () ->
        match x.pipe_in with
        | Some fd -> ignore (Unix.write fd (Bytes.of_string "X") 0 1)
        | None ->
            (* If the wait hasn't happened yet then store up the signal *)
            x.signalled <- true)
end

(** Blocks the calling thread forever by sleeping in a loop. *)
let keep_alive () =
  while true do
    Thread.delay 20000.
  done