Skip to content

FastMCP Integration

FastMCP integration enables dependency injection for MCP (Model Context Protocol) servers, providing automatic service resolution with proper request scoping and lifecycle management.

🎯 Getting Started

Basic Setup

from fastmcp import FastMCP
from injectq import InjectQ
from injectq.integrations.fastmcp import setup_fastmcp_integration, InjectQDependency

# 1. Create container and bind services
container = InjectQ()
container.bind(IDocumentService, DocumentService())
container.bind(IUserService, UserService())
container.bind(IAuthService, AuthService())

# 2. Create FastMCP server
mcp = FastMCP("My MCP Server")

# 3. Set up integration
setup_fastmcp_integration(mcp, container)

# 4. Use dependency injection in tools
@mcp.tool()
async def search_documents(
    query: str,
    limit: int = 10,
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    user_service: IUserService = InjectQDependency(IUserService)
):
    """Search documents with user context."""
    # Get current user from context
    user = user_service.get_current_user()

    # Search documents
    results = await doc_service.search_documents(
        query=query,
        user_id=user.id,
        limit=limit
    )

    return results

@mcp.tool()
async def create_document(
    title: str,
    content: str,
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Create a new document."""
    # Verify permissions
    if not auth_service.has_permission("create_document"):
        raise ValueError("Insufficient permissions")

    # Create document
    document = await doc_service.create_document(
        title=title,
        content=content
    )

    return document

# 5. Start the server
if __name__ == "__main__":
    mcp.run()

Service Definitions

from typing import Protocol, List, Optional
from datetime import datetime

# Define service interfaces
class IDocumentService(Protocol):
    async def search_documents(self, query: str, user_id: int, limit: int) -> List[Document]: ...
    async def create_document(self, title: str, content: str) -> Document: ...
    async def get_document(self, doc_id: int) -> Optional[Document]: ...
    async def update_document(self, doc_id: int, title: str, content: str) -> Document: ...
    async def delete_document(self, doc_id: int) -> bool: ...

class IUserService(Protocol):
    def get_current_user(self) -> User: ...
    def get_user(self, user_id: int) -> Optional[User]: ...
    def get_user_permissions(self, user_id: int) -> List[str]: ...

class IAuthService(Protocol):
    def has_permission(self, permission: str) -> bool: ...
    def authenticate_user(self, token: str) -> Optional[User]: ...
    def authorize_action(self, user: User, action: str, resource: str) -> bool: ...

# Data models
class User:
    def __init__(self, id: int, name: str, email: str):
        self.id = id
        self.name = name
        self.email = email

class Document:
    def __init__(self, id: int, title: str, content: str, user_id: int, created_at: datetime):
        self.id = id
        self.title = title
        self.content = content
        self.user_id = user_id
        self.created_at = created_at

# Implement services
class DocumentService:
    def __init__(self, db: IDatabaseConnection):
        self.db = db

    async def search_documents(self, query: str, user_id: int, limit: int) -> List[Document]:
        # Search documents in database
        return await self.db.query(Document).filter(
            Document.user_id == user_id,
            Document.title.contains(query) | Document.content.contains(query)
        ).limit(limit).all()

    async def create_document(self, title: str, content: str) -> Document:
        # Get current user from context
        user_service = self.db.get(IUserService)  # Injected via container
        user = user_service.get_current_user()

        document = Document(
            id=self.db.next_id(),
            title=title,
            content=content,
            user_id=user.id,
            created_at=datetime.now()
        )

        await self.db.save(document)
        return document

    async def get_document(self, doc_id: int) -> Optional[Document]:
        return await self.db.query(Document).filter(Document.id == doc_id).first()

    async def update_document(self, doc_id: int, title: str, content: str) -> Document:
        document = await self.get_document(doc_id)
        if not document:
            raise ValueError(f"Document {doc_id} not found")

        document.title = title
        document.content = content
        await self.db.save(document)
        return document

    async def delete_document(self, doc_id: int) -> bool:
        document = await self.get_document(doc_id)
        if not document:
            return False

        await self.db.delete(document)
        return True

class UserService:
    def __init__(self, db: IDatabaseConnection):
        self.db = db
        self._current_user = None

    def get_current_user(self) -> User:
        if self._current_user is None:
            # Get from request context (set by middleware)
            self._current_user = self._get_user_from_context()
        return self._current_user

    def _get_user_from_context(self) -> User:
        # Implementation depends on your auth system
        # This would typically get user from request context
        pass

    def get_user(self, user_id: int) -> Optional[User]:
        return self.db.query(User).filter(User.id == user_id).first()

    def get_user_permissions(self, user_id: int) -> List[str]:
        # Get user permissions from database
        return self.db.query(Permission).filter(Permission.user_id == user_id).all()

class AuthService:
    def __init__(self, user_service: IUserService):
        self.user_service = user_service

    def has_permission(self, permission: str) -> bool:
        user = self.user_service.get_current_user()
        permissions = self.user_service.get_user_permissions(user.id)
        return permission in permissions

    def authenticate_user(self, token: str) -> Optional[User]:
        # Verify token and return user
        pass

    def authorize_action(self, user: User, action: str, resource: str) -> bool:
        # Check if user can perform action on resource
        pass

🔧 Advanced Configuration

Request-Scoped Services

from injectq import scoped

@scoped
class RequestContext:
    def __init__(self):
        self.request_id = str(uuid.uuid4())
        self.start_time = time.time()
        self.user = None
        self.metadata = {}

    def set_user(self, user: User):
        self.user = user

    def set_metadata(self, key: str, value: Any):
        self.metadata[key] = value

    def get_duration(self) -> float:
        return time.time() - self.start_time

@scoped
class RequestMetrics:
    def __init__(self):
        self.operations = []
        self.errors = []

    def record_operation(self, operation: str, duration: float):
        self.operations.append({
            "operation": operation,
            "duration": duration,
            "timestamp": time.time()
        })

    def record_error(self, error: str):
        self.errors.append({
            "error": error,
            "timestamp": time.time()
        })

# Use in MCP tools
@mcp.tool()
async def complex_document_operation(
    doc_id: int,
    operation: str,
    ctx: RequestContext = InjectQDependency(RequestContext),
    metrics: RequestMetrics = InjectQDependency(RequestMetrics),
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    ctx.set_metadata("operation", operation)
    ctx.set_metadata("doc_id", doc_id)

    try:
        # Perform operation with metrics
        start_time = time.time()

        if operation == "get":
            result = await doc_service.get_document(doc_id)
        elif operation == "update":
            result = await doc_service.update_document(doc_id, "New Title", "New Content")
        else:
            raise ValueError(f"Unknown operation: {operation}")

        duration = time.time() - start_time
        metrics.record_operation(operation, duration)

        return result

    except Exception as e:
        metrics.record_error(str(e))
        raise

Module-Based Setup

from injectq import Module

class DocumentModule(Module):
    def configure(self, binder):
        # Document services
        binder.bind(IDocumentService, DocumentService())
        binder.bind(IUserService, UserService())
        binder.bind(IAuthService, AuthService())

        # Request context services
        binder.bind(RequestContext, RequestContext())
        binder.bind(RequestMetrics, RequestMetrics())

class InfrastructureModule(Module):
    def configure(self, binder):
        # Database and external services
        binder.bind(IDatabaseConnection, PostgresConnection())
        binder.bind(ICacheService, RedisCache())

class AuthModule(Module):
    def configure(self, binder):
        # Authentication services
        binder.bind(ITokenService, JWTTokenService())
        binder.bind(ISessionService, SessionService())

def create_mcp_server() -> FastMCP:
    # Create container with modules
    container = InjectQ()
    container.install(InfrastructureModule())
    container.install(AuthModule())
    container.install(DocumentModule())

    # Create MCP server
    mcp = FastMCP("Document Management Server")

    # Set up integration
    setup_fastmcp_integration(mcp, container)

    return mcp

# Usage
mcp = create_mcp_server()

🎨 MCP Tool Patterns

Document Management Tools

@mcp.tool()
async def list_user_documents(
    limit: int = 20,
    offset: int = 0,
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    """List current user's documents."""
    documents = await doc_service.list_user_documents(limit, offset)
    return {
        "documents": [
            {
                "id": doc.id,
                "title": doc.title,
                "created_at": doc.created_at.isoformat()
            }
            for doc in documents
        ],
        "total": len(documents)
    }

@mcp.tool()
async def search_documents(
    query: str,
    category: Optional[str] = None,
    limit: int = 10,
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    """Search documents with advanced filters."""
    results = await doc_service.search_documents(
        query=query,
        category=category,
        limit=limit
    )

    return {
        "query": query,
        "results": [
            {
                "id": result.id,
                "title": result.title,
                "snippet": result.content[:200] + "..." if len(result.content) > 200 else result.content,
                "score": result.score
            }
            for result in results
        ]
    }

@mcp.tool()
async def create_document_from_template(
    template_id: int,
    title: str,
    variables: dict,
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    template_service: ITemplateService = InjectQDependency(ITemplateService)
):
    """Create document from template with variable substitution."""
    # Get template
    template = await template_service.get_template(template_id)

    # Substitute variables
    content = template.content
    for key, value in variables.items():
        content = content.replace(f"{{{{ {key} }}}}", str(value))

    # Create document
    document = await doc_service.create_document(title, content)

    return {
        "document_id": document.id,
        "title": document.title,
        "created_at": document.created_at.isoformat()
    }

User Management Tools

@mcp.tool()
async def get_user_profile(
    user_service: IUserService = InjectQDependency(IUserService)
):
    """Get current user's profile."""
    user = user_service.get_current_user()
    permissions = user_service.get_user_permissions(user.id)

    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "permissions": permissions
    }

@mcp.tool()
async def update_user_preferences(
    preferences: dict,
    user_service: IUserService = InjectQDependency(IUserService),
    preference_service: IPreferenceService = InjectQDependency(IPreferenceService)
):
    """Update user preferences."""
    user = user_service.get_current_user()

    await preference_service.update_preferences(user.id, preferences)

    return {"message": "Preferences updated successfully"}

@mcp.tool()
async def share_document(
    document_id: int,
    target_user_id: int,
    permissions: List[str],
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Share document with another user."""
    # Check if current user owns the document
    document = await doc_service.get_document(document_id)
    current_user = doc_service.user_service.get_current_user()

    if document.user_id != current_user.id:
        raise ValueError("You can only share documents you own")

    # Check permissions
    if not auth_service.authorize_action(current_user, "share", "document"):
        raise ValueError("Insufficient permissions to share documents")

    # Share document
    await doc_service.share_document(document_id, target_user_id, permissions)

    return {"message": f"Document shared with user {target_user_id}"}

Analytics and Reporting Tools

@mcp.tool()
async def get_document_stats(
    document_id: int,
    analytics_service: IAnalyticsService = InjectQDependency(IAnalyticsService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Get analytics for a document."""
    # Check permissions
    if not auth_service.has_permission("view_analytics"):
        raise ValueError("Insufficient permissions")

    stats = await analytics_service.get_document_stats(document_id)

    return {
        "document_id": document_id,
        "views": stats.views,
        "unique_viewers": stats.unique_viewers,
        "last_viewed": stats.last_viewed.isoformat() if stats.last_viewed else None,
        "average_session_duration": stats.average_session_duration
    }

@mcp.tool()
async def generate_user_report(
    user_id: Optional[int] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    report_service: IReportService = InjectQDependency(IReportService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Generate user activity report."""
    # Check permissions
    if not auth_service.has_permission("generate_reports"):
        raise ValueError("Insufficient permissions")

    # Default to current user if not specified
    if user_id is None:
        user_service = report_service.container.get(IUserService)
        user_id = user_service.get_current_user().id

    # Parse dates
    from_date = datetime.fromisoformat(date_from) if date_from else None
    to_date = datetime.fromisoformat(date_to) if date_to else None

    # Generate report
    report = await report_service.generate_user_report(
        user_id=user_id,
        from_date=from_date,
        to_date=to_date
    )

    return {
        "user_id": user_id,
        "period": {
            "from": from_date.isoformat() if from_date else None,
            "to": to_date.isoformat() if to_date else None
        },
        "documents_created": report.documents_created,
        "documents_viewed": report.documents_viewed,
        "total_activity_time": report.total_activity_time,
        "most_active_day": report.most_active_day.isoformat() if report.most_active_day else None
    }

🧪 Testing FastMCP Integration

Unit Testing Tools

import pytest
from injectq.integrations.fastmcp import setup_fastmcp_integration

@pytest.fixture
def test_mcp():
    # Create test container
    container = InjectQ()
    container.bind(IDocumentService, MockDocumentService())
    container.bind(IUserService, MockUserService())

    # Create test MCP server
    mcp = FastMCP("Test Server")
    setup_fastmcp_integration(mcp, container)

    return mcp

def test_search_documents_tool(test_mcp):
    # Define test tool
    @test_mcp.tool()
    async def search_documents(
        query: str,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        results = await doc_service.search_documents(query, user_id=1, limit=5)
        return {"results": results, "count": len(results)}

    # Mock the request context
    with test_mcp.test_context():
        # Execute tool
        result = await test_mcp.call_tool("search_documents", {"query": "test"})

        # Verify result
        assert "results" in result
        assert "count" in result
        assert result["count"] > 0

def test_request_scoping(test_mcp):
    # Define tool with scoped service
    @test_mcp.tool()
    async def scoped_tool(
        data: str,
        ctx: RequestContext = InjectQDependency(RequestContext)
    ):
        ctx.set_metadata("input", data)
        return ctx.metadata

    # Execute multiple requests
    with test_mcp.test_context():
        result1 = await test_mcp.call_tool("scoped_tool", {"data": "test1"})
        result2 = await test_mcp.call_tool("scoped_tool", {"data": "test2"})

    # Each request should have its own context
    assert result1["input"] == "test1"
    assert result2["input"] == "test2"

Integration Testing

@pytest.fixture
def integration_mcp():
    # Real container with test database
    container = InjectQ()
    container.install(TestDatabaseModule())
    container.install(DocumentModule())
    container.install(AuthModule())

    mcp = FastMCP("Integration Test Server")
    setup_fastmcp_integration(mcp, container)

    return mcp

def test_document_creation_integration(integration_mcp):
    # Define integration tool
    @integration_mcp.tool()
    async def create_test_document(
        title: str,
        content: str,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        document = await doc_service.create_document(title, content)
        return {
            "id": document.id,
            "title": document.title,
            "content_length": len(document.content)
        }

    # Execute tool
    with integration_mcp.test_context():
        result = await integration_mcp.call_tool("create_test_document", {
            "title": "Test Document",
            "content": "This is a test document content."
        })

    # Verify result
    assert result["title"] == "Test Document"
    assert result["content_length"] == len("This is a test document content.")
    assert "id" in result

def test_error_handling_integration(integration_mcp):
    # Define tool that might fail
    @integration_mcp.tool()
    async def risky_tool(
        doc_id: int,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        document = await doc_service.get_document(doc_id)
        if not document:
            raise ValueError("Document not found")
        return {"title": document.title}

    # Test successful case
    with integration_mcp.test_context():
        # First create a document
        create_result = await integration_mcp.call_tool("create_test_document", {
            "title": "Test",
            "content": "Content"
        })

        # Then retrieve it
        result = await integration_mcp.call_tool("risky_tool", {
            "doc_id": create_result["id"]
        })

        assert result["title"] == "Test"

    # Test error case
    with integration_mcp.test_context():
        with pytest.raises(ValueError, match="Document not found"):
            await integration_mcp.call_tool("risky_tool", {"doc_id": 99999})

Mock Testing

class MockDocumentService:
    def __init__(self):
        self.documents = {}
        self.next_id = 1

    async def search_documents(self, query: str, user_id: int, limit: int):
        # Simple mock search
        results = [
            doc for doc in self.documents.values()
            if query.lower() in doc.title.lower() or query.lower() in doc.content.lower()
        ]
        return results[:limit]

    async def create_document(self, title: str, content: str):
        doc_id = self.next_id
        self.next_id += 1

        document = Document(
            id=doc_id,
            title=title,
            content=content,
            user_id=1,  # Mock user
            created_at=datetime.now()
        )

        self.documents[doc_id] = document
        return document

    async def get_document(self, doc_id: int):
        return self.documents.get(doc_id)

class MockUserService:
    def __init__(self):
        self.current_user = User(id=1, name="Test User", email="test@example.com")

    def get_current_user(self):
        return self.current_user

    def get_user(self, user_id: int):
        if user_id == 1:
            return self.current_user
        return None

def test_with_mocks():
    container = InjectQ()
    mock_doc = MockDocumentService()
    mock_user = MockUserService()

    container.bind(IDocumentService, mock_doc)
    container.bind(IUserService, mock_user)

    mcp = FastMCP("Mock Test Server")
    setup_fastmcp_integration(mcp, container)

    @mcp.tool()
    async def test_tool(
        title: str,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        doc = await doc_service.create_document(title, "Test content")
        return {"created_id": doc.id, "documents_count": len(mock_doc.documents)}

    # Execute tool
    with mcp.test_context():
        result = await mcp.call_tool("test_tool", {"title": "Mock Test"})

    # Verify mock interactions
    assert result["created_id"] == 1
    assert result["documents_count"] == 1
    assert len(mock_doc.documents) == 1

🚨 Common Patterns and Pitfalls

✅ Good Patterns

1. Proper Request Scoping

# ✅ Good: Use scoped for request-specific data
@scoped
class RequestContext:
    def __init__(self):
        self.request_id = str(uuid.uuid4())
        self.user = None
        self.metadata = {}

# ✅ Good: Use singleton for shared resources
@singleton
class DatabasePool:
    def __init__(self):
        self.pool = create_database_pool()

# ✅ Good: Use transient for stateless operations
@transient
class DataValidator:
    def validate(self, data: dict) -> bool:
        return validate_schema(data)

2. Error Handling

# ✅ Good: Handle tool errors gracefully
@mcp.tool()
async def safe_tool_operation(
    data: dict,
    service: IService = InjectQDependency(IService),
    logger: ILogger = InjectQDependency(ILogger)
):
    try:
        result = await service.process_data(data)
        return result
    except ValidationError as e:
        logger.error(f"Validation failed: {e}")
        return {"error": "Invalid data", "details": str(e)}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"error": "Internal server error"}

3. Permission Checking

# ✅ Good: Check permissions before operations
@mcp.tool()
async def secure_document_operation(
    doc_id: int,
    operation: str,
    auth_service: IAuthService = InjectQDependency(IAuthService),
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    # Check permissions first
    if not auth_service.has_permission(f"document.{operation}"):
        raise ValueError(f"Insufficient permissions for {operation}")

    # Perform operation
    if operation == "read":
        return await doc_service.get_document(doc_id)
    elif operation == "update":
        return await doc_service.update_document(doc_id, "New Title", "New Content")
    else:
        raise ValueError(f"Unknown operation: {operation}")

❌ Bad Patterns

1. Manual Container Access

# ❌ Bad: Manual container access in tools
container = InjectQ()  # Global container

@mcp.tool()
async def manual_tool(user_id: int):
    user_service = container.get(IUserService)  # Manual resolution
    return user_service.get_user(user_id)

# ✅ Good: Use dependency injection
@mcp.tool()
async def injected_tool(
    user_id: int,
    user_service: IUserService = InjectQDependency(IUserService)
):
    return user_service.get_user(user_id)

2. Singleton Abuse

# ❌ Bad: Singleton for request-specific state
@singleton
class RequestState:
    def __init__(self):
        self.current_request_data = None  # Shared across requests!

    def set_request_data(self, data):
        self.current_request_data = data  # Overwrites other requests!

# ❌ Bad: Singleton for mutable request data
@singleton
class RequestMetrics:
    def __init__(self):
        self.request_count = 0  # Accumulates across all requests

    def increment_request_count(self):
        self.request_count += 1  # Not request-specific

# ✅ Good: Scoped for request-specific data
@scoped
class RequestState:
    def __init__(self):
        self.request_data = None

@scoped
class RequestMetrics:
    def __init__(self):
        self.operations = []

3. Heavy Operations in Tools

# ❌ Bad: Heavy initialization per request
@mcp.tool()
async def heavy_tool(data: dict):
    # Load model on every request
    model = await load_ml_model()  # 2GB model!
    result = model.predict(data)
    return result

# ✅ Good: Pre-load heavy resources
@singleton
class MLModelService:
    def __init__(self):
        self.model = None

    async def initialize(self):
        if self.model is None:
            self.model = await load_ml_model()

    async def predict(self, data: dict):
        await self.initialize()
        return self.model.predict(data)

@mcp.tool()
async def light_tool(
    data: dict,
    ml_service: MLModelService = InjectQDependency(MLModelService)
):
    return await ml_service.predict(data)

⚡ Advanced Features

Custom MCP Middleware

from injectq.integrations.fastmcp import FastMCPMiddleware

class MetricsMiddleware(FastMCPMiddleware):
    def __init__(self, metrics_service: IMetricsService):
        self.metrics = metrics_service

    async def before_tool_call(self, tool_name, args):
        # Record tool call start
        self.metrics.increment("tool_calls_started")
        self.metrics.increment(f"tool_{tool_name}_calls")

    async def after_tool_call(self, tool_name, args, result, duration):
        # Record tool call completion
        self.metrics.histogram("tool_call_duration", duration)
        self.metrics.increment("tool_calls_completed")

    async def on_tool_error(self, tool_name, args, error):
        # Record tool call failure
        self.metrics.increment("tool_calls_failed")
        self.metrics.increment(f"tool_error_{type(error).__name__}")

# Use custom middleware
setup_fastmcp_integration(
    mcp,
    container,
    middlewares=[MetricsMiddleware(metrics_service)]
)

Tool Result Caching

@scoped
class ToolCache:
    def __init__(self):
        self.cache = {}

    def get(self, key: str):
        return self.cache.get(key)

    def set(self, key: str, value: Any, ttl: int = 300):
        self.cache[key] = {
            "value": value,
            "expires_at": time.time() + ttl
        }

    def is_expired(self, key: str) -> bool:
        if key not in self.cache:
            return True
        return time.time() > self.cache[key]["expires_at"]

@mcp.tool()
async def cached_search_documents(
    query: str,
    cache: ToolCache = InjectQDependency(ToolCache),
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    """Search documents with caching."""
    cache_key = f"search:{query}"

    # Check cache first
    if not cache.is_expired(cache_key):
        return cache.get(cache_key)["value"]

    # Perform search
    results = await doc_service.search_documents(query, user_id=1, limit=10)

    # Cache results
    cache.set(cache_key, results)

    return results

Tool Composition

@mcp.tool()
async def complex_workflow(
    input_data: dict,
    validator: IDataValidator = InjectQDependency(IDataValidator),
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
    formatter: IDataFormatter = InjectQDependency(IDataFormatter)
):
    """Complex workflow combining multiple services."""
    # Step 1: Validate input
    validation_result = validator.validate(input_data)
    if not validation_result.valid:
        return {"error": "Validation failed", "details": validation_result.errors}

    # Step 2: Process data
    processed_data = await processor.process_data(input_data)

    # Step 3: Format output
    formatted_result = formatter.format_data(processed_data)

    return {
        "success": True,
        "original_input": input_data,
        "processed_data": processed_data,
        "formatted_result": formatted_result
    }

@mcp.tool()
async def batch_operation(
    items: List[dict],
    batch_processor: IBatchProcessor = InjectQDependency(IBatchProcessor)
):
    """Process multiple items in batch."""
    # Process items in parallel
    results = await batch_processor.process_batch(items)

    # Group results
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    return {
        "total_items": len(items),
        "successful": len(successful),
        "failed": len(failed),
        "results": results
    }

🎯 Summary

FastMCP integration provides:

  • Automatic dependency injection - No manual container management in tools
  • Request-scoped services - Proper isolation per MCP request
  • Type-driven injection - Just add type hints to tool parameters
  • Framework lifecycle integration - Automatic cleanup and resource management
  • Testing support - Easy mocking and test isolation

Key features: - Seamless integration with FastMCP's tool system - Support for all InjectQ scopes (singleton, scoped, transient) - Request-scoped container access - Custom middleware support - Tool result caching - Tool composition patterns

Best practices: - Use scoped services for request-specific data - Use singleton for shared resources and heavy objects - Use transient for stateless operations - Handle errors gracefully in tools - Check permissions before operations - Test thoroughly with mocked dependencies - Avoid manual container access in tools

Congratulations! You've completed the framework integrations section. Ready to explore testing utilities?