package ocamlfuse

  1. Overview
  2. Docs

Source file Thread_pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
(*
  This file is part of the "OCamlFuse" library.

  OCamlFuse is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation (version 2 of the License).

  OCamlFuse is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with OCamlFuse.  See the file LICENSE.  If you haven't received
  a copy of the GNU General Public License, write to:

  Free Software Foundation, Inc.,
  59 Temple Place, Suite 330, Boston, MA
  02111-1307  USA

  Alessandro Strada

  alessandro.strada@gmail.com
*)

type t = {
  max_threads : int;
  lock : Mutex.t;
  condition : Condition.t;
  table : (int, Thread.t) Hashtbl.t;
}

let create ?(max_threads = 128) () =
  {
    max_threads;
    lock = Mutex.create ();
    condition = Condition.create ();
    table = Hashtbl.create max_threads;
  }

let signal_work_done thread_id pool =
  Mutex.lock pool.lock;
  try
    Hashtbl.remove pool.table thread_id;
    Condition.signal pool.condition;
    Mutex.unlock pool.lock
  with _ -> Mutex.unlock pool.lock

let add_work f x pool =
  Mutex.lock pool.lock;
  try
    while Hashtbl.length pool.table >= pool.max_threads do
      Condition.wait pool.condition pool.lock
    done;
    let f' x =
      let thread = Thread.self () in
      let thread_id = Thread.id thread in
      let _ = f x in
      signal_work_done thread_id pool
    in
    let thread = Thread.create f' x in
    let thread_id = Thread.id thread in
    Hashtbl.add pool.table thread_id thread;
    Mutex.unlock pool.lock
  with _ -> Mutex.unlock pool.lock

let shutdown pool = Hashtbl.iter (fun _ thread -> Thread.join thread) pool.table