Background Tasks
A robust background task management system for Nexios applications, providing a simple yet powerful way to run, monitor, and manage asynchronous tasks.
This system allows you to execute time-consuming operations without blocking your API responses, making your application more responsive and scalable.
Features
Key capabilities that make background task management efficient and reliable:
- 🚀 Simple task creation and management
- 🔄 Built-in task status tracking
- ⏱️ Timeout and cancellation support
- 🔌 Seamless integration with Nexios dependency injection
- 📊 Progress tracking and result handling
- 🧵 Thread-safe task management
- 📝 Comprehensive logging
Installation
Install the nexios-contrib package to access background task functionality:
```bash
pip install nexios-contrib
```
Basic Usage
Follow these steps to integrate background tasks into your Nexios application:
1. Set Up Your Application
Initialize the task manager in your application to enable background task functionality:
```python
from nexios import NexiosApp
from nexios_contrib.tasks import setup_tasks, create_task

app = NexiosApp()

# Initialize the task manager
task_manager = setup_tasks(app)
```
2. Define a Background Task
Create async functions that will run as background tasks:
```python
import asyncio

async def process_data(data: dict) -> dict:
    """A sample background task that processes data."""
    await asyncio.sleep(2)  # Simulate work
    return {"status": "completed", "data": data}
```
3. Create and Run a Task
Start background tasks from your API endpoints and return immediately:
```python
from nexios.http import Request, Response

@app.post("/process")
async def start_processing(request: Request, response: Response) -> dict:
    """Start a background processing task."""
    data = await request.json
    task = await create_task(
        request=request,
        func=process_data,
        data=data,
        name="data_processing"
    )
    return {"task_id": task.id}
```
4. Check Task Status
Monitor task progress and retrieve results through status endpoints:
```python
from nexios_contrib.tasks import TaskStatus

@app.get("/status/{task_id}")
async def get_status(request: Request, response: Response, task_id: str) -> dict:
    """Get the status of a background task."""
    task = task_manager.get_task(task_id)
    if not task:
        return {"error": "Task not found"}, 404
    return {
        "task_id": task.id,
        "status": task.status.value,
        "result": task.result if task.status == TaskStatus.COMPLETED else None,
        "error": str(task.error) if task.error else None
    }
```
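End to end, a client starts the job, receives a task ID immediately, and polls the status endpoint until the task settles. Here is a quick sketch using `httpx` against the two endpoints above; the base URL is an assumption, and the status strings are assumed to be the lowercase enum values:

```python
import asyncio
import httpx

async def run_job() -> dict:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Start the background task; the endpoint returns immediately
        resp = await client.post("/process", json={"user_id": 42})
        task_id = resp.json()["task_id"]

        # Poll /status/{task_id} until the task is no longer pending or running
        while True:
            status = (await client.get(f"/status/{task_id}")).json()
            if status["status"] not in ("pending", "running"):
                return status
            await asyncio.sleep(0.5)

print(asyncio.run(run_job()))
```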
Using with Dependency Injection
Nexios Tasks integrates with Nexios's dependency injection system, so you can simplify task creation and management directly in your endpoints.
Create a Task with Dependencies
Use TaskDependency to inject task management capabilities directly into your handlers:
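The handler below passes a `process_with_deps` function that this guide does not define elsewhere; it is just an ordinary async task function. A minimal, illustrative sketch (the body is an assumption, not part of the library):

```python
import asyncio

async def process_with_deps(data: dict) -> dict:
    """Illustrative task used by the dependency-injection example below."""
    await asyncio.sleep(2)  # Simulate work
    return {"status": "completed", "data": data}
```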
```python
from nexios_contrib.tasks import TaskDependency

@app.post("/process-with-deps")
async def start_processing_with_deps(
    request: Request,
    response: Response,
    task_dep=TaskDependency()
) -> dict:
    """Start a task with dependencies."""
    data = await request.json
    task = await task_dep.create(
        func=process_with_deps,
        data=data,
        name="data_processing_with_deps"
    )
    return {"task_id": task.id}
```
Task Management
Administrative operations for monitoring and controlling background tasks.
Listing All Tasks
Retrieve information about all tasks in the system:
@app.get("/tasks")
async def list_tasks(request: Request, response: Response) -> list:
"""List all tasks."""
return [
{
"id": task.id,
"name": task.name,
"status": task.status.value,
"created_at": task.created_at.isoformat(),
"completed_at": task.completed_at.isoformat() if task.completed_at else None
}
for task in task_manager.list_tasks()
]Canceling a Task
Stop running tasks when they're no longer needed:
```python
@app.post("/tasks/{task_id}/cancel")
async def cancel_task(request: Request, response: Response, task_id: str) -> dict:
    """Cancel a running task."""
    success = await task_manager.cancel_task(task_id)
    return {"success": success, "task_id": task_id}
```
Configuration
Customize the task manager's behavior with a configuration object to match your application's requirements:
```python
from nexios_contrib.tasks import TaskConfig, setup_tasks

config = TaskConfig(
    max_concurrent_tasks=50,   # Maximum number of concurrent tasks
    default_timeout=300,       # Default timeout in seconds (5 minutes)
    task_result_ttl=86400,     # How long to keep task results (24 hours)
    enable_task_history=True,  # Whether to keep completed tasks in history
    log_level="INFO"           # Logging level
)
task_manager = setup_tasks(app, config=config)
```
Task Status and Lifecycle
Understanding how tasks progress through different states and accessing their properties.
Task Status Enum
Available status values that indicate the current state of a task:
```python
from nexios_contrib.tasks import TaskStatus

# Available task statuses:
TaskStatus.PENDING    # Task created but not started
TaskStatus.RUNNING    # Task is currently executing
TaskStatus.COMPLETED  # Task finished successfully
TaskStatus.FAILED     # Task failed with an error
TaskStatus.CANCELLED  # Task was cancelled
TaskStatus.TIMEOUT    # Task exceeded its timeout limit
```
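Because TaskStatus behaves like a standard Python enum, you can iterate over it to build simple overviews. The sketch below counts tasks per status using only `list_tasks()` and the enum; the route path and handler name are illustrative:

```python
from collections import Counter

@app.get("/tasks/summary")
async def task_summary(request: Request, response: Response) -> dict:
    """Count tasks by status; relies only on list_tasks() and TaskStatus."""
    counts = Counter(task.status for task in task_manager.list_tasks())
    return {status.value: counts.get(status, 0) for status in TaskStatus}
```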
Task Lifecycle
Access detailed information about task execution and timing:
```python
from nexios_contrib.tasks import Task

# Task properties
task = task_manager.get_task(task_id)
print(f"Task ID: {task.id}")
print(f"Name: {task.name}")
print(f"Status: {task.status}")
print(f"Created: {task.created_at}")
print(f"Started: {task.started_at}")
print(f"Completed: {task.completed_at}")
print(f"Result: {task.result}")
print(f"Error: {task.error}")
print(f"Progress: {task.progress}")
```
Advanced Features
Enhanced capabilities for complex task management scenarios.
Progress Tracking
Monitor and report progress for long-running tasks:
```python
from nexios_contrib.tasks import update_task_progress

async def long_running_task(task_id: str, items: list) -> dict:
    """A task that reports progress."""
    total_items = len(items)
    processed = 0
    for item in items:
        # Process item
        await process_item(item)
        processed += 1
        # Update progress
        progress = (processed / total_items) * 100
        await update_task_progress(task_id, progress)
    return {"processed": processed, "total": total_items}

@app.get("/tasks/{task_id}/progress")
async def get_task_progress(request: Request, response: Response, task_id: str):
    """Get task progress."""
    task = task_manager.get_task(task_id)
    if not task:
        return {"error": "Task not found"}, 404
    return {
        "task_id": task_id,
        "progress": task.progress,
        "status": task.status.value
    }
```
Error Handling
Properly handle and respond to task failures and exceptions.
Handle task errors by checking the task's error attribute:
```python
@app.get("/task-result/{task_id}")
async def get_task_result(request: Request, response: Response, task_id: str):
    """Get the result of a completed task."""
    task = task_manager.get_task(task_id)
    if not task:
        return {"error": "Task not found"}, 404
    if task.error:
        return {
            "error": "Task failed",
            "message": str(task.error),
            "traceback": task.error.traceback if hasattr(task.error, 'traceback') else None
        }, 500
    return {"result": task.result}
```
Custom Error Handling
Define and handle application-specific task errors:
```python
from nexios_contrib.tasks import TaskError

async def task_with_custom_error(data: dict) -> dict:
    """A task that raises custom errors."""
    if not data.get("valid"):
        raise TaskError(
            code="INVALID_DATA",
            message="Data validation failed",
            details={"received": data}
        )
    return {"processed": data}

@app.exception_handler(TaskError)
async def handle_task_error(request, exc):
    """Handle custom task errors."""
    return {
        "error": exc.code,
        "message": exc.message,
        "details": exc.details
    }, 400
```
Examples
Real-world implementations demonstrating common background task patterns.
File Processing Service
Process files asynchronously with progress tracking:
```python
from nexios import NexiosApp
from nexios.http import Request, Response
from nexios_contrib.tasks import setup_tasks, create_task
import asyncio
import os

app = NexiosApp()
task_manager = setup_tasks(app)

async def process_file(file_path: str, task_id: str) -> dict:
    """Process a file in the background."""
    from nexios_contrib.tasks import update_task_progress

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    file_size = os.path.getsize(file_path)
    processed_bytes = 0

    # Simulate file processing
    with open(file_path, 'rb') as f:
        while True:
            chunk = f.read(1024)  # Read 1KB chunks
            if not chunk:
                break
            # Simulate processing time
            await asyncio.sleep(0.1)
            processed_bytes += len(chunk)
            progress = (processed_bytes / file_size) * 100
            await update_task_progress(task_id, progress)

    return {
        "file_path": file_path,
        "file_size": file_size,
        "processed_bytes": processed_bytes,
        "status": "completed"
    }

@app.post("/process-file")
async def start_file_processing(request: Request, response: Response):
    """Start file processing task."""
    data = await request.json()
    file_path = data.get("file_path")
    if not file_path:
        return {"error": "file_path is required"}, 400
    task = await create_task(
        request=request,
        func=process_file,
        file_path=file_path,
        name=f"process_file_{os.path.basename(file_path)}"
    )
    return {"task_id": task.id, "file_path": file_path}
```
Email Sending Service
Send bulk emails without blocking API responses:
```python
from nexios import NexiosApp
from nexios.http import Request, Response
from nexios_contrib.tasks import setup_tasks, create_task
import asyncio
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

app = NexiosApp()
task_manager = setup_tasks(app)

async def send_bulk_emails(recipients: list, subject: str, body: str, task_id: str) -> dict:
    """Send emails to multiple recipients."""
    from nexios_contrib.tasks import update_task_progress

    total_recipients = len(recipients)
    sent_count = 0
    failed_count = 0

    for i, recipient in enumerate(recipients):
        try:
            # Send email (implement your email sending logic)
            await send_single_email(recipient, subject, body)
            sent_count += 1
        except Exception as e:
            print(f"Failed to send email to {recipient}: {e}")
            failed_count += 1
        # Update progress
        progress = ((i + 1) / total_recipients) * 100
        await update_task_progress(task_id, progress)

    return {
        "total_recipients": total_recipients,
        "sent_count": sent_count,
        "failed_count": failed_count,
        "status": "completed"
    }

async def send_single_email(recipient: str, subject: str, body: str):
    """Send a single email."""
    # Implement your email sending logic here (e.g., with smtplib and MIMEText)
    await asyncio.sleep(0.5)  # Simulate email sending delay

@app.post("/send-bulk-emails")
async def start_bulk_email_sending(request: Request, response: Response):
    """Start bulk email sending task."""
    data = await request.json()
    task = await create_task(
        request=request,
        func=send_bulk_emails,
        recipients=data["recipients"],
        subject=data["subject"],
        body=data["body"],
        name="bulk_email_sending"
    )
    return {"task_id": task.id, "recipient_count": len(data["recipients"])}
```
Data Export Service
Generate and export large datasets in various formats:
```python
from nexios import NexiosApp
from nexios.http import Request, Response
from nexios_contrib.tasks import setup_tasks, create_task
import asyncio
import csv
import json
from datetime import datetime

app = NexiosApp()
task_manager = setup_tasks(app)

async def export_data(format: str, filters: dict, task_id: str) -> dict:
    """Export data in the specified format."""
    # Fetch data (implement your data fetching logic)
    data = await fetch_data_from_database(filters)
    total_records = len(data)

    # Generate filename
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"export_{timestamp}.{format}"

    if format == "csv":
        await export_to_csv(data, filename, task_id)
    elif format == "json":
        await export_to_json(data, filename, task_id)
    else:
        raise ValueError(f"Unsupported format: {format}")

    return {
        "filename": filename,
        "format": format,
        "total_records": total_records,
        "status": "completed"
    }

async def fetch_data_from_database(filters: dict) -> list:
    """Fetch data from the database."""
    # Implement your database query logic
    await asyncio.sleep(1)  # Simulate database query
    return [{"id": i, "name": f"Record {i}"} for i in range(1000)]

async def export_to_csv(data: list, filename: str, task_id: str):
    """Export data to CSV format."""
    from nexios_contrib.tasks import update_task_progress

    with open(filename, 'w', newline='') as csvfile:
        if data:
            fieldnames = data[0].keys()
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for i, row in enumerate(data):
                writer.writerow(row)
                # Update progress every 100 records
                if i % 100 == 0:
                    progress = (i / len(data)) * 100
                    await update_task_progress(task_id, progress)

async def export_to_json(data: list, filename: str, task_id: str):
    """Export data to JSON format."""
    from nexios_contrib.tasks import update_task_progress

    with open(filename, 'w') as jsonfile:
        json.dump(data, jsonfile, indent=2)
    await update_task_progress(task_id, 100)

@app.post("/export-data")
async def start_data_export(request: Request, response: Response):
    """Start data export task."""
    data = await request.json()
    task = await create_task(
        request=request,
        func=export_data,
        format=data.get("format", "csv"),
        filters=data.get("filters", {}),
        name=f"data_export_{data.get('format', 'csv')}"
    )
    return {"task_id": task.id, "format": data.get("format", "csv")}
```
Best Practices
Guidelines for building reliable and maintainable background task systems.
- Task Granularity: Keep tasks focused on a single responsibility
- Error Handling: Always implement proper error handling in your tasks
- Timeouts: Set appropriate timeouts for your tasks
- Resource Management: Clean up resources in a `finally` block (see the sketch after this list)
- Logging: Use the provided logger for task-related logs
- Progress Updates: Update progress for long-running tasks
- Testing: Write comprehensive tests for your background tasks
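Several of these guidelines fit naturally into a single task skeleton: wrap the work in try/except/finally, log failures (here with the standard library logger, since the package's own logger interface is not shown in this guide), and report progress as you go. The sketch below is a pattern, not library-mandated structure; `load_items` and `release_resources` are hypothetical helpers you would replace with your own code.

```python
import asyncio
import logging

from nexios_contrib.tasks import update_task_progress

logger = logging.getLogger("myapp.tasks")

async def resilient_task(task_id: str, source: str) -> dict:
    """A focused task: load items, process them, always clean up."""
    items = await load_items(source)        # hypothetical helper
    processed = 0
    try:
        for i, item in enumerate(items, start=1):
            await asyncio.sleep(0.01)        # stand-in for real per-item work
            processed += 1
            await update_task_progress(task_id, (i / len(items)) * 100)
        return {"processed": processed}
    except Exception:
        logger.exception("resilient_task failed after %d items", processed)
        raise  # re-raise so the task manager marks the task as FAILED
    finally:
        await release_resources(source)      # hypothetical helper; always runs
```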
Production Configuration
Optimize task manager settings for production environments:
```python
from nexios_contrib.tasks import TaskConfig, setup_tasks
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Production task configuration
config = TaskConfig(
    max_concurrent_tasks=100,
    default_timeout=1800,      # 30 minutes
    task_result_ttl=604800,    # 7 days
    enable_task_history=True,
    log_level="INFO",
    cleanup_interval=3600,     # Clean up every hour
    max_task_history=10000     # Keep the last 10k completed tasks
)
task_manager = setup_tasks(app, config=config)
```
Testing
Ensure your background tasks work correctly with comprehensive testing strategies.
Unit Testing Tasks
Test individual task functions in isolation:
```python
import pytest
from nexios_contrib.tasks import TaskManager, TaskConfig, TaskStatus

@pytest.fixture
def task_manager():
    config = TaskConfig(max_concurrent_tasks=10)
    return TaskManager(config)

@pytest.mark.asyncio
async def test_simple_task(task_manager):
    async def simple_task(value: int) -> int:
        return value * 2

    task = await task_manager.create_task(
        func=simple_task,
        value=5,
        name="test_task"
    )

    # Wait for completion
    await task_manager.wait_for_task(task.id)

    assert task.result == 10
    assert task.status == TaskStatus.COMPLETED

@pytest.mark.asyncio
async def test_task_failure(task_manager):
    async def failing_task():
        raise ValueError("Test error")

    task = await task_manager.create_task(
        func=failing_task,
        name="failing_task"
    )

    await task_manager.wait_for_task(task.id)

    assert task.status == TaskStatus.FAILED
    assert "Test error" in str(task.error)
```
Integration Testing
Test the complete task workflow including API endpoints:
```python
import pytest
from nexios.testing import TestClient
from nexios import NexiosApp
from nexios_contrib.tasks import setup_tasks

@pytest.fixture
def app():
    app = NexiosApp()
    setup_tasks(app)

    @app.post("/test-task")
    async def create_test_task(request, response):
        from nexios_contrib.tasks import create_task

        async def test_task(value: str) -> str:
            return f"processed_{value}"

        task = await create_task(
            request=request,
            func=test_task,
            value="test",
            name="integration_test"
        )
        return {"task_id": task.id}

    return app

def test_task_creation(app):
    client = TestClient(app)
    response = client.post("/test-task")
    assert response.status_code == 200
    data = response.json()
    assert "task_id" in data
```
Built with ❤️ by the @nexios-labs community.
