package containers-thread

  1. Overview
  2. Docs

Source file CCBlockingQueue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
(* This file is free software, part of containers. See file "license" for more details. *)

(** {1 Blocking Queue} *)

type 'a t = {
  q: 'a Queue.t;
  lock: Mutex.t;
  cond: Condition.t;
  capacity: int;
  mutable size: int;
}

let create n =
  if n < 1 then invalid_arg "BloquingQueue.create";
  let q =
    {
      q = Queue.create ();
      lock = Mutex.create ();
      cond = Condition.create ();
      capacity = n;
      size = 0;
    }
  in
  q

let incr_size_ q =
  assert (q.size < q.capacity);
  q.size <- q.size + 1

let decr_size_ q =
  assert (q.size > 0);
  q.size <- q.size - 1

let finally_ f x ~h =
  try
    let res = f x in
    ignore (h ());
    res
  with e ->
    ignore (h ());
    raise e

let with_lock_ q f =
  Mutex.lock q.lock;
  finally_ f () ~h:(fun () -> Mutex.unlock q.lock)

let push q x =
  with_lock_ q (fun () ->
      while q.size = q.capacity do
        Condition.wait q.cond q.lock
      done;
      assert (q.size < q.capacity);
      Queue.push x q.q;
      (* if there are blocked receivers, awake one of them *)
      incr_size_ q;
      Condition.broadcast q.cond)

let take q =
  with_lock_ q (fun () ->
      while q.size = 0 do
        Condition.wait q.cond q.lock
      done;
      let x = Queue.take q.q in
      (* if there are blocked senders, awake one of them *)
      decr_size_ q;
      Condition.broadcast q.cond;
      x)

let push_list q l =
  (* push elements until it's not possible.
     Assumes the lock is acquired. *)
  let rec push_ q l =
    match l with
    | [] -> l
    | _ :: _ when q.size = q.capacity -> l (* no room remaining *)
    | x :: tl ->
      Queue.push x q.q;
      incr_size_ q;
      push_ q tl
  in
  (* push chunks of [l] in [q] until [l] is empty *)
  let rec aux q l =
    match l with
    | [] -> ()
    | _ :: _ ->
      let l =
        with_lock_ q (fun () ->
            while q.size = q.capacity do
              Condition.wait q.cond q.lock
            done;
            let l = push_ q l in
            Condition.broadcast q.cond;
            l)
      in
      aux q l
  in
  aux q l

let take_list q n =
  (* take at most [n] elements of [q] and prepend them to [acc] *)
  let rec pop_ acc q n =
    if n = 0 || Queue.is_empty q.q then
      acc, n
    else (
      (* take next element *)
      let x = Queue.take q.q in
      decr_size_ q;
      pop_ (x :: acc) q (n - 1)
    )
  in
  (* call [pop_] until [n] elements have been gathered *)
  let rec aux acc q n =
    if n = 0 then
      List.rev acc
    else (
      let acc, n =
        with_lock_ q (fun () ->
            while q.size = 0 do
              Condition.wait q.cond q.lock
            done;
            let acc, n = pop_ acc q n in
            Condition.broadcast q.cond;
            acc, n)
      in
      aux acc q n
    )
  in
  aux [] q n

let try_take q =
  with_lock_ q (fun () ->
      if q.size = 0 then
        None
      else (
        decr_size_ q;
        Some (Queue.take q.q)
      ))

let try_push q x =
  with_lock_ q (fun () ->
      if q.size = q.capacity then
        false
      else (
        incr_size_ q;
        Queue.push x q.q;
        Condition.signal q.cond;
        true
      ))

let peek q =
  with_lock_ q (fun () -> try Some (Queue.peek q.q) with Queue.Empty -> None)

let size q = with_lock_ q (fun () -> q.size)
let capacity q = q.capacity