Parallelization workflows — Concurrent subtasks

9 — Workflows & AgentsTrung cấp15 phút

User upload part image → recommend material (metal, polymer, ceramic, composite, elastomer, wood).

Bạn sẽ học được
  • Understand parallelization pattern
  • Split single task → specialized parallel subtasks → aggregate
  • Implement với asyncio
  • Balance speed vs cost

Pattern

Each sub-task focused + specialized criteria. Aggregation sees full picture.

┌──────────────────────────────────────────────┐
│                                              │
│  Task: evaluate part                         │
│    │                                         │
│    ├─ Parallel                               │
│    │                                         │
│    ├─ Metal analysis    (Claude)              │
│    ├─ Polymer analysis  (Claude)              │
│    ├─ Ceramic analysis  (Claude)              │
│    ├─ Composite analysis (Claude)             │
│    ├─ Elastomer analysis (Claude)             │
│    └─ Wood analysis     (Claude)              │
│                                              │
│    All results →                             │
│                                              │
│  Aggregation (Claude) — compare → pick best │
│    │                                         │
│    ▼                                         │
│  Final recommendation                        │
│                                              │
└──────────────────────────────────────────────┘

Implementation với asyncio

Performance

Sequential: 6 × 2s = 12s Parallel: 2s (if concurrency allows)

6x speedup for free (just async).

import asyncio
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

MATERIALS = ["metal", "polymer", "ceramic", "composite", "elastomer", "wood"]

MATERIAL_CRITERIA = {
    "metal": "Evaluate for: strength, weldability, corrosion resistance, cost.",
    "polymer": "Evaluate for: moldability, weight, temperature range, UV resistance.",
    "ceramic": "Evaluate for: hardness, brittleness, thermal resistance, machinability.",
    "composite": "Evaluate for: strength-to-weight, cost, manufacturing complexity.",
    "elastomer": "Evaluate for: flexibility, durability, chemical resistance, temperature range.",
    "wood": "Evaluate for: availability, machinability, aesthetics, moisture sensitivity."
}


async def analyze_material(image_bytes, material: str):
    """Single specialized analysis."""
    criteria = MATERIAL_CRITERIA[material]
    
    prompt = f"""You are a materials engineer evaluating a part for {material}.

{criteria}

For this specific material, analyze the part image:
- Suitability (1-10 scale)
- Top 3 strengths if using {material}
- Top 3 concerns
- Overall recommendation: recommended / not recommended"""
    
    response = await client.messages.create(
        model="claude-sonnet-5-20260205",
        max_tokens=1000,
        messages=[{
            "role": "user",
            "content": [
                {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image_bytes}},
                {"type": "text", "text": prompt}
            ]
        }]
    )
    return {
        "material": material,
        "analysis": response.content[0].text
    }


async def aggregate_recommendations(results: list):
    """Combine parallel analyses, pick best."""
    combined = "\n\n".join(
        f"=== {r['material']} ===\n{r['analysis']}"
        for r in results
    )
    
    prompt = f"""Given these specialized material analyses, pick the best material:

{combined}

Output:
- Best material: [name]
- Rationale: 2-3 sentences comparing to runner-up
- Confidence: high/medium/low"""
    
    response = await client.messages.create(
        model="claude-sonnet-5-20260205",
        max_tokens=500,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text


async def recommend_material(image_bytes):
    """Full pipeline."""
    # Parallel analyses
    tasks = [analyze_material(image_bytes, m) for m in MATERIALS]
    results = await asyncio.gather(*tasks)
    
    # Aggregate
    recommendation = await aggregate_recommendations(results)
    return recommendation


# Run
recommendation = asyncio.run(recommend_material(image_bytes))
print(recommendation)

Pattern variants

Homogeneous

Same prompt template, different inputs.

Example: 100 document chunks, each classify in parallel.

Heterogeneous

Different prompts, different specialties.

Example: material analysis (different prompt per material).

Both

Nested. For each material, evaluate in 3 conditions. 6 × 3 = 18 parallel calls.

Aggregation strategies

Voting

All subtasks vote → majority wins.

Simple, majority-rule.

Weighted

Some subtasks more authoritative.

votes = [r["pick"] for r in results]
winner = max(set(votes), key=votes.count)

Weighted

Claude aggregate

Let Claude synthesize (most flexible):

weighted_scores = defaultdict(float)
for r, weight in zip(results, weights):
    weighted_scores[r["pick"]] += weight

Claude aggregate

Captures nuance beyond votes.

summary = "\n".join(r["analysis"] for r in results)
final = await claude(f"Synthesize:\n{summary}")

When parallel worth

✅ Good fit

⚠️ Skip

  • Multiple independent subtasks
  • Total latency matters (user waiting)
  • Subtasks are similar shape (all classify, all evaluate)
  • Subtasks depend on each other (must sequential)
  • Subtask count < 3 (parallel overhead not worth)
  • Budget tight (parallel = same cost, but all charged)

Rate limits

Parallel hit rate limits faster:

Bounded concurrency

# 100 concurrent requests
tasks = [claude(prompt_i) for i in range(100)]
await asyncio.gather(*tasks)
# → likely rate-limited

Bounded concurrency

Limit concurrent calls to API tier.

async def bounded_gather(tasks, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def _with_sem(task):
        async with semaphore:
            return await task
    
    return await asyncio.gather(*(_with_sem(t) for t in tasks))

Cost aware

Parallel = same cost as sequential. Save time, not money.

For eval batches where time matters → parallel great.

For background jobs where time OK → sequential may be simpler.

Real-world use cases

📊 Data analysis

Sentiment classify 1000 tweets → parallel (fast analytics).

📄 Document processing

Extract structured data from 100 PDFs → parallel.

🏢 Multi-dimensional evaluation

Candidate fit for 5 roles → parallel eval per role.

🎬 Content generation

Generate 10 ad variants → parallel.

🔍 Research

Query multiple sources simultaneously → parallel.

Anti-patterns

❌ Forget bounded concurrency

Rate limited → wasted requests.

Fix: Semaphore with tier-appropriate limit.

❌ Parallel sequential steps

Fix: Sequential for dependent steps.

❌ Ignore partial failures

Parallel run, 1 fail → gather throws, lose all results.

Fix: return_exceptions=True or per-task try-except.

results = await asyncio.gather(step_a, step_b)
# But step_b needs step_a's output!

❌ Ignore partial failures

results = await asyncio.gather(*tasks, return_exceptions=True)
successes = [r for r in results if not isinstance(r, Exception)]

Áp dụng ngay

Bài tập 1: Parallelize for your app (30 phút)

Identify repeated work (10 document classify, 5 candidate eval, ...). Parallelize. Measure speedup.

Bài tập 2: Material recommender (30 phút)

Implement material recommender. Test với image (can use mock if no image).

Compare:

Measure quality (accuracy) + speed.

  • Single prompt approach
  • Parallel approach (6 calls + aggregate)

Tóm tắt

🎯 Parallel = concurrent subtasks. Speedup proportional to tasks.

🎯 Specialized prompts per parallel branch → higher quality.

🎯 Aggregation step combine results.

🎯 asyncio.gather + Semaphore for bounded concurrency.

🎯 Use when: independent subtasks, latency matters.

Nội dung này có hữu ích không?