Nâng caoHướng dẫnClaude APINguồn: Anthropic

Async Patterns cho Claude API — Queue, Worker va Concurrent Processing

Nghe bài viết
00:00

Điểm nổi bật

Nhấn để đến mục tương ứng

  1. 1 Claude API co latency tu 2 den 30 giay tuy do phuc tap cua request.
  2. 2 Khac voi cac API thong thuong tra loi trong 50-200ms, Claude API can thoi gian suy nghi.
  3. 3 Luon set timeout Moi request den Claude API can co timeout.
  4. 4 Khong kill giua chung — se mat ket qua va token da tieu thu.
  5. 5 Log moi thu Log request_id, model, tokens, thoi gian xu ly, va trang thai cho moi request.
flat screen computer monitor

Claude API có latency từ 2 đến 30 giây tùy độ phức tạp của request. Nếu ứng dụng của bạn xử lý request đồng bộ (synchronous), người dùng sẽ phải chờ và server bị block. Async patterns giúp bạn xử lý hàng ngàn requests đồng thời mà không làm chậm hệ thống.

Tại sao Async quan trọng với Claude API?

Khác với các API thông thường trả lời trong 50-200ms, Claude API cần thời gian suy nghĩ. Một request đơn giản mất 2-3 giây, request phức tạp có thể mất 15-30 giây. Trong thời gian đó:

  • Server thread bị block: Không thể xử lý request khác
  • Người dùng chờ: UX tệ, bounce rate cao
  • Timeout: Reverse proxy (Nginx) thường timeout sau 30-60 giây
  • Rate limit: Anthropic giới hạn số requests đồng thời

Mô hình đồng bộ vs bất đồng bộ

# Mo hinh dong bo (BAD):
# User -> API Server -> Claude API (cho 10s) -> Response -> User
# Server bi block 10 giay, chi xu ly 1 request/thread

# Mo hinh bat dong bo (GOOD):
# User -> API Server -> Queue -> Response "Da nhan, dang xu ly"
#                         |
#                    Worker Pool -> Claude API -> Callback/Webhook
#                                                    |
#                                              User nhan ket qua

Queue-based Architecture

Queue là thành phần trung tâm của async pattern. Nó tách biệt việc nhận request (producer) và xử lý request (consumer), cho phép scale độc lập.

Kiến trúc với Redis Queue

# Cai dat dependencies
# pip install redis rq anthropic

import redis
from rq import Queue, Worker
import anthropic
import json
from datetime import datetime

# Ket noi Redis
redis_conn = redis.Redis(host='localhost', port=6379, db=0)

# Tao queue voi cac muc uu tien khac nhau
high_queue = Queue('high', connection=redis_conn)
default_queue = Queue('default', connection=redis_conn)
low_queue = Queue('low', connection=redis_conn)


def process_claude_request(request_id: str, message: str,
                           model: str = "claude-sonnet-4-20250514",
                           system_prompt: str = "",
                           callback_url: str = None) -> dict:
    """Ham xu ly request — chay trong worker."""
    client = anthropic.Anthropic()

    start_time = datetime.now()

    try:
        kwargs = {
            "model": model,
            "max_tokens": 4096,
            "messages": [{"role": "user", "content": message}],
        }
        if system_prompt:
            kwargs["system"] = system_prompt

        response = client.messages.create(**kwargs)

        result = {
            "request_id": request_id,
            "status": "completed",
            "response": response.content[0].text,
            "model": model,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "processing_time": (datetime.now() - start_time).total_seconds(),
        }

    except anthropic.RateLimitError:
        result = {
            "request_id": request_id,
            "status": "rate_limited",
            "error": "Rate limit exceeded, will retry",
            "retry": True,
        }

    except Exception as e:
        result = {
            "request_id": request_id,
            "status": "failed",
            "error": str(e),
        }

    # Luu ket qua vao Redis de client poll
    redis_conn.setex(
        f"result:{request_id}",
        3600,  # TTL 1 gio
        json.dumps(result)
    )

    # Goi callback neu co
    if callback_url and result["status"] == "completed":
        import requests
        try:
            requests.post(callback_url, json=result, timeout=5)
        except Exception:
            pass

    return result

API Server nhận request

# Flask API server
from flask import Flask, request, jsonify
import uuid

app = Flask(__name__)

@app.route('/api/claude', methods=['POST'])
def submit_request():
    """Nhan request va day vao queue."""
    data = request.json
    request_id = str(uuid.uuid4())

    # Chon queue dua tren priority
    priority = data.get('priority', 'default')
    queue_map = {
        'high': high_queue,
        'default': default_queue,
        'low': low_queue,
    }
    q = queue_map.get(priority, default_queue)

    # Day vao queue
    job = q.enqueue(
        process_claude_request,
        request_id=request_id,
        message=data['message'],
        model=data.get('model', 'claude-sonnet-4-20250514'),
        system_prompt=data.get('system_prompt', ''),
        callback_url=data.get('callback_url'),
        job_timeout=120,  # Timeout 2 phut
        retry=3,          # Retry 3 lan neu fail
    )

    return jsonify({
        "request_id": request_id,
        "status": "queued",
        "position": len(q),
        "estimated_time": len(q) * 5,  # Uoc tinh 5s/request
    }), 202  # 202 Accepted

@app.route('/api/claude/', methods=['GET'])
def get_result(request_id):
    """Kiem tra ket qua."""
    result = redis_conn.get(f"result:{request_id}")

    if result:
        return jsonify(json.loads(result))
    else:
        return jsonify({
            "request_id": request_id,
            "status": "processing",
        }), 202

Worker Pool Pattern

Worker pool cho phép bạn chạy nhiều worker đồng thời, mỗi worker xử lý một request từ queue. Số lượng worker quyết định throughput của hệ thống.

Cấu hình workers

# Chay workers (moi lenh trong 1 terminal rieng)

# Worker cho queue high priority
rq worker high default low --name worker-1

# Worker thu 2
rq worker high default low --name worker-2

# Worker thu 3 chi xu ly low priority
rq worker low --name worker-3

# Hoac dung supervisor de quan ly workers:
# /etc/supervisor/conf.d/claude-workers.conf
# [program:claude-worker]
# command=rq worker high default low
# numprocs=5
# process_name=%(program_name)s-%(process_num)02d
# autostart=true
# autorestart=true

Tính số lượng workers tối ưu

# Cong thuc:
# workers_needed = (requests_per_minute * avg_processing_time) / 60

# Vi du:
# - 100 requests/phut
# - Trung binh 6 giay/request
# - workers_needed = (100 * 6) / 60 = 10 workers

# Nhung can tinh ca rate limit cua Anthropic:
# - Tier 1: 50 requests/phut
# - Tier 2: 1000 requests/phut
# - Tier 3: 2000 requests/phut

# Neu ban o Tier 1, chi can toi da 5 workers
# (50 req/min * 6s / 60 = 5)

Concurrency Control với Semaphore

Khi dùng asyncio (Python) hoặc Promise (Node.js), bạn cần giới hạn số request đồng thời để không vượt rate limit:

Python asyncio implementation

import asyncio
import anthropic
from datetime import datetime

class AsyncClaudeProcessor:
    """Xu ly nhieu requests dong thoi voi concurrency control."""

    def __init__(self, max_concurrent: int = 10):
        self.client = anthropic.AsyncAnthropic()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []
        self.errors = []

    async def process_single(self, request_id: str, message: str,
                              model: str = "claude-sonnet-4-20250514") -> dict:
        """Xu ly mot request voi semaphore control."""
        async with self.semaphore:
            start = datetime.now()
            try:
                response = await self.client.messages.create(
                    model=model,
                    max_tokens=4096,
                    messages=[{"role": "user", "content": message}]
                )

                result = {
                    "request_id": request_id,
                    "status": "completed",
                    "response": response.content[0].text,
                    "tokens": response.usage.output_tokens,
                    "time": (datetime.now() - start).total_seconds(),
                }
                self.results.append(result)
                return result

            except anthropic.RateLimitError:
                # Doi va retry
                await asyncio.sleep(5)
                return await self.process_single(request_id, message, model)

            except Exception as e:
                error = {
                    "request_id": request_id,
                    "status": "failed",
                    "error": str(e),
                }
                self.errors.append(error)
                return error

    async def process_batch(self, requests: list) -> list:
        """Xu ly mot batch requests dong thoi."""
        tasks = [
            self.process_single(
                request_id=req["id"],
                message=req["message"],
                model=req.get("model", "claude-sonnet-4-20250514")
            )
            for req in requests
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    def get_summary(self) -> dict:
        """Tong hop ket qua."""
        if not self.results:
            return {"total": 0}

        times = [r["time"] for r in self.results]
        return {
            "total": len(self.results),
            "errors": len(self.errors),
            "avg_time": round(sum(times) / len(times), 2),
            "min_time": round(min(times), 2),
            "max_time": round(max(times), 2),
            "total_tokens": sum(r["tokens"] for r in self.results),
        }


# Su dung
async def main():
    processor = AsyncClaudeProcessor(max_concurrent=5)

    requests = [
        {"id": f"req-{i}", "message": f"Giai thich khai niem {topic}"}
        for i, topic in enumerate([
            "machine learning", "neural networks", "gradient descent",
            "backpropagation", "attention mechanism", "transformer",
            "fine-tuning", "prompt engineering", "RAG", "embeddings"
        ])
    ]

    print(f"Processing {len(requests)} requests (max 5 concurrent)...")
    results = await processor.process_batch(requests)
    print(processor.get_summary())

asyncio.run(main())

Node.js implementation

const Anthropic = require('@anthropic-ai/sdk');

class AsyncProcessor {
  constructor(maxConcurrent = 10) {
    this.client = new Anthropic();
    this.maxConcurrent = maxConcurrent;
    this.running = 0;
    this.queue = [];
    this.results = [];
  }

  async processWithLimit(requests) {
    const promises = requests.map(req =>
      this._enqueue(req)
    );
    return Promise.all(promises);
  }

  _enqueue(request) {
    return new Promise((resolve, reject) => {
      const task = async () => {
        try {
          const result = await this._processOne(request);
          resolve(result);
        } catch (err) {
          reject(err);
        } finally {
          this.running--;
          this._dequeue();
        }
      };

      if (this.running < this.maxConcurrent) {
        this.running++;
        task();
      } else {
        this.queue.push(task);
      }
    });
  }

  _dequeue() {
    if (this.queue.length > 0 && this.running < this.maxConcurrent) {
      this.running++;
      const task = this.queue.shift();
      task();
    }
  }

  async _processOne(request) {
    const start = Date.now();

    const response = await this.client.messages.create({
      model: request.model || 'claude-sonnet-4-20250514',
      max_tokens: 4096,
      messages: [{ role: 'user', content: request.message }],
    });

    const result = {
      requestId: request.id,
      response: response.content[0].text,
      tokens: response.usage.output_tokens,
      timeMs: Date.now() - start,
    };

    this.results.push(result);
    return result;
  }
}

// Su dung
async function main() {
  const processor = new AsyncProcessor(5);

  const requests = Array.from({ length: 20 }, (_, i) => ({
    id: `req-${i}`,
    message: `Giai thich ngan gon ve concept so ${i + 1} trong AI`,
  }));

  console.log(`Processing ${requests.length} requests...`);
  const results = await processor.processWithLimit(requests);
  console.log(`Done. Total results: ${results.length}`);
}

main();

Priority Queuing

Không phải mọi request đều quan trọng như nhau. Priority queuing đảm bảo request quan trọng được xử lý trước:

import heapq
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PriorityRequest:
    priority: int  # So nho = uu tien cao
    timestamp: float = field(compare=False)
    request: dict = field(compare=False)
    future: asyncio.Future = field(compare=False, default=None)

class PriorityQueue:
    """Queue voi muc uu tien."""

    PRIORITIES = {
        "critical": 0,   # System alerts, error handling
        "high": 1,        # Paying users, real-time chat
        "normal": 2,      # Standard requests
        "low": 3,         # Background tasks, batch processing
        "bulk": 4,        # Data processing, analytics
    }

    def __init__(self):
        self.heap = []
        self.counter = 0

    def enqueue(self, request: dict, priority: str = "normal") -> asyncio.Future:
        """Them request vao queue voi priority."""
        import time
        future = asyncio.get_event_loop().create_future()

        item = PriorityRequest(
            priority=self.PRIORITIES.get(priority, 2),
            timestamp=time.time(),
            request=request,
            future=future,
        )

        heapq.heappush(self.heap, item)
        self.counter += 1
        return future

    def dequeue(self):
        """Lay request uu tien cao nhat."""
        if self.heap:
            return heapq.heappop(self.heap)
        return None

    def size(self) -> int:
        return len(self.heap)

    def size_by_priority(self) -> dict:
        """Dem so request theo tung muc uu tien."""
        counts = {}
        for item in self.heap:
            p = item.priority
            counts[p] = counts.get(p, 0) + 1
        return counts

Dead Letter Queue

Khi một request thất bại nhiều lần, nó cần được chuyển sang Dead Letter Queue (DLQ) để xử lý riêng, tránh block queue chính:

class DeadLetterQueue:
    """Queue cho cac request that bai nhieu lan."""

    def __init__(self, redis_conn, max_retries: int = 3):
        self.redis = redis_conn
        self.max_retries = max_retries
        self.dlq_key = "dlq:claude"

    def should_retry(self, request_id: str) -> bool:
        """Kiem tra xem request con duoc retry khong."""
        retry_count = self.redis.get(f"retry:{request_id}")
        if retry_count is None:
            return True
        return int(retry_count) < self.max_retries

    def increment_retry(self, request_id: str):
        """Tang dem retry."""
        key = f"retry:{request_id}"
        self.redis.incr(key)
        self.redis.expire(key, 3600)  # TTL 1 gio

    def send_to_dlq(self, request_id: str, request_data: dict,
                     error: str):
        """Chuyen request that bai vao DLQ."""
        dlq_entry = json.dumps({
            "request_id": request_id,
            "data": request_data,
            "error": error,
            "failed_at": datetime.now().isoformat(),
            "retry_count": int(self.redis.get(f"retry:{request_id}") or 0),
        })
        self.redis.lpush(self.dlq_key, dlq_entry)

    def get_dlq_size(self) -> int:
        """Dem so request trong DLQ."""
        return self.redis.llen(self.dlq_key)

    def process_dlq(self, handler_fn):
        """Xu ly thu cong cac request trong DLQ."""
        while True:
            entry = self.redis.rpop(self.dlq_key)
            if not entry:
                break
            data = json.loads(entry)
            handler_fn(data)


# Tich hop vao worker
dlq = DeadLetterQueue(redis_conn, max_retries=3)

def process_with_dlq(request_id: str, message: str, **kwargs):
    """Xu ly request voi DLQ support."""
    try:
        result = process_claude_request(request_id, message, **kwargs)

        if result.get("retry"):
            if dlq.should_retry(request_id):
                dlq.increment_retry(request_id)
                # Re-enqueue
                default_queue.enqueue(
                    process_with_dlq,
                    request_id=request_id,
                    message=message,
                    **kwargs
                )
            else:
                dlq.send_to_dlq(request_id, {"message": message},
                               result.get("error", "Max retries exceeded"))

        return result

    except Exception as e:
        if dlq.should_retry(request_id):
            dlq.increment_retry(request_id)
            raise  # RQ se tu retry
        else:
            dlq.send_to_dlq(request_id, {"message": message}, str(e))

Monitoring Queue Depth

Giám sát queue là yếu tố sống còn để hệ thống hoạt động ổn định:

import time

class QueueMonitor:
    """Giam sat suc khoe cua queue system."""

    def __init__(self, redis_conn, queues: list):
        self.redis = redis_conn
        self.queues = queues
        self.alerts = []

    def get_metrics(self) -> dict:
        """Lay metrics hien tai."""
        metrics = {}
        for q in self.queues:
            metrics[q.name] = {
                "pending": len(q),
                "workers": len(q.workers),
                "failed": q.failed_job_registry.count,
            }
        return metrics

    def check_health(self) -> list:
        """Kiem tra suc khoe va tao canh bao."""
        alerts = []
        metrics = self.get_metrics()

        for name, m in metrics.items():
            # Canh bao khi queue qua dai
            if m["pending"] > 100:
                alerts.append({
                    "level": "critical",
                    "queue": name,
                    "message": f"Queue {name} co {m['pending']} jobs pending",
                    "action": "Tang so workers hoac giam traffic"
                })
            elif m["pending"] > 50:
                alerts.append({
                    "level": "warning",
                    "queue": name,
                    "message": f"Queue {name} dang tang: {m['pending']} jobs",
                })

            # Canh bao khi khong co worker
            if m["workers"] == 0:
                alerts.append({
                    "level": "critical",
                    "queue": name,
                    "message": f"Queue {name} khong co worker nao!",
                    "action": "Khoi dong lai workers ngay"
                })

            # Canh bao khi failed jobs nhieu
            if m["failed"] > 20:
                alerts.append({
                    "level": "warning",
                    "queue": name,
                    "message": f"{m['failed']} failed jobs trong queue {name}",
                    "action": "Kiem tra DLQ va error logs"
                })

        self.alerts = alerts
        return alerts

    def print_dashboard(self):
        """In dashboard don gian."""
        metrics = self.get_metrics()
        print("=" * 50)
        print("QUEUE DASHBOARD")
        print("=" * 50)
        for name, m in metrics.items():
            status = "OK" if m["pending"] < 50 else "WARN" if m["pending"] < 100 else "CRIT"
            print(f"  [{status}] {name}: "
                  f"{m['pending']} pending | "
                  f"{m['workers']} workers | "
                  f"{m['failed']} failed")
        print("=" * 50)

SQS Alternative cho AWS

Nếu ứng dụng chạy trên AWS, bạn có thể dùng SQS thay cho Redis:

import boto3
import json

class SQSClaudeQueue:
    """Queue dung AWS SQS."""

    def __init__(self, queue_url: str):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url

    def send_message(self, request_id: str, message: str,
                      priority: str = "normal"):
        """Gui message vao SQS queue."""
        self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps({
                "request_id": request_id,
                "message": message,
                "priority": priority,
            }),
            MessageGroupId=priority,  # FIFO queue
            MessageDeduplicationId=request_id,
        )

    def receive_and_process(self, process_fn):
        """Nhan va xu ly messages tu queue."""
        while True:
            response = self.sqs.receive_message(
                QueueUrl=self.queue_url,
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20,  # Long polling
            )

            messages = response.get('Messages', [])
            for msg in messages:
                body = json.loads(msg['Body'])

                try:
                    result = process_fn(body)

                    # Xoa message sau khi xu ly thanh cong
                    self.sqs.delete_message(
                        QueueUrl=self.queue_url,
                        ReceiptHandle=msg['ReceiptHandle']
                    )
                except Exception as e:
                    print(f"Error processing {body['request_id']}: {e}")
                    # Message se tu dong quay lai queue sau visibility timeout

Rate Limit Handling

Anthropic API có rate limits nghiêm ngặt. Hệ thống async cần xử lý rate limit một cách thông minh để không mất request:

import time
import random

class RateLimitHandler:
    """Xu ly rate limit voi exponential backoff."""

    def __init__(self, client, max_retries: int = 5):
        self.client = client
        self.max_retries = max_retries
        self.request_timestamps = []  # Theo doi request rate

    def _wait_if_needed(self):
        """Kiem tra va cho neu gan dat rate limit."""
        now = time.time()
        # Xoa cac timestamps cu hon 60 giay
        self.request_timestamps = [
            t for t in self.request_timestamps if now - t < 60
        ]
        # Neu gan dat limit (80% cua 50 req/min)
        if len(self.request_timestamps) >= 40:
            sleep_time = 60 - (now - self.request_timestamps[0])
            if sleep_time > 0:
                print(f"Approaching rate limit, waiting {sleep_time:.1f}s")
                time.sleep(sleep_time)

    def call_with_retry(self, model: str, messages: list,
                         max_tokens: int = 4096, **kwargs) -> dict:
        """Goi API voi retry logic."""
        self._wait_if_needed()

        for attempt in range(self.max_retries):
            try:
                self.request_timestamps.append(time.time())
                response = self.client.messages.create(
                    model=model,
                    max_tokens=max_tokens,
                    messages=messages,
                    **kwargs
                )
                return {
                    "success": True,
                    "response": response,
                    "attempts": attempt + 1,
                }

            except Exception as e:
                error_type = type(e).__name__
                if "RateLimit" in error_type:
                    # Exponential backoff voi jitter
                    base_delay = 2 ** attempt
                    jitter = random.uniform(0, base_delay * 0.5)
                    delay = base_delay + jitter
                    print(f"Rate limited (attempt {attempt + 1}), "
                          f"waiting {delay:.1f}s...")
                    time.sleep(delay)
                elif "Overloaded" in error_type:
                    # API overloaded, cho lau hon
                    delay = 5 * (attempt + 1)
                    print(f"API overloaded, waiting {delay}s...")
                    time.sleep(delay)
                else:
                    raise  # Loi khac, khong retry

        return {"success": False, "error": "Max retries exceeded"}

Webhook Callbacks

Thay vì client poll kết quả, bạn có thể dùng webhook để thông báo khi xử lý xong. Phương pháp này hiệu quả hơn và giảm tải cho server:

# Webhook handler phia server
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = "your-webhook-secret"

def send_webhook(callback_url: str, payload: dict):
    """Gui ket qua qua webhook voi signature xac thuc."""
    import requests as req

    body = json.dumps(payload)

    # Tao signature de client xac thuc
    signature = hmac.new(
        WEBHOOK_SECRET.encode(),
        body.encode(),
        hashlib.sha256
    ).hexdigest()

    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": signature,
    }

    try:
        response = req.post(
            callback_url,
            data=body,
            headers=headers,
            timeout=10
        )

        if response.status_code != 200:
            # Retry sau 30 giay
            print(f"Webhook failed ({response.status_code}), will retry")
            return False

        return True

    except req.exceptions.RequestException as e:
        print(f"Webhook error: {e}")
        return False


# Phia client nhan webhook
@app.route('/webhook/claude-result', methods=['POST'])
def receive_webhook():
    """Nhan ket qua tu Claude processing."""

    # Xac thuc signature
    signature = request.headers.get('X-Webhook-Signature', '')
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        request.data,
        hashlib.sha256
    ).hexdigest()

    if not hmac.compare_digest(signature, expected):
        return jsonify({"error": "Invalid signature"}), 401

    data = request.json
    request_id = data.get("request_id")
    result = data.get("response")

    # Xu ly ket qua (luu database, thong bao user, v.v.)
    print(f"Received result for {request_id}: {len(result)} chars")

    return jsonify({"status": "received"}), 200

Batch Processing với Claude API

Khi cần xử lý hàng trăm hoặc hàng ngàn items (ví dụ: dịch 500 bài viết, phân tích 1000 reviews), batch processing là cách tiếp cận tối ưu:

import asyncio
from typing import List, Callable

class BatchProcessor:
    """Xu ly batch lon voi progress tracking."""

    def __init__(self, max_concurrent: int = 5, batch_size: int = 50):
        self.max_concurrent = max_concurrent
        self.batch_size = batch_size
        self.completed = 0
        self.total = 0
        self.failed = []

    async def process_batch(self, items: list,
                             process_fn: Callable,
                             on_progress: Callable = None) -> list:
        """Xu ly danh sach items theo batches."""
        self.total = len(items)
        self.completed = 0
        all_results = []

        # Chia thanh batches
        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        for batch_idx, batch in enumerate(batches):
            print(f"Batch {batch_idx + 1}/{len(batches)} "
                  f"({len(batch)} items)")

            # Xu ly tung batch voi concurrency control
            semaphore = asyncio.Semaphore(self.max_concurrent)

            async def process_with_sem(item):
                async with semaphore:
                    try:
                        result = await process_fn(item)
                        self.completed += 1
                        if on_progress:
                            on_progress(self.completed, self.total)
                        return result
                    except Exception as e:
                        self.failed.append({"item": item, "error": str(e)})
                        self.completed += 1
                        return None

            tasks = [process_with_sem(item) for item in batch]
            results = await asyncio.gather(*tasks)
            all_results.extend([r for r in results if r is not None])

            # Nghi giua cac batches de tranh rate limit
            if batch_idx < len(batches) - 1:
                print("  Pausing 5s between batches...")
                await asyncio.sleep(5)

        print(f"
Completed: {self.completed}/{self.total}")
        print(f"Failed: {len(self.failed)}")

        return all_results

Best Practices

1. Luôn set timeout

Mọi request đến Claude API cần có timeout. Không để request chạy vô hạn — set 60-120 giây là hợp lý cho đa số trường hợp.

2. Implement backpressure

Khi queue đầy, từ chối request mới thay vì chấp nhận vô hạn. Trả về HTTP 429 hoặc 503 để client biết cần retry sau.

3. Idempotency

Đảm bảo xử lý cùng request nhiều lần cho cùng kết quả. Dùng request_id để deduplicate và tránh xử lý trùng.

4. Graceful shutdown

Khi stop worker, đợi request hiện tại hoàn thành trước khi tắt. Không kill giữa chừng — sẽ mất kết quả và token đã tiêu thụ.

5. Log mọi thứ

Log request_id, model, tokens, thời gian xử lý, và trạng thái cho mọi request. Đây là dữ liệu quý giá để debug và tối ưu.

Tóm tắt

Async patterns là bắt buộc khi xây dựng ứng dụng production với Claude API. Các điểm chính:

  • Dùng queue (Redis/SQS) để tách việc nhận và xử lý request
  • Worker pool với số lượng workers tính theo throughput và rate limit
  • Semaphore để giới hạn concurrency, tránh vượt rate limit
  • Priority queue để ưu tiên request quan trọng
  • Dead Letter Queue cho các request thất bại nhiều lần
  • Monitoring queue depth để phát hiện vấn đề sớm

Tìm hiểu thêm về các patterns API nâng cao tại Thư viện Nâng cao.

Tính năng liên quan:Async ProcessingQueue ArchitectureWorker PoolConcurrency Control

Bai viet co huu ich khong?

Bản quyền thuộc về tác giả. Vui lòng dẫn nguồn khi chia sẻ.

Bình luận (0)
Ảnh đại diện
Đăng nhập để bình luận...
Đăng nhập để bình luận
  • Đang tải bình luận...

Đăng ký nhận bản tin

Nhận bài viết hay nhất về sản phẩm và vận hành, gửi thẳng vào hộp thư của bạn.

Bảo mật thông tin. Hủy đăng ký bất cứ lúc nào. Chính sách bảo mật.