Claude Streaming API — Real-time response cho ứng dụng chat
Điểm nổi bật
Nhấn để đến mục tương ứng
- 1 Bước tiếp theo là tích hợp streaming vào ứng dụng chat của bạn và tối ưu trải nghiệm người dùng.
- 2 Với React, dùng ref thay vì state cho text accumulation Markdown rendering: Nếu response chứa Markdown, cần render incremental.
- 3 Có ba lý do chính khiến streaming trở thành tiêu chuẩn cho ứng dụng AI chat: Time to First Token (TTFT): Với non-streaming, người dùng không thấy gì cho đến khi toàn bộ response hoàn tất (có thể 10-30 giây).
- 4 Streaming API giải quyết vấn đề này bằng cách gửi từng phần (token) của câu trả lời ngay khi chúng được tạo ra, giống như cách ChatGPT hay Claude.ai hiển thị text "chạy" từ từ trên màn hình.
- 5 Khác biệt với WebSocket (two-way communication), SSE là one-way: server gửi events đến client.
Khi người dùng gửi một câu hỏi cho Claude, họ không muốn đợi 10-30 giây để nhận toàn bộ câu trả lời. Streaming API giải quyết vấn đề này bằng cách gửi từng phần (token) của câu trả lời ngay khi chúng được tạo ra, giống như cách ChatGPT hay Claude.ai hiển thị text "chạy" từ từ trên màn hình. Bài viết này hướng dẫn bạn triển khai Streaming API từ backend đến frontend.
Tại sao cần Streaming?
Có ba lý do chính khiến streaming trở thành tiêu chuẩn cho ứng dụng AI chat:
- Time to First Token (TTFT): Với non-streaming, người dùng không thấy gì cho đến khi toàn bộ response hoàn tất (có thể 10-30 giây). Với streaming, token đầu tiên xuất hiện trong 0.5-2 giây, tạo cảm giác phản hồi tức thì
- Perceived performance: Dù tổng thời gian tạo response là như nhau, streaming khiến người dùng cảm thấy ứng dụng nhanh hơn vì họ bắt đầu đọc ngay khi text xuất hiện
- Early abort: Người dùng có thể đọc phần đầu response và hủy nếu câu trả lời không đúng hướng, tiết kiệm token và chi phí API
Server-Sent Events (SSE) — Nền tảng của Streaming
Claude Streaming API sử dụng Server-Sent Events (SSE), một giao thức web chuẩn cho phép server push data đến client qua HTTP connection đơn hướng.
Khác biệt với WebSocket (two-way communication), SSE là one-way: server gửi events đến client. Điều này phù hợp hoàn hảo cho AI chat vì client gửi 1 request và server stream response về.
Cấu trúc SSE event từ Claude API
Claude API gửi các event types sau trong quá trình streaming:
- message_start: Bắt đầu message, chứa metadata (model, usage)
- content_block_start: Bắt đầu một content block (text hoặc tool_use)
- content_block_delta: Phần nội dung tiếp theo (delta text)
- content_block_stop: Kết thúc content block
- message_delta: Metadata cuối message (stop_reason, usage)
- message_stop: Kết thúc toàn bộ message
Triển khai với Python
Cài đặt
pip install anthropic
Streaming cơ bản với Python SDK
import anthropic
client = anthropic.Anthropic()
# Streaming cơ bản
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Giải thích blockchain trong 5 câu."}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # Newline sau khi stream kết thúc
Streaming với event handling chi tiết
import anthropic
client = anthropic.Anthropic()
# Xử lý từng event type
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Viết đoạn code Python sort algorithm."}
]
) as stream:
for event in stream:
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_start":
print(f"[Model: {event.message.model}]")
elif event.type == "message_delta":
print(f"
[Tokens used: {event.usage.output_tokens}]")
elif event.type == "message_stop":
print("
[Stream completed]")
Streaming trong FastAPI backend
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json
app = FastAPI()
client = anthropic.Anthropic()
async def generate_stream(user_message: str):
"""Generator function that yields SSE events to client."""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=2048,
messages=[
{"role": "user", "content": user_message}
]
) as stream:
for text in stream.text_stream:
# Format as SSE event
data = json.dumps({"type": "text", "content": text})
yield f"data: {data}
"
# Signal stream end
yield f"data: {json.dumps({'type': 'done'})}
"
@app.post("/api/chat")
async def chat(request: dict):
user_message = request.get("message", "")
return StreamingResponse(
generate_stream(user_message),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
Triển khai với Node.js
Cài đặt
npm install @anthropic-ai/sdk
Streaming cơ bản với Node.js SDK
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
async function main() {
const stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 1024,
messages: [
{ role: "user", content: "Giải thích AI trong 5 câu." }
]
});
// Event-based handling
stream.on("text", (text) => {
process.stdout.write(text);
});
stream.on("message", (message) => {
console.log("
[Total tokens:", message.usage.output_tokens, "]");
});
// Wait for stream to finish
const finalMessage = await stream.finalMessage();
console.log("[Stop reason:", finalMessage.stop_reason, "]");
}
main();
Streaming trong Express.js backend
import express from "express";
import Anthropic from "@anthropic-ai/sdk";
const app = express();
const client = new Anthropic();
app.use(express.json());
app.post("/api/chat", async (req, res) => {
const { message } = req.body;
// Set SSE headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
try {
const stream = client.messages.stream({
model: "claude-sonnet-4-20250514",
max_tokens: 2048,
messages: [{ role: "user", content: message }]
});
stream.on("text", (text) => {
const data = JSON.stringify({ type: "text", content: text });
res.write("data: " + data + "\n\n");
});
stream.on("error", (error) => {
const data = JSON.stringify({
type: "error",
content: error.message
});
res.write("data: " + data + "\n\n");
res.end();
});
stream.on("end", () => {
res.write("data: " + JSON.stringify({ type: "done" }) + "\n\n");
res.end();
});
// Handle client disconnect
req.on("close", () => {
stream.abort();
});
} catch (error) {
res.write("data: " + JSON.stringify({
type: "error",
content: error.message
}) + "\n\n");
res.end();
}
});
app.listen(3000, () => console.log("Server running on port 3000"));
Frontend: Render streaming response
Vanilla JavaScript với EventSource
// Frontend code - kết nối với SSE endpoint
async function sendMessage(userMessage) {
const responseDiv = document.getElementById("response");
responseDiv.textContent = "";
const response = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message: userMessage })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split("
");
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = JSON.parse(line.slice(6));
if (data.type === "text") {
responseDiv.textContent += data.content;
} else if (data.type === "done") {
console.log("Stream completed");
} else if (data.type === "error") {
responseDiv.textContent += "
[Error: " + data.content + "]";
}
}
}
}
}
UI rendering patterns
Khi render streaming text, có một số patterns quan trọng để UI mượt mà:
- Append-only rendering: Chỉ append text mới, không re-render toàn bộ nội dung mỗi khi nhận token mới. Với React, dùng ref thay vì state cho text accumulation
- Markdown rendering: Nếu response chứa Markdown, cần render incremental. Thư viện như marked hoặc markdown-it có thể parse partial Markdown
- Auto-scroll: Tự động scroll xuống cuối khi text mới xuất hiện, nhưng dừng auto-scroll nếu user đã scroll lên để đọc
- Cursor animation: Hiển thị blinking cursor ở cuối text đang stream để cho thấy response chưa hoàn tất
Xử lý lỗi Mid-Stream (Error Recovery)
Streaming có thể bị gián đoạn giữa chừng do network issues, rate limiting, hoặc server errors. Cần xử lý graceful.
Các loại lỗi thường gặp
- Network disconnect: Mất kết nối internet giữa chừng
- Rate limiting (429): Vượt quá giới hạn requests per minute
- Overloaded (529): Server Claude đang quá tải
- Timeout: Response quá dài, vượt quá timeout setting
Strategy xử lý lỗi
import anthropic
import time
def stream_with_retry(messages, max_retries=3):
"""Stream with exponential backoff retry."""
client = anthropic.Anthropic()
accumulated_text = ""
for attempt in range(max_retries):
try:
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=messages
) as stream:
for text in stream.text_stream:
accumulated_text += text
yield text
# Stream completed successfully
return
except anthropic.APIStatusError as e:
if e.status_code == 429:
# Rate limited - wait and retry
wait_time = 2 ** attempt
print(f"
Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
elif e.status_code == 529:
# Overloaded - wait longer
wait_time = 5 * (attempt + 1)
print(f"
Server overloaded. Waiting {wait_time}s...")
time.sleep(wait_time)
else:
raise
except anthropic.APIConnectionError:
# Network error - retry with accumulated context
wait_time = 2 ** attempt
print(f"
Connection lost. Retrying in {wait_time}s...")
time.sleep(wait_time)
if accumulated_text:
# Continue from where we left off
messages = messages + [
{"role": "assistant", "content": accumulated_text},
{"role": "user", "content": "Hãy tiếp tục từ chỗ bạn dừng lại."}
]
raise Exception("Max retries exceeded")
Streaming với Tool Use
Khi Claude sử dụng tools (function calling) trong streaming mode, flow phức tạp hơn vì response có thể chứa cả text blocks và tool_use blocks.
Xử lý streaming tool use trong Python
import anthropic
import json
client = anthropic.Anthropic()
tools = [
{
"name": "get_weather",
"description": "Get current weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g. 'Ho Chi Minh City'"
}
},
"required": ["location"]
}
}
]
def handle_tool_call(tool_name, tool_input):
"""Execute tool and return result."""
if tool_name == "get_weather":
# Simulate API call
return {"temperature": 32, "condition": "sunny"}
return {"error": "Unknown tool"}
# First stream - may contain tool use
current_tool_name = None
current_tool_input = ""
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user",
"content": "Thời tiết TP.HCM hôm nay thế nào?"}
]
) as stream:
for event in stream:
if event.type == "content_block_start":
if hasattr(event.content_block, "type"):
if event.content_block.type == "tool_use":
current_tool_name = event.content_block.name
current_tool_input = ""
print(f"[Calling tool: {current_tool_name}]")
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.delta.type == "input_json_delta":
current_tool_input += event.delta.partial_json
elif event.type == "content_block_stop":
if current_tool_name:
# Execute tool
tool_input = json.loads(current_tool_input)
result = handle_tool_call(
current_tool_name, tool_input
)
print(f"[Tool result: {result}]")
current_tool_name = None
Đếm token trong quá trình Stream
Theo dõi token usage trong streaming giúp bạn kiểm soát chi phí và tuân thủ giới hạn context.
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=1024,
messages=[
{"role": "user", "content": "Viết bài thơ về Hà Nội."}
]
) as stream:
for event in stream:
if event.type == "message_start":
input_tokens = event.message.usage.input_tokens
print(f"[Input tokens: {input_tokens}]")
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
elif event.type == "message_delta":
output_tokens = event.usage.output_tokens
print(f"
[Output tokens: {output_tokens}]")
# Calculate cost (Claude Sonnet pricing)
input_cost = input_tokens * 3 / 1_000_000
output_cost = output_tokens * 15 / 1_000_000
total_cost = input_cost + output_cost
print(f"[Estimated cost: ${total_cost:.4f}]")
Production Checklist
Trước khi deploy streaming API lên production, hãy kiểm tra các mục sau:
Backend
- Timeout configuration: Set timeout phù hợp (60-120 giây cho long responses). Đảm bảo reverse proxy (nginx, CloudFlare) không timeout sớm hơn
- Buffering disabled: Nginx, CloudFlare và các proxy thường buffer response. Cần disable buffering cho SSE endpoints
- Connection limits: Mỗi streaming connection giữ 1 HTTP connection mở. Cần tính toán concurrent connections
- Rate limiting: Implement rate limiting ở application level, không chỉ dựa vào Claude API rate limits
- Logging: Log start/end của mỗi stream, token usage, errors
Frontend
- Cancel button: Cho phép user hủy stream giữa chừng (gọi AbortController)
- Loading state: Hiển thị loading indicator trước khi token đầu tiên xuất hiện
- Error UI: Hiển thị error message thân thiện khi stream bị lỗi
- Reconnection: Tự động retry khi mất kết nối tạm thời
- Memory management: Với conversation dài, cần quản lý DOM elements để tránh memory leak
Nginx configuration cho SSE
# nginx.conf - Cấu hình cho SSE streaming
location /api/chat {
proxy_pass http://localhost:3000;
proxy_http_version 1.1;
proxy_set_header Connection "";
# Disable buffering for SSE
proxy_buffering off;
proxy_cache off;
# Timeout settings
proxy_read_timeout 120s;
proxy_send_timeout 120s;
# Disable gzip for SSE (can cause buffering)
gzip off;
}
So sánh Streaming vs Non-Streaming
Không phải mọi use case đều cần streaming. Dưới đây là hướng dẫn khi nào nên dùng:
- Nên dùng Streaming: Chat interfaces, content generation, code generation — bất kỳ khi nào user nhìn vào response đang được tạo
- Không cần Streaming: Background processing, batch operations, API-to-API calls nơi không có người dùng chờ đợi
- Cân nhắc: Khi cần parse toàn bộ response trước khi xử lý (ví dụ: JSON output), streaming thêm complexity mà không nhiều benefit
Streaming với Extended Thinking
Claude hỗ trợ extended thinking (suy nghĩ sâu) kết hợp với streaming. Khi bật extended thinking, bạn sẽ nhận được thinking blocks trước content blocks.
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
model="claude-sonnet-4-20250514",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=[
{"role": "user",
"content": "Phân tích chiến lược kinh doanh cho startup edtech tại VN."}
]
) as stream:
for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "thinking":
print("[Thinking...]")
elif event.content_block.type == "text":
print("[Response:]")
elif event.type == "content_block_delta":
if event.delta.type == "thinking_delta":
# Optionally show thinking to user
pass
elif event.delta.type == "text_delta":
print(event.delta.text, end="", flush=True)
Bước tiếp theo
Bạn đã nắm được cách triển khai Claude Streaming API từ backend đến frontend, bao gồm xử lý lỗi, tool use, và token counting. Bước tiếp theo là tích hợp streaming vào ứng dụng chat của bạn và tối ưu trải nghiệm người dùng. Khám phá thêm các hướng dẫn kỹ thuật tại Thư viện Nâng cao.
Bai viet co huu ich khong?
Bản quyền thuộc về tác giả. Vui lòng dẫn nguồn khi chia sẻ.




