Metadata-Version: 2.4
Name: cjm-plugin-system
Version: 0.0.23
Summary: Process-isolated plugin system with resource-aware scheduling, configuration validation, and lifecycle management for extensible Python applications.
Home-page: https://github.com/cj-mills/cjm-plugin-system
Author: Christian J. Mills
Author-email: 9126128+cj-mills@users.noreply.github.com
License: Apache Software License 2.0
Keywords: nbdev jupyter notebook python
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: Apache Software License
Requires-Python: >=3.11
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cjm_error_handling
Requires-Dist: fastapi
Requires-Dist: psutil
Requires-Dist: uvicorn
Requires-Dist: httpx
Requires-Dist: typer
Requires-Dist: pyyaml
Provides-Extra: dev
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: keywords
Dynamic: license
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary

# cjm-plugin-system


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_plugin_system
```

## Project Structure

    nbs/
    ├── core/ (7)
    │   ├── interface.ipynb   # Abstract base class defining the generic plugin interface
    │   ├── manager.ipynb     # Plugin discovery, loading, and lifecycle management system
    │   ├── metadata.ipynb    # Data structures for plugin metadata
    │   ├── proxy.ipynb       # Bridge between Host application and isolated Worker processes
    │   ├── queue.ipynb       # Resource-aware job queue for sequential plugin execution with cancellation support
    │   ├── scheduling.ipynb  # Resource scheduling policies for plugin execution
    │   └── worker.ipynb      # FastAPI server that runs inside isolated plugin environments
    ├── utils/ (1)
    │   └── validation.ipynb  # Validation helpers for plugin configuration dataclasses
    └── cli.ipynb  # CLI tool for declarative plugin management

Total: 9 notebooks across 2 directories

## Module Dependencies

``` mermaid
graph LR
    cli[cli<br/>cli]
    core_interface[core.interface<br/>Plugin Interface]
    core_manager[core.manager<br/>Plugin Manager]
    core_metadata[core.metadata<br/>Plugin Metadata]
    core_proxy[core.proxy<br/>Remote Plugin Proxy]
    core_queue[core.queue<br/>Job Queue]
    core_scheduling[core.scheduling<br/>Scheduling]
    core_worker[core.worker<br/>Universal Worker]
    utils_validation[utils.validation<br/>Configuration Validation]

    core_manager --> core_interface
    core_manager --> core_scheduling
    core_manager --> core_metadata
    core_manager --> core_proxy
    core_proxy --> core_interface
    core_queue --> core_manager
    core_scheduling --> core_metadata
```

*7 cross-module dependencies detected*

## CLI Reference

### `cjm-ctl` Command

                                                                                                              
     Usage: cjm-ctl [OPTIONS] COMMAND [ARGS]...                                                               
                                                                                                              
     CJM Plugin System CLI                                                                                    
                                                                                                              
    ╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────╮
    │ --install-completion          Install completion for the current shell.                                │
    │ --show-completion             Show completion for the current shell, to copy it or customize the       │
    │                               installation.                                                            │
    │ --help                        Show this message and exit.                                              │
    ╰────────────────────────────────────────────────────────────────────────────────────────────────────────╯
    ╭─ Commands ─────────────────────────────────────────────────────────────────────────────────────────────╮
    │ install-all   Install and register all plugins defined in plugins.yaml.                                │
    │ setup-host    Install interface libraries in the current Python environment.                           │
    ╰────────────────────────────────────────────────────────────────────────────────────────────────────────╯

For detailed help on any command, use `cjm-ctl <command> --help`.

## Module Overview

Detailed documentation for each module in the project:

### cli (`cli.ipynb`)

> CLI tool for declarative plugin management

#### Import

``` python
from cjm_plugin_system.cli import (
    app,
    main,
    run_cmd,
    install_all,
    setup_host
)
```

#### Functions

``` python
def main() -> None
    "CJM Plugin System CLI for managing isolated plugin environments."
```

``` python
def run_cmd(
    cmd: str,  # Shell command to execute
    shell: bool = True,  # Whether to run through shell
    check: bool = True  # Whether to raise on non-zero exit
) -> None
    "Run a shell command and stream output."
```

``` python
def _generate_manifest(
    env_name: str,  # Name of the Conda environment
    package_name: str,  # Package source string (git URL or package name)
    manifest_dir: Path  # Directory to write manifest JSON files
) -> None
    "Run introspection script inside the target env to generate manifest."
```

``` python
def install_all(
    config_path: str = typer.Option("plugins.yaml", "--config", help="Path to master config file"),
    force: bool = typer.Option(False, help="Force recreation of environments")
) -> None
    "Install and register all plugins defined in plugins.yaml."
```

``` python
def setup_host(
    config_path: str = typer.Option("plugins.yaml", "--config", help="Path to master config file"),
    yes: bool = typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
    "Install interface libraries in the current Python environment."
```

### Plugin Interface (`interface.ipynb`)

> Abstract base class defining the generic plugin interface

#### Import

``` python
from cjm_plugin_system.core.interface import (
    FileBackedDTO,
    PluginInterface
)
```

#### Classes

``` python
@runtime_checkable
class FileBackedDTO(Protocol):
    "Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
    
    def to_temp_file(self) -> str: # Absolute path to the temporary file
        "Save the data to a temporary file and return the absolute path."
```

``` python
class PluginInterface(ABC):
    "Abstract base class for all plugins (both local workers and remote proxies)."
    
    def name(self) -> str: # Unique identifier for this plugin
            """Unique plugin identifier."""
            ...
    
        @property
        @abstractmethod
        def version(self) -> str: # Semantic version string (e.g., "1.0.0")
        "Unique plugin identifier."
    
    def version(self) -> str: # Semantic version string (e.g., "1.0.0")
            """Plugin version."""
            ...
    
        @abstractmethod
        def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        "Plugin version."
    
    def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        "Initialize or re-configure the plugin."
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Plugin-specific output
        "Execute the plugin's main functionality."
    
    def execute_stream(
            self,
            *args,
            **kwargs
        ) -> Generator[Any, None, None]: # Yields partial results
        "Stream execution results chunk by chunk."
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
            """Return JSON Schema describing the plugin's configuration options."""
            ...
    
        @abstractmethod
        def get_current_config(self) -> Dict[str, Any]: # Current configuration values
        "Return JSON Schema describing the plugin's configuration options."
    
    def get_current_config(self) -> Dict[str, Any]: # Current configuration values
            """Return the current configuration state as a dictionary."""
            ...
    
        @abstractmethod
        def cleanup(self) -> None
        "Return the current configuration state as a dictionary."
    
    def cleanup(self) -> None:
            """Clean up resources when plugin is unloaded."""
            ...
    
        def cancel(self) -> None
        "Clean up resources when plugin is unloaded."
    
    def cancel(self) -> None:
            """Cancel the current execution. Override for cooperative cancellation support."""
            pass  # Default: no-op (plugins opt-in to cancellation)
        "Cancel the current execution. Override for cooperative cancellation support."
    
    def report_progress(
            self,
            progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
            message: str = "" # Descriptive status message
        ) -> None
        "Report execution progress. Call during execute() to update status."
```

### Plugin Manager (`manager.ipynb`)

> Plugin discovery, loading, and lifecycle management system

#### Import

``` python
from cjm_plugin_system.core.manager import (
    PluginManager,
    get_plugin_config,
    get_plugin_config_schema,
    get_all_plugin_configs,
    update_plugin_config,
    reload_plugin,
    get_plugin_stats,
    execute_plugin_stream
)
```

#### Functions

``` python
def get_plugin_config(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Current configuration or None
    "Get the current configuration of a plugin."
```

``` python
def get_plugin_config_schema(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # JSON Schema or None
    "Get the configuration JSON Schema for a plugin."
```

``` python
def get_all_plugin_configs(self) -> Dict[str, Dict[str, Any]]: # Plugin name -> config mapping
    """Get current configuration for all loaded plugins."""
    return {
        name: plugin.get_current_config()
    "Get current configuration for all loaded plugins."
```

``` python
def update_plugin_config(
    self,
    plugin_name: str, # Name of the plugin
    config: Dict[str, Any] # New configuration values
) -> bool: # True if successful
    "Update a plugin's configuration (hot-reload without restart)."
```

``` python
def reload_plugin(
    self,
    plugin_name: str, # Name of the plugin
    config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
    "Reload a plugin by terminating and restarting its Worker."
```

``` python
def get_plugin_stats(
    self,
    plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
    "Get resource usage stats for a plugin's Worker process."
```

``` python
async def execute_plugin_stream(
    self,
    plugin_name: str,  # Name of the plugin
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]:  # Async generator yielding results
    "Execute a plugin with streaming response."
```

#### Classes

``` python
class PluginManager:
    def __init__(
        self,
        plugin_interface: Type[PluginInterface] = PluginInterface,  # Base interface for type checking
        search_paths: Optional[List[Path]] = None,  # Custom manifest search paths
        scheduler: Optional[ResourceScheduler] = None  # Resource allocation policy
    )
    "Manages plugin discovery, loading, and lifecycle via process isolation."
    
    def __init__(
            self,
            plugin_interface: Type[PluginInterface] = PluginInterface,  # Base interface for type checking
            search_paths: Optional[List[Path]] = None,  # Custom manifest search paths
            scheduler: Optional[ResourceScheduler] = None  # Resource allocation policy
        )
        "Initialize the plugin manager."
    
    def register_system_monitor(
            self,
            plugin_name: str  # Name of the system monitor plugin
        ) -> None
        "Bind a loaded plugin to act as the hardware system monitor."
    
    def discover_manifests(self) -> List[PluginMeta]:  # List of discovered plugin metadata
            """Discover plugins via JSON manifests in search paths."""
            self.discovered = []
            seen_plugins = set()
    
            for base_path in self.search_paths
        "Discover plugins via JSON manifests in search paths."
    
    def get_discovered_by_category(
            self,
            category: str  # Category to filter by (e.g., "transcription")
        ) -> List[PluginMeta]:  # List of matching discovered plugins
        "Get discovered plugins filtered by category."
    
    def get_plugins_by_category(
            self,
            category: str  # Category to filter by (e.g., "transcription")
        ) -> List[PluginMeta]:  # List of matching loaded plugins
        "Get loaded plugins filtered by category."
    
    def get_discovered_categories(self) -> List[str]:  # List of unique categories
            """Get all unique categories among discovered plugins."""
            return list(set(meta.category for meta in self.discovered if meta.category))
    
        def get_loaded_categories(self) -> List[str]:  # List of unique categories
        "Get all unique categories among discovered plugins."
    
    def get_loaded_categories(self) -> List[str]:  # List of unique categories
            """Get all unique categories among loaded plugins."""
            return list(set(meta.category for meta in self.plugins.values() if meta.category))
    
        def get_plugin_meta(
            self,
            plugin_name: str  # Name of the plugin
        ) -> Optional[PluginMeta]:  # Plugin metadata or None
        "Get all unique categories among loaded plugins."
    
    def get_plugin_meta(
            self,
            plugin_name: str  # Name of the plugin
        ) -> Optional[PluginMeta]:  # Plugin metadata or None
        "Get metadata for a loaded plugin by name."
    
    def get_discovered_meta(
            self,
            plugin_name: str  # Name of the plugin
        ) -> Optional[PluginMeta]:  # Plugin metadata or None
        "Get metadata for a discovered (not necessarily loaded) plugin by name."
    
    def load_plugin(
            self,
            plugin_meta: PluginMeta,  # Plugin metadata (with manifest attached)
            config: Optional[Dict[str, Any]] = None  # Initial configuration
        ) -> bool:  # True if successfully loaded
        "Load a plugin by spawning a Worker subprocess."
    
    def load_all(
            self,
            configs: Optional[Dict[str, Dict[str, Any]]] = None  # Plugin name -> config mapping
        ) -> Dict[str, bool]:  # Plugin name -> success mapping
        "Discover and load all available plugins."
    
    def unload_plugin(
            self,
            plugin_name: str  # Name of the plugin to unload
        ) -> bool:  # True if successfully unloaded
        "Unload a plugin and terminate its Worker process."
    
    def unload_all(self) -> None:
            """Unload all plugins and terminate all Worker processes."""
            for name in list(self.plugins.keys())
        "Unload all plugins and terminate all Worker processes."
    
    def get_plugin(
            self,
            plugin_name: str  # Name of the plugin
        ) -> Optional[PluginInterface]:  # Plugin proxy instance or None
        "Get a loaded plugin instance by name."
    
    def list_plugins(self) -> List[PluginMeta]:  # List of loaded plugin metadata
            """List all loaded plugins."""
            return list(self.plugins.values())
    
        def _evict_for_resources(self, needed_meta: PluginMeta) -> bool
        "List all loaded plugins."
    
    def execute_plugin(
            self,
            plugin_name: str,  # Name of the plugin
            *args,
            **kwargs
        ) -> Any:  # Plugin result
        "Execute a plugin's main functionality (sync)."
    
    async def execute_plugin_async(
            self,
            plugin_name: str,  # Name of the plugin
            *args,
            **kwargs
        ) -> Any:  # Plugin result
        "Execute a plugin's main functionality (async)."
    
    def enable_plugin(
            self,
            plugin_name: str  # Name of the plugin
        ) -> bool:  # True if plugin was enabled
        "Enable a plugin."
    
    def disable_plugin(
            self,
            plugin_name: str  # Name of the plugin
        ) -> bool:  # True if plugin was disabled
        "Disable a plugin without unloading it."
    
    def get_plugin_logs(self, plugin_name: str, lines: int = 50) -> str:
                """Read the last N lines of the plugin's log file."""
                # Find the log path based on convention
                log_path = Path.home() / ".cjm" / "logs" / f"{plugin_name}.log"
                
                if not log_path.exists()
        "Read the last N lines of the plugin's log file."
```

### Plugin Metadata (`metadata.ipynb`)

> Data structures for plugin metadata

#### Import

``` python
from cjm_plugin_system.core.metadata import (
    PluginMeta
)
```

#### Classes

``` python
@dataclass
class PluginMeta:
    "Metadata about a plugin."
    
    name: str  # Plugin's unique identifier
    version: str  # Plugin's version string
    description: str = ''  # Brief description of the plugin's functionality
    author: str = ''  # Plugin author's name or organization
    package_name: str = ''  # Python package name containing the plugin
    category: str = ''  # Plugin category (e.g., "transcription", "system_monitor")
    interface: str = ''  # Fully qualified interface class name
    config_schema: Optional[Dict[str, Any]]  # JSON Schema for plugin configuration
    instance: Optional[Any]  # Plugin instance (PluginInterface subclass)
    enabled: bool = True  # Whether the plugin is enabled
    last_executed: float = 0.0  # Unix timestamp
```

### Remote Plugin Proxy (`proxy.ipynb`)

> Bridge between Host application and isolated Worker processes

#### Import

``` python
from cjm_plugin_system.core.proxy import (
    RemotePluginProxy,
    execute_async,
    execute_stream_sync,
    execute_stream,
    get_stats,
    is_alive,
    cancel,
    cancel_async,
    get_progress,
    get_progress_async
)
```

#### Functions

``` python
def _maybe_serialize_input(
    self,
    obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
    "Convert FileBackedDTO objects to file paths for zero-copy transfer."
```

``` python
def _prepare_payload(
    self,
    args: tuple, # Positional arguments
    kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
    "Prepare arguments for HTTP transmission."
```

``` python
async def execute_async(
    self,
    *args,
    **kwargs
) -> Any: # Plugin result
    "Execute the plugin asynchronously."
```

``` python
def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
    "Synchronous wrapper for streaming (blocking)."
```

``` python
async def execute_stream(
    self,
    *args,
    **kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
    "Execute with streaming response (async generator)."
```

``` python
def get_stats(self) -> Dict[str, Any]: # Process telemetry
    """Get worker process resource usage."""
    with httpx.Client() as client
    "Get worker process resource usage."
```

``` python
def is_alive(self) -> bool: # True if worker is responsive
    """Check if the worker process is still running and responsive."""
    if not self.process or self.process.poll() is not None
    "Check if the worker process is still running and responsive."
```

``` python
def cancel(self) -> bool: # True if cancel request was sent
    """Request cancellation of running execution."""
    try
    "Request cancellation of running execution."
```

``` python
async def cancel_async(self) -> bool: # True if cancel request was sent
    """Request cancellation asynchronously."""
    try
    "Request cancellation asynchronously."
```

``` python
def get_progress(self) -> Dict[str, Any]: # {progress: float, message: str}
    """Get current execution progress from worker."""
    try
    "Get current execution progress from worker."
```

``` python
async def get_progress_async(self) -> Dict[str, Any]: # {progress: float, message: str}
    """Get current execution progress asynchronously."""
    try
    "Get current execution progress asynchronously."
```

``` python
def __enter__(self):
    """Enter context manager."""
    return self

def __exit__(self, exc_type, exc_val, exc_tb)
    "Enter context manager."
```

``` python
def __exit__(self, exc_type, exc_val, exc_tb):
    """Exit context manager and cleanup."""
    self.cleanup()
    return False

async def __aenter__(self)
    "Exit context manager and cleanup."
```

``` python
async def __aenter__(self):
    """Enter async context manager."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb)
    "Enter async context manager."
```

``` python
async def __aexit__(self, exc_type, exc_val, exc_tb)
    "Exit async context manager and cleanup."
```

#### Classes

``` python
class RemotePluginProxy:
    def __init__(
        self,
        manifest: Dict[str, Any] # Plugin manifest with python_path, module, class, etc.
    )
    "Proxy that forwards plugin calls to an isolated Worker subprocess."
    
    def __init__(
            self,
            manifest: Dict[str, Any] # Plugin manifest with python_path, module, class, etc.
        )
        "Initialize proxy and start the worker process."
    
    def name(self) -> str: # Plugin name from manifest
            """Plugin name."""
            return self.manifest.get('name', 'unknown')
        
        @property
        def version(self) -> str: # Plugin version from manifest
        "Plugin name."
    
    def version(self) -> str: # Plugin version from manifest
            """Plugin version."""
            return self.manifest.get('version', '0.0.0')
    
        def _get_free_port(self) -> int
        "Plugin version."
    
    def initialize(
            self,
            config: Optional[Dict[str, Any]] = None # Configuration dictionary
        ) -> None
        "Initialize or reconfigure the plugin."
    
    def execute(
            self,
            *args,
            **kwargs
        ) -> Any: # Plugin result
        "Execute the plugin synchronously."
    
    def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
            """Get the plugin's configuration schema."""
            with httpx.Client() as client
        "Get the plugin's configuration schema."
    
    def get_current_config(self) -> Dict[str, Any]: # Current config values
            """Get the plugin's current configuration."""
            with httpx.Client() as client
        "Get the plugin's current configuration."
    
    def cleanup(self) -> None:
            """Clean up plugin resources and terminate worker process."""
            # Send cleanup request to worker
            try
        "Clean up plugin resources and terminate worker process."
```

### Job Queue (`queue.ipynb`)

> Resource-aware job queue for sequential plugin execution with
> cancellation support

#### Import

``` python
from cjm_plugin_system.core.queue import (
    JobStatus,
    Job,
    JobQueue,
    submit,
    cancel,
    reorder,
    get_job,
    wait_for_job,
    get_state,
    get_job_logs,
    start,
    stop
)
```

#### Functions

``` python
async def submit(
    self,
    plugin_name: str,  # Target plugin
    *args,
    priority: int = 0,  # Higher = more urgent
    **kwargs
) -> str:  # Returns job_id
    "Submit a job to the queue."
```

``` python
async def cancel(
    self,
    job_id: str  # Job to cancel
) -> bool:  # True if cancelled
    "Cancel a pending or running job."
```

``` python
def reorder(
    self,
    job_id: str,  # Job to move
    new_priority: int  # New priority value
) -> bool:  # True if reordered
    "Change the priority of a pending job."
```

``` python
def get_job(
    self,
    job_id: str  # Job to retrieve
) -> Optional[Job]:  # Job or None
    "Get a job by ID."
```

``` python
async def wait_for_job(
    self,
    job_id: str,  # Job to wait for
    timeout: Optional[float] = None  # Max seconds to wait
) -> Job:  # Completed/failed/cancelled job
    "Wait for a job to complete."
```

``` python
def get_state(self) -> Dict[str, Any]:  # Queue state for UI
    """Get the current queue state."""
    running_dict = None
    if self._running
    "Get the current queue state."
```

``` python
def get_job_logs(
    self,
    job_id: str,  # Job to get logs for
    lines: int = 100  # Max lines to return
) -> str:  # Log content
    "Get logs for a job from the plugin's log file."
```

``` python
async def start(self) -> None:
    """Start the queue processor."""
    if self._running_flag
    "Start the queue processor."
```

``` python
async def stop(self) -> None:
    """Stop the queue processor gracefully."""
    self._running_flag = False
    self._job_available.set()  # Wake up the processor
    
    if self._processor_task
    "Stop the queue processor gracefully."
```

``` python
def _move_to_history(self, job: Job) -> None:
    """Move a job to history, maintaining max_history limit."""
    self._history.append(job)
    if len(self._history) > self.max_history
    "Move a job to history, maintaining max_history limit."
```

``` python
def _signal_job_completed(self, job_id: str) -> None:
    """Signal that a job has completed."""
    event = self._job_completed_events.get(job_id)
    if event
    "Signal that a job has completed."
```

``` python
async def _process_loop(self) -> None:
    """Main processing loop."""
    while self._running_flag
    "Main processing loop."
```

``` python
async def _execute_job(self, job: Job) -> None:
    """Execute a single job."""
    self.logger.info(f"Starting job {job.id[:8]} ({job.plugin_name})")
    
    # Mark as running
    job.status = JobStatus.running
    job.started_at = time.time()
    self._running = job
    
    try
    "Execute a single job."
```

``` python
async def _execute_with_cancellation(
    self,
    job: Job,
    plugin: Any
) -> Any
    "Execute job with cancellation monitoring."
```

``` python
async def _poll_progress(
    self,
    job: Job,
    plugin: Any
) -> None
    "Poll progress from the plugin during execution."
```

#### Classes

``` python
class JobStatus(str, Enum):
    "Status of a job in the queue."
```

``` python
@dataclass
class Job:
    "A queued plugin execution request."
    
    id: str  # Unique job identifier (UUID)
    plugin_name: str  # Target plugin name
    args: Tuple[Any, ...]  # Positional arguments for execute()
    kwargs: Dict[str, Any]  # Keyword arguments for execute()
    status: JobStatus = JobStatus.pending  # Current job status
    priority: int = 0  # Higher = more urgent
    created_at: float = field(...)  # Submission timestamp
    started_at: Optional[float]  # Execution start timestamp
    completed_at: Optional[float]  # Completion timestamp
    result: Any  # Execution result (if completed)
    error: Optional[str]  # Error message (if failed)
    progress: float = 0.0  # 0.0 to 1.0, or -1.0 for indeterminate
    status_message: str = ''  # Descriptive status message
    
```

``` python
class JobQueue:
    def __init__(
        self,
        manager: PluginManager,  # Plugin manager instance
        max_history: int = 100,  # Max completed jobs to retain
        cancel_timeout: float = 3.0,  # Seconds to wait for cooperative cancel
        progress_poll_interval: float = 1.0  # Seconds between progress polls
    )
    "Resource-aware job queue for sequential plugin execution."
    
    def __init__(
            self,
            manager: PluginManager,  # Plugin manager instance
            max_history: int = 100,  # Max completed jobs to retain
            cancel_timeout: float = 3.0,  # Seconds to wait for cooperative cancel
            progress_poll_interval: float = 1.0  # Seconds between progress polls
        )
        "Initialize the job queue."
```

### Scheduling (`scheduling.ipynb`)

> Resource scheduling policies for plugin execution

#### Import

``` python
from cjm_plugin_system.core.scheduling import (
    ResourceScheduler,
    PermissiveScheduler,
    SafetyScheduler,
    QueueScheduler
)
```

#### Classes

``` python
class ResourceScheduler(ABC):
    "Abstract base class for resource allocation policies."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function that returns fresh stats
        ) -> bool:  # True if execution is allowed
        "Decide if a plugin can start based on its requirements and system state."
    
    async def allocate_async(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Awaitable[Dict[str, Any]]]  # Async function returning stats
        ) -> bool:  # True if execution is allowed
        "Async allocation decision. Default delegates to sync allocate after fetching stats once."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "Notify scheduler that a task started (to reserve resources)."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "Notify scheduler that a task finished (to release resources)."
```

``` python
class PermissiveScheduler(ResourceScheduler):
    "Scheduler that allows all executions (Default / Dev Mode)."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Stats provider (ignored)
        ) -> bool:  # Always returns True
        "Allow all plugin executions without checking resources."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "No-op for permissive scheduler."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "No-op for permissive scheduler."
```

``` python
class SafetyScheduler(ResourceScheduler):
    "Scheduler that prevents execution if resources are insufficient."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function returning current stats
        ) -> bool:  # True if resources are available
        "Check resource requirements against system state."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "Called when execution starts (for future resource reservation)."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "Called when execution finishes (for future resource release)."
```

``` python
class QueueScheduler:
    def __init__(
        self,
        timeout: float = 300.0,  # Max seconds to wait for resources
        poll_interval: float = 2.0  # Seconds between resource checks
    )
    "Scheduler that waits for resources to become available."
    
    def __init__(
            self,
            timeout: float = 300.0,  # Max seconds to wait for resources
            poll_interval: float = 2.0  # Seconds between resource checks
        )
        "Initialize queue scheduler with timeout and polling settings."
    
    def allocate(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Dict[str, Any]]  # Function returning current stats
        ) -> bool:  # True if resources become available before timeout
        "Wait for resources using blocking sleep."
    
    async def allocate_async(
            self,
            plugin_meta: PluginMeta,  # Metadata of the plugin requesting resources
            stats_provider: Callable[[], Awaitable[Dict[str, Any]]]  # Async stats function
        ) -> bool:  # True if resources become available before timeout
        "Wait for resources using non-blocking async sleep."
    
    def on_execution_start(
            self,
            plugin_name: str  # Name of the plugin starting execution
        ) -> None
        "Track that a plugin has started executing."
    
    def on_execution_finish(
            self,
            plugin_name: str  # Name of the plugin finishing execution
        ) -> None
        "Track that a plugin has finished executing."
    
    def get_active_plugins(self) -> Set[str]:  # Set of currently executing plugin names
        "Get the set of plugins with active executions."
```

### Configuration Validation (`validation.ipynb`)

> Validation helpers for plugin configuration dataclasses

#### Import

``` python
from cjm_plugin_system.utils.validation import (
    T,
    SCHEMA_TITLE,
    SCHEMA_DESC,
    SCHEMA_MIN,
    SCHEMA_MAX,
    SCHEMA_ENUM,
    SCHEMA_MIN_LEN,
    SCHEMA_MAX_LEN,
    SCHEMA_PATTERN,
    SCHEMA_FORMAT,
    validate_field_value,
    validate_config,
    config_to_dict,
    dict_to_config,
    extract_defaults,
    dataclass_to_jsonschema
)
```

#### Functions

``` python
def validate_field_value(
    value:Any, # Value to validate
    metadata:Dict[str, Any], # Field metadata containing constraints
    field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate a value against field metadata constraints."
```

``` python
def validate_config(
    config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
    "Validate all fields in a configuration dataclass against their metadata constraints."
```

``` python
def config_to_dict(
    config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
    "Convert a configuration dataclass instance to a dictionary."
```

``` python
def dict_to_config(
    config_class:Type[T], # Configuration dataclass type
    data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
    validate:bool=False # Whether to validate against metadata constraints
) -> T: # Instance of the configuration dataclass
    "Create a configuration dataclass instance from a dictionary."
```

``` python
def extract_defaults(
    config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
    "Extract default values from a configuration dataclass type."
```

``` python
def _python_type_to_json_type(
    python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
    "Convert Python type to JSON schema type."
```

``` python
def dataclass_to_jsonschema(
    cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
    "Convert a dataclass to a JSON schema for form generation."
```

#### Variables

``` python
T
SCHEMA_TITLE = 'title'  # Display title for the field
SCHEMA_DESC = 'description'  # Help text description
SCHEMA_MIN = 'minimum'  # Minimum value for numbers
SCHEMA_MAX = 'maximum'  # Maximum value for numbers
SCHEMA_ENUM = 'enum'  # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength'  # Minimum string length
SCHEMA_MAX_LEN = 'maxLength'  # Maximum string length
SCHEMA_PATTERN = 'pattern'  # Regex pattern for strings
SCHEMA_FORMAT = 'format'  # String format (email, uri, date, etc.)
```

### Universal Worker (`worker.ipynb`)

> FastAPI server that runs inside isolated plugin environments

#### Import

``` python
from cjm_plugin_system.core.worker import (
    EnhancedJSONEncoder,
    parent_monitor,
    create_app,
    run_worker
)
```

#### Functions

``` python
def parent_monitor(
    ppid: int # Parent process ID to monitor
) -> None
    "Monitor parent process and terminate self if parent dies."
```

``` python
def create_app(
    module_name: str, # Python module path (e.g., "my_plugin.plugin")
    class_name: str   # Plugin class name (e.g., "WhisperPlugin")
) -> FastAPI: # Configured FastAPI application
    "Create FastAPI app that hosts the specified plugin."
```

``` python
def run_worker() -> None:
    """CLI entry point for running the worker."""
    parser = argparse.ArgumentParser(description="Universal Plugin Worker")
    parser.add_argument("--module", required=True, help="Plugin module path")
    parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
    parser.add_argument("--port", type=int, required=True, help="Port to listen on")
    parser.add_argument("--ppid", type=int, required=False, help="Parent PID to monitor")
    args = parser.parse_args()

    # Start watchdog if parent PID provided
    if args.ppid
    "CLI entry point for running the worker."
```

#### Classes

``` python
class EnhancedJSONEncoder(JSONEncoder):
    "JSON encoder that handles dataclasses and other common types."
    
    def default(
            self,
            o: Any # Object to encode
        ) -> Any: # JSON-serializable representation
        "Convert non-serializable objects to serializable form."
```
