polymarket_apis

Polymarket APIs - Unified interface for Polymarket's various APIs.

This package provides a comprehensive interface to Polymarket's APIs including:

  • CLOB (Central Limit Order Book) API for trading
  • Gamma API for event and market information
  • Data API for user information
  • Web3 API for blockchain interactions
  • WebSocket API for real-time data streams
  • GraphQL API for flexible data queries
 1"""
 2Polymarket APIs - Unified interface for Polymarket's various APIs.
 3
 4This package provides a comprehensive interface to Polymarket's APIs including:
 5- CLOB (Central Limit Order Book) API for trading
 6- Gamma API for event and market information
 7- Data API for user information
 8- Web3 API for blockchain interactions
 9- WebSocket API for real-time data streams
10- GraphQL API for flexible data queries
11"""
12
13__version__ = "0.3.9"
14__author__ = "Razvan Gheorghe"
15__email__ = "razvan@gheorghe.me"
16
17from .clients import (
18    AsyncPolymarketGraphQLClient,
19    PolymarketClobClient,
20    PolymarketDataClient,
21    PolymarketGammaClient,
22    PolymarketGraphQLClient,
23    PolymarketWeb3Client,
24    PolymarketWebsocketsClient,
25)
26from .types.clob_types import MarketOrderArgs, OrderArgs, OrderType
27
28__all__ = [
29    "AsyncPolymarketGraphQLClient",
30    "MarketOrderArgs",
31    "OrderArgs",
32    "OrderType",
33    "PolymarketClobClient",
34    "PolymarketDataClient",
35    "PolymarketGammaClient",
36    "PolymarketGraphQLClient",
37    "PolymarketWeb3Client",
38    "PolymarketWebsocketsClient",
39    "__author__",
40    "__email__",
41    "__version__",
42]
class AsyncPolymarketGraphQLClient:
class AsyncPolymarketGraphQLClient:
    """Asynchronous GraphQL client for Polymarket subgraphs.

    Each instance is bound to a single subgraph endpoint, selected by name
    when the client is constructed.
    """

    def __init__(
        self,
        endpoint_name: Literal[
            "activity_subgraph",
            "fpmm_subgraph",
            "open_interest_subgraph",
            "orderbook_subgraph",
            "pnl_subgraph",
            "positions_subgraph",
            "sports_oracle_subgraph",
            "wallet_subgraph",
        ],
    ) -> None:
        """Create the transport and client for the endpoint named ``endpoint_name``.

        The endpoint URL is looked up in ``GRAPHQL_ENDPOINTS``.
        """
        self.transport = HTTPXAsyncTransport(url=GRAPHQL_ENDPOINTS[endpoint_name])
        # The schema is not fetched from the transport.
        self.client = Client(
            fetch_schema_from_transport=False,
            transport=self.transport,
        )

    async def query(self, query_string: str) -> dict:
        """Run ``query_string`` inside a fresh client session and return the result."""
        async with self.client as session:
            result = await session.execute(gql(query_string))
        return result

Asynchronous GraphQL client for Polymarket subgraphs.

AsyncPolymarketGraphQLClient( endpoint_name: Literal['activity_subgraph', 'fpmm_subgraph', 'open_interest_subgraph', 'orderbook_subgraph', 'pnl_subgraph', 'positions_subgraph', 'sports_oracle_subgraph', 'wallet_subgraph'])
40    def __init__(
41        self,
42        endpoint_name: Literal[
43            "activity_subgraph",
44            "fpmm_subgraph",
45            "open_interest_subgraph",
46            "orderbook_subgraph",
47            "pnl_subgraph",
48            "positions_subgraph",
49            "sports_oracle_subgraph",
50            "wallet_subgraph",
51        ],
52    ) -> None:
53        endpoint_url = GRAPHQL_ENDPOINTS[endpoint_name]
54        self.transport = HTTPXAsyncTransport(url=endpoint_url)
55        self.client = Client(
56            transport=self.transport, fetch_schema_from_transport=False
57        )
transport
client
async def query(self, query_string: str) -> dict:
59    async def query(self, query_string: str) -> dict:
60        async with self.client as session:
61            return await session.execute(gql(query_string))
class MarketOrderArgs(pydantic.main.BaseModel):
class MarketOrderArgs(BaseModel):
    """Arguments describing a market order to be created and signed."""

    token_id: str
    """TokenID of the Conditional token asset being traded."""

    amount: float
    """BUY orders: $$$ amount to buy. SELL orders: shares to sell."""

    side: str
    """Side of the order."""

    price: float = 0
    """Price used to create the order."""

    fee_rate_bps: int = 0
    """Fee rate, in basis points, charged to the order maker, charged on proceeds."""

    nonce: int = 0
    """Nonce used for onchain cancellations."""

    taker: str = ADDRESS_ZERO
    """Address of the order taker. The zero address is used to indicate a public order."""

    # Order type; defaults to OrderType.FOK.
    order_type: OrderType = OrderType.FOK

Usage Documentation: see the "Models" guide in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized `__init__` [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
token_id: str = PydanticUndefined

TokenID of the Conditional token asset being traded

amount: float = PydanticUndefined

BUY orders: $$$ amount to buy. SELL orders: shares to sell.

side: str = PydanticUndefined

Side of the order

price: float = 0

Price used to create the order

fee_rate_bps: int = 0

Fee rate, in basis points, charged to the order maker, charged on proceeds.

nonce: int = 0

Nonce used for onchain cancellations.

taker: str = '0x0000000000000000000000000000000000000000'

Address of the order taker. The zero address is used to indicate a public order.

order_type: OrderType = <OrderType.FOK: 'FOK'>
class OrderArgs(pydantic.main.BaseModel):
class OrderArgs(BaseModel):
    """Arguments describing an order to be created and signed."""

    token_id: str
    """TokenID of the Conditional token asset being traded."""

    price: float
    """Price used to create the order."""

    size: float
    """Size in terms of the ConditionalToken."""

    side: str
    """Side of the order."""

    fee_rate_bps: int = 0
    """Fee rate, in basis points, charged to the order maker, charged on proceeds."""

    nonce: int = 0
    """Nonce used for onchain cancellations."""

    expiration: int = 0
    """Timestamp after which the order is expired."""

    taker: str = ADDRESS_ZERO
    """Address of the order taker. The zero address is used to indicate a public order."""

Usage Documentation: see the "Models" guide in the Pydantic documentation.

A base class for creating Pydantic models.

Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized `__init__` [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
token_id: str = PydanticUndefined

TokenID of the Conditional token asset being traded

price: float = PydanticUndefined

Price used to create the order

size: float = PydanticUndefined

Size in terms of the ConditionalToken

side: str = PydanticUndefined

Side of the order

fee_rate_bps: int = 0

Fee rate, in basis points, charged to the order maker, charged on proceeds

nonce: int = 0

Nonce used for onchain cancellations

expiration: int = 0

Timestamp after which the order is expired.

taker: str = '0x0000000000000000000000000000000000000000'

Address of the order taker. The zero address is used to indicate a public order.

class OrderType(builtins.str, enum.Enum):
class OrderType(str, Enum):
    """Order types; each member is a string equal to its own name."""

    # Good Till Cancelled
    GTC = "GTC"
    # Good Till Date
    GTD = "GTD"
    # Fill or Kill
    FOK = "FOK"
    # Fill and Kill
    FAK = "FAK"

str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

GTC = <OrderType.GTC: 'GTC'>
GTD = <OrderType.GTD: 'GTD'>
FOK = <OrderType.FOK: 'FOK'>
FAK = <OrderType.FAK: 'FAK'>
class PolymarketClobClient:
 93class PolymarketClobClient:
 94    def __init__(
 95        self,
 96        private_key: str,
 97        address: EthAddress,
 98        creds: ApiCreds | None = None,
 99        chain_id: Literal[137, 80002] = POLYGON,
100        signature_type: Literal[0, 1, 2] = 1,
101        # 0 - EOA wallet, 1 - Proxy wallet, 2 - Gnosis Safe wallet
102    ):
103        self.address = address
104        self.client = httpx.Client(http2=True, timeout=30.0)
105        self.async_client = httpx.AsyncClient(http2=True, timeout=30.0)
106        self.base_url: str = "https://clob.polymarket.com"
107        self.signer = Signer(private_key=private_key, chain_id=chain_id)
108        self.builder = OrderBuilder(
109            signer=self.signer,
110            sig_type=signature_type,
111            funder=address,
112        )
113        self.creds = creds if creds else self.create_or_derive_api_creds()
114
115        # local cache
116        self.__tick_sizes: dict[str, TickSize] = {}
117        self.__neg_risk: dict[str, bool] = {}
118        self.__fee_rates: dict[str, int] = {}
119
120    def _build_url(self, endpoint: str) -> str:
121        return urljoin(self.base_url, endpoint)
122
123    def get_ok(self) -> str:
124        response = self.client.get(self.base_url)
125        response.raise_for_status()
126        return response.json()
127
128    def create_api_creds(self, nonce: int | None = None) -> ApiCreds:
129        headers = create_level_1_headers(self.signer, nonce)
130        response = self.client.post(self._build_url(CREATE_API_KEY), headers=headers)
131        response.raise_for_status()
132        return ApiCreds(**response.json())
133
134    def derive_api_key(self, nonce: int | None = None) -> ApiCreds:
135        headers = create_level_1_headers(self.signer, nonce)
136        response = self.client.get(self._build_url(DERIVE_API_KEY), headers=headers)
137        response.raise_for_status()
138        return ApiCreds(**response.json())
139
140    def create_or_derive_api_creds(self, nonce: int | None = None) -> ApiCreds:
141        try:
142            return self.create_api_creds(nonce)
143        except HTTPStatusError:
144            return self.derive_api_key(nonce)
145
146    def set_api_creds(self, creds: ApiCreds) -> None:
147        self.creds = creds
148
149    def get_api_keys(self) -> dict:
150        request_args = RequestArgs(method="GET", request_path=GET_API_KEYS)
151        headers = create_level_2_headers(self.signer, self.creds, request_args)
152        response = self.client.get(self._build_url(GET_API_KEYS), headers=headers)
153        response.raise_for_status()
154        return response.json()
155
156    def delete_api_keys(self) -> Literal["OK"]:
157        request_args = RequestArgs(method="DELETE", request_path=DELETE_API_KEY)
158        headers = create_level_2_headers(self.signer, self.creds, request_args)
159        response = self.client.delete(self._build_url(DELETE_API_KEY), headers=headers)
160        response.raise_for_status()
161        return response.json()
162
163    def get_utc_time(self) -> datetime:
164        # parse server timestamp into utc datetime
165        response = self.client.get(self._build_url(TIME))
166        response.raise_for_status()
167        return datetime.fromtimestamp(response.json(), tz=UTC)
168
169    def get_tick_size(self, token_id: str) -> TickSize:
170        if token_id in self.__tick_sizes:
171            return self.__tick_sizes[token_id]
172
173        params = {"token_id": token_id}
174        response = self.client.get(self._build_url(GET_TICK_SIZE), params=params)
175        response.raise_for_status()
176        self.__tick_sizes[token_id] = cast(
177            "TickSize", str(response.json()["minimum_tick_size"])
178        )
179
180        return self.__tick_sizes[token_id]
181
182    def get_neg_risk(self, token_id: str) -> bool:
183        if token_id in self.__neg_risk:
184            return self.__neg_risk[token_id]
185
186        params = {"token_id": token_id}
187        response = self.client.get(self._build_url(GET_NEG_RISK), params=params)
188        response.raise_for_status()
189        self.__neg_risk[token_id] = response.json()["neg_risk"]
190
191        return self.__neg_risk[token_id]
192
193    def get_fee_rate_bps(self, token_id: str) -> int:
194        if token_id in self.__fee_rates:
195            return self.__fee_rates[token_id]
196
197        params = {"token_id": token_id}
198        response = self.client.get(self._build_url(GET_FEE_RATE), params=params)
199        response.raise_for_status()
200        fee_rate = response.json().get("base_fee") or 0
201        self.__fee_rates[token_id] = fee_rate
202
203        return fee_rate
204
205    def __resolve_tick_size(
206        self,
207        token_id: str,
208        tick_size: TickSize | None = None,
209    ) -> TickSize:
210        min_tick_size = self.get_tick_size(token_id)
211        if tick_size is not None:
212            if is_tick_size_smaller(tick_size, min_tick_size):
213                msg = f"invalid tick size ({tick_size!s}), minimum for the market is {min_tick_size!s}"
214                raise InvalidTickSizeError(msg)
215        else:
216            tick_size = min_tick_size
217        return tick_size
218
219    def __resolve_fee_rate(
220        self,
221        token_id: str,
222        user_fee_rate: int | None = None,
223    ) -> int:
224        market_fee_rate_bps = self.get_fee_rate_bps(token_id)
225        # If both fee rate on the market and the user supplied fee rate are non-zero, validate that they match
226        # else return the market fee rate
227        if (
228            market_fee_rate_bps > 0
229            and user_fee_rate is not None
230            and user_fee_rate > 0
231            and user_fee_rate != market_fee_rate_bps
232        ):
233            msg = f"invalid user provided fee rate: ({user_fee_rate}), fee rate for the market must be {market_fee_rate_bps}"
234            raise InvalidFeeRateError(msg)
235        return market_fee_rate_bps
236
237    def get_midpoint(self, token_id: str) -> Midpoint:
238        """Get the mid-market price for the given token."""
239        params = {"token_id": token_id}
240        response = self.client.get(self._build_url(MID_POINT), params=params)
241        response.raise_for_status()
242        return Midpoint(token_id=token_id, value=float(response.json()["mid"]))
243
244    def get_midpoints(self, token_ids: list[str]) -> dict:
245        """Get the mid-market prices for a set of tokens."""
246        data = [{"token_id": token_id} for token_id in token_ids]
247        response = self.client.post(self._build_url(MID_POINTS), json=data)
248        response.raise_for_status()
249        return TokenValueDict(**response.json()).root
250
251    def get_spread(self, token_id: str) -> Spread:
252        """Get the spread for the given token."""
253        params = {"token_id": token_id}
254        response = self.client.get(self._build_url(GET_SPREAD), params=params)
255        response.raise_for_status()
256        return Spread(token_id=token_id, value=float(response.json()["mid"]))
257
258    def get_spreads(self, token_ids: list[str]) -> dict:
259        """Get the spreads for a set of tokens."""
260        data = [{"token_id": token_id} for token_id in token_ids]
261        response = self.client.post(self._build_url(GET_SPREADS), json=data)
262        response.raise_for_status()
263        return TokenValueDict(**response.json()).root
264
265    def get_price(self, token_id: str, side: Literal["BUY", "SELL"]) -> Price:
266        """Get the market price for the given token and side."""
267        params = {"token_id": token_id, "side": side}
268        response = self.client.get(self._build_url(PRICE), params=params)
269        response.raise_for_status()
270        return Price(**response.json(), token_id=token_id, side=side)
271
272    def get_prices(self, params: list[BookParams]) -> dict[str, BidAsk]:
273        """Get the market prices for a set of tokens and sides."""
274        data = [{"token_id": param.token_id, "side": param.side} for param in params]
275        response = self.client.post(self._build_url(GET_PRICES), json=data)
276        response.raise_for_status()
277        return TokenBidAskDict(**response.json()).root
278
279    def get_last_trade_price(self, token_id) -> Price:
280        """Fetches the last trade price for a token_id."""
281        params = {"token_id": token_id}
282        response = self.client.get(self._build_url(GET_LAST_TRADE_PRICE), params=params)
283        response.raise_for_status()
284        return Price(**response.json(), token_id=token_id)
285
286    def get_last_trades_prices(self, token_ids: list[str]) -> list[Price]:
287        """Fetches the last trades prices for a set of token ids."""
288        body = [{"token_id": token_id} for token_id in token_ids]
289        response = self.client.post(self._build_url(GET_LAST_TRADES_PRICES), json=body)
290        response.raise_for_status()
291        return [Price(**price) for price in response.json()]
292
293    def get_order_book(self, token_id) -> OrderBookSummary:
294        """Get the orderbook for the given token."""
295        params = {"token_id": token_id}
296        response = self.client.get(self._build_url(GET_ORDER_BOOK), params=params)
297        response.raise_for_status()
298        return OrderBookSummary(**response.json())
299
300    def get_order_books(self, token_ids: list[str]) -> list[OrderBookSummary]:
301        """Get the orderbook for a set of tokens."""
302        body = [{"token_id": token_id} for token_id in token_ids]
303        response = self.client.post(self._build_url(GET_ORDER_BOOKS), json=body)
304        response.raise_for_status()
305        return [OrderBookSummary(**obs) for obs in response.json()]
306
307    async def get_order_books_async(
308        self, token_ids: list[str]
309    ) -> list[OrderBookSummary]:
310        """Get the orderbook for a set of tokens asynchronously."""
311        body = [{"token_id": token_id} for token_id in token_ids]
312        response = await self.async_client.post(
313            self._build_url(GET_ORDER_BOOKS), json=body
314        )
315        response.raise_for_status()
316        return [OrderBookSummary(**obs) for obs in response.json()]
317
318    def get_market(self, condition_id) -> ClobMarket:
319        """Get a ClobMarket by condition_id."""
320        response = self.client.get(self._build_url(GET_MARKET + condition_id))
321        response.raise_for_status()
322        return ClobMarket(**response.json())
323
324    def get_markets(self, next_cursor="MA==") -> PaginatedResponse[ClobMarket]:
325        """Get paginated ClobMarkets."""
326        params = {"next_cursor": next_cursor}
327        response = self.client.get(self._build_url(GET_MARKETS), params=params)
328        response.raise_for_status()
329        return PaginatedResponse[ClobMarket](**response.json())
330
331    def get_all_markets(self, next_cursor="MA==") -> list[ClobMarket]:
332        """Recursively fetch all ClobMarkets using pagination."""
333        # Base case: Stop recursion if next_cursor indicates the last page
334        if next_cursor == "LTE=":
335            print("Reached the last page of markets.")
336            return []
337
338        # Fetch current page of markets
339        paginated_response = self.get_markets(next_cursor=next_cursor)
340
341        # Collect current page data
342        current_markets = paginated_response.data
343
344        # Recursively fetch remaining pages
345        next_page_markets = self.get_all_markets(
346            next_cursor=paginated_response.next_cursor,
347        )
348
349        # Combine current page data with data from subsequent pages
350        return current_markets + next_page_markets
351
352    def get_recent_history(
353        self,
354        token_id: str,
355        interval: Literal["1h", "6h", "1d", "1w", "1m", "max"] = "1d",
356        fidelity: int = 1,  # resolution in minutes
357    ) -> PriceHistory:
358        """Get the recent price history of a token (up to now) - 1h, 6h, 1d, 1w, 1m."""
359        min_fidelities: dict[str, int] = {
360            "1h": 1,
361            "6h": 1,
362            "1d": 1,
363            "1w": 5,
364            "1m": 10,
365            "max": 2,
366        }
367
368        if fidelity < min_fidelities[interval]:
369            msg = f"invalid filters: minimum fidelity' for '{interval}' range is {min_fidelities.get(interval)}"
370            raise ValueError(msg)
371
372        params: dict[str, int | str] = {
373            "market": token_id,
374            "interval": interval,
375            "fidelity": fidelity,
376        }
377        response = self.client.get(self._build_url("/prices-history"), params=params)
378        response.raise_for_status()
379        return PriceHistory(**response.json(), token_id=token_id)
380
381    def get_history(
382        self,
383        token_id: str,
384        start_time: datetime | None = None,
385        end_time: datetime | None = None,
386        fidelity: int = 2,  # resolution in minutes
387    ) -> PriceHistory:
388        """Get the price history of a token between a selected date range of max 15 days or from start_time to now."""
389        if start_time is None and end_time is None:
390            msg = "At least 'start_time' or ('start_time' and 'end_time') must be provided"
391            raise ValueError(msg)
392
393        if (
394            start_time
395            and end_time
396            and start_time + timedelta(days=15, seconds=1) < end_time
397        ):
398            msg = "'start_time' - 'end_time' range cannot exceed 15 days. Remove 'end_time' to get prices up to now or set a shorter range."
399            raise ValueError(msg)
400
401        params: dict[str, int | str] = {
402            "market": token_id,
403            "fidelity": fidelity,
404        }
405        if start_time:
406            params["startTs"] = int(start_time.timestamp())
407        if end_time:
408            params["endTs"] = int(end_time.timestamp())
409
410        response = self.client.get(self._build_url("/prices-history"), params=params)
411        response.raise_for_status()
412        return PriceHistory(**response.json(), token_id=token_id)
413
414    def get_all_history(self, token_id: str) -> PriceHistory:
415        """Get the full price history of a token."""
416        return self.get_history(
417            token_id=token_id,
418            start_time=datetime(2020, 1, 1, tzinfo=UTC),
419        )
420
421    def get_orders(
422        self,
423        order_id: str | None = None,
424        condition_id: Keccak256 | None = None,
425        token_id: str | None = None,
426        next_cursor: str = "MA==",
427    ) -> list[OpenOrder]:
428        """Gets your active orders, filtered by order_id, condition_id, token_id."""
429        params = {}
430        if order_id:
431            params["id"] = order_id
432        if condition_id:
433            params["market"] = condition_id
434        if token_id:
435            params["asset_id"] = token_id
436
437        request_args = RequestArgs(method="GET", request_path=ORDERS)
438        headers = create_level_2_headers(self.signer, self.creds, request_args)
439
440        results = []
441        next_cursor = next_cursor if next_cursor is not None else "MA=="
442        while next_cursor != END_CURSOR:
443            params["next_cursor"] = next_cursor
444            response = self.client.get(
445                self._build_url(ORDERS), headers=headers, params=params
446            )
447            response.raise_for_status()
448            next_cursor = response.json()["next_cursor"]
449            results += [OpenOrder(**order) for order in response.json()["data"]]
450
451        return results
452
453    def create_order(
454        self, order_args: OrderArgs, options: PartialCreateOrderOptions | None = None
455    ) -> SignedOrder:
456        """Creates and signs an order."""
457        # add resolve_order_options, or similar
458        tick_size = self.__resolve_tick_size(
459            order_args.token_id,
460            options.tick_size if options else None,
461        )
462
463        if not price_valid(order_args.price, tick_size):
464            msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}"
465            raise InvalidPriceError(msg)
466
467        neg_risk = (
468            options.neg_risk
469            if options and options.neg_risk is not None
470            else self.get_neg_risk(order_args.token_id)
471        )
472
473        # fee rate
474        fee_rate_bps = self.__resolve_fee_rate(
475            order_args.token_id, order_args.fee_rate_bps
476        )
477        order_args.fee_rate_bps = fee_rate_bps
478
479        return self.builder.create_order(
480            order_args,
481            CreateOrderOptions(
482                tick_size=tick_size,
483                neg_risk=neg_risk,
484            ),
485        )
486
487    def post_order(
488        self, order: SignedOrder, order_type: OrderType = OrderType.GTC
489    ) -> OrderPostResponse | None:
490        """Posts a SignedOrder."""
491        body = order_to_json(order, self.creds.key, order_type)
492        headers = create_level_2_headers(
493            self.signer,
494            self.creds,
495            RequestArgs(method="POST", request_path=POST_ORDER, body=body),
496        )
497
498        try:
499            response = self.client.post(
500                self._build_url("/order"),
501                headers=headers,
502                content=json.dumps(body).encode("utf-8"),
503            )
504            response.raise_for_status()
505            return OrderPostResponse(**response.json())
506        except httpx.HTTPStatusError as exc:
507            msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order"
508            logger.warning(msg)
509            error_json = exc.response.json()
510            print("Details:", error_json["error"])
511            return None
512
513    def create_and_post_order(
514        self,
515        order_args: OrderArgs,
516        options: PartialCreateOrderOptions | None = None,
517        order_type: OrderType = OrderType.GTC,
518    ) -> OrderPostResponse | None:
519        """Utility function to create and publish an order."""
520        order = self.create_order(order_args, options)
521        return self.post_order(order=order, order_type=order_type)
522
523    def post_orders(self, args: list[PostOrdersArgs]) -> list[OrderPostResponse] | None:
524        """Posts multiple SignedOrders at once."""
525        body = [
526            order_to_json(arg.order, self.creds.key, arg.order_type) for arg in args
527        ]
528        headers = create_level_2_headers(
529            self.signer,
530            self.creds,
531            RequestArgs(method="POST", request_path=POST_ORDERS, body=body),
532        )
533
534        try:
535            response = self.client.post(
536                self._build_url("/orders"),
537                headers=headers,
538                content=json.dumps(body).encode("utf-8"),
539            )
540            response.raise_for_status()
541            order_responses = []
542            for index, item in enumerate(response.json()):
543                resp = OrderPostResponse(**item)
544                order_responses.append(resp)
545                if resp.error_msg:
546                    msg = (
547                        f"Error posting order in position {index} \n"
548                        f"Details: {resp.error_msg}"
549                    )
550                    logger.warning(msg)
551        except httpx.HTTPStatusError as exc:
552            msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order"
553            logger.warning(msg)
554            error_json = exc.response.json()
555            print("Details:", error_json["error"])
556            return None
557        else:
558            return order_responses
559
560    def create_and_post_orders(
561        self, args: list[OrderArgs], order_types: list[OrderType]
562    ) -> list[OrderPostResponse] | None:
563        """Utility function to create and publish multiple orders at once."""
564        return self.post_orders(
565            [
566                PostOrdersArgs(
567                    order=self.create_order(order_args), order_type=order_type
568                )
569                for order_args, order_type in zip(args, order_types, strict=True)
570            ],
571        )
572
573    def calculate_market_price(
574        self, token_id: str, side: str, amount: float, order_type: OrderType
575    ) -> float:
576        """Calculates the matching price considering an amount and the current orderbook."""
577        book = self.get_order_book(token_id)
578        if book is None:
579            msg = "Order book is None"
580            raise MissingOrderbookError(msg)
581        if side == "BUY":
582            if book.asks is None:
583                msg = "No ask orders available"
584                raise LiquidityError(msg)
585            return self.builder.calculate_buy_market_price(
586                book.asks,
587                amount,
588                order_type,
589            )
590        if side == "SELL":
591            if book.bids is None:
592                msg = "No bid orders available"
593                raise LiquidityError(msg)
594            return self.builder.calculate_sell_market_price(
595                book.bids,
596                amount,
597                order_type,
598            )
599        msg = 'Side must be "BUY" or "SELL"'
600        raise ValueError(msg)
601
602    def create_market_order(
603        self,
604        order_args: MarketOrderArgs,
605        options: PartialCreateOrderOptions | None = None,
606    ):
607        """Creates and signs a market order."""
608        tick_size = self.__resolve_tick_size(
609            order_args.token_id,
610            options.tick_size if options else None,
611        )
612
613        if order_args.price is None or order_args.price <= 0:
614            order_args.price = self.calculate_market_price(
615                order_args.token_id,
616                order_args.side,
617                order_args.amount,
618                order_args.order_type,
619            )
620
621        if not price_valid(order_args.price, tick_size):
622            msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}"
623            raise InvalidPriceError(msg)
624
625        neg_risk = (
626            options.neg_risk
627            if options and options.neg_risk is not None
628            else self.get_neg_risk(order_args.token_id)
629        )
630
631        # fee rate
632        fee_rate_bps = self.__resolve_fee_rate(
633            order_args.token_id, order_args.fee_rate_bps
634        )
635        order_args.fee_rate_bps = fee_rate_bps
636
637        return self.builder.create_market_order(
638            order_args,
639            CreateOrderOptions(
640                tick_size=tick_size,
641                neg_risk=neg_risk,
642            ),
643        )
644
645    def create_and_post_market_order(
646        self,
647        order_args: MarketOrderArgs,
648        options: PartialCreateOrderOptions | None = None,
649        order_type: OrderType = OrderType.FOK,
650    ) -> OrderPostResponse | None:
651        """Utility function to create and publish a market order."""
652        order = self.create_market_order(order_args, options)
653        return self.post_order(order=order, order_type=order_type)
654
655    def cancel_order(self, order_id: Keccak256) -> OrderCancelResponse:
656        """Cancels an order."""
657        body = {"orderID": order_id}
658
659        request_args = RequestArgs(method="DELETE", request_path=CANCEL, body=body)
660        headers = create_level_2_headers(self.signer, self.creds, request_args)
661
662        response = self.client.request(
663            "DELETE",
664            self._build_url(CANCEL),
665            headers=headers,
666            content=json.dumps(body).encode("utf-8"),
667        )
668        response.raise_for_status()
669        return OrderCancelResponse(**response.json())
670
671    def cancel_orders(self, order_ids: list[Keccak256]) -> OrderCancelResponse:
672        """Cancels orders."""
673        body = order_ids
674
675        request_args = RequestArgs(
676            method="DELETE",
677            request_path=CANCEL_ORDERS,
678            body=body,
679        )
680        headers = create_level_2_headers(self.signer, self.creds, request_args)
681
682        response = self.client.request(
683            "DELETE",
684            self._build_url(CANCEL_ORDERS),
685            headers=headers,
686            content=json.dumps(body).encode("utf-8"),
687        )
688        response.raise_for_status()
689        return OrderCancelResponse(**response.json())
690
691    def cancel_all(self) -> OrderCancelResponse:
692        """Cancels all available orders for the user."""
693        request_args = RequestArgs(method="DELETE", request_path=CANCEL_ALL)
694        headers = create_level_2_headers(self.signer, self.creds, request_args)
695
696        response = self.client.delete(self._build_url(CANCEL_ALL), headers=headers)
697        response.raise_for_status()
698        return OrderCancelResponse(**response.json())
699
700    def is_order_scoring(self, order_id: Keccak256) -> bool:
701        """Check if the order is currently scoring."""
702        request_args = RequestArgs(method="GET", request_path=IS_ORDER_SCORING)
703        headers = create_level_2_headers(self.signer, self.creds, request_args)
704
705        response = self.client.get(
706            self._build_url(IS_ORDER_SCORING),
707            headers=headers,
708            params={"order_id": order_id},
709        )
710        response.raise_for_status()
711        return response.json()["scoring"]
712
713    def are_orders_scoring(self, order_ids: list[Keccak256]) -> dict[Keccak256, bool]:
714        """Check if the orders are currently scoring."""
715        body = order_ids
716        request_args = RequestArgs(
717            method="POST",
718            request_path=ARE_ORDERS_SCORING,
719            body=body,
720        )
721        headers = create_level_2_headers(self.signer, self.creds, request_args)
722        headers["Content-Type"] = "application/json"
723
724        response = self.client.post(
725            self._build_url(ARE_ORDERS_SCORING), headers=headers, json=body
726        )
727        response.raise_for_status()
728        return response.json()
729
730    def get_market_rewards(self, condition_id: Keccak256) -> MarketRewards:
731        """
732        Get the MarketRewards for a given market (condition_id).
733
734        - metadata, tokens, max_spread, min_size, rewards_config, market_competitiveness.
735        """
736        request_args = RequestArgs(method="GET", request_path="/rewards/markets/")
737        headers = create_level_2_headers(self.signer, self.creds, request_args)
738
739        response = self.client.get(
740            self._build_url("/rewards/markets/" + condition_id), headers=headers
741        )
742        response.raise_for_status()
743        return next(MarketRewards(**market) for market in response.json()["data"])
744
745    def get_trades(
746        self,
747        condition_id: Keccak256 | None = None,
748        token_id: str | None = None,
749        trade_id: str | None = None,
750        before: datetime | None = None,
751        after: datetime | None = None,
752        address: EthAddress | None = None,
753        next_cursor="MA==",
754    ) -> list[PolygonTrade]:
755        """Fetches the trade history for a user."""
756        params: dict[str, str | int] = {}
757        if condition_id:
758            params["market"] = condition_id
759        if token_id:
760            params["asset_id"] = token_id
761        if trade_id:
762            params["id"] = trade_id
763        if before:
764            params["before"] = int(before.replace(microsecond=0).timestamp())
765        if after:
766            params["after"] = int(after.replace(microsecond=0).timestamp())
767        if address:
768            params["maker_address"] = address
769
770        request_args = RequestArgs(method="GET", request_path=TRADES)
771        headers = create_level_2_headers(self.signer, self.creds, request_args)
772
773        results = []
774        next_cursor = next_cursor if next_cursor is not None else "MA=="
775        while next_cursor != END_CURSOR:
776            params["next_cursor"] = next_cursor
777            response = self.client.get(
778                self._build_url(TRADES), headers=headers, params=params
779            )
780            response.raise_for_status()
781            next_cursor = response.json()["next_cursor"]
782            results += [PolygonTrade(**trade) for trade in response.json()["data"]]
783
784        return results
785
786    def get_total_rewards(self, date: datetime | None = None) -> DailyEarnedReward:
787        """Get the total rewards earned on a given date (seems to only hold the 6 most recent data points)."""
788        if date is None:
789            date = datetime.now(UTC)
790        params = {
791            "authenticationType": "magic",
792            "date": f"{date.strftime('%Y-%m-%d')}",
793        }
794
795        request_args = RequestArgs(method="GET", request_path="/rewards/user/total")
796        headers = create_level_2_headers(self.signer, self.creds, request_args)
797        params["l2Headers"] = json.dumps(headers)
798
799        response = self.client.get(
800            "https://polymarket.com/api/rewards/totalEarnings", params=params
801        )
802        response.raise_for_status()
803        if response.json():
804            return DailyEarnedReward(**response.json()[0])
805        return DailyEarnedReward(
806            date=date,
807            asset_address="0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174",
808            maker_address=self.address,
809            earnings=0.0,
810            asset_rate=0.0,
811        )
812
813    def get_reward_markets(
814        self,
815        query: str | None = None,
816        sort_by: Literal[
817            "market",
818            "max_spread",
819            "min_size",
820            "rate_per_day",
821            "spread",
822            "price",
823            "earnings",
824            "earning_percentage",
825        ]
826        | None = "market",
827        sort_direction: Literal["ASC", "DESC"] | None = None,
828        show_favorites: bool = False,
829    ) -> list[RewardMarket]:
830        """
831        Search through markets that offer rewards (polymarket.com/rewards items) by query, sorted by different metrics. If query is empty, returns all markets with rewards.
832
833         - market start date ("market") - TODO confirm this
834         - max spread for rewards in usdc
835         - min size for rewards in shares
836         - reward rate per day in usdc
837         - current spread of a market
838         - current price of a market
839         - your daily earnings on a market - only need auth for these last two
840         - your current earning percentage on a market.
841        """
842        results = []
843        desc = {"ASC": False, "DESC": True}
844        params: dict[str, bool | str] = {
845            "authenticationType": "magic",
846            "showFavorites": show_favorites,
847        }
848        if sort_by:
849            params["orderBy"] = sort_by
850        if query:
851            params["query"] = query
852            params["desc"] = False
853        if sort_direction:
854            params["desc"] = desc[sort_direction]
855
856        request_args = RequestArgs(method="GET", request_path="/rewards/user/markets")
857        headers = create_level_2_headers(self.signer, self.creds, request_args)
858        params["l2Headers"] = json.dumps(headers)
859
860        next_cursor = "MA=="
861        while next_cursor != END_CURSOR:
862            params["nextCursor"] = next_cursor
863            response = self.client.get(
864                "https://polymarket.com/api/rewards/markets", params=params
865            )
866            response.raise_for_status()
867            next_cursor = response.json()["next_cursor"]
868            results += [RewardMarket(**reward) for reward in response.json()["data"]]
869
870        return results
871
    def __enter__(self):
        """Enter the ``with`` block and hand back this client instance."""
        return self
874
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Close the synchronous HTTP client when the ``with`` block ends.

        Nothing is returned, so an exception raised inside the block is not
        suppressed.
        """
        # NOTE(review): only self.client is closed here; the self.async_client
        # created in __init__ is left open — confirm this is intended.
        self.client.close()
PolymarketClobClient( private_key: str, address: Annotated[str, AfterValidator(func=<function validate_eth_address>)], creds: polymarket_apis.types.clob_types.ApiCreds | None = None, chain_id: Literal[137, 80002] = 137, signature_type: Literal[0, 1, 2] = 1)
    def __init__(
        self,
        private_key: str,
        address: EthAddress,
        creds: ApiCreds | None = None,
        chain_id: Literal[137, 80002] = POLYGON,
        signature_type: Literal[0, 1, 2] = 1,
        # 0 - EOA wallet, 1 - Proxy wallet, 2 - Gnosis Safe wallet
    ) -> None:
        """
        Set up HTTP clients, signer, order builder and API credentials.

        Args:
            private_key: key handed to the ``Signer``.
            address: stored as ``self.address`` and used as the order builder's
                ``funder``.
            creds: existing API credentials. When falsy, credentials are
                obtained with ``create_or_derive_api_creds()``, which performs
                HTTP requests during construction.
            chain_id: chain id handed to the ``Signer``.
            signature_type: passed to the order builder as ``sig_type`` (see the
                inline legend in the signature).
        """
        self.address = address
        # One sync and one async client, both HTTP/2 with a 30 s timeout.
        self.client = httpx.Client(http2=True, timeout=30.0)
        self.async_client = httpx.AsyncClient(http2=True, timeout=30.0)
        self.base_url: str = "https://clob.polymarket.com"
        self.signer = Signer(private_key=private_key, chain_id=chain_id)
        self.builder = OrderBuilder(
            signer=self.signer,
            sig_type=signature_type,
            funder=address,
        )
        # Must come after signer/client/base_url: deriving credentials uses them.
        self.creds = creds if creds else self.create_or_derive_api_creds()

        # local cache
        # Per-token lookups, filled lazily by get_tick_size / get_neg_risk /
        # get_fee_rate_bps.
        self.__tick_sizes: dict[str, TickSize] = {}
        self.__neg_risk: dict[str, bool] = {}
        self.__fee_rates: dict[str, int] = {}
address
client
async_client
base_url: str
signer
builder
creds
def get_ok(self) -> str:
123    def get_ok(self) -> str:
124        response = self.client.get(self.base_url)
125        response.raise_for_status()
126        return response.json()
def create_api_creds( self, nonce: int | None = None) -> polymarket_apis.types.clob_types.ApiCreds:
128    def create_api_creds(self, nonce: int | None = None) -> ApiCreds:
129        headers = create_level_1_headers(self.signer, nonce)
130        response = self.client.post(self._build_url(CREATE_API_KEY), headers=headers)
131        response.raise_for_status()
132        return ApiCreds(**response.json())
def derive_api_key( self, nonce: int | None = None) -> polymarket_apis.types.clob_types.ApiCreds:
134    def derive_api_key(self, nonce: int | None = None) -> ApiCreds:
135        headers = create_level_1_headers(self.signer, nonce)
136        response = self.client.get(self._build_url(DERIVE_API_KEY), headers=headers)
137        response.raise_for_status()
138        return ApiCreds(**response.json())
def create_or_derive_api_creds( self, nonce: int | None = None) -> polymarket_apis.types.clob_types.ApiCreds:
140    def create_or_derive_api_creds(self, nonce: int | None = None) -> ApiCreds:
141        try:
142            return self.create_api_creds(nonce)
143        except HTTPStatusError:
144            return self.derive_api_key(nonce)
def set_api_creds(self, creds: polymarket_apis.types.clob_types.ApiCreds) -> None:
146    def set_api_creds(self, creds: ApiCreds) -> None:
147        self.creds = creds
def get_api_keys(self) -> dict:
149    def get_api_keys(self) -> dict:
150        request_args = RequestArgs(method="GET", request_path=GET_API_KEYS)
151        headers = create_level_2_headers(self.signer, self.creds, request_args)
152        response = self.client.get(self._build_url(GET_API_KEYS), headers=headers)
153        response.raise_for_status()
154        return response.json()
def delete_api_keys(self) -> Literal['OK']:
156    def delete_api_keys(self) -> Literal["OK"]:
157        request_args = RequestArgs(method="DELETE", request_path=DELETE_API_KEY)
158        headers = create_level_2_headers(self.signer, self.creds, request_args)
159        response = self.client.delete(self._build_url(DELETE_API_KEY), headers=headers)
160        response.raise_for_status()
161        return response.json()
def get_utc_time(self) -> datetime.datetime:
163    def get_utc_time(self) -> datetime:
164        # parse server timestamp into utc datetime
165        response = self.client.get(self._build_url(TIME))
166        response.raise_for_status()
167        return datetime.fromtimestamp(response.json(), tz=UTC)
def get_tick_size(self, token_id: str) -> Literal['0.1', '0.01', '0.001', '0.0001']:
169    def get_tick_size(self, token_id: str) -> TickSize:
170        if token_id in self.__tick_sizes:
171            return self.__tick_sizes[token_id]
172
173        params = {"token_id": token_id}
174        response = self.client.get(self._build_url(GET_TICK_SIZE), params=params)
175        response.raise_for_status()
176        self.__tick_sizes[token_id] = cast(
177            "TickSize", str(response.json()["minimum_tick_size"])
178        )
179
180        return self.__tick_sizes[token_id]
def get_neg_risk(self, token_id: str) -> bool:
182    def get_neg_risk(self, token_id: str) -> bool:
183        if token_id in self.__neg_risk:
184            return self.__neg_risk[token_id]
185
186        params = {"token_id": token_id}
187        response = self.client.get(self._build_url(GET_NEG_RISK), params=params)
188        response.raise_for_status()
189        self.__neg_risk[token_id] = response.json()["neg_risk"]
190
191        return self.__neg_risk[token_id]
def get_fee_rate_bps(self, token_id: str) -> int:
193    def get_fee_rate_bps(self, token_id: str) -> int:
194        if token_id in self.__fee_rates:
195            return self.__fee_rates[token_id]
196
197        params = {"token_id": token_id}
198        response = self.client.get(self._build_url(GET_FEE_RATE), params=params)
199        response.raise_for_status()
200        fee_rate = response.json().get("base_fee") or 0
201        self.__fee_rates[token_id] = fee_rate
202
203        return fee_rate
def get_midpoint(self, token_id: str) -> polymarket_apis.types.clob_types.Midpoint:
237    def get_midpoint(self, token_id: str) -> Midpoint:
238        """Get the mid-market price for the given token."""
239        params = {"token_id": token_id}
240        response = self.client.get(self._build_url(MID_POINT), params=params)
241        response.raise_for_status()
242        return Midpoint(token_id=token_id, value=float(response.json()["mid"]))

Get the mid-market price for the given token.

def get_midpoints(self, token_ids: list[str]) -> dict:
244    def get_midpoints(self, token_ids: list[str]) -> dict:
245        """Get the mid-market prices for a set of tokens."""
246        data = [{"token_id": token_id} for token_id in token_ids]
247        response = self.client.post(self._build_url(MID_POINTS), json=data)
248        response.raise_for_status()
249        return TokenValueDict(**response.json()).root

Get the mid-market prices for a set of tokens.

def get_spread(self, token_id: str) -> polymarket_apis.types.clob_types.Spread:
251    def get_spread(self, token_id: str) -> Spread:
252        """Get the spread for the given token."""
253        params = {"token_id": token_id}
254        response = self.client.get(self._build_url(GET_SPREAD), params=params)
255        response.raise_for_status()
256        return Spread(token_id=token_id, value=float(response.json()["mid"]))

Get the spread for the given token.

def get_spreads(self, token_ids: list[str]) -> dict:
258    def get_spreads(self, token_ids: list[str]) -> dict:
259        """Get the spreads for a set of tokens."""
260        data = [{"token_id": token_id} for token_id in token_ids]
261        response = self.client.post(self._build_url(GET_SPREADS), json=data)
262        response.raise_for_status()
263        return TokenValueDict(**response.json()).root

Get the spreads for a set of tokens.

def get_price( self, token_id: str, side: Literal['BUY', 'SELL']) -> polymarket_apis.types.clob_types.Price:
265    def get_price(self, token_id: str, side: Literal["BUY", "SELL"]) -> Price:
266        """Get the market price for the given token and side."""
267        params = {"token_id": token_id, "side": side}
268        response = self.client.get(self._build_url(PRICE), params=params)
269        response.raise_for_status()
270        return Price(**response.json(), token_id=token_id, side=side)

Get the market price for the given token and side.

def get_prices( self, params: list[polymarket_apis.types.clob_types.BookParams]) -> dict[str, polymarket_apis.types.clob_types.BidAsk]:
272    def get_prices(self, params: list[BookParams]) -> dict[str, BidAsk]:
273        """Get the market prices for a set of tokens and sides."""
274        data = [{"token_id": param.token_id, "side": param.side} for param in params]
275        response = self.client.post(self._build_url(GET_PRICES), json=data)
276        response.raise_for_status()
277        return TokenBidAskDict(**response.json()).root

Get the market prices for a set of tokens and sides.

def get_last_trade_price(self, token_id) -> polymarket_apis.types.clob_types.Price:
279    def get_last_trade_price(self, token_id) -> Price:
280        """Fetches the last trade price for a token_id."""
281        params = {"token_id": token_id}
282        response = self.client.get(self._build_url(GET_LAST_TRADE_PRICE), params=params)
283        response.raise_for_status()
284        return Price(**response.json(), token_id=token_id)

Fetches the last trade price for a token_id.

def get_last_trades_prices( self, token_ids: list[str]) -> list[polymarket_apis.types.clob_types.Price]:
286    def get_last_trades_prices(self, token_ids: list[str]) -> list[Price]:
287        """Fetches the last trades prices for a set of token ids."""
288        body = [{"token_id": token_id} for token_id in token_ids]
289        response = self.client.post(self._build_url(GET_LAST_TRADES_PRICES), json=body)
290        response.raise_for_status()
291        return [Price(**price) for price in response.json()]

Fetches the last trades prices for a set of token ids.

def get_order_book(self, token_id) -> polymarket_apis.types.clob_types.OrderBookSummary:
293    def get_order_book(self, token_id) -> OrderBookSummary:
294        """Get the orderbook for the given token."""
295        params = {"token_id": token_id}
296        response = self.client.get(self._build_url(GET_ORDER_BOOK), params=params)
297        response.raise_for_status()
298        return OrderBookSummary(**response.json())

Get the orderbook for the given token.

def get_order_books( self, token_ids: list[str]) -> list[polymarket_apis.types.clob_types.OrderBookSummary]:
300    def get_order_books(self, token_ids: list[str]) -> list[OrderBookSummary]:
301        """Get the orderbook for a set of tokens."""
302        body = [{"token_id": token_id} for token_id in token_ids]
303        response = self.client.post(self._build_url(GET_ORDER_BOOKS), json=body)
304        response.raise_for_status()
305        return [OrderBookSummary(**obs) for obs in response.json()]

Get the orderbook for a set of tokens.

async def get_order_books_async( self, token_ids: list[str]) -> list[polymarket_apis.types.clob_types.OrderBookSummary]:
307    async def get_order_books_async(
308        self, token_ids: list[str]
309    ) -> list[OrderBookSummary]:
310        """Get the orderbook for a set of tokens asynchronously."""
311        body = [{"token_id": token_id} for token_id in token_ids]
312        response = await self.async_client.post(
313            self._build_url(GET_ORDER_BOOKS), json=body
314        )
315        response.raise_for_status()
316        return [OrderBookSummary(**obs) for obs in response.json()]

Get the orderbook for a set of tokens asynchronously.

def get_market(self, condition_id) -> polymarket_apis.types.clob_types.ClobMarket:
318    def get_market(self, condition_id) -> ClobMarket:
319        """Get a ClobMarket by condition_id."""
320        response = self.client.get(self._build_url(GET_MARKET + condition_id))
321        response.raise_for_status()
322        return ClobMarket(**response.json())

Get a ClobMarket by condition_id.

def get_markets( self, next_cursor='MA==') -> polymarket_apis.types.clob_types.PaginatedResponse[ClobMarket]:
324    def get_markets(self, next_cursor="MA==") -> PaginatedResponse[ClobMarket]:
325        """Get paginated ClobMarkets."""
326        params = {"next_cursor": next_cursor}
327        response = self.client.get(self._build_url(GET_MARKETS), params=params)
328        response.raise_for_status()
329        return PaginatedResponse[ClobMarket](**response.json())

Get paginated ClobMarkets.

def get_all_markets( self, next_cursor='MA==') -> list[polymarket_apis.types.clob_types.ClobMarket]:
331    def get_all_markets(self, next_cursor="MA==") -> list[ClobMarket]:
332        """Recursively fetch all ClobMarkets using pagination."""
333        # Base case: Stop recursion if next_cursor indicates the last page
334        if next_cursor == "LTE=":
335            print("Reached the last page of markets.")
336            return []
337
338        # Fetch current page of markets
339        paginated_response = self.get_markets(next_cursor=next_cursor)
340
341        # Collect current page data
342        current_markets = paginated_response.data
343
344        # Recursively fetch remaining pages
345        next_page_markets = self.get_all_markets(
346            next_cursor=paginated_response.next_cursor,
347        )
348
349        # Combine current page data with data from subsequent pages
350        return current_markets + next_page_markets

Recursively fetch all ClobMarkets using pagination.

def get_recent_history( self, token_id: str, interval: Literal['1h', '6h', '1d', '1w', '1m', 'max'] = '1d', fidelity: int = 1) -> polymarket_apis.types.clob_types.PriceHistory:
352    def get_recent_history(
353        self,
354        token_id: str,
355        interval: Literal["1h", "6h", "1d", "1w", "1m", "max"] = "1d",
356        fidelity: int = 1,  # resolution in minutes
357    ) -> PriceHistory:
358        """Get the recent price history of a token (up to now) - 1h, 6h, 1d, 1w, 1m."""
359        min_fidelities: dict[str, int] = {
360            "1h": 1,
361            "6h": 1,
362            "1d": 1,
363            "1w": 5,
364            "1m": 10,
365            "max": 2,
366        }
367
368        if fidelity < min_fidelities[interval]:
369            msg = f"invalid filters: minimum fidelity' for '{interval}' range is {min_fidelities.get(interval)}"
370            raise ValueError(msg)
371
372        params: dict[str, int | str] = {
373            "market": token_id,
374            "interval": interval,
375            "fidelity": fidelity,
376        }
377        response = self.client.get(self._build_url("/prices-history"), params=params)
378        response.raise_for_status()
379        return PriceHistory(**response.json(), token_id=token_id)

Get the recent price history of a token (up to now) - 1h, 6h, 1d, 1w, 1m, max.

def get_history( self, token_id: str, start_time: datetime.datetime | None = None, end_time: datetime.datetime | None = None, fidelity: int = 2) -> polymarket_apis.types.clob_types.PriceHistory:
381    def get_history(
382        self,
383        token_id: str,
384        start_time: datetime | None = None,
385        end_time: datetime | None = None,
386        fidelity: int = 2,  # resolution in minutes
387    ) -> PriceHistory:
388        """Get the price history of a token between a selected date range of max 15 days or from start_time to now."""
389        if start_time is None and end_time is None:
390            msg = "At least 'start_time' or ('start_time' and 'end_time') must be provided"
391            raise ValueError(msg)
392
393        if (
394            start_time
395            and end_time
396            and start_time + timedelta(days=15, seconds=1) < end_time
397        ):
398            msg = "'start_time' - 'end_time' range cannot exceed 15 days. Remove 'end_time' to get prices up to now or set a shorter range."
399            raise ValueError(msg)
400
401        params: dict[str, int | str] = {
402            "market": token_id,
403            "fidelity": fidelity,
404        }
405        if start_time:
406            params["startTs"] = int(start_time.timestamp())
407        if end_time:
408            params["endTs"] = int(end_time.timestamp())
409
410        response = self.client.get(self._build_url("/prices-history"), params=params)
411        response.raise_for_status()
412        return PriceHistory(**response.json(), token_id=token_id)

Get the price history of a token between a selected date range of max 15 days or from start_time to now.

def get_all_history(self, token_id: str) -> polymarket_apis.types.clob_types.PriceHistory:
414    def get_all_history(self, token_id: str) -> PriceHistory:
415        """Get the full price history of a token."""
416        return self.get_history(
417            token_id=token_id,
418            start_time=datetime(2020, 1, 1, tzinfo=UTC),
419        )

Get the full price history of a token.

def get_orders( self, order_id: str | None = None, condition_id: Optional[Annotated[str, AfterValidator(func=<function validate_keccak256>)]] = None, token_id: str | None = None, next_cursor: str = 'MA==') -> list[polymarket_apis.types.clob_types.OpenOrder]:
421    def get_orders(
422        self,
423        order_id: str | None = None,
424        condition_id: Keccak256 | None = None,
425        token_id: str | None = None,
426        next_cursor: str = "MA==",
427    ) -> list[OpenOrder]:
428        """Gets your active orders, filtered by order_id, condition_id, token_id."""
429        params = {}
430        if order_id:
431            params["id"] = order_id
432        if condition_id:
433            params["market"] = condition_id
434        if token_id:
435            params["asset_id"] = token_id
436
437        request_args = RequestArgs(method="GET", request_path=ORDERS)
438        headers = create_level_2_headers(self.signer, self.creds, request_args)
439
440        results = []
441        next_cursor = next_cursor if next_cursor is not None else "MA=="
442        while next_cursor != END_CURSOR:
443            params["next_cursor"] = next_cursor
444            response = self.client.get(
445                self._build_url(ORDERS), headers=headers, params=params
446            )
447            response.raise_for_status()
448            next_cursor = response.json()["next_cursor"]
449            results += [OpenOrder(**order) for order in response.json()["data"]]
450
451        return results

Gets your active orders, filtered by order_id, condition_id, token_id.

def create_order( self, order_args: OrderArgs, options: polymarket_apis.types.clob_types.PartialCreateOrderOptions | None = None) -> py_order_utils.model.order.SignedOrder:
453    def create_order(
454        self, order_args: OrderArgs, options: PartialCreateOrderOptions | None = None
455    ) -> SignedOrder:
456        """Creates and signs an order."""
457        # add resolve_order_options, or similar
458        tick_size = self.__resolve_tick_size(
459            order_args.token_id,
460            options.tick_size if options else None,
461        )
462
463        if not price_valid(order_args.price, tick_size):
464            msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}"
465            raise InvalidPriceError(msg)
466
467        neg_risk = (
468            options.neg_risk
469            if options and options.neg_risk is not None
470            else self.get_neg_risk(order_args.token_id)
471        )
472
473        # fee rate
474        fee_rate_bps = self.__resolve_fee_rate(
475            order_args.token_id, order_args.fee_rate_bps
476        )
477        order_args.fee_rate_bps = fee_rate_bps
478
479        return self.builder.create_order(
480            order_args,
481            CreateOrderOptions(
482                tick_size=tick_size,
483                neg_risk=neg_risk,
484            ),
485        )

Creates and signs an order.

def post_order( self, order: py_order_utils.model.order.SignedOrder, order_type: OrderType = <OrderType.GTC: 'GTC'>) -> polymarket_apis.types.clob_types.OrderPostResponse | None:
487    def post_order(
488        self, order: SignedOrder, order_type: OrderType = OrderType.GTC
489    ) -> OrderPostResponse | None:
490        """Posts a SignedOrder."""
491        body = order_to_json(order, self.creds.key, order_type)
492        headers = create_level_2_headers(
493            self.signer,
494            self.creds,
495            RequestArgs(method="POST", request_path=POST_ORDER, body=body),
496        )
497
498        try:
499            response = self.client.post(
500                self._build_url("/order"),
501                headers=headers,
502                content=json.dumps(body).encode("utf-8"),
503            )
504            response.raise_for_status()
505            return OrderPostResponse(**response.json())
506        except httpx.HTTPStatusError as exc:
507            msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order"
508            logger.warning(msg)
509            error_json = exc.response.json()
510            print("Details:", error_json["error"])
511            return None

Posts a SignedOrder.

def create_and_post_order( self, order_args: OrderArgs, options: polymarket_apis.types.clob_types.PartialCreateOrderOptions | None = None, order_type: OrderType = <OrderType.GTC: 'GTC'>) -> polymarket_apis.types.clob_types.OrderPostResponse | None:
513    def create_and_post_order(
514        self,
515        order_args: OrderArgs,
516        options: PartialCreateOrderOptions | None = None,
517        order_type: OrderType = OrderType.GTC,
518    ) -> OrderPostResponse | None:
519        """Utility function to create and publish an order."""
520        order = self.create_order(order_args, options)
521        return self.post_order(order=order, order_type=order_type)

Utility function to create and publish an order.

def post_orders( self, args: list[polymarket_apis.types.clob_types.PostOrdersArgs]) -> list[polymarket_apis.types.clob_types.OrderPostResponse] | None:
523    def post_orders(self, args: list[PostOrdersArgs]) -> list[OrderPostResponse] | None:
524        """Posts multiple SignedOrders at once."""
525        body = [
526            order_to_json(arg.order, self.creds.key, arg.order_type) for arg in args
527        ]
528        headers = create_level_2_headers(
529            self.signer,
530            self.creds,
531            RequestArgs(method="POST", request_path=POST_ORDERS, body=body),
532        )
533
534        try:
535            response = self.client.post(
536                self._build_url("/orders"),
537                headers=headers,
538                content=json.dumps(body).encode("utf-8"),
539            )
540            response.raise_for_status()
541            order_responses = []
542            for index, item in enumerate(response.json()):
543                resp = OrderPostResponse(**item)
544                order_responses.append(resp)
545                if resp.error_msg:
546                    msg = (
547                        f"Error posting order in position {index} \n"
548                        f"Details: {resp.error_msg}"
549                    )
550                    logger.warning(msg)
551        except httpx.HTTPStatusError as exc:
552            msg = f"Client Error '{exc.response.status_code} {exc.response.reason_phrase}' while posting order"
553            logger.warning(msg)
554            error_json = exc.response.json()
555            print("Details:", error_json["error"])
556            return None
557        else:
558            return order_responses

Posts multiple SignedOrders at once.

def create_and_post_orders( self, args: list[OrderArgs], order_types: list[OrderType]) -> list[polymarket_apis.types.clob_types.OrderPostResponse] | None:
560    def create_and_post_orders(
561        self, args: list[OrderArgs], order_types: list[OrderType]
562    ) -> list[OrderPostResponse] | None:
563        """Utility function to create and publish multiple orders at once."""
564        return self.post_orders(
565            [
566                PostOrdersArgs(
567                    order=self.create_order(order_args), order_type=order_type
568                )
569                for order_args, order_type in zip(args, order_types, strict=True)
570            ],
571        )

Utility function to create and publish multiple orders at once.

def calculate_market_price( self, token_id: str, side: str, amount: float, order_type: OrderType) -> float:
573    def calculate_market_price(
574        self, token_id: str, side: str, amount: float, order_type: OrderType
575    ) -> float:
576        """Calculates the matching price considering an amount and the current orderbook."""
577        book = self.get_order_book(token_id)
578        if book is None:
579            msg = "Order book is None"
580            raise MissingOrderbookError(msg)
581        if side == "BUY":
582            if book.asks is None:
583                msg = "No ask orders available"
584                raise LiquidityError(msg)
585            return self.builder.calculate_buy_market_price(
586                book.asks,
587                amount,
588                order_type,
589            )
590        if side == "SELL":
591            if book.bids is None:
592                msg = "No bid orders available"
593                raise LiquidityError(msg)
594            return self.builder.calculate_sell_market_price(
595                book.bids,
596                amount,
597                order_type,
598            )
599        msg = 'Side must be "BUY" or "SELL"'
600        raise ValueError(msg)

Calculates the matching price considering an amount and the current orderbook.

def create_market_order( self, order_args: MarketOrderArgs, options: polymarket_apis.types.clob_types.PartialCreateOrderOptions | None = None):
602    def create_market_order(
603        self,
604        order_args: MarketOrderArgs,
605        options: PartialCreateOrderOptions | None = None,
606    ):
607        """Creates and signs a market order."""
608        tick_size = self.__resolve_tick_size(
609            order_args.token_id,
610            options.tick_size if options else None,
611        )
612
613        if order_args.price is None or order_args.price <= 0:
614            order_args.price = self.calculate_market_price(
615                order_args.token_id,
616                order_args.side,
617                order_args.amount,
618                order_args.order_type,
619            )
620
621        if not price_valid(order_args.price, tick_size):
622            msg = f"price ({order_args.price}), min: {tick_size} - max: {1 - float(tick_size)}"
623            raise InvalidPriceError(msg)
624
625        neg_risk = (
626            options.neg_risk
627            if options and options.neg_risk is not None
628            else self.get_neg_risk(order_args.token_id)
629        )
630
631        # fee rate
632        fee_rate_bps = self.__resolve_fee_rate(
633            order_args.token_id, order_args.fee_rate_bps
634        )
635        order_args.fee_rate_bps = fee_rate_bps
636
637        return self.builder.create_market_order(
638            order_args,
639            CreateOrderOptions(
640                tick_size=tick_size,
641                neg_risk=neg_risk,
642            ),
643        )

Creates and signs a market order.

def create_and_post_market_order( self, order_args: MarketOrderArgs, options: polymarket_apis.types.clob_types.PartialCreateOrderOptions | None = None, order_type: OrderType = <OrderType.FOK: 'FOK'>) -> polymarket_apis.types.clob_types.OrderPostResponse | None:
645    def create_and_post_market_order(
646        self,
647        order_args: MarketOrderArgs,
648        options: PartialCreateOrderOptions | None = None,
649        order_type: OrderType = OrderType.FOK,
650    ) -> OrderPostResponse | None:
651        """Utility function to create and publish a market order."""
652        order = self.create_market_order(order_args, options)
653        return self.post_order(order=order, order_type=order_type)

Utility function to create and publish a market order.

def cancel_order( self, order_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)]) -> polymarket_apis.types.clob_types.OrderCancelResponse:
655    def cancel_order(self, order_id: Keccak256) -> OrderCancelResponse:
656        """Cancels an order."""
657        body = {"orderID": order_id}
658
659        request_args = RequestArgs(method="DELETE", request_path=CANCEL, body=body)
660        headers = create_level_2_headers(self.signer, self.creds, request_args)
661
662        response = self.client.request(
663            "DELETE",
664            self._build_url(CANCEL),
665            headers=headers,
666            content=json.dumps(body).encode("utf-8"),
667        )
668        response.raise_for_status()
669        return OrderCancelResponse(**response.json())

Cancels an order.

def cancel_orders( self, order_ids: list[typing.Annotated[str, AfterValidator(func=<function validate_keccak256>)]]) -> polymarket_apis.types.clob_types.OrderCancelResponse:
671    def cancel_orders(self, order_ids: list[Keccak256]) -> OrderCancelResponse:
672        """Cancels orders."""
673        body = order_ids
674
675        request_args = RequestArgs(
676            method="DELETE",
677            request_path=CANCEL_ORDERS,
678            body=body,
679        )
680        headers = create_level_2_headers(self.signer, self.creds, request_args)
681
682        response = self.client.request(
683            "DELETE",
684            self._build_url(CANCEL_ORDERS),
685            headers=headers,
686            content=json.dumps(body).encode("utf-8"),
687        )
688        response.raise_for_status()
689        return OrderCancelResponse(**response.json())

Cancels orders.

def cancel_all(self) -> polymarket_apis.types.clob_types.OrderCancelResponse:
691    def cancel_all(self) -> OrderCancelResponse:
692        """Cancels all available orders for the user."""
693        request_args = RequestArgs(method="DELETE", request_path=CANCEL_ALL)
694        headers = create_level_2_headers(self.signer, self.creds, request_args)
695
696        response = self.client.delete(self._build_url(CANCEL_ALL), headers=headers)
697        response.raise_for_status()
698        return OrderCancelResponse(**response.json())

Cancels all available orders for the user.

def is_order_scoring( self, order_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)]) -> bool:
700    def is_order_scoring(self, order_id: Keccak256) -> bool:
701        """Check if the order is currently scoring."""
702        request_args = RequestArgs(method="GET", request_path=IS_ORDER_SCORING)
703        headers = create_level_2_headers(self.signer, self.creds, request_args)
704
705        response = self.client.get(
706            self._build_url(IS_ORDER_SCORING),
707            headers=headers,
708            params={"order_id": order_id},
709        )
710        response.raise_for_status()
711        return response.json()["scoring"]

Check if the order is currently scoring.

def are_orders_scoring( self, order_ids: list[typing.Annotated[str, AfterValidator(func=<function validate_keccak256>)]]) -> dict[typing.Annotated[str, AfterValidator(func=<function validate_keccak256>)], bool]:
713    def are_orders_scoring(self, order_ids: list[Keccak256]) -> dict[Keccak256, bool]:
714        """Check if the orders are currently scoring."""
715        body = order_ids
716        request_args = RequestArgs(
717            method="POST",
718            request_path=ARE_ORDERS_SCORING,
719            body=body,
720        )
721        headers = create_level_2_headers(self.signer, self.creds, request_args)
722        headers["Content-Type"] = "application/json"
723
724        response = self.client.post(
725            self._build_url(ARE_ORDERS_SCORING), headers=headers, json=body
726        )
727        response.raise_for_status()
728        return response.json()

Check if the orders are currently scoring.

def get_market_rewards( self, condition_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)]) -> polymarket_apis.types.clob_types.MarketRewards:
730    def get_market_rewards(self, condition_id: Keccak256) -> MarketRewards:
731        """
732        Get the MarketRewards for a given market (condition_id).
733
734        - metadata, tokens, max_spread, min_size, rewards_config, market_competitiveness.
735        """
736        request_args = RequestArgs(method="GET", request_path="/rewards/markets/")
737        headers = create_level_2_headers(self.signer, self.creds, request_args)
738
739        response = self.client.get(
740            self._build_url("/rewards/markets/" + condition_id), headers=headers
741        )
742        response.raise_for_status()
743        return next(MarketRewards(**market) for market in response.json()["data"])

Get the MarketRewards for a given market (condition_id).

  • metadata, tokens, max_spread, min_size, rewards_config, market_competitiveness.
def get_trades( self, condition_id: Optional[Annotated[str, AfterValidator(func=<function validate_keccak256>)]] = None, token_id: str | None = None, trade_id: str | None = None, before: datetime.datetime | None = None, after: datetime.datetime | None = None, address: Optional[Annotated[str, AfterValidator(func=<function validate_eth_address>)]] = None, next_cursor='MA==') -> list[polymarket_apis.types.clob_types.PolygonTrade]:
745    def get_trades(
746        self,
747        condition_id: Keccak256 | None = None,
748        token_id: str | None = None,
749        trade_id: str | None = None,
750        before: datetime | None = None,
751        after: datetime | None = None,
752        address: EthAddress | None = None,
753        next_cursor="MA==",
754    ) -> list[PolygonTrade]:
755        """Fetches the trade history for a user."""
756        params: dict[str, str | int] = {}
757        if condition_id:
758            params["market"] = condition_id
759        if token_id:
760            params["asset_id"] = token_id
761        if trade_id:
762            params["id"] = trade_id
763        if before:
764            params["before"] = int(before.replace(microsecond=0).timestamp())
765        if after:
766            params["after"] = int(after.replace(microsecond=0).timestamp())
767        if address:
768            params["maker_address"] = address
769
770        request_args = RequestArgs(method="GET", request_path=TRADES)
771        headers = create_level_2_headers(self.signer, self.creds, request_args)
772
773        results = []
774        next_cursor = next_cursor if next_cursor is not None else "MA=="
775        while next_cursor != END_CURSOR:
776            params["next_cursor"] = next_cursor
777            response = self.client.get(
778                self._build_url(TRADES), headers=headers, params=params
779            )
780            response.raise_for_status()
781            next_cursor = response.json()["next_cursor"]
782            results += [PolygonTrade(**trade) for trade in response.json()["data"]]
783
784        return results

Fetches the trade history for a user.

def get_total_rewards( self, date: datetime.datetime | None = None) -> polymarket_apis.types.clob_types.DailyEarnedReward:
786    def get_total_rewards(self, date: datetime | None = None) -> DailyEarnedReward:
787        """Get the total rewards earned on a given date (seems to only hold the 6 most recent data points)."""
788        if date is None:
789            date = datetime.now(UTC)
790        params = {
791            "authenticationType": "magic",
792            "date": f"{date.strftime('%Y-%m-%d')}",
793        }
794
795        request_args = RequestArgs(method="GET", request_path="/rewards/user/total")
796        headers = create_level_2_headers(self.signer, self.creds, request_args)
797        params["l2Headers"] = json.dumps(headers)
798
799        response = self.client.get(
800            "https://polymarket.com/api/rewards/totalEarnings", params=params
801        )
802        response.raise_for_status()
803        if response.json():
804            return DailyEarnedReward(**response.json()[0])
805        return DailyEarnedReward(
806            date=date,
807            asset_address="0x2791Bca1f2de4661ED88A30C99A7a9449Aa84174",
808            maker_address=self.address,
809            earnings=0.0,
810            asset_rate=0.0,
811        )

Get the total rewards earned on a given date (seems to only hold the 6 most recent data points).

def get_reward_markets( self, query: str | None = None, sort_by: Optional[Literal['market', 'max_spread', 'min_size', 'rate_per_day', 'spread', 'price', 'earnings', 'earning_percentage']] = 'market', sort_direction: Optional[Literal['ASC', 'DESC']] = None, show_favorites: bool = False) -> list[polymarket_apis.types.clob_types.RewardMarket]:
813    def get_reward_markets(
814        self,
815        query: str | None = None,
816        sort_by: Literal[
817            "market",
818            "max_spread",
819            "min_size",
820            "rate_per_day",
821            "spread",
822            "price",
823            "earnings",
824            "earning_percentage",
825        ]
826        | None = "market",
827        sort_direction: Literal["ASC", "DESC"] | None = None,
828        show_favorites: bool = False,
829    ) -> list[RewardMarket]:
830        """
831        Search through markets that offer rewards (polymarket.com/rewards items) by query, sorted by different metrics. If query is empty, returns all markets with rewards.
832
833         - market start date ("market") - TODO confirm this
834         - max spread for rewards in usdc
835         - min size for rewards in shares
836         - reward rate per day in usdc
837         - current spread of a market
838         - current price of a market
839         - your daily earnings on a market - only need auth for these last two
840         - your current earning percentage on a market.
841        """
842        results = []
843        desc = {"ASC": False, "DESC": True}
844        params: dict[str, bool | str] = {
845            "authenticationType": "magic",
846            "showFavorites": show_favorites,
847        }
848        if sort_by:
849            params["orderBy"] = sort_by
850        if query:
851            params["query"] = query
852            params["desc"] = False
853        if sort_direction:
854            params["desc"] = desc[sort_direction]
855
856        request_args = RequestArgs(method="GET", request_path="/rewards/user/markets")
857        headers = create_level_2_headers(self.signer, self.creds, request_args)
858        params["l2Headers"] = json.dumps(headers)
859
860        next_cursor = "MA=="
861        while next_cursor != END_CURSOR:
862            params["nextCursor"] = next_cursor
863            response = self.client.get(
864                "https://polymarket.com/api/rewards/markets", params=params
865            )
866            response.raise_for_status()
867            next_cursor = response.json()["next_cursor"]
868            results += [RewardMarket(**reward) for reward in response.json()["data"]]
869
870        return results

Search through markets that offer rewards (polymarket.com/rewards items) by query, sorted by different metrics. If query is empty, returns all markets with rewards.

  • market start date ("market") - TODO confirm this
  • max spread for rewards in usdc
  • min size for rewards in shares
  • reward rate per day in usdc
  • current spread of a market
  • current price of a market
  • your daily earnings on a market - only need auth for these last two
  • your current earning percentage on a market.
class PolymarketDataClient:
 24class PolymarketDataClient:
 25    def __init__(self, base_url: str = "https://data-api.polymarket.com"):
 26        self.base_url = base_url
 27        self.client = httpx.Client(http2=True, timeout=30.0)
 28        self.gql_positions_client = PolymarketGraphQLClient(
 29            endpoint_name="positions_subgraph"
 30        )
 31
 32    def _build_url(self, endpoint: str) -> str:
 33        return urljoin(self.base_url, endpoint)
 34
 35    def get_ok(self) -> str:
 36        response = self.client.get(self.base_url)
 37        response.raise_for_status()
 38        return response.json()["data"]
 39
 40    def get_all_positions(
 41        self,
 42        user: EthAddress,
 43        size_threshold: float = 0.0,
 44    ):
 45        # data-api /positions endpoint does not support fetching all positions without filters
 46        # a workaround is to use the GraphQL positions subgraph directly
 47        query = f"""query {{
 48                  userBalances(where: {{
 49                  user: "{user.lower()}",
 50                  balance_gt: "{int(size_threshold * 10**6)}"
 51                  }}) {{
 52                    user
 53                    asset {{
 54                      id
 55                      condition {{
 56                        id
 57                      }}
 58                      complement
 59                      outcomeIndex
 60                    }}
 61                    balance
 62                  }}
 63                }}
 64                """
 65
 66        response = self.gql_positions_client.query(query)
 67        return [GQLPosition(**pos) for pos in response["userBalances"]]
 68
 69    def get_positions(
 70        self,
 71        user: EthAddress,
 72        condition_id: Optional[
 73            Union[str, list[str]]
 74        ] = None,  # mutually exclusive with event_id
 75        event_id: Optional[
 76            Union[int, list[int]]
 77        ] = None,  # mutually exclusive with condition_id
 78        size_threshold: float = 1.0,
 79        redeemable: bool = False,
 80        mergeable: bool = False,
 81        title: Optional[str] = None,
 82        limit: int = 100,
 83        offset: int = 0,
 84        sort_by: Literal[
 85            "TOKENS",
 86            "CURRENT",
 87            "INITIAL",
 88            "CASHPNL",
 89            "PERCENTPNL",
 90            "TITLE",
 91            "RESOLVING",
 92            "PRICE",
 93            "AVGPRICE",
 94        ] = "TOKENS",
 95        sort_direction: Literal["ASC", "DESC"] = "DESC",
 96    ) -> list[Position]:
 97        params: dict[str, str | list[str] | int | float] = {
 98            "user": user,
 99            "sizeThreshold": size_threshold,
100            "limit": min(limit, 500),
101            "offset": offset,
102        }
103        if isinstance(condition_id, str):
104            params["market"] = condition_id
105        if isinstance(condition_id, list):
106            params["market"] = ",".join(condition_id)
107        if isinstance(event_id, str):
108            params["eventId"] = event_id
109        if isinstance(event_id, list):
110            params["eventId"] = [str(i) for i in event_id]
111        if redeemable is not None:
112            params["redeemable"] = redeemable
113        if mergeable is not None:
114            params["mergeable"] = mergeable
115        if title:
116            params["title"] = title
117        if sort_by:
118            params["sortBy"] = sort_by
119        if sort_direction:
120            params["sortDirection"] = sort_direction
121
122        response = self.client.get(self._build_url("/positions"), params=params)
123        response.raise_for_status()
124        return [Position(**pos) for pos in response.json()]
125
126    def get_trades(
127        self,
128        limit: int = 100,
129        offset: int = 0,
130        taker_only: bool = True,
131        filter_type: Optional[Literal["CASH", "TOKENS"]] = None,
132        filter_amount: Optional[
133            float
134        ] = None,  # must be provided together with filter_type
135        condition_id: Optional[str | list[str]] = None,
136        event_id: Optional[int | list[int]] = None,
137        user: Optional[str] = None,
138        side: Optional[Literal["BUY", "SELL"]] = None,
139    ) -> list[Trade]:
140        params: dict[str, int | bool | float | str | list[str]] = {
141            "limit": min(limit, 500),
142            "offset": offset,
143            "takerOnly": taker_only,
144        }
145        if filter_type:
146            params["filterType"] = filter_type
147        if filter_amount:
148            params["filterAmount"] = filter_amount
149        if isinstance(condition_id, str):
150            params["market"] = condition_id
151        if isinstance(condition_id, list):
152            params["market"] = ",".join(condition_id)
153        if isinstance(event_id, str):
154            params["eventId"] = event_id
155        if isinstance(event_id, list):
156            params["eventId"] = [str(i) for i in event_id]
157        if user:
158            params["user"] = user
159        if side:
160            params["side"] = side
161
162        response = self.client.get(self._build_url("/trades"), params=params)
163        response.raise_for_status()
164        return [Trade(**trade) for trade in response.json()]
165
166    def get_activity(
167        self,
168        user: EthAddress,
169        limit: int = 100,
170        offset: int = 0,
171        condition_id: Optional[Union[str, list[str]]] = None,
172        event_id: Optional[Union[int, list[int]]] = None,
173        type: Optional[
174            Union[
175                Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"],
176                list[
177                    Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"]
178                ],
179            ]
180        ] = None,
181        start: Optional[datetime] = None,
182        end: Optional[datetime] = None,
183        side: Optional[Literal["BUY", "SELL"]] = None,
184        sort_by: Literal["TIMESTAMP", "TOKENS", "CASH"] = "TIMESTAMP",
185        sort_direction: Literal["ASC", "DESC"] = "DESC",
186    ) -> list[Activity]:
187        params: dict[str, str | list[str] | int] = {
188            "user": user,
189            "limit": min(limit, 500),
190            "offset": offset,
191        }
192        if isinstance(condition_id, str):
193            params["market"] = condition_id
194        if isinstance(condition_id, list):
195            params["market"] = ",".join(condition_id)
196        if isinstance(event_id, str):
197            params["eventId"] = event_id
198        if isinstance(event_id, list):
199            params["eventId"] = [str(i) for i in event_id]
200        if isinstance(type, str):
201            params["type"] = type
202        if isinstance(type, list):
203            params["type"] = ",".join(type)
204        if start:
205            params["start"] = int(start.timestamp())
206        if end:
207            params["end"] = int(end.timestamp())
208        if side:
209            params["side"] = side
210        if sort_by:
211            params["sortBy"] = sort_by
212        if sort_direction:
213            params["sortDirection"] = sort_direction
214
215        response = self.client.get(self._build_url("/activity"), params=params)
216        response.raise_for_status()
217        return [Activity(**activity) for activity in response.json()]
218
219    def get_holders(
220        self,
221        condition_id: str,
222        limit: int = 500,
223        min_balance: int = 1,
224    ) -> list[HolderResponse]:
225        """Takes in a condition_id and returns top holders for each corresponding token_id."""
226        params: dict[str, int | str] = {
227            "market": condition_id,
228            "limit": limit,
229            "min_balance": min_balance,
230        }
231        response = self.client.get(self._build_url("/holders"), params=params)
232        response.raise_for_status()
233        return [HolderResponse(**holder_data) for holder_data in response.json()]
234
235    def get_value(
236        self,
237        user: EthAddress,
238        condition_ids: Optional[Union[str, list[str]]] = None,
239    ) -> ValueResponse:
240        """
241        Get the current value of a user's position in a set of markets.
242
243        Takes in condition_id as:
244            - None      --> total value of positions
245            - str       --> value of position
246            - list[str] --> sum of the values of positions.
247        """
248        params = {"user": user}
249        if isinstance(condition_ids, str):
250            params["market"] = condition_ids
251        if isinstance(condition_ids, list):
252            params["market"] = ",".join(condition_ids)
253
254        response = self.client.get(self._build_url("/value"), params=params)
255        response.raise_for_status()
256        return ValueResponse(**response.json()[0])
257
258    def get_closed_positions(
259        self,
260        user: EthAddress,
261        condition_ids: Optional[Union[str, list[str]]] = None,
262    ) -> list[Position]:
263        """Get all closed positions."""
264        params = {"user": user}
265        if isinstance(condition_ids, str):
266            params["market"] = condition_ids
267        if isinstance(condition_ids, list):
268            params["market"] = ",".join(condition_ids)
269
270        response = self.client.get(self._build_url("/closed-positions"), params=params)
271        response.raise_for_status()
272        return [Position(**pos) for pos in response.json()]
273
274    def get_total_markets_traded(
275        self,
276        user: EthAddress,
277    ) -> int:
278        """Get the total number of markets a user has traded in."""
279        params = {"user": user}
280
281        response = self.client.get(self._build_url("/traded"), params=params)
282        response.raise_for_status()
283        return response.json()["traded"]
284
285    def get_open_interest(
286        self,
287        condition_ids: Optional[Union[str, list[str]]] = None,
288    ) -> list[MarketValue]:
289        """Get open interest."""
290        params = {}
291
292        if isinstance(condition_ids, str):
293            params["market"] = condition_ids
294        if isinstance(condition_ids, list):
295            params["market"] = ",".join(condition_ids)
296
297        response = self.client.get(self._build_url("/oi"), params=params)
298        response.raise_for_status()
299        return [MarketValue(**oi) for oi in response.json()]
300
301    def get_live_volume(
302        self,
303        event_id: int,
304    ) -> EventLiveVolume:
305        """Get live volume for a given event."""
306        params = {"id": str(event_id)}
307
308        response = self.client.get(self._build_url("/live-volume"), params=params)
309        response.raise_for_status()
310        return EventLiveVolume(**response.json()[0])
311
312    # website endpoints
313
314    def get_pnl(
315        self,
316        user: EthAddress,
317        period: Literal["all", "1m", "1w", "1d"] = "all",
318        frequency: Literal["1h", "3h", "12h", "1d"] = "1h",
319    ) -> list[TimeseriesPoint]:
320        """Get a user's PnL timeseries in the last day, week, month or all with a given frequency."""
321        params = {
322            "user_address": user,
323            "interval": period,
324            "fidelity": frequency,
325        }
326
327        response = self.client.get(
328            "https://user-pnl-api.polymarket.com/user-pnl", params=params
329        )
330        response.raise_for_status()
331        return [TimeseriesPoint(**point) for point in response.json()]
332
333    def get_user_metric(
334        self,
335        user: EthAddress,
336        metric: Literal["profit", "volume"] = "profit",
337        window: Literal["1d", "7d", "30d", "all"] = "all",
338    ):
339        """Get a user's overall profit or volume in the last day, week, month or all."""
340        params: dict[str, int | str] = {
341            "address": user,
342            "window": window,
343            "limit": 1,
344        }
345        response = self.client.get(
346            "https://lb-api.polymarket.com/" + metric, params=params
347        )
348        response.raise_for_status()
349        return UserMetric(**response.json()[0])
350
351    def get_leaderboard_user_rank(
352        self,
353        user: EthAddress,
354        metric: Literal["profit", "volume"] = "profit",
355        window: Literal["1d", "7d", "30d", "all"] = "all",
356    ):
357        """Get a user's rank on the leaderboard by profit or volume."""
358        params = {
359            "address": user,
360            "window": window,
361            "rankType": "pnl" if metric == "profit" else "vol",
362        }
363        response = self.client.get("https://lb-api.polymarket.com/rank", params=params)
364        response.raise_for_status()
365        return UserRank(**response.json()[0])
366
367    def get_leaderboard_top_users(
368        self,
369        metric: Literal["profit", "volume"] = "profit",
370        window: Literal["1d", "7d", "30d", "all"] = "all",
371        limit: int = 100,
372    ):
373        """Get the leaderboard of the top at most 100 users by profit or volume."""
374        params = {
375            "window": window,
376            "limit": limit,
377        }
378        response = self.client.get(
379            "https://lb-api.polymarket.com/" + metric, params=params
380        )
381        response.raise_for_status()
382        return [UserMetric(**user) for user in response.json()]
383
384    def __enter__(self):
385        return self
386
387    def __exit__(self, exc_type, exc_val, exc_tb):
388        self.client.close()
PolymarketDataClient(base_url: str = 'https://data-api.polymarket.com')
25    def __init__(self, base_url: str = "https://data-api.polymarket.com"):
26        self.base_url = base_url
27        self.client = httpx.Client(http2=True, timeout=30.0)
28        self.gql_positions_client = PolymarketGraphQLClient(
29            endpoint_name="positions_subgraph"
30        )
base_url
client
gql_positions_client
def get_ok(self) -> str:
35    def get_ok(self) -> str:
36        response = self.client.get(self.base_url)
37        response.raise_for_status()
38        return response.json()["data"]
def get_all_positions( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], size_threshold: float = 0.0):
40    def get_all_positions(
41        self,
42        user: EthAddress,
43        size_threshold: float = 0.0,
44    ):
45        # data-api /positions endpoint does not support fetching all positions without filters
46        # a workaround is to use the GraphQL positions subgraph directly
47        query = f"""query {{
48                  userBalances(where: {{
49                  user: "{user.lower()}",
50                  balance_gt: "{int(size_threshold * 10**6)}"
51                  }}) {{
52                    user
53                    asset {{
54                      id
55                      condition {{
56                        id
57                      }}
58                      complement
59                      outcomeIndex
60                    }}
61                    balance
62                  }}
63                }}
64                """
65
66        response = self.gql_positions_client.query(query)
67        return [GQLPosition(**pos) for pos in response["userBalances"]]
def get_positions( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], condition_id: Union[str, list[str], NoneType] = None, event_id: Union[int, list[int], NoneType] = None, size_threshold: float = 1.0, redeemable: bool = False, mergeable: bool = False, title: Optional[str] = None, limit: int = 100, offset: int = 0, sort_by: Literal['TOKENS', 'CURRENT', 'INITIAL', 'CASHPNL', 'PERCENTPNL', 'TITLE', 'RESOLVING', 'PRICE', 'AVGPRICE'] = 'TOKENS', sort_direction: Literal['ASC', 'DESC'] = 'DESC') -> list[polymarket_apis.types.data_types.Position]:
 69    def get_positions(
 70        self,
 71        user: EthAddress,
 72        condition_id: Optional[
 73            Union[str, list[str]]
 74        ] = None,  # mutually exclusive with event_id
 75        event_id: Optional[
 76            Union[int, list[int]]
 77        ] = None,  # mutually exclusive with condition_id
 78        size_threshold: float = 1.0,
 79        redeemable: bool = False,
 80        mergeable: bool = False,
 81        title: Optional[str] = None,
 82        limit: int = 100,
 83        offset: int = 0,
 84        sort_by: Literal[
 85            "TOKENS",
 86            "CURRENT",
 87            "INITIAL",
 88            "CASHPNL",
 89            "PERCENTPNL",
 90            "TITLE",
 91            "RESOLVING",
 92            "PRICE",
 93            "AVGPRICE",
 94        ] = "TOKENS",
 95        sort_direction: Literal["ASC", "DESC"] = "DESC",
 96    ) -> list[Position]:
 97        params: dict[str, str | list[str] | int | float] = {
 98            "user": user,
 99            "sizeThreshold": size_threshold,
100            "limit": min(limit, 500),
101            "offset": offset,
102        }
103        if isinstance(condition_id, str):
104            params["market"] = condition_id
105        if isinstance(condition_id, list):
106            params["market"] = ",".join(condition_id)
107        if isinstance(event_id, str):
108            params["eventId"] = event_id
109        if isinstance(event_id, list):
110            params["eventId"] = [str(i) for i in event_id]
111        if redeemable is not None:
112            params["redeemable"] = redeemable
113        if mergeable is not None:
114            params["mergeable"] = mergeable
115        if title:
116            params["title"] = title
117        if sort_by:
118            params["sortBy"] = sort_by
119        if sort_direction:
120            params["sortDirection"] = sort_direction
121
122        response = self.client.get(self._build_url("/positions"), params=params)
123        response.raise_for_status()
124        return [Position(**pos) for pos in response.json()]
def get_trades( self, limit: int = 100, offset: int = 0, taker_only: bool = True, filter_type: Optional[Literal['CASH', 'TOKENS']] = None, filter_amount: Optional[float] = None, condition_id: Union[str, list[str], NoneType] = None, event_id: Union[int, list[int], NoneType] = None, user: Optional[str] = None, side: Optional[Literal['BUY', 'SELL']] = None) -> list[polymarket_apis.types.data_types.Trade]:
126    def get_trades(
127        self,
128        limit: int = 100,
129        offset: int = 0,
130        taker_only: bool = True,
131        filter_type: Optional[Literal["CASH", "TOKENS"]] = None,
132        filter_amount: Optional[
133            float
134        ] = None,  # must be provided together with filter_type
135        condition_id: Optional[str | list[str]] = None,
136        event_id: Optional[int | list[int]] = None,
137        user: Optional[str] = None,
138        side: Optional[Literal["BUY", "SELL"]] = None,
139    ) -> list[Trade]:
140        params: dict[str, int | bool | float | str | list[str]] = {
141            "limit": min(limit, 500),
142            "offset": offset,
143            "takerOnly": taker_only,
144        }
145        if filter_type:
146            params["filterType"] = filter_type
147        if filter_amount:
148            params["filterAmount"] = filter_amount
149        if isinstance(condition_id, str):
150            params["market"] = condition_id
151        if isinstance(condition_id, list):
152            params["market"] = ",".join(condition_id)
153        if isinstance(event_id, str):
154            params["eventId"] = event_id
155        if isinstance(event_id, list):
156            params["eventId"] = [str(i) for i in event_id]
157        if user:
158            params["user"] = user
159        if side:
160            params["side"] = side
161
162        response = self.client.get(self._build_url("/trades"), params=params)
163        response.raise_for_status()
164        return [Trade(**trade) for trade in response.json()]
def get_activity( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], limit: int = 100, offset: int = 0, condition_id: Union[str, list[str], NoneType] = None, event_id: Union[int, list[int], NoneType] = None, type: Union[Literal['TRADE', 'SPLIT', 'MERGE', 'REDEEM', 'REWARD', 'CONVERSION'], list[Literal['TRADE', 'SPLIT', 'MERGE', 'REDEEM', 'REWARD', 'CONVERSION']], NoneType] = None, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, side: Optional[Literal['BUY', 'SELL']] = None, sort_by: Literal['TIMESTAMP', 'TOKENS', 'CASH'] = 'TIMESTAMP', sort_direction: Literal['ASC', 'DESC'] = 'DESC') -> list[polymarket_apis.types.data_types.Activity]:
166    def get_activity(
167        self,
168        user: EthAddress,
169        limit: int = 100,
170        offset: int = 0,
171        condition_id: Optional[Union[str, list[str]]] = None,
172        event_id: Optional[Union[int, list[int]]] = None,
173        type: Optional[
174            Union[
175                Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"],
176                list[
177                    Literal["TRADE", "SPLIT", "MERGE", "REDEEM", "REWARD", "CONVERSION"]
178                ],
179            ]
180        ] = None,
181        start: Optional[datetime] = None,
182        end: Optional[datetime] = None,
183        side: Optional[Literal["BUY", "SELL"]] = None,
184        sort_by: Literal["TIMESTAMP", "TOKENS", "CASH"] = "TIMESTAMP",
185        sort_direction: Literal["ASC", "DESC"] = "DESC",
186    ) -> list[Activity]:
187        params: dict[str, str | list[str] | int] = {
188            "user": user,
189            "limit": min(limit, 500),
190            "offset": offset,
191        }
192        if isinstance(condition_id, str):
193            params["market"] = condition_id
194        if isinstance(condition_id, list):
195            params["market"] = ",".join(condition_id)
196        if isinstance(event_id, str):
197            params["eventId"] = event_id
198        if isinstance(event_id, list):
199            params["eventId"] = [str(i) for i in event_id]
200        if isinstance(type, str):
201            params["type"] = type
202        if isinstance(type, list):
203            params["type"] = ",".join(type)
204        if start:
205            params["start"] = int(start.timestamp())
206        if end:
207            params["end"] = int(end.timestamp())
208        if side:
209            params["side"] = side
210        if sort_by:
211            params["sortBy"] = sort_by
212        if sort_direction:
213            params["sortDirection"] = sort_direction
214
215        response = self.client.get(self._build_url("/activity"), params=params)
216        response.raise_for_status()
217        return [Activity(**activity) for activity in response.json()]
def get_holders( self, condition_id: str, limit: int = 500, min_balance: int = 1) -> list[polymarket_apis.types.data_types.HolderResponse]:
219    def get_holders(
220        self,
221        condition_id: str,
222        limit: int = 500,
223        min_balance: int = 1,
224    ) -> list[HolderResponse]:
225        """Takes in a condition_id and returns top holders for each corresponding token_id."""
226        params: dict[str, int | str] = {
227            "market": condition_id,
228            "limit": limit,
229            "min_balance": min_balance,
230        }
231        response = self.client.get(self._build_url("/holders"), params=params)
232        response.raise_for_status()
233        return [HolderResponse(**holder_data) for holder_data in response.json()]

Takes in a condition_id and returns top holders for each corresponding token_id.

def get_value( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], condition_ids: Union[str, list[str], NoneType] = None) -> polymarket_apis.types.data_types.ValueResponse:
235    def get_value(
236        self,
237        user: EthAddress,
238        condition_ids: Optional[Union[str, list[str]]] = None,
239    ) -> ValueResponse:
240        """
241        Get the current value of a user's position in a set of markets.
242
243        Takes in condition_id as:
244            - None      --> total value of positions
245            - str       --> value of position
246            - list[str] --> sum of the values of positions.
247        """
248        params = {"user": user}
249        if isinstance(condition_ids, str):
250            params["market"] = condition_ids
251        if isinstance(condition_ids, list):
252            params["market"] = ",".join(condition_ids)
253
254        response = self.client.get(self._build_url("/value"), params=params)
255        response.raise_for_status()
256        return ValueResponse(**response.json()[0])

Get the current value of a user's position in a set of markets.

Takes in condition_ids as: None --> total value of all positions; str --> value of the position in that market; list[str] --> sum of the values of the positions in those markets.

def get_closed_positions( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], condition_ids: Union[str, list[str], NoneType] = None) -> list[polymarket_apis.types.data_types.Position]:
258    def get_closed_positions(
259        self,
260        user: EthAddress,
261        condition_ids: Optional[Union[str, list[str]]] = None,
262    ) -> list[Position]:
263        """Get all closed positions."""
264        params = {"user": user}
265        if isinstance(condition_ids, str):
266            params["market"] = condition_ids
267        if isinstance(condition_ids, list):
268            params["market"] = ",".join(condition_ids)
269
270        response = self.client.get(self._build_url("/closed-positions"), params=params)
271        response.raise_for_status()
272        return [Position(**pos) for pos in response.json()]

Get all closed positions.

def get_total_markets_traded( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)]) -> int:
274    def get_total_markets_traded(
275        self,
276        user: EthAddress,
277    ) -> int:
278        """Get the total number of markets a user has traded in."""
279        params = {"user": user}
280
281        response = self.client.get(self._build_url("/traded"), params=params)
282        response.raise_for_status()
283        return response.json()["traded"]

Get the total number of markets a user has traded in.

def get_open_interest( self, condition_ids: Union[str, list[str], NoneType] = None) -> list[polymarket_apis.types.data_types.MarketValue]:
285    def get_open_interest(
286        self,
287        condition_ids: Optional[Union[str, list[str]]] = None,
288    ) -> list[MarketValue]:
289        """Get open interest."""
290        params = {}
291
292        if isinstance(condition_ids, str):
293            params["market"] = condition_ids
294        if isinstance(condition_ids, list):
295            params["market"] = ",".join(condition_ids)
296
297        response = self.client.get(self._build_url("/oi"), params=params)
298        response.raise_for_status()
299        return [MarketValue(**oi) for oi in response.json()]

Get open interest.

def get_live_volume(self, event_id: int) -> polymarket_apis.types.data_types.EventLiveVolume:
301    def get_live_volume(
302        self,
303        event_id: int,
304    ) -> EventLiveVolume:
305        """Get live volume for a given event."""
306        params = {"id": str(event_id)}
307
308        response = self.client.get(self._build_url("/live-volume"), params=params)
309        response.raise_for_status()
310        return EventLiveVolume(**response.json()[0])

Get live volume for a given event.

def get_pnl( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], period: Literal['all', '1m', '1w', '1d'] = 'all', frequency: Literal['1h', '3h', '12h', '1d'] = '1h') -> list[polymarket_apis.types.common.TimeseriesPoint]:
314    def get_pnl(
315        self,
316        user: EthAddress,
317        period: Literal["all", "1m", "1w", "1d"] = "all",
318        frequency: Literal["1h", "3h", "12h", "1d"] = "1h",
319    ) -> list[TimeseriesPoint]:
320        """Get a user's PnL timeseries in the last day, week, month or all with a given frequency."""
321        params = {
322            "user_address": user,
323            "interval": period,
324            "fidelity": frequency,
325        }
326
327        response = self.client.get(
328            "https://user-pnl-api.polymarket.com/user-pnl", params=params
329        )
330        response.raise_for_status()
331        return [TimeseriesPoint(**point) for point in response.json()]

Get a user's PnL timeseries in the last day, week, month or all with a given frequency.

def get_user_metric( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], metric: Literal['profit', 'volume'] = 'profit', window: Literal['1d', '7d', '30d', 'all'] = 'all'):
333    def get_user_metric(
334        self,
335        user: EthAddress,
336        metric: Literal["profit", "volume"] = "profit",
337        window: Literal["1d", "7d", "30d", "all"] = "all",
338    ):
339        """Get a user's overall profit or volume in the last day, week, month or all."""
340        params: dict[str, int | str] = {
341            "address": user,
342            "window": window,
343            "limit": 1,
344        }
345        response = self.client.get(
346            "https://lb-api.polymarket.com/" + metric, params=params
347        )
348        response.raise_for_status()
349        return UserMetric(**response.json()[0])

Get a user's overall profit or volume in the last day, week, month or all.

def get_leaderboard_user_rank( self, user: Annotated[str, AfterValidator(func=<function validate_eth_address>)], metric: Literal['profit', 'volume'] = 'profit', window: Literal['1d', '7d', '30d', 'all'] = 'all'):
351    def get_leaderboard_user_rank(
352        self,
353        user: EthAddress,
354        metric: Literal["profit", "volume"] = "profit",
355        window: Literal["1d", "7d", "30d", "all"] = "all",
356    ):
357        """Get a user's rank on the leaderboard by profit or volume."""
358        params = {
359            "address": user,
360            "window": window,
361            "rankType": "pnl" if metric == "profit" else "vol",
362        }
363        response = self.client.get("https://lb-api.polymarket.com/rank", params=params)
364        response.raise_for_status()
365        return UserRank(**response.json()[0])

Get a user's rank on the leaderboard by profit or volume.

def get_leaderboard_top_users( self, metric: Literal['profit', 'volume'] = 'profit', window: Literal['1d', '7d', '30d', 'all'] = 'all', limit: int = 100):
367    def get_leaderboard_top_users(
368        self,
369        metric: Literal["profit", "volume"] = "profit",
370        window: Literal["1d", "7d", "30d", "all"] = "all",
371        limit: int = 100,
372    ):
373        """Get the leaderboard of the top at most 100 users by profit or volume."""
374        params = {
375            "window": window,
376            "limit": limit,
377        }
378        response = self.client.get(
379            "https://lb-api.polymarket.com/" + metric, params=params
380        )
381        response.raise_for_status()
382        return [UserMetric(**user) for user in response.json()]

Get the leaderboard of the top users (at most 100) by profit or volume.

class PolymarketGammaClient:
 31class PolymarketGammaClient:
 32    def __init__(self, base_url: str = "https://gamma-api.polymarket.com"):
 33        self.base_url = base_url
 34        self.client = httpx.Client(http2=True, timeout=30.0)
 35
 36    def _build_url(self, endpoint: str) -> str:
 37        return urljoin(self.base_url, endpoint)
 38
 39    def search(
 40        self,
 41        query: str,
 42        cache: Optional[bool] = None,
 43        status: Optional[Literal["active", "resolved"]] = None,
 44        limit_per_type: Optional[int] = None,  # max is 50
 45        page: Optional[int] = None,
 46        tags: Optional[list[str]] = None,
 47        keep_closed_markets: Optional[bool] = None,
 48        sort: Optional[
 49            Literal[
 50                "volume",
 51                "volume_24hr",
 52                "liquidity",
 53                "start_date",
 54                "end_date",
 55                "competitive",
 56            ]
 57        ] = None,
 58        ascending: Optional[bool] = None,
 59        search_tags: Optional[bool] = None,
 60        search_profiles: Optional[bool] = None,
 61        recurrence: Optional[
 62            Literal["hourly", "daily", "weekly", "monthly", "annual"]
 63        ] = None,
 64        exclude_tag_ids: Optional[list[int]] = None,
 65        optimized: Optional[bool] = None,
 66    ) -> SearchResult:
 67        params: dict[str, str | list[str] | int | bool] = {
 68            "q": query,
 69        }
 70        if cache is not None:
 71            params["cache"] = str(cache).lower()
 72        if status:
 73            params["events_status"] = status
 74        if limit_per_type:
 75            params["limit_per_type"] = limit_per_type
 76        if page:
 77            params["page"] = page
 78        if tags:
 79            params["events_tag"] = json.dumps([json.dumps(item) for item in tags])
 80        if keep_closed_markets is not None:
 81            params["keep_closed_markets"] = keep_closed_markets
 82        if sort:
 83            params["sort"] = sort
 84        if ascending is not None:
 85            params["ascending"] = str(ascending).lower()
 86        if search_tags is not None:
 87            params["search_tags"] = str(search_tags).lower()
 88        if search_profiles is not None:
 89            params["search_profiles"] = str(search_profiles).lower()
 90        if recurrence:
 91            params["recurrence"] = recurrence
 92        if exclude_tag_ids:
 93            params["exclude_tag_id"] = [str(i) for i in exclude_tag_ids]
 94        if optimized is not None:
 95            params["optimized"] = str(optimized).lower()
 96        response = self.client.get(self._build_url("/public-search"), params=params)
 97        response.raise_for_status()
 98        return SearchResult(**response.json())
 99
100    def get_market(self, market_id: str) -> GammaMarket:
101        """Get a GammaMarket by market_id."""
102        response = self.client.get(self._build_url(f"/markets/{market_id}"))
103        response.raise_for_status()
104        return GammaMarket(**response.json())
105
106    def get_markets(
107        self,
108        limit: int | None = None,
109        offset: int | None = None,
110        order: str | None = None,
111        ascending: bool = True,
112        archived: bool | None = None,
113        active: bool | None = None,
114        closed: bool | None = None,
115        slugs: list[str] | None = None,
116        market_ids: list[int] | None = None,
117        token_ids: list[str] | None = None,
118        condition_ids: list[str] | None = None,
119        tag_id: int | None = None,
120        related_tags: bool | None = False,
121        liquidity_num_min: float | None = None,
122        liquidity_num_max: float | None = None,
123        volume_num_min: float | None = None,
124        volume_num_max: float | None = None,
125        start_date_min: datetime | None = None,
126        start_date_max: datetime | None = None,
127        end_date_min: datetime | None = None,
128        end_date_max: datetime | None = None,
129    ) -> list[GammaMarket]:
130        params: dict[str, float | int | list[int] | str | list[str] | bool] = {}
131        if limit:
132            params["limit"] = limit
133        if offset:
134            params["offset"] = offset
135        if order:
136            params["order"] = order
137            params["ascending"] = ascending
138        if slugs:
139            params["slug"] = slugs
140        if archived is not None:
141            params["archived"] = archived
142        if active is not None:
143            params["active"] = active
144        if closed is not None:
145            params["closed"] = closed
146        if market_ids:
147            params["id"] = market_ids
148        if token_ids:
149            params["clob_token_ids"] = token_ids
150        if condition_ids:
151            params["condition_ids"] = condition_ids
152        if liquidity_num_min:
153            params["liquidity_num_min"] = liquidity_num_min
154        if liquidity_num_max:
155            params["liquidity_num_max"] = liquidity_num_max
156        if volume_num_min:
157            params["volume_num_min"] = volume_num_min
158        if volume_num_max:
159            params["volume_num_max"] = volume_num_max
160        if start_date_min:
161            params["start_date_min"] = start_date_min.isoformat()
162        if start_date_max:
163            params["start_date_max"] = start_date_max.isoformat()
164        if end_date_min:
165            params["end_date_min"] = end_date_min.isoformat()
166        if end_date_max:
167            params["end_date_max"] = end_date_max.isoformat()
168        if tag_id:
169            params["tag_id"] = tag_id
170            if related_tags:
171                params["related_tags"] = related_tags
172
173        response = self.client.get(self._build_url("/markets"), params=params)
174        response.raise_for_status()
175        return [GammaMarket(**market) for market in response.json()]
176
177    def get_market_by_id(
178        self, market_id: str, include_tag: Optional[bool] = None
179    ) -> GammaMarket:
180        params = {}
181        if include_tag:
182            params["include_tag"] = include_tag
183        response = self.client.get(
184            self._build_url(f"/markets/{market_id}"), params=params
185        )
186        response.raise_for_status()
187        return GammaMarket(**response.json())
188
189    def get_market_tags(self, market_id: str) -> list[Tag]:
190        response = self.client.get(self._build_url(f"/markets/{market_id}/tags"))
191        response.raise_for_status()
192        return [Tag(**tag) for tag in response.json()]
193
194    def get_market_by_slug(
195        self, slug: str, include_tag: Optional[bool] = None
196    ) -> GammaMarket:
197        params = {}
198        if include_tag:
199            params["include_tag"] = include_tag
200        response = self.client.get(
201            self._build_url(f"/markets/slug/{slug}"), params=params
202        )
203        response.raise_for_status()
204        return GammaMarket(**response.json())
205
206    def get_events(
207        self,
208        limit: int = 500,
209        offset: int = 0,
210        order: Optional[str] = None,
211        ascending: bool = True,
212        event_ids: Optional[Union[str, list[str]]] = None,
213        slugs: Optional[list[str]] = None,
214        archived: Optional[bool] = None,
215        active: Optional[bool] = None,
216        closed: Optional[bool] = None,
217        liquidity_min: Optional[float] = None,
218        liquidity_max: Optional[float] = None,
219        volume_min: Optional[float] = None,
220        volume_max: Optional[float] = None,
221        start_date_min: Optional[datetime] = None,
222        start_date_max: Optional[datetime] = None,
223        end_date_min: Optional[datetime] = None,
224        end_date_max: Optional[datetime] = None,
225        tag: Optional[str] = None,
226        tag_id: Optional[int] = None,
227        tag_slug: Optional[str] = None,
228        related_tags: bool = False,
229    ) -> list[Event]:
230        params: dict[str, int | str | list[str] | float] = {
231            "limit": limit,
232            "offset": offset,
233        }
234        if order:
235            params["order"] = order
236            params["ascending"] = ascending
237        if event_ids:
238            params["id"] = event_ids
239        if slugs:
240            params["slug"] = slugs
241        if archived is not None:
242            params["archived"] = archived
243        if active is not None:
244            params["active"] = active
245        if closed is not None:
246            params["closed"] = closed
247        if liquidity_min:
248            params["liquidity_min"] = liquidity_min
249        if liquidity_max:
250            params["liquidity_max"] = liquidity_max
251        if volume_min:
252            params["volume_min"] = volume_min
253        if volume_max:
254            params["volume_max"] = volume_max
255        if start_date_min:
256            params["start_date_min"] = start_date_min.isoformat()
257        if start_date_max:
258            params["start_date_max"] = start_date_max.isoformat()
259        if end_date_min:
260            params["end_date_min"] = end_date_min.isoformat()
261        if end_date_max:
262            params["end_date_max"] = end_date_max.isoformat()
263        if tag:
264            params["tag"] = tag
265        elif tag_id:
266            params["tag_id"] = tag_id
267            if related_tags:
268                params["related_tags"] = related_tags
269        elif tag_slug:
270            params["tag_slug"] = tag_slug
271
272        response = self.client.get(self._build_url("/events"), params=params)
273        response.raise_for_status()
274        return [Event(**event) for event in response.json()]
275
276    def get_all_events(
277        self,
278        order: Optional[str] = None,
279        ascending: bool = True,
280        event_ids: Optional[Union[str, list[str]]] = None,
281        slugs: Optional[list[str]] = None,
282        archived: Optional[bool] = None,
283        active: Optional[bool] = None,
284        closed: Optional[bool] = None,
285        liquidity_min: Optional[float] = None,
286        liquidity_max: Optional[float] = None,
287        volume_min: Optional[float] = None,
288        volume_max: Optional[float] = None,
289        start_date_min: Optional[datetime] = None,
290        start_date_max: Optional[datetime] = None,
291        end_date_min: Optional[datetime] = None,
292        end_date_max: Optional[datetime] = None,
293        tag: Optional[str] = None,
294        tag_id: Optional[int] = None,
295        tag_slug: Optional[str] = None,
296        related_tags: bool = False,
297    ) -> list[Event]:
298        offset = 0
299        events = []
300
301        while True:
302            part = self.get_events(
303                offset=offset,
304                order=order,
305                ascending=ascending,
306                event_ids=event_ids,
307                slugs=slugs,
308                archived=archived,
309                active=active,
310                closed=closed,
311                liquidity_min=liquidity_min,
312                liquidity_max=liquidity_max,
313                volume_min=volume_min,
314                volume_max=volume_max,
315                start_date_min=start_date_min,
316                start_date_max=start_date_max,
317                end_date_min=end_date_min,
318                end_date_max=end_date_max,
319                tag=tag,
320                tag_id=tag_id,
321                tag_slug=tag_slug,
322                related_tags=related_tags,
323            )
324            events.extend(part)
325
326            if len(part) < 500:
327                break
328
329            offset += 500
330
331        return events
332
333    def get_event_by_id(
334        self,
335        event_id: int,
336        include_chat: Optional[bool] = None,
337        include_template: Optional[bool] = None,
338    ) -> Event:
339        params = {}
340        if include_chat:
341            params["include_chat"] = include_chat
342        if include_template:
343            params["include_template"] = include_template
344        response = self.client.get(
345            self._build_url(f"/events/{event_id}"), params=params
346        )
347        response.raise_for_status()
348        return Event(**response.json())
349
350    def get_event_by_slug(
351        self,
352        slug: str,
353        include_chat: Optional[bool] = None,
354        include_template: Optional[bool] = None,
355    ) -> Event:
356        params = {}
357        if include_chat:
358            params["include_chat"] = include_chat
359        if include_template:
360            params["include_template"] = include_template
361        response = self.client.get(
362            self._build_url(f"/events/slug/{slug}"), params=params
363        )
364        response.raise_for_status()
365        return Event(**response.json())
366
367    def get_event_tags(self, event_id: int) -> list[Tag]:
368        response = self.client.get(self._build_url(f"/events/{event_id}/tags"))
369        response.raise_for_status()
370        return [Tag(**tag) for tag in response.json()]
371
372    def get_teams(
373        self,
374        limit: int = 500,
375        offset: int = 0,
376        order: Optional[
377            Literal[
378                "id",
379                "name",
380                "league",
381                "record",
382                "logo",
383                "abbreviation",
384                "alias",
385                "createdAt",
386                "updatedAt",
387            ]
388        ] = None,
389        ascending: bool = True,
390        league: Optional[str] = None,
391        name: Optional[str] = None,
392        abbreviation: Optional[str] = None,
393    ) -> list[Team]:
394        params: dict[str, int | str] = {
395            "limit": limit,
396            "offset": offset,
397        }
398        if order:
399            params["order"] = order
400            params["ascending"] = str(ascending).lower()
401        if league:
402            params["league"] = league.lower()
403        if name:
404            params["name"] = name
405        if abbreviation:
406            params["abbreviation"] = abbreviation.lower()
407        response = self.client.get(self._build_url("/teams"), params=params)
408        response.raise_for_status()
409        return [Team(**team) for team in response.json()]
410
411    def get_all_teams(
412        self,
413        order: Optional[
414            Literal[
415                "id",
416                "name",
417                "league",
418                "record",
419                "logo",
420                "abbreviation",
421                "alias",
422                "createdAt",
423                "updatedAt",
424            ]
425        ] = None,
426        ascending: bool = True,
427        league: Optional[str] = None,
428        name: Optional[str] = None,
429        abbreviation: Optional[str] = None,
430    ) -> list[Team]:
431        offset = 0
432        teams = []
433
434        while True:
435            part = self.get_teams(
436                offset=offset,
437                order=order,
438                ascending=ascending,
439                league=league,
440                name=name,
441                abbreviation=abbreviation,
442            )
443            teams.extend(part)
444
445            if len(part) < 500:
446                break
447
448            offset += 500
449
450        return teams
451
452    def get_sports_metadata(
453        self,
454    ) -> list[Sport]:
455        response = self.client.get(self._build_url("/sports"))
456        response.raise_for_status()
457        return [Sport(**sport) for sport in response.json()]
458
459    def get_tags(
460        self,
461        limit: int = 300,
462        offset: int = 0,
463        order: Optional[
464            Literal[
465                "id",
466                "label",
467                "slug",
468                "forceShow",
469                "forceHide",
470                "isCarousel",
471                "createdAt",
472                "updatedAt",
473                "createdBy",
474                "updatedBy",
475            ]
476        ] = None,
477        ascending: bool = True,
478        include_templates: Optional[bool] = None,
479        is_carousel: Optional[bool] = None,
480    ) -> list[Tag]:
481        params: dict[str, int | str] = {
482            "limit": limit,
483            "offset": offset,
484        }
485        if order:
486            params["order"] = order
487            params["ascending"] = str(ascending).lower()
488        if include_templates is not None:
489            params["include_templates"] = str(include_templates).lower()
490        if is_carousel is not None:
491            params["is_carousel"] = str(is_carousel).lower()
492        response = self.client.get(self._build_url("/tags"), params=params)
493        response.raise_for_status()
494        return [Tag(**tag) for tag in response.json()]
495
496    def get_all_tags(
497        self,
498        order: Optional[
499            Literal[
500                "id",
501                "label",
502                "slug",
503                "forceShow",
504                "forceHide",
505                "isCarousel",
506                "createdAt",
507                "updatedAt",
508                "createdBy",
509                "updatedBy",
510            ]
511        ] = None,
512        ascending: bool = True,
513        include_templates: Optional[bool] = None,
514        is_carousel: Optional[bool] = None,
515    ) -> list[Tag]:
516        offset = 0
517        tags = []
518
519        while True:
520            part = self.get_tags(
521                offset=offset,
522                order=order,
523                ascending=ascending,
524                include_templates=include_templates,
525                is_carousel=is_carousel,
526            )
527            tags.extend(part)
528
529            if len(part) < 300:
530                break
531
532            offset += 300
533
534        return tags
535
536    def get_tag(self, tag_id: str, include_template: Optional[bool] = None) -> Tag:
537        params = {}
538        if include_template is not None:
539            params = {"include_template": str(include_template).lower()}
540        response = self.client.get(self._build_url(f"/tags/{tag_id}"), params=params)
541        response.raise_for_status()
542        return Tag(**response.json())
543
544    def get_related_tag_ids_by_tag_id(
545        self,
546        tag_id: int,
547        omit_empty: Optional[bool] = None,
548        status: Optional[Literal["active", "closed", "all"]] = None,
549    ) -> list[TagRelation]:
550        params = {}
551        if omit_empty is not None:
552            params["omit_empty"] = str(omit_empty).lower()
553        if status:
554            params["status"] = status
555        response = self.client.get(
556            self._build_url(f"/tags/{tag_id}/related-tags"), params=params
557        )
558        response.raise_for_status()
559        return [TagRelation(**tag) for tag in response.json()]
560
561    def get_related_tag_ids_by_slug(
562        self,
563        slug: str,
564        omit_empty: Optional[bool] = None,
565        status: Optional[Literal["active", "closed", "all"]] = None,
566    ) -> list[TagRelation]:
567        params = {}
568        if omit_empty is not None:
569            params["omit_empty"] = str(omit_empty).lower()
570        if status:
571            params["status"] = status
572        response = self.client.get(
573            self._build_url(f"/tags/slug/{slug}/related-tags"), params=params
574        )
575        response.raise_for_status()
576        return [TagRelation(**tag) for tag in response.json()]
577
578    def get_related_tags_by_tag_id(
579        self,
580        tag_id: int,
581        omit_empty: Optional[bool] = None,
582        status: Optional[Literal["active", "closed", "all"]] = None,
583    ) -> list[Tag]:
584        params = {}
585        if omit_empty is not None:
586            params["omit_empty"] = str(omit_empty).lower()
587        if status:
588            params["status"] = status
589        response = self.client.get(
590            self._build_url(f"/tags/{tag_id}/related-tags/tags"), params=params
591        )
592        response.raise_for_status()
593        return [Tag(**tag) for tag in response.json()]
594
595    def get_related_tags_by_slug(
596        self,
597        slug: str,
598        omit_empty: Optional[bool] = None,
599        status: Optional[Literal["active", "closed", "all"]] = None,
600    ) -> list[Tag]:
601        params = {}
602        if omit_empty is not None:
603            params["omit_empty"] = str(omit_empty).lower()
604        if status:
605            params["status"] = status
606        response = self.client.get(
607            self._build_url(f"/tags/slug/{slug}/related-tags/tags"), params=params
608        )
609        response.raise_for_status()
610        return [Tag(**tag) for tag in response.json()]
611
612    def get_series(
613        self,
614        limit: int = 300,
615        offset: int = 0,
616        order: Optional[str] = None,
617        ascending: bool = True,
618        slug: Optional[str] = None,
619        closed: Optional[bool] = None,
620        include_chat: Optional[bool] = None,
621        recurrence: Optional[
622            Literal[
623                "hourly", "daily", "weekly", "monthly", "annual"
624            ]  # results also contain "15m" but the server returns a 422 Unprocessable Content
625        ] = None,
626    ) -> list[Series]:
627        params: dict[str, str | int | list[int]] = {
628            "limit": limit,
629            "offset": offset,
630        }
631        if order:
632            params["order"] = order
633            params["ascending"] = str(ascending).lower()
634        if slug:
635            params["slug"] = slug
636        if closed is not None:
637            params["closed"] = str(closed).lower()
638        if include_chat is not None:
639            params["include_chat"] = str(include_chat).lower()
640        if recurrence is not None:
641            params["recurrence"] = str(recurrence).lower()
642
643        response = self.client.get(self._build_url("/series"), params=params)
644        response.raise_for_status()
645        return [Series(**series) for series in response.json()]
646
647    def get_all_series(
648        self,
649        order: Optional[str] = None,
650        ascending: bool = True,
651        slug: Optional[str] = None,
652        closed: Optional[bool] = None,
653        include_chat: Optional[bool] = None,
654        recurrence: Optional[
655            Literal["hourly", "daily", "weekly", "monthly", "annual"]
656        ] = None,
657    ) -> list[Series]:
658        offset = 0
659        series = []
660
661        while True:
662            part = self.get_series(
663                offset=offset,
664                order=order,
665                ascending=ascending,
666                slug=slug,
667                closed=closed,
668                include_chat=include_chat,
669                recurrence=recurrence,
670            )
671            series.extend(part)
672
673            if len(part) < 300:
674                break
675
676            offset += 300
677        return series
678
679    def get_series_by_id(self, series_id: str) -> Series:
680        response = self.client.get(self._build_url(f"/series/{series_id}"))
681        response.raise_for_status()
682        return Series(**response.json())
683
684    def get_comments(
685        self,
686        parent_entity_type: Literal["Event", "Series", "market"],
687        parent_entity_id: int,
688        limit=500,
689        offset=0,
690        order: Optional[str] = None,
691        ascending: bool = True,
692        get_positions: Optional[bool] = None,
693        holders_only: Optional[bool] = None,
694    ) -> list[Comment]:
695        """Warning, the server doesn't give back the right amount of comments you asked for."""
696        params: dict[str, str | int] = {
697            "parent_entity_type": parent_entity_type,
698            "parent_entity_id": parent_entity_id,
699            "limit": limit,
700            "offset": offset,
701        }
702        if order:
703            params["order"] = order
704            params["ascending"] = str(ascending).lower()
705        if get_positions is not None:
706            params["get_positions"] = str(get_positions).lower()
707        if holders_only is not None:
708            params["holders_only"] = str(holders_only).lower()
709        response = self.client.get(self._build_url("/comments"), params=params)
710        response.raise_for_status()
711        return [Comment(**comment) for comment in response.json()]
712
713    def get_comments_by_id(
714        self, comment_id: str, get_positions: Optional[bool] = None
715    ) -> list[Comment]:
716        """Returns all comments that belong to the comment's thread."""
717        params = {}
718        if get_positions is not None:
719            params["get_positions"] = str(get_positions).lower()
720        response = self.client.get(
721            self._build_url(f"/comments/{comment_id}"), params=params
722        )
723        response.raise_for_status()
724        return [Comment(**comment) for comment in response.json()]
725
726    def get_comments_by_user_address(
727        self,
728        user_address: EthAddress,  # warning, this is the base address, not the proxy address
729        limit=500,
730        offset=0,
731        order: Optional[str] = None,
732        ascending: bool = True,
733    ) -> list[Comment]:
734        params: dict[str, str | int] = {
735            "limit": limit,
736            "offset": offset,
737        }
738        if order:
739            params["order"] = order
740            params["ascending"] = str(ascending).lower()
741        response = self.client.get(
742            self._build_url(f"/comments/user_address/{user_address}"), params=params
743        )
744        response.raise_for_status()
745        return [Comment(**comment) for comment in response.json()]
746
747    def grok_event_summary(self, event_slug: str):
748        json_payload = {
749            "id": generate_random_id(),
750            "messages": [{"role": "user", "content": "", "parts": []}],
751        }
752
753        params = {
754            "prompt": event_slug,
755        }
756
757        with self.client.stream(
758            method="POST",
759            url="https://polymarket.com/api/grok/event-summary",
760            params=params,
761            json=json_payload,
762        ) as stream:
763            messages = []
764            citations = []
765            seen_urls = set()
766
767            for line_bytes in stream.iter_lines():
768                line = (
769                    line_bytes.decode() if isinstance(line_bytes, bytes) else line_bytes
770                )
771                if line.startswith("__SOURCES__:"):
772                    sources_json_str = line[len("__SOURCES__:") :]
773                    try:
774                        sources_obj = json.loads(sources_json_str)
775                        for source in sources_obj.get("sources", []):
776                            url = source.get("url")
777                            if url and url not in seen_urls:
778                                citations.append(source)
779                                seen_urls.add(url)
780                    except json.JSONDecodeError:
781                        pass
782                else:
783                    messages.append(line)
784                    print(line, end="")  # or handle message text as needed
785
786        # After reading streamed lines:
787        if citations:
788            print("\n\nSources:")
789            for source in citations:
790                print(f"- {source.get('url', 'Unknown URL')}")
791
792    def grok_election_market_explanation(
793        self, candidate_name: str, election_title: str
794    ):
795        text = f"Provide candidate information for {candidate_name} in the {election_title} on Polymarket."
796        json_payload = {
797            "id": generate_random_id(),
798            "messages": [
799                {
800                    "role": "user",
801                    "content": text,
802                    "parts": [{"type": "text", "text": text}],
803                },
804            ],
805        }
806
807        response = self.client.post(
808            url="https://polymarket.com/api/grok/election-market-explanation",
809            json=json_payload,
810        )
811        response.raise_for_status()
812
813        parts = [p.strip() for p in response.text.split("**") if p.strip()]
814        for i, part in enumerate(parts):
815            if ":" in part and i != 0:
816                print()
817            print(part)
818
819    def __enter__(self):
820        return self
821
822    def __exit__(self, exc_type, exc_val, exc_tb):
823        self.client.close()
PolymarketGammaClient(base_url: str = 'https://gamma-api.polymarket.com')
32    def __init__(self, base_url: str = "https://gamma-api.polymarket.com"):
33        self.base_url = base_url
34        self.client = httpx.Client(http2=True, timeout=30.0)
base_url
client
def search( self, query: str, cache: Optional[bool] = None, status: Optional[Literal['active', 'resolved']] = None, limit_per_type: Optional[int] = None, page: Optional[int] = None, tags: Optional[list[str]] = None, keep_closed_markets: Optional[bool] = None, sort: Optional[Literal['volume', 'volume_24hr', 'liquidity', 'start_date', 'end_date', 'competitive']] = None, ascending: Optional[bool] = None, search_tags: Optional[bool] = None, search_profiles: Optional[bool] = None, recurrence: Optional[Literal['hourly', 'daily', 'weekly', 'monthly', 'annual']] = None, exclude_tag_ids: Optional[list[int]] = None, optimized: Optional[bool] = None) -> polymarket_apis.types.gamma_types.SearchResult:
39    def search(
40        self,
41        query: str,
42        cache: Optional[bool] = None,
43        status: Optional[Literal["active", "resolved"]] = None,
44        limit_per_type: Optional[int] = None,  # max is 50
45        page: Optional[int] = None,
46        tags: Optional[list[str]] = None,
47        keep_closed_markets: Optional[bool] = None,
48        sort: Optional[
49            Literal[
50                "volume",
51                "volume_24hr",
52                "liquidity",
53                "start_date",
54                "end_date",
55                "competitive",
56            ]
57        ] = None,
58        ascending: Optional[bool] = None,
59        search_tags: Optional[bool] = None,
60        search_profiles: Optional[bool] = None,
61        recurrence: Optional[
62            Literal["hourly", "daily", "weekly", "monthly", "annual"]
63        ] = None,
64        exclude_tag_ids: Optional[list[int]] = None,
65        optimized: Optional[bool] = None,
66    ) -> SearchResult:
67        params: dict[str, str | list[str] | int | bool] = {
68            "q": query,
69        }
70        if cache is not None:
71            params["cache"] = str(cache).lower()
72        if status:
73            params["events_status"] = status
74        if limit_per_type:
75            params["limit_per_type"] = limit_per_type
76        if page:
77            params["page"] = page
78        if tags:
79            params["events_tag"] = json.dumps([json.dumps(item) for item in tags])
80        if keep_closed_markets is not None:
81            params["keep_closed_markets"] = keep_closed_markets
82        if sort:
83            params["sort"] = sort
84        if ascending is not None:
85            params["ascending"] = str(ascending).lower()
86        if search_tags is not None:
87            params["search_tags"] = str(search_tags).lower()
88        if search_profiles is not None:
89            params["search_profiles"] = str(search_profiles).lower()
90        if recurrence:
91            params["recurrence"] = recurrence
92        if exclude_tag_ids:
93            params["exclude_tag_id"] = [str(i) for i in exclude_tag_ids]
94        if optimized is not None:
95            params["optimized"] = str(optimized).lower()
96        response = self.client.get(self._build_url("/public-search"), params=params)
97        response.raise_for_status()
98        return SearchResult(**response.json())
def get_market(self, market_id: str) -> polymarket_apis.types.gamma_types.GammaMarket:
100    def get_market(self, market_id: str) -> GammaMarket:
101        """Get a GammaMarket by market_id."""
102        response = self.client.get(self._build_url(f"/markets/{market_id}"))
103        response.raise_for_status()
104        return GammaMarket(**response.json())

Get a GammaMarket by market_id.

def get_markets( self, limit: int | None = None, offset: int | None = None, order: str | None = None, ascending: bool = True, archived: bool | None = None, active: bool | None = None, closed: bool | None = None, slugs: list[str] | None = None, market_ids: list[int] | None = None, token_ids: list[str] | None = None, condition_ids: list[str] | None = None, tag_id: int | None = None, related_tags: bool | None = False, liquidity_num_min: float | None = None, liquidity_num_max: float | None = None, volume_num_min: float | None = None, volume_num_max: float | None = None, start_date_min: datetime.datetime | None = None, start_date_max: datetime.datetime | None = None, end_date_min: datetime.datetime | None = None, end_date_max: datetime.datetime | None = None) -> list[polymarket_apis.types.gamma_types.GammaMarket]:
106    def get_markets(
107        self,
108        limit: int | None = None,
109        offset: int | None = None,
110        order: str | None = None,
111        ascending: bool = True,
112        archived: bool | None = None,
113        active: bool | None = None,
114        closed: bool | None = None,
115        slugs: list[str] | None = None,
116        market_ids: list[int] | None = None,
117        token_ids: list[str] | None = None,
118        condition_ids: list[str] | None = None,
119        tag_id: int | None = None,
120        related_tags: bool | None = False,
121        liquidity_num_min: float | None = None,
122        liquidity_num_max: float | None = None,
123        volume_num_min: float | None = None,
124        volume_num_max: float | None = None,
125        start_date_min: datetime | None = None,
126        start_date_max: datetime | None = None,
127        end_date_min: datetime | None = None,
128        end_date_max: datetime | None = None,
129    ) -> list[GammaMarket]:
130        params: dict[str, float | int | list[int] | str | list[str] | bool] = {}
131        if limit:
132            params["limit"] = limit
133        if offset:
134            params["offset"] = offset
135        if order:
136            params["order"] = order
137            params["ascending"] = ascending
138        if slugs:
139            params["slug"] = slugs
140        if archived is not None:
141            params["archived"] = archived
142        if active is not None:
143            params["active"] = active
144        if closed is not None:
145            params["closed"] = closed
146        if market_ids:
147            params["id"] = market_ids
148        if token_ids:
149            params["clob_token_ids"] = token_ids
150        if condition_ids:
151            params["condition_ids"] = condition_ids
152        if liquidity_num_min:
153            params["liquidity_num_min"] = liquidity_num_min
154        if liquidity_num_max:
155            params["liquidity_num_max"] = liquidity_num_max
156        if volume_num_min:
157            params["volume_num_min"] = volume_num_min
158        if volume_num_max:
159            params["volume_num_max"] = volume_num_max
160        if start_date_min:
161            params["start_date_min"] = start_date_min.isoformat()
162        if start_date_max:
163            params["start_date_max"] = start_date_max.isoformat()
164        if end_date_min:
165            params["end_date_min"] = end_date_min.isoformat()
166        if end_date_max:
167            params["end_date_max"] = end_date_max.isoformat()
168        if tag_id:
169            params["tag_id"] = tag_id
170            if related_tags:
171                params["related_tags"] = related_tags
172
173        response = self.client.get(self._build_url("/markets"), params=params)
174        response.raise_for_status()
175        return [GammaMarket(**market) for market in response.json()]
def get_market_by_id( self, market_id: str, include_tag: Optional[bool] = None) -> polymarket_apis.types.gamma_types.GammaMarket:
177    def get_market_by_id(
178        self, market_id: str, include_tag: Optional[bool] = None
179    ) -> GammaMarket:
180        params = {}
181        if include_tag:
182            params["include_tag"] = include_tag
183        response = self.client.get(
184            self._build_url(f"/markets/{market_id}"), params=params
185        )
186        response.raise_for_status()
187        return GammaMarket(**response.json())
def get_market_tags(self, market_id: str) -> list[polymarket_apis.types.gamma_types.Tag]:
189    def get_market_tags(self, market_id: str) -> list[Tag]:
190        response = self.client.get(self._build_url(f"/markets/{market_id}/tags"))
191        response.raise_for_status()
192        return [Tag(**tag) for tag in response.json()]
def get_market_by_slug( self, slug: str, include_tag: Optional[bool] = None) -> polymarket_apis.types.gamma_types.GammaMarket:
194    def get_market_by_slug(
195        self, slug: str, include_tag: Optional[bool] = None
196    ) -> GammaMarket:
197        params = {}
198        if include_tag:
199            params["include_tag"] = include_tag
200        response = self.client.get(
201            self._build_url(f"/markets/slug/{slug}"), params=params
202        )
203        response.raise_for_status()
204        return GammaMarket(**response.json())
def get_events( self, limit: int = 500, offset: int = 0, order: Optional[str] = None, ascending: bool = True, event_ids: Union[str, list[str], NoneType] = None, slugs: Optional[list[str]] = None, archived: Optional[bool] = None, active: Optional[bool] = None, closed: Optional[bool] = None, liquidity_min: Optional[float] = None, liquidity_max: Optional[float] = None, volume_min: Optional[float] = None, volume_max: Optional[float] = None, start_date_min: Optional[datetime.datetime] = None, start_date_max: Optional[datetime.datetime] = None, end_date_min: Optional[datetime.datetime] = None, end_date_max: Optional[datetime.datetime] = None, tag: Optional[str] = None, tag_id: Optional[int] = None, tag_slug: Optional[str] = None, related_tags: bool = False) -> list[polymarket_apis.types.gamma_types.Event]:
206    def get_events(
207        self,
208        limit: int = 500,
209        offset: int = 0,
210        order: Optional[str] = None,
211        ascending: bool = True,
212        event_ids: Optional[Union[str, list[str]]] = None,
213        slugs: Optional[list[str]] = None,
214        archived: Optional[bool] = None,
215        active: Optional[bool] = None,
216        closed: Optional[bool] = None,
217        liquidity_min: Optional[float] = None,
218        liquidity_max: Optional[float] = None,
219        volume_min: Optional[float] = None,
220        volume_max: Optional[float] = None,
221        start_date_min: Optional[datetime] = None,
222        start_date_max: Optional[datetime] = None,
223        end_date_min: Optional[datetime] = None,
224        end_date_max: Optional[datetime] = None,
225        tag: Optional[str] = None,
226        tag_id: Optional[int] = None,
227        tag_slug: Optional[str] = None,
228        related_tags: bool = False,
229    ) -> list[Event]:
230        params: dict[str, int | str | list[str] | float] = {
231            "limit": limit,
232            "offset": offset,
233        }
234        if order:
235            params["order"] = order
236            params["ascending"] = ascending
237        if event_ids:
238            params["id"] = event_ids
239        if slugs:
240            params["slug"] = slugs
241        if archived is not None:
242            params["archived"] = archived
243        if active is not None:
244            params["active"] = active
245        if closed is not None:
246            params["closed"] = closed
247        if liquidity_min:
248            params["liquidity_min"] = liquidity_min
249        if liquidity_max:
250            params["liquidity_max"] = liquidity_max
251        if volume_min:
252            params["volume_min"] = volume_min
253        if volume_max:
254            params["volume_max"] = volume_max
255        if start_date_min:
256            params["start_date_min"] = start_date_min.isoformat()
257        if start_date_max:
258            params["start_date_max"] = start_date_max.isoformat()
259        if end_date_min:
260            params["end_date_min"] = end_date_min.isoformat()
261        if end_date_max:
262            params["end_date_max"] = end_date_max.isoformat()
263        if tag:
264            params["tag"] = tag
265        elif tag_id:
266            params["tag_id"] = tag_id
267            if related_tags:
268                params["related_tags"] = related_tags
269        elif tag_slug:
270            params["tag_slug"] = tag_slug
271
272        response = self.client.get(self._build_url("/events"), params=params)
273        response.raise_for_status()
274        return [Event(**event) for event in response.json()]
def get_all_events( self, order: Optional[str] = None, ascending: bool = True, event_ids: Union[str, list[str], NoneType] = None, slugs: Optional[list[str]] = None, archived: Optional[bool] = None, active: Optional[bool] = None, closed: Optional[bool] = None, liquidity_min: Optional[float] = None, liquidity_max: Optional[float] = None, volume_min: Optional[float] = None, volume_max: Optional[float] = None, start_date_min: Optional[datetime.datetime] = None, start_date_max: Optional[datetime.datetime] = None, end_date_min: Optional[datetime.datetime] = None, end_date_max: Optional[datetime.datetime] = None, tag: Optional[str] = None, tag_id: Optional[int] = None, tag_slug: Optional[str] = None, related_tags: bool = False) -> list[polymarket_apis.types.gamma_types.Event]:
276    def get_all_events(
277        self,
278        order: Optional[str] = None,
279        ascending: bool = True,
280        event_ids: Optional[Union[str, list[str]]] = None,
281        slugs: Optional[list[str]] = None,
282        archived: Optional[bool] = None,
283        active: Optional[bool] = None,
284        closed: Optional[bool] = None,
285        liquidity_min: Optional[float] = None,
286        liquidity_max: Optional[float] = None,
287        volume_min: Optional[float] = None,
288        volume_max: Optional[float] = None,
289        start_date_min: Optional[datetime] = None,
290        start_date_max: Optional[datetime] = None,
291        end_date_min: Optional[datetime] = None,
292        end_date_max: Optional[datetime] = None,
293        tag: Optional[str] = None,
294        tag_id: Optional[int] = None,
295        tag_slug: Optional[str] = None,
296        related_tags: bool = False,
297    ) -> list[Event]:
298        offset = 0
299        events = []
300
301        while True:
302            part = self.get_events(
303                offset=offset,
304                order=order,
305                ascending=ascending,
306                event_ids=event_ids,
307                slugs=slugs,
308                archived=archived,
309                active=active,
310                closed=closed,
311                liquidity_min=liquidity_min,
312                liquidity_max=liquidity_max,
313                volume_min=volume_min,
314                volume_max=volume_max,
315                start_date_min=start_date_min,
316                start_date_max=start_date_max,
317                end_date_min=end_date_min,
318                end_date_max=end_date_max,
319                tag=tag,
320                tag_id=tag_id,
321                tag_slug=tag_slug,
322                related_tags=related_tags,
323            )
324            events.extend(part)
325
326            if len(part) < 500:
327                break
328
329            offset += 500
330
331        return events
def get_event_by_id( self, event_id: int, include_chat: Optional[bool] = None, include_template: Optional[bool] = None) -> polymarket_apis.types.gamma_types.Event:
333    def get_event_by_id(
334        self,
335        event_id: int,
336        include_chat: Optional[bool] = None,
337        include_template: Optional[bool] = None,
338    ) -> Event:
339        params = {}
340        if include_chat:
341            params["include_chat"] = include_chat
342        if include_template:
343            params["include_template"] = include_template
344        response = self.client.get(
345            self._build_url(f"/events/{event_id}"), params=params
346        )
347        response.raise_for_status()
348        return Event(**response.json())
def get_event_by_slug( self, slug: str, include_chat: Optional[bool] = None, include_template: Optional[bool] = None) -> polymarket_apis.types.gamma_types.Event:
350    def get_event_by_slug(
351        self,
352        slug: str,
353        include_chat: Optional[bool] = None,
354        include_template: Optional[bool] = None,
355    ) -> Event:
356        params = {}
357        if include_chat:
358            params["include_chat"] = include_chat
359        if include_template:
360            params["include_template"] = include_template
361        response = self.client.get(
362            self._build_url(f"/events/slug/{slug}"), params=params
363        )
364        response.raise_for_status()
365        return Event(**response.json())
def get_event_tags(self, event_id: int) -> list[polymarket_apis.types.gamma_types.Tag]:
367    def get_event_tags(self, event_id: int) -> list[Tag]:
368        response = self.client.get(self._build_url(f"/events/{event_id}/tags"))
369        response.raise_for_status()
370        return [Tag(**tag) for tag in response.json()]
def get_teams( self, limit: int = 500, offset: int = 0, order: Optional[Literal['id', 'name', 'league', 'record', 'logo', 'abbreviation', 'alias', 'createdAt', 'updatedAt']] = None, ascending: bool = True, league: Optional[str] = None, name: Optional[str] = None, abbreviation: Optional[str] = None) -> list[polymarket_apis.types.gamma_types.Team]:
372    def get_teams(
373        self,
374        limit: int = 500,
375        offset: int = 0,
376        order: Optional[
377            Literal[
378                "id",
379                "name",
380                "league",
381                "record",
382                "logo",
383                "abbreviation",
384                "alias",
385                "createdAt",
386                "updatedAt",
387            ]
388        ] = None,
389        ascending: bool = True,
390        league: Optional[str] = None,
391        name: Optional[str] = None,
392        abbreviation: Optional[str] = None,
393    ) -> list[Team]:
394        params: dict[str, int | str] = {
395            "limit": limit,
396            "offset": offset,
397        }
398        if order:
399            params["order"] = order
400            params["ascending"] = str(ascending).lower()
401        if league:
402            params["league"] = league.lower()
403        if name:
404            params["name"] = name
405        if abbreviation:
406            params["abbreviation"] = abbreviation.lower()
407        response = self.client.get(self._build_url("/teams"), params=params)
408        response.raise_for_status()
409        return [Team(**team) for team in response.json()]
def get_all_teams( self, order: Optional[Literal['id', 'name', 'league', 'record', 'logo', 'abbreviation', 'alias', 'createdAt', 'updatedAt']] = None, ascending: bool = True, league: Optional[str] = None, name: Optional[str] = None, abbreviation: Optional[str] = None) -> list[polymarket_apis.types.gamma_types.Team]:
411    def get_all_teams(
412        self,
413        order: Optional[
414            Literal[
415                "id",
416                "name",
417                "league",
418                "record",
419                "logo",
420                "abbreviation",
421                "alias",
422                "createdAt",
423                "updatedAt",
424            ]
425        ] = None,
426        ascending: bool = True,
427        league: Optional[str] = None,
428        name: Optional[str] = None,
429        abbreviation: Optional[str] = None,
430    ) -> list[Team]:
431        offset = 0
432        teams = []
433
434        while True:
435            part = self.get_teams(
436                offset=offset,
437                order=order,
438                ascending=ascending,
439                league=league,
440                name=name,
441                abbreviation=abbreviation,
442            )
443            teams.extend(part)
444
445            if len(part) < 500:
446                break
447
448            offset += 500
449
450        return teams
def get_sports_metadata(self) -> list[polymarket_apis.types.gamma_types.Sport]:
452    def get_sports_metadata(
453        self,
454    ) -> list[Sport]:
455        response = self.client.get(self._build_url("/sports"))
456        response.raise_for_status()
457        return [Sport(**sport) for sport in response.json()]
def get_tags( self, limit: int = 300, offset: int = 0, order: Optional[Literal['id', 'label', 'slug', 'forceShow', 'forceHide', 'isCarousel', 'createdAt', 'updatedAt', 'createdBy', 'updatedBy']] = None, ascending: bool = True, include_templates: Optional[bool] = None, is_carousel: Optional[bool] = None) -> list[polymarket_apis.types.gamma_types.Tag]:
459    def get_tags(
460        self,
461        limit: int = 300,
462        offset: int = 0,
463        order: Optional[
464            Literal[
465                "id",
466                "label",
467                "slug",
468                "forceShow",
469                "forceHide",
470                "isCarousel",
471                "createdAt",
472                "updatedAt",
473                "createdBy",
474                "updatedBy",
475            ]
476        ] = None,
477        ascending: bool = True,
478        include_templates: Optional[bool] = None,
479        is_carousel: Optional[bool] = None,
480    ) -> list[Tag]:
481        params: dict[str, int | str] = {
482            "limit": limit,
483            "offset": offset,
484        }
485        if order:
486            params["order"] = order
487            params["ascending"] = str(ascending).lower()
488        if include_templates is not None:
489            params["include_templates"] = str(include_templates).lower()
490        if is_carousel is not None:
491            params["is_carousel"] = str(is_carousel).lower()
492        response = self.client.get(self._build_url("/tags"), params=params)
493        response.raise_for_status()
494        return [Tag(**tag) for tag in response.json()]
def get_all_tags( self, order: Optional[Literal['id', 'label', 'slug', 'forceShow', 'forceHide', 'isCarousel', 'createdAt', 'updatedAt', 'createdBy', 'updatedBy']] = None, ascending: bool = True, include_templates: Optional[bool] = None, is_carousel: Optional[bool] = None) -> list[polymarket_apis.types.gamma_types.Tag]:
496    def get_all_tags(
497        self,
498        order: Optional[
499            Literal[
500                "id",
501                "label",
502                "slug",
503                "forceShow",
504                "forceHide",
505                "isCarousel",
506                "createdAt",
507                "updatedAt",
508                "createdBy",
509                "updatedBy",
510            ]
511        ] = None,
512        ascending: bool = True,
513        include_templates: Optional[bool] = None,
514        is_carousel: Optional[bool] = None,
515    ) -> list[Tag]:
516        offset = 0
517        tags = []
518
519        while True:
520            part = self.get_tags(
521                offset=offset,
522                order=order,
523                ascending=ascending,
524                include_templates=include_templates,
525                is_carousel=is_carousel,
526            )
527            tags.extend(part)
528
529            if len(part) < 300:
530                break
531
532            offset += 300
533
534        return tags
def get_tag( self, tag_id: str, include_template: Optional[bool] = None) -> polymarket_apis.types.gamma_types.Tag:
536    def get_tag(self, tag_id: str, include_template: Optional[bool] = None) -> Tag:
537        params = {}
538        if include_template is not None:
539            params = {"include_template": str(include_template).lower()}
540        response = self.client.get(self._build_url(f"/tags/{tag_id}"), params=params)
541        response.raise_for_status()
542        return Tag(**response.json())
def get_series( self, limit: int = 300, offset: int = 0, order: Optional[str] = None, ascending: bool = True, slug: Optional[str] = None, closed: Optional[bool] = None, include_chat: Optional[bool] = None, recurrence: Optional[Literal['hourly', 'daily', 'weekly', 'monthly', 'annual']] = None) -> list[polymarket_apis.types.gamma_types.Series]:
612    def get_series(
613        self,
614        limit: int = 300,
615        offset: int = 0,
616        order: Optional[str] = None,
617        ascending: bool = True,
618        slug: Optional[str] = None,
619        closed: Optional[bool] = None,
620        include_chat: Optional[bool] = None,
621        recurrence: Optional[
622            Literal[
623                "hourly", "daily", "weekly", "monthly", "annual"
624            ]  # results also contain "15m" but the server returns a 422 Unprocessable Content
625        ] = None,
626    ) -> list[Series]:
627        params: dict[str, str | int | list[int]] = {
628            "limit": limit,
629            "offset": offset,
630        }
631        if order:
632            params["order"] = order
633            params["ascending"] = str(ascending).lower()
634        if slug:
635            params["slug"] = slug
636        if closed is not None:
637            params["closed"] = str(closed).lower()
638        if include_chat is not None:
639            params["include_chat"] = str(include_chat).lower()
640        if recurrence is not None:
641            params["recurrence"] = str(recurrence).lower()
642
643        response = self.client.get(self._build_url("/series"), params=params)
644        response.raise_for_status()
645        return [Series(**series) for series in response.json()]
def get_all_series( self, order: Optional[str] = None, ascending: bool = True, slug: Optional[str] = None, closed: Optional[bool] = None, include_chat: Optional[bool] = None, recurrence: Optional[Literal['hourly', 'daily', 'weekly', 'monthly', 'annual']] = None) -> list[polymarket_apis.types.gamma_types.Series]:
647    def get_all_series(
648        self,
649        order: Optional[str] = None,
650        ascending: bool = True,
651        slug: Optional[str] = None,
652        closed: Optional[bool] = None,
653        include_chat: Optional[bool] = None,
654        recurrence: Optional[
655            Literal["hourly", "daily", "weekly", "monthly", "annual"]
656        ] = None,
657    ) -> list[Series]:
658        offset = 0
659        series = []
660
661        while True:
662            part = self.get_series(
663                offset=offset,
664                order=order,
665                ascending=ascending,
666                slug=slug,
667                closed=closed,
668                include_chat=include_chat,
669                recurrence=recurrence,
670            )
671            series.extend(part)
672
673            if len(part) < 300:
674                break
675
676            offset += 300
677        return series
def get_series_by_id(self, series_id: str) -> polymarket_apis.types.gamma_types.Series:
679    def get_series_by_id(self, series_id: str) -> Series:
680        response = self.client.get(self._build_url(f"/series/{series_id}"))
681        response.raise_for_status()
682        return Series(**response.json())
def get_comments( self, parent_entity_type: Literal['Event', 'Series', 'market'], parent_entity_id: int, limit=500, offset=0, order: Optional[str] = None, ascending: bool = True, get_positions: Optional[bool] = None, holders_only: Optional[bool] = None) -> list[polymarket_apis.types.gamma_types.Comment]:
684    def get_comments(
685        self,
686        parent_entity_type: Literal["Event", "Series", "market"],
687        parent_entity_id: int,
688        limit=500,
689        offset=0,
690        order: Optional[str] = None,
691        ascending: bool = True,
692        get_positions: Optional[bool] = None,
693        holders_only: Optional[bool] = None,
694    ) -> list[Comment]:
695        """Warning, the server doesn't give back the right amount of comments you asked for."""
696        params: dict[str, str | int] = {
697            "parent_entity_type": parent_entity_type,
698            "parent_entity_id": parent_entity_id,
699            "limit": limit,
700            "offset": offset,
701        }
702        if order:
703            params["order"] = order
704            params["ascending"] = str(ascending).lower()
705        if get_positions is not None:
706            params["get_positions"] = str(get_positions).lower()
707        if holders_only is not None:
708            params["holders_only"] = str(holders_only).lower()
709        response = self.client.get(self._build_url("/comments"), params=params)
710        response.raise_for_status()
711        return [Comment(**comment) for comment in response.json()]

Warning: the server may return a different number of comments than the requested limit.

def get_comments_by_id( self, comment_id: str, get_positions: Optional[bool] = None) -> list[polymarket_apis.types.gamma_types.Comment]:
713    def get_comments_by_id(
714        self, comment_id: str, get_positions: Optional[bool] = None
715    ) -> list[Comment]:
716        """Returns all comments that belong to the comment's thread."""
717        params = {}
718        if get_positions is not None:
719            params["get_positions"] = str(get_positions).lower()
720        response = self.client.get(
721            self._build_url(f"/comments/{comment_id}"), params=params
722        )
723        response.raise_for_status()
724        return [Comment(**comment) for comment in response.json()]

Returns all comments that belong to the comment's thread.

def get_comments_by_user_address( self, user_address: Annotated[str, AfterValidator(func=<function validate_eth_address>)], limit=500, offset=0, order: Optional[str] = None, ascending: bool = True) -> list[polymarket_apis.types.gamma_types.Comment]:
726    def get_comments_by_user_address(
727        self,
728        user_address: EthAddress,  # warning, this is the base address, not the proxy address
729        limit=500,
730        offset=0,
731        order: Optional[str] = None,
732        ascending: bool = True,
733    ) -> list[Comment]:
734        params: dict[str, str | int] = {
735            "limit": limit,
736            "offset": offset,
737        }
738        if order:
739            params["order"] = order
740            params["ascending"] = str(ascending).lower()
741        response = self.client.get(
742            self._build_url(f"/comments/user_address/{user_address}"), params=params
743        )
744        response.raise_for_status()
745        return [Comment(**comment) for comment in response.json()]
def grok_event_summary(self, event_slug: str):
747    def grok_event_summary(self, event_slug: str):
748        json_payload = {
749            "id": generate_random_id(),
750            "messages": [{"role": "user", "content": "", "parts": []}],
751        }
752
753        params = {
754            "prompt": event_slug,
755        }
756
757        with self.client.stream(
758            method="POST",
759            url="https://polymarket.com/api/grok/event-summary",
760            params=params,
761            json=json_payload,
762        ) as stream:
763            messages = []
764            citations = []
765            seen_urls = set()
766
767            for line_bytes in stream.iter_lines():
768                line = (
769                    line_bytes.decode() if isinstance(line_bytes, bytes) else line_bytes
770                )
771                if line.startswith("__SOURCES__:"):
772                    sources_json_str = line[len("__SOURCES__:") :]
773                    try:
774                        sources_obj = json.loads(sources_json_str)
775                        for source in sources_obj.get("sources", []):
776                            url = source.get("url")
777                            if url and url not in seen_urls:
778                                citations.append(source)
779                                seen_urls.add(url)
780                    except json.JSONDecodeError:
781                        pass
782                else:
783                    messages.append(line)
784                    print(line, end="")  # or handle message text as needed
785
786        # After reading streamed lines:
787        if citations:
788            print("\n\nSources:")
789            for source in citations:
790                print(f"- {source.get('url', 'Unknown URL')}")
def grok_election_market_explanation(self, candidate_name: str, election_title: str):
792    def grok_election_market_explanation(
793        self, candidate_name: str, election_title: str
794    ):
795        text = f"Provide candidate information for {candidate_name} in the {election_title} on Polymarket."
796        json_payload = {
797            "id": generate_random_id(),
798            "messages": [
799                {
800                    "role": "user",
801                    "content": text,
802                    "parts": [{"type": "text", "text": text}],
803                },
804            ],
805        }
806
807        response = self.client.post(
808            url="https://polymarket.com/api/grok/election-market-explanation",
809            json=json_payload,
810        )
811        response.raise_for_status()
812
813        parts = [p.strip() for p in response.text.split("**") if p.strip()]
814        for i, part in enumerate(parts):
815            if ":" in part and i != 0:
816                print()
817            print(part)
class PolymarketGraphQLClient:
class PolymarketGraphQLClient:
    """Blocking GraphQL client bound to one Polymarket subgraph endpoint."""

    def __init__(
        self,
        endpoint_name: Literal[
            "activity_subgraph",
            "fpmm_subgraph",
            "open_interest_subgraph",
            "orderbook_subgraph",
            "pnl_subgraph",
            "positions_subgraph",
            "sports_oracle_subgraph",
            "wallet_subgraph",
        ],
    ) -> None:
        """Look up ``endpoint_name`` in ``GRAPHQL_ENDPOINTS`` and build the client."""
        self.transport = HTTPXTransport(url=GRAPHQL_ENDPOINTS[endpoint_name])
        # Schema fetching is disabled, so no schema request is made up front.
        self.client = Client(
            transport=self.transport,
            fetch_schema_from_transport=False,
        )

    def query(self, query_string: str) -> dict:
        """Parse ``query_string`` with ``gql`` and execute it in a fresh session."""
        with self.client as session:
            document = gql(query_string)
            result = session.execute(document)
        return result

Synchronous GraphQL client for Polymarket subgraphs.

PolymarketGraphQLClient( endpoint_name: Literal['activity_subgraph', 'fpmm_subgraph', 'open_interest_subgraph', 'orderbook_subgraph', 'pnl_subgraph', 'positions_subgraph', 'sports_oracle_subgraph', 'wallet_subgraph'])
13    def __init__(
14        self,
15        endpoint_name: Literal[
16            "activity_subgraph",
17            "fpmm_subgraph",
18            "open_interest_subgraph",
19            "orderbook_subgraph",
20            "pnl_subgraph",
21            "positions_subgraph",
22            "sports_oracle_subgraph",
23            "wallet_subgraph",
24        ],
25    ) -> None:
26        endpoint_url = GRAPHQL_ENDPOINTS[endpoint_name]
27        self.transport = HTTPXTransport(url=endpoint_url)
28        self.client = Client(
29            transport=self.transport, fetch_schema_from_transport=False
30        )
transport
client
def query(self, query_string: str) -> dict:
32    def query(self, query_string: str) -> dict:
33        with self.client as session:
34            return session.execute(gql(query_string))
class PolymarketWeb3Client:
 41class PolymarketWeb3Client:
 42    def __init__(
 43        self,
 44        private_key: str,
 45        signature_type: Literal[0, 1, 2] = 1,
 46        chain_id: Literal[137, 80002] = POLYGON,
 47    ):
 48        self.w3 = Web3(Web3.HTTPProvider("https://polygon-rpc.com"))
 49        self.w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0)  # type: ignore[arg-type]
 50        self.w3.middleware_onion.inject(
 51            SignAndSendRawMiddlewareBuilder.build(private_key),  # type: ignore[arg-type]
 52            layer=0,
 53        )
 54
 55        self.account = self.w3.eth.account.from_key(private_key)
 56        self.signature_type = signature_type
 57
 58        self.config = get_contract_config(chain_id, neg_risk=False)
 59        self.neg_risk_config = get_contract_config(chain_id, neg_risk=True)
 60
 61        self.usdc_address = Web3.to_checksum_address(self.config.collateral)
 62        self.usdc_abi = _load_abi("UChildERC20Proxy")
 63        self.usdc = self._contract(self.usdc_address, self.usdc_abi)
 64
 65        self.conditional_tokens_address = Web3.to_checksum_address(
 66            self.config.conditional_tokens
 67        )
 68        self.conditional_tokens_abi = _load_abi("ConditionalTokens")
 69        self.conditional_tokens = self._contract(
 70            self.conditional_tokens_address, self.conditional_tokens_abi
 71        )
 72
 73        self.exchange_address = Web3.to_checksum_address(self.config.exchange)
 74        self.exchange_abi = _load_abi("CTFExchange")
 75        self.exchange = self._contract(self.exchange_address, self.exchange_abi)
 76
 77        self.neg_risk_exchange_address = Web3.to_checksum_address(
 78            self.neg_risk_config.exchange
 79        )
 80        self.neg_risk_exchange_abi = _load_abi("NegRiskCtfExchange")
 81        self.neg_risk_exchange = self._contract(
 82            self.neg_risk_exchange_address, self.neg_risk_exchange_abi
 83        )
 84
 85        self.neg_risk_adapter_address = Web3.to_checksum_address(
 86            "0xd91E80cF2E7be2e162c6513ceD06f1dD0dA35296"
 87        )
 88        self.neg_risk_adapter_abi = _load_abi("NegRiskAdapter")
 89        self.neg_risk_adapter = self._contract(
 90            self.neg_risk_adapter_address, self.neg_risk_adapter_abi
 91        )
 92
 93        self.proxy_factory_address = Web3.to_checksum_address(
 94            "0xaB45c5A4B0c941a2F231C04C3f49182e1A254052"
 95        )
 96        self.proxy_factory_abi = _load_abi("ProxyWalletFactory")
 97        self.proxy_factory = self._contract(
 98            self.proxy_factory_address, self.proxy_factory_abi
 99        )
100
101        self.safe_proxy_factory_address = Web3.to_checksum_address(
102            "0xaacFeEa03eb1561C4e67d661e40682Bd20E3541b"
103        )
104        self.safe_proxy_factory_abi = _load_abi("SafeProxyFactory")
105        self.safe_proxy_factory = self._contract(
106            self.safe_proxy_factory_address, self.safe_proxy_factory_abi
107        )
108
109        match self.signature_type:
110            case 0:
111                self.address = self.account.address
112            case 1:
113                self.address = self.get_poly_proxy_address()
114            case 2:
115                self.address = self.get_safe_proxy_address()
116                self.safe_abi = _load_abi("Safe")
117                self.safe = self._contract(self.address, self.safe_abi)
118
119    def _contract(self, address, abi):
120        return self.w3.eth.contract(
121            address=Web3.to_checksum_address(address),
122            abi=abi,
123        )
124
125    def _encode_usdc_approve(self, address: ChecksumAddress) -> str:
126        return self.usdc.encode_abi(
127            abi_element_identifier="approve",
128            args=[address, int(MAX_INT, base=16)],
129        )
130
131    def _encode_condition_tokens_approve(self, address: ChecksumAddress) -> str:
132        return self.conditional_tokens.encode_abi(
133            abi_element_identifier="setApprovalForAll",
134            args=[address, True],
135        )
136
137    def _encode_transfer_usdc(self, address: ChecksumAddress, amount: int) -> str:
138        return self.usdc.encode_abi(
139            abi_element_identifier="transfer",
140            args=[address, amount],
141        )
142
143    def _encode_transfer_token(
144        self, token_id: str, address: ChecksumAddress, amount: int
145    ) -> str:
146        return self.conditional_tokens.encode_abi(
147            abi_element_identifier="safeTransferFrom",
148            args=[self.address, address, int(token_id), amount, HASH_ZERO],
149        )
150
151    def _encode_split(self, condition_id: Keccak256, amount: int) -> str:
152        return self.conditional_tokens.encode_abi(
153            abi_element_identifier="splitPosition",
154            args=[self.usdc_address, HASH_ZERO, condition_id, [1, 2], amount],
155        )
156
157    def _encode_merge(self, condition_id: Keccak256, amount: int) -> str:
158        return self.conditional_tokens.encode_abi(
159            abi_element_identifier="mergePositions",
160            args=[self.usdc_address, HASH_ZERO, condition_id, [1, 2], amount],
161        )
162
163    def _encode_redeem(self, condition_id: Keccak256) -> str:
164        return self.conditional_tokens.encode_abi(
165            abi_element_identifier="redeemPositions",
166            args=[self.usdc_address, HASH_ZERO, condition_id, [1, 2]],
167        )
168
169    def _encode_redeem_neg_risk(
170        self, condition_id: Keccak256, amounts: list[int]
171    ) -> str:
172        return self.neg_risk_adapter.encode_abi(
173            abi_element_identifier="redeemPositions",
174            args=[condition_id, amounts],
175        )
176
177    def _encode_convert(
178        self, neg_risk_market_id: Keccak256, index_set: int, amount: int
179    ) -> str:
180        return self.neg_risk_adapter.encode_abi(
181            abi_element_identifier="convertPositions",
182            args=[neg_risk_market_id, index_set, amount],
183        )
184
185    def _build_base_transaction(self) -> TxParams:
186        """Build base transaction parameters."""
187        nonce = self.w3.eth.get_transaction_count(self.account.address)
188
189        current_gas_price: int = self.w3.eth.gas_price
190        adjusted_gas_price = Wei(int(current_gas_price * 1.05))
191
192        base_transaction: TxParams = {
193            "nonce": nonce,
194            "gasPrice": adjusted_gas_price,
195            "gas": 1000000,
196            "from": self.account.address,
197        }
198
199        return base_transaction
200
201    def _build_proxy_transaction(self, to, data, base_transaction) -> TxParams:
202        proxy_txn = {
203            "typeCode": 1,
204            "to": to,
205            "value": 0,
206            "data": data,
207        }
208        txn_data = self.proxy_factory.functions.proxy([proxy_txn]).build_transaction(
209            transaction=base_transaction
210        )
211
212        return txn_data
213
214    def _build_safe_transaction(self, to, data, base_transaction) -> TxParams:
215        safe_nonce = self.safe.functions.nonce().call()
216        safe_txn = {
217            "to": to,
218            "data": data,
219            "operation": 0,  # 1 for delegatecall, 0 for call
220            "value": 0,
221        }
222        packed_sig = sign_safe_transaction(
223            self.account,
224            self.safe,
225            safe_txn,
226            safe_nonce,
227        )
228        txn_data = self.safe.functions.execTransaction(
229            safe_txn["to"],
230            safe_txn["value"],
231            safe_txn["data"],
232            safe_txn.get("operation", 0),
233            0,  # safeTxGas
234            0,  # baseGas
235            0,  # gasPrice
236            ADDRESS_ZERO,  # gasToken
237            ADDRESS_ZERO,  # refundReceiver
238            packed_sig,
239        ).build_transaction(transaction=base_transaction)
240
241        return txn_data
242
243    def _execute_transaction(
244        self, txn_data: TxParams, operation_name: str = "Transaction"
245    ) -> TransactionReceipt:
246        """
247        Execute a transaction, wait for receipt, and print status.
248
249        Args:
250            txn_data: Built transaction data
251            operation_name: Name of operation for logging
252
253        """
254        signed_txn = self.account.sign_transaction(txn_data)
255        tx_hash = self.w3.eth.send_raw_transaction(signed_txn.raw_transaction)
256        tx_hash_hex = tx_hash.hex()
257
258        print(f"Txn hash: 0x{tx_hash_hex}")
259
260        # Wait for transaction to be mined and get receipt
261        receipt_dict = self.w3.eth.wait_for_transaction_receipt(tx_hash)
262        receipt = TransactionReceipt.model_validate(receipt_dict)
263
264        print(f"{operation_name} succeeded") if receipt.status == 1 else print(
265            f"{operation_name} failed"
266        )
267
268        return receipt
269
270    def get_poly_proxy_address(self, address: EthAddress | None = None) -> EthAddress:
271        """Get the polymarket proxy address for the current account."""
272        address = address if address else self.account.address
273        return self.exchange.functions.getPolyProxyWalletAddress(address).call()
274
275    def get_safe_proxy_address(self, address: EthAddress | None = None) -> EthAddress:
276        """Get the safe proxy address for the current account."""
277        address = address if address else self.account.address
278        return self.safe_proxy_factory.functions.computeProxyAddress(address).call()
279
280    def get_usdc_balance(self, address: EthAddress | None = None) -> float:
281        """
282        Get the usdc balance of the given address.
283
284        If no address is given, the balance of the proxy account corresponding to
285        the private key is returned (i.e. Polymarket balance).
286        Explicitly passing the proxy address is faster due to only one contract function call.
287        """
288        if address is None:
289            address = self.address
290        balance_res = self.usdc.functions.balanceOf(address).call()
291        return float(balance_res / 1e6)
292
293    def get_token_balance(
294        self, token_id: str, address: EthAddress | None = None
295    ) -> float:
296        """Get the token balance of the given address."""
297        if not address:
298            address = self.address
299        balance_res = self.conditional_tokens.functions.balanceOf(
300            address, int(token_id)
301        ).call()
302        return float(balance_res / 1e6)
303
304    def get_token_complement(self, token_id: str) -> str | None:
305        """Get the complement of the given token."""
306        try:
307            return str(
308                self.neg_risk_exchange.functions.getComplement(int(token_id)).call()
309            )
310        except ContractCustomError as e:
311            if e.args[0] in CUSTOM_ERROR_DICT:
312                try:
313                    return str(
314                        self.exchange.functions.getComplement(int(token_id)).call()
315                    )
316                except ContractCustomError as e2:
317                    if e2.args[0] in CUSTOM_ERROR_DICT:
318                        msg = f"{CUSTOM_ERROR_DICT[e2.args[0]]}"
319                        raise ContractCustomError(
320                            msg,
321                        ) from e2
322                    return None
323            return None
324
325    def get_condition_id_neg_risk(self, question_id: Keccak256) -> Keccak256:
326        """
327        Get the condition id for a given question id.
328
329        Warning: this works for neg risk markets (where the
330        outcomeSlotCount is represented by the last two digits of question id). Returns a keccak256 hash of
331        the oracle and question id.
332        """
333        return (
334            "0x"
335            + self.neg_risk_adapter.functions.getConditionId(question_id).call().hex()
336        )
337
def deploy_safe(self) -> TransactionReceipt:
    """
    Deploy a Safe wallet for this account through the SafeProxyFactory.

    Raises SafeAlreadyDeployedError when code already exists at the address
    returned by ``get_safe_proxy_address``. Otherwise a Safe-creation
    signature is produced for the POLYGON chain id, split into its v/r/s
    parts and passed to ``createProxy``; the built transaction is executed.
    """
    safe_address = self.get_safe_proxy_address()
    existing_code = self.w3.eth.get_code(self.w3.to_checksum_address(safe_address))
    if existing_code != b"":
        msg = f"Safe already deployed at {safe_address}"
        raise SafeAlreadyDeployedError(msg)

    # EIP-712 signature authorising the Safe creation, then its components.
    signature = create_safe_create_signature(account=self.account, chain_id=POLYGON)
    sig_parts = split_signature(signature)

    base_transaction = self._build_base_transaction()

    # createSig is passed as a (uint8, bytes32, bytes32) tuple.
    create_sig = (sig_parts["v"], sig_parts["r"], sig_parts["s"])
    create_proxy_call = self.safe_proxy_factory.functions.createProxy(
        ADDRESS_ZERO,  # paymentToken
        0,  # payment
        ADDRESS_ZERO,  # paymentReceiver
        create_sig,
    )
    txn_data = create_proxy_call.build_transaction(transaction=base_transaction)

    return self._execute_transaction(
        txn_data, operation_name="Gnosis Safe Deployment"
    )
370
def set_collateral_approval(self, spender: ChecksumAddress) -> TransactionReceipt:
    """
    Approve ``spender`` on the USDC contract and execute the transaction.

    The transaction shape depends on ``self.signature_type``:
    0 calls ``approve(spender, MAX_INT)`` on the USDC contract directly,
    1 wraps the encoded approval in a proxy transaction and 2 wraps it in a
    Safe transaction. Any other value leaves the transaction params empty.
    """
    target = self.usdc_address
    calldata = self._encode_usdc_approve(address=spender)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        approve_call = self.usdc.functions.approve(spender, int(MAX_INT, base=16))
        txn_data = approve_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    return self._execute_transaction(txn_data, operation_name="Collateral Approval")
388
def set_conditional_tokens_approval(
    self, spender: ChecksumAddress
) -> TransactionReceipt:
    """
    Approve ``spender`` for all ConditionalTokens and execute the transaction.

    The transaction shape depends on ``self.signature_type``:
    0 calls ``setApprovalForAll(spender, True)`` on the ConditionalTokens
    contract directly, 1 wraps the encoded approval in a proxy transaction
    and 2 wraps it in a Safe transaction. Any other value leaves the
    transaction params empty.
    """
    target = self.conditional_tokens_address
    calldata = self._encode_condition_tokens_approve(address=spender)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        approve_call = self.conditional_tokens.functions.setApprovalForAll(
            spender, True
        )
        txn_data = approve_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    return self._execute_transaction(
        txn_data, operation_name="Conditional Tokens Approval"
    )
410
def set_all_approvals(self) -> list[TransactionReceipt]:
    """
    Set every collateral and conditional tokens approval, one after another.

    USDC approvals are sent for ConditionalTokens, CTFExchange,
    NegRiskCtfExchange and NegRiskAdapter; ConditionalTokens approvals are
    sent for CTFExchange, NegRiskCtfExchange and NegRiskAdapter. A progress
    line is printed before each one. Returns the receipts in that order.
    """
    # (progress message, name of the attribute holding the spender address)
    usdc_spenders = (
        ("Approving ConditionalTokens as spender on USDC", "conditional_tokens_address"),
        ("Approving CTFExchange as spender on USDC", "exchange_address"),
        ("Approving NegRiskCtfExchange as spender on USDC", "neg_risk_exchange_address"),
        ("Approving NegRiskAdapter as spender on USDC", "neg_risk_adapter_address"),
    )
    ctf_spenders = (
        ("Approving CTFExchange as spender on ConditionalTokens", "exchange_address"),
        (
            "Approving NegRiskCtfExchange as spender on ConditionalTokens",
            "neg_risk_exchange_address",
        ),
        (
            "Approving NegRiskAdapter as spender on ConditionalTokens",
            "neg_risk_adapter_address",
        ),
    )

    receipts = []
    for message, address_attr in usdc_spenders:
        print(message)
        receipts.append(
            self.set_collateral_approval(spender=getattr(self, address_attr))
        )
    for message, address_attr in ctf_spenders:
        print(message)
        receipts.append(
            self.set_conditional_tokens_approval(spender=getattr(self, address_attr))
        )
    print("All approvals set!")

    return receipts
459
460    def transfer_usdc(self, recipient: EthAddress, amount: float) -> TransactionReceipt:
461        """Transfers usdc.e from the account to the proxy address."""
462        balance = self.get_usdc_balance(address=self.address)
463        if balance < amount:
464            msg = f"Insufficient USDC.e balance: {balance} < {amount}"
465            raise ValueError(msg)
466        amount = int(balance * 1e6)
467
468        to = self.usdc_address
469        data = self._encode_transfer_usdc(
470            self.w3.to_checksum_address(recipient), amount
471        )
472        base_transaction = self._build_base_transaction()
473        txn_data: TxParams = {}
474
475        match self.signature_type:
476            case 0:
477                txn_data = self.usdc.functions.transfer(
478                    recipient,
479                    amount,
480                ).build_transaction(transaction=base_transaction)
481            case 1:
482                txn_data = self._build_proxy_transaction(to, data, base_transaction)
483            case 2:
484                txn_data = self._build_safe_transaction(to, data, base_transaction)
485
486        # Sign and send transaction
487        return self._execute_transaction(txn_data, operation_name="USDC Transfer")
488
489    def transfer_token(
490        self, token_id: str, recipient: EthAddress, amount: float
491    ) -> TransactionReceipt:
492        """Transfers conditional tokens from the account to the recipient address."""
493        balance = self.get_token_balance(token_id=token_id, address=self.address)
494        if balance < amount:
495            msg = f"Insufficient token balance: {balance} < {amount}"
496            raise ValueError(msg)
497        amount = int(balance * 1e6)
498
499        to = self.conditional_tokens_address
500        data = self._encode_transfer_token(
501            token_id, self.w3.to_checksum_address(recipient), amount
502        )
503        base_transaction = self._build_base_transaction()
504        txn_data: TxParams = {}
505
506        match self.signature_type:
507            case 0:
508                txn_data = self.conditional_tokens.functions.safeTransferFrom(
509                    self.address,
510                    recipient,
511                    int(token_id),
512                    amount,
513                    b"",
514                ).build_transaction(transaction=base_transaction)
515            case 1:
516                txn_data = self._build_proxy_transaction(to, data, base_transaction)
517            case 2:
518                txn_data = self._build_safe_transaction(to, data, base_transaction)
519
520        # Sign and send transaction
521        return self._execute_transaction(txn_data, operation_name="Token Transfer")
522
def split_position(
    self, condition_id: Keccak256, amount: float, neg_risk: bool = True
) -> TransactionReceipt:
    """
    Split usdc into two complementary positions of equal size.

    ``amount`` is scaled by 1e6 before use. The call targets the
    NegRiskAdapter when ``neg_risk`` is true and the ConditionalTokens
    contract otherwise. ``self.signature_type`` selects the transaction
    shape: 0 calls ``splitPosition`` directly with the partition ``[1, 2]``,
    1 wraps the encoded call in a proxy transaction, 2 in a Safe transaction.
    """
    raw_amount = int(amount * 1e6)

    if neg_risk:
        target = self.neg_risk_adapter_address
    else:
        target = self.conditional_tokens_address
    calldata = self._encode_split(condition_id, raw_amount)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        contract = self.neg_risk_adapter if neg_risk else self.conditional_tokens
        split_call = contract.functions.splitPosition(
            self.usdc_address,
            HASH_ZERO,
            condition_id,
            [1, 2],
            raw_amount,
        )
        txn_data = split_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    # Sign and send transaction
    return self._execute_transaction(txn_data, operation_name="Split Position")
557
def merge_position(
    self, condition_id: Keccak256, amount: float, neg_risk: bool = True
) -> TransactionReceipt:
    """
    Merge two complementary positions into usdc.

    ``amount`` is scaled by 1e6 before use. The call targets the
    NegRiskAdapter when ``neg_risk`` is true and the ConditionalTokens
    contract otherwise. ``self.signature_type`` selects the transaction
    shape: 0 calls ``mergePositions`` directly with the partition ``[1, 2]``,
    1 wraps the encoded call in a proxy transaction, 2 in a Safe transaction.
    """
    raw_amount = int(amount * 1e6)

    if neg_risk:
        target = self.neg_risk_adapter_address
    else:
        target = self.conditional_tokens_address
    calldata = self._encode_merge(condition_id, raw_amount)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        contract = self.neg_risk_adapter if neg_risk else self.conditional_tokens
        merge_call = contract.functions.mergePositions(
            self.usdc_address,
            HASH_ZERO,
            condition_id,
            [1, 2],
            raw_amount,
        )
        txn_data = merge_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    # Sign and send transaction
    return self._execute_transaction(txn_data, operation_name="Merge Position")
592
def redeem_position(
    self, condition_id: Keccak256, amounts: list[float], neg_risk: bool = True
) -> TransactionReceipt:
    """
    Redeem a position into usdc.

    Takes a condition id and a list of sizes in shares [x, y]
    where x is the number of shares of the first outcome
          y is the number of shares of the second outcome.

    Each size is scaled by 1e6. The scaled sizes are only passed on the
    neg risk path (NegRiskAdapter); the non neg risk path redeems on the
    ConditionalTokens contract with the index sets ``[1, 2]`` and no sizes.
    ``self.signature_type`` selects the transaction shape: 0 calls
    ``redeemPositions`` directly, 1 wraps the encoded call in a proxy
    transaction, 2 in a Safe transaction.
    """
    int_amounts = [int(shares * 1e6) for shares in amounts]

    if neg_risk:
        target = self.neg_risk_adapter_address
        calldata = self._encode_redeem_neg_risk(condition_id, int_amounts)
    else:
        target = self.conditional_tokens_address
        calldata = self._encode_redeem(condition_id)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        if neg_risk:
            redeem_call = self.neg_risk_adapter.functions.redeemPositions(
                condition_id, int_amounts
            )
        else:
            redeem_call = self.conditional_tokens.functions.redeemPositions(
                self.usdc_address,
                HASH_ZERO,
                condition_id,
                [1, 2],
            )
        txn_data = redeem_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    # Sign and send transaction
    return self._execute_transaction(txn_data, operation_name="Redeem Position")
641
def convert_positions(
    self,
    question_ids: list[Keccak256],
    amount: float,
) -> TransactionReceipt:
    """
    Call ``convertPositions`` on the NegRiskAdapter for the given questions.

    The market id is taken from the first question id with its last two
    characters replaced by "00"; the index set is computed from all
    ``question_ids`` via ``get_index_set``; ``amount`` is scaled by 1e6.
    ``self.signature_type`` selects the transaction shape: 0 calls the
    contract directly, 1 wraps the encoded call in a proxy transaction,
    2 in a Safe transaction.
    """
    raw_amount = int(amount * 1e6)
    market_id = question_ids[0][:-2] + "00"

    target = self.neg_risk_adapter_address
    calldata = self._encode_convert(
        market_id, get_index_set(question_ids), raw_amount
    )
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        convert_call = self.neg_risk_adapter.functions.convertPositions(
            market_id,
            get_index_set(question_ids),
            raw_amount,
        )
        txn_data = convert_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    # Sign and send transaction
    return self._execute_transaction(txn_data, operation_name="Convert Positions")
PolymarketWeb3Client( private_key: str, signature_type: Literal[0, 1, 2] = 1, chain_id: Literal[137, 80002] = 137)
 def __init__(
     self,
     private_key: str,
     signature_type: Literal[0, 1, 2] = 1,
     chain_id: Literal[137, 80002] = POLYGON,
 ):
     """
     Connect to the RPC endpoint and bind every contract the client uses.

     The provider URL ("https://polygon-rpc.com") and the NegRiskAdapter,
     ProxyWalletFactory and SafeProxyFactory addresses are literals here;
     ``chain_id`` only selects the two contract configs. ``self.address`` is
     chosen by ``signature_type``: 0 uses the key's own address, 1 the
     Polymarket proxy address and 2 the Safe proxy address (in which case the
     Safe contract is bound as ``self.safe`` too).
     """
     provider = Web3.HTTPProvider("https://polygon-rpc.com")
     self.w3 = Web3(provider)
     self.w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0)  # type: ignore[arg-type]
     signing_middleware = SignAndSendRawMiddlewareBuilder.build(private_key)
     self.w3.middleware_onion.inject(signing_middleware, layer=0)  # type: ignore[arg-type]

     self.account = self.w3.eth.account.from_key(private_key)
     self.signature_type = signature_type

     self.config = get_contract_config(chain_id, neg_risk=False)
     self.neg_risk_config = get_contract_config(chain_id, neg_risk=True)

     # USDC collateral token
     self.usdc_address = Web3.to_checksum_address(self.config.collateral)
     self.usdc_abi = _load_abi("UChildERC20Proxy")
     self.usdc = self._contract(self.usdc_address, self.usdc_abi)

     # ConditionalTokens
     ctf_address = self.config.conditional_tokens
     self.conditional_tokens_address = Web3.to_checksum_address(ctf_address)
     self.conditional_tokens_abi = _load_abi("ConditionalTokens")
     self.conditional_tokens = self._contract(
         self.conditional_tokens_address, self.conditional_tokens_abi
     )

     # CTFExchange
     self.exchange_address = Web3.to_checksum_address(self.config.exchange)
     self.exchange_abi = _load_abi("CTFExchange")
     self.exchange = self._contract(self.exchange_address, self.exchange_abi)

     # NegRiskCtfExchange
     neg_risk_exchange = self.neg_risk_config.exchange
     self.neg_risk_exchange_address = Web3.to_checksum_address(neg_risk_exchange)
     self.neg_risk_exchange_abi = _load_abi("NegRiskCtfExchange")
     self.neg_risk_exchange = self._contract(
         self.neg_risk_exchange_address, self.neg_risk_exchange_abi
     )

     # NegRiskAdapter
     self.neg_risk_adapter_address = Web3.to_checksum_address(
         "0xd91E80cF2E7be2e162c6513ceD06f1dD0dA35296"
     )
     self.neg_risk_adapter_abi = _load_abi("NegRiskAdapter")
     self.neg_risk_adapter = self._contract(
         self.neg_risk_adapter_address, self.neg_risk_adapter_abi
     )

     # ProxyWalletFactory
     self.proxy_factory_address = Web3.to_checksum_address(
         "0xaB45c5A4B0c941a2F231C04C3f49182e1A254052"
     )
     self.proxy_factory_abi = _load_abi("ProxyWalletFactory")
     self.proxy_factory = self._contract(
         self.proxy_factory_address, self.proxy_factory_abi
     )

     # SafeProxyFactory
     self.safe_proxy_factory_address = Web3.to_checksum_address(
         "0xaacFeEa03eb1561C4e67d661e40682Bd20E3541b"
     )
     self.safe_proxy_factory_abi = _load_abi("SafeProxyFactory")
     self.safe_proxy_factory = self._contract(
         self.safe_proxy_factory_address, self.safe_proxy_factory_abi
     )

     # Pick the address this client acts as.
     sig_type = self.signature_type
     if sig_type == 0:
         self.address = self.account.address
     elif sig_type == 1:
         self.address = self.get_poly_proxy_address()
     elif sig_type == 2:
         self.address = self.get_safe_proxy_address()
         self.safe_abi = _load_abi("Safe")
         self.safe = self._contract(self.address, self.safe_abi)
w3
account
signature_type
config
neg_risk_config
usdc_address
usdc_abi
usdc
conditional_tokens_address
conditional_tokens_abi
conditional_tokens
exchange_address
exchange_abi
exchange
neg_risk_exchange_address
neg_risk_exchange_abi
neg_risk_exchange
neg_risk_adapter_address
neg_risk_adapter_abi
neg_risk_adapter
proxy_factory_address
proxy_factory_abi
proxy_factory
safe_proxy_factory_address
safe_proxy_factory_abi
safe_proxy_factory
def get_poly_proxy_address( self, address: Optional[Annotated[str, AfterValidator(func=<function validate_eth_address>)]] = None) -> Annotated[str, AfterValidator(func=<function validate_eth_address at 0x109abe7a0>)]:
def get_poly_proxy_address(self, address: EthAddress | None = None) -> EthAddress:
    """
    Return the Polymarket proxy wallet address for ``address``.

    Falls back to the current account's address when ``address`` is not
    given; the lookup is ``getPolyProxyWalletAddress`` on the exchange contract.
    """
    owner = address or self.account.address
    return self.exchange.functions.getPolyProxyWalletAddress(owner).call()

Get the polymarket proxy address for the current account.

def get_safe_proxy_address( self, address: Optional[Annotated[str, AfterValidator(func=<function validate_eth_address>)]] = None) -> Annotated[str, AfterValidator(func=<function validate_eth_address at 0x109abe7a0>)]:
def get_safe_proxy_address(self, address: EthAddress | None = None) -> EthAddress:
    """
    Return the Safe proxy address for ``address``.

    Falls back to the current account's address when ``address`` is not
    given; the lookup is ``computeProxyAddress`` on the SafeProxyFactory.
    """
    owner = address or self.account.address
    return self.safe_proxy_factory.functions.computeProxyAddress(owner).call()

Get the safe proxy address for the current account.

def get_usdc_balance( self, address: Optional[Annotated[str, AfterValidator(func=<function validate_eth_address>)]] = None) -> float:
def get_usdc_balance(self, address: EthAddress | None = None) -> float:
    """
    Return the usdc balance of ``address`` in whole units.

    When ``address`` is None, ``self.address`` is queried instead (the
    address selected in the constructor from ``signature_type``). The raw
    ``balanceOf`` result is divided by 1e6.
    """
    holder = self.address if address is None else address
    raw_balance = self.usdc.functions.balanceOf(holder).call()
    return float(raw_balance / 1e6)

Get the usdc balance of the given address.

If no address is given, the balance of the proxy account corresponding to the private key is returned (i.e. Polymarket balance). Explicitly passing the proxy address is faster due to only one contract function call.

def get_token_balance( self, token_id: str, address: Optional[Annotated[str, AfterValidator(func=<function validate_eth_address>)]] = None) -> float:
def get_token_balance(
    self, token_id: str, address: EthAddress | None = None
) -> float:
    """
    Return the balance of ``token_id`` held by ``address`` in whole shares.

    A missing (falsy) ``address`` falls back to ``self.address``. The token
    id is converted to an int for the ConditionalTokens ``balanceOf`` call
    and the raw result is divided by 1e6.
    """
    holder = address or self.address
    raw_balance = self.conditional_tokens.functions.balanceOf(
        holder, int(token_id)
    ).call()
    return float(raw_balance / 1e6)

Get the token balance of the given address.

def get_token_complement(self, token_id: str) -> str | None:
def get_token_complement(self, token_id: str) -> str | None:
    """
    Return the complement of ``token_id`` as a string.

    The NegRiskCtfExchange is asked first. If it fails with a custom error
    listed in CUSTOM_ERROR_DICT, the CTFExchange is asked instead; a listed
    custom error from that second call is re-raised as ContractCustomError
    carrying the mapped message. Custom errors that are not listed yield None.
    """
    try:
        return str(
            self.neg_risk_exchange.functions.getComplement(int(token_id)).call()
        )
    except ContractCustomError as neg_risk_err:
        if neg_risk_err.args[0] not in CUSTOM_ERROR_DICT:
            return None
        # Fallback stays inside this handler so exception context is kept.
        try:
            return str(self.exchange.functions.getComplement(int(token_id)).call())
        except ContractCustomError as exchange_err:
            error_code = exchange_err.args[0]
            if error_code not in CUSTOM_ERROR_DICT:
                return None
            msg = f"{CUSTOM_ERROR_DICT[error_code]}"
            raise ContractCustomError(msg) from exchange_err

Get the complement of the given token.

def get_condition_id_neg_risk( self, question_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)]) -> Annotated[str, AfterValidator(func=<function validate_keccak256 at 0x109abe480>)]:
def get_condition_id_neg_risk(self, question_id: Keccak256) -> Keccak256:
    """
    Resolve the condition id that belongs to a neg risk question id.

    Calls ``getConditionId`` on the NegRiskAdapter contract and returns the
    raw result as a "0x"-prefixed hex string.

    Warning: this works for neg risk markets (where the outcomeSlotCount is
    represented by the last two digits of question id). The returned value is
    a keccak256 hash of the oracle and question id.
    """
    lookup = self.neg_risk_adapter.functions.getConditionId(question_id)
    raw_condition_id = lookup.call()
    return "0x" + raw_condition_id.hex()

Get the condition id for a given question id.

Warning: this works for neg risk markets (where the outcomeSlotCount is represented by the last two digits of question id). Returns a keccak256 hash of the oracle and question id.

def deploy_safe(self) -> polymarket_apis.types.web3_types.TransactionReceipt:
def deploy_safe(self) -> TransactionReceipt:
    """
    Deploy a Safe wallet for this account through the SafeProxyFactory.

    Raises SafeAlreadyDeployedError when code already exists at the address
    returned by ``get_safe_proxy_address``. Otherwise a Safe-creation
    signature is produced for the POLYGON chain id, split into its v/r/s
    parts and passed to ``createProxy``; the built transaction is executed.
    """
    safe_address = self.get_safe_proxy_address()
    existing_code = self.w3.eth.get_code(self.w3.to_checksum_address(safe_address))
    if existing_code != b"":
        msg = f"Safe already deployed at {safe_address}"
        raise SafeAlreadyDeployedError(msg)

    # EIP-712 signature authorising the Safe creation, then its components.
    signature = create_safe_create_signature(account=self.account, chain_id=POLYGON)
    sig_parts = split_signature(signature)

    base_transaction = self._build_base_transaction()

    # createSig is passed as a (uint8, bytes32, bytes32) tuple.
    create_sig = (sig_parts["v"], sig_parts["r"], sig_parts["s"])
    create_proxy_call = self.safe_proxy_factory.functions.createProxy(
        ADDRESS_ZERO,  # paymentToken
        0,  # payment
        ADDRESS_ZERO,  # paymentReceiver
        create_sig,
    )
    txn_data = create_proxy_call.build_transaction(transaction=base_transaction)

    return self._execute_transaction(
        txn_data, operation_name="Gnosis Safe Deployment"
    )

Deploy a Safe wallet using the SafeProxyFactory contract.

def set_collateral_approval( self, spender: eth_typing.evm.ChecksumAddress) -> polymarket_apis.types.web3_types.TransactionReceipt:
def set_collateral_approval(self, spender: ChecksumAddress) -> TransactionReceipt:
    """
    Approve ``spender`` on the USDC contract and execute the transaction.

    The transaction shape depends on ``self.signature_type``:
    0 calls ``approve(spender, MAX_INT)`` on the USDC contract directly,
    1 wraps the encoded approval in a proxy transaction and 2 wraps it in a
    Safe transaction. Any other value leaves the transaction params empty.
    """
    target = self.usdc_address
    calldata = self._encode_usdc_approve(address=spender)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        approve_call = self.usdc.functions.approve(spender, int(MAX_INT, base=16))
        txn_data = approve_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    return self._execute_transaction(txn_data, operation_name="Collateral Approval")
def set_conditional_tokens_approval( self, spender: eth_typing.evm.ChecksumAddress) -> polymarket_apis.types.web3_types.TransactionReceipt:
def set_conditional_tokens_approval(
    self, spender: ChecksumAddress
) -> TransactionReceipt:
    """
    Approve ``spender`` for all ConditionalTokens and execute the transaction.

    The transaction shape depends on ``self.signature_type``:
    0 calls ``setApprovalForAll(spender, True)`` on the ConditionalTokens
    contract directly, 1 wraps the encoded approval in a proxy transaction
    and 2 wraps it in a Safe transaction. Any other value leaves the
    transaction params empty.
    """
    target = self.conditional_tokens_address
    calldata = self._encode_condition_tokens_approve(address=spender)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        approve_call = self.conditional_tokens.functions.setApprovalForAll(
            spender, True
        )
        txn_data = approve_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    return self._execute_transaction(
        txn_data, operation_name="Conditional Tokens Approval"
    )
def set_all_approvals(self) -> list[polymarket_apis.types.web3_types.TransactionReceipt]:
def set_all_approvals(self) -> list[TransactionReceipt]:
    """
    Set every collateral and conditional tokens approval, one after another.

    USDC approvals are sent for ConditionalTokens, CTFExchange,
    NegRiskCtfExchange and NegRiskAdapter; ConditionalTokens approvals are
    sent for CTFExchange, NegRiskCtfExchange and NegRiskAdapter. A progress
    line is printed before each one. Returns the receipts in that order.
    """
    # (progress message, name of the attribute holding the spender address)
    usdc_spenders = (
        ("Approving ConditionalTokens as spender on USDC", "conditional_tokens_address"),
        ("Approving CTFExchange as spender on USDC", "exchange_address"),
        ("Approving NegRiskCtfExchange as spender on USDC", "neg_risk_exchange_address"),
        ("Approving NegRiskAdapter as spender on USDC", "neg_risk_adapter_address"),
    )
    ctf_spenders = (
        ("Approving CTFExchange as spender on ConditionalTokens", "exchange_address"),
        (
            "Approving NegRiskCtfExchange as spender on ConditionalTokens",
            "neg_risk_exchange_address",
        ),
        (
            "Approving NegRiskAdapter as spender on ConditionalTokens",
            "neg_risk_adapter_address",
        ),
    )

    receipts = []
    for message, address_attr in usdc_spenders:
        print(message)
        receipts.append(
            self.set_collateral_approval(spender=getattr(self, address_attr))
        )
    for message, address_attr in ctf_spenders:
        print(message)
        receipts.append(
            self.set_conditional_tokens_approval(spender=getattr(self, address_attr))
        )
    print("All approvals set!")

    return receipts

Sets both collateral and conditional tokens approvals.

def transfer_usdc( self, recipient: Annotated[str, AfterValidator(func=<function validate_eth_address>)], amount: float) -> polymarket_apis.types.web3_types.TransactionReceipt:
460    def transfer_usdc(self, recipient: EthAddress, amount: float) -> TransactionReceipt:
461        """Transfers usdc.e from the account to the proxy address."""
462        balance = self.get_usdc_balance(address=self.address)
463        if balance < amount:
464            msg = f"Insufficient USDC.e balance: {balance} < {amount}"
465            raise ValueError(msg)
466        amount = int(balance * 1e6)
467
468        to = self.usdc_address
469        data = self._encode_transfer_usdc(
470            self.w3.to_checksum_address(recipient), amount
471        )
472        base_transaction = self._build_base_transaction()
473        txn_data: TxParams = {}
474
475        match self.signature_type:
476            case 0:
477                txn_data = self.usdc.functions.transfer(
478                    recipient,
479                    amount,
480                ).build_transaction(transaction=base_transaction)
481            case 1:
482                txn_data = self._build_proxy_transaction(to, data, base_transaction)
483            case 2:
484                txn_data = self._build_safe_transaction(to, data, base_transaction)
485
486        # Sign and send transaction
487        return self._execute_transaction(txn_data, operation_name="USDC Transfer")

Transfers usdc.e from the account to the recipient address.

def transfer_token( self, token_id: str, recipient: Annotated[str, AfterValidator(func=<function validate_eth_address>)], amount: float) -> polymarket_apis.types.web3_types.TransactionReceipt:
489    def transfer_token(
490        self, token_id: str, recipient: EthAddress, amount: float
491    ) -> TransactionReceipt:
492        """Transfers conditional tokens from the account to the recipient address."""
493        balance = self.get_token_balance(token_id=token_id, address=self.address)
494        if balance < amount:
495            msg = f"Insufficient token balance: {balance} < {amount}"
496            raise ValueError(msg)
497        amount = int(balance * 1e6)
498
499        to = self.conditional_tokens_address
500        data = self._encode_transfer_token(
501            token_id, self.w3.to_checksum_address(recipient), amount
502        )
503        base_transaction = self._build_base_transaction()
504        txn_data: TxParams = {}
505
506        match self.signature_type:
507            case 0:
508                txn_data = self.conditional_tokens.functions.safeTransferFrom(
509                    self.address,
510                    recipient,
511                    int(token_id),
512                    amount,
513                    b"",
514                ).build_transaction(transaction=base_transaction)
515            case 1:
516                txn_data = self._build_proxy_transaction(to, data, base_transaction)
517            case 2:
518                txn_data = self._build_safe_transaction(to, data, base_transaction)
519
520        # Sign and send transaction
521        return self._execute_transaction(txn_data, operation_name="Token Transfer")

Transfers conditional tokens from the account to the recipient address.

def split_position( self, condition_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)], amount: float, neg_risk: bool = True) -> polymarket_apis.types.web3_types.TransactionReceipt:
def split_position(
    self, condition_id: Keccak256, amount: float, neg_risk: bool = True
) -> TransactionReceipt:
    """
    Split usdc into two complementary positions of equal size.

    ``amount`` is scaled by 1e6 before use. The call targets the
    NegRiskAdapter when ``neg_risk`` is true and the ConditionalTokens
    contract otherwise. ``self.signature_type`` selects the transaction
    shape: 0 calls ``splitPosition`` directly with the partition ``[1, 2]``,
    1 wraps the encoded call in a proxy transaction, 2 in a Safe transaction.
    """
    raw_amount = int(amount * 1e6)

    if neg_risk:
        target = self.neg_risk_adapter_address
    else:
        target = self.conditional_tokens_address
    calldata = self._encode_split(condition_id, raw_amount)
    base_transaction = self._build_base_transaction()

    txn_data: TxParams = {}
    sig_type = self.signature_type
    if sig_type == 0:
        contract = self.neg_risk_adapter if neg_risk else self.conditional_tokens
        split_call = contract.functions.splitPosition(
            self.usdc_address,
            HASH_ZERO,
            condition_id,
            [1, 2],
            raw_amount,
        )
        txn_data = split_call.build_transaction(transaction=base_transaction)
    elif sig_type == 1:
        txn_data = self._build_proxy_transaction(target, calldata, base_transaction)
    elif sig_type == 2:
        txn_data = self._build_safe_transaction(target, calldata, base_transaction)

    # Sign and send transaction
    return self._execute_transaction(txn_data, operation_name="Split Position")

Splits usdc into two complementary positions of equal size.

def merge_position( self, condition_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)], amount: float, neg_risk: bool = True) -> polymarket_apis.types.web3_types.TransactionReceipt:
558    def merge_position(
559        self, condition_id: Keccak256, amount: float, neg_risk: bool = True
560    ) -> TransactionReceipt:
561        """Merge two complementary positions into USDC (neg-risk adapter if ``neg_risk``, else conditional tokens)."""
562        amount = int(amount * 1e6)  # scale by 1e6 and truncate to int (presumably 6-decimal base units; TODO confirm)
563
564        to = (  # address the call is directed at, selected by neg_risk
565            self.neg_risk_adapter_address
566            if neg_risk
567            else self.conditional_tokens_address
568        )
569        data = self._encode_merge(condition_id, amount)  # encoded call; only consumed by the proxy/safe builders below
570        base_transaction = self._build_base_transaction()
571        txn_data: TxParams = {}  # NOTE(review): stays empty if signature_type is not 0, 1 or 2 (the match has no default case)
572
573        match self.signature_type:
574            case 0:  # build the contract call directly (presumably a plain EOA signer; TODO confirm)
575                _contract = (
576                    self.neg_risk_adapter if neg_risk else self.conditional_tokens
577                )
578                txn_data = _contract.functions.mergePositions(
579                    self.usdc_address,
580                    HASH_ZERO,
581                    condition_id,
582                    [1, 2],  # presumably the index sets of the two outcomes; TODO confirm
583                    amount,
584                ).build_transaction(transaction=base_transaction)
585            case 1:  # wrap the encoded call with _build_proxy_transaction
586                txn_data = self._build_proxy_transaction(to, data, base_transaction)
587            case 2:  # wrap the encoded call with _build_safe_transaction
588                txn_data = self._build_safe_transaction(to, data, base_transaction)
589
590        # Sign and send transaction
591        return self._execute_transaction(txn_data, operation_name="Merge Position")

Merges two complementary positions into usdc.

def redeem_position( self, condition_id: Annotated[str, AfterValidator(func=<function validate_keccak256>)], amounts: list[float], neg_risk: bool = True) -> polymarket_apis.types.web3_types.TransactionReceipt:
593    def redeem_position(
594        self, condition_id: Keccak256, amounts: list[float], neg_risk: bool = True
595    ) -> TransactionReceipt:
596        """
597        Redeem a position into USDC.
598
599        Takes a condition id and a list of sizes in shares [x, y],
600        where x is the number of shares of the first outcome and
601        y is the number of shares of the second outcome (amounts are only used when neg_risk is True).
602        """
603        int_amounts = [int(amount * 1e6) for amount in amounts]  # each size scaled by 1e6 and truncated to int
604
605        to = (  # address the call is directed at, selected by neg_risk
606            self.neg_risk_adapter_address
607            if neg_risk
608            else self.conditional_tokens_address
609        )
610        data = (  # encoded call for the proxy/safe builders; the non-neg-risk encoder takes only the condition id
611            self._encode_redeem_neg_risk(condition_id, int_amounts)
612            if neg_risk
613            else self._encode_redeem(condition_id)
614        )
615        base_transaction = self._build_base_transaction()
616        txn_data: TxParams = {}  # NOTE(review): stays empty if signature_type is not 0, 1 or 2 (the match has no default case)
617
618        match self.signature_type:
619            case 0:  # build the contract call directly (presumably a plain EOA signer; TODO confirm)
620                _contract = (
621                    self.neg_risk_adapter if neg_risk else self.conditional_tokens
622                )
623                if neg_risk:  # adapter variant: redeemPositions(condition_id, amounts)
624                    txn_data = _contract.functions.redeemPositions(
625                        condition_id, int_amounts
626                    ).build_transaction(transaction=base_transaction)
627                else:  # conditional-tokens variant: no amounts are passed
628                    txn_data = _contract.functions.redeemPositions(
629                        self.usdc_address,
630                        HASH_ZERO,
631                        condition_id,
632                        [1, 2],  # presumably the index sets of the two outcomes; TODO confirm
633                    ).build_transaction(transaction=base_transaction)
634            case 1:  # wrap the encoded call with _build_proxy_transaction
635                txn_data = self._build_proxy_transaction(to, data, base_transaction)
636            case 2:  # wrap the encoded call with _build_safe_transaction
637                txn_data = self._build_safe_transaction(to, data, base_transaction)
638
639        # Sign and send transaction
640        return self._execute_transaction(txn_data, operation_name="Redeem Position")

Redeem a position into usdc.

Takes a condition id and a list of sizes in shares [x, y], where x is the number of shares of the first outcome and y is the number of shares of the second outcome.

def convert_positions( self, question_ids: list[typing.Annotated[str, AfterValidator(func=<function validate_keccak256>)]], amount: float) -> polymarket_apis.types.web3_types.TransactionReceipt:
642    def convert_positions(  # Convert positions through the neg-risk adapter's convertPositions call
643        self,
644        question_ids: list[Keccak256],  # the first entry is used to derive the market id below
645        amount: float,
646    ) -> TransactionReceipt:
647        amount = int(amount * 1e6)  # scale by 1e6 and truncate to int (presumably 6-decimal base units; TODO confirm)
648        neg_risk_market_id = question_ids[0][:-2] + "00"  # first question id with its last two characters replaced by "00"; IndexError if the list is empty
649
650        to = self.neg_risk_adapter_address
651        data = self._encode_convert(  # encoded call; only consumed by the proxy/safe builders below
652            neg_risk_market_id, get_index_set(question_ids), amount
653        )
654        base_transaction = self._build_base_transaction()
655        txn_data: TxParams = {}  # NOTE(review): stays empty if signature_type is not 0, 1 or 2 (the match has no default case)
656
657        match self.signature_type:
658            case 0:  # build the contract call directly (presumably a plain EOA signer; TODO confirm)
659                txn_data = self.neg_risk_adapter.functions.convertPositions(
660                    neg_risk_market_id,
661                    get_index_set(question_ids),  # NOTE(review): repeats the get_index_set call already made for `data`
662                    amount,
663                ).build_transaction(transaction=base_transaction)
664            case 1:  # wrap the encoded call with _build_proxy_transaction
665                txn_data = self._build_proxy_transaction(to, data, base_transaction)
666            case 2:  # wrap the encoded call with _build_safe_transaction
667                txn_data = self._build_safe_transaction(to, data, base_transaction)
668
669        # Sign and send transaction
670        return self._execute_transaction(txn_data, operation_name="Convert Positions")
class PolymarketWebsocketsClient:
129class PolymarketWebsocketsClient:  # Client exposing three websocket subscriptions: market, user and live data
130    def __init__(self):
131        self.url_market = "wss://ws-subscriptions-clob.polymarket.com/ws/market"  # endpoint used by market_socket
132        self.url_user = "wss://ws-subscriptions-clob.polymarket.com/ws/user"  # endpoint used by user_socket
133        self.url_live_data = "wss://ws-live-data.polymarket.com"  # endpoint used by live_data_socket
134
135    def market_socket(
136        self, token_ids: list[str], process_event: Callable = _process_market_event
137    ):
138        """
139        Connect to the market websocket and subscribe to market events for specific token IDs.
140
141        Args:
142            token_ids: List of token IDs to subscribe to
143            process_event: Callback invoked with each "text" event (defaults to _process_market_event)
144
145        """
146        websocket = WebSocket(self.url_market)
147
148        for event in persist(websocket):  # persist automatically reconnects
149            if event.name == "ready":  # (re)send the subscription every time the connection reports ready
150                websocket.send_json(
151                    assets_ids=token_ids,
152                )
153            elif event.name == "text":  # events with any other name are ignored
154                process_event(event)
155
156    def user_socket(
157        self, creds: ApiCreds, process_event: Callable = _process_user_event
158    ):
159        """
160        Connect to the user websocket and subscribe to user events.
161
162        Args:
163            creds: API credentials for authentication
164            process_event: Callback invoked with each "text" event (defaults to _process_user_event)
165
166        """
167        websocket = WebSocket(self.url_user)
168
169        for event in persist(websocket):
170            if event.name == "ready":  # (re)send the auth message every time the connection reports ready
171                websocket.send_json(
172                    auth=creds.model_dump(by_alias=True),  # credentials dumped with by_alias=True
173                )
174            elif event.name == "text":  # events with any other name are ignored
175                process_event(event)
176
177    def live_data_socket(
178        self,
179        subscriptions: list[dict[str, Any]],
180        process_event: Callable = _process_live_data_event,
181        creds: Optional[ApiCreds] = None,
182    ):
183        # info on how to subscribe found at https://github.com/Polymarket/real-time-data-client?tab=readme-ov-file#subscribe
184        """
185        Connect to the live data websocket and subscribe to specified events.
186
187        Args:
188            subscriptions: List of subscription configurations
189            process_event: Callback invoked with each "text" event (defaults to _process_live_data_event)
190            creds: ApiCreds, required when any subscription has topic "clob_user" (AuthenticationRequiredError otherwise)
191
192        """
193        websocket = WebSocket(self.url_live_data)
194
195        needs_auth = any(sub.get("topic") == "clob_user" for sub in subscriptions)  # computed once, before the event loop
196
197        for event in persist(websocket):
198            if event.name == "ready":
199                if needs_auth:
200                    if creds is None:  # NOTE(review): only checked once a "ready" event arrives, not before connecting
201                        msg = "ApiCreds credentials are required for the clob_user topic subscriptions"
202                        raise AuthenticationRequiredError(msg)
203                    subscriptions_with_creds = []
204                    for sub in subscriptions:
205                        if sub.get("topic") == "clob_user":
206                            sub_copy = sub.copy()  # shallow copy so the caller's dict does not receive the credentials
207                            sub_copy["clob_auth"] = creds.model_dump()  # NOTE(review): no by_alias=True here, unlike user_socket; confirm expected key names
208                            subscriptions_with_creds.append(sub_copy)
209                        else:
210                            subscriptions_with_creds.append(sub)
211                    subscriptions = subscriptions_with_creds  # rebinds the local name; this branch runs again on every "ready" event
212
213                payload = {
214                    "action": "subscribe",
215                    "subscriptions": subscriptions,
216                }
217
218                websocket.send_json(**payload)
219
220            elif event.name == "text":  # events with any other name are ignored
221                process_event(event)
url_market
url_user
url_live_data
def market_socket( self, token_ids: list[str], process_event: Callable = <function _process_market_event>):
135    def market_socket(
136        self, token_ids: list[str], process_event: Callable = _process_market_event
137    ):
138        """
139        Connect to the market websocket and subscribe to market events for specific token IDs.
140
141        Args:
142            token_ids: List of token IDs to subscribe to
143            process_event: Callback invoked with each "text" event (defaults to _process_market_event)
144
145        """
146        websocket = WebSocket(self.url_market)
147
148        for event in persist(websocket):  # persist automatically reconnects
149            if event.name == "ready":  # (re)send the subscription every time the connection reports ready
150                websocket.send_json(
151                    assets_ids=token_ids,
152                )
153            elif event.name == "text":  # events with any other name are ignored
154                process_event(event)

Connect to the market websocket and subscribe to market events for specific token IDs.

Args: token_ids – list of token IDs to subscribe to; process_event – callback function to process received events.

def user_socket( self, creds: polymarket_apis.types.clob_types.ApiCreds, process_event: Callable = <function _process_user_event>):
156    def user_socket(
157        self, creds: ApiCreds, process_event: Callable = _process_user_event
158    ):
159        """
160        Connect to the user websocket and subscribe to user events.
161
162        Args:
163            creds: API credentials for authentication
164            process_event: Callback invoked with each "text" event (defaults to _process_user_event)
165
166        """
167        websocket = WebSocket(self.url_user)
168
169        for event in persist(websocket):
170            if event.name == "ready":  # (re)send the auth message every time the connection reports ready
171                websocket.send_json(
172                    auth=creds.model_dump(by_alias=True),  # credentials dumped with by_alias=True
173                )
174            elif event.name == "text":  # events with any other name are ignored
175                process_event(event)

Connect to the user websocket and subscribe to user events.

Args: creds – API credentials for authentication; process_event – callback function to process received events.

def live_data_socket( self, subscriptions: list[dict[str, typing.Any]], process_event: Callable = <function _process_live_data_event>, creds: Optional[polymarket_apis.types.clob_types.ApiCreds] = None):
177    def live_data_socket(
178        self,
179        subscriptions: list[dict[str, Any]],
180        process_event: Callable = _process_live_data_event,
181        creds: Optional[ApiCreds] = None,
182    ):
183        # info on how to subscribe found at https://github.com/Polymarket/real-time-data-client?tab=readme-ov-file#subscribe
184        """
185        Connect to the live data websocket and subscribe to specified events.
186
187        Args:
188            subscriptions: List of subscription configurations
189            process_event: Callback invoked with each "text" event (defaults to _process_live_data_event)
190            creds: ApiCreds, required when any subscription has topic "clob_user" (AuthenticationRequiredError otherwise)
191
192        """
193        websocket = WebSocket(self.url_live_data)
194
195        needs_auth = any(sub.get("topic") == "clob_user" for sub in subscriptions)  # computed once, before the event loop
196
197        for event in persist(websocket):
198            if event.name == "ready":
199                if needs_auth:
200                    if creds is None:  # NOTE(review): only checked once a "ready" event arrives, not before connecting
201                        msg = "ApiCreds credentials are required for the clob_user topic subscriptions"
202                        raise AuthenticationRequiredError(msg)
203                    subscriptions_with_creds = []
204                    for sub in subscriptions:
205                        if sub.get("topic") == "clob_user":
206                            sub_copy = sub.copy()  # shallow copy so the caller's dict does not receive the credentials
207                            sub_copy["clob_auth"] = creds.model_dump()  # NOTE(review): no by_alias=True here, unlike user_socket; confirm expected key names
208                            subscriptions_with_creds.append(sub_copy)
209                        else:
210                            subscriptions_with_creds.append(sub)
211                    subscriptions = subscriptions_with_creds  # rebinds the local name; this branch runs again on every "ready" event
212
213                payload = {
214                    "action": "subscribe",
215                    "subscriptions": subscriptions,
216                }
217
218                websocket.send_json(**payload)
219
220            elif event.name == "text":  # events with any other name are ignored
221                process_event(event)

Connect to the live data websocket and subscribe to specified events.

Args: subscriptions – list of subscription configurations; process_event – callback function to process received events; creds – ApiCreds for authentication if subscribing to the clob_user topic.

__author__ = 'Razvan Gheorghe'
__email__ = 'razvan@gheorghe.me'
__version__ = '0.3.9'