Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
(* follower.ml *)
open Core
open Base
open State
open Result

(** Followers (§5.2):
 *  - Respond to RPCs from candidates and leaders
 *
 *  - If election timeout elapses without receiving AppendEntries
 *    RPC from current leader or granting vote to candidate:
 *    convert to candidate
 *)

(** Role tag of this module; [init] passes it to [Logger.create]. *)
let mode = FOLLOWER

(** Everything a running follower needs: configuration, its logger, the
    log-application callback, the shared Raft state, and the mutex that
    serializes the RPC handlers. *)
type t = {
  conf : Conf.t;
  logger : Logger.t;
  apply_log : apply_log;
  state : State.common;
  lock : Lwt_mutex.t;
}

(** [init ~conf ~apply_log ~state] builds a follower context with a fresh
    logger (configured from [conf]) and a fresh mutex. Always returns [Ok]. *)
let init ~(conf : Conf.t) ~apply_log ~state =
  let logger =
    Logger.create ~node_id:conf.node_id ~mode ~output_path:conf.log_file
      ~level:conf.log_level ()
  in
  let lock = Lwt_mutex.create () in
  Ok { conf; logger; apply_log; state; lock }

(** [unexpected_error msg] answers with HTTP 500 and [msg] as the body. *)
let unexpected_error msg =
  Cohttp_lwt_unix.Server.respond_string ~status:`Internal_server_error
    ~body:msg ()

(** [unexpected_request t] logs an error and answers with an empty-bodied
    HTTP 500. Used when a handler receives a request variant it does not
    serve. *)
let unexpected_request t =
  Logger.error t.logger ~loc:__LOC__ "Unexpected request";
  let response = Cohttp.Response.make ~status:`Internal_server_error () in
  Lwt.return (response, `Empty)

(** [request_handlers t ~election_timer] returns the dispatch table of the
    follower: a hashtable keyed by (HTTP method, path) whose values pair a
    JSON decoder with the function that serves the decoded request.

    Two routes are registered, in this order: POST /append_entries and
    POST /request_vote. Each one runs its handler while holding [t.lock]. *)
let request_handlers (t : t) ~election_timer =
  let table = Stdlib.Hashtbl.create 2 in
  let open Params in
  (* If election timeout elapses without receiving AppendEntries
   * RPC from current leader or granting vote to candidate:
   * convert to candidate.
   * Hence every request the handler reports as valid updates the timer. *)
  let touch_election_timer () = Timer.update election_timer in
  (* A follower has nothing extra to do when a newer term shows up. *)
  let ignore_newer_term () = () in
  (* Unwrap the handler outcome; a failure becomes an HTTP 500 reply. *)
  let respond = function
    | Ok response -> response
    | Error msg -> unexpected_error msg
  in
  let parse_append_entries json =
    match append_entries_request_of_yojson json with
    | Ok req -> Ok (APPEND_ENTRIES_REQUEST req)
    | Error msg -> Error msg
  in
  let serve_append_entries = function
    | APPEND_ENTRIES_REQUEST req ->
        Lwt_mutex.with_lock t.lock (fun () ->
            Append_entries_handler.handle ~conf:t.conf ~state:t.state
              ~logger:t.logger ~apply_log:t.apply_log
              ~cb_valid_request:touch_election_timer
              ~cb_newer_term:ignore_newer_term
              ~handle_same_term_as_newer:false ~param:req
            |> respond)
    | _ -> unexpected_request t
  in
  let parse_request_vote json =
    match request_vote_request_of_yojson json with
    | Ok req -> Ok (REQUEST_VOTE_REQUEST req)
    | Error msg -> Error msg
  in
  let serve_request_vote = function
    | REQUEST_VOTE_REQUEST req ->
        Lwt_mutex.with_lock t.lock (fun () ->
            Request_vote_handler.handle ~state:t.state ~logger:t.logger
              ~cb_valid_request:touch_election_timer
              ~cb_newer_term:ignore_newer_term ~param:req
            |> respond)
    | _ -> unexpected_request t
  in
  Stdlib.Hashtbl.add table
    (`POST, "/append_entries")
    (parse_append_entries, serve_append_entries);
  Stdlib.Hashtbl.add table
    (`POST, "/request_vote")
    (parse_request_vote, serve_request_vote);
  table

(** [run ~conf ~apply_log ~state] starts the follower role.

    It clears the known leader id and the recorded vote, logs the current
    term and the state, then starts the election timer and the request
    server. Stopping the timer wakes [server_stopper]. The result is
    [Ok p], where [p] resolves to [CANDIDATE] once both the timer thread
    and the server have finished. *)
let run ~conf ~apply_log ~state =
  init ~conf ~apply_log ~state >>= fun t ->
  let { conf; logger; state; _ } = t in
  VolatileState.reset_leader_id state.volatile_state ~logger;
  PersistentState.set_voted_for state.persistent_state ~logger
    ~voted_for:None;
  let term = PersistentState.current_term state.persistent_state in
  Logger.info logger ~loc:__LOC__
    (Printf.sprintf "### Follower: Start (term:%d) ###" term);
  State.log state ~logger;
  let election_timer =
    Timer.create ~logger ~timeout_millis:conf.election_timeout_millis
  in
  let table = request_handlers t ~election_timer in
  let server, server_stopper =
    Request_dispatcher.create ~port:(Conf.my_node conf).port ~logger ~table
  in
  (* When the election timer stops, also stop serving requests. *)
  let election_timer_thread =
    Timer.start election_timer ~on_stop:(fun () ->
        Lwt.wakeup server_stopper ())
  in
  Logger.debug logger ~loc:__LOC__ "Starting";
  Ok
    (Lwt.bind
       (Lwt.join [ election_timer_thread; server ])
       (fun () -> Lwt.return CANDIDATE))