Streaming Chat Implementation
This guide shows how to implement real-time streaming chat using Adaptive’s OpenAI-compatible streaming API.

Python Streaming Example
```python
import openai

client = openai.OpenAI(
    base_url="https://www.llmadaptive.uk/api/v1",
    api_key="your-adaptive-api-key"
)

def stream_chat(message):
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": message}],
        stream=True
    )
    print("AI: ", end="", flush=True)
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()  # New line after the complete response

# Example usage
stream_chat("Explain quantum computing in simple terms")
```
JavaScript/React Streaming
```jsx
import OpenAI from 'openai';
import { useState } from 'react';

const client = new OpenAI({
  baseURL: 'https://www.llmadaptive.uk/api/v1',
  apiKey: 'your-adaptive-api-key',
  // Required when calling the SDK from the browser; in production,
  // proxy requests through your server so the API key is never exposed.
  dangerouslyAllowBrowser: true,
});

async function streamChat(message, onChunk) {
  const stream = await client.chat.completions.create({
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: message }],
    stream: true,
  });

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    if (content) {
      onChunk(content);
    }
  }
}

// React component example
function StreamingChat() {
  const [message, setMessage] = useState('');
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const handleSubmit = async (e) => {
    e.preventDefault();
    setResponse('');
    setIsStreaming(true);
    await streamChat(message, (chunk) => {
      setResponse(prev => prev + chunk);
    });
    setIsStreaming(false);
  };

  return (
    <div>
      <form onSubmit={handleSubmit}>
        <input
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          placeholder="Ask a question..."
          disabled={isStreaming}
        />
        <button type="submit" disabled={isStreaming}>
          {isStreaming ? 'Streaming...' : 'Send'}
        </button>
      </form>
      <div className="response">
        {response}
        {isStreaming && <span className="cursor">|</span>}
      </div>
    </div>
  );
}
```
Server-Sent Events (SSE)
```javascript
// Express.js server endpoint that proxies Adaptive and relays
// the tokens to the browser as Server-Sent Events
import express from 'express';
import OpenAI from 'openai';

const app = express();
app.use(express.json());

const client = new OpenAI({
  baseURL: 'https://www.llmadaptive.uk/api/v1',
  apiKey: process.env.ADAPTIVE_API_KEY,
});

app.post('/api/chat/stream', async (req, res) => {
  const { message } = req.body;

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  try {
    const stream = await client.chat.completions.create({
      model: 'gpt-3.5-turbo',
      messages: [{ role: 'user', content: message }],
      stream: true,
    });

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || '';
      if (content) {
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }

    res.write('data: [DONE]\n\n');
    res.end();
  } catch (error) {
    res.write(`data: ${JSON.stringify({ error: error.message })}\n\n`);
    res.end();
  }
});

app.listen(3000);
```
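Because this endpoint streams standard SSE frames over a POST response, a browser can consume it with `fetch` and a stream reader (the built-in `EventSource` only supports GET requests). Below is a minimal client-side sketch; the `consumeStream` helper is illustrative, and for brevity it assumes each read delivers whole `data:` lines, so production code should buffer partial frames:

```javascript
// Browser-side consumer for the /api/chat/stream endpoint above
async function consumeStream(message, onChunk) {
  const res = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = res.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Each SSE frame is "data: <json>\n\n"; [DONE] marks the end
    for (const line of decoder.decode(value, { stream: true }).split('\n')) {
      if (!line.startsWith('data: ')) continue;
      const payload = line.slice('data: '.length);
      if (payload === '[DONE]') return;
      const { content, error } = JSON.parse(payload);
      if (error) throw new Error(error);
      if (content) onChunk(content);
    }
  }
}

// Usage: append chunks to the UI as they arrive
// consumeStream('Explain quantum computing', (chunk) => console.log(chunk));
```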
Vercel AI SDK Integration
```typescript
import { createOpenAI } from '@ai-sdk/openai';
import { streamText } from 'ai';

// Configure the Vercel AI SDK to route through Adaptive
const adaptiveOpenAI = createOpenAI({
  baseURL: 'https://www.llmadaptive.uk/api/v1',
  apiKey: process.env.ADAPTIVE_API_KEY,
});

export async function POST(req: Request) {
  const { messages } = await req.json();

  const result = await streamText({
    model: adaptiveOpenAI('gpt-3.5-turbo'),
    messages,
  });

  return result.toAIStreamResponse();
}
```
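On the client, the Vercel AI SDK's `useChat` hook consumes this route with no extra wiring. A minimal sketch, assuming the handler above is deployed at `app/api/chat/route.ts` (`useChat` posts to `/api/chat` by default):

```tsx
'use client';

import { useChat } from 'ai/react';

export default function Chat() {
  // useChat manages message state and streams tokens from /api/chat
  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat();

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          {m.role === 'user' ? 'You: ' : 'AI: '}
          {m.content}
        </div>
      ))}
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={handleInputChange}
          placeholder="Ask a question..."
          disabled={isLoading}
        />
      </form>
    </div>
  );
}
```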
Advanced Streaming Features
Message History with Streaming
```python
import openai

class StreamingChatBot:
    def __init__(self, api_key):
        self.client = openai.OpenAI(
            base_url="https://www.llmadaptive.uk/api/v1",
            api_key=api_key
        )
        self.conversation = []

    def add_message(self, role, content):
        self.conversation.append({"role": role, "content": content})

    def stream_response(self):
        stream = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=self.conversation,
            stream=True
        )
        response_content = ""
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                content = chunk.choices[0].delta.content
                response_content += content
                yield content
        # Add the assistant response to the conversation history
        self.add_message("assistant", response_content)

# Usage
bot = StreamingChatBot("your-adaptive-api-key")
bot.add_message("user", "What's the weather like?")

print("AI: ", end="", flush=True)
for chunk in bot.stream_response():
    print(chunk, end="", flush=True)
print()
```
Error Handling for Streams
```python
import openai
import time

client = openai.OpenAI(
    base_url="https://www.llmadaptive.uk/api/v1",
    api_key="your-adaptive-api-key"
)

def robust_stream_chat(message, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": message}],
                stream=True,
                timeout=30
            )
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    yield chunk.choices[0].delta.content
            return
        except openai.APITimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
                continue
        except openai.APIError as e:
            print(f"API error: {e}")
            break
    yield "Sorry, I'm having trouble connecting right now."
```
Performance Benefits
Streaming Advantages:
- Faster Perceived Response: Users see content immediately
- Lower Memory Usage: Process chunks instead of full response
- Better UX: Real-time feedback during generation
- Cost Efficient: Only pay for tokens actually used
- Smart Model Selection: Automatically uses fastest appropriate model
- Optimized Routing: Reduced latency through intelligent provider selection
- Fallback Protection: Automatic switching if streaming fails (see the client-side sketch below)
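On the last point: Adaptive's fallback between providers happens server-side and requires no client code. An application can still add its own last-resort degradation from streaming to a regular request. A minimal illustrative sketch, reusing the `client` from the JavaScript example above:

```javascript
// Last-resort client-side fallback (illustrative): if the stream itself
// fails, retry the same request as a single non-streaming completion.
async function chatWithFallback(message) {
  const params = {
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: message }],
  };

  try {
    const stream = await client.chat.completions.create({ ...params, stream: true });
    let text = '';
    for await (const chunk of stream) {
      text += chunk.choices[0]?.delta?.content || '';
    }
    return text;
  } catch {
    // Fall back to a regular, non-streaming completion
    const completion = await client.chat.completions.create(params);
    return completion.choices[0].message.content;
  }
}
```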