package polymarket
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>
OCaml client library for the Polymarket prediction market API
Install
dune-project
Dependency
Authors
Maintainers
Sources
0.2.0.tar.gz
md5=4eb4c5d2f63ff081c9713d90be5a51b2
sha512=0e3de0c9b40683e09ab8f9f966a44784ef1b9b482c3eefef84104a7e8042c92f1d79893ee9588b24fa3d0decaed7f365509f4d1c23c66ce8328efb64e721f276
doc/src/polymarket.rtds/client.ml.html
Source file client.ml
(** High-level WebSocket client for Polymarket Real-Time Data Socket (RTDS).

    Provides typed streaming access to crypto prices and comments. *)

module Connection = Websocket.Connection

let src = Logs.Src.create "polymarket.rtds" ~doc:"Polymarket RTDS client"

module Log = (val Logs.src_log src : Logs.LOG)

(** RTDS WebSocket host *)
let default_host = "ws-live-data.polymarket.com"

module Constants = Polymarket_common.Constants

(** {1 Internal Helpers} *)

(** [make_connection ~sw ~net ~clock] creates (but does not start) a
    connection to {!default_host} on resource ["/ws"], using the ping interval
    and buffer size taken from {!Constants}. Shared by every client in this
    module so that all of them are configured identically. *)
let make_connection ~sw ~net ~clock =
  Connection.create ~sw ~net ~clock ~host:default_host ~resource:"/ws"
    ~ping_interval:Constants.rtds_ping_interval
    ~buffer_size:Constants.message_buffer_size ()

(** Create a WebSocket client with filtering for specialized message types.
    Used by Crypto_prices and Comments clients that need type-specific streams.

    The single [subscription] is registered on the connection before it is
    started. Each raw payload is parsed with [Types.parse_message] and only
    the messages for which [filter] returns [Some _] are pushed to the
    returned stream. [channel_name] is forwarded to the parsing fiber.

    Returns the connection together with the stream of filtered messages. *)
let make_filtered_client (type a) ~sw ~net ~clock ~subscription
    ~(filter : Types.message -> a option) ~channel_name :
    Connection.t * a Eio.Stream.t =
  let conn = make_connection ~sw ~net ~clock in
  let message_stream = Eio.Stream.create Constants.message_buffer_size in
  let subscribe_msg = Types.subscribe_json ~subscriptions:[ subscription ] in
  Connection.set_subscription conn subscribe_msg;
  Connection.start conn;
  Connection.start_ping conn;
  (* Parsing with filter *)
  let parse raw = Types.parse_message raw |> List.filter_map filter in
  Connection.start_parsing_fiber ~sw ~channel_name ~conn ~parse
    ~output_stream:message_stream;
  (conn, message_stream)

(** {1 Unified RTDS Client} *)

(** Unified client state: the underlying connection, the stream of parsed
    messages, and the subscriptions currently tracked by this client. *)
type t = {
  conn : Connection.t;
  message_stream : Types.message Eio.Stream.t;
  mutable subscriptions : Types.subscription list;
}

(** [connect ~sw ~net ~clock ()] creates and starts a connection, starts its
    ping, and starts a parsing fiber (channel name ["unified"]) that feeds
    every message produced by [Types.parse_message] into the client stream.
    The returned client tracks no subscriptions; use {!subscribe}. *)
let connect ~sw ~net ~clock () =
  Log.debug (fun m -> m "Unified: connecting");
  let conn = make_connection ~sw ~net ~clock in
  let message_stream = Eio.Stream.create Constants.message_buffer_size in
  Connection.start conn;
  Connection.start_ping conn;
  Connection.start_parsing_fiber ~sw ~channel_name:"unified" ~conn
    ~parse:Types.parse_message ~output_stream:message_stream;
  { conn; message_stream; subscriptions = [] }

(** [stream t] is the stream on which parsed messages are delivered. *)
let stream t = t.message_stream

(** [subscribe t ~subscriptions] sends a subscribe message for
    [subscriptions] and records them on [t].

    Subscriptions that are already tracked (according to
    [Types.equal_subscription]), or that are repeated inside [subscriptions],
    are recorded only once, so the message stored for reconnects never
    contains duplicates. The message sent immediately still carries
    [subscriptions] exactly as given. *)
let subscribe t ~subscriptions =
  let is_tracked known s =
    List.exists (fun k -> Types.equal_subscription k s) known
  in
  (* Keep request order while dropping anything already known. *)
  let fresh =
    List.fold_left
      (fun acc s ->
        if is_tracked t.subscriptions s || is_tracked acc s then acc
        else s :: acc)
      [] subscriptions
    |> List.rev
  in
  let tracked = t.subscriptions @ fresh in
  Log.debug (fun m ->
      m "Unified: subscribing to %d channels (total: %d)"
        (List.length subscriptions)
        (List.length tracked));
  t.subscriptions <- tracked;
  let msg = Types.subscribe_json ~subscriptions in
  Connection.send t.conn msg;
  (* Update stored subscription for reconnect *)
  let full_msg = Types.subscribe_json ~subscriptions:t.subscriptions in
  Connection.set_subscription t.conn full_msg

(** [unsubscribe t ~subscriptions] stops tracking every subscription of [t]
    that is equal (per [Types.equal_subscription]) to one of [subscriptions],
    sends an unsubscribe message for [subscriptions], and refreshes the
    message stored for reconnects with the remaining subscriptions. *)
let unsubscribe t ~subscriptions =
  let new_subs =
    List.filter
      (fun s ->
        not (List.exists (fun u -> Types.equal_subscription s u) subscriptions))
      t.subscriptions
  in
  Log.debug (fun m ->
      m "Unified: unsubscribing from %d channels (remaining: %d)"
        (List.length subscriptions)
        (List.length new_subs));
  t.subscriptions <- new_subs;
  let msg = Types.unsubscribe_json ~subscriptions in
  Connection.send t.conn msg;
  (* Update stored subscription for reconnect *)
  let full_msg = Types.subscribe_json ~subscriptions:t.subscriptions in
  Connection.set_subscription t.conn full_msg

(** [close t] closes the underlying connection. *)
let close t = Connection.close t.conn

(** {1 Convenience Clients} *)

module Crypto_prices = struct
  (** Specialized client for crypto price streams *)

  (** Price feed a client was connected to. *)
  type source = Binance | Chainlink

  (** Client state: connection, stream of crypto messages, the symbols
      requested at connection time (if any) and the feed in use. *)
  type t = {
    conn : Connection.t;
    message_stream : Types.crypto_message Eio.Stream.t;
    symbols : string list option;
    source : source;
  }

  (** Keeps only [`Crypto] messages, unwrapping their payload. *)
  let crypto_filter = function `Crypto m -> Some m | _ -> None

  (** [connect_binance ~sw ~net ~clock ?symbols ()] connects with a crypto
      prices subscription. When [symbols] is given it is turned into a
      subscription filter with [Types.binance_symbol_filter]; otherwise no
      filter is passed. *)
  let connect_binance ~sw ~net ~clock ?symbols () =
    Log.debug (fun m ->
        m "Crypto: connecting to Binance%s"
          (match symbols with
          | Some s -> Printf.sprintf " (%d symbols)" (List.length s)
          | None -> ""));
    let subscription =
      let filters = Option.map Types.binance_symbol_filter symbols in
      Types.crypto_prices_subscription ?filters ()
    in
    let conn, message_stream =
      make_filtered_client ~sw ~net ~clock ~subscription ~filter:crypto_filter
        ~channel_name:"crypto_binance"
    in
    { conn; message_stream; symbols; source = Binance }

  (** [connect_chainlink ~sw ~net ~clock ?symbol ()] connects with a
      Chainlink crypto prices subscription. When [symbol] is given it is
      turned into a subscription filter with [Types.chainlink_symbol_filter]
      and recorded as a one-element symbol list. *)
  let connect_chainlink ~sw ~net ~clock ?symbol () =
    Log.debug (fun m ->
        m "Crypto: connecting to Chainlink%s"
          (match symbol with Some s -> Printf.sprintf " (%s)" s | None -> ""));
    let subscription =
      let filters = Option.map Types.chainlink_symbol_filter symbol in
      Types.crypto_prices_chainlink_subscription ?filters ()
    in
    let conn, message_stream =
      make_filtered_client ~sw ~net ~clock ~subscription ~filter:crypto_filter
        ~channel_name:"crypto_chainlink"
    in
    {
      conn;
      message_stream;
      symbols = Option.map (fun s -> [ s ]) symbol;
      source = Chainlink;
    }

  (** [stream t] is the stream on which crypto messages are delivered. *)
  let stream t = t.message_stream

  (** [symbols t] is the symbol list recorded at connection time, if any. *)
  let symbols t = t.symbols

  (** [source t] is the feed [t] was connected to. *)
  let source t = t.source

  (** [close t] closes the underlying connection. *)
  let close t = Connection.close t.conn
end

module Comments = struct
  (** Specialized client for comment streams *)

  (** Client state: connection, stream of comments, and the optional
      authentication supplied at connection time. *)
  type t = {
    conn : Connection.t;
    message_stream : Types.comment Eio.Stream.t;
    gamma_auth : Types.gamma_auth option;
  }

  (** Keeps only [`Comment] messages, unwrapping their payload. *)
  let comment_filter = function `Comment m -> Some m | _ -> None

  (** [connect ~sw ~net ~clock ?gamma_auth ()] connects with a comments
      subscription, forwarding [gamma_auth] to
      [Types.comments_subscription] when provided. *)
  let connect ~sw ~net ~clock ?gamma_auth () =
    Log.debug (fun m ->
        m "Comments: connecting%s"
          (if Option.is_some gamma_auth then " (authenticated)" else ""));
    let subscription = Types.comments_subscription ?gamma_auth () in
    let conn, message_stream =
      make_filtered_client ~sw ~net ~clock ~subscription ~filter:comment_filter
        ~channel_name:"comments"
    in
    { conn; message_stream; gamma_auth }

  (** [stream t] is the stream on which comments are delivered. *)
  let stream t = t.message_stream

  (** [gamma_auth t] is the authentication supplied at connection time. *)
  let gamma_auth t = t.gamma_auth

  (** [close t] closes the underlying connection. *)
  let close t = Connection.close t.conn
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>