Streaming Chat Implementation

This example demonstrates how to implement real-time streaming chat using Adaptive’s OpenAI-compatible streaming API.

Python Streaming Example

import openai

client = openai.OpenAI(
    base_url="https://www.llmadaptive.uk/api/v1",
    api_key="your-adaptive-api-key"
)

def stream_chat(message):
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    
    print("AI: ", end="", flush=True)
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()  # New line after complete response

# Example usage
stream_chat("Explain quantum computing in simple terms")

JavaScript/React Streaming

import OpenAI from 'openai';
import { useState } from 'react';

const client = new OpenAI({
  baseURL: 'https://www.llmadaptive.uk/api/v1',
  apiKey: 'your-adaptive-api-key',
  // Required for browser usage; never ship a real API key to clients in
  // production; proxy through a server instead (see the SSE example below)
  dangerouslyAllowBrowser: true,
});

async function streamChat(message, onChunk) {
  const stream = await client.chat.completions.create({
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: message }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    if (content) {
      onChunk(content);
    }
  }
}

// React component example
function StreamingChat() {
  const [message, setMessage] = useState('');
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const handleSubmit = async (e) => {
    e.preventDefault();
    setResponse('');
    setIsStreaming(true);

    await streamChat(message, (chunk) => {
      setResponse(prev => prev + chunk);
    });

    setIsStreaming(false);
  };

  return (
    <div>
      <form onSubmit={handleSubmit}>
        <input
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          placeholder="Ask a question..."
          disabled={isStreaming}
        />
        <button type="submit" disabled={isStreaming}>
          {isStreaming ? 'Streaming...' : 'Send'}
        </button>
      </form>
      
      <div className="response">
        {response}
        {isStreaming && <span className="cursor">|</span>}
      </div>
    </div>
  );
}

Server-Sent Events (SSE)

// Express.js server endpoint (assumes the OpenAI client configured above)
app.post('/api/chat/stream', async (req, res) => {
  const { message } = req.body;
  
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  try {
    const stream = await client.chat.completions.create({
      model: 'gpt-3.5-turbo',
      messages: [{ role: 'user', content: message }],
      stream: true,
    });

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || '';
      if (content) {
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }

    res.write('data: [DONE]\n\n');
    res.end();
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    res.end();
  }
});
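
To consume this endpoint outside the browser, any HTTP client that can read the response incrementally will do. Below is a minimal sketch in Python using the requests library; the localhost URL and port are assumptions for illustration.

import json
import requests

# Hypothetical local URL for the Express endpoint above
resp = requests.post(
    "http://localhost:3000/api/chat/stream",
    json={"message": "Explain SSE in one sentence"},
    stream=True,
)

for line in resp.iter_lines(decode_unicode=True):
    # SSE frames look like "data: {...}" and are separated by blank lines
    if not line or not line.startswith("data: "):
        continue
    payload = line[len("data: "):]
    if payload == "[DONE]":
        break
    event = json.loads(payload)
    if "error" in event:
        raise RuntimeError(event["error"])
    print(event.get("content", ""), end="", flush=True)
print()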

Vercel AI SDK Integration

import { createOpenAI } from '@ai-sdk/openai';
import { streamText } from 'ai';

// Configure the Vercel AI SDK with Adaptive's OpenAI-compatible endpoint
const adaptiveOpenAI = createOpenAI({
  baseURL: 'https://www.llmadaptive.uk/api/v1',
  apiKey: process.env.ADAPTIVE_API_KEY,
});

export async function POST(req: Request) {
  const { messages } = await req.json();

  const result = await streamText({
    model: adaptiveOpenAI('gpt-3.5-turbo'),
    messages,
  });

  return result.toDataStreamResponse();
}

Advanced Streaming Features

Message History with Streaming

class StreamingChatBot:
    def __init__(self, api_key):
        self.client = openai.OpenAI(
            base_url="https://www.llmadaptive.uk/api/v1",
            api_key=api_key
        )
        self.conversation = []
    
    def add_message(self, role, content):
        self.conversation.append({"role": role, "content": content})
    
    def stream_response(self):
        stream = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=self.conversation,
            stream=True
        )
        
        response_content = ""
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                response_content += content
                yield content
        
        # Add assistant response to conversation
        self.add_message("assistant", response_content)

# Usage
bot = StreamingChatBot("your-adaptive-api-key")
bot.add_message("user", "What's the weather like?")

print("AI: ", end="", flush=True)
for chunk in bot.stream_response():
    print(chunk, end="", flush=True)
print()
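
Because stream_response appends the assistant's reply to self.conversation, a follow-up question automatically carries the full history:

# Second turn: the previous assistant reply is already stored in
# bot.conversation, so the model sees the whole exchange.
bot.add_message("user", "And what should I wear for it?")
print("AI: ", end="", flush=True)
for chunk in bot.stream_response():
    print(chunk, end="", flush=True)
print()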

Error Handling for Streams

import openai
import time

client = openai.OpenAI(
    base_url="https://www.llmadaptive.uk/api/v1",
    api_key="your-adaptive-api-key"
)

def robust_stream_chat(message, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": message}],
                stream=True,
                timeout=30
            )
            
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
            return
            
        except openai.APITimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            continue
        except openai.APIError as e:
            print(f"API error: {e}")
            break
    
    yield "Sorry, I'm having trouble connecting right now."
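
Since robust_stream_chat is a generator, it is consumed exactly like the plain stream above:

print("AI: ", end="", flush=True)
for chunk in robust_stream_chat("Explain quantum computing in simple terms"):
    print(chunk, end="", flush=True)
print()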

Performance Benefits

Streaming Advantages:
  • Faster Perceived Response: Users see the first tokens as soon as they are generated instead of waiting for the full completion
  • Lower Memory Usage: Process chunks incrementally instead of buffering the entire response
  • Better UX: Real-time feedback while the model is generating
  • Cost Control: Cancelling a stream early stops generation, so you only pay for the tokens actually produced (see the sketch below)

With Adaptive:
  • Smart Model Selection: Automatically uses the fastest appropriate model
  • Optimized Routing: Reduced latency through intelligent provider selection
  • Fallback Protection: Automatic switching if streaming fails
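
As a minimal sketch of the cost-control point (assuming the client configured at the top of this page): breaking out of the read loop and closing the stream terminates the connection, so generation stops and no further tokens are produced.

stream = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Write a long essay on streaming"}],
    stream=True
)

received = ""
for chunk in stream:
    received += chunk.choices[0].delta.content or ""
    if len(received) > 500:  # stop once we have enough text
        break
stream.close()  # close the underlying connection to halt generation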

Integration Examples