polymarket_apis
Polymarket APIs - Unified interface for Polymarket's various APIs.
This package provides a comprehensive interface to Polymarket's APIs including:
- CLOB (Central Limit Order Book) API for trading
- Gamma API for event and market information
- Data API for user information
- Web3 API for blockchain interactions
- WebSocket API for real-time data streams
- GraphQL API for flexible data queries
1""" 2Polymarket APIs - Unified interface for Polymarket's various APIs. 3 4This package provides a comprehensive interface to Polymarket's APIs including: 5- CLOB (Central Limit Order Book) API for trading 6- Gamma API for event and market information 7- Data API for user information 8- Web3 API for blockchain interactions 9- WebSocket API for real-time data streams 10- GraphQL API for flexible data queries 11""" 12 13__version__ = "0.3.9" 14__author__ = "Razvan Gheorghe" 15__email__ = "razvan@gheorghe.me" 16 17from .clients import ( 18 AsyncPolymarketGraphQLClient, 19 PolymarketClobClient, 20 PolymarketDataClient, 21 PolymarketGammaClient, 22 PolymarketGraphQLClient, 23 PolymarketWeb3Client, 24 PolymarketWebsocketsClient, 25) 26from .types.clob_types import MarketOrderArgs, OrderArgs, OrderType 27 28__all__ = [ 29 "AsyncPolymarketGraphQLClient", 30 "MarketOrderArgs", 31 "OrderArgs", 32 "OrderType", 33 "PolymarketClobClient", 34 "PolymarketDataClient", 35 "PolymarketGammaClient", 36 "PolymarketGraphQLClient", 37 "PolymarketWeb3Client", 38 "PolymarketWebsocketsClient", 39 "__author__", 40 "__email__", 41 "__version__", 42]
37class AsyncPolymarketGraphQLClient: 38 """Asynchronous GraphQL client for Polymarket subgraphs.""" 39 40 def __init__( 41 self, 42 endpoint_name: Literal[ 43 "activity_subgraph", 44 "fpmm_subgraph", 45 "open_interest_subgraph", 46 "orderbook_subgraph", 47 "pnl_subgraph", 48 "positions_subgraph", 49 "sports_oracle_subgraph", 50 "wallet_subgraph", 51 ], 52 ) -> None: 53 endpoint_url = GRAPHQL_ENDPOINTS[endpoint_name] 54 self.transport = HTTPXAsyncTransport(url=endpoint_url) 55 self.client = Client( 56 transport=self.transport, fetch_schema_from_transport=False 57 ) 58 59 async def query(self, query_string: str) -> dict: 60 async with self.client as session: 61 return await session.execute(gql(query_string))
Asynchronous GraphQL client for Polymarket subgraphs.
40 def __init__( 41 self, 42 endpoint_name: Literal[ 43 "activity_subgraph", 44 "fpmm_subgraph", 45 "open_interest_subgraph", 46 "orderbook_subgraph", 47 "pnl_subgraph", 48 "positions_subgraph", 49 "sports_oracle_subgraph", 50 "wallet_subgraph", 51 ], 52 ) -> None: 53 endpoint_url = GRAPHQL_ENDPOINTS[endpoint_name] 54 self.transport = HTTPXAsyncTransport(url=endpoint_url) 55 self.client = Client( 56 transport=self.transport, fetch_schema_from_transport=False 57 )
439class MarketOrderArgs(BaseModel): 440 token_id: str 441 """ 442 TokenID of the Conditional token asset being traded 443 """ 444 445 amount: float 446 """ 447 BUY orders: $$$ Amount to buy 448 SELL orders: Shares to sell 449 """ 450 451 side: str 452 """ 453 Side of the order 454 """ 455 456 price: float = 0 457 """ 458 Price used to create the order 459 """ 460 461 fee_rate_bps: int = 0 462 """ 463 Fee rate, in basis points, charged to the order maker, charged on proceeds. 464 """ 465 466 nonce: int = 0 467 """ 468 Nonce used for onchain cancellations. 469 """ 470 471 taker: str = ADDRESS_ZERO 472 """ 473 Address of the order taker. The zero address is used to indicate a public order. 474 """ 475 476 order_type: OrderType = OrderType.FOK
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
Address of the order taker. The zero address is used to indicate a public order.
397class OrderArgs(BaseModel): 398 token_id: str 399 """ 400 TokenID of the Conditional token asset being traded 401 """ 402 403 price: float 404 """ 405 Price used to create the order 406 """ 407 408 size: float 409 """ 410 Size in terms of the ConditionalToken 411 """ 412 413 side: str 414 """ 415 Side of the order 416 """ 417 418 fee_rate_bps: int = 0 419 """ 420 Fee rate, in basis points, charged to the order maker, charged on proceeds 421 """ 422 423 nonce: int = 0 424 """ 425 Nonce used for onchain cancellations 426 """ 427 428 expiration: int = 0 429 """ 430 Timestamp after which the order is expired. 431 """ 432 433 taker: str = ADDRESS_ZERO 434 """ 435 Address of the order taker. The zero address is used to indicate a public order. 436 """
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
371class OrderType(str, Enum): 372 GTC = "GTC" # Good Till Cancelled 373 GTD = "GTD" # Good Till Date 374 FOK = "FOK" # Fill or Kill 375 FAK = "FAK" # Fill and Kill
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
93class PolymarketClobClient: 94 def __init__( 95 self, 96 private_key: str, 97 address: EthAddress, 98 creds: ApiCreds | None = None, 99 chain_id: Literal[137, 80002] = POLYGON, 100 signature_type: Literal[0, 1, 2] = 1, 101 # 0 - EOA wallet, 1 - Proxy wallet, 2 - Gnosis Safe wallet 102 ): 103 self.address = address 104 self.client = httpx.Client(http2=True, timeout=30.0) 105 self.async_client = httpx.AsyncClient(http2=True, timeout=30.0) 106 self.base_url: str = "https://clob.polymarket.com" 107 self.signer = Signer(private_key=private_key, chain_id=chain_id) 108 self.builder = OrderBuilder( 109 signer=self.signer, 110 sig_type=signature_type, 111 funder=address, 112 ) 113 self.creds = creds if creds else self.create_or_derive_api_creds() 114 115 # local cache 116 self.__tick_sizes: dict[str, TickSize] = {} 117 self.__neg_risk: dict[str, bool] = {} 118 self.__fee_rates: dict[str, int] = {} 119 120 def _build_url(self, endpoint: str) -> str: 121 return urljoin(self.base_url, endpoint) 122 123 def get_ok(self) -> str: 124 response = self.client.get(self.base_url) 125 response.raise_for_status() 126 return response.json() 127 128 def create_api_creds(self, nonce: int | None = None) -> ApiCreds: 129 headers = create_level_1_headers(self.signer, nonce) 130 response = self.client.post(self._build_url(CREATE_API_KEY), headers=headers) 131 response.raise_for_status() 132 return ApiCreds(**response.json()) 133 134 def derive_api_key(self, nonce: int | None = None) -> ApiCreds: 135 headers = create_level_1_headers(self.signer, nonce) 136 response = self.client.get(self._build_url(DERIVE_API_KEY), headers=headers) 137 response.raise_for_status() 138 return ApiCreds(**response.json()) 139 140 def create_or_derive_api_creds(self, nonce: int | None = None) -> ApiCreds: 141 try: 142 return self.create_api_creds(nonce) 143 except HTTPStatusError: 144 return self.derive_api_key(nonce) 145 146 def set_api_creds(self, creds: ApiCreds) -> None: 147 self.creds = creds 148 149 def get_api_keys(self) -> dict: 150 request_args = RequestArgs(method="GET", request_path=GET_API_KEYS) 151 headers = create_level_2_headers(self.signer, self.creds, request_args) 152 response = self.client.get(self._build_url(GET_API_KEYS), headers=headers) 153 response.raise_for_status() 154 return response.json() 155 156 def delete_api_keys(self) -> Literal["OK"]: 157 request_args = RequestArgs(method="DELETE", request_path=DELETE_API_KEY) 158 headers = create_level_2_headers(self.signer, self.creds, request_args) 159 response = self.client.delete(self._build_url(DELETE_API_KEY), headers=headers) 160 response.raise_for_status() 161 return response.json() 162 163 def get_utc_time(self) -> datetime: 164 # parse server timestamp into utc datetime 165 response = self.client.get(self._build_url(TIME)) 166 response.raise_for_status() 167 return datetime.fromtimestamp(response.json(), tz=UTC) 168 169 def get_tick_size(self, token_id: str) -> TickSize: 170 if token_id in self.__tick_sizes: 171 return self.__tick_sizes[token_id] 172 173 params = {"token_id": token_id} 174 response = self.client.get(self._build_url(GET_TICK_SIZE), params=params) 175 response.raise_for_status() 176 self.__tick_sizes[token_id] = cast( 177 "TickSize", str(response.json()["minimum_tick_size"]) 178 ) 179 180 return self.__tick_sizes[token_id] 181 182 def get_neg_risk(self, token_id: str) -> bool: 183 if token_id in self.__neg_risk: 184 return self.__neg_risk[token_id] 185 186 params = {"token_id": token_id} 187 response = self.client.get(self._build_url(GET_NEG_RISK), params=params) 188 response.raise_for_status() 189 self.__neg_risk[token_id] = response.json()["neg_risk"] 190 191 return self.__neg_risk[token_id] 192 193 def get_fee_rate_bps(self, token_id: str) -> int: 194 if token_id in self.__fee_rates: 195 return self.__fee_rates[token_id] 196 197 params = {"token_id": token_id} 198 response = self.client.get(self._build_url(GET_FEE_RATE), params=params) 199 response.raise_for_status() 200 fee_rate = response.json().get("base_fee") or 0 201 self.__fee_rates[token_id] = fee_rate 202 203 return fee_rate 204 205 def __resolve_tick_size( 206 self, 207 token_id: str, 208 tick_size: TickSize | None = None, 209 ) -> TickSize: 210 min_tick_size = self.get_tick_size(token_id) 211 if tick_size is not None: 212 if is_tick_size_smaller(tick_size, min_tick_size): 213 msg = f"invalid tick size ({tick_size!s}), minimum for the market is {min_tick_size!s}" 214 raise InvalidTickSizeError(msg) 215 else: 216 tick_size = min_tick_size 217 return tick_size 218 219 def __resolve_fee_rate( 220 self, 221 token_id: str, 222 user_fee_rate: int | None = None, 223 ) -> int: 224 market_fee_rate_bps = self.get_fee_rate_bps(token_id) 225 # If both fee rate on the market and the user supplied fee rate are non-zero, validate that they match 226 # else return the market fee rate 227 if ( 228 market_fee_rate_bps > 0 229 and user_fee_rate is not None 230 and user_fee_rate > 0 231 and user_fee_rate != market_fee_rate_bps 232 ): 233 msg = f"invalid user provided fee rate: ({user_fee_rate}), fee rate for the market must be {market_fee_rate_bps}" 234 raise InvalidFeeRateError(msg) 235 return market_fee_rate_bps 236 237 def get_midpoint(self, token_id: str) -> Midpoint: 238 """Get the mid-market price for the given token.""" 239 params = {"token_id": token_id} 240 response = self.client.get(self._build_url(MID_POINT), params=params) 241 response.raise_for_status() 242 return Midpoint(token_id=token_id, value=float(response.json()["mid"])) 243 244 def get_midpoints(self, token_ids: list[str]) -> dict: 245 """Get the mid-market prices for a set of tokens.""" 246 data = [{"token_id": token_id} for token_id in token_ids] 247 response = self.client.post(self._build_url(MID_POINTS), json=data) 248 response.raise_for_status() 249 return TokenValueDict(**response.json()).root 250 251 def get_spread(self, token_id: str) -> Spread: 252 """Get the spread for the given token.""" 253 params = {"token_id": token_id} 254 response = self.client.get(self._build_url(GET_SPREAD), params=params) 255 response.raise_for_status() 256 return Spread(token_id=token_id, value=float(response.json()["mid"])) 257 258 def get_spreads(self, token_ids: list[str]) -> dict: 259 """Get the spreads for a set of tokens.""" 260 data = [{"token_id": token_id} for token_id in token_ids] 261 response = self.client.post(self._build_url(GET_SPREADS), json=data) 262 response.raise_for_status() 263 return TokenValueDict(**response.json()).root 264 265 def get_price(self, token_id: str, side: Literal["BUY", "SELL"]) -> Price: 266 """Get the market price for the given token and side.""" 267 params = {"token_id": token_id, "side": side} 268 response = self.client.get(self._build_url(PRICE), params=params) 269 response.raise_for_status() 270 return Price(**response.json(), token_id=token_id, side=side) 271 272 def get_prices(self, params: list[BookParams]) -> dict[str, BidAsk]: 273 """Get the market prices for a set of tokens and sides.""" 274 data = [{"token_id": param.token_id, "side": param.side} for param in params] 275 response = self.client.post(self._build_url(GET_PRICES), json=data) 276 response.raise_for_status() 277 return TokenBidAskDict(**response.json()).root 278 279 def get_last_trade_price(self, token_id) -> Price: 280 """Fetches the last trade price for a token_id.""" 281 params = {"token_id": token_id} 282 response = self.client.get(self._build_url(GET_LAST_TRADE_PRICE), params=params) 283 response.raise_for_status() 284 return Price(**response.json(), token_id=token_id) 285 286 def get_last_trades_prices(self, token_ids: list[str]) -> list[Price]: 287 """Fetches the last trades prices for a set of token ids.""" 288 body = [{"token_id": token_id} for token_id in token_ids] 289 response = self.client.post(self._build_url(GET_LAST_TRADES_PRICES), json=body) 290 response.raise_for_status() 291 return [Price(**price) for price in response.json()] 292 293 def get_order_book(self, token_id) -> OrderBookSummary: 294 """Get the orderbook for the given token.""" 295 params = {"token_id": token_id} 296 response = self.client.get(self._build_url(GET_ORDER_BOOK), params=params) 297 response.raise_for_status() 298 return OrderBookSummary(**response.json()) 299 300 def get_order_books(self, token_ids: list[str]) -> list[OrderBookSummary]: 301 """Get the orderbook for a set of tokens.""" 302 body = [{"token_id": token_id} for token_id in token_ids] 303 response = self.client.post(self._build_url(GET_ORDER_BOOKS), json=body) 304 response.raise_for_status() 305 return [OrderBookSummary(**obs) for obs in response.json()] 306 307 async def get_order_books_async( 308 self, token_ids: list[str] 309 ) -> list[OrderBookSummary]: 310 """Get the orderbook for a set of tokens asynchronously.""" 311 body = [{"token_id": token_id} for token_id in token_ids] 312 response = await self.async_client.post( 313 self._build_url(GET_ORDER_BOOKS), json=body 314 ) 315 response.raise_for_status() 316 return [OrderBookSummary(**obs) for obs in response.json()] 317 318 def get_market(self, condition_id) -> ClobMarket: 319 """Get a ClobMarket by condition_id.""" 320 response = self.client.get(self._build_url(GET_MARKET + condition_id)) 321 response.raise_for_status() 322 return ClobMarket(**response.json()) 323 324 def get_markets(self, next_cursor="MA==") -> PaginatedResponse[ClobMarket]: 325 """Get paginated ClobMarkets.""" 326 params = {"next_cursor": next_cursor} 327 response = self.client.get(self._build_url(GET_MARKETS), params=params) 328 response.raise_for_status() 329 return PaginatedResponse[ClobMarket](**response.json()) 330 331 def get_all_markets(self, next_cursor="MA==") -> list[ClobMarket]: 332 """Recursively fetch all ClobMarkets using pagination.""" 333 # Base case: Stop recursion if next_cursor indicates the last page 334 if next_cursor == "LTE=": 335 print("Reached the last page of markets.") 336 return [] 337 338 # Fetch current page of markets 339 paginated_response = self.get_markets(next_cursor=next_cursor) 340 341 # Collect current page data 342 current_markets = paginated_response.data 343 344 # Recursively fetch remaining pages 345 next_page_markets = self.get_all_markets( 346 next_cursor=paginated_response.next_cursor, 347 ) 348 349 # Combine current page data with data from subsequent pages 350 return current_markets + next_page_markets 351 352 def get_recent_history( 353 self, 354 token_id: str, 355 interval: Literal["1h", "6h", "1d", "1w", "1m", "max"] = "1d", 356 fidelity: int = 1, # resolution in minutes 357 ) -> PriceHistory: 358 """Get the recent price history of a token (up to now) - 1h, 6h, 1d, 1w, 1m.""" 359 min_fidelities: dict[str, int] = { 360 "1h": 1, 361 "6h": 1, 362 "1d": 1, 363 "1w": 5, 364 "1m": 10, 365 "max": 2, 366 } 367 368 if fidelity < min_fidelities[interval]: 369 msg = f"invalid filters: minimum fidelity' for '{interval}' range is {min_fidelities.get(interval)}" 370 raise ValueError(msg) 371 372 params: dict[str, int | str] = { 373 "market": token_id, 374 "interval": interval, 375 "fidelity": fidelity, 376 } 377 response = self.client.get(self._build_url("/prices-history"), params=params) 378 response.raise_for_status() 379 return PriceHistory(**response.json(), token_id=token_id) 380 381 def get_history( 382 self, 383 token_id: str, 384 start_time: datetime | None = None, 385 end_time: datetime | None = None, 386 fidelity: int = 2, # resolution in minutes 387 ) -> PriceHistory: 388 """Get the price history of a token between a selected date range of max 15 days or from start_time to now.""" 389 if start_time is None and end_time is None: 390 msg = "At least 'start_time' or ('start_time' and 'end_time') must be provided" 391 raise ValueError(msg) 392 393 if ( 394 start_time 395 and end_time 396 and start_time + timedelta(days=15, seconds=1) < end_time 397 ): 398 msg = "'start_time' - 'end_time' range cannot exceed 15 days. Remove 'end_time' to get prices up to now or set a shorter range." 399 raise ValueError(msg) 400 401 params: dict[str, int | str] = { 402 "market": token_id, 403 "fidelity": fidelity, 404 } 405 if start_time: 406 params["startTs"] = int(start_time.timestamp()) 407 if end_time: 408 params["endTs"] = int(end_time.timestamp()) 409 410 response = self.client.get(self._build_url("/prices-history"), params=params) 411 response.raise_for_status() 412 return PriceHistory(**response.json(), token_id=token_id) 413 414 def get_all_history(self, token_id: str) -> PriceHistory: 415 """Get the full price history of a token.""" 416 return self.get_history( 417 token_id=token_id, 418 start_time=datetime(2020, 1, 1, tzinfo=UTC), 419 ) 420 421 def get_orders( 422 self, 423 order_id: str | None = None, 424 condition_id: Keccak256 | None = None, 425 token_id: str | None = None, 426 next_cursor: str = "MA==", 427 ) -> list[OpenOrder]: 428 """Gets your active orders, filtered by order_id, condition_id, token_id.""" 429 params = {} 430 if order_id: 431 params["id"] = order_id 432 if condition_id: 433 params["market"] = condition_id 434 if token_id: 435 params["asset_id"] = token_id 436 437 request_args = RequestArgs(method="GET", request_path=ORDERS) 438 headers = create_level_2_headers(self.signer, self.creds, request_args) 439 440 results = [] 441 next_cursor = next_cursor if next_cursor is not None else "MA==" 442 while next_cursor != END_CURSOR: 443 params["next_cursor"] = next_cursor 444 response = self.client.get( 445 self._build_url(ORDERS), headers=headers, params=params 446 ) 447 response.raise_for_status() 448 next_cursor = response.json()["next_cursor"] 449 results += [OpenOrder(**order) for order in response.json()["data"]] 450 451 return results 452 453 def create_order( 454 self, order_args: OrderArgs, options: PartialCreateOrderOptions | None = None 455 ) -> SignedOrder: 456 """Creates and signs an order.""" 457 # add resolve_order_options, or similar 458 tick_size = self.__resolve_tick_size( 459 order_args.token_id, 460 options.tick_size if options else None, 461 ) 462 463 if not price_valid(order_args.price, tick_size): 464 msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}" 465 raise InvalidPriceError(msg) 466 467 neg_risk = ( 468 options.neg_risk 469 if options and options.neg_risk is not None 470 else self.get_neg_risk(order_args.token_id) 471 ) 472 473 # fee rate 474 fee_rate_bps = self.__resolve_fee_rate( 475 order_args.token_id, order_args.fee_rate_bps 476 ) 477 order_args.fee_rate_bps = fee_rate_bps 478 479 return self.builder.create_order( 480 order_args, 481 CreateOrderOptions( 482 tick_size=tick_size, 483 neg_risk=neg_risk, 484 ), 485 ) 486 487 def post_order( 488 self, order: SignedOrder, order_type: OrderType = OrderType.GTC 489 ) -> OrderPostResponse | None: 490 """Posts a SignedOrder.""" 491 body = order_to_json(order, self.creds.key, order_type) 492 headers = create_level_2_headers( 493 self.signer, 494 self.creds, 495 RequestArgs(method="POST", request_path=POST_ORDER, body=body), 496 ) 497 498 try: 499 response = self.client.post( 500 self._build_url("/order"), 501 headers=headers, 502 content=json.dumps(body).encode("utf-8"), 503 ) 504 response.raise_for_status() 505 return OrderPostResponse(**response.json()) 506 except httpx.HTTPStatusError as exc: 507 msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order" 508 logger.warning(msg) 509 error_json = exc.response.json() 510 print("Details:", error_json["error"]) 511 return None 512 513 def create_and_post_order( 514 self, 515 order_args: OrderArgs, 516 options: PartialCreateOrderOptions | None = None, 517 order_type: OrderType = OrderType.GTC, 518 ) -> OrderPostResponse | None: 519 """Utility function to create and publish an order.""" 520 order = self.create_order(order_args, options) 521 return self.post_order(order=order, order_type=order_type) 522 523 def post_orders(self, args: list[PostOrdersArgs]) -> list[OrderPostResponse] | None: 524 """Posts multiple SignedOrders at once.""" 525 body = [ 526 order_to_json(arg.order, self.creds.key, arg.order_type) for arg in args 527 ] 528 headers = create_level_2_headers( 529 self.signer, 530 self.creds, 531 RequestArgs(method="POST", request_path=POST_ORDERS, body=body), 532 ) 533 534 try: 535 response = self.client.post( 536 self._build_url("/orders"), 537 headers=headers, 538 content=json.dumps(body).encode("utf-8"), 539 ) 540 response.raise_for_status() 541 order_responses = [] 542 for index, item in enumerate(response.json()): 543 resp = OrderPostResponse(**item) 544 order_responses.append(resp) 545 if resp.error_msg: 546 msg = ( 547 f"Error posting order in position {index} \n" 548 f"Details: {resp.error_msg}" 549 ) 550 logger.warning(msg) 551 except httpx.HTTPStatusError as exc: 552 msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order" 553 logger.warning(msg) 554 error_json = exc.response.json() 555 print("Details:", error_json["error"]) 556 return None 557 else: 558 return order_responses 559 560 def create_and_post_orders( 561 self, args: list[OrderArgs], order_types: list[OrderType] 562 ) -> list[OrderPostResponse] | None: 563 """Utility function to create and publish multiple orders at once.""" 564 return self.post_orders( 565 [ 566 PostOrdersArgs( 567 order=self.create_order(order_args), order_type=order_type 568 ) 569 for order_args, order_type in zip(args, order_types, strict=True) 570 ], 571 ) 572 573 def calculate_market_price( 574 self, token_id: str, side: str, amount: float, order_type: OrderType 575 ) -> float: 576 """Calculates the matching price considering an amount and the current orderbook.""" 577 book = self.get_order_book(token_id) 578 if book is None: 579 msg = "Order book is None" 580 raise MissingOrderbookError(msg) 581 if side == "BUY": 582 if book.asks is None: 583 msg = "No ask orders available" 584 raise LiquidityError(msg) 585 return self.builder.calculate_buy_market_price( 586 book.asks, 587 amount, 588 order_type, 589 ) 590 if side == "SELL": 591 if book.bids is None: 592 msg = "No bid orders available" 593 raise LiquidityError(msg) 594 return self.builder.calculate_sell_market_price( 595 book.bids, 596 amount, 597 order_type, 598 ) 599 msg = 'Side must be "BUY" or "SELL"' 600 raise ValueError(msg) 601 602 def create_market_order( 603 self, 604 order_args: MarketOrderArgs, 605 options: PartialCreateOrderOptions | None = None, 606 ): 607 """Creates and signs a market order.""" 608 tick_size = self.__resolve_tick_size( 609 order_args.token_id, 610 options.tick_size if options else None, 611 ) 612 613 if order_args.price is None or order_args.price <= 0: 614 order_args.price = self.calculate_market_price( 615 order_args.token_id, 616 order_args.side, 617 order_args.amount, 618 order_args.order_type, 619 ) 620 621 if not price_valid(order_args.price, tick_size): 622 msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}" 623 raise InvalidPriceError(msg) 624 625 neg_risk = ( 626 options.neg_risk 627 if options and options.neg_risk is not None 628 else self.get_neg_risk(order_args.token_id) 629 ) 630 631 # fee rate 632 fee_rate_bps = self.__resolve_fee_rate( 633 order_args.token_id, order_args.fee_rate_bps 634 ) 635 order_args.fee_rate_bps = fee_rate_bps 636 637 return self.builder.create_market_order( 638 order_args, 639 CreateOrderOptions( 640 tick_size=tick_size, 641 neg_risk=neg_risk, 642 ), 643 ) 644 645 def create_and_post_market_order( 646 self, 647 order_args: MarketOrderArgs, 648 options: PartialCreateOrderOptions | None = None, 649 order_type: OrderType = OrderType.FOK, 650 ) -> OrderPostResponse | None: 651 """Utility function to create and publish a market order.""" 652 order = self.create_market_order(order_args, options) 653 return self.post_order(order=order, order_type=order_type) 654 655 def cancel_order(self, order_id: Keccak256) -> OrderCancelResponse: 656 """Cancels an order.""" 657 body = {"orderID": order_id} 658 659 request_args = RequestArgs(method="DELETE", request_path=CANCEL, body=body) 660 headers = create_level_2_headers(self.signer, self.creds, request_args) 661 662 response = self.client.request( 663 "DELETE", 664 self._build_url(CANCEL), 665 headers=headers, 666 content=json.dumps(body).encode("utf-8"), 667 ) 668 response.raise_for_status() 669 return OrderCancelResponse(**response.json()) 670 671 def cancel_orders(self, order_ids: list[Keccak256]) -> OrderCancelResponse: 672 """Cancels orders.""" 673 body = order_ids 674 675 request_args = RequestArgs( 676 method="DELETE", 677 request_path=CANCEL_ORDERS, 678 body=body, 679 ) 680 headers = create_level_2_headers(self.signer, self.creds, request_args) 681 682 response = self.client.request( 683 "DELETE", 684 self._build_url(CANCEL_ORDERS), 685 headers=headers, 686 content=json.dumps(body).encode("utf-8"), 687 ) 688 response.raise_for_status() 689 return OrderCancelResponse(**response.json()) 690 691 def cancel_all(self) -> OrderCancelResponse: 692 """Cancels all available orders for the user.""" 693 request_args = RequestArgs(method="DELETE", request_path=CANCEL_ALL) 694 headers = create_level_2_headers(self.signer, self.creds, request_args) 695 696 response = self.client.delete(self._build_url(CANCEL_ALL), headers=headers) 697 response.raise_for_status() 698 return OrderCancelResponse(**response.json()) 699 700 def is_order_scoring(self, order_id: Keccak256) -> bool: 701 """Check if the order is currently scoring.""" 702 request_args = RequestArgs(method="GET", request_path=IS_ORDER_SCORING) 703 headers = create_level_2_headers(self.signer, self.creds, request_args) 704 705 response = self.client.get( 706 self._build_url(IS_ORDER_SCORING), 707 headers=headers, 708 params={"order_id": order_id}, 709 ) 710 response.raise_for_status() 711 return response.json()["scoring"] 712 713 def are_orders_scoring(self, order_ids: list[Keccak256]) -> dict[Keccak256, bool]: 714 """Check if the orders are currently scoring.""" 715 body = order_ids 716 request_args = RequestArgs( 717 method="POST", 718 request_path=ARE_ORDERS_SCORING, 719 body=body, 720 ) 721 headers = create_level_2_headers(self.signer, self.creds, request_args) 722 headers["Content-Type"] = "application/json" 723 724 response = self.client.post( 725 self._build_url(ARE_ORDERS_SCORING), headers=headers, json=body 726 ) 727 response.raise_for_status() 728 return response.json() 729 730 def get_market_rewards(self, condition_id: Keccak256) -> MarketRewards: 731 """ 732 Get the MarketRewards for a given market (condition_id). 733 734 - metadata, tokens, max_spread, min_size, rewards_config, market_competitiveness. 735 """ 736 request_args = RequestArgs(method="GET", request_path="/rewards/markets/") 737 headers = create_level_2_headers(self.signer, self.creds, request_args) 738 739 response = self.client.get( 740 self._build_url("/rewards/markets/" + condition_id), headers=headers 741 ) 742 response.raise_for_status() 743 return next(MarketRewards(**market) for market in response.json()["data"]) 744 745 def get_trades( 746 self, 747 condition_id: Keccak256 | None = None, 748 token_id: str | None = None, 749 trade_id: str | None = None, 750 before: datetime | None = None, 751 after: datetime | None = None, 752 address: EthAddress | None = None, 753 next_cursor="MA==", 754 ) -> list[PolygonTrade]: 755 """Fetches the trade history for a user.""" 756 params: dict[str, str | int] = {} 757 if condition_id: 758 params["market"] = condition_id 759 if token_id: 760 params["asset_id"] = token_id 761 if trade_id: 762 params["id"] = trade_id 763 if before: 764 params["before"] = int(before.replace(microsecond=0).timestamp()) 765 if after: 766 params["after"] = int(after.replace(microsecond=0).timestamp()) 767 if address: 768 params["maker_address"] = address 769 770 request_args = RequestArgs(method="GET", request_path=TRADES) 771 headers = create_level_2_headers(self.signer, self.creds, request_args) 772 773 results = [] 774 next_cursor = next_cursor if next_cursor is not None else "MA==" 775 while next_cursor != END_CURSOR: 776 params["next_cursor"] = next_cursor 777 response = self.client.get( 778 self._build_url(TRADES), headers=headers, params=params 779 ) 780 response.raise_for_status() 781 next_cursor = response.json()["next_cursor"] 782 results += [PolygonTrade(**trade) for trade in response.json()["data"]] 783 784 return results 785 786 def get_total_rewards(self, date: datetime | None = None) -> DailyEarnedReward: 787 """Get the total rewards earned on a given date (seems to only hold the 6 most recent data points).""" 788 if date is None: 789 date = datetime.now(UTC) 790 params = { 791 "authenticationType": "magic", 792 "date": f"{date.strftime('%Y-%m-%d')}", 793 } 794 795 request_args = RequestArgs(method="GET", request_path="/rewards/user/total") 796 headers = create_level_2_headers(self.signer, self.creds, request_args) 797 params["l2Headers"] = json.dumps(headers) 798 799 response = self.client.get( 800 "https://polymarket.com/api/rewards/totalEarnings", params=params 801 ) 802 response.raise_for_status() 803 if response.json(): 804 return DailyEarnedReward(**response.json()[0]) 805 return DailyEarnedReward( 806 date=date, 807 asset_address="0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174", 808 maker_address=self.address, 809 earnings=0.0, 810 asset_rate=0.0, 811 ) 812 813 def get_reward_markets( 814 self, 815 query: str | None = None, 816 sort_by: Literal[ 817 "market", 818 "max_spread", 819 "min_size", 820 "rate_per_day", 821 "spread", 822 "price", 823 "earnings", 824 "earning_percentage", 825 ] 826 | None = "market", 827 sort_direction: Literal["ASC", "DESC"] | None = None, 828 show_favorites: bool = False, 829 ) -> list[RewardMarket]: 830 """ 831 Search through markets that offer rewards (polymarket.com/rewards items) by query, sorted by different metrics. If query is empty, returns all markets with rewards. 832 833 - market start date ("market") - TODO confirm this 834 - max spread for rewards in usdc 835 - min size for rewards in shares 836 - reward rate per day in usdc 837 - current spread of a market 838 - current price of a market 839 - your daily earnings on a market - only need auth for these last two 840 - your current earning percentage on a market. 841 """ 842 results = [] 843 desc = {"ASC": False, "DESC": True} 844 params: dict[str, bool | str] = { 845 "authenticationType": "magic", 846 "showFavorites": show_favorites, 847 } 848 if sort_by: 849 params["orderBy"] = sort_by 850 if query: 851 params["query"] = query 852 params["desc"] = False 853 if sort_direction: 854 params["desc"] = desc[sort_direction] 855 856 request_args = RequestArgs(method="GET", request_path="/rewards/user/markets") 857 headers = create_level_2_headers(self.signer, self.creds, request_args) 858 params["l2Headers"] = json.dumps(headers) 859 860 next_cursor = "MA==" 861 while next_cursor != END_CURSOR: 862 params["nextCursor"] = next_cursor 863 response = self.client.get( 864 "https://polymarket.com/api/rewards/markets", params=params 865 ) 866 response.raise_for_status() 867 next_cursor = response.json()["next_cursor"] 868 results += [RewardMarket(**reward) for reward in response.json()["data"]] 869 870 return results 871 872 def __enter__(self): 873 return self 874 875 def __exit__(self, exc_type, exc_val, exc_tb): 876 self.client.close()
94 def __init__( 95 self, 96 private_key: str, 97 address: EthAddress, 98 creds: ApiCreds | None = None, 99 chain_id: Literal[137, 80002] = POLYGON, 100 signature_type: Literal[0, 1, 2] = 1, 101 # 0 - EOA wallet, 1 - Proxy wallet, 2 - Gnosis Safe wallet 102 ): 103 self.address = address 104 self.client = httpx.Client(http2=True, timeout=30.0) 105 self.async_client = httpx.AsyncClient(http2=True, timeout=30.0) 106 self.base_url: str = "https://clob.polymarket.com" 107 self.signer = Signer(private_key=private_key, chain_id=chain_id) 108 self.builder = OrderBuilder( 109 signer=self.signer, 110 sig_type=signature_type, 111 funder=address, 112 ) 113 self.creds = creds if creds else self.create_or_derive_api_creds() 114 115 # local cache 116 self.__tick_sizes: dict[str, TickSize] = {} 117 self.__neg_risk: dict[str, bool] = {} 118 self.__fee_rates: dict[str, int] = {}
149 def get_api_keys(self) -> dict: 150 request_args = RequestArgs(method="GET", request_path=GET_API_KEYS) 151 headers = create_level_2_headers(self.signer, self.creds, request_args) 152 response = self.client.get(self._build_url(GET_API_KEYS), headers=headers) 153 response.raise_for_status() 154 return response.json()
156 def delete_api_keys(self) -> Literal["OK"]: 157 request_args = RequestArgs(method="DELETE", request_path=DELETE_API_KEY) 158 headers = create_level_2_headers(self.signer, self.creds, request_args) 159 response = self.client.delete(self._build_url(DELETE_API_KEY), headers=headers) 160 response.raise_for_status() 161 return response.json()
169 def get_tick_size(self, token_id: str) -> TickSize: 170 if token_id in self.__tick_sizes: 171 return self.__tick_sizes[token_id] 172 173 params = {"token_id": token_id} 174 response = self.client.get(self._build_url(GET_TICK_SIZE), params=params) 175 response.raise_for_status() 176 self.__tick_sizes[token_id] = cast( 177 "TickSize", str(response.json()["minimum_tick_size"]) 178 ) 179 180 return self.__tick_sizes[token_id]
182 def get_neg_risk(self, token_id: str) -> bool: 183 if token_id in self.__neg_risk: 184 return self.__neg_risk[token_id] 185 186 params = {"token_id": token_id} 187 response = self.client.get(self._build_url(GET_NEG_RISK), params=params) 188 response.raise_for_status() 189 self.__neg_risk[token_id] = response.json()["neg_risk"] 190 191 return self.__neg_risk[token_id]
193 def get_fee_rate_bps(self, token_id: str) -> int: 194 if token_id in self.__fee_rates: 195 return self.__fee_rates[token_id] 196 197 params = {"token_id": token_id} 198 response = self.client.get(self._build_url(GET_FEE_RATE), params=params) 199 response.raise_for_status() 200 fee_rate = response.json().get("base_fee") or 0 201 self.__fee_rates[token_id] = fee_rate 202 203 return fee_rate
237 def get_midpoint(self, token_id: str) -> Midpoint: 238 """Get the mid-market price for the given token.""" 239 params = {"token_id": token_id} 240 response = self.client.get(self._build_url(MID_POINT), params=params) 241 response.raise_for_status() 242 return Midpoint(token_id=token_id, value=float(response.json()["mid"]))
Get the mid-market price for the given token.
244 def get_midpoints(self, token_ids: list[str]) -> dict: 245 """Get the mid-market prices for a set of tokens.""" 246 data = [{"token_id": token_id} for token_id in token_ids] 247 response = self.client.post(self._build_url(MID_POINTS), json=data) 248 response.raise_for_status() 249 return TokenValueDict(**response.json()).root
Get the mid-market prices for a set of tokens.
251 def get_spread(self, token_id: str) -> Spread: 252 """Get the spread for the given token.""" 253 params = {"token_id": token_id} 254 response = self.client.get(self._build_url(GET_SPREAD), params=params) 255 response.raise_for_status() 256 return Spread(token_id=token_id, value=float(response.json()["mid"]))
Get the spread for the given token.
258 def get_spreads(self, token_ids: list[str]) -> dict: 259 """Get the spreads for a set of tokens.""" 260 data = [{"token_id": token_id} for token_id in token_ids] 261 response = self.client.post(self._build_url(GET_SPREADS), json=data) 262 response.raise_for_status() 263 return TokenValueDict(**response.json()).root
Get the spreads for a set of tokens.
265 def get_price(self, token_id: str, side: Literal["BUY", "SELL"]) -> Price: 266 """Get the market price for the given token and side.""" 267 params = {"token_id": token_id, "side": side} 268 response = self.client.get(self._build_url(PRICE), params=params) 269 response.raise_for_status() 270 return Price(**response.json(), token_id=token_id, side=side)
Get the market price for the given token and side.
272 def get_prices(self, params: list[BookParams]) -> dict[str, BidAsk]: 273 """Get the market prices for a set of tokens and sides.""" 274 data = [{"token_id": param.token_id, "side": param.side} for param in params] 275 response = self.client.post(self._build_url(GET_PRICES), json=data) 276 response.raise_for_status() 277 return TokenBidAskDict(**response.json()).root
Get the market prices for a set of tokens and sides.
279 def get_last_trade_price(self, token_id) -> Price: 280 """Fetches the last trade price for a token_id.""" 281 params = {"token_id": token_id} 282 response = self.client.get(self._build_url(GET_LAST_TRADE_PRICE), params=params) 283 response.raise_for_status() 284 return Price(**response.json(), token_id=token_id)
Fetches the last trade price for a token_id.
286 def get_last_trades_prices(self, token_ids: list[str]) -> list[Price]: 287 """Fetches the last trades prices for a set of token ids.""" 288 body = [{"token_id": token_id} for token_id in token_ids] 289 response = self.client.post(self._build_url(GET_LAST_TRADES_PRICES), json=body) 290 response.raise_for_status() 291 return [Price(**price) for price in response.json()]
Fetches the last trades prices for a set of token ids.
293 def get_order_book(self, token_id) -> OrderBookSummary: 294 """Get the orderbook for the given token.""" 295 params = {"token_id": token_id} 296 response = self.client.get(self._build_url(GET_ORDER_BOOK), params=params) 297 response.raise_for_status() 298 return OrderBookSummary(**response.json())
Get the orderbook for the given token.
300 def get_order_books(self, token_ids: list[str]) -> list[OrderBookSummary]: 301 """Get the orderbook for a set of tokens.""" 302 body = [{"token_id": token_id} for token_id in token_ids] 303 response = self.client.post(self._build_url(GET_ORDER_BOOKS), json=body) 304 response.raise_for_status() 305 return [OrderBookSummary(**obs) for obs in response.json()]
Get the orderbook for a set of tokens.
307 async def get_order_books_async( 308 self, token_ids: list[str] 309 ) -> list[OrderBookSummary]: 310 """Get the orderbook for a set of tokens asynchronously.""" 311 body = [{"token_id": token_id} for token_id in token_ids] 312 response = await self.async_client.post( 313 self._build_url(GET_ORDER_BOOKS), json=body 314 ) 315 response.raise_for_status() 316 return [OrderBookSummary(**obs) for obs in response.json()]
Get the orderbook for a set of tokens asynchronously.
318 def get_market(self, condition_id) -> ClobMarket: 319 """Get a ClobMarket by condition_id.""" 320 response = self.client.get(self._build_url(GET_MARKET + condition_id)) 321 response.raise_for_status() 322 return ClobMarket(**response.json())
Get a ClobMarket by condition_id.
324 def get_markets(self, next_cursor="MA==") -> PaginatedResponse[ClobMarket]: 325 """Get paginated ClobMarkets.""" 326 params = {"next_cursor": next_cursor} 327 response = self.client.get(self._build_url(GET_MARKETS), params=params) 328 response.raise_for_status() 329 return PaginatedResponse[ClobMarket](**response.json())
Get paginated ClobMarkets.
331 def get_all_markets(self, next_cursor="MA==") -> list[ClobMarket]: 332 """Recursively fetch all ClobMarkets using pagination.""" 333 # Base case: Stop recursion if next_cursor indicates the last page 334 if next_cursor == "LTE=": 335 print("Reached the last page of markets.") 336 return [] 337 338 # Fetch current page of markets 339 paginated_response = self.get_markets(next_cursor=next_cursor) 340 341 # Collect current page data 342 current_markets = paginated_response.data 343 344 # Recursively fetch remaining pages 345 next_page_markets = self.get_all_markets( 346 next_cursor=paginated_response.next_cursor, 347 ) 348 349 # Combine current page data with data from subsequent pages 350 return current_markets + next_page_markets
Recursively fetch all ClobMarkets using pagination.
352 def get_recent_history( 353 self, 354 token_id: str, 355 interval: Literal["1h", "6h", "1d", "1w", "1m", "max"] = "1d", 356 fidelity: int = 1, # resolution in minutes 357 ) -> PriceHistory: 358 """Get the recent price history of a token (up to now) - 1h, 6h, 1d, 1w, 1m.""" 359 min_fidelities: dict[str, int] = { 360 "1h": 1, 361 "6h": 1, 362 "1d": 1, 363 "1w": 5, 364 "1m": 10, 365 "max": 2, 366 } 367 368 if fidelity < min_fidelities[interval]: 369 msg = f"invalid filters: minimum fidelity' for '{interval}' range is {min_fidelities.get(interval)}" 370 raise ValueError(msg) 371 372 params: dict[str, int | str] = { 373 "market": token_id, 374 "interval": interval, 375 "fidelity": fidelity, 376 } 377 response = self.client.get(self._build_url("/prices-history"), params=params) 378 response.raise_for_status() 379 return PriceHistory(**response.json(), token_id=token_id)
Get the recent price history of a token (up to now) - 1h, 6h, 1d, 1w, 1m.
381 def get_history( 382 self, 383 token_id: str, 384 start_time: datetime | None = None, 385 end_time: datetime | None = None, 386 fidelity: int = 2, # resolution in minutes 387 ) -> PriceHistory: 388 """Get the price history of a token between a selected date range of max 15 days or from start_time to now.""" 389 if start_time is None and end_time is None: 390 msg = "At least 'start_time' or ('start_time' and 'end_time') must be provided" 391 raise ValueError(msg) 392 393 if ( 394 start_time 395 and end_time 396 and start_time + timedelta(days=15, seconds=1) < end_time 397 ): 398 msg = "'start_time' - 'end_time' range cannot exceed 15 days. Remove 'end_time' to get prices up to now or set a shorter range." 399 raise ValueError(msg) 400 401 params: dict[str, int | str] = { 402 "market": token_id, 403 "fidelity": fidelity, 404 } 405 if start_time: 406 params["startTs"] = int(start_time.timestamp()) 407 if end_time: 408 params["endTs"] = int(end_time.timestamp()) 409 410 response = self.client.get(self._build_url("/prices-history"), params=params) 411 response.raise_for_status() 412 return PriceHistory(**response.json(), token_id=token_id)
Get the price history of a token between a selected date range of max 15 days or from start_time to now.
414 def get_all_history(self, token_id: str) -> PriceHistory: 415 """Get the full price history of a token.""" 416 return self.get_history( 417 token_id=token_id, 418 start_time=datetime(2020, 1, 1, tzinfo=UTC), 419 )
Get the full price history of a token.
421 def get_orders( 422 self, 423 order_id: str | None = None, 424 condition_id: Keccak256 | None = None, 425 token_id: str | None = None, 426 next_cursor: str = "MA==", 427 ) -> list[OpenOrder]: 428 """Gets your active orders, filtered by order_id, condition_id, token_id.""" 429 params = {} 430 if order_id: 431 params["id"] = order_id 432 if condition_id: 433 params["market"] = condition_id 434 if token_id: 435 params["asset_id"] = token_id 436 437 request_args = RequestArgs(method="GET", request_path=ORDERS) 438 headers = create_level_2_headers(self.signer, self.creds, request_args) 439 440 results = [] 441 next_cursor = next_cursor if next_cursor is not None else "MA==" 442 while next_cursor != END_CURSOR: 443 params["next_cursor"] = next_cursor 444 response = self.client.get( 445 self._build_url(ORDERS), headers=headers, params=params 446 ) 447 response.raise_for_status() 448 next_cursor = response.json()["next_cursor"] 449 results += [OpenOrder(**order) for order in response.json()["data"]] 450 451 return results
Gets your active orders, filtered by order_id, condition_id, token_id.
453 def create_order( 454 self, order_args: OrderArgs, options: PartialCreateOrderOptions | None = None 455 ) -> SignedOrder: 456 """Creates and signs an order.""" 457 # add resolve_order_options, or similar 458 tick_size = self.__resolve_tick_size( 459 order_args.token_id, 460 options.tick_size if options else None, 461 ) 462 463 if not price_valid(order_args.price, tick_size): 464 msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}" 465 raise InvalidPriceError(msg) 466 467 neg_risk = ( 468 options.neg_risk 469 if options and options.neg_risk is not None 470 else self.get_neg_risk(order_args.token_id) 471 ) 472 473 # fee rate 474 fee_rate_bps = self.__resolve_fee_rate( 475 order_args.token_id, order_args.fee_rate_bps 476 ) 477 order_args.fee_rate_bps = fee_rate_bps 478 479 return self.builder.create_order( 480 order_args, 481 CreateOrderOptions( 482 tick_size=tick_size, 483 neg_risk=neg_risk, 484 ), 485 )
Creates and signs an order.
487 def post_order( 488 self, order: SignedOrder, order_type: OrderType = OrderType.GTC 489 ) -> OrderPostResponse | None: 490 """Posts a SignedOrder.""" 491 body = order_to_json(order, self.creds.key, order_type) 492 headers = create_level_2_headers( 493 self.signer, 494 self.creds, 495 RequestArgs(method="POST", request_path=POST_ORDER, body=body), 496 ) 497 498 try: 499 response = self.client.post( 500 self._build_url("/order"), 501 headers=headers, 502 content=json.dumps(body).encode("utf-8"), 503 ) 504 response.raise_for_status() 505 return OrderPostResponse(**response.json()) 506 except httpx.HTTPStatusError as exc: 507 msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order" 508 logger.warning(msg) 509 error_json = exc.response.json() 510 print("Details:", error_json["error"]) 511 return None
Posts a SignedOrder.
513 def create_and_post_order( 514 self, 515 order_args: OrderArgs, 516 options: PartialCreateOrderOptions | None = None, 517 order_type: OrderType = OrderType.GTC, 518 ) -> OrderPostResponse | None: 519 """Utility function to create and publish an order.""" 520 order = self.create_order(order_args, options) 521 return self.post_order(order=order, order_type=order_type)
Utility function to create and publish an order.
523 def post_orders(self, args: list[PostOrdersArgs]) -> list[OrderPostResponse] | None: 524 """Posts multiple SignedOrders at once.""" 525 body = [ 526 order_to_json(arg.order, self.creds.key, arg.order_type) for arg in args 527 ] 528 headers = create_level_2_headers( 529 self.signer, 530 self.creds, 531 RequestArgs(method="POST", request_path=POST_ORDERS, body=body), 532 ) 533 534 try: 535 response = self.client.post( 536 self._build_url("/orders"), 537 headers=headers, 538 content=json.dumps(body).encode("utf-8"), 539 ) 540 response.raise_for_status() 541 order_responses = [] 542 for index, item in enumerate(response.json()): 543 resp = OrderPostResponse(**item) 544 order_responses.append(resp) 545 if resp.error_msg: 546 msg = ( 547 f"Error posting order in position {index} \n" 548 f"Details: {resp.error_msg}" 549 ) 550 logger.warning(msg) 551 except httpx.HTTPStatusError as exc: 552 msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order" 553 logger.warning(msg) 554 error_json = exc.response.json() 555 print("Details:", error_json["error"]) 556 return None 557 else: 558 return order_responses
Posts multiple SignedOrders at once.
560 def create_and_post_orders( 561 self, args: list[OrderArgs], order_types: list[OrderType] 562 ) -> list[OrderPostResponse] | None: 563 """Utility function to create and publish multiple orders at once.""" 564 return self.post_orders( 565 [ 566 PostOrdersArgs( 567 order=self.create_order(order_args), order_type=order_type 568 ) 569 for order_args, order_type in zip(args, order_types, strict=True) 570 ], 571 )
Utility function to create and publish multiple orders at once.
573 def calculate_market_price( 574 self, token_id: str, side: str, amount: float, order_type: OrderType 575 ) -> float: 576 """Calculates the matching price considering an amount and the current orderbook.""" 577 book = self.get_order_book(token_id) 578 if book is None: 579 msg = "Order book is None" 580 raise MissingOrderbookError(msg) 581 if side == "BUY": 582 if book.asks is None: 583 msg = "No ask orders available" 584 raise LiquidityError(msg) 585 return self.builder.calculate_buy_market_price( 586 book.asks, 587 amount, 588 order_type, 589 ) 590 if side == "SELL": 591 if book.bids is None: 592 msg = "No bid orders available" 593 raise LiquidityError(msg) 594 return self.builder.calculate_sell_market_price( 595 book.bids, 596 amount, 597 order_type, 598 ) 599 msg = 'Side must be "BUY" or "SELL"' 600 raise ValueError(msg)
Calculates the matching price considering an amount and the current orderbook.
602 def create_market_order( 603 self, 604 order_args: MarketOrderArgs, 605 options: PartialCreateOrderOptions | None = None, 606 ): 607 """Creates and signs a market order.""" 608 tick_size = self.__resolve_tick_size( 609 order_args.token_id, 610 options.tick_size if options else None, 611 ) 612 613 if order_args.price is None or order_args.price <= 0: 614 order_args.price = self.calculate_market_price( 615 order_args.token_id, 616 order_args.side, 617 order_args.amount, 618 order_args.order_type, 619 ) 620 621 if not price_valid(order_args.price, tick_size): 622 msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}" 623 raise InvalidPriceError(msg) 624 625 neg_risk = ( 626 options.neg_risk 627 if options and options.neg_risk is not None 628 else self.get_neg_risk(order_args.token_id) 629 ) 630 631 # fee rate 632 fee_rate_bps = self.__resolve_fee_rate( 633 order_args.token_id, order_args.fee_rate_bps 634 ) 635 order_args.fee_rate_bps = fee_rate_bps 636 637 return self.builder.create_market_order( 638 order_args, 639 CreateOrderOptions( 640 tick_size=tick_size, 641 neg_risk=neg_risk, 642 ), 643 )
Creates and signs a market order.
645 def create_and_post_market_order( 646 self, 647 order_args: MarketOrderArgs, 648 options: PartialCreateOrderOptions | None = None, 649 order_type: OrderType = OrderType.FOK, 650 ) -> OrderPostResponse | None: 651 """Utility function to create and publish a market order.""" 652 order = self.create_market_order(order_args, options) 653 return self.post_order(order=order, order_type=order_type)
Utility function to create and publish a market order.
655 def cancel_order(self, order_id: Keccak256) -> OrderCancelResponse: 656 """Cancels an order.""" 657 body = {"orderID": order_id} 658 659 request_args = RequestArgs(method="DELETE", request_path=CANCEL, body=body) 660 headers = create_level_2_headers(self.signer, self.creds, request_args) 661 662 response = self.client.request( 663 "DELETE", 664 self._build_url(CANCEL), 665 headers=headers, 666 content=json.dumps(body).encode("utf-8"), 667 ) 668 response.raise_for_status() 669 return OrderCancelResponse(**response.json())
Cancels an order.
671 def cancel_orders(self, order_ids: list[Keccak256]) -> OrderCancelResponse: 672 """Cancels orders.""" 673 body = order_ids 674 675 request_args = RequestArgs( 676 method="DELETE", 677 request_path=CANCEL_ORDERS, 678 body=body, 679 ) 680 headers = create_level_2_headers(self.signer, self.creds, request_args) 681 682 response = self.client.request( 683 "DELETE", 684 self._build_url(CANCEL_ORDERS), 685 headers=headers, 686 content=json.dumps(body).encode("utf-8"), 687 ) 688 response.raise_for_status() 689 return OrderCancelResponse(**response.json())
Cancels orders.
691 def cancel_all(self) -> OrderCancelResponse: 692 """Cancels all available orders for the user.""" 693 request_args = RequestArgs(method="DELETE", request_path=CANCEL_ALL) 694 headers = create_level_2_headers(self.signer, self.creds, request_args) 695 696 response = self.client.delete(self._build_url(CANCEL_ALL), headers=headers) 697 response.raise_for_status() 698 return OrderCancelResponse(**response.json())
Cancels all available orders for the user.
700 def is_order_scoring(self, order_id: Keccak256) -> bool: 701 """Check if the order is currently scoring.""" 702 request_args = RequestArgs(method="GET", request_path=IS_ORDER_SCORING) 703 headers = create_level_2_headers(self.signer, self.creds, request_args) 704 705 response = self.client.get( 706 self._build_url(IS_ORDER_SCORING), 707 headers=headers, 708 params={"order_id": order_id}, 709 ) 710 response.raise_for_status() 711 return response.json()["scoring"]
Check if the order is currently scoring.
713 def are_orders_scoring(self, order_ids: list[Keccak256]) -> dict[Keccak256, bool]: 714 """Check if the orders are currently scoring.""" 715 body = order_ids 716 request_args = RequestArgs( 717 method="POST", 718 request_path=ARE_ORDERS_SCORING, 719 body=body, 720 ) 721 headers = create_level_2_headers(self.signer, self.creds, request_args) 722 headers["Content-Type"] = "application/json" 723 724 response = self.client.post( 725 self._build_url(ARE_ORDERS_SCORING), headers=headers, json=body 726 ) 727 response.raise_for_status() 728 return response.json()
Check if the orders are currently scoring.
730 def get_market_rewards(self, condition_id: Keccak256) -> MarketRewards: 731 """ 732 Get the MarketRewards for a given market (condition_id). 733 734 - metadata, tokens, max_spread, min_size, rewards_config, market_competitiveness. 735 """ 736 request_args = RequestArgs(method="GET", request_path="/rewards/markets/") 737 headers = create_level_2_headers(self.signer, self.creds, request_args) 738 739 response = self.client.get( 740 self._build_url("/rewards/markets/" + condition_id), headers=headers 741 ) 742 response.raise_for_status() 743 return next(MarketRewards(**market) for market in response.json()["data"])
Get the MarketRewards for a given market (condition_id).
- metadata, tokens, max_spread, min_size, rewards_config, market_competitiveness.
745 def get_trades( 746 self, 747 condition_id: Keccak256 | None = None, 748 token_id: str | None = None, 749 trade_id: str | None = None, 750 before: datetime | None = None, 751 after: datetime | None = None, 752 address: EthAddress | None = None, 753 next_cursor="MA==", 754 ) -> list[PolygonTrade]: 755 """Fetches the trade history for a user.""" 756 params: dict[str, str | int] = {} 757 if condition_id: 758 params["market"] = condition_id 759 if token_id: 760 params["asset_id"] = token_id 761 if trade_id: 762 params["id"] = trade_id 763 if before: 764 params["before"] = int(before.replace(microsecond=0).timestamp()) 765 if after: 766 params["after"] = int(after.replace(microsecond=0).timestamp()) 767 if address: 768 params["maker_address"] = address 769 770 request_args = RequestArgs(method="GET", request_path=TRADES) 771 headers = create_level_2_headers(self.signer, self.creds, request_args) 772 773 results = [] 774 next_cursor = next_cursor if next_cursor is not None else "MA==" 775 while next_cursor != END_CURSOR: 776 params["next_cursor"] = next_cursor 777 response = self.client.get( 778 self._build_url(TRADES), headers=headers, params=params 779 ) 780 response.raise_for_status() 781 next_cursor = response.json()["next_cursor"] 782 results += [PolygonTrade(**trade) for trade in response.json()["data"]] 783 784 return results
Fetches the trade history for a user.
786 def get_total_rewards(self, date: datetime | None = None) -> DailyEarnedReward: 787 """Get the total rewards earned on a given date (seems to only hold the 6 most recent data points).""" 788 if date is None: 789 date = datetime.now(UTC) 790 params = { 791 "authenticationType": "magic", 792 "date": f"{date.strftime('%Y-%m-%d')}", 793 } 794 795 request_args = RequestArgs(method="GET", request_path="/rewards/user/total") 796 headers = create_level_2_headers(self.signer, self.creds, request_args) 797 params["l2Headers"] = json.dumps(headers) 798 799 response = self.client.get( 800 "https://polymarket.com/api/rewards/totalEarnings", params=params 801 ) 802 response.raise_for_status() 803 if response.json(): 804 return DailyEarnedReward(**response.json()[0]) 805 return DailyEarnedReward( 806 date=date, 807 asset_address="0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174", 808 maker_address=self.address, 809 earnings=0.0, 810 asset_rate=0.0, 811 )
Get the total rewards earned on a given date (seems to only hold the 6 most recent data points).
813 def get_reward_markets( 814 self, 815 query: str | None = None, 816 sort_by: Literal[ 817 "market", 818 "max_spread", 819 "min_size", 820 "rate_per_day", 821 "spread", 822 "price", 823 "earnings", 824 "earning_percentage", 825 ] 826 | None = "market", 827 sort_direction: Literal["ASC", "DESC"] | None = None, 828 show_favorites: bool = False, 829 ) -> list[RewardMarket]: 830 """ 831 Search through markets that offer rewards (polymarket.com/rewards items) by query, sorted by different metrics. If query is empty, returns all markets with rewards. 832 833 - market start date ("market") - TODO confirm this 834 - max spread for rewards in usdc 835 - min size for rewards in shares 836 - reward rate per day in usdc 837 - current spread of a market 838 - current price of a market 839 - your daily earnings on a market - only need auth for these last two 840 - your current earning percentage on a market. 841 """ 842 results = [] 843 desc = {"ASC": False, "DESC": True} 844 params: dict[str, bool | str] = { 845 "authenticationType": "magic", 846 "showFavorites": show_favorites, 847 } 848 if sort_by: 849 params["orderBy"] = sort_by 850 if query: 851 params["query"] = query 852 params["desc"] = False 853 if sort_direction: 854 params["desc"] = desc[sort_direction] 855 856 request_args = RequestArgs(method="GET", request_path="/rewards/user/markets") 857 headers = create_level_2_headers(self.signer, self.creds, request_args) 858 params["l2Headers"] = json.dumps(headers) 859 860 next_cursor = "MA==" 861 while next_cursor != END_CURSOR: 862 params["nextCursor"] = next_cursor 863 response = self.client.get( 864 "https://polymarket.com/api/rewards/markets", params=params 865 ) 866 response.raise_for_status() 867 next_cursor = response.json()["next_cursor"] 868 results += [RewardMarket(**reward) for reward in response.json()["data"]] 869 870 return results
Search through markets that offer rewards (polymarket.com/rewards items) by query, sorted by different metrics. If query is empty, returns all markets with rewards.
- market start date ("market") - TODO confirm this
- max spread for rewards in usdc
- min size for rewards in shares
- reward rate per day in usdc
- current spread of a market
- current price of a market
- your daily earnings on a market - only need auth for these last two
- your current earning percentage on a market.
24class PolymarketDataClient: 25 def __init__(self, base_url: str = "https://data-api.polymarket.com"): 26 self.base_url = base_url 27 self.client = httpx.Client(http2=True, timeout=30.0) 28 self.gql_positions_client = PolymarketGraphQLClient( 29 endpoint_name="positions_subgraph" 30 ) 31 32 def _build_url(self, endpoint: str) -> str: 33 return urljoin(self.base_url, endpoint) 34 35 def get_ok(self) -> str: 36 response = self.client.get(self.base_url) 37 response.raise_for_status() 38 return response.json()["data"] 39 40 def get_all_positions( 41 self, 42 user: EthAddress, 43 size_threshold: float = 0.0, 44 ): 45 # data-api /positions endpoint does not support fetching all positions without filters 46 # a workaround is to use the GraphQL positions subgraph directly 47 query = f"""query {{ 48 userBalances(where: {{ 49 user: "{user.lower()}", 50 balance_gt: "{int(size_threshold * 10**6)}" 51 }}) {{ 52 user 53 asset {{ 54 id 55 condition {{ 56 id 57 }} 58 complement 59 outcomeIndex 60 }} 61 balance 62 }} 63 }} 64 """ 65 66 response = self.gql_positions_client.query(query) 67 return [GQLPosition(**pos) for pos in response["userBalances"]] 68 69 def get_positions( 70 self, 71 user: EthAddress, 72 condition_id: Optional[ 73 Union[str, list[str]] 74 ] = None, # mutually exclusive with event_id 75 event_id: Optional[ 76 Union[int, list[int]] 77 ] = None, # mutually exclusive with condition_id 78 size_threshold: float = 1.0, 79 redeemable: bool = False, 80 mergeable: bool = False, 81 title: Optional[str] = None, 82 limit: int = 100, 83 offset: int = 0, 84 sort_by: Literal[ 85 "TOKENS", 86 "CURRENT", 87 "INITIAL", 88 "CASHPNL", 89 "PERCENTPNL", 90 "TITLE", 91 "RESOLVING", 92 "PRICE", 93 "AVGPRICE", 94 ] = "TOKENS", 95 sort_direction: Literal["ASC", "DESC"] = "DESC", 96 ) -> list[Position]: 97 params: dict[str, str | list[str] | int | float] = { 98 "user": user, 99 "sizeThreshold": size_threshold, 100 "limit": min(limit, 500), 101 "offset": offset, 102 } 103 if isinstance(condition_id, str): 104 params["market"] = condition_id 105 if isinstance(condition_id, list): 106 params["market"] = ",".join(condition_id) 107 if isinstance(event_id, str): 108 params["eventId"] = event_id 109 if isinstance(event_id, list): 110 params["eventId"] = [str(i) for i in event_id] 111 if redeemable is not None: 112 params["redeemable"] = redeemable 113 if mergeable is not None: 114 params["mergeable"] = mergeable 115 if title: 116 params["title"] = title 117 if sort_by: 118 params["sortBy"] = sort_by 119 if sort_direction: 120 params["sortDirection"] = sort_direction 121 122 response = self.client.get(self._build_url("/positions"), params=params) 123 response.raise_for_status() 124 return [Position(**pos) for pos in response.json()] 125 126 def get_trades( 127 self, 128 limit: int = 100, 129 offset: int = 0, 130 taker_only: bool = True, 131 filter_type: Optional[Literal["CASH", "TOKENS"]] = None, 132 filter_amount: Optional[ 133 float 134 ] = None, # must be provided together with filter_type 135 condition_id: Optional[str | list[str]] = None, 136 event_id: Optional[int | list[int]] = None, 137 user: Optional[str] = None, 138 side: Optional[Literal["BUY", "SELL"]] = None, 139 ) -> list[Trade]: 140 params: dict[str, int | bool | float | str | list[str]] = { 141 "limit": min(limit, 500), 142 "offset": offset, 143 "takerOnly": taker_only, 144 } 145 if filter_type: 146 params["filterType"] = filter_type 147 if filter_amount: 148 params["filterAmount"] = filter_amount 149 if isinstance(condition_id, str): 150 params["market"] = condition_id 151 if isinstance(condition_id, list): 152 params["market"] = ",".join(condition_id) 153 if isinstance(event_id, str): 154 params["eventId"] = event_id 155 if isinstance(event_id, list): 156 params["eventId"] = [str(i) for i in event_id] 157 if user: 158 params["user"] = user 159 if side: 160 params["side"] = side 161 162 response = self.client.get(self._build_url("/trades"), params=params) 163 response.raise_for_status() 164 return [Trade(**trade) for trade in response.json()] 165 166 def get_activity( 167 self, 168 user: EthAddress, 169 limit: int = 100, 170 offset: int = 0, 171 condition_id: Optional[Union[str, list[str]]] = None, 172 event_id: Optional[Union[int, list[int]]] = None, 173 type: Optional[ 174 Union[ 175 Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"], 176 list[ 177 Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"] 178 ], 179 ] 180 ] = None, 181 start: Optional[datetime] = None, 182 end: Optional[datetime] = None, 183 side: Optional[Literal["BUY", "SELL"]] = None, 184 sort_by: Literal["TIMESTAMP", "TOKENS", "CASH"] = "TIMESTAMP", 185 sort_direction: Literal["ASC", "DESC"] = "DESC", 186 ) -> list[Activity]: 187 params: dict[str, str | list[str] | int] = { 188 "user": user, 189 "limit": min(limit, 500), 190 "offset": offset, 191 } 192 if isinstance(condition_id, str): 193 params["market"] = condition_id 194 if isinstance(condition_id, list): 195 params["market"] = ",".join(condition_id) 196 if isinstance(event_id, str): 197 params["eventId"] = event_id 198 if isinstance(event_id, list): 199 params["eventId"] = [str(i) for i in event_id] 200 if isinstance(type, str): 201 params["type"] = type 202 if isinstance(type, list): 203 params["type"] = ",".join(type) 204 if start: 205 params["start"] = int(start.timestamp()) 206 if end: 207 params["end"] = int(end.timestamp()) 208 if side: 209 params["side"] = side 210 if sort_by: 211 params["sortBy"] = sort_by 212 if sort_direction: 213 params["sortDirection"] = sort_direction 214 215 response = self.client.get(self._build_url("/activity"), params=params) 216 response.raise_for_status() 217 return [Activity(**activity) for activity in response.json()] 218 219 def get_holders( 220 self, 221 condition_id: str, 222 limit: int = 500, 223 min_balance: int = 1, 224 ) -> list[HolderResponse]: 225 """Takes in a condition_id and returns top holders for each corresponding token_id.""" 226 params: dict[str, int | str] = { 227 "market": condition_id, 228 "limit": limit, 229 "min_balance": min_balance, 230 } 231 response = self.client.get(self._build_url("/holders"), params=params) 232 response.raise_for_status() 233 return [HolderResponse(**holder_data) for holder_data in response.json()] 234 235 def get_value( 236 self, 237 user: EthAddress, 238 condition_ids: Optional[Union[str, list[str]]] = None, 239 ) -> ValueResponse: 240 """ 241 Get the current value of a user's position in a set of markets. 242 243 Takes in condition_id as: 244 - None --> total value of positions 245 - str --> value of position 246 - list[str] --> sum of the values of positions. 247 """ 248 params = {"user": user} 249 if isinstance(condition_ids, str): 250 params["market"] = condition_ids 251 if isinstance(condition_ids, list): 252 params["market"] = ",".join(condition_ids) 253 254 response = self.client.get(self._build_url("/value"), params=params) 255 response.raise_for_status() 256 return ValueResponse(**response.json()[0]) 257 258 def get_closed_positions( 259 self, 260 user: EthAddress, 261 condition_ids: Optional[Union[str, list[str]]] = None, 262 ) -> list[Position]: 263 """Get all closed positions.""" 264 params = {"user": user} 265 if isinstance(condition_ids, str): 266 params["market"] = condition_ids 267 if isinstance(condition_ids, list): 268 params["market"] = ",".join(condition_ids) 269 270 response = self.client.get(self._build_url("/closed-positions"), params=params) 271 response.raise_for_status() 272 return [Position(**pos) for pos in response.json()] 273 274 def get_total_markets_traded( 275 self, 276 user: EthAddress, 277 ) -> int: 278 """Get the total number of markets a user has traded in.""" 279 params = {"user": user} 280 281 response = self.client.get(self._build_url("/traded"), params=params) 282 response.raise_for_status() 283 return response.json()["traded"] 284 285 def get_open_interest( 286 self, 287 condition_ids: Optional[Union[str, list[str]]] = None, 288 ) -> list[MarketValue]: 289 """Get open interest.""" 290 params = {} 291 292 if isinstance(condition_ids, str): 293 params["market"] = condition_ids 294 if isinstance(condition_ids, list): 295 params["market"] = ",".join(condition_ids) 296 297 response = self.client.get(self._build_url("/oi"), params=params) 298 response.raise_for_status() 299 return [MarketValue(**oi) for oi in response.json()] 300 301 def get_live_volume( 302 self, 303 event_id: int, 304 ) -> EventLiveVolume: 305 """Get live volume for a given event.""" 306 params = {"id": str(event_id)} 307 308 response = self.client.get(self._build_url("/live-volume"), params=params) 309 response.raise_for_status() 310 return EventLiveVolume(**response.json()[0]) 311 312 # website endpoints 313 314 def get_pnl( 315 self, 316 user: EthAddress, 317 period: Literal["all", "1m", "1w", "1d"] = "all", 318 frequency: Literal["1h", "3h", "12h", "1d"] = "1h", 319 ) -> list[TimeseriesPoint]: 320 """Get a user's PnL timeseries in the last day, week, month or all with a given frequency.""" 321 params = { 322 "user_address": user, 323 "interval": period, 324 "fidelity": frequency, 325 } 326 327 response = self.client.get( 328 "https://user-pnl-api.polymarket.com/user-pnl", params=params 329 ) 330 response.raise_for_status() 331 return [TimeseriesPoint(**point) for point in response.json()] 332 333 def get_user_metric( 334 self, 335 user: EthAddress, 336 metric: Literal["profit", "volume"] = "profit", 337 window: Literal["1d", "7d", "30d", "all"] = "all", 338 ): 339 """Get a user's overall profit or volume in the last day, week, month or all.""" 340 params: dict[str, int | str] = { 341 "address": user, 342 "window": window, 343 "limit": 1, 344 } 345 response = self.client.get( 346 "https://lb-api.polymarket.com/" + metric, params=params 347 ) 348 response.raise_for_status() 349 return UserMetric(**response.json()[0]) 350 351 def get_leaderboard_user_rank( 352 self, 353 user: EthAddress, 354 metric: Literal["profit", "volume"] = "profit", 355 window: Literal["1d", "7d", "30d", "all"] = "all", 356 ): 357 """Get a user's rank on the leaderboard by profit or volume.""" 358 params = { 359 "address": user, 360 "window": window, 361 "rankType": "pnl" if metric == "profit" else "vol", 362 } 363 response = self.client.get("https://lb-api.polymarket.com/rank", params=params) 364 response.raise_for_status() 365 return UserRank(**response.json()[0]) 366 367 def get_leaderboard_top_users( 368 self, 369 metric: Literal["profit", "volume"] = "profit", 370 window: Literal["1d", "7d", "30d", "all"] = "all", 371 limit: int = 100, 372 ): 373 """Get the leaderboard of the top at most 100 users by profit or volume.""" 374 params = { 375 "window": window, 376 "limit": limit, 377 } 378 response = self.client.get( 379 "https://lb-api.polymarket.com/" + metric, params=params 380 ) 381 response.raise_for_status() 382 return [UserMetric(**user) for user in response.json()] 383 384 def __enter__(self): 385 return self 386 387 def __exit__(self, exc_type, exc_val, exc_tb): 388 self.client.close()
40 def get_all_positions( 41 self, 42 user: EthAddress, 43 size_threshold: float = 0.0, 44 ): 45 # data-api /positions endpoint does not support fetching all positions without filters 46 # a workaround is to use the GraphQL positions subgraph directly 47 query = f"""query {{ 48 userBalances(where: {{ 49 user: "{user.lower()}", 50 balance_gt: "{int(size_threshold * 10**6)}" 51 }}) {{ 52 user 53 asset {{ 54 id 55 condition {{ 56 id 57 }} 58 complement 59 outcomeIndex 60 }} 61 balance 62 }} 63 }} 64 """ 65 66 response = self.gql_positions_client.query(query) 67 return [GQLPosition(**pos) for pos in response["userBalances"]]
69 def get_positions( 70 self, 71 user: EthAddress, 72 condition_id: Optional[ 73 Union[str, list[str]] 74 ] = None, # mutually exclusive with event_id 75 event_id: Optional[ 76 Union[int, list[int]] 77 ] = None, # mutually exclusive with condition_id 78 size_threshold: float = 1.0, 79 redeemable: bool = False, 80 mergeable: bool = False, 81 title: Optional[str] = None, 82 limit: int = 100, 83 offset: int = 0, 84 sort_by: Literal[ 85 "TOKENS", 86 "CURRENT", 87 "INITIAL", 88 "CASHPNL", 89 "PERCENTPNL", 90 "TITLE", 91 "RESOLVING", 92 "PRICE", 93 "AVGPRICE", 94 ] = "TOKENS", 95 sort_direction: Literal["ASC", "DESC"] = "DESC", 96 ) -> list[Position]: 97 params: dict[str, str | list[str] | int | float] = { 98 "user": user, 99 "sizeThreshold": size_threshold, 100 "limit": min(limit, 500), 101 "offset": offset, 102 } 103 if isinstance(condition_id, str): 104 params["market"] = condition_id 105 if isinstance(condition_id, list): 106 params["market"] = ",".join(condition_id) 107 if isinstance(event_id, str): 108 params["eventId"] = event_id 109 if isinstance(event_id, list): 110 params["eventId"] = [str(i) for i in event_id] 111 if redeemable is not None: 112 params["redeemable"] = redeemable 113 if mergeable is not None: 114 params["mergeable"] = mergeable 115 if title: 116 params["title"] = title 117 if sort_by: 118 params["sortBy"] = sort_by 119 if sort_direction: 120 params["sortDirection"] = sort_direction 121 122 response = self.client.get(self._build_url("/positions"), params=params) 123 response.raise_for_status() 124 return [Position(**pos) for pos in response.json()]
126 def get_trades( 127 self, 128 limit: int = 100, 129 offset: int = 0, 130 taker_only: bool = True, 131 filter_type: Optional[Literal["CASH", "TOKENS"]] = None, 132 filter_amount: Optional[ 133 float 134 ] = None, # must be provided together with filter_type 135 condition_id: Optional[str | list[str]] = None, 136 event_id: Optional[int | list[int]] = None, 137 user: Optional[str] = None, 138 side: Optional[Literal["BUY", "SELL"]] = None, 139 ) -> list[Trade]: 140 params: dict[str, int | bool | float | str | list[str]] = { 141 "limit": min(limit, 500), 142 "offset": offset, 143 "takerOnly": taker_only, 144 } 145 if filter_type: 146 params["filterType"] = filter_type 147 if filter_amount: 148 params["filterAmount"] = filter_amount 149 if isinstance(condition_id, str): 150 params["market"] = condition_id 151 if isinstance(condition_id, list): 152 params["market"] = ",".join(condition_id) 153 if isinstance(event_id, str): 154 params["eventId"] = event_id 155 if isinstance(event_id, list): 156 params["eventId"] = [str(i) for i in event_id] 157 if user: 158 params["user"] = user 159 if side: 160 params["side"] = side 161 162 response = self.client.get(self._build_url("/trades"), params=params) 163 response.raise_for_status() 164 return [Trade(**trade) for trade in response.json()]
166 def get_activity( 167 self, 168 user: EthAddress, 169 limit: int = 100, 170 offset: int = 0, 171 condition_id: Optional[Union[str, list[str]]] = None, 172 event_id: Optional[Union[int, list[int]]] = None, 173 type: Optional[ 174 Union[ 175 Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"], 176 list[ 177 Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"] 178 ], 179 ] 180 ] = None, 181 start: Optional[datetime] = None, 182 end: Optional[datetime] = None, 183 side: Optional[Literal["BUY", "SELL"]] = None, 184 sort_by: Literal["TIMESTAMP", "TOKENS", "CASH"] = "TIMESTAMP", 185 sort_direction: Literal["ASC", "DESC"] = "DESC", 186 ) -> list[Activity]: 187 params: dict[str, str | list[str] | int] = { 188 "user": user, 189 "limit": min(limit, 500), 190 "offset": offset, 191 } 192 if isinstance(condition_id, str): 193 params["market"] = condition_id 194 if isinstance(condition_id, list): 195 params["market"] = ",".join(condition_id) 196 if isinstance(event_id, str): 197 params["eventId"] = event_id 198 if isinstance(event_id, list): 199 params["eventId"] = [str(i) for i in event_id] 200 if isinstance(type, str): 201 params["type"] = type 202 if isinstance(type, list): 203 params["type"] = ",".join(type) 204 if start: 205 params["start"] = int(start.timestamp()) 206 if end: 207 params["end"] = int(end.timestamp()) 208 if side: 209 params["side"] = side 210 if sort_by: 211 params["sortBy"] = sort_by 212 if sort_direction: 213 params["sortDirection"] = sort_direction 214 215 response = self.client.get(self._build_url("/activity"), params=params) 216 response.raise_for_status() 217 return [Activity(**activity) for activity in response.json()]
219 def get_holders( 220 self, 221 condition_id: str, 222 limit: int = 500, 223 min_balance: int = 1, 224 ) -> list[HolderResponse]: 225 """Takes in a condition_id and returns top holders for each corresponding token_id.""" 226 params: dict[str, int | str] = { 227 "market": condition_id, 228 "limit": limit, 229 "min_balance": min_balance, 230 } 231 response = self.client.get(self._build_url("/holders"), params=params) 232 response.raise_for_status() 233 return [HolderResponse(**holder_data) for holder_data in response.json()]
Takes in a condition_id and returns top holders for each corresponding token_id.
235 def get_value( 236 self, 237 user: EthAddress, 238 condition_ids: Optional[Union[str, list[str]]] = None, 239 ) -> ValueResponse: 240 """ 241 Get the current value of a user's position in a set of markets. 242 243 Takes in condition_id as: 244 - None --> total value of positions 245 - str --> value of position 246 - list[str] --> sum of the values of positions. 247 """ 248 params = {"user": user} 249 if isinstance(condition_ids, str): 250 params["market"] = condition_ids 251 if isinstance(condition_ids, list): 252 params["market"] = ",".join(condition_ids) 253 254 response = self.client.get(self._build_url("/value"), params=params) 255 response.raise_for_status() 256 return ValueResponse(**response.json()[0])
Get the current value of a user's position in a set of markets.
Takes in condition_id as: - None --> total value of positions - str --> value of position - list[str] --> sum of the values of positions.
258 def get_closed_positions( 259 self, 260 user: EthAddress, 261 condition_ids: Optional[Union[str, list[str]]] = None, 262 ) -> list[Position]: 263 """Get all closed positions.""" 264 params = {"user": user} 265 if isinstance(condition_ids, str): 266 params["market"] = condition_ids 267 if isinstance(condition_ids, list): 268 params["market"] = ",".join(condition_ids) 269 270 response = self.client.get(self._build_url("/closed-positions"), params=params) 271 response.raise_for_status() 272 return [Position(**pos) for pos in response.json()]
Get all closed positions.
274 def get_total_markets_traded( 275 self, 276 user: EthAddress, 277 ) -> int: 278 """Get the total number of markets a user has traded in.""" 279 params = {"user": user} 280 281 response = self.client.get(self._build_url("/traded"), params=params) 282 response.raise_for_status() 283 return response.json()["traded"]
Get the total number of markets a user has traded in.
285 def get_open_interest( 286 self, 287 condition_ids: Optional[Union[str, list[str]]] = None, 288 ) -> list[MarketValue]: 289 """Get open interest.""" 290 params = {} 291 292 if isinstance(condition_ids, str): 293 params["market"] = condition_ids 294 if isinstance(condition_ids, list): 295 params["market"] = ",".join(condition_ids) 296 297 response = self.client.get(self._build_url("/oi"), params=params) 298 response.raise_for_status() 299 return [MarketValue(**oi) for oi in response.json()]
Get open interest.
301 def get_live_volume( 302 self, 303 event_id: int, 304 ) -> EventLiveVolume: 305 """Get live volume for a given event.""" 306 params = {"id": str(event_id)} 307 308 response = self.client.get(self._build_url("/live-volume"), params=params) 309 response.raise_for_status() 310 return EventLiveVolume(**response.json()[0])
Get live volume for a given event.
314 def get_pnl( 315 self, 316 user: EthAddress, 317 period: Literal["all", "1m", "1w", "1d"] = "all", 318 frequency: Literal["1h", "3h", "12h", "1d"] = "1h", 319 ) -> list[TimeseriesPoint]: 320 """Get a user's PnL timeseries in the last day, week, month or all with a given frequency.""" 321 params = { 322 "user_address": user, 323 "interval": period, 324 "fidelity": frequency, 325 } 326 327 response = self.client.get( 328 "https://user-pnl-api.polymarket.com/user-pnl", params=params 329 ) 330 response.raise_for_status() 331 return [TimeseriesPoint(**point) for point in response.json()]
Get a user's PnL timeseries in the last day, week, month or all with a given frequency.
333 def get_user_metric( 334 self, 335 user: EthAddress, 336 metric: Literal["profit", "volume"] = "profit", 337 window: Literal["1d", "7d", "30d", "all"] = "all", 338 ): 339 """Get a user's overall profit or volume in the last day, week, month or all.""" 340 params: dict[str, int | str] = { 341 "address": user, 342 "window": window, 343 "limit": 1, 344 } 345 response = self.client.get( 346 "https://lb-api.polymarket.com/" + metric, params=params 347 ) 348 response.raise_for_status() 349 return UserMetric(**response.json()[0])
Get a user's overall profit or volume in the last day, week, month or all.
351 def get_leaderboard_user_rank( 352 self, 353 user: EthAddress, 354 metric: Literal["profit", "volume"] = "profit", 355 window: Literal["1d", "7d", "30d", "all"] = "all", 356 ): 357 """Get a user's rank on the leaderboard by profit or volume.""" 358 params = { 359 "address": user, 360 "window": window, 361 "rankType": "pnl" if metric == "profit" else "vol", 362 } 363 response = self.client.get("https://lb-api.polymarket.com/rank", params=params) 364 response.raise_for_status() 365 return UserRank(**response.json()[0])
Get a user's rank on the leaderboard by profit or volume.
367 def get_leaderboard_top_users( 368 self, 369 metric: Literal["profit", "volume"] = "profit", 370 window: Literal["1d", "7d", "30d", "all"] = "all", 371 limit: int = 100, 372 ): 373 """Get the leaderboard of the top at most 100 users by profit or volume.""" 374 params = { 375 "window": window, 376 "limit": limit, 377 } 378 response = self.client.get( 379 "https://lb-api.polymarket.com/" + metric, params=params 380 ) 381 response.raise_for_status() 382 return [UserMetric(**user) for user in response.json()]
Get the leaderboard of the top at most 100 users by profit or volume.
31class PolymarketGammaClient: 32 def __init__(self, base_url: str = "https://gamma-api.polymarket.com"): 33 self.base_url = base_url 34 self.client = httpx.Client(http2=True, timeout=30.0) 35 36 def _build_url(self, endpoint: str) -> str: 37 return urljoin(self.base_url, endpoint) 38 39 def search( 40 self, 41 query: str, 42 cache: Optional[bool] = None, 43 status: Optional[Literal["active", "resolved"]] = None, 44 limit_per_type: Optional[int] = None, # max is 50 45 page: Optional[int] = None, 46 tags: Optional[list[str]] = None, 47 keep_closed_markets: Optional[bool] = None, 48 sort: Optional[ 49 Literal[ 50 "volume", 51 "volume_24hr", 52 "liquidity", 53 "start_date", 54 "end_date", 55 "competitive", 56 ] 57 ] = None, 58 ascending: Optional[bool] = None, 59 search_tags: Optional[bool] = None, 60 search_profiles: Optional[bool] = None, 61 recurrence: Optional[ 62 Literal["hourly", "daily", "weekly", "monthly", "annual"] 63 ] = None, 64 exclude_tag_ids: Optional[list[int]] = None, 65 optimized: Optional[bool] = None, 66 ) -> SearchResult: 67 params: dict[str, str | list[str] | int | bool] = { 68 "q": query, 69 } 70 if cache is not None: 71 params["cache"] = str(cache).lower() 72 if status: 73 params["events_status"] = status 74 if limit_per_type: 75 params["limit_per_type"] = limit_per_type 76 if page: 77 params["page"] = page 78 if tags: 79 params["events_tag"] = json.dumps([json.dumps(item) for item in tags]) 80 if keep_closed_markets is not None: 81 params["keep_closed_markets"] = keep_closed_markets 82 if sort: 83 params["sort"] = sort 84 if ascending is not None: 85 params["ascending"] = str(ascending).lower() 86 if search_tags is not None: 87 params["search_tags"] = str(search_tags).lower() 88 if search_profiles is not None: 89 params["search_profiles"] = str(search_profiles).lower() 90 if recurrence: 91 params["recurrence"] = recurrence 92 if exclude_tag_ids: 93 params["exclude_tag_id"] = [str(i) for i in exclude_tag_ids] 94 if optimized is not None: 95 params["optimized"] = str(optimized).lower() 96 response = self.client.get(self._build_url("/public-search"), params=params) 97 response.raise_for_status() 98 return SearchResult(**response.json()) 99 100 def get_market(self, market_id: str) -> GammaMarket: 101 """Get a GammaMarket by market_id.""" 102 response = self.client.get(self._build_url(f"/markets/{market_id}")) 103 response.raise_for_status() 104 return GammaMarket(**response.json()) 105 106 def get_markets( 107 self, 108 limit: int | None = None, 109 offset: int | None = None, 110 order: str | None = None, 111 ascending: bool = True, 112 archived: bool | None = None, 113 active: bool | None = None, 114 closed: bool | None = None, 115 slugs: list[str] | None = None, 116 market_ids: list[int] | None = None, 117 token_ids: list[str] | None = None, 118 condition_ids: list[str] | None = None, 119 tag_id: int | None = None, 120 related_tags: bool | None = False, 121 liquidity_num_min: float | None = None, 122 liquidity_num_max: float | None = None, 123 volume_num_min: float | None = None, 124 volume_num_max: float | None = None, 125 start_date_min: datetime | None = None, 126 start_date_max: datetime | None = None, 127 end_date_min: datetime | None = None, 128 end_date_max: datetime | None = None, 129 ) -> list[GammaMarket]: 130 params: dict[str, float | int | list[int] | str | list[str] | bool] = {} 131 if limit: 132 params["limit"] = limit 133 if offset: 134 params["offset"] = offset 135 if order: 136 params["order"] = order 137 params["ascending"] = ascending 138 if slugs: 139 params["slug"] = slugs 140 if archived is not None: 141 params["archived"] = archived 142 if active is not None: 143 params["active"] = active 144 if closed is not None: 145 params["closed"] = closed 146 if market_ids: 147 params["id"] = market_ids 148 if token_ids: 149 params["clob_token_ids"] = token_ids 150 if condition_ids: 151 params["condition_ids"] = condition_ids 152 if liquidity_num_min: 153 params["liquidity_num_min"] = liquidity_num_min 154 if liquidity_num_max: 155 params["liquidity_num_max"] = liquidity_num_max 156 if volume_num_min: 157 params["volume_num_min"] = volume_num_min 158 if volume_num_max: 159 params["volume_num_max"] = volume_num_max 160 if start_date_min: 161 params["start_date_min"] = start_date_min.isoformat() 162 if start_date_max: 163 params["start_date_max"] = start_date_max.isoformat() 164 if end_date_min: 165 params["end_date_min"] = end_date_min.isoformat() 166 if end_date_max: 167 params["end_date_max"] = end_date_max.isoformat() 168 if tag_id: 169 params["tag_id"] = tag_id 170 if related_tags: 171 params["related_tags"] = related_tags 172 173 response = self.client.get(self._build_url("/markets"), params=params) 174 response.raise_for_status() 175 return [GammaMarket(**market) for market in response.json()] 176 177 def get_market_by_id( 178 self, market_id: str, include_tag: Optional[bool] = None 179 ) -> GammaMarket: 180 params = {} 181 if include_tag: 182 params["include_tag"] = include_tag 183 response = self.client.get( 184 self._build_url(f"/markets/{market_id}"), params=params 185 ) 186 response.raise_for_status() 187 return GammaMarket(**response.json()) 188 189 def get_market_tags(self, market_id: str) -> list[Tag]: 190 response = self.client.get(self._build_url(f"/markets/{market_id}/tags")) 191 response.raise_for_status() 192 return [Tag(**tag) for tag in response.json()] 193 194 def get_market_by_slug( 195 self, slug: str, include_tag: Optional[bool] = None 196 ) -> GammaMarket: 197 params = {} 198 if include_tag: 199 params["include_tag"] = include_tag 200 response = self.client.get( 201 self._build_url(f"/markets/slug/{slug}"), params=params 202 ) 203 response.raise_for_status() 204 return GammaMarket(**response.json()) 205 206 def get_events( 207 self, 208 limit: int = 500, 209 offset: int = 0, 210 order: Optional[str] = None, 211 ascending: bool = True, 212 event_ids: Optional[Union[str, list[str]]] = None, 213 slugs: Optional[list[str]] = None, 214 archived: Optional[bool] = None, 215 active: Optional[bool] = None, 216 closed: Optional[bool] = None, 217 liquidity_min: Optional[float] = None, 218 liquidity_max: Optional[float] = None, 219 volume_min: Optional[float] = None, 220 volume_max: Optional[float] = None, 221 start_date_min: Optional[datetime] = None, 222 start_date_max: Optional[datetime] = None, 223 end_date_min: Optional[datetime] = None, 224 end_date_max: Optional[datetime] = None, 225 tag: Optional[str] = None, 226 tag_id: Optional[int] = None, 227 tag_slug: Optional[str] = None, 228 related_tags: bool = False, 229 ) -> list[Event]: 230 params: dict[str, int | str | list[str] | float] = { 231 "limit": limit, 232 "offset": offset, 233 } 234 if order: 235 params["order"] = order 236 params["ascending"] = ascending 237 if event_ids: 238 params["id"] = event_ids 239 if slugs: 240 params["slug"] = slugs 241 if archived is not None: 242 params["archived"] = archived 243 if active is not None: 244 params["active"] = active 245 if closed is not None: 246 params["closed"] = closed 247 if liquidity_min: 248 params["liquidity_min"] = liquidity_min 249 if liquidity_max: 250 params["liquidity_max"] = liquidity_max 251 if volume_min: 252 params["volume_min"] = volume_min 253 if volume_max: 254 params["volume_max"] = volume_max 255 if start_date_min: 256 params["start_date_min"] = start_date_min.isoformat() 257 if start_date_max: 258 params["start_date_max"] = start_date_max.isoformat() 259 if end_date_min: 260 params["end_date_min"] = end_date_min.isoformat() 261 if end_date_max: 262 params["end_date_max"] = end_date_max.isoformat() 263 if tag: 264 params["tag"] = tag 265 elif tag_id: 266 params["tag_id"] = tag_id 267 if related_tags: 268 params["related_tags"] = related_tags 269 elif tag_slug: 270 params["tag_slug"] = tag_slug 271 272 response = self.client.get(self._build_url("/events"), params=params) 273 response.raise_for_status() 274 return [Event(**event) for event in response.json()] 275 276 def get_all_events( 277 self, 278 order: Optional[str] = None, 279 ascending: bool = True, 280 event_ids: Optional[Union[str, list[str]]] = None, 281 slugs: Optional[list[str]] = None, 282 archived: Optional[bool] = None, 283 active: Optional[bool] = None, 284 closed: Optional[bool] = None, 285 liquidity_min: Optional[float] = None, 286 liquidity_max: Optional[float] = None, 287 volume_min: Optional[float] = None, 288 volume_max: Optional[float] = None, 289 start_date_min: Optional[datetime] = None, 290 start_date_max: Optional[datetime] = None, 291 end_date_min: Optional[datetime] = None, 292 end_date_max: Optional[datetime] = None, 293 tag: Optional[str] = None, 294 tag_id: Optional[int] = None, 295 tag_slug: Optional[str] = None, 296 related_tags: bool = False, 297 ) -> list[Event]: 298 offset = 0 299 events = [] 300 301 while True: 302 part = self.get_events( 303 offset=offset, 304 order=order, 305 ascending=ascending, 306 event_ids=event_ids, 307 slugs=slugs, 308 archived=archived, 309 active=active, 310 closed=closed, 311 liquidity_min=liquidity_min, 312 liquidity_max=liquidity_max, 313 volume_min=volume_min, 314 volume_max=volume_max, 315 start_date_min=start_date_min, 316 start_date_max=start_date_max, 317 end_date_min=end_date_min, 318 end_date_max=end_date_max, 319 tag=tag, 320 tag_id=tag_id, 321 tag_slug=tag_slug, 322 related_tags=related_tags, 323 ) 324 events.extend(part) 325 326 if len(part) < 500: 327 break 328 329 offset += 500 330 331 return events 332 333 def get_event_by_id( 334 self, 335 event_id: int, 336 include_chat: Optional[bool] = None, 337 include_template: Optional[bool] = None, 338 ) -> Event: 339 params = {} 340 if include_chat: 341 params["include_chat"] = include_chat 342 if include_template: 343 params["include_template"] = include_template 344 response = self.client.get( 345 self._build_url(f"/events/{event_id}"), params=params 346 ) 347 response.raise_for_status() 348 return Event(**response.json()) 349 350 def get_event_by_slug( 351 self, 352 slug: str, 353 include_chat: Optional[bool] = None, 354 include_template: Optional[bool] = None, 355 ) -> Event: 356 params = {} 357 if include_chat: 358 params["include_chat"] = include_chat 359 if include_template: 360 params["include_template"] = include_template 361 response = self.client.get( 362 self._build_url(f"/events/slug/{slug}"), params=params 363 ) 364 response.raise_for_status() 365 return Event(**response.json()) 366 367 def get_event_tags(self, event_id: int) -> list[Tag]: 368 response = self.client.get(self._build_url(f"/events/{event_id}/tags")) 369 response.raise_for_status() 370 return [Tag(**tag) for tag in response.json()] 371 372 def get_teams( 373 self, 374 limit: int = 500, 375 offset: int = 0, 376 order: Optional[ 377 Literal[ 378 "id", 379 "name", 380 "league", 381 "record", 382 "logo", 383 "abbreviation", 384 "alias", 385 "createdAt", 386 "updatedAt", 387 ] 388 ] = None, 389 ascending: bool = True, 390 league: Optional[str] = None, 391 name: Optional[str] = None, 392 abbreviation: Optional[str] = None, 393 ) -> list[Team]: 394 params: dict[str, int | str] = { 395 "limit": limit, 396 "offset": offset, 397 } 398 if order: 399 params["order"] = order 400 params["ascending"] = str(ascending).lower() 401 if league: 402 params["league"] = league.lower() 403 if name: 404 params["name"] = name 405 if abbreviation: 406 params["abbreviation"] = abbreviation.lower() 407 response = self.client.get(self._build_url("/teams"), params=params) 408 response.raise_for_status() 409 return [Team(**team) for team in response.json()] 410 411 def get_all_teams( 412 self, 413 order: Optional[ 414 Literal[ 415 "id", 416 "name", 417 "league", 418 "record", 419 "logo", 420 "abbreviation", 421 "alias", 422 "createdAt", 423 "updatedAt", 424 ] 425 ] = None, 426 ascending: bool = True, 427 league: Optional[str] = None, 428 name: Optional[str] = None, 429 abbreviation: Optional[str] = None, 430 ) -> list[Team]: 431 offset = 0 432 teams = [] 433 434 while True: 435 part = self.get_teams( 436 offset=offset, 437 order=order, 438 ascending=ascending, 439 league=league, 440 name=name, 441 abbreviation=abbreviation, 442 ) 443 teams.extend(part) 444 445 if len(part) < 500: 446 break 447 448 offset += 500 449 450 return teams 451 452 def get_sports_metadata( 453 self, 454 ) -> list[Sport]: 455 response = self.client.get(self._build_url("/sports")) 456 response.raise_for_status() 457 return [Sport(**sport) for sport in response.json()] 458 459 def get_tags( 460 self, 461 limit: int = 300, 462 offset: int = 0, 463 order: Optional[ 464 Literal[ 465 "id", 466 "label", 467 "slug", 468 "forceShow", 469 "forceHide", 470 "isCarousel", 471 "createdAt", 472 "updatedAt", 473 "createdBy", 474 "updatedBy", 475 ] 476 ] = None, 477 ascending: bool = True, 478 include_templates: Optional[bool] = None, 479 is_carousel: Optional[bool] = None, 480 ) -> list[Tag]: 481 params: dict[str, int | str] = { 482 "limit": limit, 483 "offset": offset, 484 } 485 if order: 486 params["order"] = order 487 params["ascending"] = str(ascending).lower() 488 if include_templates is not None: 489 params["include_templates"] = str(include_templates).lower() 490 if is_carousel is not None: 491 params["is_carousel"] = str(is_carousel).lower() 492 response = self.client.get(self._build_url("/tags"), params=params) 493 response.raise_for_status() 494 return [Tag(**tag) for tag in response.json()] 495 496 def get_all_tags( 497 self, 498 order: Optional[ 499 Literal[ 500 "id", 501 "label", 502 "slug", 503 "forceShow", 504 "forceHide", 505 "isCarousel", 506 "createdAt", 507 "updatedAt", 508 "createdBy", 509 "updatedBy", 510 ] 511 ] = None, 512 ascending: bool = True, 513 include_templates: Optional[bool] = None, 514 is_carousel: Optional[bool] = None, 515 ) -> list[Tag]: 516 offset = 0 517 tags = [] 518 519 while True: 520 part = self.get_tags( 521 offset=offset, 522 order=order, 523 ascending=ascending, 524 include_templates=include_templates, 525 is_carousel=is_carousel, 526 ) 527 tags.extend(part) 528 529 if len(part) < 300: 530 break 531 532 offset += 300 533 534 return tags 535 536 def get_tag(self, tag_id: str, include_template: Optional[bool] = None) -> Tag: 537 params = {} 538 if include_template is not None: 539 params = {"include_template": str(include_template).lower()} 540 response = self.client.get(self._build_url(f"/tags/{tag_id}"), params=params) 541 response.raise_for_status() 542 return Tag(**response.json()) 543 544 def get_related_tag_ids_by_tag_id( 545 self, 546 tag_id: int, 547 omit_empty: Optional[bool] = None, 548 status: Optional[Literal["active", "closed", "all"]] = None, 549 ) -> list[TagRelation]: 550 params = {} 551 if omit_empty is not None: 552 params["omit_empty"] = str(omit_empty).lower() 553 if status: 554 params["status"] = status 555 response = self.client.get( 556 self._build_url(f"/tags/{tag_id}/related-tags"), params=params 557 ) 558 response.raise_for_status() 559 return [TagRelation(**tag) for tag in response.json()] 560 561 def get_related_tag_ids_by_slug( 562 self, 563 slug: str, 564 omit_empty: Optional[bool] = None, 565 status: Optional[Literal["active", "closed", "all"]] = None, 566 ) -> list[TagRelation]: 567 params = {} 568 if omit_empty is not None: 569 params["omit_empty"] = str(omit_empty).lower() 570 if status: 571 params["status"] = status 572 response = self.client.get( 573 self._build_url(f"/tags/slug/{slug}/related-tags"), params=params 574 ) 575 response.raise_for_status() 576 return [TagRelation(**tag) for tag in response.json()] 577 578 def get_related_tags_by_tag_id( 579 self, 580 tag_id: int, 581 omit_empty: Optional[bool] = None, 582 status: Optional[Literal["active", "closed", "all"]] = None, 583 ) -> list[Tag]: 584 params = {} 585 if omit_empty is not None: 586 params["omit_empty"] = str(omit_empty).lower() 587 if status: 588 params["status"] = status 589 response = self.client.get( 590 self._build_url(f"/tags/{tag_id}/related-tags/tags"), params=params 591 ) 592 response.raise_for_status() 593 return [Tag(**tag) for tag in response.json()] 594 595 def get_related_tags_by_slug( 596 self, 597 slug: str, 598 omit_empty: Optional[bool] = None, 599 status: Optional[Literal["active", "closed", "all"]] = None, 600 ) -> list[Tag]: 601 params = {} 602 if omit_empty is not None: 603 params["omit_empty"] = str(omit_empty).lower() 604 if status: 605 params["status"] = status 606 response = self.client.get( 607 self._build_url(f"/tags/slug/{slug}/related-tags/tags"), params=params 608 ) 609 response.raise_for_status() 610 return [Tag(**tag) for tag in response.json()] 611 612 def get_series( 613 self, 614 limit: int = 300, 615 offset: int = 0, 616 order: Optional[str] = None, 617 ascending: bool = True, 618 slug: Optional[str] = None, 619 closed: Optional[bool] = None, 620 include_chat: Optional[bool] = None, 621 recurrence: Optional[ 622 Literal[ 623 "hourly", "daily", "weekly", "monthly", "annual" 624 ] # results also contain "15m" but the server returns a 422 Unprocessable Content 625 ] = None, 626 ) -> list[Series]: 627 params: dict[str, str | int | list[int]] = { 628 "limit": limit, 629 "offset": offset, 630 } 631 if order: 632 params["order"] = order 633 params["ascending"] = str(ascending).lower() 634 if slug: 635 params["slug"] = slug 636 if closed is not None: 637 params["closed"] = str(closed).lower() 638 if include_chat is not None: 639 params["include_chat"] = str(include_chat).lower() 640 if recurrence is not None: 641 params["recurrence"] = str(recurrence).lower() 642 643 response = self.client.get(self._build_url("/series"), params=params) 644 response.raise_for_status() 645 return [Series(**series) for series in response.json()] 646 647 def get_all_series( 648 self, 649 order: Optional[str] = None, 650 ascending: bool = True, 651 slug: Optional[str] = None, 652 closed: Optional[bool] = None, 653 include_chat: Optional[bool] = None, 654 recurrence: Optional[ 655 Literal["hourly", "daily", "weekly", "monthly", "annual"] 656 ] = None, 657 ) -> list[Series]: 658 offset = 0 659 series = [] 660 661 while True: 662 part = self.get_series( 663 offset=offset, 664 order=order, 665 ascending=ascending, 666 slug=slug, 667 closed=closed, 668 include_chat=include_chat, 669 recurrence=recurrence, 670 ) 671 series.extend(part) 672 673 if len(part) < 300: 674 break 675 676 offset += 300 677 return series 678 679 def get_series_by_id(self, series_id: str) -> Series: 680 response = self.client.get(self._build_url(f"/series/{series_id}")) 681 response.raise_for_status() 682 return Series(**response.json()) 683 684 def get_comments( 685 self, 686 parent_entity_type: Literal["Event", "Series", "market"], 687 parent_entity_id: int, 688 limit=500, 689 offset=0, 690 order: Optional[str] = None, 691 ascending: bool = True, 692 get_positions: Optional[bool] = None, 693 holders_only: Optional[bool] = None, 694 ) -> list[Comment]: 695 """Warning, the server doesn't give back the right amount of comments you asked for.""" 696 params: dict[str, str | int] = { 697 "parent_entity_type": parent_entity_type, 698 "parent_entity_id": parent_entity_id, 699 "limit": limit, 700 "offset": offset, 701 } 702 if order: 703 params["order"] = order 704 params["ascending"] = str(ascending).lower() 705 if get_positions is not None: 706 params["get_positions"] = str(get_positions).lower() 707 if holders_only is not None: 708 params["holders_only"] = str(holders_only).lower() 709 response = self.client.get(self._build_url("/comments"), params=params) 710 response.raise_for_status() 711 return [Comment(**comment) for comment in response.json()] 712 713 def get_comments_by_id( 714 self, comment_id: str, get_positions: Optional[bool] = None 715 ) -> list[Comment]: 716 """Returns all comments that belong to the comment's thread.""" 717 params = {} 718 if get_positions is not None: 719 params["get_positions"] = str(get_positions).lower() 720 response = self.client.get( 721 self._build_url(f"/comments/{comment_id}"), params=params 722 ) 723 response.raise_for_status() 724 return [Comment(**comment) for comment in response.json()] 725 726 def get_comments_by_user_address( 727 self, 728 user_address: EthAddress, # warning, this is the base address, not the proxy address 729 limit=500, 730 offset=0, 731 order: Optional[str] = None, 732 ascending: bool = True, 733 ) -> list[Comment]: 734 params: dict[str, str | int] = { 735 "limit": limit, 736 "offset": offset, 737 } 738 if order: 739 params["order"] = order 740 params["ascending"] = str(ascending).lower() 741 response = self.client.get( 742 self._build_url(f"/comments/user_address/{user_address}"), params=params 743 ) 744 response.raise_for_status() 745 return [Comment(**comment) for comment in response.json()] 746 747 def grok_event_summary(self, event_slug: str): 748 json_payload = { 749 "id": generate_random_id(), 750 "messages": [{"role": "user", "content": "", "parts": []}], 751 } 752 753 params = { 754 "prompt": event_slug, 755 } 756 757 with self.client.stream( 758 method="POST", 759 url="https://polymarket.com/api/grok/event-summary", 760 params=params, 761 json=json_payload, 762 ) as stream: 763 messages = [] 764 citations = [] 765 seen_urls = set() 766 767 for line_bytes in stream.iter_lines(): 768 line = ( 769 line_bytes.decode() if isinstance(line_bytes, bytes) else line_bytes 770 ) 771 if line.startswith("__SOURCES__:"): 772 sources_json_str = line[len("__SOURCES__:") :] 773 try: 774 sources_obj = json.loads(sources_json_str) 775 for source in sources_obj.get("sources", []): 776 url = source.get("url") 777 if url and url not in seen_urls: 778 citations.append(source) 779 seen_urls.add(url) 780 except json.JSONDecodeError: 781 pass 782 else: 783 messages.append(line) 784 print(line, end="") # or handle message text as needed 785 786 # After reading streamed lines: 787 if citations: 788 print("\n\nSources:") 789 for source in citations: 790 print(f"- {source.get('url', 'Unknown URL')}") 791 792 def grok_election_market_explanation( 793 self, candidate_name: str, election_title: str 794 ): 795 text = f"Provide candidate information for {candidate_name} in the {election_title} on Polymarket." 796 json_payload = { 797 "id": generate_random_id(), 798 "messages": [ 799 { 800 "role": "user", 801 "content": text, 802 "parts": [{"type": "text", "text": text}], 803 }, 804 ], 805 } 806 807 response = self.client.post( 808 url="https://polymarket.com/api/grok/election-market-explanation", 809 json=json_payload, 810 ) 811 response.raise_for_status() 812 813 parts = [p.strip() for p in response.text.split("**") if p.strip()] 814 for i, part in enumerate(parts): 815 if ":" in part and i != 0: 816 print() 817 print(part) 818 819 def __enter__(self): 820 return self 821 822 def __exit__(self, exc_type, exc_val, exc_tb): 823 self.client.close()
39 def search( 40 self, 41 query: str, 42 cache: Optional[bool] = None, 43 status: Optional[Literal["active", "resolved"]] = None, 44 limit_per_type: Optional[int] = None, # max is 50 45 page: Optional[int] = None, 46 tags: Optional[list[str]] = None, 47 keep_closed_markets: Optional[bool] = None, 48 sort: Optional[ 49 Literal[ 50 "volume", 51 "volume_24hr", 52 "liquidity", 53 "start_date", 54 "end_date", 55 "competitive", 56 ] 57 ] = None, 58 ascending: Optional[bool] = None, 59 search_tags: Optional[bool] = None, 60 search_profiles: Optional[bool] = None, 61 recurrence: Optional[ 62 Literal["hourly", "daily", "weekly", "monthly", "annual"] 63 ] = None, 64 exclude_tag_ids: Optional[list[int]] = None, 65 optimized: Optional[bool] = None, 66 ) -> SearchResult: 67 params: dict[str, str | list[str] | int | bool] = { 68 "q": query, 69 } 70 if cache is not None: 71 params["cache"] = str(cache).lower() 72 if status: 73 params["events_status"] = status 74 if limit_per_type: 75 params["limit_per_type"] = limit_per_type 76 if page: 77 params["page"] = page 78 if tags: 79 params["events_tag"] = json.dumps([json.dumps(item) for item in tags]) 80 if keep_closed_markets is not None: 81 params["keep_closed_markets"] = keep_closed_markets 82 if sort: 83 params["sort"] = sort 84 if ascending is not None: 85 params["ascending"] = str(ascending).lower() 86 if search_tags is not None: 87 params["search_tags"] = str(search_tags).lower() 88 if search_profiles is not None: 89 params["search_profiles"] = str(search_profiles).lower() 90 if recurrence: 91 params["recurrence"] = recurrence 92 if exclude_tag_ids: 93 params["exclude_tag_id"] = [str(i) for i in exclude_tag_ids] 94 if optimized is not None: 95 params["optimized"] = str(optimized).lower() 96 response = self.client.get(self._build_url("/public-search"), params=params) 97 response.raise_for_status() 98 return SearchResult(**response.json())
100 def get_market(self, market_id: str) -> GammaMarket: 101 """Get a GammaMarket by market_id.""" 102 response = self.client.get(self._build_url(f"/markets/{market_id}")) 103 response.raise_for_status() 104 return GammaMarket(**response.json())
Get a GammaMarket by market_id.
106 def get_markets( 107 self, 108 limit: int | None = None, 109 offset: int | None = None, 110 order: str | None = None, 111 ascending: bool = True, 112 archived: bool | None = None, 113 active: bool | None = None, 114 closed: bool | None = None, 115 slugs: list[str] | None = None, 116 market_ids: list[int] | None = None, 117 token_ids: list[str] | None = None, 118 condition_ids: list[str] | None = None, 119 tag_id: int | None = None, 120 related_tags: bool | None = False, 121 liquidity_num_min: float | None = None, 122 liquidity_num_max: float | None = None, 123 volume_num_min: float | None = None, 124 volume_num_max: float | None = None, 125 start_date_min: datetime | None = None, 126 start_date_max: datetime | None = None, 127 end_date_min: datetime | None = None, 128 end_date_max: datetime | None = None, 129 ) -> list[GammaMarket]: 130 params: dict[str, float | int | list[int] | str | list[str] | bool] = {} 131 if limit: 132 params["limit"] = limit 133 if offset: 134 params["offset"] = offset 135 if order: 136 params["order"] = order 137 params["ascending"] = ascending 138 if slugs: 139 params["slug"] = slugs 140 if archived is not None: 141 params["archived"] = archived 142 if active is not None: 143 params["active"] = active 144 if closed is not None: 145 params["closed"] = closed 146 if market_ids: 147 params["id"] = market_ids 148 if token_ids: 149 params["clob_token_ids"] = token_ids 150 if condition_ids: 151 params["condition_ids"] = condition_ids 152 if liquidity_num_min: 153 params["liquidity_num_min"] = liquidity_num_min 154 if liquidity_num_max: 155 params["liquidity_num_max"] = liquidity_num_max 156 if volume_num_min: 157 params["volume_num_min"] = volume_num_min 158 if volume_num_max: 159 params["volume_num_max"] = volume_num_max 160 if start_date_min: 161 params["start_date_min"] = start_date_min.isoformat() 162 if start_date_max: 163 params["start_date_max"] = start_date_max.isoformat() 164 if end_date_min: 165 params["end_date_min"] = end_date_min.isoformat() 166 if end_date_max: 167 params["end_date_max"] = end_date_max.isoformat() 168 if tag_id: 169 params["tag_id"] = tag_id 170 if related_tags: 171 params["related_tags"] = related_tags 172 173 response = self.client.get(self._build_url("/markets"), params=params) 174 response.raise_for_status() 175 return [GammaMarket(**market) for market in response.json()]
177 def get_market_by_id( 178 self, market_id: str, include_tag: Optional[bool] = None 179 ) -> GammaMarket: 180 params = {} 181 if include_tag: 182 params["include_tag"] = include_tag 183 response = self.client.get( 184 self._build_url(f"/markets/{market_id}"), params=params 185 ) 186 response.raise_for_status() 187 return GammaMarket(**response.json())
194 def get_market_by_slug( 195 self, slug: str, include_tag: Optional[bool] = None 196 ) -> GammaMarket: 197 params = {} 198 if include_tag: 199 params["include_tag"] = include_tag 200 response = self.client.get( 201 self._build_url(f"/markets/slug/{slug}"), params=params 202 ) 203 response.raise_for_status() 204 return GammaMarket(**response.json())
206 def get_events( 207 self, 208 limit: int = 500, 209 offset: int = 0, 210 order: Optional[str] = None, 211 ascending: bool = True, 212 event_ids: Optional[Union[str, list[str]]] = None, 213 slugs: Optional[list[str]] = None, 214 archived: Optional[bool] = None, 215 active: Optional[bool] = None, 216 closed: Optional[bool] = None, 217 liquidity_min: Optional[float] = None, 218 liquidity_max: Optional[float] = None, 219 volume_min: Optional[float] = None, 220 volume_max: Optional[float] = None, 221 start_date_min: Optional[datetime] = None, 222 start_date_max: Optional[datetime] = None, 223 end_date_min: Optional[datetime] = None, 224 end_date_max: Optional[datetime] = None, 225 tag: Optional[str] = None, 226 tag_id: Optional[int] = None, 227 tag_slug: Optional[str] = None, 228 related_tags: bool = False, 229 ) -> list[Event]: 230 params: dict[str, int | str | list[str] | float] = { 231 "limit": limit, 232 "offset": offset, 233 } 234 if order: 235 params["order"] = order 236 params["ascending"] = ascending 237 if event_ids: 238 params["id"] = event_ids 239 if slugs: 240 params["slug"] = slugs 241 if archived is not None: 242 params["archived"] = archived 243 if active is not None: 244 params["active"] = active 245 if closed is not None: 246 params["closed"] = closed 247 if liquidity_min: 248 params["liquidity_min"] = liquidity_min 249 if liquidity_max: 250 params["liquidity_max"] = liquidity_max 251 if volume_min: 252 params["volume_min"] = volume_min 253 if volume_max: 254 params["volume_max"] = volume_max 255 if start_date_min: 256 params["start_date_min"] = start_date_min.isoformat() 257 if start_date_max: 258 params["start_date_max"] = start_date_max.isoformat() 259 if end_date_min: 260 params["end_date_min"] = end_date_min.isoformat() 261 if end_date_max: 262 params["end_date_max"] = end_date_max.isoformat() 263 if tag: 264 params["tag"] = tag 265 elif tag_id: 266 params["tag_id"] = tag_id 267 if related_tags: 268 params["related_tags"] = related_tags 269 elif tag_slug: 270 params["tag_slug"] = tag_slug 271 272 response = self.client.get(self._build_url("/events"), params=params) 273 response.raise_for_status() 274 return [Event(**event) for event in response.json()]
276 def get_all_events( 277 self, 278 order: Optional[str] = None, 279 ascending: bool = True, 280 event_ids: Optional[Union[str, list[str]]] = None, 281 slugs: Optional[list[str]] = None, 282 archived: Optional[bool] = None, 283 active: Optional[bool] = None, 284 closed: Optional[bool] = None, 285 liquidity_min: Optional[float] = None, 286 liquidity_max: Optional[float] = None, 287 volume_min: Optional[float] = None, 288 volume_max: Optional[float] = None, 289 start_date_min: Optional[datetime] = None, 290 start_date_max: Optional[datetime] = None, 291 end_date_min: Optional[datetime] = None, 292 end_date_max: Optional[datetime] = None, 293 tag: Optional[str] = None, 294 tag_id: Optional[int] = None, 295 tag_slug: Optional[str] = None, 296 related_tags: bool = False, 297 ) -> list[Event]: 298 offset = 0 299 events = [] 300 301 while True: 302 part = self.get_events( 303 offset=offset, 304 order=order, 305 ascending=ascending, 306 event_ids=event_ids, 307 slugs=slugs, 308 archived=archived, 309 active=active, 310 closed=closed, 311 liquidity_min=liquidity_min, 312 liquidity_max=liquidity_max, 313 volume_min=volume_min, 314 volume_max=volume_max, 315 start_date_min=start_date_min, 316 start_date_max=start_date_max, 317 end_date_min=end_date_min, 318 end_date_max=end_date_max, 319 tag=tag, 320 tag_id=tag_id, 321 tag_slug=tag_slug, 322 related_tags=related_tags, 323 ) 324 events.extend(part) 325 326 if len(part) < 500: 327 break 328 329 offset += 500 330 331 return events
333 def get_event_by_id( 334 self, 335 event_id: int, 336 include_chat: Optional[bool] = None, 337 include_template: Optional[bool] = None, 338 ) -> Event: 339 params = {} 340 if include_chat: 341 params["include_chat"] = include_chat 342 if include_template: 343 params["include_template"] = include_template 344 response = self.client.get( 345 self._build_url(f"/events/{event_id}"), params=params 346 ) 347 response.raise_for_status() 348 return Event(**response.json())
350 def get_event_by_slug( 351 self, 352 slug: str, 353 include_chat: Optional[bool] = None, 354 include_template: Optional[bool] = None, 355 ) -> Event: 356 params = {} 357 if include_chat: 358 params["include_chat"] = include_chat 359 if include_template: 360 params["include_template"] = include_template 361 response = self.client.get( 362 self._build_url(f"/events/slug/{slug}"), params=params 363 ) 364 response.raise_for_status() 365 return Event(**response.json())
372 def get_teams( 373 self, 374 limit: int = 500, 375 offset: int = 0, 376 order: Optional[ 377 Literal[ 378 "id", 379 "name", 380 "league", 381 "record", 382 "logo", 383 "abbreviation", 384 "alias", 385 "createdAt", 386 "updatedAt", 387 ] 388 ] = None, 389 ascending: bool = True, 390 league: Optional[str] = None, 391 name: Optional[str] = None, 392 abbreviation: Optional[str] = None, 393 ) -> list[Team]: 394 params: dict[str, int | str] = { 395 "limit": limit, 396 "offset": offset, 397 } 398 if order: 399 params["order"] = order 400 params["ascending"] = str(ascending).lower() 401 if league: 402 params["league"] = league.lower() 403 if name: 404 params["name"] = name 405 if abbreviation: 406 params["abbreviation"] = abbreviation.lower() 407 response = self.client.get(self._build_url("/teams"), params=params) 408 response.raise_for_status() 409 return [Team(**team) for team in response.json()]
411 def get_all_teams( 412 self, 413 order: Optional[ 414 Literal[ 415 "id", 416 "name", 417 "league", 418 "record", 419 "logo", 420 "abbreviation", 421 "alias", 422 "createdAt", 423 "updatedAt", 424 ] 425 ] = None, 426 ascending: bool = True, 427 league: Optional[str] = None, 428 name: Optional[str] = None, 429 abbreviation: Optional[str] = None, 430 ) -> list[Team]: 431 offset = 0 432 teams = [] 433 434 while True: 435 part = self.get_teams( 436 offset=offset, 437 order=order, 438 ascending=ascending, 439 league=league, 440 name=name, 441 abbreviation=abbreviation, 442 ) 443 teams.extend(part) 444 445 if len(part) < 500: 446 break 447 448 offset += 500 449 450 return teams
536 def get_tag(self, tag_id: str, include_template: Optional[bool] = None) -> Tag: 537 params = {} 538 if include_template is not None: 539 params = {"include_template": str(include_template).lower()} 540 response = self.client.get(self._build_url(f"/tags/{tag_id}"), params=params) 541 response.raise_for_status() 542 return Tag(**response.json())
612 def get_series( 613 self, 614 limit: int = 300, 615 offset: int = 0, 616 order: Optional[str] = None, 617 ascending: bool = True, 618 slug: Optional[str] = None, 619 closed: Optional[bool] = None, 620 include_chat: Optional[bool] = None, 621 recurrence: Optional[ 622 Literal[ 623 "hourly", "daily", "weekly", "monthly", "annual" 624 ] # results also contain "15m" but the server returns a 422 Unprocessable Content 625 ] = None, 626 ) -> list[Series]: 627 params: dict[str, str | int | list[int]] = { 628 "limit": limit, 629 "offset": offset, 630 } 631 if order: 632 params["order"] = order 633 params["ascending"] = str(ascending).lower() 634 if slug: 635 params["slug"] = slug 636 if closed is not None: 637 params["closed"] = str(closed).lower() 638 if include_chat is not None: 639 params["include_chat"] = str(include_chat).lower() 640 if recurrence is not None: 641 params["recurrence"] = str(recurrence).lower() 642 643 response = self.client.get(self._build_url("/series"), params=params) 644 response.raise_for_status() 645 return [Series(**series) for series in response.json()]
647 def get_all_series( 648 self, 649 order: Optional[str] = None, 650 ascending: bool = True, 651 slug: Optional[str] = None, 652 closed: Optional[bool] = None, 653 include_chat: Optional[bool] = None, 654 recurrence: Optional[ 655 Literal["hourly", "daily", "weekly", "monthly", "annual"] 656 ] = None, 657 ) -> list[Series]: 658 offset = 0 659 series = [] 660 661 while True: 662 part = self.get_series( 663 offset=offset, 664 order=order, 665 ascending=ascending, 666 slug=slug, 667 closed=closed, 668 include_chat=include_chat, 669 recurrence=recurrence, 670 ) 671 series.extend(part) 672 673 if len(part) < 300: 674 break 675 676 offset += 300 677 return series
684 def get_comments( 685 self, 686 parent_entity_type: Literal["Event", "Series", "market"], 687 parent_entity_id: int, 688 limit=500, 689 offset=0, 690 order: Optional[str] = None, 691 ascending: bool = True, 692 get_positions: Optional[bool] = None, 693 holders_only: Optional[bool] = None, 694 ) -> list[Comment]: 695 """Warning, the server doesn't give back the right amount of comments you asked for.""" 696 params: dict[str, str | int] = { 697 "parent_entity_type": parent_entity_type, 698 "parent_entity_id": parent_entity_id, 699 "limit": limit, 700 "offset": offset, 701 } 702 if order: 703 params["order"] = order 704 params["ascending"] = str(ascending).lower() 705 if get_positions is not None: 706 params["get_positions"] = str(get_positions).lower() 707 if holders_only is not None: 708 params["holders_only"] = str(holders_only).lower() 709 response = self.client.get(self._build_url("/comments"), params=params) 710 response.raise_for_status() 711 return [Comment(**comment) for comment in response.json()]
Warning, the server doesn't give back the right amount of comments you asked for.
713 def get_comments_by_id( 714 self, comment_id: str, get_positions: Optional[bool] = None 715 ) -> list[Comment]: 716 """Returns all comments that belong to the comment's thread.""" 717 params = {} 718 if get_positions is not None: 719 params["get_positions"] = str(get_positions).lower() 720 response = self.client.get( 721 self._build_url(f"/comments/{comment_id}"), params=params 722 ) 723 response.raise_for_status() 724 return [Comment(**comment) for comment in response.json()]
Returns all comments that belong to the comment's thread.
726 def get_comments_by_user_address( 727 self, 728 user_address: EthAddress, # warning, this is the base address, not the proxy address 729 limit=500, 730 offset=0, 731 order: Optional[str] = None, 732 ascending: bool = True, 733 ) -> list[Comment]: 734 params: dict[str, str | int] = { 735 "limit": limit, 736 "offset": offset, 737 } 738 if order: 739 params["order"] = order 740 params["ascending"] = str(ascending).lower() 741 response = self.client.get( 742 self._build_url(f"/comments/user_address/{user_address}"), params=params 743 ) 744 response.raise_for_status() 745 return [Comment(**comment) for comment in response.json()]
747 def grok_event_summary(self, event_slug: str): 748 json_payload = { 749 "id": generate_random_id(), 750 "messages": [{"role": "user", "content": "", "parts": []}], 751 } 752 753 params = { 754 "prompt": event_slug, 755 } 756 757 with self.client.stream( 758 method="POST", 759 url="https://polymarket.com/api/grok/event-summary", 760 params=params, 761 json=json_payload, 762 ) as stream: 763 messages = [] 764 citations = [] 765 seen_urls = set() 766 767 for line_bytes in stream.iter_lines(): 768 line = ( 769 line_bytes.decode() if isinstance(line_bytes, bytes) else line_bytes 770 ) 771 if line.startswith("__SOURCES__:"): 772 sources_json_str = line[len("__SOURCES__:") :] 773 try: 774 sources_obj = json.loads(sources_json_str) 775 for source in sources_obj.get("sources", []): 776 url = source.get("url") 777 if url and url not in seen_urls: 778 citations.append(source) 779 seen_urls.add(url) 780 except json.JSONDecodeError: 781 pass 782 else: 783 messages.append(line) 784 print(line, end="") # or handle message text as needed 785 786 # After reading streamed lines: 787 if citations: 788 print("\n\nSources:") 789 for source in citations: 790 print(f"- {source.get('url', 'Unknown URL')}")
792 def grok_election_market_explanation( 793 self, candidate_name: str, election_title: str 794 ): 795 text = f"Provide candidate information for {candidate_name} in the {election_title} on Polymarket." 796 json_payload = { 797 "id": generate_random_id(), 798 "messages": [ 799 { 800 "role": "user", 801 "content": text, 802 "parts": [{"type": "text", "text": text}], 803 }, 804 ], 805 } 806 807 response = self.client.post( 808 url="https://polymarket.com/api/grok/election-market-explanation", 809 json=json_payload, 810 ) 811 response.raise_for_status() 812 813 parts = [p.strip() for p in response.text.split("**") if p.strip()] 814 for i, part in enumerate(parts): 815 if ":" in part and i != 0: 816 print() 817 print(part)
10class PolymarketGraphQLClient: 11 """Synchronous GraphQL client for Polymarket subgraphs.""" 12 13 def __init__( 14 self, 15 endpoint_name: Literal[ 16 "activity_subgraph", 17 "fpmm_subgraph", 18 "open_interest_subgraph", 19 "orderbook_subgraph", 20 "pnl_subgraph", 21 "positions_subgraph", 22 "sports_oracle_subgraph", 23 "wallet_subgraph", 24 ], 25 ) -> None: 26 endpoint_url = GRAPHQL_ENDPOINTS[endpoint_name] 27 self.transport = HTTPXTransport(url=endpoint_url) 28 self.client = Client( 29 transport=self.transport, fetch_schema_from_transport=False 30 ) 31 32 def query(self, query_string: str) -> dict: 33 with self.client as session: 34 return session.execute(gql(query_string))
Synchronous GraphQL client for Polymarket subgraphs.
13 def __init__( 14 self, 15 endpoint_name: Literal[ 16 "activity_subgraph", 17 "fpmm_subgraph", 18 "open_interest_subgraph", 19 "orderbook_subgraph", 20 "pnl_subgraph", 21 "positions_subgraph", 22 "sports_oracle_subgraph", 23 "wallet_subgraph", 24 ], 25 ) -> None: 26 endpoint_url = GRAPHQL_ENDPOINTS[endpoint_name] 27 self.transport = HTTPXTransport(url=endpoint_url) 28 self.client = Client( 29 transport=self.transport, fetch_schema_from_transport=False 30 )
41class PolymarketWeb3Client: 42 def __init__( 43 self, 44 private_key: str, 45 signature_type: Literal[0, 1, 2] = 1, 46 chain_id: Literal[137, 80002] = POLYGON, 47 ): 48 self.w3 = Web3(Web3.HTTPProvider("https://polygon-rpc.com")) 49 self.w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) # type: ignore[arg-type] 50 self.w3.middleware_onion.inject( 51 SignAndSendRawMiddlewareBuilder.build(private_key), # type: ignore[arg-type] 52 layer=0, 53 ) 54 55 self.account = self.w3.eth.account.from_key(private_key) 56 self.signature_type = signature_type 57 58 self.config = get_contract_config(chain_id, neg_risk=False) 59 self.neg_risk_config = get_contract_config(chain_id, neg_risk=True) 60 61 self.usdc_address = Web3.to_checksum_address(self.config.collateral) 62 self.usdc_abi = _load_abi("UChildERC20Proxy") 63 self.usdc = self._contract(self.usdc_address, self.usdc_abi) 64 65 self.conditional_tokens_address = Web3.to_checksum_address( 66 self.config.conditional_tokens 67 ) 68 self.conditional_tokens_abi = _load_abi("ConditionalTokens") 69 self.conditional_tokens = self._contract( 70 self.conditional_tokens_address, self.conditional_tokens_abi 71 ) 72 73 self.exchange_address = Web3.to_checksum_address(self.config.exchange) 74 self.exchange_abi = _load_abi("CTFExchange") 75 self.exchange = self._contract(self.exchange_address, self.exchange_abi) 76 77 self.neg_risk_exchange_address = Web3.to_checksum_address( 78 self.neg_risk_config.exchange 79 ) 80 self.neg_risk_exchange_abi = _load_abi("NegRiskCtfExchange") 81 self.neg_risk_exchange = self._contract( 82 self.neg_risk_exchange_address, self.neg_risk_exchange_abi 83 ) 84 85 self.neg_risk_adapter_address = Web3.to_checksum_address( 86 "0xd91E80cF2E7be2e162c6513ceD06f1dD0dA35296" 87 ) 88 self.neg_risk_adapter_abi = _load_abi("NegRiskAdapter") 89 self.neg_risk_adapter = self._contract( 90 self.neg_risk_adapter_address, self.neg_risk_adapter_abi 91 ) 92 93 self.proxy_factory_address = Web3.to_checksum_address( 94 "0xaB45c5A4B0c941a2F231C04C3f49182e1A254052" 95 ) 96 self.proxy_factory_abi = _load_abi("ProxyWalletFactory") 97 self.proxy_factory = self._contract( 98 self.proxy_factory_address, self.proxy_factory_abi 99 ) 100 101 self.safe_proxy_factory_address = Web3.to_checksum_address( 102 "0xaacFeEa03eb1561C4e67d661e40682Bd20E3541b" 103 ) 104 self.safe_proxy_factory_abi = _load_abi("SafeProxyFactory") 105 self.safe_proxy_factory = self._contract( 106 self.safe_proxy_factory_address, self.safe_proxy_factory_abi 107 ) 108 109 match self.signature_type: 110 case 0: 111 self.address = self.account.address 112 case 1: 113 self.address = self.get_poly_proxy_address() 114 case 2: 115 self.address = self.get_safe_proxy_address() 116 self.safe_abi = _load_abi("Safe") 117 self.safe = self._contract(self.address, self.safe_abi) 118 119 def _contract(self, address, abi): 120 return self.w3.eth.contract( 121 address=Web3.to_checksum_address(address), 122 abi=abi, 123 ) 124 125 def _encode_usdc_approve(self, address: ChecksumAddress) -> str: 126 return self.usdc.encode_abi( 127 abi_element_identifier="approve", 128 args=[address, int(MAX_INT, base=16)], 129 ) 130 131 def _encode_condition_tokens_approve(self, address: ChecksumAddress) -> str: 132 return self.conditional_tokens.encode_abi( 133 abi_element_identifier="setApprovalForAll", 134 args=[address, True], 135 ) 136 137 def _encode_transfer_usdc(self, address: ChecksumAddress, amount: int) -> str: 138 return self.usdc.encode_abi( 139 abi_element_identifier="transfer", 140 args=[address, amount], 141 ) 142 143 def _encode_transfer_token( 144 self, token_id: str, address: ChecksumAddress, amount: int 145 ) -> str: 146 return self.conditional_tokens.encode_abi( 147 abi_element_identifier="safeTransferFrom", 148 args=[self.address, address, int(token_id), amount, HASH_ZERO], 149 ) 150 151 def _encode_split(self, condition_id: Keccak256, amount: int) -> str: 152 return self.conditional_tokens.encode_abi( 153 abi_element_identifier="splitPosition", 154 args=[self.usdc_address, HASH_ZERO, condition_id, [1, 2], amount], 155 ) 156 157 def _encode_merge(self, condition_id: Keccak256, amount: int) -> str: 158 return self.conditional_tokens.encode_abi( 159 abi_element_identifier="mergePositions", 160 args=[self.usdc_address, HASH_ZERO, condition_id, [1, 2], amount], 161 ) 162 163 def _encode_redeem(self, condition_id: Keccak256) -> str: 164 return self.conditional_tokens.encode_abi( 165 abi_element_identifier="redeemPositions", 166 args=[self.usdc_address, HASH_ZERO, condition_id, [1, 2]], 167 ) 168 169 def _encode_redeem_neg_risk( 170 self, condition_id: Keccak256, amounts: list[int] 171 ) -> str: 172 return self.neg_risk_adapter.encode_abi( 173 abi_element_identifier="redeemPositions", 174 args=[condition_id, amounts], 175 ) 176 177 def _encode_convert( 178 self, neg_risk_market_id: Keccak256, index_set: int, amount: int 179 ) -> str: 180 return self.neg_risk_adapter.encode_abi( 181 abi_element_identifier="convertPositions", 182 args=[neg_risk_market_id, index_set, amount], 183 ) 184 185 def _build_base_transaction(self) -> TxParams: 186 """Build base transaction parameters.""" 187 nonce = self.w3.eth.get_transaction_count(self.account.address) 188 189 current_gas_price: int = self.w3.eth.gas_price 190 adjusted_gas_price = Wei(int(current_gas_price * 1.05)) 191 192 base_transaction: TxParams = { 193 "nonce": nonce, 194 "gasPrice": adjusted_gas_price, 195 "gas": 1000000, 196 "from": self.account.address, 197 } 198 199 return base_transaction 200 201 def _build_proxy_transaction(self, to, data, base_transaction) -> TxParams: 202 proxy_txn = { 203 "typeCode": 1, 204 "to": to, 205 "value": 0, 206 "data": data, 207 } 208 txn_data = self.proxy_factory.functions.proxy([proxy_txn]).build_transaction( 209 transaction=base_transaction 210 ) 211 212 return txn_data 213 214 def _build_safe_transaction(self, to, data, base_transaction) -> TxParams: 215 safe_nonce = self.safe.functions.nonce().call() 216 safe_txn = { 217 "to": to, 218 "data": data, 219 "operation": 0, # 1 for delegatecall, 0 for call 220 "value": 0, 221 } 222 packed_sig = sign_safe_transaction( 223 self.account, 224 self.safe, 225 safe_txn, 226 safe_nonce, 227 ) 228 txn_data = self.safe.functions.execTransaction( 229 safe_txn["to"], 230 safe_txn["value"], 231 safe_txn["data"], 232 safe_txn.get("operation", 0), 233 0, # safeTxGas 234 0, # baseGas 235 0, # gasPrice 236 ADDRESS_ZERO, # gasToken 237 ADDRESS_ZERO, # refundReceiver 238 packed_sig, 239 ).build_transaction(transaction=base_transaction) 240 241 return txn_data 242 243 def _execute_transaction( 244 self, txn_data: TxParams, operation_name: str = "Transaction" 245 ) -> TransactionReceipt: 246 """ 247 Execute a transaction, wait for receipt, and print status. 248 249 Args: 250 txn_data: Built transaction data 251 operation_name: Name of operation for logging 252 253 """ 254 signed_txn = self.account.sign_transaction(txn_data) 255 tx_hash = self.w3.eth.send_raw_transaction(signed_txn.raw_transaction) 256 tx_hash_hex = tx_hash.hex() 257 258 print(f"Txn hash: 0x{tx_hash_hex}") 259 260 # Wait for transaction to be mined and get receipt 261 receipt_dict = self.w3.eth.wait_for_transaction_receipt(tx_hash) 262 receipt = TransactionReceipt.model_validate(receipt_dict) 263 264 print(f"{operation_name} succeeded") if receipt.status == 1 else print( 265 f"{operation_name} failed" 266 ) 267 268 return receipt 269 270 def get_poly_proxy_address(self, address: EthAddress | None = None) -> EthAddress: 271 """Get the polymarket proxy address for the current account.""" 272 address = address if address else self.account.address 273 return self.exchange.functions.getPolyProxyWalletAddress(address).call() 274 275 def get_safe_proxy_address(self, address: EthAddress | None = None) -> EthAddress: 276 """Get the safe proxy address for the current account.""" 277 address = address if address else self.account.address 278 return self.safe_proxy_factory.functions.computeProxyAddress(address).call() 279 280 def get_usdc_balance(self, address: EthAddress | None = None) -> float: 281 """ 282 Get the usdc balance of the given address. 283 284 If no address is given, the balance of the proxy account corresponding to 285 the private key is returned (i.e. Polymarket balance). 286 Explicitly passing the proxy address is faster due to only one contract function call. 287 """ 288 if address is None: 289 address = self.address 290 balance_res = self.usdc.functions.balanceOf(address).call() 291 return float(balance_res / 1e6) 292 293 def get_token_balance( 294 self, token_id: str, address: EthAddress | None = None 295 ) -> float: 296 """Get the token balance of the given address.""" 297 if not address: 298 address = self.address 299 balance_res = self.conditional_tokens.functions.balanceOf( 300 address, int(token_id) 301 ).call() 302 return float(balance_res / 1e6) 303 304 def get_token_complement(self, token_id: str) -> str | None: 305 """Get the complement of the given token.""" 306 try: 307 return str( 308 self.neg_risk_exchange.functions.getComplement(int(token_id)).call() 309 ) 310 except ContractCustomError as e: 311 if e.args[0] in CUSTOM_ERROR_DICT: 312 try: 313 return str( 314 self.exchange.functions.getComplement(int(token_id)).call() 315 ) 316 except ContractCustomError as e2: 317 if e2.args[0] in CUSTOM_ERROR_DICT: 318 msg = f"{CUSTOM_ERROR_DICT[e2.args[0]]}" 319 raise ContractCustomError( 320 msg, 321 ) from e2 322 return None 323 return None 324 325 def get_condition_id_neg_risk(self, question_id: Keccak256) -> Keccak256: 326 """ 327 Get the condition id for a given question id. 328 329 Warning: this works for neg risk markets (where the 330 outcomeSlotCount is represented by the last two digits of question id). Returns a keccak256 hash of 331 the oracle and question id. 332 """ 333 return ( 334 "0x" 335 + self.neg_risk_adapter.functions.getConditionId(question_id).call().hex() 336 ) 337 338 def deploy_safe(self) -> TransactionReceipt: 339 """Deploy a Safe wallet using the SafeProxyFactory contract.""" 340 safe_address = self.get_safe_proxy_address() 341 if self.w3.eth.get_code(self.w3.to_checksum_address(safe_address)) != b"": 342 msg = f"Safe already deployed at {safe_address}" 343 raise SafeAlreadyDeployedError(msg) 344 345 # Create the EIP-712 signature for Safe creation 346 sig = create_safe_create_signature(account=self.account, chain_id=POLYGON) 347 348 # Split the signature into r, s, v components 349 split_sig = split_signature(sig) 350 351 # Build the transaction 352 base_transaction = self._build_base_transaction() 353 354 # Execute the createProxy function 355 txn_data = self.safe_proxy_factory.functions.createProxy( 356 ADDRESS_ZERO, # paymentToken 357 0, # payment 358 ADDRESS_ZERO, # paymentReceiver 359 ( 360 split_sig["v"], 361 split_sig["r"], 362 split_sig["s"], 363 ), # createSig tuple (uint8, bytes32, bytes32) 364 ).build_transaction(transaction=base_transaction) 365 366 # Sign and send transaction 367 return self._execute_transaction( 368 txn_data, operation_name="Gnosis Safe Deployment" 369 ) 370 371 def set_collateral_approval(self, spender: ChecksumAddress) -> TransactionReceipt: 372 to = self.usdc_address 373 data = self._encode_usdc_approve(address=spender) 374 base_transaction = self._build_base_transaction() 375 txn_data: TxParams = {} 376 377 match self.signature_type: 378 case 0: 379 txn_data = self.usdc.functions.approve( 380 spender, int(MAX_INT, base=16) 381 ).build_transaction(transaction=base_transaction) 382 case 1: 383 txn_data = self._build_proxy_transaction(to, data, base_transaction) 384 case 2: 385 txn_data = self._build_safe_transaction(to, data, base_transaction) 386 387 return self._execute_transaction(txn_data, operation_name="Collateral Approval") 388 389 def set_conditional_tokens_approval( 390 self, spender: ChecksumAddress 391 ) -> TransactionReceipt: 392 to = self.conditional_tokens_address 393 data = self._encode_condition_tokens_approve(address=spender) 394 base_transaction = self._build_base_transaction() 395 txn_data: TxParams = {} 396 397 match self.signature_type: 398 case 0: 399 txn_data = self.conditional_tokens.functions.setApprovalForAll( 400 spender, True 401 ).build_transaction(transaction=base_transaction) 402 case 1: 403 txn_data = self._build_proxy_transaction(to, data, base_transaction) 404 case 2: 405 txn_data = self._build_safe_transaction(to, data, base_transaction) 406 407 return self._execute_transaction( 408 txn_data, operation_name="Conditional Tokens Approval" 409 ) 410 411 def set_all_approvals(self) -> list[TransactionReceipt]: 412 """Sets both collateral and conditional tokens approvals.""" 413 receipts = [] 414 print("Approving ConditionalTokens as spender on USDC") 415 receipts.append( 416 self.set_collateral_approval( 417 spender=self.conditional_tokens_address, 418 ) 419 ) 420 print("Approving CTFExchange as spender on USDC") 421 receipts.append( 422 self.set_collateral_approval( 423 spender=self.exchange_address, 424 ) 425 ) 426 print("Approving NegRiskCtfExchange as spender on USDC") 427 receipts.append( 428 self.set_collateral_approval( 429 spender=self.neg_risk_exchange_address, 430 ) 431 ) 432 print("Approving NegRiskAdapter as spender on USDC") 433 receipts.append( 434 self.set_collateral_approval( 435 spender=self.neg_risk_adapter_address, 436 ) 437 ) 438 print("Approving CTFExchange as spender on ConditionalTokens") 439 receipts.append( 440 self.set_conditional_tokens_approval( 441 spender=self.exchange_address, 442 ) 443 ) 444 print("Approving NegRiskCtfExchange as spender on ConditionalTokens") 445 receipts.append( 446 self.set_conditional_tokens_approval( 447 spender=self.neg_risk_exchange_address, 448 ) 449 ) 450 print("Approving NegRiskAdapter as spender on ConditionalTokens") 451 receipts.append( 452 self.set_conditional_tokens_approval( 453 spender=self.neg_risk_adapter_address, 454 ) 455 ) 456 print("All approvals set!") 457 458 return receipts 459 460 def transfer_usdc(self, recipient: EthAddress, amount: float) -> TransactionReceipt: 461 """Transfers usdc.e from the account to the proxy address.""" 462 balance = self.get_usdc_balance(address=self.address) 463 if balance < amount: 464 msg = f"Insufficient USDC.e balance: {balance} < {amount}" 465 raise ValueError(msg) 466 amount = int(balance * 1e6) 467 468 to = self.usdc_address 469 data = self._encode_transfer_usdc( 470 self.w3.to_checksum_address(recipient), amount 471 ) 472 base_transaction = self._build_base_transaction() 473 txn_data: TxParams = {} 474 475 match self.signature_type: 476 case 0: 477 txn_data = self.usdc.functions.transfer( 478 recipient, 479 amount, 480 ).build_transaction(transaction=base_transaction) 481 case 1: 482 txn_data = self._build_proxy_transaction(to, data, base_transaction) 483 case 2: 484 txn_data = self._build_safe_transaction(to, data, base_transaction) 485 486 # Sign and send transaction 487 return self._execute_transaction(txn_data, operation_name="USDC Transfer") 488 489 def transfer_token( 490 self, token_id: str, recipient: EthAddress, amount: float 491 ) -> TransactionReceipt: 492 """Transfers conditional tokens from the account to the recipient address.""" 493 balance = self.get_token_balance(token_id=token_id, address=self.address) 494 if balance < amount: 495 msg = f"Insufficient token balance: {balance} < {amount}" 496 raise ValueError(msg) 497 amount = int(balance * 1e6) 498 499 to = self.conditional_tokens_address 500 data = self._encode_transfer_token( 501 token_id, self.w3.to_checksum_address(recipient), amount 502 ) 503 base_transaction = self._build_base_transaction() 504 txn_data: TxParams = {} 505 506 match self.signature_type: 507 case 0: 508 txn_data = self.conditional_tokens.functions.safeTransferFrom( 509 self.address, 510 recipient, 511 int(token_id), 512 amount, 513 b"", 514 ).build_transaction(transaction=base_transaction) 515 case 1: 516 txn_data = self._build_proxy_transaction(to, data, base_transaction) 517 case 2: 518 txn_data = self._build_safe_transaction(to, data, base_transaction) 519 520 # Sign and send transaction 521 return self._execute_transaction(txn_data, operation_name="Token Transfer") 522 523 def split_position( 524 self, condition_id: Keccak256, amount: float, neg_risk: bool = True 525 ) -> TransactionReceipt: 526 """Splits usdc into two complementary positions of equal size.""" 527 amount = int(amount * 1e6) 528 529 to = ( 530 self.neg_risk_adapter_address 531 if neg_risk 532 else self.conditional_tokens_address 533 ) 534 data = self._encode_split(condition_id, amount) 535 base_transaction = self._build_base_transaction() 536 txn_data: TxParams = {} 537 538 match self.signature_type: 539 case 0: 540 _contract = ( 541 self.neg_risk_adapter if neg_risk else self.conditional_tokens 542 ) 543 txn_data = _contract.functions.splitPosition( 544 self.usdc_address, 545 HASH_ZERO, 546 condition_id, 547 [1, 2], 548 amount, 549 ).build_transaction(transaction=base_transaction) 550 case 1: 551 txn_data = self._build_proxy_transaction(to, data, base_transaction) 552 case 2: 553 txn_data = self._build_safe_transaction(to, data, base_transaction) 554 555 # Sign and send transaction 556 return self._execute_transaction(txn_data, operation_name="Split Position") 557 558 def merge_position( 559 self, condition_id: Keccak256, amount: float, neg_risk: bool = True 560 ) -> TransactionReceipt: 561 """Merges two complementary positions into usdc.""" 562 amount = int(amount * 1e6) 563 564 to = ( 565 self.neg_risk_adapter_address 566 if neg_risk 567 else self.conditional_tokens_address 568 ) 569 data = self._encode_merge(condition_id, amount) 570 base_transaction = self._build_base_transaction() 571 txn_data: TxParams = {} 572 573 match self.signature_type: 574 case 0: 575 _contract = ( 576 self.neg_risk_adapter if neg_risk else self.conditional_tokens 577 ) 578 txn_data = _contract.functions.mergePositions( 579 self.usdc_address, 580 HASH_ZERO, 581 condition_id, 582 [1, 2], 583 amount, 584 ).build_transaction(transaction=base_transaction) 585 case 1: 586 txn_data = self._build_proxy_transaction(to, data, base_transaction) 587 case 2: 588 txn_data = self._build_safe_transaction(to, data, base_transaction) 589 590 # Sign and send transaction 591 return self._execute_transaction(txn_data, operation_name="Merge Position") 592 593 def redeem_position( 594 self, condition_id: Keccak256, amounts: list[float], neg_risk: bool = True 595 ) -> TransactionReceipt: 596 """ 597 Redeem a position into usdc. 598 599 Takes a condition id and a list of sizes in shares [x, y] 600 where x is the number of shares of the first outcome 601 y is the number of shares of the second outcome. 602 """ 603 int_amounts = [int(amount * 1e6) for amount in amounts] 604 605 to = ( 606 self.neg_risk_adapter_address 607 if neg_risk 608 else self.conditional_tokens_address 609 ) 610 data = ( 611 self._encode_redeem_neg_risk(condition_id, int_amounts) 612 if neg_risk 613 else self._encode_redeem(condition_id) 614 ) 615 base_transaction = self._build_base_transaction() 616 txn_data: TxParams = {} 617 618 match self.signature_type: 619 case 0: 620 _contract = ( 621 self.neg_risk_adapter if neg_risk else self.conditional_tokens 622 ) 623 if neg_risk: 624 txn_data = _contract.functions.redeemPositions( 625 condition_id, int_amounts 626 ).build_transaction(transaction=base_transaction) 627 else: 628 txn_data = _contract.functions.redeemPositions( 629 self.usdc_address, 630 HASH_ZERO, 631 condition_id, 632 [1, 2], 633 ).build_transaction(transaction=base_transaction) 634 case 1: 635 txn_data = self._build_proxy_transaction(to, data, base_transaction) 636 case 2: 637 txn_data = self._build_safe_transaction(to, data, base_transaction) 638 639 # Sign and send transaction 640 return self._execute_transaction(txn_data, operation_name="Redeem Position") 641 642 def convert_positions( 643 self, 644 question_ids: list[Keccak256], 645 amount: float, 646 ) -> TransactionReceipt: 647 amount = int(amount * 1e6) 648 neg_risk_market_id = question_ids[0][:-2] + "00" 649 650 to = self.neg_risk_adapter_address 651 data = self._encode_convert( 652 neg_risk_market_id, get_index_set(question_ids), amount 653 ) 654 base_transaction = self._build_base_transaction() 655 txn_data: TxParams = {} 656 657 match self.signature_type: 658 case 0: 659 txn_data = self.neg_risk_adapter.functions.convertPositions( 660 neg_risk_market_id, 661 get_index_set(question_ids), 662 amount, 663 ).build_transaction(transaction=base_transaction) 664 case 1: 665 txn_data = self._build_proxy_transaction(to, data, base_transaction) 666 case 2: 667 txn_data = self._build_safe_transaction(to, data, base_transaction) 668 669 # Sign and send transaction 670 return self._execute_transaction(txn_data, operation_name="Convert Positions")
42 def __init__( 43 self, 44 private_key: str, 45 signature_type: Literal[0, 1, 2] = 1, 46 chain_id: Literal[137, 80002] = POLYGON, 47 ): 48 self.w3 = Web3(Web3.HTTPProvider("https://polygon-rpc.com")) 49 self.w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) # type: ignore[arg-type] 50 self.w3.middleware_onion.inject( 51 SignAndSendRawMiddlewareBuilder.build(private_key), # type: ignore[arg-type] 52 layer=0, 53 ) 54 55 self.account = self.w3.eth.account.from_key(private_key) 56 self.signature_type = signature_type 57 58 self.config = get_contract_config(chain_id, neg_risk=False) 59 self.neg_risk_config = get_contract_config(chain_id, neg_risk=True) 60 61 self.usdc_address = Web3.to_checksum_address(self.config.collateral) 62 self.usdc_abi = _load_abi("UChildERC20Proxy") 63 self.usdc = self._contract(self.usdc_address, self.usdc_abi) 64 65 self.conditional_tokens_address = Web3.to_checksum_address( 66 self.config.conditional_tokens 67 ) 68 self.conditional_tokens_abi = _load_abi("ConditionalTokens") 69 self.conditional_tokens = self._contract( 70 self.conditional_tokens_address, self.conditional_tokens_abi 71 ) 72 73 self.exchange_address = Web3.to_checksum_address(self.config.exchange) 74 self.exchange_abi = _load_abi("CTFExchange") 75 self.exchange = self._contract(self.exchange_address, self.exchange_abi) 76 77 self.neg_risk_exchange_address = Web3.to_checksum_address( 78 self.neg_risk_config.exchange 79 ) 80 self.neg_risk_exchange_abi = _load_abi("NegRiskCtfExchange") 81 self.neg_risk_exchange = self._contract( 82 self.neg_risk_exchange_address, self.neg_risk_exchange_abi 83 ) 84 85 self.neg_risk_adapter_address = Web3.to_checksum_address( 86 "0xd91E80cF2E7be2e162c6513ceD06f1dD0dA35296" 87 ) 88 self.neg_risk_adapter_abi = _load_abi("NegRiskAdapter") 89 self.neg_risk_adapter = self._contract( 90 self.neg_risk_adapter_address, self.neg_risk_adapter_abi 91 ) 92 93 self.proxy_factory_address = Web3.to_checksum_address( 94 "0xaB45c5A4B0c941a2F231C04C3f49182e1A254052" 95 ) 96 self.proxy_factory_abi = _load_abi("ProxyWalletFactory") 97 self.proxy_factory = self._contract( 98 self.proxy_factory_address, self.proxy_factory_abi 99 ) 100 101 self.safe_proxy_factory_address = Web3.to_checksum_address( 102 "0xaacFeEa03eb1561C4e67d661e40682Bd20E3541b" 103 ) 104 self.safe_proxy_factory_abi = _load_abi("SafeProxyFactory") 105 self.safe_proxy_factory = self._contract( 106 self.safe_proxy_factory_address, self.safe_proxy_factory_abi 107 ) 108 109 match self.signature_type: 110 case 0: 111 self.address = self.account.address 112 case 1: 113 self.address = self.get_poly_proxy_address() 114 case 2: 115 self.address = self.get_safe_proxy_address() 116 self.safe_abi = _load_abi("Safe") 117 self.safe = self._contract(self.address, self.safe_abi)
270 def get_poly_proxy_address(self, address: EthAddress | None = None) -> EthAddress: 271 """Get the polymarket proxy address for the current account.""" 272 address = address if address else self.account.address 273 return self.exchange.functions.getPolyProxyWalletAddress(address).call()
Get the polymarket proxy address for the current account.
275 def get_safe_proxy_address(self, address: EthAddress | None = None) -> EthAddress: 276 """Get the safe proxy address for the current account.""" 277 address = address if address else self.account.address 278 return self.safe_proxy_factory.functions.computeProxyAddress(address).call()
Get the safe proxy address for the current account.
280 def get_usdc_balance(self, address: EthAddress | None = None) -> float: 281 """ 282 Get the usdc balance of the given address. 283 284 If no address is given, the balance of the proxy account corresponding to 285 the private key is returned (i.e. Polymarket balance). 286 Explicitly passing the proxy address is faster due to only one contract function call. 287 """ 288 if address is None: 289 address = self.address 290 balance_res = self.usdc.functions.balanceOf(address).call() 291 return float(balance_res / 1e6)
Get the usdc balance of the given address.
If no address is given, the balance of the proxy account corresponding to the private key is returned (i.e. Polymarket balance). Explicitly passing the proxy address is faster due to only one contract function call.
293 def get_token_balance( 294 self, token_id: str, address: EthAddress | None = None 295 ) -> float: 296 """Get the token balance of the given address.""" 297 if not address: 298 address = self.address 299 balance_res = self.conditional_tokens.functions.balanceOf( 300 address, int(token_id) 301 ).call() 302 return float(balance_res / 1e6)
Get the token balance of the given address.
304 def get_token_complement(self, token_id: str) -> str | None: 305 """Get the complement of the given token.""" 306 try: 307 return str( 308 self.neg_risk_exchange.functions.getComplement(int(token_id)).call() 309 ) 310 except ContractCustomError as e: 311 if e.args[0] in CUSTOM_ERROR_DICT: 312 try: 313 return str( 314 self.exchange.functions.getComplement(int(token_id)).call() 315 ) 316 except ContractCustomError as e2: 317 if e2.args[0] in CUSTOM_ERROR_DICT: 318 msg = f"{CUSTOM_ERROR_DICT[e2.args[0]]}" 319 raise ContractCustomError( 320 msg, 321 ) from e2 322 return None 323 return None
Get the complement of the given token.
325 def get_condition_id_neg_risk(self, question_id: Keccak256) -> Keccak256: 326 """ 327 Get the condition id for a given question id. 328 329 Warning: this works for neg risk markets (where the 330 outcomeSlotCount is represented by the last two digits of question id). Returns a keccak256 hash of 331 the oracle and question id. 332 """ 333 return ( 334 "0x" 335 + self.neg_risk_adapter.functions.getConditionId(question_id).call().hex() 336 )
Get the condition id for a given question id.
Warning: this works for neg risk markets (where the outcomeSlotCount is represented by the last two digits of question id). Returns a keccak256 hash of the oracle and question id.
338 def deploy_safe(self) -> TransactionReceipt: 339 """Deploy a Safe wallet using the SafeProxyFactory contract.""" 340 safe_address = self.get_safe_proxy_address() 341 if self.w3.eth.get_code(self.w3.to_checksum_address(safe_address)) != b"": 342 msg = f"Safe already deployed at {safe_address}" 343 raise SafeAlreadyDeployedError(msg) 344 345 # Create the EIP-712 signature for Safe creation 346 sig = create_safe_create_signature(account=self.account, chain_id=POLYGON) 347 348 # Split the signature into r, s, v components 349 split_sig = split_signature(sig) 350 351 # Build the transaction 352 base_transaction = self._build_base_transaction() 353 354 # Execute the createProxy function 355 txn_data = self.safe_proxy_factory.functions.createProxy( 356 ADDRESS_ZERO, # paymentToken 357 0, # payment 358 ADDRESS_ZERO, # paymentReceiver 359 ( 360 split_sig["v"], 361 split_sig["r"], 362 split_sig["s"], 363 ), # createSig tuple (uint8, bytes32, bytes32) 364 ).build_transaction(transaction=base_transaction) 365 366 # Sign and send transaction 367 return self._execute_transaction( 368 txn_data, operation_name="Gnosis Safe Deployment" 369 )
Deploy a Safe wallet using the SafeProxyFactory contract.
371 def set_collateral_approval(self, spender: ChecksumAddress) -> TransactionReceipt: 372 to = self.usdc_address 373 data = self._encode_usdc_approve(address=spender) 374 base_transaction = self._build_base_transaction() 375 txn_data: TxParams = {} 376 377 match self.signature_type: 378 case 0: 379 txn_data = self.usdc.functions.approve( 380 spender, int(MAX_INT, base=16) 381 ).build_transaction(transaction=base_transaction) 382 case 1: 383 txn_data = self._build_proxy_transaction(to, data, base_transaction) 384 case 2: 385 txn_data = self._build_safe_transaction(to, data, base_transaction) 386 387 return self._execute_transaction(txn_data, operation_name="Collateral Approval")
389 def set_conditional_tokens_approval( 390 self, spender: ChecksumAddress 391 ) -> TransactionReceipt: 392 to = self.conditional_tokens_address 393 data = self._encode_condition_tokens_approve(address=spender) 394 base_transaction = self._build_base_transaction() 395 txn_data: TxParams = {} 396 397 match self.signature_type: 398 case 0: 399 txn_data = self.conditional_tokens.functions.setApprovalForAll( 400 spender, True 401 ).build_transaction(transaction=base_transaction) 402 case 1: 403 txn_data = self._build_proxy_transaction(to, data, base_transaction) 404 case 2: 405 txn_data = self._build_safe_transaction(to, data, base_transaction) 406 407 return self._execute_transaction( 408 txn_data, operation_name="Conditional Tokens Approval" 409 )
411 def set_all_approvals(self) -> list[TransactionReceipt]: 412 """Sets both collateral and conditional tokens approvals.""" 413 receipts = [] 414 print("Approving ConditionalTokens as spender on USDC") 415 receipts.append( 416 self.set_collateral_approval( 417 spender=self.conditional_tokens_address, 418 ) 419 ) 420 print("Approving CTFExchange as spender on USDC") 421 receipts.append( 422 self.set_collateral_approval( 423 spender=self.exchange_address, 424 ) 425 ) 426 print("Approving NegRiskCtfExchange as spender on USDC") 427 receipts.append( 428 self.set_collateral_approval( 429 spender=self.neg_risk_exchange_address, 430 ) 431 ) 432 print("Approving NegRiskAdapter as spender on USDC") 433 receipts.append( 434 self.set_collateral_approval( 435 spender=self.neg_risk_adapter_address, 436 ) 437 ) 438 print("Approving CTFExchange as spender on ConditionalTokens") 439 receipts.append( 440 self.set_conditional_tokens_approval( 441 spender=self.exchange_address, 442 ) 443 ) 444 print("Approving NegRiskCtfExchange as spender on ConditionalTokens") 445 receipts.append( 446 self.set_conditional_tokens_approval( 447 spender=self.neg_risk_exchange_address, 448 ) 449 ) 450 print("Approving NegRiskAdapter as spender on ConditionalTokens") 451 receipts.append( 452 self.set_conditional_tokens_approval( 453 spender=self.neg_risk_adapter_address, 454 ) 455 ) 456 print("All approvals set!") 457 458 return receipts
Sets both collateral and conditional tokens approvals.
460 def transfer_usdc(self, recipient: EthAddress, amount: float) -> TransactionReceipt: 461 """Transfers usdc.e from the account to the proxy address.""" 462 balance = self.get_usdc_balance(address=self.address) 463 if balance < amount: 464 msg = f"Insufficient USDC.e balance: {balance} < {amount}" 465 raise ValueError(msg) 466 amount = int(balance * 1e6) 467 468 to = self.usdc_address 469 data = self._encode_transfer_usdc( 470 self.w3.to_checksum_address(recipient), amount 471 ) 472 base_transaction = self._build_base_transaction() 473 txn_data: TxParams = {} 474 475 match self.signature_type: 476 case 0: 477 txn_data = self.usdc.functions.transfer( 478 recipient, 479 amount, 480 ).build_transaction(transaction=base_transaction) 481 case 1: 482 txn_data = self._build_proxy_transaction(to, data, base_transaction) 483 case 2: 484 txn_data = self._build_safe_transaction(to, data, base_transaction) 485 486 # Sign and send transaction 487 return self._execute_transaction(txn_data, operation_name="USDC Transfer")
Transfers usdc.e from the account to the proxy address.
489 def transfer_token( 490 self, token_id: str, recipient: EthAddress, amount: float 491 ) -> TransactionReceipt: 492 """Transfers conditional tokens from the account to the recipient address.""" 493 balance = self.get_token_balance(token_id=token_id, address=self.address) 494 if balance < amount: 495 msg = f"Insufficient token balance: {balance} < {amount}" 496 raise ValueError(msg) 497 amount = int(balance * 1e6) 498 499 to = self.conditional_tokens_address 500 data = self._encode_transfer_token( 501 token_id, self.w3.to_checksum_address(recipient), amount 502 ) 503 base_transaction = self._build_base_transaction() 504 txn_data: TxParams = {} 505 506 match self.signature_type: 507 case 0: 508 txn_data = self.conditional_tokens.functions.safeTransferFrom( 509 self.address, 510 recipient, 511 int(token_id), 512 amount, 513 b"", 514 ).build_transaction(transaction=base_transaction) 515 case 1: 516 txn_data = self._build_proxy_transaction(to, data, base_transaction) 517 case 2: 518 txn_data = self._build_safe_transaction(to, data, base_transaction) 519 520 # Sign and send transaction 521 return self._execute_transaction(txn_data, operation_name="Token Transfer")
Transfers conditional tokens from the account to the recipient address.
523 def split_position( 524 self, condition_id: Keccak256, amount: float, neg_risk: bool = True 525 ) -> TransactionReceipt: 526 """Splits usdc into two complementary positions of equal size.""" 527 amount = int(amount * 1e6) 528 529 to = ( 530 self.neg_risk_adapter_address 531 if neg_risk 532 else self.conditional_tokens_address 533 ) 534 data = self._encode_split(condition_id, amount) 535 base_transaction = self._build_base_transaction() 536 txn_data: TxParams = {} 537 538 match self.signature_type: 539 case 0: 540 _contract = ( 541 self.neg_risk_adapter if neg_risk else self.conditional_tokens 542 ) 543 txn_data = _contract.functions.splitPosition( 544 self.usdc_address, 545 HASH_ZERO, 546 condition_id, 547 [1, 2], 548 amount, 549 ).build_transaction(transaction=base_transaction) 550 case 1: 551 txn_data = self._build_proxy_transaction(to, data, base_transaction) 552 case 2: 553 txn_data = self._build_safe_transaction(to, data, base_transaction) 554 555 # Sign and send transaction 556 return self._execute_transaction(txn_data, operation_name="Split Position")
Splits usdc into two complementary positions of equal size.
558 def merge_position( 559 self, condition_id: Keccak256, amount: float, neg_risk: bool = True 560 ) -> TransactionReceipt: 561 """Merges two complementary positions into usdc.""" 562 amount = int(amount * 1e6) 563 564 to = ( 565 self.neg_risk_adapter_address 566 if neg_risk 567 else self.conditional_tokens_address 568 ) 569 data = self._encode_merge(condition_id, amount) 570 base_transaction = self._build_base_transaction() 571 txn_data: TxParams = {} 572 573 match self.signature_type: 574 case 0: 575 _contract = ( 576 self.neg_risk_adapter if neg_risk else self.conditional_tokens 577 ) 578 txn_data = _contract.functions.mergePositions( 579 self.usdc_address, 580 HASH_ZERO, 581 condition_id, 582 [1, 2], 583 amount, 584 ).build_transaction(transaction=base_transaction) 585 case 1: 586 txn_data = self._build_proxy_transaction(to, data, base_transaction) 587 case 2: 588 txn_data = self._build_safe_transaction(to, data, base_transaction) 589 590 # Sign and send transaction 591 return self._execute_transaction(txn_data, operation_name="Merge Position")
Merges two complementary positions into usdc.
593 def redeem_position( 594 self, condition_id: Keccak256, amounts: list[float], neg_risk: bool = True 595 ) -> TransactionReceipt: 596 """ 597 Redeem a position into usdc. 598 599 Takes a condition id and a list of sizes in shares [x, y] 600 where x is the number of shares of the first outcome 601 y is the number of shares of the second outcome. 602 """ 603 int_amounts = [int(amount * 1e6) for amount in amounts] 604 605 to = ( 606 self.neg_risk_adapter_address 607 if neg_risk 608 else self.conditional_tokens_address 609 ) 610 data = ( 611 self._encode_redeem_neg_risk(condition_id, int_amounts) 612 if neg_risk 613 else self._encode_redeem(condition_id) 614 ) 615 base_transaction = self._build_base_transaction() 616 txn_data: TxParams = {} 617 618 match self.signature_type: 619 case 0: 620 _contract = ( 621 self.neg_risk_adapter if neg_risk else self.conditional_tokens 622 ) 623 if neg_risk: 624 txn_data = _contract.functions.redeemPositions( 625 condition_id, int_amounts 626 ).build_transaction(transaction=base_transaction) 627 else: 628 txn_data = _contract.functions.redeemPositions( 629 self.usdc_address, 630 HASH_ZERO, 631 condition_id, 632 [1, 2], 633 ).build_transaction(transaction=base_transaction) 634 case 1: 635 txn_data = self._build_proxy_transaction(to, data, base_transaction) 636 case 2: 637 txn_data = self._build_safe_transaction(to, data, base_transaction) 638 639 # Sign and send transaction 640 return self._execute_transaction(txn_data, operation_name="Redeem Position")
Redeem a position into usdc.
Takes a condition id and a list of sizes in shares [x, y] where x is the number of shares of the first outcome y is the number of shares of the second outcome.
642 def convert_positions( 643 self, 644 question_ids: list[Keccak256], 645 amount: float, 646 ) -> TransactionReceipt: 647 amount = int(amount * 1e6) 648 neg_risk_market_id = question_ids[0][:-2] + "00" 649 650 to = self.neg_risk_adapter_address 651 data = self._encode_convert( 652 neg_risk_market_id, get_index_set(question_ids), amount 653 ) 654 base_transaction = self._build_base_transaction() 655 txn_data: TxParams = {} 656 657 match self.signature_type: 658 case 0: 659 txn_data = self.neg_risk_adapter.functions.convertPositions( 660 neg_risk_market_id, 661 get_index_set(question_ids), 662 amount, 663 ).build_transaction(transaction=base_transaction) 664 case 1: 665 txn_data = self._build_proxy_transaction(to, data, base_transaction) 666 case 2: 667 txn_data = self._build_safe_transaction(to, data, base_transaction) 668 669 # Sign and send transaction 670 return self._execute_transaction(txn_data, operation_name="Convert Positions")
129class PolymarketWebsocketsClient: 130 def __init__(self): 131 self.url_market = "wss://ws-subscriptions-clob.polymarket.com/ws/market" 132 self.url_user = "wss://ws-subscriptions-clob.polymarket.com/ws/user" 133 self.url_live_data = "wss://ws-live-data.polymarket.com" 134 135 def market_socket( 136 self, token_ids: list[str], process_event: Callable = _process_market_event 137 ): 138 """ 139 Connect to the market websocket and subscribe to market events for specific token IDs. 140 141 Args: 142 token_ids: List of token IDs to subscribe to 143 process_event: Callback function to process received events 144 145 """ 146 websocket = WebSocket(self.url_market) 147 148 for event in persist(websocket): # persist automatically reconnects 149 if event.name == "ready": 150 websocket.send_json( 151 assets_ids=token_ids, 152 ) 153 elif event.name == "text": 154 process_event(event) 155 156 def user_socket( 157 self, creds: ApiCreds, process_event: Callable = _process_user_event 158 ): 159 """ 160 Connect to the user websocket and subscribe to user events. 161 162 Args: 163 creds: API credentials for authentication 164 process_event: Callback function to process received events 165 166 """ 167 websocket = WebSocket(self.url_user) 168 169 for event in persist(websocket): 170 if event.name == "ready": 171 websocket.send_json( 172 auth=creds.model_dump(by_alias=True), 173 ) 174 elif event.name == "text": 175 process_event(event) 176 177 def live_data_socket( 178 self, 179 subscriptions: list[dict[str, Any]], 180 process_event: Callable = _process_live_data_event, 181 creds: Optional[ApiCreds] = None, 182 ): 183 # info on how to subscribe found at https://github.com/Polymarket/real-time-data-client?tab=readme-ov-file#subscribe 184 """ 185 Connect to the live data websocket and subscribe to specified events. 186 187 Args: 188 subscriptions: List of subscription configurations 189 process_event: Callback function to process received events 190 creds: ApiCreds for authentication if subscribing to clob_user topic 191 192 """ 193 websocket = WebSocket(self.url_live_data) 194 195 needs_auth = any(sub.get("topic") == "clob_user" for sub in subscriptions) 196 197 for event in persist(websocket): 198 if event.name == "ready": 199 if needs_auth: 200 if creds is None: 201 msg = "ApiCreds credentials are required for the clob_user topic subscriptions" 202 raise AuthenticationRequiredError(msg) 203 subscriptions_with_creds = [] 204 for sub in subscriptions: 205 if sub.get("topic") == "clob_user": 206 sub_copy = sub.copy() 207 sub_copy["clob_auth"] = creds.model_dump() 208 subscriptions_with_creds.append(sub_copy) 209 else: 210 subscriptions_with_creds.append(sub) 211 subscriptions = subscriptions_with_creds 212 213 payload = { 214 "action": "subscribe", 215 "subscriptions": subscriptions, 216 } 217 218 websocket.send_json(**payload) 219 220 elif event.name == "text": 221 process_event(event)
135 def market_socket( 136 self, token_ids: list[str], process_event: Callable = _process_market_event 137 ): 138 """ 139 Connect to the market websocket and subscribe to market events for specific token IDs. 140 141 Args: 142 token_ids: List of token IDs to subscribe to 143 process_event: Callback function to process received events 144 145 """ 146 websocket = WebSocket(self.url_market) 147 148 for event in persist(websocket): # persist automatically reconnects 149 if event.name == "ready": 150 websocket.send_json( 151 assets_ids=token_ids, 152 ) 153 elif event.name == "text": 154 process_event(event)
Connect to the market websocket and subscribe to market events for specific token IDs.
Args: token_ids: List of token IDs to subscribe to process_event: Callback function to process received events
156 def user_socket( 157 self, creds: ApiCreds, process_event: Callable = _process_user_event 158 ): 159 """ 160 Connect to the user websocket and subscribe to user events. 161 162 Args: 163 creds: API credentials for authentication 164 process_event: Callback function to process received events 165 166 """ 167 websocket = WebSocket(self.url_user) 168 169 for event in persist(websocket): 170 if event.name == "ready": 171 websocket.send_json( 172 auth=creds.model_dump(by_alias=True), 173 ) 174 elif event.name == "text": 175 process_event(event)
Connect to the user websocket and subscribe to user events.
Args: creds: API credentials for authentication process_event: Callback function to process received events
177 def live_data_socket( 178 self, 179 subscriptions: list[dict[str, Any]], 180 process_event: Callable = _process_live_data_event, 181 creds: Optional[ApiCreds] = None, 182 ): 183 # info on how to subscribe found at https://github.com/Polymarket/real-time-data-client?tab=readme-ov-file#subscribe 184 """ 185 Connect to the live data websocket and subscribe to specified events. 186 187 Args: 188 subscriptions: List of subscription configurations 189 process_event: Callback function to process received events 190 creds: ApiCreds for authentication if subscribing to clob_user topic 191 192 """ 193 websocket = WebSocket(self.url_live_data) 194 195 needs_auth = any(sub.get("topic") == "clob_user" for sub in subscriptions) 196 197 for event in persist(websocket): 198 if event.name == "ready": 199 if needs_auth: 200 if creds is None: 201 msg = "ApiCreds credentials are required for the clob_user topic subscriptions" 202 raise AuthenticationRequiredError(msg) 203 subscriptions_with_creds = [] 204 for sub in subscriptions: 205 if sub.get("topic") == "clob_user": 206 sub_copy = sub.copy() 207 sub_copy["clob_auth"] = creds.model_dump() 208 subscriptions_with_creds.append(sub_copy) 209 else: 210 subscriptions_with_creds.append(sub) 211 subscriptions = subscriptions_with_creds 212 213 payload = { 214 "action": "subscribe", 215 "subscriptions": subscriptions, 216 } 217 218 websocket.send_json(**payload) 219 220 elif event.name == "text": 221 process_event(event)
Connect to the live data websocket and subscribe to specified events.
Args: subscriptions: List of subscription configurations process_event: Callback function to process received events creds: ApiCreds for authentication if subscribing to clob_user topic