- Viết VectorIndex class reusable
- Handle document batch embedding efficiently
- Cache embeddings (không re-compute)
- Production-ready error handling
VectorIndex class
import json
import logging
import pickle
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import voyageai
class VectorIndex:
    """Simple in-memory vector index with pickle-based persistence.

    Documents and embeddings live in two parallel lists: ``embeddings[i]``
    is the vector for ``docs[i]``. Anything that mutates one list must
    mutate the other in lockstep.
    """

    def __init__(self, embed_model: str = "voyage-3-large"):
        """Create an empty index.

        Args:
            embed_model: Model name forwarded to the embedding client for
                both documents and queries.
        """
        self.embed_model = embed_model
        self.client = voyageai.Client()
        self.docs: List[Dict[str, Any]] = []
        self.embeddings: List[np.ndarray] = []

    def _embed_batch(self, texts: List[str], input_type: str) -> List[List[float]]:
        """Embed ``texts`` with a single client call.

        No retry happens here; any error raised by the client propagates
        to the caller.

        Args:
            texts: Texts to embed. An empty list returns ``[]`` without
                calling the client.
            input_type: Forwarded to the client as ``input_type``; this
                class passes ``"document"`` or ``"query"``.

        Returns:
            One embedding per input text.

        Raises:
            ValueError: If the client returns a different number of
                embeddings than texts (would silently misalign the index).
        """
        if not texts:
            return []
        result = self.client.embed(
            texts,
            model=self.embed_model,
            input_type=input_type,
        )
        embeddings = result.embeddings
        if len(embeddings) != len(texts):
            raise ValueError(
                f"expected {len(texts)} embeddings, got {len(embeddings)}"
            )
        return embeddings

    def add_document(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Embed and append a single document.

        Args:
            content: Document text.
            metadata: Optional metadata stored next to the text; a fresh
                empty dict is used when omitted.
        """
        emb = self._embed_batch([content], input_type="document")[0]
        self.docs.append(
            {"content": content, "metadata": {} if metadata is None else metadata}
        )
        self.embeddings.append(np.array(emb))

    def add_documents(
        self,
        contents: List[str],
        metadatas: Optional[List[Dict]] = None,
        batch_size: int = 128,
    ):
        """Embed and append many documents, one client call per batch.

        Args:
            contents: Document texts.
            metadatas: Optional metadata dicts, one per text. When omitted,
                every document gets its own empty dict.
            batch_size: Number of texts sent per embedding call.

        Raises:
            ValueError: If ``metadatas`` and ``contents`` differ in length,
                or ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        if metadatas is None:
            # One dict per document: ``[{}] * n`` would alias a single dict,
            # so editing one document's metadata would edit all of them.
            metadatas = [{} for _ in contents]
        elif len(metadatas) != len(contents):
            raise ValueError(
                f"got {len(contents)} contents but {len(metadatas)} metadatas"
            )

        for start in range(0, len(contents), batch_size):
            stop = start + batch_size
            batch = contents[start:stop]
            embs = self._embed_batch(batch, input_type="document")
            # Docs and embeddings are appended together so a failure in a
            # later batch still leaves the two lists aligned.
            for content, emb, meta in zip(batch, embs, metadatas[start:stop]):
                self.docs.append({"content": content, "metadata": meta})
                self.embeddings.append(np.array(emb))

    def search(self, query: str, k: int = 5) -> List[Tuple[Dict, float]]:
        """Return the ``k`` documents closest to ``query``.

        Args:
            query: Query text; embedded with ``input_type="query"``.
            k: Maximum number of results. Non-positive values return ``[]``.

        Returns:
            ``(doc, distance)`` pairs sorted by ascending distance, where
            distance is ``1 - cosine similarity``. Ties keep insertion order.

        Raises:
            RuntimeError: If ``docs`` and ``embeddings`` have different
                lengths (the index was mutated out of sync).
        """
        if k <= 0 or not self.embeddings:
            return []
        if len(self.docs) != len(self.embeddings):
            raise RuntimeError(
                f"index out of sync: {len(self.docs)} docs vs "
                f"{len(self.embeddings)} embeddings"
            )

        query_emb = np.asarray(
            self._embed_batch([query], input_type="query")[0], dtype=float
        )
        all_embs = np.stack(self.embeddings)

        # Cosine similarity. A zero vector has no direction, so its
        # similarity is defined as 0 instead of dividing by zero (NaN).
        denom = np.linalg.norm(all_embs, axis=1) * np.linalg.norm(query_emb)
        valid = denom > 0
        similarities = np.where(
            valid, (all_embs @ query_emb) / np.where(valid, denom, 1.0), 0.0
        )

        # Stable sort makes the order of equally similar documents deterministic.
        top_indices = np.argsort(-similarities, kind="stable")[:k]
        return [
            (self.docs[i], float(1.0 - similarities[i]))  # distance = 1 - sim
            for i in top_indices
        ]

    def save(self, path: str):
        """Pickle docs, embeddings and the embedding model name to ``path``."""
        with open(path, "wb") as f:
            pickle.dump(
                {
                    "docs": self.docs,
                    "embeddings": self.embeddings,
                    "embed_model": self.embed_model,
                },
                f,
            )

    def load(self, path: str):
        """Replace this index's contents with those pickled at ``path``.

        The stored ``embed_model`` overwrites the one given to the
        constructor, so later queries are embedded with the same model
        name that was saved with the documents.

        SECURITY: ``pickle.load`` can execute arbitrary code. Only load
        index files produced by ``save`` from a trusted location.

        Raises:
            KeyError: If the file lacks one of the expected keys.
            ValueError: If the stored docs and embeddings differ in length.
        """
        with open(path, "rb") as f:
            data = pickle.load(f)
        # Read and validate everything before assigning, so a bad file
        # cannot leave the index half-updated.
        docs = data["docs"]
        embeddings = data["embeddings"]
        embed_model = data["embed_model"]
        if len(docs) != len(embeddings):
            raise ValueError(
                f"corrupt index file: {len(docs)} docs vs "
                f"{len(embeddings)} embeddings"
            )
        self.docs = docs
        self.embeddings = embeddings
        self.embed_model = embed_model


# Usage
from anthropic import Anthropic
import re
anthropic = Anthropic()

# === Setup ===
# 1. Load doc
# Explicit UTF-8: the platform default encoding differs between systems and
# would corrupt or reject non-ASCII text in the handbook.
with open("handbook.md", encoding="utf-8") as f:
    doc = f.read()

# 2. Chunk
# Split at H2 headings; the "## " marker is consumed by the split for every
# section after the first. Whitespace-only sections are dropped.
chunks = [c for c in map(str.strip, re.split(r"\n## ", doc)) if c]

# 3. Build index
index = VectorIndex()
metadatas = [{"chunk_id": i, "source": "handbook.md"} for i in range(len(chunks))]
index.add_documents(chunks, metadatas)

# 4. Save
# Persisting the index lets later runs skip re-embedding the handbook.
index.save("handbook_index.pkl")

# Usage (continued)
# === Query ===
# Load index (subsequent runs)
# Start from an empty index, then fill it from the file written by save().
index = VectorIndex()
# load() replaces docs, embeddings and embed_model with the pickled values.
# The file is read with pickle, so it must come from a trusted source.
index.load("handbook_index.pkl")
def rag_answer(query: str, k: int = 3, max_distance: float = 0.7) -> str:
    """Answer ``query`` from retrieved context, or decline when nothing fits.

    Uses the module-level ``index`` for retrieval and the module-level
    ``anthropic`` client for generation.

    Args:
        query: User question.
        k: Number of chunks to retrieve and place in the prompt.
        max_distance: Largest acceptable distance (1 - cosine similarity)
            for the closest chunk; above it the question is treated as
            out of scope.

    Returns:
        The model's answer text, or a fixed fallback sentence when no chunk
        is retrieved or the best match is farther than ``max_distance``.
    """
    results = index.search(query, k=k)
    # Only the best match is checked: results are sorted by distance, so if
    # the first one is too far every other one is too.
    if not results or results[0][1] > max_distance:
        return "I don't have information on this."

    context = "\n---\n".join(doc["content"] for doc, _ in results)
    prompt = f"""Answer based on context.
<context>
{context}
</context>
Question: {query}"""
    response = anthropic.messages.create(
        model="claude-sonnet-5-20260205",
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text
print(rag_answer("What's our WFH policy?"))
# Document loader pattern
Scale to many files:
from pathlib import Path
def load_markdown_files(directory: str) -> List[Dict]:
    """Recursively load every ``.md`` file under ``directory``.

    Args:
        directory: Root directory to scan.

    Returns:
        One ``{"content": <file text>, "path": <path string>}`` dict per
        file, ordered by path so repeated runs index files in the same order.
    """
    docs = []
    # sorted(): rglob yields entries in filesystem order, which is not stable
    # across machines.
    for path in sorted(Path(directory).rglob("*.md")):
        # A directory can also be named "*.md"; reading it would raise.
        if not path.is_file():
            continue
        # Explicit UTF-8 so non-ASCII text does not depend on the locale.
        docs.append({"content": path.read_text(encoding="utf-8"), "path": str(path)})
    return docs
def chunk_markdown(content: str, source: str) -> List[Dict]:
    """Split markdown text into chunks at H2 boundaries.

    The text is split on a newline followed by ``"## "``, so that marker is
    removed from every section after the first. Whitespace-only sections are
    skipped, but ``chunk_index`` is the section's position in the raw split,
    so indices can have gaps.

    Args:
        content: Markdown text to split.
        source: Identifier copied into every chunk's ``"source"`` field.

    Returns:
        A list of ``{"content", "source", "chunk_index"}`` dicts with
        stripped content.
    """
    chunks = []
    for position, section in enumerate(re.split(r"\n## ", content)):
        text = section.strip()
        if not text:
            continue
        chunks.append({"content": text, "source": source, "chunk_index": position})
    return chunks
# Pipeline: load every markdown file, chunk it, embed the chunks, persist.
files = load_markdown_files("docs/")
all_chunks = []
for f in files:
    all_chunks += chunk_markdown(f["content"], f["path"])

# Build the texts and their metadata in one pass so the two lists stay
# aligned index-for-index, as add_documents expects.
contents = []
metadatas = []
for chunk in all_chunks:
    contents.append(chunk["content"])
    metadatas.append({"source": chunk["source"], "chunk_index": chunk["chunk_index"]})

index = VectorIndex()
index.add_documents(contents, metadatas)
index.save("docs_index.pkl")

# Incremental update
Doc updated → re-index only changed chunks:
In production, use proper vector DB với delete/update APIs.
def update_doc(index: VectorIndex, file_path: str):
    """Replace every chunk that came from ``file_path`` with fresh ones.

    Args:
        index: Index to update in place.
        file_path: Path of the changed file. It must equal the ``"source"``
            value stored in the chunks' metadata, character for character,
            or the old chunks will not be found.

    Raises:
        Any error from reading the file or embedding the new chunks; in
        both cases the index is left exactly as it was.
    """
    # Read and chunk first: if the file is unreadable nothing is removed.
    with open(file_path, encoding="utf-8") as f:
        content = f.read()
    new_chunks = chunk_markdown(content, file_path)
    contents = [c["content"] for c in new_chunks]
    metadatas = [
        {"source": c["source"], "chunk_index": c["chunk_index"]} for c in new_chunks
    ]

    # Remove old chunks from docs AND embeddings together; the two lists are
    # parallel, so filtering only one would pair documents with the wrong
    # vectors.
    kept = [
        (doc, emb)
        for doc, emb in zip(index.docs, index.embeddings)
        if doc["metadata"].get("source") != file_path
    ]
    old_docs, old_embeddings = index.docs, index.embeddings
    index.docs = [doc for doc, _ in kept]
    index.embeddings = [emb for _, emb in kept]

    # Add new; roll back to the previous lists if embedding fails so the
    # file's old chunks are not lost.
    try:
        index.add_documents(contents, metadatas)
    except Exception:
        index.docs, index.embeddings = old_docs, old_embeddings
        raise


# Retrieval with citations
Inline citations + verifiable sources.
def rag_with_citations(query: str) -> Dict:
    """Answer ``query`` with numbered citations back to the retrieved chunks.

    Uses the module-level ``index`` for retrieval and the module-level
    ``anthropic`` client for generation.

    Args:
        query: User question.

    Returns:
        ``{"answer": str, "sources": [...]}``. Each source has ``number``
        (the ``[n]`` used in the prompt), ``source``, ``chunk_id`` and a
        200-character ``snippet``. When nothing is retrieved, the answer is
        a fixed fallback sentence and ``sources`` is empty.
    """
    results = index.search(query, k=3)
    if not results:
        # Without context the model could only guess; decline instead.
        return {"answer": "I don't have information on this.", "sources": []}

    context = "\n\n".join(
        f"[{number}] {doc['content']}"
        for number, (doc, _) in enumerate(results, start=1)
    )
    prompt = f"""Answer the question using the context. Cite sources [1], [2], etc.
<context>
{context}
</context>
Question: {query}"""
    response = anthropic.messages.create(
        model="claude-sonnet-5-20260205",
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}],
    )

    sources = []
    for number, (doc, _) in enumerate(results, start=1):
        meta = doc["metadata"]
        # The single-file setup stores "chunk_id" while the multi-file
        # pipeline stores "chunk_index"; accept either so the citation is
        # not silently None.
        chunk_id = meta.get("chunk_id")
        if chunk_id is None:
            chunk_id = meta.get("chunk_index")
        sources.append(
            {
                "number": number,
                "source": meta.get("source"),
                "chunk_id": chunk_id,
                "snippet": doc["content"][:200],
            }
        )
    return {"answer": response.content[0].text, "sources": sources}
# Demo: print the answer, a blank line, then one line per cited source.
result = rag_with_citations("When do I get paid?")
print(result["answer"], "\nSources:", sep="\n")
for s in result["sources"]:
    print(f" [{s['number']}] {s['source']}: {s['snippet']}...")

# Error handling
from anthropic import APIError, RateLimitError
from voyageai.error import VoyageAPIError
def robust_rag(query: str, max_retries: int = 3):
    """Call ``rag_answer`` with exponential backoff between failed attempts.

    Args:
        query: User question passed to ``rag_answer``.
        max_retries: Total number of attempts.

    Returns:
        The answer string, or ``None`` when every attempt failed with a rate
        limit / Voyage error (or ``max_retries`` is not positive).

    Raises:
        APIError: If the last attempt fails with a non-rate-limit API error.
    """
    # NOTE(review): confirm that voyageai.error really exports
    # VoyageAPIError; if it does not, the import above this function fails.
    logger = logging.getLogger(__name__)
    for attempt in range(max_retries):
        is_last = attempt == max_retries - 1
        wait = 2 ** attempt
        try:
            return rag_answer(query)
        except (RateLimitError, VoyageAPIError) as e:
            # RateLimitError is listed before APIError on purpose: it is
            # handled here as retryable rather than by the generic branch.
            if is_last:
                logger.error(f"Giving up after {max_retries} attempts: {e}")
                break
            logger.warning(f"Retry {attempt + 1} after {wait}s: {e}")
            time.sleep(wait)
        except APIError as e:
            logger.error(f"API error: {e}")
            if is_last:
                raise
            # Back off before retrying instead of hitting the API again
            # immediately.
            time.sleep(wait)
    return None


# Performance benchmarks
Scales ~linearly. For 100K chunks, use dedicated vector DB.
| Stage | Time (1K chunks) |
|---|---|
| Embed 1K chunks (batch 128) | ~10s |
| Index build | ~1s |
| Save to disk | <1s |
| Single query | ~200ms (embed 100ms + search 100ms) |
Eval cho retrieval
Nếu < 85%, improve chunking or try hybrid search (bài 6.56).
# Labeled retrieval cases consumed by eval_retrieval: "query" is the search
# text and "expected_source" is the metadata "source" value that should
# appear among the retrieved documents.
# NOTE(review): eval_retrieval compares these strings for exact equality.
# The pipeline above stores str(path) from load_markdown_files("docs/") as
# "source", which presumably includes the "docs/" prefix - confirm the
# expected values use the same form.
test_queries = [
{"query": "WFH policy", "expected_source": "policies/wfh.md"},
{"query": "Paternity leave", "expected_source": "policies/leave.md"},
# ... 50 queries
]
def eval_retrieval(index, queries, k: int = 3):
    """Measure top-``k`` retrieval accuracy over labeled queries.

    A query counts as correct when its ``"expected_source"`` equals the
    metadata ``"source"`` of at least one of the ``k`` retrieved documents
    (exact string comparison).

    Args:
        index: Object with a ``search(query, k=...)`` method returning
            ``(doc, distance)`` pairs.
        queries: Sequence of dicts with ``"query"`` and ``"expected_source"``.
        k: Number of results inspected per query.

    Returns:
        Fraction of correct queries in ``[0, 1]``; ``0.0`` when ``queries``
        is empty.
    """
    if not queries:
        # Nothing to score; avoid dividing by zero.
        return 0.0
    correct = 0
    for test in queries:
        results = index.search(test["query"], k=k)
        # .get(): documents added without metadata have no "source" key.
        top_sources = [doc["metadata"].get("source") for doc, _ in results]
        if test["expected_source"] in top_sources:
            correct += 1
    return correct / len(queries)
# Fraction of test queries whose expected source is among the top 3 results.
accuracy = eval_retrieval(index, test_queries)
print(f"Retrieval accuracy: {accuracy:.0%}")
# Target: > 85%
# Anti-patterns
❌ In-memory index > 50K chunks
Memory explode, slow load.
Fix: Use Pinecone/Chroma/Qdrant.
❌ No caching embeddings
Re-embed corpus mỗi change = slow + costly.
Fix: Save index to disk.
❌ Single query at a time
Embedding queries one by one wastes a network round-trip per query.
Fix: Batch embed queries if possible.
❌ Hardcode model name
Switch model → re-embed all. Bakes in assumptions.
Fix: Store embed_model in index, verify match at load.
Áp dụng ngay
Bài tập 1: Full index cho real corpus (1 giờ)
Corpus: personal notes, blog, or 20-30 markdown files.
Bài tập 2: Eval retrieval (30 phút)
Write 20 test queries. Measure top-3 accuracy.
Tune chunk size / K.
- Load files
- Chunk
- Embed + index
- Save
- Query interactively
Tóm tắt
🎯 VectorIndex class — add_documents, search, save, load.
🎯 Batch embedding (128 at time) cho speed.
🎯 Save to disk preprocessing results.
🎯 Citations grounded in source metadata.
🎯 Eval retrieval — measure accuracy, tune.