Overview
This guide shows how to process large datasets with Adaptive’s intelligent routing, handling thousands of requests efficiently with automatic model selection and cost optimization.
Key benefits:
- Intelligent model routing for each request
- Cost optimization through provider selection
- Automatic fallback when providers fail
- Simple batch processing patterns
Prerequisites
- Python 3.8+ or Node.js 18+
- Adaptive API key
Installation
pip install openai
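The Node.js examples use the official OpenAI SDK; install it with:
npm install openai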
Basic Batch Processing
Python Implementation
import asyncio
from typing import Dict, List

import openai

# Configure the Adaptive client (OpenAI-compatible endpoint)
client = openai.AsyncOpenAI(
    api_key="your-adaptive-api-key",
    base_url="https://api.llmadaptive.uk/v1"
)

async def process_batch(items: List[Dict]) -> List[Dict]:
    """Process a batch of items with Adaptive."""

    async def process_item(item: Dict) -> Dict:
        try:
            response = await client.chat.completions.create(
                model="",  # Empty model string enables intelligent routing
                messages=[{"role": "user", "content": item["prompt"]}],
                max_tokens=1000,
                temperature=0.7
            )
            return {
                "id": item["id"],
                "status": "completed",
                "result": response.choices[0].message.content,
                "model_used": response.model  # See which model was selected
            }
        except Exception as e:
            return {
                "id": item["id"],
                "status": "failed",
                "error": str(e)
            }

    # Process all items concurrently
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle any exceptions that escaped process_item
    processed_results = []
    for result in results:
        if isinstance(result, Exception):
            processed_results.append({
                "id": "unknown",
                "status": "failed",
                "error": str(result)
            })
        else:
            processed_results.append(result)

    return processed_results

# Usage
async def main():
    # Sample data to process
    items = [
        {"id": "1", "prompt": "Summarize this article about AI..."},
        {"id": "2", "prompt": "Analyze the sentiment of this review..."},
        {"id": "3", "prompt": "Translate this text to Spanish..."},
    ]

    results = await process_batch(items)

    # Show results
    for result in results:
        if result["status"] == "completed":
            print(f"Item {result['id']}: {result['result'][:100]}...")
            print(f"Model used: {result.get('model_used', 'unknown')}")
        else:
            print(f"Item {result['id']} failed: {result['error']}")

if __name__ == "__main__":
    asyncio.run(main())
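When a batch grows to thousands of items, launching every request at once can exhaust local resources or trip rate limits. Below is a minimal sketch of bounded concurrency using asyncio.Semaphore; the process_item argument stands in for a coroutine like the one above, and the limit of 20 is an illustrative value, not an Adaptive requirement.
import asyncio
from typing import Dict, List

MAX_CONCURRENCY = 20  # Illustrative cap; tune to your rate limits

async def process_batch_bounded(items: List[Dict], process_item) -> List[Dict]:
    """Run process_item over items with at most MAX_CONCURRENCY in flight."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    async def bounded(item: Dict) -> Dict:
        async with semaphore:  # Blocks until a concurrency slot is free
            return await process_item(item)

    return await asyncio.gather(*(bounded(item) for item in items))
Passing process_item as an argument keeps the sketch independent of where that helper is defined.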
Node.js Implementation
const OpenAI = require('openai');

// Configure the Adaptive client (OpenAI-compatible endpoint)
const client = new OpenAI({
  apiKey: 'your-adaptive-api-key',
  baseURL: 'https://api.llmadaptive.uk/v1'
});

async function processBatch(items) {
  const results = [];

  // Process items concurrently
  const promises = items.map(async (item) => {
    try {
      const response = await client.chat.completions.create({
        model: '', // Empty model string enables intelligent routing
        messages: [{ role: 'user', content: item.prompt }],
        max_tokens: 1000, // Request params use snake_case in the OpenAI SDK
        temperature: 0.7
      });
      return {
        id: item.id,
        status: 'completed',
        result: response.choices[0].message.content,
        modelUsed: response.model // See which model was selected
      };
    } catch (error) {
      return {
        id: item.id,
        status: 'failed',
        error: error.message
      };
    }
  });

  const batchResults = await Promise.allSettled(promises);

  // Collect results; the rejected branch is a safety net, since errors
  // are already caught inside each promise
  for (const promiseResult of batchResults) {
    if (promiseResult.status === 'fulfilled') {
      results.push(promiseResult.value);
    } else {
      results.push({
        id: 'unknown',
        status: 'failed',
        error: promiseResult.reason.message
      });
    }
  }

  return results;
}

// Usage
async function main() {
  const items = [
    { id: '1', prompt: 'Summarize this article about AI...' },
    { id: '2', prompt: 'Analyze the sentiment of this review...' },
    { id: '3', prompt: 'Translate this text to Spanish...' }
  ];

  const results = await processBatch(items);

  results.forEach(result => {
    if (result.status === 'completed') {
      console.log(`Item ${result.id}: ${result.result.substring(0, 100)}...`);
      console.log(`Model used: ${result.modelUsed}`);
    } else {
      console.log(`Item ${result.id} failed: ${result.error}`);
    }
  });
}

main().catch(console.error);
Error Handling with Adaptive
import asyncio
import logging
from typing import Dict, List

import openai

logger = logging.getLogger(__name__)

class AdaptiveBatchProcessor:
    """Batch processor with Adaptive-aware error handling."""

    def __init__(self, api_key: str, max_retries: int = 3):
        self.client = openai.AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.llmadaptive.uk/v1"
        )
        self.max_retries = max_retries

    async def process_batch_with_retry(self, items: List[Dict]) -> List[Dict]:
        """Process a batch with retry logic and exponential backoff."""

        async def process_with_retry(item: Dict) -> Dict:
            for attempt in range(self.max_retries):
                try:
                    response = await self.client.chat.completions.create(
                        model="",  # Adaptive handles routing
                        messages=[{"role": "user", "content": item["prompt"]}],
                        max_tokens=1000
                    )
                    return {
                        "id": item["id"],
                        "status": "completed",
                        "result": response.choices[0].message.content,
                        "attempts": attempt + 1
                    }
                except Exception as e:
                    logger.warning(f"Attempt {attempt + 1} failed for item {item['id']}: {e}")
                    # Adaptive already falls back across providers per request,
                    # so the client only needs to wait before retrying
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff

            return {
                "id": item["id"],
                "status": "failed",
                "error": f"Failed after {self.max_retries} attempts",
                "attempts": self.max_retries
            }

        # Process all items concurrently
        tasks = [process_with_retry(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle exceptions that escaped the retry loop
        final_results = []
        for result in results:
            if isinstance(result, Exception):
                final_results.append({
                    "id": "unknown",
                    "status": "failed",
                    "error": str(result)
                })
            else:
                final_results.append(result)

        return final_results

# Usage
async def main():
    processor = AdaptiveBatchProcessor("your-adaptive-api-key")

    items = [
        {"id": "1", "prompt": "Analyze this data..."},
        {"id": "2", "prompt": "Summarize this report..."},
    ]

    results = await processor.process_batch_with_retry(items)

    successful = [r for r in results if r["status"] == "completed"]
    failed = [r for r in results if r["status"] == "failed"]
    print(f"Processed: {len(successful)} successful, {len(failed)} failed")

if __name__ == "__main__":
    asyncio.run(main())
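The retry loop above treats every exception the same, but errors such as invalid credentials will never succeed on retry. One refinement is to classify failures first. Below is a hedged sketch built on the exception types the openai Python SDK exposes; is_retryable is an illustrative helper, not part of the Adaptive API.
import openai

def is_retryable(error: Exception) -> bool:
    """Return True only for failures that plausibly resolve on retry."""
    if isinstance(error, (openai.RateLimitError, openai.APIConnectionError)):
        return True  # Transient: back off and try again
    if isinstance(error, openai.APIStatusError):
        return error.status_code >= 500  # Server-side errors may be transient
    return False  # e.g. AuthenticationError, BadRequestError: fail fast
Inside process_with_retry, you would return the failure result immediately when is_retryable(e) is False instead of sleeping and looping.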
Progress Tracking
import time
from typing import Dict, List

class BatchProgressTracker:
    """Simple progress tracker for batch processing."""

    def __init__(self, total_items: int):
        self.total = total_items
        self.completed = 0
        self.failed = 0
        self.start_time = time.time()

    def update(self, results: List[Dict]):
        """Update progress counts from a list of results."""
        self.completed = sum(1 for r in results if r.get("status") == "completed")
        self.failed = sum(1 for r in results if r.get("status") == "failed")

    def get_progress(self) -> Dict:
        """Return a snapshot of current progress."""
        elapsed = time.time() - self.start_time
        progress = (self.completed + self.failed) / self.total if self.total > 0 else 0
        return {
            "completed": self.completed,
            "failed": self.failed,
            "total": self.total,
            "progress_percent": round(progress * 100, 1),
            "elapsed_seconds": round(elapsed, 1)
        }

# Usage
tracker = BatchProgressTracker(len(items))

# After processing a batch
tracker.update(results)
progress = tracker.get_progress()
print(f"Progress: {progress['progress_percent']}% ({progress['completed']}/{progress['total']})")
What You Get with Adaptive
- Intelligent Routing: each request is automatically matched to the optimal model
- Cost Optimization: significant savings through smart provider selection
- Provider Resilience: automatic fallback when providers fail
- Simple Integration: works with standard OpenAI SDK patterns
Environment Variables
ADAPTIVE_API_KEY=your-adaptive-api-key
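Rather than hard-coding the key as the snippets above do, read it from the environment. A minimal sketch in Python:
import os

import openai

# Load the Adaptive key from the environment instead of embedding it in code
client = openai.AsyncOpenAI(
    api_key=os.environ["ADAPTIVE_API_KEY"],
    base_url="https://api.llmadaptive.uk/v1"
)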