From 425f38a5aa131b271417e9ff769fb223a6ef7fec Mon Sep 17 00:00:00 2001
From: Christoph Auer
Date: Wed, 23 Jul 2025 16:03:25 +0200
Subject: [PATCH] Clean up unused code

Signed-off-by: Christoph Auer
---
 docling/datamodel/pipeline_options.py              | 11 -----
 .../threaded_standard_pdf_pipeline.py              | 46 +------------------
 2 files changed, 1 insertion(+), 56 deletions(-)

diff --git a/docling/datamodel/pipeline_options.py b/docling/datamodel/pipeline_options.py
index c33a4efa..36f26fef 100644
--- a/docling/datamodel/pipeline_options.py
+++ b/docling/datamodel/pipeline_options.py
@@ -347,14 +347,3 @@ class ThreadedPdfPipelineOptions(PdfPipelineOptions):
 
     # Backpressure and queue control
     queue_max_size: int = 100
-
-    # Pipeline coordination - batch_timeout_seconds is the only safe timeout
-    # stage_timeout_seconds and collection_timeout_seconds removed to prevent data loss
-
-    @classmethod
-    def from_sync_options(
-        cls, sync_options: PdfPipelineOptions
-    ) -> "ThreadedPdfPipelineOptions":
-        """Convert sync options to threaded options"""
-        data = sync_options.model_dump()
-        return cls(**data)
diff --git a/docling/pipeline/threaded_standard_pdf_pipeline.py b/docling/pipeline/threaded_standard_pdf_pipeline.py
index 1f7e9a1e..964a8d42 100644
--- a/docling/pipeline/threaded_standard_pdf_pipeline.py
+++ b/docling/pipeline/threaded_standard_pdf_pipeline.py
@@ -21,18 +21,15 @@ import itertools
 import logging
 import threading
 import time
-import warnings
 from collections import defaultdict, deque
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any, Iterable, List, Optional, Sequence, Tuple
 
-from docling_core.types.doc import NodeItem
-
 from docling.backend.abstract_backend import AbstractDocumentBackend
 from docling.backend.pdf_backend import PdfDocumentBackend
 from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page
-from docling.datamodel.document import ConversionResult, InputDocument
+from docling.datamodel.document import ConversionResult
 from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
 from docling.datamodel.settings import settings
 from docling.models.code_formula_model import CodeFormulaModel, CodeFormulaModelOptions
@@ -549,47 +546,6 @@ class ThreadedStandardPdfPipeline(BasePipeline):
         self._integrate_results(conv_res, proc)
         return conv_res
 
-    # -------------------------------------------------------------- feed_pages
-    def _feed_pages(
-        self,
-        stage: ThreadedPipelineStage,
-        pages: Sequence[Page],
-        conv_res: ConversionResult,
-        run_id: int,
-    ) -> None:
-        for pg in pages:
-            ok = stage.input_queue.put(
-                ThreadedItem(
-                    payload=pg, run_id=run_id, page_no=pg.page_no, conv_res=conv_res
-                )
-            )
-            if not ok:
-                raise RuntimeError("Input queue closed while feeding pages")
-        stage.input_queue.close()
-
-    # ------------------------------------------------------------- _collect()
-    def _collect(
-        self, ctx: RunContext, conv_res: ConversionResult, run_id: int, expected: int
-    ) -> ProcessingResult:
-        res = ProcessingResult(total_expected=expected)
-        while True:
-            batch = ctx.output_queue.get_batch(expected, timeout=None)
-            if not batch and ctx.output_queue.closed:
-                break
-            for itm in batch:
-                if itm.run_id != run_id:
-                    continue  # not our run (should not happen due to isolation)
-                if itm.is_failed or itm.error:
-                    res.failed_pages.append(
-                        (itm.page_no, itm.error or Exception("unknown error"))
-                    )
-                else:
-                    assert itm.payload is not None
-                    res.pages.append(itm.payload)
-            if res.success_count + res.failure_count >= expected:
-                break
-        return res
-
     # ---------------------------------------------------- integrate_results()
     def _integrate_results(
         self, conv_res: ConversionResult, proc: ProcessingResult