package tezt

  1. Overview
  2. Docs

Running tasks on a set number of forked processes.

The main content of this module is:

  • a function add_task to add tasks to a queue;
  • a function run to run those tasks in separate worker processes;
  • a module Message to send messages to and from workers;
  • a module Timer to schedule delayed functions.

The scheduler (i.e. run) maintains a pool of workers. Workers receive tasks from the scheduler, execute them and send the result to the scheduler. Tasks can be given a limited amount of time to run. New tasks can be added (with add_task) while the scheduler is running in response to events such as:

  • a task being started;
  • a task finishing (successfully or not);
  • a message being received from a worker (i.e. from a running task);
  • the task queue becoming empty.

Examples of messages include:

  • logs: tasks can send log messages to the scheduler, so that the scheduler can write them in a single file with no interleaving;
  • global resource queries: workers can query a resource such as a free port number, and the scheduler can respond with this resource.

Messages are meant to be used while a task is running. The return value of a task is also sent to the scheduler as a message, but this is handled transparently.

Examples of use cases for this library include:

  • a test framework that wants to sandbox tests in parallel and in separate processes (e.g. to be able to recover from crashes, and to kill them if they take too long);
  • a build system that wants to compile multiple targets in parallel, and to add the reverse dependencies of a target as new tasks once this target is built (such as make -j).

Contexts

Message sending functions are supposed to be run from a specific context:

  • the worker process, which can send messages to and receive message from the scheduler;
  • or the scheduler process with a specific worker in mind, from which to receive from and to which to send to.

Those message sending functions take a value of type worker_context or scheduler_context as proof. (There are ways to leak the contexts to other contexts but it would be a programming error.)

type worker_context

Values used by message sending functions meant to be called from a worker.

type scheduler_context

Values used by message sending functions meant to be called from the scheduler.

val get_current_worker_context : unit -> worker_context option

Get the current worker context.

Returns None if not currently in a worker process.

Messages

module Message : sig ... end

Send and receive messages to and from workers.

Timers

module Timer : sig ... end

Call functions after a given amount of time.

Task Queue

val add_task : ?sigterm:int -> ?term_timeout:float -> ?kill_timeout:float -> ?on_term_timeout:(unit -> unit) -> ?on_kill_timeout:(unit -> unit) -> ?on_start:(scheduler_context -> unit) -> ?on_message:(scheduler_context -> Message.t -> unit) -> ?on_finish:(('a, string) result -> unit) -> 'a Message.typ -> (worker_context -> 'a) -> unit

Add a task to the queue.

Usage: add_task typ execute

typ is a type description for values returned by execute. When a worker is ready to execute this task, this worker will run execute. Note that execute is serialized to the worker using Marshal. If this closure captures some variables, those variables should thus be serializable using Marshal.

add_task can be called before run, or while run is running (i.e. from an event handler like on_start, on_message, on_finish; or the on_empty_queue argument of run).

If term_timeout is specified, sigterm is sent to the worker if the task has not finished (successfully or not) after term_timeout seconds. sigterm defaults to Sys.sigterm.

If kill_timeout is specified but term_timeout is not, SIGKILL is sent to the worker if the task has not finished (successfully or not) after kill_timeout seconds.

If both term_timeout and kill_timeout are specified, sigterm is sent first, and if the task is not willing to end gracefully kill_timeout seconds after sigterm was sent, SIGKILL is sent as well. Note that in that case, kill_timeout is relative to the time sigterm was sent, not to the time the task started.

on_start is triggered when the task is sent to a worker. It takes a scheduler_context argument that allows to send a message to this worker, typically with additional information that was not known at the time the task was queued, such as a free port number that the worker can use.

on_message is triggered for each message that is sent from the worker. It also takes a scheduler_context argument to be able to respond.

on_finish is triggered with:

  • Ok result when the task returns successfully, in which case result is the return value of execute;
  • Error error_message when the task fails, in which case error_message can be the result of Printexc.to_string (if execute raised an exception) or something else (e.g. if the worker died).
val clear : unit -> unit

Clear the queue of tasks.

This has no effect on tasks that are already running, because they have been removed from the queue.

Main Loop

val run : ?worker_idle_timeout:float -> ?worker_kill_timeout:float -> ?on_worker_kill_timeout:(unit -> unit) -> ?on_empty_queue:(unit -> unit) -> ?on_message:(Message.t -> unit) -> ?on_unexpected_worker_exit:(Unix.process_status -> unit) -> fork:(unit -> int) -> int -> unit

Run tasks until the queue is empty.

on_empty_queue is called when a worker is available and the task queue is empty. It can in particular use add_task to fill the queue.

on_message is called when a worker emits a message while not executing a task. This can happen in particular if you use at_exit. Messages received from a worker which is running a task are passed to the on_message of the corresponding add_task call instead.

When a worker exits:

  • if it is running a task, the task fails (its on_finish is triggered with Error);
  • if it is not running a task and the queue is not empty, or if the exit code is not 0, on_unexpected_worker_exit is called so that you can emit a warning.

If worker_idle_timeout is specified, workers stop if they are not given any task after worker_idle_timeout seconds of doing nothing. This can be useful to prevent workers from running forever, although in general they should detect that the scheduler is dead by receiving end of file while trying to receive their next task.

If worker_kill_timeout is specified, send SIGKILL to workers if they are still running worker_kill_timeout seconds after they were told to stop. This only applies when they were told to stop because the task queue is empty. When this happens, on_worker_kill_timeout is called.

fork is supposed to be Unix.fork. But if tasks may use Lwt, it should be Lwt_unix.fork instead. You can also modify fork to run some code on each fork, for instance to initialize some global variables when a worker starts.

The last argument is the maximum number of tasks to run in parallel.

This function is blocking. It returns once:

  • no task is currently running;
  • the queue is empty (unless you called stop);
  • and no timer is currently active. In particular, it returns immediately if you never called add_task and Timer.on_delay.
val stop : unit -> unit

Stop the current run.

This function is meant to be called from the event handlers (on_ functions) of tasks. It does nothing if run is not currently running.

This function is not blocking. It causes run to stop starting new tasks. It also causes run to consider that all current tasks are passed their term_timeout, even if they do not actually have such a timeout, except that on_term_timeout is not triggered. In other words, all workers receive SIGTERM if they didn't already.

Calling add_task still adds tasks to the queue but they will not be started unless you call run again.

This function does not cancel timers.

Miscellaneous

val show_process_status : Unix.process_status -> string

Convert a process status to a human-readable string.

Example results:

  • "exited with code 0"
  • "was killed by SIGTERM"
  • "was stopped by unknown signal (-100)"
OCaml

Innovation. Community. Security.