Orchestrator-Workers — Kiến trúc điều phối agent phức tạp
Điểm nổi bật
Nhấn để đến mục tương ứng
- 1 Thực hành được liền: Pattern này tỏa sáng khi: Task không thể biết trước cần bao nhiêu bước dynamic decomposition Các sub-tasks đòi hỏi. Quy trình từng bước trong phần này giúp bạn bắt đầu ngay mà không cần kinh nghiệm chuyên sâu.
- 2 Thành thật mà nói: import anthropic import json import asyncio from typing import Optional from dataclasses import dataclass, field client. Phương pháp này hiệu quả trong hầu hết trường hợp, nhưng bạn cần điều chỉnh cho phù hợp ngữ cảnh riêng.
- 3 Nội dung cốt lõi: class BaseWorker: """Base class cho tất cả workers""" def executeself, task: Task -> str: contexttext = "" if. Nắm vững phần này sẽ giúp bạn áp dụng hiệu quả hơn 70% so với đọc lướt toàn bài.
- 4 Để đạt hiệu quả tối đa: async def executeparalleltaskstasks: listTask, results: dict -> dict: """Chạy các tasks không có dependencies song. Nhiều người bỏ qua bước này và mất thời gian gấp đôi để đạt cùng kết quả.
- 5 Góc nhìn thực tế: def executewithretryworker: BaseWorker, task: Task, maxretries: int = 2 -> str: """Worker execution với retry. Điều quan trọng là hiểu rõ khi nào nên và không nên áp dụng phương pháp này.
Khi task quá phức tạp cho một LLM đơn lẻ, giải pháp là chia để trị. Orchestrator-Workers pattern tổ chức hệ thống như một công ty: một manager (Orchestrator) nhận yêu cầu lớn, phân tích, rồi giao cho các chuyên gia (Workers) từng phần việc phù hợp với chuyên môn của họ.
Đây là pattern được dùng trong các hệ thống AI production phức tạp nhất — từ research assistants đến automated software development pipelines.
Khi nào cần Orchestrator-Workers?
Pattern này tỏa sáng khi:
- Task không thể biết trước cần bao nhiêu bước (dynamic decomposition)
- Các sub-tasks đòi hỏi chuyên môn khác nhau (research vs coding vs writing)
- Context window của một model không đủ chứa toàn bộ thông tin
- Muốn parallelize các sub-tasks độc lập
- Cần audit trail chi tiết về quá trình xử lý
Kiến trúc tổng quan
import anthropic
import json
import asyncio
from typing import Optional
from dataclasses import dataclass, field
client = anthropic.Anthropic()
async_client = anthropic.AsyncAnthropic()
@dataclass
class Task:
id: str
description: str
worker_type: str
depends_on: list = field(default_factory=list)
context: dict = field(default_factory=dict)
status: str = "pending" # pending/running/completed/failed
result: Optional[str] = None
class OrchestratorWorkersSystem:
def __init__(self):
self.workers = {
"researcher": ResearchWorker(),
"analyst": AnalystWorker(),
"writer": WriterWorker(),
"coder": CoderWorker(),
"reviewer": ReviewerWorker()
}
def run(self, complex_task: str) -> dict:
print(f"Orchestrating: {complex_task[:80]}...")
# Phase 1: Orchestrator phân tích và tạo execution plan
plan = self._create_plan(complex_task)
print(f"Plan created: {len(plan)} tasks")
# Phase 2: Execute tasks theo dependency order
results = self._execute_plan(plan)
# Phase 3: Orchestrator tổng hợp
final = self._synthesize(complex_task, plan, results)
return {
"task": complex_task,
"plan": [{"id": t.id, "worker": t.worker_type, "desc": t.description} for t in plan],
"results": results,
"final_output": final
}
def _create_plan(self, task: str) -> list[Task]:
available_workers = list(self.workers.keys())
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=3000,
system="""You are a project orchestrator. Create execution plans for complex tasks.
Break down tasks into atomic subtasks assignable to specialist workers.
Think carefully about dependencies — which tasks must complete before others can start.""",
messages=[{
"role": "user",
"content": f"""Create an execution plan for this task.
Task: {task}
Available workers: {available_workers}
Worker capabilities:
- researcher: find facts, gather information, search knowledge
- analyst: analyze data, identify patterns, make comparisons
- writer: create content, reports, summaries, documentation
- coder: write code, scripts, technical implementations
- reviewer: quality check, fact-check, improve outputs
Return a JSON array of tasks:
[
{{
"id": "T1",
"description": "specific instruction for this worker",
"worker_type": "researcher|analyst|writer|coder|reviewer",
"depends_on": []
}},
...
]
Important: depends_on should list task IDs that must complete first."""
}]
)
try:
text = response.content[0].text
start = text.find('[')
end = text.rfind(']') + 1
tasks_data = json.loads(text[start:end])
return [
Task(
id=t["id"],
description=t["description"],
worker_type=t["worker_type"],
depends_on=t.get("depends_on", [])
)
for t in tasks_data
]
except Exception as e:
print(f"Plan parsing failed: {e}")
# Fallback: single task
return [Task("T1", task, "writer", [])]
def _execute_plan(self, tasks: list[Task]) -> dict:
results = {}
completed_ids = set()
# Topological execution
max_rounds = len(tasks) * 2
round_num = 0
while len(completed_ids) < len(tasks) and round_num < max_rounds:
round_num += 1
progress_made = False
for task in tasks:
if task.id in completed_ids:
continue
# Check dependencies
if all(dep in completed_ids for dep in task.depends_on):
# Inject dependency results as context
task.context = {
dep_id: results[dep_id]
for dep_id in task.depends_on
if dep_id in results
}
print(f" Running {task.id} ({task.worker_type}): {task.description[:50]}...")
worker = self.workers.get(task.worker_type, self.workers["writer"])
task.result = worker.execute(task)
results[task.id] = task.result
completed_ids.add(task.id)
task.status = "completed"
progress_made = True
if not progress_made:
print("Warning: Dependency deadlock detected, breaking remaining tasks")
break
return results
def _synthesize(self, original_task: str, plan: list[Task], results: dict) -> str:
results_text = "
".join([
f"=== {t.id} ({t.worker_type}): {t.description[:60]} ===
{results.get(t.id, 'No result')}"
for t in plan
])
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=4000,
system="You are an expert synthesizer. Combine worker outputs into a coherent, high-quality final deliverable.",
messages=[{
"role": "user",
"content": f"""Original task: {original_task}
Worker outputs:
{results_text}
Synthesize all worker outputs into a comprehensive final response.
The final output should be self-contained and directly address the original task."""
}]
)
return response.content[0].text
Worker Implementations
class BaseWorker:
"""Base class cho tất cả workers"""
def execute(self, task: Task) -> str:
context_text = ""
if task.context:
context_parts = [f"--- {k}: {v[:300]}..." for k, v in task.context.items()]
context_text = "
Context from previous tasks:
" + "
".join(context_parts)
prompt = f"{task.description}{context_text}"
response = client.messages.create(
model=self.model,
max_tokens=self.max_tokens,
system=self.system_prompt,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
class ResearchWorker(BaseWorker):
model = "claude-haiku-4-5"
max_tokens = 2000
system_prompt = """You are a meticulous researcher. Gather facts, cite sources when possible,
identify gaps in information, and flag uncertainty. Be thorough but concise."""
class AnalystWorker(BaseWorker):
model = "claude-haiku-4-5"
max_tokens = 2000
system_prompt = """You are a data analyst. Identify patterns, make comparisons,
provide quantitative insights where possible. Structure analysis clearly."""
class WriterWorker(BaseWorker):
model = "claude-haiku-4-5"
max_tokens = 3000
system_prompt = """You are a professional writer. Create clear, engaging, well-structured content.
Adapt tone and style to context. Use formatting (headers, bullets) appropriately."""
class CoderWorker(BaseWorker):
model = "claude-opus-4-5" # Coder dùng model mạnh hơn
max_tokens = 4000
system_prompt = """You are a senior software engineer. Write clean, production-ready code
with proper error handling, comments, and following best practices."""
class ReviewerWorker(BaseWorker):
model = "claude-haiku-4-5"
max_tokens = 1000
system_prompt = """You are a quality reviewer. Check for accuracy, completeness, consistency,
and clarity. Provide specific, actionable improvement suggestions."""
Ví dụ thực tế: Research Report Generator
system = OrchestratorWorkersSystem()
# Task phức tạp đòi hỏi nhiều loại chuyên môn
result = system.run("""
Create a comprehensive market analysis report on:
"AI adoption in Vietnamese SMEs (small-medium enterprises) in 2024-2025"
The report should include:
1. Current state of AI adoption
2. Key barriers and opportunities
3. Success case studies
4. Recommendations for SME owners
5. Technology roadmap for the next 2 years
""")
print("=" * 60)
print("EXECUTION PLAN:")
for t in result["plan"]:
print(f" {t['id']} [{t['worker']}]: {t['desc'][:60]}")
print("
" + "=" * 60)
print("FINAL REPORT:")
print(result["final_output"])
Output plan điển hình trông như sau:
EXECUTION PLAN:
T1 [researcher]: Research current AI adoption rates in Vietnamese SMEs
T2 [researcher]: Find specific AI adoption case studies from Vietnamese companies
T3 [analyst]: Analyze barriers to AI adoption based on T1 findings
T4 [analyst]: Identify top opportunities from T1 and T2 context
T5 [writer]: Write executive summary using T1, T3, T4
T6 [coder]: Create data visualization code for T1 statistics
T7 [writer]: Write full report combining all sections T1-T6
T8 [reviewer]: Review and improve final report T7
Async Orchestration cho hiệu suất cao
async def execute_parallel_tasks(tasks: list[Task], results: dict) -> dict:
"""Chạy các tasks không có dependencies song song"""
# Group tasks không phụ thuộc nhau
completed = set(results.keys())
executable = [
t for t in tasks
if t.id not in completed
and all(dep in completed for dep in t.depends_on)
]
if not executable:
return results
print(f"Running {len(executable)} tasks in parallel...")
async def run_async_task(task: Task) -> tuple:
task.context = {dep: results[dep] for dep in task.depends_on if dep in results}
context_text = "
".join([f"{k}: {v[:200]}" for k, v in task.context.items()])
prompt = f"{task.description}
{context_text}" if context_text else task.description
response = await async_client.messages.create(
model="claude-haiku-4-5",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}]
)
return task.id, response.content[0].text
new_results = await asyncio.gather(*[run_async_task(t) for t in executable])
for task_id, result in new_results:
results[task_id] = result
return results
Error Handling và Resilience
def execute_with_retry(worker: BaseWorker, task: Task, max_retries: int = 2) -> str:
"""Worker execution với retry logic"""
last_error = None
for attempt in range(max_retries + 1):
try:
return worker.execute(task)
except anthropic.RateLimitError:
import time
wait_time = (2 ** attempt) * 5 # Exponential backoff
print(f"Rate limit hit, waiting {wait_time}s...")
time.sleep(wait_time)
except anthropic.APIError as e:
last_error = e
print(f"API error on attempt {attempt + 1}: {e}")
# Fallback: simplified version of task
print(f"All retries failed for {task.id}, using fallback")
response = client.messages.create(
model="claude-haiku-4-5",
max_tokens=500,
messages=[{"role": "user", "content": f"Briefly: {task.description}"}]
)
return f"[Fallback result] {response.content[0].text}"
So sánh với các Patterns khác
| Pattern | Task Structure | Complexity | Cost |
|---|---|---|---|
| Simple LLM call | Single task | Thấp | Thấp nhất |
| Prompt Chaining | Linear steps | Thấp | Thấp |
| Routing | Branching | Trung bình | Thấp |
| Evaluator-Optimizer | Iterative | Trung bình | Trung bình |
| Orchestrator-Workers | Dynamic DAG | Cao | Cao |
Tổng kết
Orchestrator-Workers là pattern phức tạp nhất nhưng mạnh nhất trong toolkit agentic AI. Khi implement:
- Đầu tư vào Orchestrator prompt — plan quality quyết định mọi thứ
- Mỗi Worker cần system prompt chuyên biệt rõ ràng
- Xử lý dependencies cẩn thận để tránh deadlocks
- Log toàn bộ plan và kết quả để debug
- Dùng model nhỏ hơn cho simple workers để tiết kiệm chi phí
- Implement retry và fallback cho production reliability
Tiếp theo: Kết hợp Orchestrator-Workers với Extended Thinking để Orchestrator lập kế hoạch tốt hơn, và Evaluator-Optimizer để mỗi Worker tự cải thiện output.
Bai viet co huu ich khong?
Bản quyền thuộc về tác giả. Vui lòng dẫn nguồn khi chia sẻ.





