feat: add document_timeout support to StandardPdfPipeline

- Add timeout tracking in the _build_document method (see the sketch below)
- Check elapsed time against document_timeout in the page-processing loop
- Set PARTIAL_SUCCESS status when the timeout is exceeded
- Add a test for the document_timeout behavior

Co-authored-by: cau-git <60343111+cau-git@users.noreply.github.com>
Author: copilot-swe-agent[bot]
Date:   2025-11-17 09:23:28 +00:00
Parent: f3ed123b51
Commit: e3aa8cd770

2 changed files with 42 additions and 4 deletions
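In outline, the change follows a standard elapsed-time guard: record a start time from the monotonic clock before the processing loop, compare the elapsed time against document_timeout on each iteration, and break out with a flag once the budget is spent. A minimal sketch of the pattern (simplified, hypothetical names; not the actual pipeline code):

import time

def process_pages(pages, document_timeout=None):
    """Hypothetical loop illustrating the timeout pattern this commit adds."""
    start_time = time.monotonic()
    timeout_exceeded = False
    processed = []
    for page in pages:
        processed.append(page)  # stand-in for the real per-page work
        if document_timeout is not None:
            elapsed = time.monotonic() - start_time
            if elapsed > document_timeout:
                timeout_exceeded = True
                break  # remaining pages stay unprocessed
    return processed, timeout_exceeded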


@@ -538,6 +538,8 @@ class StandardPdfPipeline(ConvertPipeline):
         proc = ProcessingResult(total_expected=total_pages)
         fed_idx: int = 0  # number of pages successfully queued
         batch_size: int = 32  # drain chunk
+        start_time: float = time.monotonic()
+        timeout_exceeded: bool = False
         try:
             while proc.success_count + proc.failure_count < total_pages:
                 # 1) feed - try to enqueue until the first queue is full
@@ -571,7 +573,17 @@ class StandardPdfPipeline(ConvertPipeline):
                     assert itm.payload is not None
                     proc.pages.append(itm.payload)
-                # 3) failure safety - downstream closed early -> mark missing pages failed
+                # 3) timeout check - respect document_timeout if configured
+                if self.pipeline_options.document_timeout is not None:
+                    elapsed_time = time.monotonic() - start_time
+                    if elapsed_time > self.pipeline_options.document_timeout:
+                        _log.warning(
+                            f"Document processing time ({elapsed_time:.3f} seconds) exceeded the specified timeout of {self.pipeline_options.document_timeout:.3f} seconds"
+                        )
+                        timeout_exceeded = True
+                        break
+
+                # 4) failure safety - downstream closed early -> mark missing pages failed
                 if not out_batch and ctx.output_queue.closed:
                     missing = total_pages - (proc.success_count + proc.failure_count)
                     if missing > 0:
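Note that the check uses time.monotonic() rather than time.time(): the monotonic clock is unaffected by system clock adjustments (NTP corrections, DST changes), so an elapsed-time comparison cannot jump or go negative. For example:

import time

t0 = time.monotonic()
time.sleep(0.2)
elapsed = time.monotonic() - t0  # ~0.2 even if the wall clock is adjusted meanwhile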
@@ -584,12 +596,12 @@ class StandardPdfPipeline(ConvertPipeline):
                 st.stop()
             ctx.output_queue.close()
-        self._integrate_results(conv_res, proc)
+        self._integrate_results(conv_res, proc, timeout_exceeded=timeout_exceeded)
         return conv_res
 
     # ---------------------------------------------------- integrate_results()
     def _integrate_results(
-        self, conv_res: ConversionResult, proc: ProcessingResult
+        self, conv_res: ConversionResult, proc: ProcessingResult, timeout_exceeded: bool = False
     ) -> None:
         page_map = {p.page_no: p for p in proc.pages}
         conv_res.pages = [
@@ -600,7 +612,7 @@ class StandardPdfPipeline(ConvertPipeline):
         ]
         if proc.is_complete_failure:
             conv_res.status = ConversionStatus.FAILURE
-        elif proc.is_partial_success:
+        elif timeout_exceeded or proc.is_partial_success:
            conv_res.status = ConversionStatus.PARTIAL_SUCCESS
         else:
             conv_res.status = ConversionStatus.SUCCESS
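The status resolution treats a timeout the same way as a partial page failure, while a complete failure still takes precedence over the timeout flag. A sketch of the decision order, approximating is_complete_failure and is_partial_success (which in docling are properties of ProcessingResult) with plain counts:

from enum import Enum

class Status(Enum):  # stand-in for docling's ConversionStatus
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILURE = "failure"

def resolve_status(success: int, total: int, timeout_exceeded: bool) -> Status:
    if success == 0 and total > 0:  # approximates proc.is_complete_failure
        return Status.FAILURE
    if timeout_exceeded or success < total:  # approximates proc.is_partial_success
        return Status.PARTIAL_SUCCESS
    return Status.SUCCESS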


@@ -17,6 +17,32 @@ from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
 from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline
 
 
+def test_standard_pipeline_document_timeout():
+    """Test that StandardPdfPipeline respects document_timeout"""
+    test_file = "tests/data/pdf/2203.01017v2.pdf"  # Large file to ensure timeout can occur
+
+    # Configure pipeline with very short timeout
+    converter = DocumentConverter(
+        format_options={
+            InputFormat.PDF: PdfFormatOption(
+                pipeline_cls=StandardPdfPipeline,
+                pipeline_options=ThreadedPdfPipelineOptions(
+                    document_timeout=0.1,  # Very short timeout (100ms)
+                    do_ocr=False,  # Disable OCR to speed up processing
+                    do_table_structure=False,  # Disable table structure to speed up processing
+                ),
+            )
+        }
+    )
+
+    result = converter.convert(test_file)
+
+    # Verify that timeout was respected
+    assert result.status == ConversionStatus.PARTIAL_SUCCESS, (
+        f"Expected PARTIAL_SUCCESS due to timeout, got {result.status}"
+    )
+
+
 def test_threaded_pipeline_multiple_documents():
     """Test threaded pipeline with multiple documents and compare with standard pipeline"""
     test_files = [
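For callers, the feature is opt-in through the pipeline options. A usage sketch along the lines of the test above (the input path is hypothetical; option and import names follow docling's public API):

from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption

opts = PdfPipelineOptions(document_timeout=60.0)  # give up after 60 seconds
converter = DocumentConverter(
    format_options={InputFormat.PDF: PdfFormatOption(pipeline_options=opts)}
)

result = converter.convert("my_document.pdf")  # hypothetical input file
if result.status == ConversionStatus.PARTIAL_SUCCESS:
    # Some pages may be missing; result.pages holds whatever completed in time.
    print(f"Converted {len(result.pages)} pages before hitting the timeout")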