Docs
File Management - Python SDK

File Management - Python SDK

Upload, search, and manage files with the Python SDK

File Management

Upload Files

from lumnisai import Client, FileScope, DuplicateHandling
 
client = Client(api_key="your-api-key")

# Describe the upload once, then pass the options to the SDK in a single call
upload_options = {
    "file_path": "document.pdf",
    "scope": FileScope.TENANT,
    "user_id": "user@example.com",
    "tags": ["documentation", "important"],
    "duplicate_handling": DuplicateHandling.SUFFIX,
}
file_result = client.upload_file(**upload_options)

print(f"File ID: {file_result.file_id}")
print(f"Status: {file_result.status}")

Upload from File Content

# Upload from bytes or file object.
# The file is opened in binary mode and handed to the SDK as file_content;
# file_name supplies the name because there is no file_path to take it from.
with open("document.pdf", "rb") as f:
    file_result = client.upload_file(
        file_content=f,
        file_name="document.pdf",
        scope=FileScope.USER,
        user_id="user@example.com",
        # Tags are a list of strings, as in every other upload_file and
        # list_files call in this guide (a bare string is not a tag list).
        tags=["documentation"]
    )

Wait for File Processing

import time
from lumnisai import ProcessingStatus
 
# Upload file
file_result = client.upload_file(
    file_path="data.csv",
    user_id="user@example.com"
)

# Wait for processing to complete. The loop is bounded by a deadline so a file
# that never reaches COMPLETED or ERROR cannot hang the script forever.
poll_interval_seconds = 2
timeout_seconds = 300
# time.monotonic() is unaffected by system clock changes, unlike time.time().
deadline = time.monotonic() + timeout_seconds

while True:
    status = client.get_file_status(
        file_result.file_id,
        user_id="user@example.com"
    )

    if status.status == ProcessingStatus.COMPLETED:
        print("File processing complete!")
        break
    elif status.status == ProcessingStatus.ERROR:
        print(f"Processing failed: {status.error_message}")
        break
    elif time.monotonic() >= deadline:
        print(f"Timed out after {timeout_seconds}s waiting for file processing")
        break

    print(f"Processing: {status.progress_percentage}%")
    time.sleep(poll_interval_seconds)

Search Files

# Run a semantic search over the user's files
search_options = {
    "query": "machine learning algorithms",
    "user_id": "user@example.com",
    "limit": 10,
    "min_score": 0.7,
    "file_types": ["pdf", "md"],
    "tags": ["research"],
}
results = client.search_files(**search_options)

print(f"Found {results.total_count} results")

for hit in results.results:
    print(f"\nFile: {hit.file.file_name}")
    print(f"Score: {hit.overall_score:.2f}")
    print(f"Type: {hit.file.file_type}")

    # Nothing more to show when the hit carries no matching chunks
    if not hit.chunks:
        continue

    # Show the start of the first matching chunk
    print(f"Excerpt: {hit.chunks[0].chunk_text[:200]}...")

List Files

from lumnisai import FileScope, ProcessingStatus
 
# Collect the listing filters, then request the first page of 20 files
listing_filters = dict(
    user_id="user@example.com",
    scope=FileScope.TENANT,
    file_type="pdf",
    status=ProcessingStatus.COMPLETED,
    tags=["important"],
    page=1,
    limit=20,
)
files = client.list_files(**listing_filters)

print(f"Total files: {files.total_count}")

for entry in files.files:
    print(f"\n{entry.file_name}")
    print(f"  Type: {entry.file_type}")
    print(f"  Status: {entry.processing_status}")
    print(f"  Chunks: {entry.chunks_embedded}/{entry.total_chunks}")

    # Only print the tag line for files that actually have tags
    if not entry.tags:
        continue
    print(f"  Tags: {', '.join(entry.tags)}")

File Operations

Get File Metadata

# Fetch the metadata record for one file
metadata = client.get_file(file_id="file-id", user_id="user@example.com")

# Print the fields of interest, one per line
print(f"File: {metadata.file_name}")
print(f"Size: {metadata.file_size} bytes")
print(f"Type: {metadata.file_type}")
print(f"Status: {metadata.processing_status}")
print(f"Uploaded: {metadata.created_at}")

Get File Content

from lumnisai import ContentType
 
# Both requests target the same file on behalf of the same user
file_ref = {"file_id": "file-id", "user_id": "user@example.com"}

# Text content, restricted to lines 1-100
content = client.get_file_content(
    **file_ref,
    content_type=ContentType.TEXT,
    start_line=1,
    end_line=100,
)

print(content.text)

# Markdown content, with no line range given
markdown = client.get_file_content(**file_ref, content_type=ContentType.MARKDOWN)

print(markdown.text)

Download File

# Fetch the original file and keep the returned bytes
payload = client.download_file(file_id="file-id", user_id="user@example.com")

# Write those bytes to a local file
with open("downloaded_file.pdf", "wb") as out:
    out.write(payload)

# Or pass save_path instead of handling the bytes yourself
client.download_file(
    file_id="file-id",
    user_id="user@example.com",
    save_path="local_file.pdf",
)

Delete Files

import asyncio
from lumnisai import AsyncClient

# Delete single file
client.delete_file(
    file_id="file-id",
    user_id="user@example.com",
    hard_delete=True
)


# Delete multiple files (using files resource).
# `async with` and `await` are only valid inside a coroutine, so the bulk call
# lives in an async function and uses AsyncClient rather than the sync Client.
async def delete_files(file_ids, hard_delete=True):
    """Delete several files in one request.

    Args:
        file_ids: IDs of the files to delete.
        hard_delete: Forwarded unchanged to ``files.bulk_delete``.
    """
    async_client = AsyncClient(api_key="your-api-key")

    async with async_client:
        await async_client.files.bulk_delete(
            file_ids=file_ids,
            hard_delete=hard_delete
        )


asyncio.run(delete_files(["id1", "id2", "id3"]))

Complete File Workflow Example

from lumnisai import Client, FileScope, ProcessingStatus, display_progress
from pathlib import Path
import time
 
# Initialize client
client = Client(api_key="your-api-key")

# Create user
user = client.create_user(
    email="analyst@example.com",
    first_name="Data",
    last_name="Analyst"
)

# Upload file
print("Uploading file...")
file = client.upload_file(
    file_path="data.csv",
    scope=FileScope.USER,
    user_id=user.email,
    tags=["analysis", "important"]
)

print(f"File uploaded: {file.file_id}")

# Wait for processing. Polling is bounded by a deadline so a file that never
# reaches COMPLETED or ERROR cannot hang the script forever.
print("Waiting for processing...")
processing_timeout_seconds = 300
# time.monotonic() is unaffected by system clock changes, unlike time.time().
deadline = time.monotonic() + processing_timeout_seconds
processed = False  # Becomes True only when the file reaches COMPLETED

while True:
    status = client.get_file_status(file.file_id, user_id=user.email)

    if status.status == ProcessingStatus.COMPLETED:
        print("Processing complete!")
        processed = True
        break
    elif status.status == ProcessingStatus.ERROR:
        print(f"Error: {status.error_message}")
        break
    elif time.monotonic() >= deadline:
        print(f"Timed out after {processing_timeout_seconds}s waiting for processing")
        break

    print(f"Progress: {status.progress_percentage}%")
    time.sleep(2)

# Searching and analyzing only make sense once the file has been processed,
# so both steps are skipped after an error or a timeout.
if processed:
    # Search the file content
    print("\nSearching file content...")
    search_results = client.search_files(
        query="important metrics",
        user_id=user.email,
        limit=5,
        min_score=0.5
    )

    print(f"Found {search_results.total_count} results")
    for result in search_results.results:
        print(f"\n{result.file.file_name} (score: {result.overall_score:.2f})")
        if result.chunks:
            print(f"Excerpt: {result.chunks[0].chunk_text[:150]}...")

    # Create AI response with file context
    print("\nAnalyzing file with AI...")
    for update in client.invoke(
        "Analyze the uploaded CSV and provide insights",
        stream=True,
        user_id=user.email
    ):
        display_progress(update)

        # Print the final text once the streamed update reports completion
        if update.state == "completed":
            print(f"\n\nAnalysis:\n{update.output_text}")

# List all user's files (runs whether or not processing succeeded)
files = client.list_files(user_id=user.email)
print(f"\nUser has {files.total_count} files")

Bulk File Operations

Upload Multiple Files

import asyncio
from pathlib import Path
from lumnisai import AsyncClient
 
async def upload_directory(
    directory_path: str,
    pattern: str = "*.pdf",
    user_id: str = "user@example.com",
    batch_size: int = 5,
    pause_seconds: float = 1.0,
):
    """Upload every matching file under a directory, one batch at a time.

    Files are found recursively and each batch is uploaded concurrently.

    Args:
        directory_path: Root directory to search.
        pattern: Glob pattern matched recursively below ``directory_path``.
        user_id: User the uploads are made for.
        batch_size: Number of files uploaded concurrently per batch.
        pause_seconds: Delay between consecutive batches.

    Returns:
        The upload results in submission order, or an empty list when no
        file matches.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        # range() rejects a zero step and silently yields nothing for a
        # negative one, so fail early with a clear message instead.
        raise ValueError("batch_size must be at least 1")

    # rglob also yields directories whose names match the pattern; keep files only.
    files = [path for path in Path(directory_path).rglob(pattern) if path.is_file()]
    if not files:
        # Nothing to upload, so there is no need to open a client session.
        return []

    client = AsyncClient(api_key="your-api-key")
    uploaded = []

    async with client:
        for start in range(0, len(files), batch_size):
            batch = files[start:start + batch_size]

            # Upload batch concurrently; gather returns results in input order
            results = await asyncio.gather(
                *(
                    client.upload_file(file_path=str(path), user_id=user_id)
                    for path in batch
                )
            )
            uploaded.extend(results)

            print(f"Uploaded {len(uploaded)}/{len(files)} files")

            # Pause between batches only; sleeping after the last one would
            # just delay the return.
            if start + batch_size < len(files):
                await asyncio.sleep(pause_seconds)

    return uploaded
 
# Build the coroutine, then drive it to completion on a fresh event loop
upload_job = upload_directory("./documents")
uploaded_files = asyncio.run(upload_job)
print(f"Total uploaded: {len(uploaded_files)}")

Best Practices

Tag Files for Organization

# Tags shared by the upload and the listing below
quarter_tags = ["Q1-2025", "finance"]

# Upload a report labelled with the shared tags plus one extra tag
client.upload_file(
    file_path="report.pdf",
    user_id="user@example.com",
    tags=[*quarter_tags, "confidential"],
)

# List the user's files filtered by the shared tags
files = client.list_files(user_id="user@example.com", tags=quarter_tags)

Handle Processing Errors

from lumnisai import ProcessingStatus
 
upload = client.upload_file(file_path="document.pdf", user_id="user@example.com")

status = client.get_file_status(upload.file_id, user_id="user@example.com")

# Branch on the reported state; any other state prints nothing here
state = status.status
if state == ProcessingStatus.COMPLETED:
    print(f"File ready! Chunks: {status.chunks_embedded}")
elif state == ProcessingStatus.ERROR:
    print(f"Processing failed: {status.error_message}")
    # Handle error (e.g., retry, notify user)

Use Appropriate Scope

from lumnisai import FileScope
 
# USER scope: the file is uploaded for one specific user
user_upload = {
    "file_path": "user_data.csv",
    "scope": FileScope.USER,
    "user_id": "user@example.com",
}
client.upload_file(**user_upload)

# TENANT scope: tenant-wide files (accessible to all users)
tenant_upload = {
    "file_path": "company_handbook.pdf",
    "scope": FileScope.TENANT,
}
client.upload_file(**tenant_upload)