Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Page
Library
Module
Module type
Parameter
Class
Class type
Source
events.ml1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431open Common open Lwt.Infix let log_src = Logs.Src.create "ozulip.events" module Log = (val Logs_lwt.src_log log_src) module Logi = (val Logs.src_log log_src) module Smap = Map.Make(String) module Message = struct type recipients = | Stream of int * string | Private of int list type flag = | Read | Starred | Collapsed | Mentioned | Wildcard_mentioned | Has_alert_word | Historical let flag = Json_encoding.( string_enum [ "read", Read; "starred", Starred; "collapsed", Collapsed; "mentioned", Mentioned; "wildcard_mentioned", Wildcard_mentioned; "has_alert_word", Has_alert_word; "historical", Historical; ] ) type t = { id: int ; sender_id: int ; sender_email: string ; sender_full_name: string; recipients: recipients ; content : string ; flags : flag list ; } let content { content; _ } = content let sender_mention ?(silent = false) { sender_full_name; sender_id; _ } = Format.asprintf "@%s**%s|%d**" (if silent then "_" else "") sender_full_name sender_id let sender_destination { sender_id; _ } = Messages.private_ids [sender_id] let destination { recipients; _ } = match recipients with | Stream (stream_id, topic) -> Messages.stream_id stream_id topic | Private ids -> Messages.private_ids ids (* Message filters *) let is_trusted ?(trusted_ids = []) ?(trusted_emails = []) { sender_id; sender_email; _ } = List.mem sender_id trusted_ids || List.mem sender_email trusted_emails let has_flag flag m = List.mem flag m.flags let is_own_message config m = m.sender_email = config.Config.email let is_selfmsg { recipients; _ } = match recipients with | Private [ _ ] -> true | _ -> false let is_privmsg { recipients; _ } = (* A message is a privmsg if it has exactly two recipients: us and the sender. *) match recipients with | Private [ _; _ ] -> true | Private _ | Stream _ -> false let is_groupmsg { recipients; _ } = match recipients with | Private [] | Private [ _ ] | Private [ _; _ ] | Stream _ -> false | Private _ -> true let reply ?(privmsg = false) ?(mention = true) config m = (* Note: the slightly convoluted implementation ensures that we don't keep a reference to [m] in the closure. *) let dest = if privmsg then sender_destination m else destination m in let mention = if privmsg || is_privmsg m || not mention then None else Some (sender_mention m) in fun reply -> let reply = match mention with | Some mention -> mention ^ " " ^ reply | None -> reply in Messages.send_message config dest reply let replyf ?privmsg ?mention config m = Format.kasprintf (reply ?privmsg ?mention config m) end type event = .. type event += | Message of Message.t | Heartbeat | Other of Json_repr.ezjsonm type event_type = [ `Message ] type queue_id = string type events_queue = { queue_id : queue_id; last_event_id : int; zulip_feature_level : int; event_queue_longpoll_timeout_seconds : int; } let pp_event_type ppf = function | `Message -> Format.fprintf ppf "\"message\"" | `EventType et -> Format.fprintf ppf "\"%s\"" (String.escaped et) let add pp key x args = (key, [ Format.asprintf "%a" pp x ]) :: args let add_opt pp key opt args = match opt with Some x -> add pp key x args | None -> args let pp_string = Format.pp_print_string let bool_opt = add_opt Format.pp_print_bool let int_opt = add_opt pp_int let list_opt pp = add_opt (pp_list pp) let list pp = add (pp_list pp) let bool = add Format.pp_print_bool let string = add pp_string let register_result_enc = let open EzEncoding in let open Json_encoding in conv (fun { queue_id; last_event_id; zulip_feature_level; event_queue_longpoll_timeout_seconds; } -> ( queue_id, last_event_id, zulip_feature_level, event_queue_longpoll_timeout_seconds )) (fun ( queue_id, last_event_id, zulip_feature_level, event_queue_longpoll_timeout_seconds ) -> { queue_id; last_event_id; zulip_feature_level; event_queue_longpoll_timeout_seconds; }) @@ obj4 (req ~description:"The ID of the queue allocated for the client." "queue_id" string) (req ~description: "The initial value of `last_event_id` to pass to `events`." "last_event_id" int) (req ~description:"The server's current Zulip feature level." "zulip_feature_level" int) (dft "event_queue_longpoll_timeout_seconds" int 90) let request meth config endpoint args = let data = EzAPI.Url.encode_args ~url:true args in Request.request_api config meth endpoint mime_form_url data let register ?apply_markdown ?client_gravatar ?include_subscribers ?slim_presence ?(event_types = []) ?all_public_streams ?fetch_event_types ?narrow config = let args = bool_opt "apply_markdown" apply_markdown @@ bool_opt "client_gravatar" client_gravatar @@ bool_opt "include_subscribers" include_subscribers @@ bool_opt "slim_presence" slim_presence @@ list pp_event_type "event_types" event_types @@ bool_opt "all_public_streams" all_public_streams @@ list_opt pp_event_type "fetch_event_types" fetch_event_types @@ list_opt (pp_pair pp_string pp_string) "narrow" narrow @@ [] in let open Lwt_result.Infix in request `POST config "register" args >|= fun s -> Json_encoding.destruct ~ignore_extra_fields:true register_result_enc (EzEncoding.Ezjsonm.from_string s) let deregister ~queue_id config = let data = EzAPI.Url.encode_args ~url:true (string "queue_id" queue_id []) in Request.request_api config `DELETE "deregister" mime_form_url data >>= function | Ok _ -> Lwt_result.return () | Error e -> Lwt_result.fail e module Encodings = struct open Json_encoding let message = let open Message in let unexpected what expected = raise (Cannot_destruct ([], Unexpected (what, expected))) in let missing_field key = raise (Cannot_destruct ([], Missing_field key)) in let assoc fs key = try Smap.find key fs with Not_found -> missing_field key in let ( |>> ) x f = Json_encoding.destruct ~ignore_extra_fields:true f x in custom ~is_object:true ~schema:Json_schema.any (fun _ -> failwith "construct: not supported") (function | `O fs -> let fs = List.to_seq fs |> Smap.of_seq in let field = assoc fs in let id = field "id" |>> int in let sender_id = field "sender_id" |>> int in let sender_email = field "sender_email" |>> string in let sender_full_name = field "sender_full_name" |>> string in let type_ = field "type" |>> string_enum ["stream", `Stream; "private", `Private] in let recipients = match type_ with | `Stream -> let stream_id = field "stream_id" |>> int in let topic = field "subject" |>> string in Stream (stream_id, topic) | `Private -> let display_recipient = field "display_recipient" |>> list (obj1 (req "id" int)) in Private display_recipient in let content = field "content" |>> string in { id; sender_id; sender_email; sender_full_name; recipients; content; flags = [] } | `Bool _ -> unexpected "boolean" "object" | `Null -> unexpected "null" "object" | `Float _ -> unexpected "float" "object" | `String _ -> unexpected "string" "object" | `A _ -> unexpected "array" "object") let message_event = let open Message in conv (fun message -> ((), message, message.flags)) (fun ((), message, flags) -> { message with flags }) @@ obj3 (req "type" (constant "message")) (req "message" message) (req "flags" (list Message.flag)) let heartbeat_event = obj1 (req "type" (constant "heartbeat")) let event = merge_objs (obj1 (req "id" int)) @@ union [ case message_event (function Message e -> Some e | _ -> None) (fun e -> Message e); case heartbeat_event (function Heartbeat -> Some () | _ -> None) (fun () -> Heartbeat); (* It looks like [any_ezjson_object] doesn't actually work with [merge_objs]... *) case any_ezjson_object (function Other e -> Some e | _ -> None) (fun e -> Other e); ] let events_response = obj1 (req "events" (list event)) end let events ?last_event_id ?(blocking = true) ~queue_id config = let args = int_opt "last_event_id" last_event_id @@ bool "dont_block" (not blocking) @@ string "queue_id" queue_id @@ [] in let open Lwt_result.Infix in Request.api_get config "events" args >|= fun x -> Format.printf "%s@." x; Json_encoding.destruct ~ignore_extra_fields:true Encodings.events_response (EzEncoding.Ezjsonm.from_string x) (* Exponential backoff helper. *) let backoff ?switch ?(exp = 2.) ?(ceiling = 10) f x = let rec aux i t = Lwt_switch.check switch; f x >>= function | Ok r -> Lwt.return r | Error (code, status) -> Log.debug (fun m -> m "HTTP error %d: %a" code Format.(pp_print_option pp_print_string) status) >>= fun () -> Log.debug (fun m -> m "Retrying with exponential backoff...") >>= fun () -> Lwt_unix.sleep t >>= fun () -> if i < ceiling then aux (i + 1) (exp *. t) else aux i t in aux 0 1. let stream ?switch ?event_types config = let cancelled = Lwt_mvar.create_empty () in Lwt_switch.add_hook switch (Lwt_mvar.put cancelled); let stream, push = Lwt_stream.create () in let register = backoff @@ register ?event_types in (* Repeatedly request events using the provided timeout. The timeout returned by [register] is normally higher than the heartbeat frequency of the server, and so we shouldn't hit it unless we get disconnected. *) let rec events' ({ queue_id; last_event_id; _ } as events_config) = let timeout = float_of_int events_config.event_queue_longpoll_timeout_seconds in Lwt.pick [ (Lwt_unix.sleep timeout >|= fun () -> None); (Lwt_mvar.take cancelled >|= fun () -> None); (events ~last_event_id ~queue_id config >|= fun e -> Some e); ] >>= function | None -> Lwt_switch.check switch; Log.debug (fun m -> m "events timeout reached, retrying") >>= fun () -> events' events_config | Some e -> Lwt.return e in let rec aux ({ queue_id; last_event_id; _ } as events_config) = events' events_config >>= function | Ok events -> let last_event_id = List.fold_left (fun last_id (id, ev) -> begin match ev with | Heartbeat -> () | _ -> push (Some ev) end; max id last_id) last_event_id events in aux { events_config with last_event_id } | Error (code, status) -> (* If we get a 4XX HTTP error (client error), we stop trying -- if we are making a bogus query somehow, we probably are going to be stuck in an error loop otherwise. This is also the case for negative error codes, which are returned on internal errors of some of the upstream libraries (ez_api, cohttp, or conduit, I am not sure). *) if code < 0 || 400 <= code && code < 500 then begin push None; Lwt.fail_with ("HTTP client error " ^ string_of_int code) end else (* TODO: only register a new queue if we get a BAD_EVENT_QUEUE_ID error code. But I do not know how to get that information. *) Log.err (fun m -> m "HTTP error %d: %a" code Format.(pp_print_option pp_print_string) status ) >>= fun () -> register config >>= aux in Lwt.dont_wait (fun () -> register config >>= aux) (function | Lwt_switch.Off -> push None; () | e -> Logi.err (fun m -> m "uncaught: %s" (Printexc.to_string e))); stream let messages ?switch config = Lwt_stream.map (function | Message m -> m | _ -> assert false) @@ stream ?switch ~event_types:[`Message] config let strip_initial_mentions = let re = Re.(Perl.re {|^\s*@\*\*[^\*]+\*\*\s*|} |> compile) in fun s -> let g = Re.exec re s in let pos = Re.Group.stop g 0 in let len = String.length s - pos in String.sub s pos len let commands ?switch ?trusted_ids ?trusted_emails config = let is_trusted = match trusted_ids, trusted_emails with | None, None -> Fun.const true | _ -> Message.is_trusted ?trusted_ids ?trusted_emails in messages ?switch config |> Lwt_stream.filter_map (fun m -> if not (Message.is_own_message config m) && (Message.has_flag Mentioned m || Message.is_privmsg m) && not (Message.has_flag Read m) && is_trusted m then if Message.has_flag Mentioned m then try Some { m with content = strip_initial_mentions m.content } with Not_found -> None else Some m else None) let interact ?switch ?trusted_ids ?trusted_emails ?privmsg ?mention config f = commands ?trusted_ids ?trusted_emails config |> Lwt_stream.iter_p (fun m -> let send = Message.reply ?privmsg ?mention config m in f m.Message.content >>= function | Some reply -> send reply >>= begin function | Ok _ -> Lwt.return_unit | Error (code, msg) -> Log.err (fun m -> m "HTTP error %d: %a" code (Format.pp_print_option Format.pp_print_string) msg) end | None -> Lwt.return_unit)