Mirror of https://github.com/DS4SD/docling.git, synced 2025-12-08 12:48:28 +00:00
fix: Respect document_timeout in new threaded StandardPdfPipeline (#2653)
* fix: Respect document_timeout in new threaded StandardPdfPipeline
* add test case to test_options
* fix: Make sure unprocessed pages are not getting into assemble_document

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
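For context, `document_timeout` is set through `PdfPipelineOptions`, exactly as the new test at the bottom of this diff does. A minimal usage sketch (the input path is hypothetical; the imports follow docling's public API as used in this diff):

from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption

# Cap total wall-clock processing time per document at 60 seconds.
converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_options=PdfPipelineOptions(document_timeout=60)
        )
    }
)
result = converter.convert("input.pdf")  # hypothetical path
# With this fix, a timed-out run returns ConversionStatus.PARTIAL_SUCCESS
# and keeps only the fully processed pages.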
Changes in the threaded StandardPdfPipeline module:

@@ -179,6 +179,7 @@ class ThreadedPipelineStage:
         batch_timeout: float,
         queue_max_size: int,
         postprocess: Optional[Callable[[ThreadedItem], None]] = None,
+        timed_out_run_ids: Optional[set[int]] = None,
     ) -> None:
         self.name = name
         self.model = model
@@ -189,6 +190,9 @@ class ThreadedPipelineStage:
         self._thread: Optional[threading.Thread] = None
         self._running = False
         self._postprocess = postprocess
+        self._timed_out_run_ids = (
+            timed_out_run_ids if timed_out_run_ids is not None else set()
+        )
 
     # ---------------------------------------------------------------- wiring
     def add_output_queue(self, q: ThreadedQueue) -> None:
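Reviewer note: the constructor stores a reference to a caller-owned set rather than copying it, so a run_id added by the coordinator is immediately visible in every stage. A standalone sketch of that aliasing pattern (toy names, not docling code):

import threading

timed_out_run_ids: set[int] = set()  # one object shared by all stages

def stage_batch(run_id: int) -> None:
    # Each stage consults the shared set per batch; the membership check
    # observes additions made from another thread, since no copy exists.
    if run_id in timed_out_run_ids:
        print(f"run {run_id}: timed out, passing items through unprocessed")
    else:
        print(f"run {run_id}: processing normally")

timed_out_run_ids.add(7)  # coordinator flags run 7 as timed out
worker = threading.Thread(target=stage_batch, args=(7,))
worker.start()
worker.join()  # prints the timed-out branch

Under CPython, `set.add` and `in` are effectively atomic thanks to the GIL, which is what makes this lock-free signaling workable here.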
@@ -200,7 +204,7 @@ class ThreadedPipelineStage:
             return
         self._running = True
         self._thread = threading.Thread(
-            target=self._run, name=f"Stage-{self.name}", daemon=True
+            target=self._run, name=f"Stage-{self.name}", daemon=False
         )
         self._thread.start()
 
@@ -210,9 +214,14 @@ class ThreadedPipelineStage:
         self._running = False
         self.input_queue.close()
         if self._thread is not None:
-            self._thread.join(timeout=30.0)
+            # Give thread 15s to finish naturally before abandoning
+            self._thread.join(timeout=15.0)
             if self._thread.is_alive():
-                _log.warning("Stage %s did not terminate cleanly within 30s", self.name)
+                _log.warning(
+                    "Stage %s thread did not terminate within 15s. "
+                    "Thread is likely stuck in a blocking call and will be abandoned (resources may leak).",
+                    self.name,
+                )
 
     # ------------------------------------------------------------------ _run
     def _run(self) -> None:
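Reviewer note: the stop path now combines a non-daemon thread with a bounded join, abandoning (rather than killing) a stuck worker. A toy illustration of that behavior, with a sleep standing in for an uninterruptible blocking call:

import threading
import time

def stuck_worker() -> None:
    time.sleep(1.0)  # stands in for model inference or a PDF backend call

t = threading.Thread(target=stuck_worker, name="Stage-demo", daemon=False)
t.start()
t.join(timeout=0.1)  # wait briefly instead of blocking forever
if t.is_alive():
    # The thread keeps running until its blocking call returns; with
    # daemon=False the interpreter waits for it at shutdown instead of
    # killing it mid-call, so held resources are released properly.
    print("worker abandoned; it will finish on its own")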
@@ -238,6 +247,16 @@ class ThreadedPipelineStage:
 
         result: list[ThreadedItem] = []
         for rid, items in groups.items():
+            # If run_id is timed out, skip processing but pass through items as-is
+            # This allows already-completed work to flow through while aborting new work
+            if rid in self._timed_out_run_ids:
+                for it in items:
+                    it.is_failed = True
+                    if it.error is None:
+                        it.error = RuntimeError("document timeout exceeded")
+                result.extend(items)
+                continue
+
             good: list[ThreadedItem] = [i for i in items if not i.is_failed]
             if not good:
                 result.extend(items)
@@ -299,6 +318,7 @@ class PreprocessThreadedStage(ThreadedPipelineStage):
         batch_timeout: float,
         queue_max_size: int,
         model: Any,
+        timed_out_run_ids: Optional[set[int]] = None,
     ) -> None:
         super().__init__(
             name="preprocess",
@@ -306,6 +326,7 @@ class PreprocessThreadedStage(ThreadedPipelineStage):
             batch_size=1,
             batch_timeout=batch_timeout,
             queue_max_size=queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
 
     def _process_batch(self, batch: Sequence[ThreadedItem]) -> list[ThreadedItem]:
@@ -315,6 +336,16 @@ class PreprocessThreadedStage(ThreadedPipelineStage):
 
         result: list[ThreadedItem] = []
        for rid, items in groups.items():
+            # If run_id is timed out, skip processing but pass through items as-is
+            # This allows already-completed work to flow through while aborting new work
+            if rid in self._timed_out_run_ids:
+                for it in items:
+                    it.is_failed = True
+                    if it.error is None:
+                        it.error = RuntimeError("document timeout exceeded")
+                result.extend(items)
+                continue
+
             good = [i for i in items if not i.is_failed]
             if not good:
                 result.extend(items)
@@ -369,6 +400,7 @@ class RunContext:
     stages: list[ThreadedPipelineStage]
     first_stage: ThreadedPipelineStage
     output_queue: ThreadedQueue
+    timed_out_run_ids: set[int] = field(default_factory=set)
 
 
 # ──────────────────────────────────────────────────────────────────────────────
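Side note on `field(default_factory=set)`: dataclasses reject a shared mutable default, and the factory gives each RunContext its own set. Minimal sketch of the difference:

from dataclasses import dataclass, field

@dataclass
class Ctx:
    # timed_out: set[int] = set()  # ValueError: mutable default not allowed
    timed_out: set[int] = field(default_factory=set)

a, b = Ctx(), Ctx()
a.timed_out.add(1)
print(b.timed_out)  # set() - each instance gets a fresh set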
@@ -473,10 +505,12 @@ class StandardPdfPipeline(ConvertPipeline):
 
     def _create_run_ctx(self) -> RunContext:
         opts = self.pipeline_options
+        timed_out_run_ids: set[int] = set()
         preprocess = PreprocessThreadedStage(
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
             model=self.preprocessing_model,
+            timed_out_run_ids=timed_out_run_ids,
         )
         ocr = ThreadedPipelineStage(
             name="ocr",
@@ -484,6 +518,7 @@ class StandardPdfPipeline(ConvertPipeline):
             batch_size=opts.ocr_batch_size,
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
         layout = ThreadedPipelineStage(
             name="layout",
@@ -491,6 +526,7 @@ class StandardPdfPipeline(ConvertPipeline):
             batch_size=opts.layout_batch_size,
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
         table = ThreadedPipelineStage(
             name="table",
@@ -498,6 +534,7 @@ class StandardPdfPipeline(ConvertPipeline):
             batch_size=opts.table_batch_size,
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
+            timed_out_run_ids=timed_out_run_ids,
         )
         assemble = ThreadedPipelineStage(
             name="assemble",
@@ -506,6 +543,7 @@ class StandardPdfPipeline(ConvertPipeline):
             batch_timeout=opts.batch_polling_interval_seconds,
             queue_max_size=opts.queue_max_size,
             postprocess=self._release_page_resources,
+            timed_out_run_ids=timed_out_run_ids,
         )
 
         # wire stages
@@ -517,11 +555,22 @@ class StandardPdfPipeline(ConvertPipeline):
         assemble.add_output_queue(output_q)
 
         stages = [preprocess, ocr, layout, table, assemble]
-        return RunContext(stages=stages, first_stage=preprocess, output_queue=output_q)
+        return RunContext(
+            stages=stages,
+            first_stage=preprocess,
+            output_queue=output_q,
+            timed_out_run_ids=timed_out_run_ids,
+        )
 
     # --------------------------------------------------------------------- build
     def _build_document(self, conv_res: ConversionResult) -> ConversionResult:
-        """Stream-build the document while interleaving producer and consumer work."""
+        """Stream-build the document while interleaving producer and consumer work.
+
+        Note: If a worker thread gets stuck in a blocking call (model inference or PDF backend
+        load_page/get_size), that thread will be abandoned after a brief wait (15s) during cleanup.
+        The thread continues running until the blocking call completes, potentially holding
+        resources (e.g., pypdfium2_lock).
+        """
         run_id = next(self._run_seq)
         assert isinstance(conv_res.input._backend, PdfDocumentBackend)
 
@@ -546,26 +595,50 @@ class StandardPdfPipeline(ConvertPipeline):
         proc = ProcessingResult(total_expected=total_pages)
         fed_idx: int = 0  # number of pages successfully queued
         batch_size: int = 32  # drain chunk
+        start_time = time.monotonic()
+        timeout_exceeded = False
+        input_queue_closed = False
         try:
             while proc.success_count + proc.failure_count < total_pages:
-                # 1) feed - try to enqueue until the first queue is full
-                while fed_idx < total_pages:
-                    ok = ctx.first_stage.input_queue.put(
-                        ThreadedItem(
-                            payload=pages[fed_idx],
-                            run_id=run_id,
-                            page_no=pages[fed_idx].page_no,
-                            conv_res=conv_res,
-                        ),
-                        timeout=0.0,  # non-blocking try-put
-                    )
-                    if ok:
-                        fed_idx += 1
-                        if fed_idx == total_pages:
-                            ctx.first_stage.input_queue.close()
-                    else:  # queue full - switch to draining
-                        break
+                # Check timeout
+                if (
+                    self.pipeline_options.document_timeout is not None
+                    and not timeout_exceeded
+                ):
+                    elapsed_time = time.monotonic() - start_time
+                    if elapsed_time > self.pipeline_options.document_timeout:
+                        _log.warning(
+                            f"Document processing time ({elapsed_time:.3f}s) "
+                            f"exceeded timeout of {self.pipeline_options.document_timeout:.3f}s"
+                        )
+                        timeout_exceeded = True
+                        ctx.timed_out_run_ids.add(run_id)
+                        if not input_queue_closed:
+                            ctx.first_stage.input_queue.close()
+                            input_queue_closed = True
+                        # Break immediately - don't wait for in-flight work
+                        break
+
+                # 1) feed - try to enqueue until the first queue is full
+                if not input_queue_closed:
+                    while fed_idx < total_pages:
+                        ok = ctx.first_stage.input_queue.put(
+                            ThreadedItem(
+                                payload=pages[fed_idx],
+                                run_id=run_id,
+                                page_no=pages[fed_idx].page_no,
+                                conv_res=conv_res,
+                            ),
+                            timeout=0.0,  # non-blocking try-put
+                        )
+                        if ok:
+                            fed_idx += 1
+                            if fed_idx == total_pages:
+                                ctx.first_stage.input_queue.close()
+                                input_queue_closed = True
+                        else:  # queue full - switch to draining
+                            break
 
                 # 2) drain - pull whatever is ready from the output side
                 out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05)
                 for itm in out_batch:
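Reviewer note: the deadline check uses time.monotonic(), the appropriate clock for elapsed-time measurement since it never jumps with wall-clock adjustments. The same pattern in isolation:

import time

document_timeout = 0.2  # seconds; illustrative value
start_time = time.monotonic()

while True:
    elapsed = time.monotonic() - start_time
    if elapsed > document_timeout:
        print(f"timed out after {elapsed:.3f}s")
        break
    time.sleep(0.05)  # stands in for one feed/drain iteration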
@@ -579,7 +652,7 @@ class StandardPdfPipeline(ConvertPipeline):
                     assert itm.payload is not None
                     proc.pages.append(itm.payload)
 
-                # 3) failure safety - downstream closed early -> mark missing pages failed
+                # 3) failure safety - downstream closed early
                 if not out_batch and ctx.output_queue.closed:
                     missing = total_pages - (proc.success_count + proc.failure_count)
                     if missing > 0:
@@ -587,24 +660,36 @@ class StandardPdfPipeline(ConvertPipeline):
                             [(-1, RuntimeError("pipeline terminated early"))] * missing
                         )
                         break
+
+            # Mark remaining pages as failed if timeout occurred
+            if timeout_exceeded:
+                completed_page_nos = {p.page_no for p in proc.pages} | {
+                    fp for fp, _ in proc.failed_pages
+                }
+                for page in pages[fed_idx:]:
+                    if page.page_no not in completed_page_nos:
+                        proc.failed_pages.append(
+                            (page.page_no, RuntimeError("document timeout exceeded"))
+                        )
         finally:
             for st in ctx.stages:
                 st.stop()
             ctx.output_queue.close()
 
-        self._integrate_results(conv_res, proc)
+        self._integrate_results(conv_res, proc, timeout_exceeded=timeout_exceeded)
         return conv_res
 
     # ---------------------------------------------------- integrate_results()
     def _integrate_results(
-        self, conv_res: ConversionResult, proc: ProcessingResult
+        self,
+        conv_res: ConversionResult,
+        proc: ProcessingResult,
+        timeout_exceeded: bool = False,
     ) -> None:
         page_map = {p.page_no: p for p in proc.pages}
+        # Only keep pages that successfully completed processing
         conv_res.pages = [
-            page_map.get(p.page_no, p)
-            for p in conv_res.pages
-            if p.page_no in page_map
-            or not any(fp == p.page_no for fp, _ in proc.failed_pages)
+            page_map[p.page_no] for p in conv_res.pages if p.page_no in page_map
         ]
         # Add error details from failed pages
         for page_no, error in proc.failed_pages:
@@ -616,7 +701,10 @@ class StandardPdfPipeline(ConvertPipeline):
                 error_message=f"{page_label}: {error_msg}" if error_msg else page_label,
             )
             conv_res.errors.append(error_item)
-        if proc.is_complete_failure:
+        if timeout_exceeded and proc.total_expected > 0:
+            # Timeout exceeded: set PARTIAL_SUCCESS if any pages were attempted
+            conv_res.status = ConversionStatus.PARTIAL_SUCCESS
+        elif proc.is_complete_failure:
             conv_res.status = ConversionStatus.FAILURE
         elif proc.is_partial_success:
             conv_res.status = ConversionStatus.PARTIAL_SUCCESS
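With the status logic above, a caller can tell a timed-out run from a clean one by checking the status and the per-page errors. A hedged sketch reusing names from this diff (`converter` as configured in the example near the top; the path is hypothetical):

from docling.datamodel.base_models import ConversionStatus

result = converter.convert("input.pdf")  # hypothetical path
if result.status == ConversionStatus.PARTIAL_SUCCESS:
    print(f"{len(result.pages)} pages completed before the timeout")
    for err in result.errors:
        # error_message is built above, e.g. "...: document timeout exceeded"
        print(err.error_message)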
Changes in the test module (test_options, per the commit message):
@@ -15,6 +15,7 @@ from docling.datamodel.pipeline_options import (
     TableFormerMode,
 )
 from docling.document_converter import DocumentConverter, PdfFormatOption
+from docling.pipeline.legacy_standard_pdf_pipeline import LegacyStandardPdfPipeline
 
 
 @pytest.fixture
@@ -118,6 +119,33 @@ def test_page_range(test_doc_path):
     assert doc_result.status == ConversionStatus.FAILURE
 
 
+def test_document_timeout(test_doc_path):
+    converter = DocumentConverter(
+        format_options={
+            InputFormat.PDF: PdfFormatOption(
+                pipeline_options=PdfPipelineOptions(document_timeout=1)
+            )
+        }
+    )
+    result = converter.convert(test_doc_path)
+    assert result.status == ConversionStatus.PARTIAL_SUCCESS, (
+        "Expected document timeout to be used"
+    )
+
+    converter = DocumentConverter(
+        format_options={
+            InputFormat.PDF: PdfFormatOption(
+                pipeline_options=PdfPipelineOptions(document_timeout=1),
+                pipeline_cls=LegacyStandardPdfPipeline,
+            )
+        }
+    )
+    result = converter.convert(test_doc_path)
+    assert result.status == ConversionStatus.PARTIAL_SUCCESS, (
+        "Expected document timeout to be used"
+    )
+
+
 def test_ocr_coverage_threshold(test_doc_path):
     pipeline_options = PdfPipelineOptions()
     pipeline_options.do_ocr = True