diff --git a/docling/pipeline/base_model_pipeline.py b/docling/pipeline/base_model_pipeline.py index aa4384b2..7e3f9fea 100644 --- a/docling/pipeline/base_model_pipeline.py +++ b/docling/pipeline/base_model_pipeline.py @@ -25,15 +25,48 @@ class AbstractModelPipeline(ABC): def __init__(self, pipeline_options: PipelineOptions): self.pipeline_options = pipeline_options self.model_pipe: List[Callable] = [] + self.enrichment_pipe: List[Callable] = [] + + def execute(self, in_doc: InputDocument) -> ConversionResult: + conv_res = ConversionResult(input=in_doc) + + _log.info(f"Processing document {in_doc.file.name}") + + if not in_doc.valid: + conv_res.status = ConversionStatus.FAILURE + return conv_res + + # TODO: propagate option for raises_on_error? + try: + conv_res = self._build_document(in_doc, conv_res) + conv_res = self._assemble_document(in_doc, conv_res) + conv_res = self._enrich_document(in_doc, conv_res) + conv_res.status = self._determine_status(in_doc, conv_res) + except Exception as e: + conv_res.status = ConversionStatus.FAILURE + + return conv_res @abstractmethod - def execute(self, in_doc: InputDocument) -> ConversionResult: + def _build_document( + self, in_doc: InputDocument, conv_res: ConversionResult + ) -> ConversionResult: pass - @abstractmethod def _assemble_document( self, in_doc: InputDocument, conv_res: ConversionResult ) -> ConversionResult: + return conv_res + + def _enrich_document( + self, in_doc: InputDocument, conv_res: ConversionResult + ) -> ConversionResult: + return conv_res + + @abstractmethod + def _determine_status( + self, in_doc: InputDocument, conv_res: ConversionResult + ) -> ConversionStatus: pass @classmethod @@ -46,6 +79,12 @@ class AbstractModelPipeline(ABC): def is_backend_supported(cls, backend: AbstractDocumentBackend): pass + # def _apply_on_elements(self, element_batch: Iterable[NodeItem]) -> Iterable[Any]: + # for model in self.model_pipe: + # element_batch = model(element_batch) + # + # yield from element_batch + class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name. @@ -55,10 +94,9 @@ class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name. yield from page_batch - def execute(self, in_doc: InputDocument) -> ConversionResult: - conv_res = ConversionResult(input=in_doc) - - _log.info(f"Processing document {in_doc.file.name}") + def _build_document( + self, in_doc: InputDocument, conv_res: ConversionResult + ) -> ConversionResult: if not isinstance(in_doc._backend, PdfDocumentBackend): raise RuntimeError( @@ -91,30 +129,14 @@ class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name. end_pb_time = time.time() - start_pb_time _log.info(f"Finished converting page batch time={end_pb_time:.3f}") - conv_res = self._assemble_document(in_doc, conv_res) - - status = ConversionStatus.SUCCESS - for page in conv_res.pages: - if not page._backend.is_valid(): - conv_res.errors.append( - ErrorItem( - component_type=DoclingComponentType.DOCUMENT_BACKEND, - module_name=type(page._backend).__name__, - error_message=f"Page {page.page_no} failed to parse.", - ) - ) - status = ConversionStatus.PARTIAL_SUCCESS - - conv_res.status = status - except Exception as e: conv_res.status = ConversionStatus.FAILURE trace = "\n".join(traceback.format_exception(e)) - _log.info( + _log.warning( f"Encountered an error during conversion of document {in_doc.document_hash}:\n" f"{trace}" ) - raise e # TODO Debug, should not be here. + # raise e # TODO Debug, should not be here. finally: # Always unload the PDF backend, even in case of failure if in_doc._backend: @@ -122,6 +144,23 @@ class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name. return conv_res + def _determine_status( + self, in_doc: InputDocument, conv_res: ConversionResult + ) -> ConversionStatus: + status = ConversionStatus.SUCCESS + for page in conv_res.pages: + if not page._backend.is_valid(): + conv_res.errors.append( + ErrorItem( + component_type=DoclingComponentType.DOCUMENT_BACKEND, + module_name=type(page._backend).__name__, + error_message=f"Page {page.page_no} failed to parse.", + ) + ) + status = ConversionStatus.PARTIAL_SUCCESS + + return status + # Initialise and load resources for a page @abstractmethod def initialize_page(self, doc: InputDocument, page: Page) -> Page: diff --git a/docling/pipeline/simple_model_pipeline.py b/docling/pipeline/simple_model_pipeline.py index 98708a53..ee5febab 100644 --- a/docling/pipeline/simple_model_pipeline.py +++ b/docling/pipeline/simple_model_pipeline.py @@ -22,14 +22,9 @@ class SimpleModelPipeline(AbstractModelPipeline): def __init__(self, pipeline_options: PipelineOptions): super().__init__(pipeline_options) - def execute(self, in_doc: InputDocument) -> ConversionResult: - conv_res = ConversionResult(input=in_doc) - - _log.info(f"Processing document {in_doc.file.name}") - - if not in_doc.valid: - conv_res.status = ConversionStatus.FAILURE - return conv_res + def _build_document( + self, in_doc: InputDocument, conv_res: ConversionResult + ) -> ConversionResult: if not isinstance(in_doc._backend, DeclarativeDocumentBackend): raise RuntimeError( @@ -45,24 +40,15 @@ class SimpleModelPipeline(AbstractModelPipeline): # a DoclingDocument straight. conv_res.output = in_doc._backend.convert() - # Do other stuff with conv_res.experimental - - conv_res = self._assemble_document(in_doc, conv_res) - - conv_res.status = ConversionStatus.SUCCESS - return conv_res - # def _apply_on_elements(self, element_batch: Iterable[NodeItem]) -> Iterable[Any]: - # for model in self.model_pipe: - # element_batch = model(element_batch) - # - # yield from element_batch - - def _assemble_document( + def _determine_status( self, in_doc: InputDocument, conv_res: ConversionResult - ) -> ConversionResult: - return conv_res + ) -> ConversionStatus: + # This is called only if the previous steps didn't raise. + # Since we don't have anything else to evaluate, we can + # safely return SUCCESS. + return ConversionStatus.SUCCESS @classmethod def get_default_options(cls) -> PipelineOptions: