Building MiniRAG: A Complete RAG Backend in Five Steps
After five intense development sessions, I just wrapped up building MiniRAG - a complete backend for Retrieval-Augmented Generation (RAG) applications. What started as a simple idea turned into a comprehensive system with multi-tenant authentication, document ingestion pipelines, vector search, and chat capabilities. Here's the story of how it all came together.
The Architecture Vision
MiniRAG follows a clean, modular architecture built around five core components:
- Multi-tenant Foundation - Secure isolation for different organizations
- Authentication Layer - Bearer token-based API security
- Content Management - Bot profiles and document sources
- Ingestion Pipeline - Async document processing and vector storage
- Chat Interface - LLM-powered conversations with retrieval context
The tech stack? FastAPI for the API layer, PostgreSQL for relational data, Qdrant for vector search, Redis for task queues, all orchestrated with Docker Compose.
Step-by-Step Implementation
Step 1: Laying the Foundation
```python
from uuid import UUID, uuid4

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column

# Core models that everything builds on
class Tenant(Base):
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    name: Mapped[str] = mapped_column(String(100))

class User(Base):
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    tenant_id: Mapped[UUID] = mapped_column(ForeignKey("tenants.id"))
    email: Mapped[str] = mapped_column(String(255), unique=True)
```
The foundation centered around proper multi-tenancy from day one. Every API call gets scoped to a tenant, ensuring complete data isolation.
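Stripped of the ORM machinery, tenant scoping is just a `WHERE tenant_id = ?` filter on every query. A minimal stdlib-only illustration of why that guarantees isolation (table and column names here are illustrative, not MiniRAG's actual schema):

```python
import sqlite3
from uuid import uuid4

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bot_profiles (id TEXT, tenant_id TEXT, name TEXT)")

tenant_a, tenant_b = str(uuid4()), str(uuid4())
conn.execute("INSERT INTO bot_profiles VALUES (?, ?, ?)",
             (str(uuid4()), tenant_a, "support-bot"))
conn.execute("INSERT INTO bot_profiles VALUES (?, ?, ?)",
             (str(uuid4()), tenant_b, "sales-bot"))

def list_profiles(conn, tenant_id: str) -> list[str]:
    # Every read carries the tenant filter, so other tenants' rows are invisible
    rows = conn.execute(
        "SELECT name FROM bot_profiles WHERE tenant_id = ?", (tenant_id,)
    ).fetchall()
    return [name for (name,) in rows]

assert list_profiles(conn, tenant_a) == ["support-bot"]
assert list_profiles(conn, tenant_b) == ["sales-bot"]
```

In MiniRAG the same filter is applied by SQLAlchemy queries; the point is that isolation lives in the query layer, not in application-level checks sprinkled around.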
Step 2: Security First
```python
async def get_current_tenant_id(
    token: str = Depends(oauth2_scheme),
    session: AsyncSession = Depends(get_session),
) -> UUID:
    # Validate bearer token and extract tenant context
    api_token = await session.get(ApiToken, token)
    if not api_token:
        raise HTTPException(401, "Invalid token")
    return api_token.tenant_id
```
Authentication became a dependency injection pattern - every protected endpoint automatically gets the authenticated tenant ID. Clean and secure.
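Boiled down, the dependency does two things: parse the `Authorization` header and look the token up. A stdlib-only sketch of that core logic (the in-memory token store is a stand-in for the ApiToken table):

```python
from uuid import UUID, uuid4

TOKENS: dict[str, UUID] = {}  # token -> tenant_id (stand-in for the ApiToken table)

def register_token(tenant_id: UUID) -> str:
    token = uuid4().hex
    TOKENS[token] = tenant_id
    return token

def resolve_tenant(authorization: str) -> UUID:
    # Mirrors get_current_tenant_id: accept only a known bearer token
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or token not in TOKENS:
        raise PermissionError("Invalid token")  # FastAPI raises HTTPException(401) here
    return TOKENS[token]

tenant = uuid4()
tok = register_token(tenant)
assert resolve_tenant(f"Bearer {tok}") == tenant
```

FastAPI's `Depends` simply runs this lookup before the endpoint body, so handlers never see unauthenticated requests.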
Step 3: Content Management APIs
The API layer emerged as standard CRUD operations, but with tenant-scoped queries baked in:
```python
@router.post("/", response_model=BotProfileResponse)
async def create_bot_profile(
    profile: BotProfileCreate,
    tenant_id: UUID = Depends(get_current_tenant_id),
    session: AsyncSession = Depends(get_session),
):
    # All operations automatically scoped to tenant
    db_profile = BotProfile(**profile.model_dump(), tenant_id=tenant_id)
    session.add(db_profile)
    await session.commit()
    await session.refresh(db_profile)
    return db_profile
```
Step 4: The Ingestion Pipeline
This is where things got interesting. Documents need to be chunked, embedded, and stored in vector format - all asynchronously:
```python
async def ingest_source_content(source_id: UUID):
    """Background task for processing documents."""
    content = await load_source_content(source_id)  # fetch raw text (helper name illustrative)
    chunks = await chunking_service.chunk_content(content)
    embeddings = await embedding_service.embed_chunks(chunks)
    await vector_store.store_embeddings(source_id, chunks, embeddings)
```
The pipeline handles the entire flow from raw text to searchable vectors, with proper error handling and status tracking.
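The status tracking amounts to a small state machine wrapped around the stages. A minimal asyncio sketch of the pattern, with stub stage functions standing in for the real chunking/embedding services (names and states are illustrative):

```python
import asyncio
from enum import Enum
from uuid import UUID, uuid4

class IngestStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

STATUS: dict[UUID, IngestStatus] = {}  # stand-in for a status column on the source row

async def chunk(text: str) -> list[str]:
    # Stub for the real chunking service: fixed-size windows
    return [text[i:i + 20] for i in range(0, len(text), 20)]

async def ingest(source_id: UUID, content: str) -> None:
    STATUS[source_id] = IngestStatus.PROCESSING
    try:
        chunks = await chunk(content)
        # embedding + vector storage would follow here
        STATUS[source_id] = IngestStatus.COMPLETED
    except Exception:
        # Failures are recorded instead of silently crashing the worker
        STATUS[source_id] = IngestStatus.FAILED
        raise

sid = uuid4()
asyncio.run(ingest(sid, "some raw document text to be chunked and embedded"))
print(STATUS[sid].value)  # completed
```

The caller (an API endpoint or a Redis-backed worker) can then poll or report the status without touching the pipeline internals.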
Step 5: Bringing It All Together
The chat endpoint orchestrates everything - retrieving relevant context, managing conversation history, and generating responses:
```python
# Retrieve relevant context from vector store
context_chunks = await vector_store.similarity_search(
    query=message.content,
    bot_profile_id=bot_profile_id,
    limit=5,
)

# Build LLM prompt with context and history
response = await llm_client.generate_response(
    messages=chat_history + [user_message],
    context=context_chunks,
)
```
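The prompt assembly inside that call is plain list-building: retrieved chunks become a grounding system message, followed by prior turns and the new question. A sketch of that step, using the common role/content message convention (the system prompt wording is illustrative; `generate_response` internals are elided):

```python
def build_messages(
    context_chunks: list[str],
    chat_history: list[dict],
    user_message: dict,
) -> list[dict]:
    # Retrieved chunks become grounding context in the system prompt
    context_block = "\n\n".join(context_chunks)
    system = {
        "role": "system",
        "content": f"Answer using only the context below.\n\n{context_block}",
    }
    return [system, *chat_history, user_message]

messages = build_messages(
    context_chunks=["MiniRAG uses Qdrant for vector search."],
    chat_history=[
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello!"},
    ],
    user_message={"role": "user", "content": "What vector store does MiniRAG use?"},
)
assert messages[0]["role"] == "system"
assert len(messages) == 4
```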
Lessons Learned (The Fun Parts)
The Chat History Gotcha
The Problem: I initially saved the user's message to the database, then loaded chat history. This caused the current message to appear twice in the conversation context sent to the LLM.
The Solution: Load history before saving the new message. Seems obvious in hindsight, but it took some debugging to catch this subtle ordering issue.
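The fix is purely an ordering constraint: read history first, then persist. A toy sketch that makes both the bug and the fix concrete (the plain list stands in for the messages table):

```python
def build_context_buggy(history: list[str], msg: str) -> list[str]:
    history.append(msg)            # persist first (the bug)...
    return list(history) + [msg]   # ...so history already contains msg -> duplicate

def build_context_fixed(history: list[str], msg: str) -> list[str]:
    context = list(history)        # load history BEFORE persisting
    history.append(msg)            # now save the new message
    return context + [msg]         # msg appears exactly once

assert build_context_buggy(["hi"], "new").count("new") == 2   # duplicated turn
assert build_context_fixed(["hi"], "new").count("new") == 1   # correct
```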
Testing Database Dependencies
The Problem: My ingestion worker tests kept trying to connect to PostgreSQL, even though I wanted them to use SQLite in-memory for speed.
The Solution: Dependency injection to the rescue! I created a test-specific session factory and patched it in during tests:
```python
# In conftest.py
@pytest.fixture
def test_session_factory(monkeypatch):
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    factory = async_sessionmaker(engine)
    # Patch the production dependency (monkeypatch is function-scoped,
    # so the fixture must be too)
    monkeypatch.setattr("app.workers.ingest.async_session_factory", factory)
    return factory
```
FastAPI Version Drift
The Problem: HTTP_422_UNPROCESSABLE_ENTITY started throwing deprecation warnings.
The Solution: Updated to HTTP_422_UNPROCESSABLE_CONTENT. A small change, but it's these little version compatibility issues that can trip you up.
The Numbers
After five development sessions:
- ✅ 31 async tests passing
- ✅ 2 commits pushed to production
- ✅ 5 major components fully implemented
- ✅ Zero Docker dependency for testing (SQLite in-memory FTW)
What's Next?
The core is solid, but there's always more to build:
- File upload support for document ingestion
- URL scraping for web content
- Streaming responses for better chat UX
- Rate limiting and enhanced validation
- Alembic migrations for production database management
Key Takeaways
Building MiniRAG reinforced a few important principles:
- Start with security - Multi-tenancy and auth from day one, not bolted on later
- Dependency injection scales - FastAPI's DI system made testing and modularity effortless
- Async all the way - From database queries to LLM calls, async/await keeps the event loop free while I/O is in flight
- Test without infrastructure - SQLite in-memory kept tests fast and CI-friendly
The complete codebase is available on GitHub at mini-chat-rag, and I'm excited to see how the RAG space continues to evolve. Building the foundational pieces yourself really deepens your understanding of how these AI-powered systems work under the hood.
Want to dive deeper into any of these implementation details? Feel free to explore the code or reach out with questions about building production RAG systems.