Metadata-Version: 2.4
Name: mcpc
Version: 0.1.0
Summary: MCP Callback Protocol extension for enabling asynchronous tool callbacks and streaming updates from MCP clients
Project-URL: Homepage, https://github.com/OlaHulleberg/mcpc
Project-URL: Bug Tracker, https://github.com/OlaHulleberg/mcpc/issues
Project-URL: Documentation, https://github.com/OlaHulleberg/mcpc#readme
Author-email: Ola Hulleberg <ola@hulleberg.net>
License: MIT
Keywords: async,callbacks,mcp,protocol,streaming
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.12
Requires-Dist: mcp[cli]>=1.6.0
Requires-Dist: pydantic>=2.0.0
Description-Content-Type: text/markdown

# MCPC - Model Context Protocol Callback

An extension to the MCP (Model-Call-Provider) protocol that enables asynchronous real-time callbacks and streaming updates from MCP tools.

## Quick Start

### Installation

```bash
pip install mcpc
```

### Client Usage

```python
from mcpc import MCPCHandler
from mcp import ClientSession
from mcp.client.stdio import stdio_client

# Define your callback function
async def my_mcpc_callback(mcpc_message):
    print(f"Received MCPC message: {mcpc_message}")
    # Handle the message based on status
    if mcpc_message.status == "task_complete":
        print(f"Task {mcpc_message.task_id} completed with result: {mcpc_message.result}")

# Initialize the MCPC handler with your callback
mcpc_handler = MCPCHandler("my-provider", my_mcpc_callback)

# In your connection logic:
async def connect_to_mcp():
    # Connect to MCP provider
    transport = await stdio_client(parameters)

    # Wrap the transport with MCPC event listeners
    wrapped_transport = await mcpc_handler.wrap_streams(*transport)

    # Create a ClientSession with the wrapped transport
    session = await ClientSession(*wrapped_transport)

    # Initialize the session
    await session.initialize()

    # Check if MCPC is supported
    mcpc_supported = await mcpc_handler.check_mcpc_support(session)
    if mcpc_supported:
        print(f"MCPC protocol v{mcpc_handler.protocol_version} supported")

    return session

# When calling tools, add MCPC metadata
async def run_tool(session, tool_name, tool_args, session_id):
    # Add MCPC metadata if supported
    enhanced_args = mcpc_handler.add_metadata(tool_args, session_id)

    # Call the tool with enhanced arguments
    return await session.call_tool(tool_name, enhanced_args)
```

## Why MCPC Exists

I created MCPC to solve a critical limitation in LLM tool interactions: **maintaining conversational flow while running background tasks**.

The standard MCP protocol follows a synchronous request-response pattern, which blocks the conversation until a tool completes. This creates poor UX when:

1. You want to chat with an LLM while a long-running task executes
2. You need real-time progress updates from background operations
3. You're running tasks that potentially continue forever (like monitoring)

MCPC addresses these limitations by enabling:

- Continuous conversation with LLMs during tool execution
- Real-time updates from background processes
- Asynchronous notifications when operations complete
- Support for indefinitely running tasks with streaming updates

For example, you might start a data processing task, continue discussing with the LLM about the expected results, receive progress updates throughout, and get notified when processing completes - all without interrupting the conversation flow.

MCPC also enables powerful interactive patterns that weren't possible before in MCP:

- **Modifying running tasks**: You can adjust parameters or change the behavior of a task while it's running (e.g., "focus on this subset of data instead" or "I see that you're misunderstanding some relations, can you please parse the PDF first?")
- **Tool-initiated prompts**: A tool can ask for clarification when it encounters ambiguity or needs additional input (e.g., "I found multiple matches, which one did you mean?" or "I need additional authorization to proceed")
- **Conversation branching**: Start multiple background tasks and selectively respond to their updates while maintaining conversational context

These capabilities create a much more natural interaction model where tools feel like collaborative participants in the conversation rather than black-box functions.

## How MCPC Works

MCPC extends MCP by:

1. **Adding metadata to tool calls**: Session and task identifiers
2. **Defining a message structure**: Standardized format for callbacks
3. **Providing stream interception**: Monitors I/O streams for MCPC messages
4. **Implementing task management**: Handles background tasks and messaging

The protocol is fully backward compatible with MCP, allowing MCPC-enabled clients to work with standard MCP servers, and vice versa.

## Server Implementation

For implementing MCPC in your MCP servers, use the `MCPCHelper` class to handle message creation, background tasks, and progress updates.

```python
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from mcpc import MCPCHelper
import asyncio
import uuid

# Initialize MCPC helper
PROVIDER_NAME = "my-processor"
mcpc = MCPCHelper(PROVIDER_NAME)

async def serve():
    """Run the MCP server with MCPC support."""
    server = Server(PROVIDER_NAME)

    @server.list_tools()
    async def list_tools():
        return [
            Tool(
                name="process_data",
                description="Process data with real-time progress updates.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "data_id": {"type": "string"},
                        "process_type": {"type": "string"}
                    },
                    "required": ["data_id"]
                }
            )
        ]

    @server.call_tool()
    async def call_tool(name, arguments):
        # Extract MCPC metadata
        metadata = arguments.pop("_metadata", {})
        session_id = metadata.get("mcpc_session_id", "default")
        task_id = metadata.get("mcpc_task_id", str(uuid.uuid4()))

        # Handle MCPC protocol info request
        if name == "is_mcpc_enabled":
            info = mcpc.get_protocol_info()
            return [TextContent(type="text", text=info.model_dump_json())]

        # Handle the tool call
        if name == "process_data":
            data_id = arguments.get("data_id")

            # Define the background task that will provide real-time updates
            async def process_data_task():
                try:
                    # Send initial update
                    await mcpc.send_direct(mcpc.create_message(
                        tool_name="process_data",
                        session_id=session_id,
                        task_id=task_id,
                        result="Starting data processing",
                        status="task_update"
                    ).model_dump_json())

                    # Simulate work with progress updates
                    total_steps = 5
                    for step in range(1, total_steps + 1):
                        # Send progress update
                        await mcpc.send_direct(mcpc.create_message(
                            tool_name="process_data",
                            session_id=session_id,
                            task_id=task_id,
                            result={
                                "status": f"Processing step {step}/{total_steps}",
                                "progress": step / total_steps * 100
                            },
                            status="task_update"
                        ).model_dump_json())

                        # Simulate work
                        await asyncio.sleep(1)

                    # Send completion message
                    await mcpc.send_direct(mcpc.create_message(
                        tool_name="process_data",
                        session_id=session_id,
                        task_id=task_id,
                        result={
                            "status": "Complete",
                            "data_id": data_id,
                            "summary": "Processing completed successfully"
                        },
                        status="task_complete"
                    ).model_dump_json())

                except Exception as e:
                    # Send error message
                    await mcpc.send_direct(mcpc.create_message(
                        tool_name="process_data",
                        session_id=session_id,
                        task_id=task_id,
                        result=f"Error: {str(e)}",
                        status="task_failed"
                    ).model_dump_json())

                finally:
                    # Clean up task
                    mcpc.cleanup_task(task_id)

            # Start the background task
            mcpc.start_task(task_id, process_data_task)

            # Return immediate response
            response = mcpc.create_message(
                tool_name="process_data",
                session_id=session_id,
                task_id=task_id,
                result=f"Started processing data_id={data_id}. Updates will stream in real-time.",
                status="task_created"
            )

            return [TextContent(type="text", text=response.model_dump_json())]

    # Start the server
    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, options)

if __name__ == "__main__":
    asyncio.run(serve())
```

## Advanced Server Features

The `MCPCHelper` class provides additional features for complex server implementations:

1. **Task Management**

   - `start_task()`: Run a background task with automatic thread management
   - `check_task()`: Get the status of a running task
   - `stop_task()`: Request a task to stop gracefully
   - `cleanup_task()`: Remove a completed task from tracking

2. **Message Creation**

   - `create_message()`: Create standardized MCPC protocol messages
   - `send_direct()`: Send messages directly to clients over stdout

3. **Protocol Information**
   - `get_protocol_info()`: Return MCPC protocol compatibility information

## MCPC Message Structure

MCPC messages have the following structure:

```python
class MCPCMessage:
    session_id: str      # Unique session identifier
    task_id: str         # Unique task identifier
    tool_name: str       # Name of the tool being called
    result: Any = None   # Result or update data
    status: str = "task_update"  # Status of the task
    type: str = "mcpc"   # Protocol identifier
```

## Message Status Types

MCPC defines four standard callback states:

- `task_created`: Initial acknowledgment when task begins
- `task_update`: Progress updates during task execution
- `task_complete`: Final result when task completes successfully
- `task_failed`: Error information when task fails

## Use Cases

MCPC is ideal for:

- **Interactive AI Agents**: Chat with LLMs while tasks run in the background
- **Data Processing**: Stream progress updates during large file processing
- **ML Training**: Monitor model training progress in real-time
- **Content Generation**: Receive partial results as they're generated
- **Long-Running Operations**: Support for tasks that run indefinitely
- **Distributed Systems**: Coordinate asynchronous operations across services

## Compatibility

MCPC is designed to be fully backward compatible with the MCP protocol:

- MCPC-enabled clients can communicate with standard MCP servers
- MCPC-enabled servers can respond to standard MCP clients
- The protocol negotiation ensures graceful fallback to standard MCP when needed

## License

MIT
