Lots of improvements

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
This commit is contained in:
Christoph Auer
2024-10-08 16:38:42 +02:00
parent c0447206af
commit 203cf19b1b
10 changed files with 169 additions and 77 deletions

View File

@@ -98,21 +98,18 @@ class InputDocument(BaseModel):
def __init__(
self,
path_or_stream: Union[BytesIO, Path],
format: InputFormat,
backend: AbstractDocumentBackend,
filename: Optional[str] = None,
limits: Optional[DocumentLimits] = None,
backend: Optional[Type[AbstractDocumentBackend]] = None,
format: Optional[InputFormat] = None,
):
super().__init__()
self.limits = limits or DocumentLimits()
self.format = format
try:
if isinstance(path_or_stream, Path):
mime = filetype.guess_mime(str(path_or_stream))
if mime is None:
if path_or_stream.suffix == ".html":
mime = "text/html"
self.file = path_or_stream
self.filesize = path_or_stream.stat().st_size
@@ -121,11 +118,9 @@ class InputDocument(BaseModel):
else:
self.document_hash = create_file_hash(path_or_stream)
self._init_doc(backend, mime, path_or_stream)
self._init_doc(backend, path_or_stream)
elif isinstance(path_or_stream, BytesIO):
mime = filetype.guess_mime(path_or_stream.read(8192))
self.file = PurePath(filename)
self.filesize = path_or_stream.getbuffer().nbytes
@@ -134,7 +129,7 @@ class InputDocument(BaseModel):
else:
self.document_hash = create_file_hash(path_or_stream)
self._init_doc(backend, mime, path_or_stream)
self._init_doc(backend, path_or_stream)
# For paginated backends, check if the maximum page count is exceeded.
if self.valid and self._backend.is_valid():
@@ -158,23 +153,19 @@ class InputDocument(BaseModel):
def _init_doc(
self,
backend: AbstractDocumentBackend,
mime: str,
path_or_stream: Union[BytesIO, Path],
) -> None:
self.format = MimeTypeToFormat.get(mime)
if self.format is not None:
backend = backend or _input_format_default_backends.get(self.format)
if backend is None:
backend = _input_format_default_backends.get(self.format)
if backend is None:
self.valid = False
raise RuntimeError(
f"Could not find suitable default backend for format: {self.format}"
f"Could not find suitable backend for file: {self.file}"
)
if self.format is None or self.format not in backend.supported_formats():
# TODO decide if to raise exception here too.
self.valid = False
else:
self._backend = backend(
path_or_stream=path_or_stream, document_hash=self.document_hash
)
self._backend = backend(
path_or_stream=path_or_stream, document_hash=self.document_hash
)
@deprecated("Use `ConversionResult` instead.")
@@ -478,17 +469,46 @@ class DocumentConversionInput(BaseModel):
limits: Optional[DocumentLimits] = DocumentLimits()
def docs(
self, backend: Optional[Type[AbstractDocumentBackend]] = None
self, format_options: Dict[InputFormat, "FormatOption"]
) -> Iterable[InputDocument]:
for obj in self._path_or_stream_iterator:
if isinstance(obj, Path):
mime = filetype.guess_mime(str(obj))
if mime is None:
if obj.suffix == ".html":
mime = "text/html"
format = MimeTypeToFormat.get(mime)
if format not in format_options.keys():
continue
else:
backend = format_options.get(format).backend
yield InputDocument(
path_or_stream=obj, limits=self.limits, backend=backend
path_or_stream=obj,
format=format,
limits=self.limits,
backend=backend,
)
elif isinstance(obj, DocumentStream):
mime = filetype.guess_mime(obj.stream.read(8192))
obj.stream.seek(0)
if mime is None:
if obj.suffix == ".html":
mime = "text/html"
format = MimeTypeToFormat.get(mime)
if format not in format_options.keys():
continue
else:
backend = format_options.get(format).backend
yield InputDocument(
path_or_stream=obj.stream,
format=format,
filename=obj.filename,
limits=self.limits,
backend=backend,

View File

@@ -8,6 +8,7 @@ import requests
from pydantic import AnyHttpUrl, BaseModel, ConfigDict, TypeAdapter, ValidationError
from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.docling_parse_backend import DoclingParseDocumentBackend
from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.document import (
ConversionResult,
@@ -27,7 +28,7 @@ _log = logging.getLogger(__name__)
class FormatOption(BaseModel):
pipeline_cls: Type[BaseModelPipeline]
pipeline_options: Optional[PipelineOptions] = None
backend: Optional[Type[AbstractDocumentBackend]]
backend: Optional[Type[AbstractDocumentBackend]] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -47,11 +48,29 @@ class FormatOption(BaseModel):
)
class PdfFormatOption(FormatOption):
def __init__(
self,
pipeline_cls: Optional[Type[BaseModelPipeline]] = None,
pipeline_options: Optional[PipelineOptions] = None,
backend: Optional[Type[AbstractDocumentBackend]] = None,
):
if pipeline_cls is None:
pipeline_cls = StandardPdfModelPipeline
if backend is None:
backend = DoclingParseDocumentBackend
super().__init__(
pipeline_cls=pipeline_cls,
pipeline_options=pipeline_options,
backend=backend,
)
_format_to_default_options = {
InputFormat.DOCX: FormatOption(pipeline_cls=SimpleModelPipeline),
InputFormat.PPTX: FormatOption(pipeline_cls=SimpleModelPipeline),
InputFormat.HTML: FormatOption(pipeline_cls=SimpleModelPipeline),
InputFormat.IMAGE: None,
InputFormat.IMAGE: FormatOption(pipeline_cls=StandardPdfModelPipeline),
InputFormat.PDF: FormatOption(pipeline_cls=StandardPdfModelPipeline),
}
@@ -61,11 +80,26 @@ class DocumentConverter:
def __init__(
self,
formats: List[InputFormat] = [e for e in InputFormat],
format_options: Dict[InputFormat, FormatOption] = _format_to_default_options,
formats: Optional[List[InputFormat]] = None,
format_options: Optional[Dict[InputFormat, FormatOption]] = None,
):
self.formats = formats
self.format_to_options = format_options
if self.formats is None:
if self.format_to_options is not None:
self.formats = self.format_to_options.keys()
else:
self.formats = [e for e in InputFormat] # all formats
if self.format_to_options is None:
self.format_to_options = _format_to_default_options
for f in self.formats:
if f not in self.format_to_options.keys():
_log.info(f"Requested format {f} will use default options.")
self.format_to_options[f] = _format_to_default_options[f]
self.initialized_pipelines: Dict[Type[BaseModelPipeline], BaseModelPipeline] = (
{}
)
@@ -73,7 +107,8 @@ class DocumentConverter:
def convert(self, input: DocumentConversionInput) -> Iterable[ConversionResult]:
for input_batch in chunkify(
input.docs(), settings.perf.doc_batch_size # pass format_options
input.docs(self.format_to_options),
settings.perf.doc_batch_size, # pass format_options
):
_log.info(f"Going to convert document batch...")
# parallel processing only within input_batch
@@ -83,7 +118,9 @@ class DocumentConverter:
# yield from pool.map(self.process_document, input_batch)
# Note: PDF backends are not thread-safe, thread pool usage was disabled.
yield from map(self.process_document, input_batch)
for item in map(self.process_document, input_batch):
if item is not None:
yield item
def convert_single(self, source: Path | AnyHttpUrl | str) -> ConversionResult:
"""Convert a single document.
@@ -136,31 +173,40 @@ class DocumentConverter:
return conv_res
def _get_pipeline(self, doc: InputDocument) -> Optional[BaseModelPipeline]:
pipeline_class = None
fopt = self.format_to_options.get(doc.format)
if fopt is None:
return None
raise RuntimeError(f"Could not get pipeline for document {doc.file}")
else:
pipeline_class = fopt.pipeline_cls
pipeline_options = fopt.pipeline_options
if pipeline_class not in self.initialized_pipelines:
# TODO this will ignore if different options have been defined for the same pipeline class.
if (
pipeline_class not in self.initialized_pipelines
or self.initialized_pipelines[pipeline_class].pipeline_options
!= pipeline_options
):
self.initialized_pipelines[pipeline_class] = pipeline_class(
pipeline_options=pipeline_class.get_default_options()
pipeline_options=pipeline_options
)
return self.initialized_pipelines[pipeline_class]
def process_document(self, in_doc: InputDocument) -> ConversionResult:
start_doc_time = time.time()
if in_doc.format not in self.formats:
return None
else:
start_doc_time = time.time()
conv_res = self._execute_pipeline(in_doc)
conv_res = self._execute_pipeline(in_doc)
end_doc_time = time.time() - start_doc_time
_log.info(f"Finished converting document in {end_doc_time:.2f} seconds.")
end_doc_time = time.time() - start_doc_time
_log.info(f"Finished converting document in {end_doc_time:.2f} seconds.")
return conv_res
return conv_res
def _execute_pipeline(self, in_doc: InputDocument) -> ConversionResult:
if in_doc.valid and in_doc.format in self.formats:
def _execute_pipeline(self, in_doc: InputDocument) -> Optional[ConversionResult]:
if in_doc.valid:
pipeline = self._get_pipeline(in_doc)
if pipeline is None: # Can't find a default pipeline. Should this raise?
conv_res = ConversionResult(input=in_doc)
@@ -168,7 +214,9 @@ class DocumentConverter:
return conv_res
conv_res = pipeline.execute(in_doc)
else: # invalid doc or not of desired format
else:
# invalid doc or not of desired format
conv_res = ConversionResult(input=in_doc)
conv_res.status = ConversionStatus.FAILURE
# TODO add error log why it failed.

View File

@@ -109,7 +109,7 @@ class PaginatedModelPipeline(BaseModelPipeline): # TODO this is a bad name.
f"Encountered an error during conversion of document {in_doc.document_hash}:\n"
f"{trace}"
)
raise e
raise e # TODO Debug, should not be here.
finally:
# Always unload the PDF backend, even in case of failure
if in_doc._backend: