package rpc_parallel


A parallel map/reduce library. See examples/add_numbers.ml and examples/number_stats.ml for examples.

module Config : sig ... end
module type Worker = sig ... end
Map functions

Map_function modules must be created using the Make_map_function or Make_map_function_with_init functors. The init variety allows you to specify an init function that takes a "param" argument. The non-init variety is equivalent to the init variety with init equal to return and a unit "param" argument. Similarly, Map_reduce_function modules must be created using the Make_map_combine_function or Make_map_combine_function_with_init functors.
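The relationship between the init and non-init varieties can be sketched in plain OCaml. This is a simplified, synchronous model, not the library's actual signatures: the module types Spec_with_init, Spec and S, the functors Make_with_init and Make, and the run function are all hypothetical names chosen for illustration, and no workers or Async are involved.

```ocaml
(* Hypothetical, synchronous stand-ins for the spec module types. *)
module type Spec_with_init = sig
  type param
  type state
  type input
  type output
  val init : param -> state          (* run once, from the "param" argument *)
  val map : state -> input -> output
end

module type Spec = sig
  type input
  type output
  val map : input -> output
end

module type S = sig
  type param
  type input
  type output
  val run : param -> input list -> output list
end

module Make_with_init (X : Spec_with_init) :
  S with type param = X.param
     and type input = X.input
     and type output = X.output = struct
  type param = X.param
  type input = X.input
  type output = X.output
  let run param inputs =
    let state = X.init param in
    List.map (X.map state) inputs
end

(* The non-init variety is the init variety with a unit param and a trivial init. *)
module Make (X : Spec) :
  S with type param = unit
     and type input = X.input
     and type output = X.output = struct
  module Arg = struct
    type param = unit
    type state = unit
    type input = X.input
    type output = X.output
    let init () = ()
    let map () x = X.map x
  end
  include Make_with_init (Arg)
end

module Scale = Make_with_init (struct
  type param = int
  type state = int
  type input = int
  type output = int
  let init factor = factor
  let map factor x = factor * x
end)

module Double = Make (struct
  type input = int
  type output = int
  let map x = 2 * x
end)

let scaled = Scale.run 3 [ 1; 2; 3 ]
let doubled = Double.run () [ 1; 2; 3 ]
```

In this model, Scale receives its multiplier through the param argument, while Double needs no parameter and is therefore run with ().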

module type Map_function = sig ... end
module type Map_function_with_init_spec = sig ... end
module type Map_function_spec = sig ... end
module Make_map_function (S : Map_function_spec) : Map_function with type Param.t = unit and type Input.t = S.Input.t and type Output.t = S.Output.t
val map_unordered : Config.t -> 'a Async.Pipe.Reader.t -> m: (module Map_function with type Input.t = 'a and type Output.t = 'b and type Param.t = 'param) -> param:'param -> ('b * int) Async.Pipe.Reader.t Async.Deferred.t

The map_unordered operation takes an 'a Pipe.Reader.t along with a Map_function and sends the 'a values to workers for mapping. Each pair in the resulting ('b * int) Pipe.Reader.t contains a mapped value and the index of the corresponding value in the input pipe.

val map : Config.t -> 'a Async.Pipe.Reader.t -> m: (module Map_function with type Input.t = 'a and type Output.t = 'b and type Param.t = 'param) -> param:'param -> 'b Async.Pipe.Reader.t Async.Deferred.t

The map operation is similar to map_unordered, but the result is a 'b Pipe.Reader.t where the mapped values are guaranteed to be in the same order as the input values.
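The index carried by each map_unordered result is what makes the input order recoverable. The sketch below models this with plain lists instead of pipes: the unordered list is made-up data standing in for results that arrived in completion order, and restore_order is a hypothetical helper, not part of the library.

```ocaml
(* Made-up results of mapping String.length over ["a"; "bbb"; "cc"],
   listed in an arbitrary completion order as (mapped value, input index). *)
let unordered : (int * int) list = [ (3, 1); (2, 2); (1, 0) ]

(* Sort by input index, then drop the index. *)
let restore_order (pairs : ('b * int) list) : 'b list =
  pairs
  |> List.sort (fun (_, i) (_, j) -> compare i j)
  |> List.map fst

let ordered = restore_order unordered
```

Here ordered is [1; 3; 2], the lengths in input order. A caller that needs ordered output can use map directly; map_unordered fits cases where results should be consumed as soon as they are ready.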

val find_map : Config.t -> 'a Async.Pipe.Reader.t -> m: (module Map_function with type Input.t = 'a and type Output.t = 'b option and type Param.t = 'param) -> param:'param -> 'b option Async.Deferred.t

The find_map operation takes an 'a Pipe.Reader.t along with a Map_function that returns 'b option values. As soon as map returns Some value, all workers are stopped and Some value is returned. If map never returns Some value, None is returned. If more than one worker returns Some value, one of those values is chosen arbitrarily and returned.
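A sequential model of these semantics, using lists instead of pipes and no workers, might look as follows. The names find_map_model and sevenths are hypothetical. Unlike the parallel operation, this model always returns the first match in input order.

```ocaml
(* Stop at the first input for which f returns Some; None if there is none. *)
let rec find_map_model (f : 'a -> 'b option) (inputs : 'a list) : 'b option =
  match inputs with
  | [] -> None
  | x :: rest ->
    (match f x with
     | Some _ as hit -> hit
     | None -> find_map_model f rest)

(* Example map function: Some (x / 7) for multiples of 7, None otherwise. *)
let sevenths x = if x mod 7 = 0 then Some (x / 7) else None

let found = find_map_model sevenths [ 3; 14; 21 ]
let not_found = find_map_model sevenths [ 1; 2; 3 ]
```

In this model found is Some 2. With parallel workers, the inputs 14 and 21 could both yield a result, so either Some 2 or Some 3 would be a valid outcome.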

Map-reduce functions

module type Map_reduce_function = sig ... end
module type Map_reduce_function_with_init_spec = sig ... end
module type Map_reduce_function_spec = sig ... end
val map_reduce_commutative : Config.t -> 'a Async.Pipe.Reader.t -> m: (module Map_reduce_function with type Accum.t = 'accum and type Input.t = 'a and type Param.t = 'param) -> param:'param -> 'accum option Async.Deferred.t

The map_reduce_commutative operation takes an 'a Pipe.Reader.t along with a Map_reduce_function and applies the map function to the 'a values (in an unspecified order), producing 'accum values. The combine function is then called to combine the 'accum values (in an unspecified order) into a single 'accum value. Commutative map-reduce assumes that combine is associative and commutative.
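The reason combine must be associative and commutative can be illustrated with a small sequential model. All names here (map_fn, combine, reduce_in_order) are hypothetical, and returning None for an empty input is an assumption suggested by the 'accum option return type.

```ocaml
(* Map each word to its length; combine by addition, which is both
   associative and commutative. *)
let map_fn (s : string) : int = String.length s
let combine (a : int) (b : int) : int = a + b

(* Map, then fold the accumulators in whatever order the list has. *)
let reduce_in_order (inputs : string list) : int option =
  match List.map map_fn inputs with
  | [] -> None
  | first :: rest -> Some (List.fold_left combine first rest)

let words = [ "alpha"; "be"; "gam" ]
let forward = reduce_in_order words
let backward = reduce_in_order (List.rev words)
```

Both orders give Some 10, so the unspecified processing order does not affect the result. A combine such as string concatenation would not have this property and would call for map_reduce instead.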

val map_reduce : Config.t -> 'a Async.Pipe.Reader.t -> m: (module Map_reduce_function with type Accum.t = 'accum and type Input.t = 'a and type Param.t = 'param) -> param:'param -> 'accum option Async.Deferred.t

The map_reduce operation makes strong guarantees about the order in which the values are processed by combine. For a list a_0, a_1, a_2, ..., a_n of 'a values, the noncommutative map-reduce operation applies the map function to produce acc_{i,i+1} from each a_i. The combine function is used to compute combine acc_{i,j} acc_{j,k} for i<j<k, producing acc_{i,k}. The map and combine functions are called repeatedly until the entire list is reduced to a single acc_{0,n+1} value. Noncommutative map-reduce assumes that combine is associative.
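The adjacent-interval combining described above can be modeled sequentially as a pairwise reduction. This is an illustrative sketch only: the names are hypothetical, the library's actual scheduling of combine calls is not specified here, and None for an empty input is an assumption suggested by the 'accum option return type.

```ocaml
(* Map each integer to its decimal string; combine by concatenation,
   which is associative but not commutative. *)
let map_fn (x : int) : string = string_of_int x
let combine (left : string) (right : string) : string = left ^ right

(* One pass: combine neighbours pairwise, preserving left-to-right order.
   Each call computes acc_{i,k} from adjacent acc_{i,j} and acc_{j,k}. *)
let rec combine_adjacent = function
  | a :: b :: rest -> combine a b :: combine_adjacent rest
  | short -> short

(* Repeat passes until a single accumulator remains. *)
let rec reduce_adjacent = function
  | [] -> None
  | [ acc ] -> Some acc
  | accs -> reduce_adjacent (combine_adjacent accs)

let accs = List.map map_fn [ 1; 2; 3; 4; 5 ]
let tree_result = reduce_adjacent accs
let fold_result = Some (List.fold_left combine "" accs)
```

Because combine is associative and only adjacent accumulators are ever combined, the pairwise grouping and the plain left fold agree: both produce Some "12345".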