diff --git a/docling/pipeline/threaded_standard_pdf_pipeline.py b/docling/pipeline/threaded_standard_pdf_pipeline.py index cafca746..5c6c0752 100644 --- a/docling/pipeline/threaded_standard_pdf_pipeline.py +++ b/docling/pipeline/threaded_standard_pdf_pipeline.py @@ -29,6 +29,7 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple from docling_core.types.doc import NodeItem +from docling.backend.abstract_backend import AbstractDocumentBackend from docling.backend.pdf_backend import PdfDocumentBackend from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page from docling.datamodel.document import ConversionResult, InputDocument @@ -470,19 +471,12 @@ class ThreadedStandardPdfPipeline(BasePipeline): # --------------------------------------------------------------------- build def _build_document(self, conv_res: ConversionResult) -> ConversionResult: # type: ignore[override] + """Stream-build the document while interleaving producer and consumer work.""" run_id = next(self._run_seq) - return self._build_document_with_run_id(conv_res, run_id) + assert isinstance(conv_res.input._backend, PdfDocumentBackend) + backend = conv_res.input._backend - def _build_document_with_run_id( - self, conv_res: ConversionResult, run_id: int - ) -> ConversionResult: - """Internal method that handles the actual document building with run_id.""" - backend: PdfDocumentBackend = conv_res.input._backend # type: ignore[assignment] - if not isinstance(backend, PdfDocumentBackend): - conv_res.status = ConversionStatus.FAILURE - return conv_res - - # preload page objects + # preload & initialise pages ------------------------------------------------------------- start_page, end_page = conv_res.input.limits.page_range pages: list[Page] = [] for i in range(conv_res.input.page_count): @@ -493,21 +487,66 @@ class ThreadedStandardPdfPipeline(BasePipeline): page.size = page._backend.get_size() conv_res.pages.append(page) pages.append(page) + if not pages: conv_res.status = ConversionStatus.FAILURE return conv_res - ctx = self._create_run_ctx() + total_pages: int = len(pages) + ctx: RunContext = self._create_run_ctx() for st in ctx.stages: st.start() + + proc = ProcessingResult(total_expected=total_pages) + fed_idx: int = 0 # number of pages successfully queued + batch_size: int = 32 # drain chunk try: - self._feed_pages(ctx.first_stage, pages, conv_res, run_id) - proc = self._collect(ctx, conv_res, run_id, len(pages)) - self._integrate_results(conv_res, proc) + while proc.success_count + proc.failure_count < total_pages: + # 1) feed - try to enqueue until the first queue is full + while fed_idx < total_pages: + ok = ctx.first_stage.input_queue.put( + ThreadedItem( + payload=pages[fed_idx], + run_id=run_id, + page_no=pages[fed_idx].page_no, + conv_res=conv_res, + ), + timeout=0.0, # non-blocking try-put + ) + if ok: + fed_idx += 1 + if fed_idx == total_pages: + ctx.first_stage.input_queue.close() + else: # queue full - switch to draining + break + + # 2) drain - pull whatever is ready from the output side + out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05) + for itm in out_batch: + if itm.run_id != run_id: + continue + if itm.is_failed or itm.error: + proc.failed_pages.append( + (itm.page_no, itm.error or RuntimeError("unknown error")) + ) + else: + assert itm.payload is not None + proc.pages.append(itm.payload) + + # 3) failure safety - downstream closed early -> mark missing pages failed + if not out_batch and ctx.output_queue.closed: + missing = total_pages - (proc.success_count + proc.failure_count) + if missing > 0: + proc.failed_pages.extend( + [(-1, RuntimeError("pipeline terminated early"))] * missing + ) + break finally: for st in ctx.stages: st.stop() ctx.output_queue.close() + + self._integrate_results(conv_res, proc) return conv_res # -------------------------------------------------------------- feed_pages @@ -597,7 +636,7 @@ class ThreadedStandardPdfPipeline(BasePipeline): return ThreadedPdfPipelineOptions() @classmethod - def is_backend_supported(cls, backend) -> bool: # type: ignore[override] + def is_backend_supported(cls, backend: AbstractDocumentBackend) -> bool: # type: ignore[override] return isinstance(backend, PdfDocumentBackend) def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: # type: ignore[override]