Metadata-Version: 2.4
Name: snowflake-data-exchange-agent
Version: 1.3.2
Summary: Data exchange agent for migrations and validation
Project-URL: Bug Tracker, https://github.com/snowflakedb/migrations-data-validation/issues
Project-URL: Source code, https://github.com/snowflakedb/migrations-data-validation/
Project-URL: homepage, https://www.snowflake.com/
Author-email: "Snowflake, Inc." <snowflake-python-libraries-dl@snowflake.com>
License: Apache License, Version 2.0
Keywords: Snowflake,analytics,cloud,data,data-analysis,data-analytics,data-engineering,data-management,data-processing,data-science,data-visualization,data-warehouse,database
Classifier: Development Status :: 3 - Alpha
Classifier: Environment :: Console
Classifier: Environment :: Other Environment
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Education
Classifier: Intended Audience :: Information Technology
Classifier: Intended Audience :: System Administrators
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Programming Language :: SQL
Classifier: Topic :: Database
Classifier: Topic :: Scientific/Engineering :: Information Analysis
Classifier: Topic :: Software Development
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: <3.13,>=3.10
Requires-Dist: azure-identity>=1.25.0
Requires-Dist: azure-storage-blob>=12.26.0
Requires-Dist: boto3>=1.40.41
Requires-Dist: dependency-injector>=4.48.2
Requires-Dist: flask>=3.1.2
Requires-Dist: psutil>=7.1.0
Requires-Dist: psycopg2-binary>=2.9.10
Requires-Dist: pyarrow>=22.0.0
Requires-Dist: pyodbc>=5.0.0
Requires-Dist: requests>=2.32.5
Requires-Dist: snowflake-connector-python>=4.0.0
Requires-Dist: sqlparse==0.5.4
Requires-Dist: toml==0.10.2
Requires-Dist: urllib3>=2.6.3
Requires-Dist: waitress>=3.0.2
Provides-Extra: all
Requires-Dist: parameterized>=0.9.0; extra == 'all'
Requires-Dist: pytest-cov>=4.0.0; extra == 'all'
Requires-Dist: pytest-mock>=3.10.0; extra == 'all'
Requires-Dist: pytest>=7.0.0; extra == 'all'
Requires-Dist: ruff>=0.1.0; extra == 'all'
Requires-Dist: ty>=0.0.1a5; extra == 'all'
Provides-Extra: development
Requires-Dist: parameterized>=0.9.0; extra == 'development'
Requires-Dist: pytest-cov>=4.0.0; extra == 'development'
Requires-Dist: pytest-mock>=3.10.0; extra == 'development'
Requires-Dist: pytest>=7.0.0; extra == 'development'
Requires-Dist: ruff>=0.1.0; extra == 'development'
Requires-Dist: ty>=0.0.1a5; extra == 'development'
Description-Content-Type: text/markdown

# Snowflake Data Exchange Agent

[![License Apache-2.0](https://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![Python](https://img.shields.io/badge/python-3.10--3.12-blue)](https://www.python.org/downloads/)

A REST API service for database migrations and data validation. Supports Snowflake, SQL Server, and Amazon Redshift with queue-based task processing. The architecture is extensible to support additional database types in the future.

## Quick Start

```bash
# Install
pip install snowflake-data-exchange-agent

# Run
data-exchange-agent --port 8080

# Test
curl http://localhost:8080/health
```

## Installation

### From PyPI (Production)
```bash
pip install snowflake-data-exchange-agent
```

### Requirements & Dependencies

**Python Version**: 3.10, 3.11, or 3.12 (3.13 not yet supported)

**Available dependency groups**:
- `development`: Testing and development tools (pytest, ruff, etc.)
- `all`: Includes all development dependencies

**Core dependencies include**:
- Snowflake Connector for Python
- PyArrow for data processing
- Flask + Waitress for REST API
- ODBC support (pyodbc) for SQL Server
- AWS SDK (boto3)

## Configuration

Create `src/data_exchange_agent/configuration.toml`:

```toml
selected_task_source = "api"

[application]
workers = 4
task_fetch_interval = 5
debug_mode = false

[task_source.api]
key = "api-key"

# SQL Server connection (standard authentication)
[connections.source.sqlserver]
username = "username"
password = "password"
database = "database_name"
host = "127.0.0.1"
port = 1433

# Amazon Redshift connection (IAM authentication for provisioned cluster)
[connections.source.redshift]
username = "demo-user"
database = "snowconvert_demo"
auth_method = "iam-provisioned-cluster"
cluster_id = "migrations-aws"
region = "us-west-2"
access_key_id = "your-access-key-id"
secret_access_key = "your-secret-access-key"

# Amazon Redshift connection (standard authentication)
# [connections.source.redshift]
# username = "myuser"
# password = "mypassword"
# database = "mydatabase"
# host = "my-cluster.abcdef123456.us-west-2.redshift.amazonaws.com"
# port = 5439
# auth_method = "standard"

[connections.target.snowflake_connection_name]
connection_name = "connection_name"

[connections.target.s3]
profile_name = "profile_name"
bucket_name = "bucket_name"

[connections.target.blob]
connection_string = "DefaultEndpointsProtocol=https;AccountName=account_name;AccountKey=account_key;EndpointSuffix=core.windows.net"
container_name = "container_name"
# Optional: instead of connection_string, set account_name and use_default_credential
# account_name = "storage_account_name"
# use_default_credential = true  # or false
```

For Snowflake, create `~/.snowflake/config.toml`:

```toml
[connections.default]
account = "your_account.region"
user = "your_username"
password = "your_password"
warehouse = "COMPUTE_WH"
database = "PRODUCTION_DB"
```

## API Usage

### Command Line
```bash
# Basic usage
data-exchange-agent

# Production settings
data-exchange-agent --workers 8 --port 8080

# Debug mode
data-exchange-agent --debug --port 5001
```

### Health Check
```http
GET /health
```
```json
{
  "status": "healthy",
  "version": "1.3.2",
  "database_connections": {
    "snowflake": "connected"
  }
}
```
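
A monitoring script could interpret that response along these lines. This is a minimal sketch using only the standard library. `is_healthy` and `fetch_health` are hypothetical helpers, and the rule that every listed connection must report `"connected"` is an assumption based on the sample body.

```python
import json
from urllib.request import urlopen


def is_healthy(payload: dict) -> bool:
    """Interpret a /health response body shaped like the example above."""
    if payload.get("status") != "healthy":
        return False
    connections = payload.get("database_connections", {})
    # Assumption: every listed connection should report "connected".
    return all(state == "connected" for state in connections.values())


def fetch_health(base_url: str, timeout: float = 5.0) -> dict:
    """GET <base_url>/health and decode the JSON body."""
    with urlopen(f"{base_url}/health", timeout=timeout) as response:
        return json.load(response)


# Offline demonstration using the sample body shown above.
sample = {
    "status": "healthy",
    "database_connections": {"snowflake": "connected"},
}
print(is_healthy(sample))  # True
```

Against a running agent, `is_healthy(fetch_health("http://localhost:8080"))` performs the same check over HTTP.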

### Task Management
```http
# Start processing
GET /handle_tasks

# Stop processing
GET /stop

# Get status
GET /get_handling_tasks_status

# Task count
GET /get_tasks_count
```

### Add Task
```http
POST /tasks
Content-Type: application/json
```
```json
{
  "task_type": "data_extraction",
  "source_config": {
    "database": "sqlserver",
    "query": "SELECT * FROM users"
  },
  "destination_config": {
    "type": "snowflake_stage",
    "stage": "@data_stage/users/"
  }
}
```
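
The same request can be built from Python. The sketch below only constructs the request with the standard library and does not send it. `build_task_request` is a hypothetical helper, and the base URL assumes a local agent on port 8080.

```python
import json
from urllib.request import Request


def build_task_request(base_url: str, task: dict) -> Request:
    """Build (but do not send) a POST /tasks request with a JSON body."""
    return Request(
        url=f"{base_url}/tasks",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Same payload as the JSON example above.
task = {
    "task_type": "data_extraction",
    "source_config": {"database": "sqlserver", "query": "SELECT * FROM users"},
    "destination_config": {"type": "snowflake_stage", "stage": "@data_stage/users/"},
}

request = build_task_request("http://localhost:8080", task)
print(request.get_method(), request.full_url)
```

To submit the task to a running agent, pass the object to `urllib.request.urlopen(request, timeout=10)`.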

## Development

### Setup
```bash
git clone https://github.com/snowflakedb/migrations-data-validation.git
cd migrations-data-validation/data-exchange-agent
pip install -e ".[development]"
```

### Testing
```bash
# Run all tests
pytest

# With coverage
pytest --cov=src/data_exchange_agent

# Run specific test types
pytest tests/unit/           # Unit tests only
pytest -m "not integration" # Non-integration tests
```

### Code Quality
```bash
# Format code
ruff format .

# Lint code
ruff check .

# Auto-fix linting issues
ruff check --fix .
```

### Type Checking
Run static type checking with [ty](https://docs.astral.sh/ty/), an extremely fast Python type checker from Astral:
```bash
# Check all source code
hatch run types:check-ty

# Check specific path
hatch run types:check-ty tests

# Watch mode - automatically re-checks on file changes
hatch run types:watch-ty
```

#### ty Diagnostics Baseline

The project uses a **diagnostics baseline** to prevent type error regressions. CI fails if the number of `ty` diagnostics exceeds the baseline.

| Project | Baseline |
|---------|----------|
| data-exchange-agent | 64 |

To check diagnostics locally:
```bash
python .github/scripts/ty_check_diagnostics.py data-exchange-agent
```

If you **fix** type errors and reduce the count, please update the baseline in:
- `.github/workflows/data-exchange-all-ci.yml`
- `.github/scripts/ty_check_diagnostics.py` (DEFAULT_BASELINES)
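
The baseline rule amounts to a simple comparison. The sketch below illustrates the idea only and is not the contents of `ty_check_diagnostics.py`. `check_baseline` and its messages are hypothetical.

```python
def check_baseline(diagnostics: int, baseline: int) -> tuple[bool, str]:
    """Compare a ty diagnostics count against the recorded baseline."""
    if diagnostics > baseline:
        # A regression: more diagnostics than the baseline allows.
        return False, f"FAIL: {diagnostics} diagnostics exceed the baseline of {baseline}"
    if diagnostics < baseline:
        # An improvement: the check passes, but the baseline should be lowered.
        return True, f"OK: {diagnostics} diagnostics; consider lowering the baseline from {baseline}"
    return True, f"OK: {diagnostics} diagnostics match the baseline"


for count in (64, 60, 70):
    passed, message = check_baseline(count, baseline=64)
    print(message)
```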

### CI Workflows

The following checks run automatically on PRs and pushes to `main`/`develop`:

1. **Linting** - Static analysis with ruff
2. **Type Check** - ty diagnostics baseline check
3. **Build** - Build wheel packages
4. **Unit Tests** - pytest on Python 3.10, 3.11, and 3.12 with coverage
5. **Artifact** - Package and publish to Test PyPI (on PRs)
6. **Integration** - Test package installation from Test PyPI (on PRs)

## 🐳 Docker

The Data Exchange Agent can be run in a Docker container with configuration injected via environment variables at runtime.

### Building the Image

```bash
cd data-exchange-agent
docker build -t data-exchange-agent .
```

### How It Works

The Dockerfile uses configuration templates that are processed at container startup:

1. **`docker-artifacts/configuration.template.toml`** - Agent configuration template
2. **`docker-artifacts/snowflake.config.template.toml`** - Snowflake connection template
3. **`docker-artifacts/docker-entrypoint.sh`** - Entrypoint script that uses `envsubst` to substitute environment variables into the templates before starting the agent

With this approach, sensitive credentials such as passwords are never baked into the Docker image; they are injected only at runtime.
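
To make the substitution step concrete, here is a rough Python equivalent of what `envsubst` does to a template. The template fragment is hypothetical (the real files live under `docker-artifacts/`), and `render` is an illustrative helper. One difference to keep in mind: `envsubst` replaces unset variables with an empty string, while `string.Template.substitute` raises `KeyError`.

```python
from string import Template

# Hypothetical template fragment, not the actual file shipped in the image.
TEMPLATE = """\
[connections.source.sqlserver]
username = "${DATA_SOURCE_USERNAME}"
password = "${DATA_SOURCE_PASSWORD}"
host = "${DATA_SOURCE_HOST}"
port = ${DATA_SOURCE_PORT}
"""


def render(template: str, env: dict[str, str]) -> str:
    """Substitute ${VAR} placeholders; raises KeyError if a variable is unset."""
    return Template(template).substitute(env)


env = {
    "DATA_SOURCE_USERNAME": "db_user",
    "DATA_SOURCE_PASSWORD": "db_password",
    "DATA_SOURCE_HOST": "db.example.com",
    "DATA_SOURCE_PORT": "1433",
}
print(render(TEMPLATE, env))
```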

### Environment Variables

#### Data Source Configuration (Required for database connections)

| Variable | Description | Default |
|----------|-------------|---------|
| `DATA_SOURCE_USERNAME` | Username for the source database | - |
| `DATA_SOURCE_PASSWORD` | Password for the source database | - |
| `DATA_SOURCE_HOST` | Hostname of the source database | - |
| `DATA_SOURCE_PORT` | Port of the source database | `1433` |
| `DATA_SOURCE_DATABASE` | Database name on the source | - |

#### Snowflake Connection Configuration (Required for Snowflake integration)

| Variable | Description | Default |
|----------|-------------|---------|
| `SNOWFLAKE_ACCOUNT` | Snowflake account identifier (e.g., `myaccount.us-west-2.aws`) | - |
| `SNOWFLAKE_USER` | Snowflake username | - |
| `SNOWFLAKE_PASSWORD` | Snowflake password | - |
| `SNOWFLAKE_WAREHOUSE` | Snowflake warehouse name | - |
| `SNOWFLAKE_ROLE` | Snowflake role | - |
| `SNOWFLAKE_DATABASE` | Default Snowflake database | - |
| `SNOWFLAKE_SCHEMA` | Default Snowflake schema | - |

#### Application Configuration

| Variable | Description | Default |
|----------|-------------|---------|
| `AGENT_AFFINITY` | Agent affinity label for task routing (required) | - |
| `WORKER_COUNT` | Number of worker threads | `1` |

### Running the Container

```bash
docker run -p 5000:5000 \
  -e DATA_SOURCE_USERNAME="db_user" \
  -e DATA_SOURCE_PASSWORD="db_password" \
  -e DATA_SOURCE_HOST="db.example.com" \
  -e DATA_SOURCE_PORT="1433" \
  -e DATA_SOURCE_DATABASE="mydb" \
  -e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
  -e SNOWFLAKE_USER="snowflake_user" \
  -e SNOWFLAKE_PASSWORD="snowflake_password" \
  -e SNOWFLAKE_WAREHOUSE="COMPUTE_WH" \
  -e SNOWFLAKE_ROLE="DATA_ENGINEER" \
  -e SNOWFLAKE_DATABASE="PROD_DB" \
  -e SNOWFLAKE_SCHEMA="PUBLIC" \
  -e AGENT_AFFINITY="blue" \
  -e WORKER_COUNT="8" \
  data-exchange-agent
```

You can also pass additional arguments to the agent:

```bash
# Add the remaining -e variables from the example above before the image name
docker run -p 8080:8080 \
  -e DATA_SOURCE_PASSWORD="secret" \
  -e SNOWFLAKE_PASSWORD="secret" \
  data-exchange-agent --port 8080 --debug
```

### Running in Snowpark Container Services (SPCS)

When deploying the Data Exchange Agent in [Snowpark Container Services](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/overview), you can use the special `@SPCS_CONNECTION` connection name to automatically use Snowflake-provided credentials.

#### How It Works

When running in SPCS, Snowflake automatically provides:
- An **OAuth token** at `/snowflake/session/token`
- **Environment variables**: `SNOWFLAKE_HOST`, `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_DATABASE`, `SNOWFLAKE_SCHEMA`

The `@SPCS_CONNECTION` feature reads these credentials automatically, so you don't need to configure Snowflake passwords or account details manually.

> **Note**: `SNOWFLAKE_WAREHOUSE` is **not** provided by SPCS. You can set it manually via environment variable or use the `QUERY_WAREHOUSE` parameter when creating the service.
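
As a rough illustration of how those pieces fit together, the sketch below assembles connector parameters from the token file and the environment. It is not the agent's actual implementation. `read_spcs_token` and `spcs_connection_params` are hypothetical helpers. The parameter names (`host`, `account`, `authenticator="oauth"`, `token`) are the ones `snowflake.connector.connect` accepts for OAuth logins.

```python
from collections.abc import Mapping
from pathlib import Path

SPCS_TOKEN_PATH = "/snowflake/session/token"


def read_spcs_token(path: str = SPCS_TOKEN_PATH) -> str:
    """Read the OAuth token that SPCS mounts into the container."""
    return Path(path).read_text().strip()


def spcs_connection_params(env: Mapping[str, str], token: str) -> dict[str, str]:
    """Assemble connection parameters from SPCS-provided values."""
    params = {
        "host": env["SNOWFLAKE_HOST"],
        "account": env["SNOWFLAKE_ACCOUNT"],
        "authenticator": "oauth",
        "token": token,
    }
    # Database and schema come from SPCS; the warehouse must be set separately.
    for key in ("SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA", "SNOWFLAKE_WAREHOUSE"):
        if env.get(key):
            params[key.removeprefix("SNOWFLAKE_").lower()] = env[key]
    return params


# Offline demonstration with made-up values.
example_env = {
    "SNOWFLAKE_HOST": "example.snowflakecomputing.com",
    "SNOWFLAKE_ACCOUNT": "myaccount",
    "SNOWFLAKE_DATABASE": "PROD_DB",
    "SNOWFLAKE_SCHEMA": "PUBLIC",
}
params = spcs_connection_params(example_env, token="example-token")
print(sorted(params))
```

Inside a real SPCS container, the equivalent call would be `snowflake.connector.connect(**spcs_connection_params(os.environ, read_spcs_token()))`.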

#### Configuration

By default, the Docker image uses `@SPCS_CONNECTION`. No additional Snowflake configuration is needed:

```toml
[task_source.snowflake_stored_procedure]
connection_name = "@SPCS_CONNECTION"

[connections.target.snowflake_connection_name]
connection_name = "@SPCS_CONNECTION"
```

#### Environment Variables for SPCS

| Variable | Description | Default |
|----------|-------------|---------|
| `SNOWFLAKE_CONNECTION_NAME` | Connection mode: `@SPCS_CONNECTION` for SPCS credentials, or a named connection from `~/.snowflake/config.toml` | `@SPCS_CONNECTION` |
| `SNOWFLAKE_WAREHOUSE` | Warehouse for queries (not provided by SPCS, must be set manually) | - |

#### Switching to Manual Credentials

If you need to use traditional Snowflake credentials instead of SPCS-provided ones (e.g., for testing outside SPCS), set the `SNOWFLAKE_CONNECTION_NAME` environment variable:

```bash
# Use a named connection from ~/.snowflake/config.toml
# (add any other -e variables before the image name)
docker run \
  -e SNOWFLAKE_CONNECTION_NAME="MY_SNOWFLAKE_CONNECTION" \
  -e SNOWFLAKE_ACCOUNT="myaccount.us-west-2.aws" \
  -e SNOWFLAKE_USER="user" \
  -e SNOWFLAKE_PASSWORD="password" \
  data-exchange-agent
```

#### Example SPCS Service Definition

```sql
CREATE SERVICE data_exchange_agent
  IN COMPUTE POOL my_compute_pool
  QUERY_WAREHOUSE = MY_WAREHOUSE
  FROM SPECIFICATION $$
  spec:
    containers:
    - name: agent
      image: /my_db/my_schema/my_repo/data-exchange-agent:latest
      env:
        DATA_SOURCE_HOST: "source-db.example.com"
        DATA_SOURCE_PORT: "1433"
        DATA_SOURCE_DATABASE: "mydb"
        DATA_SOURCE_USERNAME: "user"
        AGENT_AFFINITY: "spcs-agent"
        WORKER_COUNT: "4"
      secrets:
      - snowflakeSecret: my_db_password_secret
        secretKeyRef: password
        envVarName: DATA_SOURCE_PASSWORD
  $$;
```

For more details, see the [Snowflake SPCS documentation](https://docs.snowflake.com/en/developer-guide/snowpark-container-services/spcs-execute-sql).

## 🔌 Extending the System

### Adding a New Bulk Utility

Bulk utilities are command-line tools used to efficiently export data from databases (e.g., BCP for SQL Server). Follow these steps to add a new bulk utility:

#### 1. Define the Bulk Utility Type

Add your new utility to the `BulkUtilityType` enum in `src/data_exchange_agent/data_sources/bulk_utility_types.py`:

```python
from enum import Enum


class BulkUtilityType(str, Enum):
    BCP = "bcp"
    YOUR_UTILITY = "your_utility_name"  # Add this line
```

#### 2. Create Configuration Class

Create a new configuration class in `src/data_exchange_agent/config/sections/bulk_utilities/your_utility.py`:

```python
from data_exchange_agent.config.sections.bulk_utilities.base import BaseBulkUtilityConfig

class YourUtilityConfig(BaseBulkUtilityConfig):
    """Configuration class for YourUtility bulk utility settings."""

    def __init__(
        self,
        # Add utility-specific parameters
        utility_specific_parameter: str = "default_param",
    ) -> None:
        """Initialize YourUtility configuration."""
        self.utility_specific_parameter = utility_specific_parameter

    def _custom_validation(self) -> str | None:
        """Validate configuration parameters."""
        if not self.utility_specific_parameter:
            return "Utility specific parameter cannot be empty."
        return None

    def __repr__(self) -> str:
        """Return string representation."""
        return f"YourUtilityConfig(utility_specific_parameter='{self.utility_specific_parameter}')"
```

#### 3. Register the Bulk Utility

Register your utility in `src/data_exchange_agent/config/sections/bulk_utilities/__init__.py`:

```python
from data_exchange_agent.config.sections.bulk_utilities.your_utility import YourUtilityConfig
from data_exchange_agent.constants.connection_types import ConnectionType

# Add to registry
BulkUtilityRegistry.register(ConnectionType.YOUR_UTILITY, YourUtilityConfig) # Add this

# Add to __all__
__all__ = [
    "BaseBulkUtilityConfig",
    "BulkUtilityRegistry",
    "BCPBulkUtilityConfig",
    "YourUtilityConfig",  # Add this
]
```

#### 4. Update ConnectionType Enum

Add your utility type to `src/data_exchange_agent/constants/connection_types.py`:

```python
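from enum import Enum

from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType


class ConnectionType(str, Enum):
    # Bulk utilities
    BCP = BulkUtilityType.BCP.value
    YOUR_UTILITY = BulkUtilityType.YOUR_UTILITY.value  # Add this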
```

#### 5. Create Data Source Implementation

Create the data source class in `src/data_exchange_agent/data_sources/your_utility_data_source.py`:

```python
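from dependency_injector.wiring import Provide, inject

from data_exchange_agent.data_sources.base import BaseDataSource
from data_exchange_agent.data_sources.bulk_utility_types import BulkUtilityType

# SFLogger, ConfigManager, container_keys, and config_keys are project-internal;
# import them from their modules in this repository.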

class YourUtilityDataSource(BaseDataSource):
    """Data source implementation for YourUtility."""

    @inject
    def __init__(
        self,
        engine: str,
        statement: str,
        results_folder_path: str | None = None,
        base_file_name: str = "result",
        logger: SFLogger = Provide[container_keys.SF_LOGGER],
        program_config: ConfigManager = Provide[container_keys.PROGRAM_CONFIG],
    ) -> None:
        """Initialize YourUtilityDataSource."""
        self.logger = logger
        self._statement = statement

        # Get configuration
        bulk_utility_config = program_config[config_keys.BULK_UTILITY]
        utility_config = bulk_utility_config.get(BulkUtilityType.YOUR_UTILITY, None)

        # Use config values or defaults
        self.utility_specific_parameter = utility_config.utility_specific_parameter if utility_config else "default_param"

    def export_data(self) -> bool:
        """Export data using your utility command."""
        # Implement the export logic
        raise NotImplementedError("export_data is not implemented yet")
```

#### 6. Add Configuration to TOML

Users can now configure your bulk utility in `configuration.toml`:

```toml
[bulk_utility.your_utility_name]
# Add your custom parameters
utility_specific_parameter = "param"
```

#### 7. Write Tests

Create tests in `tests/data_sources/test_your_utility_data_source.py` to verify functionality.

### Example: BCP Implementation

See the existing BCP implementation for reference:
- Config: `src/data_exchange_agent/config/sections/bulk_utilities/bcp.py`
- Data Source: `src/data_exchange_agent/data_sources/bcp_data_source.py`
- Configuration example in `configuration_example.toml`

### Using BCP for Data Extraction

By default, the agent uses JDBC for data extraction. To use BCP (Bulk Copy Program) instead, simply add the `[bulk_utility.bcp]` section to your `configuration.toml`:

```toml
[bulk_utility.bcp]
delimiter = ";"
row_terminator = "\\n"
encoding = "UTF8"
trusted_connection = false
encrypt = true
```

**How it works**: When the agent detects a `[bulk_utility.bcp]` section in the configuration, it uses BCP for data extraction instead of JDBC. The presence of the section is what enables BCP mode; no other switch is needed.

**Requirements**:
- The BCP utility must be installed and available in the system PATH
- SQL Server source connection must be configured in `[connections.source.sqlserver]`

**BCP Configuration Options**:

| Option | Description | Default |
|--------|-------------|---------|
| `delimiter` | Field delimiter character(s) | `,` |
| `row_terminator` | Row terminator character(s) | `\n` |
| `encoding` | Character encoding (e.g., `UTF8`, `ACP`) | `UTF8` |
| `trusted_connection` | Use Windows authentication | `false` |
| `encrypt` | Encrypt the connection | `true` |

**Note**: To switch back to JDBC, simply remove or comment out the `[bulk_utility.bcp]` section from your configuration.

---

## 🤝 Contributing

We welcome contributions! See our [Contributing Guide](../CONTRIBUTING.md) for details on how to collaborate, set up your development environment, and submit PRs.

---

## 📄 License

This project is licensed under the Apache License 2.0. See the [LICENSE](../LICENSE) file for details.

## 🆘 Support

- **Documentation**: [Full documentation](https://github.com/snowflakedb/migrations-data-validation)
- **Issues**: [GitHub Issues](https://github.com/snowflakedb/migrations-data-validation/issues)

---

**Developed with ❄️ by Snowflake**
