package xapi-stdext-threads

  1. Overview
  2. Docs

Source file threadext.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
(*
 * Copyright (C) 2006-2009 Citrix Systems Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *)

(* Alias for the [Mutex] module that is in scope at this point. The name
   [Mutex] is rebound just below to a local module that only provides
   [execute], so the rest of this file uses [M] to reach [create] and the
   type [t]. *)
module M = Mutex

module Mutex = struct
  (** [execute lock f] acquires [lock], then runs [f ()] and hands back its
      result. Releasing [lock] is passed to [Pervasiveext.finally] as the
      cleanup action that goes with [f]. *)
  let execute lock f =
    let release () = Mutex.unlock lock in
    Mutex.lock lock ;
    Xapi_stdext_pervasives.Pervasiveext.finally f release
end

(** Parallel [List.iter]: each element of [xs] gets its own thread running
    [f] on it, and every thread is joined before this function returns.
    The result is an association list pairing each input whose application
    raised with the exception it raised; inputs for which [f] succeeded do not
    appear in it. *)
let thread_iter_all_exns f xs =
  let failures = ref [] in
  let failures_lock = M.create () in
  (* Worker threads share [failures], so every update goes through the lock. *)
  let record x exn =
    Mutex.execute failures_lock (fun () -> failures := (x, exn) :: !failures)
  in
  let spawn x =
    Thread.create (fun () -> try f x with exn -> record x exn) ()
  in
  xs |> List.map spawn |> List.iter Thread.join ;
  !failures

(** Parallel [List.iter] that reports failure by raising. When at least one
    application of [f] raised, the exception at the head of the list returned
    by [thread_iter_all_exns] (an arbitrary one of them) is raised; otherwise
    the result is [()]. *)
let thread_iter f xs =
  let failures = thread_iter_all_exns f xs in
  match failures with
  | (_, exn) :: _ ->
      raise exn
  | [] ->
      ()

module Delay = struct
  (** An interruptible sleep: one thread blocks in [wait] for up to a given
      number of seconds and another thread can cut that short with [signal].

      The record keeps a single pipe, so it describes at most one wait at a
      time; a second concurrent [wait] on the same value would overwrite the
      pipe of the first. *)

  (* Concrete type is the ends of a pipe *)
  type t = {
      (* A pipe is used to wake up a thread blocked in wait: *)
      mutable pipe_out: Unix.file_descr option
    ; mutable pipe_in: Unix.file_descr option
    ; (* Indicates that a signal arrived before a wait: *)
      mutable signalled: bool
    ; (* Protects the three mutable fields above: *)
      m: M.t
  }

  (** [make ()] returns a fresh delay with no wait in progress and no stored
      signal. *)
  let make () =
    {pipe_out= None; pipe_in= None; signalled= false; m= M.create ()}

  (* Raised (and caught) inside [wait] only, to leave the critical section
     early when a signal was stored before the wait started. *)
  exception Pre_signalled

  (** [wait x seconds] blocks the calling thread until [signal x] is called
      or [seconds] have elapsed, whichever comes first.

      Returns [true] if the full time elapsed without a signal, [false] if the
      wait was woken by [signal] or if a signal had already been stored before
      the wait began (in which case it returns without blocking and the stored
      signal is consumed).

      [seconds] is handed to [Unix.select] as its timeout, so a negative value
      means whatever [Unix.select] does with a negative timeout (documented as
      an unbounded wait).

      A [Unix.select] interrupted by a POSIX signal ([EINTR]) is retried for
      the time remaining instead of escaping as a [Unix.Unix_error]. *)
  let wait (x : t) (seconds : float) =
    let finally = Xapi_stdext_pervasives.Pervasiveext.finally in
    (* Descriptors created by this call that still have to be closed. *)
    let to_close = ref [] in
    let close' fd =
      if List.mem fd !to_close then Unix.close fd ;
      to_close := List.filter (fun x -> fd <> x) !to_close
    in
    finally
      (fun () ->
        try
          let pipe_out =
            Mutex.execute x.m (fun () ->
                if x.signalled then (
                  x.signalled <- false ;
                  raise Pre_signalled
                ) ;
                (* From here on [x.signalled] is known to be false. *)
                let pipe_out, pipe_in = Unix.pipe () in
                (* these will be unconditionally closed on exit *)
                to_close := [pipe_out; pipe_in] ;
                x.pipe_out <- Some pipe_out ;
                x.pipe_in <- Some pipe_in ;
                pipe_out
            )
          in
          (* Only consulted when select is interrupted; the first attempt uses
             [seconds] exactly as given. Wall-clock based, so a clock step
             during an interrupted wait shifts the remaining time. *)
          let deadline = Unix.gettimeofday () +. seconds in
          let rec select_retrying timeout =
            ( try Unix.select [pipe_out] [] [] timeout
              with Unix.Unix_error (Unix.EINTR, _, _) ->
                let remaining =
                  (* Keep a negative timeout as is; otherwise never go below
                     zero, which would turn an expired wait into an unbounded
                     one. *)
                  if seconds < 0. then
                    seconds
                  else
                    max 0. (deadline -. Unix.gettimeofday ())
                in
                select_retrying remaining
            )
          in
          let r, _, _ = select_retrying seconds in
          (* flush the single byte from the pipe *)
          if r <> [] then ignore (Unix.read pipe_out (Bytes.create 1) 0 1) ;
          (* return true if we waited the full length of time, false if we were woken *)
          r = []
        with Pre_signalled -> false
      )
      (fun () ->
        (* Cleanup: forget the pipe under the lock so that a later [signal]
           stores its signal instead of writing to a closed descriptor, then
           close whatever this call opened. *)
        Mutex.execute x.m (fun () ->
            x.pipe_out <- None ;
            x.pipe_in <- None ;
            List.iter close' !to_close
        )
      )

  (** [signal x] wakes the thread currently blocked in [wait x] by writing one
      byte to its pipe. If no wait is in progress the signal is stored, so the
      next [wait x] returns [false] immediately. *)
  let signal (x : t) =
    Mutex.execute x.m (fun () ->
        match x.pipe_in with
        | Some fd ->
            ignore (Unix.write fd (Bytes.of_string "X") 0 1)
        | None ->
            (* If the wait hasn't happened yet then store up the signal *)
            x.signalled <- true
    )
end