Streaming and Messaging Extractors
This guide covers PyCharter's real-time data extraction capabilities: streaming extractors for continuous data feeds and messaging extractors for message queue consumption with at-least-once delivery guarantees.
Overview

| Extractor | Source | Install Extra | Acknowledgment |
|---|---|---|---|
| SSEExtractor | Server-Sent Events | (core) | No |
| WebSocketExtractor | WebSocket streams | pycharter[streaming] | No |
| FileWatcherExtractor | Directory monitoring | pycharter[streaming] | No |
| KafkaExtractor | Apache Kafka | pycharter[kafka] | Yes (offset commit) |
| RabbitMQExtractor | RabbitMQ | pycharter[rabbitmq] | Yes (ack/nack) |
| SQSExtractor | AWS SQS | pycharter[etl] | Yes (delete/visibility) |

All streaming and messaging extractors share common controls:
- `batch_size`: maximum records per yielded batch
- `batch_timeout`: seconds before yielding a partial batch
- `max_records`: stop after this many total records
- `max_batches`: stop after yielding this many batches
- `shutdown_event`: `asyncio.Event` to signal graceful shutdown
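
To make the interplay of `batch_size` and `batch_timeout` concrete, here is a minimal sketch of a size-or-timeout batcher. It is not PyCharter's `BatchAccumulator`: the class name `SketchAccumulator` and its methods are hypothetical, and the clock is injected only to keep the example deterministic.

```python
import time
from typing import Any, Callable, Optional


class SketchAccumulator:
    """Hypothetical size-or-timeout batcher (not PyCharter's implementation)."""

    def __init__(self, batch_size: int, batch_timeout: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self._clock = clock
        self._buffer: list[Any] = []
        self._started: Optional[float] = None  # arrival time of the first buffered record

    def add(self, record: Any) -> Optional[list[Any]]:
        """Buffer a record; return a full batch once batch_size is reached."""
        if not self._buffer:
            self._started = self._clock()
        self._buffer.append(record)
        if len(self._buffer) >= self.batch_size:
            return self.flush()
        return None

    def poll(self) -> Optional[list[Any]]:
        """Return a partial batch if batch_timeout has elapsed, else None."""
        if self._buffer and self._clock() - self._started >= self.batch_timeout:
            return self.flush()
        return None

    def flush(self) -> list[Any]:
        """Return whatever is buffered and reset the buffer."""
        batch, self._buffer, self._started = self._buffer, [], None
        return batch
```

A caller would invoke `add()` for each incoming record and `poll()` whenever the source is idle, so a slow feed still produces partial batches.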
Streaming Extractors
Streaming extractors yield batches indefinitely from real-time data sources. They reconnect automatically on connection loss and use the BatchAccumulator for efficient memory-bounded batching.
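
The reconnect behavior can be pictured as a bounded retry loop around the connection. The sketch below is illustrative only: `stream_with_reconnect` is a hypothetical helper, and it assumes that `max_reconnects` limits consecutive failed attempts and that the counter resets once data flows again. The guide does not state how PyCharter counts reconnects.

```python
import time
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")


def stream_with_reconnect(
    connect: Callable[[], Iterator[T]],
    reconnect_delay: float = 3.0,
    max_reconnects: int = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[T]:
    """Yield items from connect(), reconnecting after ConnectionError.

    Assumption: max_reconnects bounds consecutive failures, and the counter
    resets whenever an item is received.
    """
    failures = 0
    while True:
        try:
            for item in connect():
                failures = 0  # a received item counts as a healthy connection
                yield item
            return  # the source ended cleanly
        except ConnectionError:
            failures += 1
            if failures > max_reconnects:
                raise
            sleep(reconnect_delay)
```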
SSE (Server-Sent Events)
SSE is ideal for consuming event streams from HTTP endpoints. Uses httpx (a core dependency — no extra install needed).

```python
import asyncio

from pycharter import Pipeline
from pycharter.etl_generator.extractors import SSEExtractor

shutdown = asyncio.Event()

extractor = SSEExtractor(
    url="https://stream.example.com/events",
    headers={"Authorization": "Bearer ${TOKEN}"},
    event_filter="trade",  # only process "trade" events
    data_format="json",
    batch_size=500,
    batch_timeout=2.0,
    reconnect=True,
    reconnect_delay=3.0,
    max_reconnects=10,
    shutdown_event=shutdown,
)

pipeline = Pipeline(extractor) | PostgresLoader(...)
```

YAML configuration:

```yaml
type: sse
url: https://stream.example.com/events
headers:
  Authorization: "Bearer ${TOKEN}"
event_filter: trade
data_format: json
batch_size: 500
batch_timeout: 2.0
reconnect: true
max_reconnects: 10
```

Each SSE event becomes a dict with _event_type, _event_id, and the parsed data fields (or a data key for non-dict payloads).
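
As an illustration of that record shape, the following sketch maps one parsed SSE event to a dict. The helper `sse_event_to_record` is hypothetical and assumes `data_format="json"`; how PyCharter handles key collisions or malformed payloads is not covered here.

```python
import json
from typing import Any, Optional


def sse_event_to_record(event_type: str, event_id: Optional[str],
                        raw_data: str) -> dict[str, Any]:
    """Hypothetical helper mirroring the record shape described above."""
    record: dict[str, Any] = {"_event_type": event_type, "_event_id": event_id}
    payload = json.loads(raw_data)
    if isinstance(payload, dict):
        record.update(payload)    # dict payloads are merged into the record
    else:
        record["data"] = payload  # anything else is stored under "data"
    return record
```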
WebSocket
For bidirectional WebSocket feeds. Requires pip install pycharter[streaming].

```python
from pycharter.etl_generator.extractors import WebSocketExtractor

extractor = WebSocketExtractor(
    url="wss://ws.example.com/feed",
    subscribe_messages=[
        {"action": "subscribe", "channel": "trades"},
        {"action": "subscribe", "channel": "orderbook"},
    ],
    data_format="json",
    batch_size=1000,
    batch_timeout=5.0,
    ping_interval=20.0,
    reconnect=True,
    max_reconnects=10,
)
```

YAML configuration:

```yaml
type: websocket
url: wss://ws.example.com/feed
subscribe_messages:
  - action: subscribe
    channel: trades
data_format: json
batch_size: 1000
ping_interval: 20.0
```

FileWatcher
Monitor a directory for new files and extract their contents. Uses watchfiles for efficient OS-level notifications when available, with a polling fallback.

```python
from pycharter.etl_generator.extractors import FileWatcherExtractor

extractor = FileWatcherExtractor(
    watch_dir="/data/inbox",
    patterns=["*.json", "*.csv"],
    file_format="json",
    recursive=True,
    process_existing=True,  # process files already in the dir
    move_after_process="/data/archive",  # move processed files
    batch_size=1000,
)
```

YAML configuration:

```yaml
type: file_watcher
watch_dir: /data/inbox
patterns: ["*.json", "*.csv"]
file_format: json
recursive: true
process_existing: true
move_after_process: /data/archive
```

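
The polling fallback mentioned above can be approximated by rescanning the directory and remembering which files were already seen. This is a rough sketch, not PyCharter's code: `scan_new_files` is a hypothetical name, and a real watcher would also need to handle files that are still being written.

```python
import fnmatch
from pathlib import Path


def scan_new_files(watch_dir: str, patterns: list[str], seen: set[Path],
                   recursive: bool = True) -> list[Path]:
    """Return files matching any pattern that were not seen on earlier scans."""
    root = Path(watch_dir)
    candidates = root.rglob("*") if recursive else root.glob("*")
    new_files = sorted(
        path for path in candidates
        if path.is_file()
        and path not in seen
        and any(fnmatch.fnmatch(path.name, pattern) for pattern in patterns)
    )
    seen.update(new_files)  # remember them so the next scan skips them
    return new_files
```

Calling this function in a loop with a short sleep between scans gives polling behavior. Starting with an empty `seen` set corresponds to `process_existing=True`, since files already in the directory are returned by the first scan.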
Messaging Extractors
Messaging extractors consume from message queues with deferred acknowledgment. Messages are only acknowledged after the pipeline successfully loads a batch. This provides at-least-once delivery guarantees.
How Acknowledgment Works

```text
Consumer → Poll messages → Accumulate batch → Transform → Load → Acknowledge
                                                           ↓
                                                      Load fails?
                                                           ↓
                                          Nack / seek-back / reset visibility
```

The Pipeline.run() method handles this automatically:
- The extractor yields a batch of records
- The pipeline transforms and loads the batch
- If the load succeeds, `acknowledge(batch_index, success=True)` is called
- If the load fails, `acknowledge(batch_index, success=False)` is called
- If the acknowledge call itself fails, a warning is logged and the pipeline continues

You do not need to call acknowledge() yourself — the pipeline does it for you.
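
The steps above can be sketched as a small loop. This is not the source of `Pipeline.run()`: `FakeExtractor` and `run_with_acks` are hypothetical stand-ins, and the sketch assumes that batch indices start at 0 and that the run continues with the next batch after a failed load.

```python
import asyncio
import logging
from typing import Any, AsyncIterator, Awaitable, Callable

logger = logging.getLogger(__name__)


class FakeExtractor:
    """Stand-in extractor that records acknowledge() calls (hypothetical)."""

    def __init__(self, batches: list[list[Any]]) -> None:
        self._batches = batches
        self.acks: list[tuple[int, bool]] = []

    async def extract(self) -> AsyncIterator[list[Any]]:
        for batch in self._batches:
            yield batch

    async def acknowledge(self, batch_index: int, success: bool) -> None:
        self.acks.append((batch_index, success))


async def run_with_acks(extractor: FakeExtractor,
                        load: Callable[[list[Any]], Awaitable[None]]) -> None:
    """Load each batch, then report the outcome back to the extractor."""
    batch_index = 0
    async for batch in extractor.extract():
        try:
            await load(batch)
            success = True
        except Exception:
            logger.exception("load failed for batch %d", batch_index)
            success = False
        try:
            await extractor.acknowledge(batch_index, success=success)
        except Exception:
            # An acknowledgment failure is logged and does not stop the run.
            logger.warning("acknowledge failed for batch %d", batch_index)
        batch_index += 1
```

Because acknowledgment happens only after the load, a crash between load and acknowledge leads to redelivery rather than loss. This is why the guarantee is at-least-once and why loaders should tolerate duplicates.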
Kafka
Apache Kafka extractor using aiokafka. Requires pip install pycharter[kafka].

```python
from pycharter.etl_generator.extractors import KafkaExtractor

extractor = KafkaExtractor(
    topics=["orders", "payments"],
    bootstrap_servers="broker1:9092,broker2:9092",
    consumer_group="etl-orders",
    auto_offset_reset="earliest",
    data_format="json",
    batch_size=500,
    batch_timeout=2.0,
    poll_timeout=1.0,
    # SASL/SSL (optional)
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="${KAFKA_USER}",
    sasl_password="${KAFKA_PASS}",
    ssl_cafile="/path/to/ca.pem",
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
result = await pipeline.run()
await extractor.close()  # stop the consumer
```

YAML configuration:

```yaml
type: kafka
topics: [orders, payments]
bootstrap_servers: ${KAFKA_BROKERS}
consumer_group: etl-orders
auto_offset_reset: earliest
batch_size: 500
batch_timeout: 2.0
```

Acknowledgment:
- Success: commits consumer group offsets (highest offset + 1 per partition)
- Failure: seeks back to the earliest offset in the batch for retry

Key details:
- `enable_auto_commit=False`: offsets are only committed after successful load
- Each record includes `_topic`, `_partition`, `_offset` metadata fields
- Parse errors produce a record with `_parse_error: True` and raw `data` field
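
The offset arithmetic behind the two acknowledgment paths can be worked out from the `_topic`, `_partition`, and `_offset` metadata alone. The helpers below are hypothetical and only compute the target offsets; passing them to the consumer's commit and seek calls is left out.

```python
from typing import Any


def offsets_to_commit(batch: list[dict[str, Any]]) -> dict[tuple[str, int], int]:
    """Next offset to read per (topic, partition): highest offset in the batch + 1."""
    result: dict[tuple[str, int], int] = {}
    for record in batch:
        key = (record["_topic"], record["_partition"])
        result[key] = max(result.get(key, 0), record["_offset"] + 1)
    return result


def offsets_to_seek(batch: list[dict[str, Any]]) -> dict[tuple[str, int], int]:
    """Offset to rewind to per (topic, partition): earliest offset in the batch."""
    result: dict[tuple[str, int], int] = {}
    for record in batch:
        key = (record["_topic"], record["_partition"])
        result[key] = min(result.get(key, record["_offset"]), record["_offset"])
    return result
```

Committing the highest offset plus one follows the Kafka convention that a committed offset names the next message to consume, not the last one processed.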
RabbitMQ
RabbitMQ extractor using aio-pika. Requires pip install pycharter[rabbitmq].

```python
from pycharter.etl_generator.extractors import RabbitMQExtractor

extractor = RabbitMQExtractor(
    queue="order_events",
    url="amqp://user:pass@rabbitmq-host/",
    prefetch_count=200,  # backpressure control
    data_format="json",
    batch_size=1000,
    batch_timeout=5.0,
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
result = await pipeline.run()
await extractor.close()
```

YAML configuration:

```yaml
type: rabbitmq
queue: order_events
url: ${RABBITMQ_URL}
prefetch_count: 200
batch_size: 1000
```

Acknowledgment:
- Success: `message.ack()` for each message in the batch
- Failure: `message.nack(requeue=True)` to requeue messages

Key details:
- Uses `aio_pika.connect_robust()` for automatic reconnection
- `prefetch_count` controls how many unacknowledged messages the broker delivers
- Each record includes `_routing_key` and `_exchange` metadata fields
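
Deferred acknowledgment implies that the extractor holds on to the broker messages behind each batch until the load outcome is known. One way to picture that bookkeeping is the sketch below. `FakeMessage` and `PendingAcks` are hypothetical names, and the stub message only imitates the `ack()` / `nack(requeue=True)` calls listed above.

```python
import asyncio
from collections import defaultdict


class FakeMessage:
    """Stand-in for a broker message exposing ack()/nack() (hypothetical)."""

    def __init__(self, body: str) -> None:
        self.body = body
        self.state = "unacked"

    async def ack(self) -> None:
        self.state = "acked"

    async def nack(self, requeue: bool = True) -> None:
        self.state = "requeued" if requeue else "rejected"


class PendingAcks:
    """Holds messages per batch until the load outcome is known."""

    def __init__(self) -> None:
        self._pending: dict[int, list[FakeMessage]] = defaultdict(list)

    def track(self, batch_index: int, message: FakeMessage) -> None:
        self._pending[batch_index].append(message)

    async def acknowledge(self, batch_index: int, success: bool) -> None:
        # Each batch is settled exactly once; unknown indices are a no-op.
        for message in self._pending.pop(batch_index, []):
            if success:
                await message.ack()
            else:
                await message.nack(requeue=True)
```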
SQS
AWS SQS extractor using boto3 (included in the [etl] extra). Uses asyncio.to_thread() to wrap blocking boto3 calls.

```python
from pycharter.etl_generator.extractors import SQSExtractor

extractor = SQSExtractor(
    queue_url="https://sqs.us-east-1.amazonaws.com/123456789/orders",
    region="us-east-1",
    wait_time_seconds=20,  # long polling
    visibility_timeout=300,  # 5 min processing window
    max_messages_per_poll=10,  # SQS max is 10
    data_format="json",
    batch_size=1000,
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
result = await pipeline.run()
await extractor.close()
```

YAML configuration:

```yaml
type: sqs
queue_url: ${SQS_QUEUE_URL}
region: us-east-1
wait_time_seconds: 20
visibility_timeout: 600
```

Acknowledgment:
- Success: `delete_message_batch` (removes messages from the queue)
- Failure: `change_message_visibility_batch` with `VisibilityTimeout=0` (makes messages immediately available for retry)

Key details:
- SQS limits polling to 10 messages at a time; the `BatchAccumulator` handles batching up to `batch_size`
- Long polling (`wait_time_seconds=20`) reduces empty responses and API costs
- Set `visibility_timeout` higher than your expected processing time
- Each record includes `_message_id` metadata field
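
SQS batch requests also accept at most 10 entries, so acknowledging a batch of up to `batch_size` messages has to be split into several requests. The sketch below builds the request entries without calling AWS. The helper names are hypothetical, and whether PyCharter chunks its calls exactly this way is an assumption.

```python
from typing import Any, Iterator

SQS_BATCH_LIMIT = 10  # SQS batch APIs accept at most 10 entries per request


def chunked(items: list[Any], size: int = SQS_BATCH_LIMIT) -> Iterator[list[Any]]:
    """Split items into consecutive chunks of at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def delete_entries(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Entries for a delete-batch request (the success path)."""
    return [{"Id": str(i), "ReceiptHandle": m["ReceiptHandle"]}
            for i, m in enumerate(messages)]


def retry_entries(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Entries for a change-visibility-batch request (the failure path)."""
    return [{"Id": str(i), "ReceiptHandle": m["ReceiptHandle"], "VisibilityTimeout": 0}
            for i, m in enumerate(messages)]
```

Each chunk would be sent as the `Entries` argument of one request. The `Id` values only need to be unique within a single request, which is why a per-chunk index is enough.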
Graceful Shutdown
All streaming and messaging extractors support shutdown_event for graceful shutdown:

```python
import asyncio
import signal

from pycharter import Pipeline
from pycharter.etl_generator.extractors import KafkaExtractor


async def main():
    shutdown = asyncio.Event()

    # Set the event on SIGINT/SIGTERM (add_signal_handler is Unix-only)
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, shutdown.set)
    loop.add_signal_handler(signal.SIGTERM, shutdown.set)

    extractor = KafkaExtractor(
        topics=["orders"],
        shutdown_event=shutdown,
    )
    pipeline = Pipeline(extractor) | PostgresLoader(...)
    result = await pipeline.run()
    await extractor.close()


asyncio.run(main())
```

When shutdown_event is set, the extractor finishes the current poll cycle, flushes any buffered records as a final batch, and stops yielding.
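
That shutdown sequence can be illustrated with a small async generator. It is a sketch under assumptions, not the extractors' actual loop: `batches_until_shutdown` and its `poll` callback are hypothetical, and `batch_timeout` handling is omitted for brevity.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable


async def batches_until_shutdown(
    poll: Callable[[], Awaitable[list[Any]]],
    batch_size: int,
    shutdown_event: asyncio.Event,
) -> AsyncIterator[list[Any]]:
    """Yield full batches until shutdown_event is set, then flush the remainder."""
    buffer: list[Any] = []
    while not shutdown_event.is_set():
        buffer.extend(await poll())  # the in-flight poll always completes
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    if buffer:
        yield buffer  # final partial batch
```

The event is checked only between polls, so records already fetched are never dropped; they are emitted as part of the last full batch or the final partial one.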