Extractors
Extractors retrieve data from various sources for ETL pipelines.
Overview
PyCharter provides 11 built-in extractors across three categories:
| Category | Extractors | Install Extra |
|---|---|---|
| Batch | HTTP, File, Database, CloudStorage, MongoDB | pycharter[etl] |
| Streaming | SSE, WebSocket, FileWatcher | pycharter[streaming] |
| Messaging | Kafka, RabbitMQ, SQS | pycharter[messaging] or pycharter[etl] (SQS) |
```python
# Batch extractors (always available)
from pycharter import HTTPExtractor, FileExtractor, DatabaseExtractor, CloudStorageExtractor

# Streaming extractors (lazy-loaded)
from pycharter.etl_generator.extractors import SSEExtractor, WebSocketExtractor, FileWatcherExtractor

# Messaging extractors (lazy-loaded)
from pycharter.etl_generator.extractors import KafkaExtractor, RabbitMQExtractor, SQSExtractor
```
Batch Extractors
HTTPExtractor
Extract data from HTTP/REST APIs.
```python
HTTPExtractor(
    url: str | None = None,
    base_url: str | None = None,
    endpoint: str | None = None,
    method: str = "GET",
    headers: dict[str, str] | None = None,
    params: dict[str, Any] | None = None,
    body: Any | None = None,
    response_path: str | None = None,
    batch_size: int = 1000,
    pagination: dict[str, Any] | None = None,
)
```
Bases: BaseExtractor
Extractor for HTTP/API data sources.
Supports two modes:
1. Programmatic API:
```python
>>> extractor = HTTPExtractor(url="https://api.example.com/users")
>>> async for batch in extractor.extract():
...     process(batch)
```
2. Config-driven (legacy):
```python
>>> extractor = HTTPExtractor()
>>> async for batch in extractor.extract_streaming(config, params, headers):
...     process(batch)
```
from_config (classmethod)
Create extractor from configuration dict.
extract (async)
Extract data from HTTP source.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[list[dict[str, Any]]] | Batches of records |
validate_config
Validate HTTP extractor configuration.
extract_streaming (async)
```python
extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]
```
Extract data from HTTP/API source with pagination support.
Yields batches as they are extracted, preventing memory exhaustion for large datasets.
Examples
```python
from pycharter import HTTPExtractor

# Simple GET
extractor = HTTPExtractor(url="https://api.example.com/users")

# With headers
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    method="GET",
    headers={"Authorization": "Bearer ${API_KEY}"},
)

# POST with body
extractor = HTTPExtractor(
    url="https://api.example.com/search",
    method="POST",
    body={"query": "active", "limit": 100},
)

# With pagination
extractor = HTTPExtractor(
    url="https://api.example.com/users",
    pagination={
        "type": "offset",
        "param": "offset",
        "limit_param": "limit",
        "limit": 100,
    },
)
```
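One way to picture the offset pagination settings above is a loop that advances the `offset` parameter by `limit` until the API returns a short page. The following is a minimal sketch, not PyCharter's implementation: `fetch_page` is a hypothetical stand-in for the HTTP request, and stopping on a short (or empty) page is an assumed termination rule.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional

Record = Dict[str, Any]


def paginate_offset(
    fetch_page: Callable[[Dict[str, Any]], List[Record]],
    pagination: Dict[str, Any],
    base_params: Optional[Dict[str, Any]] = None,
) -> Iterator[List[Record]]:
    """Yield one page of records per request, advancing an offset parameter."""
    offset_param = pagination["param"]       # e.g. "offset"
    limit_param = pagination["limit_param"]  # e.g. "limit"
    limit = pagination["limit"]              # page size requested from the API
    offset = 0
    while True:
        params = {**(base_params or {}), offset_param: offset, limit_param: limit}
        page = fetch_page(params)
        if page:
            yield page
        # Assumed stop rule: a page shorter than `limit` means there is no more data
        if len(page) < limit:
            break
        offset += limit


# Usage against a fake API holding 250 records
records = [{"id": i} for i in range(250)]


def fake_fetch(params: Dict[str, Any]) -> List[Record]:
    start = params["offset"]
    return records[start:start + params["limit"]]


pagination = {"type": "offset", "param": "offset", "limit_param": "limit", "limit": 100}
pages = list(paginate_offset(fake_fetch, pagination))
print([len(p) for p in pages])  # [100, 100, 50]
```

Stopping on a short page avoids an extra request when the total is not a multiple of the page size; APIs that report a total count or return a "next" link would call for a different stop rule.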
Path parameters and record injection
When using base_url and api_endpoint with placeholders (e.g. /v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}), pass the values as params. The HTTP extractor will:
- Substitute {symbol}, {start_date}, and {end_date} in the URL with values from the params.
- Inject those values into every extracted record, so each row has, e.g., a symbol field. This avoids extra logic in transforms or loaders to attach the symbol to each record.
Config-driven example:
```yaml
# extract.yaml
type: http
base_url: https://api.example.com
api_endpoint: /v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}
response_path: results
```
When you run with symbol=AAPL, start_date=2024-01-01, end_date=2024-01-31, each record in the response array will get symbol, start_date, and end_date fields added (only if not already present). Use get_path_param_names(api_endpoint) from pycharter.etl_generator.extractors to get the list of placeholder names from an endpoint string.
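The substitution and injection steps can be sketched in plain Python. This is a hypothetical re-implementation for illustration only: `path_param_names`, `build_url`, and `inject_path_params` are not PyCharter functions (the library's own helper is `get_path_param_names`), and the sketch assumes placeholders look like `{name}` with word characters and that `response_path` names a single top-level key.

```python
import re
from typing import Any, Dict, List

# Matches {name} placeholders such as {symbol} or {start_date}
PLACEHOLDER = re.compile(r"\{(\w+)\}")


def path_param_names(endpoint: str) -> List[str]:
    """Placeholder names in order of appearance."""
    return PLACEHOLDER.findall(endpoint)


def build_url(base_url: str, endpoint: str, params: Dict[str, Any]) -> str:
    """Substitute each {name} in the endpoint with params[name]."""
    path = PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), endpoint)
    return base_url.rstrip("/") + path


def inject_path_params(
    records: List[Dict[str, Any]], endpoint: str, params: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Add path values to each record; fields already in the record win."""
    injected = {name: params[name] for name in path_param_names(endpoint)}
    return [{**injected, **record} for record in records]


endpoint = "/v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}"
params = {"symbol": "AAPL", "start_date": "2024-01-01", "end_date": "2024-01-31"}
url = build_url("https://api.example.com", endpoint, params)
print(url)  # https://api.example.com/v2/aggs/ticker/AAPL/range/1/day/2024-01-01/2024-01-31

response = {"results": [{"close": 185.2}, {"close": 186.0, "symbol": "MSFT"}]}
records = inject_path_params(response["results"], endpoint, params)  # response_path: results
print(records[1]["symbol"])  # MSFT (an existing field is not overwritten)
```

Merging with `{**injected, **record}` is what gives the "only if not already present" behavior: keys from the record are applied last, so they take precedence over the injected path values.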
FileExtractor
Extract data from local files.
```python
FileExtractor(
    path: str | None = None,
    file_format: str | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
)
```
Bases: BaseExtractor
Extractor for file-based data sources.
Supports two modes:
1. Programmatic API:
```python
>>> extractor = FileExtractor(path="data.csv")
>>> async for batch in extractor.extract():
...     process(batch)
```
2. Config-driven:
```python
>>> extractor = FileExtractor()
>>> async for batch in extractor.extract_streaming(config, params, headers):
...     process(batch)
```
from_config (classmethod)
Create extractor from configuration dict.
extract (async)
Extract data from file.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[list[dict[str, Any]]] | Batches of records |
validate_config
Validate file extractor configuration.
extract_streaming (async)
```python
extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]
```
Extract data from file(s) in batches.
Supports:
- Single files
- Glob patterns for multiple files
- Compressed files (gzip, zip)
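As a rough illustration of glob and gzip handling, the sketch below reads JSON Lines files matching a pattern, opens `.gz` files transparently, and yields fixed-size batches. It is a simplified stand-in for FileExtractor, not its code: it covers only JSON Lines and gzip (zip archives and the other formats are omitted), and `read_jsonl_batches` is a hypothetical name.

```python
import glob
import gzip
import json
import os
import tempfile
from typing import Any, Dict, Iterator, List


def read_jsonl_batches(pattern: str, batch_size: int = 1000) -> Iterator[List[Dict[str, Any]]]:
    """Yield batches of records from every JSON Lines file matching `pattern`."""
    batch: List[Dict[str, Any]] = []
    for path in sorted(glob.glob(pattern)):
        # gzip.open and open accept the same text-mode arguments here
        opener = gzip.open if path.endswith(".gz") else open
        with opener(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue  # skip blank lines
                batch.append(json.loads(line))
                if len(batch) >= batch_size:
                    yield batch
                    batch = []
    if batch:
        yield batch  # final partial batch


with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "a.jsonl"), "w", encoding="utf-8") as fh:
        fh.write('{"id": 1}\n{"id": 2}\n')
    with gzip.open(os.path.join(tmp, "b.jsonl.gz"), "wt", encoding="utf-8") as fh:
        fh.write('{"id": 3}\n')
    batches = list(read_jsonl_batches(os.path.join(tmp, "*.jsonl*"), batch_size=2))

print(batches)  # [[{'id': 1}, {'id': 2}], [{'id': 3}]]
```

Because the batch buffer lives outside the per-file loop, a batch can span two files; memory use stays bounded by `batch_size` regardless of how many files the glob matches.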
Supported Formats
| Format | Extension | Notes |
|---|---|---|
| JSON | .json | Single array or object |
| JSON Lines | .jsonl | One JSON object per line |
| CSV | .csv | With header row |
| Parquet | .parquet | Columnar format |
| Excel | .xlsx, .xls | Requires openpyxl |
| XML | .xml | Requires lxml |
Examples
```python
from pycharter import FileExtractor

# JSON file
extractor = FileExtractor(path="data/users.json", file_format="json")
# CSV file
extractor = FileExtractor(path="data/users.csv", file_format="csv")
# Parquet file
extractor = FileExtractor(path="data/users.parquet", file_format="parquet")
# Glob pattern (multiple files)
extractor = FileExtractor(path="data/*.json", file_format="json")
# Excel file (requires openpyxl)
extractor = FileExtractor(path="data/users.xlsx", file_format="excel")
```
DatabaseExtractor
Extract data from SQL databases.
```python
DatabaseExtractor(
    connection_string: str | None = None,
    query: str | None = None,
    query_params: dict[str, Any] | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    ssh_tunnel: dict[str, Any] | None = None,
)
```
Bases: BaseExtractor
Extractor for database data sources.
Supports two modes:
1. Programmatic API:
```python
>>> extractor = DatabaseExtractor(connection_string="...", query="SELECT * FROM users")
>>> async for batch in extractor.extract():
...     process(batch)
```
2. Config-driven:
```python
>>> extractor = DatabaseExtractor()
>>> async for batch in extractor.extract_streaming(config, params, headers):
...     process(batch)
```
from_config (classmethod)
Create extractor from configuration dict.
extract (async)
Extract data from database.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[list[dict[str, Any]]] | Batches of records |
validate_config
Validate database extractor configuration.
extract_streaming (async)
```python
extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]
```
Extract data from database using SQL query.
Supports parameterized queries and streaming results for large datasets.
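A parameterized query that streams its results in batches can be sketched with the standard library's `sqlite3` module, which keeps the example self-contained. This illustrates the technique only and is not DatabaseExtractor's code; `query_in_batches` is a hypothetical helper, and note that `sqlite3` uses `:name` placeholders, whereas the PostgreSQL examples on this page use the `%(name)s` style.

```python
import sqlite3
from typing import Any, Dict, Iterator, List


def query_in_batches(
    conn: sqlite3.Connection,
    query: str,
    params: Dict[str, Any],
    batch_size: int = 1000,
) -> Iterator[List[Dict[str, Any]]]:
    """Run a parameterized query and yield rows as lists of dicts."""
    cur = conn.execute(query, params)  # the driver binds params; no string formatting
    columns = [col[0] for col in cur.description]
    while True:
        rows = cur.fetchmany(batch_size)  # pull at most batch_size rows at a time
        if not rows:
            break
        yield [dict(zip(columns, row)) for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, created_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "2023-12-31"), (2, "2024-02-01"), (3, "2024-03-15")],
)
batches = list(query_in_batches(
    conn,
    "SELECT id FROM users WHERE created_at > :start_date ORDER BY id",
    {"start_date": "2024-01-01"},
    batch_size=1,
))
print(batches)  # [[{'id': 2}], [{'id': 3}]]
```

Using `fetchmany` rather than `fetchall` is what keeps memory bounded: only one batch of rows is materialized in Python at a time.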
Supported Databases
| Database | Connection String Format |
|---|---|
| PostgreSQL | postgresql://user:pass@host/db |
| MySQL | mysql://user:pass@host/db |
| SQLite | sqlite:///path/to/db.sqlite |
| SQL Server | mssql+pyodbc://user:pass@host/db |
| Oracle | oracle://user:pass@host/db |
Examples
```python
from pycharter import DatabaseExtractor

# PostgreSQL
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE active = true",
)

# With query parameters
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users WHERE created_at > %(start_date)s",
    query_params={"start_date": "2024-01-01"},
)

# With SSH tunnel
extractor = DatabaseExtractor(
    connection_string="postgresql://user:pass@localhost/db",
    query="SELECT * FROM users",
    ssh_tunnel={
        "host": "bastion.example.com",
        "port": 22,
        "username": "deploy",
        "key_file": "~/.ssh/id_rsa",
    },
)
```
CloudStorageExtractor
Extract data from cloud storage (S3, GCS, Azure Blob).
```python
CloudStorageExtractor(
    provider: str | None = None,
    bucket: str | None = None,
    path: str | None = None,
    credentials: dict[str, Any] | None = None,
    file_format: str | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
)
```
Bases: BaseExtractor
Extractor for cloud storage data sources.
Supports two modes:
1. Programmatic API:
```python
>>> extractor = CloudStorageExtractor(provider="s3", bucket="my-bucket", path="data/")
>>> async for batch in extractor.extract():
...     process(batch)
```
2. Config-driven:
```python
>>> extractor = CloudStorageExtractor()
>>> async for batch in extractor.extract_streaming(config, params, headers):
...     process(batch)
```
from_config (classmethod)
Create extractor from configuration dict.
extract (async)
Extract data from cloud storage.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[list[dict[str, Any]]] | Batches of records |
validate_config
Validate cloud storage extractor configuration.
extract_streaming (async)
```python
extract_streaming(
    extract_config: dict[str, Any],
    params: dict[str, Any],
    headers: dict[str, Any],
    contract_dir: Any | None = None,
    batch_size: int = 1000,
    max_records: int | None = None,
    config_context: dict[str, Any] | None = None,
) -> AsyncIterator[list[dict[str, Any]]]
```
Extract data from cloud storage.
Downloads files from cloud storage and processes them using FileExtractor. Supports single files and prefixes (for multiple files).
Examples
```python
from pycharter import CloudStorageExtractor

# AWS S3
extractor = CloudStorageExtractor(
    provider="s3",
    bucket="my-bucket",
    path="data/users.json",
    credentials={
        "aws_access_key_id": "${AWS_KEY}",
        "aws_secret_access_key": "${AWS_SECRET}",
        "region": "us-east-1",
    },
)

# Google Cloud Storage (uses default credentials)
extractor = CloudStorageExtractor(
    provider="gcs",
    bucket="my-bucket",
    path="data/users.json",
)

# Azure Blob Storage: pass the container name as `bucket`
extractor = CloudStorageExtractor(
    provider="azure",
    bucket="my-container",
    path="data/users.json",
    credentials={
        "connection_string": "${AZURE_STORAGE_CONNECTION_STRING}",
    },
)
```
Streaming Extractors
Streaming extractors yield batches indefinitely from real-time data sources. They support shutdown_event, max_records, and max_batches for graceful lifecycle control.
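To make these lifecycle controls concrete, here is a minimal asyncio sketch of a batching loop that flushes on a batch size or a batch timeout and stops on a shutdown event, `max_records`, or `max_batches`. It assumes an `asyncio.Queue` as a stand-in for the live connection and only checks the shutdown event between batches; `batched_stream` is hypothetical, and PyCharter's extractors may order these checks differently.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List, Optional

Record = Dict[str, Any]


async def batched_stream(
    source: "asyncio.Queue[Record]",
    batch_size: int,
    batch_timeout: float,
    shutdown_event: asyncio.Event,
    max_records: Optional[int] = None,
    max_batches: Optional[int] = None,
) -> AsyncIterator[List[Record]]:
    """Yield batches from a live source until shutdown or a limit is reached."""
    loop = asyncio.get_running_loop()
    records_seen = batches_seen = 0
    while not shutdown_event.is_set():
        batch: List[Record] = []
        deadline = loop.time() + batch_timeout
        # Fill until the batch is full, the timeout expires, or max_records is reached
        while len(batch) < batch_size:
            if max_records is not None and records_seen + len(batch) >= max_records:
                break
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(source.get(), timeout=remaining))
            except asyncio.TimeoutError:
                break  # flush a partial batch once batch_timeout has elapsed
        if batch:
            records_seen += len(batch)
            batches_seen += 1
            yield batch
        if max_records is not None and records_seen >= max_records:
            return
        if max_batches is not None and batches_seen >= max_batches:
            return


async def demo() -> List[List[Record]]:
    queue: "asyncio.Queue[Record]" = asyncio.Queue()
    for i in range(7):
        queue.put_nowait({"id": i})
    stop = asyncio.Event()
    return [b async for b in batched_stream(queue, 3, 0.1, stop, max_records=5)]


batches = asyncio.run(demo())
print([len(b) for b in batches])  # [3, 2]
```

Capping the second batch at two records, rather than reading a third and discarding it, keeps `max_records` exact without losing messages that were pulled from the source.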
Install extras: SSE uses httpx (a core dependency). WebSocket and FileWatcher require pip install pycharter[streaming].
SSEExtractor
Extract data from Server-Sent Events streams. Supports automatic reconnection with Last-Event-ID header.
```python
from pycharter.etl_generator.extractors import SSEExtractor

extractor = SSEExtractor(
    url="https://stream.example.com/events",
    event_filter="trade",  # only yield "trade" events
    data_format="json",
    batch_size=500,
    batch_timeout=2.0,
    reconnect=True,
    max_reconnects=10,
    max_records=10000,  # stop after 10k records
)
pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
```
Config-driven:
```yaml
type: sse
url: https://stream.example.com/events
event_filter: trade
data_format: json
batch_size: 500
batch_timeout: 2.0
max_records: 10000
```
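For readers unfamiliar with the wire format, the simplified parser below shows how an event filter and Last-Event-ID tracking fit together: fields accumulate until a blank line dispatches the event, and the most recent `id` is what a client sends in the `Last-Event-ID` header when it reconnects. It follows the Server-Sent Events format from the WHATWG HTML standard but is a sketch, not SSEExtractor's parser; `parse_sse` is hypothetical, assumes JSON payloads, and ignores the `retry` field.

```python
import json
from typing import Any, Iterable, Iterator, List, Optional, Tuple


def parse_sse(
    lines: Iterable[str], event_filter: Optional[str] = None
) -> Iterator[Tuple[Optional[str], Any]]:
    """Yield (last_event_id, parsed JSON data) for each dispatched event."""
    event_type, last_id = "message", None
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the accumulated event
            if data_lines and (event_filter is None or event_type == event_filter):
                yield last_id, json.loads("\n".join(data_lines))
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        elif field == "id":
            last_id = value  # sent back as Last-Event-ID on reconnect


stream = [
    "event: trade\n", "id: 1\n", 'data: {"px": 10}\n', "\n",
    "event: quote\n", "id: 2\n", 'data: {"bid": 9}\n', "\n",
    ": keep-alive\n", "\n",
    "event: trade\n", "id: 3\n", 'data: {"px": 11}\n', "\n",
]
events = list(parse_sse(stream, event_filter="trade"))
print(events)  # [('1', {'px': 10}), ('3', {'px': 11})]
```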
WebSocketExtractor
Extract data from WebSocket streams. Requires pip install pycharter[streaming].
```python
from pycharter.etl_generator.extractors import WebSocketExtractor

extractor = WebSocketExtractor(
    url="wss://ws.example.com/feed",
    subscribe_messages=[{"action": "subscribe", "channel": "trades"}],
    data_format="json",
    batch_size=1000,
    ping_interval=20.0,
)
```
Config-driven:
```yaml
type: websocket
url: wss://ws.example.com/feed
subscribe_messages:
  - action: subscribe
    channel: trades
data_format: json
batch_size: 1000
```
FileWatcherExtractor
Monitor a directory for new files and extract their contents. If watchfiles is installed, it is used for efficient filesystem notifications; otherwise the extractor falls back to polling.
```python
from pycharter.etl_generator.extractors import FileWatcherExtractor

extractor = FileWatcherExtractor(
    watch_dir="/data/inbox",
    patterns=["*.json"],  # matches file_format="json"
    file_format="json",
    recursive=True,
    process_existing=True,
    move_after_process="/data/archive",
)
```
Config-driven:
```yaml
type: file_watcher
watch_dir: /data/inbox
patterns: ["*.json"]
file_format: json
recursive: true
process_existing: true
move_after_process: /data/archive
```
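When watchfiles is not available, each polling pass could look like the sketch below: list the watch directory, skip files already seen or not matching the patterns, and optionally move each new file to an archive directory. `poll_once` is a hypothetical helper that is non-recursive and leaves out parsing, so treat it as an illustration of the polling idea rather than FileWatcherExtractor's logic.

```python
import fnmatch
import os
import shutil
import tempfile
from typing import List, Optional, Set


def poll_once(
    watch_dir: str,
    patterns: List[str],
    seen: Set[str],
    move_after_process: Optional[str] = None,
) -> List[str]:
    """One polling pass: return the names of new files that match a pattern."""
    new_files: List[str] = []
    for name in sorted(os.listdir(watch_dir)):
        path = os.path.join(watch_dir, name)
        if path in seen or not os.path.isfile(path):
            continue
        if not any(fnmatch.fnmatch(name, pattern) for pattern in patterns):
            continue
        seen.add(path)
        new_files.append(name)  # a real extractor would parse the file here
        if move_after_process:
            shutil.move(path, os.path.join(move_after_process, name))
    return new_files


with tempfile.TemporaryDirectory() as inbox, tempfile.TemporaryDirectory() as archive:
    for fname in ("a.json", "notes.txt"):
        open(os.path.join(inbox, fname), "w").close()
    seen: Set[str] = set()
    first = poll_once(inbox, ["*.json"], seen, archive)
    second = poll_once(inbox, ["*.json"], seen, archive)  # nothing new since the last pass
    archived = sorted(os.listdir(archive))

print(first, second, archived)  # ['a.json'] [] ['a.json']
```

Under this model, `process_existing=False` would correspond to pre-filling `seen` with the files present at startup, so only files that arrive later are picked up.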
Messaging Extractors
Messaging extractors consume from message queues with deferred acknowledgment — messages are only acknowledged (committed/deleted) after the batch is successfully loaded. This provides at-least-once delivery guarantees.
All messaging extractors implement the AckableExtractor protocol. The pipeline automatically calls acknowledge() after each batch load.
Install extras: pip install pycharter[messaging]; SQS is also available with pycharter[etl].
KafkaExtractor
Extract data from Apache Kafka topics using aiokafka. Supports consumer groups, SASL/SSL authentication, and offset management.
```python
from pycharter.etl_generator.extractors import KafkaExtractor

extractor = KafkaExtractor(
    topics=["orders", "payments"],
    bootstrap_servers="broker1:9092,broker2:9092",
    consumer_group="etl-orders",
    auto_offset_reset="earliest",
    batch_size=500,
    batch_timeout=2.0,
    # SASL authentication (optional)
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="${KAFKA_USER}",
    sasl_password="${KAFKA_PASS}",
)
pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
try:
    result = await pipeline.run()
finally:
    # Close the consumer even if the run raises
    await extractor.close()
```
Config-driven:
```yaml
type: kafka
topics: [orders, payments]
bootstrap_servers: ${KAFKA_BROKERS}
consumer_group: etl-orders
auto_offset_reset: earliest
batch_size: 500
batch_timeout: 2.0
```
Acknowledgment behavior:
- success=True: commits consumer group offsets for the batch.
- success=False: seeks back to the earliest offset in the batch (enables retry).
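The two acknowledgment outcomes can be modelled with a single in-memory partition: committing moves the committed offset past the batch, while a failed batch rewinds the read position so the same messages are polled again. `PartitionCursor` is a hypothetical teaching model, not aiokafka or PyCharter code; it ignores multiple partitions, rebalances, and the broker round-trips a real commit or seek involves.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionCursor:
    """In-memory model of one partition: read position plus committed offset."""

    messages: List[str]
    position: int = 0   # next offset the consumer will read
    committed: int = 0  # committed offset (where a restarted consumer resumes)
    pending: List[int] = field(default_factory=list)  # offsets in the unacked batch

    def poll(self, max_records: int) -> List[str]:
        end = min(self.position + max_records, len(self.messages))
        self.pending = list(range(self.position, end))
        self.position = end
        return [self.messages[offset] for offset in self.pending]

    def acknowledge(self, success: bool) -> None:
        if not self.pending:
            return
        if success:
            self.committed = self.pending[-1] + 1  # Kafka commits the next offset to read
        else:
            self.position = self.pending[0]  # seek back to the earliest offset in the batch
        self.pending = []


cursor = PartitionCursor(messages=["m0", "m1", "m2", "m3"])
first = cursor.poll(2)             # ['m0', 'm1']
cursor.acknowledge(success=False)  # load failed: rewind to offset 0
retry = cursor.poll(2)             # ['m0', 'm1'] delivered again
cursor.acknowledge(success=True)   # load succeeded: committed offset becomes 2
print(retry, cursor.committed, cursor.poll(2))  # ['m0', 'm1'] 2 ['m2', 'm3']
```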
RabbitMQExtractor
Extract data from RabbitMQ queues using aio-pika. Supports prefetch for backpressure control.
```python
from pycharter.etl_generator.extractors import RabbitMQExtractor

extractor = RabbitMQExtractor(
    queue="order_events",
    url="amqp://user:pass@rabbitmq-host/",
    prefetch_count=200,
    batch_size=1000,
    batch_timeout=5.0,
)
```
Config-driven:
```yaml
type: rabbitmq
queue: order_events
url: ${RABBITMQ_URL}
prefetch_count: 200
batch_size: 1000
```
Acknowledgment behavior:
- success=True: calls message.ack() for each message in the batch.
- success=False: calls message.nack(requeue=True) to requeue the messages.
SQSExtractor
Extract data from AWS SQS queues using boto3. Uses long polling for efficiency and respects the SQS 10-message-per-poll limit.
```python
from pycharter.etl_generator.extractors import SQSExtractor

extractor = SQSExtractor(
    queue_url="https://sqs.us-east-1.amazonaws.com/123456789/orders",
    region="us-east-1",
    wait_time_seconds=20,    # long polling
    visibility_timeout=300,  # 5 min processing window
    batch_size=1000,
)
```
Config-driven:
```yaml
type: sqs
queue_url: ${SQS_QUEUE_URL}
region: us-east-1
wait_time_seconds: 20
visibility_timeout: 600
```
Acknowledgment behavior:
- success=True: calls delete_message_batch to remove the messages from the queue.
- success=False: calls change_message_visibility_batch with VisibilityTimeout=0 to make the messages immediately available for retry.
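Because SQS batch requests accept at most 10 entries, acknowledging a 1,000-message batch means splitting the receipt handles into groups of 10 before calling `delete_message_batch` or `change_message_visibility_batch`. The helper below is a hypothetical sketch of that chunking, assuming the receipt handle of each received message has been collected; it builds the `Entries` lists those boto3 calls expect but does not reflect how SQSExtractor is written internally.

```python
from typing import Dict, Iterator, List, Optional, Union

Entry = Dict[str, Union[str, int]]


def sqs_entry_chunks(
    receipt_handles: List[str],
    visibility_timeout: Optional[int] = None,
    chunk_size: int = 10,
) -> Iterator[List[Entry]]:
    """Group receipt handles into request entries of at most 10 (the SQS batch limit)."""
    for start in range(0, len(receipt_handles), chunk_size):
        entries: List[Entry] = []
        for i, handle in enumerate(receipt_handles[start:start + chunk_size]):
            # Id only needs to be unique within one request
            entry: Entry = {"Id": str(start + i), "ReceiptHandle": handle}
            if visibility_timeout is not None:
                entry["VisibilityTimeout"] = visibility_timeout
            entries.append(entry)
        yield entries


handles = [f"rh-{i}" for i in range(23)]
print([len(chunk) for chunk in sqs_entry_chunks(handles)])  # [10, 10, 3]

# With boto3 (not executed here), the chunks would feed the batch calls:
#   sqs = boto3.client("sqs", region_name="us-east-1")
#   for entries in sqs_entry_chunks(handles):                        # success=True
#       sqs.delete_message_batch(QueueUrl=queue_url, Entries=entries)
#   for entries in sqs_entry_chunks(handles, visibility_timeout=0):  # success=False
#       sqs.change_message_visibility_batch(QueueUrl=queue_url, Entries=entries)
```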
AckableExtractor Protocol
Messaging extractors implement the AckableExtractor protocol, which extends Extractor with an acknowledge() method:
```python
from pycharter.etl_generator.protocols import AckableExtractor

# The pipeline handles this automatically; you don't need to call it manually.
# But you can check whether an extractor supports acknowledgment:
if isinstance(extractor, AckableExtractor):
    await extractor.acknowledge(batch_index=0, success=True)
```
The Pipeline.run() method automatically calls acknowledge() after each batch:
- Load succeeds: acknowledge(batch_index, success=True)
- Load fails: acknowledge(batch_index, success=False)
- Dry run: acknowledge(batch_index, success=True), so messages aren't redelivered
- Ack failure: logged as a warning; the pipeline continues processing
Non-ackable extractors (HTTP, File, Database, etc.) are unaffected — the pipeline skips the acknowledge step.
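The acknowledgment rules above can be summarized as a small driver loop. This sketch assumes that an extractor exposing `acknowledge(batch_index, success=...)` is ackable (it uses `hasattr` rather than the AckableExtractor protocol so it runs without PyCharter installed) and that the loop keeps going after a failed load; `run_with_acks` and `FakeQueueExtractor` are hypothetical, not `Pipeline.run()` itself.

```python
import asyncio
import logging
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Tuple

Record = Dict[str, Any]
logger = logging.getLogger("ack_sketch")


async def run_with_acks(
    extractor: Any,
    load: Callable[[List[Record]], Awaitable[None]],
    dry_run: bool = False,
) -> int:
    """Load each batch, then acknowledge it; returns the number of loaded records."""
    loaded = 0
    batch_index = 0
    async for batch in extractor.extract():
        if dry_run:
            success = True  # nothing is loaded, but ack so messages aren't redelivered
        else:
            try:
                await load(batch)
                success = True
                loaded += len(batch)
            except Exception:
                logger.error("load failed for batch %d", batch_index)
                success = False
        if hasattr(extractor, "acknowledge"):  # non-ackable extractors skip this step
            try:
                await extractor.acknowledge(batch_index, success=success)
            except Exception:
                logger.warning("acknowledge failed for batch %d", batch_index)
        batch_index += 1
    return loaded


class FakeQueueExtractor:
    """Hypothetical ackable extractor that records the acks it receives."""

    def __init__(self) -> None:
        self.acks: List[Tuple[int, bool]] = []

    async def extract(self) -> AsyncIterator[List[Record]]:
        for batch in ([{"id": 1}], [{"id": 2}]):
            yield batch

    async def acknowledge(self, batch_index: int, success: bool) -> None:
        self.acks.append((batch_index, success))


async def flaky_load(batch: List[Record]) -> None:
    if batch[0]["id"] == 2:
        raise RuntimeError("database unavailable")


extractor = FakeQueueExtractor()
loaded = asyncio.run(run_with_acks(extractor, flaky_load))
print(loaded, extractor.acks)  # 1 [(0, True), (1, False)]
```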
ExtractorFactory
Create extractors from configuration:
```python
from pycharter.etl_generator.extractors import ExtractorFactory

# Create from config
extractor = ExtractorFactory.create({
    "type": "http",
    "url": "https://api.example.com/users",
})

# List all available types
print(ExtractorFactory.list_types())
# ['cloud_storage', 'database', 'file', 'file_watcher', 'http',
#  'kafka', 'mongo', 'mongodb', 'rabbitmq', 'sqs', 'sse', 'websocket']

# Register a custom extractor
ExtractorFactory.register("custom", MyExtractor)
```
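A factory like this is typically a mapping from type names to classes. The sketch below shows that registry pattern in isolation; `Registry` and `EchoExtractor` are hypothetical, and passing the remaining config keys as constructor keyword arguments is an assumption (ExtractorFactory may instead go through each class's `from_config`).

```python
from typing import Any, Callable, Dict, List


class Registry:
    """Minimal type-name to class registry, similar in spirit to ExtractorFactory."""

    def __init__(self) -> None:
        self._types: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str, cls: Callable[..., Any]) -> None:
        self._types[name] = cls

    def list_types(self) -> List[str]:
        return sorted(self._types)

    def create(self, config: Dict[str, Any]) -> Any:
        config = dict(config)           # don't mutate the caller's dict
        type_name = config.pop("type")  # the "type" key selects the class
        try:
            cls = self._types[type_name]
        except KeyError:
            raise ValueError(f"Unknown extractor type: {type_name!r}") from None
        return cls(**config)            # remaining keys become constructor kwargs


class EchoExtractor:
    def __init__(self, url: str) -> None:
        self.url = url


registry = Registry()
registry.register("echo", EchoExtractor)
ext = registry.create({"type": "echo", "url": "https://api.example.com/users"})
print(registry.list_types(), ext.url)  # ['echo'] https://api.example.com/users
```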
Custom Extractors
Extend BaseExtractor to create custom extractors:
```python
from typing import Any, AsyncIterator

from pycharter.etl_generator.extractors import BaseExtractor, ExtractorFactory


class CustomExtractor(BaseExtractor):
    def __init__(self, config: dict[str, Any]):
        self.config = config

    async def fetch_data(self) -> list[dict[str, Any]]:
        # Placeholder: replace with a call to your data source
        return []

    async def extract_streaming(
        self,
        extract_config: dict[str, Any],
        params: dict[str, Any],
        headers: dict[str, Any],
        **kwargs: Any,
    ) -> AsyncIterator[list[dict[str, Any]]]:
        # Implement extraction logic; yield one batch (a list of records) at a time
        data = await self.fetch_data()
        yield data

    def validate_config(self, extract_config: dict[str, Any]) -> None:
        if "required_field" not in extract_config:
            raise ValueError("Missing required_field")


# Register under a type name so configs can use "type": "custom"
ExtractorFactory.register("custom", CustomExtractor)
```