package oraft

  1. Overview
  2. Docs

Source file follower.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
open Core
open Base
open State
open Result

(** Followers (§5.2):
  * - Respond to RPCs from candidates and leaders
  *
  * - If election timeout elapses without receiving AppendEntries
  *   RPC from current leader or granting vote to candidate:
  *   convert to candidate
  *)

let mode = FOLLOWER

type t = {
  conf : Conf.t;
  logger : Logger.t;
  apply_log : apply_log;
  state : State.common;
  lock : Lwt_mutex.t;
}

let init ~conf ~apply_log ~state =
  Ok
    {
      conf;
      logger =
        Logger.create ~node_id:conf.node_id ~mode ~output_path:conf.log_file
          ~level:conf.log_level ();
      apply_log;
      state;
      lock = Lwt_mutex.create ();
    }


let unexpected_error msg =
  Cohttp_lwt_unix.Server.respond_string ~status:`Internal_server_error ~body:msg
    ()


let unexpected_request t =
  Logger.error t.logger ~loc:__LOC__ "Unexpected request";
  Lwt.return (Cohttp.Response.make ~status:`Internal_server_error (), `Empty)


let request_handlers t ~election_timer =
  let handlers = Stdlib.Hashtbl.create 2 in
  let open Params in
  Stdlib.Hashtbl.add handlers (`POST, "/append_entries")
    ( (fun json ->
        match append_entries_request_of_yojson json with
        | Ok x -> Ok (APPEND_ENTRIES_REQUEST x)
        | Error _ as e -> e
      ),
      function
      | APPEND_ENTRIES_REQUEST x ->
          Lwt_mutex.with_lock t.lock (fun () ->
              let result =
                Append_entries_handler.handle ~conf:t.conf ~state:t.state
                  ~logger:t.logger
                  ~apply_log:t.apply_log
                    (* If election timeout elapses without receiving AppendEntries
                     * RPC from current leader or granting vote to candidate:
                     * convert to candidate *)
                  ~cb_valid_request:(fun () -> Timer.update election_timer)
                  ~cb_newer_term:(fun () -> ())
                  ~handle_same_term_as_newer:false ~param:x
              in
              match result with
              | Ok response -> response
              | Error msg -> unexpected_error msg
          )
      | _ -> unexpected_request t
    );
  Stdlib.Hashtbl.add handlers (`POST, "/request_vote")
    ( (fun json ->
        match request_vote_request_of_yojson json with
        | Ok x -> Ok (REQUEST_VOTE_REQUEST x)
        | Error _ as e -> e
      ),
      function
      | REQUEST_VOTE_REQUEST x ->
          Lwt_mutex.with_lock t.lock (fun () ->
              let result =
                Request_vote_handler.handle ~state:t.state
                  ~logger:t.logger
                    (* If election timeout elapses without receiving AppendEntries
                     * RPC from current leader or granting vote to candidate:
                     * convert to candidate *)
                  ~cb_valid_request:(fun () -> Timer.update election_timer)
                  ~cb_newer_term:(fun () -> ())
                  ~param:x
              in
              match result with
              | Ok response -> response
              | Error msg -> unexpected_error msg
          )
      | _ -> unexpected_request t
    );
  handlers


let run ~conf ~apply_log ~state =
  init ~conf ~apply_log ~state >>= fun t ->
  VolatileState.reset_leader_id t.state.volatile_state ~logger:t.logger;
  PersistentState.set_voted_for t.state.persistent_state ~logger:t.logger
    ~voted_for:None;
  Logger.info t.logger ~loc:__LOC__
  @@ Printf.sprintf "### Follower: Start (term:%d) ###"
  @@ PersistentState.current_term t.state.persistent_state;
  State.log t.state ~logger:t.logger;
  let election_timer =
    Timer.create ~logger:t.logger ~timeout_millis:t.conf.election_timeout_millis
  in
  let handlers = request_handlers t ~election_timer in
  let server, server_stopper =
    Request_dispatcher.create ~port:(Conf.my_node t.conf).port ~logger:t.logger
      ~table:handlers
  in
  let election_timer_thread =
    Timer.start election_timer ~on_stop:(fun () -> Lwt.wakeup server_stopper ())
  in
  Logger.debug t.logger ~loc:__LOC__ "Starting";
  let next () = Lwt.return CANDIDATE in
  let all = Lwt.join [ election_timer_thread; server ] in
  Ok (Lwt.bind all next)