Metadata-Version: 2.1
Name: pgpq
Version: 0.9.0
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Dist: pyarrow>=11
Requires-Dist: typing-extensions>=3; python_version < '3.8'
Requires-Dist: pytest >=7.0.0; extra == 'test'
Requires-Dist: maturin >= 0.14.0; extra == 'test'
Requires-Dist: pre-commit>=2.16.0; extra == 'test'
Requires-Dist: testing.postgresql >= 1.3.0; extra == 'test'
Requires-Dist: psycopg[binary] >= 3.1.8; extra == 'test'
Requires-Dist: pyarrow >= 11.0.0; extra == 'test'
Requires-Dist: jupyter >=1.0.0; extra == 'bench'
Requires-Dist: requests; extra == 'bench'
Provides-Extra: test
Provides-Extra: bench
Summary: Arrow -> PostgreSQL encoder
Author: Adrian Garcia Badaracco
License: MIT
Requires-Python: >=3.7
Description-Content-Type: text/markdown; charset=UTF-8; variant=GFM
Project-URL: homepage, https://github.com/adriangb/pgpq
Project-URL: documentation, https://github.com/adriangb/pgpq/README.md
Project-URL: repository, https://github.com/adriangb/pgpq

# pgpq

Convert PyArrow RecordBatches to Postgres' native binary format, ready to bulk load with `COPY ... FROM STDIN WITH (FORMAT BINARY)`.
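
To give a feel for what "native binary format" means, here is a minimal hand-rolled sketch of the `COPY ... (FORMAT BINARY)` layout described in the PostgreSQL documentation: a fixed header, one tuple per row, and a trailer. It is only an illustration and not how pgpq is implemented. The helper names (`copy_header`, `encode_int4_row`, `copy_trailer`) are hypothetical, and the sketch only handles `INT4` columns.

```python
import struct

# fixed 11-byte signature that opens every COPY BINARY stream
SIGNATURE = b"PGCOPY\n\xff\r\n\x00"


def copy_header() -> bytes:
    # signature + 32-bit flags field (0) + 32-bit header extension length (0)
    return SIGNATURE + struct.pack("!ii", 0, 0)


def encode_int4_row(values) -> bytes:
    # hypothetical helper: encodes one row whose columns are all INT4
    # layout: 16-bit field count, then per field a 32-bit byte length + payload
    out = [struct.pack("!h", len(values))]
    for value in values:
        if value is None:
            # NULL is a length of -1 with no payload bytes
            out.append(struct.pack("!i", -1))
        else:
            # INT4 payload is 4 bytes, big-endian (network byte order)
            out.append(struct.pack("!ii", 4, value))
    return b"".join(out)


def copy_trailer() -> bytes:
    # the stream ends with a 16-bit word containing -1
    return struct.pack("!h", -1)


payload = copy_header() + encode_int4_row([1, None]) + copy_trailer()
print(payload.hex())
```

The encoder produces this kind of byte stream for you, for whole Arrow batches and for the types it supports, so the pieces only need to be written to the `COPY` in order.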

## Usage

```python
"""Example for README.md"""
from tempfile import mkdtemp
import psycopg
import pyarrow.dataset as ds
import requests
from pgpq import ArrowToPostgresBinaryEncoder

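# download some example data (NYC yellow taxi trips, January 2023)
tmpdir = mkdtemp()
resp = requests.get(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
)
# check the response before creating the file
# so a failed download never leaves an empty parquet file in tmpdir
resp.raise_for_status()
with open(f"{tmpdir}/yellow_tripdata_2023-01.parquet", mode="wb") as f:
    f.write(resp.content)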

# load an arrow dataset
# arrow can load datasets from partitioned parquet files locally or in S3/GCS
# it handles buffering, matching globs, etc.
dataset = ds.dataset(tmpdir)

# create an encoder object for the dataset's arrow schema
encoder = ArrowToPostgresBinaryEncoder(dataset.schema)
# get the Postgres schema of the encoded data
# note that this is _not_ the same as the incoming arrow schema
# and not necessarily the schema of your permanent table:
# it's the schema of the data that will be sent over the wire,
# which for example does not have timezones on any timestamps
pg_schema = encoder.schema()
# assemble ddl for a temporary table
# it's often a good idea to bulk load into a temp table to:
# (1) Avoid maintaining indexes during the load
# (2) Stay in-memory as long as possible
# (3) Be more flexible with types
#     (you can't load a SMALLINT into a BIGINT column without casting)
cols = [f'"{col_name}" {col.data_type.ddl()}' for col_name, col in pg_schema.columns]
ddl = f"CREATE TEMP TABLE data ({','.join(cols)})"

with psycopg.connect("postgres://postgres:postgres@localhost:5432/postgres") as conn:
    with conn.cursor() as cursor:
        cursor.execute(ddl)  # type: ignore
        with cursor.copy("COPY data FROM STDIN WITH (FORMAT BINARY)") as copy:
            copy.write(encoder.write_header())
            for batch in dataset.to_batches():
                copy.write(encoder.write_batch(batch))
            copy.write(encoder.finish())
        # load into your actual table, possibly doing type casts
        # cursor.execute("INSERT INTO \"table\" SELECT * FROM data")
```
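
The final step above (moving rows from the temp table into your actual table, possibly with type casts) can be sketched as a small statement builder. This is an illustrative assumption and not part of pgpq: `quote_ident` and `build_insert_select` are hypothetical helpers, and the table name `trips` and the column names are placeholders for your own schema.

```python
from typing import Mapping, Sequence


def quote_ident(name: str) -> str:
    # standard SQL identifier quoting: double embedded quotes, wrap in double quotes
    return '"' + name.replace('"', '""') + '"'


def build_insert_select(
    target: str,
    source: str,
    columns: Sequence[str],
    casts: Mapping[str, str],
) -> str:
    # hypothetical helper: copy `columns` from `source` into `target`,
    # casting the columns listed in `casts` to the given Postgres type
    # (type names are inserted as-is, so they must come from trusted code)
    select_exprs = []
    for col in columns:
        expr = quote_ident(col)
        if col in casts:
            expr = f"CAST({expr} AS {casts[col]})"
        select_exprs.append(expr)
    col_list = ", ".join(quote_ident(c) for c in columns)
    return (
        f"INSERT INTO {quote_ident(target)} ({col_list}) "
        f"SELECT {', '.join(select_exprs)} FROM {quote_ident(source)}"
    )


sql = build_insert_select(
    target="trips",  # placeholder name for the permanent table
    source="data",  # the temp table created by the DDL above
    columns=["VendorID", "passenger_count"],  # placeholder column names
    casts={"passenger_count": "BIGINT"},
)
print(sql)
```

Running the resulting statement with `cursor.execute(sql)` inside the same connection block, after the `COPY` has finished, keeps the temp table and the insert in one session.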

