package atacama

  1. Overview
  2. Docs

Source file acceptor_pool.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[@@@warning "-8"]

open Riot

type 'ctx state = {
  buffer_size : int;
  socket : Net.Socket.listen_socket;
  transport : (module Transport.Intf);
  initial_ctx : 'ctx;
  handler : 'ctx Handler.t;
}

let rec accept_loop state =
  let (Ok (conn, client_addr)) = Net.Socket.accept state.socket in
  Logger.debug (fun f -> f "Accepted connection: %a" Net.Addr.pp client_addr);
  Telemetry_.accepted_connection client_addr;
  let conn = Socket.make conn state.transport state.buffer_size in
  let (Ok _pid) = Connection.start_link conn state.handler state.initial_ctx in
  accept_loop state

let start_link state =
  let pid =
    spawn_link (fun () ->
        process_flag (Trap_exit true);
        accept_loop state)
  in
  Ok pid

let child_spec ~socket ?(buffer_size = 128) transport handler initial_ctx =
  let state = { socket; buffer_size; transport; handler; initial_ctx } in
  Supervisor.child_spec ~start_link state

module Sup = struct
  type 'ctx state = {
    port : int;
    acceptor_count : int;
    transport_module : (module Transport.Intf);
    handler_module : 'ctx Handler.t;
    initial_ctx : 'ctx;
  }

  let start_link
      { port; acceptor_count; transport_module; handler_module; initial_ctx } =
    let (Ok socket) = Net.Socket.listen ~port () in
    Logger.debug (fun f -> f "Listening on 0.0.0.0:%d" port);
    Telemetry_.listening socket;
    let child_specs =
      List.init acceptor_count (fun _ ->
          child_spec ~socket transport_module handler_module initial_ctx)
    in
    Supervisor.start_link ~child_specs ()

  let child_spec ~port ~acceptor_count ~transport_module ~handler_module
      initial_ctx =
    let state =
      { acceptor_count; port; transport_module; handler_module; initial_ctx }
    in
    Supervisor.child_spec ~start_link state
end