Eval pipeline = 3 function nhỏ ghép lại. Mỗi function làm 1 việc rõ:
- Implement 3 function cốt lõi: run_prompt, run_test_case, run_eval
- Chạy eval pipeline end-to-end với dataset + placeholder grader
- Hiểu shape của results output
- Optimize pipeline bằng concurrency để tăng tốc
Function 1: run_prompt
Merge test case với prompt template, call Claude:
Đơn giản: format prompt từ test_case, gửi Claude, lấy text.
Iteration ở đây — mỗi version prompt thay đổi run_prompt.
from anthropic import Anthropic
client = Anthropic()
model = "claude-sonnet-5-20260205"
def add_user_message(messages, text):
messages.append({"role": "user", "content": text})
def chat(messages, system=None, temperature=1.0, stop_sequences=None):
params = {
"model": model,
"max_tokens": 1000,
"messages": messages,
"temperature": temperature
}
if system:
params["system"] = system
if stop_sequences:
params["stop_sequences"] = stop_sequences
return client.messages.create(**params).content[0].text
def run_prompt(test_case: dict) -> str:
"""Merge test case with prompt template, return Claude output."""
prompt = f"""Please solve the following task:
{test_case["task"]}"""
messages = []
add_user_message(messages, prompt)
return chat(messages)Function 2: run_test_case
Chạy 1 case, grade, trả structured result:
Structured result để:
- Review từng case
- Aggregate score
- Debug khi có case fail
def run_test_case(test_case: dict) -> dict:
"""Run single case, return result with output + score."""
output = run_prompt(test_case)
# TODO: replace với grader thật (bài 6.26-27)
score = 10 # placeholder
return {
"output": output,
"test_case": test_case,
"score": score
}Function 3: run_eval
Loop dataset, collect results:
Simple loop. Sẽ optimize concurrency sau.
def run_eval(dataset: list) -> list:
"""Run eval on full dataset."""
results = []
for test_case in dataset:
result = run_test_case(test_case)
results.append(result)
return resultsChạy pipeline
Output:
import json
# Load dataset
with open("dataset.json") as f:
dataset = json.load(f)
# Run
results = run_eval(dataset)
# Aggregate
avg_score = sum(r["score"] for r in results) / len(results)
print(f"Average score: {avg_score:.2f}")
# Inspect first case
print(json.dumps(results[0], indent=2))Chạy pipeline (tiếp)
Với 30 cases, Haiku model, pipeline chạy ~30 giây.
Average score: 10.00 ← placeholder, chưa có grader thật
{
"output": "Here's a Python function to validate IAM username...",
"test_case": {"task": "Create Python function to validate IAM username"},
"score": 10
}Shape của results
Lưu thêm metrics giúp debug:
- Latency cao ở case nào → optimize prompt
- Token usage cao ở case nào → reduce prompt
results = [
{
"output": "...", # Text Claude trả
"test_case": {...}, # Input case gốc
"score": float, # Điểm từ grader
"reasoning": "...", # (optional) Grader explain
"latency_ms": int, # (optional) Time
"tokens": {...}, # (optional) Usage
},
...
]Optimization: Concurrency
Chạy tuần tự 30 cases = 30 giây. Chạy 10 concurrent = 3-5 giây.
Chọn max_concurrency
Start 3, tăng dần nếu không rate-limited.
| Tier account | Rate limit | Concurrency |
|---|---|---|
| Free | 5 req/min | 1 |
| Tier 1 | 50 req/min | 3 |
| Tier 2 | 1000 req/min | 10 |
| Tier 3+ | 2000+ req/min | 20 |
import asyncio
from anthropic import AsyncAnthropic
async_client = AsyncAnthropic()
async def run_prompt_async(test_case: dict) -> str:
prompt = f"Please solve:\n\n{test_case['task']}"
msg = await async_client.messages.create(
model=model,
max_tokens=1000,
messages=[{"role": "user", "content": prompt}],
)
return msg.content[0].text
async def run_test_case_async(test_case: dict) -> dict:
output = await run_prompt_async(test_case)
score = 10 # placeholder
return {"output": output, "test_case": test_case, "score": score}
async def run_eval_async(dataset: list, max_concurrency: int = 5) -> list:
"""Run eval with bounded concurrency."""
semaphore = asyncio.Semaphore(max_concurrency)
async def _run_with_limit(case):
async with semaphore:
return await run_test_case_async(case)
tasks = [_run_with_limit(case) for case in dataset]
return await asyncio.gather(*tasks)
# Run
results = asyncio.run(run_eval_async(dataset, max_concurrency=5))Retry logic
Production pipeline phải handle transient errors:
import asyncio
from anthropic import RateLimitError, APIError
async def run_prompt_with_retry(test_case: dict, max_retries: int = 3) -> str:
for attempt in range(max_retries):
try:
return await run_prompt_async(test_case)
except RateLimitError:
wait = 2 ** attempt
await asyncio.sleep(wait)
except APIError as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(1)
raise Exception("Max retries exceeded")Inspecting verbose output
Prompt v1 simple → Claude verbose response. Đây là baseline để so sánh v2.
Observation:
Đây là bug của prompt, không phải của eval. Eval làm việc của nó — show cho bạn vấn đề.
- Output có markdown wrap
- Output có explanation không cần thiết
- Output format inconsistent
# Inspect
for i, r in enumerate(results[:3]):
print(f"\n=== Case {i+1}: {r['test_case']['task']} ===")
print(f"Output:\n{r['output']}")
print(f"Score: {r['score']}")Save results
Save JSON cho review + compare versions:
Filename pattern: results_v{N}.json — dễ so sánh.
def save_results(results, filename="results_v1.json"):
# Cleanup non-serializable fields if any
import json
with open(filename, "w") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"Saved {len(results)} results to {filename}")
save_results(results, "results_v1.json")Anti-patterns
❌ Hardcode prompt trong run_prompt
Fix: Prompt là arg, hoặc config file, hoặc import từ prompts.py.
❌ Không handle exception
Case 27 fail → pipeline crash, mất results 1-26.
Fix: Try-except per case, skip fail với placeholder score = 0.
❌ Print output trong function
run_prompt có print() → không clean cho async.
Fix: Return data, print outside.
❌ Concurrency quá cao → rate limited
max_concurrency=50 cho Tier 1 → HTTP 429.
Fix: Check rate limit tier. Start conservative.
❌ Không save results
Chạy 10 phút, close notebook, mất kết quả.
Fix: Save results.json ngay sau mỗi run_eval.
def run_prompt(case):
prompt = f"Please solve: {case['task']}" # ← hardcodedÁp dụng ngay
Bài tập 1: Build pipeline (30 phút)
Trong notebook:
Bài tập 2: Add concurrency (20 phút)
Upgrade pipeline dùng async. Đo thời gian trước/sau:
Expect 5-10x speedup.
- Copy 3 function (run_prompt, run_test_case, run_eval)
- Load dataset.json (từ bài 6.24)
- Chạy eval (placeholder score)
- In ra 3 case đầu tiên
- Save results.json
import time
start = time.time()
# run eval
elapsed = time.time() - start
print(f"Elapsed: {elapsed:.1f}s")Tóm tắt
🎯 3 function: run_prompt (merge + call), run_test_case (+ grade), run_eval (loop).
🎯 Start đơn giản, placeholder grader. Bài sau add grader thật.
🎯 Concurrency tăng speed 5-10x — must for production datasets.
🎯 Retry + save results là production pattern.
🎯 Verbose output của baseline là FEATURE — show bug của prompt để bạn fix.