Clean up unused code

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Christoph Auer 2025-07-23 16:03:25 +02:00
parent de0d9b50a2
commit 425f38a5aa
2 changed files with 1 addition and 56 deletions

docling/datamodel/pipeline_options.py

@@ -347,14 +347,3 @@ class ThreadedPdfPipelineOptions(PdfPipelineOptions):
    # Backpressure and queue control
    queue_max_size: int = 100

    # Pipeline coordination - batch_timeout_seconds is the only safe timeout
    # stage_timeout_seconds and collection_timeout_seconds removed to prevent data loss

    @classmethod
    def from_sync_options(
        cls, sync_options: PdfPipelineOptions
    ) -> "ThreadedPdfPipelineOptions":
        """Convert sync options to threaded options"""
        data = sync_options.model_dump()
        return cls(**data)

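Note: the removed from_sync_options classmethod only round-tripped the Pydantic model, so callers can reproduce it with a one-liner. A minimal sketch, assuming an existing PdfPipelineOptions instance:

from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
    ThreadedPdfPipelineOptions,
)

sync_opts = PdfPipelineOptions()
# Same effect as the removed classmethod: dump the sync options
# and re-validate the fields as threaded options.
threaded_opts = ThreadedPdfPipelineOptions(**sync_opts.model_dump())
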
docling/pipeline/threaded_standard_pdf_pipeline.py

@@ -21,18 +21,15 @@ import itertools
import logging
import threading
import time
import warnings
from collections import defaultdict, deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, List, Optional, Sequence, Tuple

from docling_core.types.doc import NodeItem

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page
from docling.datamodel.document import ConversionResult, InputDocument
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
from docling.datamodel.settings import settings
from docling.models.code_formula_model import CodeFormulaModel, CodeFormulaModelOptions
@@ -549,47 +546,6 @@ class ThreadedStandardPdfPipeline(BasePipeline):
        self._integrate_results(conv_res, proc)
        return conv_res

    # -------------------------------------------------------------- feed_pages
    def _feed_pages(
        self,
        stage: ThreadedPipelineStage,
        pages: Sequence[Page],
        conv_res: ConversionResult,
        run_id: int,
    ) -> None:
        for pg in pages:
            ok = stage.input_queue.put(
                ThreadedItem(
                    payload=pg, run_id=run_id, page_no=pg.page_no, conv_res=conv_res
                )
            )
            if not ok:
                raise RuntimeError("Input queue closed while feeding pages")
        stage.input_queue.close()

    # ------------------------------------------------------------- _collect()
    def _collect(
        self, ctx: RunContext, conv_res: ConversionResult, run_id: int, expected: int
    ) -> ProcessingResult:
        res = ProcessingResult(total_expected=expected)
        while True:
            batch = ctx.output_queue.get_batch(expected, timeout=None)
            if not batch and ctx.output_queue.closed:
                break
            for itm in batch:
                if itm.run_id != run_id:
                    continue  # not our run (should not happen due to isolation)
                if itm.is_failed or itm.error:
                    res.failed_pages.append(
                        (itm.page_no, itm.error or Exception("unknown error"))
                    )
                else:
                    assert itm.payload is not None
                    res.pages.append(itm.payload)
            if res.success_count + res.failure_count >= expected:
                break
        return res

    # ---------------------------------------------------- integrate_results()
    def _integrate_results(
        self, conv_res: ConversionResult, proc: ProcessingResult
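
For orientation, the deleted _feed_pages/_collect helpers implemented a feed-then-drain pattern over closable queues. A minimal, self-contained sketch of that pattern under stated assumptions (plain queue.Queue with a None sentinel standing in for docling's closable queues, and CollectedResult standing in for ProcessingResult; none of these names are docling's actual API):

import queue
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple

@dataclass
class CollectedResult:
    # Hypothetical stand-in for docling's ProcessingResult.
    pages: List[Any] = field(default_factory=list)
    failed_pages: List[Tuple[int, Exception]] = field(default_factory=list)

def feed_pages(q: "queue.Queue[Optional[Any]]", pages: List[Any]) -> None:
    # Enqueue every page, then a None sentinel standing in for the
    # input_queue.close() call in the removed _feed_pages.
    for pg in pages:
        q.put(pg)
    q.put(None)

def collect(q: "queue.Queue[Optional[Any]]", expected: int) -> CollectedResult:
    # Drain until the sentinel arrives or every expected page is
    # accounted for, mirroring the loop in the removed _collect.
    res = CollectedResult()
    while len(res.pages) + len(res.failed_pages) < expected:
        item = q.get()
        if item is None:
            break
        res.pages.append(item)
    return res

if __name__ == "__main__":
    q: "queue.Queue[Optional[Any]]" = queue.Queue()
    feed_pages(q, ["page-1", "page-2"])
    print(collect(q, expected=2).pages)  # ['page-1', 'page-2']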