Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
request_sender.ml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77open Core open Cohttp_lwt_unix open Base let handle_response ~logger ~converter ~node ~resp ~body = let status_code = resp |> Response.status |> Cohttp.Code.code_of_status in if status_code / 100 = 2 then ( Logger.debug logger ~loc:__LOC__ (Printf.sprintf "Received response from node: %d, body: %s" node.id body); let json = Yojson.Safe.from_string body in match converter json with | Ok param -> Some param | Error err -> Logger.error logger ~loc:__LOC__ (Printf.sprintf "Received an error response from node %d. err: %s, body: %s" node.id err body ); None ) else ( Logger.error logger ~loc:__LOC__ @@ Printf.sprintf "Received an error status code from node %d : %d" node.id status_code; None ) let post ~logger ~url_path ~request_json ~timeout_millis ~(converter : Yojson.Safe.t -> (Params.response, string) Result.t) ~my_node_id node = let request_param = request_json |> Yojson.Safe.to_string |> Cohttp_lwt.Body.of_string in let headers = Cohttp.Header.init_with "X-Raft-Node-Id" (string_of_int my_node_id) in let timeout : Params.response option Lwt.t = let%lwt _ = Lwt_unix.sleep (float_of_int timeout_millis /. 1000.0) in Logger.warn logger ~loc:__LOC__ (Printf.sprintf "Request timeout. node_id: %d, url_path: %s" node.id url_path ); Lwt.return None in let send_req node = let%lwt resp, response_body = Client.post ~body:request_param ~headers (Uri.of_string (Printf.sprintf "http://%s:%d/%s" node.host node.port url_path) ) in Lwt.map (fun body -> try handle_response ~logger ~converter ~node ~resp ~body with e -> let msg = Stdlib.Printexc.to_string e in Logger.error logger ~loc:__LOC__ (Printf.sprintf "Failed to handle response body. node_id: %d, error: %s" node.id msg ); None ) (Cohttp_lwt.Body.to_string response_body) in Lwt.catch (fun () -> Lwt.pick [ send_req node; timeout ]) (fun e -> let msg = Stdlib.Printexc.to_string e in Logger.error logger ~loc:__LOC__ (Printf.sprintf "Failed to send a request. node_id: %d, error: %s" node.id msg ); Lwt.return None )