propagate raises_on_error, add enrichment model, rename Abstract* base classes to Base*

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
Michele Dolfi 2024-10-13 16:03:19 +02:00
parent 941b51aa3e
commit c1ed447c21
12 changed files with 118 additions and 76 deletions

docling/datamodel/settings.py

@@ -14,6 +14,7 @@ class BatchConcurrencySettings(BaseModel):
     doc_batch_concurrency: int = 2
     page_batch_size: int = 4
     page_batch_concurrency: int = 2
+    elements_batch_size: int = 16

     # doc_batch_size: int = 1
     # doc_batch_concurrency: int = 1
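The new `elements_batch_size` setting controls how many document elements are handed to an enrichment model per call (see `_enrich_document` in base_pipeline.py below). A minimal sketch of tuning it at runtime, assuming the module-level `settings` object can be mutated as usual for these pydantic settings:

from docling.datamodel.settings import settings

# Enrichment models receive elements in chunks of this size (default 16
# per this commit); larger batches may suit GPU-backed models better.
settings.perf.elements_batch_size = 32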

docling/document_converter.py

@@ -1,6 +1,7 @@
 import logging
 import sys
 import time
+from functools import partial
 from pathlib import Path
 from typing import Dict, Iterable, List, Optional, Type
@@ -19,7 +20,7 @@ from docling.datamodel.document import (
 )
 from docling.datamodel.pipeline_options import PipelineOptions
 from docling.datamodel.settings import DocumentLimits, settings
-from docling.pipeline.base_pipeline import AbstractPipeline
+from docling.pipeline.base_pipeline import BasePipeline
 from docling.pipeline.simple_pipeline import SimplePipeline
 from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
 from docling.utils.utils import chunkify
@@ -28,7 +29,7 @@ _log = logging.getLogger(__name__)

 class FormatOption(BaseModel):
-    pipeline_cls: Type[AbstractPipeline]
+    pipeline_cls: Type[BasePipeline]
     pipeline_options: Optional[PipelineOptions] = None
     backend: Type[AbstractDocumentBackend]
@@ -110,7 +111,7 @@ class DocumentConverter:
                 _log.info(f"Requested format {f} will use default options.")
                 self.format_to_options[f] = _format_to_default_options[f]

-        self.initialized_pipelines: Dict[Type[AbstractPipeline], AbstractPipeline] = {}
+        self.initialized_pipelines: Dict[Type[BasePipeline], BasePipeline] = {}

     @validate_call(config=ConfigDict(strict=True))
     def convert(
@@ -145,7 +146,7 @@
             path_or_stream_iterator=source,
             limit=limits,
         )
-        conv_res_iter = self._convert(conv_input)
+        conv_res_iter = self._convert(conv_input, raises_on_error=raises_on_error)
         for conv_res in conv_res_iter:
             if raises_on_error and conv_res.status not in {
                 ConversionStatus.SUCCESS,
@@ -158,7 +159,7 @@
             yield conv_res

     def _convert(
-        self, conv_input: _DocumentConversionInput
+        self, conv_input: _DocumentConversionInput, raises_on_error: bool
     ) -> Iterable[ConversionResult]:
         for input_batch in chunkify(
             conv_input.docs(self.format_to_options),
@@ -172,11 +173,14 @@
                 # yield from pool.map(self.process_document, input_batch)

                 # Note: PDF backends are not thread-safe, thread pool usage was disabled.
-                for item in map(self.process_document, input_batch):
+                for item in map(
+                    partial(self.process_document, raises_on_error=raises_on_error),
+                    input_batch,
+                ):
                     if item is not None:
                         yield item

-    def _get_pipeline(self, doc: InputDocument) -> Optional[AbstractPipeline]:
+    def _get_pipeline(self, doc: InputDocument) -> Optional[BasePipeline]:
         fopt = self.format_to_options.get(doc.format)

         if fopt is None:
@@ -196,20 +200,24 @@
             )

         return self.initialized_pipelines[pipeline_class]

-    def process_document(self, in_doc: InputDocument) -> ConversionResult:
+    def process_document(
+        self, in_doc: InputDocument, raises_on_error: bool
+    ) -> ConversionResult:
         if in_doc.format not in self.allowed_formats:
             return None
         else:
             start_doc_time = time.time()
-            conv_res = self._execute_pipeline(in_doc)
+            conv_res = self._execute_pipeline(in_doc, raises_on_error=raises_on_error)
             end_doc_time = time.time() - start_doc_time
             _log.info(f"Finished converting document in {end_doc_time:.2f} seconds.")

             return conv_res

-    def _execute_pipeline(self, in_doc: InputDocument) -> Optional[ConversionResult]:
+    def _execute_pipeline(
+        self, in_doc: InputDocument, raises_on_error: bool
+    ) -> Optional[ConversionResult]:
         if in_doc.valid:
             pipeline = self._get_pipeline(in_doc)
             if pipeline is None:  # Can't find a default pipeline. Should this raise?
@@ -217,7 +225,7 @@
                 conv_res.status = ConversionStatus.FAILURE
                 return conv_res

-            conv_res = pipeline.execute(in_doc)
+            conv_res = pipeline.execute(in_doc, raises_on_error=raises_on_error)

         else:
             # invalid doc or not of desired format
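With `raises_on_error` now threaded from the public entry point down through `process_document` and `pipeline.execute`, callers choose between fail-fast exceptions and status inspection. A hedged usage sketch, assuming `convert` returns a single `ConversionResult` as in the example script at the end of this commit (the file name is illustrative):

from pathlib import Path

from docling.datamodel.base_models import ConversionStatus
from docling.document_converter import DocumentConverter

converter = DocumentConverter()

# Fail fast: any exception raised inside the pipeline propagates.
result = converter.convert(Path("report.pdf"), raises_on_error=True)

# Or keep going and inspect the status afterwards:
result = converter.convert(Path("report.pdf"), raises_on_error=False)
if result.status != ConversionStatus.SUCCESS:
    print(f"conversion ended with status {result.status}")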

docling/models/abstract_model.py → docling/models/base_model.py

@@ -6,15 +6,20 @@ from docling_core.types.experimental import DoclingDocument, NodeItem
 from docling.datamodel.base_models import Page


-class AbstractPageModel(ABC):
+class BasePageModel(ABC):
     @abstractmethod
     def __call__(self, page_batch: Iterable[Page]) -> Iterable[Page]:
         pass


-class AbstractEnrichmentModel(ABC):
+class BaseEnrichmentModel(ABC):
     @abstractmethod
+    def is_processable(self, doc: DoclingDocument, element: NodeItem) -> bool:
+        pass
+
+    @abstractmethod
     def __call__(
-        self, doc: DoclingDocument, elements: Iterable[NodeItem]
+        self, doc: DoclingDocument, element_batch: Iterable[NodeItem]
     ) -> Iterable[Any]:
         pass
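`BasePageModel` implementations run page by page during the build phase, while `BaseEnrichmentModel` implementations run afterwards on elements of the assembled DoclingDocument. A minimal, hypothetical page model sketch (`NoOpPageModel` is illustrative and not part of this commit):

from typing import Iterable

from docling.datamodel.base_models import Page
from docling.models.base_model import BasePageModel


class NoOpPageModel(BasePageModel):
    """Pass-through sketch: yields every page unchanged."""

    def __call__(self, page_batch: Iterable[Page]) -> Iterable[Page]:
        for page in page_batch:
            # a real model would attach predictions to the page here
            yield page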

docling/models/dummy_picture_enrichment.py (new file)

@@ -0,0 +1,24 @@
+from typing import Any, Iterable
+
+from docling_core.types.experimental import DoclingDocument, NodeItem
+from docling_core.types.experimental.document import BasePictureData, PictureItem
+
+from docling.models.base_model import BaseEnrichmentModel
+
+
+class DummyPictureData(BasePictureData):
+    hello: str
+
+
+class DummyPictureClassifierEnrichmentModel(BaseEnrichmentModel):
+    def is_processable(self, doc: DoclingDocument, element: NodeItem) -> bool:
+        return isinstance(element, PictureItem)
+
+    def __call__(
+        self, doc: DoclingDocument, element_batch: Iterable[NodeItem]
+    ) -> Iterable[Any]:
+        for element in element_batch:
+            assert isinstance(element, PictureItem)
+            element.data = DummyPictureData(hello="world")
+            yield element
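A sketch of exercising the dummy classifier outside the pipeline, assuming `doc` is an already-converted DoclingDocument (the variable is a placeholder):

from docling_core.types.experimental import DoclingDocument

from docling.models.dummy_picture_enrichment import (
    DummyPictureClassifierEnrichmentModel,
)

doc: DoclingDocument = ...  # assumed: a converted document containing pictures

model = DummyPictureClassifierEnrichmentModel()

# Mirrors _filter_elements() in base_pipeline.py: iterate_items() yields
# (element, level) pairs; keep only elements the model declares processable.
pictures = [
    element
    for element, _level in doc.iterate_items()
    if model.is_processable(doc=doc, element=element)
]

# The model is a generator, so it must be exhausted for enrichment to apply.
for picture in model(doc=doc, element_batch=pictures):
    print(picture.data.hello)  # -> "world"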

docling/models/layout_model.py

@@ -17,13 +17,13 @@ from docling.datamodel.base_models import (
     LayoutPrediction,
     Page,
 )
-from docling.models.abstract_model import AbstractPageModel
+from docling.models.base_model import BasePageModel
 from docling.utils import layout_utils as lu

 _log = logging.getLogger(__name__)


-class LayoutModel(AbstractPageModel):
+class LayoutModel(BasePageModel):
     TEXT_ELEM_LABELS = [
         DocItemLabel.TEXT,
docling/models/page_assemble_model.py

@@ -12,7 +12,7 @@ from docling.datamodel.base_models import (
     Table,
     TextElement,
 )
-from docling.models.abstract_model import AbstractPageModel
+from docling.models.base_model import BasePageModel
 from docling.models.layout_model import LayoutModel

 _log = logging.getLogger(__name__)
@@ -22,7 +22,7 @@ class PageAssembleOptions(BaseModel):
     keep_images: bool = False


-class PageAssembleModel(AbstractPageModel):
+class PageAssembleModel(BasePageModel):
     def __init__(self, options: PageAssembleOptions):
         self.options = options

docling/models/page_preprocessing_model.py

@@ -4,14 +4,14 @@ from PIL import ImageDraw
 from pydantic import BaseModel

 from docling.datamodel.base_models import Page
-from docling.models.abstract_model import AbstractPageModel
+from docling.models.base_model import BasePageModel


 class PagePreprocessingOptions(BaseModel):
     images_scale: Optional[float]


-class PagePreprocessingModel(AbstractPageModel):
+class PagePreprocessingModel(BasePageModel):
     def __init__(self, options: PagePreprocessingOptions):
         self.options = options

docling/models/table_structure_model.py

@@ -11,10 +11,10 @@ from PIL import ImageDraw
 from docling.datamodel.base_models import Page, Table, TableStructurePrediction
 from docling.datamodel.pipeline_options import TableFormerMode, TableStructureOptions
-from docling.models.abstract_model import AbstractPageModel
+from docling.models.base_model import BasePageModel


-class TableStructureModel(AbstractPageModel):
+class TableStructureModel(BasePageModel):
     def __init__(
         self, enabled: bool, artifacts_path: Path, options: TableStructureOptions
     ):

docling/pipeline/base_pipeline.py

@@ -5,6 +5,8 @@ import traceback
 from abc import ABC, abstractmethod
 from typing import Callable, Iterable, List

+from docling_core.types.experimental import DoclingDocument, NodeItem
+
 from docling.backend.abstract_backend import AbstractDocumentBackend
 from docling.backend.pdf_backend import PdfDocumentBackend
 from docling.datamodel.base_models import (
@@ -16,18 +18,19 @@ from docling.datamodel.base_models import (
 from docling.datamodel.document import ConversionResult, InputDocument
 from docling.datamodel.pipeline_options import PipelineOptions
 from docling.datamodel.settings import settings
+from docling.models.base_model import BaseEnrichmentModel
 from docling.utils.utils import chunkify

 _log = logging.getLogger(__name__)


-class AbstractPipeline(ABC):
+class BasePipeline(ABC):
     def __init__(self, pipeline_options: PipelineOptions):
         self.pipeline_options = pipeline_options
         self.build_pipe: List[Callable] = []
-        self.enrichment_pipe: List[Callable] = []
+        self.enrichment_pipe: List[BaseEnrichmentModel] = []

-    def execute(self, in_doc: InputDocument) -> ConversionResult:
+    def execute(self, in_doc: InputDocument, raises_on_error: bool) -> ConversionResult:
         conv_res = ConversionResult(input=in_doc)
         _log.info(f"Processing document {in_doc.file.name}")
@@ -47,6 +50,8 @@ class AbstractPipeline(ABC):
             conv_res.status = self._determine_status(in_doc, conv_res)
         except Exception as e:
             conv_res.status = ConversionStatus.FAILURE
+            if raises_on_error:
+                raise e

         return conv_res
@@ -64,6 +69,26 @@ class AbstractPipeline(ABC):
     def _enrich_document(
         self, in_doc: InputDocument, conv_res: ConversionResult
     ) -> ConversionResult:
+
+        def _filter_elements(
+            doc: DoclingDocument, model: BaseEnrichmentModel
+        ) -> Iterable[NodeItem]:
+            for element, _level in doc.iterate_items():
+                if model.is_processable(doc=doc, element=element):
+                    yield element
+
+        for model in self.enrichment_pipe:
+            for element_batch in chunkify(
+                _filter_elements(conv_res.output, model),
+                settings.perf.elements_batch_size,
+            ):
+                # TODO: currently we assume the element itself is modified, because
+                # we don't have an interface to save the element back to the document
+                for element in model(
+                    doc=conv_res.output, element_batch=element_batch
+                ):  # Must exhaust!
+                    pass
+
         return conv_res

     @abstractmethod
@@ -89,7 +114,7 @@ class AbstractPipeline(ABC):
         # yield from element_batch


-class PaginatedPipeline(AbstractPipeline):  # TODO this is a bad name.
+class PaginatedPipeline(BasePipeline):  # TODO this is a bad name.

     def _apply_on_pages(self, page_batch: Iterable[Page]) -> Iterable[Page]:
         for model in self.build_pipe:
@@ -139,7 +164,8 @@ class PaginatedPipeline(AbstractPipeline):  # TODO this is a bad name.
                     f"Encountered an error during conversion of document {in_doc.document_hash}:\n"
                     f"{trace}"
                 )
-                # raise e  # TODO Debug, should not be here.
+                raise e

         finally:
             # Always unload the PDF backend, even in case of failure
             if in_doc._backend:
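The enrichment loop above relies on `chunkify` from docling.utils.utils to group filtered elements into batches of `settings.perf.elements_batch_size`. Its implementation is not part of this diff; a behaviourally equivalent sketch of the batching assumed here:

from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunkify_sketch(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items; the last may be shorter."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk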

docling/pipeline/simple_pipeline.py

@@ -7,12 +7,12 @@ from docling.backend.abstract_backend import (
 from docling.datamodel.base_models import ConversionStatus
 from docling.datamodel.document import ConversionResult, InputDocument
 from docling.datamodel.pipeline_options import PipelineOptions
-from docling.pipeline.base_pipeline import AbstractPipeline
+from docling.pipeline.base_pipeline import BasePipeline

 _log = logging.getLogger(__name__)


-class SimplePipeline(AbstractPipeline):
+class SimplePipeline(BasePipeline):
     """SimpleModelPipeline.

     This class is used at the moment for formats / backends

docling/pipeline/standard_pdf_pipeline.py

@@ -14,6 +14,9 @@ from docling.datamodel.pipeline_options import (
 )
 from docling.models.base_ocr_model import BaseOcrModel
 from docling.models.ds_glm_model import GlmModel, GlmOptions
+from docling.models.dummy_picture_enrichment import (
+    DummyPictureClassifierEnrichmentModel,
+)
 from docling.models.easyocr_model import EasyOcrModel
 from docling.models.layout_model import LayoutModel
 from docling.models.page_assemble_model import PageAssembleModel, PageAssembleOptions
@@ -81,6 +84,7 @@ class StandardPdfPipeline(PaginatedPipeline):
         self.enrichment_pipe = [
             # Other models working on `NodeItem` elements in the DoclingDocument
+            # DummyPictureClassifierEnrichmentModel()
         ]

     @staticmethod
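The dummy classifier stays commented out by default; enabling it is a one-line change inside StandardPdfPipeline.__init__ (a sketch, not part of this commit):

# inside StandardPdfPipeline.__init__:
self.enrichment_pipe = [
    # Other models working on `NodeItem` elements in the DoclingDocument
    DummyPictureClassifierEnrichmentModel(),
]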

examples/custom_convert.py

@@ -17,51 +17,6 @@ from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
 _log = logging.getLogger(__name__)


-def export_documents(
-    conv_results: Iterable[ConversionResult],
-    output_dir: Path,
-):
-    output_dir.mkdir(parents=True, exist_ok=True)
-
-    success_count = 0
-    failure_count = 0
-
-    for conv_res in conv_results:
-        if conv_res.status == ConversionStatus.SUCCESS:
-            success_count += 1
-            doc_filename = conv_res.input.file.stem
-
-            # Export Deep Search document JSON format:
-            with (output_dir / f"{doc_filename}.json").open(
-                "w", encoding="utf-8"
-            ) as fp:
-                fp.write(json.dumps(conv_res.render_as_dict()))
-
-            # Export Text format:
-            with (output_dir / f"{doc_filename}.txt").open("w", encoding="utf-8") as fp:
-                fp.write(conv_res.render_as_text())
-
-            # Export Markdown format:
-            with (output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
-                fp.write(conv_res.render_as_markdown())
-
-            # Export Document Tags format:
-            with (output_dir / f"{doc_filename}.doctags").open(
-                "w", encoding="utf-8"
-            ) as fp:
-                fp.write(conv_res.render_as_doctags())
-
-        else:
-            _log.info(f"Document {conv_res.input.file} failed to convert.")
-            failure_count += 1
-
-    _log.info(
-        f"Processed {success_count + failure_count} docs, of which {failure_count} failed"
-    )
-
-    return success_count, failure_count


 def main():
     logging.basicConfig(level=logging.INFO)
@@ -151,13 +106,32 @@ def main():
     ###########################################################################

     start_time = time.time()
     conv_result = doc_converter.convert(input_doc_path)
     end_time = time.time() - start_time

     _log.info(f"Document converted in {end_time:.2f} seconds.")

+    ## Export results
+    output_dir = Path("./scratch")
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    doc_filename = conv_result.input.file.stem
+
+    # Export Deep Search document JSON format:
+    with (output_dir / f"{doc_filename}.json").open("w", encoding="utf-8") as fp:
+        fp.write(json.dumps(conv_result.output.export_to_dict()))
+
+    # Export Text format:
+    with (output_dir / f"{doc_filename}.txt").open("w", encoding="utf-8") as fp:
+        fp.write(conv_result.output.export_to_text())
+
+    # Export Markdown format:
+    with (output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
+        fp.write(conv_result.output.export_to_markdown())
+
+    # Export Document Tags format:
+    with (output_dir / f"{doc_filename}.doctags").open("w", encoding="utf-8") as fp:
+        fp.write(conv_result.output.export_to_document_tokens())


 if __name__ == "__main__":
     main()
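Note on the export changes above: the removed `export_documents` helper called the older `ConversionResult.render_as_*` methods, while the inline replacement targets the converted document itself. `render_as_dict` becomes `output.export_to_dict`, `render_as_text` becomes `output.export_to_text`, `render_as_markdown` becomes `output.export_to_markdown`, and `render_as_doctags` becomes `output.export_to_document_tokens`.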