Triển khai RAG pipeline — Production code

Bài 7 — RAG · Trung cấp · 30 phút

Bạn sẽ học được
  • Viết VectorIndex class reusable
  • Handle document batch embedding efficiently
  • Cache embeddings (không re-compute)
  • Production-ready error handling

VectorIndex class

import json
import logging
import pickle
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import voyageai

class VectorIndex:
    """Simple in-memory vector index with pickle persistence.

    Documents and their embeddings live in two parallel lists:
    ``docs[i]`` is described by ``embeddings[i]``. Every method here that
    appends to one list appends to the other, so they stay aligned.
    """

    def __init__(self, embed_model: str = "voyage-3-large"):
        """Create an empty index.

        Args:
            embed_model: Model name passed to the client on every embed call.
        """
        self.embed_model = embed_model
        self.client = voyageai.Client()
        self.docs: List[Dict[str, Any]] = []
        self.embeddings: List[np.ndarray] = []

    def _embed_batch(self, texts: List[str], input_type: str) -> List[List[float]]:
        """Embed a list of texts with a single client call.

        No retry is performed here; any error raised by the client
        propagates to the caller.

        Args:
            texts: Texts to embed.
            input_type: Forwarded to the client as ``input_type``; this class
                passes ``"document"`` when indexing and ``"query"`` when
                searching.

        Returns:
            The ``embeddings`` attribute of the client response, expected to
            hold one vector per input text. Empty input returns ``[]``
            without calling the client.
        """
        if not texts:
            return []
        result = self.client.embed(
            texts,
            model=self.embed_model,
            input_type=input_type,
        )
        return result.embeddings

    def add_document(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        """Embed and add a single document.

        Args:
            content: Document text.
            metadata: Optional metadata stored with the document; a new
                empty dict is used when omitted.
        """
        emb = self._embed_batch([content], input_type="document")[0]
        self.docs.append({"content": content, "metadata": metadata or {}})
        self.embeddings.append(np.array(emb))

    def add_documents(
        self,
        contents: List[str],
        metadatas: Optional[List[Dict]] = None,
        batch_size: int = 128,
    ):
        """Embed and add many documents, ``batch_size`` texts per client call.

        Args:
            contents: Document texts.
            metadatas: Optional list of metadata dicts, one per text.
            batch_size: Number of texts sent per embed call.

        Raises:
            ValueError: If ``metadatas`` and ``contents`` differ in length,
                or if ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be >= 1")
        if metadatas is None:
            # One fresh dict per document: ``[{}] * n`` would make every
            # document share (and mutate) the same metadata object.
            metadatas = [{} for _ in contents]
        elif len(metadatas) != len(contents):
            # zip() below would otherwise silently drop the unmatched tail.
            raise ValueError(
                f"got {len(contents)} contents but {len(metadatas)} metadatas"
            )

        for start in range(0, len(contents), batch_size):
            batch = contents[start:start + batch_size]
            batch_metas = metadatas[start:start + batch_size]
            embs = self._embed_batch(batch, input_type="document")
            for content, emb, meta in zip(batch, embs, batch_metas):
                self.docs.append({"content": content, "metadata": meta})
                self.embeddings.append(np.array(emb))

    def search(self, query: str, k: int = 5) -> List[Tuple[Dict, float]]:
        """Return the ``k`` documents closest to ``query``.

        Args:
            query: Query text; embedded with ``input_type="query"``.
            k: Maximum number of results.

        Returns:
            A list of ``(doc, distance)`` pairs sorted by ascending distance,
            where ``distance = 1 - cosine similarity``. Returns ``[]`` when
            the index is empty or ``k <= 0`` (no embed call is made).
        """
        if k <= 0 or not self.embeddings:
            return []

        query_emb = np.array(self._embed_batch([query], input_type="query")[0])

        # Normalize the query; a zero vector is left as-is instead of
        # dividing by zero and spreading NaN through the ranking.
        query_norm = np.linalg.norm(query_emb)
        if query_norm > 0:
            query_emb = query_emb / query_norm

        # Normalize every stored embedding the same way.
        all_embs = np.stack(self.embeddings)
        norms = np.linalg.norm(all_embs, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # zero rows end up with similarity 0
        normalized = all_embs / norms

        # Both sides are unit length, so the dot product is cosine similarity.
        similarities = normalized @ query_emb

        # A stable sort keeps insertion order among equal scores.
        top_indices = np.argsort(-similarities, kind="stable")[:k]

        return [
            (self.docs[i], float(1.0 - similarities[i]))  # distance = 1 - sim
            for i in top_indices
        ]

    def save(self, path: str):
        """Pickle the documents, embeddings and model name to ``path``."""
        with open(path, "wb") as f:
            pickle.dump({
                "docs": self.docs,
                "embeddings": self.embeddings,
                "embed_model": self.embed_model,
            }, f)

    def load(self, path: str):
        """Replace this index's contents with those pickled at ``path``.

        The stored ``embed_model`` overwrites the current one, so later
        queries are embedded with the same model as the stored documents.
        """
        # SECURITY: pickle.load can execute arbitrary code while
        # deserializing. Only load index files this application wrote itself.
        with open(path, "rb") as f:
            data = pickle.load(f)
        self.docs = data["docs"]
        self.embeddings = data["embeddings"]
        self.embed_model = data["embed_model"]

Usage

from anthropic import Anthropic
import re

# Module-level Anthropic client, reused by the answer functions further down.
anthropic = Anthropic()

# === Setup ===
# 1. Load doc
# Explicit UTF-8: the platform default encoding can fail on or corrupt
# non-ASCII text.
with open("handbook.md", encoding="utf-8") as f:
    doc = f.read()

# 2. Chunk
# Split on level-2 headings; the "## " marker itself is consumed by the split.
chunks = re.split(r"\n## ", doc)
chunks = [c.strip() for c in chunks if c.strip()]

# 3. Build index
index = VectorIndex()
metadatas = [{"chunk_id": i, "source": "handbook.md"} for i, _ in enumerate(chunks)]
index.add_documents(chunks, metadatas)

# 4. Save
# Persist the embeddings so later runs can load them instead of re-embedding.
index.save("handbook_index.pkl")

Usage (tiếp)

# === Query ===
# Load index (subsequent runs)
# A fresh VectorIndex is created and then filled from the pickle written by
# index.save(...), so the corpus is not re-embedded on this run.
index = VectorIndex()
index.load("handbook_index.pkl")


def rag_answer(query: str, k: int = 3, max_distance: float = 0.7) -> str:
    """Answer ``query`` from the top retrieved chunks.

    Uses the module-level ``index`` for retrieval and the module-level
    ``anthropic`` client for generation.

    Args:
        query: User question.
        k: Number of chunks to retrieve.
        max_distance: If the best hit's distance exceeds this value, the
            question is treated as unanswerable and no model call is made.

    Returns:
        The text of the first content block of the model response, or a
        fixed fallback sentence when nothing relevant was retrieved.
    """
    results = index.search(query, k=k)

    # Only the best hit is checked against the threshold; the remaining
    # hits are passed through as context regardless of their distance.
    if not results or results[0][1] > max_distance:  # distance too high
        return "I don't have information on this."

    context = "\n---\n".join(doc["content"] for doc, _ in results)

    prompt = f"""Answer based on context.

<context>
{context}
</context>

Question: {query}"""

    response = anthropic.messages.create(
        model="claude-sonnet-5-20260205",
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text


# Smoke test: ask one question end to end and print the returned answer.
print(rag_answer("What's our WFH policy?"))

Document loader pattern

Scale to many files:

from pathlib import Path

def load_markdown_files(directory: str) -> List[Dict]:
    """Load every ``.md`` file under ``directory`` (searched recursively).

    Files are read as UTF-8 and returned in sorted path order, so the
    result does not depend on filesystem enumeration order or on the
    platform's default encoding.

    Args:
        directory: Root directory to search.

    Returns:
        One dict per file with keys ``"content"`` (file text) and
        ``"path"`` (the path as a string).
    """
    return [
        {"content": path.read_text(encoding="utf-8"), "path": str(path)}
        for path in sorted(Path(directory).rglob("*.md"))
    ]


def chunk_markdown(content: str, source: str) -> List[Dict]:
    """Split markdown text into one chunk per H2 section.

    The text is split on ``"\\n## "``; each piece is stripped and blank
    pieces are dropped. ``chunk_index`` is the piece's position in the raw
    split, so indices can have gaps where blank pieces were skipped.

    Args:
        content: Markdown text.
        source: Identifier stored on every chunk under ``"source"``.

    Returns:
        A list of dicts with keys ``"content"``, ``"source"`` and
        ``"chunk_index"``.
    """
    chunks: List[Dict] = []
    for position, raw in enumerate(re.split(r"\n## ", content)):
        text = raw.strip()
        if not text:
            continue
        chunks.append({"content": text, "source": source, "chunk_index": position})
    return chunks


# Pipeline: load every markdown file, chunk it, embed all chunks, persist.
files = load_markdown_files("docs/")
all_chunks = []
for f in files:
    all_chunks += chunk_markdown(f["content"], f["path"])

# Text and metadata are built as parallel lists, one entry per chunk.
contents = [chunk["content"] for chunk in all_chunks]
metadatas = [
    {key: chunk[key] for key in ("source", "chunk_index")}
    for chunk in all_chunks
]

index = VectorIndex()
index.add_documents(contents, metadatas)

index.save("docs_index.pkl")

Incremental update

Doc updated → re-index only changed chunks:

In production, use proper vector DB với delete/update APIs.

def update_doc(index: VectorIndex, file_path: str):
    """Replace the chunks of ``file_path`` in ``index`` with fresh ones.

    The file is read and chunked first, so an unreadable file raises before
    the index is modified. Old chunks are then removed from ``index.docs``
    and ``index.embeddings`` together, keeping the two lists aligned.

    Args:
        index: Index to update in place.
        file_path: Path of the changed file; matched against each document's
            ``metadata["source"]``.
    """
    # Read the new content before touching the index.
    with open(file_path, encoding="utf-8") as f:
        content = f.read()
    new_chunks = chunk_markdown(content, file_path)

    # Remove old chunks, filtering docs and embeddings as pairs so that
    # docs[i] still corresponds to embeddings[i] afterwards.
    kept = [
        (doc, emb)
        for doc, emb in zip(index.docs, index.embeddings)
        if doc["metadata"].get("source") != file_path
    ]
    index.docs = [doc for doc, _ in kept]
    index.embeddings = [emb for _, emb in kept]

    # Add new
    contents = [c["content"] for c in new_chunks]
    metadatas = [{"source": c["source"], "chunk_index": c["chunk_index"]} for c in new_chunks]
    index.add_documents(contents, metadatas)

Retrieval with citations

Inline citations + verifiable sources.

def rag_with_citations(query: str) -> Dict:
    """Answer ``query`` and return the numbered sources given to the model.

    Uses the module-level ``index`` for retrieval and the module-level
    ``anthropic`` client for generation.

    Args:
        query: User question.

    Returns:
        A dict with ``"answer"`` (model text) and ``"sources"``, a list of
        dicts with ``"number"``, ``"source"``, ``"chunk_id"`` and a
        200-character ``"snippet"``. Source numbers match the ``[n]``
        labels placed in the prompt context. When retrieval returns
        nothing, the model is not called and ``"sources"`` is empty.
    """
    results = index.search(query, k=3)

    if not results:
        # Nothing to cite; same fallback sentence as rag_answer.
        return {"answer": "I don't have information on this.", "sources": []}

    context_parts = [
        f"[{i+1}] {doc['content']}" for i, (doc, _) in enumerate(results)
    ]
    context = "\n\n".join(context_parts)

    prompt = f"""Answer the question using the context. Cite sources [1], [2], etc.

<context>
{context}
</context>

Question: {query}"""

    response = anthropic.messages.create(
        model="claude-sonnet-5-20260205",
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}]
    )

    return {
        "answer": response.content[0].text,
        "sources": [
            {
                "number": i + 1,
                "source": doc["metadata"].get("source"),
                # The single-file setup stores "chunk_id" while the
                # multi-file loader stores "chunk_index"; accept either.
                "chunk_id": doc["metadata"].get(
                    "chunk_id", doc["metadata"].get("chunk_index")
                ),
                "snippet": doc["content"][:200]
            }
            for i, (doc, _) in enumerate(results)
        ]
    }


# Demo: print the answer, then each numbered source with its snippet so the
# [n] citations in the answer can be checked against the retrieved text.
result = rag_with_citations("When do I get paid?")
print(result["answer"])
print("\nSources:")
for s in result["sources"]:
    print(f"  [{s['number']}] {s['source']}: {s['snippet']}...")

Error handling

from anthropic import APIError, RateLimitError
from voyageai.error import VoyageAPIError

def robust_rag(query: str, max_retries: int = 3):
    """Call ``rag_answer`` with retries.

    Rate-limit and Voyage errors are retried with exponential backoff
    (1s, 2s, 4s, ...). Other Anthropic ``APIError``s are logged and retried
    immediately, and re-raised on the final attempt.

    Args:
        query: User question.
        max_retries: Maximum number of attempts.

    Returns:
        The answer string, or ``None`` when every attempt hit a rate-limit
        or Voyage error (or ``max_retries`` is not positive).

    Raises:
        APIError: If the final attempt fails with a non-rate-limit API error.
    """
    # NOTE(review): confirm that the installed voyageai version actually
    # exports VoyageAPIError from voyageai.error.
    logger = logging.getLogger(__name__)
    for attempt in range(max_retries):
        is_last = attempt == max_retries - 1
        try:
            return rag_answer(query)
        except (RateLimitError, VoyageAPIError) as e:
            if is_last:
                # No further attempt follows, so sleeping would only delay
                # the caller; report the exhaustion instead.
                logger.error(f"Giving up after {max_retries} attempts: {e}")
                break
            wait = 2 ** attempt
            logger.warning(f"Retry {attempt + 1} after {wait}s: {e}")
            time.sleep(wait)
        except APIError as e:
            logger.error(f"API error: {e}")
            if is_last:
                raise
    return None

Performance benchmarks

Time scales roughly linearly with the number of chunks. For 100K+ chunks, use a dedicated vector DB.

| Stage | Time (1K chunks) |
|---|---|
| Embed 1K chunks (batch 128) | ~10s |
| Index build | ~1s |
| Save to disk | <1s |
| Single query | ~200ms (embed 100ms + search 100ms) |

Eval cho retrieval

Nếu retrieval accuracy < 85%, hãy cải thiện chunking hoặc thử hybrid search (bài 6.56).

# Labelled retrieval test set: each entry pairs a query with the "source"
# metadata value expected to appear among the retrieved chunks.
test_queries = [
    {"query": "WFH policy", "expected_source": "policies/wfh.md"},
    {"query": "Paternity leave", "expected_source": "policies/leave.md"},
    # ... 50 queries
]

def eval_retrieval(index, queries, k: int = 3):
    """Measure top-``k`` retrieval accuracy.

    A query counts as correct when its ``"expected_source"`` equals the
    ``"source"`` metadata of any of the top ``k`` retrieved documents.

    Args:
        index: Object with a ``search(query, k=...)`` method returning
            ``(doc, distance)`` pairs.
        queries: Sequence of dicts with ``"query"`` and
            ``"expected_source"`` keys.
        k: Number of results inspected per query.

    Returns:
        The fraction of correct queries as a float in ``[0, 1]``; ``0.0``
        when ``queries`` is empty.
    """
    if not queries:
        # Avoid dividing by zero on an empty test set.
        return 0.0

    hits = 0
    for test in queries:
        results = index.search(test["query"], k=k)
        # .get(): documents added without a "source" must not abort the run.
        top_sources = [doc["metadata"].get("source") for doc, _ in results]
        if test["expected_source"] in top_sources:
            hits += 1
    return hits / len(queries)


# Run the evaluation on the labelled queries and print the accuracy as a
# whole-number percentage.
accuracy = eval_retrieval(index, test_queries)
print(f"Retrieval accuracy: {accuracy:.0%}")
# Target: > 85%

Anti-patterns

❌ In-memory index > 50K chunks

Memory usage explodes and loading the index becomes slow.

Fix: Use Pinecone/Chroma/Qdrant.

❌ No caching embeddings

Re-embed corpus mỗi change = slow + costly.

Fix: Save index to disk.

❌ Single query at time

Embedding queries one at a time wastes API round trips.

Fix: Batch embed queries if possible.

❌ Hardcode model name

Switch model → re-embed all. Bakes in assumptions.

Fix: Store embed_model in index, verify match at load.

Áp dụng ngay

Bài tập 1: Full index cho real corpus (1 giờ)

Corpus: personal notes, blog, or 20-30 markdown files.

Bài tập 2: Eval retrieval (30 phút)

Write 20 test queries. Measure top-3 accuracy.

Tune chunk size / K.

  • Load files
  • Chunk
  • Embed + index
  • Save
  • Query interactively

Tóm tắt

🎯 VectorIndex class — add_documents, search, save, load.

🎯 Batch embedding (128 at time) cho speed.

🎯 Save to disk preprocessing results.

🎯 Citations grounded in source metadata.

🎯 Eval retrieval — measure accuracy, tune.

Nội dung này có hữu ích không?