# Taskiq Integration
Taskiq integration enables dependency injection for background tasks and workers, providing automatic service resolution with proper task scoping and lifecycle management.
## 🎯 Getting Started
### Basic Setup
from taskiq import TaskiqScheduler
from injectq import InjectQ
from injectq.integrations.taskiq import setup_taskiq_integration, InjectQDependency
# 1. Create container and bind services
# Bind the implementation classes rather than pre-built instances: EmailService,
# UserService and NotificationService all declare required constructor
# arguments, so calling them with no arguments raises TypeError.
# NOTE(review): this relies on the container constructing the classes and
# resolving their constructor dependencies (SMTPConfig, IDatabaseConnection,
# ...), which therefore must be bound as well — confirm against InjectQ docs.
container = InjectQ()
container.bind(IEmailService, EmailService)
container.bind(IUserService, UserService)
container.bind(INotificationService, NotificationService)
# 2. Create Taskiq scheduler
scheduler = TaskiqScheduler()
# 3. Set up integration
setup_taskiq_integration(scheduler, container)
# 4. Use dependency injection in tasks
@scheduler.task
async def send_welcome_email(
    user_id: int,
    email_service: IEmailService = InjectQDependency(IEmailService),
    user_service: IUserService = InjectQDependency(IUserService),
):
    """Look up the user and send the welcome e-mail to their address.

    Args:
        user_id: Identifier passed to ``user_service.get_user``.
        email_service: Service used to send the message.
        user_service: Service used to load the user.
    """
    user = user_service.get_user(user_id)
    await email_service.send_welcome_email(user.email)
@scheduler.task
async def process_order(
    order_id: int,
    notification_svc: INotificationService = InjectQDependency(INotificationService),
):
    """Process an order and send its confirmation through ``notification_svc``."""
    # Process order logic
    await notification_svc.send_order_confirmation(order_id)
# 5. Schedule tasks
# NOTE: ``await`` is only valid inside a coroutine; run these two calls from
# an ``async def`` entry point (for example one started with ``asyncio.run``).
await scheduler.schedule_task(send_welcome_email, user_id=123)
await scheduler.schedule_task(process_order, order_id=456)
### Service Definitions
from typing import Protocol
# Define service interfaces
class IEmailService(Protocol):
    """Outbound e-mail operations used by the tasks and by NotificationService."""

    async def send_welcome_email(self, email: str) -> None: ...

    async def send_order_confirmation(self, email: str, order_id: int) -> None: ...

    # Declared because NotificationService.send_payment_failed calls it.
    async def send_payment_failed(self, email: str) -> None: ...


class IUserService(Protocol):
    """User lookups/updates plus the order lookup that notifications rely on."""

    def get_user(self, user_id: int) -> User: ...

    def update_user_status(self, user_id: int, status: str) -> None: ...

    # Declared because NotificationService needs the order's ``user_id``.
    def get_order(self, order_id: int): ...


class INotificationService(Protocol):
    """High-level notifications addressed by order id or user id."""

    async def send_order_confirmation(self, order_id: int) -> None: ...

    async def send_payment_failed(self, user_id: int) -> None: ...


# Implement services
class EmailService:
    """IEmailService implementation that prints what it would send."""

    def __init__(self, smtp_config: SMTPConfig):
        self.smtp_config = smtp_config

    async def send_welcome_email(self, email: str) -> None:
        # Send welcome email logic
        print(f"Sending welcome email to {email}")

    async def send_order_confirmation(self, email: str, order_id: int) -> None:
        # Send order confirmation logic
        print(f"Sending order confirmation to {email} for order {order_id}")

    async def send_payment_failed(self, email: str) -> None:
        # Send payment failure logic
        print(f"Sending payment failed notice to {email}")


class UserService:
    """IUserService implementation backed by a database connection."""

    def __init__(self, db: IDatabaseConnection):
        self.db = db

    def get_user(self, user_id: int) -> User:
        return self.db.query(User).filter(id=user_id).first()

    def update_user_status(self, user_id: int, status: str) -> None:
        user = self.get_user(user_id)
        user.status = status
        self.db.commit()

    def get_order(self, order_id: int):
        """Return the order with ``order_id`` from the database connection."""
        return self.db.get_order(order_id)


class NotificationService:
    """Resolves the recipient through the user service, then sends e-mail."""

    def __init__(self, email_svc: IEmailService, user_svc: IUserService):
        self.email_svc = email_svc
        self.user_svc = user_svc

    async def send_order_confirmation(self, order_id: int) -> None:
        # Get order and user. This class holds no ``db`` attribute, so the
        # order is fetched through the user service instead of ``self.db``.
        order = self.user_svc.get_order(order_id)
        user = self.user_svc.get_user(order.user_id)
        # Send notification
        await self.email_svc.send_order_confirmation(user.email, order_id)

    async def send_payment_failed(self, user_id: int) -> None:
        user = self.user_svc.get_user(user_id)
        await self.email_svc.send_payment_failed(user.email)
## 🔧 Advanced Configuration
### Task-Scoped Services
from injectq import scoped
@scoped
class TaskContext:
    """Context object holding a generated id, a start time and metadata."""

    def __init__(self):
        self.task_id = str(uuid.uuid4())
        self.start_time = time.time()
        self.metadata = {}

    def set_metadata(self, key: str, value: Any):
        """Store ``value`` under ``key`` in the metadata dict."""
        self.metadata[key] = value

    def get_duration(self) -> float:
        """Return the seconds elapsed since this context was created."""
        return time.time() - self.start_time
@scoped
class TaskMetrics:
    """Collects timestamped operation timings and error messages."""

    def __init__(self):
        self.operations = []
        self.errors = []

    def record_operation(self, operation: str, duration: float):
        """Append an operation entry with its duration and the current time."""
        self.operations.append({
            "operation": operation,
            "duration": duration,
            "timestamp": time.time(),
        })

    def record_error(self, error: str):
        """Append an error entry stamped with the current time."""
        self.errors.append({
            "error": error,
            "timestamp": time.time(),
        })
# Use in tasks
@scheduler.task
async def complex_task(
    data: dict,
    ctx: TaskContext = InjectQDependency(TaskContext),
    metrics: TaskMetrics = InjectQDependency(TaskMetrics),
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
):
    """Process ``data`` and record its timing, or its failure, in ``metrics``.

    Returns whatever ``processor.process_data`` returns. Any exception is
    recorded through ``metrics.record_error`` and then re-raised.
    """
    ctx.set_metadata("input_size", len(data))
    try:
        # Process data with metrics
        start_time = time.time()
        result = await processor.process_data(data)
        duration = time.time() - start_time
        metrics.record_operation("process_data", duration)
        return result
    except Exception as e:
        metrics.record_error(str(e))
        raise
### Module-Based Setup
from injectq import Module
class TaskModule(Module):
    """Registers the task-facing services, task context objects and processors."""

    def configure(self, binder):
        # Task-specific services. Bind the classes, not instances: each of
        # these declares required constructor arguments, so a bare call such
        # as ``EmailService()`` raises TypeError.
        # NOTE(review): relies on the container resolving the constructor
        # dependencies from the other bindings — confirm against InjectQ docs.
        binder.bind(IEmailService, EmailService)
        binder.bind(IUserService, UserService)
        binder.bind(INotificationService, NotificationService)
        # Task context services. Binding a pre-built instance would hand the
        # same object to every task, which defeats their @scoped declaration.
        binder.bind(TaskContext, TaskContext)
        binder.bind(TaskMetrics, TaskMetrics)
        # Data processors
        binder.bind(IDataProcessor, DataProcessor())
class InfrastructureModule(Module):
    """Registers the database connection and the SMTP configuration."""

    def configure(self, binder):
        # Database and external services
        binder.bind(IDatabaseConnection, PostgresConnection())
        binder.bind(SMTPConfig, SMTPConfig.from_env())
def create_taskiq_scheduler() -> TaskiqScheduler:
    """Build a scheduler wired to a container with both modules installed.

    Returns:
        The ``TaskiqScheduler`` after ``setup_taskiq_integration`` has run.
    """
    # Create container with modules
    container = InjectQ()
    container.install(InfrastructureModule())
    container.install(TaskModule())

    # Create scheduler
    scheduler = TaskiqScheduler()

    # Set up integration
    setup_taskiq_integration(scheduler, container)
    return scheduler
# Usage
# Builds the scheduler with both modules installed and the integration set up.
scheduler = create_taskiq_scheduler()
## 🎨 Task Patterns
### Background Email Tasks
@scheduler.task
async def send_bulk_emails(
    user_ids: List[int],
    template: str,
    email_service: IEmailService = InjectQDependency(IEmailService),
    user_service: IUserService = InjectQDependency(IUserService),
):
    """Send emails to multiple users.

    Users are processed one at a time, in the order given; an exception for
    one user stops the loop.
    """
    # NOTE(review): send_template_email is not among the IEmailService methods
    # shown under Service Definitions — confirm the interface provides it.
    for user_id in user_ids:
        user = user_service.get_user(user_id)
        await email_service.send_template_email(
            user.email,
            template,
            {"name": user.name},
        )
@scheduler.task
async def send_reminder_emails(
    reminder_type: str,
    email_service: IEmailService = InjectQDependency(IEmailService),
    user_service: IUserService = InjectQDependency(IUserService),
):
    """Send reminder emails based on type."""
    # NOTE(review): get_users_due_for_reminder and send_reminder_email are not
    # among the interface methods shown under Service Definitions — confirm.
    users = user_service.get_users_due_for_reminder(reminder_type)
    for user in users:
        await email_service.send_reminder_email(
            user.email,
            reminder_type,
        )
# Schedule recurring tasks
# NOTE: ``await`` is only valid inside a coroutine; run this call from an
# ``async def`` entry point.
await scheduler.schedule_task(
send_reminder_emails,
reminder_type="payment_due",
cron="0 9 * * *" # Daily at 9 AM
)
### Data Processing Tasks
@scheduler.task
async def process_user_data(
    user_id: int,
    data_type: str,
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
    storage: IDataStorage = InjectQDependency(IDataStorage),
    metrics: TaskMetrics = InjectQDependency(TaskMetrics),
):
    """Process user data in background.

    Loads the raw data, processes it (timing only the processing step) and
    stores the result. Any exception is recorded in ``metrics`` and re-raised.
    """
    try:
        # Get user data
        raw_data = await storage.get_user_data(user_id, data_type)

        # Process data
        start_time = time.time()
        processed_data = await processor.process_user_data(raw_data)
        processing_time = time.time() - start_time
        metrics.record_operation("process_user_data", processing_time)

        # Store processed data
        await storage.store_processed_data(user_id, data_type, processed_data)
    except Exception as e:
        metrics.record_error(f"Failed to process user data: {e}")
        raise
@scheduler.task
async def cleanup_old_data(
    days_old: int = 30,
    storage: IDataStorage = InjectQDependency(IDataStorage),
):
    """Clean up old processed data.

    Args:
        days_old: Age threshold in days; the cutoff is "now minus days_old".
        storage: Storage service that performs the deletion.
    """
    # datetime.now() is naive local time.
    # NOTE(review): confirm the storage layer compares against the same clock.
    cutoff_date = datetime.now() - timedelta(days=days_old)
    deleted_count = await storage.cleanup_old_data(cutoff_date)
    print(f"Cleaned up {deleted_count} old data records")
### Notification Tasks
@scheduler.task
async def send_order_notifications(
    order_id: int,
    notification_svc: INotificationService = InjectQDependency(INotificationService),
    user_svc: IUserService = InjectQDependency(IUserService),
):
    """Send notifications for order events."""
    order = user_svc.get_order(order_id)

    # Send to customer
    await notification_svc.send_order_confirmation(order_id)

    # Send to admin if high value
    # NOTE(review): send_high_value_order_alert is not among the
    # INotificationService methods shown under Service Definitions — confirm.
    if order.total > 1000:
        await notification_svc.send_high_value_order_alert(order_id)
@scheduler.task
async def send_payment_reminders(
    user_id: int,
    amount: float,
    due_date: str,
    notification_svc: INotificationService = InjectQDependency(INotificationService),
):
    """Send payment reminder notifications."""
    await notification_svc.send_payment_reminder(user_id, amount, due_date)
# Chain tasks together
@scheduler.task
async def process_payment_and_notify(
    payment_data: dict,
    payment_svc: IPaymentService = InjectQDependency(IPaymentService),
    notification_svc: INotificationService = InjectQDependency(INotificationService),
):
    """Process payment and send notifications.

    Returns:
        The result object from ``payment_svc.process_payment``, whether the
        payment succeeded or not.
    """
    # Process payment
    result = await payment_svc.process_payment(payment_data)

    if result.success:
        # Send success notification
        await notification_svc.send_payment_success(
            result.user_id,
            result.amount,
        )
    else:
        # Send failure notification
        await notification_svc.send_payment_failed(result.user_id)

    return result
## 🧪 Testing Taskiq Integration
### Unit Testing Tasks
import pytest
from injectq.integrations.taskiq import setup_taskiq_integration
@pytest.fixture
def test_scheduler():
    """Return a scheduler integrated with a container holding mock services."""
    # Create test container
    container = InjectQ()
    container.bind(IEmailService, MockEmailService())
    container.bind(IUserService, MockUserService())

    # Create test scheduler
    scheduler = TaskiqScheduler()
    setup_taskiq_integration(scheduler, container)
    return scheduler
def test_send_welcome_email_task(test_scheduler):
    """The task e-mails the looked-up user and returns their address."""
    import asyncio  # local import: lets this plain test function run the coroutine

    # Define test task
    @test_scheduler.task
    async def send_welcome_email(
        user_id: int,
        email_service: IEmailService = InjectQDependency(IEmailService),
        user_service: IUserService = InjectQDependency(IUserService),
    ):
        user = user_service.get_user(user_id)
        await email_service.send_welcome_email(user.email)
        return {"email": user.email}

    # Execute task (``await`` is not allowed in a plain ``def``, so drive the
    # coroutine with asyncio.run instead)
    result = asyncio.run(
        test_scheduler.execute_task(send_welcome_email, user_id=123)
    )

    # Verify result; the returned address also shows that get_user was used
    assert result["email"] == "user123@example.com"

    # Verify the mock recorded the send. MockEmailService tracks calls in
    # ``sent_emails``; neither mock exposes ``*_called`` flags.
    email_service = test_scheduler.container.get(IEmailService)
    assert [m["email"] for m in email_service.sent_emails] == ["user123@example.com"]
def test_task_scoping(test_scheduler):
    """Two executions of the same task each report their own metadata."""
    import asyncio  # local import: lets this plain test function run coroutines

    # Define task with scoped service
    @test_scheduler.task
    async def scoped_task(
        data: str,
        ctx: TaskContext = InjectQDependency(TaskContext),
    ):
        ctx.set_metadata("input", data)
        return ctx.metadata

    async def run_both():
        # Run both executions on one event loop, one after the other.
        first = await test_scheduler.execute_task(scoped_task, data="test1")
        second = await test_scheduler.execute_task(scoped_task, data="test2")
        return first, second

    # Execute multiple tasks
    result1, result2 = asyncio.run(run_both())

    # Each task should have its own context
    assert result1["input"] == "test1"
    assert result2["input"] == "test2"
### Integration Testing
@pytest.fixture
def integration_scheduler():
    """Return a scheduler integrated with a container built from real modules."""
    # Real container with test database
    container = InjectQ()
    container.install(TestDatabaseModule())
    container.install(EmailModule())
    container.install(TaskModule())

    scheduler = TaskiqScheduler()
    setup_taskiq_integration(scheduler, container)
    return scheduler
def test_email_task_integration(integration_scheduler):
    """A notification task run through the scheduler reports its recipient."""
    import asyncio  # local import: lets this plain test function run the coroutine

    # Define integration task
    @integration_scheduler.task
    async def send_user_notification(
        user_id: int,
        message: str,
        email_service: IEmailService = InjectQDependency(IEmailService),
        user_service: IUserService = InjectQDependency(IUserService),
    ):
        user = user_service.get_user(user_id)
        await email_service.send_notification(user.email, message)
        return {"sent_to": user.email}

    # Execute task (``await`` is not allowed in a plain ``def``)
    result = asyncio.run(
        integration_scheduler.execute_task(
            send_user_notification,
            user_id=123,
            message="Welcome to our platform!",
        )
    )

    # Verify result
    assert "sent_to" in result
    assert result["sent_to"].endswith("@example.com")
def test_task_error_handling(integration_scheduler):
    """A task's ValueError propagates out of ``execute_task``."""
    import asyncio  # local import: lets this plain test function run coroutines

    # Define task that might fail
    @integration_scheduler.task
    async def risky_task(
        user_id: int,
        user_service: IUserService = InjectQDependency(IUserService),
    ):
        user = user_service.get_user(user_id)
        if user.status == "inactive":
            raise ValueError("Cannot process inactive user")
        return {"processed": user.id}

    # Test successful case
    result = asyncio.run(
        integration_scheduler.execute_task(risky_task, user_id=123)
    )
    assert result["processed"] == 123

    # Test error case
    with pytest.raises(ValueError, match="Cannot process inactive user"):
        asyncio.run(integration_scheduler.execute_task(risky_task, user_id=456))
### Mock Testing
class MockEmailService:
    """Test double that records every e-mail in ``sent_emails`` instead of sending."""

    def __init__(self):
        self.sent_emails = []

    async def send_welcome_email(self, email: str):
        """Record a "welcome" entry for ``email`` with the current timestamp."""
        self.sent_emails.append({
            "type": "welcome",
            "email": email,
            "timestamp": time.time(),
        })

    async def send_notification(self, email: str, message: str):
        """Record a "notification" entry carrying ``message`` for ``email``."""
        self.sent_emails.append({
            "type": "notification",
            "email": email,
            "message": message,
            "timestamp": time.time(),
        })
class MockUserService:
    """Test double serving two fixed users: 123 (active) and 456 (inactive)."""

    def __init__(self):
        self.users = {
            123: User(id=123, email="user123@example.com", status="active"),
            456: User(id=456, email="user456@example.com", status="inactive"),
        }

    def get_user(self, user_id: int) -> User:
        """Return the stored user, or ``None`` when ``user_id`` is unknown."""
        return self.users.get(user_id)
def test_with_mocks():
    """A task run against explicitly bound mocks records exactly one e-mail."""
    import asyncio  # local import: lets this plain test function run the coroutine

    container = InjectQ()
    mock_email = MockEmailService()
    mock_user = MockUserService()
    container.bind(IEmailService, mock_email)
    container.bind(IUserService, mock_user)

    scheduler = TaskiqScheduler()
    setup_taskiq_integration(scheduler, container)

    @scheduler.task
    async def test_task(
        user_id: int,
        email_service: IEmailService = InjectQDependency(IEmailService),
        user_service: IUserService = InjectQDependency(IUserService),
    ):
        user = user_service.get_user(user_id)
        await email_service.send_welcome_email(user.email)
        return len(mock_email.sent_emails)

    # Execute task (``await`` is not allowed in a plain ``def``)
    result = asyncio.run(scheduler.execute_task(test_task, user_id=123))

    # Verify mock interactions
    assert result == 1
    assert len(mock_email.sent_emails) == 1
    assert mock_email.sent_emails[0]["email"] == "user123@example.com"
## 🚨 Common Patterns and Pitfalls
### ✅ Good Patterns
#### 1. Proper Task Scoping
# ✅ Good: Use scoped for task-specific data
@scoped
class TaskProgress:
    """Tracks the named steps recorded so far and how many there are."""

    def __init__(self):
        self.steps = []
        self.current_step = 0

    def record_step(self, step_name: str):
        """Append a timestamped step entry and advance the step counter."""
        self.steps.append({
            "name": step_name,
            "timestamp": time.time(),
        })
        self.current_step += 1
# ✅ Good: Use singleton for shared resources
@singleton
class DatabasePool:
    """Holds the pool returned by ``create_database_pool()``."""

    def __init__(self):
        self.pool = create_database_pool()
# ✅ Good: Use transient for stateless operations
@transient
class DataValidator:
    """Validator with no instance state."""

    def validate(self, data: dict) -> bool:
        """Return the result of ``validate_schema(data)``."""
        return validate_schema(data)
#### 2. Error Handling
# ✅ Good: Handle task errors gracefully
@scheduler.task
async def process_with_error_handling(
    data: dict,
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
    logger: ILogger = InjectQDependency(ILogger),
):
    """Process ``data``, handling validation failures and escalating the rest.

    Returns the processor's result on success. A ``ValidationError`` is logged
    and handed to ``handle_validation_error``, after which the task returns
    ``None``. Any other exception is logged, alerted on, and re-raised.
    """
    try:
        result = await processor.process_data(data)
        return result
    except ValidationError as e:
        logger.error(f"Validation failed: {e}")
        # Retry logic or dead letter queue
        await handle_validation_error(data, e)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        # Alert system or manual intervention
        await alert_system(f"Task failed: {e}")
        raise
#### 3. Task Dependencies
# ✅ Good: Chain related tasks
@scheduler.task
async def process_order(
    order_id: int,
    order_svc: IOrderService = InjectQDependency(IOrderService),
):
    """Process the order through ``order_svc`` and return the result."""
    order = await order_svc.process_order(order_id)
    return order
@scheduler.task
async def notify_order_processed(
    order_id: int,
    notification_svc: INotificationService = InjectQDependency(INotificationService),
):
    """Send the "order processed" notification for ``order_id``."""
    await notification_svc.send_order_processed_notification(order_id)
# Chain tasks
# The second call starts only after the first has been awaited. ``order_result``
# is not used by the second call, which is given the order id directly.
# NOTE: ``await`` is only valid inside a coroutine; run these calls from an
# ``async def`` entry point.
order_result = await scheduler.execute_task(process_order, order_id=123)
await scheduler.execute_task(notify_order_processed, order_id=123)
### ❌ Bad Patterns
#### 1. Manual Container Access
# ❌ Bad: Manual container access in tasks
container = InjectQ()  # Global container


@scheduler.task
async def manual_task(user_id: int):
    """Anti-pattern: resolves its dependency from the global container itself."""
    user_service = container.get(IUserService)  # Manual resolution
    return user_service.get_user(user_id)
# ✅ Good: Use dependency injection
@scheduler.task
async def injected_task(
    user_id: int,
    user_service: IUserService = InjectQDependency(IUserService),
):
    """Return the user for ``user_id`` via the injected ``user_service``."""
    return user_service.get_user(user_id)
#### 2. Singleton Abuse
# ❌ Bad: Singleton for task-specific state
@singleton
class TaskState:
    """Anti-pattern: one shared instance holding per-task data."""

    def __init__(self):
        self.current_task_data = None  # Shared across tasks!

    def set_task_data(self, data):
        self.current_task_data = data  # Overwrites other tasks!
# ❌ Bad: Singleton for mutable task data
@singleton
class TaskMetrics:
    """Anti-pattern: one shared counter that every task increments."""

    def __init__(self):
        self.task_count = 0  # Accumulates across all tasks

    def increment_task_count(self):
        self.task_count += 1  # Not task-specific
# ✅ Good: Scoped for task-specific data
@scoped
class TaskState:
    """Scoped replacement for the singleton version: holds one task's data."""

    def __init__(self):
        self.task_data = None
@scoped
class TaskMetrics:
    """Scoped replacement for the singleton version: starts with no operations."""

    def __init__(self):
        self.operations = []
#### 3. Heavy Operations in Tasks
# ❌ Bad: Heavy initialization per task
@scheduler.task
async def heavy_task(data: dict):
    """Anti-pattern: reloads the model on every execution before predicting."""
    # Load model on every task execution
    model = await load_ml_model()  # 2GB model!
    result = model.predict(data)
    return result
# ✅ Good: Pre-load heavy resources
@singleton
class MLModelService:
    """Loads the model on first use and keeps it for later predictions."""

    def __init__(self):
        self.model = None

    async def initialize(self):
        """Load the model if it has not been loaded yet."""
        # NOTE(review): the check and the assignment are separated by an
        # ``await``, so two tasks arriving before the first load finishes
        # would each call load_ml_model() — consider guarding with a lock.
        if self.model is None:
            self.model = await load_ml_model()

    async def predict(self, data: dict):
        """Ensure the model is loaded, then return ``model.predict(data)``."""
        await self.initialize()
        return self.model.predict(data)
@scheduler.task
async def light_task(
    data: dict,
    ml_service: MLModelService = InjectQDependency(MLModelService),
):
    """Return the prediction for ``data`` from the injected ``ml_service``."""
    return await ml_service.predict(data)
## ⚡ Advanced Features
### Custom Task Middleware
from injectq.integrations.taskiq import TaskiqMiddleware
class MetricsMiddleware(TaskiqMiddleware):
    """Middleware that reports task starts, durations, completions and failures."""

    def __init__(self, metrics_service: IMetricsService):
        self.metrics = metrics_service

    async def before_task(self, task_info):
        # Record task start; the start time is stored on task_info so that
        # after_task can compute the duration.
        self.metrics.increment("tasks_started")
        task_info.start_time = time.time()

    async def after_task(self, task_info, result):
        # Record task completion
        duration = time.time() - task_info.start_time
        self.metrics.histogram("task_duration", duration)
        self.metrics.increment("tasks_completed")

    async def on_task_error(self, task_info, error):
        # Record task failure, overall and per exception class name
        self.metrics.increment("tasks_failed")
        self.metrics.increment(f"task_error_{type(error).__name__}")
# Use custom middleware
# NOTE(review): ``metrics_service`` is not defined in this snippet — presumably
# an IMetricsService instance obtained elsewhere; confirm.
setup_taskiq_integration(
scheduler,
container,
middlewares=[MetricsMiddleware(metrics_service)]
)
### Task Result Handling
@scheduler.task
async def process_with_result_handling(
    data: dict,
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
):
    """Process ``data`` and wrap the outcome in a structured result dict.

    The ``task_id`` in the returned dict is a freshly generated UUID.
    """
    result = await processor.process_data(data)

    # Return structured result
    return {
        "task_id": str(uuid.uuid4()),
        "processed_at": time.time(),
        "input_size": len(data),
        "output_size": len(result),
        "result": result,
    }
# Handle task results
async def handle_task_result(task_result):
    """Store a successful result, or retry/abandon a failed one.

    A failed task is retried while ``task_result.retry_count`` is below 3;
    after that it is passed to ``handle_permanent_failure``.
    """
    if task_result.success:
        # Process successful result
        data = task_result.result
        print(f"Task completed: {data['task_id']}")

        # Store result or trigger next task
        await store_task_result(data)
    else:
        # Handle task failure
        print(f"Task failed: {task_result.error}")

        # Retry logic or error handling
        if task_result.retry_count < 3:
            await scheduler.retry_task(task_result.task_id)
        else:
            await handle_permanent_failure(task_result)
### Cron Tasks
@scheduler.task
async def cleanup_expired_sessions(
    session_svc: ISessionService = InjectQDependency(ISessionService),
):
    """Clean up expired user sessions."""
    expired_count = await session_svc.cleanup_expired_sessions()
    print(f"Cleaned up {expired_count} expired sessions")
@scheduler.task
async def generate_daily_reports(
    report_svc: IReportService = InjectQDependency(IReportService),
):
    """Generate daily business reports."""
    await report_svc.generate_daily_report()
    print("Daily report generated")
@scheduler.task
async def send_reminders(
    reminder_svc: IReminderService = InjectQDependency(IReminderService),
):
    """Send scheduled reminders."""
    sent_count = await reminder_svc.send_pending_reminders()
    print(f"Sent {sent_count} reminders")
# Schedule cron tasks
# Each call registers one task with a five-field cron expression.
# NOTE: ``await`` is only valid inside a coroutine; run these calls from an
# ``async def`` entry point.
await scheduler.schedule_cron(
cleanup_expired_sessions,
cron="0 */6 * * *" # Every 6 hours
)
await scheduler.schedule_cron(
generate_daily_reports,
cron="0 2 * * *" # Daily at 2 AM
)
await scheduler.schedule_cron(
send_reminders,
cron="0 */2 * * *" # Every 2 hours
)
## 🎯 Summary
Taskiq integration provides:
- Automatic dependency injection - No manual container management in tasks
- Task-scoped services - Proper isolation per background task
- Type-driven injection - Annotate a task parameter with the service type and give it an `InjectQDependency(...)` default
- Framework lifecycle integration - Automatic cleanup and resource management
- Testing support - Easy mocking and test isolation

Key features:
- Seamless integration with Taskiq's task system
- Support for all InjectQ scopes (singleton, scoped, transient)
- Task-scoped container access
- Custom middleware support
- Cron task scheduling
- Result handling and error recovery

Best practices:
- Use scoped services for task-specific data
- Use singleton for shared resources and heavy objects
- Use transient for stateless operations
- Handle errors gracefully in tasks
- Test thoroughly with mocked dependencies
- Avoid manual container access in tasks

Ready to explore FastMCP integration?