Nâng caoKỹ thuậtClaude APINguồn: Anthropic

Orchestrator-Workers — Kiến trúc điều phối agent phức tạp

Nghe bài viết
00:00

Điểm nổi bật

Nhấn để đến mục tương ứng

  1. 1 Thực hành được liền: Pattern này tỏa sáng khi: Task không thể biết trước cần bao nhiêu bước dynamic decomposition Các sub-tasks đòi hỏi. Quy trình từng bước trong phần này giúp bạn bắt đầu ngay mà không cần kinh nghiệm chuyên sâu.
  2. 2 Thành thật mà nói: import anthropic import json import asyncio from typing import Optional from dataclasses import dataclass, field client. Phương pháp này hiệu quả trong hầu hết trường hợp, nhưng bạn cần điều chỉnh cho phù hợp ngữ cảnh riêng.
  3. 3 Nội dung cốt lõi: class BaseWorker: """Base class cho tất cả workers""" def executeself, task: Task -> str: contexttext = "" if. Nắm vững phần này sẽ giúp bạn áp dụng hiệu quả hơn 70% so với đọc lướt toàn bài.
  4. 4 Để đạt hiệu quả tối đa: async def executeparalleltaskstasks: listTask, results: dict -> dict: """Chạy các tasks không có dependencies song. Nhiều người bỏ qua bước này và mất thời gian gấp đôi để đạt cùng kết quả.
  5. 5 Góc nhìn thực tế: def executewithretryworker: BaseWorker, task: Task, maxretries: int = 2 -> str: """Worker execution với retry. Điều quan trọng là hiểu rõ khi nào nên và không nên áp dụng phương pháp này.
a computer screen with a bunch of text on it

Khi task quá phức tạp cho một LLM đơn lẻ, giải pháp là chia để trị. Orchestrator-Workers pattern tổ chức hệ thống như một công ty: một manager (Orchestrator) nhận yêu cầu lớn, phân tích, rồi giao cho các chuyên gia (Workers) từng phần việc phù hợp với chuyên môn của họ.

Đây là pattern được dùng trong các hệ thống AI production phức tạp nhất — từ research assistants đến automated software development pipelines.

Khi nào cần Orchestrator-Workers?

Pattern này tỏa sáng khi:

  • Task không thể biết trước cần bao nhiêu bước (dynamic decomposition)
  • Các sub-tasks đòi hỏi chuyên môn khác nhau (research vs coding vs writing)
  • Context window của một model không đủ chứa toàn bộ thông tin
  • Muốn parallelize các sub-tasks độc lập
  • Cần audit trail chi tiết về quá trình xử lý

Kiến trúc tổng quan

import anthropic
import json
import asyncio
from typing import Optional
from dataclasses import dataclass, field

client = anthropic.Anthropic()
async_client = anthropic.AsyncAnthropic()

@dataclass
class Task:
    id: str
    description: str
    worker_type: str
    depends_on: list = field(default_factory=list)
    context: dict = field(default_factory=dict)
    status: str = "pending"  # pending/running/completed/failed
    result: Optional[str] = None

class OrchestratorWorkersSystem:
    def __init__(self):
        self.workers = {
            "researcher": ResearchWorker(),
            "analyst": AnalystWorker(),
            "writer": WriterWorker(),
            "coder": CoderWorker(),
            "reviewer": ReviewerWorker()
        }

    def run(self, complex_task: str) -> dict:
        print(f"Orchestrating: {complex_task[:80]}...")

        # Phase 1: Orchestrator phân tích và tạo execution plan
        plan = self._create_plan(complex_task)
        print(f"Plan created: {len(plan)} tasks")

        # Phase 2: Execute tasks theo dependency order
        results = self._execute_plan(plan)

        # Phase 3: Orchestrator tổng hợp
        final = self._synthesize(complex_task, plan, results)

        return {
            "task": complex_task,
            "plan": [{"id": t.id, "worker": t.worker_type, "desc": t.description} for t in plan],
            "results": results,
            "final_output": final
        }

    def _create_plan(self, task: str) -> list[Task]:
        available_workers = list(self.workers.keys())

        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=3000,
            system="""You are a project orchestrator. Create execution plans for complex tasks.
Break down tasks into atomic subtasks assignable to specialist workers.
Think carefully about dependencies — which tasks must complete before others can start.""",
            messages=[{
                "role": "user",
                "content": f"""Create an execution plan for this task.

Task: {task}

Available workers: {available_workers}
Worker capabilities:
- researcher: find facts, gather information, search knowledge
- analyst: analyze data, identify patterns, make comparisons
- writer: create content, reports, summaries, documentation
- coder: write code, scripts, technical implementations
- reviewer: quality check, fact-check, improve outputs

Return a JSON array of tasks:
[
  {{
    "id": "T1",
    "description": "specific instruction for this worker",
    "worker_type": "researcher|analyst|writer|coder|reviewer",
    "depends_on": []
  }},
  ...
]

Important: depends_on should list task IDs that must complete first."""
            }]
        )

        try:
            text = response.content[0].text
            start = text.find('[')
            end = text.rfind(']') + 1
            tasks_data = json.loads(text[start:end])

            return [
                Task(
                    id=t["id"],
                    description=t["description"],
                    worker_type=t["worker_type"],
                    depends_on=t.get("depends_on", [])
                )
                for t in tasks_data
            ]
        except Exception as e:
            print(f"Plan parsing failed: {e}")
            # Fallback: single task
            return [Task("T1", task, "writer", [])]

    def _execute_plan(self, tasks: list[Task]) -> dict:
        results = {}
        completed_ids = set()

        # Topological execution
        max_rounds = len(tasks) * 2
        round_num = 0

        while len(completed_ids) < len(tasks) and round_num < max_rounds:
            round_num += 1
            progress_made = False

            for task in tasks:
                if task.id in completed_ids:
                    continue

                # Check dependencies
                if all(dep in completed_ids for dep in task.depends_on):
                    # Inject dependency results as context
                    task.context = {
                        dep_id: results[dep_id]
                        for dep_id in task.depends_on
                        if dep_id in results
                    }

                    print(f"  Running {task.id} ({task.worker_type}): {task.description[:50]}...")
                    worker = self.workers.get(task.worker_type, self.workers["writer"])
                    task.result = worker.execute(task)
                    results[task.id] = task.result
                    completed_ids.add(task.id)
                    task.status = "completed"
                    progress_made = True

            if not progress_made:
                print("Warning: Dependency deadlock detected, breaking remaining tasks")
                break

        return results

    def _synthesize(self, original_task: str, plan: list[Task], results: dict) -> str:
        results_text = "

".join([
            f"=== {t.id} ({t.worker_type}): {t.description[:60]} ===
{results.get(t.id, 'No result')}"
            for t in plan
        ])

        response = client.messages.create(
            model="claude-opus-4-5",
            max_tokens=4000,
            system="You are an expert synthesizer. Combine worker outputs into a coherent, high-quality final deliverable.",
            messages=[{
                "role": "user",
                "content": f"""Original task: {original_task}

Worker outputs:
{results_text}

Synthesize all worker outputs into a comprehensive final response.
The final output should be self-contained and directly address the original task."""
            }]
        )
        return response.content[0].text

Worker Implementations

class BaseWorker:
    """Base class cho tất cả workers"""

    def execute(self, task: Task) -> str:
        context_text = ""
        if task.context:
            context_parts = [f"--- {k}: {v[:300]}..." for k, v in task.context.items()]
            context_text = "
Context from previous tasks:
" + "
".join(context_parts)

        prompt = f"{task.description}{context_text}"
        response = client.messages.create(
            model=self.model,
            max_tokens=self.max_tokens,
            system=self.system_prompt,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text

class ResearchWorker(BaseWorker):
    model = "claude-haiku-4-5"
    max_tokens = 2000
    system_prompt = """You are a meticulous researcher. Gather facts, cite sources when possible,
identify gaps in information, and flag uncertainty. Be thorough but concise."""

class AnalystWorker(BaseWorker):
    model = "claude-haiku-4-5"
    max_tokens = 2000
    system_prompt = """You are a data analyst. Identify patterns, make comparisons,
provide quantitative insights where possible. Structure analysis clearly."""

class WriterWorker(BaseWorker):
    model = "claude-haiku-4-5"
    max_tokens = 3000
    system_prompt = """You are a professional writer. Create clear, engaging, well-structured content.
Adapt tone and style to context. Use formatting (headers, bullets) appropriately."""

class CoderWorker(BaseWorker):
    model = "claude-opus-4-5"  # Coder dùng model mạnh hơn
    max_tokens = 4000
    system_prompt = """You are a senior software engineer. Write clean, production-ready code
with proper error handling, comments, and following best practices."""

class ReviewerWorker(BaseWorker):
    model = "claude-haiku-4-5"
    max_tokens = 1000
    system_prompt = """You are a quality reviewer. Check for accuracy, completeness, consistency,
and clarity. Provide specific, actionable improvement suggestions."""

Ví dụ thực tế: Research Report Generator

system = OrchestratorWorkersSystem()

# Task phức tạp đòi hỏi nhiều loại chuyên môn
result = system.run("""
Create a comprehensive market analysis report on:
"AI adoption in Vietnamese SMEs (small-medium enterprises) in 2024-2025"

The report should include:
1. Current state of AI adoption
2. Key barriers and opportunities
3. Success case studies
4. Recommendations for SME owners
5. Technology roadmap for the next 2 years
""")

print("=" * 60)
print("EXECUTION PLAN:")
for t in result["plan"]:
    print(f"  {t['id']} [{t['worker']}]: {t['desc'][:60]}")

print("
" + "=" * 60)
print("FINAL REPORT:")
print(result["final_output"])

Output plan điển hình trông như sau:

EXECUTION PLAN:
  T1 [researcher]: Research current AI adoption rates in Vietnamese SMEs
  T2 [researcher]: Find specific AI adoption case studies from Vietnamese companies
  T3 [analyst]: Analyze barriers to AI adoption based on T1 findings
  T4 [analyst]: Identify top opportunities from T1 and T2 context
  T5 [writer]: Write executive summary using T1, T3, T4
  T6 [coder]: Create data visualization code for T1 statistics
  T7 [writer]: Write full report combining all sections T1-T6
  T8 [reviewer]: Review and improve final report T7

Async Orchestration cho hiệu suất cao

async def execute_parallel_tasks(tasks: list[Task], results: dict) -> dict:
    """Chạy các tasks không có dependencies song song"""

    # Group tasks không phụ thuộc nhau
    completed = set(results.keys())
    executable = [
        t for t in tasks
        if t.id not in completed
        and all(dep in completed for dep in t.depends_on)
    ]

    if not executable:
        return results

    print(f"Running {len(executable)} tasks in parallel...")

    async def run_async_task(task: Task) -> tuple:
        task.context = {dep: results[dep] for dep in task.depends_on if dep in results}
        context_text = "
".join([f"{k}: {v[:200]}" for k, v in task.context.items()])
        prompt = f"{task.description}
{context_text}" if context_text else task.description

        response = await async_client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=2000,
            messages=[{"role": "user", "content": prompt}]
        )
        return task.id, response.content[0].text

    new_results = await asyncio.gather(*[run_async_task(t) for t in executable])

    for task_id, result in new_results:
        results[task_id] = result

    return results

Error Handling và Resilience

def execute_with_retry(worker: BaseWorker, task: Task, max_retries: int = 2) -> str:
    """Worker execution với retry logic"""
    last_error = None

    for attempt in range(max_retries + 1):
        try:
            return worker.execute(task)
        except anthropic.RateLimitError:
            import time
            wait_time = (2 ** attempt) * 5  # Exponential backoff
            print(f"Rate limit hit, waiting {wait_time}s...")
            time.sleep(wait_time)
        except anthropic.APIError as e:
            last_error = e
            print(f"API error on attempt {attempt + 1}: {e}")

    # Fallback: simplified version of task
    print(f"All retries failed for {task.id}, using fallback")
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=500,
        messages=[{"role": "user", "content": f"Briefly: {task.description}"}]
    )
    return f"[Fallback result] {response.content[0].text}"

So sánh với các Patterns khác

Pattern Task Structure Complexity Cost
Simple LLM call Single task Thấp Thấp nhất
Prompt Chaining Linear steps Thấp Thấp
Routing Branching Trung bình Thấp
Evaluator-Optimizer Iterative Trung bình Trung bình
Orchestrator-Workers Dynamic DAG Cao Cao

Tổng kết

Orchestrator-Workers là pattern phức tạp nhất nhưng mạnh nhất trong toolkit agentic AI. Khi implement:

  • Đầu tư vào Orchestrator prompt — plan quality quyết định mọi thứ
  • Mỗi Worker cần system prompt chuyên biệt rõ ràng
  • Xử lý dependencies cẩn thận để tránh deadlocks
  • Log toàn bộ plan và kết quả để debug
  • Dùng model nhỏ hơn cho simple workers để tiết kiệm chi phí
  • Implement retry và fallback cho production reliability

Tiếp theo: Kết hợp Orchestrator-Workers với Extended Thinking để Orchestrator lập kế hoạch tốt hơn, và Evaluator-Optimizer để mỗi Worker tự cải thiện output.

Tính năng liên quan:Orchestrator-WorkersMulti-AgentTask DecompositionArchitecture

Bai viet co huu ich khong?

Bản quyền thuộc về tác giả. Vui lòng dẫn nguồn khi chia sẻ.

Bình luận (0)
Ảnh đại diện
Đăng nhập để bình luận...
Đăng nhập để bình luận
  • Đang tải bình luận...

Đăng ký nhận bản tin

Nhận bài viết hay nhất về sản phẩm và vận hành, gửi thẳng vào hộp thư của bạn.

Bảo mật thông tin. Hủy đăng ký bất cứ lúc nào. Chính sách bảo mật.