Metadata-Version: 2.4
Name: sqladbx
Version: 0.0.2
Summary: SQLAlchemy db context
Author-email: Oleksii Svichkar <oswops@gmail.com>
License: MIT
Project-URL: homepage, https://github.com/your-org/sqladbx
Project-URL: repository, https://github.com/your-org/sqladbx
Project-URL: documentation, https://github.com/your-org/sqladbx#readme
Project-URL: issues, https://github.com/your-org/sqladbx/issues
Keywords: async,sqlalchemy,sqlmodel,fastapi,postgresql,db,context,temporal,middleware
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Database
Classifier: Topic :: Internet :: WWW/HTTP :: Dynamic Content
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.13
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: sqlalchemy>=2.0
Requires-Dist: asyncpg>=0.27
Requires-Dist: starlette>=0.27
Provides-Extra: test
Requires-Dist: pytest>=7.0; extra == "test"
Requires-Dist: pytest-asyncio>=0.21; extra == "test"
Requires-Dist: pytest-cov>=4.0; extra == "test"
Requires-Dist: httpx>=0.24; extra == "test"
Requires-Dist: fastapi>=0.100; extra == "test"
Requires-Dist: temporalio>=1.0; extra == "test"
Requires-Dist: litestar>=2.0; extra == "test"
Requires-Dist: sniffio>=1.3; extra == "test"
Requires-Dist: taskiq>=0.12; extra == "test"
Requires-Dist: sqlmodel>=0.0.8; extra == "test"
Requires-Dist: greenlet>=3.0; extra == "test"
Dynamic: license-file

# sqladbx

![Python Version](https://img.shields.io/badge/python-3.13%2B-blue)
![License](https://img.shields.io/badge/license-MIT-green)

**sqladbx** is an async-first SQLAlchemy middleware and database proxy for FastAPI, Temporal, and other async Python frameworks.

It provides a global async session context (`db.session`) that integrates with existing codebases and works with SQLAlchemy 2.0+.

## Features

✨ **Core Features:**

- 🔄 Async SQLAlchemy 2.0+ integration with `AsyncSession`
- 🎯 Global session context with `db.session` pattern
- 📦 Database proxy with flexible initialization
- 🔌 Starlette/FastAPI middleware support
- 🔀 Multi-database support (master/replica pattern)
- ⚡ Multi-session mode for concurrent async tasks
- 🎪 Framework-agnostic (works with FastAPI, Litestar, Temporal, etc.)
- 💾 No middleware required (works in workers, CLI, scripts)
- ✅ 97% test coverage with 47+ tests across multiple frameworks
- 🔄 Seamless backward compatibility with existing codebases

## Installation

```bash
pip install sqladbx
```

### Requirements

- Python 3.13+
- SQLAlchemy 2.0+
- asyncpg or other async database driver

## Quick Start

### Basic Setup

```python
from fastapi import FastAPI
from sqladbx import SQLAlchemyMiddleware, db

app = FastAPI()

# Add middleware to initialize the database
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:password@localhost/dbname",
    engine_args={
        "echo": True,
        "pool_size": 10,
        "max_overflow": 20,
    }
)
```

### Using Sessions

```python
from fastapi import APIRouter
from sqladbx import db
from sqlalchemy import select
from sqlmodel import SQLModel, Field

class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str
    email: str

router = APIRouter()

@router.get("/users")
async def get_users():
    """Read users from database."""
    result = await db.session.execute(select(User))
    return result.scalars().all()

@router.post("/users")
async def create_user(name: str, email: str):
    """Create a new user."""
    user = User(name=name, email=email)
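    db.session.add(user)
    await db.session.commit()  # Persist; the middleware does not commit by default
    await db.session.refresh(user)  # Reload attributes expired by the commit
    return user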
```

## Advanced Usage

### Multiple Databases (Master/Replica Pattern)

```python
from sqladbx import SQLAlchemyMiddleware, create_db_middleware, db
from sqladbx.proxy import DBProxy

# Create independent database proxies
master_db = db  # Use default proxy
replica_db = DBProxy()  # Create new proxy for replica

# Initialize master database
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:password@localhost/master_db",
    db_proxy=master_db,
    engine_args={"pool_size": 10}
)

# Initialize replica database using factory pattern
ReplicaMiddleware = create_db_middleware()
app.add_middleware(
    ReplicaMiddleware,
    db_url="postgresql+asyncpg://user:password@localhost/replica_db",
    db_proxy=replica_db,
    engine_args={"pool_size": 10}
)

# Route handlers manage their own context
@router.get("/users")
async def get_users():
    result = await master_db.session.execute(select(User))
    return result.scalars().all()

@router.get("/users/cached")
async def get_users_cached():
    result = await replica_db.session.execute(select(User))
    return result.scalars().all()
```

### Without Middleware (Workers, CLI, Scripts)

```python
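import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine

from sqladbx import db

# Initialize anywhere - no middleware required
async def main():
    db.initialize(
        # initialize() expects an AsyncEngine, so use create_async_engine
        create_async_engine(
            "postgresql+asyncpg://user:password@localhost/dbname",
            echo=True
        ),
        session_args={}
    )

    async with db():
        # `User` is the model defined in Quick Start
        result = await db.session.execute(select(User))
        users = result.scalars().all()
        print(users)

asyncio.run(main())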
```

### Multi-Session Mode (Concurrent Tasks)

For parallel async operations within a single request:

```python
import asyncio

@router.post("/bulk-operations")
async def bulk_operations():
    """Execute multiple database operations concurrently."""
    async with db(multi_sessions=True):
        # Each db.session call creates a new session
        tasks = [
            process_task(1),
            process_task(2),
            process_task(3),
        ]
        results = await asyncio.gather(*tasks)
    return results

async def process_task(task_id: int):
    # Each task gets its own session
    result = await db.session.execute(select(User).where(User.id == task_id))
    return result.scalar_one()
```
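
The README does not show how multi-session mode is implemented, so the following is only a standard-library sketch of the underlying asyncio behaviour that makes per-task sessions necessary. The names `current_session` and `worker` are hypothetical. Each coroutine passed to `asyncio.gather` runs in its own task with a copy of the caller's context, so tasks inherit whatever the parent set, while values they set themselves stay private.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for "the session bound to the current context".
current_session = ContextVar("current_session", default=None)


async def worker(task_id: int) -> tuple[str, str]:
    # The task starts with a copy of the parent's context,
    # so it first sees the value the parent set.
    inherited = current_session.get()
    # Rebinding inside the task only affects this task's copy.
    current_session.set(f"session-{task_id}")
    await asyncio.sleep(0)  # yield control so the tasks interleave
    return inherited, current_session.get()


async def main():
    current_session.set("request-session")
    results = await asyncio.gather(worker(1), worker(2), worker(3))
    # The parent's binding is untouched by what the tasks did.
    return results, current_session.get()


results, outer = asyncio.run(main())
print(results, outer)
```

Without a per-task binding, all three tasks would share the inherited session object, and a single `AsyncSession` is not safe to use from concurrent tasks.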

### Commit Control

```python
# Automatic commit on context exit
async with db(commit_on_exit=True):
    user = User(name="John", email="john@example.com")
    db.session.add(user)
    # Auto-commits on __aexit__

# Manual commit control (default)
async with db():
    user = User(name="Jane", email="jane@example.com")
    db.session.add(user)
    await db.session.flush()  # Sends SQL to the database but does not commit
    # Without an explicit `await db.session.commit()`, changes are rolled back on exit
```
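
The exit behaviour described above can be sketched with a small context manager. This is an illustration under assumptions, not sqladbx's code: `RecordingSession` and `session_scope` are hypothetical names, and the fake session only records which methods were called.

```python
import asyncio
from contextlib import asynccontextmanager


class RecordingSession:
    """Hypothetical stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


@asynccontextmanager
async def session_scope(session: RecordingSession, commit_on_exit: bool = False):
    try:
        yield session
        # Clean exit: commit only when the caller asked for it.
        if commit_on_exit:
            await session.commit()
    except Exception:
        # An error inside the block always rolls back.
        await session.rollback()
        raise
    finally:
        # Closing a real session discards any uncommitted work.
        await session.close()


async def demo():
    committed, discarded, failed = (RecordingSession() for _ in range(3))
    async with session_scope(committed, commit_on_exit=True):
        pass
    async with session_scope(discarded):
        pass
    try:
        async with session_scope(failed, commit_on_exit=True):
            raise ValueError("boom")
    except ValueError:
        pass
    return committed.calls, discarded.calls, failed.calls


committed_calls, discarded_calls, failed_calls = asyncio.run(demo())
print(committed_calls, discarded_calls, failed_calls)
```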

## Framework Integration

### FastAPI

See the [Quick Start](#quick-start) section above.

### Litestar

```python
from litestar import Litestar
from litestar.middleware import DefineMiddleware
from sqladbx import SQLAlchemyMiddleware

app = Litestar(
    route_handlers=[...],
    middleware=[
        # DefineMiddleware forwards keyword arguments to the middleware constructor
        DefineMiddleware(
            SQLAlchemyMiddleware,
            db_url="postgresql+asyncpg://...",
        )
    ]
)
```

### Temporal

```python
from sqlalchemy import select
from sqladbx import db
from temporalio import workflow

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Initialize db without middleware (`engine` is an AsyncEngine created elsewhere)
        db.initialize(engine, session_args={})

        async with db():
            # Use session in activities
            result = await db.session.execute(select(User))
            users = result.scalars().all()

        return f"Processed {len(users)} users"
```

### Background Tasks (TaskIQ)

```python
from sqlalchemy import select
from sqladbx import db
from taskiq import InMemoryBroker

broker = InMemoryBroker()

@broker.task
async def process_user(user_id: int):
    # `engine` is an AsyncEngine created elsewhere
    db.initialize(engine, session_args={})

    async with db():
        result = await db.session.execute(
            select(User).where(User.id == user_id)
        )
        user = result.scalar_one()
        # Process user...
```

## API Reference

### SQLAlchemyMiddleware

Main middleware for FastAPI/Starlette applications.

```python
SQLAlchemyMiddleware(
    app: ASGIApp,
    *,
    db_url: str | URL | None = None,
    custom_engine: AsyncEngine | None = None,
    engine_args: dict[str, object] | None = None,
    session_args: dict[str, object] | None = None,
    commit_on_exit: bool = False,
    db_proxy: DBProxy | None = None,
)
```

**Parameters:**

- `db_url`: Database connection string
- `custom_engine`: Pre-configured `AsyncEngine` (overrides `db_url`)
- `engine_args`: SQLAlchemy engine creation arguments
- `session_args`: Session factory arguments
- `commit_on_exit`: Auto-commit on context exit
- `db_proxy`: Custom `DBProxy` instance (defaults to global `db`)

### DBProxy

Database proxy for managing async sessions.

```python
from sqladbx import db
from sqladbx.proxy import DBProxy

# Global proxy (pre-created)
db: DBProxy

# Create custom proxy
replica_db = DBProxy()
```

**Methods:**

- `initialize(engine, session_args)`: Initialize with AsyncEngine
- `session`: Property returning the current session (only valid inside a context)
- `__call__(commit_on_exit=False, multi_sessions=False)`: Async context manager
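
To make the lifecycle of these members concrete, here is a minimal standard-library sketch of a context-variable-backed proxy. It is an assumption about the general pattern, not sqladbx's actual implementation: `SketchProxy` and `FakeSession` are hypothetical, the two exception classes are local stand-ins for the ones in `sqladbx.exceptions`, and `initialize` takes a session factory here rather than an engine.

```python
import asyncio
from contextlib import asynccontextmanager
from contextvars import ContextVar


class SessionNotInitialisedError(Exception):
    """Stand-in: the proxy has not been initialized yet."""


class MissingSessionError(Exception):
    """Stand-in: `session` was read outside a session context."""


class FakeSession:
    """Hypothetical stand-in for AsyncSession; only tracks closing."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SketchProxy:
    """Simplified proxy: one session per `async with proxy():` block."""

    def __init__(self) -> None:
        self._factory = None
        self._current = ContextVar("sketch_session", default=None)

    def initialize(self, factory) -> None:
        self._factory = factory

    @property
    def session(self) -> FakeSession:
        if self._factory is None:
            raise SessionNotInitialisedError("call initialize() first")
        current = self._current.get()
        if current is None:
            raise MissingSessionError("enter `async with proxy():` first")
        return current

    @asynccontextmanager
    async def __call__(self):
        if self._factory is None:
            raise SessionNotInitialisedError("call initialize() first")
        session = self._factory()
        token = self._current.set(session)  # bind for this context
        try:
            yield session
        finally:
            self._current.reset(token)  # unbind, then release the session
            await session.close()


async def demo() -> list[str]:
    proxy, events = SketchProxy(), []
    try:
        proxy.session
    except SessionNotInitialisedError:
        events.append("not initialised")
    proxy.initialize(FakeSession)
    try:
        proxy.session
    except MissingSessionError:
        events.append("missing session")
    async with proxy():
        first = proxy.session
        events.append("same session" if proxy.session is first else "different")
    events.append("closed" if first.closed else "open")
    return events


events = asyncio.run(demo())
print(events)
```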

### create_db_middleware()

Factory function for creating independent middleware classes:

```python
from sqladbx import create_db_middleware

ReplicaMiddleware = create_db_middleware()
app.add_middleware(
    ReplicaMiddleware,
    db_url="...",
    db_proxy=replica_db,
)
```

## Testing

The library includes comprehensive test coverage:

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=sqladbx --cov-report=html

# Run specific test suite
pytest tests/integration/test_fastapi_integration.py
```

**Test Coverage:** 97% (47+ tests)

- Unit tests for proxy, sessions, exceptions
- FastAPI integration tests
- Temporal workflow tests
- Litestar integration tests
- TaskIQ background task tests

## Error Handling

### Common Exceptions

```python
from sqladbx.exceptions import (
    SessionNotInitialisedError,  # DB not initialized
    MissingSessionError,          # No session in context
)

try:
    async with db():
        result = await db.session.execute(select(User))
except MissingSessionError:
    print("Session context required")
except SessionNotInitialisedError:
    print("Database not initialized")
```

## Design Principles

1. **Simplicity**: Zero-boilerplate session management
2. **Flexibility**: Works with or without middleware
3. **Performance**: Efficient async/await patterns, connection pooling
4. **Type Safety**: Full type hints for IDE autocomplete
5. **Testing**: Comprehensive test suite across frameworks
6. **Production Ready**: Used in high-throughput systems

## Migration Guide

### From fastapi-async-sqlalchemy

If migrating from `fastapi_async_sqlalchemy`:

```python
# Old code
from fastapi_async_sqlalchemy import get_db

@router.get("/users")
async def get_users(db = Depends(get_db)):
    result = await db.execute(select(User))
    return result.scalars().all()

# New code - simpler!
from sqladbx import db

@router.get("/users")
async def get_users():
    async with db():
        result = await db.session.execute(select(User))
        return result.scalars().all()
```

## Contributing

Contributions welcome! Please:

1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure 97%+ coverage
5. Submit a pull request

## License

MIT License. See the LICENSE file for details.

## Support

- 📖 [GitHub Repository](https://github.com/your-org/sqladbx)
- 🐛 [Issue Tracker](https://github.com/your-org/sqladbx/issues)
- 💬 [Discussions](https://github.com/your-org/sqladbx/discussions)

---

**Made with ❤️ for the Python async community**
