package amqp-client-async

  1. Overview
  2. Docs

Source file queue.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
open Thread
open Amqp_client_lib
open Spec.Queue

type t = { name: string }

let message_ttl v = "x-message-ttl", Types.VLonglong v
let auto_expire v = "x-expires", Types.VLonglong v
let max_length v = "x-max-length", Types.VLonglong v
let max_length_bytes v = "x-max-length-bytes", Types.VLonglong v
let dead_letter_exchange v = "x-dead-letter-exchange", Types.VLongstr v
let dead_letter_routing_key v = "x-dead-letter-routing-key", Types.VLongstr v
let maximum_priority v = "x-max-priority", Types.VLonglong v

let declare channel ?(durable=false) ?(exclusive=false) ?(auto_delete=false) ?(passive=false) ?(arguments=[]) ?(autogenerate=false) name =
  if autogenerate && String.length name != 0 then
    invalid_arg "Queue.declare name must be empty if autogenerate is true.";
  if not autogenerate && String.length name == 0 then
    invalid_arg "Queue.declare autogenerate must be true if name is empty.";
  let channel = Channel.channel channel in
  let req = { Declare.queue=name; passive; durable; exclusive;
              auto_delete; no_wait=false; arguments }
  in
  Declare.request channel req >>= fun rep ->
  if not autogenerate && name <> rep.Declare_ok.queue then
    failwith "Queue name returned by server doesn't match requested.";
  return { name = rep.Declare_ok.queue }

let get ~no_ack channel t =
  let open Spec.Basic in
  let channel = Channel.channel channel in
  Get.request channel { Get.queue=t.name; no_ack } >>= function
  | `Get_empty () ->
    return None
  | `Get_ok (get_ok, (header, body))  ->
    return (Some { Message.delivery_tag = get_ok.Get_ok.delivery_tag;
                   Message.redelivered = get_ok.Get_ok.redelivered;
                   Message.exchange = get_ok.Get_ok.exchange;
                   Message.routing_key = get_ok.Get_ok.routing_key;
                   Message.message = (header, body) })

(** Publish a message directly to a queue *)
let publish channel t ?mandatory message =
  Exchange.publish channel Exchange.default ?mandatory
    ~routing_key:t.name
    message

type 'a consumer = { channel: 'a Channel.t;
                     tag: string;
                     writer: Message.t Pipe.Writer.t }

(** Consume message from a queue. *)
let consume ~id ?(no_local=false) ?(no_ack=false) ?(exclusive=false)
    ?on_cancel channel t =
  let open Spec.Basic in
  let (reader, writer) = Pipe.create () in
  let consumer_tag = Printf.sprintf "%s.%s" (Channel.Internal.unique_id channel) id
  in
  let on_cancel () =
    Pipe.close_without_pushback writer;
    match on_cancel with
    | Some f -> f ()
    | None -> raise (Types.Consumer_cancelled consumer_tag)
  in

  let to_writer (deliver, header, body) =
    { Message.delivery_tag = deliver.Deliver.delivery_tag;
      Message.redelivered = deliver.Deliver.redelivered;
      Message.exchange = deliver.Deliver.exchange;
      Message.routing_key = deliver.Deliver.routing_key;
      Message.message = (header, body) }
    |> Pipe.write_without_pushback writer
  in
  let req = { Consume.queue=t.name;
              consumer_tag;
              no_local;
              no_ack;
              exclusive;
              no_wait = false;
              arguments = [];
            }
  in
  let var = Ivar.create () in
  let on_receive consume_ok =
    Channel.Internal.register_consumer_handler channel consume_ok.Consume_ok.consumer_tag to_writer on_cancel;
    Ivar.fill var consume_ok
  in
  let read = snd Consume_ok.Internal.read in
  read ~once:true on_receive (Channel.channel channel);

  Consume.Internal.write (Channel.channel channel) req >>= fun () ->
  Ivar.read var >>= fun rep ->
  let tag = rep.Consume_ok.consumer_tag in
  return ({ channel; tag; writer }, reader)

let cancel consumer =
  let open Spec.Basic in
  Cancel.request (Channel.channel consumer.channel) { Cancel.consumer_tag = consumer.tag; no_wait = false } >>= fun _rep ->
  Channel.Internal.deregister_consumer_handler consumer.channel consumer.tag;
  Pipe.close consumer.writer

let bind channel t exchange = Exchange.Internal.bind_queue channel exchange t.name
let unbind channel t exchange = Exchange.Internal.unbind_queue channel exchange t.name

(** Purge the queue *)
let purge channel t =
  Purge.request (Channel.channel channel)
    { Purge.queue = t.name;
      no_wait = false;
    } >>= fun _rep ->
  return ()

(** Delete the queue. *)
let delete ?(if_unused=false) ?(if_empty=false) channel t =
  Delete.request (Channel.channel channel)
    { Delete.queue = t.name;
      if_unused;
      if_empty;
      no_wait = false;
    } >>= fun _rep -> return ()


(** Name of the queue *)
let name t = t.name

(** Construct a queue without any validation *)
let fake _channel name = return { name }