Skip to main content

Overview

This guide demonstrates how to process large datasets with Adaptive’s intelligent routing. Handle thousands of requests efficiently with automatic model selection and cost optimization. Key Benefits:
  • Intelligent model routing for each request
  • Cost optimization through provider selection
  • Automatic fallback when providers fail
  • Simple batch processing patterns

Prerequisites

  • Python 3.8+ or Node.js 18+
  • Adaptive API key

Installation

pip install openai aiohttp

Basic Batch Processing

Python Implementation

import asyncio
import openai
import json
from typing import List, Dict

# Configure Adaptive client
client = openai.AsyncOpenAI(
    api_key="your-adaptive-api-key",
    base_url="https://api.llmadaptive.uk/v1"
)

async def process_batch(items: List[Dict]) -> List[Dict]:
    """Process a batch of items with Adaptive."""

    async def process_item(item: Dict) -> Dict:
        try:
            response = await client.chat.completions.create(
                model="",  # Intelligent routing
                messages=[{"role": "user", "content": item["prompt"]}],
                max_tokens=1000,
                temperature=0.7
            )

            return {
                "id": item["id"],
                "status": "completed",
                "result": response.choices[0].message.content,
                "model_used": response.model  # See which model was selected
            }

        except Exception as e:
            return {
                "id": item["id"],
                "status": "failed",
                "error": str(e)
            }

    # Process all items concurrently
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle any exceptions
    processed_results = []
    for result in results:
        if isinstance(result, Exception):
            processed_results.append({
                "id": "unknown",
                "status": "failed",
                "error": str(result)
            })
        else:
            processed_results.append(result)

    return processed_results

# Usage
async def main():
    # Sample data to process
    items = [
        {"id": "1", "prompt": "Summarize this article about AI..."},
        {"id": "2", "prompt": "Analyze the sentiment of this review..."},
        {"id": "3", "prompt": "Translate this text to Spanish..."},
    ]

    results = await process_batch(items)

    # Show results
    for result in results:
        if result["status"] == "completed":
            print(f"Item {result['id']}: {result['result'][:100]}...")
            print(f"Model used: {result.get('model_used', 'unknown')}")
        else:
            print(f"Item {result['id']} failed: {result['error']}")

if __name__ == "__main__":
    asyncio.run(main())

Node.js Implementation

const OpenAI = require('openai');

// Configure Adaptive client
const client = new OpenAI({
  apiKey: 'your-adaptive-api-key',
  baseURL: 'https://api.llmadaptive.uk/v1'
});

async function processBatch(items) {
  const results = [];

  // Process items concurrently
  const promises = items.map(async (item) => {
    try {
      const response = await client.chat.completions.create({
        model: '', // Intelligent routing
        messages: [{ role: 'user', content: item.prompt }],
        maxTokens: 1000,
        temperature: 0.7
      });

      return {
        id: item.id,
        status: 'completed',
        result: response.choices[0].message.content,
        modelUsed: response.model // See which model was selected
      };

    } catch (error) {
      return {
        id: item.id,
        status: 'failed',
        error: error.message
      };
    }
  });

  const batchResults = await Promise.allSettled(promises);

  // Process results
  for (const promiseResult of batchResults) {
    if (promiseResult.status === 'fulfilled') {
      results.push(promiseResult.value);
    } else {
      results.push({
        id: 'unknown',
        status: 'failed',
        error: promiseResult.reason.message
      });
    }
  }

  return results;
}

// Usage
async function main() {
  const items = [
    { id: '1', prompt: 'Summarize this article about AI...' },
    { id: '2', prompt: 'Analyze the sentiment of this review...' },
    { id: '3', prompt: 'Translate this text to Spanish...' }
  ];

  const results = await processBatch(items);

  results.forEach(result => {
    if (result.status === 'completed') {
      console.log(`Item ${result.id}: ${result.result.substring(0, 100)}...`);
      console.log(`Model used: ${result.modelUsed}`);
    } else {
      console.log(`Item ${result.id} failed: ${result.error}`);
    }
  });
}

main().catch(console.error);

Error Handling with Adaptive

import asyncio
import logging
from typing import List, Dict

logger = logging.getLogger(__name__)

class AdaptiveBatchProcessor:
    """Batch processor with Adaptive-aware error handling."""

    def __init__(self, api_key: str, max_retries: int = 3):
        self.client = openai.AsyncOpenAI(
            api_key=api_key,
            base_url="https://api.llmadaptive.uk/v1"
        )
        self.max_retries = max_retries

    async def process_batch_with_retry(self, items: List[Dict]) -> List[Dict]:
        """Process batch with intelligent retry logic."""

        async def process_with_retry(item: Dict) -> Dict:
            for attempt in range(self.max_retries):
                try:
                    response = await self.client.chat.completions.create(
                        model="",  # Adaptive handles routing
                        messages=[{"role": "user", "content": item["prompt"]}],
                        max_tokens=1000
                    )

                    return {
                        "id": item["id"],
                        "status": "completed",
                        "result": response.choices[0].message.content,
                        "attempts": attempt + 1
                    }

                except Exception as e:
                    logger.warning(f"Attempt {attempt + 1} failed for item {item['id']}: {e}")

                    # Adaptive automatically tries different providers
                    # Just wait before retry if needed
                    if attempt < self.max_retries - 1:
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff

            return {
                "id": item["id"],
                "status": "failed",
                "error": f"Failed after {self.max_retries} attempts",
                "attempts": self.max_retries
            }

        # Process all items
        tasks = [process_with_retry(item) for item in items]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle exceptions
        final_results = []
        for result in results:
            if isinstance(result, Exception):
                final_results.append({
                    "id": "unknown",
                    "status": "failed",
                    "error": str(result)
                })
            else:
                final_results.append(result)

        return final_results

# Usage
async def main():
    processor = AdaptiveBatchProcessor("your-adaptive-api-key")

    items = [
        {"id": "1", "prompt": "Analyze this data..."},
        {"id": "2", "prompt": "Summarize this report..."},
    ]

    results = await processor.process_batch_with_retry(items)

    successful = [r for r in results if r["status"] == "completed"]
    failed = [r for r in results if r["status"] == "failed"]

    print(f"Processed: {len(successful)} successful, {len(failed)} failed")

if __name__ == "__main__":
    asyncio.run(main())

Progress Tracking

import time
from typing import List, Dict

class BatchProgressTracker:
    """Simple progress tracker for batch processing."""

    def __init__(self, total_items: int):
        self.total = total_items
        self.completed = 0
        self.failed = 0
        self.start_time = time.time()

    def update(self, results: List[Dict]):
        """Update progress from results."""
        self.completed = sum(1 for r in results if r.get("status") == "completed")
        self.failed = sum(1 for r in results if r.get("status") == "failed")

    def get_progress(self) -> Dict:
        """Get current progress."""
        elapsed = time.time() - self.start_time
        progress = (self.completed + self.failed) / self.total if self.total > 0 else 0

        return {
            "completed": self.completed,
            "failed": self.failed,
            "total": self.total,
            "progress_percent": round(progress * 100, 1),
            "elapsed_seconds": round(elapsed, 1)
        }

# Usage
tracker = BatchProgressTracker(len(items))

# After processing a batch
tracker.update(results)
progress = tracker.get_progress()
print(f"Progress: {progress['progress_percent']}% ({progress['completed']}/{progress['total']})")

What You Get with Adaptive

Intelligent Routing

Each request gets the optimal model automatically

Cost Optimization

Significant savings through smart provider selection

Provider Resilience

Automatic fallback when providers fail

Simple Integration

Works with standard OpenAI SDK patterns

Environment Variables

ADAPTIVE_API_KEY=your-adaptive-api-key

Next Steps