diff --git a/docling/datamodel/pipeline_options.py b/docling/datamodel/pipeline_options.py index 06169fb8..36f26fef 100644 --- a/docling/datamodel/pipeline_options.py +++ b/docling/datamodel/pipeline_options.py @@ -332,3 +332,18 @@ class ProcessingPipeline(str, Enum): STANDARD = "standard" VLM = "vlm" ASR = "asr" + + +class ThreadedPdfPipelineOptions(PdfPipelineOptions): + """Pipeline options for the threaded PDF pipeline with batching and backpressure control""" + + # Batch sizes for different stages + ocr_batch_size: int = 4 + layout_batch_size: int = 4 + table_batch_size: int = 4 + + # Timing control + batch_timeout_seconds: float = 2.0 + + # Backpressure and queue control + queue_max_size: int = 100 diff --git a/docling/datamodel/settings.py b/docling/datamodel/settings.py index 6cfc953b..d4a9bdce 100644 --- a/docling/datamodel/settings.py +++ b/docling/datamodel/settings.py @@ -26,18 +26,13 @@ class DocumentLimits(BaseModel): class BatchConcurrencySettings(BaseModel): - doc_batch_size: int = 2 - doc_batch_concurrency: int = 2 - page_batch_size: int = 4 - page_batch_concurrency: int = 2 - elements_batch_size: int = 16 - - # doc_batch_size: int = 1 - # doc_batch_concurrency: int = 1 - # page_batch_size: int = 1 - # page_batch_concurrency: int = 1 - - # model_concurrency: int = 2 + doc_batch_size: int = 1 # Number of documents processed in one batch. Should be >= doc_batch_concurrency + doc_batch_concurrency: int = 1 # Number of parallel threads processing documents. Warning: Experimental! No benefit expected without free-threaded python. + page_batch_size: int = 4 # Number of pages processed in one batch. + page_batch_concurrency: int = 1 # Currently unused. + elements_batch_size: int = ( + 16 # Number of elements processed in one batch, in enrichment models. + ) # To force models into single core: export OMP_NUM_THREADS=1 diff --git a/docling/document_converter.py b/docling/document_converter.py index f3bcb89e..5ad19c6d 100644 --- a/docling/document_converter.py +++ b/docling/document_converter.py @@ -4,6 +4,7 @@ import sys import threading import time from collections.abc import Iterable, Iterator +from concurrent.futures import ThreadPoolExecutor from functools import partial from pathlib import Path from typing import Dict, List, Optional, Tuple, Type, Union @@ -284,24 +285,33 @@ class DocumentConverter: settings.perf.doc_batch_size, # pass format_options ): _log.info("Going to convert document batch...") + process_func = partial( + self._process_document, raises_on_error=raises_on_error + ) - # parallel processing only within input_batch - # with ThreadPoolExecutor( - # max_workers=settings.perf.doc_batch_concurrency - # ) as pool: - # yield from pool.map(self.process_document, input_batch) - # Note: PDF backends are not thread-safe, thread pool usage was disabled. - - for item in map( - partial(self._process_document, raises_on_error=raises_on_error), - input_batch, + if ( + settings.perf.doc_batch_concurrency > 1 + and settings.perf.doc_batch_size > 1 ): - elapsed = time.monotonic() - start_time - start_time = time.monotonic() - _log.info( - f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec." 
- ) - yield item + with ThreadPoolExecutor( + max_workers=settings.perf.doc_batch_concurrency + ) as pool: + for item in pool.map( + process_func, + input_batch, + ): + yield item + else: + for item in map( + process_func, + input_batch, + ): + elapsed = time.monotonic() - start_time + start_time = time.monotonic() + _log.info( + f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec." + ) + yield item def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]: """Retrieve or initialize a pipeline, reusing instances based on class and options.""" @@ -330,7 +340,7 @@ class DocumentConverter: f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}" ) - return self.initialized_pipelines[cache_key] + return self.initialized_pipelines[cache_key] def _process_document( self, in_doc: InputDocument, raises_on_error: bool diff --git a/docling/models/layout_model.py b/docling/models/layout_model.py index 05a86f31..2ed30c5d 100644 --- a/docling/models/layout_model.py +++ b/docling/models/layout_model.py @@ -3,7 +3,7 @@ import logging import warnings from collections.abc import Iterable from pathlib import Path -from typing import Optional +from typing import List, Optional, Union import numpy as np from docling_core.types.doc import DocItemLabel @@ -148,72 +148,90 @@ class LayoutModel(BasePageModel): def __call__( self, conv_res: ConversionResult, page_batch: Iterable[Page] ) -> Iterable[Page]: - for page in page_batch: + # Convert to list to allow multiple iterations + pages = list(page_batch) + + # Separate valid and invalid pages + valid_pages = [] + valid_page_images: List[Union[Image.Image, np.ndarray]] = [] + + for page in pages: + assert page._backend is not None + if not page._backend.is_valid(): + continue + + assert page.size is not None + page_image = page.get_image(scale=1.0) + assert page_image is not None + + valid_pages.append(page) + valid_page_images.append(page_image) + + # Process all valid pages with batch prediction + batch_predictions = [] + if valid_page_images: + with TimeRecorder(conv_res, "layout"): + batch_predictions = self.layout_predictor.predict_batch( # type: ignore[attr-defined] + valid_page_images + ) + + # Process each page with its predictions + valid_page_idx = 0 + for page in pages: assert page._backend is not None if not page._backend.is_valid(): yield page - else: - with TimeRecorder(conv_res, "layout"): - assert page.size is not None - page_image = page.get_image(scale=1.0) - assert page_image is not None + continue - clusters = [] - for ix, pred_item in enumerate( - self.layout_predictor.predict(page_image) - ): - label = DocItemLabel( - pred_item["label"] - .lower() - .replace(" ", "_") - .replace("-", "_") - ) # Temporary, until docling-ibm-model uses docling-core types - cluster = Cluster( - id=ix, - label=label, - confidence=pred_item["confidence"], - bbox=BoundingBox.model_validate(pred_item), - cells=[], - ) - clusters.append(cluster) + page_predictions = batch_predictions[valid_page_idx] + valid_page_idx += 1 - if settings.debug.visualize_raw_layout: - self.draw_clusters_and_cells_side_by_side( - conv_res, page, clusters, mode_prefix="raw" - ) + clusters = [] + for ix, pred_item in enumerate(page_predictions): + label = DocItemLabel( + pred_item["label"].lower().replace(" ", "_").replace("-", "_") + ) # Temporary, until docling-ibm-model uses docling-core types + cluster = Cluster( + id=ix, + label=label, + confidence=pred_item["confidence"], + 
bbox=BoundingBox.model_validate(pred_item), + cells=[], + ) + clusters.append(cluster) - # Apply postprocessing + if settings.debug.visualize_raw_layout: + self.draw_clusters_and_cells_side_by_side( + conv_res, page, clusters, mode_prefix="raw" + ) - processed_clusters, processed_cells = LayoutPostprocessor( - page, clusters, self.options - ).postprocess() - # Note: LayoutPostprocessor updates page.cells and page.parsed_page internally + # Apply postprocessing + processed_clusters, processed_cells = LayoutPostprocessor( + page, clusters, self.options + ).postprocess() + # Note: LayoutPostprocessor updates page.cells and page.parsed_page internally - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - "Mean of empty slice|invalid value encountered in scalar divide", - RuntimeWarning, - "numpy", - ) + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + "Mean of empty slice|invalid value encountered in scalar divide", + RuntimeWarning, + "numpy", + ) - conv_res.confidence.pages[page.page_no].layout_score = float( - np.mean([c.confidence for c in processed_clusters]) - ) + conv_res.confidence.pages[page.page_no].layout_score = float( + np.mean([c.confidence for c in processed_clusters]) + ) - conv_res.confidence.pages[page.page_no].ocr_score = float( - np.mean( - [c.confidence for c in processed_cells if c.from_ocr] - ) - ) + conv_res.confidence.pages[page.page_no].ocr_score = float( + np.mean([c.confidence for c in processed_cells if c.from_ocr]) + ) - page.predictions.layout = LayoutPrediction( - clusters=processed_clusters - ) + page.predictions.layout = LayoutPrediction(clusters=processed_clusters) - if settings.debug.visualize_layout: - self.draw_clusters_and_cells_side_by_side( - conv_res, page, processed_clusters, mode_prefix="postprocessed" - ) + if settings.debug.visualize_layout: + self.draw_clusters_and_cells_side_by_side( + conv_res, page, processed_clusters, mode_prefix="postprocessed" + ) - yield page + yield page diff --git a/docling/pipeline/threaded_standard_pdf_pipeline.py b/docling/pipeline/threaded_standard_pdf_pipeline.py new file mode 100644 index 00000000..964a8d42 --- /dev/null +++ b/docling/pipeline/threaded_standard_pdf_pipeline.py @@ -0,0 +1,606 @@ +# threaded_standard_pdf_pipeline.py +# pylint: disable=too-many-lines,invalid-name +"""Thread-safe, production-ready PDF pipeline +================================================ +A self-contained, thread-safe PDF conversion pipeline exploiting parallelism between pipeline stages and models. + +* **Per-run isolation** - every :py:meth:`execute` call uses its own bounded queues and worker + threads so that concurrent invocations never share mutable state. +* **Deterministic run identifiers** - pages are tracked with an internal *run-id* instead of + relying on :pyfunc:`id`, which may clash after garbage collection. +* **Explicit back-pressure & shutdown** - producers block on full queues; queue *close()* + propagates downstream so stages terminate deterministically without sentinels. +* **Minimal shared state** - heavyweight models are initialised once per pipeline instance + and only read by worker threads; no runtime mutability is exposed. +* **Strict typing & clean API usage** - code is fully annotated and respects *coding_rules.md*. 
+""" + +from __future__ import annotations + +import itertools +import logging +import threading +import time +from collections import defaultdict, deque +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Iterable, List, Optional, Sequence, Tuple + +from docling.backend.abstract_backend import AbstractDocumentBackend +from docling.backend.pdf_backend import PdfDocumentBackend +from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page +from docling.datamodel.document import ConversionResult +from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions +from docling.datamodel.settings import settings +from docling.models.code_formula_model import CodeFormulaModel, CodeFormulaModelOptions +from docling.models.document_picture_classifier import ( + DocumentPictureClassifier, + DocumentPictureClassifierOptions, +) +from docling.models.factories import get_ocr_factory, get_picture_description_factory +from docling.models.layout_model import LayoutModel +from docling.models.page_assemble_model import PageAssembleModel, PageAssembleOptions +from docling.models.page_preprocessing_model import ( + PagePreprocessingModel, + PagePreprocessingOptions, +) +from docling.models.picture_description_base_model import PictureDescriptionBaseModel +from docling.models.readingorder_model import ReadingOrderModel, ReadingOrderOptions +from docling.models.table_structure_model import TableStructureModel +from docling.pipeline.base_pipeline import BasePipeline +from docling.utils.profiling import ProfilingScope, TimeRecorder +from docling.utils.utils import chunkify + +_log = logging.getLogger(__name__) + +# ────────────────────────────────────────────────────────────────────────────── +# Helper data structures +# ────────────────────────────────────────────────────────────────────────────── + + +@dataclass +class ThreadedItem: + """Envelope that travels between pipeline stages.""" + + payload: Optional[Page] + run_id: int # Unique per *execute* call, monotonic across pipeline instance + page_no: int + conv_res: ConversionResult + error: Optional[Exception] = None + is_failed: bool = False + + +@dataclass +class ProcessingResult: + """Aggregated outcome of a pipeline run.""" + + pages: List[Page] = field(default_factory=list) + failed_pages: List[Tuple[int, Exception]] = field(default_factory=list) + total_expected: int = 0 + + @property + def success_count(self) -> int: + return len(self.pages) + + @property + def failure_count(self) -> int: + return len(self.failed_pages) + + @property + def is_partial_success(self) -> bool: + return 0 < self.success_count < self.total_expected + + @property + def is_complete_failure(self) -> bool: + return self.success_count == 0 and self.failure_count > 0 + + +class ThreadedQueue: + """Bounded queue with blocking put/ get_batch and explicit *close()* semantics.""" + + __slots__ = ("_closed", "_items", "_lock", "_max", "_not_empty", "_not_full") + + def __init__(self, max_size: int) -> None: + self._max: int = max_size + self._items: deque[ThreadedItem] = deque() + self._lock = threading.Lock() + self._not_full = threading.Condition(self._lock) + self._not_empty = threading.Condition(self._lock) + self._closed = False + + # ---------------------------------------------------------------- put() + def put(self, item: ThreadedItem, timeout: Optional[float] | None = None) -> bool: + """Block until queue accepts *item* or is closed. 
Returns *False* if closed.""" + with self._not_full: + if self._closed: + return False + start = time.monotonic() + while len(self._items) >= self._max and not self._closed: + if timeout is not None: + remaining = timeout - (time.monotonic() - start) + if remaining <= 0: + return False + self._not_full.wait(remaining) + else: + self._not_full.wait() + if self._closed: + return False + self._items.append(item) + self._not_empty.notify() + return True + + # ------------------------------------------------------------ get_batch() + def get_batch( + self, size: int, timeout: Optional[float] | None = None + ) -> List[ThreadedItem]: + """Return up to *size* items. Blocks until ≥1 item present or queue closed/timeout.""" + with self._not_empty: + start = time.monotonic() + while not self._items and not self._closed: + if timeout is not None: + remaining = timeout - (time.monotonic() - start) + if remaining <= 0: + return [] + self._not_empty.wait(remaining) + else: + self._not_empty.wait() + batch: List[ThreadedItem] = [] + while self._items and len(batch) < size: + batch.append(self._items.popleft()) + if batch: + self._not_full.notify_all() + return batch + + # ---------------------------------------------------------------- close() + def close(self) -> None: + with self._lock: + self._closed = True + self._not_empty.notify_all() + self._not_full.notify_all() + + # -------------------------------------------------------------- property + @property + def closed(self) -> bool: + return self._closed + + +class ThreadedPipelineStage: + """A single pipeline stage backed by one worker thread.""" + + def __init__( + self, + *, + name: str, + model: Any, + batch_size: int, + batch_timeout: float, + queue_max_size: int, + ) -> None: + self.name = name + self.model = model + self.batch_size = batch_size + self.batch_timeout = batch_timeout + self.input_queue = ThreadedQueue(queue_max_size) + self._outputs: list[ThreadedQueue] = [] + self._thread: Optional[threading.Thread] = None + self._running = False + + # ---------------------------------------------------------------- wiring + def add_output_queue(self, q: ThreadedQueue) -> None: + self._outputs.append(q) + + # -------------------------------------------------------------- lifecycle + def start(self) -> None: + if self._running: + return + self._running = True + self._thread = threading.Thread( + target=self._run, name=f"Stage-{self.name}", daemon=False + ) + self._thread.start() + + def stop(self) -> None: + if not self._running: + return + self._running = False + self.input_queue.close() + if self._thread is not None: + self._thread.join(timeout=30.0) + if self._thread.is_alive(): + _log.warning("Stage %s did not terminate cleanly within 30s", self.name) + + # ------------------------------------------------------------------ _run + def _run(self) -> None: + try: + while self._running: + batch = self.input_queue.get_batch(self.batch_size, self.batch_timeout) + if not batch and self.input_queue.closed: + break + processed = self._process_batch(batch) + self._emit(processed) + except Exception: # pragma: no cover - top-level guard + _log.exception("Fatal error in stage %s", self.name) + finally: + for q in self._outputs: + q.close() + + # ----------------------------------------------------- _process_batch() + def _process_batch(self, batch: Sequence[ThreadedItem]) -> list[ThreadedItem]: + """Run *model* on *batch* grouped by run_id to maximise batching.""" + groups: dict[int, list[ThreadedItem]] = defaultdict(list) + for itm in batch: + 
groups[itm.run_id].append(itm) + + result: list[ThreadedItem] = [] + for rid, items in groups.items(): + good: list[ThreadedItem] = [i for i in items if not i.is_failed] + if not good: + result.extend(items) + continue + try: + # Filter out None payloads and ensure type safety + pages_with_payloads = [ + (i, i.payload) for i in good if i.payload is not None + ] + if len(pages_with_payloads) != len(good): + # Some items have None payloads, mark all as failed + for it in items: + it.is_failed = True + it.error = RuntimeError("Page payload is None") + result.extend(items) + continue + + pages: List[Page] = [payload for _, payload in pages_with_payloads] + processed_pages = list(self.model(good[0].conv_res, pages)) # type: ignore[arg-type] + if len(processed_pages) != len(pages): # strict mismatch guard + raise RuntimeError( + f"Model {self.name} returned wrong number of pages" + ) + for idx, page in enumerate(processed_pages): + result.append( + ThreadedItem( + payload=page, + run_id=rid, + page_no=good[idx].page_no, + conv_res=good[idx].conv_res, + ) + ) + except Exception as exc: + _log.error("Stage %s failed for run %d: %s", self.name, rid, exc) + for it in items: + it.is_failed = True + it.error = exc + result.extend(items) + return result + + # -------------------------------------------------------------- _emit() + def _emit(self, items: Iterable[ThreadedItem]) -> None: + for item in items: + for q in self._outputs: + if not q.put(item): + _log.error("Output queue closed while emitting from %s", self.name) + + +@dataclass +class RunContext: + """Wiring for a single *execute* call.""" + + stages: list[ThreadedPipelineStage] + first_stage: ThreadedPipelineStage + output_queue: ThreadedQueue + + +# ────────────────────────────────────────────────────────────────────────────── +# Main pipeline +# ────────────────────────────────────────────────────────────────────────────── + + +class ThreadedStandardPdfPipeline(BasePipeline): + """High-performance PDF pipeline with multi-threaded stages.""" + + def __init__(self, pipeline_options: ThreadedPdfPipelineOptions) -> None: + super().__init__(pipeline_options) + self.pipeline_options: ThreadedPdfPipelineOptions = pipeline_options + self._run_seq = itertools.count(1) # deterministic, monotonic run ids + + # initialise heavy models once + self._init_models() + + # ──────────────────────────────────────────────────────────────────────── + # Heavy-model initialisation & helpers + # ──────────────────────────────────────────────────────────────────────── + + def _init_models(self) -> None: + art_path = self._resolve_artifacts_path() + self.keep_images = ( + self.pipeline_options.generate_page_images + or self.pipeline_options.generate_picture_images + or self.pipeline_options.generate_table_images + ) + self.preprocessing_model = PagePreprocessingModel( + options=PagePreprocessingOptions( + images_scale=self.pipeline_options.images_scale + ) + ) + self.ocr_model = self._make_ocr_model(art_path) + self.layout_model = LayoutModel( + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + options=self.pipeline_options.layout_options, + ) + self.table_model = TableStructureModel( + enabled=self.pipeline_options.do_table_structure, + artifacts_path=art_path, + options=self.pipeline_options.table_structure_options, + accelerator_options=self.pipeline_options.accelerator_options, + ) + self.assemble_model = PageAssembleModel(options=PageAssembleOptions()) + self.reading_order_model = 
ReadingOrderModel(options=ReadingOrderOptions()) + + # --- optional enrichment ------------------------------------------------ + self.enrichment_pipe = [] + code_formula = CodeFormulaModel( + enabled=self.pipeline_options.do_code_enrichment + or self.pipeline_options.do_formula_enrichment, + artifacts_path=art_path, + options=CodeFormulaModelOptions( + do_code_enrichment=self.pipeline_options.do_code_enrichment, + do_formula_enrichment=self.pipeline_options.do_formula_enrichment, + ), + accelerator_options=self.pipeline_options.accelerator_options, + ) + if code_formula.enabled: + self.enrichment_pipe.append(code_formula) + + picture_classifier = DocumentPictureClassifier( + enabled=self.pipeline_options.do_picture_classification, + artifacts_path=art_path, + options=DocumentPictureClassifierOptions(), + accelerator_options=self.pipeline_options.accelerator_options, + ) + if picture_classifier.enabled: + self.enrichment_pipe.append(picture_classifier) + + picture_descr = self._make_picture_description_model(art_path) + if picture_descr and picture_descr.enabled: + self.enrichment_pipe.append(picture_descr) + + self.keep_backend = any( + ( + self.pipeline_options.do_formula_enrichment, + self.pipeline_options.do_code_enrichment, + self.pipeline_options.do_picture_classification, + self.pipeline_options.do_picture_description, + ) + ) + + # ---------------------------------------------------------------- helpers + def _resolve_artifacts_path(self) -> Optional[Path]: + if self.pipeline_options.artifacts_path: + p = Path(self.pipeline_options.artifacts_path).expanduser() + elif settings.artifacts_path: + p = Path(settings.artifacts_path).expanduser() + else: + return None + if not p.is_dir(): + raise RuntimeError( + f"{p} does not exist or is not a directory containing the required models" + ) + return p + + def _make_ocr_model(self, art_path: Optional[Path]) -> Any: + factory = get_ocr_factory( + allow_external_plugins=self.pipeline_options.allow_external_plugins + ) + return factory.create_instance( + options=self.pipeline_options.ocr_options, + enabled=self.pipeline_options.do_ocr, + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + ) + + def _make_picture_description_model( + self, art_path: Optional[Path] + ) -> Optional[PictureDescriptionBaseModel]: + factory = get_picture_description_factory( + allow_external_plugins=self.pipeline_options.allow_external_plugins + ) + return factory.create_instance( + options=self.pipeline_options.picture_description_options, + enabled=self.pipeline_options.do_picture_description, + enable_remote_services=self.pipeline_options.enable_remote_services, + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + ) + + # ──────────────────────────────────────────────────────────────────────── + # Build - thread pipeline + # ──────────────────────────────────────────────────────────────────────── + + def _create_run_ctx(self) -> RunContext: + opts = self.pipeline_options + preprocess = ThreadedPipelineStage( + name="preprocess", + model=self.preprocessing_model, + batch_size=1, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + ocr = ThreadedPipelineStage( + name="ocr", + model=self.ocr_model, + batch_size=opts.ocr_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + layout = ThreadedPipelineStage( + name="layout", + model=self.layout_model, + batch_size=opts.layout_batch_size, + 
batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + table = ThreadedPipelineStage( + name="table", + model=self.table_model, + batch_size=opts.table_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + assemble = ThreadedPipelineStage( + name="assemble", + model=self.assemble_model, + batch_size=1, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + + # wire stages + output_q = ThreadedQueue(opts.queue_max_size) + preprocess.add_output_queue(ocr.input_queue) + ocr.add_output_queue(layout.input_queue) + layout.add_output_queue(table.input_queue) + table.add_output_queue(assemble.input_queue) + assemble.add_output_queue(output_q) + + stages = [preprocess, ocr, layout, table, assemble] + return RunContext(stages=stages, first_stage=preprocess, output_queue=output_q) + + # --------------------------------------------------------------------- build + def _build_document(self, conv_res: ConversionResult) -> ConversionResult: # type: ignore[override] + """Stream-build the document while interleaving producer and consumer work.""" + run_id = next(self._run_seq) + assert isinstance(conv_res.input._backend, PdfDocumentBackend) + backend = conv_res.input._backend + + # preload & initialise pages ------------------------------------------------------------- + start_page, end_page = conv_res.input.limits.page_range + pages: list[Page] = [] + for i in range(conv_res.input.page_count): + if start_page - 1 <= i <= end_page - 1: + page = Page(page_no=i) + page._backend = backend.load_page(i) + if page._backend and page._backend.is_valid(): + page.size = page._backend.get_size() + conv_res.pages.append(page) + pages.append(page) + + if not pages: + conv_res.status = ConversionStatus.FAILURE + return conv_res + + total_pages: int = len(pages) + ctx: RunContext = self._create_run_ctx() + for st in ctx.stages: + st.start() + + proc = ProcessingResult(total_expected=total_pages) + fed_idx: int = 0 # number of pages successfully queued + batch_size: int = 32 # drain chunk + try: + while proc.success_count + proc.failure_count < total_pages: + # 1) feed - try to enqueue until the first queue is full + while fed_idx < total_pages: + ok = ctx.first_stage.input_queue.put( + ThreadedItem( + payload=pages[fed_idx], + run_id=run_id, + page_no=pages[fed_idx].page_no, + conv_res=conv_res, + ), + timeout=0.0, # non-blocking try-put + ) + if ok: + fed_idx += 1 + if fed_idx == total_pages: + ctx.first_stage.input_queue.close() + else: # queue full - switch to draining + break + + # 2) drain - pull whatever is ready from the output side + out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05) + for itm in out_batch: + if itm.run_id != run_id: + continue + if itm.is_failed or itm.error: + proc.failed_pages.append( + (itm.page_no, itm.error or RuntimeError("unknown error")) + ) + else: + assert itm.payload is not None + proc.pages.append(itm.payload) + + # 3) failure safety - downstream closed early -> mark missing pages failed + if not out_batch and ctx.output_queue.closed: + missing = total_pages - (proc.success_count + proc.failure_count) + if missing > 0: + proc.failed_pages.extend( + [(-1, RuntimeError("pipeline terminated early"))] * missing + ) + break + finally: + for st in ctx.stages: + st.stop() + ctx.output_queue.close() + + self._integrate_results(conv_res, proc) + return conv_res + + # ---------------------------------------------------- integrate_results() + def _integrate_results( + 
self, conv_res: ConversionResult, proc: ProcessingResult + ) -> None: + page_map = {p.page_no: p for p in proc.pages} + conv_res.pages = [ + page_map.get(p.page_no, p) + for p in conv_res.pages + if p.page_no in page_map + or not any(fp == p.page_no for fp, _ in proc.failed_pages) + ] + if proc.is_complete_failure: + conv_res.status = ConversionStatus.FAILURE + elif proc.is_partial_success: + conv_res.status = ConversionStatus.PARTIAL_SUCCESS + else: + conv_res.status = ConversionStatus.SUCCESS + if not self.keep_images: + for p in conv_res.pages: + p._image_cache = {} + if not self.keep_backend: + for p in conv_res.pages: + if p._backend is not None: + p._backend.unload() + + # ---------------------------------------------------------------- assemble + def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult: # type: ignore[override] + elements, headers, body = [], [], [] + with TimeRecorder(conv_res, "doc_assemble", scope=ProfilingScope.DOCUMENT): + for p in conv_res.pages: + if p.assembled: + elements.extend(p.assembled.elements) + headers.extend(p.assembled.headers) + body.extend(p.assembled.body) + conv_res.assembled = AssembledUnit( + elements=elements, headers=headers, body=body + ) + conv_res.document = self.reading_order_model(conv_res) + return conv_res + + # ---------------------------------------------------------------- misc + @classmethod + def get_default_options(cls) -> ThreadedPdfPipelineOptions: # type: ignore[override] + return ThreadedPdfPipelineOptions() + + @classmethod + def is_backend_supported(cls, backend: AbstractDocumentBackend) -> bool: # type: ignore[override] + return isinstance(backend, PdfDocumentBackend) + + def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: # type: ignore[override] + return conv_res.status + + def _unload(self, conv_res: ConversionResult) -> None: # type: ignore[override] + for p in conv_res.pages: + if p._backend is not None: + p._backend.unload() + if conv_res.input._backend: + conv_res.input._backend.unload() diff --git a/pyproject.toml b/pyproject.toml index c472e7c6..dfaa2872 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -150,6 +150,7 @@ package = true default-groups = "all" [tool.uv.sources] +docling-ibm-models = { git = "https://github.com/docling-project/docling-ibm-models.git", rev = "cau/batching-layout-model" } [tool.setuptools.packages.find] include = ["docling*"] diff --git a/tests/test_threaded_pipeline.py b/tests/test_threaded_pipeline.py new file mode 100644 index 00000000..ec6d3427 --- /dev/null +++ b/tests/test_threaded_pipeline.py @@ -0,0 +1,176 @@ +import logging +import time +from pathlib import Path +from typing import List + +import pytest + +from docling.datamodel.base_models import ConversionStatus, InputFormat +from docling.datamodel.document import ConversionResult +from docling.datamodel.pipeline_options import ( + PdfPipelineOptions, + ThreadedPdfPipelineOptions, +) +from docling.document_converter import DocumentConverter, PdfFormatOption +from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline +from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline + + +def test_threaded_pipeline_multiple_documents(): + """Test threaded pipeline with multiple documents and compare with standard pipeline""" + test_files = [ + "tests/data/pdf/2203.01017v2.pdf", + "tests/data/pdf/2206.01062.pdf", + "tests/data/pdf/2305.03393v1.pdf", + ] + # test_files = [str(f) for f in Path("test/data/pdf").rglob("*.pdf")] + + do_ts = False + 
do_ocr = False + + run_threaded = True + run_serial = True + + if run_threaded: + # Threaded pipeline + threaded_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=ThreadedStandardPdfPipeline, + pipeline_options=ThreadedPdfPipelineOptions( + layout_batch_size=1, + table_batch_size=1, + ocr_batch_size=1, + batch_timeout_seconds=1.0, + do_table_structure=do_ts, + do_ocr=do_ocr, + ), + ) + } + ) + + threaded_converter.initialize_pipeline(InputFormat.PDF) + + # Test threaded pipeline + threaded_success_count = 0 + threaded_failure_count = 0 + start_time = time.perf_counter() + for result in threaded_converter.convert_all(test_files, raises_on_error=True): + print( + "Finished converting document with threaded pipeline:", + result.input.file.name, + ) + if result.status == ConversionStatus.SUCCESS: + threaded_success_count += 1 + else: + threaded_failure_count += 1 + threaded_time = time.perf_counter() - start_time + + del threaded_converter + + print(f"Threaded pipeline: {threaded_time:.2f} seconds") + + if run_serial: + # Standard pipeline + standard_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=StandardPdfPipeline, + pipeline_options=PdfPipelineOptions( + do_table_structure=do_ts, + do_ocr=do_ocr, + ), + ) + } + ) + + standard_converter.initialize_pipeline(InputFormat.PDF) + + # Test standard pipeline + standard_success_count = 0 + standard_failure_count = 0 + start_time = time.perf_counter() + for result in standard_converter.convert_all(test_files, raises_on_error=True): + print( + "Finished converting document with standard pipeline:", + result.input.file.name, + ) + if result.status == ConversionStatus.SUCCESS: + standard_success_count += 1 + else: + standard_failure_count += 1 + standard_time = time.perf_counter() - start_time + + del standard_converter + + print(f"Standard pipeline: {standard_time:.2f} seconds") + + # Verify results + if run_threaded and run_serial: + assert standard_success_count == threaded_success_count + assert standard_failure_count == threaded_failure_count + if run_serial: + assert standard_success_count == len(test_files) + assert standard_failure_count == 0 + if run_threaded: + assert threaded_success_count == len(test_files) + assert threaded_failure_count == 0 + + +def test_pipeline_comparison(): + """Compare all three pipeline implementations""" + test_file = "tests/data/pdf/2206.01062.pdf" + + # Sync pipeline + sync_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=StandardPdfPipeline, + ) + } + ) + + start_time = time.perf_counter() + sync_results = list(sync_converter.convert_all([test_file])) + sync_time = time.perf_counter() - start_time + + # Threaded pipeline + threaded_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=ThreadedStandardPdfPipeline, + pipeline_options=ThreadedPdfPipelineOptions( + layout_batch_size=1, + ocr_batch_size=1, + table_batch_size=1, + ), + ) + } + ) + + start_time = time.perf_counter() + threaded_results = list(threaded_converter.convert_all([test_file])) + threaded_time = time.perf_counter() - start_time + + print("\nPipeline Comparison:") + print(f"Sync pipeline: {sync_time:.2f} seconds") + print(f"Threaded pipeline: {threaded_time:.2f} seconds") + print(f"Speedup: {sync_time / threaded_time:.2f}x") + + # Verify results are equivalent + assert len(sync_results) == len(threaded_results) == 1 + assert ( + 
sync_results[0].status == threaded_results[0].status == ConversionStatus.SUCCESS + ) + + # Basic content comparison + sync_doc = sync_results[0].document + threaded_doc = threaded_results[0].document + + assert len(sync_doc.pages) == len(threaded_doc.pages) + assert len(sync_doc.texts) == len(threaded_doc.texts) + + +if __name__ == "__main__": + # Run basic performance test + test_pipeline_comparison() diff --git a/uv.lock b/uv.lock index c790638a..30d35235 100644 --- a/uv.lock +++ b/uv.lock @@ -908,7 +908,7 @@ requires-dist = [ { name = "beautifulsoup4", specifier = ">=4.12.3,<5.0.0" }, { name = "certifi", specifier = ">=2024.7.4" }, { name = "docling-core", extras = ["chunking"], specifier = ">=2.42.0,<3.0.0" }, - { name = "docling-ibm-models", specifier = ">=3.6.0,<4" }, + { name = "docling-ibm-models", git = "https://github.com/docling-project/docling-ibm-models.git?rev=cau%2Fbatching-layout-model" }, { name = "docling-parse", specifier = ">=4.0.0,<5.0.0" }, { name = "easyocr", specifier = ">=1.7,<2.0" }, { name = "filetype", specifier = ">=1.2.0,<2.0.0" }, @@ -1009,8 +1009,8 @@ chunking = [ [[package]] name = "docling-ibm-models" -version = "3.8.0" -source = { registry = "https://pypi.org/simple" } +version = "3.8.1" +source = { git = "https://github.com/docling-project/docling-ibm-models.git?rev=cau%2Fbatching-layout-model#aea6bc2cde3495ee5b93d62ac66698f59dc999c1" } dependencies = [ { name = "docling-core" }, { name = "huggingface-hub" }, @@ -1027,10 +1027,6 @@ dependencies = [ { name = "tqdm" }, { name = "transformers" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/52/cd/0ed305a983a3c0c52a2095dc46f91799fa682baa833d33c529ab9c307a49/docling_ibm_models-3.8.0.tar.gz", hash = "sha256:aab73e7f856d7307e7925c5bf0d460d69fc5d7a9e910bd19387b6a4943fcde2d", size = 86118, upload-time = "2025-07-09T08:18:49.323Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/48/17/650e533ae6cc6f1a9a6e1e1e4b8715c46b15b8c56e7f08c8a8a8c03973b0/docling_ibm_models-3.8.0-py3-none-any.whl", hash = "sha256:041293883d1075cb28fe721d598db53fea8c55ab3d5e77e78148df6479ace6d2", size = 86160, upload-time = "2025-07-09T08:18:47.81Z" }, -] [[package]] name = "docling-parse"
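
Usage sketch (not part of the patch): selecting the new threaded pipeline through the existing PdfFormatOption hook, mirroring tests/test_threaded_pipeline.py above. The batch sizes and file path are illustrative placeholders, not recommended values.

    from docling.datamodel.base_models import InputFormat
    from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline

    converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_cls=ThreadedStandardPdfPipeline,
                pipeline_options=ThreadedPdfPipelineOptions(
                    layout_batch_size=4,        # pages per layout-model batch
                    table_batch_size=4,         # pages per table-structure batch
                    ocr_batch_size=4,           # pages per OCR batch
                    batch_timeout_seconds=2.0,  # max wait before a partial batch is flushed
                    queue_max_size=100,         # backpressure: bound on items per stage queue
                ),
            )
        }
    )
    result = converter.convert("tests/data/pdf/2206.01062.pdf")  # sample path taken from the tests above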
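
A second sketch (also not part of the patch) for the experimental document-level concurrency re-enabled in document_converter.py: the ThreadPoolExecutor branch is only taken when both doc_batch_concurrency and doc_batch_size are greater than 1; the defaults in BatchConcurrencySettings (1 and 1) keep the serial path. This assumes the global settings object can be mutated before conversion; per the settings.py comment, little benefit is expected without free-threaded Python.

    from docling.datamodel.settings import settings

    # Both knobs must be > 1 for DocumentConverter._convert to take the thread-pool branch.
    settings.perf.doc_batch_size = 4         # documents grouped into one batch (>= concurrency)
    settings.perf.doc_batch_concurrency = 2  # experimental: parallel document-processing threads

    # To keep the underlying models on a single core, the settings.py comment suggests:
    #   export OMP_NUM_THREADS=1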