1. Introduction
LLMs cannot always produce up-to-date information because they are limited by the data they were trained on. For a model to generate a response that accounts for recent information, it needs to be able to retrieve the relevant information from an external source. Retrieval-augmented generation (RAG) adds this retrieval step.
A simple RAG system consists of the following components:
- LLM: The engine that generates the response to the user's query.
- Embedding model: A pre-trained model that embeds texts into vectors.
- Vector database: A storage system where the external data is stored along with its vector representation.
graph TD
subgraph RAG
direction TB
user[User]
emb[Embedding model]
source[Source]
vd[Vector database]
model[Model]
source --> vd
user --"query"--> emb
emb --"search"--> vd
vd --"knowledge"--> model
model --"response"--> user
end
The vector database stores pairs of knowledge texts and their vector representations. The user query is first converted to a vector, and the query vector is then compared against each vector in the database for similarity. The texts whose vectors rank highest in similarity to the query vector are retrieved and fed to the model along with the query. The model then generates a response to the query, using the retrieved knowledge as context.
In other words, retrieval has two steps. The first converts the input query to a vector. The second uses a similarity formula to score the input vector against the vectors in the database; the texts whose vectors score highest are fetched and fed into the model.
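The similarity step can be illustrated with cosine similarity, one common choice of similarity formula (the description above does not commit to a specific one). The sketch below uses made-up three-dimensional vectors and plain Python lists in place of real embeddings, so the texts, the vectors, and the name cosine_similarity() are all illustrative assumptions.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between vectors a and b."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy "database": texts paired with hypothetical 3-d embeddings.
database = [
    ("Cats sleep most of the day.", [0.9, 0.1, 0.0]),
    ("Cats can run fast.", [0.1, 0.9, 0.1]),
    ("Cats have retractable claws.", [0.0, 0.2, 0.9]),
]
query_vector = [0.2, 0.8, 0.1]  # hypothetical embedding of the query

# Rank the database entries by similarity to the query, best first.
ranked = sorted(
    database,
    key=lambda item: cosine_similarity(query_vector, item[1]),
    reverse=True,
)
best_text = ranked[0][0]
```

Real embeddings have hundreds of dimensions, but the ranking logic is the same.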
Hosting LLM
We will use llama.cpp to host the LLM locally. llama.cpp takes the model in GGUF format.
For the model, we will use a quantized Qwen 2.5 3B model. The model checkpoint is qwen2.5-3b-instruct-q4_k_m.gguf.
The local server can be started with llama-server using the command
path/to/llama-server -m path/to/<model_name>.gguf --port 8080
The server is hosted at http://localhost:8080/v1/chat/completions.
A Python program can talk to the server using the requests library.
import requests
model_url = "http://localhost:8080/v1/chat/completions"
input_text = "What is RAG?"
payload = {"messages": [{"role": "user", "content": input_text}],
           "stream": False}
response = requests.post(url=model_url, json=payload)
The model's output can be fetched as response.json()['choices'][0]['message']['content'].
We define a function to perform the above steps.
def model(text, model_url):
    # Send a single-turn chat request and return the generated text.
    payload = {"messages": [{"role": "user", "content": text}], "stream": False}
    response = requests.post(model_url, json=payload)
    return response.json()["choices"][0]["message"]["content"]
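The request and response shapes can be checked without a running server. The sketch below assumes the OpenAI-style chat-completions format that the snippet above relies on. build_payload() and extract_reply() are hypothetical helper names, and sample_response is a hand-written stand-in for the server's JSON, not real model output.

```python
def build_payload(text, stream=False):
    """Build the JSON body for a single-turn chat request."""
    return {"messages": [{"role": "user", "content": text}], "stream": stream}

def extract_reply(response_json):
    """Pull the assistant's text out of a chat-completions response."""
    choices = response_json.get("choices") or []
    if not choices:
        raise ValueError("response contains no choices")
    return choices[0]["message"]["content"]

# Hand-written stand-in for response.json(); not real server output.
sample_response = {
    "choices": [{"message": {"role": "assistant", "content": "placeholder answer"}}]
}
payload = build_payload("What is RAG?")
reply = extract_reply(sample_response)
```

Separating payload construction from parsing keeps the network call itself a one-liner and makes the error case (an empty choices list) explicit.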
2. Hosting Embedding Model
We use a separate embedding model to search for semantically similar texts from the database.
A popular choice is BGE base en with the checkpoint bge-base-en-v1.5-gguf.
The model can be served using the llama-server as
path/to/llama-server --embedding -m path/to/<embedding-name>.gguf -c <max-token-size> -ngl 99 --port 8181
The model is hosted at http://localhost:8181/embedding.
To convert texts to embedding vectors, the same requests library can be used.
emb_url = "http://localhost:8181/embedding"
input_text = "What is RAG?"
payload = {'content': input_text}
emb_response = requests.post(emb_url, json=payload)
To convert multiple texts to embedding vectors, a list of texts can be passed in the content argument of the payload.
emb_response.json()
[{'index': 0, 'embedding': [[-0.03851386159658432, -0.018089601770043373, 0.04377084597945213, -0.0749225988984108, 0.035049811005592346, … -0.01626950316131115, 0.022378940135240555, 0.046341635286808014]]}]
The embedding vector can be extracted using emb_response.json()[0]['embedding'][0].
Here, we encapsulate the logic in a function embed_text().
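import numpy as np
def embed_text(text, emb_url, return_numpy=True):
    """Return the embedding of text, or None if the request fails."""
    try:
        response = requests.post(emb_url, json={'content': text})
        if response.status_code == 200:
            embedding = response.json()[0]['embedding'][0]
            if return_numpy:
                # shape (1, dim), so it can be multiplied with the database matrix
                embedding = np.array(embedding).reshape(1, -1)
            return embedding
        print(f"Server responded with status {response.status_code}.")
    except Exception as e:
        print(f"Error occurred: {e}")
    return None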
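When a list of texts is sent, the response is expected to contain one entry per text. Here is a minimal sketch of unpacking it, assuming every entry has the same shape as the output shown above (an 'embedding' field that wraps the vector in an outer list). extract_embeddings() is a hypothetical helper and the sample values are made up.

```python
def extract_embeddings(response_json):
    """Return one embedding vector per input text, ordered by 'index'."""
    entries = sorted(response_json, key=lambda entry: entry["index"])
    return [entry["embedding"][0] for entry in entries]

# Made-up stand-in for emb_response.json() with two input texts.
sample = [
    {"index": 1, "embedding": [[0.3, 0.4]]},
    {"index": 0, "embedding": [[0.1, 0.2]]},
]
vectors = extract_embeddings(sample)
```

Sorting by 'index' is a defensive choice so that the vectors line up with the input texts even if the entries arrive out of order.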
3. Creating a Vector Database
The vector database consists of pairs of texts and their embeddings, against which the query is matched.
We take the text stored in cat-facts.txt and embed each line into a vector.
filename = "cat-facts.txt"
with open(filename, 'r') as file:
    text_db = file.readlines()
# strip the trailing newline and convert the list into a numpy array.
text_db = np.array([x.strip('\n') for x in text_db])
# embed all lines in one request; the parsed response holds one entry per line.
vector_db = requests.post(emb_url, json={'content': list(text_db)}).json()
# convert each embedding vector into a numpy array.
# The columns of vector_db are embedding vectors.
vector_db = np.stack([np.array(x['embedding'][0]) for x in vector_db], axis=1)
vector_db = {'vectors': vector_db, 'text': text_db}
4. Context Search
With the database in place, we can take any query, convert it to a vector, and calculate its similarity score against each vector in the vector database.
We can then rank the vectors in the database according to their similarity to the input vector.
The vectors can be selected based on a certain threshold if the database is relatively small, or we can select the top k matches.
def search_context(query_vector, vector_db):
    # define a threshold score for similarity.
    threshold = 0.66
    # similarity of the query, shape (1, dim), against each column of the database, shape (dim, n).
    similarity_score = np.matmul(query_vector, vector_db)
    # indices of all vectors scoring at or above the threshold, shaped (1, m)
    # so that they can index the text array directly.
    indices = np.flatnonzero(similarity_score[0] >= threshold).reshape(1, -1)
    return similarity_score, indices
The texts with high similarity scores are retrieved from the database to form the context on which the model bases its response.
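The two selection strategies, a score threshold and a fixed number of top matches, can be compared side by side. This is a plain-Python sketch with made-up scores; select_by_threshold() and select_top_k() are hypothetical names, not functions used elsewhere in this article.

```python
def select_by_threshold(scores, threshold):
    """Indices of all scores at or above the threshold, best first."""
    hits = [i for i, s in enumerate(scores) if s >= threshold]
    return sorted(hits, key=lambda i: scores[i], reverse=True)

def select_top_k(scores, k):
    """Indices of the k highest scores, best first."""
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    return order[:k]

scores = [0.71, 0.40, 0.68, 0.66, 0.12]  # made-up similarity scores
above = select_by_threshold(scores, 0.66)
top2 = select_top_k(scores, 2)
```

A threshold returns a variable number of results (possibly none), while top-k always returns a fixed number, even when none of them is a good match.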
5. Inference - The complete Pipeline
Once the relevant context has been retrieved from the database, the query and the context are passed together to the model, which generates a response based on the information provided. This step is the same as the one explained in Hosting LLM.
def rag(query, vector_db, emb_url, model_url):
    # Embed the query
    query_vector = embed_text(query, emb_url)
    # search for the texts that are relevant to the query and return their indices.
    _, context_indices = search_context(query_vector, vector_db['vectors'])
    # fetch the information using the indices.
    context = vector_db['text'][context_indices]
    context = "\n".join(context[0])
    instructions = """Based on context provided, respond only to what the query text says or asks.
If the information the query text seeks is not present in the context text, without explanation say the information is not available.
However, if the information contradicts, say the same."""
    contextualized_query = f"""<instruction>
{instructions}
</instruction>
<context>
{context}
</context>
<query>
{query}
</query>"""
    model_response = model(contextualized_query, model_url)
    return {'response': model_response, 'context': context}
Example responses:
query = "How fast a cat can run?"
response = rag(query, vector_db, emb_url, model_url)
print(response['response'])
How large a cat's heart is in size?
The text provides information about the heart rate of a cat, stating that a cat’s heart beats nearly twice as fast as a human heart, at 110 to 140 beats per minute. However, it does not provide information about the size of a cat’s heart.
Why are Eygiptians considered to be the first to pet cats?
The information is not available. The text indicates that the oldest known pet cat was found on the Mediterranean island of Cyprus, which predates early Egyptian art depicting cats by 4,000 years or more. This suggests that the ancient Egyptians may not have been the first to domesticate cats.
Why Eygiptians are not the first to pet cats?
The Egyptians were not the first to domesticate cats. The oldest known pet cat was found in a 9,500-year-old grave on the Mediterranean island of Cyprus, which predates early Egyptian art depicting cats by 4,000 years or more.
Why cats have 230 bones?
The information provided does not explain why cats have 230 bones. It only lists the number of bones in a cat’s body without providing a reason for this number.
6. Ranking with FAISS
So far we have considered retrieval based on a fixed threshold: every vector that crosses the threshold is used. For a small dataset, this may be a suitable way to gather sufficient information. For a large database, however, it becomes problematic, since many vectors may be similar to the query vector without being directly relevant to it.
In such cases, which are the usual ones in production, the next step is to rank the database vectors that cross the threshold.
Once ranked, the top k vectors are retrieved.
Start with installing FAISS library with conda install -c pytorch -c nvidia faiss-gpu=1.8.0.
6.1 FAISS Indexing
To search for relevant information in the database, we first need to load the database into a FAISS index.
The two basic indexes are IndexFlatIP and IndexFlatL2. An index stores the database and uses a particular similarity score when retrieval is performed: inner product for IndexFlatIP and L2 (Euclidean) distance for IndexFlatL2.
import faiss
# vector_db['vectors'].shape = (768, 150)
vec_dim, n_vec = vector_db['vectors'].shape
index = faiss.IndexFlatIP(vec_dim)
Note the index expects the vectors to be stored row-wise.
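The choice between an inner-product index and an L2 index matters less than it may seem when the vectors are normalized to unit length. In that case the squared Euclidean distance satisfies ||a - b||^2 = 2 - 2(a·b), so ranking by largest inner product and ranking by smallest distance give the same order. The sketch below checks this with made-up vectors in plain Python; it does not call FAISS, and whether a given embedding server returns normalized vectors is something to verify for your setup.

```python
import math

def normalize(v):
    """Scale v to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def inner_product(a, b):
    return sum(x * y for x, y in zip(a, b))

def squared_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))

# Made-up vectors, normalized to unit length.
query = normalize([1.0, 2.0, 0.5])
vectors = [normalize(v) for v in ([1.0, 1.9, 0.4], [0.2, 0.1, 3.0], [2.0, 0.5, 0.5])]

# Rank by inner product (descending) and by squared distance (ascending).
by_ip = sorted(range(len(vectors)), key=lambda i: inner_product(query, vectors[i]), reverse=True)
by_l2 = sorted(range(len(vectors)), key=lambda i: squared_l2(query, vectors[i]))
```

For vectors that are not normalized, the two rankings can differ, so the index type should match how the embeddings were produced.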
The index can also be moved to the GPU.
rs = faiss.StandardGpuResources()
co = faiss.GpuClonerOptions()
index_gpu= faiss.index_cpu_to_gpu(provider=rs, device=0, index=index, options=co)
index_gpu.add(vector_db['vectors'].T)
After initializing the index, we can add the database to the index object.
# use index_gpu for gpu computing.
index.add(vector_db['vectors'].T)
Saving the Database for Future Use
The database loaded into the index can be saved using
faiss.write_index(index, "cat_rag_index.faiss")
The saved database can be read using
index = faiss.read_index("cat_rag_index.faiss")
6.2 Retrieving
Once the vectors are stored in the index, a vector search can be performed by passing a query vector and specifying the number of top matches to return.
query = "How do Egyptians used to pet cats?"
query_vector = embed_text(query, emb_url)
D, I = index.search(query_vector, 3)
The search method returns the scores between the query and the top vectors (inner products in the case of IndexFlatIP), along with their corresponding indices.
D, I
D: [[0.7113327 0.7039 0.6815758]] I: [[ 25 10 141]]
The indices can then be used to get the corresponding texts.
print(vector_db['text'].T[I])
How do Egyptians used to pet cats?
array([[‘When a family cat died in ancient Egypt, family members would mourn by shaving off their eyebrows. They also held elaborate funerals during which they drank wine and beat their breasts. The cat was embalmed with a sculpted wooden mask and the tiny mummy was placed in the family tomb or in a pet cemetery with tiny mummies of mice.’, ‘While it is commonly thought that the ancient Egyptians were the first to domesticate cats, the oldest known pet cat was recently found in a 9,500-year-old grave on the Mediterranean island of Cyprus. This grave predates early Egyptian art depicting cats by 4,000 years or more.’, ‘In ancient Egypt, mummies were made of cats, and embalmed mice were placed with them in their tombs. In one ancient city, over 300,000 cat mummies were found.’]], dtype=‘<U477’)
We can again make the above code a function so that the same pipeline can be used in this case too.
def search_context(query_vector, vector_db):
    # here vector_db is the FAISS index; return the top 3 matches.
    D, I = vector_db.search(query_vector, 3)
    return D, I
Other indexes that FAISS provides include:
- IndexIVF*
- IndexHNSW*
- IndexLSH
6.3 Inference
Inference can be done using the same pipeline, as shown below.
query = "How fast a cat can run?"
response = rag(query, {'text': vector_db['text'], 'vectors': index},
emb_url, model_url)
print(response['response'])
print(f"""\nContext used:\n\n{response['context']}""")
How fast a cat can run?
A cat can travel at a top speed of approximately 31 mph (49 km) over a short distance.
Context used:
A cat can travel at a top speed of approximately 31 mph (49 km) over a short distance. A cat’s heart beats nearly twice as fast as a human heart, at 110 to 140 beats a minute. A cat can jump up to five times its own height in a single bound.
7. Uploading Pipeline
graph TD
subgraph Uploading Pipeline
direction LR
raw[Data]
chunk[Chunk]
emb[Embedding]
uuid[UUID]
qdrant[Qdrant]
rdb[(RDB<br/>Dataframe)]
raw-->chunk
chunk--"list[text]"-->rdb
uuid--"list[ID]"-->rdb
rdb--"list[text]"-->emb
emb--"list[vectors]"-->qdrant
rdb --"list[ID]"--> qdrant
end
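The flow in the diagram can be sketched with the standard library alone. This is a minimal illustration, not the project's implementation: chunk_lines(), make_ids(), and build_records() are hypothetical names, chunking is simply one chunk per non-empty line, and the embedding and Qdrant steps are left out. Deriving IDs with uuid5 from a fixed namespace is an assumption; it makes the IDs reproducible for the same text.

```python
import uuid

# Hypothetical namespace; any fixed UUID works.
NAMESPACE = uuid.UUID("12345678-1234-5678-1234-567812345678")

def chunk_lines(raw_text):
    """Split raw text into chunks, one per non-empty line."""
    return [line.strip() for line in raw_text.splitlines() if line.strip()]

def make_ids(chunks):
    """Derive a deterministic UUID for each chunk from its text."""
    return [str(uuid.uuid5(NAMESPACE, chunk)) for chunk in chunks]

def build_records(raw_text):
    """Pair IDs with chunks, as rows of the relational table in the diagram."""
    chunks = chunk_lines(raw_text)
    return [{"id": i, "text": c} for i, c in zip(make_ids(chunks), chunks)]

records = build_records("Cats sleep a lot.\n\nCats can run fast.\n")
```

The same IDs would then be attached to the vectors sent to the vector database, so a search hit can be mapped back to its text. Note that identical chunks map to the same ID under this scheme.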
8. Inference Pipeline
%%{init: {"flowchart": {"rankSpacing": 20, "nodeSpacing": 20}}}%%
graph TD
subgraph "Inference Pipeline<br/> "
query
qdrant[Qdrant]
rdb[RDB]
llm[LLM]
res[response]
query --> qdrant
qdrant --> rdb
rdb --> llm
llm --> res
style query fill:transparent, stroke:transparent, text:transparent, padding:0px
style res fill:transparent, stroke:transparent, text:transparent, padding:0px
end
9. FastAPI Interface
Within our main project we create a FastAPI interface. The API component sits in the src folder of the main project and has the following structure.
api
├── db.py
├── main.py
├── routers
│ ├── auth.py
│ ├── generate_api_key.py
│ ├── __init__.py
└── schemas
The API lifecycle is illustrated below.
sequenceDiagram
Note over Client, Server: Generate API key
Client ->>+ Server: POST /api-keys
Server ->>- Client: JSON[API key]
Note over Client, Server: Validate API key
Client ->>+ Server: GET /verify-key<br/>HEADER[API Key]
Server ->>- Client: JSON[status]
Note over Client, Server: File Upload
Client ->>+ Server: POST /file<br/>HEADER[API key]<br/>multipart/form-data[file]
Server ->>- Client: JSON[status]
Note over Client, Server: Query
Client ->>+ Server: POST /query<br/>HEADER[API key]<br/>JSON[query]
Server ->>- Client: JSON[status, answer]
9.1 Server Components
The key components of a FastAPI server are a set of databases and a set of path operations that handle the requests from the client.
9.1.1 Databases
The server maintains the following three databases.
- sqlite3 - A lightweight database that stores the user details (id, email, hashed_api_key).
- Object storage - Stores the files uploaded by the user.
- Qdrant VDB - After a file is chunked, the embedding vectors are stored in the vector database along with their corresponding chunks.
9.1.2 Path Operations
The server lifecycle is composed of four path operations. A path operation is a function that is bound to one of the HTTP request methods at a given path. Any request made with that method to that path triggers the corresponding path operation, which executes to produce the desired response.
A resource is an entity on the server that can be accessed using a unique URI (Uniform Resource Identifier). The path at which the resource is accessed is called the endpoint. In HTTP parlance, endpoints are named as nouns, while the HTTP methods act as verbs that operate on the resources at the endpoints.
On the server, the HTTP methods trigger specific functions attached to the corresponding endpoints.
These functions are called path operations. In FastAPI, we define a path operation by decorating a function with the decorators @app.<method>("/endpoint"), where app=FastAPI() and method is any of the HTTP methods get, post, put, etc.
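To make the binding concrete, here is a toy registry that mimics the decorator pattern using only the standard library. It is not how FastAPI is implemented; ToyApp and its handle() method are hypothetical and exist only to show a function being attached to an (HTTP method, path) pair.

```python
class ToyApp:
    """Maps (HTTP method, path) pairs to handler functions."""

    def __init__(self):
        self.routes = {}

    def _register(self, method, path):
        def decorator(func):
            self.routes[(method, path)] = func
            return func
        return decorator

    def get(self, path):
        return self._register("GET", path)

    def post(self, path):
        return self._register("POST", path)

    def handle(self, method, path, **kwargs):
        """Dispatch a request to the path operation bound to it."""
        handler = self.routes.get((method, path))
        if handler is None:
            return {"status": 404}
        return handler(**kwargs)

app = ToyApp()

@app.get("/verify-key")
def verify():
    return {"status": "ok"}

@app.post("/query")
def query(text):
    return {"echo": text}
```

Calling app.handle("POST", "/query", text="hi") runs query(), while the same path with GET finds no handler, which is why the method and the path together identify a path operation.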
In our RAG project we have the following four path operations.
@router.post("/api-keys")
def generate_api_keys(user: User):
    ...
@router.get("/verify-key")
async def verify(user: Annotated[User, Depends(authenticate_api)]):
    ...
@router.post("/file")
def save_file(background_tasks: BackgroundTasks,
              rag: Annotated[RAG, Depends(get_rag_engine)],
              user: Annotated[User, Depends(authenticate_api)],
              file: UploadFile = File()):
    ...
@router.post("/query")
def query(user: Annotated[User, Depends(authenticate_api)],
          rag: Annotated[RAG, Depends(get_rag_engine)],
          text: Annotated[str, Body()]):
    ...
Here, we have used two FastAPI components viz. routers and dependency which we will explain later.
For now, we note that the four operations correspond to the four phases of the server lifecycle.
- generate_api_keys(): The server authenticates each request using an API key, so a new user must first generate one. This is the first resource a new user interacts with. The user sends a POST request to the endpoint serverpath/api-keys with a body containing the user details, here the email id. The generate_api_keys() function bound to the endpoint is triggered when the request arrives. It takes the user details, whose shape is defined by a pydantic schema, and generates a new API key for the user to use in all later interactions.
- verify(): A user who already has an API key can send a GET request to the endpoint serverpath/verify-key, with the key in the header, to validate it. The key is validated by the dependency function authenticate_api(), which is injected into verify(). On successful validation, the server responds with a validation message along with the email.
- save_file(): This is one of the two core functions that do the actual RAG work. It takes the file the user sent with a POST request to the endpoint /file and stores it in the server storage. The server responds with the status of the request and the details of the file. Once the response has been sent, the function triggers a background task that chunks the stored file and embeds the chunks into vectors. The vectors, along with their corresponding chunks, are stored in a Qdrant vector database for later retrieval.
- query(): This is the main function that executes the user's query on the file. Both query() and save_file() interact with the RAG object, which provides an interface to the RAG engine and is injected into these functions as a dependency. The query() function receives the text provided by the user and the injected RAG object. It uses the RAG object to query the vector database for the chunks similar to the user's text. The retrieved chunks are then passed as context, together with the query text, to the LLM. The LLM's response is sent back to the user.
Next, we move to the routers, dependency, and background tasks.
9.1.3 Routers
A router is an object, instantiated with APIRouter(), that provides a way to group related path operations.
In our case, since the four path operations are distinct phases of the API lifecycle, it is better to organize them into separate files instead of clumping them together in the same main.py file.
Routers come in handy here, as they allow a group of operations to be defined on a router object that can then be used in the main API file.
A path operation in a router is defined in the same way as on the FastAPI() object.
from fastapi import APIRouter
router = APIRouter()
@router.<method>("/endpoint")
def path_operation(...):
    ...
To attach the router to the main FastAPI() object, we use include_router() method.
from fastapi import FastAPI
from path.to.router import router
app = FastAPI()
app.include_router(router)
9.1.4 Dependency
Dependencies allow us to define a directed graph of operations. If functions must execute in a certain order, and the functions lower in the hierarchy require the successful execution of the preceding ones, the execution flow can be implemented using FastAPI's dependency tool. It is similar to DAGs in Airflow, where they are used to orchestrate a pipeline. In FastAPI, however, a dependency is tied only to the request-response cycle. Dependencies also make components reusable, since the same component can be used as a dependency in many functions.
A common use of dependencies is security checks. If a certain security check is necessary before any path operation runs, it can be defined as a dependency function and reused by injecting it into all the path operations.
Any function can be used as a dependency. To include a function as a dependency of another function, the dependency is declared as an argument of the dependent function.
def dependent_function(dependency_output: output_type = Depends(dependency)):
    ...
Now, whenever dependent_function() is called as a path operation, FastAPI first resolves the dependency by executing the dependency() function.
If dependency() itself depends on another function, that function is executed first.
Note that FastAPI now encourages the type specification dependency_output: Annotated[output_type, Depends(dependency)].
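The resolution order can be illustrated with a toy resolver. This is a standard-library sketch of the idea only, not FastAPI's implementation: the Depends class here is a hypothetical stand-in, and call_with_dependencies() is a hypothetical helper that resolves any parameter whose default is marked with it, recursively.

```python
import inspect

class Depends:
    """Toy marker: this parameter is produced by calling `dependency`."""
    def __init__(self, dependency):
        self.dependency = dependency

def call_with_dependencies(func, **provided):
    """Call func, first resolving any parameter whose default is a Depends."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if isinstance(param.default, Depends):
            # Resolve the dependency (and its own dependencies) first.
            kwargs[name] = call_with_dependencies(param.default.dependency, **provided)
        elif name in provided:
            kwargs[name] = provided[name]
    return func(**kwargs)

calls = []  # records the execution order

def get_api_key(api_key):
    calls.append("get_api_key")
    return api_key

def authenticate(key=Depends(get_api_key)):
    calls.append("authenticate")
    return {"user": "demo", "key": key}

def query(text, user=Depends(authenticate)):
    calls.append("query")
    return f"{user['user']} asked: {text}"

result = call_with_dependencies(query, text="hello", api_key="secret")
```

The innermost dependency runs first and the dependent function runs last, which is the ordering described above.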
9.1.5 Background tasks
A background task is a process that runs outside the request-response cycle.
There are often tasks that need to be done once the data from the user has been received, or processes that are of no direct interest or relevance to the user.
A background task is declared through an argument of the path operation function.
The function is defined to receive a BackgroundTasks object, to which it adds the function that should be triggered after the response to the user has been sent.
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
@app.get("/endpoint")
def endpoint(background_task: BackgroundTasks):
    ...
    # function runs after the response has been sent
    background_task.add_task(function, args)
    ...
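The ordering guarantee is the important part: queued work runs only after the response is ready. The toy class below imitates that behaviour with the standard library. It is a sketch of the idea rather than FastAPI's BackgroundTasks, and ToyBackgroundTasks, ingest(), and the file path are hypothetical.

```python
class ToyBackgroundTasks:
    """Collects tasks to run after the response has been produced."""

    def __init__(self):
        self._tasks = []

    def add_task(self, func, *args, **kwargs):
        # Store the function with its arguments; nothing runs yet.
        self._tasks.append((func, args, kwargs))

    def run_all(self):
        for func, args, kwargs in self._tasks:
            func(*args, **kwargs)

log = []  # records the order of events

def ingest(filepath):
    log.append(f"ingested {filepath}")

def endpoint(background_tasks):
    background_tasks.add_task(ingest, "data/user/example.txt")
    log.append("response built")
    return {"message": "accepted"}

tasks = ToyBackgroundTasks()
response = endpoint(tasks)  # the response is ready here
log.append("response sent")
tasks.run_all()             # queued work runs only afterwards
```

In the real framework the server performs the last two steps itself; the path operation only queues the task.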
The add_task() method takes as arguments the function to be triggered, followed by that function's arguments in the same order.
9.1.6 Lifespan
The complete lifecycle of the server that includes the RAG engine is detailed in the following sequence diagram.
sequenceDiagram
box rgba(0, 150, 255, 0.05) Client-Server
participant Client
participant Server
participant serverDatabase as Database
end
box rgba(200, 200, 255, 0.2) RAG
participant Rag Ingestion
participant Rag Query
participant Vector Database
participant LLM
participant Database
end
Note over Client, Server: Connect databases
Note over Client, Server: API key generation
Client ->> Server: Generate API key<br/>[email]
Note over Server: Generate [id, api_key, hashed_api_key]
Server ->>() serverDatabase: [id, email, hashed_api_key]
Server ->> Client: [email, api_key]
Note over Client, Server: API key validation
Client ->> Server: Validate [api_key]
Note over Server: Generate [hashed_api_key]
Server ->> serverDatabase: Query [email, hashed_api_key]
serverDatabase ->> Server: status
Server ->> Client: status
Note over Client, Server: Upload file
Client ->> Server: [api_key, file]
Note over Server: Validate [api_key, file]
Server ->>() Database: [file]
Server ->> Client: [message, filename, content_type]
Note over Server: Background task
Server ->> Rag Ingestion: [id, filepath]
Note over Rag Ingestion: Call chunk() -> embed()
Rag Ingestion ->>() Vector Database: [vectors, id, chunks]
Note over Client, Server: Query
Client ->> Server: Query [api_key, text]
Server ->> serverDatabase: Query [id]<br/>using [api_key]
serverDatabase ->> Server: [id]
Server ->> Rag Query: [id, text]
Note over Rag Query: Call embed()
Rag Query ->> Vector Database: Query similar vectors<br/>using [id, embedded_text]
Vector Database ->> Rag Query: Similar texts [chunked_texts]
Rag Query ->> LLM: [contextualized_query] (text+chunked_texts)
LLM ->> Rag Query: [response]
Rag Query ->> Server: [response, chunked_texts]
Server ->> Client: [response, chunked_texts]
Note over Client, Server: Close databases
We choose the following design for the databases.
src/
├── api/
│ ├── config
│ ├── config.py
│ └── __init__.py
├── routers
│ ├── __init__.py
│ ├── auth.py
│ ├── file_upload.py
│ ├── generate_api_key.py
│ └── llm_query.py
├── schemas
├── utils
│ ├── __init__.py
│ ├── database.py
│ ├── file_validation.py
│ └── util.py
├── db.py
└── main.py
9.2 Pydantic Schema
source: Pydantic Settings for Configuration
Before we go into the details of the path operations, we discuss the data schema, which is crucial for the organization and maintenance of the package.
As a project grows in size, it becomes more practical to keep all the parameters or values that change the behavior of the engine in a separate file rather than hardcoding the values into the program.
For AI-related systems, the pydantic Python library has emerged as a popular choice.
It helps store the values cleanly by letting us define the data model, or schema, of what goes into and out of the system.
What makes it an appropriate choice is that it enforces data types. Any mismatch is either coerced or raised as an error before any data is processed, ensuring that the data conforms to the specified schema.
FastAPI supports pydantic natively. It lets us define the body of a request or response as pydantic classes.
It maps the custom defined pydantic class to the JSON body of the request and response.
In this project, we use pydantic data class to read the parameters stored in an .env file in the project root.
The .env file is defined as below.
# Databases
SQL_AUTH_DB_PATH = "data/auth/credentials.db"
OBJ_FILE_DB_PATH = "data/rag/objects"
VECTOR_DB_PATH = "data/rag/vectors"
# Models path
EMBEDDING_MODEL_PATH = "http://localhost:8181/embedding"
LLM_MODEL_PATH = "http://localhost:8080/v1/chat/completions"
EMBEDDING_DIM = 768
# UUID to serve as a
ID_NAMESPACE = "73c177ef-bc8b-42d0-9762-71f13a3b3a45"
COLLECTION_NAME = "cat-facts.txt"
# 10MB
MAX_FILE_SIZE_BYTES = 10485760
We create a config.py file at root/src/rag/utils/ that reads the .env file and stores the values as variables.
The file reader is defined by inheriting a BaseSettings class from pydantic_settings.
from pathlib import Path
from uuid import UUID
from pydantic import field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# project root, four levels above this file
ROOT = Path(__file__).resolve().parent.parent.parent.parent
class Settings(BaseSettings):
    # Databases
    SQL_AUTH_DB_PATH: Path
    OBJ_FILE_DB_PATH: Path
    VECTOR_DB_PATH: Path
    # Models Paths
    EMBEDDING_MODEL_PATH: str
    LLM_MODEL_PATH: str
    EMBEDDING_DIM: int
    ID_NAMESPACE: UUID
    COLLECTION_NAME: str
    MAX_FILE_SIZE_BYTES: int
    @field_validator(
        "SQL_AUTH_DB_PATH", "OBJ_FILE_DB_PATH", "VECTOR_DB_PATH", mode="before"
    )
    @classmethod
    def resolve_absolute_path(cls, value: str) -> Path:
        # turn a relative database path into an absolute one under ROOT
        path = Path(value)
        if not path.is_absolute():
            return ROOT / path
        return path
    model_config = SettingsConfigDict(env_file=str(ROOT / ".env"), extra="ignore")
settings = Settings()
The SettingsConfigDict tells the class to read the env file from the specified path.
The field_validator decorator lets us define a function that is executed on the fields passed to it.
It can be used to perform more complex data checks.
Here, we use it to define a function resolve_absolute_path() that appends the relative path of each database to the project root path.
We instantiate the Settings class and import the instance in other files to use the parameters.
9.3 Generate API Key
The initial setup consists of a new user generating an API key to use the service.
The client sends a POST request to the server. The server assigns an id to the client, generates a new API key api_key for that id, and responds to the client with the generated key.
The client's api_key is hashed to hashed_api_key. The client's id and hashed_api_key are stored in the server's storage for later authorization.
sequenceDiagram
participant client
participant server
participant generate_api_keys
participant sqlite3
client ->> server: POST /api-keys<br/>JSON[email]
server ->> generate_api_keys: Route POST request<br/>JSON[email]
Note over generate_api_keys: Generate [id, api_key, hashed_api_key]
generate_api_keys ->>() sqlite3: [id, email, hashed_api_key]
generate_api_keys ->> client: JSON[email, api_key]
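The generation step in the diagram can be sketched with the standard library. SHA-256 hashing matches what verify() uses later in this article; generating the id with uuid4 and the key with secrets.token_urlsafe are assumptions, and generate_credentials() is a hypothetical name.

```python
import hashlib
import secrets
import uuid

def generate_credentials(email):
    """Create an id, a random API key, and the key's SHA-256 hash."""
    user_id = str(uuid.uuid4())
    api_key = secrets.token_urlsafe(32)
    hashed_api_key = hashlib.sha256(api_key.encode()).hexdigest()
    # Only the hash is stored; the plain key is returned to the client once.
    record = {"id": user_id, "email": email, "hashed_api_key": hashed_api_key}
    response = {"email": email, "api_key": api_key}
    return record, response

record, response = generate_credentials("user@example.com")
```

Storing only the hash means that a leak of the database does not directly expose usable API keys.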
9.4 Validate API key
The next phase is authentication. This stage exists specifically to validate a given API key. Suppose a user has an API key from previous interactions. Before interacting with the server again, it is good practice to validate the key.
The validation lifecycle is straightforward. The client sends a GET request to the server with the API key in its header. The server checks whether the API key is present in its records and responds with the status of the validation.
The authentication is processed by the function below.
# router/auth.py
import logging
import sqlite3
def authentication(email: str, hashed_api_key: str, filepath: str):
    try:
        with sqlite3.connect(filepath) as connection:
            cursor = connection.cursor()
            # look for a row matching both the email and the hashed key
            read_query = (
                """select 1 from client_record where (email=?) and (hashed_api_key=?);"""
            )
            cursor.execute(read_query, (email, hashed_api_key))
            status = cursor.fetchone() is not None
            return status
    except sqlite3.IntegrityError as e:
        logging.error(f"Integrity error in {filepath}: {e}")
        raise
    except sqlite3.Error as e:
        logging.error(f"Database error in {filepath}: {e}")
        raise
The function checks for a row containing the email and the hashed API key provided by the client.
If such a row is present, the function returns True.
The GET method is bound to the function below at the resource mainpath/verify-key.
# router/auth.py
@router.get("/verify-key")
async def verify(email: Annotated[str, Header()], api_key: Annotated[str, Header()]):
    hashed_api_key = hashlib.sha256(api_key.encode()).hexdigest()
    filepath = "data/auth/client_record.db"
    status = authentication(email, hashed_api_key, filepath)
    return {
        "status": "Credentials are valid!" if status else "Credentials are invalid!"
    }
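The store-then-validate round trip can be tried against an in-memory SQLite database. This is a sketch under assumptions: the table name client_record and its columns follow the query above, but the column types, the helpers create_table(), add_client(), and is_valid(), and the sample credentials are all hypothetical.

```python
import hashlib
import sqlite3

def create_table(connection):
    connection.execute(
        "CREATE TABLE IF NOT EXISTS client_record ("
        "id TEXT PRIMARY KEY, email TEXT UNIQUE, hashed_api_key TEXT)"
    )

def add_client(connection, user_id, email, api_key):
    """Store the client with the SHA-256 hash of the key, never the key itself."""
    hashed = hashlib.sha256(api_key.encode()).hexdigest()
    connection.execute(
        "INSERT INTO client_record VALUES (?, ?, ?)", (user_id, email, hashed)
    )

def is_valid(connection, email, api_key):
    """True if a row matches both the email and the hash of the key."""
    hashed = hashlib.sha256(api_key.encode()).hexdigest()
    row = connection.execute(
        "SELECT 1 FROM client_record WHERE email = ? AND hashed_api_key = ?",
        (email, hashed),
    ).fetchone()
    return row is not None

connection = sqlite3.connect(":memory:")  # throwaway database for the demo
create_table(connection)
add_client(connection, "1", "user@example.com", "demo-key")
```

Using ? placeholders, as the article's own query does, keeps user-supplied values out of the SQL string.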
9.5 File Upload
sequenceDiagram
Client ->> Server: POST /file<br/>[api_key, file]
Note over Server: Validate [api_key, file]
Server ->>() Database: [file]
Server ->> Client: JSON[message, filename, content_type]
Note over Server, Rag Ingestion: Background task
Server ->> Rag Ingestion: [id, filepath]
Note over Rag Ingestion: Call chunk() -> embed()
Rag Ingestion ->>() Vector Database: [id, vectors]
Rag Ingestion ->>() Database: [id, embedded_chunks]
Here, we start by saving the file uploaded by the client. The POST method at the resource /file is bound to save_file().
In save_file(), we guard the process by first validating the file's size and type.
A function then generates a new name for the file, and the file is stored under the new name using the shutil.copyfileobj() function.
@router.post("/file")
def save_file(background_tasks: BackgroundTasks, file: UploadFile = File()):
    # 10MB limit
    MAX_FILE_SIZE_BYTES = 10 * 1024 * 1024
    validation_status = validate_file(file, MAX_FILE_SIZE_BYTES)
    if not validation_status["valid"]:
        raise HTTPException(status_code=400, detail=validation_status["errors"])
    filename = Path(file.filename)
    filename = generate_filename(filename)
    db_path = Path("data/user/")
    db_path.mkdir(parents=True, exist_ok=True)
    filepath = db_path / filename
    try:
        with open(filepath, "wb") as f:
            shutil.copyfileobj(file.file, f)
    except Exception as e:
        logging.error(f"Failed to save file: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="There was an error uploading the file.",
        )
    finally:
        file.file.close()
    # ingestion runs after the response has been sent
    background_tasks.add_task(rag_ingestion, filepath)
    return {
        "message": "File uploaded successfully!",
        "file name": file.filename,
        "content type": file.content_type,
    }
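The helpers validate_file() and generate_filename() are not shown in this article. The sketch below is one possible shape for them, written against a plain file-like object so that it runs without FastAPI. The allow-list of extensions, the error messages, and the UUID-based naming are assumptions; only the returned "valid" and "errors" keys follow the usage in save_file(), and the signature here takes the name and stream separately, unlike the call above.

```python
import io
import uuid
from pathlib import Path

ALLOWED_SUFFIXES = {".txt", ".pdf"}  # hypothetical allow-list

def validate_file(filename, stream, max_size_bytes):
    """Check the extension and size of an uploaded file-like object."""
    errors = []
    if Path(filename).suffix.lower() not in ALLOWED_SUFFIXES:
        errors.append("unsupported file type")
    stream.seek(0, io.SEEK_END)  # jump to the end to measure the size
    size = stream.tell()
    stream.seek(0)               # rewind so the file can still be saved
    if size > max_size_bytes:
        errors.append("file too large")
    return {"valid": not errors, "errors": errors}

def generate_filename(filename):
    """Replace the original name with a random UUID, keeping the suffix."""
    return Path(f"{uuid.uuid4().hex}{Path(filename).suffix.lower()}")

ok = validate_file("cat-facts.txt", io.BytesIO(b"Cats purr."), max_size_bytes=1024)
bad = validate_file("cat-facts.exe", io.BytesIO(b"x" * 2048), max_size_bytes=1024)
```

Generating the stored name on the server, rather than trusting the client's file name, avoids name clashes and path tricks in user-supplied names.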
9.6 Query
Things to consider next:
- Advanced Indexing: Don’t just chunk by character count. Use semantic chunking or hierarchical indexing (parent-child retrieval).
- Evaluation (The “RAG-as-a-Judge” era): You must show how you measure success. Use frameworks like Ragas or DeepEval to report on Faithfulness, Answer Relevancy, and Context Precision.
- Production Rigor: Implement a Vector DB (like Pinecone, Milvus, or Qdrant) and show you can handle incremental updates when new papers are published.
- Observability: Include a dashboard (like LangSmith or Arize Phoenix) that shows you’re tracking where the retrieval failed or where the model hallucinated.
Components and Tools
Chunking
- Unstructured.io
- Docling
- LangChain document loader - PDFs
- LlamaIndex - PDFs
- tiktoken - token-aware splitter
Embedding Models
- E5
- BGE en v1.5
- InstructorXL
- Cohere Embed v4
- ColBERT
A list of embedding models can be found at Embedding Models
Vector DBs
- Qdrant
- FAISS
- Chroma DB
- Pinecone
- Milvus
Reranker
- Cohere Rerank
- BGE-Reranker
Orchestration
- LangChain
- LlamaIndex
- LangGraph
Evaluation
- Ragas
- DeepEval
- Arize Phoenix
Observability
- LangSmith
- Langfuse
- Galileo
Focus more on retrieval logic and evaluation.
Resources
Models
graph TD
subgraph RAG
direction TB
input[Input Query]
emb1[Embedding]
emb2[Embedding]
vd[Vector Database]
model1[Model]
model2[Model]
search[Search]
input --"+ Generate keywords"--> emb1
emb1 --> model1
model1 --> search
input --> emb2
search --"Knowledge"--> vd
emb2 --> vd
vd --> model2
model2 --> input
end