Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file lwt_pipe.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(* A size-bounded FIFO pipe whose blocking operations return Lwt promises.

   Writers add elements with [push] / [push_now]; readers remove or inspect
   them with [pop], [peek] and their variants. Every element is stored
   together with the size computed for it at push time, and the running sum
   of those sizes is compared against [max_size] to decide whether a push may
   proceed. *)

open Lwt.Infix

type 'a t = {
  (* Stored elements, each paired with the size computed when it was pushed. *)
  queue : (int * 'a) Queue.t;
  (* Sum of the sizes of the elements currently in [queue]. *)
  mutable current_size : int;
  (* Bound against which [current_size] is checked on every push. *)
  max_size : int;
  (* Function giving the size charged for an element. *)
  compute_size : 'a -> int;
  (* Set once by [close]; never reset. *)
  mutable closed : bool;
  (* Promise/resolver pair on which readers wait for the next push (or for
     [close]). [None] when nobody is waiting. *)
  mutable push_waiter : (unit Lwt.t * unit Lwt.u) option;
  (* Promise/resolver pair on which writers wait for the next pop (or for
     [close]). [None] when nobody is waiting. *)
  mutable pop_waiter : (unit Lwt.t * unit Lwt.u) option;
  (* Signalled each time a pop leaves the queue empty. *)
  empty : unit Lwt_condition.t;
}

(* [is_closed q] is [true] once [close q] has been called. *)
let is_closed {closed; _} = closed

(* Four machine words, expressed in bytes. Not used in this file.
   NOTE(review): presumably meant for callers' [compute_size] functions to
   account for per-element queue overhead -- confirm against the callers. *)
let push_overhead = 4 * (Sys.word_size / 8)

(* [create ?size ()] returns a fresh, open, empty pipe.

   [size] is an optional pair [(max_size, compute_size)]. When it is omitted
   every element is charged a size of 0 and the bound is [max_int], so the
   size check in [push] never blocks. *)
let create ?size () =
  let (max_size, compute_size) =
    match size with
    | None -> (max_int, fun _ -> 0)
    | Some (max_size, compute_size) -> (max_size, compute_size)
  in
  {
    queue = Queue.create ();
    current_size = 0;
    max_size;
    compute_size;
    closed = false;
    push_waiter = None;
    pop_waiter = None;
    empty = Lwt_condition.create ();
  }

(* Wake up whoever is waiting for a push, if anybody. The waiter slot is
   cleared first so the next [wait_push] allocates a fresh promise. *)
let notify_push q =
  match q.push_waiter with
  | None -> ()
  | Some (_, w) ->
      q.push_waiter <- None ;
      Lwt.wakeup_later w ()

(* Wake up whoever is waiting for a pop, if anybody. The waiter slot is
   cleared first so the next [wait_pop] allocates a fresh promise. *)
let notify_pop q =
  match q.pop_waiter with
  | None -> ()
  | Some (_, w) ->
      q.pop_waiter <- None ;
      Lwt.wakeup_later w ()

(* Promise resolved by the next [notify_push]. All concurrent callers share
   one underlying promise; each gets it wrapped in [Lwt.protected] so that
   cancelling one caller's promise does not cancel the shared one. *)
let wait_push q =
  match q.push_waiter with
  | Some (t, _) -> Lwt.protected t
  | None ->
      let (waiter, wakener) = Lwt.wait () in
      q.push_waiter <- Some (waiter, wakener) ;
      Lwt.protected waiter

(* Promise resolved by the next [notify_pop]. Same sharing and protection
   scheme as [wait_push]. *)
let wait_pop q =
  match q.pop_waiter with
  | Some (t, _) -> Lwt.protected t
  | None ->
      let (waiter, wakener) = Lwt.wait () in
      q.pop_waiter <- Some (waiter, wakener) ;
      Lwt.protected waiter

(* Number of elements currently stored in the pipe. *)
let length {queue; _} = Queue.length queue

(* [true] iff the pipe currently stores no element. *)
let is_empty {queue; _} = Queue.is_empty queue

(* [empty q] resolves once [q] holds no element. It returns immediately when
   the pipe is already empty; otherwise it waits on the [empty] condition and
   re-checks, since a push may have happened in between. *)
let rec empty q =
  if is_empty q then Lwt.return_unit
  else Lwt_condition.wait q.empty >>= fun () -> empty q

(* Raised (or used to reject a promise) when writing to a closed pipe, or when
   reading from a pipe that is both closed and empty. *)
exception Closed

(* [push q elt] appends [elt] to [q], waiting for room if necessary.

   The element is accepted when adding its size keeps the total strictly
   below [max_size], or when the pipe is empty (so a single element larger
   than the bound can still go through). Otherwise the call waits for the
   next pop and retries.

   The returned promise is rejected with [Closed] if the pipe is closed, either
   at call time or at one of the retries. *)
let rec push ({closed; queue; current_size; max_size; compute_size; _} as q)
    elt =
  let elt_size = compute_size elt in
  if closed then Lwt.fail Closed
  else if current_size + elt_size < max_size || Queue.is_empty queue then (
    Queue.push (elt_size, elt) queue ;
    q.current_size <- current_size + elt_size ;
    (* Let pending readers know that an element is available. *)
    notify_push q ;
    Lwt.return_unit )
  else wait_pop q >>= fun () -> push q elt

(* [push_now q elt] is the non-blocking variant of [push]: it returns [true]
   if [elt] was appended and [false] if there was no room for it.

   Raises [Closed] if the pipe is closed. *)
let push_now ({closed; queue; compute_size; current_size; max_size; _} as q)
    elt =
  if closed then raise Closed ;
  let elt_size = compute_size elt in
  (current_size + elt_size < max_size || Queue.is_empty queue)
  && ( Queue.push (elt_size, elt) queue ;
       q.current_size <- current_size + elt_size ;
       notify_push q ;
       true )

(* [pop q] removes and returns the oldest element of [q], waiting for a push
   if the pipe is empty.

   Elements remaining in a closed pipe can still be popped; the promise is
   rejected with [Closed] only when the pipe is closed and empty. *)
let rec pop ({closed; queue; empty; current_size; _} as q) =
  if not (Queue.is_empty queue) then (
    let (elt_size, elt) = Queue.pop queue in
    (* Let pending writers know that room was freed. *)
    notify_pop q ;
    q.current_size <- current_size - elt_size ;
    if Queue.length queue = 0 then Lwt_condition.signal empty () ;
    Lwt.return elt )
  else if closed then Lwt.fail Closed
  else wait_push q >>= fun () -> pop q

(* [pop_with_timeout timeout q] is like [pop q] but gives up when [timeout]
   is no longer pending.

   - If an element is available, [timeout] is cancelled and the result is
     [Some elt].
   - If the pipe is empty and [timeout] is still pending: the promise is
     rejected with [Closed] (after cancelling [timeout]) when the pipe is
     closed; otherwise the call waits for whichever of [timeout] and the next
     push comes first, then re-checks.
   - If the pipe is empty and [timeout] is no longer pending, the result is
     [None]. *)
let rec pop_with_timeout timeout q =
  if not (Queue.is_empty q.queue) then (
    Lwt.cancel timeout ;
    pop q >>= Lwt.return_some )
  else if Lwt.is_sleeping timeout then (
    if q.closed then (
      Lwt.cancel timeout ;
      Lwt.fail Closed )
    else
      let waiter = wait_push q in
      (* [Lwt.pick] cancels the promises that lose the race; the extra
         [Lwt.protected] keeps that cancellation away from [waiter]. *)
      Lwt.pick [timeout; Lwt.protected waiter]
      >>= fun () -> pop_with_timeout timeout q )
  else Lwt.return_none

(* [peek q] returns the oldest element of [q] without removing it, waiting
   for a push if the pipe is empty. The promise is rejected with [Closed] when
   the pipe is closed and empty. *)
let rec peek ({closed; queue; _} as q) =
  if not (Queue.is_empty queue) then
    let (_elt_size, elt) = Queue.peek queue in
    Lwt.return elt
  else if closed then Lwt.fail Closed
  else wait_push q >>= fun () -> peek q

(* [peek_all q] returns the elements of [q], oldest first, without removing
   them. It returns [[]] whenever the pipe is closed.

   NOTE(review): the empty result on a closed pipe holds even if elements
   remain, which differs from [pop], [peek] and [pop_now], all of which still
   serve a closed, non-empty pipe. Left unchanged; confirm with callers. *)
let peek_all {queue; closed; _} =
  if closed then []
  else List.rev (Queue.fold (fun acc (_, e) -> e :: acc) [] queue)

(* [pop_now q] is the non-blocking variant of [pop]: it returns [Some elt]
   after removing the oldest element, or [None] if the pipe is empty.

   Raises [Closed] if the pipe is closed and empty. *)
let pop_now ({closed; queue; empty; current_size; _} as q) =
  (* We only check for closed-ness when the queue is empty to allow reading
     from a closed pipe. This is because closing is just closing the write-end
     of the pipe. *)
  if Queue.is_empty queue && closed then raise Closed ;
  Queue.take_opt queue
  |> Stdlib.Option.map (fun (elt_size, elt) ->
         if Queue.length queue = 0 then Lwt_condition.signal empty () ;
         q.current_size <- current_size - elt_size ;
         notify_pop q ;
         elt)

(* [pop_all_loop q acc] pops every element currently in [q] and returns them,
   oldest first, after the elements already collected in [acc] (which is kept
   in reverse order).

   On a closed pipe, [pop_now] raises [Closed] as soon as the queue has been
   emptied. Letting that exception escape here would discard the elements
   that were just removed from the queue, so it is only propagated when
   nothing has been collected; otherwise the collected elements are
   returned. *)
let rec pop_all_loop q acc =
  match pop_now q with
  | None -> List.rev acc
  | Some e -> pop_all_loop q (e :: acc)
  | exception Closed -> (
    match acc with [] -> raise Closed | _ :: _ -> List.rev acc )

(* [pop_all q] waits until at least one element is available, then returns
   every element held by [q] at that point, oldest first. The promise is
   rejected with [Closed] when the pipe is closed and empty. *)
let pop_all q = pop q >>= fun e -> Lwt.return (pop_all_loop q [e])

(* [pop_all_now q] removes and returns every element currently in [q], oldest
   first, without blocking. The result is [[]] for an empty open pipe.

   Raises [Closed] if the pipe is closed and empty. *)
let pop_all_now q = pop_all_loop q []

(* [close q] marks [q] as closed and wakes up every pending reader and writer
   so that they can observe the new state. Closing an already closed pipe has
   no effect. *)
let close q =
  if not q.closed then (
    q.closed <- true ;
    notify_push q ;
    notify_pop q )