User upload part image → recommend material (metal, polymer, ceramic, composite, elastomer, wood).
- Understand parallelization pattern
- Split single task → specialized parallel subtasks → aggregate
- Implement với asyncio
- Balance speed vs cost
Pattern
Each sub-task focused + specialized criteria. Aggregation sees full picture.
┌──────────────────────────────────────────────┐ │ │ │ Task: evaluate part │ │ │ │ │ ├─ Parallel │ │ │ │ │ ├─ Metal analysis (Claude) │ │ ├─ Polymer analysis (Claude) │ │ ├─ Ceramic analysis (Claude) │ │ ├─ Composite analysis (Claude) │ │ ├─ Elastomer analysis (Claude) │ │ └─ Wood analysis (Claude) │ │ │ │ All results → │ │ │ │ Aggregation (Claude) — compare → pick best │ │ │ │ │ ▼ │ │ Final recommendation │ │ │ └──────────────────────────────────────────────┘
Implementation với asyncio
Performance
Sequential: 6 × 2s = 12s Parallel: 2s (if concurrency allows)
6x speedup for free (just async).
import asyncio
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
MATERIALS = ["metal", "polymer", "ceramic", "composite", "elastomer", "wood"]
MATERIAL_CRITERIA = {
"metal": "Evaluate for: strength, weldability, corrosion resistance, cost.",
"polymer": "Evaluate for: moldability, weight, temperature range, UV resistance.",
"ceramic": "Evaluate for: hardness, brittleness, thermal resistance, machinability.",
"composite": "Evaluate for: strength-to-weight, cost, manufacturing complexity.",
"elastomer": "Evaluate for: flexibility, durability, chemical resistance, temperature range.",
"wood": "Evaluate for: availability, machinability, aesthetics, moisture sensitivity."
}
async def analyze_material(image_bytes, material: str):
"""Single specialized analysis."""
criteria = MATERIAL_CRITERIA[material]
prompt = f"""You are a materials engineer evaluating a part for {material}.
{criteria}
For this specific material, analyze the part image:
- Suitability (1-10 scale)
- Top 3 strengths if using {material}
- Top 3 concerns
- Overall recommendation: recommended / not recommended"""
response = await client.messages.create(
model="claude-sonnet-5-20260205",
max_tokens=1000,
messages=[{
"role": "user",
"content": [
{"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image_bytes}},
{"type": "text", "text": prompt}
]
}]
)
return {
"material": material,
"analysis": response.content[0].text
}
async def aggregate_recommendations(results: list):
"""Combine parallel analyses, pick best."""
combined = "\n\n".join(
f"=== {r['material']} ===\n{r['analysis']}"
for r in results
)
prompt = f"""Given these specialized material analyses, pick the best material:
{combined}
Output:
- Best material: [name]
- Rationale: 2-3 sentences comparing to runner-up
- Confidence: high/medium/low"""
response = await client.messages.create(
model="claude-sonnet-5-20260205",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
async def recommend_material(image_bytes):
"""Full pipeline."""
# Parallel analyses
tasks = [analyze_material(image_bytes, m) for m in MATERIALS]
results = await asyncio.gather(*tasks)
# Aggregate
recommendation = await aggregate_recommendations(results)
return recommendation
# Run
recommendation = asyncio.run(recommend_material(image_bytes))
print(recommendation)Pattern variants
Homogeneous
Same prompt template, different inputs.
Example: 100 document chunks, each classify in parallel.
Heterogeneous
Different prompts, different specialties.
Example: material analysis (different prompt per material).
Both
Nested. For each material, evaluate in 3 conditions. 6 × 3 = 18 parallel calls.
Aggregation strategies
Voting
All subtasks vote → majority wins.
Simple, majority-rule.
Weighted
Some subtasks more authoritative.
votes = [r["pick"] for r in results]
winner = max(set(votes), key=votes.count)Weighted
Claude aggregate
Let Claude synthesize (most flexible):
weighted_scores = defaultdict(float)
for r, weight in zip(results, weights):
weighted_scores[r["pick"]] += weightClaude aggregate
Captures nuance beyond votes.
summary = "\n".join(r["analysis"] for r in results)
final = await claude(f"Synthesize:\n{summary}")When parallel worth
✅ Good fit
⚠️ Skip
- Multiple independent subtasks
- Total latency matters (user waiting)
- Subtasks are similar shape (all classify, all evaluate)
- Subtasks depend on each other (must sequential)
- Subtask count < 3 (parallel overhead not worth)
- Budget tight (parallel = same cost, but all charged)
Rate limits
Parallel hit rate limits faster:
Bounded concurrency
# 100 concurrent requests
tasks = [claude(prompt_i) for i in range(100)]
await asyncio.gather(*tasks)
# → likely rate-limitedBounded concurrency
Limit concurrent calls to API tier.
async def bounded_gather(tasks, max_concurrent=5):
semaphore = asyncio.Semaphore(max_concurrent)
async def _with_sem(task):
async with semaphore:
return await task
return await asyncio.gather(*(_with_sem(t) for t in tasks))Cost aware
Parallel = same cost as sequential. Save time, not money.
For eval batches where time matters → parallel great.
For background jobs where time OK → sequential may be simpler.
Real-world use cases
📊 Data analysis
Sentiment classify 1000 tweets → parallel (fast analytics).
📄 Document processing
Extract structured data from 100 PDFs → parallel.
🏢 Multi-dimensional evaluation
Candidate fit for 5 roles → parallel eval per role.
🎬 Content generation
Generate 10 ad variants → parallel.
🔍 Research
Query multiple sources simultaneously → parallel.
Anti-patterns
❌ Forget bounded concurrency
Rate limited → wasted requests.
Fix: Semaphore with tier-appropriate limit.
❌ Parallel sequential steps
Fix: Sequential for dependent steps.
❌ Ignore partial failures
Parallel run, 1 fail → gather throws, lose all results.
Fix: return_exceptions=True or per-task try-except.
results = await asyncio.gather(step_a, step_b)
# But step_b needs step_a's output!❌ Ignore partial failures
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]Áp dụng ngay
Bài tập 1: Parallelize for your app (30 phút)
Identify repeated work (10 document classify, 5 candidate eval, ...). Parallelize. Measure speedup.
Bài tập 2: Material recommender (30 phút)
Implement material recommender. Test với image (can use mock if no image).
Compare:
Measure quality (accuracy) + speed.
- Single prompt approach
- Parallel approach (6 calls + aggregate)
Tóm tắt
🎯 Parallel = concurrent subtasks. Speedup proportional to tasks.
🎯 Specialized prompts per parallel branch → higher quality.
🎯 Aggregation step combine results.
🎯 asyncio.gather + Semaphore for bounded concurrency.
🎯 Use when: independent subtasks, latency matters.