Metadata-Version: 2.4
Name: gcp-mysql
Version: 0.1.2
Summary: A lightweight MySQL service wrapper for GCP environments
Author-email: Vince Berry <vince@keenai.io>
Keywords: mysql,gcp,cloud-sql,pymysql,database
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Topic :: Database
Classifier: Topic :: Software Development :: Libraries
Requires-Python: >=3.9
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: pydantic
Requires-Dist: pymysql
Requires-Dist: google-cloud-secret-manager
Dynamic: license-file

# gcp-mysql

[![PyPI version](https://badge.fury.io/py/gcp-mysql.svg)](https://badge.fury.io/py/gcp-mysql)
[![Python 3.9+](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![PyPI downloads](https://img.shields.io/pypi/dm/gcp-mysql.svg)](https://pypi.org/project/gcp-mysql/)

A lightweight, production-ready MySQL service wrapper designed for **Google Cloud environments**, with first-class support for **Cloud SQL**, **GCP Secret Manager**, and clean application-level logging.

`gcp-mysql` provides a thin, explicit abstraction over PyMySQL. It does **not** attempt to hide SQL or become an ORM; instead, it focuses on:

- Safe and explicit connection handling
- Clear, testable query helpers
- GCP-native credential management
- Predictable, non-magical behavior

---

## Features

- **Cloud SQL–native**
  - Supports Unix socket connections for Cloud Run
  - Supports TCP connections for local development (Cloud SQL Proxy)
  - Automatic connection mode detection via environment variables
- **GCP Secret Manager integration**
  - Credentials are loaded securely at runtime
  - No secrets stored in code or config files
  - Optional SSL CA certificate support from Secret Manager
- **Minimal abstraction**
  - SQL remains explicit and readable
  - No ORM or hidden query generation
  - Returns dictionary-based results (DictCursor)
- **Safe defaults**
  - DROP queries blocked by default
  - UPDATE and DELETE require WHERE clauses
  - Connection timeouts and read/write timeouts configurable
- **Production-grade logging**
  - Uses standard Python logging with module-specific loggers
  - Never configures logging for the user
  - Comprehensive debug, info, warning, and error logging
- **Query operations**
  - Execute raw SQL queries with parameterized support
  - Insert, update, delete with safety checks
  - Batch operations via `executemany`
  - Bulk data loading from files (LOAD DATA LOCAL INFILE)
- **Table and schema management**
  - Create tables from Pydantic models
  - Automatic type inference and MySQL type mapping
  - Additive schema migrations
  - Index creation utilities
- **Python 3.9+ compatible**
  - Tested against modern Python versions

---

## Installation

### From PyPI (recommended for users)

```bash
pip install gcp-mysql
```

### From source (for development)

```bash
git clone https://github.com/yourusername/gcp-mysql.git
cd gcp-mysql
pip install -e .
```

---

## Quick Start

### Using GCP Secret Manager (Recommended)

The easiest way to use `gcp-mysql` in production is via the `from_gcp_secret` factory method:

```python
from gcp_mysql import MySQLService

# Connection mode is determined by environment variables
# See "Connection Modes" section below
db = MySQLService.from_gcp_secret(
    project_id="my-gcp-project",
    secret_id="mysql-credentials",
    version_id="latest",  # or specific version
    ssl_ca_secret_id="mysql-ca-cert",  # optional
)

# Test the connection
if db.test_connection():
    print("Connected successfully!")

# Execute a query
results = db.execute_query("SELECT * FROM users WHERE id = %s", (123,))
for row in results:
    print(row)
```

### Direct Connection

For local development or when not using Secret Manager:

```python
from gcp_mysql import MySQLService

db = MySQLService(
    host="127.0.0.1",
    port=3306,
    user="myuser",
    password="mypassword",
    database="mydb",
    # Optional: use Unix socket for Cloud SQL
    # unix_socket="/cloudsql/project:region:instance",
    connect_timeout=10,
    read_timeout=30,
    write_timeout=30,
)
```

---

## Connection Modes

`gcp-mysql` supports two connection modes, controlled by the `MYSQL_CONNECTION_MODE` environment variable:

### Cloud SQL (Production)

For Cloud Run and other GCP services using Unix domain sockets:

```bash
export MYSQL_CONNECTION_MODE=cloudsql
export CLOUDSQL_INSTANCE="project:region:instance"
```

The library will automatically use `/cloudsql/{CLOUDSQL_INSTANCE}` as the Unix socket path.

### TCP (Local Development)

For local development with Cloud SQL Proxy:

```bash
export MYSQL_CONNECTION_MODE=tcp
export MYSQL_HOST="127.0.0.1"
export MYSQL_PORT="3306"  # Optional, defaults to 3306
```
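
As a rough illustration of how the two modes translate into connection settings, the sketch below maps the environment variables described above to keyword arguments. The helper name `resolve_connection_kwargs` is hypothetical and not part of `gcp-mysql`, and treating an unset or unknown mode as an error is an assumption; the library's actual resolution logic may differ.

```python
import os
from typing import Any, Dict, Mapping, Optional


def resolve_connection_kwargs(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: map the documented env vars to connection kwargs."""
    env = os.environ if env is None else env
    mode = env.get("MYSQL_CONNECTION_MODE", "")

    if mode == "cloudsql":
        instance = env.get("CLOUDSQL_INSTANCE")
        if not instance:
            raise ValueError("CLOUDSQL_INSTANCE must be set in cloudsql mode")
        # Socket path format taken from the Cloud SQL section above.
        return {"unix_socket": f"/cloudsql/{instance}"}

    if mode == "tcp":
        host = env.get("MYSQL_HOST")
        if not host:
            raise ValueError("MYSQL_HOST must be set in tcp mode")
        # MYSQL_PORT is optional and defaults to 3306.
        return {"host": host, "port": int(env.get("MYSQL_PORT", "3306"))}

    # Assumption: an unset or unknown mode is treated as an error in this sketch.
    raise ValueError(f"Unsupported MYSQL_CONNECTION_MODE: {mode!r}")
```

Passing a plain dictionary instead of `os.environ` makes the mapping easy to exercise in unit tests without touching the process environment.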

---

## GCP Secret Manager Setup

Your secret in GCP Secret Manager must contain a JSON object with the following structure:

```json
{
  "USER": "your_mysql_user",
  "PASSWORD": "your_mysql_password",
  "DATABASE": "your_database_name",
  "PORT": 3306
}
```

**Important:** The secret should contain **only** credentials. Connection mode and host configuration are determined by environment variables, not the secret.
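
To check a payload locally before uploading it, something like the following sketch can help. The function `parse_secret_payload` is hypothetical and not part of `gcp-mysql`; treating `PORT` as optional with a fallback of 3306 is an assumption based on the constructor default, not confirmed library behavior.

```python
import json
from typing import Any, Dict

REQUIRED_KEYS = ("USER", "PASSWORD", "DATABASE")


def parse_secret_payload(payload: str) -> Dict[str, Any]:
    """Hypothetical validator for the JSON credential structure shown above."""
    data = json.loads(payload)  # raises json.JSONDecodeError on malformed JSON
    if not isinstance(data, dict):
        raise ValueError("Secret payload must be a JSON object")
    missing = [key for key in REQUIRED_KEYS if key not in data]
    if missing:
        raise ValueError(f"Secret is missing keys: {', '.join(missing)}")
    # Assumption: PORT falls back to 3306 when absent.
    return {
        "user": data["USER"],
        "password": data["PASSWORD"],
        "database": data["DATABASE"],
        "port": int(data.get("PORT", 3306)),
    }
```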

### Creating the Secret

```bash
# Create the secret
echo '{
  "USER": "myuser",
  "PASSWORD": "mypassword",
  "DATABASE": "mydb",
  "PORT": 3306
}' | gcloud secrets create mysql-credentials \
  --data-file=- \
  --replication-policy="automatic"
```

### Optional: SSL CA Certificate

If you need to load an SSL CA certificate from Secret Manager:

```python
db = MySQLService.from_gcp_secret(
    project_id="my-gcp-project",
    secret_id="mysql-credentials",
    ssl_ca_secret_id="mysql-ca-cert",  # Secret containing PEM-encoded CA cert
)
```

The CA certificate will be automatically downloaded and written to a temporary file for PyMySQL.
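
The temporary-file step can be pictured with the minimal sketch below. It is not the library's implementation: `write_ca_to_tempfile` is a hypothetical helper, and the `.pem` suffix and caller-managed cleanup are assumptions made for illustration.

```python
import tempfile


def write_ca_to_tempfile(pem_text: str) -> str:
    """Hypothetical helper: persist PEM text to a temp file and return its path."""
    # delete=False keeps the file on disk after the handle closes, so a driver
    # can open it later by path; the caller is responsible for removing it.
    with tempfile.NamedTemporaryFile("w", suffix=".pem", delete=False) as handle:
        handle.write(pem_text)
        return handle.name
```

A path produced this way could then be supplied as the `ssl_ca_path` constructor argument documented in the API reference.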

---

## API Reference

### MySQLService

The main service class for database operations.

#### Constructor

```python
MySQLService(
    host: Optional[str] = None,
    port: int = 3306,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    unix_socket: Optional[str] = None,
    table_name: Optional[str] = None,
    ssl_ca_path: Optional[str] = None,
    connect_timeout: int = 10,
    read_timeout: int = 30,
    write_timeout: int = 30,
    autocommit: bool = True,
    local_infile: bool = False,
)
```

**Parameters:**
- `host`: MySQL hostname (ignored if `unix_socket` is set)
- `port`: MySQL port (default: 3306, ignored if `unix_socket` is set)
- `user`: MySQL username (required)
- `password`: MySQL password (required)
- `database`: Database name (required)
- `unix_socket`: Unix socket path for Cloud SQL (e.g., `/cloudsql/project:region:instance`)
- `table_name`: Default table name for convenience methods
- `ssl_ca_path`: Path to SSL CA certificate file
- `connect_timeout`: Connection timeout in seconds (default: 10)
- `read_timeout`: Read timeout in seconds (default: 30)
- `write_timeout`: Write timeout in seconds (default: 30)
- `autocommit`: Enable autocommit mode (default: True)
- `local_infile`: Enable LOAD DATA LOCAL INFILE (default: False, required for `insert_from_file`)

#### Class Methods

##### `from_gcp_secret`

Create a `MySQLService` instance from GCP Secret Manager.

```python
@classmethod
def from_gcp_secret(
    cls,
    *,
    project_id: str,
    secret_id: str,
    version_id: str = "latest",
    ssl_ca_secret_id: Optional[str] = None,
) -> MySQLService
```

**Parameters:**
- `project_id`: GCP project ID
- `secret_id`: Secret Manager secret ID
- `version_id`: Secret version (default: "latest")
- `ssl_ca_secret_id`: Optional secret ID containing SSL CA certificate

**Returns:** `MySQLService` instance

**Raises:**
- `RuntimeError`: If Secret Manager is unavailable or secret is invalid
- `TypeError`: If not called as a classmethod

#### Instance Methods

##### `test_connection`

Test the database connection.

```python
def test_connection(self) -> bool
```

**Returns:** `True` if connection is successful, `False` otherwise

##### `execute_query`

Execute a SQL query (SELECT, CREATE, UPDATE, etc.).

```python
def execute_query(
    self,
    query: str,
    params: Optional[Tuple[Any, ...]] = None,
) -> list[Dict[str, Any]]
```

**Parameters:**
- `query`: SQL query string
- `params`: Optional tuple of parameters for parameterized queries

**Returns:** List of dictionaries (one per row) for queries that return rows, empty list otherwise

**Raises:**
- `ValueError`: If query is empty or contains DROP statement
- `Exception`: Re-raises database errors

**Example:**
```python
# Parameterized query
results = db.execute_query(
    "SELECT * FROM users WHERE email = %s AND active = %s",
    ("user@example.com", True)
)

# Non-parameterized query
tables = db.execute_query("SHOW TABLES")
```

##### `insert`

Insert a single row into a table.

```python
def insert(
    self,
    table_name: str,
    data: Dict[str, Any],
) -> int
```

**Parameters:**
- `table_name`: Table name
- `data`: Dictionary mapping column names to values

**Returns:** Auto-increment ID if available, otherwise 0

**Raises:**
- `ValueError`: If data dictionary is empty
- `Exception`: Re-raises database errors

**Example:**
```python
user_id = db.insert("users", {
    "name": "John Doe",
    "email": "john@example.com",
    "active": True
})
```

##### `update`

Update rows in a table.

```python
def update(
    self,
    table_name: str,
    data: Dict[str, Any],
    where_clause: str,
    where_params: Optional[Tuple[Any, ...]] = None,
) -> int
```

**Parameters:**
- `table_name`: Table name
- `data`: Dictionary mapping column names to new values
- `where_clause`: WHERE clause (required for safety)
- `where_params`: Optional tuple of parameters for WHERE clause

**Returns:** Number of rows affected

**Raises:**
- `ValueError`: If data is empty or WHERE clause is missing
- `Exception`: Re-raises database errors

**Example:**
```python
rows_updated = db.update(
    "users",
    {"active": False, "updated_at": "2024-01-01"},
    "email = %s",
    ("user@example.com",)
)
```

##### `delete`

Delete rows from a table.

```python
def delete(
    self,
    table_name: str,
    where_clause: str,
    where_params: Optional[Tuple[Any, ...]] = None,
) -> int
```

**Parameters:**
- `table_name`: Table name
- `where_clause`: WHERE clause (required for safety)
- `where_params`: Optional tuple of parameters for WHERE clause

**Returns:** Number of rows deleted

**Raises:**
- `ValueError`: If WHERE clause is missing
- `Exception`: Re-raises database errors

**Example:**
```python
rows_deleted = db.delete(
    "users",
    "id = %s",
    (123,)
)
```

##### `executemany`

Execute a query multiple times with different parameters.

```python
def executemany(
    self,
    query: str,
    params_list: Sequence[Tuple[Any, ...]],
) -> int
```

**Parameters:**
- `query`: SQL query string with placeholders
- `params_list`: Sequence of parameter tuples

**Returns:** Number of rows affected (driver-dependent semantics)

**Raises:**
- `ValueError`: If params_list is empty
- `Exception`: Re-raises database errors

**Example:**
```python
users = [
    ("Alice", "alice@example.com"),
    ("Bob", "bob@example.com"),
    ("Charlie", "charlie@example.com"),
]
rows_inserted = db.executemany(
    "INSERT INTO users (name, email) VALUES (%s, %s)",
    users
)
```

##### `insert_from_file`

Load data from a local file using `LOAD DATA LOCAL INFILE`.

```python
def insert_from_file(
    self,
    table_name: str,
    file_path: str,
    columns: Optional[Sequence[str]] = None,
    fields_terminated_by: str = ",",
    fields_enclosed_by: Optional[str] = '"',
    fields_escaped_by: Optional[str] = None,
    lines_terminated_by: str = "\n",
    ignore_lines: int = 0,
    field_overrides: Optional[Dict[str, Any]] = None,
    replace: bool = False,
    ignore_duplicates: bool = False,
) -> int
```

**Parameters:**
- `table_name`: Table name
- `file_path`: Path to CSV/data file
- `columns`: Optional list of column names (if file doesn't match table structure)
- `fields_terminated_by`: Field delimiter (default: ",")
- `fields_enclosed_by`: Field enclosure character (default: '"')
- `fields_escaped_by`: Escape character (default: None)
- `lines_terminated_by`: Line terminator (default: "\n")
- `ignore_lines`: Number of header lines to skip (default: 0)
- `field_overrides`: Dictionary of field values to override during import
- `replace`: Use REPLACE instead of INSERT (default: False)
- `ignore_duplicates`: Use IGNORE to skip duplicates (default: False)

**Returns:** Number of rows loaded

**Raises:**
- `RuntimeError`: If `local_infile` is not enabled on MySQLService
- `FileNotFoundError`: If file doesn't exist
- `Exception`: Re-raises database errors

**Note:** Requires `local_infile=True` when creating MySQLService.

**Example:**
```python
db = MySQLService(..., local_infile=True)
rows_loaded = db.insert_from_file(
    "users",
    "/path/to/users.csv",
    columns=["name", "email", "active"],
    ignore_lines=1,  # Skip CSV header
)
```
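
When preparing an input file in Python, note that the `csv` module writes `\r\n` line endings by default, which does not match the `lines_terminated_by="\n"` default above. The sketch below writes a file that lines up with the default delimiter, enclosure character, and line terminator; the helper name `write_load_file` is hypothetical and not part of `gcp-mysql`.

```python
import csv
from typing import Iterable, Sequence


def write_load_file(path: str, header: Sequence[str], rows: Iterable[Sequence[object]]) -> None:
    """Write a CSV that matches the default delimiter, quoting, and line ending."""
    # newline="" stops Python from translating line endings; lineterminator="\n"
    # overrides the csv module default of "\r\n".
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle, delimiter=",", quotechar='"', lineterminator="\n")
        writer.writerow(header)  # skipped on load when ignore_lines=1
        writer.writerows(rows)
```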

---

## Logging

`gcp-mysql` uses Python's standard `logging` module with module-specific loggers. The library **never** configures logging handlers for you, allowing you to control logging in your application.

### Logger Names

The library uses the following logger names:

- `gcp_mysql.service` - Connection and service-level operations
- `gcp_mysql.utils.factory` - GCP Secret Manager factory operations
- `gcp_mysql.utils.query_operations` - Query execution operations
- `gcp_mysql._internal.table_creation` - Table creation and schema management
- `gcp_mysql._internal.index_creation` - Index creation operations

### Log Levels

- **DEBUG**: Detailed information for debugging (query strings, connection details, DDL statements)
- **INFO**: General informational messages (query results, table creation, index creation)
- **WARNING**: Warning messages (e.g., type fallbacks, optional SSL CA load failures)
- **ERROR**: Error messages with full exception traces

### Configuring Logging

#### Basic Configuration

```python
import logging

# Configure root logger
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Or configure specific loggers
logging.getLogger('gcp_mysql').setLevel(logging.DEBUG)
```

#### Get Library-Specific Logs

To capture only `gcp-mysql` logs:

```python
import logging

# Configure gcp_mysql logger specifically
gcp_mysql_logger = logging.getLogger('gcp_mysql')
gcp_mysql_logger.setLevel(logging.DEBUG)

# Create a handler
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)
gcp_mysql_logger.addHandler(handler)
```

#### Example: Structured Logging

```python
import logging
import json

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'logger': record.name,
            'level': record.levelname,
            'message': record.getMessage(),
        }
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_data)

# Configure for gcp_mysql
logger = logging.getLogger('gcp_mysql')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
```

#### Filtering by Module

To get logs from specific modules:

```python
import logging

# Only service and query operations
logging.getLogger('gcp_mysql.service').setLevel(logging.DEBUG)
logging.getLogger('gcp_mysql.utils.query_operations').setLevel(logging.DEBUG)

# Suppress internal operations
logging.getLogger('gcp_mysql._internal').setLevel(logging.WARNING)
```

### Log Examples

When using the library, you'll see logs like:

```
INFO:gcp_mysql.utils.query_operations:Query returned 5 row(s)
DEBUG:gcp_mysql.service:Test connection result: {'test': 1}
INFO:gcp_mysql._internal.table_creation:Ensuring table exists: users
DEBUG:gcp_mysql._internal.table_creation:DDL:
CREATE TABLE IF NOT EXISTS `users` (
  `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(1024) NOT NULL,
  `email` VARCHAR(255) NOT NULL,
  `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
```

---

## Safety Features

### DROP Statement Protection

The library blocks all DROP statements by default:

```python
# ❌ Raises ValueError
db.execute_query("DROP TABLE users")
```
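
For intuition, a guard of this kind could look like the sketch below. This is an assumption about the general approach, not the library's actual check: `assert_no_drop` is a hypothetical name, and this version only rejects `DROP` when it begins a statement, so the real rule may be stricter or looser.

```python
import re

# Matches DROP as a standalone keyword at the start of any statement.
_DROP_PATTERN = re.compile(r"(?:^|;)\s*DROP\b", re.IGNORECASE)


def assert_no_drop(query: str) -> None:
    """Hypothetical guard: reject empty queries and statements that begin with DROP."""
    if not query or not query.strip():
        raise ValueError("Query must not be empty")
    if _DROP_PATTERN.search(query):
        raise ValueError("DROP statements are blocked")
```

Anchoring the match to the start of a statement avoids false positives on identifiers such as a table named `drops`.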

### WHERE Clause Requirements

UPDATE and DELETE operations require explicit WHERE clauses:

```python
# ✅ Allowed
db.update("users", {"active": False}, "id = %s", (123,))

# ❌ Raises ValueError
db.update("users", {"active": False}, "")  # Missing WHERE clause
```
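
The sketch below shows one way such a requirement can be enforced while building the statement. It is illustrative only: `build_update` is a hypothetical function, and the backtick quoting of identifiers is an assumption rather than documented behavior of `gcp-mysql`.

```python
from typing import Any, Dict, Optional, Tuple


def build_update(
    table_name: str,
    data: Dict[str, Any],
    where_clause: str,
    where_params: Optional[Tuple[Any, ...]] = None,
) -> Tuple[str, Tuple[Any, ...]]:
    """Hypothetical builder: return (sql, params) for a guarded UPDATE."""
    if not data:
        raise ValueError("data must not be empty")
    if not where_clause or not where_clause.strip():
        raise ValueError("A WHERE clause is required")
    # Column names are backtick-quoted; values always travel as parameters.
    assignments = ", ".join(f"`{column}` = %s" for column in data)
    sql = f"UPDATE `{table_name}` SET {assignments} WHERE {where_clause}"
    return sql, tuple(data.values()) + tuple(where_params or ())
```

Because the SET values and the WHERE parameters are concatenated in order, the resulting tuple lines up with the `%s` placeholders from left to right.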

### Connection Management

Each operation opens a new connection via a context manager, ensuring connections are always properly closed, even if an exception occurs.
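
The connection-per-operation pattern can be sketched in a few lines. The names `connection_scope` and `FakeConnection` are hypothetical and exist only for this illustration; the stand-in class lets the example run without a database and says nothing about how the library structures its own code.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def connection_scope(connect: Callable[[], Any]) -> Iterator[Any]:
    """Open a connection, yield it, and always close it afterwards."""
    connection = connect()
    try:
        yield connection
    finally:
        # Runs on normal exit and when the body raises.
        connection.close()


class FakeConnection:
    """Stand-in used only to demonstrate the pattern without a database."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True
```

The trade-off of this design is a small per-call connection cost in exchange for never leaking a connection across requests.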

---

## Table and Schema Management

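The library includes utilities for creating tables from Pydantic models and managing schemas. These live in the `_internal` package and can be imported directly; because a leading underscore conventionally marks a private module, these import paths may change between releases: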

```python
from gcp_mysql._internal.table_creation import create_table_if_not_exists
from gcp_mysql._internal.index_creation import create_index_if_not_exists
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str
    active: bool = True

# Create table from model
create_table_if_not_exists(db, User, table_name="users")

# Create index
create_index_if_not_exists(db, "users", "idx_email", ["email"], unique=True)
```

### Type Mapping

The library automatically maps Python types to MySQL types:

- `int` → `INT` (or `BIGINT UNSIGNED AUTO_INCREMENT` for `id` fields)
- `str` → `VARCHAR(255)` (or `VARCHAR(1024)` for `name` fields, `TEXT` for `description` fields)
- `bool` → `TINYINT(1)`
- `float` → `DECIMAL(10,2)`
- `list`/`dict` → `JSON`
- `Optional[T]` → `T NULL`
- `datetime`/`date` → `TIMESTAMP`/`DATE`
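
The mapping above can be approximated with the sketch below, which is an illustration and not the library's code. `mysql_type_for` is a hypothetical name, and the field-name special cases (`id`, `name`, `description`) are deliberately left out. Unions written with the `X | None` syntax are also not handled here.

```python
from datetime import date, datetime
from typing import Any, Optional, Union, get_args, get_origin

_BASE_TYPES = {
    bool: "TINYINT(1)",
    int: "INT",
    float: "DECIMAL(10,2)",
    str: "VARCHAR(255)",
    datetime: "TIMESTAMP",
    date: "DATE",
    list: "JSON",
    dict: "JSON",
}


def mysql_type_for(annotation: Any) -> str:
    """Hypothetical mapper from a Python annotation to a MySQL column type."""
    origin = get_origin(annotation)
    if origin is Union:
        args = get_args(annotation)
        inner = [arg for arg in args if arg is not type(None)]
        # Optional[T] is Union[T, None]: map T and mark the column nullable.
        if len(args) == 2 and len(inner) == 1:
            return f"{mysql_type_for(inner[0])} NULL"
        raise TypeError(f"Unsupported union: {annotation!r}")
    # Parameterized containers such as List[str] resolve to their origin (list).
    base = origin or annotation
    try:
        return _BASE_TYPES[base]
    except (KeyError, TypeError):
        raise TypeError(f"Unsupported annotation: {annotation!r}") from None


print(mysql_type_for(Optional[str]))  # VARCHAR(255) NULL
```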

---

## Error Handling

All database operations re-raise exceptions from PyMySQL, allowing you to handle them in your application:

```python
from pymysql import OperationalError, IntegrityError

try:
    db.insert("users", {"email": "duplicate@example.com"})
except IntegrityError as e:
    print(f"Duplicate entry: {e}")
except OperationalError as e:
    print(f"Database error: {e}")
```

---

## Requirements

- Python 3.9+
- PyMySQL
- Pydantic (used by the table creation utilities)
- google-cloud-secret-manager (used only by `from_gcp_secret`, but installed as a regular dependency)

---

## License

See [LICENSE](LICENSE) file for details.

---

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
