fix: use new resolve_source_to_x functions to avoid tempfile leftovers (#490)

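Replace resolve_file_source with the new resolve_source_to_path (CLI) and resolve_source_to_stream (document conversion input) functions. The CLI now resolves its input sources into a tempfile.TemporaryDirectory work directory, which is removed when the context manager exits.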

Signed-off-by: Michele Dolfi <dol@zurich.ibm.com>
This commit is contained in:
Michele Dolfi 2024-12-03 09:28:52 +01:00 committed by Michele Dolfi
parent 72e76512bd
commit a762c8394e
2 changed files with 88 additions and 84 deletions

View File

@ -2,6 +2,7 @@ import importlib
import json import json
import logging import logging
import re import re
import tempfile
import time import time
import warnings import warnings
from enum import Enum from enum import Enum
@ -9,7 +10,7 @@ from pathlib import Path
from typing import Annotated, Dict, Iterable, List, Optional, Type from typing import Annotated, Dict, Iterable, List, Optional, Type
import typer import typer
from docling_core.utils.file import resolve_file_source from docling_core.utils.file import resolve_source_to_path
from docling.backend.docling_parse_backend import DoclingParseDocumentBackend from docling.backend.docling_parse_backend import DoclingParseDocumentBackend
from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend from docling.backend.docling_parse_v2_backend import DoclingParseV2DocumentBackend
@ -256,95 +257,98 @@ def convert(
if from_formats is None: if from_formats is None:
from_formats = [e for e in InputFormat] from_formats = [e for e in InputFormat]
input_doc_paths: List[Path] = [] with tempfile.TemporaryDirectory() as tempdir:
for src in input_sources: input_doc_paths: List[Path] = []
source = resolve_file_source(source=src) for src in input_sources:
if not source.exists(): source = resolve_source_to_path(source=src, workdir=Path(tempdir))
err_console.print( if not source.exists():
f"[red]Error: The input file {source} does not exist.[/red]" err_console.print(
) f"[red]Error: The input file {source} does not exist.[/red]"
raise typer.Abort() )
elif source.is_dir(): raise typer.Abort()
for fmt in from_formats: elif source.is_dir():
for ext in FormatToExtensions[fmt]: for fmt in from_formats:
input_doc_paths.extend(list(source.glob(f"**/*.{ext}"))) for ext in FormatToExtensions[fmt]:
input_doc_paths.extend(list(source.glob(f"**/*.{ext.upper()}"))) input_doc_paths.extend(list(source.glob(f"**/*.{ext}")))
input_doc_paths.extend(list(source.glob(f"**/*.{ext.upper()}")))
else:
input_doc_paths.append(source)
if to_formats is None:
to_formats = [OutputFormat.MARKDOWN]
export_json = OutputFormat.JSON in to_formats
export_md = OutputFormat.MARKDOWN in to_formats
export_txt = OutputFormat.TEXT in to_formats
export_doctags = OutputFormat.DOCTAGS in to_formats
if ocr_engine == OcrEngine.EASYOCR:
ocr_options: OcrOptions = EasyOcrOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.TESSERACT_CLI:
ocr_options = TesseractCliOcrOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.TESSERACT:
ocr_options = TesseractOcrOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.OCRMAC:
ocr_options = OcrMacOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.RAPIDOCR:
ocr_options = RapidOcrOptions(force_full_page_ocr=force_ocr)
else: else:
input_doc_paths.append(source) raise RuntimeError(f"Unexpected OCR engine type {ocr_engine}")
if to_formats is None: ocr_lang_list = _split_list(ocr_lang)
to_formats = [OutputFormat.MARKDOWN] if ocr_lang_list is not None:
ocr_options.lang = ocr_lang_list
export_json = OutputFormat.JSON in to_formats pipeline_options = PdfPipelineOptions(
export_md = OutputFormat.MARKDOWN in to_formats do_ocr=ocr,
export_txt = OutputFormat.TEXT in to_formats ocr_options=ocr_options,
export_doctags = OutputFormat.DOCTAGS in to_formats do_table_structure=True,
if ocr_engine == OcrEngine.EASYOCR:
ocr_options: OcrOptions = EasyOcrOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.TESSERACT_CLI:
ocr_options = TesseractCliOcrOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.TESSERACT:
ocr_options = TesseractOcrOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.OCRMAC:
ocr_options = OcrMacOptions(force_full_page_ocr=force_ocr)
elif ocr_engine == OcrEngine.RAPIDOCR:
ocr_options = RapidOcrOptions(force_full_page_ocr=force_ocr)
else:
raise RuntimeError(f"Unexpected OCR engine type {ocr_engine}")
ocr_lang_list = _split_list(ocr_lang)
if ocr_lang_list is not None:
ocr_options.lang = ocr_lang_list
pipeline_options = PdfPipelineOptions(
do_ocr=ocr,
ocr_options=ocr_options,
do_table_structure=True,
)
pipeline_options.table_structure_options.do_cell_matching = True # do_cell_matching
pipeline_options.table_structure_options.mode = table_mode
if artifacts_path is not None:
pipeline_options.artifacts_path = artifacts_path
if pdf_backend == PdfBackend.DLPARSE_V1:
backend: Type[PdfDocumentBackend] = DoclingParseDocumentBackend
elif pdf_backend == PdfBackend.DLPARSE_V2:
backend = DoclingParseV2DocumentBackend
elif pdf_backend == PdfBackend.PYPDFIUM2:
backend = PyPdfiumDocumentBackend
else:
raise RuntimeError(f"Unexpected PDF backend type {pdf_backend}")
format_options: Dict[InputFormat, FormatOption] = {
InputFormat.PDF: PdfFormatOption(
pipeline_options=pipeline_options,
backend=backend, # pdf_backend
) )
} pipeline_options.table_structure_options.do_cell_matching = (
doc_converter = DocumentConverter( True # do_cell_matching
allowed_formats=from_formats, )
format_options=format_options, pipeline_options.table_structure_options.mode = table_mode
)
start_time = time.time() if artifacts_path is not None:
pipeline_options.artifacts_path = artifacts_path
conv_results = doc_converter.convert_all( if pdf_backend == PdfBackend.DLPARSE_V1:
input_doc_paths, raises_on_error=abort_on_error backend: Type[PdfDocumentBackend] = DoclingParseDocumentBackend
) elif pdf_backend == PdfBackend.DLPARSE_V2:
backend = DoclingParseV2DocumentBackend
elif pdf_backend == PdfBackend.PYPDFIUM2:
backend = PyPdfiumDocumentBackend
else:
raise RuntimeError(f"Unexpected PDF backend type {pdf_backend}")
output.mkdir(parents=True, exist_ok=True) format_options: Dict[InputFormat, FormatOption] = {
export_documents( InputFormat.PDF: PdfFormatOption(
conv_results, pipeline_options=pipeline_options,
output_dir=output, backend=backend, # pdf_backend
export_json=export_json, )
export_md=export_md, }
export_txt=export_txt, doc_converter = DocumentConverter(
export_doctags=export_doctags, allowed_formats=from_formats,
) format_options=format_options,
)
end_time = time.time() - start_time start_time = time.time()
conv_results = doc_converter.convert_all(
input_doc_paths, raises_on_error=abort_on_error
)
output.mkdir(parents=True, exist_ok=True)
export_documents(
conv_results,
output_dir=output,
export_json=export_json,
export_md=export_md,
export_txt=export_txt,
export_doctags=export_doctags,
)
end_time = time.time() - start_time
_log.info(f"All documents were converted in {end_time:.2f} seconds.") _log.info(f"All documents were converted in {end_time:.2f} seconds.")

View File

@ -32,7 +32,7 @@ from docling_core.types.legacy_doc.document import (
) )
from docling_core.types.legacy_doc.document import CCSFileInfoObject as DsFileInfoObject from docling_core.types.legacy_doc.document import CCSFileInfoObject as DsFileInfoObject
from docling_core.types.legacy_doc.document import ExportedCCSDocument as DsDocument from docling_core.types.legacy_doc.document import ExportedCCSDocument as DsDocument
from docling_core.utils.file import resolve_file_source from docling_core.utils.file import resolve_source_to_stream
from pydantic import BaseModel from pydantic import BaseModel
from typing_extensions import deprecated from typing_extensions import deprecated
@ -459,7 +459,7 @@ class _DocumentConversionInput(BaseModel):
self, format_options: Dict[InputFormat, "FormatOption"] self, format_options: Dict[InputFormat, "FormatOption"]
) -> Iterable[InputDocument]: ) -> Iterable[InputDocument]:
for item in self.path_or_stream_iterator: for item in self.path_or_stream_iterator:
obj = resolve_file_source(item) if isinstance(item, str) else item obj = resolve_source_to_stream(item) if isinstance(item, str) else item
format = self._guess_format(obj) format = self._guess_format(obj)
if format not in format_options.keys(): if format not in format_options.keys():
_log.info( _log.info(