JSON-RPC Integration
Lightweight helpers for running JSON-RPC 2.0 over HTTP with Nexios.
What is JSON-RPC?
JSON-RPC (JavaScript Object Notation Remote Procedure Call) is a stateless, lightweight remote procedure call (RPC) protocol. It lets you call methods on a remote server much as you would call local functions. The protocol encodes data as JSON and is transport agnostic, though HTTP is the most common transport.
Key Benefits of JSON-RPC:
- Simplicity: Clean, minimal protocol with just request/response patterns
- Language Agnostic: Works with any programming language that supports JSON
- Stateless: Each request is independent, making it scalable and reliable
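- Explicit Contracts: Each call names a method and passes positional or named parameters. JSON-RPC itself does not enforce types, so type hints on handlers act as documentation unless you validate inputs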
- Error Handling: Standardized error codes and messages
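To make the protocol concrete, here is a minimal sketch of the JSON-RPC 2.0 message shapes, built with the standard `json` module only. The helper names (`make_request`, `make_result`, `make_error`) are illustrative assumptions and are not part of nexios_contrib.

```python
import json
from typing import Any, Optional


def make_request(method: str, params: Any = None, request_id: Optional[int] = None) -> dict:
    """Build a JSON-RPC 2.0 request object (hypothetical helper)."""
    request = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        # A list carries positional parameters, a dict carries named ones.
        request["params"] = params
    if request_id is not None:
        # A request without an "id" is a notification: no response is expected.
        request["id"] = request_id
    return request


def make_result(request_id: int, result: Any) -> dict:
    """Build a success response; it carries "result" and never "error"."""
    return {"jsonrpc": "2.0", "result": result, "id": request_id}


def make_error(request_id: Optional[int], code: int, message: str, data: Any = None) -> dict:
    """Build an error response; it carries "error" and never "result"."""
    error = {"code": code, "message": message}
    if data is not None:
        error["data"] = data
    return {"jsonrpc": "2.0", "error": error, "id": request_id}


request = make_request("add", {"a": 5, "b": 3}, request_id=1)
wire = json.dumps(request)  # the text that travels over the transport
print(wire)
print(json.dumps(make_result(1, 8)))
print(json.dumps(make_error(1, -32601, "Method not found")))
```

A response echoes the `id` of the request it answers, which is how a client matches replies to calls.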
This contrib provides:
- JSON-RPC 2.0 Server Implementation: Full-featured server that handles method registration, request parsing, and response formatting
- JSON-RPC Client: Async client for making remote procedure calls with proper error handling
- Method Registry System: Decorator-based method registration with support for both sync and async functions
- Comprehensive Error Handling: Built-in error types following JSON-RPC 2.0 specification
- Zero External Dependencies: Uses only Python standard library for maximum compatibility
Installation
The JSON-RPC contrib is included with the main nexios_contrib package, making it easy to get started with remote procedure calls.
Install the JSON-RPC contrib with Nexios:
pip install nexios_contrib
Why No External Dependencies? This contrib intentionally uses only Python standard library modules for several reasons:
- Reduced Dependency Conflicts: No version conflicts with your existing packages
- Faster Installation: No need to download additional dependencies
- Better Security: Fewer attack vectors from third-party packages
- Easier Deployment: Simpler containerization and deployment processes
- Long-term Stability: Less likely to break due to dependency updates
Quick Start
Basic Server Setup
Creating a JSON-RPC server with Nexios is straightforward. The server automatically handles request parsing, method dispatch, and response formatting.
# server.py
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.registry import get_registry


def main():
    app = NexiosApp()
    registry = get_registry()

    @registry.register("add")
    def add(a: int, b: int) -> int:
        """Add two numbers together."""
        return a + b

    @registry.register("echo")
    async def echo(msg: str) -> str:
        """Echo back the input message."""
        return f"Server says: {msg}"

    JsonRpcPlugin(app, {"path_prefix": "/rpc"})
    app.run(host="0.0.0.0", port=3020)


if __name__ == "__main__":
    main()
Complete Server Example with Detailed Comments
# server.py
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.registry import get_registry


def main():
    # Create Nexios app - this is your main application instance
    app = NexiosApp()

    # Get the global registry - this is a singleton that stores all RPC methods
    # The registry acts as a central method dispatcher for JSON-RPC calls
    registry = get_registry()

    # Register a synchronous method using the decorator pattern
    # The method name "add" will be the RPC method name clients call
    @registry.register("add")
    def add(a: int, b: int) -> int:
        """Add two numbers together.

        This docstring becomes part of the method's metadata and can be
        used for auto-generating API documentation.

        Args:
            a: First number to add
            b: Second number to add

        Returns:
            The sum of a and b
        """
        return a + b

    # Register an asynchronous method - the registry handles both sync and async
    # Async methods are useful for I/O operations like database calls or HTTP requests
    @registry.register("echo")
    async def echo(msg: str) -> str:
        """Echo back the input message with a server prefix.

        This demonstrates how async methods work in the JSON-RPC system.
        The registry will automatically await async functions.
        """
        return f"Server says: {msg}"

    # Mount the JSON-RPC plugin to your Nexios app
    # This creates an HTTP endpoint that accepts JSON-RPC requests
    # The plugin integrates with Nexios's routing and middleware system
    JsonRpcPlugin(app, {"path_prefix": "/rpc"})

    print("🚀 JSON-RPC Server starting on port 3020...")
    print("📋 Available JSON-RPC methods:")
    print(" - add(a, b) -> int")
    print(" - echo(msg) -> str")
    print("\n🔗 JSON-RPC endpoint: http://localhost:3020/rpc")
    print("\n📖 Example JSON-RPC request:")
    print(' POST /rpc')
    print(' {"jsonrpc": "2.0", "method": "add", "params": {"a": 5, "b": 3}, "id": 1}')

    # Start the server - this will block and serve requests
    app.run(host="0.0.0.0", port=3020)


if __name__ == "__main__":
    main()
Understanding the Server Components:
- NexiosApp: The main application instance that handles HTTP requests and routing
- Registry: A method storage and dispatch system that maps method names to Python functions
- JsonRpcPlugin: The bridge between Nexios HTTP handling and JSON-RPC protocol
- Method Registration: The @registry.register() decorator makes Python functions available as RPC methods
- Path Prefix: The /rpc endpoint where all JSON-RPC requests are sent
Basic Client Usage
The JSON-RPC client provides a simple async interface for calling remote methods. It handles connection management, request serialization, and error handling automatically.
# client.py
import asyncio
from nexios_contrib.jrpc.client import JsonRpcClient


async def main() -> None:
    client = JsonRpcClient("http://localhost:3020/rpc")

    result = await client.call("add", a=2, b=3)
    print(f"Addition result: {result}")  # Output: Addition result: 5

    result = await client.call("echo", msg="Hello, server!")
    print(f"Echo result: {result}")  # Output: Echo result: Server says: Hello, server!


if __name__ == "__main__":
    asyncio.run(main())
Complete Client Example with Detailed Explanations
# client.py
import asyncio
from nexios_contrib.jrpc.client import JsonRpcClient


async def main() -> None:
    # Create a client connection to the JSON-RPC server
    # Using the async context manager ensures proper connection cleanup
    async with JsonRpcClient("http://localhost:3020/rpc") as client:
        # Call the 'add' method with named parameters
        # This sends: {"jsonrpc": "2.0", "method": "add", "params": {"a": 2, "b": 3}, "id": <auto>}
        result = await client.call("add", a=2, b=3)
        print(f"Addition result: {result}")  # Output: Addition result: 5

        # Call the 'echo' method - demonstrates async method calls
        # The client automatically handles the JSON-RPC protocol details
        result = await client.call("echo", msg="Hello, server!")
        print(f"Echo result: {result}")  # Output: Echo result: Server says: Hello, server!

        # Method call with positional arguments
        # This sends: {"jsonrpc": "2.0", "method": "multiply", "params": [3.14, 2.0], "id": <auto>}
        # Note: the quick-start server registers only "add" and "echo";
        # "multiply" and "process_user_data" must be registered on the server first.
        result = await client.call("multiply", 3.14, 2.0)
        print(f"Multiplication: 3.14 * 2.0 = {result}")

        # Method call with complex data structures
        user_data = {
            "name": "John Doe",
            "email": "john@example.com",
            "age": 30,
            "preferences": {
                "theme": "dark",
                "notifications": True
            }
        }
        result = await client.call("process_user_data", user_data=user_data)
        print(f"Processed user: {result}")


if __name__ == "__main__":
    asyncio.run(main())
Understanding the Client:
- Method Calls: client.call() translates Python function calls into JSON-RPC requests
- Parameter Handling: Named parameters become the params object in the JSON-RPC request
- Automatic ID Management: The client generates unique request IDs for tracking responses
- Response Parsing: The client automatically extracts the result from JSON-RPC response format
What Happens Under the Hood:
- Client serializes your method call into JSON-RPC 2.0 format
- HTTP POST request is sent to the server endpoint
- Server deserializes the request and calls the registered method
- Method result is serialized back into JSON-RPC response
- Client receives and deserializes the response, returning the result
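The steps above can be sketched with the standard library alone. This is not the nexios_contrib implementation: `encode_call`, `handle`, `decode_result`, and the `METHODS` table are hypothetical stand-ins, and the HTTP POST is replaced by a direct function call so the example runs without a server.

```python
import json

# Hypothetical method table standing in for the server-side registry.
METHODS = {"add": lambda a, b: a + b}


def encode_call(method: str, request_id: int, *args, **kwargs) -> str:
    """Client side: serialize a call into a JSON-RPC 2.0 request string."""
    if args and kwargs:
        raise ValueError("JSON-RPC params are either positional or named, not both")
    request = {"jsonrpc": "2.0", "method": method, "id": request_id}
    if args:
        request["params"] = list(args)  # positional -> JSON array
    elif kwargs:
        request["params"] = kwargs      # named -> JSON object
    return json.dumps(request)


def handle(raw: str) -> str:
    """Server side: deserialize, dispatch to the method, serialize the response."""
    request = json.loads(raw)
    func = METHODS.get(request["method"])
    if func is None:
        error = {"code": -32601, "message": "Method not found"}
        return json.dumps({"jsonrpc": "2.0", "error": error, "id": request.get("id")})
    params = request.get("params", [])
    result = func(**params) if isinstance(params, dict) else func(*params)
    return json.dumps({"jsonrpc": "2.0", "result": result, "id": request.get("id")})


def decode_result(raw: str):
    """Client side: deserialize the response and return the result or raise."""
    response = json.loads(raw)
    if "error" in response:
        raise RuntimeError(f"{response['error']['code']}: {response['error']['message']}")
    return response["result"]


# In a real deployment the request string would travel in an HTTP POST body.
named = decode_result(handle(encode_call("add", 1, a=2, b=3)))
positional = decode_result(handle(encode_call("add", 2, 2, 3)))
print(named, positional)
```

Named and positional calls reach the same function; only the shape of `params` on the wire differs.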
Server Implementation
Method Registration Deep Dive
The method registry is the heart of the JSON-RPC server. It manages method storage, validation, and execution.
from nexios_contrib.jrpc.registry import get_registry
# Get the global singleton registry instance
# This ensures all parts of your application share the same method registry
registry = get_registry()
# Register synchronous methods - these execute immediately and return results
@registry.register("multiply")
def multiply(x: float, y: float) -> float:
"""Multiply two numbers.
Synchronous methods are best for:
- Pure computations (math operations, data transformations)
- Quick operations that don't involve I/O
- Methods that don't need to wait for external resources
Args:
x: First number to multiply
y: Second number to multiply
Returns:
The product of x and y
"""
return x * y
# Register asynchronous methods - these can await other async operations
@registry.register("fetch_data")
async def fetch_data(url: str) -> dict:
"""Fetch data from a URL using async HTTP client.
Async methods are essential for:
- HTTP requests to external APIs
- Database operations
- File I/O operations
- Any operation that might block or take time
The registry automatically detects async functions and awaits them properly.
"""
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
# Register with custom name - function name doesn't have to match RPC method name
@registry.register("get_time")
def current_timestamp() -> float:
"""Get current timestamp.
This demonstrates method name aliasing:
- Python function: current_timestamp()
- RPC method name: "get_time"
This is useful when:
- You want more descriptive Python function names
- You need to maintain backward compatibility
- You're wrapping existing functions with different naming conventions
"""
import time
return time.time()
# You can also register methods without decorators
def calculate_area(length: float, width: float) -> float:
"""Calculate rectangle area."""
return length * width
# Manual registration - useful for conditional registration or dynamic methods
registry.register_method("area", calculate_area)
Registry Features Explained:
- Singleton Pattern: get_registry() always returns the same instance, ensuring method consistency
- Type Detection: The registry automatically detects sync vs async functions
- Method Validation: Ensures method names are unique and functions are callable
- Execution Context: Handles both sync and async execution contexts properly
- Method Metadata: Stores function signatures, docstrings, and type hints for introspection
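The sketch below shows one way the features listed above could fit together. It is only an illustration: `MiniRegistry` and its `dispatch` method are hypothetical and are not the nexios_contrib `Registry`, whose internals may differ.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class MiniRegistry:
    """Illustrative stand-in for a method registry (not the library class)."""

    def __init__(self) -> None:
        self.methods: Dict[str, Callable[..., Any]] = {}

    def register(self, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Method validation: names must be unique.
            if name in self.methods:
                raise ValueError(f"Method {name!r} is already registered")
            self.methods[name] = func
            return func  # the function stays usable as normal Python code
        return decorator

    async def dispatch(self, name: str, *args: Any, **kwargs: Any) -> Any:
        result = self.methods[name](*args, **kwargs)
        # Type detection: an async function returns an awaitable, a sync one a value.
        if inspect.isawaitable(result):
            result = await result
        return result


registry = MiniRegistry()


@registry.register("multiply")
def multiply(x: float, y: float) -> float:
    return x * y


@registry.register("echo")
async def echo(msg: str) -> str:
    await asyncio.sleep(0)  # placeholder for real I/O
    return f"Server says: {msg}"


async def demo() -> tuple:
    product = await registry.dispatch("multiply", 3, 4)
    reply = await registry.dispatch("echo", msg="hi")
    return product, reply


print(asyncio.run(demo()))
```

Checking the returned value with `inspect.isawaitable` lets a single dispatch path serve both sync and async handlers.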
Error Handling Deep Dive
Proper error handling is crucial for robust JSON-RPC services. The protocol defines standard error codes and allows custom error data.
from nexios_contrib.jrpc.exceptions import JsonRpcError
@registry.register("divide")
def divide(a: float, b: float) -> float:
"""Divide two numbers with comprehensive error handling.
This method demonstrates several error handling patterns:
- Input validation
- Business logic errors
- Structured error responses
"""
# Input type validation
if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
raise JsonRpcError(
code=-32602, # Invalid params - standard JSON-RPC error code
message="Parameters must be numbers",
data={
"received_types": {
"a": type(a).__name__,
"b": type(b).__name__
},
"expected_types": ["int", "float"]
}
)
# Business logic validation
if b == 0:
raise JsonRpcError(
code=-32602, # Invalid params
message="Division by zero is not allowed",
data={
"dividend": a,
"divisor": b,
"suggestion": "Use a non-zero divisor"
}
)
return a / b
@registry.register("validate_email")
def validate_email(email: str) -> dict:
"""Validate email format with detailed feedback.
Returns validation result with detailed information about why
an email might be invalid.
"""
import re
# Type validation
if not isinstance(email, str):
raise JsonRpcError(
code=-32602,
message="Email must be a string",
data={
"expected": "string",
"received": type(email).__name__,
"value": str(email)[:50] # Truncate for safety
}
)
# Length validation
if len(email) > 254: # RFC 5321 limit
raise JsonRpcError(
code=-32602,
message="Email address too long",
data={
"max_length": 254,
"received_length": len(email)
}
)
# Format validation
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
is_valid = bool(re.match(pattern, email))
return {
"is_valid": is_valid,
"email": email,
"checks": {
"format": is_valid,
"length": len(email) <= 254,
"has_at_symbol": "@" in email,
"has_domain": "." in email.split("@")[-1] if "@" in email else False
}
}
@registry.register("process_user_data")
async def process_user_data(user_data: dict) -> dict:
"""Process user data with comprehensive validation and error handling."""
# Required fields validation
required_fields = ["name", "email", "age"]
missing_fields = [field for field in required_fields if field not in user_data]
if missing_fields:
raise JsonRpcError(
code=-32602,
message="Missing required fields",
data={
"missing_fields": missing_fields,
"required_fields": required_fields,
"provided_fields": list(user_data.keys())
}
)
# Age validation
try:
age = int(user_data["age"])
if age < 0 or age > 150:
raise ValueError("Age out of valid range")
except (ValueError, TypeError) as e:
raise JsonRpcError(
code=-32602,
message="Invalid age value",
data={
"age_value": user_data["age"],
"error": str(e),
"valid_range": "0-150"
}
)
# Simulate processing that might fail
try:
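# This could be a database operation, API call, etc.
# Note: hash() of a str is randomized per interpreter run, so this ID is
# illustrative only; use hashlib if you need a stable identifier.
processed_data = {
"id": f"user_{hash(user_data['email']) % 10000}",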
"name": user_data["name"].strip().title(),
"email": user_data["email"].lower(),
"age": age,
"status": "processed"
}
return processed_data
except Exception as e:
# Catch unexpected errors and convert to JSON-RPC errors
raise JsonRpcError(
code=-32603, # Internal error
message="Failed to process user data",
data={
"error_type": type(e).__name__,
"error_message": str(e),
"user_data_keys": list(user_data.keys())
}
)
JSON-RPC Error Codes Reference:
- -32700: Parse error (invalid JSON)
- -32600: Invalid request (malformed JSON-RPC)
- -32601: Method not found
- -32602: Invalid params (use this for validation errors)
- -32603: Internal error (server-side errors)
- -32000 to -32099: Server error (reserved for implementation-defined errors)
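As a rough illustration of the reference above, the codes can be kept as named constants and turned into error response objects. The helper names (`describe`, `is_server_error`, `error_response`) are assumptions made for this sketch, not part of nexios_contrib.

```python
from typing import Any, Optional

# Standard codes defined by the JSON-RPC 2.0 specification.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603

STANDARD_MESSAGES = {
    PARSE_ERROR: "Parse error",
    INVALID_REQUEST: "Invalid Request",
    METHOD_NOT_FOUND: "Method not found",
    INVALID_PARAMS: "Invalid params",
    INTERNAL_ERROR: "Internal error",
}


def is_server_error(code: int) -> bool:
    """True for the implementation-defined range -32000 to -32099."""
    return -32099 <= code <= -32000


def describe(code: int) -> str:
    """Return a default message for a code."""
    if code in STANDARD_MESSAGES:
        return STANDARD_MESSAGES[code]
    if is_server_error(code):
        return "Server error"
    return "Application error"


def error_response(code: int, request_id: Optional[int] = None,
                   message: Optional[str] = None, data: Any = None) -> dict:
    """Build a JSON-RPC 2.0 error response object."""
    error = {"code": code, "message": message or describe(code)}
    if data is not None:
        error["data"] = data
    # "id" is null when the request id could not be determined (e.g. parse errors).
    return {"jsonrpc": "2.0", "error": error, "id": request_id}


print(error_response(METHOD_NOT_FOUND, request_id=7))
print(error_response(-32001, request_id=8, message="Authentication required"))
```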
Error Handling Best Practices:
- Use Standard Codes: Stick to JSON-RPC standard error codes when possible
- Provide Context: Include helpful data in the error response
- Validate Early: Check inputs before processing to fail fast
- Structured Data: Use consistent error data structures
- Security: Don't expose sensitive information in error messages
- Logging: Log errors server-side for debugging while returning clean errors to clients
Advanced Server Configuration
The JSON-RPC plugin offers extensive configuration options for production deployments and complex scenarios.
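from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.registry import Registry, get_registry
from nexios_contrib.jrpc.exceptions import JsonRpcError  # raised by AuthenticatedJsonRpcPlugin below
import logging
import time  # used by health_check below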
app = NexiosApp()
# Method 1: Using Custom Registry (Isolated Method Namespace)
# Create a separate registry for specific functionality
custom_registry = Registry()
@custom_registry.register("custom_method")
def custom_method(param: str) -> str:
"""Custom method in isolated registry.
Custom registries are useful for:
- Separating different API versions
- Creating isolated method namespaces
- Testing without affecting global registry
- Multi-tenant applications
"""
return f"Custom: {param}"
# Method 2: Multiple JSON-RPC Endpoints
# You can mount multiple JSON-RPC endpoints with different configurations
# Admin API endpoint with restricted methods
admin_registry = Registry()
@admin_registry.register("system_status")
def get_system_status() -> dict:
"""Get system status - admin only."""
import psutil
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_usage": psutil.disk_usage('/').percent
}
@admin_registry.register("restart_service")
async def restart_service(service_name: str) -> dict:
"""Restart a system service - admin only."""
# This would typically require authentication/authorization
return {"status": "restarted", "service": service_name}
# Public API endpoint with general methods
public_registry = get_registry() # Use global registry
@public_registry.register("health_check")
def health_check() -> dict:
"""Public health check endpoint."""
return {"status": "healthy", "timestamp": time.time()}
# Configure multiple JSON-RPC endpoints
JsonRpcPlugin(app, {
"path_prefix": "/api/v1/rpc",
"registry": public_registry,
"cors_enabled": True,
"max_request_size": 1024 * 1024, # 1MB limit
"timeout": 30.0 # 30 second timeout
})
JsonRpcPlugin(app, {
"path_prefix": "/admin/rpc",
"registry": admin_registry,
"cors_enabled": False, # Disable CORS for admin endpoint
"require_auth": True, # Custom auth requirement
"rate_limit": "10/minute" # Rate limiting
})
# Method 3: Advanced Configuration with Middleware Integration
class AuthenticatedJsonRpcPlugin(JsonRpcPlugin):
"""Custom JSON-RPC plugin with authentication."""
async def authenticate_request(self, request):
"""Custom authentication logic."""
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise JsonRpcError(
code=-32001,
message="Authentication required",
data={"auth_type": "Bearer token"}
)
token = auth_header[7:] # Remove "Bearer " prefix
# Validate token here (check database, JWT, etc.)
if not self.validate_token(token):
raise JsonRpcError(
code=-32002,
message="Invalid authentication token"
)
def validate_token(self, token: str) -> bool:
"""Validate authentication token."""
# Implement your token validation logic
return token == "valid_token_123"
# Use custom plugin with authentication
AuthenticatedJsonRpcPlugin(app, {
"path_prefix": "/secure/rpc",
"registry": admin_registry
})
# Method 4: Configuration with Environment Variables
import os
# Production-ready configuration
production_config = {
"path_prefix": os.getenv("JSONRPC_PATH", "/rpc"),
"max_request_size": int(os.getenv("JSONRPC_MAX_SIZE", "2097152")), # 2MB
"timeout": float(os.getenv("JSONRPC_TIMEOUT", "60.0")),
"enable_introspection": os.getenv("JSONRPC_INTROSPECTION", "false").lower() == "true",
"log_requests": os.getenv("JSONRPC_LOG_REQUESTS", "true").lower() == "true",
"cors_origins": os.getenv("JSONRPC_CORS_ORIGINS", "*").split(",")
}
JsonRpcPlugin(app, production_config)
# Method 5: Registry with Method Introspection
@public_registry.register("list_methods")
def list_available_methods() -> dict:
"""List all available RPC methods with their signatures.
This is useful for API discovery and documentation generation.
"""
methods = {}
for method_name, method_func in public_registry.methods.items():
import inspect
sig = inspect.signature(method_func)
methods[method_name] = {
"parameters": [
{
"name": param.name,
"type": str(param.annotation) if param.annotation != param.empty else "Any",
"default": str(param.default) if param.default != param.empty else None
}
for param in sig.parameters.values()
],
"return_type": str(sig.return_annotation) if sig.return_annotation != sig.empty else "Any",
"docstring": method_func.__doc__ or "No documentation available"
}
return methods
if __name__ == "__main__":
# Configure logging for production
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
app.run(host="0.0.0.0", port=8000)
Configuration Options Explained:
- path_prefix: URL path where JSON-RPC endpoint is mounted
- registry: Which method registry to use (allows multiple isolated APIs)
- cors_enabled: Enable Cross-Origin Resource Sharing for web clients
- max_request_size: Limit request payload size to prevent abuse
- timeout: Maximum time to wait for method execution
- require_auth: Enable authentication middleware
- rate_limit: Limit requests per time period per client
- enable_introspection: Allow clients to discover available methods
- log_requests: Log all incoming requests for debugging/monitoring
Use Cases for Multiple Endpoints:
- API Versioning: /api/v1/rpc and /api/v2/rpc
- Access Control: Public vs admin endpoints
- Service Separation: Different microservices on same server
- Environment Isolation: Development vs production methods
Client Implementation Deep Dive
Basic Client Usage with Detailed Explanations
import asyncio
from nexios_contrib.jrpc.client import JsonRpcClient
async def example_client():
"""Comprehensive client usage examples."""
# Create client with explicit configuration
# The client manages HTTP connections, request/response serialization,
# and error handling automatically
client = JsonRpcClient(
url="http://localhost:3020/rpc",
timeout=30.0, # Request timeout in seconds
headers={ # Custom HTTP headers
"User-Agent": "MyApp/1.0",
"Authorization": "Bearer your-token-here"
}
)
try:
# Single method call with named parameters
# This creates a JSON-RPC request like:
# {"jsonrpc": "2.0", "method": "add", "params": {"a": 10, "b": 20}, "id": 1}
result = await client.call("add", a=10, b=20)
print(f"Addition: 10 + 20 = {result}")
# Another call with named parameters, this time passing a string
result = await client.call("echo", msg="Hello World")
print(f"Echo response: {result}")
# Method call with positional arguments
# This creates: {"jsonrpc": "2.0", "method": "multiply", "params": [3.14, 2.0], "id": 2}
result = await client.call("multiply", 3.14, 2.0)
print(f"Multiplication: 3.14 * 2.0 = {result}")
# Method call with no parameters
result = await client.call("get_time")
print(f"Server time: {result}")
# Method call with complex data structures
user_data = {
"name": "John Doe",
"email": "john@example.com",
"age": 30,
"preferences": {
"theme": "dark",
"notifications": True
}
}
result = await client.call("process_user_data", user_data=user_data)
print(f"Processed user: {result}")
except Exception as e:
print(f"Client error: {e}")
finally:
# Always close the client to free resources
await client.close()
# Context manager approach (recommended)
async def context_manager_example():
"""Using context manager for automatic resource cleanup."""
# The async context manager automatically handles:
# 1. Connection setup
# 2. Resource cleanup on exit
# 3. Proper error handling
async with JsonRpcClient("http://localhost:3020/rpc") as client:
# Multiple calls in sequence
results = []
for i in range(5):
result = await client.call("add", a=i, b=i+1)
results.append(result)
print(f"Sequential results: {results}")
# The client connection is automatically closed when exiting the context
# Concurrent calls for better performance
async def concurrent_calls_example():
"""Making multiple concurrent JSON-RPC calls."""
async with JsonRpcClient("http://localhost:3020/rpc") as client:
# Create multiple concurrent calls
# This is much faster than sequential calls for independent operations
tasks = [
client.call("add", a=1, b=2),
client.call("multiply", 3, 4),
client.call("echo", msg="concurrent"),
client.call("get_time"),
]
# Wait for all calls to complete
results = await asyncio.gather(*tasks)
print("Concurrent results:")
for i, result in enumerate(results):
print(f" Task {i}: {result}")
# Run examples
if __name__ == "__main__":
asyncio.run(example_client())
asyncio.run(context_manager_example())
asyncio.run(concurrent_calls_example())
Client Features Explained:
- Automatic Connection Management: The client handles HTTP connection pooling and reuse
- Request ID Management: Each request gets a unique ID for response matching
- Serialization: Python objects are automatically converted to JSON
- Deserialization: JSON responses are converted back to Python objects
- Error Handling: JSON-RPC errors are converted to Python exceptions
- Timeout Handling: Configurable timeouts prevent hanging requests
- Custom Headers: Support for authentication and custom HTTP headers
Performance Considerations:
- Connection Reuse: The client reuses HTTP connections for multiple requests
- Concurrent Calls: Use asyncio.gather() for independent parallel requests
- Context Managers: Always use context managers or explicit close() calls
- Timeout Configuration: Set appropriate timeouts based on your use case
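To see why concurrent calls help, the following sketch times sequential awaits against `asyncio.gather()`. No server is involved: `fake_rpc_call` is a hypothetical stand-in for `client.call()` that sleeps to simulate network latency.

```python
import asyncio
import time


async def fake_rpc_call(method: str, delay: float = 0.05) -> str:
    """Stand-in for client.call(); the sleep simulates a network round trip."""
    await asyncio.sleep(delay)
    return f"{method}: ok"


async def run_sequential(methods: list) -> list:
    # Each call waits for the previous one, so latencies add up.
    return [await fake_rpc_call(m) for m in methods]


async def run_concurrent(methods: list) -> list:
    # All calls are in flight together, so total time is close to the slowest call.
    return await asyncio.gather(*(fake_rpc_call(m) for m in methods))


async def compare() -> tuple:
    methods = ["add", "multiply", "echo", "get_time"]

    start = time.perf_counter()
    sequential = await run_sequential(methods)
    sequential_time = time.perf_counter() - start

    start = time.perf_counter()
    concurrent = await run_concurrent(methods)
    concurrent_time = time.perf_counter() - start

    return sequential, concurrent, sequential_time, concurrent_time


sequential, concurrent, sequential_time, concurrent_time = asyncio.run(compare())
print(f"sequential: {sequential_time:.3f}s, concurrent: {concurrent_time:.3f}s")
```

`asyncio.gather()` returns results in the order the awaitables were passed, so results can still be matched to calls by position.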
Comprehensive Client Error Handling
Error handling is crucial for robust client applications. The JSON-RPC client provides detailed error information for different failure scenarios.
from nexios_contrib.jrpc.client import JsonRpcClient
from nexios_contrib.jrpc.exceptions import JsonRpcError, JsonRpcClientError
import asyncio
import logging
# Configure logging to see detailed error information
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def comprehensive_error_handling():
"""Demonstrate all types of error handling scenarios."""
client = JsonRpcClient("http://localhost:3020/rpc")
# 1. Handle JSON-RPC Protocol Errors
try:
# This will raise a JSON-RPC error (division by zero)
result = await client.call("divide", a=10, b=0)
except JsonRpcError as e:
logger.error(f"JSON-RPC Error: {e.message} (Code: {e.code})")
# Handle specific error codes
if e.code == -32602: # Invalid params
logger.info("This is a parameter validation error")
if e.data:
logger.info(f"Error details: {e.data}")
elif e.code == -32601: # Method not found
logger.info("The requested method doesn't exist")
elif e.code == -32603: # Internal error
logger.info("Server encountered an internal error")
# Access all error properties
print(f"Error Code: {e.code}")
print(f"Error Message: {e.message}")
print(f"Error Data: {e.data}")
print(f"Request ID: {e.request_id}")
# 2. Handle Network and Connection Errors
try:
# Try to call a method on unreachable server
unreachable_client = JsonRpcClient("http://nonexistent:9999/rpc")
result = await unreachable_client.call("test")
except JsonRpcClientError as e:
logger.error(f"Client Error: {e}")
# Different types of client errors:
if "Connection refused" in str(e):
logger.info("Server is not running or unreachable")
elif "timeout" in str(e).lower():
logger.info("Request timed out")
elif "DNS" in str(e) or "resolve" in str(e):
logger.info("DNS resolution failed")
# 3. Handle Method Not Found Errors
try:
result = await client.call("nonexistent_method", param="value")
except JsonRpcError as e:
if e.code == -32601:
logger.error(f"Method 'nonexistent_method' not found on server")
# You might want to check available methods
try:
methods = await client.call("list_methods")
logger.info(f"Available methods: {list(methods.keys())}")
except Exception:
logger.info("Could not retrieve available methods")
# 4. Handle Invalid Parameter Errors
try:
# Send wrong parameter types
result = await client.call("add", a="not_a_number", b=5)
except JsonRpcError as e:
if e.code == -32602:
logger.error("Parameter validation failed")
if e.data and "expected_types" in e.data:
expected = e.data["expected_types"]
received = e.data["received_types"]
logger.info(f"Expected: {expected}, Received: {received}")
# 5. Handle Timeout Errors
try:
# Create client with short timeout
timeout_client = JsonRpcClient(
"http://localhost:3020/rpc",
timeout=0.1 # Very short timeout
)
async with timeout_client:
result = await timeout_client.call("slow_method")
except JsonRpcClientError as e:
if "timeout" in str(e).lower():
logger.error("Request timed out - server might be overloaded")
# 6. Retry Logic for Transient Errors
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
result = await client.call("unreliable_method")
logger.info(f"Success on attempt {attempt + 1}")
break
except JsonRpcClientError as e:
if attempt < max_retries - 1:
logger.warning(f"Attempt {attempt + 1} failed: {e}")
logger.info(f"Retrying in {retry_delay} seconds...")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
logger.error(f"All {max_retries} attempts failed")
raise
except JsonRpcError as e:
# Don't retry JSON-RPC errors (they're not transient)
logger.error(f"JSON-RPC error (not retrying): {e.message}")
break
async def graceful_degradation_example():
"""Example of graceful degradation when services are unavailable."""
try:
async with JsonRpcClient("http://localhost:3020/rpc") as client:
# Try to get data from remote service
result = await client.call("get_user_data", user_id=123)
return result
except (JsonRpcClientError, JsonRpcError) as e:
logger.warning(f"Remote service unavailable: {e}")
# Fallback to cached data or default values
return {
"user_id": 123,
"name": "Unknown User",
"status": "offline",
"source": "fallback_data"
}
async def batch_operations_with_error_handling():
"""Handle errors in batch operations gracefully."""
async with JsonRpcClient("http://localhost:3020/rpc") as client:
# List of operations to perform
operations = [
("add", {"a": 1, "b": 2}),
("divide", {"a": 10, "b": 0}), # This will fail
("multiply", {"a": 3, "b": 4}),
("nonexistent", {"param": "value"}), # This will fail
("echo", {"msg": "hello"})
]
results = []
for method, params in operations:
try:
result = await client.call(method, **params)
results.append({"method": method, "result": result, "success": True})
except JsonRpcError as e:
results.append({
"method": method,
"error": {"code": e.code, "message": e.message},
"success": False
})
except Exception as e:
results.append({
"method": method,
"error": {"message": str(e)},
"success": False
})
# Process results
successful = [r for r in results if r["success"]]
failed = [r for r in results if not r["success"]]
logger.info(f"Successful operations: {len(successful)}")
logger.info(f"Failed operations: {len(failed)}")
return results
if __name__ == "__main__":
asyncio.run(comprehensive_error_handling())
asyncio.run(graceful_degradation_example())
asyncio.run(batch_operations_with_error_handling())
Error Types Explained:
JsonRpcError: Protocol-level errors from the server
- Method not found (-32601)
- Invalid parameters (-32602)
- Internal server error (-32603)
- Custom application errors
JsonRpcClientError: Client-side errors
- Network connectivity issues
- Timeout errors
- Invalid server responses
- Connection failures
Error Handling Best Practices:
- Specific Exception Handling: Catch specific error types for appropriate responses
- Retry Logic: Implement retries for transient network errors
- Graceful Degradation: Provide fallback behavior when services are unavailable
- Logging: Log errors with sufficient detail for debugging
- User Feedback: Provide meaningful error messages to end users
- Circuit Breaker: Stop calling failing services to prevent cascading failures
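The circuit breaker practice is not shown in the examples above, so here is a minimal sketch of the idea. `CircuitBreaker` and `CircuitOpenError` are hypothetical names, not part of nexios_contrib. The wrapped coroutine stands in for a `client.call(...)` invocation, and the injectable `clock` exists only so the demo can run without waiting.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock=time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # time at which the circuit opened, if it is open

    def _is_open(self) -> bool:
        if self._opened_at is None:
            return False
        # After the timeout, let one trial call through (the "half-open" state).
        return self._clock() - self._opened_at < self.reset_timeout

    async def call(self, func, *args, **kwargs):
        if self._is_open():
            raise CircuitOpenError("circuit is open; skipping call")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result


async def failing():
    raise ConnectionError("service down")


async def healthy():
    return "ok"


async def scenario() -> list:
    now = [0.0]  # fake clock so the demo does not need to sleep
    breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
    outcomes = []
    for _ in range(3):
        try:
            await breaker.call(failing)
        except CircuitOpenError:
            outcomes.append("open")
        except ConnectionError:
            outcomes.append("failed")
    now[0] = 11.0  # pretend the reset timeout has elapsed
    outcomes.append(await breaker.call(healthy))
    return outcomes


outcomes = asyncio.run(scenario())
print(outcomes)
```

In practice a breaker like this would wrap only transient failures such as network errors. JSON-RPC errors such as invalid params indicate a bad request rather than an unhealthy service.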
Integration with Nexios Framework
Middleware Integration
JSON-RPC seamlessly integrates with Nexios's middleware system, allowing you to add cross-cutting concerns like logging, authentication, and monitoring.
Here's a simple example of adding logging middleware:
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
import time
import logging

app = NexiosApp()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Simple logging middleware
async def logging_middleware(request, response, call_next):
    start_time = time.time()
    logger.info(f"JSON-RPC Request: {request.method} {request.url}")
    response = await call_next()
    duration = time.time() - start_time
    logger.info(f"Response: {response.status_code} ({duration:.3f}s)")
    return response


app.add_middleware(logging_middleware)
JsonRpcPlugin(app, {"path_prefix": "/rpc"})
Advanced Middleware Examples
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.exceptions import JsonRpcError
import time
import json
import logging
from collections import defaultdict, deque

app = NexiosApp()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 1. Comprehensive Logging Middleware
async def comprehensive_logging_middleware(request, response, call_next):
    """Comprehensive request/response logging for JSON-RPC calls."""
    start_time = time.time()
    request_id = f"req_{int(start_time * 1000000)}"
    # Log incoming request
    logger.info(f"[{request_id}] JSON-RPC Request: {request.method} {request.url}")
    # For JSON-RPC, log the method being called
    if request.method == "POST" and "/rpc" in str(request.url):
        try:
            body = await request.body()
            if body:
                rpc_data = json.loads(body)
                method_name = rpc_data.get("method", "unknown")
                logger.info(f"[{request_id}] RPC Method: {method_name}")
        except Exception as e:
            logger.warning(f"[{request_id}] Could not parse RPC body: {e}")
    response = await call_next()
    # Log response
    duration = time.time() - start_time
    logger.info(f"[{request_id}] Response: {response.status_code} ({duration:.3f}s)")
    # Add custom headers
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Processing-Time"] = f"{duration:.3f}"
    return response
# 2. Rate Limiting Middleware
class RateLimitingMiddleware:
    """Rate limiting middleware for JSON-RPC endpoints."""

    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.request_counts = defaultdict(deque)

    async def __call__(self, request, response, call_next):
        """Apply rate limiting to JSON-RPC requests."""
        # Only rate limit JSON-RPC endpoints
        if not (request.method == "POST" and "/rpc" in str(request.url)):
            return await call_next()
        # Get client identifier (IP address)
        client_ip = request.client.host if request.client else "unknown"
        current_time = time.time()
        # Clean old requests outside the window
        client_requests = self.request_counts[client_ip]
        while client_requests and client_requests[0] < current_time - self.window_seconds:
            client_requests.popleft()
        # Check rate limit
        if len(client_requests) >= self.max_requests:
            logger.warning(f"Rate limit exceeded for {client_ip}")
            error_response = {
                "jsonrpc": "2.0",
                "error": {
                    "code": -32004,
                    "message": "Rate limit exceeded",
                    "data": {
                        "max_requests": self.max_requests,
                        "window_seconds": self.window_seconds,
                        "retry_after": self.window_seconds
                    }
                },
                "id": None
            }
            return Response(
                content=json.dumps(error_response),
                status_code=429,
                headers={
                    "Content-Type": "application/json",
                    "Retry-After": str(self.window_seconds)
                }
            )
        # Record this request
        client_requests.append(current_time)
        return await call_next()
# 3. CORS Middleware for Web Clients
async def cors_middleware(request, response, call_next):
    """Handle CORS for web-based JSON-RPC clients."""
    # Handle preflight requests
    if request.method == "OPTIONS":
        return Response(
            status_code=200,
            headers={
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "POST, OPTIONS",
                "Access-Control-Allow-Headers": "Content-Type, Authorization",
                "Access-Control-Max-Age": "86400"
            }
        )
    # Process request
    response = await call_next()
    # Add CORS headers to response
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS"
    response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization"
    return response

# Initialize middleware instances
rate_limit_middleware = RateLimitingMiddleware(max_requests=50, window_seconds=60)

# Add middleware to app (order matters!)
app.add_middleware(cors_middleware)  # First - handle CORS
app.add_middleware(comprehensive_logging_middleware)  # Second - log everything
app.add_middleware(rate_limit_middleware)  # Third - rate limiting

# JSON-RPC will automatically use all Nexios middleware
JsonRpcPlugin(app, {"path_prefix": "/rpc"})

if __name__ == "__main__":
    print("🚀 Starting Nexios JSON-RPC server with middleware...")
    print("⚡ Rate limiting: 50 requests per minute per IP")
    print("🌐 CORS: Enabled for web clients")
    app.run(host="0.0.0.0", port=8000)
Middleware Benefits:
- Cross-Cutting Concerns: Handle logging, rate limiting, etc. across all RPC methods
- Separation of Concerns: Keep business logic separate from infrastructure concerns
- Reusability: Middleware can be reused across different endpoints
- Composability: Stack multiple middleware for complex functionality
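To make the composability point concrete, the following framework-free sketch shows how a chain of middleware with the `(request, response, call_next)` signature can wrap a handler. It is an illustration only and not how Nexios implements its stack; `build_chain`, `make_tracer`, and the assumption that the first middleware in the list runs outermost are all introduced for this example.

```python
import asyncio


def build_chain(middlewares, handler):
    """Wrap handler so that middlewares[0] runs outermost (assumed ordering)."""
    async def run(request, response):
        async def dispatch(index):
            # Past the last middleware: call the actual handler
            if index == len(middlewares):
                return await handler(request, response)

            async def call_next():
                return await dispatch(index + 1)

            return await middlewares[index](request, response, call_next)

        return await dispatch(0)

    return run


trace = []


def make_tracer(name):
    """Create a middleware that records when it runs relative to the handler."""
    async def middleware(request, response, call_next):
        trace.append(f"{name}:before")
        result = await call_next()
        trace.append(f"{name}:after")
        return result

    return middleware


async def handler(request, response):
    trace.append("handler")
    return {"jsonrpc": "2.0", "result": 42, "id": 1}


chain = build_chain([make_tracer("cors"), make_tracer("logging")], handler)
result = asyncio.run(chain({}, None))
print(trace)
# ['cors:before', 'logging:before', 'handler', 'logging:after', 'cors:after']
```

Each middleware sees the request on the way in and the response on the way out, which is why registration order affects behavior such as whether rate-limited requests still get logged.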
Method Registration Patterns
The method registry supports various registration patterns for different use cases:
from nexios_contrib.jrpc.registry import get_registry

registry = get_registry()

# Basic method registration
@registry.register("multiply")
def multiply(x: float, y: float) -> float:
    """Multiply two numbers."""
    return x * y

# Async method registration
@registry.register("fetch_data")
async def fetch_data(url: str) -> dict:
    """Fetch data from a URL."""
    # httpx is a third-party package used only by this example;
    # the contrib itself does not depend on it
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

# Custom method name (the RPC name differs from the function name)
@registry.register("get_time")
def current_timestamp() -> float:
    """Get current timestamp."""
    import time
    return time.time()

# Manual registration
def calculate_area(length: float, width: float) -> float:
    """Calculate rectangle area."""
    return length * width

registry.register_method("area", calculate_area)
Real-World Examples
Calculator Service Example
Here's a simple calculator service to demonstrate JSON-RPC in action:
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.registry import get_registry
from nexios_contrib.jrpc.exceptions import JsonRpcError
import math

app = NexiosApp()
registry = get_registry()

@registry.register("add")
def add(a: float, b: float) -> float:
    """Add two numbers."""
    return a + b

@registry.register("divide")
def divide(a: float, b: float) -> float:
    """Divide a by b."""
    if b == 0:
        raise JsonRpcError(
            code=-32602,
            message="Division by zero is not allowed"
        )
    return a / b

@registry.register("sqrt")
def square_root(number: float) -> float:
    """Calculate square root."""
    if number < 0:
        raise JsonRpcError(
            code=-32602,
            message="Cannot calculate square root of negative number"
        )
    return math.sqrt(number)

JsonRpcPlugin(app, {"path_prefix": "/calc"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
View Complete Calculator Service with Advanced Features
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.registry import get_registry
from nexios_contrib.jrpc.exceptions import JsonRpcError
import math
import logging
from typing import List

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = NexiosApp()
registry = get_registry()

def validate_number(value, param_name: str) -> float:
    """Validate and convert input to float with proper error handling."""
    if value is None:
        raise JsonRpcError(
            code=-32602,
            message=f"Parameter '{param_name}' cannot be null",
            data={"parameter": param_name, "received": None}
        )
    if isinstance(value, bool):  # bool is a subclass of int, so check it first
        raise JsonRpcError(
            code=-32602,
            message=f"Parameter '{param_name}' cannot be boolean",
            data={"parameter": param_name, "received": value, "type": "boolean"}
        )
    if not isinstance(value, (int, float)):
        raise JsonRpcError(
            code=-32602,
            message=f"Parameter '{param_name}' must be a number",
            data={
                "parameter": param_name,
                "expected": ["int", "float"],
                "received": type(value).__name__,
                "value": str(value)[:50]
            }
        )
    if math.isnan(value) or math.isinf(value):
        raise JsonRpcError(
            code=-32602,
            message=f"Parameter '{param_name}' must be a finite number",
            data={"parameter": param_name, "received": value}
        )
    return float(value)
@registry.register("add")
def add(a, b) -> float:
    """Add two numbers with validation."""
    a_val = validate_number(a, "a")
    b_val = validate_number(b, "b")
    result = a_val + b_val
    logger.info(f"Addition: {a_val} + {b_val} = {result}")
    return result

@registry.register("subtract")
def subtract(a, b) -> float:
    """Subtract b from a."""
    a_val = validate_number(a, "a")
    b_val = validate_number(b, "b")
    result = a_val - b_val
    logger.info(f"Subtraction: {a_val} - {b_val} = {result}")
    return result

@registry.register("multiply")
def multiply(a, b) -> float:
    """Multiply two numbers."""
    a_val = validate_number(a, "a")
    b_val = validate_number(b, "b")
    result = a_val * b_val
    logger.info(f"Multiplication: {a_val} * {b_val} = {result}")
    return result

@registry.register("divide")
def divide(a, b) -> float:
    """Divide a by b with comprehensive error handling."""
    a_val = validate_number(a, "a")
    b_val = validate_number(b, "b")
    if b_val == 0:
        raise JsonRpcError(
            code=-32602,
            message="Division by zero is not allowed",
            data={
                "dividend": a_val,
                "divisor": b_val,
                "suggestion": "Use a non-zero divisor"
            }
        )
    result = a_val / b_val
    logger.info(f"Division: {a_val} / {b_val} = {result}")
    return result
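@registry.register("power")
def power(base, exponent) -> float:
    """Raise base to the power of exponent with overflow protection."""
    base_val = validate_number(base, "base")
    exp_val = validate_number(exponent, "exponent")
    # Check for potential overflow
    if abs(base_val) > 1000 and abs(exp_val) > 100:
        raise JsonRpcError(
            code=-32602,
            message="Operation would result in overflow",
            data={
                "base": base_val,
                "exponent": exp_val,
                "max_safe_base": 1000,
                "max_safe_exponent": 100
            }
        )
    try:
        result = base_val ** exp_val
        # A negative base with a fractional exponent yields a complex number in Python 3
        if isinstance(result, complex):
            raise JsonRpcError(
                code=-32602,
                message="Negative base requires an integer exponent",
                data={"base": base_val, "exponent": exp_val}
            )
        if math.isinf(result):
            raise OverflowError("Result is infinite")
        logger.info(f"Power: {base_val} ^ {exp_val} = {result}")
        return result
    except ZeroDivisionError:
        # Raised for 0.0 with a negative exponent
        raise JsonRpcError(
            code=-32602,
            message="Zero cannot be raised to a negative power",
            data={"base": base_val, "exponent": exp_val}
        )
    except OverflowError:
        raise JsonRpcError(
            code=-32603,
            message="Calculation resulted in overflow",
            data={"base": base_val, "exponent": exp_val}
        )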
@registry.register("sqrt")
def square_root(number) -> float:
    """Calculate square root with domain validation."""
    num_val = validate_number(number, "number")
    if num_val < 0:
        raise JsonRpcError(
            code=-32602,
            message="Cannot calculate square root of negative number",
            data={
                "number": num_val,
                "suggestion": "Use a non-negative number"
            }
        )
    result = math.sqrt(num_val)
    logger.info(f"Square root: √{num_val} = {result}")
    return result

@registry.register("factorial")
def factorial(n: int) -> int:
    """Calculate factorial with input validation."""
    # bool is a subclass of int, so reject it explicitly
    if isinstance(n, bool) or not isinstance(n, int):
        raise JsonRpcError(
            code=-32602,
            message="Factorial requires an integer",
            data={"received": type(n).__name__, "value": n}
        )
    if n < 0:
        raise JsonRpcError(
            code=-32602,
            message="Factorial is not defined for negative numbers",
            data={"number": n}
        )
    if n > 170:  # 171! exceeds the float64 range many JSON clients use for numbers
        raise JsonRpcError(
            code=-32602,
            message="Number too large for factorial calculation",
            data={"number": n, "max_safe_value": 170}
        )
    result = math.factorial(n)
    logger.info(f"Factorial: {n}! = {result}")
    return result
@registry.register("batch_calculate")
def batch_calculate(operations: List[dict]) -> List[dict]:
    """Perform multiple calculations in a single request."""
    if not isinstance(operations, list):
        raise JsonRpcError(
            code=-32602,
            message="Operations must be a list",
            data={"received": type(operations).__name__}
        )
    if len(operations) > 100:
        raise JsonRpcError(
            code=-32602,
            message="Too many operations in batch",
            data={"count": len(operations), "max_allowed": 100}
        )
    results = []
    for i, op in enumerate(operations):
        try:
            if not isinstance(op, dict):
                results.append({
                    "index": i,
                    "success": False,
                    "error": "Operation must be an object"
                })
                continue
            operation = op.get("operation")
            params = op.get("params", {})
            if operation == "add":
                result = add(**params)
            elif operation == "subtract":
                result = subtract(**params)
            elif operation == "multiply":
                result = multiply(**params)
            elif operation == "divide":
                result = divide(**params)
            elif operation == "power":
                result = power(**params)
            else:
                raise JsonRpcError(
                    code=-32601,
                    message=f"Unknown operation: {operation}"
                )
            results.append({
                "index": i,
                "success": True,
                "result": result,
                "operation": operation
            })
        except JsonRpcError as e:
            results.append({
                "index": i,
                "success": False,
                "error": {
                    "code": e.code,
                    "message": e.message,
                    "data": e.data
                },
                "operation": op.get("operation", "unknown")
            })
        except Exception as e:
            results.append({
                "index": i,
                "success": False,
                "error": str(e),
                "operation": op.get("operation", "unknown")
            })
    return results
@registry.register("get_calculator_info")
def get_calculator_info() -> dict:
    """Get information about the calculator service."""
    return {
        "service": "Advanced Calculator Service",
        "version": "1.0.0",
        "supported_operations": [
            "add", "subtract", "multiply", "divide",
            "power", "sqrt", "factorial", "batch_calculate"
        ],
        "features": [
            "Input validation",
            "Overflow protection",
            "Batch operations",
            "Comprehensive error handling"
        ],
        "limits": {
            "max_batch_operations": 100,
            "max_factorial_input": 170,
            "max_safe_power_base": 1000,
            "max_safe_power_exponent": 100
        }
    }

# Configure JSON-RPC plugin
JsonRpcPlugin(app, {
    "path_prefix": "/calc",
    "cors_enabled": True,
    "max_request_size": 1024 * 1024  # 1MB for batch operations
})

if __name__ == "__main__":
    print("🧮 Advanced Calculator Service")
    print("📊 Features: Input validation, batch operations")
    print("🔗 Endpoint: http://localhost:8000/calc")
    print("\n📋 Available operations:")
    print(" - add(a, b)")
    print(" - subtract(a, b)")
    print(" - multiply(a, b)")
    print(" - divide(a, b)")
    print(" - power(base, exponent)")
    print(" - sqrt(number)")
    print(" - factorial(n)")
    print(" - batch_calculate(operations)")
    print(" - get_calculator_info()")
    app.run(host="0.0.0.0", port=8000)
Key Features:
- Input Validation: Type checking and range validation
- Error Handling: Detailed error messages with helpful data
- Batch Operations: Process multiple calculations in one request
- Overflow Protection: Prevents calculations that would cause overflow
- Service Introspection: Clients can query service capabilities
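To show what a call to the calculator looks like on the wire, here is a small sketch that builds a JSON-RPC 2.0 request body and unpacks a response. `build_request`, `parse_response`, and `RpcFailure` are hypothetical helpers written for this example, not part of the contrib's client, and passing parameters by name assumes the server accepts a params object, which JSON-RPC 2.0 permits.

```python
import json
from itertools import count

_ids = count(1)  # simple incrementing request ids


def build_request(method, params=None):
    """Build a JSON-RPC 2.0 request object and return it as a JSON string."""
    payload = {"jsonrpc": "2.0", "method": method, "id": next(_ids)}
    if params is not None:
        payload["params"] = params
    return json.dumps(payload)


class RpcFailure(Exception):
    """Hypothetical client-side exception carrying the JSON-RPC error object."""

    def __init__(self, code, message, data=None):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.data = data


def parse_response(raw):
    """Return the result of a JSON-RPC 2.0 response, or raise RpcFailure on error."""
    body = json.loads(raw)
    if "error" in body:
        err = body["error"]
        raise RpcFailure(err["code"], err["message"], err.get("data"))
    return body["result"]


request_body = build_request("divide", {"a": 10, "b": 4})

# An error response shaped like the one the divide method raises for b == 0
error_body = json.dumps({
    "jsonrpc": "2.0",
    "error": {"code": -32602, "message": "Division by zero is not allowed"},
    "id": 2,
})
```

The request body can be sent with any HTTP client as a POST to the service endpoint (`http://localhost:8000/calc` in the example above); the error codes seen by the client are the ones passed to `JsonRpcError` on the server.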
Enterprise User Management Service
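This example sketches a user management service with input validation, audit logging, soft and hard deletion, search, and pagination. It keeps all data in memory for simplicity, so replace the storage layer with a database before using it in production.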
from nexios import NexiosApp
from nexios_contrib.jrpc.server import JsonRpcPlugin
from nexios_contrib.jrpc.registry import get_registry
from nexios_contrib.jrpc.exceptions import JsonRpcError
import uuid
import time
import re
import logging
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = NexiosApp()
registry = get_registry()

@dataclass
class User:
    """User data model with validation and serialization."""
    id: str
    name: str
    email: str
    created_at: float
    updated_at: float
    status: str = "active"
    role: str = "user"
    last_login: Optional[float] = None
    login_count: int = 0
    metadata: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Give each user its own dict instead of sharing a mutable default
        if self.metadata is None:
            self.metadata = {}

    def to_dict(self) -> dict:
        """Convert user to dictionary for JSON serialization."""
        return asdict(self)

    def to_public_dict(self) -> dict:
        """Convert user to public dictionary (excluding sensitive data)."""
        data = self.to_dict()
        # Remove sensitive fields if needed
        return data

@dataclass
class AuditLog:
    """Audit log entry for tracking user operations."""
    id: str
    user_id: str
    action: str
    timestamp: float
    details: Dict[str, Any]
    performed_by: Optional[str] = None
class UserService:
    """Comprehensive user management service with validation and audit logging."""

    def __init__(self):
        # In-memory storage (use database in production)
        self.users: Dict[str, User] = {}
        self.audit_logs: List[AuditLog] = []
        # Email validation regex
        self.email_pattern = re.compile(
            r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        )
        # Valid user statuses and roles
        self.valid_statuses = {"active", "inactive", "suspended", "deleted"}
        self.valid_roles = {"user", "admin", "moderator", "guest"}

    def validate_email(self, email: str) -> None:
        """Validate email format and uniqueness."""
        if not isinstance(email, str):
            raise JsonRpcError(
                code=-32602,
                message="Email must be a string",
                data={"received_type": type(email).__name__}
            )
        if not email or len(email.strip()) == 0:
            raise JsonRpcError(
                code=-32602,
                message="Email cannot be empty"
            )
        email = email.strip().lower()
        if len(email) > 254:  # RFC 5321 limit
            raise JsonRpcError(
                code=-32602,
                message="Email address too long",
                data={"max_length": 254, "received_length": len(email)}
            )
        if not self.email_pattern.match(email):
            raise JsonRpcError(
                code=-32602,
                message="Invalid email format",
                data={"email": email, "expected_format": "user@domain.com"}
            )
        # Check for existing email
        for user in self.users.values():
            if user.email.lower() == email and user.status != "deleted":
                raise JsonRpcError(
                    code=-32602,
                    message="Email already exists",
                    data={"email": email}
                )
    def validate_name(self, name: str) -> str:
        """Validate and normalize user name."""
        if not isinstance(name, str):
            raise JsonRpcError(
                code=-32602,
                message="Name must be a string",
                data={"received_type": type(name).__name__}
            )
        name = name.strip()
        if not name:
            raise JsonRpcError(
                code=-32602,
                message="Name cannot be empty"
            )
        if len(name) < 2:
            raise JsonRpcError(
                code=-32602,
                message="Name must be at least 2 characters long",
                data={"min_length": 2, "received_length": len(name)}
            )
        if len(name) > 100:
            raise JsonRpcError(
                code=-32602,
                message="Name too long",
                data={"max_length": 100, "received_length": len(name)}
            )
        # Check for valid characters (ASCII letters, whitespace, hyphens, apostrophes, periods)
        if not re.match(r"^[a-zA-Z\s\-'\.]+$", name):
            raise JsonRpcError(
                code=-32602,
                message="Name contains invalid characters",
                data={"name": name, "allowed_chars": "letters, spaces, hyphens, apostrophes, periods"}
            )
        return name

    def log_action(self, action: str, user_id: str, details: Dict[str, Any], performed_by: Optional[str] = None):
        """Log user action for audit trail."""
        log_entry = AuditLog(
            id=str(uuid.uuid4()),
            user_id=user_id,
            action=action,
            timestamp=time.time(),
            details=details,
            performed_by=performed_by
        )
        self.audit_logs.append(log_entry)
        logger.info(f"Audit: {action} for user {user_id} by {performed_by or 'system'}")

# Initialize user service
user_service = UserService()
@registry.register("create_user")
def create_user(name: str, email: str, role: str = "user", metadata: Optional[Dict[str, Any]] = None) -> dict:
    """Create a new user with comprehensive validation.

    This method demonstrates:
    - Input validation and sanitization
    - Business rule enforcement
    - Audit logging
    - Structured error responses

    Args:
        name: User's full name (2-100 characters; letters, spaces, hyphens, apostrophes, periods)
        email: Email address (checked against a simple pattern, unique among non-deleted users)
        role: User role (user, admin, moderator, guest)
        metadata: Optional additional user data

    Returns:
        Created user object with generated ID and timestamps
    """
    # Validate inputs
    validated_name = user_service.validate_name(name)
    user_service.validate_email(email)
    # Validate role
    if role not in user_service.valid_roles:
        raise JsonRpcError(
            code=-32602,
            message="Invalid user role",
            data={
                "role": role,
                "valid_roles": list(user_service.valid_roles)
            }
        )
    # Validate metadata
    if metadata is not None and not isinstance(metadata, dict):
        raise JsonRpcError(
            code=-32602,
            message="Metadata must be an object",
            data={"received_type": type(metadata).__name__}
        )
    # Create user
    user_id = str(uuid.uuid4())
    current_time = time.time()
    user = User(
        id=user_id,
        name=validated_name,
        email=email.strip().lower(),
        created_at=current_time,
        updated_at=current_time,
        role=role,
        metadata=metadata or {}
    )
    user_service.users[user_id] = user
    # Log action
    user_service.log_action(
        action="user_created",
        user_id=user_id,
        details={
            "name": validated_name,
            "email": user.email,
            "role": role
        }
    )
    logger.info(f"Created user: {user.name} ({user.email}) with ID {user_id}")
    return user.to_public_dict()
@registry.register("get_user")
def get_user(user_id: str, include_metadata: bool = False) -> dict:
    """Get user by ID with optional metadata inclusion."""
    if not isinstance(user_id, str) or not user_id.strip():
        raise JsonRpcError(
            code=-32602,
            message="User ID must be a non-empty string",
            data={"received": user_id}
        )
    user = user_service.users.get(user_id)
    if not user or user.status == "deleted":
        raise JsonRpcError(
            code=-32602,
            message="User not found",
            data={"user_id": user_id}
        )
    user_data = user.to_public_dict()
    if not include_metadata:
        user_data.pop("metadata", None)
    return user_data

@registry.register("list_users")
def list_users(
    status: Optional[str] = None,
    role: Optional[str] = None,
    limit: int = 50,
    offset: int = 0,
    include_deleted: bool = False
) -> dict:
    """List users with filtering and pagination.

    This demonstrates advanced querying capabilities with pagination.
    """
    # Validate parameters
    if limit < 1 or limit > 1000:
        raise JsonRpcError(
            code=-32602,
            message="Limit must be between 1 and 1000",
            data={"limit": limit, "min": 1, "max": 1000}
        )
    if offset < 0:
        raise JsonRpcError(
            code=-32602,
            message="Offset must be non-negative",
            data={"offset": offset}
        )
    if status and status not in user_service.valid_statuses:
        raise JsonRpcError(
            code=-32602,
            message="Invalid status filter",
            data={"status": status, "valid_statuses": list(user_service.valid_statuses)}
        )
    if role and role not in user_service.valid_roles:
        raise JsonRpcError(
            code=-32602,
            message="Invalid role filter",
            data={"role": role, "valid_roles": list(user_service.valid_roles)}
        )
    # Filter users
    filtered_users = []
    for user in user_service.users.values():
        # Skip deleted users unless explicitly requested
        if user.status == "deleted" and not include_deleted:
            continue
        # Apply filters
        if status and user.status != status:
            continue
        if role and user.role != role:
            continue
        filtered_users.append(user)
    # Sort by creation date (newest first)
    filtered_users.sort(key=lambda u: u.created_at, reverse=True)
    # Apply pagination
    total_count = len(filtered_users)
    paginated_users = filtered_users[offset:offset + limit]
    return {
        "users": [user.to_public_dict() for user in paginated_users],
        "pagination": {
            "total_count": total_count,
            "limit": limit,
            "offset": offset,
            "has_more": offset + limit < total_count
        },
        "filters": {
            "status": status,
            "role": role,
            "include_deleted": include_deleted
        }
    }
@registry.register("update_user")
def update_user(
    user_id: str,
    name: Optional[str] = None,
    email: Optional[str] = None,
    role: Optional[str] = None,
    status: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> dict:
    """Update user information with partial updates and validation."""
    user = user_service.users.get(user_id)
    if not user or user.status == "deleted":
        raise JsonRpcError(
            code=-32602,
            message="User not found",
            data={"user_id": user_id}
        )
    changes = {}
    # Validate and apply name update
    if name is not None:
        validated_name = user_service.validate_name(name)
        if validated_name != user.name:
            changes["name"] = {"old": user.name, "new": validated_name}
            user.name = validated_name
    # Validate and apply email update
    if email is not None:
        # Temporarily remove current user from email check
        old_email = user.email
        user.email = ""  # Temporarily clear to avoid self-conflict
        try:
            user_service.validate_email(email)
            new_email = email.strip().lower()
            if new_email != old_email:
                changes["email"] = {"old": old_email, "new": new_email}
                user.email = new_email
            else:
                user.email = old_email  # Restore if no change
        except JsonRpcError:
            user.email = old_email  # Restore on validation error
            raise
    # Validate and apply role update
    if role is not None:
        if role not in user_service.valid_roles:
            raise JsonRpcError(
                code=-32602,
                message="Invalid user role",
                data={"role": role, "valid_roles": list(user_service.valid_roles)}
            )
        if role != user.role:
            changes["role"] = {"old": user.role, "new": role}
            user.role = role
    # Validate and apply status update
    if status is not None:
        if status not in user_service.valid_statuses:
            raise JsonRpcError(
                code=-32602,
                message="Invalid user status",
                data={"status": status, "valid_statuses": list(user_service.valid_statuses)}
            )
        if status != user.status:
            changes["status"] = {"old": user.status, "new": status}
            user.status = status
    # Update metadata
    if metadata is not None:
        if not isinstance(metadata, dict):
            raise JsonRpcError(
                code=-32602,
                message="Metadata must be an object",
                data={"received_type": type(metadata).__name__}
            )
        if metadata != user.metadata:
            changes["metadata"] = {"old": user.metadata, "new": metadata}
            user.metadata = metadata
    # Update timestamp if any changes were made
    if changes:
        user.updated_at = time.time()
        # Log action
        user_service.log_action(
            action="user_updated",
            user_id=user_id,
            details={"changes": changes}
        )
        logger.info(f"Updated user {user_id}: {list(changes.keys())}")
    return {
        "user": user.to_public_dict(),
        "changes": changes,
        "updated": len(changes) > 0
    }
@registry.register("delete_user")
def delete_user(user_id: str, hard_delete: bool = False) -> dict:
    """Delete user with soft/hard delete options."""
    user = user_service.users.get(user_id)
    if not user:
        raise JsonRpcError(
            code=-32602,
            message="User not found",
            data={"user_id": user_id}
        )
    if user.status == "deleted":
        raise JsonRpcError(
            code=-32602,
            message="User already deleted",
            data={"user_id": user_id}
        )
    if hard_delete:
        # Permanently remove user
        del user_service.users[user_id]
        action = "user_hard_deleted"
        logger.info(f"Hard deleted user {user_id}")
    else:
        # Soft delete - mark as deleted
        user.status = "deleted"
        user.updated_at = time.time()
        action = "user_soft_deleted"
        logger.info(f"Soft deleted user {user_id}")
    # Log action
    user_service.log_action(
        action=action,
        user_id=user_id,
        details={"hard_delete": hard_delete}
    )
    return {
        "user_id": user_id,
        "deleted": True,
        "hard_delete": hard_delete,
        "timestamp": time.time()
    }
@registry.register("search_users")
def search_users(query: str, fields: Optional[List[str]] = None, limit: int = 20) -> dict:
    """Search users by name, email, or role with flexible field matching."""
    if not isinstance(query, str) or len(query.strip()) < 2:
        raise JsonRpcError(
            code=-32602,
            message="Search query must be at least 2 characters",
            data={"query": query}
        )
    if fields is None:
        fields = ["name", "email"]
    valid_fields = {"name", "email", "role"}
    invalid_fields = set(fields) - valid_fields
    if invalid_fields:
        raise JsonRpcError(
            code=-32602,
            message="Invalid search fields",
            data={"invalid_fields": list(invalid_fields), "valid_fields": list(valid_fields)}
        )
    query = query.strip().lower()
    matching_users = []
    for user in user_service.users.values():
        if user.status == "deleted":
            continue
        # Check each specified field
        for field in fields:
            field_value = getattr(user, field, "").lower()
            if query in field_value:
                matching_users.append({
                    "user": user.to_public_dict(),
                    "matched_field": field,
                    "match_position": field_value.find(query)
                })
                break  # Don't add the same user multiple times
    # Sort by relevance (matches at the start of a field first, then by match position)
    matching_users.sort(key=lambda x: (x["match_position"] != 0, x["match_position"]))
    return {
        "query": query,
        "fields_searched": fields,
        "results": matching_users[:limit],
        "total_matches": len(matching_users),
        "limited": len(matching_users) > limit
    }
@registry.register("get_user_audit_log")
def get_user_audit_log(user_id: str, limit: int = 50) -> dict:
    """Get audit log for a specific user."""
    user = user_service.users.get(user_id)
    if not user:
        raise JsonRpcError(
            code=-32602,
            message="User not found",
            data={"user_id": user_id}
        )
    # Filter audit logs for this user
    user_logs = [
        {
            "id": log.id,
            "action": log.action,
            "timestamp": log.timestamp,
            "details": log.details,
            "performed_by": log.performed_by
        }
        for log in user_service.audit_logs
        if log.user_id == user_id
    ]
    # Sort by timestamp (newest first)
    user_logs.sort(key=lambda x: x["timestamp"], reverse=True)
    return {
        "user_id": user_id,
        "audit_logs": user_logs[:limit],
        "total_logs": len(user_logs),
        "limited": len(user_logs) > limit
    }
@registry.register("get_service_stats")
def get_service_stats() -> dict:
    """Get comprehensive service statistics."""
    total_users = len(user_service.users)
    # Count by status
    status_counts = {}
    role_counts = {}
    for user in user_service.users.values():
        status_counts[user.status] = status_counts.get(user.status, 0) + 1
        role_counts[user.role] = role_counts.get(user.role, 0) + 1
    # Recent activity (last 24 hours)
    recent_threshold = time.time() - 86400  # 24 hours ago
    recent_logs = [log for log in user_service.audit_logs if log.timestamp > recent_threshold]
    return {
        "total_users": total_users,
        "status_breakdown": status_counts,
        "role_breakdown": role_counts,
        "recent_activity": {
            "last_24_hours": len(recent_logs),
            "total_audit_logs": len(user_service.audit_logs)
        },
        "service_info": {
            "version": "1.0.0",
            "features": [
                "User CRUD operations",
                "Advanced search and filtering",
                "Audit logging",
                "Soft/hard delete",
                "Input validation",
                "Pagination support"
            ]
        }
    }
# Configure JSON-RPC plugin
JsonRpcPlugin(app, {
    "path_prefix": "/users",
    "cors_enabled": True,
    "max_request_size": 2 * 1024 * 1024  # 2MB request size limit
})
if __name__ == "__main__":
    print("👥 Enterprise User Management Service")
    print("🔒 Features: Validation, audit logging, search, pagination")
    print("🔗 Endpoint: http://localhost:8000/users")
    print("\n📋 Available operations:")
    print(" - create_user(name, email, role?, metadata?)")
    print(" - get_user(user_id, include_metadata?)")
    print(" - list_users(status?, role?, limit?, offset?, include_deleted?)")
    print(" - update_user(user_id, name?, email?, role?, status?, metadata?)")
    print(" - delete_user(user_id, hard_delete?)")
    print(" - search_users(query, fields?, limit?)")
    print(" - get_user_audit_log(user_id, limit?)")
    print(" - get_service_stats()")
    app.run(host="0.0.0.0", port=8000)
