Skip to content

Nexios API Examples

This guide provides canonical, working examples for building APIs with Nexios. All code is idiomatic and matches the Nexios source, docs, and real-world examples.

Nexios Request API

  • Use await req.json (not req.json())
  • Use await req.files (not req.files())
  • Use req.path_params.param (not req.path_params['param'])

1. Minimal App and REST API

python
from nexios import NexiosApp

app = NexiosApp()

@app.get("/api/items")
async def get_items(req, res):
    items = [{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}]
    return res.json(items)

@app.get("/api/items/{item_id:int}")
async def get_item(req, res):
    item_id = req.path_params.item_id
    return res.json({"id": item_id, "name": f"Item {item_id}"})

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=5000, reload=True)

2. Request Input Handling (JSON Body)

python
from pydantic import BaseModel, ValidationError
from nexios import NexiosApp

app = NexiosApp()

class User(BaseModel):
    name: str
    age: int

@app.post("/json")
async def process_json(req, res):
    try:
        data = await req.json
        user = User(**data)
        return res.json({"status": "success", "user": user.dict()})
    except ValidationError as e:
        return res.json({"error": str(e)}, status_code=422)

2.5. Request Type Detection and Smart Handling

Nexios provides convenient properties to quickly detect and handle different types of requests:

python
from nexios import NexiosApp
from nexios.http import Request, Response

app = NexiosApp()

@app.post("/api/smart-endpoint")
async def smart_handler(req: Request, res: Response):
    """Smart endpoint that handles multiple input types"""

    # Check content type and handle accordingly
    if req.is_json:
        data = await req.json
        return res.json({
            "type": "json",
            "data": data,
            "content_type": req.content_type
        })

    elif req.is_form:
        if req.is_multipart:
            # Handle file uploads
            files = await req.files()
            form_data = await req.form()
            return res.json({
                "type": "multipart",
                "files": list(files.keys()),
                "form_fields": dict(form_data),
                "content_type": req.content_type
            })
        else:
            # Handle URL-encoded form
            form_data = await req.form()
            return res.json({
                "type": "urlencoded",
                "data": dict(form_data),
                "content_type": req.content_type
            })

    elif req.has_body:
        # Handle other content types
        body_text = await req.text()
        return res.json({
            "type": "raw",
            "content_type": req.content_type,
            "size": len(body_text)
        })

    else:
        return res.json({"error": "No body provided"}, status_code=400)

@app.get("/api/request-info")
async def request_info(req: Request, res: Response):
    """Get comprehensive information about the current request"""

    info = {
        "method": req.method,
        "path": req.path,
        "content_type": req.content_type,
        "content_length": req.content_length,

        # Content type flags
        "is_json": req.is_json,
        "is_form": req.is_form,
        "is_multipart": req.is_multipart,
        "is_urlencoded": req.is_urlencoded,

        # Request state flags
        "has_cookie": req.has_cookie,
        "has_files": req.has_files,
        "has_body": req.has_body,
        "is_authenticated": req.is_authenticated,
        "has_session": req.has_session,

        # Security and client info
        "is_secure": req.is_secure,
        "is_ajax": req.is_ajax,
        "client_ip": req.get_client_ip(),
        "user_agent": req.user_agent,

        # Headers (using new utilities)
        "has_authorization": req.has_header("authorization"),
        "accept_header": req.get_header("accept"),
    }

    return res.json(info)

@app.post("/api/validate-input")
async def validate_input(req: Request, res: Response):
    """Validate and process different input types"""

    # File size validation
    if req.has_body and req.content_length > 10 * 1024 * 1024:  # 10MB
        return res.json({"error": "File too large"}, status_code=413)

    # Authentication check
    if req.is_authenticated:
        user_info = {
            "id": req.user.id if req.user else None,
            "authenticated": True
        }
    else:
        user_info = {"authenticated": False}

    # Content type specific processing
    if req.is_json:
        data = await req.json(
        return res.json({
            "processed": True,
            "input_type": "json",
            "user": user_info,
            "data": data
        })

    elif req.is_multipart and req.has_files:
        files = await req.files()
        return res.json({
            "processed": True,
            "input_type": "multipart",
            "user": user_info,
            "files_count": len(files),
            "files": [f.filename for f in files.values()]
        })

    elif req.is_form:
        form_data = await req.form_data
        return res.json({
            "processed": True,
            "input_type": "form",
            "user": user_info,
            "fields": list(form_data.keys())
        })

    else:
        return res.json({
            "error": "Unsupported content type",
            "supported_types": ["application/json", "multipart/form-data", "application/x-www-form-urlencoded"]
        }, status_code=400)

3. Header Management and Custom Headers

python
from nexios import NexiosApp
from nexios.http import Request, Response

app = NexiosApp()

@app.get("/api/headers")
async def header_demo(req: Request, res: Response):
    """Demonstrate header utilities"""

    # Check for specific headers
    if req.has_header("authorization"):
        auth_type = "Bearer" if "Bearer" in req.get_header("authorization", "") else "Other"
        return res.json({
            "authenticated": True,
            "auth_type": auth_type,
            "api_version": req.get_header("x-api-version", "v1")
        })

    # Check for API key
    api_key = req.get_header("x-api-key")
    if not api_key:
        return res.json({"error": "API key required"}, status_code=401)

    # Check client preferences
    if req.accepts_json:
        format = "json"
    elif req.accepts_html:
        format = "html"
    else:
        format = "text"

    return res.json({
        "api_key_provided": bool(api_key),
        "preferred_format": format,
        "is_ajax": req.is_ajax,
        "user_agent": req.user_agent
    })

4. File Upload and Download

python
from nexios import NexiosApp
from pathlib import Path

app = NexiosApp()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)

@app.post("/upload")
async def upload_files(req, res):
    files = await req.files
    uploaded = []
    for name, file in files.items():
        # Read file content for processing/saving
        file_content = await file.read()
        file_path = UPLOAD_DIR / file.filename
        
        # Save file to disk
        with open(file_path, "wb") as f:
            f.write(file_content)
        
        uploaded.append({
            "filename": file.filename,
            "content_type": file.content_type,
            "size": len(file_content),
            "saved_path": str(file_path)
        })
    return res.json({"uploaded": uploaded})

@app.get("/files/{filename}")
async def download_file(req, res):
    filename = req.path_params.filename
    filepath = UPLOAD_DIR / filename
    if not filepath.exists():
        return res.json({"error": "File not found"}, status_code=404)
    from nexios.responses import FileResponse
    return FileResponse(filepath, filename=filename)

5. Authentication (Session and JWT)

Session Auth Example

python
from nexios import NexiosApp
from nexios.auth.backends.session import SessionAuthBackend
from nexios.auth.middleware import AuthenticationMiddleware
from nexios.auth.base import BaseUser

class User(BaseUser):
    def __init__(self, id: str, username: str):
        self.id = id
        self.username = username

    @property
    def is_authenticated(self) -> bool:
        return True

    @property
    def identity(self) -> str:
        return self.id

    @property
    def display_name(self) -> str:
        return self.username

    @classmethod
    async def load_user(cls, identity: str):
        # Replace with your database lookup
        user_data = get_user_from_db(identity)
        if user_data:
            return cls(id=user_data["id"], username=user_data["username"])
        return None

app = NexiosApp()

# Session backend
session_backend = SessionAuthBackend()

app.add_middleware(AuthenticationMiddleware(
    user_model=User,
    backend=session_backend
))

@app.get("/protected")
async def protected(req, res):
    if req.user and req.user.is_authenticated:
        return res.json({"message": f"Hello, {req.user.display_name}!"})
    return res.json({"error": "Not authenticated"}, status_code=401)

@app.post("/login")
async def login(req, res):
    data = await req.json
    username = data.get("username")
    password = data.get("password")

    # Validate credentials (replace with your logic)
    if username == "admin" and password == "password":
        user = User(id="123", username=username)
        from nexios.auth.backends.session import login
        login(req, user)
        return res.json({"message": "Logged in successfully"})

    return res.json({"error": "Invalid credentials"}, status_code=401)

JWT Auth Example

python
from nexios import NexiosApp
from nexios.auth.backends.jwt import JWTAuthBackend, create_jwt
from nexios.auth.middleware import AuthenticationMiddleware
from nexios.auth.base import BaseUser

class User(BaseUser):
    def __init__(self, id: str, username: str):
        self.id = id
        self.username = username

    @property
    def is_authenticated(self) -> bool:
        return True

    @property
    def identity(self) -> str:
        return self.id

    @property
    def display_name(self) -> str:
        return self.username

    @classmethod
    async def load_user(cls, identity: str):
        # Replace with your database lookup
        user_data = get_user_from_db(identity)
        if user_data:
            return cls(id=user_data["id"], username=user_data["username"])
        return None

app = NexiosApp()

# JWT backend
jwt_backend = JWTAuthBackend()

app.add_middleware(AuthenticationMiddleware(
    user_model=User,
    backend=jwt_backend
))

@app.post("/login")
async def login(req, res):
    data = await req.json
    username = data.get("username")
    password = data.get("password")

    # Validate credentials (replace with your logic)
    if username == "admin" and password == "password":
        user = User(id="123", username=username)
        token = create_jwt({"sub": user.identity})
        return res.json({"token": token})

    return res.json({"error": "Invalid credentials"}, status_code=401)

@app.get("/protected")
async def protected(req, res):
    if req.user and req.user.is_authenticated:
        return res.json({"message": f"Hello, {req.user.display_name}!"})
    return res.json({"error": "Not authenticated"}, status_code=401)

6. Middleware Usage

Custom Middleware

python
async def custom_middleware(req, res,call_next):
    print("Custom middleware called")
    return await call_next(req, res)
app.add_middleware(custom_middleware)

Class Based Middleware

py
from nexios import BaseMiddleware

class CustomMiddleware(BaseMiddleware):
    async def process_request(self, req, res,call_next):
        print("Custom middleware called")
        return await call_next(req, res)
app.add_middleware(CustomMiddleware())

Inbuilt Middleware

python
from nexios import NexiosApp
from nexios.middleware import CORSMiddleware, SecurityMiddleware

app = NexiosApp()
app.add_middleware(CORSMiddleware())
app.add_middleware(SecurityMiddleware())

7. Error Handling

python
from nexios import NexiosApp
from nexios.exceptions import HTTPException

app = NexiosApp()

@app.get("/error")
async def error_route(req, res):
    raise HTTPException(400, "This is a bad request!")

@app.add_exception_handler(HTTPException)
async def handle_http_exception(req, res, exc):
    return res.json({"error": exc.detail}, status_code=exc.status_code)

8. WebSockets (Minimal)

python
from nexios import NexiosApp
from nexios.websockets import WebSocket

app = NexiosApp()

@app.ws_route("/ws")
async def websocket_handler(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
    except Exception as e:
        print(f"WebSocket error: {e}")
    finally:
        await websocket.close()

9. WebSockets (Chat Room)

python
from typing import Dict, Set
from nexios import NexiosApp
from nexios.websockets import WebSocket

app = NexiosApp()
chat_rooms: Dict[str, Set[WebSocket]] = {}

@app.ws_route("/ws/chat/{room_id}")
async def chat_room(websocket: WebSocket):
    room_id = websocket.path_params.room_id
    await websocket.accept()
    if room_id not in chat_rooms:
        chat_rooms[room_id] = set()
    chat_rooms[room_id].add(websocket)
    try:
        for client in chat_rooms[room_id]:
            if client != websocket:
                await client.send_json({"type": "system", "message": "New user joined the chat"})
        while True:
            data = await websocket.receive_json()
            message = data.get("message", "")
            for client in chat_rooms[room_id]:
                if client != websocket:
                    await client.send_json({"type": "message", "message": message})
    except Exception as e:
        print(f"WebSocket error in room {room_id}: {e}")
    finally:
        chat_rooms[room_id].remove(websocket)
        if not chat_rooms[room_id]:
            del chat_rooms[room_id]
        await websocket.close()

10. Advanced Validation (Pydantic, Enums, Query Params)

python
from datetime import date, datetime
from enum import Enum
from typing import Optional
from pydantic import BaseModel, EmailStr, Field, ValidationError, constr
from nexios import NexiosApp

app = NexiosApp()

class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"

class UserStatus(str, Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    SUSPENDED = "suspended"

class UserCreate(BaseModel):
    username: constr(min_length=3, max_length=50)
    email: EmailStr
    password: constr(min_length=8)
    full_name: str
    birth_date: Optional[date] = None
    role: UserRole = UserRole.USER
    status: UserStatus = UserStatus.ACTIVE

@app.post("/users")
async def create_user(request, response):
    try:
        data = await request.json
        user = UserCreate(**data)
        return response.json(user.dict())
    except ValidationError as e:
        return response.json({"error": str(e)}, status_code=422)

class PaginationParams(BaseModel):
    page: int = Field(ge=1, default=1)
    limit: int = Field(ge=1, le=100, default=10)
    sort_by: str = Field(default="created_at")
    order: str = Field(default="desc")

@app.get("/users")
async def list_users(request, response):
    try:
        params = PaginationParams(
            page=int(request.query_params.get("page", 1)),
            limit=int(request.query_params.get("limit", 10)),
            sort_by=request.query_params.get("sort_by", "created_at"),
            order=request.query_params.get("order", "desc"),
        )
    except ValidationError as e:
        return response.json({"error": "Invalid query parameters", "details": e.errors()}, status_code=422)
    # Simulate paginated response
    users = [{"id": i, "username": f"user{i}"} for i in range(1, 6)]
    return response.json({"items": users, "total": len(users), "page": params.page, "limit": params.limit})

11. Advanced Templating (Inheritance, Context, Filters)

python
from pathlib import Path
from nexios import NexiosApp
from nexios.templating import TemplateConfig, render

app = NexiosApp()
template_config = TemplateConfig(
    template_dir=Path("templates"), trim_blocks=True, lstrip_blocks=True
)
app.config.templating = template_config

@app.get("/")
async def home(request, response):
    return await render(
        "pages/home.html",
        {"title": "Home", "content": "Welcome!", "features": [
            {"name": "Fast", "description": "Built for speed"},
            {"name": "Secure", "description": "Security first"},
        ]},
    )

12. Advanced Routing (Typed Path Params, Wildcards)

python
from nexios import NexiosApp
from nexios.http import Request, Response

app = NexiosApp()

@app.get("/products/{product_id:int}")
async def get_product(req: Request, res: Response):
    product_id = req.path_params.product_id
    return res.json({"product_id": product_id, "type": "integer"})

@app.get("/categories/{category_name:str}")
async def get_category(req: Request, res: Response):
    category_name = req.path_params.category_name
    return res.json({"category_name": category_name, "type": "string"})

@app.get("/wildcard/{wildcard_path:path}")
async def get_wildcard(req: Request, res: Response):
    wildcard_path = req.path_params.wildcard_path
    return res.json({"wildcard_path": wildcard_path, "type": "path"})

13. Custom Response Types

python
from nexios import NexiosApp
from nexios.http.response import (
    BaseResponse, FileResponse, HTMLResponse, JSONResponse, PlainTextResponse
)

app = NexiosApp()


class XMLResponse(BaseResponse):
    media_type = "application/xml"

    def __init__(self, content, *args, **kwargs):
        import xml.etree.ElementTree as ET
        xml_content = ET.tostring(content, encoding="unicode")
        super().__init__(content=xml_content, *args, **kwargs)

@app.get("/json")
async def json_handler(req, res):
    return JSONResponse({"message": "Hello, World!"})

@app.get("/xml")
async def xml_handler(req, res):
    return XMLResponse({"message": "Hello, World!"})

For more, see the official Nexios documentation and the examples/ folder in the repo.