# Advanced Chunking

In [1]:
%pip install -qU docling docling-core sentence-transformers transformers semchunk lancedb pydantic

Note: you may need to restart the kernel to use updated packages.


## Setup

In [2]:
from dataclasses import dataclass
from pathlib import Path
from tempfile import mkdtemp
from typing import Any, Iterator, Optional, Union

import lancedb
import semchunk
from docling_core.transforms.chunker import (
    BaseChunk,
    BaseChunker,
    DocMeta,
    HierarchicalChunker,
)
from docling_core.transforms.chunker.hierarchical_chunker import DocChunk
from docling_core.types import DoclingDocument
from pydantic import ConfigDict, PositiveInt
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer

from docling.document_converter import DocumentConverter

In [3]:
DOC_SOURCE = "http://bill.murdocks.org/iccbr2011murdock_web.pdf"
EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
TOKENIZER = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
EMBED_MODEL = SentenceTransformer(EMBED_MODEL_ID)
MAX_TOKENS = 64

## Chunker Definition

In [4]:
class HybridChunker(BaseChunker):

    model_config: ConfigDict = ConfigDict(arbitrary_types_allowed=True)

    inner_chunker: BaseChunker = HierarchicalChunker()
    # TODO: improve typing for tokenizer below (ran into issues with `PreTrainedTokenizer`):
    tokenizer: Any
    max_tokens: PositiveInt

    def _count_tokens(self, text: Optional[Union[str, list[str]]]):
        if text is None:
            return 0
        elif isinstance(text, list):
            total = 0
            for t in text:
                total += self._count_tokens(t)
            return total
        return len(self.tokenizer.tokenize(text, max_length=None))

    def _make_splitter(self):
        return semchunk.chunkerify(self.tokenizer, self.max_tokens)

    @dataclass
    class _ChunkLengthInfo:
        total_len: int
        text_len: int
        other_len: int

    def _doc_chunk_length(self, doc_chunk: DocChunk):
        text_length = self._count_tokens(doc_chunk.text)
        # Note that count_tokens handles None and lists, making this code simpler:
        headings_length = self._count_tokens(doc_chunk.meta.headings)
        captions_length = self._count_tokens(doc_chunk.meta.captions)
        total = text_length + headings_length + captions_length
        return self._ChunkLengthInfo(
            total_len=total,
            text_len=text_length,
            other_len=total - text_length,
        )

    @classmethod
    def _make_chunk_from_doc_items(
        cls, doc_chunk: DocChunk, window_text: str, window_start: int, window_end: int
    ):
        meta = DocMeta(
            doc_items=doc_chunk.meta.doc_items[window_start : window_end + 1],
            headings=doc_chunk.meta.headings,
            captions=doc_chunk.meta.captions,
        )
        new_chunk = DocChunk(text=window_text, meta=meta)
        return new_chunk

    @classmethod
    def _merge_text(cls, t1, t2):
        if t1 == "":
            return t2
        elif t2 == "":
            return t1
        else:
            return t1 + "\n" + t2

    def _split_by_doc_items(self, doc_chunk: DocChunk) -> list[DocChunk]:
        if doc_chunk.meta.doc_items == None or len(doc_chunk.meta.doc_items) <= 1:
            return [doc_chunk]
        length = self._doc_chunk_length(doc_chunk)
        if length.total_len <= self.max_tokens:
            return [doc_chunk]
        else:
            chunks = []
            window_start = 0
            window_end = 0
            window_text = ""
            window_text_length = 0
            other_length = length.other_len
            l = len(doc_chunk.meta.doc_items)
            while window_end < l:
                doc_item = doc_chunk.meta.doc_items[window_end]
                text = doc_item.text
                text_length = self._count_tokens(text)
                if (
                    text_length + window_text_length + other_length < self.max_tokens
                    and window_end < l - 1
                ):
                    # Still room left to add more to this chunk AND still at least one item left
                    window_end += 1
                    window_text_length += text_length
                    window_text = self._merge_text(window_text, text)
                elif text_length + window_text_length + other_length < self.max_tokens:
                    # All the items in the window fit into the chunk and there are no other items left
                    window_text = self._merge_text(window_text, text)
                    new_chunk = self._make_chunk_from_doc_items(
                        doc_chunk, window_text, window_start, window_end
                    )
                    chunks.append(new_chunk)
                    window_end = l
                elif window_start == window_end:
                    # Only one item in the window and it doesn't fit into the chunk.  So we'll just make it a chunk for now and it will get split in the plain text splitter.
                    window_text = self._merge_text(window_text, text)
                    new_chunk = self._make_chunk_from_doc_items(
                        doc_chunk, window_text, window_start, window_end
                    )
                    chunks.append(new_chunk)
                    window_start = window_end + 1
                    window_end = window_start
                    window_text = ""
                    window_text_length = 0
                else:
                    # Multiple items in the window but they don't fit into the chunk.  However, the existing items must have fit or we wouldn't have gotten here.
                    # So we put everything but the last item into the chunk and then start a new window INCLUDING the current window end.
                    new_chunk = self._make_chunk_from_doc_items(
                        doc_chunk, window_text, window_start, window_end - 1
                    )
                    chunks.append(new_chunk)
                    window_start = window_end
                    window_text = ""
                    window_text_length = 0
            return chunks

    def _split_using_plain_text(
        self,
        doc_chunk: DocChunk,
        plain_text_splitter,
    ):
        lengths = self._doc_chunk_length(doc_chunk)
        if lengths.total_len <= self.max_tokens:
            return [doc_chunk]
        else:
            # How much room is there for text after subtracting out the headers and captions:
            available_length = self.max_tokens - lengths.other_len
            if available_length <= 0:
                raise ValueError(
                    "Headers and captions for this chunk are longer than the total amount of size for the chunk.  This is not supported now."
                )
            text = doc_chunk.text
            segments = plain_text_splitter.chunk(text)
            chunks = [DocChunk(text=s, meta=doc_chunk.meta) for s in segments]
            return chunks

    def _merge_chunks_with_matching_metadata(self, chunks: list[DocChunk]):
        output_chunks = []
        window_start = 0
        window_end = 0
        l = len(chunks)
        while window_end < l:
            chunk = chunks[window_end]
            lengths = self._doc_chunk_length(chunk)
            headings_and_captions = (chunk.meta.headings, chunk.meta.captions)
            if window_start == window_end:
                # starting a new block of chunks to potentially merge
                current_headings_and_captions = headings_and_captions
                window_text = chunk.text
                window_other_length = lengths.other_len
                window_text_length = lengths.text_len
                window_items = chunk.meta.doc_items
                window_end += 1
                first_chunk_of_window = chunk
            elif (
                headings_and_captions == current_headings_and_captions
                and window_text_length + window_other_length + lengths.text_len
                <= self.max_tokens
            ):
                # there is room to include the new chunk so add it to the window and continue
                window_text = self._merge_text(window_text, chunk.text)
                window_text_length += lengths.text_len
                window_items = window_items + chunk.meta.doc_items
                window_end += 1
            else:
                # no more room OR the start of new metadata.  Either way, end the block and use the current window_end as the start of a new block
                if window_start + 1 == window_end:
                    # just one chunk so use it as is
                    output_chunks.append(first_chunk_of_window)
                else:
                    new_meta = DocMeta(
                        doc_items=window_items,
                        headings=headings_and_captions[0],
                        captions=headings_and_captions[1],
                    )
                    new_chunk = DocChunk(text=window_text, meta=new_meta)
                    output_chunks.append(new_chunk)
                window_start = window_end  # no need to reset window_text, etc. because that will be reset in the next iteration in the if window_start == window_end block

        return output_chunks

    def _merge_chunks_with_mismatching_metadata(self, chunks, *_):
        # placeholder, for now we're not merging across text with different headings+captions
        # in principal it seems like a good idea for cases where you can merge entire sections
        # but it is not clear what you do about the metadata then because some of it apples to
        return chunks

    def _merge_chunks(self, chunks: list[DocChunk]) -> list[DocChunk]:
        # merges as many chunks as possible that have the same headings+captions.
        initial_merged_chunks = self._merge_chunks_with_matching_metadata(chunks)
        # merges chunks with different headings+captions.  This is later so that merges within a section or other grouping are preferred.
        final_merged_chunks = self._merge_chunks_with_mismatching_metadata(
            initial_merged_chunks
        )
        return final_merged_chunks

    @classmethod
    def _make_text_for_embedding(cls, chunk: DocChunk):
        output = ""
        if chunk.meta.headings != None:
            for h in chunk.meta.headings:
                output += h + "\n"
        if chunk.meta.captions != None:
            for c in chunk.meta.captions:
                output += c + "\n"
        output += chunk.text
        return output

    def _adjust_chunks_for_fixed_size(self, chunks: list[DocChunk], splitter):
        split_by_items = [x for c in chunks for x in self._split_by_doc_items(c)]
        split_recursively = [
            x for c in split_by_items for x in self._split_using_plain_text(c, splitter)
        ]
        merged = self._merge_chunks(split_recursively)
        text_expanded = [
            DocChunk.model_validate(
                {**c.model_dump(), "text": self._make_text_for_embedding(c)}
            )
            for c in merged
        ]
        return text_expanded

    def chunk(self, dl_doc: DoclingDocument, **kwargs) -> Iterator[BaseChunk]:
        preliminary_chunks = self.inner_chunker.chunk(dl_doc=dl_doc, **kwargs)
        splitter = self._make_splitter()
        output_chunks = self._adjust_chunks_for_fixed_size(preliminary_chunks, splitter)
        return iter(output_chunks)

## Usage

In [5]:
conv_res = DocumentConverter().convert(source=DOC_SOURCE)
doc = conv_res.document

chunker = HybridChunker(
    tokenizer=TOKENIZER,
    max_tokens=MAX_TOKENS,
)
chunks = list(chunker.chunk(dl_doc=doc))

for chunk in chunks[:5]:
    print(chunk.text)
    print(chunker._count_tokens(chunk.text))

Using CPU. Note: This module is much faster with a GPU.


J. William Murdock
murdockj@us.ibm.com IBM T.J. Watson Research Center P.O. Box 704 Yorktown Heights, NY 10598
39
J. William Murdock
Abstract. The Jeopardy! television quiz show asks natural-language questions and requires natural-language answers. One useful source of information for answering Jeopardy! questions is text from written sources such as encyclopedias or news articles. A text passage may partially or fully indicate that some candidate answer is the correct answer to the question. Recognizing
70
J. William Murdock
whether it does requires determining the extent to which what the passage is saying about the candidate answer is similar to what the question is saying about the desired answer. This paper describes how structure mapping [1] (an algorithm originally developed for analogical reasoning) is applied to determine similarity between content in questions and passages. That algorithm
70
J. William Murdock
is one of many used in the Watson question answering system [2]. I

## Vector Retrieval

In [6]:
def make_lancedb_index(db_uri, index_name, chunks: list[DocChunk], embedding_model):
    db = lancedb.connect(db_uri)
    data = []
    for chunk in chunks:
        embeddings = embedding_model.encode(chunk.text)
        data_item = {
            "vector": embeddings,
            "text": chunk.text,
            "headings": chunk.meta.headings,
            "captions": chunk.meta.captions,
        }
        data.append(data_item)
    tbl = db.create_table(index_name, data=data, exist_ok=True)
    return tbl


db_uri = str(Path(mkdtemp()) / "docling.db")  # or set as needed
index = make_lancedb_index(db_uri, doc.name, chunks, EMBED_MODEL)

sample_query = "Making SME greedy and pragmatic"
sample_embedding = EMBED_MODEL.encode(sample_query)
results = index.search(sample_embedding).limit(5)

results.to_pandas()

Unnamed: 0,vector,text,headings,captions,_distance
0,"[-0.025746439, 0.03888134, 0.0033668755, -0.03...","References\n3. Forbus, K. and Oblinger, D. (19...",[References],,0.332435
1,"[0.034203544, 0.10181023, 0.003722408, 0.00506...",5 Evaluation and Conclusions\nconsider to be a...,[5 Evaluation and Conclusions],,1.469304
2,"[0.04400234, -0.034766007, -0.00025527124, 0.0...","References\n4. McCord, M. C. (1990). Slot Gram...",[References],,1.525625
3,"[0.112926826, -0.010892201, 0.007714559, -0.06...","3 Syntactic-Semantic Graphs\nplay , about , Ut...",[3 Syntactic-Semantic Graphs],,1.54055
4,"[0.025994677, 0.08402823, 0.03268827, -0.03727...","4 Algorithm\nIn using this algorithm, we have ...",[4 Algorithm],,1.576838
