Metadata-Version: 2.4
Name: infra-core
Version: 0.7.1
Summary: Shared HTTP, storage, and runtime helpers for automation services.
Author-email: Infra Core maintainers <infra-core@example.com>
Maintainer-email: Infra Core maintainers <infra-core@example.com>
License-Expression: MIT
Project-URL: Homepage, https://github.com/pj-ms/infra-core
Project-URL: Repository, https://github.com/pj-ms/infra-core
Project-URL: Issues, https://github.com/pj-ms/infra-core/issues
Keywords: http,storage,azure,async,infrastructure
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries
Classifier: Typing :: Typed
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: requests<3.0,>=2.31.0
Requires-Dist: httpx<1.0,>=0.27.0
Requires-Dist: tenacity<9.0,>=8.2.2
Provides-Extra: azure
Requires-Dist: azure-core<2.0,>=1.29; extra == "azure"
Requires-Dist: azure-storage-blob<13.0,>=12.12; extra == "azure"
Requires-Dist: azure-identity<2.0,>=1.17; extra == "azure"
Requires-Dist: azure-data-tables<13.0,>=12.5.0; extra == "azure"
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.23; extra == "dev"
Requires-Dist: pytest-cov>=4.0; extra == "dev"
Requires-Dist: pytest-httpx>=0.30; extra == "dev"
Requires-Dist: respx>=0.20; extra == "dev"
Requires-Dist: mypy>=1.8; extra == "dev"
Requires-Dist: ruff>=0.1; extra == "dev"
Requires-Dist: pre-commit>=3.6; extra == "dev"
Requires-Dist: bump-my-version>=0.26.0; extra == "dev"
Dynamic: license-file

# Infra Core

[![PyPI](https://img.shields.io/pypi/v/infra-core.svg)](https://pypi.org/project/infra-core/)
[![Python Versions](https://img.shields.io/pypi/pyversions/infra-core.svg)](https://pypi.org/project/infra-core/)
[![Build Status](https://github.com/pj-ms/infra-core/workflows/Build%20and%20Test/badge.svg)](https://github.com/pj-ms/infra-core/actions)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)

`infra-core` contains the reusable HTTP, storage, Azure upload, and runtime helpers shared by multiple services.

Supported Python versions: **3.11 · 3.12 · 3.13** (3.13 added to the testing matrix as soon as it is available in the execution environment).


## Modules

| Module                           | Purpose                                                                                                    |
| -------------------------------- | ---------------------------------------------------------------------------------------------------------- |
| `infra_core.http` / `http_async` | Request helpers with sensible defaults and retry/backoff logic.                                            |
| `infra_core.fs_utils`            | Local filesystem helpers (`ensure_parent`, `hashed_asset_path`, `compute_checksum`).                       |
| `infra_core.asset_client`        | Async asset downloader with retries, connection pooling, and checksum support.                             |
| `infra_core.polling`             | Reusable polling utilities with exponential backoff for status-based operations (`poll_until`, `StatusPoller`). |
| `infra_core.azure.client`        | Azure Function-oriented HTTP clients that inject API keys, honor timeout/retry overrides, and support sync wrappers. |
| `infra_core.azure.job_store`     | Lightweight Azure Table Storage job tracker with environment-driven configuration and caching.            |
| `infra_core.azure.monitoring`    | Monitoring helpers for heartbeats, run logs, and telemetry enrichment with mirror logging.                |
| `infra_core.azure.storage`       | Thin Azure Blob client with configurable retries, local mirroring, and async helpers.                      |
| `infra_core.task_runtime`        | Cooperative asyncio runtime (`TaskRuntime`) with per-task budgeting.                                       |

## Quick Start

### HTTP Fetching
```python
from infra_core import fetch, fetch_async

html = fetch("https://example.com", timeout=30)

async def load_async() -> str:
    return await fetch_async("https://example.com")
```

### Storage with Optional Azure Mirroring
```python
from pathlib import Path
from infra_core import AzureStorageClient, AzureStorageSettings, download_asset

# Load settings with mode (v0.6.0+)
settings, mode = AzureStorageSettings.from_env()
storage = AzureStorageClient.from_settings(settings, mode)
storage.write_json(Path("output/results.json"), {"status": "ok"})

asset_path = download_asset(
    "https://example.com/image.png",
    Path("assets/image.png"),
    skip_if_exists=True,
)
```

### Run Storage Manifests

Generate enriched manifests for run artifacts with automatic file type detection, MIME types, and blob URLs:

```python
from pathlib import Path
from infra_core.azure.run_storage import build_manifest
from infra_core.azure.storage import AzureStorageSettings, get_client

# Build manifest for run artifacts (v0.6.0+)
storage_path = Path("/tmp/runs/batch-123/job-456/outputs")
settings, mode = AzureStorageSettings.from_env_with_prefix("MYSERVICE")
azure_client = get_client(settings, mode)

manifest = build_manifest(storage_path, client=azure_client)

# Manifest structure (v0.4.0+)
{
    "files": [
        {
            "path": "screenshot.png",
            "type": "image",                    # Generic category: image, document, data, other
            "content_type": "image/png",        # MIME type via mimetypes library
            "size_bytes": 245680,
            "url": "https://account.blob.core.windows.net/container/blob/path"  # Full blob URL
        },
        {
            "path": "metadata.json",
            "type": "data",
            "content_type": "application/json",
            "size_bytes": 1024,
            "url": "https://..."
        }
    ],
    "storage": {
        "provider": "azure_blob",
        "container": "output-container",
        "prefix": "runs/batch-123/job-456/outputs"
    },
    "summary": {
        "total_files": 2,
        "total_size_bytes": 246704,
        "file_types": {"image": 1, "data": 1}
    }
}
```

**Migration from v0.3.x:**

```python
# Old format (v0.3.x)
storage_path = manifest["storage_path"]
blob_prefix = manifest["blob_prefix"]
files = manifest["files"]

# New format (v0.4.0+)
storage = manifest.get("storage", {})
blob_prefix = storage.get("prefix")
files = manifest["files"]  # Now includes type, content_type, size_bytes, url

# Access file URLs directly
for file in files:
    print(f"{file['path']}: {file['url']}")
```

**Features:**
- **Automatic MIME type detection** using Python's `mimetypes` library
- **Generic file categorization** (image, document, data, other) for flexible organization
- **Full blob URLs** with proper encoding of special characters (spaces, unicode, #, ?, &)
- **Summary statistics** for efficient UI rendering
- **Prefers local files** with automatic fallback to blob listing when local directory is empty

### Concurrent Task Runtime
```python
import asyncio
from infra_core import TaskRuntime, RuntimeConfig

async def process(item: str) -> None:
    ...

async def main() -> None:
    runtime = TaskRuntime(config=RuntimeConfig(concurrency=5, task_timeout=30.0))
    tasks = [(item, lambda item=item: process(item)) for item in ["a", "b", "c"]]
    await runtime.run(tasks)

asyncio.run(main())
```

#### Task Runtime Semantics

`TaskRuntime` enforces the configured concurrency per `run()` call by limiting the `inflight` set of tasks it schedules at once. A separate `_active_tasks` set tracks all tasks spawned across overlapping `run()` calls so that a single `cancel()` sweeps everything that is currently executing. As a result `_active_tasks` can temporarily exceed `config.concurrency`, which is expected and keeps cancellation comprehensive while each `run()` call still honours the configured bound.

By default, any task exception (including per-task timeouts) is propagated back to the caller so failure is obvious. Supplying `on_error` and/or `on_timeout` callbacks opts you into best-effort mode where the runtime reports failures via callbacks and continues processing the remaining tasks.

### Status Polling

```python
from infra_core.polling import poll_until, StatusPoller, PollingConfig

# Simple polling with timeout
result = poll_until(
    fetch_fn=lambda: api_client.get_status(job_id),
    is_terminal=lambda r: r["status"] in {"completed", "failed"},
    timeout=600.0,
    poll_interval=5.0,
)

# High-level polling with progress callbacks
def fetch_status():
    return {"status": "running", "progress": 50}

config = PollingConfig(
    timeout=600.0,
    poll_interval=5.0,
    terminal_statuses={"completed", "failed"},
    failure_statuses={"failed"},
    initial_delay=2.0,
    backoff_multiplier=2.0,
    max_backoff=30.0,
)

poller = StatusPoller(fetch_fn=fetch_status, config=config)
final_status = poller.poll(
    on_poll=lambda s: print(f"Progress: {s.get('progress')}%")
)
```

Async polling is also supported via `poll_until_async` and `StatusPoller.poll_async()`. Both the low-level helpers and `StatusPoller` honour the `PollingConfig` backoff settings (including `initial_delay` and exponential backoff bounded by `max_backoff`), so increasing `backoff_multiplier` automatically slows subsequent polls without reimplementing the loop.

## Installation

Using [uv](https://github.com/astral-sh/uv) (recommended):

```bash
uv pip install infra-core
```

Install with Azure helpers:

```bash
uv pip install "infra-core[azure]"
```

Or using pip:

```bash
pip install infra-core
pip install "infra-core[azure]"  # with Azure support
```

## Tests

Matrix: Python 3.11, 3.12, and 3.13 (once the interpreter is available in the execution environment).

```bash
pytest tests -v
```

Or with coverage:

```bash
pytest tests -v --cov=infra_core --cov-report=term-missing
```

## Configuration

### Azure Storage (Optional)

When Azure credentials are provided, `infra_core` mirrors files written via helpers such as `write_json`, `write_text`, and their async counterparts.

**Required**
- `AZURE_STORAGE_CONTAINER` – Target container name.

**Authentication (choose one)**
- `AZURE_STORAGE_CONNECTION_STRING` – Full connection string; or
- `AZURE_STORAGE_ACCOUNT` – Storage account name (uses DefaultAzureCredential)
  - `AZURE_STORAGE_BLOB_ENDPOINT` – Optional custom endpoint.

**Optional**
- `AZURE_STORAGE_BLOB_PREFIX` – Prefix applied to uploaded blobs.
- `AZURE_STORAGE_MODE` – Storage mode: `strict` (default, raises on missing config), `best-effort` (logs and skips), or `dev-skip` (never uploads).

#### Blob Download Safety

Blob download helpers stream data into unique temp files via `tempfile.mkstemp()` (see `azure_storage._stream_blob_to_path*`). The files are placed next to the destination, created with owner-only permissions, flushed and `fsync`'d, then atomically renamed into place. This preserves original extensions, avoids collisions when multiple processes download the same blob, and prevents partially-downloaded data from clobbering the real file.

No configuration is required to use the HTTP or runtime helpers; sensible defaults are provided.

## Azure-specific helpers

### Storage mirroring & downloads

`infra_core.azure.storage.AzureStorageClient` provides Azure Blob Storage operations with explicit configuration. Settings and mode must be passed explicitly to prevent accidental cross-service configuration conflicts.

```python
from infra_core.azure import AzureStorageSettings, get_client

# Load settings with service-specific prefix (recommended for multi-service deployments)
settings, mode = AzureStorageSettings.from_env_with_prefix("CRAWLER")
# Reads: CRAWLER_AZURE_STORAGE_CONTAINER, CRAWLER_AZURE_STORAGE_MODE, etc.

# Or use default env vars
settings, mode = AzureStorageSettings.from_env()
# Reads: AZURE_STORAGE_CONTAINER, AZURE_STORAGE_MODE, etc.

client = get_client(settings, mode)
client.upload_text(path, "content")
client.download_tree(target_path)
```

**Modes:**
- `strict` (default): Container + credentials required; `ConfigurationError` if missing
- `best-effort`: Log and skip when config is missing
- `dev-skip`: Never attempt uploads (for local development)

### Azure Function HTTP clients

`infra_core.azure.client.AzureFunctionAsyncClient` provides a retry/backoff wrapper around `httpx`, automatically injects `x-functions-key` when `api_key` is configured, and supports `RequestOptions` to tune retries, delays, and timeouts at each call site. The sync counterpart (`AzureFunctionSyncClient`) simply runs the async flow via `asyncio.run()` so CLI scripts can reuse the same configuration without juggling event loops. Both helpers offer `.from_env()` helpers to materialize clients directly from `AZURE_FUNCTION_BASE_URL` and `AZURE_FUNCTION_API_KEY` (or similar) environment variables.

### Azure Table job store

`infra_core.azure.job_store.AzureTableJobStore` and `TableConfig.from_env()` share consistent env vars (`AZURE_JOB_TABLE`, `AZURE_JOB_CONNECTION`, fallback to `AZURE_STORAGE_*`) plus configurable cache size (via `functools.lru_cache`). Each record implements `to_entity()` / `from_entity()` so you can store structured run metadata and look up retries via `fetch()` plus `list_runs()` pagination helpers.

### Monitoring & telemetry

`infra_core.azure.monitoring` keeps a running heartbeat, mirrors logs to blob storage, and validates telemetry payloads before emitting them. Pair that module with the HTTP helpers to report failures from Azure Functions, and enable the `infra_core.azure.monitoring.run_log_writer` when you need consistent log labeling across services.

```python
from infra_core.azure.monitoring import TelemetryConfig, TelemetryClient, TelemetryEvent

# Explicit telemetry configuration (v0.7.0+)
config = TelemetryConfig.from_env_with_prefix("CRAWLER")
# Reads: CRAWLER_APPLICATIONINSIGHTS_CONNECTION_STRING
# Falls back to: APPLICATIONINSIGHTS_CONNECTION_STRING, APPINSIGHTS_INSTRUMENTATIONKEY

client = TelemetryClient(config=config, service_name="crawler")
client.track_event(TelemetryEvent.STARTED, batch_id="batch-1", job_id="job-1")
client.track_event(TelemetryEvent.COMPLETED, batch_id="batch-1", metadata={"duration_ms": 1500})
```

### Sample environment (.env) snippet

```bash
# Blob mirroring
export AZURE_STORAGE_CONTAINER="crawler-artifacts"
export AZURE_STORAGE_CONNECTION_STRING="UseDevelopmentStorage=true"  # Azurite/local
export AZURE_STORAGE_BLOB_PREFIX="screenshots/"

# Azure Function triggers
export AZURE_FUNCTION_BASE_URL="https://my-team-func.azurewebsites.net"
export AZURE_FUNCTION_API_KEY="your-secret-key"

# Azure Table job tracking
export AZURE_JOB_TABLE="crawlerRuns"
export AZURE_JOB_CONNECTION="UseDevelopmentStorage=true"
```

### CLI wiring example

Use `infra_core.azure.function_cli.add_azure_service_arguments()` to add the shared Azure flags to any argparse-based entrypoint, then call into `AzureFunctionAsyncClient` with values resolved from flags or environment variables:

```python
import argparse
import asyncio
import os

from infra_core.azure.client import AzureFunctionAsyncClient
from infra_core.azure.function_cli import add_azure_service_arguments

parser = argparse.ArgumentParser()
add_azure_service_arguments(parser, base_url_env_var="AZURE_FUNCTION_BASE_URL")
args = parser.parse_args()

async def main() -> None:
    client = AzureFunctionAsyncClient(
        args.base_url or os.environ["AZURE_FUNCTION_BASE_URL"],
        api_key=os.getenv("AZURE_FUNCTION_API_KEY"),
    )
    payload = await client.post_json(
        "/api/run",
        json={"batchId": args.batch_id, "concurrency": args.concurrency},
    )
    print(payload)

asyncio.run(main())
```

Pair this with the `.env` snippet above and you have a ready-to-run CLI that schedules Azure Function batches, polls for completion (via the parser defaults), and mirrors summaries to blob storage when `--summary-file` is provided.


## Logging

`infra_core` uses Python's standard `logging` module. To enable diagnostics in your application:

```python
import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("infra_core.asset_client").setLevel(logging.DEBUG)
```

Logger namespaces:

| Logger | Purpose |
|--------|---------|
| `infra_core.asset_client` | Download/retry lifecycle |
| `infra_core.polling` | Polling attempts, backoff, and terminal states |
| `infra_core.azure.storage` | Blob uploads/downloads |
| `infra_core.task_runtime` | Concurrency and cancellation events |

All log records include structured `extra={...}` fields (e.g., `url`, `blob_name`, `attempt`). Configure your formatter (JSON or text) to emit those keys for easier filtering, and sanitize environment-specific secrets before forwarding logs.

### OpenTelemetry/trace correlation

If your application uses OpenTelemetry, start spans around infra_core operations and add span IDs to log records so traces and logs stay aligned:

```python
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("download_batch") as span:
    logger = logging.getLogger("infra_core.asset_client")
    logger.info(
        "Starting download",
        extra={
            "url": url,
            "trace_id": span.get_span_context().trace_id,
            "span_id": span.get_span_context().span_id,
        },
    )
    await download_asset_async(url, dest)
```

## Troubleshooting

- **"Azure storage not configured" warning** – Ensure `AZURE_STORAGE_CONTAINER` is set along with either `AZURE_STORAGE_CONNECTION_STRING` or `AZURE_STORAGE_ACCOUNT`. Call `AzureStorageClient.from_settings()` after loading environment variables to confirm configuration.
- **`download_asset` raises inside async code** – Use `download_asset_async` when an event loop is running; the sync helper intentionally fails inside `asyncio` contexts to avoid deadlocks.
- **Type checker cannot find infra_core stubs** – Install dev extras (`pip install .[dev]` or `uv sync --extra dev`) so `py.typed` and dependency stubs are available to mypy/pyright.
- **HTTP retries still hitting rate limits** – Pass a `delay` to `fetch`/`fetch_async` or construct a custom `RequestsHttpClient`/`AsyncHttpClient` with tuned limits and headers.
- **Large Azure uploads timing out** – Use the async helpers (they stream files) and tweak `AzureStorageClient` settings (e.g., `swallow_errors=False`, custom retry logic) to observe detailed failures.

## Contributing

Using [uv](https://github.com/astral-sh/uv) (recommended):

```bash
git clone https://github.com/pj-ms/infra-core.git
cd infra-core
uv sync --extra azure --extra dev
uv run pytest tests -v --cov=infra_core --cov-report=term-missing
uv run mypy src/infra_core
uv run ruff check src/ tests/
```

See [CONTRIBUTING.md](CONTRIBUTING.md) for more details.

## License

MIT License - see [LICENSE](LICENSE) for details.
