Skip to content

Custom Extractors

Learn how to build custom extractors for data sources not supported out of the box.

When to Create a Custom Extractor

Create a custom extractor when you need to:

  • Extract from a proprietary API
  • Support a new database type
  • Implement custom authentication
  • Handle specialized file formats

Basic Structure

from pycharter.etl_generator.extractors import BaseExtractor, ExtractorFactory
from typing import AsyncIterator, List, Dict, Any

class CustomExtractor(BaseExtractor):
    """Extract data from a custom source in batches.

    This is a template: replace ``connect()`` and the ``client.fetch`` /
    ``client.close`` calls with the API of your own data source.
    """

    def __init__(self, connection_string: str, **options):
        """Store the connection string and any extra keyword options."""
        self.connection_string = connection_string
        self.options = options

    async def extract_streaming(
        self,
        extract_config: Dict[str, Any],
        params: Dict[str, Any],
        headers: Dict[str, Any],
        batch_size: int = 1000,
        max_records: int = None,
        **kwargs
    ) -> AsyncIterator[List[Dict[str, Any]]]:
        """Yield batches of records until the source is exhausted.

        Args:
            extract_config: Extractor configuration (not used by this template).
            params: Request parameters (not used by this template).
            headers: Request headers (not used by this template).
            batch_size: Maximum number of records requested per fetch.
            max_records: Upper bound on the total number of records yielded.
                ``None`` or a non-positive value means no limit.
            **kwargs: Ignored by this template.

        Yields:
            Non-empty lists of records. The final batch is truncated so the
            total never exceeds ``max_records``.
        """
        # A cap only applies when max_records is a positive number.
        cap = max_records if max_records and max_records > 0 else None

        # Connect to your data source.
        # NOTE(review): connect() is not defined in this example; implement it
        # to return a client exposing async fetch(offset=, limit=) and close().
        client = await self.connect()

        try:
            offset = 0
            total = 0

            while True:
                # Never ask the source for more than is still needed.
                limit = batch_size if cap is None else min(batch_size, cap - total)

                batch = await client.fetch(offset=offset, limit=limit)

                if not batch:
                    break

                # Guard against a client that returns more than `limit` rows.
                if cap is not None and total + len(batch) > cap:
                    batch = batch[: cap - total]

                yield batch

                offset += len(batch)
                total += len(batch)

                if cap is not None and total >= cap:
                    break
        finally:
            # Always release the connection, even if the consumer stops early
            # or a fetch raises.
            await client.close()

    def validate_config(self, extract_config: Dict[str, Any]) -> None:
        """Validate extractor configuration.

        Raises:
            ValueError: If ``type`` is present and is not ``'custom'``.
        """
        config_type = extract_config.get('type', 'custom')
        if config_type != 'custom':
            raise ValueError(f"Expected type='custom', got {config_type!r}")

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "CustomExtractor":
        """Create an extractor from a config dict.

        Reads ``connection_string`` and an optional ``options`` mapping whose
        entries are forwarded as keyword arguments.
        """
        return cls(
            connection_string=config.get("connection_string"),
            # An empty `options:` key in YAML loads as None; treat it as {}.
            **(config.get("options") or {})
        )

# Register the extractor so configs with type "custom" resolve to it
ExtractorFactory.register("custom", CustomExtractor)

Example: Custom API Extractor

import httpx

class MyAPIExtractor(BaseExtractor):
    """Extract data from a proprietary API with custom auth.

    Pages through ``{base_url}/data`` and authenticates with an
    ``X-API-Key`` header.
    """

    def __init__(self, base_url: str, api_key: str, batch_size: int = 100):
        """Store the API location, the API key and the page size."""
        self.base_url = base_url
        self.api_key = api_key
        self.batch_size = batch_size

    async def extract_streaming(
        self,
        extract_config: Dict[str, Any],
        params: Dict[str, Any],
        headers: Dict[str, Any],
        batch_size: int = 1000,
        max_records: int = None,
        **kwargs,
    ) -> AsyncIterator[List[Dict[str, Any]]]:
        """Yield one list of records per API page.

        The page size comes from ``self.batch_size`` (set in the constructor);
        the per-call ``batch_size``, ``params`` and ``headers`` arguments are
        not used by this example.

        Args:
            max_records: Upper bound on the total number of records yielded.
                ``None`` or a non-positive value means no limit.

        Yields:
            Non-empty lists taken from the ``results`` key of each response.
            The final list is truncated so the total never exceeds
            ``max_records``.

        Raises:
            httpx.HTTPStatusError: If a response has an error status
                (raised by ``raise_for_status()``).
        """
        # A cap only applies when max_records is a positive number.
        cap = max_records if max_records and max_records > 0 else None

        async with httpx.AsyncClient() as client:
            page = 1
            total = 0
            while True:
                resp = await client.get(
                    f"{self.base_url}/data",
                    headers={"X-API-Key": self.api_key},
                    # per_page stays constant: shrinking it on the last request
                    # would shift the page boundaries.
                    params={"page": page, "per_page": self.batch_size},
                )
                resp.raise_for_status()
                records = resp.json().get("results", [])
                if not records:
                    break
                # Trim the last page locally so the cap is honoured exactly.
                if cap is not None and total + len(records) > cap:
                    records = records[: cap - total]
                yield records
                total += len(records)
                if cap is not None and total >= cap:
                    break
                page += 1

    @classmethod
    def from_config(cls, config: Dict[str, Any]) -> "MyAPIExtractor":
        """Create an extractor from a config dict.

        ``base_url`` and ``api_key`` are required (a missing key raises
        ``KeyError``); ``batch_size`` defaults to 100.
        """
        return cls(
            base_url=config["base_url"],
            api_key=config["api_key"],
            batch_size=config.get("batch_size", 100),
        )

    def validate_config(self, extract_config: Dict[str, Any]) -> None:
        """Raise ``ValueError`` when ``base_url`` is missing or empty."""
        if not extract_config.get("base_url"):
            raise ValueError("Missing 'base_url'")

# Register the extractor so configs with type "my_api" resolve to it
ExtractorFactory.register("my_api", MyAPIExtractor)

Using Your Custom Extractor

from pycharter import Pipeline, FileLoader

# Option 1 - programmatic: wrap the extractor in a Pipeline, then pipe it
# into a loader with the | operator.
api_extractor = MyAPIExtractor(base_url="https://api.example.com", api_key="secret")
extract_stage = Pipeline(api_extractor)
pipeline = extract_stage | FileLoader(path="output.json")

# Option 2 - config-driven: works once the extractor has been registered
# under the type name used in the extract file.
config_files = {"extract": "my_api_extract.yaml", "load": "load.yaml"}
pipeline = Pipeline.from_config_files(**config_files)
my_api_extract.yaml
type: my_api
base_url: https://api.example.com
api_key: ${MY_API_KEY}
batch_size: 200

Built-in messaging extractors

PyCharter includes built-in KafkaExtractor, RabbitMQExtractor, and SQSExtractor with deferred acknowledgment support. For these systems, use the built-in extractors (see the Streaming and Messaging Guide) instead of building your own.

Adding Acknowledgment Support

If your custom extractor consumes from a message queue and needs deferred acknowledgment, implement an acknowledge(batch_index, success) method, as shown below. The pipeline calls it automatically after each batch load.

from pycharter.etl_generator.extractors._messaging import AckTracker

class CustomQueueExtractor(BaseExtractor):
    """Skeleton extractor for a custom message queue with deferred acks."""

    def __init__(self, queue_url: str, batch_size: int = 1000):
        """Store the queue settings and create an empty ack tracker."""
        self.queue_url = queue_url
        self.batch_size = batch_size
        # Batch counter starts at zero; the tracker holds per-batch metadata.
        self._batch_index = 0
        self._ack_tracker = AckTracker()

    async def extract_streaming(self, *args, **kwargs):
        """Placeholder: open tracking for the current batch, then poll."""
        self._ack_tracker.start_batch(self._batch_index)
        # Steps left for the implementer:
        #   1. poll messages and record metadata for each one:
        #        self._ack_tracker.track(self._batch_index, msg_metadata)
        #   2. yield the batch, then increment self._batch_index
        ...

    async def extract(self, **params):
        """Placeholder: implement along the same lines as extract_streaming."""
        ...

    async def acknowledge(self, batch_index: int, success: bool) -> None:
        """Called by the pipeline after each batch load.

        Acks every tracked message of the batch when ``success`` is true,
        nacks them otherwise, then clears the batch from the tracker.
        Does nothing when no metadata is tracked for ``batch_index``.
        """
        tracked = self._ack_tracker.get_batch_metadata(batch_index)
        if tracked:
            for entry in tracked:
                # _ack_message / _nack_message are not defined in this
                # skeleton; supply them for your queue.
                settle = self._ack_message if success else self._nack_message
                await settle(entry)
            self._ack_tracker.clear_batch(batch_index)

ExtractorFactory.register("custom_queue", CustomQueueExtractor)

The AckTracker utility (from pycharter.etl_generator.extractors._messaging) maps batch indices to lists of connector-specific metadata (offsets, delivery tags, receipt handles, etc.).

Best Practices

  1. Implement streaming - Use yield to emit batches for memory efficiency
  2. Handle errors gracefully - Use try/finally to clean up connections
  3. Support configuration - Implement from_config() for YAML support
  4. Validate configuration - Implement validate_config() for early error detection
  5. Document your extractor - Add docstrings and usage examples

See Also