package polymarket
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
OCaml client library for the Polymarket prediction market API
Install
dune-project
Dependency
Authors
Maintainers
Sources
0.2.0.tar.gz
md5=4eb4c5d2f63ff081c9713d90be5a51b2
sha512=0e3de0c9b40683e09ab8f9f966a44784ef1b9b482c3eefef84104a7e8042c92f1d79893ee9588b24fa3d0decaed7f365509f4d1c23c66ce8328efb64e721f276
doc/src/polymarket.websocket/connection.ml.html
Source file connection.ml
(** WebSocket connection management with TLS and reconnection support.

    Uses tls-eio for pure-OCaml TLS, avoiding OpenSSL dependencies. *)

let src = Logs.Src.create "polymarket.wss" ~doc:"WebSocket connection"

module Log = (val Logs.src_log src : Logs.LOG)

(** Connection state *)
type state = Disconnected | Connecting | Connected | Closing | Closed

type config = {
  host : string;
  port : int;
  resource : string;
  initial_backoff : float;
  max_backoff : float;
  ping_interval : float;
}
(** Connection configuration *)

(** [default_config ~host ~resource] is a configuration for [host] and
    [resource] on port 443, with a 1s initial backoff, a 60s maximum backoff
    and a 30s ping interval. *)
let default_config ~host ~resource =
  {
    host;
    port = 443;
    resource;
    initial_backoff = 1.0;
    max_backoff = 60.0;
    ping_interval = 30.0;
  }

(** Network wrapper to hide type parameter *)
type net_t = Net : 'a Eio.Net.t -> net_t

type t = {
  config : config;
  sw : Eio.Switch.t;
  net : net_t;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  mutable state : state;
  mutable flow : Tls_eio.t option;
  message_stream : string Eio.Stream.t;
  mutable subscription_msg : string option;
  mutable closed : bool;
  mutable current_backoff : float;
}
(** Internal connection type *)

(** Create TLS configuration.

    @raise Failure
      if the CA certificates cannot be loaded or the client configuration is
      rejected. *)
let make_tls_config () =
  let authenticator =
    match Ca_certs.authenticator () with
    | Ok auth -> auth
    | Error (`Msg msg) -> failwith ("CA certs error: " ^ msg)
  in
  match Tls.Config.client ~authenticator () with
  | Ok cfg -> cfg
  | Error (`Msg msg) -> failwith ("TLS config error: " ^ msg)

(** Create a new connection.

    The connection starts in the [Disconnected] state; nothing is opened until
    {!start} is called. [buffer_size] is the capacity of the raw message
    stream. *)
let create ~sw ~net ~clock ~host ~resource ?(ping_interval = 30.0)
    ?(buffer_size = 1000) () =
  let config = { (default_config ~host ~resource) with ping_interval } in
  {
    config;
    sw;
    net = Net net;
    clock;
    state = Disconnected;
    flow = None;
    message_stream = Eio.Stream.create buffer_size;
    subscription_msg = None;
    closed = false;
    (* Start from the configured value rather than a duplicated literal. *)
    current_backoff = config.initial_backoff;
  }

(** Best-effort close of a TLS flow: failures are logged, never raised, so
    that cleanup paths cannot themselves abort. *)
let close_flow_quietly (flow : Tls_eio.t) =
  try Eio.Flow.close flow
  with exn ->
    Log.debug (fun m -> m "Error closing flow: %s" (Printexc.to_string exn))

(** Forget the current flow (if any) and close it. *)
let release_flow t =
  match t.flow with
  | None -> ()
  | Some flow ->
      t.flow <- None;
      close_flow_quietly flow

(** Establish TCP + TLS connection.

    @raise Failure if the host cannot be resolved or TLS cannot be configured.
    Other exceptions raised by name validation, the TCP connect or the TLS
    handshake propagate to the caller. *)
let connect_tls t =
  let host = t.config.host in
  let port = t.config.port in
  let (Net net) = t.net in
  Log.debug (fun m -> m "Connecting to %s:%d" host port);
  (* Validate everything that does not need a socket first, so that a failure
     here cannot leave an open socket behind. *)
  let tls_config = make_tls_config () in
  let host_name = Domain_name.of_string_exn host |> Domain_name.host_exn in
  (* Resolve address *)
  let addr =
    match Eio.Net.getaddrinfo_stream net host ~service:(string_of_int port) with
    | [] -> failwith ("Failed to resolve host: " ^ host)
    | addr :: _ -> addr
  in
  (* Connect TCP *)
  let socket = Eio.Net.connect ~sw:t.sw net addr in
  (* Upgrade to TLS *)
  Log.debug (fun m -> m "TLS handshake");
  match Tls_eio.client_of_flow tls_config ~host:host_name socket with
  | tls_flow ->
      Log.debug (fun m -> m "TLS connected");
      tls_flow
  | exception exn ->
      (* Do not keep the TCP socket open until the switch finishes. *)
      (try Eio.Flow.close socket
       with close_exn ->
         Log.debug (fun m ->
             m "Error closing socket: %s" (Printexc.to_string close_exn)));
      raise exn

(** Connect and perform WebSocket handshake.

    Returns [true] when the connection is established (state [Connected],
    backoff reset to [initial_backoff]). Returns [false] with state
    [Disconnected] when the TCP/TLS connection or the handshake fails, so the
    caller can retry. Cancellation is re-raised. *)
let connect_internal t =
  t.state <- Connecting;
  let failed () =
    t.state <- Disconnected;
    false
  in
  match connect_tls t with
  | exception (Eio.Cancel.Cancelled _ as exn) ->
      t.state <- Disconnected;
      raise exn
  | exception exn ->
      Log.err (fun m -> m "Connection failed: %s" (Printexc.to_string exn));
      failed ()
  | flow -> (
      (* Perform WebSocket handshake *)
      match
        Handshake.perform ~flow ~host:t.config.host ~port:t.config.port
          ~resource:t.config.resource
      with
      | exception (Eio.Cancel.Cancelled _ as exn) ->
          close_flow_quietly flow;
          t.state <- Disconnected;
          raise exn
      | exception exn ->
          Log.err (fun m -> m "Handshake error: %s" (Printexc.to_string exn));
          close_flow_quietly flow;
          failed ()
      | Handshake.Success ->
          t.flow <- Some flow;
          t.state <- Connected;
          t.current_backoff <- t.config.initial_backoff;
          Log.debug (fun m -> m "Connected");
          true
      | Handshake.Failed msg ->
          Log.err (fun m -> m "Handshake failed: %s" msg);
          (* The flow is useless after a rejected handshake. *)
          close_flow_quietly flow;
          failed ())

(** Send a frame.

    The frame is encoded with masking enabled and written to the current
    flow. When there is no flow, a warning is logged and nothing is sent. *)
let send_frame t frame =
  match t.flow with
  | Some flow ->
      let data = Frame.encode ~mask:true frame in
      Eio.Flow.copy_string data flow;
      Log.debug (fun m ->
          m "Frame sent (opcode %d)" (Frame.Opcode.to_int frame.opcode))
  | None -> Log.warn (fun m -> m "Send failed: not connected")

(** Send a text message *)
let send t msg =
  send_frame t (Frame.text msg);
  Log.debug (fun m -> m "Message sent")

(** Send a ping *)
let send_ping t = send_frame t (Frame.ping ())

(** Receive loop - reads frames and dispatches to message stream.

    Runs while the state is [Connected]. Text and binary payloads are pushed
    to the message stream, pings are answered with a pong, and a close frame
    moves the state to [Closed]. On EOF or any other read error the state
    becomes [Disconnected], unless the connection was closed locally, in which
    case the state set by {!close} is kept. Cancellation is re-raised. *)
let receive_loop t =
  let mark_disconnected () = if not t.closed then t.state <- Disconnected in
  match t.flow with
  | None -> ()
  | Some flow -> (
      try
        while t.state = Connected do
          let frame = Frame.decode flow in
          match frame.opcode with
          | Frame.Opcode.Text | Frame.Opcode.Binary ->
              Eio.Stream.add t.message_stream frame.payload
          | Frame.Opcode.Ping ->
              (* Respond with pong *)
              send_frame t (Frame.pong ~payload:frame.payload ());
              Log.debug (fun m -> m "Ping received")
          | Frame.Opcode.Pong -> Log.debug (fun m -> m "Pong received")
          | Frame.Opcode.Close ->
              Log.debug (fun m -> m "Close received");
              t.state <- Closed
          | _ -> ()
        done
      with
      | Eio.Cancel.Cancelled _ as exn ->
          (* Cancellation is not a receive error; let the caller see it. *)
          raise exn
      | End_of_file ->
          Log.debug (fun m -> m "EOF");
          mark_disconnected ()
      | exn ->
          if t.closed then
            Log.debug (fun m ->
                m "Receive stopped after close: %s" (Printexc.to_string exn))
          else
            Log.err (fun m -> m "Receive error: %s" (Printexc.to_string exn));
          mark_disconnected ())

(** Ping loop - sends periodic pings.

    Sleeps [ping_interval] between iterations and sends a ping whenever the
    connection is [Connected]. The loop keeps running across disconnections
    and reconnections and stops once the connection is closed or the fiber is
    cancelled. A failed ping is logged and does not stop the loop. *)
let ping_loop t =
  try
    while not t.closed do
      Eio.Time.sleep t.clock t.config.ping_interval;
      if t.state = Connected && not t.closed then begin
        try
          send_ping t;
          Log.debug (fun m -> m "Ping sent")
        with
        | Eio.Cancel.Cancelled _ as exn -> raise exn
        | exn ->
            Log.warn (fun m -> m "Ping failed: %s" (Printexc.to_string exn))
      end
    done
  with Eio.Cancel.Cancelled _ -> Log.debug (fun m -> m "Ping cancelled")

(** Connect with exponential backoff retry.

    Retries until a connection is established or the connection is closed.
    After each failure it sleeps [current_backoff] and doubles it, capped at
    [max_backoff]. On success the stored subscription message, if any, is
    sent again. *)
let rec connect_with_retry t =
  if t.closed then ()
  else if connect_internal t then begin
    (* Send subscription message if set *)
    match t.subscription_msg with
    | Some msg ->
        send t msg;
        Log.debug (fun m -> m "Resubscribed")
    | None -> ()
  end
  else begin
    Log.warn (fun m -> m "Retrying in %.1fs" t.current_backoff);
    Eio.Time.sleep t.clock t.current_backoff;
    t.current_backoff <- min (t.current_backoff *. 2.0) t.config.max_backoff;
    connect_with_retry t
  end

(** Set subscription message for reconnection *)
let set_subscription t msg = t.subscription_msg <- Some msg

(** Get message stream *)
let message_stream t = t.message_stream

(** Check if connected *)
let is_connected t = t.state = Connected

(** Check if closed *)
let is_closed t = t.closed

(** Close connection.

    Idempotent. Sends a close frame and closes the flow, both on a
    best-effort basis: a failure of either step is logged and does not prevent
    the other step or the state update. *)
let close t =
  if not t.closed then begin
    t.closed <- true;
    (match t.flow with
    | Some flow ->
        (try send_frame t (Frame.close ())
         with exn ->
           Log.debug (fun m ->
               m "Close frame not sent: %s" (Printexc.to_string exn)));
        (* Close the flow even when the close frame could not be sent. *)
        close_flow_quietly flow;
        t.flow <- None
    | None -> ());
    t.state <- Closed;
    Log.debug (fun m -> m "Closed")
  end

(** Start the connection with receive loop.

    Forks a fiber on the connection's switch that connects (with retry), runs
    the receive loop, and reconnects whenever the receive loop ends, until the
    connection is closed or the fiber is cancelled. *)
let start t =
  Eio.Fiber.fork ~sw:t.sw (fun () ->
      try
        while not t.closed do
          if t.state = Disconnected then connect_with_retry t;
          if t.state = Connected then receive_loop t;
          if not t.closed then begin
            (* The receive loop ended without a local close (read error, EOF
               or a close frame from the server). Drop the dead flow and go
               back to [Disconnected] so the next iteration reconnects; without
               this a server-initiated close would leave the state at [Closed]
               and this loop would spin without ever yielding. *)
            release_flow t;
            t.state <- Disconnected;
            (* Small delay before reconnect attempt *)
            Eio.Time.sleep t.clock 0.1
          end
        done
      with Eio.Cancel.Cancelled _ ->
        Log.debug (fun m -> m "Receive cancelled"))

(** Start ping loop *)
let start_ping t = Eio.Fiber.fork ~sw:t.sw (fun () -> ping_loop t)

(** Start a message parsing fiber that reads from a connection's raw stream,
    parses messages using the provided function, and adds them to the output
    stream. Handles cancellation and errors with consistent logging.

    @param sw Switch for fiber lifecycle
    @param channel_name Name for log messages (e.g., "market", "user")
    @param conn WebSocket connection to read from
    @param parse Function to parse raw messages into typed messages
    @param output_stream Output stream for parsed messages *)
let start_parsing_fiber (type a) ~sw ~channel_name ~conn
    ~(parse : string -> a list) ~(output_stream : a Eio.Stream.t) =
  Eio.Fiber.fork ~sw (fun () ->
      try
        let raw_stream = conn.message_stream in
        while not conn.closed do
          let raw = Eio.Stream.take raw_stream in
          let msgs = parse raw in
          List.iter (fun msg -> Eio.Stream.add output_stream msg) msgs
        done;
        Log.debug (fun m -> m "Parser stopped (%s)" channel_name)
      with
      | Eio.Cancel.Cancelled _ ->
          Log.debug (fun m -> m "Parser cancelled (%s)" channel_name)
      | exn ->
          Log.err (fun m ->
              m "Parser error (%s): %s" channel_name (Printexc.to_string exn)))
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>