diff --git a/docling/datamodel/pipeline_options.py b/docling/datamodel/pipeline_options.py
index 6359b40d..c33a4efa 100644
--- a/docling/datamodel/pipeline_options.py
+++ b/docling/datamodel/pipeline_options.py
@@ -347,11 +347,9 @@ class ThreadedPdfPipelineOptions(PdfPipelineOptions):
 
     # Backpressure and queue control
     queue_max_size: int = 100
-    max_workers: Optional[int] = None  # None uses ThreadPoolExecutor default
 
-    # Pipeline coordination
-    stage_timeout_seconds: float = 10.0  # Timeout for feeding items to stages
-    collection_timeout_seconds: float = 5.0  # Timeout for collecting results
+    # Pipeline coordination - batch_timeout_seconds is the only safe timeout
+    # stage_timeout_seconds and collection_timeout_seconds removed to prevent data loss
 
     @classmethod
     def from_sync_options(
diff --git a/docling/models/layout_model.py b/docling/models/layout_model.py
index 66305789..2ed30c5d 100644
--- a/docling/models/layout_model.py
+++ b/docling/models/layout_model.py
@@ -3,7 +3,7 @@ import logging
 import warnings
 from collections.abc import Iterable
 from pathlib import Path
-from typing import Optional
+from typing import List, Optional, Union
 
 import numpy as np
 from docling_core.types.doc import DocItemLabel
@@ -153,7 +153,7 @@ class LayoutModel(BasePageModel):
 
         # Separate valid and invalid pages
         valid_pages = []
-        valid_page_images = []
+        valid_page_images: List[Union[Image.Image, np.ndarray]] = []
 
         for page in pages:
            assert page._backend is not None
diff --git a/docling/pipeline/threaded_standard_pdf_pipeline.py b/docling/pipeline/threaded_standard_pdf_pipeline.py
index 59538074..6ef4e90b 100644
--- a/docling/pipeline/threaded_standard_pdf_pipeline.py
+++ b/docling/pipeline/threaded_standard_pdf_pipeline.py
@@ -590,11 +590,11 @@ class ThreadedStandardPdfPipeline(BasePipeline):
                 page_no=page.page_no,
             )
 
-            # Feed into first stage with timeout
-            if not self.preprocess_stage.input_queue.put(
-                item, timeout=self.pipeline_options.stage_timeout_seconds
-            ):
-                _log.warning(f"Failed to feed page {page.page_no} due to backpressure")
+            # Feed into first stage - block patiently to ensure no pages are dropped
+            if not self.preprocess_stage.input_queue.put(item, timeout=None):
+                _log.error(
+                    f"Failed to feed page {page.page_no} - queue was closed unexpectedly"
+                )
 
     def _collect_results_with_recovery(
         self, conv_res: ConversionResult, expected_count: int
@@ -603,22 +603,24 @@
         result = ProcessingResult(total_expected=expected_count)
         doc_id = id(conv_res)
 
-        # Collect from output queue
+        # Collect from output queue - block patiently to ensure all pages are collected
         while len(result.pages) + len(result.failed_pages) < expected_count:
             batch = self.output_queue.get_batch(
                 batch_size=expected_count
                 - len(result.pages)
                 - len(result.failed_pages),
-                timeout=self.pipeline_options.collection_timeout_seconds,
+                timeout=None,  # Block indefinitely to ensure no pages are lost
             )
             if not batch:
-                # Timeout reached, log missing pages
+                # Empty batch only happens when queue is closed - all stages must have finished
                 missing_count = (
                     expected_count - len(result.pages) - len(result.failed_pages)
                 )
                 if missing_count > 0:
-                    _log.warning(f"Pipeline timeout: missing {missing_count} pages")
+                    _log.error(
+                        f"Pipeline closed unexpectedly: missing {missing_count} pages"
+                    )
                 break
 
             for item in batch: