diff --git a/docling/pipeline/standard_pdf_pipeline.py b/docling/pipeline/standard_pdf_pipeline.py
index 6c662c0e..2eaaadca 100644
--- a/docling/pipeline/standard_pdf_pipeline.py
+++ b/docling/pipeline/standard_pdf_pipeline.py
@@ -179,6 +179,7 @@ class ThreadedPipelineStage:
         batch_timeout: float,
         queue_max_size: int,
         postprocess: Optional[Callable[[ThreadedItem], None]] = None,
+        timed_out_run_ids: Optional[set[int]] = None,
     ) -> None:
         self.name = name
         self.model = model
@@ -189,6 +190,9 @@ class ThreadedPipelineStage:
         self._thread: Optional[threading.Thread] = None
         self._running = False
         self._postprocess = postprocess
+        self._timed_out_run_ids = (
+            timed_out_run_ids if timed_out_run_ids is not None else set()
+        )
 
     # ---------------------------------------------------------------- wiring
     def add_output_queue(self, q: ThreadedQueue) -> None:
@@ -200,7 +204,7 @@ class ThreadedPipelineStage:
             return
         self._running = True
         self._thread = threading.Thread(
-            target=self._run, name=f"Stage-{self.name}", daemon=True
+            target=self._run, name=f"Stage-{self.name}", daemon=False
         )
         self._thread.start()
 
@@ -210,9 +214,14 @@ class ThreadedPipelineStage:
         self._running = False
         self.input_queue.close()
         if self._thread is not None:
-            self._thread.join(timeout=30.0)
+            # Give the thread 15s to finish naturally before abandoning it
+            self._thread.join(timeout=15.0)
             if self._thread.is_alive():
-                _log.warning("Stage %s did not terminate cleanly within 30s", self.name)
+                _log.warning(
+                    "Stage %s thread did not terminate within 15s. "
+                    "Thread is likely stuck in a blocking call and will be abandoned (resources may leak).",
+                    self.name,
+                )
 
     # ------------------------------------------------------------------ _run
     def _run(self) -> None:
@@ -238,6 +247,16 @@
         result: list[ThreadedItem] = []
 
         for rid, items in groups.items():
+            # If run_id is timed out, skip processing but pass through items as-is.
+            # This allows already-completed work to flow through while aborting new work.
+            if rid in self._timed_out_run_ids:
+                for it in items:
+                    it.is_failed = True
+                    if it.error is None:
+                        it.error = RuntimeError("document timeout exceeded")
+                result.extend(items)
+                continue
+
             good: list[ThreadedItem] = [i for i in items if not i.is_failed]
             if not good:
                 result.extend(items)
@@ -299,6 +318,7 @@ class PreprocessThreadedStage(ThreadedPipelineStage):
         batch_timeout: float,
         queue_max_size: int,
         model: Any,
+        timed_out_run_ids: Optional[set[int]] = None,
     ) -> None:
         super().__init__(
             name="preprocess",
@@ -306,6 +326,7 @@
             batch_size=1,
             batch_timeout=batch_timeout,
             queue_max_size=queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
 
     def _process_batch(self, batch: Sequence[ThreadedItem]) -> list[ThreadedItem]:
@@ -315,6 +336,16 @@
         result: list[ThreadedItem] = []
 
         for rid, items in groups.items():
+            # If run_id is timed out, skip processing but pass through items as-is.
+            # This allows already-completed work to flow through while aborting new work.
+            if rid in self._timed_out_run_ids:
+                for it in items:
+                    it.is_failed = True
+                    if it.error is None:
+                        it.error = RuntimeError("document timeout exceeded")
+                result.extend(items)
+                continue
+
             good = [i for i in items if not i.is_failed]
             if not good:
                 result.extend(items)
@@ -369,6 +400,7 @@ class RunContext:
     stages: list[ThreadedPipelineStage]
     first_stage: ThreadedPipelineStage
     output_queue: ThreadedQueue
+    timed_out_run_ids: set[int] = field(default_factory=set)
 
 
 # ──────────────────────────────────────────────────────────────────────────────
@@ -473,10 +505,12 @@ class StandardPdfPipeline(ConvertPipeline):
 
     def _create_run_ctx(self) -> RunContext:
         opts = self.pipeline_options
+        timed_out_run_ids: set[int] = set()
         preprocess = PreprocessThreadedStage(
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
             model=self.preprocessing_model,
+            timed_out_run_ids=timed_out_run_ids,
         )
         ocr = ThreadedPipelineStage(
             name="ocr",
@@ -484,6 +518,7 @@
             batch_size=opts.ocr_batch_size,
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
         layout = ThreadedPipelineStage(
             name="layout",
@@ -491,6 +526,7 @@
             batch_size=opts.layout_batch_size,
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
         table = ThreadedPipelineStage(
             name="table",
@@ -498,6 +534,7 @@
             batch_size=opts.table_batch_size,
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
         assemble = ThreadedPipelineStage(
             name="assemble",
@@ -506,6 +543,7 @@
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
             postprocess=self._release_page_resources,
+            timed_out_run_ids=timed_out_run_ids,
         )
 
         # wire stages
@@ -517,11 +555,22 @@
         assemble.add_output_queue(output_q)
 
         stages = [preprocess, ocr, layout, table, assemble]
-        return RunContext(stages=stages, first_stage=preprocess, output_queue=output_q)
+        return RunContext(
+            stages=stages,
+            first_stage=preprocess,
+            output_queue=output_q,
+            timed_out_run_ids=timed_out_run_ids,
+        )
 
     # --------------------------------------------------------------------- build
     def _build_document(self, conv_res: ConversionResult) -> ConversionResult:
-        """Stream-build the document while interleaving producer and consumer work."""
+        """Stream-build the document while interleaving producer and consumer work.
+
+        Note: If a worker thread gets stuck in a blocking call (model inference or
+        PDF backend load_page/get_size), that thread will be abandoned after a brief
+        wait (15s) during cleanup. The thread continues running until the blocking
+        call completes, potentially holding resources (e.g., pypdfium2_lock).
+        """
         run_id = next(self._run_seq)
         assert isinstance(conv_res.input._backend, PdfDocumentBackend)
 
@@ -546,26 +595,50 @@
         proc = ProcessingResult(total_expected=total_pages)
         fed_idx: int = 0  # number of pages successfully queued
         batch_size: int = 32  # drain chunk
+        start_time = time.monotonic()
+        timeout_exceeded = False
+        input_queue_closed = False
 
         try:
             while proc.success_count + proc.failure_count < total_pages:
-                # 1) feed - try to enqueue until the first queue is full
-                while fed_idx < total_pages:
-                    ok = ctx.first_stage.input_queue.put(
-                        ThreadedItem(
-                            payload=pages[fed_idx],
-                            run_id=run_id,
-                            page_no=pages[fed_idx].page_no,
-                            conv_res=conv_res,
-                        ),
-                        timeout=0.0,  # non-blocking try-put
-                    )
-                    if ok:
-                        fed_idx += 1
-                        if fed_idx == total_pages:
+                # Check timeout
+                if (
+                    self.pipeline_options.document_timeout is not None
+                    and not timeout_exceeded
+                ):
+                    elapsed_time = time.monotonic() - start_time
+                    if elapsed_time > self.pipeline_options.document_timeout:
+                        _log.warning(
+                            f"Document processing time ({elapsed_time:.3f}s) "
+                            f"exceeded timeout of {self.pipeline_options.document_timeout:.3f}s"
+                        )
+                        timeout_exceeded = True
+                        ctx.timed_out_run_ids.add(run_id)
+                        if not input_queue_closed:
                             ctx.first_stage.input_queue.close()
-                    else:  # queue full - switch to draining
+                            input_queue_closed = True
+                        # Break immediately - don't wait for in-flight work
                         break
 
+                # 1) feed - try to enqueue until the first queue is full
+                if not input_queue_closed:
+                    while fed_idx < total_pages:
+                        ok = ctx.first_stage.input_queue.put(
+                            ThreadedItem(
+                                payload=pages[fed_idx],
+                                run_id=run_id,
+                                page_no=pages[fed_idx].page_no,
+                                conv_res=conv_res,
+                            ),
+                            timeout=0.0,  # non-blocking try-put
+                        )
+                        if ok:
+                            fed_idx += 1
+                            if fed_idx == total_pages:
+                                ctx.first_stage.input_queue.close()
+                                input_queue_closed = True
+                        else:  # queue full - switch to draining
+                            break
+
                 # 2) drain - pull whatever is ready from the output side
                 out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05)
                 for itm in out_batch:
@@ -579,7 +652,7 @@
                     assert itm.payload is not None
                     proc.pages.append(itm.payload)
 
-                # 3) failure safety - downstream closed early -> mark missing pages failed
+                # 3) failure safety - downstream closed early
                 if not out_batch and ctx.output_queue.closed:
                     missing = total_pages - (proc.success_count + proc.failure_count)
                     if missing > 0:
                         proc.failed_pages.extend(
@@ -587,24 +660,36 @@
                             [(-1, RuntimeError("pipeline terminated early"))] * missing
                         )
                         break
+
+            # Mark all pages that never completed as failed if timeout occurred
+            if timeout_exceeded:
+                completed_page_nos = {p.page_no for p in proc.pages} | {
+                    fp for fp, _ in proc.failed_pages
+                }
+                for page in pages:
+                    if page.page_no not in completed_page_nos:
+                        proc.failed_pages.append(
+                            (page.page_no, RuntimeError("document timeout exceeded"))
+                        )
         finally:
             for st in ctx.stages:
                 st.stop()
             ctx.output_queue.close()
 
-        self._integrate_results(conv_res, proc)
+        self._integrate_results(conv_res, proc, timeout_exceeded=timeout_exceeded)
         return conv_res
 
     # ---------------------------------------------------- integrate_results()
     def _integrate_results(
-        self, conv_res: ConversionResult, proc: ProcessingResult
+        self,
+        conv_res: ConversionResult,
+        proc: ProcessingResult,
+        timeout_exceeded: bool = False,
    ) -> None:
         page_map = {p.page_no: p for p in proc.pages}
+        # Only keep pages that successfully completed processing
         conv_res.pages = [
-            page_map.get(p.page_no, p)
-            for p in conv_res.pages
-            if p.page_no in page_map
-            or not any(fp == p.page_no for fp, _ in proc.failed_pages)
+            page_map[p.page_no] for p in conv_res.pages if p.page_no in page_map
         ]
         # Add error details from failed pages
         for page_no, error in proc.failed_pages:
@@ -616,7 +701,10 @@
                 error_message=f"{page_label}: {error_msg}" if error_msg else page_label,
             )
             conv_res.errors.append(error_item)
-        if proc.is_complete_failure:
+        if timeout_exceeded and proc.total_expected > 0:
+            # Timeout exceeded: report PARTIAL_SUCCESS, since some pages may have completed
+            conv_res.status = ConversionStatus.PARTIAL_SUCCESS
+        elif proc.is_complete_failure:
             conv_res.status = ConversionStatus.FAILURE
         elif proc.is_partial_success:
             conv_res.status = ConversionStatus.PARTIAL_SUCCESS
diff --git a/tests/test_options.py b/tests/test_options.py
index a0835d1c..1e06378f 100644
--- a/tests/test_options.py
+++ b/tests/test_options.py
@@ -15,6 +15,7 @@ from docling.datamodel.pipeline_options import (
     TableFormerMode,
 )
 from docling.document_converter import DocumentConverter, PdfFormatOption
+from docling.pipeline.legacy_standard_pdf_pipeline import LegacyStandardPdfPipeline
 
 
 @pytest.fixture
@@ -118,6 +119,33 @@ def test_page_range(test_doc_path):
     assert doc_result.status == ConversionStatus.FAILURE
 
 
+def test_document_timeout(test_doc_path):
+    converter = DocumentConverter(
+        format_options={
+            InputFormat.PDF: PdfFormatOption(
+                pipeline_options=PdfPipelineOptions(document_timeout=1)
+            )
+        }
+    )
+    result = converter.convert(test_doc_path)
+    assert result.status == ConversionStatus.PARTIAL_SUCCESS, (
+        "Expected PARTIAL_SUCCESS when document_timeout is exceeded"
+    )
+
+    converter = DocumentConverter(
+        format_options={
+            InputFormat.PDF: PdfFormatOption(
+                pipeline_options=PdfPipelineOptions(document_timeout=1),
+                pipeline_cls=LegacyStandardPdfPipeline,
+            )
+        }
+    )
+    result = converter.convert(test_doc_path)
+    assert result.status == ConversionStatus.PARTIAL_SUCCESS, (
+        "Expected PARTIAL_SUCCESS when document_timeout is exceeded"
+    )
+
+
 def test_ocr_coverage_threshold(test_doc_path):
     pipeline_options = PdfPipelineOptions()
     pipeline_options.do_ocr = True
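Reviewer note on the cancellation pattern: the timeout is cooperative. `_build_document` adds the timed-out `run_id` to a plain `set[int]` that is shared with every stage, and each stage's `_process_batch` tests membership before spending model time, marking skipped items failed so they still flow to the output queue. Below is a minimal standalone sketch of that pattern; the `Item` and `Stage` classes are illustrative stand-ins, not docling APIs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Item:
    """Stand-in for ThreadedItem: carries a run id and failure state."""

    run_id: int
    is_failed: bool = False
    error: Optional[BaseException] = None


class Stage:
    """Stand-in for ThreadedPipelineStage's timed-out-run check."""

    def __init__(self, timed_out_run_ids: set[int]) -> None:
        # Shared (not copied) with the coordinator, so run ids added
        # after construction are visible to every stage.
        self._timed_out_run_ids = timed_out_run_ids

    def process_batch(self, batch: list[Item]) -> list[Item]:
        out: list[Item] = []
        for item in batch:
            if item.run_id in self._timed_out_run_ids:
                # Skip real work but pass the item through as failed,
                # mirroring the pass-through in _process_batch above.
                item.is_failed = True
                if item.error is None:
                    item.error = RuntimeError("document timeout exceeded")
                out.append(item)
                continue
            out.append(self._do_work(item))
        return out

    def _do_work(self, item: Item) -> Item:
        return item  # real stages run OCR/layout/table models here


timed_out: set[int] = set()
stage = Stage(timed_out)
timed_out.add(7)  # coordinator observed a timeout for run 7
print([i.is_failed for i in stage.process_batch([Item(7), Item(8)])])
# -> [True, False]
```

A bare set with no lock looks adequate here: stages only test membership and the coordinator only adds entries, so the worst case is a stage reading a stale snapshot and processing one extra batch before it sees the flag.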
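For trying the new behavior end to end, here is a minimal usage sketch based on the test above. The import path for `ConversionStatus`/`InputFormat` is assumed to be `docling.datamodel.base_models`, the PDF path is a placeholder, and the 1-second timeout simply forces a partial result on any non-trivial document.

```python
from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption

converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_options=PdfPipelineOptions(document_timeout=1)
        )
    }
)

result = converter.convert("some/multi_page.pdf")  # placeholder path

# A timed-out run now finishes as PARTIAL_SUCCESS instead of hanging;
# pages that never completed are reported as per-page errors.
assert result.status == ConversionStatus.PARTIAL_SUCCESS
for err in result.errors:
    print(err.error_message)  # e.g. "...: document timeout exceeded"
```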