diff --git a/docling/document_converter.py b/docling/document_converter.py index 1a0a9d75..e2b57a83 100644 --- a/docling/document_converter.py +++ b/docling/document_converter.py @@ -1,8 +1,10 @@ import hashlib import logging import sys +import threading import time from collections.abc import Iterable, Iterator +from concurrent.futures import ThreadPoolExecutor from functools import partial from pathlib import Path from typing import Dict, List, Optional, Tuple, Type, Union @@ -50,6 +52,9 @@ from docling.utils.utils import chunkify _log = logging.getLogger(__name__) +# Module-level lock for pipeline cache +_pipeline_cache_lock = threading.Lock() + class FormatOption(BaseModel): pipeline_cls: Type[BasePipeline] @@ -284,10 +289,13 @@ class DocumentConverter: _log.info("Going to convert document batch...") # parallel processing only within input_batch - # with ThreadPoolExecutor( + #with ThreadPoolExecutor( # max_workers=settings.perf.doc_batch_concurrency - # ) as pool: - # yield from pool.map(self.process_document, input_batch) + #) as pool: + # yield from pool.map( + # partial(self._process_document, raises_on_error=raises_on_error), + # input_batch, + # ) # Note: PDF backends are not thread-safe, thread pool usage was disabled. for item in map( @@ -315,19 +323,20 @@ class DocumentConverter: # Use a composite key to cache pipelines cache_key = (pipeline_class, options_hash) - if cache_key not in self.initialized_pipelines: - _log.info( - f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}" - ) - self.initialized_pipelines[cache_key] = pipeline_class( - pipeline_options=pipeline_options - ) - else: - _log.debug( - f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}" - ) + with _pipeline_cache_lock: + if cache_key not in self.initialized_pipelines: + _log.info( + f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}" + ) + self.initialized_pipelines[cache_key] = pipeline_class( + pipeline_options=pipeline_options + ) + else: + _log.debug( + f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}" + ) - return self.initialized_pipelines[cache_key] + return self.initialized_pipelines[cache_key] def _process_document( self, in_doc: InputDocument, raises_on_error: bool diff --git a/docling/pipeline/threaded_standard_pdf_pipeline.py b/docling/pipeline/threaded_standard_pdf_pipeline.py index 6ef4e90b..a3331461 100644 --- a/docling/pipeline/threaded_standard_pdf_pipeline.py +++ b/docling/pipeline/threaded_standard_pdf_pipeline.py @@ -1,3 +1,29 @@ +# threaded_standard_pdf_pipeline.py +# pylint: disable=too-many-lines +""" +Thread-safe, multi-threaded PDF pipeline. + +Key points +---------- +* Heavy models are initialised once per pipeline instance and safely reused across threads. +* Every `execute()` call creates its own `RunContext` with fresh queues and worker threads + so concurrent executions never share mutable state. +* Back-pressure remains intact because every `ThreadedQueue` is bounded (`max_size` from + pipeline options). When a downstream queue is full, upstream stages block on a condition + variable instead of busy-polling. +* Pipeline termination relies on queue closing: + - The producer thread closes its output queue after the last real page is queued. + - Each stage propagates that closure downstream by closing its own output queues + once it has drained its input queue and detected that it is closed. 
+ - The collector finishes when it has received either + * all expected pages, or + * the output queue is closed and empty. +* Per-page errors are captured, propagated, and turned into + `ConversionStatus.PARTIAL_SUCCESS` when appropriate. +""" + +from __future__ import annotations + import logging import threading import time @@ -6,10 +32,9 @@ import weakref from collections import defaultdict, deque from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Union, cast +from typing import Any, List, Optional, Sequence, Tuple -import numpy as np -from docling_core.types.doc import DocItem, ImageRef, PictureItem, TableItem +from docling_core.types.doc import ImageRef, PictureItem, TableItem from docling.backend.pdf_backend import PdfDocumentBackend from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page @@ -37,27 +62,30 @@ from docling.utils.utils import chunkify _log = logging.getLogger(__name__) +# ────────────────────────────────────────────────────────────────────────────── +# Helper data structures +# ────────────────────────────────────────────────────────────────────────────── + @dataclass class ThreadedItem: - """Item flowing through the threaded pipeline with document context""" + """Item that moves between pipeline stages.""" - payload: Page + payload: Optional[Page] conv_res_id: int conv_res: ConversionResult page_no: int = -1 error: Optional[Exception] = None is_failed: bool = False - def __post_init__(self): - """Ensure proper initialization of page number""" - if self.page_no == -1 and isinstance(self.payload, Page): + def __post_init__(self) -> None: + if self.page_no == -1 and self.payload is not None: self.page_no = self.payload.page_no @dataclass class ProcessingResult: - """Result of processing with error tracking for partial results""" + """Aggregate of processed and failed pages.""" pages: List[Page] = field(default_factory=list) failed_pages: List[Tuple[int, Exception]] = field(default_factory=list) @@ -82,29 +110,30 @@ class ProcessingResult: @dataclass class ThreadedQueue: - """Thread-safe queue with backpressure control and memory management""" + """Bounded queue with explicit back-pressure and close semantics.""" - max_size: int = 100 - items: deque = field(default_factory=deque) + max_size: int = 512 + items: deque[ThreadedItem] = field(default_factory=deque) lock: threading.Lock = field(default_factory=threading.Lock) not_full: threading.Condition = field(init=False) not_empty: threading.Condition = field(init=False) closed: bool = False - def __post_init__(self): + def __post_init__(self) -> None: self.not_full = threading.Condition(self.lock) self.not_empty = threading.Condition(self.lock) + # ------------------------------------------------------------------ put() def put(self, item: ThreadedItem, timeout: Optional[float] = None) -> bool: - """Put item with backpressure control""" + """Block until the queue has room or is closed. 
Returns False if closed.""" with self.not_full: if self.closed: return False - start_time = time.time() + start = time.monotonic() while len(self.items) >= self.max_size and not self.closed: if timeout is not None: - remaining = timeout - (time.time() - start_time) + remaining = timeout - (time.monotonic() - start) if remaining <= 0: return False self.not_full.wait(remaining) @@ -118,57 +147,51 @@ class ThreadedQueue: self.not_empty.notify() return True + # ------------------------------------------------------------ get_batch() def get_batch( self, batch_size: int, timeout: Optional[float] = None ) -> List[ThreadedItem]: - """Get a batch of items""" + """Return up to `batch_size` items; may return fewer on timeout/closure.""" with self.not_empty: - start_time = time.time() - - # Wait for at least one item - while len(self.items) == 0 and not self.closed: + start = time.monotonic() + while not self.items and not self.closed: if timeout is not None: - remaining = timeout - (time.time() - start_time) + remaining = timeout - (time.monotonic() - start) if remaining <= 0: return [] self.not_empty.wait(remaining) else: self.not_empty.wait() - # Collect batch batch: List[ThreadedItem] = [] - while len(batch) < batch_size and len(self.items) > 0: + while len(batch) < batch_size and self.items: batch.append(self.items.popleft()) if batch: self.not_full.notify_all() - return batch - def close(self): - """Close the queue and wake up waiting threads""" + # ------------------------------------------------------------------ close + def close(self) -> None: with self.lock: self.closed = True self.not_empty.notify_all() self.not_full.notify_all() - def is_empty(self) -> bool: - with self.lock: - return len(self.items) == 0 - - def size(self) -> int: - with self.lock: - return len(self.items) - - def cleanup(self): - """Clean up resources and clear items""" + # --------------------------------------------------------------- cleanup + def cleanup(self) -> None: with self.lock: self.items.clear() self.closed = True +# ────────────────────────────────────────────────────────────────────────────── +# Pipeline stage +# ────────────────────────────────────────────────────────────────────────────── + + class ThreadedPipelineStage: - """A pipeline stage that processes items using dedicated threads""" + """A processing stage with its own thread, batch size and timeouts.""" def __init__( self, @@ -177,180 +200,177 @@ class ThreadedPipelineStage: batch_size: int, batch_timeout: float, queue_max_size: int, - ): - self.name = name - self.model = model - self.batch_size = batch_size - self.batch_timeout = batch_timeout - self.input_queue = ThreadedQueue(max_size=queue_max_size) + ) -> None: + self.name: str = name + self.model: Any = model + self.batch_size: int = batch_size + self.batch_timeout: float = batch_timeout + self.input_queue: ThreadedQueue = ThreadedQueue(max_size=queue_max_size) self.output_queues: List[ThreadedQueue] = [] - self.running = False - self.thread: Optional[threading.Thread] = None + self._thread: Optional[threading.Thread] = None + self._running: bool = False - def add_output_queue(self, queue: ThreadedQueue): - """Connect this stage to an output queue""" + # ---------------------------------------------------------------- wiring + def add_output_queue(self, queue: ThreadedQueue) -> None: self.output_queues.append(queue) - def start(self): - """Start the stage processing thread""" - if not self.running: - self.running = True - self.thread = threading.Thread(target=self._run, 
name=f"Stage-{self.name}") - self.thread.daemon = False # Ensure proper shutdown - self.thread.start() + # ----------------------------------------------------------- lifecycle + def start(self) -> None: + if self._running: + return + self._running = True + self._thread = threading.Thread( + target=self._run, name=f"Stage-{self.name}", daemon=False + ) + self._thread.start() - def stop(self): - """Stop the stage processing""" - self.running = False + def stop(self) -> None: + if not self._running: + return + self._running = False self.input_queue.close() - if self.thread: - self.thread.join(timeout=30.0) # Reasonable timeout for shutdown - if self.thread.is_alive(): - _log.warning(f"Stage {self.name} thread did not shutdown gracefully") + if self._thread is not None: + self._thread.join(timeout=30.0) + if self._thread.is_alive(): + _log.warning("Stage %s thread did not shut down in time", self.name) - def _run(self): - """Main processing loop for the stage""" + def cleanup(self) -> None: + self.input_queue.cleanup() + for q in self.output_queues: + q.cleanup() + + # ------------------------------------------------------------------ _run + def _run(self) -> None: try: - while self.running: + while self._running: batch = self.input_queue.get_batch( self.batch_size, timeout=self.batch_timeout ) + # Exit when upstream is finished and queue drained if not batch and self.input_queue.closed: break - if batch: - try: - processed_items = self._process_batch(batch) - self._send_to_outputs(processed_items) - except Exception as e: - _log.error(f"Error in stage {self.name}: {e}", exc_info=True) - # Send failed items downstream for partial processing - failed_items = [] - for item in batch: - item.is_failed = True - item.error = e - failed_items.append(item) - self._send_to_outputs(failed_items) - - except Exception as e: - _log.error(f"Fatal error in stage {self.name}: {e}", exc_info=True) + processed_items = self._process_batch(batch) + self._send_to_outputs(processed_items) + except Exception as exc: # pragma: no cover - safety net + _log.error("Fatal error in stage %s: %s", self.name, exc, exc_info=True) finally: - # Close output queues when done - for queue in self.output_queues: - queue.close() + # Propagate closure to downstream + for q in self.output_queues: + q.close() - def _process_batch(self, batch: List[ThreadedItem]) -> List[ThreadedItem]: - """Process a batch through the model with error handling""" - # Group by document to maintain document integrity - grouped_by_doc = defaultdict(list) - for item in batch: - grouped_by_doc[item.conv_res_id].append(item) + # ------------------------------------------------------- _process_batch + def _process_batch(self, batch: Sequence[ThreadedItem]) -> List[ThreadedItem]: + grouped: dict[int, List[ThreadedItem]] = defaultdict(list) + for itm in batch: + grouped[itm.conv_res_id].append(itm) - processed_items = [] - for conv_res_id, items in grouped_by_doc.items(): + out: List[ThreadedItem] = [] + for conv_res_id, items in grouped.items(): try: - # Filter out already failed items - valid_items = [item for item in items if not item.is_failed] - failed_items = [item for item in items if item.is_failed] + valid = [it for it in items if not it.is_failed] + fail = [it for it in items if it.is_failed] - if valid_items: - conv_res = valid_items[0].conv_res - pages = [item.payload for item in valid_items] + if valid: + conv_res = valid[0].conv_res + pages = [it.payload for it in valid] # type: ignore[arg-type] + assert all(p is not None for p in pages) + 
processed_pages = list(self.model(conv_res, pages)) # type: ignore[arg-type] - # Process through model - processed_pages = list(self.model(conv_res, pages)) - - # Re-wrap processed pages - for i, page in enumerate(processed_pages): - processed_items.append( + for idx, page in enumerate(processed_pages): + out.append( ThreadedItem( payload=page, - conv_res_id=valid_items[i].conv_res_id, - conv_res=valid_items[i].conv_res, - page_no=valid_items[i].page_no, + conv_res_id=conv_res_id, + conv_res=conv_res, + page_no=valid[idx].page_no, ) ) + out.extend(fail) + except Exception as exc: + _log.error( + "Model %s failed for doc %s: %s", self.name, conv_res_id, exc + ) + for it in items: + it.is_failed = True + it.error = exc + out.append(it) - # Pass through failed items for downstream handling - processed_items.extend(failed_items) + return out - except Exception as e: - _log.error(f"Model {self.name} failed for document {conv_res_id}: {e}") - # Mark all items as failed but continue processing - for item in items: - item.is_failed = True - item.error = e - processed_items.append(item) - - return processed_items - - def _send_to_outputs(self, items: List[ThreadedItem]): - """Send processed items to output queues""" + # ------------------------------------------------------ _send_to_outputs + def _send_to_outputs(self, items: Sequence[ThreadedItem]) -> None: for item in items: - for queue in self.output_queues: - # Use timeout to prevent blocking indefinitely - if not queue.put(item, timeout=5.0): - _log.warning( - f"Failed to send item from {self.name} due to backpressure" - ) + for q in self.output_queues: + if not q.put(item, timeout=None): + _log.error("Queue closed while writing from %s", self.name) - def cleanup(self): - """Clean up stage resources""" - if self.input_queue: - self.input_queue.cleanup() - for queue in self.output_queues: - queue.cleanup() + +# ────────────────────────────────────────────────────────────────────────────── +# Run context (per-execute mutable state) +# ────────────────────────────────────────────────────────────────────────────── + + +@dataclass +class RunContext: + preprocess_stage: ThreadedPipelineStage + stages: List[ThreadedPipelineStage] + output_queue: ThreadedQueue + + +# ────────────────────────────────────────────────────────────────────────────── +# Main pipeline +# ────────────────────────────────────────────────────────────────────────────── class ThreadedStandardPdfPipeline(BasePipeline): - """ - A threaded pipeline implementation that processes pages through - dedicated stage threads with batching and backpressure control. 
- """ + """Thread-safe PDF pipeline with per-run isolation and queue-closing protocol.""" - def __init__(self, pipeline_options: ThreadedPdfPipelineOptions): + # ---------------------------------------------------------------- ctor + def __init__(self, pipeline_options: ThreadedPdfPipelineOptions) -> None: super().__init__(pipeline_options) self.pipeline_options: ThreadedPdfPipelineOptions = pipeline_options - # Initialize attributes with proper type annotations + # Flags set by enrichment logic self.keep_backend: bool = False self.keep_images: bool = False - # Model attributes - will be initialized in _initialize_models + # Placeholders for heavy models self.preprocessing_model: PagePreprocessingModel - self.ocr_model: Any # OCR models have different base types from factory + self.ocr_model: Any self.layout_model: LayoutModel self.table_model: TableStructureModel self.assemble_model: PageAssembleModel self.reading_order_model: ReadingOrderModel self._initialize_models() - self._setup_pipeline() - # Use weak references for memory management + # Weak tracking for stage-internal look-ups self._document_tracker: weakref.WeakValueDictionary[int, ConversionResult] = ( weakref.WeakValueDictionary() ) self._document_lock = threading.Lock() + # ──────────────────────────────────────────────────────────────────────── + # Model helpers + # ──────────────────────────────────────────────────────────────────────── + def _get_artifacts_path(self) -> Optional[Path]: - """Get artifacts path from options or settings""" - artifacts_path = None - if self.pipeline_options.artifacts_path is not None: - artifacts_path = Path(self.pipeline_options.artifacts_path).expanduser() - elif settings.artifacts_path is not None: - artifacts_path = Path(settings.artifacts_path).expanduser() + if self.pipeline_options.artifacts_path: + path = Path(self.pipeline_options.artifacts_path).expanduser() + elif settings.artifacts_path: + path = Path(settings.artifacts_path).expanduser() + else: + path = None - if artifacts_path is not None and not artifacts_path.is_dir(): + if path is not None and not path.is_dir(): raise RuntimeError( - f"The value of {artifacts_path=} is not valid. " - "When defined, it must point to a folder containing all models required by the pipeline." + f"{path=} is not a directory containing the required models." 
) - return artifacts_path + return path - def _get_ocr_model(self, artifacts_path: Optional[Path] = None): - """Get OCR model instance""" + def _get_ocr_model(self, artifacts_path: Optional[Path]) -> Any: factory = get_ocr_factory( allow_external_plugins=self.pipeline_options.allow_external_plugins ) @@ -361,8 +381,10 @@ class ThreadedStandardPdfPipeline(BasePipeline): accelerator_options=self.pipeline_options.accelerator_options, ) - def _get_picture_description_model(self, artifacts_path: Optional[Path] = None): - """Get picture description model instance""" + def _get_picture_description_model( + self, + artifacts_path: Optional[Path], + ) -> Optional[PictureDescriptionBaseModel]: factory = get_picture_description_factory( allow_external_plugins=self.pipeline_options.allow_external_plugins ) @@ -374,11 +396,13 @@ class ThreadedStandardPdfPipeline(BasePipeline): accelerator_options=self.pipeline_options.accelerator_options, ) - def _initialize_models(self): - """Initialize all pipeline models""" + # ──────────────────────────────────────────────────────────────────────── + # Heavy-model initialisation + # ──────────────────────────────────────────────────────────────────────── + + def _initialize_models(self) -> None: artifacts_path = self._get_artifacts_path() - # Check if we need to keep images for processing with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=DeprecationWarning) self.keep_images = ( @@ -389,35 +413,27 @@ class ThreadedStandardPdfPipeline(BasePipeline): self.preprocessing_model = PagePreprocessingModel( options=PagePreprocessingOptions( - images_scale=self.pipeline_options.images_scale, + images_scale=self.pipeline_options.images_scale ) ) - self.ocr_model = self._get_ocr_model(artifacts_path) - self.layout_model = LayoutModel( artifacts_path=artifacts_path, accelerator_options=self.pipeline_options.accelerator_options, options=self.pipeline_options.layout_options, ) - self.table_model = TableStructureModel( enabled=self.pipeline_options.do_table_structure, artifacts_path=artifacts_path, options=self.pipeline_options.table_structure_options, accelerator_options=self.pipeline_options.accelerator_options, ) - self.assemble_model = PageAssembleModel(options=PageAssembleOptions()) - - # Reading order and enrichment models self.reading_order_model = ReadingOrderModel(options=ReadingOrderOptions()) - # Initialize enrichment models and add only enabled ones to enrichment_pipe + # Optional enrichment self.enrichment_pipe = [] - - # Code Formula Enrichment Model - code_formula_model = CodeFormulaModel( + code_formula = CodeFormulaModel( enabled=self.pipeline_options.do_code_enrichment or self.pipeline_options.do_formula_enrichment, artifacts_path=artifacts_path, @@ -427,10 +443,9 @@ class ThreadedStandardPdfPipeline(BasePipeline): ), accelerator_options=self.pipeline_options.accelerator_options, ) - if code_formula_model.enabled: - self.enrichment_pipe.append(code_formula_model) + if code_formula.enabled: + self.enrichment_pipe.append(code_formula) - # Document Picture Classifier picture_classifier = DocumentPictureClassifier( enabled=self.pipeline_options.do_picture_classification, artifacts_path=artifacts_path, @@ -440,98 +455,91 @@ class ThreadedStandardPdfPipeline(BasePipeline): if picture_classifier.enabled: self.enrichment_pipe.append(picture_classifier) - # Picture description model - picture_description_model = self._get_picture_description_model(artifacts_path) - if picture_description_model is not None and picture_description_model.enabled: - 
self.enrichment_pipe.append(picture_description_model) + picture_descr = self._get_picture_description_model(artifacts_path) + if picture_descr and picture_descr.enabled: + self.enrichment_pipe.append(picture_descr) - # Determine if we need to keep backend for enrichment - if ( - self.pipeline_options.do_formula_enrichment - or self.pipeline_options.do_code_enrichment - or self.pipeline_options.do_picture_classification - or self.pipeline_options.do_picture_description - ): - self.keep_backend = True + self.keep_backend = any( + ( + self.pipeline_options.do_formula_enrichment, + self.pipeline_options.do_code_enrichment, + self.pipeline_options.do_picture_classification, + self.pipeline_options.do_picture_description, + ) + ) - def _setup_pipeline(self): - """Setup the pipeline stages and connections with proper typing""" - # Use pipeline options directly - they have proper defaults + # ──────────────────────────────────────────────────────────────────────── + # Run context creation + # ──────────────────────────────────────────────────────────────────────── + + def _create_run(self) -> RunContext: opts = self.pipeline_options - # Create pipeline stages - self.preprocess_stage = ThreadedPipelineStage( + preprocess = ThreadedPipelineStage( "preprocess", self.preprocessing_model, - 1, - opts.batch_timeout_seconds, - opts.queue_max_size, + batch_size=1, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, ) - self.ocr_stage = ThreadedPipelineStage( + ocr = ThreadedPipelineStage( "ocr", self.ocr_model, - opts.ocr_batch_size, - opts.batch_timeout_seconds, - opts.queue_max_size, + batch_size=opts.ocr_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, ) - self.layout_stage = ThreadedPipelineStage( + layout = ThreadedPipelineStage( "layout", self.layout_model, - opts.layout_batch_size, - opts.batch_timeout_seconds, - opts.queue_max_size, + batch_size=opts.layout_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, ) - self.table_stage = ThreadedPipelineStage( + table = ThreadedPipelineStage( "table", self.table_model, - opts.table_batch_size, - opts.batch_timeout_seconds, - opts.queue_max_size, + batch_size=opts.table_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, ) - self.assemble_stage = ThreadedPipelineStage( + assemble = ThreadedPipelineStage( "assemble", self.assemble_model, - 1, - opts.batch_timeout_seconds, - opts.queue_max_size, + batch_size=1, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, ) - # Create output queue for final results - self.output_queue = ThreadedQueue(max_size=opts.queue_max_size) + # Wiring + output_queue = ThreadedQueue(max_size=opts.queue_max_size) + preprocess.add_output_queue(ocr.input_queue) + ocr.add_output_queue(layout.input_queue) + layout.add_output_queue(table.input_queue) + table.add_output_queue(assemble.input_queue) + assemble.add_output_queue(output_queue) - # Connect stages in pipeline order - self.preprocess_stage.add_output_queue(self.ocr_stage.input_queue) - self.ocr_stage.add_output_queue(self.layout_stage.input_queue) - self.layout_stage.add_output_queue(self.table_stage.input_queue) - self.table_stage.add_output_queue(self.assemble_stage.input_queue) - self.assemble_stage.add_output_queue(self.output_queue) + stages = [preprocess, ocr, layout, table, assemble] + return RunContext( + preprocess_stage=preprocess, stages=stages, output_queue=output_queue + ) - 
self.stages = [ - self.preprocess_stage, - self.ocr_stage, - self.layout_stage, - self.table_stage, - self.assemble_stage, - ] + # ──────────────────────────────────────────────────────────────────────── + # Document build + # ──────────────────────────────────────────────────────────────────────── def _build_document(self, conv_res: ConversionResult) -> ConversionResult: - """Build document by processing pages through threaded pipeline""" if not isinstance(conv_res.input._backend, PdfDocumentBackend): - raise RuntimeError( - f"The selected backend {type(conv_res.input._backend).__name__} for {conv_res.input.file} is not a PDF backend." - ) + raise RuntimeError("Input backend must be PdfDocumentBackend") with TimeRecorder(conv_res, "doc_build", scope=ProfilingScope.DOCUMENT): - # Initialize pages + # ---------------------------------------------------------------- pages start_page, end_page = conv_res.input.limits.page_range - pages_to_process = [] - + pages_to_process: List[Page] = [] for i in range(conv_res.input.page_count): - if (start_page - 1) <= i <= (end_page - 1): + if start_page - 1 <= i <= end_page - 1: page = Page(page_no=i) conv_res.pages.append(page) - - # Initialize page backend page._backend = conv_res.input._backend.load_page(i) if page._backend and page._backend.is_valid(): page.size = page._backend.get_size() @@ -541,297 +549,159 @@ class ThreadedStandardPdfPipeline(BasePipeline): conv_res.status = ConversionStatus.FAILURE return conv_res - # Register document for tracking with weak reference - doc_id = id(conv_res) - with self._document_lock: - self._document_tracker[doc_id] = conv_res + # ---------------------------------------------------------------- run ctx + ctx = self._create_run() - # Start pipeline stages - for stage in self.stages: + # Weak-map registration (for potential cross-stage look-ups) + with self._document_lock: + self._document_tracker[id(conv_res)] = conv_res + + for stage in ctx.stages: stage.start() try: - # Feed pages into pipeline - self._feed_pipeline(pages_to_process, conv_res) - - # Collect results from pipeline with partial processing support + self._feed_pipeline(ctx.preprocess_stage, pages_to_process, conv_res) result = self._collect_results_with_recovery( - conv_res, len(pages_to_process) + ctx, conv_res, len(pages_to_process) ) - - # Update conv_res with processed pages and handle partial results self._update_document_with_results(conv_res, result) - finally: - # Stop pipeline stages - for stage in self.stages: + for stage in ctx.stages: stage.stop() - - # Cleanup stage resources - for stage in self.stages: stage.cleanup() - - # Cleanup output queue - self.output_queue.cleanup() - - # Cleanup document tracking + ctx.output_queue.cleanup() with self._document_lock: - self._document_tracker.pop(doc_id, None) + self._document_tracker.pop(id(conv_res), None) return conv_res - def _feed_pipeline(self, pages: List[Page], conv_res: ConversionResult): - """Feed pages into the pipeline""" - for page in pages: - item = ThreadedItem( - payload=page, - conv_res_id=id(conv_res), - conv_res=conv_res, - page_no=page.page_no, - ) + # ──────────────────────────────────────────────────────────────────────── + # Feed pages + # ──────────────────────────────────────────────────────────────────────── - # Feed into first stage - block patiently to ensure no pages are dropped - if not self.preprocess_stage.input_queue.put(item, timeout=None): - _log.error( - f"Failed to feed page {page.page_no} - queue was closed unexpectedly" + def _feed_pipeline( + self, + 
preprocess_stage: ThreadedPipelineStage, + pages: Sequence[Page], + conv_res: ConversionResult, + ) -> None: + for pg in pages: + ok = preprocess_stage.input_queue.put( + ThreadedItem( + payload=pg, + conv_res_id=id(conv_res), + conv_res=conv_res, + page_no=pg.page_no, + ), + timeout=None, + ) + if not ok: + raise RuntimeError( + "Preprocess queue closed unexpectedly while feeding pages" ) + # Upstream finished: close queue (no sentinel needed) + preprocess_stage.input_queue.close() + + # ──────────────────────────────────────────────────────────────────────── + # Collect results + # ──────────────────────────────────────────────────────────────────────── + def _collect_results_with_recovery( - self, conv_res: ConversionResult, expected_count: int + self, + ctx: RunContext, + conv_res: ConversionResult, + expected_count: int, ) -> ProcessingResult: - """Collect processed pages from the pipeline with partial result support""" result = ProcessingResult(total_expected=expected_count) - doc_id = id(conv_res) - # Collect from output queue - block patiently to ensure all pages are collected - while len(result.pages) + len(result.failed_pages) < expected_count: - batch = self.output_queue.get_batch( - batch_size=expected_count - - len(result.pages) - - len(result.failed_pages), - timeout=None, # Block indefinitely to ensure no pages are lost - ) - - if not batch: - # Empty batch only happens when queue is closed - all stages must have finished - missing_count = ( - expected_count - len(result.pages) - len(result.failed_pages) - ) - if missing_count > 0: - _log.error( - f"Pipeline closed unexpectedly: missing {missing_count} pages" - ) + while True: + batch = ctx.output_queue.get_batch(batch_size=expected_count, timeout=None) + if not batch and ctx.output_queue.closed: break for item in batch: - if item.conv_res_id == doc_id: - if item.is_failed or item.error is not None: - result.failed_pages.append( - (item.page_no, item.error or Exception("Unknown error")) - ) - _log.warning( - f"Page {item.page_no} failed processing: {item.error}" - ) - else: - result.pages.append(item.payload) + if item.conv_res_id != id(conv_res): + # In per-run isolation this cannot happen + continue + if item.is_failed or item.error: + result.failed_pages.append( + (item.page_no, item.error or Exception("Unknown error")) + ) + else: + assert item.payload is not None + result.pages.append(item.payload) + + # Terminate when all pages accounted for + if (result.success_count + result.failure_count) >= expected_count: + break return result - def _update_document_with_results( - self, conv_res: ConversionResult, result: ProcessingResult - ): - """Update document with processing results and handle partial success""" - # Update conv_res with successfully processed pages - page_map = {p.page_no: p for p in result.pages} - valid_pages = [] + # ──────────────────────────────────────────────────────────────────────── + # Update ConversionResult + # ──────────────────────────────────────────────────────────────────────── + def _update_document_with_results( + self, conv_res: ConversionResult, proc: ProcessingResult + ) -> None: + page_map = {p.page_no: p for p in proc.pages} + new_pages: List[Page] = [] for page in conv_res.pages: if page.page_no in page_map: - valid_pages.append(page_map[page.page_no]) - elif not any( - failed_page_no == page.page_no - for failed_page_no, _ in result.failed_pages - ): - # Page wasn't processed but also didn't explicitly fail - keep original - valid_pages.append(page) + 
new_pages.append(page_map[page.page_no]) + elif not any(fp_no == page.page_no for fp_no, _ in proc.failed_pages): + new_pages.append(page) + conv_res.pages = new_pages - conv_res.pages = valid_pages - - # Handle partial results - if result.is_partial_success: - _log.warning( - f"Partial processing success: {result.success_count} pages succeeded, " - f"{result.failure_count} pages failed" - ) + if proc.is_partial_success: conv_res.status = ConversionStatus.PARTIAL_SUCCESS - elif result.is_complete_failure: - _log.error("Complete processing failure: all pages failed") + elif proc.is_complete_failure: conv_res.status = ConversionStatus.FAILURE - elif result.success_count > 0: - # All expected pages processed successfully + else: conv_res.status = ConversionStatus.SUCCESS - # Clean up page resources if not keeping images if not self.keep_images: - for page in conv_res.pages: - # _image_cache is always present on Page objects, no need for hasattr - page._image_cache = {} - - # Clean up page backends if not keeping them + for p in conv_res.pages: + p._image_cache = {} if not self.keep_backend: - for page in conv_res.pages: - if page._backend is not None: - page._backend.unload() + for p in conv_res.pages: + if p._backend is not None: + p._backend.unload() + + # ──────────────────────────────────────────────────────────────────────── + # Assemble / enrich + # ──────────────────────────────────────────────────────────────────────── def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult: - """Assemble the final document from processed pages""" - all_elements = [] - all_headers = [] - all_body = [] - + all_elements, all_headers, all_body = [], [], [] with TimeRecorder(conv_res, "doc_assemble", scope=ProfilingScope.DOCUMENT): for p in conv_res.pages: - if p.assembled is not None: - for el in p.assembled.body: - all_body.append(el) - for el in p.assembled.headers: - all_headers.append(el) - for el in p.assembled.elements: - all_elements.append(el) + if p.assembled: + all_elements.extend(p.assembled.elements) + all_headers.extend(p.assembled.headers) + all_body.extend(p.assembled.body) conv_res.assembled = AssembledUnit( elements=all_elements, headers=all_headers, body=all_body ) - conv_res.document = self.reading_order_model(conv_res) - - # Generate page images - if self.pipeline_options.generate_page_images: - for page in conv_res.pages: - if page.image is not None: - page_no = page.page_no + 1 - conv_res.document.pages[page_no].image = ImageRef.from_pil( - page.image, dpi=int(72 * self.pipeline_options.images_scale) - ) - - # Generate element images - self._generate_element_images(conv_res) - - # Aggregate confidence scores - self._aggregate_confidence(conv_res) - return conv_res - def _generate_element_images(self, conv_res: ConversionResult): - """Generate images for picture and table elements""" - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=DeprecationWarning) - if ( - self.pipeline_options.generate_picture_images - or self.pipeline_options.generate_table_images - ): - scale = self.pipeline_options.images_scale - for element, _level in conv_res.document.iterate_items(): - if not isinstance(element, DocItem) or len(element.prov) == 0: - continue - if ( - isinstance(element, PictureItem) - and self.pipeline_options.generate_picture_images - ) or ( - isinstance(element, TableItem) - and self.pipeline_options.generate_table_images - ): - page_ix = element.prov[0].page_no - 1 - page = next( - (p for p in conv_res.pages if p.page_no == page_ix), 
None - ) - if ( - page is not None - and page.size is not None - and page.image is not None - ): - crop_bbox = ( - element.prov[0] - .bbox.scaled(scale=scale) - .to_top_left_origin( - page_height=page.size.height * scale - ) - ) - cropped_im = page.image.crop(crop_bbox.as_tuple()) - element.image = ImageRef.from_pil( - cropped_im, dpi=int(72 * scale) - ) - - def _aggregate_confidence(self, conv_res: ConversionResult): - """Aggregate confidence scores across pages""" - if len(conv_res.pages) > 0: - import warnings - - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - category=RuntimeWarning, - message="Mean of empty slice|All-NaN slice encountered", - ) - conv_res.confidence.layout_score = float( - np.nanmean( - [c.layout_score for c in conv_res.confidence.pages.values()] - ) - ) - conv_res.confidence.parse_score = float( - np.nanquantile( - [c.parse_score for c in conv_res.confidence.pages.values()], - q=0.1, - ) - ) - conv_res.confidence.table_score = float( - np.nanmean( - [c.table_score for c in conv_res.confidence.pages.values()] - ) - ) - conv_res.confidence.ocr_score = float( - np.nanmean( - [c.ocr_score for c in conv_res.confidence.pages.values()] - ) - ) - - def _enrich_document(self, conv_res: ConversionResult) -> ConversionResult: - """Run enrichment models on the document""" - - def _prepare_elements(conv_res: ConversionResult, model: Any) -> Iterable[Any]: - for doc_element, _level in conv_res.document.iterate_items(): - prepared_element = model.prepare_element( - conv_res=conv_res, element=doc_element - ) - if prepared_element is not None: - yield prepared_element - - with TimeRecorder(conv_res, "doc_enrich", scope=ProfilingScope.DOCUMENT): - for model in self.enrichment_pipe: - for element_batch in chunkify( - _prepare_elements(conv_res, model), - model.elements_batch_size, - ): - for element in model( - doc=conv_res.document, element_batch=element_batch - ): # Must exhaust! 
- pass - - return conv_res - - def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: - """Determine the final conversion status""" - if conv_res.status == ConversionStatus.PARTIAL_SUCCESS: - return ConversionStatus.PARTIAL_SUCCESS - elif conv_res.pages and conv_res.document: - return ConversionStatus.SUCCESS - else: - return ConversionStatus.FAILURE + # ──────────────────────────────────────────────────────────────────────── + # Base overrides + # ──────────────────────────────────────────────────────────────────────── @classmethod def get_default_options(cls) -> ThreadedPdfPipelineOptions: - return ThreadedPdfPipelineOptions() + return ThreadedPdfPipelineOptions() # type: ignore[call-arg] @classmethod - def is_backend_supported(cls, backend): + def is_backend_supported(cls, backend) -> bool: # type: ignore[override] return isinstance(backend, PdfDocumentBackend) + + def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: + return conv_res.status + + def _unload(self, conv_res: ConversionResult) -> None: + return diff --git a/tests/test_threaded_pipeline.py b/tests/test_threaded_pipeline.py index 469aec1e..9635f50c 100644 --- a/tests/test_threaded_pipeline.py +++ b/tests/test_threaded_pipeline.py @@ -1,56 +1,104 @@ +import logging import time from pathlib import Path from typing import List import pytest -from docling.document_converter import DocumentConverter, PdfFormatOption -from docling.datamodel.base_models import InputFormat, ConversionStatus +from docling.datamodel.base_models import ConversionStatus, InputFormat from docling.datamodel.document import ConversionResult from docling.datamodel.pipeline_options import ( PdfPipelineOptions, - ThreadedPdfPipelineOptions + ThreadedPdfPipelineOptions, ) +from docling.document_converter import DocumentConverter, PdfFormatOption from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline + def test_threaded_pipeline_multiple_documents(): - """Test threaded pipeline with multiple documents""" - converter = DocumentConverter( + """Test threaded pipeline with multiple documents and compare with standard pipeline""" + test_files = [ + "tests/data/pdf/2203.01017v2.pdf", + "tests/data/pdf/2206.01062.pdf", + "tests/data/pdf/2305.03393v1.pdf" + ] + + # Standard pipeline + standard_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=StandardPdfPipeline, + pipeline_options=PdfPipelineOptions( + do_table_structure=True, + do_ocr=True, + ), + ) + } + ) + + # Threaded pipeline + threaded_converter = DocumentConverter( format_options={ InputFormat.PDF: PdfFormatOption( pipeline_cls=ThreadedStandardPdfPipeline, pipeline_options=ThreadedPdfPipelineOptions( - layout_batch_size=48, - ocr_batch_size=24, + layout_batch_size=1, + table_batch_size=1, + ocr_batch_size=1, batch_timeout_seconds=1.0, - ) + do_table_structure=True, + do_ocr=True, + ), ) } ) - - # Test threaded pipeline with multiple documents - results = [] + + # Test standard pipeline + standard_results = [] start_time = time.perf_counter() - for result in converter.convert_all([ - "tests/data/pdf/2206.01062.pdf", - "tests/data/pdf/2305.03393v1.pdf" - ]): - results.append(result) - end_time = time.perf_counter() - - conversion_duration = end_time - start_time - print(f"Threaded multi-doc conversion took {conversion_duration:.2f} seconds") - - assert len(results) == 2 - for result in results: + for result in 
standard_converter.convert_all(test_files, raises_on_error=True): + print("Finished converting document with standard pipeline:", result.input.file.name) + standard_results.append(result) + standard_time = time.perf_counter() - start_time + + del standard_converter + + # Test threaded pipeline + threaded_results = [] + start_time = time.perf_counter() + for result in threaded_converter.convert_all(test_files, raises_on_error=True): + print("Finished converting document with threaded pipeline:", result.input.file.name) + threaded_results.append(result) + threaded_time = time.perf_counter() - start_time + + del threaded_converter + + print("\nMulti-document Pipeline Comparison:") + print(f"Standard pipeline: {standard_time:.2f} seconds") + print(f"Threaded pipeline: {threaded_time:.2f} seconds") + print(f"Speedup: {standard_time / threaded_time:.2f}x") + + # Verify results + assert len(standard_results) == len(threaded_results) + for result in standard_results: assert result.status == ConversionStatus.SUCCESS + for result in threaded_results: + assert result.status == ConversionStatus.SUCCESS + + # Basic content comparison + for i, (standard_result, threaded_result) in enumerate(zip(standard_results, threaded_results)): + standard_doc = standard_result.document + threaded_doc = threaded_result.document + + assert len(standard_doc.pages) == len(threaded_doc.pages), f"Document {i} page count mismatch" + assert len(standard_doc.texts) == len(threaded_doc.texts), f"Document {i} text count mismatch" def test_pipeline_comparison(): """Compare all three pipeline implementations""" test_file = "tests/data/pdf/2206.01062.pdf" - + # Sync pipeline sync_converter = DocumentConverter( format_options={ @@ -59,11 +107,11 @@ def test_pipeline_comparison(): ) } ) - + start_time = time.perf_counter() sync_results = list(sync_converter.convert_all([test_file])) sync_time = time.perf_counter() - start_time - + # Threaded pipeline threaded_converter = DocumentConverter( format_options={ @@ -73,35 +121,34 @@ def test_pipeline_comparison(): layout_batch_size=1, ocr_batch_size=1, table_batch_size=1, - ) + ), ) } ) - + start_time = time.perf_counter() threaded_results = list(threaded_converter.convert_all([test_file])) threaded_time = time.perf_counter() - start_time - - print(f"\nPipeline Comparison:") + + print("\nPipeline Comparison:") print(f"Sync pipeline: {sync_time:.2f} seconds") print(f"Threaded pipeline: {threaded_time:.2f} seconds") - print(f"Speedup: {sync_time/threaded_time:.2f}x") - + print(f"Speedup: {sync_time / threaded_time:.2f}x") + # Verify results are equivalent assert len(sync_results) == len(threaded_results) == 1 - assert sync_results[0].status == threaded_results[0].status == ConversionStatus.SUCCESS - + assert ( + sync_results[0].status == threaded_results[0].status == ConversionStatus.SUCCESS + ) + # Basic content comparison sync_doc = sync_results[0].document threaded_doc = threaded_results[0].document - + assert len(sync_doc.pages) == len(threaded_doc.pages) assert len(sync_doc.texts) == len(threaded_doc.texts) - - - if __name__ == "__main__": # Run basic performance test - test_pipeline_comparison() \ No newline at end of file + test_pipeline_comparison()
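For reference, a minimal usage sketch of the threaded pipeline, mirroring the tests above; the batch sizes and the sample PDF path are illustrative placeholders rather than recommended values:

from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline

# Route PDFs through the threaded pipeline. Every inter-stage queue is bounded
# by queue_max_size, which provides the back-pressure described in the module
# docstring; the values below are examples only.
converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_cls=ThreadedStandardPdfPipeline,
            pipeline_options=ThreadedPdfPipelineOptions(
                layout_batch_size=4,   # illustrative batch sizes
                table_batch_size=4,
                ocr_batch_size=4,
                batch_timeout_seconds=1.0,
            ),
        )
    }
)

# Each conversion builds its own RunContext (fresh queues and worker threads),
# so concurrent convert_all() calls never share mutable pipeline state.
for result in converter.convert_all(["tests/data/pdf/2206.01062.pdf"]):
    print(result.input.file.name, result.status)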