{"product_id":"async-patterns-cho-claude-api-queue-worker-va-concurrent-processing","title":"Async Patterns cho Claude API — Queue, Worker va Concurrent Processing","description":"\n\u003cp\u003eClaude API có latency từ 2 đến 30 giây tùy độ phức tạp của request. Nếu ứng dụng của bạn xử lý request đồng bộ (synchronous), người dùng sẽ phải chờ và server bị block. Async patterns giúp bạn xử lý hàng ngàn requests đồng thời mà không làm chậm hệ thống.\u003c\/p\u003e\n\n\u003ch2\u003eTại sao Async quan trọng với Claude API?\u003c\/h2\u003e\n\u003cp\u003eKhác với các API thông thường trả lời trong 50-200ms, Claude API cần thời gian suy nghĩ. Một request đơn giản mất 2-3 giây, request phức tạp có thể mất 15-30 giây. Trong thời gian đó:\u003c\/p\u003e\n\u003cul\u003e\n  \u003cli\u003e\n\u003cstrong\u003eServer thread bị block:\u003c\/strong\u003e Không thể xử lý request khác\u003c\/li\u003e\n  \u003cli\u003e\n\u003cstrong\u003eNgười dùng chờ:\u003c\/strong\u003e UX tệ, bounce rate cao\u003c\/li\u003e\n  \u003cli\u003e\n\u003cstrong\u003eTimeout:\u003c\/strong\u003e Reverse proxy (Nginx) thường timeout sau 30-60 giây\u003c\/li\u003e\n  \u003cli\u003e\n\u003cstrong\u003eRate limit:\u003c\/strong\u003e Anthropic giới hạn số requests đồng thời\u003c\/li\u003e\n\u003c\/ul\u003e\n\n\u003ch3\u003eMô hình đồng bộ vs bất đồng bộ\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003e# Mo hinh dong bo (BAD):\n# User -\u0026gt; API Server -\u0026gt; Claude API (cho 10s) -\u0026gt; Response -\u0026gt; User\n# Server bi block 10 giay, chi xu ly 1 request\/thread\n\n# Mo hinh bat dong bo (GOOD):\n# User -\u0026gt; API Server -\u0026gt; Queue -\u0026gt; Response \"Da nhan, dang xu ly\"\n#                         |\n#                    Worker Pool -\u0026gt; Claude API -\u0026gt; Callback\/Webhook\n#                                                    |\n#                                              User nhan ket qua\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eQueue-based 
Architecture\u003c\/h2\u003e\n\u003cp\u003eQueue là thành phần trung tâm của async pattern. Nó tách biệt việc nhận request (producer) và xử lý request (consumer), cho phép scale độc lập.\u003c\/p\u003e\n\n\u003ch3\u003eKiến trúc với Redis Queue\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003e# Cai dat dependencies\n# pip install redis rq anthropic flask requests\n\nimport redis\nfrom rq import Queue, Retry\nimport anthropic\nimport json\nfrom datetime import datetime\nfrom typing import Optional\n\n# Ket noi Redis\nredis_conn = redis.Redis(host='localhost', port=6379, db=0)\n\n# Tao queue voi cac muc uu tien khac nhau\nhigh_queue = Queue('high', connection=redis_conn)\ndefault_queue = Queue('default', connection=redis_conn)\nlow_queue = Queue('low', connection=redis_conn)\n\n\ndef process_claude_request(request_id: str, message: str,\n                           model: str = \"claude-sonnet-4-20250514\",\n                           system_prompt: str = \"\",\n                           callback_url: Optional[str] = None) -\u0026gt; dict:\n    \"\"\"Ham xu ly request — chay trong worker.\"\"\"\n    client = anthropic.Anthropic()\n\n    start_time = datetime.now()\n\n    try:\n        kwargs = {\n            \"model\": model,\n            \"max_tokens\": 4096,\n            \"messages\": [{\"role\": \"user\", \"content\": message}],\n        }\n        if system_prompt:\n            kwargs[\"system\"] = system_prompt\n\n        response = client.messages.create(**kwargs)\n\n        result = {\n            \"request_id\": request_id,\n            \"status\": \"completed\",\n            \"response\": response.content[0].text,\n            \"model\": model,\n            \"input_tokens\": response.usage.input_tokens,\n            \"output_tokens\": response.usage.output_tokens,\n            \"processing_time\": (datetime.now() - start_time).total_seconds(),\n        }\n\n    except anthropic.RateLimitError:\n        result = {\n            \"request_id\": request_id,\n            \"status\": \"rate_limited\",\n            \"error\": \"Rate limit exceeded, will retry\",\n            \"retry\": True,\n        }\n\n    except Exception as e:\n        result = {\n            \"request_id\": request_id,\n            \"status\": \"failed\",\n            \"error\": str(e),\n        }\n\n    # Luu ket qua vao Redis de client poll\n    redis_conn.setex(\n        f\"result:{request_id}\",\n        3600,  # TTL 1 gio\n        json.dumps(result)\n    )\n\n    # Goi callback neu co\n    if callback_url and result[\"status\"] == \"completed\":\n        import requests\n        try:\n            requests.post(callback_url, json=result, timeout=5)\n        except Exception:\n            # Callback loi khong lam hong job; client van poll duoc ket qua\n            pass\n\n    return result\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch3\u003eAPI Server nhận request\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003e# Flask API server\nfrom flask import Flask, request, jsonify\nimport uuid\n\napp = Flask(__name__)\n\n@app.route('\/api\/claude', methods=['POST'])\ndef submit_request():\n    \"\"\"Nhan request va day vao queue.\"\"\"\n    data = request.json\n    request_id = str(uuid.uuid4())\n\n    # Chon queue dua tren priority\n    priority = data.get('priority', 'default')\n    queue_map = {\n        'high': high_queue,\n        'default': default_queue,\n        'low': low_queue,\n    }\n    q = queue_map.get(priority, default_queue)\n\n    # Day vao queue\n    job = q.enqueue(\n        process_claude_request,\n        request_id=request_id,\n        message=data['message'],\n        model=data.get('model', 'claude-sonnet-4-20250514'),\n        system_prompt=data.get('system_prompt', ''),\n        callback_url=data.get('callback_url'),\n        job_timeout=120,     # Timeout 2 phut\n        retry=Retry(max=3),  # RQ can doi tuong Retry (khong phai so nguyen); retry toi da 3 lan neu job raise exception\n    )\n\n    return jsonify({\n        \"request_id\": request_id,\n        \"status\": \"queued\",\n        \"position\": len(q),\n        \"estimated_time\": len(q) * 5,  # Uoc tinh 5s\/request\n    }), 202  # 202 
Accepted\n\n@app.route('\/api\/claude\/\u0026lt;request_id\u0026gt;', methods=['GET'])\ndef get_result(request_id):\n    \"\"\"Kiem tra ket qua.\"\"\"\n    result = redis_conn.get(f\"result:{request_id}\")\n\n    if result:\n        return jsonify(json.loads(result))\n    else:\n        return jsonify({\n            \"request_id\": request_id,\n            \"status\": \"processing\",\n        }), 202\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eWorker Pool Pattern\u003c\/h2\u003e\n\u003cp\u003eWorker pool cho phép bạn chạy nhiều worker đồng thời, mỗi worker xử lý một request từ queue. Số lượng worker quyết định throughput của hệ thống.\u003c\/p\u003e\n\n\u003ch3\u003eCấu hình workers\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003e# Chay workers (moi lenh trong 1 terminal rieng)\n\n# Worker 1: xu ly ca 3 queue theo thu tu uu tien high, default, low\nrq worker high default low --name worker-1\n\n# Worker thu 2\nrq worker high default low --name worker-2\n\n# Worker thu 3 chi xu ly low priority\nrq worker low --name worker-3\n\n# Hoac dung supervisor de quan ly workers:\n# \/etc\/supervisor\/conf.d\/claude-workers.conf\n# [program:claude-worker]\n# command=rq worker high default low\n# numprocs=5\n# process_name=%(program_name)s-%(process_num)02d\n# autostart=true\n# autorestart=true\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch3\u003eTính số lượng workers tối ưu\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003e# Cong thuc:\n# workers_needed = (requests_per_minute * avg_processing_time) \/ 60\n\n# Vi du:\n# - 100 requests\/phut\n# - Trung binh 6 giay\/request\n# - workers_needed = (100 * 6) \/ 60 = 10 workers\n\n# Nhung can tinh ca rate limit cua Anthropic\n# (so lieu vi du; gioi han thuc te tuy model va tier, kiem tra trong Anthropic Console):\n# - Tier 1: 50 requests\/phut\n# - Tier 2: 1000 requests\/phut\n# - Tier 3: 2000 requests\/phut\n\n# Neu ban o Tier 1, chi can toi da 5 workers\n# (50 req\/min * 6s \/ 60 = 5)\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eConcurrency Control với Semaphore\u003c\/h2\u003e\n\u003cp\u003eKhi dùng asyncio (Python) hoặc 
Promise (Node.js), bạn cần giới hạn số request đồng thời để không vượt rate limit:\u003c\/p\u003e\n\n\u003ch3\u003ePython asyncio implementation\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003eimport asyncio\nimport anthropic\nfrom datetime import datetime\n\nclass AsyncClaudeProcessor:\n    \"\"\"Xu ly nhieu requests dong thoi voi concurrency control.\"\"\"\n\n    def __init__(self, max_concurrent: int = 10):\n        self.client = anthropic.AsyncAnthropic()\n        self.semaphore = asyncio.Semaphore(max_concurrent)\n        self.results = []\n        self.errors = []\n\n    async def process_single(self, request_id: str, message: str,\n                              model: str = \"claude-sonnet-4-20250514\",\n                              max_retries: int = 3) -\u0026gt; dict:\n        \"\"\"Xu ly mot request, retry co gioi han khi bi rate limit.\"\"\"\n        for attempt in range(max_retries + 1):\n            result = await self._attempt(request_id, message, model)\n            if result is not None:\n                return result\n            # Bi rate limit: doi BEN NGOAI semaphore de khong giu slot\n            # (retry de quy ben trong semaphore co the gay deadlock)\n            if attempt \u0026lt; max_retries:\n                await asyncio.sleep(5 * (attempt + 1))\n\n        error = {\n            \"request_id\": request_id,\n            \"status\": \"failed\",\n            \"error\": \"Rate limit: max retries exceeded\",\n        }\n        self.errors.append(error)\n        return error\n\n    async def _attempt(self, request_id: str, message: str, model: str):\n        \"\"\"Goi API mot lan voi semaphore control; tra ve None neu bi rate limit.\"\"\"\n        async with self.semaphore:\n            start = datetime.now()\n            try:\n                response = await self.client.messages.create(\n                    model=model,\n                    max_tokens=4096,\n                    messages=[{\"role\": \"user\", \"content\": message}]\n                )\n\n                result = {\n                    \"request_id\": request_id,\n                    \"status\": \"completed\",\n                    \"response\": response.content[0].text,\n                    \"tokens\": response.usage.output_tokens,\n                    \"time\": (datetime.now() - start).total_seconds(),\n                }\n                self.results.append(result)\n                return result\n\n            except anthropic.RateLimitError:\n                # Nha semaphore truoc, process_single se doi va retry\n                return None\n\n            except Exception as e:\n                error = {\n                    \"request_id\": request_id,\n                    \"status\": \"failed\",\n                    \"error\": str(e),\n                }\n                self.errors.append(error)\n   
             return error\n\n    async def process_batch(self, requests: list) -\u0026gt; list:\n        \"\"\"Xu ly mot batch requests dong thoi.\"\"\"\n        tasks = [\n            self.process_single(\n                request_id=req[\"id\"],\n                message=req[\"message\"],\n                model=req.get(\"model\", \"claude-sonnet-4-20250514\")\n            )\n            for req in requests\n        ]\n\n        results = await asyncio.gather(*tasks, return_exceptions=True)\n        return results\n\n    def get_summary(self) -\u0026gt; dict:\n        \"\"\"Tong hop ket qua.\"\"\"\n        if not self.results:\n            return {\"total\": 0}\n\n        times = [r[\"time\"] for r in self.results]\n        return {\n            \"total\": len(self.results),\n            \"errors\": len(self.errors),\n            \"avg_time\": round(sum(times) \/ len(times), 2),\n            \"min_time\": round(min(times), 2),\n            \"max_time\": round(max(times), 2),\n            \"total_tokens\": sum(r[\"tokens\"] for r in self.results),\n        }\n\n\n# Su dung\nasync def main():\n    processor = AsyncClaudeProcessor(max_concurrent=5)\n\n    requests = [\n        {\"id\": f\"req-{i}\", \"message\": f\"Giai thich khai niem {topic}\"}\n        for i, topic in enumerate([\n            \"machine learning\", \"neural networks\", \"gradient descent\",\n            \"backpropagation\", \"attention mechanism\", \"transformer\",\n            \"fine-tuning\", \"prompt engineering\", \"RAG\", \"embeddings\"\n        ])\n    ]\n\n    print(f\"Processing {len(requests)} requests (max 5 concurrent)...\")\n    results = await processor.process_batch(requests)\n    print(processor.get_summary())\n\nasyncio.run(main())\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch3\u003eNode.js implementation\u003c\/h3\u003e\n\u003cpre\u003e\u003ccode\u003econst Anthropic = require('@anthropic-ai\/sdk');\n\nclass AsyncProcessor {\n  constructor(maxConcurrent = 10) {\n    this.client = new 
Anthropic();\n    this.maxConcurrent = maxConcurrent;\n    this.running = 0;\n    this.queue = [];\n    this.results = [];\n  }\n\n  async processWithLimit(requests) {\n    const promises = requests.map(req =\u0026gt;\n      this._enqueue(req)\n    );\n    return Promise.all(promises);\n  }\n\n  _enqueue(request) {\n    return new Promise((resolve, reject) =\u0026gt; {\n      const task = async () =\u0026gt; {\n        try {\n          const result = await this._processOne(request);\n          resolve(result);\n        } catch (err) {\n          reject(err);\n        } finally {\n          this.running--;\n          this._dequeue();\n        }\n      };\n\n      if (this.running \u0026lt; this.maxConcurrent) {\n        this.running++;\n        task();\n      } else {\n        this.queue.push(task);\n      }\n    });\n  }\n\n  _dequeue() {\n    if (this.queue.length \u0026gt; 0 \u0026amp;\u0026amp; this.running \u0026lt; this.maxConcurrent) {\n      this.running++;\n      const task = this.queue.shift();\n      task();\n    }\n  }\n\n  async _processOne(request) {\n    const start = Date.now();\n\n    const response = await this.client.messages.create({\n      model: request.model || 'claude-sonnet-4-20250514',\n      max_tokens: 4096,\n      messages: [{ role: 'user', content: request.message }],\n    });\n\n    const result = {\n      requestId: request.id,\n      response: response.content[0].text,\n      tokens: response.usage.output_tokens,\n      timeMs: Date.now() - start,\n    };\n\n    this.results.push(result);\n    return result;\n  }\n}\n\n\/\/ Su dung\nasync function main() {\n  const processor = new AsyncProcessor(5);\n\n  const requests = Array.from({ length: 20 }, (_, i) =\u0026gt; ({\n    id: `req-${i}`,\n    message: `Giai thich ngan gon ve concept so ${i + 1} trong AI`,\n  }));\n\n  console.log(`Processing ${requests.length} requests...`);\n  const results = await processor.processWithLimit(requests);\n  console.log(`Done. 
Total results: ${results.length}`);\n}\n\nmain();\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003ePriority Queuing\u003c\/h2\u003e\n\u003cp\u003eKhông phải mọi request đều quan trọng như nhau. Priority queuing đảm bảo request quan trọng được xử lý trước:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003eimport heapq\nimport asyncio\nimport time\nfrom dataclasses import dataclass, field\nfrom typing import Optional\n\n@dataclass(order=True)\nclass PriorityRequest:\n    priority: int  # So nho = uu tien cao\n    sequence: int  # Thu tu vao queue: giu FIFO khi cung priority\n    timestamp: float = field(compare=False)\n    request: dict = field(compare=False)\n    future: Optional[asyncio.Future] = field(compare=False, default=None)\n\nclass PriorityQueue:\n    \"\"\"Queue voi muc uu tien.\"\"\"\n\n    PRIORITIES = {\n        \"critical\": 0,   # System alerts, error handling\n        \"high\": 1,        # Paying users, real-time chat\n        \"normal\": 2,      # Standard requests\n        \"low\": 3,         # Background tasks, batch processing\n        \"bulk\": 4,        # Data processing, analytics\n    }\n\n    def __init__(self):\n        self.heap = []\n        self.counter = 0\n\n    def enqueue(self, request: dict, priority: str = \"normal\") -\u0026gt; asyncio.Future:\n        \"\"\"Them request vao queue voi priority (goi trong event loop dang chay).\"\"\"\n        future = asyncio.get_running_loop().create_future()\n\n        item = PriorityRequest(\n            priority=self.PRIORITIES.get(priority, 2),\n            sequence=self.counter,\n            timestamp=time.time(),\n            request=request,\n            future=future,\n        )\n\n        heapq.heappush(self.heap, item)\n        self.counter += 1\n        return future\n\n    def dequeue(self):\n        \"\"\"Lay request uu tien cao nhat.\"\"\"\n        if self.heap:\n            return heapq.heappop(self.heap)\n        return None\n\n    def size(self) -\u0026gt; int:\n        return len(self.heap)\n\n    def size_by_priority(self) -\u0026gt; dict:\n        \"\"\"Dem so request theo tung muc uu tien.\"\"\"\n        counts = 
{}\n        for item in self.heap:\n            p = item.priority\n            counts[p] = counts.get(p, 0) + 1\n        return counts\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eDead Letter Queue\u003c\/h2\u003e\n\u003cp\u003eKhi một request thất bại nhiều lần, nó cần được chuyển sang Dead Letter Queue (DLQ) để xử lý riêng, tránh block queue chính:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003eclass DeadLetterQueue:\n    \"\"\"Queue cho cac request that bai nhieu lan.\"\"\"\n\n    def __init__(self, redis_conn, max_retries: int = 3):\n        self.redis = redis_conn\n        self.max_retries = max_retries\n        self.dlq_key = \"dlq:claude\"\n\n    def should_retry(self, request_id: str) -\u0026gt; bool:\n        \"\"\"Kiem tra xem request con duoc retry khong.\"\"\"\n        retry_count = self.redis.get(f\"retry:{request_id}\")\n        if retry_count is None:\n            return True\n        return int(retry_count) \u0026lt; self.max_retries\n\n    def increment_retry(self, request_id: str):\n        \"\"\"Tang dem retry.\"\"\"\n        key = f\"retry:{request_id}\"\n        self.redis.incr(key)\n        self.redis.expire(key, 3600)  # TTL 1 gio\n\n    def send_to_dlq(self, request_id: str, request_data: dict,\n                     error: str):\n        \"\"\"Chuyen request that bai vao DLQ.\"\"\"\n        dlq_entry = json.dumps({\n            \"request_id\": request_id,\n            \"data\": request_data,\n            \"error\": error,\n            \"failed_at\": datetime.now().isoformat(),\n            \"retry_count\": int(self.redis.get(f\"retry:{request_id}\") or 0),\n        })\n        self.redis.lpush(self.dlq_key, dlq_entry)\n\n    def get_dlq_size(self) -\u0026gt; int:\n        \"\"\"Dem so request trong DLQ.\"\"\"\n        return self.redis.llen(self.dlq_key)\n\n    def process_dlq(self, handler_fn):\n        \"\"\"Xu ly thu cong cac request trong DLQ.\"\"\"\n        while True:\n            entry = self.redis.rpop(self.dlq_key)\n        
    if not entry:\n                break\n            data = json.loads(entry)\n            handler_fn(data)\n\n\n# Tich hop vao worker\ndlq = DeadLetterQueue(redis_conn, max_retries=3)\n\ndef process_with_dlq(request_id: str, message: str, **kwargs):\n    \"\"\"Xu ly request voi DLQ support.\"\"\"\n    try:\n        result = process_claude_request(request_id, message, **kwargs)\n\n        if result.get(\"retry\"):\n            if dlq.should_retry(request_id):\n                dlq.increment_retry(request_id)\n                # Re-enqueue\n                default_queue.enqueue(\n                    process_with_dlq,\n                    request_id=request_id,\n                    message=message,\n                    **kwargs\n                )\n            else:\n                dlq.send_to_dlq(request_id, {\"message\": message},\n                               result.get(\"error\", \"Max retries exceeded\"))\n        elif result.get(\"status\") == \"failed\":\n            # process_claude_request da bat exception va tra ve status failed:\n            # loi khong retry duoc, chuyen thang vao DLQ\n            dlq.send_to_dlq(request_id, {\"message\": message},\n                           result.get(\"error\", \"Unknown error\"))\n\n        return result\n\n    except Exception as e:\n        if dlq.should_retry(request_id):\n            dlq.increment_retry(request_id)\n            raise  # RQ chi tu retry neu job duoc enqueue voi retry=Retry(...)\n        else:\n            dlq.send_to_dlq(request_id, {\"message\": message}, str(e))\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eMonitoring Queue Depth\u003c\/h2\u003e\n\u003cp\u003eGiám sát queue là yếu tố sống còn để hệ thống hoạt động ổn định:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003efrom rq import Worker\n\nclass QueueMonitor:\n    \"\"\"Giam sat suc khoe cua queue system.\"\"\"\n\n    def __init__(self, redis_conn, queues: list):\n        self.redis = redis_conn\n        self.queues = queues\n        self.alerts = []\n\n    def get_metrics(self) -\u0026gt; dict:\n        \"\"\"Lay metrics hien tai.\"\"\"\n        metrics = {}\n        for q in self.queues:\n            metrics[q.name] = {\n                \"pending\": len(q),\n                # Queue khong co thuoc tinh workers; dem worker qua Worker.count\n                \"workers\": Worker.count(queue=q),\n                \"failed\": q.failed_job_registry.count,\n            }\n        return metrics\n\n  
  def check_health(self) -\u0026gt; list:\n        \"\"\"Kiem tra suc khoe va tao canh bao.\"\"\"\n        alerts = []\n        metrics = self.get_metrics()\n\n        for name, m in metrics.items():\n            # Canh bao khi queue qua dai\n            if m[\"pending\"] \u0026gt; 100:\n                alerts.append({\n                    \"level\": \"critical\",\n                    \"queue\": name,\n                    \"message\": f\"Queue {name} co {m['pending']} jobs pending\",\n                    \"action\": \"Tang so workers hoac giam traffic\"\n                })\n            elif m[\"pending\"] \u0026gt; 50:\n                alerts.append({\n                    \"level\": \"warning\",\n                    \"queue\": name,\n                    \"message\": f\"Queue {name} dang tang: {m['pending']} jobs\",\n                })\n\n            # Canh bao khi khong co worker\n            if m[\"workers\"] == 0:\n                alerts.append({\n                    \"level\": \"critical\",\n                    \"queue\": name,\n                    \"message\": f\"Queue {name} khong co worker nao!\",\n                    \"action\": \"Khoi dong lai workers ngay\"\n                })\n\n            # Canh bao khi failed jobs nhieu\n            if m[\"failed\"] \u0026gt; 20:\n                alerts.append({\n                    \"level\": \"warning\",\n                    \"queue\": name,\n                    \"message\": f\"{m['failed']} failed jobs trong queue {name}\",\n                    \"action\": \"Kiem tra DLQ va error logs\"\n                })\n\n        self.alerts = alerts\n        return alerts\n\n    def print_dashboard(self):\n        \"\"\"In dashboard don gian.\"\"\"\n        metrics = self.get_metrics()\n        print(\"=\" * 50)\n        print(\"QUEUE DASHBOARD\")\n        print(\"=\" * 50)\n        for name, m in metrics.items():\n            status = \"OK\" if m[\"pending\"] \u0026lt; 50 else \"WARN\" if m[\"pending\"] \u0026lt; 100 else 
\"CRIT\"\n            print(f\"  [{status}] {name}: \"\n                  f\"{m['pending']} pending | \"\n                  f\"{m['workers']} workers | \"\n                  f\"{m['failed']} failed\")\n        print(\"=\" * 50)\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eSQS Alternative cho AWS\u003c\/h2\u003e\n\u003cp\u003eNếu ứng dụng chạy trên AWS, bạn có thể dùng SQS thay cho Redis:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003eimport boto3\nimport json\n\nclass SQSClaudeQueue:\n    \"\"\"Queue dung AWS SQS.\"\"\"\n\n    def __init__(self, queue_url: str):\n        self.sqs = boto3.client('sqs')\n        self.queue_url = queue_url\n\n    def send_message(self, request_id: str, message: str,\n                      priority: str = \"normal\"):\n        \"\"\"Gui message vao SQS queue.\"\"\"\n        self.sqs.send_message(\n            QueueUrl=self.queue_url,\n            MessageBody=json.dumps({\n                \"request_id\": request_id,\n                \"message\": message,\n                \"priority\": priority,\n            }),\n            MessageGroupId=priority,  # FIFO queue\n            MessageDeduplicationId=request_id,\n        )\n\n    def receive_and_process(self, process_fn):\n        \"\"\"Nhan va xu ly messages tu queue.\"\"\"\n        while True:\n            response = self.sqs.receive_message(\n                QueueUrl=self.queue_url,\n                MaxNumberOfMessages=1,\n                WaitTimeSeconds=20,  # Long polling\n            )\n\n            messages = response.get('Messages', [])\n            for msg in messages:\n                body = json.loads(msg['Body'])\n\n                try:\n                    result = process_fn(body)\n\n                    # Xoa message sau khi xu ly thanh cong\n                    self.sqs.delete_message(\n                        QueueUrl=self.queue_url,\n                        ReceiptHandle=msg['ReceiptHandle']\n                    )\n                except Exception as e:\n         
           print(f\"Error processing {body['request_id']}: {e}\")\n                    # Message se tu dong quay lai queue sau visibility timeout\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eRate Limit Handling\u003c\/h2\u003e\n\u003cp\u003eAnthropic API có rate limits nghiêm ngặt. Hệ thống async cần xử lý rate limit một cách thông minh để không mất request:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003eimport time\nimport random\n\nclass RateLimitHandler:\n    \"\"\"Xu ly rate limit voi exponential backoff.\"\"\"\n\n    def __init__(self, client, max_retries: int = 5):\n        self.client = client\n        self.max_retries = max_retries\n        self.request_timestamps = []  # Theo doi request rate\n\n    def _wait_if_needed(self):\n        \"\"\"Kiem tra va cho neu gan dat rate limit.\"\"\"\n        now = time.time()\n        # Xoa cac timestamps cu hon 60 giay\n        self.request_timestamps = [\n            t for t in self.request_timestamps if now - t \u0026lt; 60\n        ]\n        # Neu gan dat limit (80% cua 50 req\/min)\n        if len(self.request_timestamps) \u0026gt;= 40:\n            sleep_time = 60 - (now - self.request_timestamps[0])\n            if sleep_time \u0026gt; 0:\n                print(f\"Approaching rate limit, waiting {sleep_time:.1f}s\")\n                time.sleep(sleep_time)\n\n    def call_with_retry(self, model: str, messages: list,\n                         max_tokens: int = 4096, **kwargs) -\u0026gt; dict:\n        \"\"\"Goi API voi retry logic.\"\"\"\n        self._wait_if_needed()\n\n        for attempt in range(self.max_retries):\n            try:\n                self.request_timestamps.append(time.time())\n                response = self.client.messages.create(\n                    model=model,\n                    max_tokens=max_tokens,\n                    messages=messages,\n                    **kwargs\n                )\n                return {\n                    \"success\": True,\n                    
\"response\": response,\n                    \"attempts\": attempt + 1,\n                }\n\n            except Exception as e:\n                error_type = type(e).__name__\n                if \"RateLimit\" in error_type:\n                    # Exponential backoff voi jitter\n                    base_delay = 2 ** attempt\n                    jitter = random.uniform(0, base_delay * 0.5)\n                    delay = base_delay + jitter\n                    print(f\"Rate limited (attempt {attempt + 1}), \"\n                          f\"waiting {delay:.1f}s...\")\n                    time.sleep(delay)\n                elif \"Overloaded\" in error_type:\n                    # API overloaded, cho lau hon\n                    delay = 5 * (attempt + 1)\n                    print(f\"API overloaded, waiting {delay}s...\")\n                    time.sleep(delay)\n                else:\n                    raise  # Loi khac, khong retry\n\n        return {\"success\": False, \"error\": \"Max retries exceeded\"}\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eWebhook Callbacks\u003c\/h2\u003e\n\u003cp\u003eThay vì client poll kết quả, bạn có thể dùng webhook để thông báo khi xử lý xong. 
Phương pháp này hiệu quả hơn và giảm tải cho server:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003e# Webhook handler phia server\nfrom flask import Flask, request, jsonify\nimport hmac\nimport hashlib\n\napp = Flask(__name__)\nWEBHOOK_SECRET = \"your-webhook-secret\"\n\ndef send_webhook(callback_url: str, payload: dict):\n    \"\"\"Gui ket qua qua webhook voi signature xac thuc.\"\"\"\n    import requests as req\n\n    body = json.dumps(payload)\n\n    # Tao signature de client xac thuc\n    signature = hmac.new(\n        WEBHOOK_SECRET.encode(),\n        body.encode(),\n        hashlib.sha256\n    ).hexdigest()\n\n    headers = {\n        \"Content-Type\": \"application\/json\",\n        \"X-Webhook-Signature\": signature,\n    }\n\n    try:\n        response = req.post(\n            callback_url,\n            data=body,\n            headers=headers,\n            timeout=10\n        )\n\n        if response.status_code != 200:\n            # Retry sau 30 giay\n            print(f\"Webhook failed ({response.status_code}), will retry\")\n            return False\n\n        return True\n\n    except req.exceptions.RequestException as e:\n        print(f\"Webhook error: {e}\")\n        return False\n\n\n# Phia client nhan webhook\n@app.route('\/webhook\/claude-result', methods=['POST'])\ndef receive_webhook():\n    \"\"\"Nhan ket qua tu Claude processing.\"\"\"\n\n    # Xac thuc signature\n    signature = request.headers.get('X-Webhook-Signature', '')\n    expected = hmac.new(\n        WEBHOOK_SECRET.encode(),\n        request.data,\n        hashlib.sha256\n    ).hexdigest()\n\n    if not hmac.compare_digest(signature, expected):\n        return jsonify({\"error\": \"Invalid signature\"}), 401\n\n    data = request.json\n    request_id = data.get(\"request_id\")\n    result = data.get(\"response\")\n\n    # Xu ly ket qua (luu database, thong bao user, v.v.)\n    print(f\"Received result for {request_id}: {len(result)} chars\")\n\n    return jsonify({\"status\": 
\"received\"}), 200\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eBatch Processing với Claude API\u003c\/h2\u003e\n\u003cp\u003eKhi cần xử lý hàng trăm hoặc hàng ngàn items (ví dụ: dịch 500 bài viết, phân tích 1000 reviews), batch processing là cách tiếp cận tối ưu:\u003c\/p\u003e\n\n\u003cpre\u003e\u003ccode\u003eimport asyncio\nfrom typing import List, Callable\n\nclass BatchProcessor:\n    \"\"\"Xu ly batch lon voi progress tracking.\"\"\"\n\n    def __init__(self, max_concurrent: int = 5, batch_size: int = 50):\n        self.max_concurrent = max_concurrent\n        self.batch_size = batch_size\n        self.completed = 0\n        self.total = 0\n        self.failed = []\n\n    async def process_batch(self, items: list,\n                             process_fn: Callable,\n                             on_progress: Callable = None) -\u0026gt; list:\n        \"\"\"Xu ly danh sach items theo batches.\"\"\"\n        self.total = len(items)\n        self.completed = 0\n        all_results = []\n\n        # Chia thanh batches\n        batches = [\n            items[i:i + self.batch_size]\n            for i in range(0, len(items), self.batch_size)\n        ]\n\n        for batch_idx, batch in enumerate(batches):\n            print(f\"Batch {batch_idx + 1}\/{len(batches)} \"\n                  f\"({len(batch)} items)\")\n\n            # Xu ly tung batch voi concurrency control\n            semaphore = asyncio.Semaphore(self.max_concurrent)\n\n            async def process_with_sem(item):\n                async with semaphore:\n                    try:\n                        result = await process_fn(item)\n                        self.completed += 1\n                        if on_progress:\n                            on_progress(self.completed, self.total)\n                        return result\n                    except Exception as e:\n                        self.failed.append({\"item\": item, \"error\": str(e)})\n                        self.completed += 
1\n                        return None\n\n            tasks = [process_with_sem(item) for item in batch]\n            results = await asyncio.gather(*tasks)\n            all_results.extend([r for r in results if r is not None])\n\n            # Nghi giua cac batches de tranh rate limit\n            if batch_idx \u0026lt; len(batches) - 1:\n                print(\"  Pausing 5s between batches...\")\n                await asyncio.sleep(5)\n\n        print(f\"\\nCompleted: {self.completed}\/{self.total}\")\n        print(f\"Failed: {len(self.failed)}\")\n\n        return all_results\u003c\/code\u003e\u003c\/pre\u003e\n\n\u003ch2\u003eBest Practices\u003c\/h2\u003e\n\n\u003ch3\u003e1. Luôn set timeout\u003c\/h3\u003e\n\u003cp\u003eMọi request đến Claude API cần có timeout. Không để request chạy vô hạn: 60-120 giây là hợp lý cho đa số trường hợp.\u003c\/p\u003e\n\n\u003ch3\u003e2. Implement backpressure\u003c\/h3\u003e\n\u003cp\u003eKhi queue đầy, từ chối request mới thay vì chấp nhận vô hạn. Trả về HTTP 429 hoặc 503 để client biết cần retry sau.\u003c\/p\u003e\n\n\u003ch3\u003e3. Idempotency\u003c\/h3\u003e\n\u003cp\u003eĐảm bảo xử lý cùng request nhiều lần cho cùng kết quả. Dùng request_id để deduplicate và tránh xử lý trùng.\u003c\/p\u003e\n\n\u003ch3\u003e4. Graceful shutdown\u003c\/h3\u003e\n\u003cp\u003eKhi stop worker, đợi request hiện tại hoàn thành trước khi tắt. Không kill giữa chừng, vì sẽ mất kết quả và token đã tiêu thụ.\u003c\/p\u003e\n\n\u003ch3\u003e5. Log mọi thứ\u003c\/h3\u003e\n\u003cp\u003eLog request_id, model, tokens, thời gian xử lý, và trạng thái cho mọi request. Đây là dữ liệu quý giá để debug và tối ưu.\u003c\/p\u003e\n\n\u003ch2\u003eTóm tắt\u003c\/h2\u003e\n\u003cp\u003eAsync patterns là bắt buộc khi xây dựng ứng dụng production với Claude API. 
Các điểm chính:\u003c\/p\u003e\n\u003cul\u003e\n  \u003cli\u003eDùng queue (Redis\/SQS) để tách việc nhận và xử lý request\u003c\/li\u003e\n  \u003cli\u003eWorker pool với số lượng workers tính theo throughput và rate limit\u003c\/li\u003e\n  \u003cli\u003eSemaphore để giới hạn concurrency, tránh vượt rate limit\u003c\/li\u003e\n  \u003cli\u003ePriority queue để ưu tiên request quan trọng\u003c\/li\u003e\n  \u003cli\u003eDead Letter Queue cho các request thất bại nhiều lần\u003c\/li\u003e\n  \u003cli\u003eMonitoring queue depth để phát hiện vấn đề sớm\u003c\/li\u003e\n\u003c\/ul\u003e\n\u003cp\u003eTìm hiểu thêm về các patterns API nâng cao tại \u003ca href=\"\/collections\/nang-cao\"\u003eThư viện Nâng cao\u003c\/a\u003e.\u003c\/p\u003e\n","brand":"Minh Tuấn","offers":[{"title":"Default Title","offer_id":47730150670548,"sku":null,"price":0.0,"currency_code":"VND","in_stock":true}],"thumbnail_url":"\/\/cdn.shopify.com\/s\/files\/1\/0821\/0264\/9044\/files\/async-patterns-cho-claude-api-queue-worker-va-concurrent-processing.jpg?v=1774715524","url":"https:\/\/claude.vn\/products\/async-patterns-cho-claude-api-queue-worker-va-concurrent-processing","provider":"CLAUDE.VN","version":"1.0","type":"link"}