Source file message_spool.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
open Core
open Async
open Async_smtp_types
module Log = Mail_log
(* Shadow [compare] for the rest of this file. The replacement ignores both arguments
   and returns a tag instead of an integer, so code that expects an integer result
   from an unqualified [compare] no longer type-checks. *)
let compare _lhs _rhs = `You_are_using_poly_compare

(* Reference the shadowing definition once so it is not reported as unused. *)
let _silence_unused_warning = compare
(* Local shorthand for [Message.On_disk_spool]. *)
module On_disk_spool = Message.On_disk_spool

(** Re-export of [On_disk_spool.create]. *)
let create = On_disk_spool.create

(** [load s] forwards its single argument to [On_disk_spool.load]. *)
let load s = On_disk_spool.load s
(** [ls t queues] lists spool [t] once per element of [queues] with
    [On_disk_spool.list] and concatenates the per-queue results. The combination is
    done by [Deferred.Or_error.List.concat_map]. *)
let ls t queues =
  let list_queue queue = On_disk_spool.list t queue in
  Deferred.Or_error.List.concat_map queues ~f:list_queue
;;
(** [uncheckout_from_queue t queue] saves every checked-out entry that
    [On_disk_spool.Expert.list_checkouts_unsafe] reports for [queue] back to the queue
    recorded on that entry.

    The result is a pair [(names, errors)]: the names of the entries that were saved
    successfully, and the errors from the ones that were not. If the listing itself
    fails, the result is [([], [ e ])]. *)
let uncheckout_from_queue t queue =
  let module Checked_out_entry = On_disk_spool.Expert.Checked_out_entry in
  (* Save one entry; on success report its name. *)
  let restore checked_out =
    let%map outcome =
      Checked_out_entry.save checked_out (Checked_out_entry.queue checked_out)
    in
    Result.map outcome ~f:(fun () -> Checked_out_entry.name checked_out)
  in
  match%bind On_disk_spool.Expert.list_checkouts_unsafe t queue with
  | Error e -> return ([], [ e ])
  | Ok checked_out_entries ->
    (* All saves are started by [List.map] before any of them is waited on. *)
    let%map outcomes = Deferred.List.all (List.map checked_out_entries ~f:restore) in
    List.partition_result outcomes
;;
(** [uncheckout_all_entries t] runs [uncheckout_from_queue t] over every queue in
    [Message.Queue.all] and merges the results.

    Returns [(`Recovered names, `Errors err)], where [names] collects the recovered
    entry names from all queues and [err] is [None] when no queue reported an error,
    or [Some] of the combined error ([Error.of_list]) otherwise. *)
let uncheckout_all_entries t =
  let%map per_queue =
    Deferred.List.map Message.Queue.all ~f:(uncheckout_from_queue t)
  in
  let recovered_by_queue, errors_by_queue = List.unzip per_queue in
  let recovered = List.concat recovered_by_queue in
  let combined_error =
    match List.concat errors_by_queue with
    | [] -> None
    | _ :: _ as errors -> Some (Error.of_list errors)
  in
  `Recovered recovered, `Errors combined_error
;;
(* Spool entries, extended with helpers that read the message metadata, the full
   envelope, and the size of the data file. *)
module Entry = struct
  include On_disk_spool.Entry

  (** [to_message entry] is [On_disk_spool.Entry.Direct.contents entry]. *)
  let to_message entry = On_disk_spool.Entry.Direct.contents entry

  (** [to_message_with_envelope entry] reads the metadata via [to_message], then loads
      the data file of [entry], and returns the metadata paired with an envelope built
      from the metadata's envelope info and the email obtained from the loaded data.
      Any failing step short-circuits with its error. *)
  let to_message_with_envelope entry =
    let open Deferred.Or_error.Let_syntax in
    let%bind meta = to_message entry in
    let%map data =
      On_disk_spool.Data_file.load (On_disk_spool.Entry.Direct.data_file entry)
    in
    let envelope =
      Smtp_envelope.create'
        ~info:(Message.envelope_info meta)
        ~email:(Message.Data.to_email data)
    in
    meta, envelope
  ;;

  (** [size entry] stats the data file of [entry] and converts the reported size to
      [Byte_units.t] using [Byte_units.of_bytes_int64_exn]. *)
  let size entry =
    let open Deferred.Or_error.Let_syntax in
    let%map stats =
      On_disk_spool.Data_file.stat (On_disk_spool.Entry.Direct.data_file entry)
    in
    Byte_units.of_bytes_int64_exn (Unix.Stats.size stats)
  ;;
end
(** [entry t] builds the spool entry that corresponds to message [t].

    The spool comes from [On_disk_spool.load_unsafe] applied to the message's spool
    directory, the queue is derived from the message's current status, and the entry
    name is the string form of the message id. Fails if the status has no
    corresponding queue ([Message.Queue.of_status'] returns an error). *)
let entry t =
  let spool = On_disk_spool.load_unsafe (Message.spool_dir t) in
  Deferred.return (Message.Queue.of_status' (Message.status t))
  >>|? fun queue ->
  let name = Message.Id.to_string (Message.id t) in
  Entry.create spool queue ~name
;;
(* The spool handle used by the functions below is the on-disk spool itself. *)
type t = On_disk_spool.t
(* Alias of [t].
   NOTE(review): presumably exported so signatures can name the spool type in places
   where [t] means something else - confirm against the interface file. *)
type spool = t
(* Internal helper, shadowed by the public [enqueue] defined right after it.
   Enqueues [meta] and [data] into [queue] of [meta_spool] under the caller-chosen
   name [id] and discards the resulting entry, keeping only success or failure. *)
let enqueue meta_spool queue ~meta ~id ~data =
  On_disk_spool.enqueue meta_spool queue meta data (`Use id)
  >>|? fun (_ : On_disk_spool.Entry.t) -> ()
;;
(** [enqueue spool ~log ~initial_status envelope_batch ~flows ~original_msg] turns
    [envelope_batch] into spooled messages and writes each of them to [spool].

    - The target queue is derived from [initial_status]; an error is returned if the
      status has no queue.
    - Message ids are reserved through [On_disk_spool.Unique_name.reserve], and the id
      of [original_msg] is recorded as the parent id.
    - New messages start with no failed recipients and no relay attempts.

    On success the result lists each message's metadata together with its envelope.
    [~log] is accepted but ignored. *)
let enqueue spool ~log:_ ~initial_status envelope_batch ~flows ~original_msg =
  let open Deferred.Or_error.Let_syntax in
  let parent_id = Smtp_envelope.id original_msg in
  let%bind queue = Deferred.return (Message.Queue.of_status' initial_status) in
  let%bind messages_with_data =
    Message.of_envelope_batch
      envelope_batch
      ~gen_id:(fun () -> On_disk_spool.Unique_name.reserve spool original_msg)
      ~spool_dir:(On_disk_spool.dir spool)
      ~spool_date:(Time.now ())
      ~failed_recipients:[]
      ~relay_attempts:[]
      ~parent_id
      ~status:initial_status
      ~flows
  in
  (* [enqueue] below is the internal helper defined just above this function. *)
  Deferred.Or_error.List.map messages_with_data ~f:(fun (meta, data, envelope) ->
    let%map () = enqueue spool queue ~meta ~id:(Message.id meta) ~data in
    meta, envelope)
;;
(** [with_file t f] runs [f] on the data file of the spool entry for [t], inside
    [On_disk_spool.with_entry], and then decides what to write back:

    - if the in-memory message [t] does not compare equal to the metadata read from
      disk, [f] is not called; the on-disk metadata is saved back to its original
      queue and an error is returned;
    - if [f] fails, the on-disk metadata is saved back unchanged and the error of [f]
      is returned;
    - if [f] returns [`Unlink], the entry is removed;
    - if [f] returns [`Sync_meta], the in-memory [t] is saved to the queue derived from
      its status after [f] has run. If no queue can be derived, the on-disk metadata
      is saved back instead and the error (tagged with the sexp of [t]) is returned.

    The original queue is computed from the status of [t] before [f] runs. *)
let with_file
      t
      (f :
         On_disk_spool.Data_file.t -> ([ `Sync_meta | `Unlink ] * 'a) Or_error.t Deferred.t)
  : 'a Or_error.t Deferred.t
  =
  (* Callback handed to [On_disk_spool.with_entry]: picks the save or remove action
     and the result for one entry. *)
  let handle_entry ~entry ~original_queue meta data_file =
    (* Leave the entry as it was found on disk and report [result]. *)
    let keep_unchanged result = return (`Save (meta, original_queue), result) in
    if Message.compare t meta <> 0
    then (
      let e =
        Error.create
          "spooled message in memory differs from spooled message on disk"
          (`In_memory t, `On_disk meta, `Entry entry)
          [%sexp_of:
            [ `In_memory of Message.t ]
            * [ `On_disk of Message.t ]
            * [ `Entry of On_disk_spool.Entry.t ]]
      in
      keep_unchanged (Error e))
    else (
      match%bind f data_file with
      | Error _ as e -> keep_unchanged e
      | Ok (`Unlink, res) -> return (`Remove, Ok res)
      | Ok (`Sync_meta, res) ->
        (* The status is read again here, after [f] has run. *)
        (match%bind
           Message.Queue.of_status' (Message.status t)
           |> Or_error.tag ~tag:(Sexp.to_string (Message.sexp_of_t t))
           |> Deferred.return
         with
         | Error _ as e -> keep_unchanged e
         | Ok new_queue -> return (`Save (t, new_queue), Ok res)))
  in
  let open Deferred.Or_error.Let_syntax in
  let%bind entry = entry t in
  let%bind original_queue =
    Deferred.return (Message.Queue.of_status' (Message.status t))
  in
  (* [with_entry] wraps the callback result in its own [Or_error]; flatten the two. *)
  Deferred.map
    (On_disk_spool.with_entry entry ~f:(handle_entry ~entry ~original_queue))
    ~f:Or_error.join
;;
(** [freeze t ~log] logs a spool message with the text frozen at info level, sets the
    status of [t] to [`Frozen], and asks [with_file] to sync the metadata to disk. *)
let freeze t ~log =
  (* Built lazily: the log message is only constructed if the logger forces it. *)
  let frozen_notice =
    lazy
      (Log.Message.create
         ~here:[%here]
         ~flows:(Message.flows t)
         ~component:[ "spool" ]
         ~spool_id:(Message.Id.to_string (Message.id t))
         "frozen")
  in
  with_file t (fun _data_file ->
    Log.info log frozen_notice;
    Message.set_status t `Frozen;
    return (Ok (`Sync_meta, ())))
;;
(** [mark_for_send_now ~retry_intervals t ~log] logs a send_now spool message at info
    level and then, in this order: sets the status of [t] to [`Send_now], adds
    [retry_intervals] to the message, and moves its failed recipients back to the
    remaining recipients. The updated metadata is synced to disk by [with_file]. *)
let mark_for_send_now ~retry_intervals t ~log =
  let log_send_now () =
    Log.info
      log
      (lazy
        (Log.Message.create
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           "send_now"))
  in
  with_file t (fun _data_file ->
    log_send_now ();
    Message.set_status t `Send_now;
    Message.add_retry_intervals t retry_intervals;
    Message.move_failed_recipients_to_remaining_recipients t;
    return (Ok (`Sync_meta, ())))
;;
(** [remove t ~log] logs a removing spool message at info level, sets the status of
    [t] to [`Removed], and asks [with_file] to sync the metadata. It requests
    [`Sync_meta], not [`Unlink]. *)
let remove t ~log =
  let mark_removed (_ : On_disk_spool.Data_file.t) =
    Log.info
      log
      (lazy
        (Log.Message.create
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           "removing"));
    Message.set_status t `Removed;
    return (Ok (`Sync_meta, ()))
  in
  with_file t mark_removed
;;
(** [send_envelope_via_sendfile client ~log ~flows ~component envelope_info data_file]
    sends [envelope_info] through [Client.Expert.send_envelope]. The [~send_data]
    callback it supplies calls [Async_sendfile.sendfile] with the file descriptor of
    the client's writer and the path of [data_file]. *)
let send_envelope_via_sendfile client ~log ~flows ~component envelope_info data_file =
  Client.Expert.send_envelope
    client
    ~log
    ~flows
    ~component
    ~send_data:(fun raw_client ->
      let fd = Writer.fd (Client_raw.writer raw_client) in
      let file = On_disk_spool.Data_file.path data_file in
      Async_sendfile.sendfile ~fd ~file ())
    envelope_info
;;
(* [send_to_hops t ~log ~client_cache data_file] makes one delivery attempt for the
   spooled message [t]. It asks [Client_cache.Tcp.with_'] for a client to one of
   [Message.next_hop_choices t] and, once it has one, sends the envelope with
   [send_envelope_via_sendfile], passing [data_file] along.

   The deferred result is one of [`Done], [`Try_later] or [`Fail_permanently].

   Side effects on [t]: most failure paths append a relay attempt (timestamp and
   error); recipient-level rejections also rewrite the remaining and failed
   recipient lists. *)
let send_to_hops t ~log ~client_cache data_file =
(* Sexp rendering of the candidate hops, reused as a log tag below. *)
let hops_tag =
Sexp.to_string ([%sexp_of: Host_and_port.t list] (Message.next_hop_choices t))
in
Log.debug
log
(lazy
(Log.Message.create
~here:[%here]
~flows:(Message.flows t)
~component:[ "spool"; "send" ]
~spool_id:(Message.Id.to_string (Message.id t))
~tags:[ "hops", hops_tag ]
"attempting delivery"));
match%bind
Client_cache.Tcp.with_'
(* The give-up deferred becomes determined two minutes from now. *)
~give_up:(Clock.after (Time.Span.of_min 2.))
~cache:client_cache
(Message.next_hop_choices t)
?route:(Smtp_envelope.Info.route (Message.envelope_info t))
~f:(fun ~flows client ->
(* Combine the message flows with the flows supplied to this callback. *)
let flows = Log.Flows.union (Message.flows t) flows in
(* Address the envelope to the recipients that are still remaining. *)
let envelope_info =
Smtp_envelope.Info.set
(Message.envelope_info t)
~recipients:(Message.remaining_recipients t)
()
in
send_envelope_via_sendfile
client
~log
~flows
~component:[ "spool"; "send" ]
envelope_info
data_file)
with
(* A client was obtained for [hop] but sending failed: log the error, record it as
   a relay attempt, and retry later. *)
| `Ok (hop, Error e) ->
let e = Error.tag ~tag:"Unable to send envelope" e in
Log.info
log
(lazy
(Log.Message.of_error
~here:[%here]
~flows:(Message.flows t)
~component:[ "spool"; "send" ]
~spool_id:(Message.Id.to_string (Message.id t))
~remote_address:hop
e));
Message.add_relay_attempt t (Time.now (), e);
return `Try_later
(* No address could be opened: log every per-hop error, but record a single generic
   error as the relay attempt. *)
| `Error_opening_all_addresses hops_and_errors ->
List.iter hops_and_errors ~f:(fun (hop, e) ->
let e = Error.tag ~tag:"Unable to open connection for hop" e in
Log.info
log
(lazy
(Log.Message.of_error
~here:[%here]
~flows:(Message.flows t)
~component:[ "spool"; "send" ]
~spool_id:(Message.Id.to_string (Message.id t))
~remote_address:hop
e)));
let e = Error.createf "No hops available" in
Message.add_relay_attempt t (Time.now (), e);
return `Try_later
(* NOTE(review): presumably this is the case where the [~give_up] deferred above
   became determined before a client was available - confirm against Client_cache. *)
| `Gave_up_waiting_for_address ->
let e = Error.createf "Gave up waiting for client" in
Log.info
log
(lazy
(Log.Message.of_error
~here:[%here]
~flows:(Message.flows t)
~component:[ "spool"; "send" ]
~spool_id:(Message.Id.to_string (Message.id t))
~tags:[ "hops", hops_tag ]
e));
Message.add_relay_attempt t (Time.now (), e);
return `Try_later
(* The cache is closed: this is only logged. Unlike the other failure cases, no
   relay attempt is recorded on [t]. *)
| `Cache_is_closed ->
Log.info
log
(lazy
(Log.Message.create
~here:[%here]
~flows:(Message.flows t)
~component:[ "spool"; "send" ]
~spool_id:(Message.Id.to_string (Message.id t))
~tags:[ "hops", hops_tag ]
"Cache is closed"));
return `Try_later
(* The envelope was sent and a status came back. Rejected recipients are treated as
   an error here ([~allow_rejected_recipients:false]). *)
| `Ok (_hop, Ok envelope_status) ->
(match
Client.Envelope_status.ok_or_error ~allow_rejected_recipients:false envelope_status
with
| Ok _msg_id ->
return `Done
| Error e ->
Message.add_relay_attempt t (Time.now (), e);
(match envelope_status with
(* Recipient-level rejections: split the rejected recipients by whether the reply is
   a permanent error. *)
| Ok (_ , rejected_recipients)
| Error (`No_recipients rejected_recipients) ->
let permanently_failed_recipients, temporarily_failed_recipients =
List.partition_map rejected_recipients ~f:(fun (recipient, reject) ->
if Smtp_reply.is_permanent_error reject
then First recipient
else Second recipient)
in
(* Only the temporarily rejected recipients stay remaining; the permanently
   rejected ones are appended to the failed list. *)
Message.set_remaining_recipients t temporarily_failed_recipients;
Message.set_failed_recipients
t
(Message.failed_recipients t @ permanently_failed_recipients);
(* With nobody left to retry for, the failure is permanent. *)
if List.is_empty (Message.remaining_recipients t)
then return `Fail_permanently
else return `Try_later
(* Sender or body rejections: the reply [r] alone decides between a permanent
   failure and a later retry. *)
| Error
( `Rejected_sender r
| `Rejected_sender_and_recipients (r, _)
| `Rejected_body (r, _) ) ->
if Smtp_reply.is_permanent_error r
then return `Fail_permanently
else return `Try_later))
;;
(** [do_send t ~log ~client_cache] performs one send attempt for [t] inside
    [with_file] and updates the status of [t] from the outcome:

    - the status is set to [`Sending] before the attempt;
    - on [`Done] the status becomes [`Delivered], the entry is unlinked, and the result
      is [`Delivered];
    - on a permanent failure, or a temporary one with no retry intervals left, the
      status becomes [`Frozen];
    - on a temporary failure with intervals left, the status becomes [`Send_at] the
      current time plus the first interval, and that interval is dropped from the
      list.

    Both failure cases sync the metadata and return [`Failed e], where [e] is the
    error of the last relay attempt, or a No relay attempt error if there is none. *)
let do_send t ~log ~client_cache =
  let failure_of_last_attempt () =
    match Message.last_relay_attempt t with
    | Some (_, e) -> e
    | None -> Error.of_string "No relay attempt"
  in
  (* No further attempts will be scheduled: freeze the message. *)
  let freeze_message () =
    let delivery_failure = failure_of_last_attempt () in
    Message.set_status t `Frozen;
    Ok (`Sync_meta, `Failed delivery_failure)
  in
  (* Schedule the next attempt after [interval] and keep only the later intervals. *)
  let schedule_retry interval ~later_intervals =
    let delivery_failure = failure_of_last_attempt () in
    Message.set_status
      t
      (`Send_at (Time.add (Time.now ()) (Smtp_envelope.Retry_interval.to_span interval)));
    Message.set_retry_intervals t later_intervals;
    Ok (`Sync_meta, `Failed delivery_failure)
  in
  with_file t (fun data_file ->
    Message.set_status t `Sending;
    let%map outcome = send_to_hops t ~log ~client_cache data_file in
    match outcome with
    | `Done ->
      Message.set_status t `Delivered;
      Ok (`Unlink, `Delivered)
    | (`Fail_permanently | `Try_later) as failure ->
      (match failure, Message.retry_intervals t with
       | `Fail_permanently, _ | `Try_later, [] -> freeze_message ()
       | `Try_later, interval :: later_intervals ->
         schedule_retry interval ~later_intervals))
;;
(** [send t ~log ~client_cache] attempts delivery with [do_send] when the status of
    [t] is [`Send_now] or [`Send_at _]. For every other status it returns an error
    describing why the message is not sent, without touching the spool. *)
let send t ~log ~client_cache =
  let refuse reason = return (Or_error.error_string reason) in
  match Message.status t with
  | `Send_now | `Send_at _ -> do_send t ~log ~client_cache
  | `Sending -> refuse "Message.send: message is already being sent"
  | `Frozen -> refuse "Message.send: message is frozen"
  | `Removed -> refuse "Message.send: message is removed"
  | `Quarantined _ -> refuse "Message.send: message is quarantined"
  | `Delivered -> refuse "Message.send: message is delivered"
;;
(* Instantiation of [Multispool.Monitor.Make] for [Message.On_disk]; everything the
   functor produces is re-exported through [include].
   NOTE(review): presumably used to monitor the on-disk spool - confirm against the
   Multispool documentation. *)
module On_disk_monitor = struct
include Multispool.Monitor.Make (Message.On_disk)
end