# --- docling/pipeline/base_pipeline.py ---
import functools
import logging
import time
import traceback
from abc import ABC, abstractmethod
from typing import Callable, Iterable, List

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import (
    ConversionStatus,
    DoclingComponentType,
    ErrorItem,
    Page,
)
from docling.datamodel.document import ConversionResult, InputDocument
from docling.datamodel.pipeline_options import PipelineOptions
from docling.datamodel.settings import settings
from docling.utils.utils import chunkify

_log = logging.getLogger(__name__)


class AbstractPipeline(ABC):
    """Base class for document conversion pipelines.

    A pipeline turns an ``InputDocument`` into a ``ConversionResult`` in three
    stages: build (structure extraction), assemble (document composition), and
    enrich (post-processing on the assembled document). Subclasses populate
    ``build_pipe`` / ``enrichment_pipe`` with the model callables to apply.
    """

    def __init__(self, pipeline_options: PipelineOptions):
        self.pipeline_options = pipeline_options
        # Callables applied during the build stage (e.g. per-page models).
        self.build_pipe: List[Callable] = []
        # Callables applied to the assembled DoclingDocument elements.
        self.enrichment_pipe: List[Callable] = []

    def execute(self, in_doc: InputDocument) -> ConversionResult:
        """Run all pipeline stages on *in_doc* and return the result.

        Does not raise: any stage failure is reported through
        ``ConversionStatus.FAILURE`` on the returned ``ConversionResult``.
        """
        conv_res = ConversionResult(input=in_doc)

        _log.info(f"Processing document {in_doc.file.name}")

        if not in_doc.valid:
            conv_res.status = ConversionStatus.FAILURE
            return conv_res

        # TODO: propagate option for raises_on_error?
        try:
            # These steps are building and assembling the structure of the
            # output DoclingDocument.
            conv_res = self._build_document(in_doc, conv_res)
            conv_res = self._assemble_document(in_doc, conv_res)
            # From this stage, all operations should rely only on conv_res.output
            conv_res = self._enrich_document(in_doc, conv_res)
            conv_res.status = self._determine_status(in_doc, conv_res)
        except Exception:
            # Fix: the exception was previously swallowed without any trace,
            # making failures impossible to diagnose. Log it (with traceback)
            # before reporting the failure status.
            _log.exception("Conversion of document %s failed", in_doc.file.name)
            conv_res.status = ConversionStatus.FAILURE

        return conv_res

    @abstractmethod
    def _build_document(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionResult:
        """Extract the document structure from the backend into *conv_res*."""
        pass

    def _assemble_document(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionResult:
        """Compose the final document; no-op by default."""
        return conv_res

    def _enrich_document(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionResult:
        """Run enrichment models on the assembled output; no-op by default."""
        return conv_res

    @abstractmethod
    def _determine_status(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionStatus:
        """Evaluate the conversion outcome after all stages completed."""
        pass

    @classmethod
    @abstractmethod
    def get_default_options(cls) -> PipelineOptions:
        """Return the default options instance for this pipeline class."""
        pass

    @classmethod
    @abstractmethod
    def is_backend_supported(cls, backend: AbstractDocumentBackend):
        """Return True if *backend* can be consumed by this pipeline."""
        pass


class PaginatedPipeline(AbstractPipeline):  # TODO this is a bad name.
    """Pipeline that processes a document page by page (PDF-style input)."""

    def _apply_on_pages(self, page_batch: Iterable[Page]) -> Iterable[Page]:
        # Chain every build model over the batch; each model wraps the
        # iterable, so pages stream through the whole pipe lazily.
        for model in self.build_pipe:
            page_batch = model(page_batch)

        yield from page_batch

    def _build_document(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionResult:
        """Initialise all pages and run the page-level model pipe in batches.

        Raises:
            RuntimeError: if the input backend is not a PdfDocumentBackend.
        """

        if not isinstance(in_doc._backend, PdfDocumentBackend):
            raise RuntimeError(
                f"The selected backend {type(in_doc._backend).__name__} for {in_doc.file} is not a PDF backend. "
                f"Can not convert this with a PDF pipeline. "
                f"Please check your format configuration on DocumentConverter."
            )

        for i in range(in_doc.page_count):
            conv_res.pages.append(Page(page_no=i))

        try:
            # Iterate batches of pages (page_batch_size) in the doc
            for page_batch in chunkify(conv_res.pages, settings.perf.page_batch_size):
                start_pb_time = time.time()

                # 1. Initialise the page resources
                init_pages = map(
                    functools.partial(self.initialize_page, in_doc), page_batch
                )

                # 2. Run pipeline stages
                pipeline_pages = self._apply_on_pages(init_pages)

                # The pipe is lazy: iterate to exhaustion so every page is
                # actually processed.
                for _ in pipeline_pages:
                    pass

                end_pb_time = time.time() - start_pb_time
                _log.info(f"Finished converting page batch time={end_pb_time:.3f}")

        except Exception as e:
            # Mark the failure here; execute() also relies on this status.
            conv_res.status = ConversionStatus.FAILURE
            trace = "\n".join(traceback.format_exception(e))
            _log.warning(
                f"Encountered an error during conversion of document {in_doc.document_hash}:\n"
                f"{trace}"
            )
        finally:
            # Always unload the PDF backend, even in case of failure
            if in_doc._backend:
                in_doc._backend.unload()

        return conv_res

    def _determine_status(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionStatus:
        """Return PARTIAL_SUCCESS (with ErrorItems) if any page failed to parse."""
        status = ConversionStatus.SUCCESS
        for page in conv_res.pages:
            # Fix: guard against pages whose backend was never initialised
            # (e.g. the build stage aborted mid-batch); previously this raised
            # AttributeError on None instead of recording the page error.
            if page._backend is None or not page._backend.is_valid():
                conv_res.errors.append(
                    ErrorItem(
                        component_type=DoclingComponentType.DOCUMENT_BACKEND,
                        module_name=type(page._backend).__name__,
                        error_message=f"Page {page.page_no} failed to parse.",
                    )
                )
                status = ConversionStatus.PARTIAL_SUCCESS

        return status

    @abstractmethod
    def initialize_page(self, doc: InputDocument, page: Page) -> Page:
        """Initialise and load resources for a page (backend, size, ...)."""
        pass
# --- docling/pipeline/simple_pipeline.py ---
import logging

from docling.backend.abstract_backend import (
    AbstractDocumentBackend,
    DeclarativeDocumentBackend,
)
from docling.datamodel.base_models import ConversionStatus
from docling.datamodel.document import ConversionResult, InputDocument
from docling.datamodel.pipeline_options import PipelineOptions
from docling.pipeline.base_pipeline import AbstractPipeline

_log = logging.getLogger(__name__)


class SimplePipeline(AbstractPipeline):
    """SimpleModelPipeline.

    Pipeline used for formats / backends whose backend can emit a
    DoclingDocument directly, so no page-level model pipe is required.
    """

    def __init__(self, pipeline_options: PipelineOptions):
        super().__init__(pipeline_options)

    def _build_document(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionResult:
        """Obtain the DoclingDocument straight from the declarative backend.

        Raises:
            RuntimeError: if the backend is not a DeclarativeDocumentBackend.
        """
        backend = in_doc._backend
        if not isinstance(backend, DeclarativeDocumentBackend):
            raise RuntimeError(
                f"The selected backend {type(backend).__name__} for {in_doc.file} is not a declarative backend. "
                f"Can not convert this with simple pipeline. "
                f"Please check your format configuration on DocumentConverter."
            )

        # No page-level build pipe here: a declarative backend produces the
        # output document in a single conversion call.
        conv_res.output = backend.convert()
        return conv_res

    def _determine_status(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionStatus:
        # Reached only when every previous step completed without raising,
        # and there is nothing further to evaluate — report success.
        return ConversionStatus.SUCCESS

    @classmethod
    def get_default_options(cls) -> PipelineOptions:
        return PipelineOptions()

    @classmethod
    def is_backend_supported(cls, backend: AbstractDocumentBackend):
        return isinstance(backend, DeclarativeDocumentBackend)
# --- docling/pipeline/standard_pdf_pipeline.py ---
import logging
from pathlib import Path
from typing import Optional

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import AssembledUnit, Page
from docling.datamodel.document import ConversionResult, InputDocument
from docling.datamodel.pipeline_options import (
    EasyOcrOptions,
    PdfPipelineOptions,
    TesseractCliOcrOptions,
    TesseractOcrOptions,
)
from docling.models.base_ocr_model import BaseOcrModel
from docling.models.ds_glm_model import GlmModel, GlmOptions
from docling.models.easyocr_model import EasyOcrModel
from docling.models.layout_model import LayoutModel
from docling.models.page_assemble_model import PageAssembleModel, PageAssembleOptions
from docling.models.page_preprocessing_model import (
    PagePreprocessingModel,
    PagePreprocessingOptions,
)
from docling.models.table_structure_model import TableStructureModel
from docling.models.tesseract_ocr_cli_model import TesseractOcrCliModel
from docling.models.tesseract_ocr_model import TesseractOcrModel
from docling.pipeline.base_pipeline import PaginatedPipeline

_log = logging.getLogger(__name__)


class StandardPdfPipeline(PaginatedPipeline):
    """Default PDF pipeline: preprocessing, OCR, layout and table-structure
    models per page, then GLM-based document assembly."""

    _layout_model_path = "model_artifacts/layout/beehive_v0.0.5_pt"
    _table_model_path = "model_artifacts/tableformer"

    def __init__(self, pipeline_options: PdfPipelineOptions):
        super().__init__(pipeline_options)
        # Narrow the attribute type set by the base class.
        self.pipeline_options: PdfPipelineOptions

        # Fix: when pipeline_options.artifacts_path WAS provided, the local
        # `artifacts_path` was never assigned and the code below raised
        # NameError. Use the configured path when present; otherwise download
        # the model artifacts from HuggingFace.
        if pipeline_options.artifacts_path:
            artifacts_path = Path(pipeline_options.artifacts_path)
        else:
            artifacts_path = self.download_models_hf()

        self.artifacts_path = Path(artifacts_path)
        self.glm_model = GlmModel(
            options=GlmOptions(
                create_legacy_output=pipeline_options.create_legacy_output
            )
        )

        if (ocr_model := self.get_ocr_model()) is None:
            raise RuntimeError(
                f"The specified OCR kind is not supported: {pipeline_options.ocr_options.kind}."
            )

        self.build_pipe = [
            # Pre-processing
            PagePreprocessingModel(
                options=PagePreprocessingOptions(
                    images_scale=pipeline_options.images_scale
                )
            ),
            # OCR
            ocr_model,
            # Layout model
            LayoutModel(
                artifacts_path=self.artifacts_path
                / StandardPdfPipeline._layout_model_path
            ),
            # Table structure model
            TableStructureModel(
                enabled=pipeline_options.do_table_structure,
                artifacts_path=self.artifacts_path
                / StandardPdfPipeline._table_model_path,
                options=pipeline_options.table_structure_options,
            ),
            # Page assemble
            PageAssembleModel(
                options=PageAssembleOptions(
                    keep_images=pipeline_options.images_scale is not None
                )
            ),
        ]

        self.enrichment_pipe = [
            # Other models working on `NodeItem` elements in the DoclingDocument
        ]

    @staticmethod
    def download_models_hf(
        local_dir: Optional[Path] = None, force: bool = False
    ) -> Path:
        """Download the pinned docling-models snapshot and return its path."""
        from huggingface_hub import snapshot_download

        download_path = snapshot_download(
            repo_id="ds4sd/docling-models",
            force_download=force,
            local_dir=local_dir,
            revision="v2.0.1",
        )

        return Path(download_path)

    def get_ocr_model(self) -> Optional[BaseOcrModel]:
        """Instantiate the OCR model matching the configured options.

        Returns None when the options kind is not recognised.
        """
        if isinstance(self.pipeline_options.ocr_options, EasyOcrOptions):
            return EasyOcrModel(
                enabled=self.pipeline_options.do_ocr,
                options=self.pipeline_options.ocr_options,
            )
        elif isinstance(self.pipeline_options.ocr_options, TesseractCliOcrOptions):
            return TesseractOcrCliModel(
                enabled=self.pipeline_options.do_ocr,
                options=self.pipeline_options.ocr_options,
            )
        elif isinstance(self.pipeline_options.ocr_options, TesseractOcrOptions):
            return TesseractOcrModel(
                enabled=self.pipeline_options.do_ocr,
                options=self.pipeline_options.ocr_options,
            )
        return None

    def initialize_page(self, doc: InputDocument, page: Page) -> Page:
        """Attach the page backend and record the page size."""
        page._backend = doc._backend.load_page(page.page_no)
        page.size = page._backend.get_size()

        return page

    def _assemble_document(
        self, in_doc: InputDocument, conv_res: ConversionResult
    ) -> ConversionResult:
        """Collect the per-page assembled elements and run the GLM model."""
        all_elements = []
        all_headers = []
        all_body = []

        for p in conv_res.pages:

            for el in p.assembled.body:
                all_body.append(el)
            for el in p.assembled.headers:
                all_headers.append(el)
            for el in p.assembled.elements:
                all_elements.append(el)

        conv_res.assembled = AssembledUnit(
            elements=all_elements, headers=all_headers, body=all_body
        )

        conv_res.output, conv_res.legacy_output = self.glm_model(conv_res)

        return conv_res

    @classmethod
    def get_default_options(cls) -> PdfPipelineOptions:
        return PdfPipelineOptions()

    @classmethod
    def is_backend_supported(cls, backend: AbstractDocumentBackend):
        return isinstance(backend, PdfDocumentBackend)