# Basic RAG pipeline with LlamaIndex

In [1]:
# requirements for this example (`%pip` installs into the running kernel's
# environment; `-qq` keeps pip's output quiet):
%pip install -qq docling docling-core python-dotenv llama-index-embeddings-huggingface llama-index-llms-huggingface-api llama-index-vector-stores-milvus jsonpath-ng

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from tempfile import TemporaryDirectory

from dotenv import load_dotenv
from pydantic import TypeAdapter
from rich.console import Console
from rich.pretty import pprint

# shared rich console, used further down to print section rules around examples
console = Console()


# load variables from a local `.env` file (if any) so that the `os.environ.get(...)`
# lookups in later cells can pick them up
load_dotenv()

True

In [3]:
import warnings

warnings.filterwarnings(action="ignore", category=UserWarning, module="pydantic|torch")
warnings.filterwarnings(action="ignore", category=FutureWarning, module="easyocr")

## LlamaIndex extensions

Below we define our framework extensions:
- `DoclingPDFReader` which will be used to create LlamaIndex documents, and
- `HierarchicalJSONNodeParser`, which will be used to create LlamaIndex nodes out of the documents

```mermaid
flowchart LR
    subgraph LI extensions
        direction LR
        Reader -->|LI Documents| Parser
    end
    Reader(DoclingPDFReader)
    Parser(HierarchicalJSONNodeParser)
    Encoder(EmbedModel)
    Store(VectorStore)
    Parser  -->|LI Nodes| Encoder -->|LI Nodes| Store
```

In [4]:
from enum import Enum
from typing import Iterable

from llama_index.core.readers.base import BasePydanticReader
from llama_index.core.schema import Document as LIDocument
from pydantic import BaseModel

from docling.document_converter import DocumentConverter


class DocumentMetadata(BaseModel):
    """Metadata stored on each LlamaIndex document produced by `DoclingPDFReader`."""

    # hash of the converted Docling document (its `file_info.document_hash`)
    dl_doc_hash: str


class DoclingPDFReader(BasePydanticReader):
    """LlamaIndex reader that converts sources with Docling.

    Each source is converted with a `DocumentConverter` and emitted as one
    LlamaIndex document whose text is either the Markdown export or the JSON
    dump of the Docling document, depending on `parse_type`.
    """

    class ParseType(str, Enum):
        """Serialization used for the text of the produced documents."""

        MARKDOWN = "markdown"
        JSON = "json"

    parse_type: ParseType = ParseType.MARKDOWN

    def lazy_load_data(self, file_path: str | list[str]) -> Iterable[LIDocument]:
        """Convert the given source(s) and yield one LlamaIndex document per source.

        Args:
            file_path: a single source or a list of sources; each one is passed
                as-is to `DocumentConverter.convert_single`.

        Yields:
            A document whose `doc_id` is the Docling document hash, whose text
            follows `parse_type`, and whose metadata holds that same hash under
            `dl_doc_hash` (excluded from both embedding and LLM metadata).

        Raises:
            RuntimeError: if `parse_type` is neither MARKDOWN nor JSON.
        """
        sources = [file_path] if not isinstance(file_path, list) else file_path
        dl_converter = DocumentConverter()
        for src in sources:
            converted = dl_converter.convert_single(src).output

            # pick the serialization requested via `parse_type`
            if self.parse_type == self.ParseType.MARKDOWN:
                payload = converted.export_to_markdown()
            elif self.parse_type == self.ParseType.JSON:
                payload = converted.model_dump_json()
            else:
                raise RuntimeError(
                    f"Unexpected parse type encountered: {self.parse_type}"
                )

            doc_hash = converted.file_info.document_hash
            # the hash is bookkeeping only, so keep it out of embedding / LLM input
            hidden_keys = ["dl_doc_hash"]
            document = LIDocument(
                doc_id=doc_hash,
                text=payload,
                excluded_embed_metadata_keys=hidden_keys,
                excluded_llm_metadata_keys=hidden_keys,
            )
            document.metadata = DocumentMetadata(dl_doc_hash=doc_hash).model_dump()
            yield document

In [5]:
from typing import Any, Iterable, Sequence

from docling_core.transforms.chunker import HierarchicalChunker
from docling_core.types import Document as DLDocument
from llama_index.core import Document as LIDocument
from llama_index.core.node_parser.interface import NodeParser
from llama_index.core.schema import (
    BaseNode,
    NodeRelationship,
    RelatedNodeType,
    TextNode,
)
from llama_index.core.utils import get_tqdm_iterable


class NodeMetadata(BaseModel):
    """Metadata stored on each chunk node produced by `HierarchicalJSONNodeParser`."""

    # the chunk's `path` as reported by the chunker; later parsed as a JSONPath
    # expression to look the element up in the Docling document
    path: str


class HierarchicalJSONNodeParser(NodeParser):
    """Node parser that chunks JSON-serialized Docling documents.

    Every input node is expected to carry a Docling document serialized as JSON
    in its content; the document is split with `HierarchicalChunker` and each
    chunk becomes a `TextNode` linked back to its source document.
    """

    include_metadata: bool = False

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> list[BaseNode]:
        """Turn JSON-backed documents into one `TextNode` per Docling chunk.

        Args:
            nodes: input nodes; each is validated as a LlamaIndex document and
                its content is parsed as a Docling document.
            show_progress: forwarded to `get_tqdm_iterable`.
            **kwargs: accepted for interface compatibility; not used here.

        Returns:
            The chunk nodes of all inputs, in input order. Each node carries a
            SOURCE relationship to its document and the chunk path under the
            `path` metadata key (excluded from embedding and LLM metadata).
        """
        progress_iter: Iterable[BaseNode] = get_tqdm_iterable(
            items=nodes, show_progress=show_progress, desc="Parsing nodes"
        )
        hier_chunker = HierarchicalChunker()
        parsed: list[BaseNode] = []
        for src_node in progress_iter:
            src_doc = LIDocument.model_validate(src_node)
            native_doc: DLDocument = DLDocument.model_validate_json(
                src_doc.get_content()
            )
            for piece in hier_chunker.chunk(dl_doc=native_doc):
                source_link = src_doc.as_related_node_info()
                # the path is bookkeeping only, so keep it out of embedding / LLM input
                hidden_keys = ["path"]
                text_node = TextNode(
                    text=piece.text,
                    excluded_embed_metadata_keys=hidden_keys,
                    excluded_llm_metadata_keys=hidden_keys,
                    relationships={NodeRelationship.SOURCE: source_link},
                )
                text_node.metadata = NodeMetadata(path=piece.path).model_dump()
                parsed.append(text_node)
        return parsed

## Reader and node parser

### Using JSON

To leverage Docling's rich document structure format, we set the parse type to JSON and use a `HierarchicalJSONNodeParser` accordingly:

In [6]:
# JSON parse type: each document's text is the JSON dump of the Docling document,
# which is the input format `HierarchicalJSONNodeParser` parses
reader = DoclingPDFReader(parse_type=DoclingPDFReader.ParseType.JSON)
node_parser = HierarchicalJSONNodeParser()

### Using Markdown

Alternatively, to just use the flat Markdown export instead of the native document format, one can uncomment and use the following:

In [7]:
# from llama_index.core.node_parser import MarkdownNodeParser

# reader = DoclingPDFReader(parse_type=DoclingPDFReader.ParseType.MARKDOWN)
# node_parser = MarkdownNodeParser()
# transformations = [node_parser]

## Transformations

Our transformations currently include the `node_parser`:

In [8]:
# ordered list of transformations handed to the ingestion pipeline
transformations = [node_parser]

One can add more transformations, e.g. further chunking based on text size / overlap, as shown below:

In [9]:
# from llama_index.core.node_parser import TokenTextSplitter

# splitter = TokenTextSplitter(
#     chunk_size=1024,
#     chunk_overlap=20,
# )
# transformations.append(splitter)

## Embed model

In [10]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# embedding model shared by the vector store setup (to derive the vector
# dimension) and by the index
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

## Vector store

In [11]:
# Whether to ingest from scratch or reuse an existing vector store.
# The flag is also passed as `overwrite` when the Milvus vector store is created.
INGEST = True  # whether to ingest from scratch or reuse an existing vector store

In [12]:
from llama_index.vector_stores.milvus import MilvusVectorStore

# Fallback location for the Milvus database file. The `TemporaryDirectory` object is
# kept in `tmp_dir` so the directory is not cleaned up while the notebook uses it.
tmp_dir = TemporaryDirectory()
MILVUS_URL = os.getenv("MILVUS_URL", f"{tmp_dir.name}/milvus_demo.db")
MILVUS_COLL_NAME = os.getenv("MILVUS_COLL_NAME", "basic_llamaindex_pipeline")

# extra keyword arguments for the vector store, given as a JSON object
MILVUS_KWARGS = TypeAdapter(dict).validate_json(os.getenv("MILVUS_KWARGS", "{}"))

vector_store = MilvusVectorStore(
    uri=MILVUS_URL,
    collection_name=MILVUS_COLL_NAME,
    # embed a throwaway string to discover the embedding dimension
    dim=len(embed_model.get_text_embedding("hi")),
    overwrite=INGEST,
    **MILVUS_KWARGS,
)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [13]:
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.ingestion import IngestionPipeline

if INGEST:
    # in this case we ingest the data into the vector store
    docs = reader.load_data(
        file_path="https://arxiv.org/pdf/2206.01062",  # DocLayNet paper
    )
    console.rule("example `Document`:")
    pprint(docs, max_length=1, max_string=50, max_depth=4)

    # The pipeline only turns documents into nodes. Embedding and persisting is
    # done once, by the index below, so the pipeline gets no vector store of its own.
    pipeline = IngestionPipeline(transformations=transformations)
    nodes = pipeline.run(documents=docs)
    print(f"num of Nodes (chunks): {len(nodes)}")
    if nodes:
        # show the 7th chunk when available, otherwise fall back to the last one
        console.rule("example Node (chunk):")
        pprint(
            nodes[min(6, len(nodes) - 1)],
            max_length=2,
            max_string=105,
            max_depth=3,
        )

    # Bind the index to `vector_store` so the embedded nodes are written to Milvus.
    # Without an explicit storage context the index uses its default store, and a
    # later run with `INGEST = False` would find nothing to load.
    index = VectorStoreIndex(
        nodes=nodes,
        embed_model=embed_model,
        storage_context=StorageContext.from_defaults(vector_store=vector_store),
    )

else:
    # in this case we just load the vector store index
    # NOTE: `docs` is not defined on this path, so cells that read it need INGEST = True
    index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        embed_model=embed_model,
    )

Fetching 7 files:   0%|          | 0/7 [00:00<?, ?it/s]

num of Nodes (chunks): 76


## LLM

In [14]:
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI

# API token read from the environment; `None` when the variable is not set
HF_API_KEY = os.environ.get("HF_API_KEY")

# LLM used by the query engine to generate answers
llm = HuggingFaceInferenceAPI(
    token=HF_API_KEY,
    model_name="mistralai/Mistral-7B-Instruct-v0.3",
)

## RAG

In [15]:
from llama_index.core import PromptTemplate

# Prompt used for answering: `{context_str}` and `{query_str}` are the template's
# placeholders for the retrieved context and the user query.
TEXT_QA_TEMPLATE_STR = (
    "Context information is below.\n"
    "---------------------\n"
    "{context_str}\n"
    "---------------------\n"
    "Given the context information and not prior knowledge, answer the query.\n"
    "Query: {query_str}\n"
    "Answer:\n"
)

query_engine = index.as_query_engine(
    text_qa_template=PromptTemplate(TEXT_QA_TEMPLATE_STR),
    llm=llm,
)
query_res = query_engine.query("How many pages were human annotated?")
pprint(query_res, max_length=1, max_string=250, max_depth=5)

## Fetching Docling native layout info


In [16]:
import jsonpath_ng
from docling_core.types import BaseText, Document, Ref

NativeNode = BaseText | Ref


def get_native_node(dl_doc: Document, path: str) -> NativeNode:
    """Resolve a JSONPath expression to a single native Docling node.

    Args:
        dl_doc: the Docling document to search; it is dumped to plain Python
            data before the expression is evaluated.
        path: JSONPath expression identifying the node.

    Returns:
        The single matched value, validated as a `NativeNode`.

    Raises:
        RuntimeError: if the expression matches nothing, or more than one value.
    """
    found = jsonpath_ng.parse(path).find(dl_doc.model_dump())
    values = [hit.value for hit in found]
    if not values:
        raise RuntimeError(f"No results found for {path}")
    if len(values) > 1:
        # currently only single result supported
        raise RuntimeError(f"Multiple results found for {path}")
    (only_value,) = values
    return TypeAdapter(NativeNode).validate_python(only_value)

In [17]:
from docling_core.types import Document as DLDocument

# Rebuild the Docling documents from the JSON text of the ingested LlamaIndex
# documents (`docs` comes from the ingestion branch above).
dl_docs = [DLDocument.model_validate_json(doc.text) for doc in docs]

# For every retrieved chunk, find its Docling document via the stored hash and
# display the native element that the chunk's path points to.
for retr_item in query_res.source_nodes:
    retrieved = retr_item.node
    source_metadata = DocumentMetadata.model_validate(retrieved.source_node.metadata)
    dl_doc_hash = source_metadata.dl_doc_hash
    candidates = [d for d in dl_docs if d.file_info.document_hash == dl_doc_hash]
    dl_doc = candidates[0]
    path = NodeMetadata.model_validate(retrieved.metadata).path
    native_node = get_native_node(dl_doc=dl_doc, path=path)
    console.rule(f"dl_doc_hash={dl_doc_hash[:7]}...\n{path=}")
    pprint(native_node)