Retrieval-Augmented Generation (RAG) is the process of optimizing the output of a large language model so that it references an authoritative knowledge base outside of its training data sources before generating a response. In the rapidly evolving landscape of AI and natural language processing (NLP), RAG Techniques have emerged as a powerful approach to improving the quality, accuracy, and relevance of generated content. RAG combines retrieval mechanisms with generative models, allowing AI systems to pull in pertinent information and produce more coherent and informed outputs. Below, we explore 21 RAG Techniques that can elevate your AI projects.
Simple RAG
Simple RAG encodes document content into a vector store (database), enabling quick retrieval of relevant information to enhance model responses. This foundational technique is a great starting point for those new to RAG Techniques.
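A minimal sketch of Simple RAG, assuming the same LangChain and OpenAI setup used in the later examples (the sample text and query are illustrative):
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
# Illustrative corpus; in practice this would come from your documents
raw_text = "The greenhouse effect traps heat in the atmosphere. Rising CO2 levels amplify it."
# 1. Split the content into chunks and encode them into a vector store
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = splitter.split_text(raw_text)
vectorstore = FAISS.from_texts(chunks, OpenAIEmbeddings())
# 2. Retrieve the most relevant chunks for a query
query = "What traps heat in the atmosphere?"
relevant_chunks = vectorstore.similarity_search(query, k=2)
# 3. Ground the model's answer in the retrieved context
llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template="Answer using only this context:\n{context}\n\nQuestion: {question}\nAnswer:"
)
answer = (prompt | llm).invoke({
    "context": "\n".join(doc.page_content for doc in relevant_chunks),
    "question": query
}).content
print(answer)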
Context Enrichment
Adding surrounding context to each retrieved chunk improves the coherence and completeness of the returned information. Context enrichment ensures that the AI understands not just the snippet but its surrounding context as well, making it one of the essential RAG Techniques.
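A minimal sketch of context enrichment, assuming chunks are indexed with their position so that neighbouring chunks can be stitched back in at query time (the chunks below are illustrative):
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
# Illustrative chunks; each one records its position in the source document
chunks = ["Chunk about emissions.", "Chunk about the greenhouse effect.", "Chunk about sea levels."]
docs = [Document(page_content=c, metadata={"index": i}) for i, c in enumerate(chunks)]
vectorstore = FAISS.from_documents(docs, OpenAIEmbeddings())
def retrieve_with_context(query, window=1, k=1):
    """Return each hit expanded with its neighbouring chunks."""
    hits = vectorstore.similarity_search(query, k=k)
    enriched = []
    for hit in hits:
        i = hit.metadata["index"]
        lo, hi = max(0, i - window), min(len(chunks), i + window + 1)
        enriched.append(" ".join(chunks[lo:hi]))  # the snippet plus its surrounding context
    return enriched
print(retrieve_with_context("What is the greenhouse effect?"))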
Multi-faceted Filtering
Applying various filtering techniques, such as metadata filtering or similarity-score thresholds, refines the quality of retrieved results. Multi-faceted filtering helps ensure that the AI retrieves the most relevant and accurate information, a vital aspect of RAG Techniques.
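A minimal sketch that combines a metadata filter with a similarity-score threshold; the topics and the threshold value are illustrative, and note that FAISS returns a distance (lower means more similar):
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
docs = [
    Document(page_content="CO2 levels are rising.", metadata={"topic": "climate"}),
    Document(page_content="Photosynthesis produces oxygen.", metadata={"topic": "biology"}),
]
vectorstore = FAISS.from_documents(docs, OpenAIEmbeddings())
query = "Why are CO2 levels increasing?"
# Facet 1: restrict the search to documents whose metadata matches
candidates = vectorstore.similarity_search_with_score(query, k=5, filter={"topic": "climate"})
# Facet 2: keep only candidates under a distance threshold (illustrative value)
filtered = [doc for doc, distance in candidates if distance < 0.8]
for doc in filtered:
    print(doc.page_content)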
Fusion Retrieval
Fusion retrieval combines vector-based similarity search with keyword-based retrieval, enhancing document retrieval by leveraging the strengths of both methods, making it a crucial RAG Technique.
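A minimal sketch using LangChain's EnsembleRetriever to fuse BM25 keyword retrieval with FAISS vector retrieval; the weights are illustrative, and the BM25 retriever assumes the rank_bm25 package is installed:
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.retrievers import BM25Retriever, EnsembleRetriever
texts = [
    "The greenhouse effect traps heat in the atmosphere.",
    "Deforestation reduces the planet's capacity to absorb CO2.",
    "Renewable energy lowers greenhouse gas emissions.",
]
# Keyword-based retriever
bm25_retriever = BM25Retriever.from_texts(texts)
bm25_retriever.k = 2
# Vector-based retriever
vector_retriever = FAISS.from_texts(texts, OpenAIEmbeddings()).as_retriever(search_kwargs={"k": 2})
# Fuse both result lists with a weighted rank-based combination
fusion_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, vector_retriever],
    weights=[0.5, 0.5],
)
for doc in fusion_retriever.get_relevant_documents("How is heat trapped in the atmosphere?"):
    print(doc.page_content)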
Intelligent Reranking
After initially retrieving documents, intelligent reranking reassesses and reorders them to prioritize the most pertinent information for further processing. This is one of the most important RAG Techniques for enhancing AI accuracy.
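A minimal sketch of reranking with a cross-encoder, assuming the sentence-transformers package and the ms-marco cross-encoder model are available: first over-retrieve with fast vector search, then reorder by the reranker's score.
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from sentence_transformers import CrossEncoder
texts = [
    "The greenhouse effect traps heat in the atmosphere.",
    "Bananas are rich in potassium.",
    "CO2 and methane are major greenhouse gases.",
]
vectorstore = FAISS.from_texts(texts, OpenAIEmbeddings())
query = "Which gases cause the greenhouse effect?"
# Step 1: over-retrieve candidates with fast vector search
candidates = vectorstore.similarity_search(query, k=3)
# Step 2: rescore each (query, document) pair with a cross-encoder and reorder
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
scores = reranker.predict([(query, doc.page_content) for doc in candidates])
reranked = [doc for _, doc in sorted(zip(scores, candidates), key=lambda pair: pair[0], reverse=True)]
for doc in reranked[:2]:
    print(doc.page_content)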
Query Transformation
This technique modifies or expands the original query with steps like query rewriting, step-back prompting, and sub-query decomposition to enhance the retrieval process, making it an advanced RAG Technique.
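A minimal sketch of two of these transformations, query rewriting and sub-query decomposition, using the same prompt-piping style as the rest of this article (prompts and queries are illustrative):
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
# Query rewriting: make the query more specific and retrieval-friendly
rewrite_prompt = PromptTemplate(
    input_variables=["query"],
    template="Rewrite the following query to be more specific and detailed for retrieval:\n{query}\nRewritten query:"
)
rewritten = (rewrite_prompt | llm).invoke({"query": "climate change effects"}).content
print(rewritten)
# Sub-query decomposition: break a complex query into simpler sub-queries
decompose_prompt = PromptTemplate(
    input_variables=["query"],
    template="Break the following query into 3 simpler sub-queries, one per line:\n{query}\nSub-queries:"
)
sub_queries = (decompose_prompt | llm).invoke(
    {"query": "How does climate change affect agriculture and what can farmers do about it?"}
).content.split("\n")
print(sub_queries)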
Hierarchical Indices
Hierarchical indices first identify relevant document sections through summaries, then drill down to specific details within those sections, allowing for more precise retrieval in the context of RAG Techniques.
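A minimal sketch of a two-level index: a summary store is searched first, and the matching document's detail chunks are then searched through a metadata filter (the summaries and chunks here are illustrative):
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
embeddings = OpenAIEmbeddings()
# Level 1: one summary per source document
summaries = [
    Document(page_content="Report on greenhouse gases and global temperature.", metadata={"doc_id": "climate"}),
    Document(page_content="Guide to plant biology and photosynthesis.", metadata={"doc_id": "biology"}),
]
summary_store = FAISS.from_documents(summaries, embeddings)
# Level 2: detail chunks tagged with the document they came from
chunks = [
    Document(page_content="CO2 concentrations passed 400 ppm.", metadata={"doc_id": "climate"}),
    Document(page_content="Methane is a potent greenhouse gas.", metadata={"doc_id": "climate"}),
    Document(page_content="Chlorophyll absorbs red and blue light.", metadata={"doc_id": "biology"}),
]
chunk_store = FAISS.from_documents(chunks, embeddings)
query = "Which greenhouse gas concentrations are rising?"
# First locate the relevant document via its summary, then drill into its chunks
top_summary = summary_store.similarity_search(query, k=1)[0]
details = chunk_store.similarity_search(query, k=2, filter={"doc_id": top_summary.metadata["doc_id"]})
for doc in details:
    print(doc.page_content)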
Hypothetical Document Embeddings (HyDE)
Hypothetical Document Embeddings (HyDE) transforms queries into hypothetical documents that contain the answer, bridging the gap between query and document distributions in vector space and enhancing the effectiveness of RAG Techniques.
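A minimal sketch of HyDE: the LLM drafts a hypothetical answer passage, and its embedding, rather than the raw query's, drives the vector search (the corpus and prompt are illustrative):
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
texts = [
    "The sky appears blue because shorter wavelengths of sunlight are scattered more by the atmosphere.",
    "Photosynthesis converts sunlight into chemical energy.",
]
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_texts(texts, embeddings)
llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
query = "Why is the sky blue?"
# Step 1: generate a hypothetical document that would answer the query
hyde_prompt = PromptTemplate(
    input_variables=["query"],
    template="Write a short passage that directly answers this question:\n{query}"
)
hypothetical_doc = (hyde_prompt | llm).invoke({"query": query}).content
# Step 2: embed the hypothetical document and search with that vector instead of the query
doc_embedding = embeddings.embed_query(hypothetical_doc)
results = vectorstore.similarity_search_by_vector(doc_embedding, k=1)
print(results[0].page_content)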
Choose Chunk Size
Selecting an appropriate fixed size for text chunks can balance context preservation and retrieval efficiency, helping to maintain the integrity of the information and is an integral part of RAG Techniques.
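A minimal sketch comparing chunk sizes; the sizes below are illustrative, and the right value depends on your documents and queries:
from langchain.text_splitter import RecursiveCharacterTextSplitter
text = (
    "Climate change refers to long-term shifts in temperatures and weather patterns. "
    "Human activities have been the main driver since the 1800s, primarily due to burning fossil fuels. "
    "The consequences include rising sea levels, more extreme weather, and biodiversity loss."
)
for chunk_size in (50, 200, 1000):
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=10)
    chunks = splitter.split_text(text)
    # Smaller chunks give precise retrieval but less context; larger chunks preserve context
    # at the cost of retrieval precision and token budget.
    print(f"chunk_size={chunk_size}: {len(chunks)} chunks, first chunk: {chunks[0][:60]!r}")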
Semantic Chunking
Unlike traditional methods that split text by fixed character/word counts, semantic chunking creates more meaningful, context-aware segments, enhancing the relevance of retrieved data in RAG Techniques.
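A minimal sketch using LangChain's experimental SemanticChunker, which places breakpoints where the embedding similarity between consecutive sentences drops rather than at fixed character counts (assumes the langchain-experimental package is installed; the text is illustrative):
from langchain_openai import OpenAIEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
text = (
    "The greenhouse effect keeps the planet warm enough for life. "
    "Excess greenhouse gases intensify it and raise global temperatures. "
    "In unrelated news, the recipe calls for two cups of flour and a pinch of salt."
)
# Breakpoints are placed where consecutive sentences are semantically dissimilar
chunker = SemanticChunker(OpenAIEmbeddings(), breakpoint_threshold_type="percentile")
docs = chunker.create_documents([text])
for doc in docs:
    print(repr(doc.page_content))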
Context Compression
Context compression compresses and extracts the most pertinent parts of documents in the context of a given query, ensuring that only the most relevant information is returned, an essential function of RAG Techniques.
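A minimal sketch using ContextualCompressionRetriever with an LLMChainExtractor (the same components used in the RAPTOR example later in this article) to keep only the query-relevant parts of each retrieved document; the texts are illustrative:
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.vectorstores import FAISS
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
texts = [
    "The company picnic is on Friday. Separately, our Q3 revenue grew 12% thanks to the new product line.",
    "The office plants need watering twice a week.",
]
base_retriever = FAISS.from_texts(texts, OpenAIEmbeddings()).as_retriever(search_kwargs={"k": 2})
# The extractor asks the LLM to pull out only the passages relevant to the query
llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=base_retriever,
)
for doc in compression_retriever.get_relevant_documents("How did revenue change in Q3?"):
    print(doc.page_content)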
Explainable Information Retrieval
Not only does explainable information retrieval pull in relevant documents based on a query, but it also provides explanations for why each document was selected, improving transparency in RAG Techniques. A Python implementation is shown below.
### Import Libraries
import os
import sys
from dotenv import load_dotenv
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # Add the parent directory to the path since we work with notebooks
from helper_functions import *
from evaluation.evalute_rag import *
# Load environment variables from a .env file
load_dotenv()
# Set the OpenAI API key environment variable
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
###Define the explainable retriever class
class ExplainableRetriever:
def __init__(self, texts):
self.embeddings = OpenAIEmbeddings()
self.vectorstore = FAISS.from_texts(texts, self.embeddings)
self.llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini", max_tokens=4000)
# Create a base retriever
self.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5})
# Create an explanation chain
explain_prompt = PromptTemplate(
input_variables=["query", "context"],
template="""
Analyze the relationship between the following query and the retrieved context.
Explain why this context is relevant to the query and how it might help answer the query.
Query: {query}
Context: {context}
Explanation:
"""
)
self.explain_chain = explain_prompt | self.llm
def retrieve_and_explain(self, query):
# Retrieve relevant documents
docs = self.retriever.get_relevant_documents(query)
explained_results = []
for doc in docs:
# Generate explanation
input_data = {"query": query, "context": doc.page_content}
explanation = self.explain_chain.invoke(input_data).content
explained_results.append({
"content": doc.page_content,
"explanation": explanation
})
return explained_results
##Create a mock example and explainable retriever instance
# Usage
texts = [
"The sky is blue because of the way sunlight interacts with the atmosphere.",
"Photosynthesis is the process by which plants use sunlight to produce energy.",
"Global warming is caused by the increase of greenhouse gases in Earth's atmosphere."
]
explainable_retriever = ExplainableRetriever(texts)
##Show the results
query = "Why is the sky blue?"
results = explainable_retriever.retrieve_and_explain(query)
for i, result in enumerate(results, 1):
print(f"Result {i}:")
print(f"Content: {result['content']}")
print(f"Explanation: {result['explanation']}")
print()
Retrieval with Feedback
This technique utilizes user feedback on the relevance and quality of retrieved documents and generated responses to fine-tune retrieval and ranking models, a crucial aspect of advanced RAG Techniques. A Python implementation is shown below.
### Import Libraries
import os
import sys
from dotenv import load_dotenv
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
import json
from typing import List, Dict, Any
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) ## Add the parent directory to the path since we work with notebooks
from helper_functions import *
from evaluation.evalute_rag import *
# Load environment variables from a .env file
load_dotenv()
# Set the OpenAI API key environment variable
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
####Define documents path
path = "../data/Understanding_Climate_Change.pdf"
###Create vector store and retrieval QA chain
content = read_pdf_to_string(path)
vectorstore = encode_from_string(content)
retriever = vectorstore.as_retriever()
llm = ChatOpenAI(temperature=0, model_name="gpt-4o", max_tokens=4000)
qa_chain = RetrievalQA.from_chain_type(llm, retriever=retriever)
####Function to format user feedback in a dictionary
def get_user_feedback(query, response, relevance, quality, comments=""):
return {
"query": query,
"response": response,
"relevance": int(relevance),
"quality": int(quality),
"comments": comments
}
##Function to store the feedback in a json file
def store_feedback(feedback):
with open("../data/feedback_data.json", "a") as f:
json.dump(feedback, f)
f.write("\n")
###Function to read the feedback file
def load_feedback_data():
feedback_data = []
try:
with open("../data/feedback_data.json", "r") as f:
for line in f:
feedback_data.append(json.loads(line.strip()))
except FileNotFoundError:
print("No feedback data file found. Starting with empty feedback.")
return feedback_data
####Function to adjust files relevancy based on the feedbacks file
class Response(BaseModel):
answer: str = Field(..., title="The answer to the question. The options can be only 'Yes' or 'No'")
def adjust_relevance_scores(query: str, docs: List[Any], feedback_data: List[Dict[str, Any]]) -> List[Any]:
# Create a prompt template for relevance checking
relevance_prompt = PromptTemplate(
input_variables=["query", "feedback_query", "doc_content", "feedback_response"],
template="""
Determine if the following feedback response is relevant to the current query and document content.
You are also provided with the Feedback original query that was used to generate the feedback response.
Current query: {query}
Feedback query: {feedback_query}
Document content: {doc_content}
Feedback response: {feedback_response}
Is this feedback relevant? Respond with only 'Yes' or 'No'.
"""
)
llm = ChatOpenAI(temperature=0, model_name="gpt-4o", max_tokens=4000)
# Create an LLMChain for relevance checking
relevance_chain = relevance_prompt | llm.with_structured_output(Response)
for doc in docs:
relevant_feedback = []
for feedback in feedback_data:
# Use LLM to check relevance
input_data = {
"query": query,
"feedback_query": feedback['query'],
"doc_content": doc.page_content[:1000],
"feedback_response": feedback['response']
}
result = relevance_chain.invoke(input_data).answer
if result.strip().lower() == 'yes':
relevant_feedback.append(feedback)
# Adjust the relevance score based on feedback
if relevant_feedback:
avg_relevance = sum(f['relevance'] for f in relevant_feedback) / len(relevant_feedback)
doc.metadata['relevance_score'] *= (avg_relevance / 3) # Assuming a 1-5 scale, 3 is neutral
# Re-rank documents based on adjusted scores
return sorted(docs, key=lambda x: x.metadata['relevance_score'], reverse=True)
###Function to fine tune the vector index to include also queries + answers that received good feedbacks
def fine_tune_index(feedback_data: List[Dict[str, Any]], texts: str) -> Any:
# Filter high-quality responses
good_responses = [f for f in feedback_data if f['relevance'] >= 4 and f['quality'] >= 4]
# Extract queries and responses, and create new documents
additional_texts = []
for f in good_responses:
combined_text = f['query'] + " " + f['response']
additional_texts.append(combined_text)
# Join the feedback-derived texts into a single string
additional_texts = " ".join(additional_texts)
# Create a new index from the original content plus the high-quality feedback texts
all_texts = texts + " " + additional_texts
new_vectorstore = encode_from_string(all_texts)
return new_vectorstore
####Demonstration of how to retrieve answers with respect to user feedbacks
query = "What is the greenhouse effect?"
# Get response from RAG system
response = qa_chain(query)["result"]
relevance = 5
quality = 5
# Collect feedback
feedback = get_user_feedback(query, response, relevance, quality)
# Store feedback
store_feedback(feedback)
# Adjust relevance scores for future retrievals
docs = retriever.get_relevant_documents(query)
adjusted_docs = adjust_relevance_scores(query, docs, load_feedback_data())
# Update the retriever with adjusted docs
retriever.search_kwargs['k'] = len(adjusted_docs)
retriever.search_kwargs['docs'] = adjusted_docs
#####Fine-tune the vectorstore periodically
# Periodically (e.g., daily or weekly), fine-tune the index
new_vectorstore = fine_tune_index(load_feedback_data(), content)
retriever = new_vectorstore.as_retriever()
Adaptive Retrieval
Adaptive retrieval classifies queries into different categories and applies a tailored strategy (factual, analytical, opinion, or contextual) to each, ensuring the best possible retrieval outcomes in RAG Techniques. Traditional RAG systems often use a one-size-fits-all approach to retrieval, which can be suboptimal for different types of queries. The adaptive system is motivated by the understanding that different types of questions require different retrieval strategies: a factual query might benefit from precise, focused retrieval, while an analytical query might require a broader, more diverse set of information. The Python code that demonstrates adaptive retrieval is shown below.
### Import Libraries
import os
import sys
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain_core.retrievers import BaseRetriever
from typing import Dict, Any
from langchain.docstore.document import Document
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # Add the parent directory to the path since we work with notebooks
from helper_functions import *
from evaluation.evalute_rag import *
# Load environment variables from a .env file
load_dotenv()
# Set the OpenAI API key environment variable
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
########Define the query classifier class
class categories_options(BaseModel):
category: str = Field(description="The category of the query, the options are: Factual, Analytical, Opinion, or Contextual", example="Factual")
class QueryClassifier:
def __init__(self):
self.llm = ChatOpenAI(temperature=0, model_name="gpt-4o", max_tokens=4000)
self.prompt = PromptTemplate(
input_variables=["query"],
template="Classify the following query into one of these categories: Factual, Analytical, Opinion, or Contextual.\nQuery: {query}\nCategory:"
)
self.chain = self.prompt | self.llm.with_structured_output(categories_options)
def classify(self, query):
print("clasiffying query")
return self.chain.invoke(query).category
#####Define the Base Retriever class, such that the complex ones will inherit from it
class BaseRetrievalStrategy:
def __init__(self, texts):
self.embeddings = OpenAIEmbeddings()
text_splitter = CharacterTextSplitter(chunk_size=800, chunk_overlap=0)
self.documents = text_splitter.create_documents(texts)
self.db = FAISS.from_documents(self.documents, self.embeddings)
self.llm = ChatOpenAI(temperature=0, model_name="gpt-4o", max_tokens=4000)
def retrieve(self, query, k=4):
return self.db.similarity_search(query, k=k)
######Define Factual retriever strategy
class relevant_score(BaseModel):
score: float = Field(description="The relevance score of the document to the query", example=8.0)
class FactualRetrievalStrategy(BaseRetrievalStrategy):
def retrieve(self, query, k=4):
print("retrieving factual")
# Use LLM to enhance the query
enhanced_query_prompt = PromptTemplate(
input_variables=["query"],
template="Enhance this factual query for better information retrieval: {query}"
)
query_chain = enhanced_query_prompt | self.llm
enhanced_query = query_chain.invoke(query).content
print(f'enhanced query: {enhanced_query}')
# Retrieve documents using the enhanced query
docs = self.db.similarity_search(enhanced_query, k=k*2)
# Use LLM to rank the relevance of retrieved documents
ranking_prompt = PromptTemplate(
input_variables=["query", "doc"],
template="On a scale of 1-10, how relevant is this document to the query: '{query}'?\nDocument: {doc}\nRelevance score:"
)
ranking_chain = ranking_prompt | self.llm.with_structured_output(relevant_score)
ranked_docs = []
print("ranking docs")
for doc in docs:
input_data = {"query": enhanced_query, "doc": doc.page_content}
score = float(ranking_chain.invoke(input_data).score)
ranked_docs.append((doc, score))
# Sort by relevance score and return top k
ranked_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in ranked_docs[:k]]
#####Define Analytical retriever strategy
class SelectedIndices(BaseModel):
indices: List[int] = Field(description="Indices of selected documents", example=[0, 1, 2, 3])
class SubQueries(BaseModel):
sub_queries: List[str] = Field(description="List of sub-queries for comprehensive analysis", example=["What is the population of New York?", "What is the GDP of New York?"])
class AnalyticalRetrievalStrategy(BaseRetrievalStrategy):
def retrieve(self, query, k=4):
print("retrieving analytical")
# Use LLM to generate sub-queries for comprehensive analysis
sub_queries_prompt = PromptTemplate(
input_variables=["query", "k"],
template="Generate {k} sub-questions for: {query}"
)
llm = ChatOpenAI(temperature=0, model_name="gpt-4o", max_tokens=4000)
sub_queries_chain = sub_queries_prompt | llm.with_structured_output(SubQueries)
input_data = {"query": query, "k": k}
sub_queries = sub_queries_chain.invoke(input_data).sub_queries
print(f'sub queries for comprehensive analysis: {sub_queries}')
all_docs = []
for sub_query in sub_queries:
all_docs.extend(self.db.similarity_search(sub_query, k=2))
# Use LLM to ensure diversity and relevance
diversity_prompt = PromptTemplate(
input_variables=["query", "docs", "k"],
template="""Select the most diverse and relevant set of {k} documents for the query: '{query}'\nDocuments: {docs}\n
Return only the indices of selected documents as a list of integers."""
)
diversity_chain = diversity_prompt | self.llm.with_structured_output(SelectedIndices)
docs_text = "\n".join([f"{i}: {doc.page_content[:50]}..." for i, doc in enumerate(all_docs)])
input_data = {"query": query, "docs": docs_text, "k": k}
selected_indices_result = diversity_chain.invoke(input_data).indices
print(f'selected diverse and relevant documents')
return [all_docs[i] for i in selected_indices_result if i < len(all_docs)]
######Define Opinion retriever strategy
class OpinionRetrievalStrategy(BaseRetrievalStrategy):
def retrieve(self, query, k=3):
print("retrieving opinion")
# Use LLM to identify potential viewpoints
viewpoints_prompt = PromptTemplate(
input_variables=["query", "k"],
template="Identify {k} distinct viewpoints or perspectives on the topic: {query}"
)
viewpoints_chain = viewpoints_prompt | self.llm
input_data = {"query": query, "k": k}
viewpoints = viewpoints_chain.invoke(input_data).content.split('\n')
print(f'viewpoints: {viewpoints}')
all_docs = []
for viewpoint in viewpoints:
all_docs.extend(self.db.similarity_search(f"{query} {viewpoint}", k=2))
# Use LLM to classify and select diverse opinions
opinion_prompt = PromptTemplate(
input_variables=["query", "docs", "k"],
template="Classify these documents into distinct opinions on '{query}' and select the {k} most representative and diverse viewpoints:\nDocuments: {docs}\nSelected indices:"
)
opinion_chain = opinion_prompt | self.llm.with_structured_output(SelectedIndices)
docs_text = "\n".join([f"{i}: {doc.page_content[:100]}..." for i, doc in enumerate(all_docs)])
input_data = {"query": query, "docs": docs_text, "k": k}
selected_indices = opinion_chain.invoke(input_data).indices
print(f'selected diverse and relevant documents')
return [all_docs[i] for i in selected_indices if i < len(all_docs)]
######Define Contextual retriever strategy
class ContextualRetrievalStrategy(BaseRetrievalStrategy):
def retrieve(self, query, k=4, user_context=None):
print("retrieving contextual")
# Use LLM to incorporate user context into the query
context_prompt = PromptTemplate(
input_variables=["query", "context"],
template="Given the user context: {context}\nReformulate the query to best address the user's needs: {query}"
)
context_chain = context_prompt | self.llm
input_data = {"query": query, "context": user_context or "No specific context provided"}
contextualized_query = context_chain.invoke(input_data).content
print(f'contextualized query: {contextualized_query}')
# Retrieve documents using the contextualized query
docs = self.db.similarity_search(contextualized_query, k=k*2)
# Use LLM to rank the relevance of retrieved documents considering the user context
ranking_prompt = PromptTemplate(
input_variables=["query", "context", "doc"],
template="Given the query: '{query}' and user context: '{context}', rate the relevance of this document on a scale of 1-10:\nDocument: {doc}\nRelevance score:"
)
ranking_chain = ranking_prompt | self.llm.with_structured_output(relevant_score)
print("ranking docs")
ranked_docs = []
for doc in docs:
input_data = {"query": contextualized_query, "context": user_context or "No specific context provided", "doc": doc.page_content}
score = float(ranking_chain.invoke(input_data).score)
ranked_docs.append((doc, score))
# Sort by relevance score and return top k
ranked_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, _ in ranked_docs[:k]]
####Define the Adaptive retriever class
class AdaptiveRetriever:
def __init__(self, texts: List[str]):
self.classifier = QueryClassifier()
self.strategies = {
"Factual": FactualRetrievalStrategy(texts),
"Analytical": AnalyticalRetrievalStrategy(texts),
"Opinion": OpinionRetrievalStrategy(texts),
"Contextual": ContextualRetrievalStrategy(texts)
}
def get_relevant_documents(self, query: str) -> List[Document]:
category = self.classifier.classify(query)
strategy = self.strategies[category]
return strategy.retrieve(query)
#######Define additional retriever that inherits from langchain BaseRetriever
class PydanticAdaptiveRetriever(BaseRetriever):
adaptive_retriever: AdaptiveRetriever = Field(exclude=True)
class Config:
arbitrary_types_allowed = True
def get_relevant_documents(self, query: str) -> List[Document]:
return self.adaptive_retriever.get_relevant_documents(query)
async def aget_relevant_documents(self, query: str) -> List[Document]:
return self.get_relevant_documents(query)
#####Define the Adaptive RAG class
class AdaptiveRAG:
def __init__(self, texts: List[str]):
adaptive_retriever = AdaptiveRetriever(texts)
self.retriever = PydanticAdaptiveRetriever(adaptive_retriever=adaptive_retriever)
self.llm = ChatOpenAI(temperature=0, model_name="gpt-4o", max_tokens=4000)
# Create a custom prompt
prompt_template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
{context}
Question: {question}
Answer:"""
prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
# Create the LLM chain
self.llm_chain = prompt | self.llm
def answer(self, query: str) -> str:
docs = self.retriever.get_relevant_documents(query)
input_data = {"context": "\n".join([doc.page_content for doc in docs]), "question": query}
return self.llm_chain.invoke(input_data)
####Demonstrate use of this model
# Usage
texts = [
"The Earth is the third planet from the Sun and the only astronomical object known to harbor life."
]
rag_system = AdaptiveRAG(texts)
#####Showcase the four different types of queries
factual_result = rag_system.answer("What is the distance between the Earth and the Sun?").content
print(f"Answer: {factual_result}")
analytical_result = rag_system.answer("How does the Earth's distance from the Sun affect its climate?").content
print(f"Answer: {analytical_result}")
opinion_result = rag_system.answer("What are the different theories about the origin of life on Earth?").content
print(f"Answer: {opinion_result}")
contextual_result = rag_system.answer("How does the Earth's position in the Solar System influence its habitability?").content
print(f"Answer: {contextual_result}")
Iterative Retrieval
Iterative retrieval analyzes initial results and generates follow-up queries to fill in gaps or clarify information, leading to more comprehensive retrieval, making it a vital component of RAG Techniques.
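A minimal sketch of iterative retrieval: after each round, the LLM inspects what has been gathered, proposes a follow-up query for whatever is still missing, and the loop retrieves again. The stopping condition, iteration count, and corpus are illustrative.
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
texts = [
    "The greenhouse effect traps heat in the atmosphere.",
    "Major greenhouse gases include CO2, methane, and nitrous oxide.",
    "Methane is emitted by livestock and leaking gas infrastructure.",
]
vectorstore = FAISS.from_texts(texts, OpenAIEmbeddings())
llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
followup_prompt = PromptTemplate(
    input_variables=["query", "context"],
    template=(
        "Original question: {query}\n"
        "Information gathered so far:\n{context}\n"
        "If this is enough to answer the question, reply DONE. "
        "Otherwise reply with ONE follow-up search query."
    ),
)
def iterative_retrieve(query, max_iterations=3):
    gathered, current_query = [], query
    for _ in range(max_iterations):
        gathered.extend(vectorstore.similarity_search(current_query, k=1))
        context = "\n".join(doc.page_content for doc in gathered)
        followup = (followup_prompt | llm).invoke({"query": query, "context": context}).content.strip()
        if followup.upper().startswith("DONE"):
            break
        current_query = followup  # retrieve again with the follow-up query
    return gathered
for doc in iterative_retrieve("What emits the greenhouse gases that trap heat?"):
    print(doc.page_content)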
Ensemble Retrieval
Ensemble retrieval applies different embedding models or retrieval algorithms, using voting or weighting mechanisms to determine the final set of retrieved documents, an advanced strategy in RAG Techniques.
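A minimal sketch of ensemble retrieval across two different embedding models, combining their result lists with a simple vote; the embedding model names are OpenAI models and the voting scheme is illustrative:
from collections import Counter
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
texts = [
    "The greenhouse effect traps heat in the atmosphere.",
    "Deforestation reduces carbon absorption.",
    "Solar panels convert sunlight into electricity.",
]
# Two retrievers backed by different embedding models
stores = [
    FAISS.from_texts(texts, OpenAIEmbeddings(model="text-embedding-3-small")),
    FAISS.from_texts(texts, OpenAIEmbeddings(model="text-embedding-3-large")),
]
def ensemble_retrieve(query, k=2):
    votes = Counter()
    for store in stores:
        for doc in store.similarity_search(query, k=k):
            votes[doc.page_content] += 1  # each retriever votes for its top-k documents
    return [text for text, _ in votes.most_common(k)]
print(ensemble_retrieve("How is heat trapped by the atmosphere?"))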
Graph RAG
Graph RAG retrieves entities and their relationships from a knowledge graph relevant to the query, combining unstructured text with more formalized knowledge sources, an innovative approach in RAG Techniques.
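A minimal sketch of the Graph RAG idea using a tiny hand-built networkx knowledge graph: entities mentioned in the query are matched against graph nodes, and their relationships are returned alongside vector-retrieved text. The graph content is illustrative, and the naive substring entity matcher stands in for a proper LLM-based entity extractor.
import networkx as nx
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
# Unstructured text for vector retrieval
texts = ["Greenhouse gases trap heat and warm the planet."]
vectorstore = FAISS.from_texts(texts, OpenAIEmbeddings())
# A tiny illustrative knowledge graph of entities and typed relationships
graph = nx.DiGraph()
graph.add_edge("CO2", "greenhouse effect", relation="contributes to")
graph.add_edge("greenhouse effect", "global warming", relation="causes")
graph.add_edge("fossil fuels", "CO2", relation="emit")
def graph_rag_retrieve(query, k=1):
    # Entity matching: naive substring match against node names
    facts = []
    for node in graph.nodes:
        if node.lower() in query.lower():
            for _, neighbor, data in graph.out_edges(node, data=True):
                facts.append(f"{node} {data['relation']} {neighbor}")
    # Combine structured facts with unstructured passages
    passages = [doc.page_content for doc in vectorstore.similarity_search(query, k=k)]
    return facts + passages
print(graph_rag_retrieve("How does CO2 relate to the greenhouse effect?"))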
Multi-Modal Retrieval
Integrating models that can retrieve and understand different data modalities (such as text, images, and video), multi-modal retrieval offers a holistic approach to retrieving diverse types of content, a comprehensive strategy in RAG Techniques.
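A minimal sketch of the multi-modal idea: image content is represented by generated captions and indexed alongside text with a modality tag, so one query can retrieve both kinds of items. A production system would typically use a true multi-modal embedding model (e.g., CLIP-style); the captions and file paths here are illustrative.
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
# Text passages and image captions share one index, distinguished by metadata
items = [
    Document(page_content="Global average temperature has risen about 1.1 C since pre-industrial times.",
             metadata={"modality": "text"}),
    Document(page_content="Chart showing rising CO2 concentrations from 1960 to 2020.",
             metadata={"modality": "image", "path": "figures/co2_trend.png"}),
]
vectorstore = FAISS.from_documents(items, OpenAIEmbeddings())
results = vectorstore.similarity_search("How have CO2 levels changed over time?", k=2)
for doc in results:
    source = doc.metadata.get("path", "text passage")
    print(f"[{doc.metadata['modality']}] {doc.page_content} ({source})")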
RAPTOR
RAPTOR (Recursive Abstractive Processing for Tree-Organized Retrieval) recursively clusters documents and summarizes each cluster with abstractive summarization, organizing the content into a tree structure for hierarchical retrieval, an advanced RAG Technique. A Python implementation is shown below.
###Imports and Setup
import numpy as np
import pandas as pd
from typing import List, Dict, Any
from sklearn.mixture import GaussianMixture
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.schema import AIMessage
from langchain.docstore.document import Document
from langchain.document_loaders import PyPDFLoader
import matplotlib.pyplot as plt
import logging
import os
import sys
from dotenv import load_dotenv
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # Add the parent directory to the path since we work with notebooks
from helper_functions import *
from evaluation.evalute_rag import *
# Load environment variables from a .env file
load_dotenv()
# Set the OpenAI API key environment variable
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
###Define logging, llm and embeddings
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
embeddings = OpenAIEmbeddings()
llm = ChatOpenAI(model_name="gpt-4o-mini")
###Helper Functions
def extract_text(item):
"""Extract text content from either a string or an AIMessage object."""
if isinstance(item, AIMessage):
return item.content
return item
def embed_texts(texts: List[str]) -> List[List[float]]:
"""Embed texts using OpenAIEmbeddings."""
logging.info(f"Embedding {len(texts)} texts")
return embeddings.embed_documents([extract_text(text) for text in texts])
def perform_clustering(embeddings: np.ndarray, n_clusters: int = 10) -> np.ndarray:
"""Perform clustering on embeddings using Gaussian Mixture Model."""
logging.info(f"Performing clustering with {n_clusters} clusters")
gm = GaussianMixture(n_components=n_clusters, random_state=42)
return gm.fit_predict(embeddings)
def summarize_texts(texts: List[str]) -> str:
"""Summarize a list of texts using OpenAI."""
logging.info(f"Summarizing {len(texts)} texts")
prompt = ChatPromptTemplate.from_template(
"Summarize the following text concisely:\n\n{text}"
)
chain = prompt | llm
input_data = {"text": texts}
return chain.invoke(input_data)
def visualize_clusters(embeddings: np.ndarray, labels: np.ndarray, level: int):
"""Visualize clusters using PCA."""
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
reduced_embeddings = pca.fit_transform(embeddings)
plt.figure(figsize=(10, 8))
scatter = plt.scatter(reduced_embeddings[:, 0], reduced_embeddings[:, 1], c=labels, cmap='viridis')
plt.colorbar(scatter)
plt.title(f'Cluster Visualization - Level {level}')
plt.xlabel('First Principal Component')
plt.ylabel('Second Principal Component')
plt.show()
####RAPTOR Core Function
def build_raptor_tree(texts: List[str], max_levels: int = 3) -> Dict[int, pd.DataFrame]:
"""Build the RAPTOR tree structure with level metadata and parent-child relationships."""
results = {}
current_texts = [extract_text(text) for text in texts]
current_metadata = [{"level": 0, "origin": "original", "parent_id": None} for _ in texts]
for level in range(1, max_levels + 1):
logging.info(f"Processing level {level}")
embeddings = embed_texts(current_texts)
n_clusters = min(10, len(current_texts) // 2)
cluster_labels = perform_clustering(np.array(embeddings), n_clusters)
df = pd.DataFrame({
'text': current_texts,
'embedding': embeddings,
'cluster': cluster_labels,
'metadata': current_metadata
})
results[level-1] = df
summaries = []
new_metadata = []
for cluster in df['cluster'].unique():
cluster_docs = df[df['cluster'] == cluster]
cluster_texts = cluster_docs['text'].tolist()
cluster_metadata = cluster_docs['metadata'].tolist()
summary = summarize_texts(cluster_texts)
summaries.append(summary)
new_metadata.append({
"level": level,
"origin": f"summary_of_cluster_{cluster}_level_{level-1}",
"child_ids": [meta.get('id') for meta in cluster_metadata],
"id": f"summary_{level}_{cluster}"
})
current_texts = summaries
current_metadata = new_metadata
if len(current_texts) <= 1:
results[level] = pd.DataFrame({
'text': current_texts,
'embedding': embed_texts(current_texts),
'cluster': [0],
'metadata': current_metadata
})
logging.info(f"Stopping at level {level} as we have only one summary")
break
return results
######Vectorstore Function
def build_vectorstore(tree_results: Dict[int, pd.DataFrame]) -> FAISS:
"""Build a FAISS vectorstore from all texts in the RAPTOR tree."""
all_texts = []
all_embeddings = []
all_metadatas = []
for level, df in tree_results.items():
all_texts.extend([str(text) for text in df['text'].tolist()])
all_embeddings.extend([embedding.tolist() if isinstance(embedding, np.ndarray) else embedding for embedding in df['embedding'].tolist()])
all_metadatas.extend(df['metadata'].tolist())
logging.info(f"Building vectorstore with {len(all_texts)} texts")
# Create Document objects manually to ensure correct types
documents = [Document(page_content=str(text), metadata=metadata)
for text, metadata in zip(all_texts, all_metadatas)]
return FAISS.from_documents(documents, embeddings)
#####Define tree traversal retrieval
def tree_traversal_retrieval(query: str, vectorstore: FAISS, k: int = 3) -> List[Document]:
"""Perform tree traversal retrieval."""
query_embedding = embeddings.embed_query(query)
def retrieve_level(level: int, parent_ids: List[str] = None) -> List[Document]:
if parent_ids:
docs = vectorstore.similarity_search_with_score_by_vector(
query_embedding,
k=k,
filter=lambda meta: meta['level'] == level and meta['id'] in parent_ids
)
else:
docs = vectorstore.similarity_search_with_score_by_vector(
query_embedding,
k=k,
filter=lambda meta: meta['level'] == level
)
if not docs or level == 0:
return docs
child_ids = [doc.metadata.get('child_ids', []) for doc, _ in docs]
child_ids = [item for sublist in child_ids for item in sublist] # Flatten the list
child_docs = retrieve_level(level - 1, child_ids)
return docs + child_docs
max_level = max(doc.metadata['level'] for doc in vectorstore.docstore._dict.values())
return retrieve_level(max_level)
########Create Retriever
def create_retriever(vectorstore: FAISS) -> ContextualCompressionRetriever:
"""Create a retriever with contextual compression."""
logging.info("Creating contextual compression retriever")
base_retriever = vectorstore.as_retriever()
prompt = ChatPromptTemplate.from_template(
"Given the following context and question, extract only the relevant information for answering the question:\n\n"
"Context: {context}\n"
"Question: {question}\n\n"
"Relevant Information:"
)
extractor = LLMChainExtractor.from_llm(llm, prompt=prompt)
return ContextualCompressionRetriever(
base_compressor=extractor,
base_retriever=base_retriever
)
#######Define hierarchical retrieval
def hierarchical_retrieval(query: str, retriever: ContextualCompressionRetriever, max_level: int) -> List[Document]:
"""Perform hierarchical retrieval starting from the highest level, handling potential None values."""
all_retrieved_docs = []
for level in range(max_level, -1, -1):
# Retrieve documents from the current level
level_docs = retriever.get_relevant_documents(
query,
filter=lambda meta: meta['level'] == level
)
all_retrieved_docs.extend(level_docs)
# If we've found documents, retrieve their children from the next level down
if level_docs and level > 0:
child_ids = [doc.metadata.get('child_ids', []) for doc in level_docs]
child_ids = [item for sublist in child_ids for item in sublist if item is not None] # Flatten and filter None
if child_ids: # Only modify query if there are valid child IDs
child_query = f" AND id:({' OR '.join(str(id) for id in child_ids)})"
query += child_query
return all_retrieved_docs
#########RAPTOR Query Process (Online Process)
def raptor_query(query: str, retriever: ContextualCompressionRetriever, max_level: int) -> Dict[str, Any]:
"""Process a query using the RAPTOR system with hierarchical retrieval."""
logging.info(f"Processing query: {query}")
relevant_docs = hierarchical_retrieval(query, retriever, max_level)
doc_details = []
for i, doc in enumerate(relevant_docs, 1):
doc_details.append({
"index": i,
"content": doc.page_content,
"metadata": doc.metadata,
"level": doc.metadata.get('level', 'Unknown'),
"similarity_score": doc.metadata.get('score', 'N/A')
})
context = "\n\n".join([doc.page_content for doc in relevant_docs])
prompt = ChatPromptTemplate.from_template(
"Given the following context, please answer the question:\n\n"
"Context: {context}\n\n"
"Question: {question}\n\n"
"Answer:"
)
chain = prompt | llm
answer = chain.invoke({"context": context, "question": query}).content
logging.info("Query processing completed")
result = {
"query": query,
"retrieved_documents": doc_details,
"num_docs_retrieved": len(relevant_docs),
"context_used": context,
"answer": answer,
"model_used": llm.model_name,
}
return result
def print_query_details(result: Dict[str, Any]):
"""Print detailed information about the query process, including tree level metadata."""
print(f"Query: {result['query']}")
print(f"\nNumber of documents retrieved: {result['num_docs_retrieved']}")
print(f"\nRetrieved Documents:")
for doc in result['retrieved_documents']:
print(f" Document {doc['index']}:")
print(f" Content: {doc['content'][:100]}...") # Show first 100 characters
print(f" Similarity Score: {doc['similarity_score']}")
print(f" Tree Level: {doc['metadata'].get('level', 'Unknown')}")
print(f" Origin: {doc['metadata'].get('origin', 'Unknown')}")
if 'child_docs' in doc['metadata']:
print(f" Number of Child Documents: {len(doc['metadata']['child_docs'])}")
print()
print(f"\nContext used for answer generation:")
print(result['context_used'])
print(f"\nGenerated Answer:")
print(result['answer'])
print(f"\nModel Used: {result['model_used']}")
#######Example Usage and Visualization
#####Define data folder
path = "../data/Understanding_Climate_Change.pdf"
###Process texts
loader = PyPDFLoader(path)
documents = loader.load()
texts = [doc.page_content for doc in documents]
######Create RAPTOR components instances
# Build the RAPTOR tree
tree_results = build_raptor_tree(texts)
# Build vectorstore
vectorstore = build_vectorstore(tree_results)
# Create retriever
retriever = create_retriever(vectorstore)
#######Run a query and observe where it got the data from + results
# Run the pipeline
max_level = 3 # Adjust based on your tree depth
query = "What is the greenhouse effect?"
result = raptor_query(query, retriever, max_level)
print_query_details(result)
Self RAG
Self RAG is a multi-step process that decides whether retrieval is needed, retrieves documents, evaluates their relevance, generates responses, and then assesses how well each response is supported and how useful it is, improving model responses; it is a crucial process in RAG Techniques. A Python implementation is shown below.
####Import relevant libraries
import os
import sys
from dotenv import load_dotenv
from langchain.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.pydantic_v1 import BaseModel, Field
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # Add the parent directory to the path since we work with notebooks
from helper_functions import *
from evaluation.evalute_rag import *
# Load environment variables from a .env file
load_dotenv()
# Set the OpenAI API key environment variable
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
##Define files path
path = "../data/Understanding_Climate_Change.pdf"
####Create a vector store
vectorstore = encode_pdf(path)
###Initialize the language model
llm = ChatOpenAI(model="gpt-4o-mini", max_tokens=1000, temperature=0)
#####Defining prompt templates
class RetrievalResponse(BaseModel):
response: str = Field(..., title="Determines if retrieval is necessary", description="Output only 'Yes' or 'No'.")
retrieval_prompt = PromptTemplate(
input_variables=["query"],
template="Given the query '{query}', determine if retrieval is necessary. Output only 'Yes' or 'No'."
)
class RelevanceResponse(BaseModel):
response: str = Field(..., title="Determines if context is relevant", description="Output only 'Relevant' or 'Irrelevant'.")
relevance_prompt = PromptTemplate(
input_variables=["query", "context"],
template="Given the query '{query}' and the context '{context}', determine if the context is relevant. Output only 'Relevant' or 'Irrelevant'."
)
class GenerationResponse(BaseModel):
response: str = Field(..., title="Generated response", description="The generated response.")
generation_prompt = PromptTemplate(
input_variables=["query", "context"],
template="Given the query '{query}' and the context '{context}', generate a response."
)
class SupportResponse(BaseModel):
response: str = Field(..., title="Determines if response is supported", description="Output 'Fully supported', 'Partially supported', or 'No support'.")
support_prompt = PromptTemplate(
input_variables=["response", "context"],
template="Given the response '{response}' and the context '{context}', determine if the response is supported by the context. Output 'Fully supported', 'Partially supported', or 'No support'."
)
class UtilityResponse(BaseModel):
response: int = Field(..., title="Utility rating", description="Rate the utility of the response from 1 to 5.")
utility_prompt = PromptTemplate(
input_variables=["query", "response"],
template="Given the query '{query}' and the response '{response}', rate the utility of the response from 1 to 5."
)
# Create LLMChains for each step
retrieval_chain = retrieval_prompt | llm.with_structured_output(RetrievalResponse)
relevance_chain = relevance_prompt | llm.with_structured_output(RelevanceResponse)
generation_chain = generation_prompt | llm.with_structured_output(GenerationResponse)
support_chain = support_prompt | llm.with_structured_output(SupportResponse)
utility_chain = utility_prompt | llm.with_structured_output(UtilityResponse)
###Defining the self RAG logic flow
def self_rag(query, vectorstore, top_k=3):
print(f"\nProcessing query: {query}")
# Step 1: Determine if retrieval is necessary
print("Step 1: Determining if retrieval is necessary...")
input_data = {"query": query}
retrieval_decision = retrieval_chain.invoke(input_data).response.strip().lower()
print(f"Retrieval decision: {retrieval_decision}")
if retrieval_decision == 'yes':
# Step 2: Retrieve relevant documents
print("Step 2: Retrieving relevant documents...")
docs = vectorstore.similarity_search(query, k=top_k)
contexts = [doc.page_content for doc in docs]
print(f"Retrieved {len(contexts)} documents")
# Step 3: Evaluate relevance of retrieved documents
print("Step 3: Evaluating relevance of retrieved documents...")
relevant_contexts = []
for i, context in enumerate(contexts):
input_data = {"query": query, "context": context}
relevance = relevance_chain.invoke(input_data).response.strip().lower()
print(f"Document {i+1} relevance: {relevance}")
if relevance == 'relevant':
relevant_contexts.append(context)
print(f"Number of relevant contexts: {len(relevant_contexts)}")
# If no relevant contexts found, generate without retrieval
if not relevant_contexts:
print("No relevant contexts found. Generating without retrieval...")
input_data = {"query": query, "context": "No relevant context found."}
return generation_chain.invoke(input_data).response
# Step 4: Generate response using relevant contexts
print("Step 4: Generating responses using relevant contexts...")
responses = []
for i, context in enumerate(relevant_contexts):
print(f"Generating response for context {i+1}...")
input_data = {"query": query, "context": context}
response = generation_chain.invoke(input_data).response
# Step 5: Assess support
print(f"Step 5: Assessing support for response {i+1}...")
input_data = {"response": response, "context": context}
support = support_chain.invoke(input_data).response.strip().lower()
print(f"Support assessment: {support}")
# Step 6: Evaluate utility
print(f"Step 6: Evaluating utility for response {i+1}...")
input_data = {"query": query, "response": response}
utility = int(utility_chain.invoke(input_data).response)
print(f"Utility score: {utility}")
responses.append((response, support, utility))
# Select the best response based on support and utility
print("Selecting the best response...")
best_response = max(responses, key=lambda x: (x[1] == 'fully supported', x[2]))
print(f"Best response support: {best_response[1]}, utility: {best_response[2]}")
return best_response[0]
else:
# Generate without retrieval
print("Generating without retrieval...")
input_data = {"query": query, "context": "No retrieval necessary."}
return generation_chain.invoke(input_data).response
####Test the self-RAG function on an easy query with high relevance
query = "What is the impact of climate change on the environment?"
response = self_rag(query, vectorstore)
print("\nFinal response:")
print(response)
####Test the self-RAG function on a more challenging query with low relevance
query = "how did harry beat quirrell?"
response = self_rag(query, vectorstore)
print("\nFinal response:")
print(response)
Corrective RAG
Corrective RAG dynamically evaluates and corrects the retrieval process, amending the workflow to improve the relevance and accuracy of generated responses, an essential refinement in RAG Techniques.
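A minimal sketch of the corrective loop: retrieved documents are graded by the LLM, and if nothing passes the grade the query is rewritten and retrieval is attempted again. A full Corrective RAG pipeline would typically also fall back to an external source such as web search at that point; here the fallback is simply a second retrieval pass with the rewritten query, and the corpus and prompts are illustrative.
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.vectorstores import FAISS
from langchain.prompts import PromptTemplate
texts = [
    "The greenhouse effect traps heat in the atmosphere.",
    "Bananas are rich in potassium.",
]
vectorstore = FAISS.from_texts(texts, OpenAIEmbeddings())
llm = ChatOpenAI(temperature=0, model_name="gpt-4o-mini")
grade_prompt = PromptTemplate(
    input_variables=["query", "doc"],
    template="Is this document relevant to the query? Answer only 'yes' or 'no'.\nQuery: {query}\nDocument: {doc}"
)
rewrite_prompt = PromptTemplate(
    input_variables=["query"],
    template="Rewrite this query to make it easier to find relevant documents: {query}"
)
answer_prompt = PromptTemplate(
    input_variables=["context", "query"],
    template="Answer the question using the context.\nContext: {context}\nQuestion: {query}\nAnswer:"
)
def corrective_rag(query, k=2):
    docs = vectorstore.similarity_search(query, k=k)
    # Grade each retrieved document and keep only the ones judged relevant
    graded = [d for d in docs
              if (grade_prompt | llm).invoke({"query": query, "doc": d.page_content}).content.strip().lower().startswith("yes")]
    if not graded:
        # Corrective step: rewrite the query and retrieve again
        # (a full CRAG system would also consult an external source such as web search here)
        query = (rewrite_prompt | llm).invoke({"query": query}).content
        graded = vectorstore.similarity_search(query, k=k)
    context = "\n".join(d.page_content for d in graded)
    return (answer_prompt | llm).invoke({"context": context, "query": query}).content
print(corrective_rag("What is the greenhouse effect?"))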
Incorporating these RAG Techniques into your AI and NLP projects can significantly improve the performance of generative models. Whether you’re looking to enhance simple information retrieval or implement advanced multi-modal systems, these strategies provide a robust toolkit for developing cutting-edge AI applications.
For a more in-depth exploration of RAG Techniques, you can refer to these external resources from Collabnix, prepared by Adesoji Alu on Google Colab: