# Flow SDK

Flow SDK is a Python library for seamless GPU compute orchestration across cloud providers. It provides a unified interface for submitting computational workloads, managing distributed training, and handling infrastructure complexity.

## Core Architecture

The SDK follows a layered architecture with clear separation of concerns:

### 1. User Interface Layer (`flow/`)
- **flow.py**: Main Flow class providing high-level API (run, status, cancel)
- **models.py**: Pydantic models for TaskConfig, VolumeSpec, InstanceType, TaskStatus
- **invoke.py**: Zero-import remote execution functionality
- **decorator.py**: Modal-compatible decorator interface (@app, @function)

### 2. Frontend Adapters (`flow/frontends/`)
Multiple input formats converge on a unified TaskConfig:
- **yaml/**: YAML configuration files → TaskConfig
- **slurm/**: SLURM sbatch scripts → TaskConfig  
- **submitit/**: Facebook Submitit compatibility → TaskConfig
- **cli/**: Command-line interface → TaskConfig

### 3. Core Domain (`flow/core/`)
- **instance.py**: Instance type parsing, canonicalization, GPU specifications
- **catalog/**: Instance catalog with capability-based search
- **task_engine.py**: Task lifecycle management

### 4. Provider Abstraction (`flow/providers/`)
- **base.py**: IProvider interface defining provider contract
- **factory.py**: Provider creation using registry pattern
- **registry.py**: Provider registration and discovery

#### FCP Provider (`flow/providers/fcp/`)
Complete implementation for FCP (ML Foundry) cloud:
- **provider.py**: Main provider implementation (includes _package_local_code method)
- **instance_resolver.py**: Multi-strategy instance resolution
- **auction_finder.py**: Spot instance auction discovery
- **startup_builder.py**: Startup script generation (10KB limit for FCP)
- **script_sections.py**: Modular script sections including CodeUploadSection
- **lifecycle_manager.py**: Instance lifecycle management
- **error_handler.py**: FCP-specific error handling

**FCP Environment**: Ubuntu 22.04, bash shell, 10KB script limit, first-boot execution only

**Code Upload Implementation**:
- `_package_local_code()` in provider.py creates gzipped tar archive
- Archive passed via environment variable `_FLOW_CODE_ARCHIVE`
- `CodeUploadSection` in script_sections.py extracts to /workspace
- Docker container working directory set to /workspace when upload_code=True

### 5. Supporting Services
- **data/**: Data loading and S3/URL resolution
- **storage/**: Volume and persistent storage management
- **utils/**: Retry logic, validation, security utilities
- **observability/**: OpenTelemetry integration

## Key Concepts

### Instance Type Resolution
The SDK supports multiple instance type formats:

**Supported formats at Flow SDK level:**
1. **Simple name**: `"a100"` (resolves to 1x A100), `"h100"` (resolves to 8x H100; see the mapping below)
2. **Count prefix**: `"2xa100"`, `"4xa100"`, `"8xa100"`, `"8xh100"`
3. **FCP format**: `"a100-80gb.sxm.1x"`, `"h100-80gb.pcie.1x"` (exact match)
4. **Direct FID**: `"it_MsIRhxj3ccyVWGfP"` (if you know the FID)

**NOT supported** (will raise error):
- `"a100x8"` or `"a100*8"` style syntax
- `"a100-80gb"` without full specification
- `"gpu.nvidia.a100"` style prefixes
- Case variations like `"A100"` or `"H100"`
- Vendor prefixes like `"nvidia-a100"`

**Resolution mapping (as implemented):**
```text
# From provider.py line 1408
"a100" → "it_MsIRhxj3ccyVWGfP"  # 1x A100
"1xa100" → "it_MsIRhxj3ccyVWGfP"
"2xa100" → "it_5M6aGxGovNeX5ltT"
"4xa100" → "it_fK7Cx6TVhOK5ZfXT"
"8xa100" → "it_J7OyNf9idfImLIFo"
"a100-80gb.sxm.1x" → "it_MsIRhxj3ccyVWGfP"  # FCP format also works
"h100" → "it_5ECSoHQjLBzrp5YM"  # Defaults to 8x
"8xh100" → "it_5ECSoHQjLBzrp5YM"
"h100-80gb.pcie.1x" → "it_XqgKWbhZ5gznAYsG"  # Different FID for PCIe
```
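The mapping above can be sketched as a two-step lookup: shorthand to canonical FCP name, then canonical name to FID. This is a minimal illustration, not the code in provider.py; the names `ALIASES`, `FID_BY_NAME`, and `resolve_instance_type` are hypothetical, and the FIDs are copied from this document and may change on the FCP side.

```python
# Canonical FCP names -> FIDs (values copied from this document).
FID_BY_NAME = {
    "a100-80gb.sxm.1x": "it_MsIRhxj3ccyVWGfP",
    "a100-80gb.sxm.2x": "it_5M6aGxGovNeX5ltT",
    "a100-80gb.sxm.4x": "it_fK7Cx6TVhOK5ZfXT",
    "a100-80gb.sxm.8x": "it_J7OyNf9idfImLIFo",
    "h100-80gb.sxm.8x": "it_5ECSoHQjLBzrp5YM",
    "h100-80gb.pcie.1x": "it_XqgKWbhZ5gznAYsG",
}

# Shorthand accepted at the SDK level -> canonical FCP name.
ALIASES = {
    "a100": "a100-80gb.sxm.1x",
    "1xa100": "a100-80gb.sxm.1x",
    "2xa100": "a100-80gb.sxm.2x",
    "4xa100": "a100-80gb.sxm.4x",
    "8xa100": "a100-80gb.sxm.8x",
    "h100": "h100-80gb.sxm.8x",   # bare "h100" resolves to the 8x type
    "8xh100": "h100-80gb.sxm.8x",
}


def resolve_instance_type(spec: str) -> str:
    """Return the FID for a user-supplied instance type string (hypothetical helper)."""
    if spec.startswith("it_"):
        return spec  # already a FID, pass through unchanged
    canonical = ALIASES.get(spec, spec)
    try:
        return FID_BY_NAME[canonical]
    except KeyError:
        raise ValueError(f"Unsupported instance type: {spec!r}") from None
```

The lookup is case-sensitive on purpose, which matches the rule that `"A100"` is rejected.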

### Task Lifecycle
1. **Configuration**: User provides TaskConfig (directly or via frontend)
2. **Resolution**: Find suitable instances based on requirements
3. **Submission**: Create cloud resources and start execution
4. **Monitoring**: Track status, stream logs, handle failures
5. **Cleanup**: Terminate resources, persist outputs

### Multi-Provider Design
- Providers implement IProvider interface
- Flow class delegates to appropriate provider
- Provider selection based on API endpoint
- Extensible to AWS, GCP, Azure, Lambda Labs

## API Surface

### Primary API (flow.py)
```python
from flow import Flow

flow = Flow()
task = flow.run(config)  # Submit task
status = task.status     # Check status (attribute, not a method call)
logs = task.logs()       # Get logs
task.stop()              # Cancel/terminate
```

### Convenience Functions
```python
import flow

# Submit a task - automatically uploads local files
task = flow.run("python train.py", instance_type="a100")
task = flow.run(config)  # With TaskConfig
task = flow.run("job.yaml")  # From YAML file

# Code upload behavior (new in v2)
task = flow.run("python train.py", instance_type="a100")  # upload_code=True by default
task = flow.run("python /app/train.py", instance_type="a100", 
                image="myapp:latest", upload_code=False)  # Use pre-built image

# Check status
status = task.status  # Returns: "pending"|"running"|"completed"|"failed"|"cancelled"
# Or for existing task:
status = flow.status(task_id)  # Still supported for backward compatibility

# Cancel task
task.cancel()
# Or for existing task:
flow.cancel(task_id)  # Still supported for backward compatibility

# Get task object by ID
task = flow.get_task(task_id)  # Returns Task object with all methods
user = task.get_user()  # Get user who created task
instances = task.get_instances()  # Get instance details with IPs
```

### Data Mounting (flow.submit)
```python
# Mount data from S3 or volumes
task = flow.submit(
    "python train.py",
    gpu="a100",
    data="s3://bucket/dataset"  # Mounts at /data
)

# Multiple data sources
task = flow.submit(
    "python train.py",
    gpu="a100:4",
    data={
        "/datasets": "s3://ml-bucket/imagenet",
        "/models": "volume://pretrained-models",  # Auto-creates if missing
        "/cache": "volume://build-cache"
    }
)
```

### Configuration (models.py)

#### TaskConfig Fields (Common Mistakes)
```python
from flow.models import TaskConfig, VolumeSpec

config = TaskConfig(
    # Basic configuration
    name="gpu-training",                          # Required, alphanumeric + dash/underscore
    
    # Instance specification (exactly ONE of these)
    instance_type="a100",                         # "a100", "2xa100", "8xh100", etc.
    # OR
    # min_gpu_memory_gb=80,                       # For capability-based selection
    
    # Command specification (exactly ONE of these)
    command=["python", "train.py", "--epochs", "10"],  # List[str] - preferred
    # OR
    # shell="cd /app && python train.py",        # Shell command string
    # OR
    # script="import torch\nprint('hello')",     # Python/bash script content
    
    # Environment - COMMON MISTAKE!
    env={"CUDA_VISIBLE_DEVICES": "0"},           # NOT "environment"!
    
    # SSH access - note plural!
    ssh_keys=["key-123"],                        # NOT "ssh_key" - always plural!
    
    # Resources
    volumes=[VolumeSpec(size_gb=100, mount_path="/data")],
    
    # Pricing and limits
    max_price_per_hour=10.0,                     # NOT "max_price" or "price"!
    max_run_time_hours=24.0,                     # Max 168 (7 days)
    
    # Multi-node
    num_instances=1,                             # NOT "instance_count"!
    
    # Other
    image="nvidia/cuda:12.1.0-runtime-ubuntu22.04",
    working_dir="/workspace",
    region="us-central1-a",
    priority=50,
    auto_terminate=True,
    
    # Code upload (new in v2)
    upload_code=True                             # Default: uploads current directory
)
```

**Field validation rules:**
- Must specify exactly ONE of: `command`, `shell`, `script`
- Must specify exactly ONE of: `instance_type`, `min_gpu_memory_gb`
- Specifying more than one field from either group raises `ValidationError`
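The "exactly one of" rules above can be sketched in plain Python. This is an illustration only: the real check lives in the Pydantic models in models.py, and `require_exactly_one` is a hypothetical helper that raises a plain `ValueError` rather than the SDK's `ValidationError`.

```python
from typing import Any, Mapping


def require_exactly_one(values: Mapping[str, Any], *fields: str) -> str:
    """Return the one field that is set; raise if zero or several are set."""
    present = [name for name in fields if values.get(name) is not None]
    if len(present) != 1:
        found = ", ".join(present) if present else "none"
        raise ValueError(
            f"Specify exactly one of {', '.join(fields)} (found: {found})"
        )
    return present[0]
```

For example, `require_exactly_one(cfg, "command", "shell", "script")` returns the name of the field in use, and the same call with `"instance_type", "min_gpu_memory_gb"` covers the second rule.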

### Code Upload Feature (new in v2)

Flow SDK automatically uploads your local code to GPU instances by default:

```python
# This works out of the box - train.py from your current directory
task = flow.run("python train.py", instance_type="a100")

# Behind the scenes:
# 1. Creates tar.gz archive of current directory
# 2. Excludes common dev files (.git, __pycache__, etc.)
# 3. Respects .flowignore patterns
# 4. Embeds in startup script (base64 encoded)
# 5. Extracts to /workspace on instance
# 6. Sets working directory to /workspace

# Control upload behavior
task = flow.run("python train.py", upload_code=False)  # Disable upload

# Handle dependencies (not automatic)
task = flow.run("pip install -r requirements.txt && python train.py")
task = flow.run("uv pip install . && uv run python train.py")  # Faster with uv
```

**Upload Constraints:**
- 10MB limit after gzip compression
- Use .flowignore to exclude files (same syntax as .gitignore)
- Working directory set to /workspace when upload_code=True
- Dependencies must be installed explicitly
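A rough sketch of the packaging step under these constraints, using only the standard library. It is not the SDK's `_package_local_code()` implementation: the function names are hypothetical, the default exclude list is an assumption, and `fnmatch` only approximates .gitignore syntax (no negation, no `**`).

```python
import fnmatch
import io
import os
import tarfile
from typing import List

DEFAULT_EXCLUDES = [".git", "__pycache__", "*.pyc"]  # assumed defaults
MAX_ARCHIVE_BYTES = 10 * 1024 * 1024  # 10MB limit after gzip compression


def load_ignore_patterns(root: str) -> List[str]:
    """Combine the default excludes with patterns read from .flowignore."""
    patterns = list(DEFAULT_EXCLUDES)
    ignore_file = os.path.join(root, ".flowignore")
    if os.path.isfile(ignore_file):
        with open(ignore_file, encoding="utf-8") as handle:
            for raw in handle:
                line = raw.strip()
                if line and not line.startswith("#"):
                    patterns.append(line.rstrip("/"))
    return patterns


def is_excluded(rel_path: str, patterns: List[str]) -> bool:
    """Match the whole relative path or any single path component."""
    parts = rel_path.split(os.sep)
    for pattern in patterns:
        if fnmatch.fnmatch(rel_path, pattern):
            return True
        if any(fnmatch.fnmatch(part, pattern) for part in parts):
            return True
    return False


def package_directory(root: str) -> bytes:
    """Return a gzipped tar of root, skipping excluded paths."""
    patterns = load_ignore_patterns(root)
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as archive:
        for dirpath, dirnames, filenames in os.walk(root):
            rel_dir = os.path.relpath(dirpath, root)
            # Prune excluded directories in place so os.walk skips them.
            dirnames[:] = [
                d for d in dirnames
                if not is_excluded(os.path.normpath(os.path.join(rel_dir, d)), patterns)
            ]
            for filename in sorted(filenames):
                rel_path = os.path.normpath(os.path.join(rel_dir, filename))
                if not is_excluded(rel_path, patterns):
                    archive.add(os.path.join(dirpath, filename), arcname=rel_path)
    data = buffer.getvalue()
    if len(data) > MAX_ARCHIVE_BYTES:
        raise ValueError(
            f"Archive is {len(data)} bytes; limit is {MAX_ARCHIVE_BYTES}. "
            "Add patterns to .flowignore or use upload_code=False."
        )
    return data
```

Running `len(package_directory("."))` locally is a quick way to see how close a project is to the limit before submitting.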

**For larger projects:**
1. Use .flowignore to exclude unnecessary files
2. Clone from Git: `flow.run("git clone https://github.com/org/repo . && python train.py", upload_code=False)`
3. Pre-built Docker images: `flow.run("python /app/train.py", image="myapp:latest", upload_code=False)`
4. Download from S3: `flow.run("aws s3 cp s3://bucket/code.tar.gz . && tar -xzf code.tar.gz && python train.py", upload_code=False)`

### Zero-Import Execution (invoke.py)
```python
result = invoke("script.py", "train_model", 
               args=["dataset.csv"], 
               gpu="a100")
```

## Instance Type System

The instance resolution system has multiple layers:

1. **User Input** → Various formats accepted
2. **Parsing** → InstanceParser extracts components  
3. **Canonicalization** → Convert to standard format
4. **Resolution** → Find matching cloud instance
5. **Validation** → Ensure availability and price limits

### Current Issue
The FCP API returns instance type names in the format `a100-80gb.sxm.2x`, which the parser does not recognize. A regex pattern for this format needs to be added.
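One possible pattern for this format, following the `{gpu}-{memory}gb.{interconnect}.{count}x` structure described in the FCP API notes. This is a sketch, not the fix as merged: `FcpInstanceName` and `parse_fcp_name` are hypothetical names, and the allowed character sets are assumptions based on the six known instance types.

```python
import re
from dataclasses import dataclass

# {gpu}-{memory}gb.{interconnect}.{count}x, e.g. "a100-80gb.sxm.2x"
FCP_NAME_PATTERN = re.compile(
    r"^(?P<gpu>[a-z0-9]+)"
    r"-(?P<memory_gb>\d+)gb"
    r"\.(?P<interconnect>[a-z0-9]+)"
    r"\.(?P<gpu_count>\d+)x$"
)


@dataclass(frozen=True)
class FcpInstanceName:
    gpu: str
    memory_gb: int
    interconnect: str
    gpu_count: int


def parse_fcp_name(name: str) -> FcpInstanceName:
    """Split an FCP instance type name into its components."""
    match = FCP_NAME_PATTERN.match(name)
    if match is None:
        raise ValueError(f"Not an FCP instance type name: {name!r}")
    return FcpInstanceName(
        gpu=match["gpu"],
        memory_gb=int(match["memory_gb"]),
        interconnect=match["interconnect"],
        gpu_count=int(match["gpu_count"]),
    )
```

The pattern is anchored at both ends, so partial names such as `"a100-80gb"` are rejected rather than silently matched.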

## Testing Infrastructure

### Unit Tests (`tests/unit/`)
- Model validation
- Parser correctness
- Provider interfaces

### Integration Tests (`tests/integration/`)
- Real API calls with test credentials
- Multi-component workflows
- Error handling

### E2E Tests (`tests/e2e/`)
- Full workflow validation
- Real infrastructure provisioning
- Cost-controlled testing

### Examples (`examples/`)
- 01_verify_instance.py: Basic GPU verification
- 02_jupyter_server.py: Interactive notebook server
- 03_multi_node_training.py: Distributed training
- 04_s3_data_access.py: Cloud storage integration

## Detailed Model Reference

### Task Model
```python
from flow.models import Task, TaskStatus
from datetime import datetime
from typing import Dict, List, Optional, Union, Iterator

# Task - Returned by flow.run(), represents a running/completed task
task = Task(
    # Identity:
    task_id="task-abc123",               # str: Unique identifier
    name="training-job",                  # str: Human-readable name
    status=TaskStatus.RUNNING,            # TaskStatus enum
    config=config,                        # Optional[TaskConfig]: Original config
    
    # Timestamps:
    created_at=datetime.utcnow(),         # datetime: Creation time
    started_at=datetime.utcnow(),         # Optional[datetime]: Start time
    completed_at=None,                    # Optional[datetime]: Completion time
    
    # Resources:
    instance_type="a100-80gb.sxm.1x",     # str: Instance type used
    num_instances=1,                      # int: Number of instances
    region="us-west-2",                   # str: Region
    
    # Cost:
    cost_per_hour="$25.60",               # str: Hourly cost
    total_cost="$12.80",                  # Optional[str]: Total cost so far
    
    # SSH Access:
    ssh_host="54.123.45.67",              # Optional[str]: SSH hostname/IP
    ssh_port=22,                          # int: SSH port
    ssh_user="ubuntu",                    # str: SSH username
    ssh_command="ssh ubuntu@54.123.45.67", # Optional[str]: Full SSH command
    
    # Runtime info:
    endpoints={"jupyter": "http://..."},   # Dict[str, str]: Service URLs
    instances=["i-abc123"],               # List[str]: Instance IDs
    message="Task is running",            # Optional[str]: Status message
    
    # User info:
    created_by="user-123"                 # Optional[str]: User ID who created task
)

# Task methods:
task.is_running           # Property: bool - True if RUNNING
task.is_terminal          # Property: bool - True if COMPLETED/FAILED/CANCELLED

# Get logs (returns str or Iterator[str]):
logs = task.logs()                       # Get last 100 lines
logs = task.logs(tail=1000)              # Get last 1000 lines
for line in task.logs(follow=True):      # Stream logs in real-time
    print(line)

# Wait for completion:
task.wait()                              # Wait indefinitely
task.wait(timeout=3600)                  # Wait up to 1 hour

# Update status:
task.refresh()                           # Refresh from provider

# Stop/cancel:
task.stop()                              # Stop the task
task.cancel()                            # Alias for stop()

# SSH access:
task.ssh()                               # Interactive SSH session
task.ssh(command="nvidia-smi")           # Run command remotely
task.ssh(node=1)                         # SSH to specific node (multi-node)

# Get user information:
user = task.get_user()                   # Get User object for task creator
print(user.username)                     # Username
print(user.email)                        # Email address

# Get instance details:
instances = task.get_instances()         # Get List[Instance] with full details
for inst in instances:
    print(inst.instance_id)              # Instance ID
    print(inst.public_ip)                # Public IP address
    print(inst.private_ip)               # Private IP address
    print(inst.status)                   # Instance status
```

### TaskStatus Enum
```python
from flow.models import TaskStatus
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"       # Submitted but not started
    RUNNING = "running"       # Actively executing
    COMPLETED = "completed"   # Finished successfully (exit 0)
    FAILED = "failed"         # Finished with error (non-zero exit)
    CANCELLED = "cancelled"   # Terminated by user

# Usage:
if task.status == TaskStatus.RUNNING:
    print("Task is still running")

if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
    print("Task finished")
```

### User Model
```python
from flow.models import User

# User - Task creator information
user = User(
    user_id="user-123",                   # str: User ID
    username="alice",                     # str: Username
    email="alice@example.com"             # str: Email address
)

# Returned by:
user = task.get_user()                    # Get user who created the task
```

### Instance Model
```python
from datetime import datetime
from flow.models import Instance, InstanceStatus

# Instance - Detailed instance information
instance = Instance(
    instance_id="inst-abc123",            # str: Instance ID
    status=InstanceStatus.RUNNING,        # InstanceStatus enum
    public_ip="54.123.45.67",            # Optional[str]: Public IP
    private_ip="10.0.1.23",              # Optional[str]: Private IP
    instance_type="a100-80gb",           # str: Instance type
    region="us-west-2",                  # str: Region
    created_at=datetime.utcnow(),        # datetime: Creation time
    ssh_host="54.123.45.67",             # Optional[str]: SSH hostname
    ssh_port=22,                         # int: SSH port
    ssh_user="ubuntu"                    # str: SSH username
)

# Returned by:
instances = task.get_instances()          # Get all instances for a task
```

### Volume and Mount Models
```python
from flow.models import Volume, MountSpec, StorageInterface
from datetime import datetime
from enum import Enum

# Volume - Persistent storage volume
volume = Volume(
    volume_id="vol-abc123",               # str: Unique identifier
    name="training-data",                 # str: Volume name
    size_gb=500,                         # int: Size in GB
    region="us-west-2",                   # str: Region
    interface=StorageInterface.BLOCK,     # StorageInterface enum
    created_at=datetime.utcnow(),         # datetime: Creation time
    attached_to=["i-123", "i-456"]       # List[str]: Attached instances
)

# MountSpec - Data mount specification
mount = MountSpec(
    source="s3://bucket/data",            # str: Source URL or path
    target="/mnt/data",                   # str: Container mount path
    mount_type="s3fs",                    # Literal["bind", "volume", "s3fs"]
    options={"readonly": True},           # Dict[str, Any]: Mount options
    cache_key="data-v1",                  # Optional[str]: Cache key
    size_estimate_gb=100.0                # Optional[float]: Size estimate
)

# StorageInterface enum:
class StorageInterface(str, Enum):
    BLOCK = "block"           # Block storage (EBS-like)
    FILE = "file"             # File storage (EFS-like)
```

## CLI Commands

The CLI (`flow/cli/`) provides:
- `flow init`: Initial setup and configuration
- `flow run <config.yaml>`: Submit tasks
- `flow status <task_id>`: Check task status
- `flow logs <task_id>`: Stream task logs
- `flow cancel <task_id>`: Cancel running task
- `flow catalog`: Browse available instances

## Error Handling

### Exception Hierarchy
```python
from flow.errors import (
    FlowError,              # Base exception
    AuthenticationError,    # Invalid API key
    ResourceNotFoundError,  # Resource not found
    TaskNotFoundError,      # Task not found (subclass of ResourceNotFoundError)
    ValidationError,        # Invalid configuration
    APIError,               # API request failed
    ValidationAPIError,     # API validation error (422)
    NetworkError,           # Network communication failed
    TimeoutError,           # Request timed out
    ProviderError,          # Provider-specific error
)

# FlowError structure:
try:
    task = flow.run(config)
except FlowError as e:
    print(e.message)        # Error message
    print(e.suggestions)    # List of suggestions
    print(e.error_code)     # Optional error code

# ValidationAPIError (422 responses):
try:
    task = flow.run(config)
except ValidationAPIError as e:
    print(e.status_code)    # 422
    print(e.validation_errors)  # List of field errors
    # Formatted message includes field-specific help

# APIError:
try:
    task = flow.run(config)
except APIError as e:
    print(e.status_code)    # HTTP status code
    print(e.response_body)  # Raw response body
```

## Configuration System

### FlowConfig Model
```python
from flow.models import FlowConfig

# SDK configuration
config = FlowConfig(
    api_key="fcp-...",                    # str: API key (required)
    project="my-project",                 # str: Project name (required)
    region="us-central1-a",               # str: Default region
    api_url="https://api.mlfoundry.com"   # str: API endpoint
)
```

### Configuration Hierarchy
Configuration precedence (highest to lowest):
1. Command-line arguments
2. Environment variables (FLOW_*)
3. Local .flow/config.yaml
4. Global ~/.flow/config.yaml
5. Default values
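The precedence order above amounts to a layered merge in which later layers override earlier ones. A minimal sketch, assuming each layer has already been loaded into a dict; `resolve_config` and `CONFIG_KEYS` are hypothetical names, and the YAML loading itself is left out.

```python
from typing import Any, Dict, Mapping, Optional

ENV_PREFIX = "FLOW_"
CONFIG_KEYS = ("api_key", "project", "region", "api_url")


def resolve_config(
    cli_args: Mapping[str, Optional[Any]],
    env: Mapping[str, str],
    local_file: Mapping[str, Any],
    global_file: Mapping[str, Any],
    defaults: Mapping[str, Any],
) -> Dict[str, Any]:
    """Merge configuration layers, lowest precedence first."""
    # Pick out FLOW_* variables and map them back to config keys.
    env_values = {
        key: env[ENV_PREFIX + key.upper()]
        for key in CONFIG_KEYS
        if ENV_PREFIX + key.upper() in env
    }
    merged: Dict[str, Any] = {}
    for layer in (defaults, global_file, local_file, env_values, cli_args):
        # None means "not provided", so it never overrides a lower layer.
        merged.update({k: v for k, v in layer.items() if v is not None})
    return merged
```

In practice `env` would be `os.environ`, and the two file layers would come from `.flow/config.yaml` and `~/.flow/config.yaml`.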

### Environment Variables
```bash
export FLOW_API_KEY="fcp-..."
export FLOW_PROJECT="my-project"
export FLOW_REGION="us-west-2"
export FLOW_API_URL="https://api.mlfoundry.com"
```

## Security Model

- API keys stored securely (never in code)
- SSH keys auto-generated per project
- Temporary credentials for cloud storage
- Network isolation between tasks
- No sudo access in containers

## Provider Interface (IProvider Protocol)

```python
from flow.interfaces import IProvider, IComputeProvider, IStorageProvider
from flow.models import Task, TaskConfig, TaskStatus, Instance, Volume
from typing import List, Dict, Any, Optional, Iterator, Protocol

# IComputeProvider methods:
class IComputeProvider(Protocol):
    def find_instances(
        self,
        requirements: Dict[str, Any],  # instance_type, min_gpu_count, max_price, etc.
        limit: int = 10
    ) -> List[Instance]:
        """Find available instances matching requirements."""
        ...
    
    def submit_task(
        self,
        instance_id: str,
        config: TaskConfig,
        volume_ids: Optional[List[str]] = None
    ) -> Task:
        """Submit task to instance."""
        ...
    
    def get_task(self, task_id: str) -> Task:
        """Get full Task object."""
        ...
    
    def get_task_status(self, task_id: str) -> TaskStatus:
        """Get task status (lightweight)."""
        ...
    
    def stop_task(self, task_id: str) -> bool:
        """Stop a running task."""
        ...
    
    def get_task_logs(
        self,
        task_id: str,
        tail: int = 100,
        log_type: str = "stdout"  # "stdout" or "stderr"
    ) -> str:
        """Get last N lines of logs."""
        ...
    
    def stream_task_logs(
        self,
        task_id: str,
        log_type: str = "stdout"
    ) -> Iterator[str]:
        """Stream logs in real-time."""
        ...
    
    def list_tasks(
        self,
        status: Optional[TaskStatus] = None,
        limit: int = 100
    ) -> List[Task]:
        """List tasks with optional filter."""
        ...
    
    def prepare_task_config(self, config: TaskConfig) -> TaskConfig:
        """Add provider-specific defaults."""
        ...

# IStorageProvider methods:
class IStorageProvider(Protocol):
    def create_volume(
        self,
        size_gb: int,
        name: Optional[str] = None
    ) -> Volume:
        """Create a new volume."""
        ...
    
    def delete_volume(self, volume_id: str) -> bool:
        """Delete a volume."""
        ...
    
    def list_volumes(self, limit: int = 100) -> List[Volume]:
        """List all volumes."""
        ...
    
    def is_volume_id(self, identifier: str) -> bool:
        """Check if string is volume ID vs name."""
        ...

# IProvider combines both:
class IProvider(IComputeProvider, IStorageProvider):
    def get_capabilities(self) -> ProviderCapabilities:
        """Get provider capabilities."""
        ...
```

## Common Patterns and Best Practices

### 1. Command Specification
```python
# CORRECT - Use list for command:
config = TaskConfig(
    command=["python", "train.py", "--epochs", "10"],
    ...
)

# CORRECT - Shell commands:
config = TaskConfig(
    shell="cd /app && python train.py --epochs 10",
    ...
)

# CORRECT - Script content:
config = TaskConfig(
    script="""
import torch
model = torch.nn.Linear(10, 1)
print('Model created')
""",
    ...
)

# WRONG - String command (will be wrapped in list):
config = TaskConfig(
    command="python train.py",  # Becomes ["python train.py"]
    ...
)
```

### 2. Volume Specification
```python
# Create new volume:
volume = VolumeSpec(
    size_gb=100,
    mount_path="/data",
    name="training-data",
    iops=3000  # Optional performance tuning
)

# Attach existing volume:
volume = VolumeSpec(
    volume_id="vol-abc123",
    mount_path="/data"
    # Cannot specify size_gb or performance options
)

# Docker cache optimization:
cache_volume = VolumeSpec(
    size_gb=50,
    mount_path="/var/lib/docker",
    name="docker-cache"
)

# IMPORTANT: Volumes are empty by default!
# To use a volume for code, you must populate it first:
# Step 1: Create and populate volume
flow.run("git clone https://github.com/org/repo /code", 
         volumes=[{"name": "my-code", "mount_path": "/code"}])
# Step 2: Use pre-populated volume
flow.run("python /code/train.py", 
         volumes=[{"name": "my-code", "mount_path": "/code"}],
         upload_code=False)
```

### 3. Instance Selection
```python
# Specific instance type:
config = TaskConfig(instance_type="a100", ...)

# Capability-based:
config = TaskConfig(
    min_gpu_memory_gb=80,
    max_price_per_hour=20.0,
    ...
)

# WRONG - Cannot specify both:
config = TaskConfig(
    instance_type="a100",
    min_gpu_memory_gb=80,  # Will raise ValidationError
    ...
)
```

### 4. Error Handling
```python
import logging
from flow.errors import FlowError, TaskNotFoundError, ValidationError

try:
    task = flow.run(config)
    task.wait(timeout=3600)
except ValidationError as e:
    # Configuration was invalid
    logging.error(f"Invalid config: {e.message}")
    for suggestion in e.suggestions:
        logging.info(f"Try: {suggestion}")
except TaskNotFoundError:
    # Task disappeared (rare)
    logging.error("Task was terminated externally")
except FlowError as e:
    # General Flow error
    logging.error(f"Flow error: {e}")
except Exception as e:
    # Unexpected error
    logging.exception("Unexpected error")
```

### 5. Type Annotations
```python
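import re

from flow import Flow
from flow.models import Task, TaskConfig, TaskStatus, VolumeSpec
from typing import List, Optional, Iterator

def submit_training_job(
    dataset_path: str,
    epochs: int = 10,
    gpu_type: str = "a100"
) -> Task:
    """Submit a training job."""
    # Task names allow only alphanumerics, dashes, and underscores,
    # so replace anything else (e.g. "/" in a path) with "-".
    safe_name = re.sub(r"[^A-Za-z0-9_-]+", "-", dataset_path).strip("-")
    config = TaskConfig(
        name=f"training-{safe_name}",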
        instance_type=gpu_type,
        command=[
            "python", "train.py",
            "--dataset", dataset_path,
            "--epochs", str(epochs)
        ],
        volumes=[
            VolumeSpec(size_gb=100, mount_path="/data")
        ]
    )
    
    flow = Flow()
    return flow.run(config)

def stream_task_output(task: Task) -> Iterator[str]:
    """Stream task logs until completion."""
    for line in task.logs(follow=True):
        yield line
        if task.is_terminal:
            break
```

## API Documentation

### FCP (ML Foundry) API Reference

- Full documentation: https://docs.mlfoundry.com/foundry-api/api-reference
- API Overview: https://docs.mlfoundry.com/foundry-api/api-overview-and-quickstart
- Startup Scripts: https://docs.mlfoundry.com/compute-and-storage/startup-scripts
- Instance Types: https://docs.mlfoundry.com/compute-and-storage/instance-types-and-specifications
- OpenAPI Specification: https://firebasestorage.googleapis.com/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FOTq5EAhUq1bhaygVHT8s%2Fimports%2F0gQyVfnbr1SwJA8ot4uG%2Fopenapi8.json?alt=media&token=a18e9b66-119f-4276-a372-5be3e52150c3

### API Overview

The Foundry API provides programmatic access to ML Foundry's compute infrastructure:
- Base URL: `https://api.mlfoundry.com/v2/`
- Currently supports: Spot Instances, Storage Volumes, SSH Keys
- Requires API key authentication
- API keys created at https://app.mlfoundry.com/account/apikeys

### Prerequisites
- Billing must be configured in Foundry Console before API usage
- Project must exist for most operations

### Key API Endpoints

#### Authentication
All endpoints require Bearer authentication with Foundry API Key:
```python
headers = {"Authorization": f"Bearer {api_key}"}
```

#### Core Resource Endpoints

##### Projects
- `GET /v2/projects` - List all projects

##### Instance Types
- `GET /v2/instance-types` - Get available instance types and specifications

##### SSH Keys
- `GET /v2/ssh-keys` - List SSH keys
- `POST /v2/ssh-keys` - Create new SSH key
- `DELETE /v2/ssh-keys/{key_id}` - Delete SSH key

##### Storage Volumes
- `GET /v2/volumes` - List volumes
- `POST /v2/volumes` - Create volume (Block or File storage)
- `DELETE /v2/volumes/{volume_id}` - Delete volume

##### Spot Instances (Bids)
- `GET /v2/spot/availability` - Check current spot capacity and pricing
- `POST /v2/spot/bids` - Create bid for spot instances
- `GET /v2/spot/bids` - List bids (filter by project, instance type, region, status)
- `PATCH /v2/spot/bids/{bid_fid}` - Update bid (e.g., change limit price)
- `DELETE /v2/spot/bids/{bid_fid}` - Cancel bid (NOT POST to /cancel endpoint!)

##### Instances
- `GET /v2/instances` - List instances (filter by project, instance type, region, status)

##### API Keys
- Manage API keys for authentication

##### Profile
- User profile management

### Common API Mistakes to Avoid

1. **Cancelling Bids**: Use `DELETE /v2/spot/bids/{bid_fid}`, NOT `POST /v2/spot/bids/{bid_fid}/cancel`
   - The API uses RESTful DELETE for cancellation, not a separate /cancel endpoint
   - This applies to both v1 (`/bids/{id}`) and v2 (`/v2/spot/bids/{id}`) APIs

2. **Volume Defaults**: Default volume size should be minimal (1GB) for tests to avoid quota exhaustion
   - Production uses can specify larger sizes as needed
   - Colab persistence volumes were causing quota issues at 100GB default

3. **Region Availability**: Not all instance types are available in all regions
   - Preferably use us-central1-b for H100s (it's less congested)
   - A100s may not be available in all regions
   - Always check availability before hardcoding region/instance combinations
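The first item above (cancel with DELETE, not a `/cancel` endpoint) can be sketched with the standard library. The function name `build_cancel_bid_request` is hypothetical; the request is only constructed here, not sent.

```python
import urllib.request

BASE_URL = "https://api.mlfoundry.com"


def build_cancel_bid_request(bid_fid: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) the request that cancels a spot bid."""
    return urllib.request.Request(
        url=f"{BASE_URL}/v2/spot/bids/{bid_fid}",
        method="DELETE",  # cancellation is a DELETE on the bid resource
        headers={"Authorization": f"Bearer {api_key}"},
    )
```

Passing the result to `urllib.request.urlopen()` would perform the call; error handling and retries are left out of this sketch.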

#### Request/Response Models (Based on FCP API Types)

##### Instance Type Format
FCP API instance types ALWAYS include memory specification in the format: `{gpu}-{memory}gb.{interconnect}.{count}x`

Verified examples from actual API response:
- `a100-80gb.sxm.1x` - 1x A100 80GB with SXM4 interconnect
- `a100-80gb.sxm.2x` - 2x A100 80GB with SXM4 interconnect
- `a100-80gb.sxm.4x` - 4x A100 80GB with SXM4 interconnect
- `a100-80gb.sxm.8x` - 8x A100 80GB with SXM4 interconnect
- `h100-80gb.sxm.8x` - 8x H100 80GB with SXM5 interconnect
- `h100-80gb.pcie.1x` - 1x H100 80GB with PCIe (different FID than SXM)

Note: ALL instance types include memory (e.g., "80gb") in their names.

##### IMPORTANT: Instance Type FIDs
The FCP API uses opaque FIDs (e.g., "it_MsIRhxj3ccyVWGfP") in requests/responses.
The SDK automatically translates between user-friendly names and FIDs:
- User provides: "a100" (or another supported shorthand such as "2xa100")
- SDK resolves to the matching FID: "it_MsIRhxj3ccyVWGfP" for "a100" (a100-80gb.sxm.1x)
- API returns: "it_MsIRhxj3ccyVWGfP"
- SDK displays as: "a100-80gb.sxm.1x"

Current FID mappings (as of 2025-07-02):
- `it_MsIRhxj3ccyVWGfP` → a100-80gb.sxm.1x
- `it_5M6aGxGovNeX5ltT` → a100-80gb.sxm.2x
- `it_fK7Cx6TVhOK5ZfXT` → a100-80gb.sxm.4x
- `it_J7OyNf9idfImLIFo` → a100-80gb.sxm.8x
- `it_5ECSoHQjLBzrp5YM` → h100-80gb.sxm.8x
- `it_XqgKWbhZ5gznAYsG` → h100-80gb.pcie.1x

##### InstanceTypeModel Response (Actual FCP API Response)
```json
{
  "fid": "it_5M6aGxGovNeX5ltT",
  "name": "a100-80gb.sxm.2x",
  "num_cpus": 48,
  "cpu_type": "Intel(R) Xeon(R) Platinum 8468",
  "ram_gb": 450,
  "num_gpus": 2,
  "gpu_type": "NVIDIA A100",
  "gpu_memory_gb": 80,
  "gpu_socket": "SXM4",
  "local_storage_gb": 3500
}
```

Note the actual field names from FCP API:
- `ram_gb` (not `ram`)
- `num_gpus` and `gpu_type` (not nested `gpus` array)
- `gpu_memory_gb` (separate field)
- `gpu_socket` (e.g., "SXM4", "SXM5")
- `cpu_type` (includes full CPU model name)

##### ProjectModel Response
```json
{
  "fid": "proj-abc123def456",
  "name": "my-ml-project",
  "created_at": "2024-01-01T00:00:00Z"
}
```

##### CreateBidRequest
```json
{
  "project": "proj-abc123def456",           // Project FID (required)
  "region": "us-central1-a",                // Region (required)
  "instance_type": "it_MsIRhxj3ccyVWGfP",   // Instance type FID, NOT name! (required)
  "limit_price": "$15.00",                  // Dollar string format (required)
  "ssh_keys": ["ssh-key-abc123"],           // SSH key FIDs (required)
  "startup_script": "#!/bin/bash\n...",     // Cloud-init script (required)
  "volumes": ["vol-abc123"],                // Volume FIDs (optional)
  "name": "my-training-job",                // Display name (optional but recommended)
  "instance_quantity": 1                     // Number of instances (defaults to 1)
}
```

IMPORTANT bid creation notes:
- `instance_type` MUST be the FID (e.g., "it_MsIRhxj3ccyVWGfP"), not the name ("a100")
- Field is `limit_price` not `price`
- Price MUST include dollar sign (e.g., "$15.00")
- `startup_script` is cloud-init format and is required
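A small sketch of building a payload that respects these notes. The helper names `format_limit_price` and `build_bid_payload` are hypothetical, and the `it_` prefix check is an assumption drawn from the FIDs listed in this document rather than a documented API rule.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Dict, List, Optional, Union


def format_limit_price(amount: Union[int, float, str, Decimal]) -> str:
    """Format a price as the dollar string the API expects, e.g. "$15.00"."""
    value = Decimal(str(amount)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    if value <= 0:
        raise ValueError("limit price must be positive")
    return f"${value}"


def build_bid_payload(
    project: str,
    region: str,
    instance_type_fid: str,
    limit_price: Union[int, float, str, Decimal],
    ssh_keys: List[str],
    startup_script: str,
    volumes: Optional[List[str]] = None,
    name: Optional[str] = None,
    instance_quantity: int = 1,
) -> Dict[str, Any]:
    """Assemble a CreateBidRequest body; optional fields are omitted when unset."""
    if not instance_type_fid.startswith("it_"):
        raise ValueError(
            f"instance_type must be a FID like 'it_...', got {instance_type_fid!r}"
        )
    payload: Dict[str, Any] = {
        "project": project,
        "region": region,
        "instance_type": instance_type_fid,
        "limit_price": format_limit_price(limit_price),
        "ssh_keys": list(ssh_keys),
        "startup_script": startup_script,
        "instance_quantity": instance_quantity,
    }
    if volumes:
        payload["volumes"] = list(volumes)
    if name:
        payload["name"] = name
    return payload
```

Using `Decimal` avoids float artifacts when a price such as `10.1` is turned into a string.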

##### BidModel Response (Actual FCP API Response)
```json
{
  "fid": "bid_xyz789abc123",                 // Bid FID
  "name": "my-training-job",                 // Display name
  "project": "proj-abc123def456",            // Project FID
  "created_by": "user-123",                  // User FID
  "region": "us-central1-a",                 // Region
  "instance_type": "it_MsIRhxj3ccyVWGfP",    // Instance type FID (NOT name!)
  "instance_quantity": 1,                    // Number of instances
  "limit_price": "$15.00",                   // Dollar string format
  "status": "pending",                       // pending|provisioning|allocated|running|completed|failed|cancelled|terminated
  "ssh_keys": ["ssh-key-abc123"],            // SSH key FIDs
  "startup_script": "#!/bin/bash\n...",      // Cloud-init script
  "volumes": ["vol-abc123"],                 // Volume FIDs
  "created_at": "2024-01-15T10:30:00Z",      // ISO timestamp
  "updated_at": "2024-01-15T10:35:00Z"       // ISO timestamp
}
```

Status progression:
- `pending` → Bid created, waiting for auction
- `provisioning` → Instance being created
- `allocated` → Instance ready, startup script running
- `running` → Startup script completed (if trackable)
- `completed`/`terminated` → Instance stopped
- `failed` → Error occurred
- `cancelled` → User cancelled
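
One way to collapse these states into a smaller domain model is a lookup table. This is only a sketch: `TaskState` is a hypothetical enum (not the SDK's `TaskStatus`), and treating `allocated` as running is a judgment call based on the description above.

```python
from enum import Enum


class TaskState(Enum):
    """Hypothetical domain-level states, used only for this example."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Mapping from the bid statuses listed above to the hypothetical TaskState.
_BID_STATUS_TO_TASK_STATE = {
    "pending": TaskState.PENDING,
    "provisioning": TaskState.PENDING,
    "allocated": TaskState.RUNNING,   # instance is up, startup script running
    "running": TaskState.RUNNING,
    "completed": TaskState.COMPLETED,
    "terminated": TaskState.COMPLETED,
    "failed": TaskState.FAILED,
    "cancelled": TaskState.CANCELLED,
}


def to_task_state(bid_status: str) -> TaskState:
    """Translate a bid status string, raising on values not handled here."""
    try:
        return _BID_STATUS_TO_TASK_STATE[bid_status.lower()]
    except KeyError:
        raise ValueError(f"unrecognized bid status: {bid_status!r}") from None
```

Raising on unknown values, rather than defaulting silently, makes it obvious when the API introduces a status the mapping does not yet cover.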

##### AuctionModel (Spot Availability) - Actual FCP API Response
```json
{
  "fid": "auc_rECU5s87CABp37aB",
  "instance_type": "it_XqgKWbhZ5gznAYsG",  // FID reference, not name
  "region": "us-central1-a",
  "capacity": 11,
  "last_instance_price": "$10.00"  // Dollar string format
}
```

Note: The spot availability response uses instance type FIDs (e.g., "it_XqgKWbhZ5gznAYsG") 
rather than human-readable names. To get the actual instance type details, you need to 
cross-reference with the instance types endpoint.
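
The cross-reference amounts to a join on the instance type FID. A minimal sketch, assuming both responses have already been fetched and decoded into Python lists (`attach_instance_names` is a hypothetical helper name):

```python
from typing import Any, Dict, List


def attach_instance_names(
    auctions: List[Dict[str, Any]],
    instance_types: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Return copies of the auctions with an added `instance_type_name` key."""
    names_by_fid = {it["fid"]: it["name"] for it in instance_types}
    return [
        # `None` marks auctions whose FID is absent from the instance type list.
        {**auction, "instance_type_name": names_by_fid.get(auction["instance_type"])}
        for auction in auctions
    ]


auctions = [{
    "fid": "auc_rECU5s87CABp37aB",
    "instance_type": "it_XqgKWbhZ5gznAYsG",
    "region": "us-central1-a",
    "capacity": 11,
    "last_instance_price": "$10.00",
}]
instance_types = [
    {"fid": "it_XqgKWbhZ5gznAYsG", "name": "h100-80gb.pcie.1x"},
    {"fid": "it_5M6aGxGovNeX5ltT", "name": "a100-80gb.sxm.2x"},
]
enriched = attach_instance_names(auctions, instance_types)
```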

##### InstanceModel Response
```json
{
  "fid": "inst-abc123xyz789",
  "bid": "bid_xyz789abc123",
  "status": "running",  // pending|running|stopped|terminated
  "ssh_destination": "34.125.67.89:22",  // SSH connection string (host:port)
  "public_ip": "34.125.67.89",  // DEPRECATED - use ssh_destination
  "private_ip": "10.128.0.5",
  "created_at": "2024-01-15T10:35:00Z",
  "terminated_at": null
}
```

Note: The FCP API now uses `ssh_destination` field instead of `public_ip`. The SDK 
automatically handles both fields for backwards compatibility, preferring `ssh_destination` 
when available and falling back to `public_ip` if needed.
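
The fallback described above might look like the following. This is an illustrative sketch, not the SDK's actual implementation; `resolve_ssh_endpoint` is a hypothetical name, and the parsing assumes the `host:port` form shown in the example (IPv6 literals are not handled).

```python
from typing import Any, Dict, Optional, Tuple


def resolve_ssh_endpoint(
    instance: Dict[str, Any],
    default_port: int = 22,
) -> Optional[Tuple[str, int]]:
    """Return (host, port), preferring `ssh_destination` over `public_ip`."""
    destination = instance.get("ssh_destination")
    if destination:
        host, sep, port = destination.rpartition(":")
        if sep and port.isdigit():
            return host, int(port)
        # No explicit port in the string: treat the whole value as the host.
        return destination, default_port

    public_ip = instance.get("public_ip")
    if public_ip:
        return public_ip, default_port

    # Neither field is set, e.g. the instance is still being provisioned.
    return None
```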

##### VolumeModel Response
```json
{
  "fid": "vol-abc123def456",
  "name": "training-data",
  "project": "proj-abc123def456",
  "region": "us-central1-a",
  "capacity_gb": 500,
  "interface": "block",  // block|file
  "status": "available",  // available|attached|deleting
  "created_at": "2024-01-15T09:00:00Z",
  "updated_at": "2024-01-15T09:05:00Z"
}
```

##### SSHKeyModel Response
```json
{
  "fid": "ssh-key-abc123",
  "name": "my-dev-key",
  "project": "proj-abc123def456",
  "public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ...",
  "created_at": "2024-01-15T08:00:00Z"
}
```

##### Paginated Response Format (BidsResponse)
```json
{
  "data": [
    {
      "fid": "bid_xyz789abc123",
      "project": "proj-abc123def456",
      "region": "us-central1-a",
      "instance_type": "it_5M6aGxGovNeX5ltT",
      "limit_price": "$15.00",
      "status": "running",
      "ssh_keys": ["ssh-key-abc123"],
      "startup_script": "#!/bin/bash\n...",
      "volumes": ["vol-abc123"],
      "created_at": "2024-01-15T10:30:00Z",
      "updated_at": "2024-01-15T10:35:00Z"
    }
  ],
  "next_cursor": "cursor-abc123"  // Optional, for pagination
}
```

##### List Response Formats
IMPORTANT: Response format varies by endpoint!

Endpoints that return arrays directly:
- `GET /v2/projects` → `ProjectModel[]`
- `GET /v2/ssh-keys` → `SSHKeyModel[]`
- `GET /v2/instance-types` → `InstanceTypeModel[]`
- `GET /v2/spot/availability` → `AuctionModel[]`
- `GET /v2/volumes` → `VolumeModel[]`

Endpoints that return paginated objects with "data" key:
- `GET /v2/spot/bids` → `{"data": BidModel[], "next_cursor": string?}`
- Note: Individual bid GET is NOT supported - must list and filter
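
Client code can hide the difference between the two shapes behind one helper, and use it to emulate a single-bid lookup. The sketch below works on already-decoded JSON payloads, so it makes no assumption about how the cursor is passed to the API; `extract_items` and `find_bid` are hypothetical names.

```python
from typing import Any, Dict, Iterable, List, Optional, Union

Payload = Union[List[Dict[str, Any]], Dict[str, Any]]


def extract_items(payload: Payload) -> List[Dict[str, Any]]:
    """Return the item list from either a bare array or a {"data": [...]} object."""
    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and isinstance(payload.get("data"), list):
        return payload["data"]
    raise ValueError("unexpected list response shape")


def find_bid(pages: Iterable[Payload], bid_fid: str) -> Optional[Dict[str, Any]]:
    """Scan decoded bid-list pages for one FID, since there is no single-bid GET."""
    for page in pages:
        for bid in extract_items(page):
            if bid.get("fid") == bid_fid:
                return bid
    return None
```

Because `pages` can be a generator, a caller can fetch the next page lazily and stop as soon as the bid is found.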

#### Important Format Details and Common Gotchas

##### Price Format
- Always includes dollar sign: "$25.00" not "25.00"
- String format, not number
- Used in `limit_price` field for bids
- Returned as `last_instance_price` in spot availability

##### Common API Gotchas
1. **Instance Type FIDs**: API uses FIDs everywhere, not human names
   - Wrong: `{"instance_type": "a100"}`
   - Right: `{"instance_type": "it_MsIRhxj3ccyVWGfP"}`

2. **Bid Listing**: No individual GET endpoint
   - Wrong: `GET /v2/spot/bids/{bid_id}`
   - Right: `GET /v2/spot/bids?project={project_id}` then filter

3. **Response Formats**: Inconsistent between endpoints
   - Arrays: projects, ssh-keys, instance-types, spot/availability, volumes
   - Objects with "data": spot/bids

4. **Field Names**: Not always intuitive
   - `limit_price` not `price` in bid creation
   - `instance_quantity` not `num_instances`
   - `ram_gb` not `memory_gb` in instance types

5. **Status Values**: Many possible states
   - Bids: pending, provisioning, allocated, running, completed, failed, cancelled, terminated, deactivated, terminating
   - Map appropriately to your domain model

6. **Common Field Name Mistakes in TaskConfig**:
   - `env` NOT `environment` for environment variables
   - `command` as list NOT string (though string is auto-converted)
   - `ssh_keys` NOT `ssh_key` (always plural)
   - `max_price_per_hour` NOT `max_price` or `price`
   - `num_instances` NOT `instance_count` or `instances`

##### Price Format
All prices use dollar string format: `"$12.50"`, `"$100.00"`
- Always includes dollar sign
- Always includes two decimal places
- Stored as string, not number
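
Because prices are strings, they need to be converted before any comparison or arithmetic. A minimal sketch using `decimal.Decimal` to avoid float rounding; `parse_price` and `format_price` are hypothetical helper names.

```python
import re
from decimal import Decimal
from typing import Union

# Dollar sign, one or more digits, and exactly two decimal places.
_PRICE_PATTERN = re.compile(r"\$(\d+\.\d{2})")


def parse_price(text: str) -> Decimal:
    """Convert a dollar string such as "$12.50" into a Decimal."""
    match = _PRICE_PATTERN.fullmatch(text)
    if match is None:
        raise ValueError(f"not a dollar string: {text!r}")
    return Decimal(match.group(1))


def format_price(amount: Union[Decimal, int, str]) -> str:
    """Render a numeric amount in the dollar string format."""
    return f"${Decimal(str(amount)):.2f}"


# Example: check whether a limit price covers the last observed spot price.
limit = parse_price("$15.00")
last_instance_price = parse_price("$10.00")
covers_last_price = limit >= last_instance_price
```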

##### FID Format (Verified from API)
All resource IDs use FID (Foundry ID) format:
- Instance Types: `it_` prefix (e.g., `it_5M6aGxGovNeX5ltT`)
- Auctions: `auc_` prefix (e.g., `auc_rECU5s87CABp37aB`)
- Projects: Expected format `proj_{random}` 
- Bids: Expected format `bid_{random}`
- Volumes: Expected format `vol_{random}`
- SSH Keys: Expected format `ssh-key_{random}`

##### Instance Type String Format (Verified)
Pattern: `{gpu_type}-{memory}gb.{interconnect}.{count}x`
- GPU type: `a100`, `h100`, etc.
- Memory: ALWAYS specified (e.g., `80gb`)
- Interconnect: `sxm` or `pcie` (the name omits the socket version; the API's `gpu_socket` field reports it, e.g., "SXM4", "SXM5")
- Count: `1x`, `2x`, `4x`, `8x`

Examples from actual API:
- `a100-80gb.sxm.1x` (uses SXM4 socket)
- `a100-80gb.sxm.8x` (uses SXM4 socket)
- `h100-80gb.sxm.8x` (uses SXM5 socket)
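
The pattern can be parsed with a regular expression. The sketch below follows the pattern as documented above; `InstanceTypeName` and `parse_instance_type_name` are hypothetical names and do not reflect the parser in `flow/core/instance.py`.

```python
import re
from typing import NamedTuple


class InstanceTypeName(NamedTuple):
    gpu_type: str
    memory_gb: int
    interconnect: str
    gpu_count: int


# {gpu_type}-{memory}gb.{interconnect}.{count}x
_NAME_PATTERN = re.compile(
    r"(?P<gpu>[a-z0-9]+)-(?P<mem>\d+)gb\.(?P<interconnect>[a-z0-9]+)\.(?P<count>\d+)x"
)


def parse_instance_type_name(name: str) -> InstanceTypeName:
    """Split a canonical instance type name into its four components."""
    match = _NAME_PATTERN.fullmatch(name.strip().lower())
    if match is None:
        raise ValueError(f"not a canonical instance type name: {name!r}")
    return InstanceTypeName(
        gpu_type=match["gpu"],
        memory_gb=int(match["mem"]),
        interconnect=match["interconnect"],
        gpu_count=int(match["count"]),
    )


parsed = parse_instance_type_name("h100-80gb.sxm.8x")
```

Names that omit the memory size, such as `h100.sxm.8x`, are rejected, which matches the rule that memory is always specified.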

#### Status Codes
- 200 OK - Successful retrieval
- 201 Created - Resource successfully created
- 400 Bad Request - Invalid parameters
- 401 Unauthorized - Invalid API key
- 404 Not Found - Resource not found
- 422 Unprocessable Entity - Validation error

#### Error Response Format
```json
{
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Invalid instance type",
    "details": {
      "field": "instance_type",
      "value": "invalid-gpu",
      "allowed_values": ["a100-80gb.sxm.2x", "h100-80gb.sxm.8x"]
    }
  },
  "request_id": "req-abc123def456"
}
```
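
An error body of this shape can be turned into a structured exception so that callers can react to the failing field. This is a sketch only: `ApiError` and `error_from_response` are hypothetical names, and the code tolerates missing keys because not every error is guaranteed to carry `details`.

```python
from typing import Any, Dict, Optional


class ApiError(Exception):
    """Hypothetical exception carrying the fields of the error envelope."""

    def __init__(
        self,
        status_code: int,
        code: str,
        message: str,
        field: Optional[str] = None,
        request_id: Optional[str] = None,
    ) -> None:
        super().__init__(f"[{status_code}] {code}: {message}")
        self.status_code = status_code
        self.code = code
        self.message = message
        self.field = field
        self.request_id = request_id


def error_from_response(status_code: int, body: Dict[str, Any]) -> ApiError:
    """Build an ApiError from a decoded error response body."""
    error = body.get("error") or {}
    details = error.get("details") or {}
    return ApiError(
        status_code=status_code,
        code=error.get("code", "UNKNOWN"),
        message=error.get("message", ""),
        field=details.get("field"),
        request_id=body.get("request_id"),
    )
```

Keeping `request_id` on the exception makes it easy to include in logs and support requests.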

### Best Practices

1. **Authentication Security**
   - Store API keys securely (environment variables, secret managers)
   - Never commit API keys to version control

2. **Error Handling**
   - Implement retry logic for transient failures
   - Handle 422 validation errors with detailed field-level messages
   - Monitor rate limits

3. **Resource Management**
   - Always check `/v2/spot/availability` before creating bids
   - Verify project exists before resource creation
   - Clean up unused volumes and instances

4. **Startup Scripts**
   - Use cloud-init format for instance initialization
   - Include error handling in startup scripts
   - Log startup progress for debugging
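
For the retry advice in item 2, exponential backoff is a common approach. The sketch below is generic and independent of `flow/utils/retry.py`; `TransientError` and `call_with_retry` are hypothetical names, and deciding which failures count as transient (for example timeouts or rate limiting) is left to the caller.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def call_with_retry(
    operation: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `operation`, doubling the wait after each transient failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last failure
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

The `sleep` parameter exists so that tests can replace the real delay with a recorder.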

### Common Workflows

#### 1. Create Spot Instance
```python
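import requests

# Assumes `base_url` and `headers` (carrying your API key) are already defined.

# Check availability: returns a list of AuctionModel objects
response = requests.get(f"{base_url}/v2/spot/availability", headers=headers)
response.raise_for_status()
availability = response.json()

# Create bid: use FIDs, a dollar-string price, and `instance_quantity`
bid_data = {
    "project": "proj-abc123def456",          # project FID
    "region": "us-central1-a",
    "instance_type": "it_MsIRhxj3ccyVWGfP",  # instance type FID, not the name
    "limit_price": "$10.00",                 # dollar string, not a number
    "ssh_keys": ["ssh-key-abc123"],          # required
    "startup_script": "#!/bin/bash\n...",
    "instance_quantity": 1,
}
response = requests.post(f"{base_url}/v2/spot/bids", json=bid_data, headers=headers)
response.raise_for_status()
bid = response.json()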
```

#### 2. Manage Storage
```python
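import requests

# Assumes `base_url` and `headers` (carrying your API key) are already defined.

# Create volume
volume_data = {
    "name": "training-data",
    "size_gb": 100,
    "interface": "block",  # or "file"
    "project": "proj-abc123def456",  # project FID
    "region": "us-central1-a",
}
response = requests.post(f"{base_url}/v2/volumes", json=volume_data, headers=headers)
response.raise_for_status()
volume = response.json()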
```

## Codebase Structure

```
src/
└── flow/
    ├── flow.py                 # Main Flow class and high-level API
    ├── models.py               # Pydantic models (TaskConfig, Task, etc.)
    ├── interfaces.py           # Protocol definitions (IProvider, etc.)
    ├── errors.py               # Exception hierarchy
    ├── constants.py            # FCP constants and enums
    │
    ├── core/                   # Core domain logic
    │   ├── instance.py         # Instance type parsing and resolution
    │   ├── task_engine.py      # Task lifecycle management
    │   └── catalog/            # Instance catalog system
    │       ├── catalog.py      # Main catalog implementation
    │       ├── builder.py      # Catalog builder
    │       ├── query.py        # Query interface
    │       └── storage.py      # Catalog persistence
    │
    ├── providers/              # Cloud provider implementations
    │   ├── base.py            # Base provider interface
    │   ├── factory.py         # Provider factory
    │   ├── registry.py        # Provider registration
    │   ├── fcp/               # FCP (ML Foundry) provider
    │   │   ├── provider.py    # Main FCP provider
    │   │   ├── api_types.py   # FCP API type definitions
    │   │   ├── instance_resolver.py    # Instance resolution
    │   │   ├── bid_manager.py          # Spot bid management
    │   │   ├── startup_builder_v2.py   # Startup script generation
    │   │   └── lifecycle_manager.py    # Instance lifecycle
    │   └── local/             # Local provider for testing
    │
    ├── frontends/             # Input format adapters
    │   ├── base.py            # Base frontend interface
    │   ├── cli/               # CLI argument parsing
    │   ├── yaml/              # YAML config files
    │   ├── slurm/             # SLURM script parsing
    │   └── submitit/          # Facebook Submitit compatibility
    │
    ├── cli/                   # CLI application
    │   ├── app.py             # Main CLI app
    │   └── commands/          # CLI commands
    │       └── init.py        # flow init command
    │
    ├── data/                  # Data loading and resolution
    │   ├── loaders.py         # Data loading utilities
    │   └── resolver.py        # S3/URL resolution
    │
    ├── managers/              # Task and resource management
    │   └── task_manager.py    # Task lifecycle coordination
    │
    └── utils/                 # Utilities
        ├── retry.py           # Retry logic
        ├── validation.py      # Input validation
        └── security.py        # Security utilities
```

## System Architecture

```
┌─────────────────────────────────────────────────────────────────────┐
│                           User Interface Layer                       │
├─────────────────────────┬───────────────┬───────────────────────────┤
│   Python API (flow.py)  │  CLI (cli/)   │  Decorators (@flow.run)  │
├─────────────────────────┴───────────────┴───────────────────────────┤
│                         Frontend Adapters                            │
├──────────┬──────────┬──────────┬──────────┬────────────────────────┤
│   YAML   │  SLURM   │ Submitit │   CLI    │  Direct TaskConfig     │
├──────────┴──────────┴──────────┴──────────┴────────────────────────┤
│                    Unified Task Model (models.py)                    │
│                        TaskConfig → Task                             │
├─────────────────────────────────────────────────────────────────────┤
│                      Core Domain Layer                               │
├─────────────────────┬──────────────────┬───────────────────────────┤
│  Instance Resolution│  Task Management  │    Data Loading            │
│  - Parser           │  - Lifecycle      │    - S3 Resolution        │
│  - Catalog          │  - Status         │    - URL Handling         │
│  - Validation       │  - Monitoring     │    - Mount Specs          │
├─────────────────────┴──────────────────┴───────────────────────────┤
│                    Provider Abstraction Layer                        │
│                         IProvider Protocol                           │
├─────────────────────────────────────────────────────────────────────┤
│                      Provider Implementations                        │
├─────────────────────┬──────────────────┬───────────────────────────┤
│    FCP Provider     │  Local Provider  │   Future: AWS/GCP/Azure    │
│  - Spot Bidding     │  - Testing       │                           │
│  - SSH Management   │  - Development   │                           │
│  - Volume Storage   │                  │                           │
└─────────────────────┴──────────────────┴───────────────────────────┘
```

## Data Flow

```
1. User Input
   ↓
2. Frontend Adapter (parse/validate)
   ↓
3. TaskConfig (unified model)
   ↓
4. Provider Selection (based on API endpoint)
   ↓
5. Instance Resolution
   - Parse instance type
   - Query catalog/API
   - Find best match
   ↓
6. Resource Creation
   - Create volumes
   - Generate SSH keys
   - Build startup script
   ↓
7. Task Submission
   - Create spot bid
   - Monitor status
   - Handle lifecycle
   ↓
8. Task Object (returned to user)
   - Status tracking
   - Log streaming
   - SSH access
```

## Future Directions

- Additional providers (AWS, GCP, Azure)
- Kubernetes backend
- Advanced scheduling (gang scheduling, priorities)
- Cost optimization algorithms
- Workflow orchestration (DAGs)