From e3aa8cd7709d8f5cc0a4c3b944a1b75975b72237 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Mon, 17 Nov 2025 09:23:28 +0000
Subject: [PATCH] feat: add document_timeout support to StandardPdfPipeline

- Add timeout tracking in _build_document method
- Check elapsed time against document_timeout in processing loop
- Set PARTIAL_SUCCESS status when timeout is exceeded
- Add test for document_timeout behavior

Co-authored-by: cau-git <60343111+cau-git@users.noreply.github.com>
---
 docling/pipeline/standard_pdf_pipeline.py | 20 +++++++++++++----
 tests/test_threaded_pipeline.py           | 26 +++++++++++++++++++++++
 2 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/docling/pipeline/standard_pdf_pipeline.py b/docling/pipeline/standard_pdf_pipeline.py
index 82bf012f..827aa63b 100644
--- a/docling/pipeline/standard_pdf_pipeline.py
+++ b/docling/pipeline/standard_pdf_pipeline.py
@@ -538,6 +538,8 @@ class StandardPdfPipeline(ConvertPipeline):
         proc = ProcessingResult(total_expected=total_pages)
         fed_idx: int = 0  # number of pages successfully queued
         batch_size: int = 32  # drain chunk
+        start_time: float = time.monotonic()
+        timeout_exceeded: bool = False
         try:
             while proc.success_count + proc.failure_count < total_pages:
                 # 1) feed - try to enqueue until the first queue is full
@@ -571,7 +573,17 @@
                         assert itm.payload is not None
                         proc.pages.append(itm.payload)
 
-                # 3) failure safety - downstream closed early -> mark missing pages failed
+                # 3) timeout check - respect document_timeout if configured
+                if self.pipeline_options.document_timeout is not None:
+                    elapsed_time = time.monotonic() - start_time
+                    if elapsed_time > self.pipeline_options.document_timeout:
+                        _log.warning(
+                            f"Document processing time ({elapsed_time:.3f} seconds) exceeded the specified timeout of {self.pipeline_options.document_timeout:.3f} seconds"
+                        )
+                        timeout_exceeded = True
+                        break
+
+                # 4) failure safety - downstream closed early -> mark missing pages failed
                 if not out_batch and ctx.output_queue.closed:
                     missing = total_pages - (proc.success_count + proc.failure_count)
                     if missing > 0:
@@ -584,12 +596,12 @@
                 st.stop()
             ctx.output_queue.close()
 
-        self._integrate_results(conv_res, proc)
+        self._integrate_results(conv_res, proc, timeout_exceeded=timeout_exceeded)
         return conv_res
 
     # ---------------------------------------------------- integrate_results()
     def _integrate_results(
-        self, conv_res: ConversionResult, proc: ProcessingResult
+        self, conv_res: ConversionResult, proc: ProcessingResult, timeout_exceeded: bool = False
     ) -> None:
         page_map = {p.page_no: p for p in proc.pages}
         conv_res.pages = [
@@ -600,7 +612,7 @@
         ]
         if proc.is_complete_failure:
             conv_res.status = ConversionStatus.FAILURE
-        elif proc.is_partial_success:
+        elif timeout_exceeded or proc.is_partial_success:
             conv_res.status = ConversionStatus.PARTIAL_SUCCESS
         else:
             conv_res.status = ConversionStatus.SUCCESS
diff --git a/tests/test_threaded_pipeline.py b/tests/test_threaded_pipeline.py
index c24716cd..987171b0 100644
--- a/tests/test_threaded_pipeline.py
+++ b/tests/test_threaded_pipeline.py
@@ -17,6 +17,32 @@ from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
 from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline
 
 
+def test_standard_pipeline_document_timeout():
+    """Test that StandardPdfPipeline respects document_timeout"""
+    test_file = "tests/data/pdf/2203.01017v2.pdf"  # Large file to ensure timeout can occur
= "tests/data/pdf/2203.01017v2.pdf" # Large file to ensure timeout can occur + + # Configure pipeline with very short timeout + converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=StandardPdfPipeline, + pipeline_options=ThreadedPdfPipelineOptions( + document_timeout=0.1, # Very short timeout (100ms) + do_ocr=False, # Disable OCR to speed up processing + do_table_structure=False, # Disable table structure to speed up processing + ), + ) + } + ) + + result = converter.convert(test_file) + + # Verify that timeout was respected + assert result.status == ConversionStatus.PARTIAL_SUCCESS, ( + f"Expected PARTIAL_SUCCESS due to timeout, got {result.status}" + ) + + def test_threaded_pipeline_multiple_documents(): """Test threaded pipeline with multiple documents and compare with standard pipeline""" test_files = [