Trung cấpHướng dẫnClaude APINguồn: Anthropic

Claude Streaming API — Real-time response cho ứng dụng chat

Nghe bài viết
00:00

Điểm nổi bật

Nhấn để đến mục tương ứng

  1. 1 Bước tiếp theo là tích hợp streaming vào ứng dụng chat của bạn và tối ưu trải nghiệm người dùng.
  2. 2 Với React, dùng ref thay vì state cho text accumulation Markdown rendering: Nếu response chứa Markdown, cần render incremental.
  3. 3 Có ba lý do chính khiến streaming trở thành tiêu chuẩn cho ứng dụng AI chat: Time to First Token (TTFT): Với non-streaming, người dùng không thấy gì cho đến khi toàn bộ response hoàn tất (có thể 10-30 giây).
  4. 4 Streaming API giải quyết vấn đề này bằng cách gửi từng phần (token) của câu trả lời ngay khi chúng được tạo ra, giống như cách ChatGPT hay Claude.ai hiển thị text "chạy" từ từ trên màn hình.
  5. 5 Khác biệt với WebSocket (two-way communication), SSE là one-way: server gửi events đến client.
computer program screengrab

Khi người dùng gửi một câu hỏi cho Claude, họ không muốn đợi 10-30 giây để nhận toàn bộ câu trả lời. Streaming API giải quyết vấn đề này bằng cách gửi từng phần (token) của câu trả lời ngay khi chúng được tạo ra, giống như cách ChatGPT hay Claude.ai hiển thị text "chạy" từ từ trên màn hình. Bài viết này hướng dẫn bạn triển khai Streaming API từ backend đến frontend.

Tại sao cần Streaming?

Có ba lý do chính khiến streaming trở thành tiêu chuẩn cho ứng dụng AI chat:

  • Time to First Token (TTFT): Với non-streaming, người dùng không thấy gì cho đến khi toàn bộ response hoàn tất (có thể 10-30 giây). Với streaming, token đầu tiên xuất hiện trong 0.5-2 giây, tạo cảm giác phản hồi tức thì
  • Perceived performance: Dù tổng thời gian tạo response là như nhau, streaming khiến người dùng cảm thấy ứng dụng nhanh hơn vì họ bắt đầu đọc ngay khi text xuất hiện
  • Early abort: Người dùng có thể đọc phần đầu response và hủy nếu câu trả lời không đúng hướng, tiết kiệm token và chi phí API

Server-Sent Events (SSE) — Nền tảng của Streaming

Claude Streaming API sử dụng Server-Sent Events (SSE), một giao thức web chuẩn cho phép server push data đến client qua HTTP connection đơn hướng.

Khác biệt với WebSocket (two-way communication), SSE là one-way: server gửi events đến client. Điều này phù hợp hoàn hảo cho AI chat vì client gửi 1 request và server stream response về.

Cấu trúc SSE event từ Claude API

Claude API gửi các event types sau trong quá trình streaming:

  • message_start: Bắt đầu message, chứa metadata (model, usage)
  • content_block_start: Bắt đầu một content block (text hoặc tool_use)
  • content_block_delta: Phần nội dung tiếp theo (delta text)
  • content_block_stop: Kết thúc content block
  • message_delta: Metadata cuối message (stop_reason, usage)
  • message_stop: Kết thúc toàn bộ message

Triển khai với Python

Cài đặt

pip install anthropic

Streaming cơ bản với Python SDK

import anthropic

client = anthropic.Anthropic()

# Streaming cơ bản
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Giải thích blockchain trong 5 câu."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # Newline sau khi stream kết thúc

Streaming với event handling chi tiết

import anthropic

client = anthropic.Anthropic()

# Xử lý từng event type
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Viết đoạn code Python sort algorithm."}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_start":
            print(f"[Model: {event.message.model}]")
        elif event.type == "message_delta":
            print(f"
[Tokens used: {event.usage.output_tokens}]")
        elif event.type == "message_stop":
            print("
[Stream completed]")

Streaming trong FastAPI backend

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json

app = FastAPI()
client = anthropic.Anthropic()

async def generate_stream(user_message: str):
    """Generator function that yields SSE events to client."""
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[
            {"role": "user", "content": user_message}
        ]
    ) as stream:
        for text in stream.text_stream:
            # Format as SSE event
            data = json.dumps({"type": "text", "content": text})
            yield f"data: {data}

"

    # Signal stream end
    yield f"data: {json.dumps({'type': 'done'})}

"

@app.post("/api/chat")
async def chat(request: dict):
    user_message = request.get("message", "")
    return StreamingResponse(
        generate_stream(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )

Triển khai với Node.js

Cài đặt

npm install @anthropic-ai/sdk

Streaming cơ bản với Node.js SDK

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

async function main() {
  const stream = client.messages.stream({
    model: "claude-sonnet-4-20250514",
    max_tokens: 1024,
    messages: [
      { role: "user", content: "Giải thích AI trong 5 câu." }
    ]
  });

  // Event-based handling
  stream.on("text", (text) => {
    process.stdout.write(text);
  });

  stream.on("message", (message) => {
    console.log("
[Total tokens:", message.usage.output_tokens, "]");
  });

  // Wait for stream to finish
  const finalMessage = await stream.finalMessage();
  console.log("[Stop reason:", finalMessage.stop_reason, "]");
}

main();

Streaming trong Express.js backend

import express from "express";
import Anthropic from "@anthropic-ai/sdk";

const app = express();
const client = new Anthropic();

app.use(express.json());

app.post("/api/chat", async (req, res) => {
  const { message } = req.body;

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");

  try {
    const stream = client.messages.stream({
      model: "claude-sonnet-4-20250514",
      max_tokens: 2048,
      messages: [{ role: "user", content: message }]
    });

    stream.on("text", (text) => {
      const data = JSON.stringify({ type: "text", content: text });
      res.write("data: " + data + "\n\n");
    });

    stream.on("error", (error) => {
      const data = JSON.stringify({
        type: "error",
        content: error.message
      });
      res.write("data: " + data + "\n\n");
      res.end();
    });

    stream.on("end", () => {
      res.write("data: " + JSON.stringify({ type: "done" }) + "\n\n");
      res.end();
    });

    // Handle client disconnect
    req.on("close", () => {
      stream.abort();
    });
  } catch (error) {
    res.write("data: " + JSON.stringify({
      type: "error",
      content: error.message
    }) + "\n\n");
    res.end();
  }
});

app.listen(3000, () => console.log("Server running on port 3000"));

Frontend: Render streaming response

Vanilla JavaScript với EventSource

// Frontend code - kết nối với SSE endpoint
async function sendMessage(userMessage) {
  const responseDiv = document.getElementById("response");
  responseDiv.textContent = "";

  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message: userMessage })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split("
");

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));

        if (data.type === "text") {
          responseDiv.textContent += data.content;
        } else if (data.type === "done") {
          console.log("Stream completed");
        } else if (data.type === "error") {
          responseDiv.textContent += "
[Error: " + data.content + "]";
        }
      }
    }
  }
}

UI rendering patterns

Khi render streaming text, có một số patterns quan trọng để UI mượt mà:

  • Append-only rendering: Chỉ append text mới, không re-render toàn bộ nội dung mỗi khi nhận token mới. Với React, dùng ref thay vì state cho text accumulation
  • Markdown rendering: Nếu response chứa Markdown, cần render incremental. Thư viện như marked hoặc markdown-it có thể parse partial Markdown
  • Auto-scroll: Tự động scroll xuống cuối khi text mới xuất hiện, nhưng dừng auto-scroll nếu user đã scroll lên để đọc
  • Cursor animation: Hiển thị blinking cursor ở cuối text đang stream để cho thấy response chưa hoàn tất

Xử lý lỗi Mid-Stream (Error Recovery)

Streaming có thể bị gián đoạn giữa chừng do network issues, rate limiting, hoặc server errors. Cần xử lý graceful.

Các loại lỗi thường gặp

  • Network disconnect: Mất kết nối internet giữa chừng
  • Rate limiting (429): Vượt quá giới hạn requests per minute
  • Overloaded (529): Server Claude đang quá tải
  • Timeout: Response quá dài, vượt quá timeout setting

Strategy xử lý lỗi

import anthropic
import time

def stream_with_retry(messages, max_retries=3):
    """Stream with exponential backoff retry."""
    client = anthropic.Anthropic()
    accumulated_text = ""

    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                messages=messages
            ) as stream:
                for text in stream.text_stream:
                    accumulated_text += text
                    yield text

            # Stream completed successfully
            return

        except anthropic.APIStatusError as e:
            if e.status_code == 429:
                # Rate limited - wait and retry
                wait_time = 2 ** attempt
                print(f"
Rate limited. Waiting {wait_time}s...")
                time.sleep(wait_time)
            elif e.status_code == 529:
                # Overloaded - wait longer
                wait_time = 5 * (attempt + 1)
                print(f"
Server overloaded. Waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise

        except anthropic.APIConnectionError:
            # Network error - retry with accumulated context
            wait_time = 2 ** attempt
            print(f"
Connection lost. Retrying in {wait_time}s...")
            time.sleep(wait_time)

            if accumulated_text:
                # Continue from where we left off
                messages = messages + [
                    {"role": "assistant", "content": accumulated_text},
                    {"role": "user", "content": "Hãy tiếp tục từ chỗ bạn dừng lại."}
                ]

    raise Exception("Max retries exceeded")

Streaming với Tool Use

Khi Claude sử dụng tools (function calling) trong streaming mode, flow phức tạp hơn vì response có thể chứa cả text blocks và tool_use blocks.

Xử lý streaming tool use trong Python

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Get current weather for a location",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City name, e.g. 'Ho Chi Minh City'"
                }
            },
            "required": ["location"]
        }
    }
]

def handle_tool_call(tool_name, tool_input):
    """Execute tool and return result."""
    if tool_name == "get_weather":
        # Simulate API call
        return {"temperature": 32, "condition": "sunny"}
    return {"error": "Unknown tool"}

# First stream - may contain tool use
current_tool_name = None
current_tool_input = ""

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user",
         "content": "Thời tiết TP.HCM hôm nay thế nào?"}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if hasattr(event.content_block, "type"):
                if event.content_block.type == "tool_use":
                    current_tool_name = event.content_block.name
                    current_tool_input = ""
                    print(f"[Calling tool: {current_tool_name}]")

        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                current_tool_input += event.delta.partial_json

        elif event.type == "content_block_stop":
            if current_tool_name:
                # Execute tool
                tool_input = json.loads(current_tool_input)
                result = handle_tool_call(
                    current_tool_name, tool_input
                )
                print(f"[Tool result: {result}]")
                current_tool_name = None

Đếm token trong quá trình Stream

Theo dõi token usage trong streaming giúp bạn kiểm soát chi phí và tuân thủ giới hạn context.

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Viết bài thơ về Hà Nội."}
    ]
) as stream:
    for event in stream:
        if event.type == "message_start":
            input_tokens = event.message.usage.input_tokens
            print(f"[Input tokens: {input_tokens}]")

        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

        elif event.type == "message_delta":
            output_tokens = event.usage.output_tokens
            print(f"
[Output tokens: {output_tokens}]")

            # Calculate cost (Claude Sonnet pricing)
            input_cost = input_tokens * 3 / 1_000_000
            output_cost = output_tokens * 15 / 1_000_000
            total_cost = input_cost + output_cost
            print(f"[Estimated cost: ${total_cost:.4f}]")

Production Checklist

Trước khi deploy streaming API lên production, hãy kiểm tra các mục sau:

Backend

  • Timeout configuration: Set timeout phù hợp (60-120 giây cho long responses). Đảm bảo reverse proxy (nginx, CloudFlare) không timeout sớm hơn
  • Buffering disabled: Nginx, CloudFlare và các proxy thường buffer response. Cần disable buffering cho SSE endpoints
  • Connection limits: Mỗi streaming connection giữ 1 HTTP connection mở. Cần tính toán concurrent connections
  • Rate limiting: Implement rate limiting ở application level, không chỉ dựa vào Claude API rate limits
  • Logging: Log start/end của mỗi stream, token usage, errors

Frontend

  • Cancel button: Cho phép user hủy stream giữa chừng (gọi AbortController)
  • Loading state: Hiển thị loading indicator trước khi token đầu tiên xuất hiện
  • Error UI: Hiển thị error message thân thiện khi stream bị lỗi
  • Reconnection: Tự động retry khi mất kết nối tạm thời
  • Memory management: Với conversation dài, cần quản lý DOM elements để tránh memory leak

Nginx configuration cho SSE

# nginx.conf - Cấu hình cho SSE streaming
location /api/chat {
    proxy_pass http://localhost:3000;
    proxy_http_version 1.1;
    proxy_set_header Connection "";

    # Disable buffering for SSE
    proxy_buffering off;
    proxy_cache off;

    # Timeout settings
    proxy_read_timeout 120s;
    proxy_send_timeout 120s;

    # Disable gzip for SSE (can cause buffering)
    gzip off;
}

So sánh Streaming vs Non-Streaming

Không phải mọi use case đều cần streaming. Dưới đây là hướng dẫn khi nào nên dùng:

  • Nên dùng Streaming: Chat interfaces, content generation, code generation — bất kỳ khi nào user nhìn vào response đang được tạo
  • Không cần Streaming: Background processing, batch operations, API-to-API calls nơi không có người dùng chờ đợi
  • Cân nhắc: Khi cần parse toàn bộ response trước khi xử lý (ví dụ: JSON output), streaming thêm complexity mà không nhiều benefit

Streaming với Extended Thinking

Claude hỗ trợ extended thinking (suy nghĩ sâu) kết hợp với streaming. Khi bật extended thinking, bạn sẽ nhận được thinking blocks trước content blocks.

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[
        {"role": "user",
         "content": "Phân tích chiến lược kinh doanh cho startup edtech tại VN."}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("[Thinking...]")
            elif event.content_block.type == "text":
                print("[Response:]")

        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                # Optionally show thinking to user
                pass
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

Bước tiếp theo

Bạn đã nắm được cách triển khai Claude Streaming API từ backend đến frontend, bao gồm xử lý lỗi, tool use, và token counting. Bước tiếp theo là tích hợp streaming vào ứng dụng chat của bạn và tối ưu trải nghiệm người dùng. Khám phá thêm các hướng dẫn kỹ thuật tại Thư viện Nâng cao.

Tính năng liên quan:StreamingServer-Sent EventsReal-time UIError RecoveryTool Use

Bai viet co huu ich khong?

Bản quyền thuộc về tác giả. Vui lòng dẫn nguồn khi chia sẻ.

Bình luận (0)
Ảnh đại diện
Đăng nhập để bình luận...
Đăng nhập để bình luận
  • Đang tải bình luận...

Đăng ký nhận bản tin

Nhận bài viết hay nhất về sản phẩm và vận hành, gửi thẳng vào hộp thư của bạn.

Bảo mật thông tin. Hủy đăng ký bất cứ lúc nào. Chính sách bảo mật.