Metadata-Version: 2.4
Name: mehdashti-encryption
Version: 0.2.0
Summary: AES-256-GCM encryption with key rotation and context binding support
Project-URL: Homepage, https://github.com/mehdashti/smart-platform
Project-URL: Repository, https://github.com/mehdashti/smart-platform.git
Project-URL: Issues, https://github.com/mehdashti/smart-platform/issues
Author-email: Mahdi Ashti <mahdi@mehdashti.com>
License: MIT
Keywords: aes-gcm,encryption,key-rotation,security
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Security :: Cryptography
Requires-Python: >=3.13
Requires-Dist: cryptography>=44.0.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0.0; extra == 'dev'
Requires-Dist: ruff>=0.8.0; extra == 'dev'
Description-Content-Type: text/markdown

# mehdashti-encryption

AES-256-GCM encryption with key rotation support for securing sensitive data.

## Features

- ✅ **AES-256-GCM**: Authenticated encryption (confidentiality + integrity)
- ✅ **Context Binding**: AAD support to prevent ciphertext reuse attacks
- ✅ **Key Rotation**: Seamless key rotation without downtime
- ✅ **Automatic Key Detection**: Decrypts with correct key automatically
- ✅ **No Database Changes**: Key ID embedded in ciphertext
- ✅ **Version Support**: Future-proof encryption format
- ✅ **Type Safe**: Full type hints for Python 3.13+

## Installation

```bash
pip install mehdashti-encryption
# or
uv add mehdashti-encryption
```

## Quick Start

### Basic Usage

```python
import os
import base64
from mehdashti_encryption import EncryptionService

# Load keys from environment
keys = {
    1: base64.b64decode(os.getenv("ENCRYPTION_KEY_1")),
    2: base64.b64decode(os.getenv("ENCRYPTION_KEY_2")),
}

# Initialize service
service = EncryptionService(keys)

# Encrypt
encrypted = service.encrypt("my secret password")
print(encrypted)  # Base64 string

# Decrypt
plaintext = service.decrypt(encrypted)
print(plaintext)  # "my secret password"
```

### Context Binding (Recommended for Production)

For production use, **always use context binding** to prevent ciphertext reuse attacks:

```python
from mehdashti_encryption import EncryptionService

service = EncryptionService(keys)

# Bind encryption to specific context
user_id = 123
field_name = "email"
context = f"user_id:{user_id}:field:{field_name}"

# Encrypt with context
encrypted_email = service.encrypt("user@example.com", associated_data=context)

# Decrypt with same context (REQUIRED)
email = service.decrypt(encrypted_email, associated_data=context)

# ❌ Decryption with wrong context will FAIL
wrong_context = f"user_id:456:field:email"
# This raises ValueError: Decryption failed
service.decrypt(encrypted_email, associated_data=wrong_context)
```

**Why Context Binding?**
- Prevents encrypted data from being copied to different users/records
- Ensures ciphertext can only be decrypted in the correct context
- Protects against replay attacks and data tampering
- Recommended pattern: `table:users:id:{user_id}:field:{field_name}`

### Generate Encryption Keys

```python
from mehdashti_encryption import EncryptionService
import base64

# Generate a new random key
key = EncryptionService.generate_key()
key_base64 = base64.b64encode(key).decode()

print(f"ENCRYPTION_KEY_1={key_base64}")
# Add to .env file
```

### FastAPI Integration

```python
from fastapi import FastAPI, Depends
from mehdashti_encryption import EncryptionService
import os
import base64

app = FastAPI()

def get_encryption_service() -> EncryptionService:
    keys = {
        1: base64.b64decode(os.getenv("ENCRYPTION_KEY_1")),
        2: base64.b64decode(os.getenv("ENCRYPTION_KEY_2")),
    }
    return EncryptionService(keys)

@app.post("/connections")
async def create_connection(
    user_id: int,
    password: str,
    encryption: EncryptionService = Depends(get_encryption_service)
):
    # Create context binding for this specific connection
    context = f"user_id:{user_id}:field:password"

    # Encrypt password with context
    encrypted_password = encryption.encrypt(password, associated_data=context)

    # Store in database (context info is derived from user_id in DB)
    await db.execute(
        "INSERT INTO connections (user_id, password_encrypted) VALUES ($1, $2)",
        user_id, encrypted_password
    )

    return {"status": "created"}

@app.get("/connections/{id}")
async def get_connection(
    id: int,
    encryption: EncryptionService = Depends(get_encryption_service)
):
    # Fetch from database
    result = await db.fetchone(
        "SELECT user_id, password_encrypted FROM connections WHERE id = $1", id
    )

    # Recreate same context for decryption
    context = f"user_id:{result['user_id']}:field:password"

    # Decrypt password with context
    password = encryption.decrypt(result["password_encrypted"], associated_data=context)

    return {"password": password}
```

## Key Rotation

### 1. Generate New Key

```python
from mehdashti_encryption import EncryptionService
import base64

new_key = EncryptionService.generate_key()
print(f"ENCRYPTION_KEY_3={base64.b64encode(new_key).decode()}")
```

### 2. Add to Environment

```bash
# .env
ENCRYPTION_KEY_1=old_key_base64
ENCRYPTION_KEY_2=old_key_base64
ENCRYPTION_KEY_3=new_key_base64  # New!
```

### 3. Rotate Keys

```python
import os
import base64
from mehdashti_encryption import EncryptionService

# Initialize with all keys
keys = {
    1: base64.b64decode(os.getenv("ENCRYPTION_KEY_1")),
    2: base64.b64decode(os.getenv("ENCRYPTION_KEY_2")),
    3: base64.b64decode(os.getenv("ENCRYPTION_KEY_3")),  # New key
}
service = EncryptionService(keys)

# Add new key and set as current
new_key = base64.b64decode(os.getenv("ENCRYPTION_KEY_3"))
service.rotate_key(new_key, new_key_id=3)

# Re-encrypt existing data
async def migrate_encryption():
    records = await db.fetch("SELECT id, user_id, password_encrypted FROM connections")

    for record in records:
        # Recreate context for re-encryption
        context = f"user_id:{record['user_id']}:field:password"

        # Re-encrypt with new key (preserving context)
        new_encrypted = service.re_encrypt_with_new_key(
            record["password_encrypted"],
            associated_data=context
        )

        # Update database
        await db.execute(
            "UPDATE connections SET password_encrypted = $1 WHERE id = $2",
            new_encrypted, record["id"]
        )

    print(f"Re-encrypted {len(records)} records")
```

### 4. Remove Old Keys (Optional)

After all data is re-encrypted, you can remove old keys:

```python
# Remove old keys from environment
# Keep only ENCRYPTION_KEY_3

keys = {
    3: base64.b64decode(os.getenv("ENCRYPTION_KEY_3")),
}
service = EncryptionService(keys)
```

## Password-Derived Keys

Instead of random keys, derive from a master password:

```python
import secrets
from mehdashti_encryption import EncryptionService

# Generate salt (store this!)
salt = secrets.token_bytes(16)

# Derive key from password
master_password = "your-strong-master-password"
key = EncryptionService.derive_key_from_password(master_password, salt)

# Use derived key
keys = {1: key}
service = EncryptionService(keys)
```

**⚠️ Important**: Store the salt securely! You need it to derive the same key later.

## Ciphertext Format

The encrypted output is base64-encoded with this structure:

```
[version:1byte][key_id:2bytes][nonce:12bytes][ciphertext][tag:16bytes]
```

- **version**: Encryption format version (currently 1)
- **key_id**: Which key was used (for rotation)
- **nonce**: Random nonce (96 bits)
- **ciphertext**: Encrypted data
- **tag**: Authentication tag (128 bits)

This allows:
- Automatic key detection during decryption
- Future format upgrades
- No database schema changes for key rotation

## API Reference

### `EncryptionService`

Main encryption service class.

#### `__init__(keys: dict[int, bytes], current_key_id: Optional[int] = None)`

Initialize with encryption keys.

- **keys**: Dictionary mapping key_id to 32-byte key
- **current_key_id**: ID of current key (defaults to max)

#### `encrypt(plaintext: str | bytes, associated_data: Optional[str | bytes] = None) -> str`

Encrypt plaintext with optional context binding.

- **plaintext**: String or bytes to encrypt
- **associated_data**: Optional context data (user_id, record_id, field_name, etc.)
  - Binds ciphertext to specific context
  - Authenticated but NOT encrypted
  - **STRONGLY RECOMMENDED** for production use
- **Returns**: Base64-encoded ciphertext

#### `decrypt(ciphertext: str, associated_data: Optional[str | bytes] = None) -> str`

Decrypt ciphertext (automatically detects key).

- **ciphertext**: Base64 ciphertext from encrypt()
- **associated_data**: Optional context data (MUST match what was used in encrypt())
- **Returns**: Decrypted plaintext
- **Raises**: ValueError if ciphertext is invalid or associated_data doesn't match

#### `re_encrypt_with_new_key(old_ciphertext: str, associated_data: Optional[str | bytes] = None) -> str`

Re-encrypt with current key (for key rotation).

- **old_ciphertext**: Ciphertext encrypted with old key
- **associated_data**: Optional context data (must match original encryption)
- **Returns**: New ciphertext with current key

#### `rotate_key(new_key: bytes, new_key_id: int) -> None`

Add new key and set as current.

- **new_key**: New 32-byte encryption key
- **new_key_id**: Unique ID for new key

#### `@staticmethod generate_key() -> bytes`

Generate random 32-byte encryption key.

#### `@staticmethod derive_key_from_password(password: str, salt: bytes) -> bytes`

Derive key from password using PBKDF2.

## Security Considerations

### ✅ Best Practices

1. **Use Context Binding**: ALWAYS use `associated_data` for production
   - Prevents ciphertext reuse attacks
   - Binds encrypted data to specific records/users
   - Example: `f"user_id:{user_id}:field:{field_name}"`
2. **Use Random Keys**: Generate with `generate_key()`
3. **Rotate Regularly**: Update keys every 6-12 months
4. **Store Keys Securely**: Use environment variables or secret managers
5. **Never Log Keys**: Don't print or log encryption keys
6. **Use HTTPS**: Always transmit encrypted data over HTTPS

### ⚠️ Important Notes

- **Context Binding (AAD)**: Strongly recommended for production use
  - Without AAD: Encrypted data can be copied between records
  - With AAD: Decryption only works in the correct context
  - AAD is authenticated but NOT encrypted (can be public)
- **AES-GCM is authenticated**: Tampering is detected automatically
- **Nonces are random**: Safe for concurrent encryption
- **Keys are 256-bit**: Quantum-resistant for foreseeable future
- **PBKDF2 iterations**: 600,000 (OWASP 2023 recommendation)

### ❌ Don't

- ❌ Don't skip `associated_data` in production (security vulnerability)
- ❌ Don't reuse keys across environments (dev/prod)
- ❌ Don't store keys in source code
- ❌ Don't use weak passwords for key derivation
- ❌ Don't decrypt on client side (keep keys server-side)
- ❌ Don't use different `associated_data` for encrypt/decrypt (will fail)

## Use Cases

### 1. Database Connection Passwords

```python
# Bind to specific connection and user
connection_id = 42
user_id = 123
context = f"connection_id:{connection_id}:user_id:{user_id}:field:password"

# Encrypt before storing
encrypted_password = service.encrypt(user_password, associated_data=context)
await db.execute(
    "INSERT INTO connections (id, user_id, password_encrypted) VALUES ($1, $2, $3)",
    connection_id, user_id, encrypted_password
)

# Decrypt when needed (recreate same context)
row = await db.fetchone("SELECT user_id, password_encrypted FROM connections WHERE id = $1", connection_id)
context = f"connection_id:{connection_id}:user_id:{row['user_id']}:field:password"
password = service.decrypt(row["password_encrypted"], associated_data=context)
connection = connect_to_oracle(username, password)
```

### 2. API Keys

```python
# Bind to specific user and integration
user_id = 456
integration_name = "stripe"
context = f"user_id:{user_id}:integration:{integration_name}:field:api_key"

# Store encrypted API key
encrypted_key = service.encrypt(api_key, associated_data=context)

# Use when making requests (recreate context)
context = f"user_id:{user_id}:integration:{integration_name}:field:api_key"
api_key = service.decrypt(encrypted_key, associated_data=context)
response = requests.get(url, headers={"Authorization": f"Bearer {api_key}"})
```

### 3. Personal Data (GDPR Compliance)

```python
# Bind each field to specific user
user_id = 789

# Encrypt PII with context binding
ssn_context = f"table:users:id:{user_id}:field:ssn"
email_context = f"table:users:id:{user_id}:field:email"

encrypted_ssn = service.encrypt(user_ssn, associated_data=ssn_context)
encrypted_email = service.encrypt(user_email, associated_data=email_context)

# Store encrypted
await db.execute(
    "INSERT INTO users (id, ssn_encrypted, email_encrypted) VALUES ($1, $2, $3)",
    user_id, encrypted_ssn, encrypted_email
)

# Decrypt with correct context
ssn = service.decrypt(encrypted_ssn, associated_data=ssn_context)
email = service.decrypt(encrypted_email, associated_data=email_context)
```

## Requirements

- Python 3.13+
- cryptography 44.0+

## License

MIT License

## Author

Mahdi Ashti <mahdi@mehdashti.com>

## Links

- **Repository**: https://github.com/mehdashti/smart-platform
- **Issues**: https://github.com/mehdashti/smart-platform/issues
