Metadata-Version: 2.4
Name: vcm-file-uploader
Version: 0.1.0
Summary: A Python library for uploading workflow artifacts to Amazon S3 using STS credentials.
Author: ATTMOS Inc.
License-Expression: MIT
License-File: LICENSE
Requires-Python: >=3.10
Requires-Dist: boto3>=1.28
Requires-Dist: file-watchman>=0.1.0
Provides-Extra: dev
Requires-Dist: boto3-stubs[s3]; extra == 'dev'
Requires-Dist: moto[s3]>=5.0; extra == 'dev'
Requires-Dist: mypy>=1.0; extra == 'dev'
Requires-Dist: pytest>=7.0; extra == 'dev'
Requires-Dist: ruff>=0.4; extra == 'dev'
Description-Content-Type: text/markdown

# vcm-file-uploader

A Python library for uploading workflow artifacts to Amazon S3 using temporary STS credentials.

## Features

- **STS credential sessions** — scoped uploads using temporary AWS credentials with prefix enforcement
- **boto3 TransferManager** — configurable multipart thresholds, chunk sizes, and concurrency
- **Automatic retries** — exponential backoff with adaptive retry mode for transient S3 failures
- **ASCII log compression** — gzip compression for large text files with stability detection
- **Growing log segmentation** — incremental byte-range uploads for actively growing log files
- **JSONL state management** — append-only, last-write-wins state tracking with compaction
- **Manifest generation** — JSON manifests recording all uploaded objects with metadata
- **File monitoring** — directory scanning, diffing, and upload plan generation via BackupWatchman
- **Partial failure reporting** — structured exit codes (0/1/2) for programmatic callers

## Installation

```bash
pip install vcm-file-uploader
```

With development dependencies:

```bash
pip install "vcm-file-uploader[dev]"
```

Requires Python 3.10+.

## Quick Start

```python
import sys

from vcm_file_uploader import STSUploadSession, UploadPlan, ManifestWriter

# Load an upload plan produced by BackupWatchman
plan = UploadPlan.from_json("upload_plan.json")

# Create a session with STS credentials and upload
with STSUploadSession(
    aws_access_key_id="ASIA...",  # temporary STS key IDs start with ASIA
    aws_secret_access_key="secret",
    aws_session_token="token",
    region="us-east-1",
    bucket="my-bucket",
    prefix="jobs/12345/",
) as session:
    summary = session.upload_files(plan)

    # Write a manifest recording what was uploaded
    manifest = ManifestWriter(session)
    manifest.add_summary(summary)
    manifest.write_manifest()

# Check results programmatically
print(f"Uploaded {len(summary.succeeded)} files ({summary.total_bytes} bytes)")
sys.exit(summary.exit_code)
```

## Usage Examples

### Basic Upload with UploadPlan

```python
from vcm_file_uploader import STSUploadSession, UploadPlan

# Load from JSON file
plan = UploadPlan.from_json("/path/to/upload_plan.json")

# Or construct from a dictionary
plan = UploadPlan.from_dict({
    "files": [
        {"path": "output/results.csv", "full_path": "/data/output/results.csv", "size": 1048576},
        {"path": "logs/run.log", "full_path": "/data/logs/run.log", "size": 524288},
    ]
})

print(f"{len(plan)} files, {plan.total_bytes} bytes, {plan.multipart_count} multipart")

with STSUploadSession(
    aws_access_key_id="ASIA...",  # temporary STS key IDs start with ASIA
    aws_secret_access_key="secret",
    aws_session_token="token",
    region="us-east-1",
    bucket="my-bucket",
    prefix="jobs/12345/",
) as session:
    summary = session.upload_files(plan)

    if summary.all_succeeded:
        print("All uploads succeeded")
    else:
        for r in summary.failed:
            print(f"FAILED: {r.local_path}: {r.error}")
```

### File Monitoring with BackupWatchman

```python
from vcm_file_uploader import BackupWatchman, UploadPlan

watchman = BackupWatchman(
    watch_dir="/data/workspace",
    output_dir="/data/state",
    multipart_threshold=50 * 1024 * 1024,  # 50 MB
)

# Run a scan-diff-plan cycle
plan_dict = watchman.scan()

# The result can be loaded as an UploadPlan
plan = UploadPlan.from_dict(plan_dict)
print(f"Found {len(plan)} files to upload")
```
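
The scan-diff-plan cycle can be pictured with a standard-library sketch. This is not BackupWatchman's implementation: the names `scan_dir` and `diff_snapshots`, and the use of size plus modification time as the change signal, are assumptions made for illustration. The plan entries reuse the `path`, `full_path`, and `size` keys shown in the `UploadPlan.from_dict` example.

```python
import os
import tempfile
from pathlib import Path


def scan_dir(root: str) -> dict[str, tuple[int, int]]:
    """Map each relative file path under root to (size, mtime_ns)."""
    snapshot = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            full = Path(dirpath) / name
            st = full.stat()
            snapshot[full.relative_to(root).as_posix()] = (st.st_size, st.st_mtime_ns)
    return snapshot


def diff_snapshots(old: dict, new: dict, root: str) -> dict:
    """Return plan entries for files that are new or changed since `old`."""
    files = [
        {"path": rel, "full_path": str(Path(root) / rel), "size": meta[0]}
        for rel, meta in sorted(new.items())
        if old.get(rel) != meta
    ]
    return {"files": files}


# Hypothetical demo: the first scan treats every file as new,
# and a second scan with no changes yields an empty plan.
with tempfile.TemporaryDirectory() as root:
    Path(root, "run.log").write_bytes(b"step 1\n")
    first = scan_dir(root)
    plan = diff_snapshots({}, first, root)
    unchanged = diff_snapshots(first, scan_dir(root), root)
```

Persisting the previous snapshot between runs is what lets the diff step report only new or modified files.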

### Log Compression

```python
from vcm_file_uploader import maybe_compress, compress_file

# Conditionally compress large ASCII files (>= 64 MiB by default)
result = maybe_compress("/data/logs/simulation.log")
if result:
    compressed_path, info = result
    print(f"Compressed {info.original_bytes} -> {info.compressed_bytes} bytes")

# Force compression with custom settings
compressed_path, info = compress_file(
    "/data/logs/output.log",
    level=9,
    output_path="/tmp/output.log.gz",
)

# Compress log-category files (skips ASCII detection)
result = maybe_compress("/data/logs/binary.log", category="log", threshold=32 * 1024 * 1024)
```
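
The idea behind conditional compression (size threshold, ASCII check, stability check, then gzip) can be sketched with the standard library alone. The helpers `looks_ascii`, `is_stable`, and `gzip_if_large` are hypothetical and only illustrate the steps; they are not the library's `maybe_compress`. The default values mirror the Compression Defaults table.

```python
import gzip
import os
import shutil
import tempfile
import time


def looks_ascii(path, sample_bytes=8192):
    """Heuristic: treat the file as text if its first bytes are all ASCII."""
    with open(path, "rb") as fh:
        return fh.read(sample_bytes).isascii()


def is_stable(path, wait=2.0):
    """True if the file size is unchanged after `wait` seconds."""
    before = os.path.getsize(path)
    time.sleep(wait)
    return os.path.getsize(path) == before


def gzip_if_large(path, threshold=64 * 1024 * 1024, level=6, wait=2.0):
    """Write path + '.gz' and return it, or return None if the file is skipped."""
    if os.path.getsize(path) < threshold or not looks_ascii(path):
        return None
    if not is_stable(path, wait):
        return None  # still being written; try again on a later pass
    gz_path = path + ".gz"
    with open(path, "rb") as src, gzip.open(gz_path, "wb", compresslevel=level) as dst:
        shutil.copyfileobj(src, dst)
    return gz_path


# Hypothetical demo with a small file and no stability wait.
with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "run.log")
    payload = b"energy = -1.0\n" * 1000
    with open(log, "wb") as fh:
        fh.write(payload)
    skipped = gzip_if_large(log, wait=0)             # below the 64 MiB default
    gz = gzip_if_large(log, threshold=1024, wait=0)  # above a 1 KiB threshold
    with gzip.open(gz, "rb") as fh:
        round_trip_ok = fh.read() == payload
```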

### Growing Log Segmentation

```python
from vcm_file_uploader import LogSegmenter, JsonlStore

store = JsonlStore("/data/state/ascii_segments.jsonl", key_field="segment_key")
segmenter = LogSegmenter(store)

# Detect new bytes in a growing log file
segments = segmenter.get_new_segments("/data/logs/amber.mdout")

for segment in segments:
    print(f"Segment {segment.segment_index}: offset={segment.offset}, length={segment.length}")
    # Upload each segment; `session` is an open STSUploadSession (see Quick Start)
    s3_key = f"jobs/12345/{segment.segment_key_suffix}"
    result = segmenter.upload_segment(segment, session, s3_key)
```
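
Incremental byte-range detection comes down to remembering the last uploaded offset and reading only what was appended after it. The following is a minimal sketch of that idea; `next_segment` is a hypothetical helper, not part of `LogSegmenter`, and it deliberately ignores truncated or rotated files.

```python
import os
import tempfile


def next_segment(path, last_offset):
    """Return (offset, data) for bytes appended since last_offset, or None."""
    size = os.path.getsize(path)
    if size <= last_offset:
        return None  # nothing new (truncation handling is out of scope here)
    with open(path, "rb") as fh:
        fh.seek(last_offset)
        return last_offset, fh.read(size - last_offset)


# Hypothetical demo: the log grows between two checks.
with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "amber.mdout")
    with open(log, "wb") as fh:
        fh.write(b"step 1\n")
    first = next_segment(log, 0)
    with open(log, "ab") as fh:
        fh.write(b"step 2\n")
    second = next_segment(log, len(first[1]))
    third = next_segment(log, 14)  # no growth since the second segment
```

Storing the offset after each successful upload is what makes the next check resume from the right place.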

### Manifest Generation

```python
from vcm_file_uploader import ManifestWriter

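# `session` is an open STSUploadSession and `summary` is the value returned
# by session.upload_files(plan), as in Quick Start
manifest = ManifestWriter(session, run_id="abc123")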

# Add results from an upload batch
manifest.add_summary(summary)

# Or add individual results
manifest.add_result(single_result)

# Upload the manifest JSON to S3
manifest.write_manifest()

# Inspect the manifest contents
print(manifest.to_dict())
```

### JSONL State Management

```python
from vcm_file_uploader import StateManager, JsonlStore

# High-level: StateManager provides pre-configured stores
state = StateManager("/data/state")
state.files.append({"path": "output.csv", "uploaded": True, "size": 1024})
state.segments.append({"segment_key": "log.seg0001", "offset": 4096})

entry = state.files.get("output.csv")
all_files = state.files.read_all()

# Compact all stores (deduplicates on disk)
counts = state.compact_all()

# Low-level: use JsonlStore directly
store = JsonlStore("/data/state/custom.jsonl", key_field="id")
store.append({"id": "item-1", "status": "done"})
store.append({"id": "item-1", "status": "updated"})  # same key: the later entry wins on read
latest = store.get("item-1")  # {"id": "item-1", "status": "updated"}
store.compact()  # deduplicates the file on disk
```
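
Append-only, last-write-wins semantics can be illustrated with a few lines of standard-library Python. This is a sketch of the general technique, not `JsonlStore` itself: `read_latest` and `compact` are hypothetical helpers, and the temp-file-plus-rename step is an assumption about how compaction might be made safe.

```python
import json
import os
import tempfile


def read_latest(path, key_field):
    """Replay the log; a later line with the same key replaces the earlier one."""
    latest = {}
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            if line.strip():
                record = json.loads(line)
                latest[record[key_field]] = record
    return latest


def compact(path, key_field):
    """Rewrite the file with one line per key; return the number of lines kept."""
    latest = read_latest(path, key_field)
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        for record in latest.values():
            fh.write(json.dumps(record) + "\n")
    os.replace(tmp_path, path)  # swap the compacted file into place
    return len(latest)


# Hypothetical demo: three appended lines collapse to two keys.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "custom.jsonl")
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps({"id": "item-1", "status": "done"}) + "\n")
        fh.write(json.dumps({"id": "item-2", "status": "done"}) + "\n")
        fh.write(json.dumps({"id": "item-1", "status": "updated"}) + "\n")
    latest = read_latest(path, "id")
    kept = compact(path, "id")
    with open(path, encoding="utf-8") as fh:
        lines_after = sum(1 for _ in fh)
```

Appends stay cheap because nothing is rewritten until compaction; reads pay the cost of replaying the file.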

## Configuration

### TransferConfig

Controls boto3 S3 TransferManager behavior:

| Field | Default | Description |
|---|---|---|
| `multipart_threshold` | 128 MiB | File size threshold for multipart uploads |
| `multipart_chunksize` | 128 MiB | Size of each multipart chunk |
| `max_concurrency` | 8 | Maximum concurrent upload threads |

```python
from vcm_file_uploader import TransferConfig, STSUploadSession

config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,
    multipart_chunksize=64 * 1024 * 1024,
    max_concurrency=4,
)

session = STSUploadSession(..., transfer_config=config)
```
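
To see how the threshold and chunk size interact, the arithmetic can be written out directly. The sketch below assumes boto3's usual behavior of switching to multipart once a file reaches the threshold; `part_count` is a hypothetical helper, not part of this library. Note that S3 allows at most 10,000 parts per multipart upload, so very large files need a larger chunk size.

```python
MIB = 1024 * 1024


def part_count(size, threshold=128 * MIB, chunksize=128 * MIB):
    """Number of upload parts for a file of `size` bytes.

    Files below the threshold go up as a single request; larger files
    are split into ceil(size / chunksize) parts.
    """
    if size < threshold:
        return 1
    return -(-size // chunksize)  # integer ceiling division


small = part_count(100 * MIB)                 # below the 128 MiB threshold
one_gib = part_count(1024 * MIB)              # defaults: 128 MiB chunks
one_gib_64 = part_count(1024 * MIB, 64 * MIB, 64 * MIB)
```

Smaller chunks mean more parts and more opportunities for parallelism, at the cost of more requests per file.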

### Retry Configuration

| Parameter | Default | Description |
|---|---|---|
| `max_retries` | 5 | Maximum retry attempts for transient S3 errors |
| `retry_mode` | `"adaptive"` | boto3 retry mode (`"adaptive"`, `"standard"`, or `"legacy"`) |
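
For intuition about what exponential backoff does with `max_retries`, here is a standalone sketch of capped, jittered backoff delays. It illustrates the general technique only: the actual delays are decided by boto3's retry handler, and the `base` and `cap` values below are illustrative assumptions rather than settings of this library.

```python
import random


def backoff_delays(max_retries=5, base=1.0, cap=20.0, rng=random.random):
    """Yield one sleep duration per retry: jittered, doubling, capped."""
    for attempt in range(max_retries):
        # rng() in [0, 1) spreads retries out so clients do not retry in lockstep
        yield rng() * min(cap, base * 2 ** attempt)


# With jitter disabled (rng always 1.0) the upper bounds are visible.
upper_bounds = list(backoff_delays(rng=lambda: 1.0))
capped = list(backoff_delays(max_retries=7, rng=lambda: 1.0))
```

The cap keeps late retries from waiting arbitrarily long, while the jitter avoids synchronized retry bursts against S3.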

### Compression Defaults

| Constant | Value | Description |
|---|---|---|
| `DEFAULT_COMPRESS_THRESHOLD` | 64 MiB | Minimum file size for automatic compression |
| `DEFAULT_GZIP_LEVEL` | 6 | Gzip compression level (0-9) |
| `STABILITY_WAIT` | 2.0 s | Wait time between file size checks |

## API Reference

See [docs/api.md](docs/api.md) for the complete API reference.

## Development

```bash
# Install in editable mode with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linter
ruff check .

# Run type checker
mypy src/
```

## License

MIT — see [LICENSE](LICENSE) for details.
