Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file map_reduce.ml
open Core
open Async

(* Half-open integer intervals [lbound, ubound). They are ordered by lower bound so they
   can serve as [Map] keys, provided the keys stored together never overlap. [map_reduce]
   uses them to record which contiguous range of input indices each partial accumulator
   covers. *)
module Half_open_interval = struct
  module T = struct
    type t = int * int [@@deriving sexp]

    (* [create_exn l u] is the interval [l, u). Raises unless [l < u], so empty intervals
       are rejected. *)
    let create_exn l u =
      if l >= u
      then
        failwiths "Lower bound must be less than upper bound" (l, u) [%sexp_of: int * int];
      l, u
    ;;

    let lbound t = fst t
    let ubound t = snd t

    (* True when the two (non-empty) half-open intervals share at least one integer. *)
    let intersects t1 t2 = lbound t1 < ubound t2 && lbound t2 < ubound t1

    (* Orders intervals by lower bound. This is only a consistent total order on a set of
       pairwise-disjoint intervals. Comparing two distinct intervals that overlap
       therefore raises, rather than silently returning an arbitrary answer. *)
    let compare t1 t2 =
      if t1 = t2
      then 0
      else (
        if intersects t1 t2
        then
          failwiths
            "Cannot compare unequal intersecting intervals"
            (t1, t2)
            [%sexp_of: t * t];
        Int.compare (lbound t1) (lbound t2))
    ;;
  end

  include T
  include Comparable.Make (T)
end

(* Pairs every element of [reader] with its 0-based position in the stream. The counter
   is local to this call, so every returned pipe numbers its elements from 0. *)
let append_index reader =
  let index = ref 0 in
  Pipe.map reader ~f:(fun item ->
    let item_index = !index in
    index := !index + 1;
    item, item_index)
;;

(* Hides the type parameter of a [Remote_executable.t], so that remotes of different types
   can be kept in one list. *)
type packed_remote = Packed_remote : 'remote Remote_executable.t -> packed_remote

module Config = struct
  (* [local]: number of workers to spawn locally.
     [remote]: (remote executable, number of workers to spawn there) pairs.
     [cd]: forwarded as [?cd] when spawning each worker (presumably the workers'
     working directory; confirm against [Parallel_worker.spawn_exn]).
     [redirect_stderr] / [redirect_stdout]: forwarded to every spawned worker. *)
  type t =
    { local : int
    ; remote : (packed_remote * int) list
    ; cd : string option
    ; redirect_stderr : [ `Dev_null | `File_append of string ]
    ; redirect_stdout : [ `Dev_null | `File_append of string ]
    }

  (* Number of cores reported by [Linux_ext.cores]. [ok_exn] raises if that function is
     unavailable (an [Error]) on this platform. *)
  let default_cores () = (ok_exn Linux_ext.cores) ()

  (* Builds a config. If neither [local] nor [remote] asks for any worker, one local
     worker per core ([default_cores ()]) is used instead. *)
  let create ?(local = 0) ?(remote = []) ?cd ~redirect_stderr ~redirect_stdout () =
    let local, remote =
      if local = 0 && List.is_empty remote
      then default_cores (), remote
      else local, remote
    in
    { local
    ; remote = List.map remote ~f:(fun (remote, n) -> Packed_remote remote, n)
    ; cd
    ; redirect_stderr
    ; redirect_stdout
    }
  ;;
end

(* Wrappers for a generic worker *)

(* The interface shared by map and map-reduce workers. It can spawn a pool from a
   [Config.t], run one request on a worker, and shut a worker down. *)
module type Worker = sig
  type t
  type param_type
  type run_input_type
  type run_output_type

  val spawn_config_exn : Config.t -> param_type -> t list Deferred.t
  val run_exn : t -> run_input_type -> run_output_type Deferred.t
  val shutdown_exn : t -> unit Deferred.t
end

module type Rpc_parallel_worker_spec = sig
  type state_type

  module Param : Binable
  module Run_input : Binable
  module Run_output : Binable

  val init : Param.t -> state_type Deferred.t
  val execute : state_type -> Run_input.t -> Run_output.t Deferred.t
end

(* Builds a [Worker] on top of [Parallel.Make]. Each spawned worker computes its state
   once with [S.init] from the spawn parameter. Every [run_exn] call then goes through a
   single RPC ([C.create_rpc]) that invokes [S.execute] with that state. *)
module Make_rpc_parallel_worker (S : Rpc_parallel_worker_spec) = struct
  module Parallel_worker = struct
    module T = struct
      type 'worker functions =
        { execute : ('worker, S.Run_input.t, S.Run_output.t) Parallel.Function.t }

      module Worker_state = struct
        type init_arg = S.Param.t [@@deriving bin_io]
        type t = S.state_type
      end

      module Connection_state = struct
        type init_arg = unit [@@deriving bin_io]
        type t = unit
      end

      module Functions
          (C : Parallel.Creator
           with type worker_state := Worker_state.t
            and type connection_state := Connection_state.t) =
      struct
        let execute =
          C.create_rpc
            ~f:(fun ~worker_state ~conn_state:() -> S.execute worker_state)
            ~bin_input:S.Run_input.bin_t
            ~bin_output:S.Run_output.bin_t
            ()
        ;;

        let functions = { execute }
        let init_worker_state = S.init
        let init_connection_state ~connection:_ ~worker_state:_ () = return ()
      end
    end

    include Parallel.Make (T)
  end

  type t = Parallel_worker.Connection.t
  type param_type = S.Param.t
  type run_input_type = S.Run_input.t
  type run_output_type = S.Run_output.t

  (* Spawns a single worker at [where]. The worker is asked to shut down when its
     connection goes away ([~shutdown_on:Disconnect]), and worker failures are passed to
     [Error.raise]. *)
  let spawn_exn where param ?cd ~redirect_stderr ~redirect_stdout =
    Parallel_worker.spawn_exn
      ~where
      ?cd
      ~shutdown_on:Disconnect
      ~redirect_stderr
      ~redirect_stdout
      param
      ~on_failure:Error.raise
      ~connection_state_init_arg:()
  ;;

  (* Validates the worker counts in the config, then spawns [local] local workers. For
     every [(remote, n)] entry it also spawns [n] workers on that remote. Local and remote
     spawning run concurrently ([Deferred.both]), and the resulting list has the local
     workers first.

     Raises if any count is negative, or if no worker would be spawned at all. *)
  let spawn_config_exn { Config.local; remote; cd; redirect_stderr; redirect_stdout } param =
    if local < 0 then failwiths "config.local must be nonnegative" local Int.sexp_of_t;
    (match List.find remote ~f:(fun (_remote, n) -> n < 0) with
     | Some remote ->
       failwiths "remote number of workers must be nonnegative" (snd remote) Int.sexp_of_t
     | None -> ());
    if local = 0 && not (List.exists remote ~f:(fun (_remote, n) -> n > 0))
    then
      failwiths
        "total number of workers must be positive"
        (local, List.map remote ~f:snd)
        [%sexp_of: int * int list];
    let spawn_n where n =
      Deferred.List.init n ~f:(fun _i ->
        spawn_exn
          where
          param
          ?cd
          ~redirect_stderr:(redirect_stderr :> Fd_redirection.t)
          ~redirect_stdout:(redirect_stdout :> Fd_redirection.t))
    in
    let%map local_workers, remote_workers =
      Deferred.both
        (spawn_n Local local)
        (Deferred.List.concat_map ~how:`Parallel remote ~f:(fun (Packed_remote remote, n) ->
           spawn_n (Remote remote) n))
    in
    local_workers @ remote_workers
  ;;

  (* Sends one input to the worker and returns its [S.execute] result. *)
  let run_exn t input =
    Parallel_worker.Connection.run_exn t ~f:Parallel_worker.functions.execute ~arg:input
  ;;

  (* Closes the connection to the worker. Because workers are spawned with
     [~shutdown_on:Disconnect], this is what shuts the worker down. *)
  let shutdown_exn conn = Parallel_worker.Connection.close conn
end

(* Map *)

(* A map job packaged for use as a first-class module: its types, plus a [Worker] that
   maps one [Input.t] to one [Output.t]. *)
module type Map_function = sig
  module Param : Binable
  module Input : Binable
  module Output : Binable

  module Worker :
    Worker
    with type param_type = Param.t
    with type run_input_type = Input.t
    with type run_output_type = Output.t
end

module type Map_function_with_init_spec = sig
  type state_type

  module Param : Binable
  module Input : Binable
  module Output : Binable

  val init : Param.t -> state_type Deferred.t
  val map : state_type -> Input.t -> Output.t Deferred.t
end

(* Builds a [Map_function] whose workers compute their state once with [S.init], then
   answer every request with [S.map state]. *)
module Make_map_function_with_init (S : Map_function_with_init_spec) = struct
  module Param = S.Param
  module Input = S.Input
  module Output = S.Output

  module Worker = Make_rpc_parallel_worker (struct
      type state_type = S.state_type

      module Param = Param
      module Run_input = Input
      module Run_output = Output

      let init = S.init
      let execute = S.map
    end)
end

module type Map_function_spec = sig
  module Input : Binable
  module Output : Binable

  val map : Input.t -> Output.t Deferred.t
end

(* Stateless variant of [Make_map_function_with_init]: both the parameter and the worker
   state are [unit]. *)
module Make_map_function (S : Map_function_spec) = Make_map_function_with_init (struct
    type state_type = unit

    module Param = struct
      type t = unit [@@deriving bin_io]
    end

    module Input = S.Input
    module Output = S.Output

    let init = return
    let map () = S.map
  end)

(* Map-combine *)

(* A map-reduce job packaged for use as a first-class module. Its worker accepts three
   kinds of request:
   - [`Map input]: map a single input to an accumulator.
   - [`Combine (a1, a2)]: combine two accumulators, with [a1] as the left argument.
   - [`Map_right_combine (acc, input)]: [combine acc (map input)] in one request. *)
module type Map_reduce_function = sig
  module Param : Binable
  module Accum : Binable
  module Input : Binable

  module Worker :
    Worker
    with type param_type = Param.t
    with type run_input_type =
      [ `Map of Input.t
      | `Combine of Accum.t * Accum.t
      | `Map_right_combine of Accum.t * Input.t (* combine accum (map input) *)
      ]
    with type run_output_type = Accum.t
end

module type Map_reduce_function_with_init_spec = sig
  type state_type

  module Param : Binable
  module Accum : Binable
  module Input : Binable

  val init : Param.t -> state_type Deferred.t
  val map : state_type -> Input.t -> Accum.t Deferred.t
  val combine : state_type -> Accum.t -> Accum.t -> Accum.t Deferred.t
end

(* Builds a [Map_reduce_function] whose workers compute their state once with [S.init].
   They dispatch each request to [S.map] and/or [S.combine] using that state. *)
module Make_map_reduce_function_with_init (S : Map_reduce_function_with_init_spec) =
struct
  module Param = S.Param
  module Accum = S.Accum
  module Input = S.Input

  module Worker = Make_rpc_parallel_worker (struct
      type state_type = S.state_type

      module Param = Param

      module Run_input = struct
        type t =
          [ `Map of Input.t
          | `Combine of Accum.t * Accum.t
          | `Map_right_combine of Accum.t * Input.t
          ]
        [@@deriving bin_io]
      end

      module Run_output = Accum

      let init = S.init

      let execute state = function
        | `Map input -> S.map state input
        | `Combine (accum1, accum2) -> S.combine state accum1 accum2
        | `Map_right_combine (accum1, input) ->
          (* Map and combine on the worker, saving a separate [`Combine] request. *)
          let%bind accum2 = S.map state input in
          S.combine state accum1 accum2
      ;;
    end)
end

module type Map_reduce_function_spec = sig
  module Accum : Binable
  module Input : Binable

  val map : Input.t -> Accum.t Deferred.t
  val combine : Accum.t -> Accum.t -> Accum.t Deferred.t
end

(* Stateless variant of [Make_map_reduce_function_with_init]: both the parameter and the
   worker state are [unit]. *)
module Make_map_reduce_function (S : Map_reduce_function_spec) =
  Make_map_reduce_function_with_init (struct
    type state_type = unit

    module Param = struct
      type t = unit [@@deriving bin_io]
    end

    module Accum = S.Accum
    module Input = S.Input

    let init = return
    let map () = S.map
    let combine () = S.combine
  end)

(* Applies the map function [m] to every element of [input_reader], using the workers
   described by [config]. Returns a pipe of [(output, index)] pairs, where [index] is the
   0-based position of the corresponding input.

   Pairs appear in completion order, not input order. Each worker pulls its next input as
   soon as its previous output has been written. A worker is shut down once the input
   reaches EOF, and the output pipe is closed after every worker has stopped. *)
let map_unordered (type param a b) config input_reader ~m ~(param : param) =
  let module Map_function =
    (val m : Map_function
      with type Param.t = param
       and type Input.t = a
       and type Output.t = b)
  in
  let%bind workers = Map_function.Worker.spawn_config_exn config param in
  let input_with_index_reader = append_index input_reader in
  let output_reader, output_writer = Pipe.create () in
  (* Ties flushing of the indexed input pipe to [Pipe.downstream_flushed output_writer]. *)
  let consumer =
    Pipe.add_consumer input_with_index_reader ~downstream_flushed:(fun () ->
      Pipe.downstream_flushed output_writer)
  in
  let rec map_loop worker =
    match%bind Pipe.read ~consumer input_with_index_reader with
    | `Eof -> Map_function.Worker.shutdown_exn worker
    | `Ok (input, index) ->
      let%bind output = Map_function.Worker.run_exn worker input in
      let%bind () = Pipe.write output_writer (output, index) in
      map_loop worker
  in
  don't_wait_for
    (let%map () = Deferred.all_unit (List.map workers ~f:map_loop) in
     Pipe.close output_writer);
  return output_reader
;;

(* Like [map_unordered], but the returned pipe yields outputs in input order. An output
   whose index is ahead of [!expecting_index] is buffered in a min-heap keyed on index. It
   is released once every earlier index has been written. *)
let map config input_reader ~m ~param =
  let%bind mapped_reader = map_unordered config input_reader ~m ~param in
  let new_reader, new_writer = Pipe.create () in
  let expecting_index = ref 0 in
  let out_of_order_output =
    Heap.create ~cmp:(fun (_, index1) (_, index2) -> Int.compare index1 index2) ()
  in
  (* Pops buffered outputs while they continue the in-order sequence; stops at the first
     gap. *)
  let rec write_out_of_order_output () =
    match Heap.top out_of_order_output with
    | Some (output, index) when index = !expecting_index ->
      expecting_index := !expecting_index + 1;
      Heap.remove_top out_of_order_output;
      let%bind () = Pipe.write new_writer output in
      write_out_of_order_output ()
    | _ -> Deferred.unit
  in
  don't_wait_for
    (let%map () =
       Pipe.iter mapped_reader ~f:(fun ((output, index) as output_and_index) ->
         if index = !expecting_index
         then (
           expecting_index := !expecting_index + 1;
           let%bind () = Pipe.write new_writer output in
           write_out_of_order_output ())
         else if index > !expecting_index
         then (
           Heap.add out_of_order_output output_and_index;
           Deferred.unit)
         else
           (* [append_index] numbers inputs uniquely, and each input is mapped once, so an
              index that has already been written cannot reappear. *)
           assert false)
     in
     Pipe.close new_writer);
  return new_reader
;;

(* Runs the option-returning map [m] over [input_reader]. Returns [Some v] for a value [v]
   that some worker produced, or [None] if no worker produced one. The result is
   determined only after every worker has stopped.

   Once a value is found, the finding worker shuts down. Every other worker shuts down the
   next time it reads an input (that input is discarded) or reaches EOF; requests already
   in flight complete first. If several workers find values, the last one to finish
   overwrites the earlier ones.

   NOTE(review): a worker blocked in [Pipe.read] only notices the found value once another
   input or EOF arrives, so an input pipe that never closes can delay the result. *)
let find_map (type param a b) config input_reader ~m ~(param : param) =
  let module Map_function =
    (val m : Map_function
      with type Param.t = param
       and type Input.t = a
       and type Output.t = b option)
  in
  let%bind workers = Map_function.Worker.spawn_config_exn config param in
  let found_value = ref None in
  let rec find_loop worker =
    match%bind Pipe.read input_reader with
    | `Eof -> Map_function.Worker.shutdown_exn worker
    | `Ok input ->
      (* Check result and exit early if we've found something. *)
      (match !found_value with
       | Some _ -> Map_function.Worker.shutdown_exn worker
       | None ->
         (match%bind Map_function.Worker.run_exn worker input with
          | Some value ->
            found_value := Some value;
            Map_function.Worker.shutdown_exn worker
          | None -> find_loop worker))
  in
  let%map () = Deferred.all_unit (List.map workers ~f:find_loop) in
  !found_value
;;

(* Map-reduce without regard to input order.

   Each worker folds the inputs it happens to read into its own running accumulator. The
   first input uses [`Map]; later ones use [`Map_right_combine]. Once the input is
   exhausted, the per-worker accumulators are merged pairwise in whatever order the
   workers finish. The result is therefore only well-defined when [combine] is both
   associative and commutative. Returns [None] when the input is empty. *)
let map_reduce_commutative (type param a accum) config input_reader ~m ~(param : param) =
  let module Map_reduce_function =
    (val m : Map_reduce_function
      with type Param.t = param
       and type Input.t = a
       and type Accum.t = accum)
  in
  let%bind workers = Map_reduce_function.Worker.spawn_config_exn config param in
  let rec map_and_combine_loop worker acc =
    match%bind Pipe.read input_reader with
    | `Eof -> return acc
    | `Ok input ->
      let%bind acc =
        match acc with
        | Some acc -> Map_reduce_function.Worker.run_exn worker (`Map_right_combine (acc, input))
        | None -> Map_reduce_function.Worker.run_exn worker (`Map input)
      in
      map_and_combine_loop worker (Some acc)
  in
  (* Holds at most one finished accumulator that is waiting for a partner. A worker that
     finds one there takes it and combines with it. Otherwise the worker parks its own
     accumulator there and shuts down. Whatever remains at the end is the result. *)
  let combined_acc = ref None in
  let rec combine_loop worker acc =
    match !combined_acc with
    | Some other_acc ->
      combined_acc := None;
      Map_reduce_function.Worker.run_exn worker (`Combine (other_acc, acc))
      >>= combine_loop worker
    | None ->
      combined_acc := Some acc;
      Map_reduce_function.Worker.shutdown_exn worker
  in
  let%map () =
    Deferred.all_unit
      (List.map workers ~f:(fun worker ->
         match%bind map_and_combine_loop worker None with
         | Some acc -> combine_loop worker acc
         | None -> Map_reduce_function.Worker.shutdown_exn worker))
  in
  !combined_acc
;;

(* Order-preserving map-reduce.

   Every partial accumulator lives in [acc_map], keyed by the half-open interval of input
   indices it covers. Only adjacent intervals are ever combined, and the lower-indexed
   accumulator is always the left argument. As a result [combine] only needs to be
   associative. Returns [None] when the input is empty. *)
let map_reduce (type param a accum) config input_reader ~m ~(param : param) =
  let module Map_reduce_function =
    (val m : Map_reduce_function
      with type Param.t = param
       and type Input.t = a
       and type Accum.t = accum)
  in
  let%bind workers = Map_reduce_function.Worker.spawn_config_exn config param in
  let input_with_index_reader = append_index input_reader in
  let module H = Half_open_interval in
  let acc_map = ref H.Map.empty in
  (* Repeatedly merges the interval [key] (already stored in [acc_map] with [acc]) with
     adjacent neighbours. [dir] encodes the search state:
     - [`Left]: look for a left neighbour. If there is none, try the right side in
       [`Right_nothing_left] mode.
     - [`Right_nothing_left]: the left side has nothing. Look right; if that is also
       empty, stop.
     - [`Right]: look for a right neighbour. If there is none, try the left side in
       [`Left_nothing_right] mode.
     - [`Left_nothing_right]: the right side has nothing. Look left; if that is also
       empty, stop.
     After a successful merge the search continues in the same direction, in the full
     [`Left] or [`Right] mode. The opposite side is re-checked once that direction runs
     out, because the merged interval may have a new neighbour there. *)
  let rec combine_loop
            worker
            key
            acc
            (dir : [ `Left | `Left_nothing_right | `Right | `Right_nothing_left ])
    =
    match dir with
    | (`Left | `Left_nothing_right) as dir' ->
      (match H.Map.closest_key !acc_map `Less_than key with
       | Some (left_key, left_acc) when H.ubound left_key = H.lbound key ->
         (* combine acc_{left_lbound, left_ubound} acc_{this_lbound, this_ubound}
            -> acc_{left_lbound, this_ubound} *)
         (* We need to remove both nodes from the tree to indicate that we are working on
            combining them, so that no other worker picks either of them up meanwhile. *)
         acc_map := H.Map.remove (H.Map.remove !acc_map key) left_key;
         let%bind new_acc =
           Map_reduce_function.Worker.run_exn worker (`Combine (left_acc, acc))
         in
         let new_key = H.create_exn (H.lbound left_key) (H.ubound key) in
         acc_map := H.Map.set !acc_map ~key:new_key ~data:new_acc;
         (* Keep merging leftwards with the enlarged interval. *)
         combine_loop worker new_key new_acc `Left
       | _ ->
         (match dir' with
          | `Left -> combine_loop worker key acc `Right_nothing_left
          | `Left_nothing_right -> Deferred.unit))
    | (`Right | `Right_nothing_left) as dir' ->
      (match H.Map.closest_key !acc_map `Greater_than key with
       | Some (right_key, right_acc) when H.lbound right_key = H.ubound key ->
         (* combine acc_{this_lbound, this_ubound} acc_{right_lbound, right_ubound}
            -> acc_{this_lbound, right_ubound} *)
         acc_map := H.Map.remove (H.Map.remove !acc_map key) right_key;
         let%bind new_acc =
           Map_reduce_function.Worker.run_exn worker (`Combine (acc, right_acc))
         in
         let new_key = H.create_exn (H.lbound key) (H.ubound right_key) in
         acc_map := H.Map.set !acc_map ~key:new_key ~data:new_acc;
         (* Keep merging rightwards with the enlarged interval. *)
         combine_loop worker new_key new_acc `Right
       | _ ->
         (match dir' with
          | `Right -> combine_loop worker key acc `Left_nothing_right
          | `Right_nothing_left -> Deferred.unit))
  in
  let rec map_and_combine_loop worker =
    match%bind Pipe.read input_with_index_reader with
    | `Eof -> Map_reduce_function.Worker.shutdown_exn worker
    | `Ok (input, index) ->
      let key = H.create_exn index (index + 1) in
      let%bind () =
        match H.Map.closest_key !acc_map `Less_than key with
        | Some (left_key, left_acc) when H.ubound left_key = H.lbound key ->
          (* combine acc_{left_lbound, left_ubound} (map a_index)
             -> acc_{left_lbound, index + 1} *)
          acc_map := H.Map.remove !acc_map left_key;
          let%bind acc =
            Map_reduce_function.Worker.run_exn worker (`Map_right_combine (left_acc, input))
          in
          let key = H.create_exn (H.lbound left_key) (H.ubound key) in
          acc_map := H.Map.set !acc_map ~key ~data:acc;
          combine_loop worker key acc `Left
        | _ ->
          (* map a_index -> acc_{index, index + 1} *)
          let%bind acc = Map_reduce_function.Worker.run_exn worker (`Map input) in
          acc_map := H.Map.set !acc_map ~key ~data:acc;
          combine_loop worker key acc `Left
      in
      map_and_combine_loop worker
  in
  let%map () = Deferred.all_unit (List.map workers ~f:map_and_combine_loop) in
  (* All partial results should have been merged into at most one interval. *)
  assert (Map.length !acc_map <= 1);
  Option.map (Map.min_elt !acc_map) ~f:snd
;;