package oraft

  1. Overview
  2. Docs

Source file append_entries_handler.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
open Core
open Cohttp_lwt_unix
open Yojson.Basic
open State
open Printf

(* Invoked by leader to replicate log entries (§5.3); also used as
 * heartbeat (§5.2).
 *
 * Receiver implementation:
 * 1. Reply false if term < currentTerm (§5.1)
 * 2. Reply false if log doesn't contain an entry at prevLogIndex
 *     whose term matches prevLogTerm (§5.3)
 * 3. If an existing entry conflicts with a new one (same index
 *    but different terms), delete the existing entry and all that
 *    follow it (§5.3)
 * 4. Append any new entries not already in the log
 * 5. If leaderCommit > commitIndex, set commitIndex =
 *    min(leaderCommit, index of last new entry)
 *)

let append_entries ~(conf : Conf.t) ~logger ~state
    ~(param : Params.append_entries_request) ~(apply_log : Base.apply_log)
    ~cb_newer_term ~handle_same_term_as_newer =
  VolatileState.update_leader_id state.volatile_state ~logger param.leader_id;
  let persistent_log = state.persistent_log in
  let volatile_state = state.volatile_state in
  if PersistentState.detect_newer_term state.persistent_state ~logger
       ~other_term:param.term
  then cb_newer_term ()
  else if handle_same_term_as_newer
          && PersistentState.detect_same_term state.persistent_state ~logger
               ~other_term:param.term
  then cb_newer_term ();

  let error = ref None in

  (* If leaderCommit > commitIndex,
   * set commitIndex = min(leaderCommit, index of last new entry) *)
  if VolatileState.detect_higher_commit_index volatile_state ~logger
       ~other:param.leader_commit
  then (
    match PersistentLog.last_index persistent_log with
    | Ok last_index ->
        VolatileState.update_commit_index volatile_state
          (min param.leader_commit last_index)
    | Error msg ->
        let msg = sprintf "Failed to handle append_entries. error:[%s]" msg in
        Logger.error logger ~loc:__LOC__ msg;
        error := Some (Error msg)
  );

  if Option.is_none !error
  then
    if List.length param.entries > 0
    then (
      let first_entry = List.hd_exn param.entries in
      Logger.debug logger ~loc:__LOC__
        (sprintf
           "This param isn't empty, so appending entries(lentgh: %d, first_entry.term: %d, first_entry.index: %d)"
           (List.length param.entries)
           first_entry.term first_entry.index
        );
      (* If an existing entry conflicts with a new one (same index
         *  but different terms), delete the existing entry and all that
         *  follow it (§5.3)
         *
         * Append any new entries not already in the log *)
      match PersistentLog.append persistent_log ~entries:param.entries with
      | Ok () -> ()
      | Error msg ->
          let msg = sprintf "Failed to handle append_entries. error:[%s]" msg in
          Logger.error logger ~loc:__LOC__ msg;
          error := Some (Error msg)
    );

  (* All Servers:
   * - If commitIndex > lastApplied: increment lastApplied, apply
   *   log[lastApplied] to state machine (§5.3)
   *)
  match !error with
  | None ->
      VolatileState.apply_logs volatile_state ~logger ~f:(fun i ->
          match PersistentLog.get persistent_log i with
          | Ok (Some log) ->
              apply_log ~node_id:conf.node_id ~log_index:log.index
                ~log_data:log.data
          | Ok None ->
              let msg =
                sprintf
                  "Failed to handle append_entries. error:[The target log is not found. index:[%d]]"
                  i
              in
              Logger.error logger ~loc:__LOC__ msg;
              Error msg
          | Error msg ->
              let msg =
                sprintf "Failed to handle append_entries. error:[%s]" msg
              in
              Logger.error logger ~loc:__LOC__ msg;
              Error msg
      )
  | Some error -> error


let log_error_req ~state ~logger ~msg ~(param : Params.append_entries_request) =
  let entries_size = List.length param.entries in
  let persistent_log = state.persistent_log in
  let first_entry =
    if entries_size = 0
    then "None"
    else PersistentLogEntry.show (List.nth_exn param.entries 0)
  in
  let last_entry =
    if entries_size = 0
    then "None"
    else PersistentLogEntry.show (List.nth_exn param.entries (entries_size - 1))
  in
  Logger.warn logger ~loc:__LOC__
    (sprintf
       "%s. param:{term:%d, leader_id:%d, prev_log_term:%d, prev_log_index:%d, entries_size:%d, leader_commit:%d, first_entry:%s, last_entry:%s}, state:%s"
       msg param.term param.leader_id param.prev_log_term param.prev_log_index
       entries_size param.leader_commit first_entry last_entry
       (PersistentLog.show persistent_log)
    )


let handle ~conf ~state ~logger ~apply_log ~cb_valid_request ~cb_newer_term
    ~handle_same_term_as_newer ~(param : Params.append_entries_request) =
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  match PersistentLog.get persistent_log param.prev_log_index with
  | Ok stored_prev_log -> (
      let result =
        if PersistentState.detect_old_leader persistent_state ~logger
             ~other_term:param.term
        then (
          (* Reply false if term < currentTerm (§5.1) *)
          log_error_req ~state ~logger
            ~msg:"Received append_entries req that has old team" ~param;
          Ok false
        )
        else if (not
                   (param.prev_log_term = initial_term
                   && param.prev_log_index = initail_log_index
                   )
                )
                &&
                match stored_prev_log with
                | Some l -> l.term <> param.prev_log_term
                | None -> true
        then (
          cb_valid_request ();
          (* Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3) *)
          log_error_req ~state ~logger
            ~msg:"Received append_entries req that has unexpected prev_log"
            ~param;
          Ok false
        )
        else (
          cb_valid_request ();
          let result =
            append_entries ~conf ~logger ~state ~param ~apply_log ~cb_newer_term
              ~handle_same_term_as_newer
          in
          State.log state ~logger;
          match result with Ok () -> Ok true | Error msg -> Error msg
        )
      in
      match result with
      | Ok success ->
          let response_body =
            `Assoc
              [
                ("term", `Int (PersistentState.current_term persistent_state));
                ("success", `Bool success);
              ]
            |> to_string
          in
          Ok (Server.respond_string ~status:`OK ~body:response_body ())
      | Error msg -> Error msg
    )
  | Error msg ->
      let msg = sprintf "Failed to handle append_entries. error:[%s]" msg in
      Logger.error logger ~loc:__LOC__ msg;
      Error msg