Add DoclingParseV3 backend implementation

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
This commit is contained in:
Christoph Auer 2025-02-06 20:29:44 +01:00
parent ed74fe2ec0
commit 3f0e98b1ad
3 changed files with 288 additions and 8 deletions

View File

@ -0,0 +1,198 @@
import logging
import random
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, List, Optional, Union
import pypdfium2 as pdfium
from docling_core.types.doc import BoundingBox, CoordOrigin
from docling_parse.document import PageBoundaryType, ParsedPdfPage
from docling_parse.pdf_parser import DoclingPdfParser, PdfDocument
from docling_parse.pdf_parsers import pdf_parser_v2
from PIL import Image, ImageDraw
from pypdfium2 import PdfPage
from docling.backend.pdf_backend import PdfDocumentBackend, PdfPageBackend
from docling.datamodel.base_models import Cell, Size
if TYPE_CHECKING:
from docling.datamodel.document import InputDocument
_log = logging.getLogger(__name__)
class DoclingParseV3PageBackend(PdfPageBackend):
def __init__(self, parsed_page: ParsedPdfPage, page_obj: PdfPage):
self._ppage = page_obj
self._dpage = parsed_page
self.valid = parsed_page is not None
def is_valid(self) -> bool:
return self.valid
def get_text_in_rect(self, bbox: BoundingBox) -> str:
# Find intersecting cells on the page
text_piece = ""
page_size = self.get_size()
scale = (
1 # FIX - Replace with param in get_text_in_rect across backends (optional)
)
for i, cell in enumerate(self._dpage.sanitized.cells):
cell_bbox = (
cell.rect.to_bounding_box()
.to_top_left_origin(page_height=page_size.height)
.scaled(scale)
)
overlap_frac = cell_bbox.intersection_area_with(bbox) / cell_bbox.area()
if overlap_frac > 0.5:
if len(text_piece) > 0:
text_piece += " "
text_piece += cell.text
return text_piece
def get_text_cells(self) -> Iterable[Cell]:
cells: List[Cell] = []
cell_counter = 0
page_size = self.get_size()
for i, cell in enumerate(self._dpage.sanitized.cells):
cell_bbox = cell.rect.to_bounding_box()
if cell_bbox.r < cell_bbox.l:
cell_bbox.r, cell_bbox.l = cell_bbox.l, cell_bbox.r
if cell_bbox.b > cell_bbox.t:
cell_bbox.b, cell_bbox.t = cell_bbox.t, cell_bbox.b
text_piece = cell.text
cells.append(
Cell(
id=cell_counter,
text=text_piece,
bbox=cell_bbox.to_top_left_origin(page_size.height),
)
)
cell_counter += 1
def draw_clusters_and_cells():
image = (
self.get_page_image()
) # make new image to avoid drawing on the saved ones
draw = ImageDraw.Draw(image)
for c in cells:
x0, y0, x1, y1 = c.bbox.as_tuple()
cell_color = (
random.randint(30, 140),
random.randint(30, 140),
random.randint(30, 140),
)
draw.rectangle([(x0, y0), (x1, y1)], outline=cell_color)
image.show()
# draw_clusters_and_cells()
return cells
def get_bitmap_rects(self, scale: float = 1) -> Iterable[BoundingBox]:
AREA_THRESHOLD = 0 # 32 * 32
images = self._dpage.sanitized.bitmap_resources
for img in images:
cropbox = img.rect.to_bounding_box().to_top_left_origin(
self.get_size().height
)
if cropbox.area() > AREA_THRESHOLD:
cropbox = cropbox.scaled(scale=scale)
yield cropbox
def get_page_image(
self, scale: float = 1, cropbox: Optional[BoundingBox] = None
) -> Image.Image:
page_size = self.get_size()
if not cropbox:
cropbox = BoundingBox(
l=0,
r=page_size.width,
t=0,
b=page_size.height,
coord_origin=CoordOrigin.TOPLEFT,
)
padbox = BoundingBox(
l=0, r=0, t=0, b=0, coord_origin=CoordOrigin.BOTTOMLEFT
)
else:
padbox = cropbox.to_bottom_left_origin(page_size.height).model_copy()
padbox.r = page_size.width - padbox.r
padbox.t = page_size.height - padbox.t
image = (
self._ppage.render(
scale=scale * 1.5,
rotation=0, # no additional rotation
crop=padbox.as_tuple(),
)
.to_pil()
.resize(size=(round(cropbox.width * scale), round(cropbox.height * scale)))
) # We resize the image from 1.5x the given scale to make it sharper.
return image
def get_size(self) -> Size:
return Size(
width=self._dpage.sanitized.dimension.width,
height=self._dpage.sanitized.dimension.height,
)
def unload(self):
self._ppage = None
self._dpage = None
class DoclingParseV3DocumentBackend(PdfDocumentBackend):
def __init__(self, in_doc: "InputDocument", path_or_stream: Union[BytesIO, Path]):
super().__init__(in_doc, path_or_stream)
self._pdoc = pdfium.PdfDocument(self.path_or_stream)
self.parser = DoclingPdfParser(loglevel="fatal")
self.dp_doc: PdfDocument = self.parser.load(path_or_stream=path_or_stream)
success = self.dp_doc is not None
if not success:
raise RuntimeError(
f"docling-parse v2 could not load document {self.document_hash}."
)
def page_count(self) -> int:
# return len(self._pdoc) # To be replaced with docling-parse API
len_1 = len(self._pdoc)
len_2 = self.dp_doc.number_of_pages()
if len_1 != len_2:
_log.error(f"Inconsistent number of pages: {len_1}!={len_2}")
return len_2
def load_page(self, page_no: int) -> DoclingParseV3PageBackend:
return DoclingParseV3PageBackend(
self.dp_doc.get_page(page_no + 1), self._pdoc[page_no]
)
def is_valid(self) -> bool:
return self.page_count() > 0
def unload(self):
super().unload()
self.dp_doc.unload()
self._pdoc.close()
self._pdoc = None

View File

@ -6,10 +6,11 @@ from typing import Iterable
import yaml import yaml
from docling.datamodel.base_models import ConversionStatus from docling.backend.docling_parse_v3_backend import DoclingParseV3DocumentBackend
from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.document import ConversionResult from docling.datamodel.document import ConversionResult
from docling.datamodel.settings import settings from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter from docling.document_converter import DocumentConverter, PdfFormatOption
_log = logging.getLogger(__name__) _log = logging.getLogger(__name__)
@ -103,10 +104,11 @@ def main():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
input_doc_paths = [ input_doc_paths = [
Path("./tests/data/2206.01062.pdf"), Path("tests/data/redp5110_sampled.pdf"),
Path("./tests/data/2203.01017v2.pdf"), # Path("./tests/data/2206.01062.pdf"),
Path("./tests/data/2305.03393v1.pdf"), # Path("./tests/data/2203.01017v2.pdf"),
Path("./tests/data/redp5110_sampled.pdf"), # Path("./tests/data/2305.03393v1.pdf"),
# Path("./tests/data/redp5110_sampled.pdf"),
] ]
# buf = BytesIO(Path("./test/data/2206.01062.pdf").open("rb").read()) # buf = BytesIO(Path("./test/data/2206.01062.pdf").open("rb").read())
@ -119,13 +121,17 @@ def main():
# settings.debug.visualize_tables = True # settings.debug.visualize_tables = True
# settings.debug.visualize_cells = True # settings.debug.visualize_cells = True
doc_converter = DocumentConverter() doc_converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(backend=DoclingParseV3DocumentBackend)
}
)
start_time = time.time() start_time = time.time()
conv_results = doc_converter.convert_all( conv_results = doc_converter.convert_all(
input_doc_paths, input_doc_paths,
raises_on_error=False, # to let conversion run through all and examine results at the end raises_on_error=True, # to let conversion run through all and examine results at the end
) )
success_count, partial_success_count, failure_count = export_documents( success_count, partial_success_count, failure_count = export_documents(
conv_results, output_dir=Path("scratch") conv_results, output_dir=Path("scratch")

View File

@ -0,0 +1,76 @@
from pathlib import Path
import pytest
from docling.backend.docling_parse_v3_backend import (
DoclingParseV3DocumentBackend,
DoclingParseV3PageBackend,
)
from docling.datamodel.base_models import BoundingBox, InputFormat
from docling.datamodel.document import InputDocument
@pytest.fixture
def test_doc_path():
return Path("./tests/data/2206.01062.pdf")
def _get_backend(pdf_doc):
in_doc = InputDocument(
path_or_stream=pdf_doc,
format=InputFormat.PDF,
backend=DoclingParseV3DocumentBackend,
)
doc_backend = in_doc._backend
return doc_backend
def test_text_cell_counts():
pdf_doc = Path("./tests/data/redp5110_sampled.pdf")
doc_backend = _get_backend(pdf_doc)
for page_index in range(0, doc_backend.page_count()):
last_cell_count = None
for i in range(10):
page_backend: DoclingParseV3PageBackend = doc_backend.load_page(0)
cells = list(page_backend.get_text_cells())
if last_cell_count is None:
last_cell_count = len(cells)
if len(cells) != last_cell_count:
assert (
False
), "Loading page multiple times yielded non-identical text cell counts"
last_cell_count = len(cells)
def test_get_text_from_rect(test_doc_path):
doc_backend = _get_backend(test_doc_path)
page_backend: DoclingParseV3PageBackend = doc_backend.load_page(0)
# Get the title text of the DocLayNet paper
textpiece = page_backend.get_text_in_rect(
bbox=BoundingBox(l=102, t=77, r=511, b=124)
)
ref = "DocLayNet: A Large Human-Annotated Dataset for Document-Layout Analysis"
assert textpiece.strip() == ref
def test_crop_page_image(test_doc_path):
doc_backend = _get_backend(test_doc_path)
page_backend: DoclingParseV3PageBackend = doc_backend.load_page(0)
# Crop out "Figure 1" from the DocLayNet paper
im = page_backend.get_page_image(
scale=2, cropbox=BoundingBox(l=317, t=246, r=574, b=527)
)
# im.show()
def test_num_pages(test_doc_path):
doc_backend = _get_backend(test_doc_path)
doc_backend.page_count() == 9