Streaming response — Hiển thị text khi Claude đang sinh

2 — Gọi Claude qua APICơ bản25 phút

Hãy so sánh 2 trải nghiệm:

Bạn sẽ học được
  • Giải thích lý do streaming cải thiện UX chatbot một cách đột phá
  • Bật streaming với stream=True và xử lý event stream
  • Dùng simplified streaming API (client.messages.stream) để code gọn
  • Lấy cả streaming text và final complete message cho lưu trữ
  • Nhận diện 6 loại event trong stream

Vấn đề: Response time có thể 10-30 giây

Với prompt dài, model lớn, tác vụ phức tạp:

Nếu chờ hết → 10s màn hình đơ.

Với streaming:

  • Time to first token (TTFT): ~500ms
  • Token per second (TPS) của Sonnet: ~80 tok/s
  • 800 token response = 10 giây generation
  • User thấy chữ đầu sau 500ms
  • Đọc dần dần, "thời gian chờ" biến mất về cảm nhận

Cách streaming hoạt động

Thay vì 1 HTTP response lớn, bạn nhận nhiều event nhỏ qua Server-Sent Events (SSE).

┌──────────────────────────────────────────────────┐
│                                                  │
│   Non-stream:                                    │
│                                                  │
│   ┌─────────┐             ┌──────────────┐       │
│   │ Request │ ────────▶   │              │       │
│   │         │  đợi 10s    │  Full text   │       │
│   │         │ ◀────────   │              │       │
│   └─────────┘             └──────────────┘       │
│                                                  │
│   Stream:                                        │
│                                                  │
│   ┌─────────┐             ┌─────┐                │
│   │ Request │ ────────▶   │     │ 0.5s: "Quan"   │
│   │         │ ◀────────   └─────┘                │
│   │         │             ┌─────┐                │
│   │         │ ◀────────   │     │ 0.6s: "tum"    │
│   │         │             └─────┘                │
│   │         │ ◀────────   ... tiếp tục           │
│   └─────────┘                                    │
│                                                  │
└──────────────────────────────────────────────────┘

6 loại event trong stream

Anthropic gửi 6 loại event trong suốt stream:

Trong đa số use case, bạn chỉ quan tâm ContentBlockDelta — chứa chunk text thật.

┌─────────────────────────────────────────────────────┐
│                                                     │
│  1. MessageStart                                    │
│     "Tôi bắt đầu message mới"                       │
│                                                     │
│  2. ContentBlockStart                               │
│     "Bắt đầu 1 block (text / tool / thinking)"      │
│                                                     │
│  3. ContentBlockDelta ← event quan trọng nhất       │
│     "Đây là 1 chunk text"                           │
│     (lặp lại nhiều lần)                             │
│                                                     │
│  4. ContentBlockStop                                │
│     "Kết thúc block này"                            │
│                                                     │
│  5. MessageDelta                                    │
│     "Update stop_reason, usage..."                  │
│                                                     │
│  6. MessageStop                                     │
│     "Hết message"                                   │
│                                                     │
└─────────────────────────────────────────────────────┘

Cách 1: Low-level với stream=True

Output (rút gọn):

messages = [{"role": "user", "content": "Count from 1 to 5 slowly"}]

stream = client.messages.create(
    model=model,
    max_tokens=200,
    messages=messages,
    stream=True,  # ← Bật streaming
)

for event in stream:
    print(event)

Cách 1: Low-level với stream=True (tiếp)

Filter chỉ lấy text

MessageStartEvent(message=Message(id='msg_...', ...))
ContentBlockStartEvent(index=0, content_block=TextBlock(text=''))
ContentBlockDeltaEvent(index=0, delta=TextDelta(text='1'))
ContentBlockDeltaEvent(index=0, delta=TextDelta(text=', '))
ContentBlockDeltaEvent(index=0, delta=TextDelta(text='2'))
ContentBlockDeltaEvent(index=0, delta=TextDelta(text=', 3'))
...
ContentBlockStopEvent(index=0)
MessageDeltaEvent(delta={'stop_reason': 'end_turn'})
MessageStopEvent()

Filter chỉ lấy text

end="" để không xuống dòng. flush=True để in ngay, không đợi buffer.

stream = client.messages.create(
    model=model,
    max_tokens=200,
    messages=messages,
    stream=True,
)

full_text = ""
for event in stream:
    if event.type == "content_block_delta":
        chunk = event.delta.text
        print(chunk, end="", flush=True)  # In ngay, không newline
        full_text += chunk

print("\n\nFinal text:", full_text)

Cách 2: High-level với client.messages.stream()

SDK có API gọn hơn qua context manager:

Khác biệt:

Khi nào dùng cách nào?

  • client.messages.stream() (không có .create) — dùng context manager
  • stream.text_stream — iterator chỉ yield text chunk (đã filter)
  • Cleaner, ngắn hơn 50%
CáchDùng khi
stream=True + manual filterCần handle tool_use, thinking blocks, edge case
client.messages.stream()Chatbot thường — 95% use case
messages = [{"role": "user", "content": "Count from 1 to 5"}]

with client.messages.stream(
    model=model,
    max_tokens=200,
    messages=messages,
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Lấy complete message sau stream

Bạn cần stream cho UX, nhưng cũng cần full message để:

Với context manager, sau khi stream xong có stream.get_final_message():

Best of both worlds: realtime UX + complete metadata.

  • Lưu vào database
  • Append vào messages history cho multi-turn
  • Log usage tokens
messages = [{"role": "user", "content": "Describe Vietnam briefly"}]

with client.messages.stream(
    model=model,
    max_tokens=500,
    messages=messages,
) as stream:
    # Stream đến user
    for text in stream.text_stream:
        print(text, end="", flush=True)
    
    # Sau khi xong, lấy full message
    final = stream.get_final_message()

print("\n\n=== Message metadata ===")
print(f"Stop reason: {final.stop_reason}")
print(f"Input tokens: {final.usage.input_tokens}")
print(f"Output tokens: {final.usage.output_tokens}")

# Append vào history
full_text = final.content[0].text
messages.append({"role": "assistant", "content": full_text})

Integration vào chatbot từ bài 6.8

Update function chat() để support streaming:

Dùng trong chatbot loop:

def chat_stream(messages: list, verbose: bool = False) -> str:
    """
    Chat với streaming output.
    
    Returns: full response text
    """
    with client.messages.stream(
        model=model,
        max_tokens=1000,
        messages=messages,
    ) as stream:
        # Print từng chunk
        for text in stream.text_stream:
            print(text, end="", flush=True)
        
        # Newline after stream
        print()
        
        # Get metadata
        final = stream.get_final_message()
        
        if verbose:
            print(f"  [tokens: {final.usage.input_tokens}+{final.usage.output_tokens}]")
    
    return final.content[0].text

Integration vào chatbot từ bài 6.8 (tiếp)

UX lập tức tăng 10x.

messages = []

while True:
    user_input = input("\nYou: ").strip()
    if not user_input or user_input == "quit":
        break
    
    add_user_message(messages, user_input)
    print("Claude: ", end="")
    
    answer = chat_stream(messages, verbose=True)
    add_assistant_message(messages, answer)

Ví dụ thực chiến: Writer assistant với streaming

Tình huống

Bạn build assistant cho copywriter — user paste draft, AI suggest revision. Response thường 500-1000 từ, không stream = đợi quá lâu.

Code

UX khác biệt

Không stream: User paste 500 từ → chờ 12 giây → tất cả hiện ra.

Có stream: User paste 500 từ → 0.5 giây thấy chữ đầu → đọc song song lúc Claude viết → tổng "cảm giác" 3 giây.

def writer_assistant():
    messages = []
    
    while True:
        draft = input("\nPaste draft (or 'quit'):\n").strip()
        if draft == "quit":
            break
        
        add_user_message(messages, 
            f"Review draft này, đề xuất 3 cải thiện cụ thể:\n\n{draft}")
        
        print("\n🖋️ Assistant review:\n")
        print("-" * 40)
        
        with client.messages.stream(
            model=model,
            max_tokens=2000,
            messages=messages,
        ) as stream:
            for text in stream.text_stream:
                print(text, end="", flush=True)
            
            final = stream.get_final_message()
        
        print("\n" + "-" * 40)
        add_assistant_message(messages, final.content[0].text)

Case studies theo ngành

🎧 Customer Support — Chatbot web

Không stream: Bounce rate 35% khi response > 5s.

Có stream: Bounce rate giảm còn 12%. Customer cảm thấy "được hồi đáp".

📝 Content — Marketing tool

Task: Generate ad copy 3 variants.

Stream: Writer đọc variant 1 xong thì variant 2 bắt đầu stream → parallel consumption, faster iteration.

💻 Developer Tools — Code assistant

Task: Suggest refactor cho function dài.

Stream critical: Code refactor thường 50-200 dòng. Không stream = dev cảm thấy "tool chậm", switch sang Copilot.

Anti-patterns streaming

❌ Stream vào file log

Hiện tượng: Mỗi chunk log 1 dòng → log file có 500 dòng cho 1 response.

Fix: Collect full_text, log 1 entry khi stream xong.

❌ Stream khi không hiển thị UI

Hiện tượng: Backend batch job dùng stream.

Fix: Backend job dùng non-stream. Stream chỉ cần cho realtime UI.

❌ Forget flush=True

Hiện tượng: Text vẫn hiện "all at once" dù đã bật stream.

Nguyên nhân: stdout buffer chưa flush.

Fix: print(text, end="", flush=True).

❌ Không handle None từ text_stream

Hiện tượng: Occasional crash.

Fix:

❌ Append chunks vào messages thay vì final text

Hiện tượng:

for text in stream.text_stream:
    if text:  # ← guard
        print(text, end="", flush=True)

❌ Append chunks vào messages thay vì final text

Fix: Dùng stream.get_final_message() sau khi stream xong.

# ❌ Bug
for text in stream.text_stream:
    messages.append({"role": "assistant", "content": text})
# → messages có 500 entries assistant!

Áp dụng ngay

Bài tập 1: Upgrade chatbot với streaming (20 phút)

Quay lại 02_chat.ipynb (bài 6.8). Update chat() thành chat_stream(). Chạy lại, so sánh feel.

Bài tập 2: Đo TTFT (Time To First Token) (15 phút)

Viết script đo TTFT:

Chạy 3 lần cho Haiku, Sonnet, Opus. Ghi lại:

Insight: Haiku TTFT thường nhanh nhất — là lý do Haiku tốt cho realtime chat.

ModelTTFTTotal
Haiku______
Sonnet______
Opus______
import time

messages = [{"role": "user", "content": "Count to 10"}]

start = time.time()

with client.messages.stream(
    model=model,
    max_tokens=100,
    messages=messages,
) as stream:
    for i, text in enumerate(stream.text_stream):
        if i == 0:
            ttft = time.time() - start
            print(f"\n[TTFT: {ttft*1000:.0f}ms]\n")
        print(text, end="", flush=True)

total = time.time() - start
print(f"\n\n[Total: {total*1000:.0f}ms]")

Mẹo nâng cao

Mẹo 1: Stream vào WebSocket cho web app

Mẹo 2: Parse tool_use trong stream

Tool use blocks cũng stream. Nếu bạn làm agent, cần parse:

# FastAPI example
from fastapi import WebSocket

async def chat_endpoint(websocket: WebSocket):
    await websocket.accept()
    data = await websocket.receive_json()
    
    with client.messages.stream(
        model=model,
        max_tokens=2000,
        messages=data["messages"],
    ) as stream:
        for text in stream.text_stream:
            await websocket.send_text(text)
    
    await websocket.close()

Mẹo 2: Parse tool_use trong stream

Chi tiết ở Module 5.

Mẹo 3: Cancel stream giữa chừng

Nếu user click "Stop" giữa chừng, đóng stream:

for event in stream:
    if event.type == "content_block_start":
        if event.content_block.type == "tool_use":
            print(f"\n[Tool call: {event.content_block.name}]")

Mẹo 3: Cancel stream giữa chừng

SDK tự đóng connection khi exit context manager.

with client.messages.stream(...) as stream:
    for text in stream.text_stream:
        if user_clicked_stop:
            break  # exit cleanly
        print(text, end="", flush=True)

Tóm tắt bài học

🎯 Streaming là bắt buộc cho chatbot UX. 40% engagement up, 50% bounce down.

🎯 2 cách: stream=True (low-level, full control) hoặc client.messages.stream() (high-level, gọn).

🎯 6 loại event, quan tâm nhất là ContentBlockDelta. High-level API đã filter sẵn.

🎯 stream.get_final_message() = best of both worlds — stream realtime + metadata đầy đủ.

🎯 Không stream cho batch job. Stream chỉ cho realtime UI.

Tài liệu tham khảo
  • Streaming docs
  • Server-Sent Events spec
Nội dung này có hữu ích không?