Custom Extractors
Learn how to build custom extractors for data sources not supported out of the box.
When to Create a Custom Extractor
Create a custom extractor when you need to:
- Extract from a proprietary API
- Support a new database type
- Implement custom authentication
- Handle specialized file formats
Basic Structure
from typing import Any, AsyncIterator, Dict, List, Optional

from pycharter.etl_generator.extractors import BaseExtractor, ExtractorFactory
class CustomExtractor(BaseExtractor):
    """Extract data from a custom source.

    Args:
        connection_string: Connection string for the underlying data source.
        **options: Extra source-specific options, kept on ``self.options``.
    """

    def __init__(self, connection_string: str, **options):
        self.connection_string = connection_string
        self.options = options

    async def extract_streaming(
        self,
        extract_config: Dict[str, Any],
        params: Dict[str, Any],
        headers: Dict[str, Any],
        batch_size: int = 1000,
        max_records: Optional[int] = None,
        **kwargs,
    ) -> AsyncIterator[List[Dict[str, Any]]]:
        """Extract data in batches.

        Args:
            extract_config: Extract section of the pipeline config.
            params: Extra request parameters (unused in this template).
            headers: Extra request headers (unused in this template).
            batch_size: Maximum number of records requested per fetch.
            max_records: Stop after this many records in total. ``None``
                (or ``0``) means no limit.

        Yields:
            Non-empty lists of records. At most ``max_records`` records are
            yielded in total.
        """
        # Connect to your data source
        client = await self.connect()
        try:
            # Number of records fetched so far; doubles as the next offset.
            fetched = 0
            while True:
                limit = batch_size
                if max_records:
                    # Request only the remaining budget so the final batch
                    # cannot push the total past max_records.
                    limit = min(batch_size, max_records - fetched)
                batch = await client.fetch(offset=fetched, limit=limit)
                if not batch:
                    break
                if max_records:
                    # Guard against sources that ignore `limit`.
                    batch = batch[: max_records - fetched]
                yield batch
                fetched += len(batch)
                if max_records and fetched >= max_records:
                    break
        finally:
            # Always release the connection, even if the consumer stops early.
            await client.close()

    def validate_config(self, extract_config: Dict[str, Any]) -> None:
        """Validate extractor configuration.

        Raises:
            ValueError: If ``extract_config`` declares a ``type`` other than
                ``'custom'``.
        """
        if 'type' in extract_config and extract_config['type'] != 'custom':
            raise ValueError(
                f"Expected type='custom', got type={extract_config['type']!r}"
            )

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "CustomExtractor":
        """Create extractor from config dict.

        Reads ``connection_string`` and an optional ``options`` mapping,
        which is forwarded to ``__init__`` as keyword arguments.
        """
        return cls(
            connection_string=config.get("connection_string"),
            **config.get("options", {}),
        )


# Register the extractor under the name used as `type:` in config files
ExtractorFactory.register("custom", CustomExtractor)
Example: Custom API Extractor
import httpx
class MyAPIExtractor(BaseExtractor):
    """Extract data from a proprietary API with custom auth.

    Pages through ``GET {base_url}/data`` using ``page``/``per_page`` query
    parameters and authenticates with an ``X-API-Key`` header. Records are
    read from the ``results`` key of each JSON response.

    Args:
        base_url: API root, without the trailing ``/data``.
        api_key: Value sent in the ``X-API-Key`` header.
        batch_size: Page size sent as ``per_page``.
    """

    def __init__(self, base_url: str, api_key: str, batch_size: int = 100):
        self.base_url = base_url
        self.api_key = api_key
        self.batch_size = batch_size

    async def extract_streaming(
        self,
        extract_config: Dict[str, Any],
        params: Dict[str, Any],
        headers: Dict[str, Any],
        batch_size: int = 1000,
        max_records: Optional[int] = None,
        **kwargs,
    ) -> AsyncIterator[List[Dict[str, Any]]]:
        """Yield one page of records at a time until the API returns none.

        The page size comes from ``self.batch_size``. The ``batch_size``,
        ``params`` and ``headers`` arguments are accepted for interface
        compatibility but are not used by this example.

        Args:
            max_records: Stop after this many records in total. ``None``
                (or ``0``) means no limit.

        Raises:
            httpx.HTTPStatusError: If a page request returns an error status.
        """
        async with httpx.AsyncClient() as client:
            page = 1
            total = 0
            while True:
                resp = await client.get(
                    f"{self.base_url}/data",
                    headers={"X-API-Key": self.api_key},
                    params={"page": page, "per_page": self.batch_size},
                )
                resp.raise_for_status()
                records = resp.json().get("results", [])
                if not records:
                    break
                if max_records:
                    # Page boundaries depend on per_page, so trim the last
                    # page rather than shrinking the request.
                    records = records[: max_records - total]
                yield records
                total += len(records)
                if max_records and total >= max_records:
                    break
                page += 1

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "MyAPIExtractor":
        """Build the extractor from a config dict.

        ``base_url`` and ``api_key`` are required (a missing key raises
        ``KeyError``). ``batch_size`` defaults to 100.
        """
        return cls(
            base_url=config["base_url"],
            api_key=config["api_key"],
            batch_size=config.get("batch_size", 100),
        )

    def validate_config(self, extract_config: Dict[str, Any]) -> None:
        """Raise ``ValueError`` if ``base_url`` is missing or empty."""
        if not extract_config.get("base_url"):
            raise ValueError("Missing 'base_url'")


ExtractorFactory.register("my_api", MyAPIExtractor)
Using Your Custom Extractor
import os

from pycharter import Pipeline, FileLoader

# Programmatic: read the API key from the environment (the same variable the
# YAML config below uses) rather than hard-coding a secret in source.
pipeline = (
    Pipeline(
        MyAPIExtractor(
            base_url="https://api.example.com",
            api_key=os.environ["MY_API_KEY"],
        )
    )
    | FileLoader(path="output.json")
)

# Config-driven (after registration)
pipeline = Pipeline.from_config_files(
    extract="my_api_extract.yaml",
    load="load.yaml",
)
Contents of `my_api_extract.yaml`:
# Must match the name passed to ExtractorFactory.register ("my_api").
type: my_api
base_url: https://api.example.com
# NOTE(review): presumably expanded from the MY_API_KEY environment variable
# by the config loader — confirm.
api_key: ${MY_API_KEY}
# Page size; MyAPIExtractor.from_config reads it as batch_size (default 100).
batch_size: 200
Built-in messaging extractors
PyCharter includes built-in KafkaExtractor, RabbitMQExtractor, and SQSExtractor with
deferred acknowledgment support. If you consume from Kafka, RabbitMQ, or SQS, use
these (see the Streaming and Messaging Guide) instead of building your own.
Adding Acknowledgment Support
If your custom extractor consumes from a message queue and needs deferred acknowledgment, implement an `acknowledge(batch_index, success)` method. The pipeline calls it automatically after each batch load, passing the index of the batch and whether the load succeeded.
from pycharter.etl_generator.extractors._messaging import AckTracker
class CustomQueueExtractor(BaseExtractor):
    """Extract from a custom message queue with ack support.

    Every yielded batch gets an index. The ack metadata for each message in
    that batch is recorded in ``self._ack_tracker`` under the same index, so
    ``acknowledge()`` can later ack or nack exactly those messages.

    Fill in the connector-specific hooks ``_poll_batch``, ``_ack_message``
    and ``_nack_message``.
    """

    def __init__(self, queue_url: str, batch_size: int = 1000):
        self.queue_url = queue_url
        self.batch_size = batch_size
        self._ack_tracker = AckTracker()
        # Index that will be assigned to the next yielded batch.
        self._batch_index = 0

    async def extract_streaming(self, *args, **kwargs):
        """Yield batches of records, tracking ack metadata per batch.

        This must be an async generator (it contains ``yield``) so that it
        is an async iterator of batches, like the other extractors.
        """
        while True:
            messages = await self._poll_batch()
            if not messages:
                # Queue drained; a long-running consumer might keep polling.
                break
            # Open a tracker entry for every batch, not only the first one.
            self._ack_tracker.start_batch(self._batch_index)
            records = []
            for record, msg_metadata in messages:
                self._ack_tracker.track(self._batch_index, msg_metadata)
                records.append(record)
            yield records
            self._batch_index += 1

    async def extract(self, **params):
        # ... similar to extract_streaming
        ...

    async def acknowledge(self, batch_index: int, success: bool) -> None:
        """Called by the pipeline after each batch load.

        Acks every message of batch ``batch_index`` when ``success`` is true,
        nacks them otherwise, then drops the batch's tracked metadata.
        Unknown or already-cleared batches are ignored.
        """
        metadata = self._ack_tracker.get_batch_metadata(batch_index)
        if not metadata:
            return
        for item in metadata:
            if success:
                await self._ack_message(item)
            else:
                await self._nack_message(item)
        self._ack_tracker.clear_batch(batch_index)

    async def _poll_batch(self) -> List[Any]:
        """Poll up to ``self.batch_size`` messages from ``self.queue_url``.

        Return a list of ``(record, msg_metadata)`` pairs, where
        ``msg_metadata`` is whatever ``_ack_message``/``_nack_message`` need
        (offset, delivery tag, receipt handle, ...). Return an empty list
        when there is nothing to read.
        """
        raise NotImplementedError

    async def _ack_message(self, msg_metadata: Any) -> None:
        """Acknowledge one message with the queue (connector-specific)."""
        raise NotImplementedError

    async def _nack_message(self, msg_metadata: Any) -> None:
        """Reject one message with the queue (connector-specific)."""
        raise NotImplementedError


ExtractorFactory.register("custom_queue", CustomQueueExtractor)
The AckTracker utility (from pycharter.etl_generator.extractors._messaging) maps batch indices to lists of connector-specific metadata (offsets, delivery tags, receipt handles, etc.).
Best Practices
- Implement streaming - Use `yield` to emit batches for memory efficiency
- Handle errors gracefully - Use try/finally to clean up connections
- Support configuration - Implement `from_config()` for YAML support
- Validate configuration - Implement `validate_config()` for early error detection
- Document your extractor - Add docstrings and usage examples