Advanced Usage - Python SDK
Error handling, testing, and advanced patterns with the Python SDK
Error Handling
Exception Types
from lumnisai import (
Client,
AuthenticationError,
NotFoundError,
RateLimitError,
ValidationError,
LumnisAPIError
)
client = Client(api_key="your-api-key")
try:
response = client.invoke("Hello!")
except AuthenticationError as e:
# Invalid API key or authentication issue
print(f"Authentication failed: {e}")
except ValidationError as e:
# Invalid request parameters
print(f"Validation error: {e}")
print(f"Details: {e.details}")
except RateLimitError as e:
# Rate limit exceeded
print(f"Rate limited. Retry after {e.retry_after} seconds")
except NotFoundError as e:
# Resource not found
print(f"Resource not found: {e}")
except LumnisAPIError as e:
# Generic API error
print(f"API error: {e}")
print(f"Status code: {e.status_code}")
    print(f"Error code: {e.error_code}")
Handling Specific Errors
from lumnisai import Client, ValidationError, NotFoundError
client = Client(api_key="your-api-key")
# Handle validation errors
try:
response = client.invoke(
messages=[], # Invalid: empty messages
user_id="user@example.com"
)
except ValidationError as e:
print(f"Invalid request: {e.details}")
# Fix and retry
response = client.invoke(
"Valid message",
user_id="user@example.com"
)
# Handle not found errors
try:
response = client.get_response("nonexistent-id")
except NotFoundError:
print("Response not found, creating new one")
    response = client.invoke("New task")
Retry Logic
import time
from lumnisai import Client, RateLimitError, LumnisAPIError
client = Client(api_key="your-api-key")
def invoke_with_retry(client, prompt, max_retries=3):
"""Invoke with automatic retry on transient errors."""
for attempt in range(max_retries):
try:
return client.invoke(prompt)
except RateLimitError as e:
if attempt < max_retries - 1:
wait_time = e.retry_after or (2 ** attempt)
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
else:
raise
except LumnisAPIError as e:
# Retry on 5xx errors
if 500 <= e.status_code < 600 and attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"Server error. Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
raise
# Usage
response = invoke_with_retry(client, "Hello!")
Custom Retry Configuration
from lumnisai import Client
# Configure client-level retries
client = Client(
api_key="your-api-key",
max_retries=5, # Retry up to 5 times
timeout=60.0 # 60 second timeout
)
# Retries are automatic for transient errors
response = client.invoke("Hello!")
Idempotent Requests
import uuid
from lumnisai import Client
client = Client(api_key="your-api-key")
# Generate idempotency key
idempotency_key = str(uuid.uuid4())
# First request
response1 = client.invoke(
"Important task",
idempotency_key=idempotency_key,
user_id="user@example.com"
)
# Retry with same key (won't create duplicate)
response2 = client.invoke(
"Important task",
idempotency_key=idempotency_key,
user_id="user@example.com"
)
# Returns the same response
assert response1.response_id == response2.response_id
Type Hints
from typing import List, Dict
from lumnisai import Client, ResponseObject
from pydantic import BaseModel
client: Client = Client(api_key="your-api-key")
# Type-safe messages
messages: List[Dict[str, str]] = [
{"role": "user", "content": "Hello"}
]
# Type-safe response
response: ResponseObject = client.invoke(messages=messages)
output: str = response.output_text
# Type-safe structured output
class Person(BaseModel):
name: str
age: int
response = client.invoke(
"Tell me about Ada Lovelace",
response_format=Person
)
if response.structured_response:
person = Person(**response.structured_response)
name: str = person.name
    age: int = person.age
Testing
Unit Tests
import pytest
from lumnisai import Client, ValidationError
@pytest.fixture
def client():
return Client(api_key="test-api-key")
def test_create_response(client):
"""Test creating a response."""
response = client.invoke("Hello!")
assert response.response_id is not None
assert response.status in ["queued", "in_progress", "succeeded"]
def test_invalid_request(client):
"""Test validation error handling."""
with pytest.raises(ValidationError):
client.invoke(messages=[]) # Empty messages should fail
def test_list_users(client):
"""Test listing users."""
users = client.list_users()
assert isinstance(users.users, list)
    assert users.total >= 0
Integration Tests
import pytest
from lumnisai import Client
@pytest.fixture
def client():
"""Fixture with real API credentials."""
return Client(api_key="your-test-api-key")
def test_full_workflow(client):
"""Test complete workflow."""
# Create user
user = client.create_user(
email="test@example.com",
first_name="Test"
)
assert user.email == "test@example.com"
# Create response
response = client.invoke(
"Hello!",
user_id=user.email
)
assert response.response_id is not None
# List responses
responses = client.list_responses(user_id=user.email)
assert len(responses.responses) > 0
# Cleanup
    client.delete_user(user.email)
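The workflow test above only deletes the user if every assertion passes. A yield fixture is a safer pattern because cleanup runs even when the test body fails; this is a sketch, and the fixture email address is a placeholder.
import pytest
from lumnisai import Client
@pytest.fixture
def test_user():
    """Create a throwaway user and always clean it up."""
    client = Client(api_key="your-test-api-key")
    user = client.create_user(
        email="fixture-user@example.com",  # placeholder address
        first_name="Test"
    )
    yield client, user
    # Runs even if the test body raised
    client.delete_user(user.email)
def test_invoke_for_user(test_user):
    """Use the fixture-provided client and user."""
    client, user = test_user
    response = client.invoke("Hello!", user_id=user.email)
    assert response.response_id is not None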
Mocking
from unittest.mock import Mock, patch
from lumnisai import Client, ResponseObject
def test_with_mock():
"""Test with mocked API."""
client = Client(api_key="test-key")
# Mock the invoke method
mock_response = ResponseObject(
response_id="test-id",
status="succeeded",
output_text="Hello!",
thread_id="thread-id"
)
with patch.object(client, 'invoke', return_value=mock_response):
response = client.invoke("Test")
        assert response.output_text == "Hello!"
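The async client can be mocked the same way with unittest.mock.AsyncMock. This is a sketch that assumes AsyncClient.invoke is awaitable, as in the async examples later in this guide; the field values are placeholders.
import asyncio
from unittest.mock import AsyncMock, patch
from lumnisai import AsyncClient, ResponseObject
async def run_async_mock_test():
    """Sketch: replace AsyncClient.invoke with an AsyncMock."""
    client = AsyncClient(api_key="test-key")
    mock_response = ResponseObject(
        response_id="test-id",
        status="succeeded",
        output_text="Hello!",
        thread_id="thread-id"
    )
    # AsyncMock returns an awaitable, so `await client.invoke(...)` still works
    with patch.object(client, "invoke", new=AsyncMock(return_value=mock_response)):
        response = await client.invoke("Test")
        assert response.output_text == "Hello!"
asyncio.run(run_async_mock_test())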
Advanced Patterns
Connection Pooling
from lumnisai import AsyncClient
import asyncio
async def process_batch(items):
"""Process multiple items concurrently."""
client = AsyncClient(api_key="your-api-key")
async with client:
# Process items concurrently
tasks = [
client.invoke(item, user_id="user@example.com")
for item in items
]
results = await asyncio.gather(*tasks)
return results
# Usage
items = ["Task 1", "Task 2", "Task 3"]
results = asyncio.run(process_batch(items))
Progress Tracking
from lumnisai import Client, ProgressTracker
client = Client(api_key="your-api-key")
# Custom progress tracking
tracker = ProgressTracker()
for update in client.invoke("Complex task", stream=True):
# Only display new content (no duplicates)
new_content = tracker.format_new_entries(
update.state,
update.message,
update.tool_calls
)
if new_content:
        print(new_content)
Response Caching
from functools import lru_cache
from lumnisai import Client
client = Client(api_key="your-api-key")
@lru_cache(maxsize=100)
def cached_invoke(prompt: str) -> str:
"""Cache responses for identical prompts."""
response = client.invoke(prompt)
return response.output_text
# First call hits API
result1 = cached_invoke("What is AI?")
# Second call uses cache
result2 = cached_invoke("What is AI?")
assert result1 == result2
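lru_cache keeps entries for the life of the process, so identical prompts can return stale answers indefinitely. If that matters, a small time-bounded cache is easy to write by hand; the 300-second TTL below is an arbitrary choice, not an SDK feature.
import time
from lumnisai import Client
client = Client(api_key="your-api-key")
CACHE_TTL_SECONDS = 300  # assumption: five minutes is fresh enough
_cache: dict = {}  # prompt -> (timestamp, output_text)
def cached_invoke_with_ttl(prompt: str) -> str:
    """Return a cached answer only while it is younger than the TTL."""
    now = time.monotonic()
    entry = _cache.get(prompt)
    if entry and now - entry[0] < CACHE_TTL_SECONDS:
        return entry[1]
    response = client.invoke(prompt)
    _cache[prompt] = (now, response.output_text)
    return response.output_text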
Batch Processing
from lumnisai import AsyncClient
import asyncio
async def batch_process(items, batch_size=5):
"""Process items in batches to avoid rate limits."""
client = AsyncClient(api_key="your-api-key")
async with client:
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i+batch_size]
# Process batch
batch_results = await asyncio.gather(*[
client.invoke(item, user_id="user@example.com")
for item in batch
])
results.extend(batch_results)
# Rate limit protection
if i + batch_size < len(items):
await asyncio.sleep(1)
return results
# Usage
items = [f"Task {i}" for i in range(20)]
results = asyncio.run(batch_process(items, batch_size=5))
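An alternative to fixed batches is a concurrency cap with asyncio.Semaphore, which keeps a steady number of requests in flight instead of pausing between batches. This is a sketch; the limit of 5 is an assumption to tune against your own rate limits.
import asyncio
from lumnisai import AsyncClient
async def bounded_process(items, max_concurrency=5):
    """Process every item, but never more than max_concurrency at once."""
    client = AsyncClient(api_key="your-api-key")
    semaphore = asyncio.Semaphore(max_concurrency)
    async def run_one(item):
        async with semaphore:
            return await client.invoke(item, user_id="user@example.com")
    async with client:
        return await asyncio.gather(*[run_one(item) for item in items])
# Usage
items = [f"Task {i}" for i in range(20)]
results = asyncio.run(bounded_process(items))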
Request Middleware
import asyncio
from typing import Any, Dict
from lumnisai import AsyncClient
class LoggingClient(AsyncClient):
"""Client with request/response logging."""
async def invoke(self, *args, **kwargs):
print(f"Request: {args}, {kwargs}")
try:
response = await super().invoke(*args, **kwargs)
print(f"Response: {response.response_id}")
return response
except Exception as e:
print(f"Error: {e}")
raise
# Usage
client = LoggingClient(api_key="your-api-key")
async def main():
async with client:
response = await client.invoke("Hello!")
asyncio.run(main())
Performance Optimization
Async for Concurrency
from lumnisai import AsyncClient
import asyncio
async def process_multiple_tasks():
"""Process multiple tasks concurrently."""
client = AsyncClient(api_key="your-api-key")
async with client:
# Create multiple tasks
tasks = [
client.invoke(f"Task {i}", user_id="user@example.com")
for i in range(10)
]
# Wait for all
results = await asyncio.gather(*tasks)
return results
# Much faster than sequential processing
results = asyncio.run(process_multiple_tasks())
Connection Reuse
from lumnisai import Client
# Reuse client for multiple requests
client = Client(api_key="your-api-key")
# Connection pooling handled automatically
for i in range(100):
response = client.invoke(f"Task {i}")
# Close when done
client.close()
Streaming for Long Tasks
from lumnisai import display_progress
# Use streaming for better responsiveness
for update in client.invoke("Long task", stream=True):
# Display formatted progress
display_progress(update)
# Can process partial results immediately
if update.tool_calls:
        process_tool_calls(update.tool_calls)
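process_tool_calls above stands in for your own handler. A minimal sketch might look like the following; the attribute names it probes are assumptions for illustration, not confirmed SDK fields.
def process_tool_calls(tool_calls):
    """Hypothetical handler: log each streamed tool call as it arrives."""
    for call in tool_calls:
        # Attribute names are assumptions; fall back to repr() if absent
        name = getattr(call, "tool_name", None) or getattr(call, "name", repr(call))
        print(f"Tool call observed: {name}")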
Debugging
Enable Logging
import logging
from lumnisai import Client
# Enable SDK logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("lumnisai")
logger.setLevel(logging.DEBUG)
client = Client(api_key="your-api-key")
response = client.invoke("Hello!")
Inspect Request Details
# Access response metadata
response = client.invoke("Hello!")
print(f"Response ID: {response.response_id}")
print(f"Thread ID: {response.thread_id}")
print(f"Status: {response.status}")
print(f"Created: {response.created_at}")
print(f"Completed: {response.completed_at}")
# Access progress entries
for entry in response.progress:
    print(f"{entry.ts}: {entry.state} - {entry.message}")
Error Details
from lumnisai import Client, LumnisAPIError
client = Client(api_key="your-api-key")
try:
response = client.invoke("Invalid request")
except LumnisAPIError as e:
print(f"Status: {e.status_code}")
print(f"Error code: {e.error_code}")
print(f"Message: {e.message}")
    print(f"Details: {e.details}")
Best Practices Summary
1. Use Context Managers
# Good: Automatic cleanup
with Client(api_key="your-api-key") as client:
response = client.invoke("Hello!")
# Even better: Async with proper cleanup
async with AsyncClient(api_key="your-api-key") as client:
    response = await client.invoke("Hello!")
2. Handle Errors Gracefully
try:
response = client.invoke("Hello!")
except ValidationError as e:
# Handle validation errors
print(f"Invalid request: {e.details}")
except RateLimitError:
# Handle rate limits
time.sleep(60)
except LumnisAPIError as e:
# Handle other errors
    print(f"Error: {e}")
3. Use Type Hints
from lumnisai import Client, ResponseObject
from typing import List, Dict
def process_response(response: ResponseObject) -> str:
return response.output_text
client: Client = Client(api_key="your-api-key")
4. Leverage Async for Performance
# Sync: Sequential (slow)
for item in items:
response = client.invoke(item)
# Async: Concurrent (fast)
async with AsyncClient() as client:
results = await asyncio.gather(*[
client.invoke(item) for item in items
    ])
5. Use Idempotency Keys
# For critical operations
response = client.invoke(
"Important task",
idempotency_key=str(uuid.uuid4())
)