package async_smtp

  1. Overview
  2. Docs
Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source

Source file message_spool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
open Core
open Async
open Async_smtp_types
module Log = Mail_log

(* Shadow [compare] for the rest of this file: any use of the bare name now evaluates
   to the polymorphic variant [`You_are_using_poly_compare] rather than an [int], so
   code that expects an ordering result should stop type-checking. *)
let compare _ _ = `You_are_using_poly_compare

(* Reference the shadowing definition once so that it is not reported as unused. *)
let _silence_unused_warning = compare

(* Short local name for the on-disk spool implementation exposed by [Message]. *)
module On_disk_spool = Message.On_disk_spool

(* [create] is [On_disk_spool.create], re-exported unchanged. *)
let create = On_disk_spool.create

(* [load str] is [On_disk_spool.load str].
   NOTE(review): presumably written eta-expanded (rather than as a plain alias like
   [create]) so that only the single positional argument is exposed - confirm against
   the signature of [On_disk_spool.load]. *)
let load str = On_disk_spool.load str

(* [ls t queues] lists the entries of each queue in [queues] of spool [t] with
   [On_disk_spool.list] and concatenates the per-queue results into one list inside
   [Deferred.Or_error]. *)
let ls t queues =
  let entries_of queue = On_disk_spool.list t queue in
  Deferred.Or_error.List.concat_map queues ~f:entries_of
;;

(* [uncheckout_from_queue t queue] saves every checked-out entry reported by
   [On_disk_spool.Expert.list_checkouts_unsafe t queue] back into the queue that the
   entry itself reports.

   Resolves to [(names, errors)]: the names of the entries that were saved
   successfully and the errors of those that were not. If the checkouts cannot be
   listed at all, the result is [([], [ e ])] for the listing error [e]. *)
let uncheckout_from_queue t queue =
  let module Checked_out_entry = On_disk_spool.Expert.Checked_out_entry in
  (* Save one checked-out entry back to its own queue, yielding its name on success. *)
  let restore checked_out =
    let%map outcome =
      Checked_out_entry.save checked_out (Checked_out_entry.queue checked_out)
    in
    Result.map outcome ~f:(fun () -> Checked_out_entry.name checked_out)
  in
  On_disk_spool.Expert.list_checkouts_unsafe t queue
  >>= function
  | Error e -> return ([], [ e ])
  | Ok checked_out_entries ->
    (* All saves are started up front; we then wait for every one of them. *)
    let pending = List.map checked_out_entries ~f:restore in
    let%map outcomes = Deferred.List.all pending in
    List.partition_result outcomes
;;

(* [uncheckout_all_entries t] runs [uncheckout_from_queue t] over every queue in
   [Message.Queue.all] and merges the outcomes.

   Resolves to [(`Recovered names, `Errors err)], where [names] collects the recovered
   entry names of all queues and [err] is [None] when no queue reported an error, or
   [Some] of all the errors combined with [Error.of_list]. *)
let uncheckout_all_entries t =
  let%bind per_queue =
    Deferred.List.map Message.Queue.all ~f:(fun queue -> uncheckout_from_queue t queue)
  in
  let recovered_by_queue, errors_by_queue = List.unzip per_queue in
  let recovered = List.concat recovered_by_queue in
  let errors =
    match List.concat errors_by_queue with
    | [] -> None
    | _ :: _ as all_errors -> Some (Error.of_list all_errors)
  in
  return (`Recovered recovered, `Errors errors)
;;

(* Spool entries: everything from [On_disk_spool.Entry], plus helpers that read an
   entry's metadata and data file directly through [On_disk_spool.Entry.Direct]. *)
module Entry = struct
  include On_disk_spool.Entry

  (* [to_message entry] is the contents of [entry] as returned by
     [On_disk_spool.Entry.Direct.contents]. *)
  let to_message entry = On_disk_spool.Entry.Direct.contents entry

  (* [to_message_with_envelope entry] reads the metadata of [entry], then loads its
     data file and pairs the metadata with an envelope built from the metadata's
     envelope info and the loaded email. Any failing step fails the whole result. *)
  let to_message_with_envelope entry =
    to_message entry
    >>=? fun meta ->
    On_disk_spool.Entry.Direct.data_file entry
    |> On_disk_spool.Data_file.load
    >>|? fun data ->
    let email = Message.Data.to_email data in
    meta, Smtp_envelope.create' ~info:(Message.envelope_info meta) ~email
  ;;

  (* [size entry] is the size reported by [stat] on the data file of [entry],
     converted with [Byte_units.of_bytes_int64_exn]. *)
  let size entry =
    On_disk_spool.Entry.Direct.data_file entry
    |> On_disk_spool.Data_file.stat
    >>|? fun stats -> Byte_units.of_bytes_int64_exn (Unix.Stats.size stats)
  ;;
end

(* [entry t] builds the spool entry that corresponds to the spooled message [t]: the
   spool is loaded (via [load_unsafe]) from the message's spool directory, the queue is
   derived from the message's current status, and the entry name is the message id
   rendered as a string. Fails if the status does not map to a queue. *)
let entry t =
  let spool = On_disk_spool.load_unsafe (Message.spool_dir t) in
  Message.Queue.of_status' (Message.status t)
  |> Deferred.return
  >>|? fun queue ->
  let name = Message.Id.to_string (Message.id t) in
  Entry.create spool queue ~name
;;

(* A spool handle is exactly an [On_disk_spool.t]. Note that in several functions of
   this file the argument named [t] is handed to [Message.*] accessors, i.e. it is a
   spooled message and not a value of this type. *)
type t = On_disk_spool.t

(* Alias of [t] under a more descriptive name. *)
type spool = t

(* [enqueue meta_spool queue ~meta ~id ~data] stores [meta] and [data] in [queue] of
   [meta_spool] under the already chosen name [id] (passed as [`Use id]). The entry
   returned by [On_disk_spool.enqueue] is discarded; success is reported as [()]. *)
let enqueue meta_spool queue ~meta ~id ~data =
  On_disk_spool.enqueue meta_spool queue meta data (`Use id)
  >>|? fun (_ : On_disk_spool.Entry.t) -> ()
;;

(* [enqueue spool ~log ~initial_status envelope_batch ~flows ~original_msg] turns
   [envelope_batch] into spooled messages and writes each of them to [spool].

   - The target queue is derived from [initial_status]; if that fails, nothing is
     written.
   - Message ids are reserved in [spool] from [original_msg], whose id also becomes
     the [parent_id] of every created message.
   - New messages start with no failed recipients and no relay attempts, and carry
     the current time as their spool date.

   Resolves to the [(message, envelope)] pairs that were enqueued. [~log] is accepted
   but not used. *)
let enqueue spool ~log:_ ~initial_status envelope_batch ~flows ~original_msg =
  let open Deferred.Or_error.Let_syntax in
  let parent_id = Smtp_envelope.id original_msg in
  let%bind queue = Deferred.return (Message.Queue.of_status' initial_status) in
  let reserve_id () = On_disk_spool.Unique_name.reserve spool original_msg in
  let%bind messages_with_data =
    Message.of_envelope_batch
      envelope_batch
      ~gen_id:reserve_id
      ~spool_dir:(On_disk_spool.dir spool)
      ~spool_date:(Time.now ())
      ~failed_recipients:[]
      ~relay_attempts:[]
      ~parent_id
      ~status:initial_status
      ~flows
  in
  Deferred.Or_error.List.map messages_with_data ~f:(fun (meta, data, envelope) ->
    (* [enqueue] here is the low-level, single-message function defined earlier. *)
    let%map () = enqueue spool queue ~meta ~id:(Message.id meta) ~data in
    meta, envelope)
;;

(* [with_file t f] runs [f] on the data file of the spool entry for message [t],
   inside [On_disk_spool.with_entry], and decides what is written back afterwards:

   - If the metadata on disk differs from the in-memory [t], [f] is not run; the
     on-disk metadata is saved back unchanged and an error is returned.
   - If [f] fails, the on-disk metadata is saved back unchanged and the error of [f]
     is returned.
   - If [f] returns [`Unlink], the entry is removed.
   - If [f] returns [`Sync_meta], the (possibly mutated) [t] is saved into the queue
     derived from its current status; if no queue can be derived, the on-disk
     metadata is kept and that error is returned.

   "Unchanged" always means: the metadata read from disk, in the queue that matched
   the status of [t] before [f] ran. *)
let with_file
      t
      (f :
         On_disk_spool.Data_file.t -> ([ `Sync_meta | `Unlink ] * 'a) Or_error.t Deferred.t)
  : 'a Or_error.t Deferred.t
  =
  entry t
  >>=? fun spool_entry ->
  Deferred.return (Message.Queue.of_status' (Message.status t))
  >>=? fun original_queue ->
  On_disk_spool.with_entry spool_entry ~f:(fun on_disk data_file ->
    (* Outcome that leaves the on-disk metadata and its queue exactly as they were. *)
    let unchanged = `Save (on_disk, original_queue) in
    if Message.compare t on_disk <> 0
    then (
      let e =
        Error.create
          "spooled message in memory differs from spooled message on disk"
          (`In_memory t, `On_disk on_disk, `Entry spool_entry)
          [%sexp_of:
            [ `In_memory of Message.t ]
            * [ `On_disk of Message.t ]
            * [ `Entry of On_disk_spool.Entry.t ]]
      in
      return (unchanged, Error e))
    else (
      f data_file
      >>= function
      | Error _ as e -> return (unchanged, e)
      | Ok (`Unlink, res) -> return (`Remove, Ok res)
      | Ok (`Sync_meta, res) ->
        (* The status of [t] is mutable and [f] may have changed it, so the target
           queue has to be derived again rather than reusing [original_queue]. *)
        (Message.Queue.of_status' (Message.status t)
         |> Or_error.tag ~tag:(Sexp.to_string (Message.sexp_of_t t))
         |> Deferred.return
         >>= function
         | Error _ as e -> return (unchanged, e)
         | Ok new_queue -> return (`Save (t, new_queue), Ok res))))
  >>| Or_error.join
;;

(* [freeze t ~log] logs a "frozen" event for [t], sets its status to [`Frozen] and
   asks [with_file] to write the updated metadata back to the spool. *)
let freeze t ~log =
  with_file t (fun _data_file ->
    let event =
      lazy
        (Log.Message.create
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           "frozen")
    in
    Log.info log event;
    Message.set_status t `Frozen;
    Deferred.Or_error.return (`Sync_meta, ()))
;;

(* [mark_for_send_now ~retry_intervals t ~log] logs a "send_now" event for [t], then
   sets its status to [`Send_now], adds [retry_intervals] to the message, moves its
   failed recipients back to the remaining recipients, and asks [with_file] to write
   the updated metadata back to the spool. *)
let mark_for_send_now ~retry_intervals t ~log =
  with_file t (fun _data_file ->
    let event =
      lazy
        (Log.Message.create
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           "send_now")
    in
    Log.info log event;
    Message.set_status t `Send_now;
    Message.add_retry_intervals t retry_intervals;
    Message.move_failed_recipients_to_remaining_recipients t;
    Deferred.Or_error.return (`Sync_meta, ()))
;;

(* [remove t ~log] logs a "removing" event for [t], sets its status to [`Removed] and
   asks [with_file] to write the updated metadata back. Note that this requests
   [`Sync_meta], not [`Unlink]: the entry is re-saved under its new status rather than
   deleted by this function. *)
let remove t ~log =
  with_file t (fun _data_file ->
    let event =
      lazy
        (Log.Message.create
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           "removing")
    in
    Log.info log event;
    Message.set_status t `Removed;
    Deferred.Or_error.return (`Sync_meta, ()))
;;

(* [send_envelope_via_sendfile client ~log ~flows ~component envelope_info data_file]
   sends an envelope through [Client.Expert.send_envelope], supplying the body with
   [Async_sendfile.sendfile] from the path of [data_file] to the file descriptor of
   the client's writer. *)
let send_envelope_via_sendfile client ~log ~flows ~component envelope_info data_file =
  Client.Expert.send_envelope
    client
    ~log
    ~flows
    ~component
    ~send_data:(fun client ->
      Async_sendfile.sendfile
        ~fd:(Writer.fd (Client_raw.writer client))
        ~file:(On_disk_spool.Data_file.path data_file)
        ())
    envelope_info
;;

(* [send_to_hops t ~log ~client_cache data_file] makes one delivery attempt for the
   spooled message [t], using a client obtained from [client_cache] for
   [Message.next_hop_choices t] and sending the body from [data_file].

   Resolves to one of:
   - [`Done]: the envelope status passed
     [Client.Envelope_status.ok_or_error ~allow_rejected_recipients:false].
   - [`Fail_permanently]: the sender/body rejection was a permanent error, or, after
     recipient rejections, no recipients remain to be retried.
   - [`Try_later]: every other outcome (error while sending, no connection could be
     opened, gave up waiting for a client, cache closed, non-permanent rejection).

   Side effects on [t]: every failure branch except [`Cache_is_closed] appends a relay
   attempt with the current time; recipient rejections additionally rewrite the
   remaining-recipients and failed-recipients lists. *)
let send_to_hops t ~log ~client_cache data_file =
  (* The candidate hops rendered as a sexp string, used as a log tag below. *)
  let hops_tag =
    Sexp.to_string ([%sexp_of: Host_and_port.t list] (Message.next_hop_choices t))
  in
  Log.debug
    log
    (lazy
      (Log.Message.create
         ~here:[%here]
         ~flows:(Message.flows t)
         ~component:[ "spool"; "send" ]
         ~spool_id:(Message.Id.to_string (Message.id t))
         ~tags:[ "hops", hops_tag ]
         "attempting delivery"));
  match%bind
    Client_cache.Tcp.with_'
      (* [give_up] becomes determined two minutes from now. *)
      ~give_up:(Clock.after (Time.Span.of_min 2.))
      ~cache:client_cache
      (Message.next_hop_choices t)
      ?route:(Smtp_envelope.Info.route (Message.envelope_info t))
      ~f:(fun ~flows client ->
        let flows = Log.Flows.union (Message.flows t) flows in
        (* Only the recipients still remaining are put on the envelope that is sent. *)
        let envelope_info =
          Smtp_envelope.Info.set
            (Message.envelope_info t)
            ~recipients:(Message.remaining_recipients t)
            ()
        in
        send_envelope_via_sendfile
          client
          ~log
          ~flows
          ~component:[ "spool"; "send" ]
          envelope_info
          data_file)
  with
  | `Ok (hop, Error e) ->
    (* The client logs many common failures, so this might be repetitive. But
       duplication in the error case is better than missing potential errors. *)
    let e = Error.tag ~tag:"Unable to send envelope" e in
    Log.info
      log
      (lazy
        (Log.Message.of_error
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool"; "send" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           ~remote_address:hop
           e));
    Message.add_relay_attempt t (Time.now (), e);
    return `Try_later
  | `Error_opening_all_addresses hops_and_errors ->
    (* Log one line per hop, but record a single summary relay attempt. *)
    List.iter hops_and_errors ~f:(fun (hop, e) ->
      let e = Error.tag ~tag:"Unable to open connection for hop" e in
      Log.info
        log
        (lazy
          (Log.Message.of_error
             ~here:[%here]
             ~flows:(Message.flows t)
             ~component:[ "spool"; "send" ]
             ~spool_id:(Message.Id.to_string (Message.id t))
             ~remote_address:hop
             e)));
    let e = Error.createf "No hops available" in
    Message.add_relay_attempt t (Time.now (), e);
    return `Try_later
  | `Gave_up_waiting_for_address ->
    let e = Error.createf "Gave up waiting for client" in
    Log.info
      log
      (lazy
        (Log.Message.of_error
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool"; "send" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           ~tags:[ "hops", hops_tag ]
           e));
    Message.add_relay_attempt t (Time.now (), e);
    return `Try_later
  | `Cache_is_closed ->
    (* Shutdown is initiated, so stop trying to resend. *)
    (* NOTE(review): unlike the other failure branches, no relay attempt is recorded
       here, so a later [Message.last_relay_attempt t] yields whatever was recorded
       before this call (possibly nothing) - confirm that this is intended. *)
    Log.info
      log
      (lazy
        (Log.Message.create
           ~here:[%here]
           ~flows:(Message.flows t)
           ~component:[ "spool"; "send" ]
           ~spool_id:(Message.Id.to_string (Message.id t))
           ~tags:[ "hops", hops_tag ]
           "Cache is closed"));
    return `Try_later
  | `Ok (_hop, Ok envelope_status) ->
    (match
       Client.Envelope_status.ok_or_error ~allow_rejected_recipients:false envelope_status
     with
     | Ok _msg_id ->
       (* Already logged by the client *)
       return `Done
     | Error e ->
       (* We are being conservative here for simplicity - if we get a permanent error
          from one hop, we assume that we would get the same error from the remaining
          hops. *)
       (* Already logged by the client *)
       Message.add_relay_attempt t (Time.now (), e);
       (match envelope_status with
        | Ok (_ (* envelope_id *), rejected_recipients)
        | Error (`No_recipients rejected_recipients) ->
          (* Split rejected recipients by whether their reply is a permanent error:
             permanent ones are appended to the failed recipients, the rest become
             the new remaining recipients. *)
          let permanently_failed_recipients, temporarily_failed_recipients =
            List.partition_map rejected_recipients ~f:(fun (recipient, reject) ->
              if Smtp_reply.is_permanent_error reject
              then First recipient
              else Second recipient)
          in
          Message.set_remaining_recipients t temporarily_failed_recipients;
          Message.set_failed_recipients
            t
            (Message.failed_recipients t @ permanently_failed_recipients);
          (* With nobody left to retry for, the failure is final. *)
          if List.is_empty (Message.remaining_recipients t)
          then return `Fail_permanently
          else return `Try_later
        | Error
            ( `Rejected_sender r
            | `Rejected_sender_and_recipients (r, _)
            | `Rejected_body (r, _) ) ->
          (* Recipient lists are left untouched here; only the reply [r] decides. *)
          if Smtp_reply.is_permanent_error r
          then return `Fail_permanently
          else return `Try_later))
;;

(* [do_send t ~log ~client_cache] marks [t] as [`Sending], makes one delivery attempt
   with [send_to_hops] inside [with_file], and updates the message according to the
   outcome:

   - delivered: status becomes [`Delivered], the spool entry is unlinked and the
     result is [`Delivered];
   - permanent failure, or a temporary failure with no retry interval left: status
     becomes [`Frozen];
   - temporary failure with retry intervals left: status becomes [`Send_at] the
     current time plus the first interval, and that interval is dropped.

   Both failure cases sync the metadata and yield [`Failed e], where [e] is the error
   of the last recorded relay attempt (or a "No relay attempt" error if none). *)
let do_send t ~log ~client_cache =
  let last_relay_error () =
    match Message.last_relay_attempt t with
    | Some (_, e) -> e
    | None -> Error.of_string "No relay attempt"
  in
  (* No further retries: freeze the message. *)
  let give_up () =
    let delivery_failure = last_relay_error () in
    Message.set_status t `Frozen;
    Ok (`Sync_meta, `Failed delivery_failure)
  in
  (* Schedule the next attempt after [interval] and keep [remaining] for later. *)
  let retry_after interval ~remaining =
    let delivery_failure = last_relay_error () in
    Message.set_status
      t
      (`Send_at
         (Time.add (Time.now ()) (Smtp_envelope.Retry_interval.to_span interval)));
    Message.set_retry_intervals t remaining;
    Ok (`Sync_meta, `Failed delivery_failure)
  in
  with_file t (fun data_file ->
    Message.set_status t `Sending;
    send_to_hops t ~log ~client_cache data_file
    >>| function
    | `Done ->
      Message.set_status t `Delivered;
      Ok (`Unlink, `Delivered)
    | `Fail_permanently -> give_up ()
    | `Try_later ->
      (match Message.retry_intervals t with
       | [] -> give_up ()
       | next :: rest -> retry_after next ~remaining:rest))
;;

(* [send t ~log ~client_cache] attempts delivery of [t] via [do_send] when its status
   is [`Send_now] or [`Send_at _]. For every other status nothing is sent and an
   error naming that status is returned. *)
let send t ~log ~client_cache =
  let refuse reason = return (Or_error.error_string reason) in
  match Message.status t with
  | `Send_now | `Send_at _ -> do_send t ~log ~client_cache
  | `Sending -> refuse "Message.send: message is already being sent"
  | `Delivered -> refuse "Message.send: message is delivered"
  | `Frozen -> refuse "Message.send: message is frozen"
  | `Removed -> refuse "Message.send: message is removed"
  | `Quarantined _ -> refuse "Message.send: message is quarantined"
;;

(* Spool monitor obtained by applying [Multispool.Monitor.Make] to [Message.On_disk];
   everything the functor produces is re-exported as is. *)
module On_disk_monitor = struct
  include Multispool.Monitor.Make (Message.On_disk)
end