Remove unused args

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Christoph Auer 2025-07-16 17:50:38 +02:00
parent 4397bb2c44
commit 04085ba86d
3 changed files with 15 additions and 15 deletions


@@ -347,11 +347,9 @@ class ThreadedPdfPipelineOptions(PdfPipelineOptions):
     # Backpressure and queue control
     queue_max_size: int = 100
     max_workers: Optional[int] = None  # None uses ThreadPoolExecutor default

-    # Pipeline coordination
-    stage_timeout_seconds: float = 10.0  # Timeout for feeding items to stages
-    collection_timeout_seconds: float = 5.0  # Timeout for collecting results
+    # Pipeline coordination - batch_timeout_seconds is the only safe timeout
+    # stage_timeout_seconds and collection_timeout_seconds removed to prevent data loss

     @classmethod
     def from_sync_options(
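
For background on why these two timeouts are considered unsafe: with a bounded queue, a put() or get() that gives up after a timeout loses work unless the caller adds retry logic, while a fully blocking call only ever slows the producer down. A standalone sketch using the standard-library queue module (not docling's own queue classes) illustrates the failure mode:

import queue
import threading

q = queue.Queue(maxsize=2)  # bounded queue, as in the threaded pipeline stages
q.put(1)
q.put(2)  # the queue is now full

# Timed put: if the consumer is slow, the item is simply gone unless the
# producer re-queues it - this is the data-loss path the removed options enabled.
try:
    q.put(3, timeout=0.1)
except queue.Full:
    print("item 3 dropped after timeout")

# Blocking put: the producer waits for a free slot, so nothing is dropped;
# the cost is backpressure instead of silent loss.
threading.Timer(0.2, q.get).start()  # a consumer frees one slot shortly
q.put(3)
print("item 3 enqueued after blocking")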


@@ -3,7 +3,7 @@ import logging
 import warnings
 from collections.abc import Iterable
 from pathlib import Path
-from typing import Optional
+from typing import List, Optional, Union

 import numpy as np
 from docling_core.types.doc import DocItemLabel
@@ -153,7 +153,7 @@ class LayoutModel(BasePageModel):
         # Separate valid and invalid pages
         valid_pages = []
-        valid_page_images = []
+        valid_page_images: List[Union[Image.Image, np.ndarray]] = []
         for page in pages:
             assert page._backend is not None


@@ -590,11 +590,11 @@ class ThreadedStandardPdfPipeline(BasePipeline):
                 page_no=page.page_no,
             )

-            # Feed into first stage with timeout
-            if not self.preprocess_stage.input_queue.put(
-                item, timeout=self.pipeline_options.stage_timeout_seconds
-            ):
-                _log.warning(f"Failed to feed page {page.page_no} due to backpressure")
+            # Feed into first stage - block patiently to ensure no pages are dropped
+            if not self.preprocess_stage.input_queue.put(item, timeout=None):
+                _log.error(
+                    f"Failed to feed page {page.page_no} - queue was closed unexpectedly"
+                )

     def _collect_results_with_recovery(
         self, conv_res: ConversionResult, expected_count: int
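
The rewritten feeder passes timeout=None and treats a False return from put() as "the queue was closed". The diff does not show docling's queue implementation, so the following sketch only illustrates the assumed contract: put() blocks while the queue is full and open, and returns False exclusively after close() has been called.

import threading
from collections import deque
from typing import Any, Optional


class ClosableBoundedQueue:
    """Illustrative stand-in for the stage input queue, not docling's actual class."""

    def __init__(self, max_size: int) -> None:
        self._items: deque = deque()
        self._max_size = max_size
        self._closed = False
        self._cond = threading.Condition()

    def put(self, item: Any, timeout: Optional[float] = None) -> bool:
        """Block until space is available or the queue is closed; False means closed."""
        with self._cond:
            while len(self._items) >= self._max_size and not self._closed:
                if not self._cond.wait(timeout):
                    return False  # only reachable with a finite timeout
            if self._closed:
                return False  # the single failure mode when timeout is None
            self._items.append(item)
            self._cond.notify_all()
            return True

    def close(self) -> None:
        with self._cond:
            self._closed = True
            self._cond.notify_all()

Under this contract a slow downstream stage merely delays the feeder, and the error branch in the diff fires only if the pipeline is torn down while pages are still being fed.
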
@@ -603,22 +603,24 @@ class ThreadedStandardPdfPipeline(BasePipeline):
         result = ProcessingResult(total_expected=expected_count)
         doc_id = id(conv_res)

-        # Collect from output queue
+        # Collect from output queue - block patiently to ensure all pages are collected
         while len(result.pages) + len(result.failed_pages) < expected_count:
             batch = self.output_queue.get_batch(
                 batch_size=expected_count
                 - len(result.pages)
                 - len(result.failed_pages),
-                timeout=self.pipeline_options.collection_timeout_seconds,
+                timeout=None,  # Block indefinitely to ensure no pages are lost
             )
             if not batch:
-                # Timeout reached, log missing pages
+                # Empty batch only happens when queue is closed - all stages must have finished
                 missing_count = (
                     expected_count - len(result.pages) - len(result.failed_pages)
                 )
                 if missing_count > 0:
-                    _log.warning(f"Pipeline timeout: missing {missing_count} pages")
+                    _log.error(
+                        f"Pipeline closed unexpectedly: missing {missing_count} pages"
+                    )
                 break

             for item in batch:
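
The collection loop likewise assumes that get_batch(..., timeout=None) blocks until at least one result is available and returns an empty list only once the queue has been closed and drained. Again, the real output-queue class is not part of this diff; a minimal sketch of that assumed contract, together with the collection pattern used above, might look like this:

import threading
from collections import deque
from typing import Any, List, Optional


class ClosableOutputQueue:
    """Illustrative stand-in for the pipeline's output queue, not docling's actual class."""

    def __init__(self) -> None:
        self._items: deque = deque()
        self._closed = False
        self._cond = threading.Condition()

    def put(self, item: Any) -> None:
        with self._cond:
            self._items.append(item)
            self._cond.notify_all()

    def close(self) -> None:
        with self._cond:
            self._closed = True
            self._cond.notify_all()

    def get_batch(self, batch_size: int, timeout: Optional[float] = None) -> List[Any]:
        """Block until at least one item arrives; [] signals a closed, drained queue."""
        with self._cond:
            while not self._items and not self._closed:
                if not self._cond.wait(timeout):
                    break  # finite timeout expired; unreachable when timeout is None
            batch: List[Any] = []
            while self._items and len(batch) < batch_size:
                batch.append(self._items.popleft())
            return batch


# Collection pattern matching the loop above: an empty batch can only mean
# the pipeline shut down before producing the expected number of pages.
def collect(q: ClosableOutputQueue, expected: int) -> List[Any]:
    collected: List[Any] = []
    while len(collected) < expected:
        batch = q.get_batch(expected - len(collected), timeout=None)
        if not batch:
            break  # queue closed early; report the missing pages upstream
        collected.extend(batch)
    return collected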