Fix: don't starve on docs with > max_queue_size pages

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
This commit is contained in:
Christoph Auer 2025-07-19 17:11:32 +02:00
parent b36ad76b2a
commit 009cc24d0d

View File

@ -29,6 +29,7 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple
from docling_core.types.doc import NodeItem
from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page
from docling.datamodel.document import ConversionResult, InputDocument
@ -470,19 +471,12 @@ class ThreadedStandardPdfPipeline(BasePipeline):
# --------------------------------------------------------------------- build
def _build_document(self, conv_res: ConversionResult) -> ConversionResult: # type: ignore[override]
"""Stream-build the document while interleaving producer and consumer work."""
run_id = next(self._run_seq)
return self._build_document_with_run_id(conv_res, run_id)
assert isinstance(conv_res.input._backend, PdfDocumentBackend)
backend = conv_res.input._backend
def _build_document_with_run_id(
self, conv_res: ConversionResult, run_id: int
) -> ConversionResult:
"""Internal method that handles the actual document building with run_id."""
backend: PdfDocumentBackend = conv_res.input._backend # type: ignore[assignment]
if not isinstance(backend, PdfDocumentBackend):
conv_res.status = ConversionStatus.FAILURE
return conv_res
# preload page objects
# preload & initialise pages -------------------------------------------------------------
start_page, end_page = conv_res.input.limits.page_range
pages: list[Page] = []
for i in range(conv_res.input.page_count):
@ -493,21 +487,66 @@ class ThreadedStandardPdfPipeline(BasePipeline):
page.size = page._backend.get_size()
conv_res.pages.append(page)
pages.append(page)
if not pages:
conv_res.status = ConversionStatus.FAILURE
return conv_res
ctx = self._create_run_ctx()
total_pages: int = len(pages)
ctx: RunContext = self._create_run_ctx()
for st in ctx.stages:
st.start()
proc = ProcessingResult(total_expected=total_pages)
fed_idx: int = 0 # number of pages successfully queued
batch_size: int = 32 # drain chunk
try:
self._feed_pages(ctx.first_stage, pages, conv_res, run_id)
proc = self._collect(ctx, conv_res, run_id, len(pages))
self._integrate_results(conv_res, proc)
while proc.success_count + proc.failure_count < total_pages:
# 1) feed - try to enqueue until the first queue is full
while fed_idx < total_pages:
ok = ctx.first_stage.input_queue.put(
ThreadedItem(
payload=pages[fed_idx],
run_id=run_id,
page_no=pages[fed_idx].page_no,
conv_res=conv_res,
),
timeout=0.0, # non-blocking try-put
)
if ok:
fed_idx += 1
if fed_idx == total_pages:
ctx.first_stage.input_queue.close()
else: # queue full - switch to draining
break
# 2) drain - pull whatever is ready from the output side
out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05)
for itm in out_batch:
if itm.run_id != run_id:
continue
if itm.is_failed or itm.error:
proc.failed_pages.append(
(itm.page_no, itm.error or RuntimeError("unknown error"))
)
else:
assert itm.payload is not None
proc.pages.append(itm.payload)
# 3) failure safety - downstream closed early -> mark missing pages failed
if not out_batch and ctx.output_queue.closed:
missing = total_pages - (proc.success_count + proc.failure_count)
if missing > 0:
proc.failed_pages.extend(
[(-1, RuntimeError("pipeline terminated early"))] * missing
)
break
finally:
for st in ctx.stages:
st.stop()
ctx.output_queue.close()
self._integrate_results(conv_res, proc)
return conv_res
# -------------------------------------------------------------- feed_pages
@ -597,7 +636,7 @@ class ThreadedStandardPdfPipeline(BasePipeline):
return ThreadedPdfPipelineOptions()
@classmethod
def is_backend_supported(cls, backend) -> bool: # type: ignore[override]
def is_backend_supported(cls, backend: AbstractDocumentBackend) -> bool: # type: ignore[override]
return isinstance(backend, PdfDocumentBackend)
def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: # type: ignore[override]