
Extractors

Extractors retrieve data from various sources for ETL pipelines.

Overview

PyCharter provides 11 built-in extractors across three categories:

| Category | Extractors | Install extra |
| --- | --- | --- |
| Batch | HTTP, File, Database, CloudStorage, MongoDB | pycharter[etl] |
| Streaming | SSE, WebSocket, FileWatcher | pycharter[streaming] |
| Messaging | Kafka, RabbitMQ, SQS | pycharter[messaging] (Kafka, RabbitMQ) or pycharter[etl] (SQS) |

# Batch extractors (always available)
from pycharter import HTTPExtractor, FileExtractor, DatabaseExtractor, CloudStorageExtractor

# Streaming extractors (lazy-loaded)
from pycharter.etl_generator.extractors import SSEExtractor, WebSocketExtractor, FileWatcherExtractor

# Messaging extractors (lazy-loaded)
from pycharter.etl_generator.extractors import KafkaExtractor, RabbitMQExtractor, SQSExtractor

Batch Extractors

HTTPExtractor

Extract data from HTTP/REST APIs.


HTTPExtractor(
    url: str | None = None,
    base_url: str | None = None,
    endpoint: str | None = None,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    params: dict[str, Any] | None = None,
    body: Any | None = None,
    response_path: str | None = None,
    batch_size: int = 1000,
    pagination: dict[str, Any] | None = None,
)

Bases: BaseExtractor

Extractor for HTTP/API data sources.

Supports two modes:

  1. Programmatic API:

     >>> extractor = HTTPExtractor(url="https://api.example.com/users")
     >>> async for batch in extractor.extract():
     ...     process(batch)

  2. Config-driven (legacy):

     >>> extractor = HTTPExtractor()
     >>> async for batch in extractor.extract_streaming(config, params, headers):
     ...     process(batch)

from_config classmethod

from_config(config: dict[str, Any]) -> 'HTTPExtractor'

Create extractor from configuration dict.

extract async

extract(**params) -> AsyncIterator[list[dict[str, Any]]]

Extract data from HTTP source.

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[list[dict[str, Any]]] | Batches of records |

validate_config

validate_config(extract_config: dict[str, Any]) -> None

Validate HTTP extractor configuration.

extract_streaming async

extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]

Extract data from HTTP/API source with pagination support.

Yields batches as they are extracted, preventing memory exhaustion for large datasets.

Examples

# Simple GET
extractor = HTTPExtractor(url="https://api.example.com/users")

# With headers
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer ${API_KEY}"}
)

# POST with body
extractor = HTTPExtractor(
    url="https://api.example.com/search",
    method="POST",
    body={"query": "active", "limit": 100}
)

# With pagination
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    pagination={
        "type": "offset",
        "param": "offset",
        "limit_param": "limit",
        "limit": 100
    }
)
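The examples above only construct extractors; data is pulled by iterating the async iterator inside an event loop. The sketch below illustrates generic offset pagination driven by the same keys as the pagination dict (param, limit_param, limit) and shows a caller consuming the batches with asyncio.run. It does not call PyCharter: fake_fetch is a hypothetical stand-in for one HTTP request, and the stopping rule (stop after a page shorter than limit) is an assumption about typical offset pagination, not a description of HTTPExtractor internals.

```python
import asyncio
from typing import Any, AsyncIterator

# Hypothetical in-memory "API" holding 250 records, served one page per call.
USERS = [{"id": i} for i in range(250)]


async def fake_fetch(query: dict[str, Any]) -> list[dict[str, Any]]:
    """Stand-in for one GET request; this fake API reads offset/limit query params."""
    await asyncio.sleep(0)  # yield to the event loop, as a real network call would
    start, size = query["offset"], query["limit"]
    return USERS[start:start + size]


async def paginate_offset(pagination: dict[str, Any]) -> AsyncIterator[list[dict[str, Any]]]:
    """Yield one batch per page, stopping after a short or empty page (assumed rule)."""
    offset, limit = 0, pagination["limit"]
    while True:
        query = {pagination["param"]: offset, pagination["limit_param"]: limit}
        page = await fake_fetch(query)
        if page:
            yield page
        if len(page) < limit:
            break  # fewer records than requested: no further pages
        offset += limit


async def main() -> int:
    pagination = {"type": "offset", "param": "offset", "limit_param": "limit", "limit": 100}
    total = 0
    async for batch in paginate_offset(pagination):
        total += len(batch)  # replace with real per-batch processing
    return total


print(asyncio.run(main()))  # 250
```

With a real extractor, the loop in main() would read async for batch in extractor.extract(), as in the class docstring; the batching contract (an async iterator of lists of dicts) is the same.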

Path parameters and record injection

When base_url is combined with an api_endpoint that contains placeholders (e.g. /v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}), pass the placeholder values as params. The HTTP extractor will:

  1. Substitute {symbol}, {start_date}, and {end_date} in the URL with the matching params.
  2. Inject those values into every extracted record, so each row carries, for example, a symbol field. This removes the need for extra logic in transforms or loaders to attach the symbol to each record.

Config-driven example:

# extract.yaml
type: http
base_url: https://api.example.com
api_endpoint: /v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}
response_path: results

When you run with symbol=AAPL, start_date=2024-01-01, and end_date=2024-01-31, each record in the results array gets symbol, start_date, and end_date fields, unless the record already has a field with that name. To list the placeholder names in an endpoint string, call get_path_param_names(api_endpoint) from pycharter.etl_generator.extractors.
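The substitution and injection steps described above can be illustrated with a small standalone reimplementation. This is a sketch for understanding the behavior, not PyCharter's code: path_param_names only mirrors what get_path_param_names is described as returning, and the helper names and the {name} regex are assumptions.

```python
import re
from typing import Any

# Placeholders look like {symbol}; the \w+ name pattern is an assumption.
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def path_param_names(endpoint: str) -> list[str]:
    """Return placeholder names in the order they appear in the endpoint."""
    return PLACEHOLDER.findall(endpoint)


def build_url(base_url: str, endpoint: str, params: dict[str, Any]) -> str:
    """Replace each {name} with str(params[name]); a missing name raises KeyError."""
    path = PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), endpoint)
    return base_url.rstrip("/") + path


def inject_path_params(
    records: list[dict[str, Any]], endpoint: str, params: dict[str, Any]
) -> list[dict[str, Any]]:
    """Add each path param to every record without overwriting existing fields."""
    names = path_param_names(endpoint)
    for record in records:
        for name in names:
            record.setdefault(name, params[name])
    return records


endpoint = "/v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}"
params = {"symbol": "AAPL", "start_date": "2024-01-01", "end_date": "2024-01-31"}
print(build_url("https://api.example.com", endpoint, params))
# https://api.example.com/v2/aggs/ticker/AAPL/range/1/day/2024-01-01/2024-01-31

# Made-up sample rows: the second already has a symbol field, which is kept.
rows = inject_path_params([{"v": 1}, {"v": 2, "symbol": "KEEP"}], endpoint, params)
print(rows[0]["symbol"], rows[1]["symbol"])  # AAPL KEEP
```

Using setdefault is what gives the "only if not already present" behavior: a field returned by the API always wins over the injected path value.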

FileExtractor

Extract data from local files.


FileExtractor(
    path: str | None = None,
    file_format: str | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
)

Bases: BaseExtractor

Extractor for file-based data sources.

Supports two modes:

  1. Programmatic API:

     >>> extractor = FileExtractor(path="data.csv")
     >>> async for batch in extractor.extract():
     ...     process(batch)

  2. Config-driven:

     >>> extractor = FileExtractor()
     >>> async for batch in extractor.extract_streaming(config, params, headers):
     ...     process(batch)

from_config classmethod

from_config(config: dict[str, Any]) -> 'FileExtractor'

Create extractor from configuration dict.

extract async

extract(**params) -> AsyncIterator[list[dict[str, Any]]]

Extract data from file.

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[list[dict[str, Any]]] | Batches of records |

validate_config

validate_config(extract_config: dict[str, Any]) -> None

Validate file extractor configuration.

extract_streaming async

extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]

Extract data from file(s) in batches.

Supports:

  • Single files
  • Glob patterns for multiple files
  • Compressed files (gzip, zip)

Supported Formats

| Format | Extension | Notes |
| --- | --- | --- |
| JSON | .json | Single array or object |
| JSON Lines | .jsonl | One JSON object per line |
| CSV | .csv | Expects a header row |
| Parquet | .parquet | Columnar format |
| Excel | .xlsx, .xls | Requires openpyxl |
| XML | .xml | Requires lxml |
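The single-file, glob, and gzip support described above, together with two of the formats in the table, combine naturally into one batched reader. The sketch below is a standard-library illustration for JSON Lines and CSV, not FileExtractor's implementation; the function name read_batches and the rule "a .gz suffix means gzip" are assumptions, and zip archives are omitted for brevity.

```python
import csv
import glob
import gzip
import json
from typing import Any, Iterator


def _open_text(path: str):
    """Open a plain or gzip-compressed file as text (gzip detected by the .gz suffix)."""
    if path.endswith(".gz"):
        return gzip.open(path, "rt", encoding="utf-8", newline="")
    return open(path, "r", encoding="utf-8", newline="")


def read_batches(
    pattern: str, file_format: str, batch_size: int = 1000
) -> Iterator[list[dict[str, Any]]]:
    """Yield lists of up to batch_size records across every file matching pattern."""
    batch: list[dict[str, Any]] = []
    for path in sorted(glob.glob(pattern)):  # sorted for a deterministic file order
        with _open_text(path) as fh:
            if file_format == "jsonl":
                rows = (json.loads(line) for line in fh if line.strip())
            elif file_format == "csv":
                rows = csv.DictReader(fh)  # first row is treated as the header
            else:
                raise ValueError(f"unsupported format in this sketch: {file_format}")
            for row in rows:
                batch.append(dict(row))
                if len(batch) == batch_size:
                    yield batch
                    batch = []
    if batch:
        yield batch  # final partial batch
```

For example, list(read_batches("data/*.jsonl*", "jsonl", batch_size=500)) would read plain and gzipped JSON Lines files in one pass. Batches span file boundaries here, which keeps batch sizes uniform; a per-file batching rule would be an equally reasonable choice.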

Examples

# JSON file
extractor = FileExtractor(path="data/users.json", file_format="json")

# CSV file
extractor = FileExtractor(path="data/users.csv", file_format="csv")

# Parquet file
extractor = FileExtractor(path="data/users.parquet", file_format="parquet")

# Glob pattern (multiple files)
extractor = FileExtractor(path="data/*.json", file_format="json")

# Excel file (requires openpyxl)
extractor = FileExtractor(
    path="data/users.xlsx",
    file_format="excel",
)

DatabaseExtractor

Extract data from SQL databases.


DatabaseExtractor(
    connection_string: str | None = None,
    query: str | None = None,
    query_params: dict[str, Any] | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    ssh_tunnel: dict[str, Any] | None = None,
)

Bases: BaseExtractor

Extractor for database data sources.

Supports two modes:

  1. Programmatic API:

     >>> extractor = DatabaseExtractor(connection_string="...", query="SELECT * FROM users")
     >>> async for batch in extractor.extract():
     ...     process(batch)

  2. Config-driven:

     >>> extractor = DatabaseExtractor()
     >>> async for batch in extractor.extract_streaming(config, params, headers):
     ...     process(batch)

from_config classmethod

from_config(config: dict[str, Any]) -> 'DatabaseExtractor'

Create extractor from configuration dict.

extract async

extract(**params) -> AsyncIterator[list[dict[str, Any]]]

Extract data from database.

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[list[dict[str, Any]]] | Batches of records |

validate_config

validate_config(extract_config: dict[str, Any]) -> None

Validate database extractor configuration.

extract_streaming async

extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]

Extract data from database using SQL query.

Supports parameterized queries and streaming results for large datasets.

Supported Databases

| Database | Connection string format |
| --- | --- |
| PostgreSQL | postgresql://user:pass@host/db |
| MySQL | mysql://user:pass@host/db |
| SQLite | sqlite:///path/to/db.sqlite |
| SQL Server | mssql+pyodbc://user:pass@host/db |
| Oracle | oracle://user:pass@host/db |

Examples

# PostgreSQL
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE active = true"
)

# With parameters (bound by the driver via query_params)
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE created_at > %(start_date)s",
    query_params={"start_date": "2024-01-01"}
)

# With SSH tunnel
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users",
    ssh_tunnel={
        "host": "bastion.example.com",
        "port": 22,
        "username": "deploy",
        "key_file": "~/.ssh/id_rsa"
    }
)
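Parameterized queries keep values out of the SQL text, and streaming results means fetching rows in fixed-size chunks instead of all at once. The sketch below shows both ideas with the standard-library sqlite3 module; it is an illustration, not DatabaseExtractor's code, and query_batches is a hypothetical helper. Placeholder syntax is driver-specific: sqlite3 accepts :name, whereas the PostgreSQL driver style used in the parameterized example is %(name)s.

```python
import sqlite3
from typing import Any, Iterator


def query_batches(
    conn: sqlite3.Connection,
    query: str,
    query_params: dict[str, Any],
    batch_size: int = 1000,
) -> Iterator[list[dict[str, Any]]]:
    """Run a parameterized query and yield rows as dicts, batch_size at a time."""
    cursor = conn.execute(query, query_params)  # values are bound by the driver
    columns = [col[0] for col in cursor.description]
    while True:
        rows = cursor.fetchmany(batch_size)  # only one chunk is held in memory
        if not rows:
            break
        yield [dict(zip(columns, row)) for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, created_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "2023-12-31"), (2, "2024-02-01"), (3, "2024-03-15")],
)
for batch in query_batches(
    conn,
    "SELECT id FROM users WHERE created_at > :start_date ORDER BY id",
    {"start_date": "2024-01-01"},
    batch_size=1,
):
    print(batch)
# [{'id': 2}]
# [{'id': 3}]
```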

CloudStorageExtractor

Extract data from cloud storage (S3, GCS, Azure Blob).


CloudStorageExtractor(
    provider: str | None = None,
    bucket: str | None = None,
    path: str | None = None,
    credentials: dict[str, Any] | None = None,
    file_format: str | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
)

Bases: BaseExtractor

Extractor for cloud storage data sources.

Supports two modes:

  1. Programmatic API:

     >>> extractor = CloudStorageExtractor(provider="s3", bucket="my-bucket", path="data/")
     >>> async for batch in extractor.extract():
     ...     process(batch)

  2. Config-driven:

     >>> extractor = CloudStorageExtractor()
     >>> async for batch in extractor.extract_streaming(config, params, headers):
     ...     process(batch)

from_config classmethod

from_config(
    config: dict[str, Any],
) -> "CloudStorageExtractor"

Create extractor from configuration dict.

extract async

extract(**params) -> AsyncIterator[list[dict[str, Any]]]

Extract data from cloud storage.

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[list[dict[str, Any]]] | Batches of records |

validate_config

validate_config(extract_config: dict[str, Any]) -> None

Validate cloud storage extractor configuration.

extract_streaming async

extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]

Extract data from cloud storage.

Downloads files from cloud storage and processes them using FileExtractor. Supports single files and prefixes (for multiple files).

Examples

# AWS S3
extractor = CloudStorageExtractor(
    provider="s3",
    bucket="my-bucket",
    path="data/users.json",
    credentials={
        "aws_access_key_id": "${AWS_KEY}",
        "aws_secret_access_key": "${AWS_SECRET}",
        "region": "us-east-1"
    }
)

# Google Cloud Storage
extractor = CloudStorageExtractor(
    provider="gcs",
    bucket="my-bucket",
    path="data/users.json"
    # Uses default credentials
)

# Azure Blob Storage
extractor = CloudStorageExtractor(
    provider="azure",
    bucket="my-container",  # container name (the constructor has no container argument)
    path="data/users.json",
    credentials={
        "connection_string": "${AZURE_STORAGE_CONNECTION_STRING}"
    }
)

Streaming Extractors

Streaming extractors yield batches continuously from real-time data sources until they are stopped. Three options control when they stop: shutdown_event, max_records, and max_batches.

Install extras

SSE uses httpx, which is a core dependency, so it needs no extra. WebSocket and FileWatcher require pip install pycharter[streaming].
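One way to combine the stop controls named above with the batch_size and batch_timeout options used in the examples is a loop that flushes a batch when it is full or when the timeout expires, and ends on any of the three stop conditions. This is an asyncio sketch under assumptions: an asyncio.Queue stands in for the live stream, shutdown_event is taken to be an asyncio.Event, and the exact flush and stop rules are illustrative rather than PyCharter's.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def stream_batches(
    source: "asyncio.Queue[dict[str, Any]]",
    batch_size: int = 500,
    batch_timeout: float = 2.0,
    max_records: Optional[int] = None,
    max_batches: Optional[int] = None,
    shutdown_event: Optional[asyncio.Event] = None,
) -> AsyncIterator[list[dict[str, Any]]]:
    """Yield batches until shutdown_event is set or a max_* limit is reached."""
    loop = asyncio.get_running_loop()
    records_seen = 0
    batches_seen = 0
    while shutdown_event is None or not shutdown_event.is_set():
        batch: list[dict[str, Any]] = []
        deadline = loop.time() + batch_timeout
        while len(batch) < batch_size:
            if max_records is not None and records_seen + len(batch) >= max_records:
                break  # never read past max_records
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(source.get(), timeout=remaining))
            except asyncio.TimeoutError:
                break  # flush whatever arrived within batch_timeout
        if batch:
            records_seen += len(batch)
            batches_seen += 1
            yield batch
        if max_records is not None and records_seen >= max_records:
            return
        if max_batches is not None and batches_seen >= max_batches:
            return


async def demo() -> list[int]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(7):
        queue.put_nowait({"i": i})
    sizes = []
    async for batch in stream_batches(queue, batch_size=3, batch_timeout=0.05, max_records=5):
        sizes.append(len(batch))
    return sizes


print(asyncio.run(demo()))  # [3, 2]
```

The timeout bounds latency on quiet streams (a half-full batch still flows downstream), while batch_size bounds memory on busy ones.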

SSEExtractor

Extract data from Server-Sent Events streams. Supports automatic reconnection with Last-Event-ID header.

from pycharter.etl_generator.extractors import SSEExtractor

extractor = SSEExtractor(
    url="https://stream.example.com/events",
    event_filter="trade",        # only yield "trade" events
    data_format="json",
    batch_size=500,
    batch_timeout=2.0,
    reconnect=True,
    max_reconnects=10,
    max_records=10000,           # stop after 10k records
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
extract.yaml
type: sse
url: https://stream.example.com/events
event_filter: trade
data_format: json
batch_size: 500
batch_timeout: 2.0
max_records: 10000
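An SSE stream is text: each event is a group of field: value lines ended by a blank line, and an id field records the position a client can resume from via the Last-Event-ID header. The sketch below parses such lines, applies an event filter like event_filter="trade", decodes JSON data, and remembers the last id. It follows the HTML Server-Sent Events field rules in simplified form (no retry field) and is a hypothetical parser, not SSEExtractor's; the sample events are made up.

```python
import json
from typing import Any, Iterable, Iterator, Optional


class SSEParser:
    """Minimal Server-Sent Events line parser (simplified from the HTML spec)."""

    def __init__(self, event_filter: Optional[str] = None) -> None:
        self.event_filter = event_filter
        self.last_event_id: Optional[str] = None  # send as Last-Event-ID on reconnect
        self._event_type = ""
        self._data: list[str] = []

    def feed(self, lines: Iterable[str]) -> Iterator[Any]:
        for raw in lines:
            line = raw.rstrip("\r\n")
            if line == "":
                yield from self._dispatch()  # blank line ends an event
                continue
            if line.startswith(":"):
                continue  # comment or keep-alive line
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # one leading space after the colon is dropped
            if field == "event":
                self._event_type = value
            elif field == "data":
                self._data.append(value)
            elif field == "id":
                self.last_event_id = value

    def _dispatch(self) -> Iterator[Any]:
        event_type = self._event_type or "message"  # default type per the spec
        had_data, data = bool(self._data), "\n".join(self._data)
        self._event_type, self._data = "", []
        if had_data and (self.event_filter is None or event_type == self.event_filter):
            yield json.loads(data)  # data_format="json"


stream = [
    ": keep-alive",
    "event: trade", "id: 41", 'data: {"px": 1.5}', "",
    "event: heartbeat", "data: {}", "",
    "event: trade", "id: 42", 'data: {"px": 1.6}', "",
]
parser = SSEParser(event_filter="trade")
print(list(parser.feed(stream)))  # [{'px': 1.5}, {'px': 1.6}]
print(parser.last_event_id)  # 42
reconnect_headers = {"Last-Event-ID": parser.last_event_id} if parser.last_event_id else {}
```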

WebSocketExtractor

Extract data from WebSocket streams. Requires pip install pycharter[streaming].

from pycharter.etl_generator.extractors import WebSocketExtractor

extractor = WebSocketExtractor(
    url="wss://ws.example.com/feed",
    subscribe_messages=[{"action": "subscribe", "channel": "trades"}],
    data_format="json",
    batch_size=1000,
    ping_interval=20.0,
)
extract.yaml
type: websocket
url: wss://ws.example.com/feed
subscribe_messages:
  - action: subscribe
    channel: trades
data_format: json
batch_size: 1000

FileWatcherExtractor

Monitor a directory for new files and extract their contents. Uses watchfiles for efficient filesystem notifications when it is installed, and falls back to polling otherwise.

from pycharter.etl_generator.extractors import FileWatcherExtractor

extractor = FileWatcherExtractor(
    watch_dir="/data/inbox",
    patterns=["*.json", "*.csv"],
    file_format="json",
    recursive=True,
    process_existing=True,
    move_after_process="/data/archive",
)
extract.yaml
type: file_watcher
watch_dir: /data/inbox
patterns: ["*.json"]
file_format: json
recursive: true
process_existing: true
move_after_process: /data/archive
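The polling fallback mentioned above boils down to: list files matching the patterns, skip ones already handled, read each new file, and optionally move it to an archive directory. The sketch below implements one polling pass with the standard library. The function name poll_once, the JSON-only parsing, and the seen-set bookkeeping are assumptions for illustration, not FileWatcherExtractor's internals.

```python
import fnmatch
import json
import shutil
from pathlib import Path
from typing import Any, Optional


def poll_once(
    watch_dir: str,
    patterns: list[str],
    seen: set,
    recursive: bool = True,
    move_after_process: Optional[str] = None,
) -> list[dict[str, Any]]:
    """Read every not-yet-seen JSON file matching patterns and return its records."""
    root = Path(watch_dir)
    candidates = root.rglob("*") if recursive else root.glob("*")
    records: list[dict[str, Any]] = []
    # Materialize and sort first so moving files does not disturb the iteration.
    for path in sorted(p for p in candidates if p.is_file()):
        if path in seen or not any(fnmatch.fnmatch(path.name, pat) for pat in patterns):
            continue
        data = json.loads(path.read_text(encoding="utf-8"))
        records.extend(data if isinstance(data, list) else [data])  # array or single object
        seen.add(path)
        if move_after_process:
            # Keep the archive outside watch_dir, or recursive scans will find moved files.
            archive = Path(move_after_process)
            archive.mkdir(parents=True, exist_ok=True)
            shutil.move(str(path), str(archive / path.name))
    return records
```

A watcher loop would call poll_once every few seconds with the same seen set. Seeding seen with the files present at startup is one way to mimic process_existing: false.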

Messaging Extractors

Messaging extractors consume from message queues with deferred acknowledgment — messages are only acknowledged (committed/deleted) after the batch is successfully loaded. This provides at-least-once delivery guarantees.

All messaging extractors implement the AckableExtractor protocol. The pipeline automatically calls acknowledge() after each batch load.
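The deferred-acknowledgment contract can be pictured with an in-memory queue: messages handed out in a batch stay in flight until that batch is acknowledged, and a failed batch goes back on the queue. That requeue is what makes delivery at-least-once, since a retried batch may reach the loader twice. The class below is a hypothetical toy; only the acknowledge(batch_index, success) shape is taken from this page.

```python
import asyncio
from collections import deque
from typing import Any


class InMemoryAckQueue:
    """Toy queue with deferred, per-batch acknowledgment (illustration only)."""

    def __init__(self, messages: list[dict[str, Any]]) -> None:
        self._ready: deque = deque(messages)
        self._in_flight: dict[int, list[dict[str, Any]]] = {}
        self._next_batch = 0

    def next_batch(self, batch_size: int) -> tuple[int, list[dict[str, Any]]]:
        batch = [self._ready.popleft() for _ in range(min(batch_size, len(self._ready)))]
        batch_index = self._next_batch
        self._next_batch += 1
        self._in_flight[batch_index] = batch  # held until acknowledged
        return batch_index, batch

    async def acknowledge(self, batch_index: int, success: bool) -> None:
        batch = self._in_flight.pop(batch_index)
        if not success:
            self._ready.extendleft(reversed(batch))  # requeue in original order


async def demo() -> list[int]:
    q = InMemoryAckQueue([{"id": i} for i in range(3)])
    loaded: list[int] = []
    idx, batch = q.next_batch(2)
    await q.acknowledge(idx, success=False)  # simulated load failure: messages return
    for _ in range(2):
        idx, batch = q.next_batch(2)
        loaded.extend(m["id"] for m in batch)  # simulated successful load
        await q.acknowledge(idx, success=True)
    return loaded


print(asyncio.run(demo()))  # [0, 1, 2]
```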

Install extras

pip install pycharter[kafka]       # Kafka only
pip install pycharter[rabbitmq]    # RabbitMQ only
pip install pycharter[messaging]   # Kafka + RabbitMQ
pip install pycharter[etl]         # SQS (via boto3)

KafkaExtractor

Extract data from Apache Kafka topics using aiokafka. Supports consumer groups, SASL/SSL authentication, and offset management.

from pycharter.etl_generator.extractors import KafkaExtractor

extractor = KafkaExtractor(
    topics=["orders", "payments"],
    bootstrap_servers="broker1:9092,broker2:9092",
    consumer_group="etl-orders",
    auto_offset_reset="earliest",
    batch_size=500,
    batch_timeout=2.0,
    # SASL authentication (optional)
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="${KAFKA_USER}",
    sasl_password="${KAFKA_PASS}",
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
result = await pipeline.run()

# Don't forget to close when done
await extractor.close()
extract.yaml
type: kafka
topics: [orders, payments]
bootstrap_servers: ${KAFKA_BROKERS}
consumer_group: etl-orders
auto_offset_reset: earliest
batch_size: 500
batch_timeout: 2.0

Acknowledgment behavior:

  • success=True — commits consumer group offsets for the batch
  • success=False — seeks back to the earliest offset in the batch (enables retry)
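In Kafka, a consumer group's committed offset for a partition is the offset of the next message to read. Committing a batch therefore means committing last offset + 1 per partition, while a retry means seeking each partition back to the first offset it had in the batch. The hypothetical helper below computes both from (topic, partition, offset) triples in plain Python; it does not call aiokafka.

```python
from typing import Iterable

TP = tuple[str, int]  # (topic, partition)


def batch_offsets(
    messages: Iterable[tuple[str, int, int]],
) -> tuple[dict[TP, int], dict[TP, int]]:
    """Return (commit_offsets, seek_offsets) for a batch of (topic, partition, offset)."""
    first: dict[TP, int] = {}
    last: dict[TP, int] = {}
    for topic, partition, offset in messages:
        tp = (topic, partition)
        first[tp] = min(offset, first.get(tp, offset))
        last[tp] = max(offset, last.get(tp, offset))
    commit = {tp: off + 1 for tp, off in last.items()}  # success=True: next offset to read
    seek = dict(first)  # success=False: re-read from the earliest offset in the batch
    return commit, seek


batch = [("orders", 0, 10), ("orders", 0, 11), ("orders", 1, 7), ("payments", 0, 3)]
commit, seek = batch_offsets(batch)
print(commit)  # {('orders', 0): 12, ('orders', 1): 8, ('payments', 0): 4}
print(seek)    # {('orders', 0): 10, ('orders', 1): 7, ('payments', 0): 3}
```

With aiokafka, values like these would map onto AIOKafkaConsumer.commit() (a mapping of TopicPartition to offset) and AIOKafkaConsumer.seek(partition, offset).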

RabbitMQExtractor

Extract data from RabbitMQ queues using aio-pika. Supports prefetch for backpressure control.

from pycharter.etl_generator.extractors import RabbitMQExtractor

extractor = RabbitMQExtractor(
    queue="order_events",
    url="amqp://user:pass@rabbitmq-host/",
    prefetch_count=200,
    batch_size=1000,
    batch_timeout=5.0,
)
extract.yaml
type: rabbitmq
queue: order_events
url: ${RABBITMQ_URL}
prefetch_count: 200
batch_size: 1000

Acknowledgment behavior:

  • success=True — calls message.ack() for each message in the batch
  • success=False — calls message.nack(requeue=True) to requeue messages

SQSExtractor

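Extract data from AWS SQS queues using boto3. Uses long polling (wait_time_seconds) to avoid empty responses, and respects the SQS limit of 10 messages per receive call, so filling a batch_size of 1000 takes at least 100 polls.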

from pycharter.etl_generator.extractors import SQSExtractor

extractor = SQSExtractor(
    queue_url="https://sqs.us-east-1.amazonaws.com/123456789/orders",
    region="us-east-1",
    wait_time_seconds=20,       # long polling
    visibility_timeout=300,     # 5 min processing window
    batch_size=1000,
)
extract.yaml
type: sqs
queue_url: ${SQS_QUEUE_URL}
region: us-east-1
wait_time_seconds: 20
visibility_timeout: 600

Acknowledgment behavior:

  • success=True — calls delete_message_batch to remove messages from the queue
  • success=False — calls change_message_visibility_batch with VisibilityTimeout=0 to make messages immediately available for retry
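SQS caps ReceiveMessage at 10 messages per call, and DeleteMessageBatch and ChangeMessageVisibilityBatch at 10 entries per call, so a 1000-message batch is acknowledged in chunks. The sketch below only builds the Entries lists in the shape those boto3 client methods take; it does not call AWS, and the helper names are hypothetical.

```python
from typing import Any

SQS_MAX_BATCH = 10  # AWS limit per receive / delete / visibility batch call


def chunk(items: list[Any], size: int = SQS_MAX_BATCH) -> list[list[Any]]:
    return [items[i:i + size] for i in range(0, len(items), size)]


def ack_entries(receipt_handles: list[str], success: bool) -> list[list[dict[str, Any]]]:
    """Build one Entries list per API call for acknowledging a batch."""
    calls = []
    for group in chunk(receipt_handles):
        entries = []
        for i, handle in enumerate(group):
            entry: dict[str, Any] = {"Id": str(i), "ReceiptHandle": handle}  # Id unique per call
            if not success:
                entry["VisibilityTimeout"] = 0  # make the message visible again immediately
            entries.append(entry)
        calls.append(entries)
    return calls


# With boto3 (not executed here), each entries list would be sent as:
#   sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)              # success=True
#   sqs.change_message_visibility_batch(QueueUrl=queue_url, Entries=entries)   # success=False
calls = ack_entries([f"rh-{n}" for n in range(25)], success=True)
print([len(c) for c in calls])  # [10, 10, 5]
```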

AckableExtractor Protocol

Messaging extractors implement the AckableExtractor protocol, which extends Extractor with an acknowledge() method:

from pycharter.etl_generator.protocols import AckableExtractor

# The pipeline automatically handles this — you don't need to call it manually.
# But you can check if an extractor supports acknowledgment:
if isinstance(extractor, AckableExtractor):
    await extractor.acknowledge(batch_index=0, success=True)

The Pipeline.run() method automatically calls acknowledge() after each batch:

  • Load succeeds: acknowledge(batch_index, success=True)
  • Load fails: acknowledge(batch_index, success=False)
  • Dry run: acknowledge(batch_index, success=True), so messages aren't redelivered
  • Ack failure: logged as a warning; the pipeline continues processing

Non-ackable extractors (HTTP, File, Database, etc.) are unaffected — the pipeline skips the acknowledge step.
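Because the protocol is checked with isinstance(), AckableExtractor must be a runtime-checkable Protocol. The sketch below defines a local stand-in protocol with the same acknowledge(batch_index, success) method and a simplified run loop that applies the four rules listed above. It is hypothetical: the protocol is not imported from PyCharter, the loader is a plain callable, and this is not Pipeline.run()'s code.

```python
import asyncio
import logging
from typing import Any, AsyncIterator, Callable, Protocol, runtime_checkable

logger = logging.getLogger("ack-sketch")


@runtime_checkable
class Ackable(Protocol):
    async def acknowledge(self, batch_index: int, success: bool) -> None: ...


async def run(
    batches: AsyncIterator[list[dict[str, Any]]],
    extractor: object,
    load: Callable[[list[dict[str, Any]]], None],
    dry_run: bool = False,
) -> int:
    """Load each batch, then acknowledge it if the extractor supports acknowledgment."""
    loaded = 0
    batch_index = 0
    async for batch in batches:
        try:
            if not dry_run:
                load(batch)
                loaded += len(batch)
            success = True  # a dry run also acks, so messages are not redelivered
        except Exception:
            logger.exception("load failed for batch %d", batch_index)
            success = False
        if isinstance(extractor, Ackable):  # non-ackable extractors skip this step
            try:
                await extractor.acknowledge(batch_index, success=success)
            except Exception as exc:  # ack failure: warn and keep going
                logger.warning("acknowledge(%d) failed: %s", batch_index, exc)
        batch_index += 1
    return loaded
```

Any object with an async acknowledge(batch_index, success) method satisfies the structural check, which is why HTTP, File, and Database extractors pass through the loop untouched.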


ExtractorFactory

Create extractors from configuration:

from pycharter.etl_generator.extractors import ExtractorFactory

# Create from config
extractor = ExtractorFactory.create({
    "type": "http",
    "url": "https://api.example.com/users"
})

# List all available types
print(ExtractorFactory.list_types())
# ['cloud_storage', 'database', 'file', 'file_watcher', 'http',
#  'kafka', 'mongo', 'mongodb', 'rabbitmq', 'sqs', 'sse', 'websocket']

# Register a custom extractor
ExtractorFactory.register("custom", MyExtractor)
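Conceptually, a factory like this is a registry keyed by the config's type field. The sketch below shows the register/create/list_types pattern in a few lines. It is a hypothetical mini-factory, not PyCharter's class, and it assumes create() passes the remaining config keys to the constructor, which may differ from how the real factory builds extractors (for example via from_config()).

```python
from typing import Any, Callable


class MiniExtractorFactory:
    """Hypothetical registry mapping a config "type" to an extractor class."""

    _registry: dict[str, Callable[..., Any]] = {}

    @classmethod
    def register(cls, type_name: str, extractor_cls: Callable[..., Any]) -> None:
        cls._registry[type_name] = extractor_cls

    @classmethod
    def list_types(cls) -> list[str]:
        return sorted(cls._registry)

    @classmethod
    def create(cls, config: dict[str, Any]) -> Any:
        options = dict(config)  # copy so the caller's dict is not mutated
        type_name = options.pop("type")
        try:
            extractor_cls = cls._registry[type_name]
        except KeyError:
            raise ValueError(
                f"unknown extractor type {type_name!r}; known: {cls.list_types()}"
            ) from None
        return extractor_cls(**options)


class EchoExtractor:  # hypothetical extractor used only for this demo
    def __init__(self, url: str) -> None:
        self.url = url


MiniExtractorFactory.register("echo", EchoExtractor)
ex = MiniExtractorFactory.create({"type": "echo", "url": "https://api.example.com/users"})
print(type(ex).__name__, ex.url)  # EchoExtractor https://api.example.com/users
```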

Custom Extractors

Extend BaseExtractor to create custom extractors:

from pycharter.etl_generator.extractors import BaseExtractor, ExtractorFactory
from typing import Any, AsyncIterator

class CustomExtractor(BaseExtractor):
    def __init__(self, config: dict):
        self.config = config

    async def fetch_data(self) -> list[dict[str, Any]]:
        # Placeholder: replace with your own I/O (API call, file read, etc.)
        return []

    async def extract_streaming(
        self,
        extract_config: dict[str, Any],
        params: dict[str, Any],
        headers: dict[str, Any],
        **kwargs,
    ) -> AsyncIterator[list[dict[str, Any]]]:
        # Yield one or more batches, each a list of record dicts
        data = await self.fetch_data()
        yield data

    def validate_config(self, extract_config: dict[str, Any]) -> None:
        if "required_field" not in extract_config:
            raise ValueError("Missing required_field")

# Register under the "custom" type name
ExtractorFactory.register("custom", CustomExtractor)

See Also