initial refactor iteration

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Michele Dolfi 2024-10-11 16:31:13 +02:00
parent 136f16e85a
commit ca2a96d982
2 changed files with 72 additions and 47 deletions

View File

@ -25,15 +25,48 @@ class AbstractModelPipeline(ABC):
def __init__(self, pipeline_options: PipelineOptions): def __init__(self, pipeline_options: PipelineOptions):
self.pipeline_options = pipeline_options self.pipeline_options = pipeline_options
self.model_pipe: List[Callable] = [] self.model_pipe: List[Callable] = []
self.enrichment_pipe: List[Callable] = []
def execute(self, in_doc: InputDocument) -> ConversionResult:
    """Run the pipeline stages on ``in_doc`` and return the conversion result.

    The stages run in this order: ``_build_document``,
    ``_assemble_document``, ``_enrich_document``. Afterwards
    ``_determine_status`` computes the final status.

    Args:
        in_doc: The input document to convert.

    Returns:
        The conversion result. Its status is ``ConversionStatus.FAILURE``
        when ``in_doc`` is not valid or when any stage raises; otherwise it
        is whatever ``_determine_status`` returns.
    """
    conv_res = ConversionResult(input=in_doc)

    _log.info(f"Processing document {in_doc.file.name}")

    # Nothing to convert: report the failure without running any stage.
    if not in_doc.valid:
        conv_res.status = ConversionStatus.FAILURE
        return conv_res

    # TODO: propagate option for raises_on_error?
    try:
        for stage in (
            self._build_document,
            self._assemble_document,
            self._enrich_document,
        ):
            staged = stage(in_doc, conv_res)
            # A stage that returns None (e.g. a hook whose body is just
            # `pass`) is treated as a no-op / in-place update. Rebinding
            # conv_res to None would make the status assignments below
            # raise AttributeError, including inside the except handler.
            if staged is not None:
                conv_res = staged
        conv_res.status = self._determine_status(in_doc, conv_res)
    except Exception:
        # Record the traceback instead of discarding the error silently;
        # the caller only sees the FAILURE status.
        _log.exception(
            f"Encountered an error during conversion of document {in_doc.file.name}"
        )
        conv_res.status = ConversionStatus.FAILURE

    return conv_res
@abstractmethod @abstractmethod
def execute(self, in_doc: InputDocument) -> ConversionResult: def _build_document(
self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionResult:
pass pass
@abstractmethod
def _assemble_document( def _assemble_document(
self, in_doc: InputDocument, conv_res: ConversionResult self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionResult: ) -> ConversionResult:
return conv_res
def _enrich_document(
self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionResult:
pass
@abstractmethod
def _determine_status(
self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionStatus:
pass pass
@classmethod @classmethod
@ -46,6 +79,12 @@ class AbstractModelPipeline(ABC):
def is_backend_supported(cls, backend: AbstractDocumentBackend): def is_backend_supported(cls, backend: AbstractDocumentBackend):
pass pass
# def _apply_on_elements(self, element_batch: Iterable[NodeItem]) -> Iterable[Any]:
# for model in self.model_pipe:
# element_batch = model(element_batch)
#
# yield from element_batch
class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name. class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name.
@ -55,10 +94,9 @@ class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name.
yield from page_batch yield from page_batch
def execute(self, in_doc: InputDocument) -> ConversionResult: def _build_document(
conv_res = ConversionResult(input=in_doc) self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionResult:
_log.info(f"Processing document {in_doc.file.name}")
if not isinstance(in_doc._backend, PdfDocumentBackend): if not isinstance(in_doc._backend, PdfDocumentBackend):
raise RuntimeError( raise RuntimeError(
@ -91,8 +129,24 @@ class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name.
end_pb_time = time.time() - start_pb_time end_pb_time = time.time() - start_pb_time
_log.info(f"Finished converting page batch time={end_pb_time:.3f}") _log.info(f"Finished converting page batch time={end_pb_time:.3f}")
conv_res = self._assemble_document(in_doc, conv_res) except Exception as e:
conv_res.status = ConversionStatus.FAILURE
trace = "\n".join(traceback.format_exception(e))
_log.warning(
f"Encountered an error during conversion of document {in_doc.document_hash}:\n"
f"{trace}"
)
# raise e # TODO Debug, should not be here.
finally:
# Always unload the PDF backend, even in case of failure
if in_doc._backend:
in_doc._backend.unload()
return conv_res
def _determine_status(
self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionStatus:
status = ConversionStatus.SUCCESS status = ConversionStatus.SUCCESS
for page in conv_res.pages: for page in conv_res.pages:
if not page._backend.is_valid(): if not page._backend.is_valid():
@ -105,22 +159,7 @@ class PaginatedModelPipeline(AbstractModelPipeline): # TODO this is a bad name.
) )
status = ConversionStatus.PARTIAL_SUCCESS status = ConversionStatus.PARTIAL_SUCCESS
conv_res.status = status return status
except Exception as e:
conv_res.status = ConversionStatus.FAILURE
trace = "\n".join(traceback.format_exception(e))
_log.info(
f"Encountered an error during conversion of document {in_doc.document_hash}:\n"
f"{trace}"
)
raise e # TODO Debug, should not be here.
finally:
# Always unload the PDF backend, even in case of failure
if in_doc._backend:
in_doc._backend.unload()
return conv_res
# Initialise and load resources for a page # Initialise and load resources for a page
@abstractmethod @abstractmethod

View File

@ -25,14 +25,9 @@ class SimpleModelPipeline(AbstractModelPipeline):
def __init__(self, pipeline_options: PipelineOptions): def __init__(self, pipeline_options: PipelineOptions):
super().__init__(pipeline_options) super().__init__(pipeline_options)
def execute(self, in_doc: InputDocument) -> ConversionResult: def _build_document(
conv_res = ConversionResult(input=in_doc) self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionResult:
_log.info(f"Processing document {in_doc.file.name}")
if not in_doc.valid:
conv_res.status = ConversionStatus.FAILURE
return conv_res
if not isinstance(in_doc._backend, DeclarativeDocumentBackend): if not isinstance(in_doc._backend, DeclarativeDocumentBackend):
raise RuntimeError( raise RuntimeError(
@ -48,24 +43,15 @@ class SimpleModelPipeline(AbstractModelPipeline):
# a DoclingDocument straight. # a DoclingDocument straight.
conv_res.output = in_doc._backend.convert() conv_res.output = in_doc._backend.convert()
# Do other stuff with conv_res.experimental
conv_res = self._assemble_document(in_doc, conv_res)
conv_res.status = ConversionStatus.SUCCESS
return conv_res return conv_res
# def _apply_on_elements(self, element_batch: Iterable[NodeItem]) -> Iterable[Any]: def _determine_status(
# for model in self.model_pipe:
# element_batch = model(element_batch)
#
# yield from element_batch
def _assemble_document(
self, in_doc: InputDocument, conv_res: ConversionResult self, in_doc: InputDocument, conv_res: ConversionResult
) -> ConversionResult: ) -> ConversionStatus:
return conv_res # This is called only if the previous steps didn't raise.
# Since we don't have anything else to evaluate, we can
# safely return SUCCESS.
return ConversionStatus.SUCCESS
@classmethod @classmethod
def get_default_options(cls) -> PipelineOptions: def get_default_options(cls) -> PipelineOptions: