
Streaming and Messaging Extractors

This guide covers PyCharter's real-time data extraction capabilities: streaming extractors for continuous data feeds and messaging extractors for message queue consumption with at-least-once delivery guarantees.

Overview

Extractor             Source                Install extra         Acknowledgment
SSEExtractor          Server-Sent Events    (core)                No
WebSocketExtractor    WebSocket streams     pycharter[streaming]  No
FileWatcherExtractor  Directory monitoring  pycharter[streaming]  No
KafkaExtractor        Apache Kafka          pycharter[kafka]      Yes (offset commit)
RabbitMQExtractor     RabbitMQ              pycharter[rabbitmq]   Yes (ack/nack)
SQSExtractor          AWS SQS               pycharter[etl]        Yes (delete/visibility)

All streaming and messaging extractors share common controls:

  • batch_size: maximum records per yielded batch
  • batch_timeout: seconds before yielding a partial batch
  • max_records: stop after this many total records
  • max_batches: stop after yielding this many batches
  • shutdown_event: an asyncio.Event that signals graceful shutdown
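The following is a minimal sketch of how these five controls can interact, assuming records arrive on an asyncio.Queue. It is not PyCharter's BatchAccumulator: the accumulate function and its poll_interval parameter are hypothetical, and poll_interval exists only so the loop re-checks shutdown_event regularly.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional


async def accumulate(
    queue: "asyncio.Queue[Any]",
    batch_size: int,
    batch_timeout: float,
    max_records: Optional[int] = None,
    max_batches: Optional[int] = None,
    shutdown_event: Optional[asyncio.Event] = None,
    poll_interval: float = 0.1,  # hypothetical: how often stop conditions are re-checked
) -> AsyncIterator[List[Any]]:
    """Yield lists of at most batch_size records pulled from queue (illustrative only)."""
    loop = asyncio.get_running_loop()
    batch: List[Any] = []
    total = 0  # records pulled so far
    yielded = 0  # batches yielded so far
    deadline: Optional[float] = None  # when the current partial batch must be flushed

    while True:
        if shutdown_event is not None and shutdown_event.is_set():
            break
        if max_records is not None and total >= max_records:
            break
        # Never wait past the batch deadline or longer than poll_interval.
        wait = poll_interval
        if deadline is not None:
            wait = min(wait, max(0.0, deadline - loop.time()))
        try:
            record = await asyncio.wait_for(queue.get(), wait)
        except asyncio.TimeoutError:
            pass
        else:
            batch.append(record)
            total += 1
            if deadline is None:
                # batch_timeout is measured from the first record of the batch
                deadline = loop.time() + batch_timeout
        full = len(batch) >= batch_size
        expired = deadline is not None and loop.time() >= deadline
        if batch and (full or expired):
            yield batch
            yielded += 1
            batch, deadline = [], None
            if max_batches is not None and yielded >= max_batches:
                return
    if batch:
        yield batch  # flush leftovers on shutdown or when max_records is reached
```

Memory stays bounded because at most batch_size records are buffered at once; a quiet source still produces a partial batch once batch_timeout has passed since that batch's first record.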

Streaming Extractors

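Streaming extractors yield batches continuously from real-time data sources until shutdown_event is set or a max_records or max_batches limit is reached. The SSE and WebSocket extractors reconnect automatically after a connection loss, and all streaming extractors use the BatchAccumulator for memory-bounded batching.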

SSE (Server-Sent Events)

SSE is well suited to consuming event streams from HTTP endpoints. SSEExtractor uses httpx, a core dependency, so no extra install is needed.

import asyncio
import os

from pycharter import Pipeline
from pycharter.etl_generator.extractors import SSEExtractor

shutdown = asyncio.Event()

extractor = SSEExtractor(
    url="https://stream.example.com/events",
    headers={"Authorization": f"Bearer {os.environ['TOKEN']}"},
    event_filter="trade",          # only process "trade" events
    data_format="json",
    batch_size=500,
    batch_timeout=2.0,
    reconnect=True,
    reconnect_delay=3.0,
    max_reconnects=10,
    shutdown_event=shutdown,
)

pipeline = Pipeline(extractor) | PostgresLoader(...)
extract.yaml
type: sse
url: https://stream.example.com/events
headers:
  Authorization: "Bearer ${TOKEN}"
event_filter: trade
data_format: json
batch_size: 500
batch_timeout: 2.0
reconnect: true
max_reconnects: 10

Each SSE event becomes a dict with _event_type, _event_id, and the parsed data fields (or a data key for non-dict payloads).
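To illustrate that record shape, the sketch below parses one raw SSE frame (the event:, id:, and data: lines defined by the SSE specification) into a dict. The sse_frame_to_record helper is hypothetical, handles only data_format="json", and may differ from SSEExtractor in details such as the default event type ("message" here, as the spec defines).

```python
import json
from typing import Any, Dict, List, Optional


def sse_frame_to_record(frame: str, event_filter: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Turn one SSE frame into a record dict (hypothetical helper, JSON data only)."""
    event_type = "message"  # SSE default when no "event:" field is sent
    event_id: Optional[str] = None
    data_lines: List[str] = []
    for line in frame.splitlines():
        if not line or line.startswith(":"):
            continue  # skip blank lines and SSE comments such as keep-alives
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the spec strips a single leading space
        if field == "event":
            event_type = value
        elif field == "id":
            event_id = value
        elif field == "data":
            data_lines.append(value)  # multi-line data is joined with "\n"
    if event_filter is not None and event_type != event_filter:
        return None  # filtered out, as event_filter="trade" does in the example above
    payload = json.loads("\n".join(data_lines))
    fields = payload if isinstance(payload, dict) else {"data": payload}
    return {"_event_type": event_type, "_event_id": event_id, **fields}
```

For example, the frame "event: trade", "id: 42", 'data: {"symbol": "ABC", "price": 1.5}' becomes {"_event_type": "trade", "_event_id": "42", "symbol": "ABC", "price": 1.5}.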

WebSocket

For bidirectional WebSocket feeds. Requires pip install pycharter[streaming].

from pycharter.etl_generator.extractors import WebSocketExtractor

extractor = WebSocketExtractor(
    url="wss://ws.example.com/feed",
    subscribe_messages=[
        {"action": "subscribe", "channel": "trades"},
        {"action": "subscribe", "channel": "orderbook"},
    ],
    data_format="json",
    batch_size=1000,
    batch_timeout=5.0,
    ping_interval=20.0,
    reconnect=True,
    max_reconnects=10,
)
extract.yaml
type: websocket
url: wss://ws.example.com/feed
subscribe_messages:
  - action: subscribe
    channel: trades
data_format: json
batch_size: 1000
ping_interval: 20.0
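The SSE example sets reconnect, reconnect_delay, and max_reconnects, and the WebSocket example sets reconnect and max_reconnects. A minimal sketch of such a reconnect loop follows, assuming a dropped connection surfaces as ConnectionError and that the reconnect counter never resets. stream_with_reconnect and open_stream are hypothetical names; for WebSocket, open_stream would presumably resend subscribe_messages on each new connection.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict


async def stream_with_reconnect(
    open_stream: Callable[[], AsyncIterator[Dict[str, Any]]],
    reconnect_delay: float,
    max_reconnects: int,
) -> AsyncIterator[Dict[str, Any]]:
    """Re-open open_stream after a ConnectionError, at most max_reconnects times."""
    reconnects = 0
    while True:
        try:
            async for message in open_stream():
                yield message
            return  # assumption: a cleanly closed stream ends the run
        except ConnectionError:
            if reconnects >= max_reconnects:
                raise  # reconnect budget spent: surface the error
            reconnects += 1
            await asyncio.sleep(reconnect_delay)
```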

FileWatcher

Monitor a directory for new files and extract their contents. Uses watchfiles for efficient OS-level notifications when available, with a polling fallback.

from pycharter.etl_generator.extractors import FileWatcherExtractor

extractor = FileWatcherExtractor(
    watch_dir="/data/inbox",
    patterns=["*.json", "*.csv"],
    file_format="json",
    recursive=True,
    process_existing=True,             # process files already in the dir
    move_after_process="/data/archive", # move processed files
    batch_size=1000,
)
extract.yaml
type: file_watcher
watch_dir: /data/inbox
patterns: ["*.json", "*.csv"]
file_format: json
recursive: true
process_existing: true
move_after_process: /data/archive
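When watchfiles is unavailable, the extractor falls back to polling. A minimal sketch of one polling pass follows, assuming files are matched by name against patterns with fnmatch and tracked in a set of already-seen paths. initial_seen and poll_once are hypothetical helpers; parsing and move_after_process are left out.

```python
import fnmatch
from pathlib import Path
from typing import Iterable, List, Set


def _matching_files(watch_dir: str, patterns: Iterable[str], recursive: bool) -> List[Path]:
    root = Path(watch_dir)
    candidates = root.rglob("*") if recursive else root.glob("*")
    pats = list(patterns)
    return sorted(
        p for p in candidates
        if p.is_file() and any(fnmatch.fnmatch(p.name, pat) for pat in pats)
    )


def initial_seen(watch_dir: str, patterns: Iterable[str], recursive: bool, process_existing: bool) -> Set[Path]:
    """With process_existing=False, files already present count as seen."""
    if process_existing:
        return set()
    return set(_matching_files(watch_dir, patterns, recursive))


def poll_once(watch_dir: str, patterns: Iterable[str], recursive: bool, seen: Set[Path]) -> List[Path]:
    """One polling pass: return matching files not seen before, then remember them."""
    new = [p for p in _matching_files(watch_dir, patterns, recursive) if p not in seen]
    seen.update(new)
    return new
```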

Messaging Extractors

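Messaging extractors consume from message queues with deferred acknowledgment: messages are acknowledged only after the pipeline successfully loads the batch that contains them. This provides at-least-once delivery guarantees. A message can therefore be delivered more than once (for example, when a load succeeds but the acknowledgment fails), so loads should tolerate duplicates.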

How Acknowledgment Works

Consumer → Poll messages → Accumulate batch → Transform → Load → Acknowledge
                                                           │
                                                      Load fails?
                                                           ↓
                                          Nack / seek-back / reset visibility

The Pipeline.run() method handles this automatically:

  1. The extractor yields a batch of records
  2. The pipeline transforms and loads the batch
  3. If the load succeeds, acknowledge(batch_index, success=True) is called
  4. If the load fails, acknowledge(batch_index, success=False) is called
  5. If the acknowledge call itself fails, a warning is logged and the pipeline continues

You do not need to call acknowledge() yourself — the pipeline does it for you.
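The five steps can be sketched as a loop, assuming the extractor exposes an async generator of batches and an async acknowledge(batch_index, success) hook. run_with_acks, extract, and load are hypothetical names, and this sketch keeps going after a failed load, which the real Pipeline.run() may not do.

```python
import asyncio
import logging
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Protocol

logger = logging.getLogger("ack_sketch")


class AckingExtractor(Protocol):
    """Hypothetical shape: a batch generator plus an acknowledge hook."""

    def extract(self) -> AsyncIterator[List[Any]]: ...

    async def acknowledge(self, batch_index: int, success: bool) -> None: ...


async def run_with_acks(
    extractor: AckingExtractor,
    load: Callable[[List[Any]], Awaitable[None]],
) -> Dict[str, int]:
    counts = {"loaded": 0, "failed": 0}
    batch_index = 0
    async for batch in extractor.extract():  # step 1: a batch is yielded
        try:
            await load(batch)  # step 2: transform and load
            success = True
            counts["loaded"] += len(batch)
        except Exception:
            logger.error("load failed for batch %d", batch_index)
            success = False
            counts["failed"] += len(batch)
        try:
            # steps 3 and 4: report the outcome back to the extractor
            await extractor.acknowledge(batch_index, success=success)
        except Exception:
            # step 5: a failed acknowledge is logged, not fatal
            logger.warning("acknowledge failed for batch %d", batch_index)
        batch_index += 1
    return counts
```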

Kafka

Apache Kafka extractor using aiokafka. Requires pip install pycharter[kafka].

import os

from pycharter.etl_generator.extractors import KafkaExtractor

extractor = KafkaExtractor(
    topics=["orders", "payments"],
    bootstrap_servers="broker1:9092,broker2:9092",
    consumer_group="etl-orders",
    auto_offset_reset="earliest",
    data_format="json",
    batch_size=500,
    batch_timeout=2.0,
    poll_timeout=1.0,
    # SASL/SSL (optional)
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username=os.environ["KAFKA_USER"],
    sasl_password=os.environ["KAFKA_PASS"],
    ssl_cafile="/path/to/ca.pem",
)

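# Run inside an async function, e.g. one started with asyncio.run()
pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
try:
    result = await pipeline.run()
finally:
    await extractor.close()  # stop the consumer even if the run fails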
extract.yaml
type: kafka
topics: [orders, payments]
bootstrap_servers: ${KAFKA_BROKERS}
consumer_group: etl-orders
auto_offset_reset: earliest
batch_size: 500
batch_timeout: 2.0

Acknowledgment:

  • Success — commits consumer group offsets (highest offset + 1 per partition)
  • Failure — seeks back to the earliest offset in the batch for retry

Key details:

  • enable_auto_commit=False — offsets are only committed after successful load
  • Each record includes _topic, _partition, _offset metadata fields
  • Parse errors produce a record with _parse_error: True and a raw data field
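The offset arithmetic behind the acknowledgment rules above can be sketched with the _topic, _partition, and _offset fields. This is an illustration, not KafkaExtractor's code: commit_offsets and seek_offsets are hypothetical, and with aiokafka the results would presumably be keyed by TopicPartition and passed to consumer.commit() and consumer.seek(). Note that seeking happens per partition, since each partition has its own offset sequence.

```python
from typing import Any, Dict, Iterable, Mapping, Tuple

Partition = Tuple[str, int]  # (topic, partition)


def commit_offsets(records: Iterable[Mapping[str, Any]]) -> Dict[Partition, int]:
    """Offsets to commit on success: highest offset + 1 for each partition."""
    offsets: Dict[Partition, int] = {}
    for r in records:
        tp = (r["_topic"], r["_partition"])
        offsets[tp] = max(offsets.get(tp, 0), r["_offset"] + 1)
    return offsets


def seek_offsets(records: Iterable[Mapping[str, Any]]) -> Dict[Partition, int]:
    """Positions to seek back to on failure: earliest offset for each partition."""
    offsets: Dict[Partition, int] = {}
    for r in records:
        tp = (r["_topic"], r["_partition"])
        offsets[tp] = min(offsets.get(tp, r["_offset"]), r["_offset"])
    return offsets
```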

RabbitMQ

RabbitMQ extractor using aio-pika. Requires pip install pycharter[rabbitmq].

from pycharter.etl_generator.extractors import RabbitMQExtractor

extractor = RabbitMQExtractor(
    queue="order_events",
    url="amqp://user:pass@rabbitmq-host/",
    prefetch_count=200,         # backpressure control
    data_format="json",
    batch_size=1000,
    batch_timeout=5.0,
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
try:
    result = await pipeline.run()
finally:
    await extractor.close()  # close the connection even if the run fails
extract.yaml
type: rabbitmq
queue: order_events
url: ${RABBITMQ_URL}
prefetch_count: 200
batch_size: 1000

Acknowledgment:

  • Success — message.ack() for each message in the batch
  • Failure — message.nack(requeue=True) to requeue messages

Key details:

  • Uses aio_pika.connect_robust() for automatic reconnection
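  • prefetch_count controls how many unacknowledged messages the broker delivers. Because messages stay unacknowledged until their batch loads, a batch can hold at most prefetch_count messages, so set prefetch_count to at least batch_size if batches should fill before batch_timeout expires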
  • Each record includes _routing_key and _exchange metadata fields
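The per-message acknowledgment rules can be sketched as follows, assuming aio-pika incoming messages, whose ack() and nack() are coroutines. settle_batch and the DeliveredMessage protocol are hypothetical names for this illustration.

```python
from typing import Protocol, Sequence


class DeliveredMessage(Protocol):
    """The two coroutine methods this sketch needs from an incoming message."""

    async def ack(self, multiple: bool = False) -> None: ...

    async def nack(self, multiple: bool = False, requeue: bool = True) -> None: ...


async def settle_batch(messages: Sequence[DeliveredMessage], success: bool) -> None:
    """Ack every message after a successful load; otherwise requeue them all."""
    for message in messages:
        if success:
            await message.ack()
        else:
            await message.nack(requeue=True)  # the broker redelivers it later
```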

SQS

AWS SQS extractor using boto3 (included in the [etl] extra). Uses asyncio.to_thread() to wrap blocking boto3 calls.

from pycharter.etl_generator.extractors import SQSExtractor

extractor = SQSExtractor(
    queue_url="https://sqs.us-east-1.amazonaws.com/123456789/orders",
    region="us-east-1",
    wait_time_seconds=20,        # long polling
    visibility_timeout=300,      # 5 min processing window
    max_messages_per_poll=10,    # SQS max is 10
    data_format="json",
    batch_size=1000,
)

pipeline = Pipeline(extractor) | Rename({...}) | PostgresLoader(...)
try:
    result = await pipeline.run()
finally:
    await extractor.close()  # always clean up, even if the run fails
extract.yaml
type: sqs
queue_url: ${SQS_QUEUE_URL}
region: us-east-1
wait_time_seconds: 20
visibility_timeout: 600

Acknowledgment:

  • Success — delete_message_batch (removes messages from the queue)
  • Failure — change_message_visibility_batch with VisibilityTimeout=0 (makes messages immediately available for retry)

Key details:

  • SQS limits polling to 10 messages at a time — the BatchAccumulator handles batching up to batch_size
  • Long polling (wait_time_seconds=20) reduces empty responses and API costs
  • Set visibility_timeout higher than your expected processing time
  • Each record includes _message_id metadata field
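A sketch of the acknowledgment calls, assuming a boto3 SQS client and the raw messages returned by receive_message. Two SQS details shape it: settling a message requires its ReceiptHandle (not the _message_id exposed on records), and delete_message_batch and change_message_visibility_batch accept at most 10 entries per call, so a 1000-record batch needs 100 calls. settle_sqs_batch is a hypothetical helper.

```python
import asyncio
from typing import Any, Dict, Iterator, List, Sequence

SQS_BATCH_LIMIT = 10  # max entries per DeleteMessageBatch / ChangeMessageVisibilityBatch


def _chunks(items: Sequence[Any], size: int) -> Iterator[Sequence[Any]]:
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def settle_sqs_batch(
    client: Any,
    queue_url: str,
    messages: Sequence[Dict[str, Any]],
    success: bool,
) -> None:
    """Delete messages on success; make them visible again on failure."""
    for chunk in _chunks(messages, SQS_BATCH_LIMIT):
        # Ids only need to be unique within one request.
        entries: List[Dict[str, Any]] = [
            {"Id": str(i), "ReceiptHandle": m["ReceiptHandle"]}
            for i, m in enumerate(chunk)
        ]
        if success:
            call = client.delete_message_batch
        else:
            for entry in entries:
                entry["VisibilityTimeout"] = 0  # available for retry immediately
            call = client.change_message_visibility_batch
        # boto3 blocks, so each call runs in a worker thread. A production
        # version should also check the "Failed" list in the response.
        await asyncio.to_thread(call, QueueUrl=queue_url, Entries=entries)
```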

Graceful Shutdown

All streaming and messaging extractors support shutdown_event for graceful shutdown:

import asyncio
import signal

from pycharter import Pipeline
from pycharter.etl_generator.extractors import KafkaExtractor


async def main():
    shutdown = asyncio.Event()

    # Set the event on SIGINT/SIGTERM (add_signal_handler is Unix-only)
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, shutdown.set)
    loop.add_signal_handler(signal.SIGTERM, shutdown.set)

    extractor = KafkaExtractor(
        topics=["orders"],
        shutdown_event=shutdown,
    )

    pipeline = Pipeline(extractor) | PostgresLoader(...)
    try:
        result = await pipeline.run()
    finally:
        await extractor.close()


asyncio.run(main())

When shutdown_event is set, the extractor finishes the current poll cycle, flushes any buffered records as a final batch, and stops yielding.

See Also