package shuttle

  1. Overview
  2. Docs

Source file connection.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
open! Core
open! Async

let close_channels reader writer =
  let%bind () = Output_channel.close writer in
  Input_channel.close reader
;;

let collect_errors writer fn =
  let monitor = Output_channel.monitor writer in
  ignore (Monitor.detach_and_get_error_stream monitor : _ Stream.t);
  choose
    [ choice (Monitor.get_next_error monitor) (fun e -> Error e)
    ; choice (Monitor.try_with ~run:`Now ~rest:`Log fn) Fn.id
    ]
;;

let listen
  ?max_connections
  ?max_accepts_per_batch
  ?backlog
  ?socket
  ?input_buffer_size
  ?max_input_buffer_size
  ?output_buffer_size
  ?max_output_buffer_size
  ?write_timeout
  ?time_source
  ~on_handler_error
  ~f:handler
  where_to_listen
  =
  Tcp.Server.create_sock
    ?max_connections
    ?max_accepts_per_batch
    ?backlog
    ?socket
    ?time_source
    ~on_handler_error
    where_to_listen
    (fun addr socket ->
    let fd = Socket.fd socket in
    let input_channel =
      Input_channel.create
        ?max_buffer_size:max_input_buffer_size
        ?buf_len:input_buffer_size
        ?time_source
        fd
    in
    let output_channel =
      Output_channel.create
        ?max_buffer_size:max_output_buffer_size
        ?buf_len:output_buffer_size
        ?write_timeout
        ?time_source
        fd
    in
    let%bind res =
      Deferred.any
        [ collect_errors output_channel (fun () ->
            handler addr input_channel output_channel)
        ; Output_channel.remote_closed output_channel |> Deferred.ok
        ]
    in
    let%bind () = close_channels input_channel output_channel in
    match res with
    | Ok () -> Deferred.unit
    | Error exn ->
      Exn.reraise exn "Shuttle.Connection.create: exception from output_channel")
;;

let with_connection
  ?interrupt
  ?timeout
  ?input_buffer_size
  ?max_input_buffer_size
  ?output_buffer_size
  ?max_output_buffer_size
  ?time_source
  ~f
  where_to_connect
  =
  let%bind socket = Tcp.connect_sock ?interrupt ?timeout ?time_source where_to_connect in
  let fd = Socket.fd socket in
  let input_channel =
    Input_channel.create
      ?max_buffer_size:max_input_buffer_size
      ?buf_len:input_buffer_size
      ?time_source
      fd
  in
  let output_channel =
    Output_channel.create
      ?max_buffer_size:max_output_buffer_size
      ?buf_len:output_buffer_size
      ?time_source
      fd
  in
  let res = collect_errors output_channel (fun () -> f input_channel output_channel) in
  let%bind () =
    Deferred.any_unit
      [ (res >>| fun _ -> ())
      ; Output_channel.close_finished output_channel
      ; Input_channel.closed input_channel
      ]
  in
  let%bind () = close_channels input_channel output_channel in
  match%map res with
  | Ok v -> v
  | Error exn ->
    Exn.reraise exn "Shuttle.Connection.with_connection: Exception in output channel"
;;