Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
(* append_entries_handler.ml *)
open Core
open Cohttp_lwt_unix
open Yojson.Basic
open State
open Printf

(* Wraps [msg] in the standard failure prefix of this handler, logs it at
 * error level and returns it as [Error].
 *
 * [loc] is supplied by the caller (as [__LOC__]) so that the log line keeps
 * pointing at the site that actually failed rather than at this helper. *)
let append_entries_failure ~logger ~loc msg =
  let msg = sprintf "Failed to handle append_entries. error:[%s]" msg in
  Logger.error logger ~loc msg;
  Error msg

(* Invoked by leader to replicate log entries (§5.3); also used as
 * heartbeat (§5.2).
 *
 * Receiver implementation:
 *  1. Reply false if term < currentTerm (§5.1)
 *  2. Reply false if log doesn't contain an entry at prevLogIndex
 *     whose term matches prevLogTerm (§5.3)
 *  3. If an existing entry conflicts with a new one (same index
 *     but different terms), delete the existing entry and all that
 *     follow it (§5.3)
 *  4. Append any new entries not already in the log
 *  5. If leaderCommit > commitIndex, set commitIndex =
 *     min(leaderCommit, index of last new entry)
 *
 * Steps 1 and 2 are checked by [handle] before this function is called;
 * this function performs steps 3 to 5 and then applies the logs.
 *
 * Order of work:
 *  - record [param.leader_id] as the current leader;
 *  - call [cb_newer_term] when the request carries a newer term, or the
 *    same term while [handle_same_term_as_newer] is set;
 *  - append [param.entries] (skipped for an empty list, i.e. a heartbeat);
 *  - raise the commit index, bounded by the last index of the log as it
 *    is AFTER the append. The bound must be taken after the append: a
 *    bound taken before it ignores the entries that just arrived and may
 *    point past the end of a log the append has shortened;
 *  - apply every log up to the commit index through [apply_log].
 *
 * Returns [Ok ()] on success. The first failure is logged and returned as
 * [Error msg]; the steps after it are skipped.
 *
 * NOTE(review): the bound is the last index of the whole log, not strictly
 * the index of the last new entry; whether these can differ depends on
 * how [PersistentLog.append] treats a stale suffix - confirm. *)
let append_entries ~(conf : Conf.t) ~logger ~state
    ~(param : Params.append_entries_request) ~(apply_log : Base.apply_log)
    ~cb_newer_term ~handle_same_term_as_newer =
  VolatileState.update_leader_id state.volatile_state ~logger param.leader_id;
  let persistent_log = state.persistent_log in
  let volatile_state = state.volatile_state in
  if
    PersistentState.detect_newer_term state.persistent_state ~logger
      ~other_term:param.term
  then cb_newer_term ()
  else if
    handle_same_term_as_newer
    && PersistentState.detect_same_term state.persistent_state ~logger
         ~other_term:param.term
  then cb_newer_term ();
  (* If an existing entry conflicts with a new one (same index
   * but different terms), delete the existing entry and all that
   * follow it (§5.3)
   *
   * Append any new entries not already in the log *)
  let appended =
    match param.entries with
    | [] -> Ok ()
    | first_entry :: _ -> (
        Logger.debug logger ~loc:__LOC__
          (sprintf
             "This param isn't empty, so appending entries(lentgh: %d, first_entry.term: %d, first_entry.index: %d)"
             (List.length param.entries)
             first_entry.term first_entry.index);
        match PersistentLog.append persistent_log ~entries:param.entries with
        | Ok () -> Ok ()
        | Error msg -> append_entries_failure ~logger ~loc:__LOC__ msg)
  in
  (* If leaderCommit > commitIndex,
   * set commitIndex = min(leaderCommit, index of last new entry) *)
  let committed =
    match appended with
    | Error msg -> Error msg
    | Ok () ->
        if
          VolatileState.detect_higher_commit_index volatile_state ~logger
            ~other:param.leader_commit
        then (
          match PersistentLog.last_index persistent_log with
          | Ok last_index ->
              VolatileState.update_commit_index volatile_state
                (min param.leader_commit last_index);
              Ok ()
          | Error msg -> append_entries_failure ~logger ~loc:__LOC__ msg)
        else Ok ()
  in
  (* All Servers:
   * - If commitIndex > lastApplied: increment lastApplied, apply
   *   log[lastApplied] to state machine (§5.3) *)
  match committed with
  | Error msg -> Error msg
  | Ok () ->
      VolatileState.apply_logs volatile_state ~logger ~f:(fun i ->
          match PersistentLog.get persistent_log i with
          | Ok (Some log) ->
              apply_log ~node_id:conf.node_id ~log_index:log.index
                ~log_data:log.data
          | Ok None ->
              append_entries_failure ~logger ~loc:__LOC__
                (sprintf "The target log is not found. index:[%d]" i)
          | Error msg -> append_entries_failure ~logger ~loc:__LOC__ msg)

(* Logs, at warn level, a rejected append_entries request.
 *
 * The line consists of [msg], the scalar fields of [param], the number of
 * entries, the first and last entry rendered with
 * [PersistentLogEntry.show] (the text None when there are no entries) and
 * the persistent log rendered with [PersistentLog.show]. *)
let log_error_req ~state ~logger ~msg ~(param : Params.append_entries_request)
    =
  let show_entry = function
    | None -> "None"
    | Some entry -> PersistentLogEntry.show entry
  in
  let first_entry = show_entry (List.hd param.entries) in
  let last_entry = show_entry (List.last param.entries) in
  Logger.warn logger ~loc:__LOC__
    (sprintf
       "%s. param:{term:%d, leader_id:%d, prev_log_term:%d, prev_log_index:%d, entries_size:%d, leader_commit:%d, first_entry:%s, last_entry:%s}, state:%s"
       msg param.term param.leader_id param.prev_log_term param.prev_log_index
       (List.length param.entries)
       param.leader_commit first_entry last_entry
       (PersistentLog.show state.persistent_log))

(* Entry point for an append_entries request.
 *
 * The entry stored at [param.prev_log_index] is looked up first; a failed
 * lookup is logged and returned as [Error msg]. Otherwise:
 *  - a request detected as coming from an old leader is answered with
 *    success = false (§5.1); [cb_valid_request] is NOT called;
 *  - a request whose previous log does not match the stored one is
 *    answered with success = false (§5.3). The check is skipped when both
 *    prev_log_term and prev_log_index carry the initial values;
 *  - any other request is passed to [append_entries] and answered with
 *    success = true when it returns [Ok ()].
 * [cb_valid_request] is called in the last two cases.
 *
 * On success the result is [Ok response], where the response is built by
 * [Server.respond_string] with status OK and a JSON body holding the
 * current term and the success flag. An error of [append_entries] is
 * returned unchanged as [Error msg]. *)
let handle ~conf ~state ~logger ~apply_log ~cb_valid_request ~cb_newer_term
    ~handle_same_term_as_newer ~(param : Params.append_entries_request) =
  let persistent_state = state.persistent_state in
  let persistent_log = state.persistent_log in
  match PersistentLog.get persistent_log param.prev_log_index with
  | Error msg -> append_entries_failure ~logger ~loc:__LOC__ msg
  | Ok stored_prev_log -> (
      (* The very first request has no previous entry to compare with. *)
      let starts_from_initial_log =
        param.prev_log_term = initial_term
        && param.prev_log_index = initail_log_index
      in
      let prev_log_mismatch =
        (not starts_from_initial_log)
        &&
        match stored_prev_log with
        | Some l -> l.term <> param.prev_log_term
        | None -> true
      in
      let result =
        if
          PersistentState.detect_old_leader persistent_state ~logger
            ~other_term:param.term
        then (
          (* Reply false if term < currentTerm (§5.1) *)
          log_error_req ~state ~logger
            ~msg:"Received append_entries req that has old term" ~param;
          Ok false)
        else if prev_log_mismatch then (
          cb_valid_request ();
          (* Reply false if log doesn't contain an entry at prevLogIndex
           * whose term matches prevLogTerm (§5.3) *)
          log_error_req ~state ~logger
            ~msg:"Received append_entries req that has unexpected prev_log"
            ~param;
          Ok false)
        else (
          cb_valid_request ();
          let result =
            append_entries ~conf ~logger ~state ~param ~apply_log
              ~cb_newer_term ~handle_same_term_as_newer
          in
          State.log state ~logger;
          match result with Ok () -> Ok true | Error msg -> Error msg)
      in
      match result with
      | Ok success ->
          let response_body =
            `Assoc
              [
                ("term", `Int (PersistentState.current_term persistent_state));
                ("success", `Bool success);
              ]
            |> to_string
          in
          Ok (Server.respond_string ~status:`OK ~body:response_body ())
      | Error msg -> Error msg)