Mirror of https://github.com/DS4SD/docling.git (synced 2025-07-27 04:24:45 +00:00)

Merge e6d5e4e48f into aec29a7315
Commit e5407304a0
@@ -332,3 +332,18 @@ class ProcessingPipeline(str, Enum):
     STANDARD = "standard"
     VLM = "vlm"
     ASR = "asr"
+
+
+class ThreadedPdfPipelineOptions(PdfPipelineOptions):
+    """Pipeline options for the threaded PDF pipeline with batching and backpressure control"""
+
+    # Batch sizes for different stages
+    ocr_batch_size: int = 4
+    layout_batch_size: int = 4
+    table_batch_size: int = 4
+
+    # Timing control
+    batch_timeout_seconds: float = 2.0
+
+    # Backpressure and queue control
+    queue_max_size: int = 100
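
The new options type is consumed by the ThreadedStandardPdfPipeline added later in this commit. A minimal usage sketch, assuming the converter wiring shown in the test file further down (the values repeat the defaults above; the input path is taken from the test data):

from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline

converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_cls=ThreadedStandardPdfPipeline,
            pipeline_options=ThreadedPdfPipelineOptions(
                ocr_batch_size=4,           # pages per OCR call
                layout_batch_size=4,        # pages per layout-model call
                table_batch_size=4,         # pages per table-structure call
                batch_timeout_seconds=2.0,  # flush a partial batch after this long
                queue_max_size=100,         # bound on in-flight pages (backpressure)
            ),
        )
    }
)
results = list(converter.convert_all(["tests/data/pdf/2206.01062.pdf"]))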
@@ -26,18 +26,13 @@ class DocumentLimits(BaseModel):
 
 
 class BatchConcurrencySettings(BaseModel):
-    doc_batch_size: int = 2
-    doc_batch_concurrency: int = 2
-    page_batch_size: int = 4
-    page_batch_concurrency: int = 2
-    elements_batch_size: int = 16
-    # doc_batch_size: int = 1
-    # doc_batch_concurrency: int = 1
-    # page_batch_size: int = 1
-    # page_batch_concurrency: int = 1
-
-    # model_concurrency: int = 2
+    doc_batch_size: int = 1  # Number of documents processed in one batch. Should be >= doc_batch_concurrency
+    doc_batch_concurrency: int = 1  # Number of parallel threads processing documents. Warning: Experimental! No benefit expected without free-threaded python.
+    page_batch_size: int = 4  # Number of pages processed in one batch.
+    page_batch_concurrency: int = 1  # Currently unused.
+    elements_batch_size: int = (
+        16  # Number of elements processed in one batch, in enrichment models.
+    )
 
     # To force models into single core: export OMP_NUM_THREADS=1
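
The converter reads these values from the global settings object (see the DocumentConverter hunk below), so callers can opt in to the experimental document-level threading by raising both values before converting. A minimal sketch, assuming the caveats in the comments above (no benefit is expected without a free-threaded interpreter):

from docling.datamodel.settings import settings

settings.perf.doc_batch_size = 2         # documents per batch
settings.perf.doc_batch_concurrency = 2  # worker threads per batch (experimental)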
@@ -4,6 +4,7 @@ import sys
 import threading
 import time
 from collections.abc import Iterable, Iterator
+from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Type, Union
@@ -284,16 +285,25 @@ class DocumentConverter:
             settings.perf.doc_batch_size,  # pass format_options
         ):
             _log.info("Going to convert document batch...")
-            # parallel processing only within input_batch
-            # with ThreadPoolExecutor(
-            #     max_workers=settings.perf.doc_batch_concurrency
-            # ) as pool:
-            #     yield from pool.map(self.process_document, input_batch)
-            # Note: PDF backends are not thread-safe, thread pool usage was disabled.
+            process_func = partial(
+                self._process_document, raises_on_error=raises_on_error
+            )
 
-            for item in map(
-                partial(self._process_document, raises_on_error=raises_on_error),
-                input_batch,
-            ):
-                elapsed = time.monotonic() - start_time
+            if (
+                settings.perf.doc_batch_concurrency > 1
+                and settings.perf.doc_batch_size > 1
+            ):
+                with ThreadPoolExecutor(
+                    max_workers=settings.perf.doc_batch_concurrency
+                ) as pool:
+                    for item in pool.map(
+                        process_func,
+                        input_batch,
+                    ):
+                        yield item
+            else:
+                for item in map(
+                    process_func,
+                    input_batch,
+                ):
+                    elapsed = time.monotonic() - start_time
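
The loop above follows a chunk-then-map pattern: documents are grouped into batches, and each batch is mapped either serially or through a thread pool depending on the settings. A self-contained sketch of that pattern, independent of Docling internals (all names below are illustrative, not library API):

from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunkify(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def process(doc: str, raises_on_error: bool = False) -> str:
    return doc.upper()  # stand-in for the real per-document conversion


def convert_all(docs: Iterable[str], batch_size: int, concurrency: int) -> Iterator[str]:
    work = partial(process, raises_on_error=True)
    for batch in chunkify(docs, batch_size):
        if concurrency > 1 and batch_size > 1:
            # Only worth it on a free-threaded interpreter (see the settings warning above).
            with ThreadPoolExecutor(max_workers=concurrency) as pool:
                yield from pool.map(work, batch)
        else:
            yield from map(work, batch)


print(list(convert_all(["a", "b", "c"], batch_size=2, concurrency=2)))  # ['A', 'B', 'C']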
@@ -3,7 +3,7 @@ import logging
 import warnings
 from collections.abc import Iterable
 from pathlib import Path
-from typing import Optional
+from typing import List, Optional, Union
 
 import numpy as np
 from docling_core.types.doc import DocItemLabel
@@ -148,25 +148,48 @@ class LayoutModel(BasePageModel):
     def __call__(
         self, conv_res: ConversionResult, page_batch: Iterable[Page]
     ) -> Iterable[Page]:
-        for page in page_batch:
+        # Convert to list to allow multiple iterations
+        pages = list(page_batch)
+
+        # Separate valid and invalid pages
+        valid_pages = []
+        valid_page_images: List[Union[Image.Image, np.ndarray]] = []
+
+        for page in pages:
             assert page._backend is not None
             if not page._backend.is_valid():
-                yield page
-            else:
-                with TimeRecorder(conv_res, "layout"):
-                    assert page.size is not None
-                    page_image = page.get_image(scale=1.0)
-                    assert page_image is not None
+                continue
 
-                    clusters = []
-                    for ix, pred_item in enumerate(
-                        self.layout_predictor.predict(page_image)
-                    ):
-                        label = DocItemLabel(
-                            pred_item["label"]
-                            .lower()
-                            .replace(" ", "_")
-                            .replace("-", "_")
-                        )  # Temporary, until docling-ibm-model uses docling-core types
-                        cluster = Cluster(
-                            id=ix,
+            assert page.size is not None
+            page_image = page.get_image(scale=1.0)
+            assert page_image is not None
+
+            valid_pages.append(page)
+            valid_page_images.append(page_image)
+
+        # Process all valid pages with batch prediction
+        batch_predictions = []
+        if valid_page_images:
+            with TimeRecorder(conv_res, "layout"):
+                batch_predictions = self.layout_predictor.predict_batch(  # type: ignore[attr-defined]
+                    valid_page_images
+                )
+
+        # Process each page with its predictions
+        valid_page_idx = 0
+        for page in pages:
+            assert page._backend is not None
+            if not page._backend.is_valid():
+                yield page
+                continue
+
+            page_predictions = batch_predictions[valid_page_idx]
+            valid_page_idx += 1
+
+            clusters = []
+            for ix, pred_item in enumerate(page_predictions):
+                label = DocItemLabel(
+                    pred_item["label"].lower().replace(" ", "_").replace("-", "_")
+                )  # Temporary, until docling-ibm-model uses docling-core types
+                cluster = Cluster(
+                    id=ix,
@@ -183,7 +206,6 @@ class LayoutModel(BasePageModel):
                 )
 
             # Apply postprocessing
-
             processed_clusters, processed_cells = LayoutPostprocessor(
                 page, clusters, self.options
             ).postprocess()
@@ -202,14 +224,10 @@ class LayoutModel(BasePageModel):
             )
 
             conv_res.confidence.pages[page.page_no].ocr_score = float(
-                np.mean(
-                    [c.confidence for c in processed_cells if c.from_ocr]
-                )
+                np.mean([c.confidence for c in processed_cells if c.from_ocr])
             )
 
-            page.predictions.layout = LayoutPrediction(
-                clusters=processed_clusters
-            )
+            page.predictions.layout = LayoutPrediction(clusters=processed_clusters)
 
             if settings.debug.visualize_layout:
                 self.draw_clusters_and_cells_side_by_side(
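
The layout change swaps per-page prediction for a single batched call: valid pages are collected first, predicted together, and the results are re-aligned with the original page order on a second pass. A minimal sketch of that collect/predict/re-align pattern with a stand-in predictor (everything below is illustrative, not Docling API):

from typing import Any, List, Optional


def predict_batch(images: List[str]) -> List[str]:
    return [f"layout({img})" for img in images]  # stand-in for a real batched model call


def run(pages: List[Optional[str]]) -> List[Any]:
    # First pass: collect only the pages that actually have an image.
    valid_images = [img for img in pages if img is not None]
    predictions = predict_batch(valid_images) if valid_images else []

    # Second pass: walk the original order again, consuming one prediction
    # per valid page and passing invalid pages through unchanged.
    out: List[Any] = []
    pred_iter = iter(predictions)
    for img in pages:
        out.append(next(pred_iter) if img is not None else None)
    return out


print(run(["p1.png", None, "p3.png"]))  # ['layout(p1.png)', None, 'layout(p3.png)']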
docling/pipeline/threaded_standard_pdf_pipeline.py  (new file, 605 lines)
@@ -0,0 +1,605 @@
# threaded_standard_pdf_pipeline.py
"""Thread-safe, production-ready PDF pipeline
================================================
A self-contained, thread-safe PDF conversion pipeline exploiting parallelism between pipeline stages and models.

* **Per-run isolation** - every :py:meth:`execute` call uses its own bounded queues and worker
  threads so that concurrent invocations never share mutable state.
* **Deterministic run identifiers** - pages are tracked with an internal *run-id* instead of
  relying on :pyfunc:`id`, which may clash after garbage collection.
* **Explicit back-pressure & shutdown** - producers block on full queues; queue *close()*
  propagates downstream so stages terminate deterministically without sentinels.
* **Minimal shared state** - heavyweight models are initialised once per pipeline instance
  and only read by worker threads; no runtime mutability is exposed.
* **Strict typing & clean API usage** - code is fully annotated and respects *coding_rules.md*.
"""

from __future__ import annotations

import itertools
import logging
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, List, Optional, Sequence, Tuple

from docling.backend.abstract_backend import AbstractDocumentBackend
from docling.backend.pdf_backend import PdfDocumentBackend
from docling.datamodel.base_models import AssembledUnit, ConversionStatus, Page
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
from docling.datamodel.settings import settings
from docling.models.code_formula_model import CodeFormulaModel, CodeFormulaModelOptions
from docling.models.document_picture_classifier import (
    DocumentPictureClassifier,
    DocumentPictureClassifierOptions,
)
from docling.models.factories import get_ocr_factory, get_picture_description_factory
from docling.models.layout_model import LayoutModel
from docling.models.page_assemble_model import PageAssembleModel, PageAssembleOptions
from docling.models.page_preprocessing_model import (
    PagePreprocessingModel,
    PagePreprocessingOptions,
)
from docling.models.picture_description_base_model import PictureDescriptionBaseModel
from docling.models.readingorder_model import ReadingOrderModel, ReadingOrderOptions
from docling.models.table_structure_model import TableStructureModel
from docling.pipeline.base_pipeline import BasePipeline
from docling.utils.profiling import ProfilingScope, TimeRecorder
from docling.utils.utils import chunkify

_log = logging.getLogger(__name__)

# ──────────────────────────────────────────────────────────────────────────────
# Helper data structures
# ──────────────────────────────────────────────────────────────────────────────


@dataclass
class ThreadedItem:
    """Envelope that travels between pipeline stages."""

    payload: Optional[Page]
    run_id: int  # Unique per *execute* call, monotonic across pipeline instance
    page_no: int
    conv_res: ConversionResult
    error: Optional[Exception] = None
    is_failed: bool = False


@dataclass
class ProcessingResult:
    """Aggregated outcome of a pipeline run."""

    pages: List[Page] = field(default_factory=list)
    failed_pages: List[Tuple[int, Exception]] = field(default_factory=list)
    total_expected: int = 0

    @property
    def success_count(self) -> int:
        return len(self.pages)

    @property
    def failure_count(self) -> int:
        return len(self.failed_pages)

    @property
    def is_partial_success(self) -> bool:
        return 0 < self.success_count < self.total_expected

    @property
    def is_complete_failure(self) -> bool:
        return self.success_count == 0 and self.failure_count > 0


class ThreadedQueue:
    """Bounded queue with blocking put/ get_batch and explicit *close()* semantics."""

    __slots__ = ("_closed", "_items", "_lock", "_max", "_not_empty", "_not_full")

    def __init__(self, max_size: int) -> None:
        self._max: int = max_size
        self._items: deque[ThreadedItem] = deque()
        self._lock = threading.Lock()
        self._not_full = threading.Condition(self._lock)
        self._not_empty = threading.Condition(self._lock)
        self._closed = False

    # ---------------------------------------------------------------- put()
    def put(self, item: ThreadedItem, timeout: Optional[float] | None = None) -> bool:
        """Block until queue accepts *item* or is closed. Returns *False* if closed."""
        with self._not_full:
            if self._closed:
                return False
            start = time.monotonic()
            while len(self._items) >= self._max and not self._closed:
                if timeout is not None:
                    remaining = timeout - (time.monotonic() - start)
                    if remaining <= 0:
                        return False
                    self._not_full.wait(remaining)
                else:
                    self._not_full.wait()
            if self._closed:
                return False
            self._items.append(item)
            self._not_empty.notify()
            return True

    # ------------------------------------------------------------ get_batch()
    def get_batch(
        self, size: int, timeout: Optional[float] | None = None
    ) -> List[ThreadedItem]:
        """Return up to *size* items. Blocks until ≥1 item present or queue closed/timeout."""
        with self._not_empty:
            start = time.monotonic()
            while not self._items and not self._closed:
                if timeout is not None:
                    remaining = timeout - (time.monotonic() - start)
                    if remaining <= 0:
                        return []
                    self._not_empty.wait(remaining)
                else:
                    self._not_empty.wait()
            batch: List[ThreadedItem] = []
            while self._items and len(batch) < size:
                batch.append(self._items.popleft())
            if batch:
                self._not_full.notify_all()
            return batch

    # ---------------------------------------------------------------- close()
    def close(self) -> None:
        with self._lock:
            self._closed = True
            self._not_empty.notify_all()
            self._not_full.notify_all()

    # -------------------------------------------------------------- property
    @property
    def closed(self) -> bool:
        return self._closed


class ThreadedPipelineStage:
    """A single pipeline stage backed by one worker thread."""

    def __init__(
        self,
        *,
        name: str,
        model: Any,
        batch_size: int,
        batch_timeout: float,
        queue_max_size: int,
    ) -> None:
        self.name = name
        self.model = model
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.input_queue = ThreadedQueue(queue_max_size)
        self._outputs: list[ThreadedQueue] = []
        self._thread: Optional[threading.Thread] = None
        self._running = False

    # ---------------------------------------------------------------- wiring
    def add_output_queue(self, q: ThreadedQueue) -> None:
        self._outputs.append(q)

    # -------------------------------------------------------------- lifecycle
    def start(self) -> None:
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(
            target=self._run, name=f"Stage-{self.name}", daemon=False
        )
        self._thread.start()

    def stop(self) -> None:
        if not self._running:
            return
        self._running = False
        self.input_queue.close()
        if self._thread is not None:
            self._thread.join(timeout=30.0)
            if self._thread.is_alive():
                _log.warning("Stage %s did not terminate cleanly within 30s", self.name)

    # ------------------------------------------------------------------ _run
    def _run(self) -> None:
        try:
            while self._running:
                batch = self.input_queue.get_batch(self.batch_size, self.batch_timeout)
                if not batch and self.input_queue.closed:
                    break
                processed = self._process_batch(batch)
                self._emit(processed)
        except Exception:  # pragma: no cover - top-level guard
            _log.exception("Fatal error in stage %s", self.name)
        finally:
            for q in self._outputs:
                q.close()

    # ----------------------------------------------------- _process_batch()
    def _process_batch(self, batch: Sequence[ThreadedItem]) -> list[ThreadedItem]:
        """Run *model* on *batch* grouped by run_id to maximise batching."""
        groups: dict[int, list[ThreadedItem]] = defaultdict(list)
        for itm in batch:
            groups[itm.run_id].append(itm)

        result: list[ThreadedItem] = []
        for rid, items in groups.items():
            good: list[ThreadedItem] = [i for i in items if not i.is_failed]
            if not good:
                result.extend(items)
                continue
            try:
                # Filter out None payloads and ensure type safety
                pages_with_payloads = [
                    (i, i.payload) for i in good if i.payload is not None
                ]
                if len(pages_with_payloads) != len(good):
                    # Some items have None payloads, mark all as failed
                    for it in items:
                        it.is_failed = True
                        it.error = RuntimeError("Page payload is None")
                    result.extend(items)
                    continue

                pages: List[Page] = [payload for _, payload in pages_with_payloads]
                processed_pages = list(self.model(good[0].conv_res, pages))  # type: ignore[arg-type]
                if len(processed_pages) != len(pages):  # strict mismatch guard
                    raise RuntimeError(
                        f"Model {self.name} returned wrong number of pages"
                    )
                for idx, page in enumerate(processed_pages):
                    result.append(
                        ThreadedItem(
                            payload=page,
                            run_id=rid,
                            page_no=good[idx].page_no,
                            conv_res=good[idx].conv_res,
                        )
                    )
            except Exception as exc:
                _log.error("Stage %s failed for run %d: %s", self.name, rid, exc)
                for it in items:
                    it.is_failed = True
                    it.error = exc
                result.extend(items)
        return result

    # -------------------------------------------------------------- _emit()
    def _emit(self, items: Iterable[ThreadedItem]) -> None:
        for item in items:
            for q in self._outputs:
                if not q.put(item):
                    _log.error("Output queue closed while emitting from %s", self.name)


@dataclass
class RunContext:
    """Wiring for a single *execute* call."""

    stages: list[ThreadedPipelineStage]
    first_stage: ThreadedPipelineStage
    output_queue: ThreadedQueue


# ──────────────────────────────────────────────────────────────────────────────
# Main pipeline
# ──────────────────────────────────────────────────────────────────────────────


class ThreadedStandardPdfPipeline(BasePipeline):
    """High-performance PDF pipeline with multi-threaded stages."""

    def __init__(self, pipeline_options: ThreadedPdfPipelineOptions) -> None:
        super().__init__(pipeline_options)
        self.pipeline_options: ThreadedPdfPipelineOptions = pipeline_options
        self._run_seq = itertools.count(1)  # deterministic, monotonic run ids

        # initialise heavy models once
        self._init_models()

    # ────────────────────────────────────────────────────────────────────────
    # Heavy-model initialisation & helpers
    # ────────────────────────────────────────────────────────────────────────

    def _init_models(self) -> None:
        art_path = self._resolve_artifacts_path()
        self.keep_images = (
            self.pipeline_options.generate_page_images
            or self.pipeline_options.generate_picture_images
            or self.pipeline_options.generate_table_images
        )
        self.preprocessing_model = PagePreprocessingModel(
            options=PagePreprocessingOptions(
                images_scale=self.pipeline_options.images_scale
            )
        )
        self.ocr_model = self._make_ocr_model(art_path)
        self.layout_model = LayoutModel(
            artifacts_path=art_path,
            accelerator_options=self.pipeline_options.accelerator_options,
            options=self.pipeline_options.layout_options,
        )
        self.table_model = TableStructureModel(
            enabled=self.pipeline_options.do_table_structure,
            artifacts_path=art_path,
            options=self.pipeline_options.table_structure_options,
            accelerator_options=self.pipeline_options.accelerator_options,
        )
        self.assemble_model = PageAssembleModel(options=PageAssembleOptions())
        self.reading_order_model = ReadingOrderModel(options=ReadingOrderOptions())

        # --- optional enrichment ------------------------------------------------
        self.enrichment_pipe = []
        code_formula = CodeFormulaModel(
            enabled=self.pipeline_options.do_code_enrichment
            or self.pipeline_options.do_formula_enrichment,
            artifacts_path=art_path,
            options=CodeFormulaModelOptions(
                do_code_enrichment=self.pipeline_options.do_code_enrichment,
                do_formula_enrichment=self.pipeline_options.do_formula_enrichment,
            ),
            accelerator_options=self.pipeline_options.accelerator_options,
        )
        if code_formula.enabled:
            self.enrichment_pipe.append(code_formula)

        picture_classifier = DocumentPictureClassifier(
            enabled=self.pipeline_options.do_picture_classification,
            artifacts_path=art_path,
            options=DocumentPictureClassifierOptions(),
            accelerator_options=self.pipeline_options.accelerator_options,
        )
        if picture_classifier.enabled:
            self.enrichment_pipe.append(picture_classifier)

        picture_descr = self._make_picture_description_model(art_path)
        if picture_descr and picture_descr.enabled:
            self.enrichment_pipe.append(picture_descr)

        self.keep_backend = any(
            (
                self.pipeline_options.do_formula_enrichment,
                self.pipeline_options.do_code_enrichment,
                self.pipeline_options.do_picture_classification,
                self.pipeline_options.do_picture_description,
            )
        )

    # ---------------------------------------------------------------- helpers
    def _resolve_artifacts_path(self) -> Optional[Path]:
        if self.pipeline_options.artifacts_path:
            p = Path(self.pipeline_options.artifacts_path).expanduser()
        elif settings.artifacts_path:
            p = Path(settings.artifacts_path).expanduser()
        else:
            return None
        if not p.is_dir():
            raise RuntimeError(
                f"{p} does not exist or is not a directory containing the required models"
            )
        return p

    def _make_ocr_model(self, art_path: Optional[Path]) -> Any:
        factory = get_ocr_factory(
            allow_external_plugins=self.pipeline_options.allow_external_plugins
        )
        return factory.create_instance(
            options=self.pipeline_options.ocr_options,
            enabled=self.pipeline_options.do_ocr,
            artifacts_path=art_path,
            accelerator_options=self.pipeline_options.accelerator_options,
        )

    def _make_picture_description_model(
        self, art_path: Optional[Path]
    ) -> Optional[PictureDescriptionBaseModel]:
        factory = get_picture_description_factory(
            allow_external_plugins=self.pipeline_options.allow_external_plugins
        )
        return factory.create_instance(
            options=self.pipeline_options.picture_description_options,
            enabled=self.pipeline_options.do_picture_description,
            enable_remote_services=self.pipeline_options.enable_remote_services,
            artifacts_path=art_path,
            accelerator_options=self.pipeline_options.accelerator_options,
        )

    # ────────────────────────────────────────────────────────────────────────
    # Build - thread pipeline
    # ────────────────────────────────────────────────────────────────────────

    def _create_run_ctx(self) -> RunContext:
        opts = self.pipeline_options
        preprocess = ThreadedPipelineStage(
            name="preprocess",
            model=self.preprocessing_model,
            batch_size=1,
            batch_timeout=opts.batch_timeout_seconds,
            queue_max_size=opts.queue_max_size,
        )
        ocr = ThreadedPipelineStage(
            name="ocr",
            model=self.ocr_model,
            batch_size=opts.ocr_batch_size,
            batch_timeout=opts.batch_timeout_seconds,
            queue_max_size=opts.queue_max_size,
        )
        layout = ThreadedPipelineStage(
            name="layout",
            model=self.layout_model,
            batch_size=opts.layout_batch_size,
            batch_timeout=opts.batch_timeout_seconds,
            queue_max_size=opts.queue_max_size,
        )
        table = ThreadedPipelineStage(
            name="table",
            model=self.table_model,
            batch_size=opts.table_batch_size,
            batch_timeout=opts.batch_timeout_seconds,
            queue_max_size=opts.queue_max_size,
        )
        assemble = ThreadedPipelineStage(
            name="assemble",
            model=self.assemble_model,
            batch_size=1,
            batch_timeout=opts.batch_timeout_seconds,
            queue_max_size=opts.queue_max_size,
        )

        # wire stages
        output_q = ThreadedQueue(opts.queue_max_size)
        preprocess.add_output_queue(ocr.input_queue)
        ocr.add_output_queue(layout.input_queue)
        layout.add_output_queue(table.input_queue)
        table.add_output_queue(assemble.input_queue)
        assemble.add_output_queue(output_q)

        stages = [preprocess, ocr, layout, table, assemble]
        return RunContext(stages=stages, first_stage=preprocess, output_queue=output_q)

    # --------------------------------------------------------------------- build
    def _build_document(self, conv_res: ConversionResult) -> ConversionResult:
        """Stream-build the document while interleaving producer and consumer work."""
        run_id = next(self._run_seq)
        assert isinstance(conv_res.input._backend, PdfDocumentBackend)
        backend = conv_res.input._backend

        # preload & initialise pages -------------------------------------------------------------
        start_page, end_page = conv_res.input.limits.page_range
        pages: list[Page] = []
        for i in range(conv_res.input.page_count):
            if start_page - 1 <= i <= end_page - 1:
                page = Page(page_no=i)
                page._backend = backend.load_page(i)
                if page._backend and page._backend.is_valid():
                    page.size = page._backend.get_size()
                    conv_res.pages.append(page)
                    pages.append(page)

        if not pages:
            conv_res.status = ConversionStatus.FAILURE
            return conv_res

        total_pages: int = len(pages)
        ctx: RunContext = self._create_run_ctx()
        for st in ctx.stages:
            st.start()

        proc = ProcessingResult(total_expected=total_pages)
        fed_idx: int = 0  # number of pages successfully queued
        batch_size: int = 32  # drain chunk
        try:
            while proc.success_count + proc.failure_count < total_pages:
                # 1) feed - try to enqueue until the first queue is full
                while fed_idx < total_pages:
                    ok = ctx.first_stage.input_queue.put(
                        ThreadedItem(
                            payload=pages[fed_idx],
                            run_id=run_id,
                            page_no=pages[fed_idx].page_no,
                            conv_res=conv_res,
                        ),
                        timeout=0.0,  # non-blocking try-put
                    )
                    if ok:
                        fed_idx += 1
                        if fed_idx == total_pages:
                            ctx.first_stage.input_queue.close()
                    else:  # queue full - switch to draining
                        break

                # 2) drain - pull whatever is ready from the output side
                out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05)
                for itm in out_batch:
                    if itm.run_id != run_id:
                        continue
                    if itm.is_failed or itm.error:
                        proc.failed_pages.append(
                            (itm.page_no, itm.error or RuntimeError("unknown error"))
                        )
                    else:
                        assert itm.payload is not None
                        proc.pages.append(itm.payload)

                # 3) failure safety - downstream closed early -> mark missing pages failed
                if not out_batch and ctx.output_queue.closed:
                    missing = total_pages - (proc.success_count + proc.failure_count)
                    if missing > 0:
                        proc.failed_pages.extend(
                            [(-1, RuntimeError("pipeline terminated early"))] * missing
                        )
                    break
        finally:
            for st in ctx.stages:
                st.stop()
            ctx.output_queue.close()

        self._integrate_results(conv_res, proc)
        return conv_res

    # ---------------------------------------------------- integrate_results()
    def _integrate_results(
        self, conv_res: ConversionResult, proc: ProcessingResult
    ) -> None:
        page_map = {p.page_no: p for p in proc.pages}
        conv_res.pages = [
            page_map.get(p.page_no, p)
            for p in conv_res.pages
            if p.page_no in page_map
            or not any(fp == p.page_no for fp, _ in proc.failed_pages)
        ]
        if proc.is_complete_failure:
            conv_res.status = ConversionStatus.FAILURE
        elif proc.is_partial_success:
            conv_res.status = ConversionStatus.PARTIAL_SUCCESS
        else:
            conv_res.status = ConversionStatus.SUCCESS
        if not self.keep_images:
            for p in conv_res.pages:
                p._image_cache = {}
        if not self.keep_backend:
            for p in conv_res.pages:
                if p._backend is not None:
                    p._backend.unload()

    # ---------------------------------------------------------------- assemble
    def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult:
        elements, headers, body = [], [], []
        with TimeRecorder(conv_res, "doc_assemble", scope=ProfilingScope.DOCUMENT):
            for p in conv_res.pages:
                if p.assembled:
                    elements.extend(p.assembled.elements)
                    headers.extend(p.assembled.headers)
                    body.extend(p.assembled.body)
            conv_res.assembled = AssembledUnit(
                elements=elements, headers=headers, body=body
            )
            conv_res.document = self.reading_order_model(conv_res)
        return conv_res

    # ---------------------------------------------------------------- misc
    @classmethod
    def get_default_options(cls) -> ThreadedPdfPipelineOptions:
        return ThreadedPdfPipelineOptions()

    @classmethod
    def is_backend_supported(cls, backend: AbstractDocumentBackend) -> bool:
        return isinstance(backend, PdfDocumentBackend)

    def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus:
        return conv_res.status

    def _unload(self, conv_res: ConversionResult) -> None:
        for p in conv_res.pages:
            if p._backend is not None:
                p._backend.unload()
        if conv_res.input._backend:
            conv_res.input._backend.unload()
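
The file above wires five single-threaded stages (preprocess, ocr, layout, table, assemble) together with bounded queues: backpressure comes from the bounded capacity, and shutdown propagates downstream when queues close. A minimal, self-contained sketch of the same stage-and-queue idea, illustrative only (it uses a sentinel object for shutdown, whereas ThreadedQueue above uses an explicit close() flag, and all names are invented for the example):

import queue
import threading
from typing import Callable, Optional

CLOSED = object()  # shutdown marker that propagates downstream


class Stage:
    """One worker thread reading from a bounded inbox and writing to an optional output queue."""

    def __init__(self, name: str, work: Callable[[int], int], out: Optional[queue.Queue] = None):
        self.name = name
        self.work = work
        self.out = out
        self.inbox: queue.Queue = queue.Queue(maxsize=4)  # bounded queue -> backpressure
        self.thread = threading.Thread(target=self._run, name=f"Stage-{name}")

    def _run(self) -> None:
        while True:
            item = self.inbox.get()
            if item is CLOSED:
                if self.out is not None:
                    self.out.put(CLOSED)  # propagate shutdown so downstream stages terminate
                return
            result = self.work(item)
            if self.out is not None:
                self.out.put(result)


# Wire two stages: double -> add_one, then drain the final output queue.
output: queue.Queue = queue.Queue()
second = Stage("add_one", lambda x: x + 1, out=output)
first = Stage("double", lambda x: x * 2, out=second.inbox)
for s in (first, second):
    s.thread.start()

for i in range(5):
    first.inbox.put(i)   # blocks if the bounded inbox is full (backpressure)
first.inbox.put(CLOSED)  # closing the first stage cascades to the second

results = []
while (item := output.get()) is not CLOSED:
    results.append(item)
for s in (first, second):
    s.thread.join()
print(results)  # [1, 3, 5, 7, 9]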
@@ -46,7 +46,7 @@ dependencies = [
     'pydantic (>=2.0.0,<3.0.0)',
     'docling-core[chunking] (>=2.42.0,<3.0.0)',
     'docling-parse (>=4.0.0,<5.0.0)',
-    "docling-ibm-models>=3.6.0,<4",
+    "docling-ibm-models>=3.9.0,<4",
     'filetype (>=1.2.0,<2.0.0)',
     'pypdfium2 (>=4.30.0,!=4.30.1,<5.0.0)',
     'pydantic-settings (>=2.3.0,<3.0.0)',
@@ -149,8 +149,6 @@ constraints = [
 package = true
 default-groups = "all"
 
-[tool.uv.sources]
-
 [tool.setuptools.packages.find]
 include = ["docling*"]
@@ -25,5 +25,5 @@ def test_settings():
     assert m.settings.debug.visualize_raw_layout is True
 
     # Check nested defaults
-    assert m.settings.perf.doc_batch_size == 2
+    assert m.settings.perf.doc_batch_size == 1
     assert m.settings.debug.visualize_ocr is False
tests/test_threaded_pipeline.py  (new file, 176 lines)
@@ -0,0 +1,176 @@
import logging
import time
from pathlib import Path
from typing import List

import pytest

from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
    ThreadedPdfPipelineOptions,
)
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline


def test_threaded_pipeline_multiple_documents():
    """Test threaded pipeline with multiple documents and compare with standard pipeline"""
    test_files = [
        "tests/data/pdf/2203.01017v2.pdf",
        "tests/data/pdf/2206.01062.pdf",
        "tests/data/pdf/2305.03393v1.pdf",
    ]
    # test_files = [str(f) for f in Path("test/data/pdf").rglob("*.pdf")]

    do_ts = False
    do_ocr = False

    run_threaded = True
    run_serial = True

    if run_threaded:
        # Threaded pipeline
        threaded_converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_cls=ThreadedStandardPdfPipeline,
                    pipeline_options=ThreadedPdfPipelineOptions(
                        layout_batch_size=1,
                        table_batch_size=1,
                        ocr_batch_size=1,
                        batch_timeout_seconds=1.0,
                        do_table_structure=do_ts,
                        do_ocr=do_ocr,
                    ),
                )
            }
        )

        threaded_converter.initialize_pipeline(InputFormat.PDF)

        # Test threaded pipeline
        threaded_success_count = 0
        threaded_failure_count = 0
        start_time = time.perf_counter()
        for result in threaded_converter.convert_all(test_files, raises_on_error=True):
            print(
                "Finished converting document with threaded pipeline:",
                result.input.file.name,
            )
            if result.status == ConversionStatus.SUCCESS:
                threaded_success_count += 1
            else:
                threaded_failure_count += 1
        threaded_time = time.perf_counter() - start_time

        del threaded_converter

        print(f"Threaded pipeline: {threaded_time:.2f} seconds")

    if run_serial:
        # Standard pipeline
        standard_converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_cls=StandardPdfPipeline,
                    pipeline_options=PdfPipelineOptions(
                        do_table_structure=do_ts,
                        do_ocr=do_ocr,
                    ),
                )
            }
        )

        standard_converter.initialize_pipeline(InputFormat.PDF)

        # Test standard pipeline
        standard_success_count = 0
        standard_failure_count = 0
        start_time = time.perf_counter()
        for result in standard_converter.convert_all(test_files, raises_on_error=True):
            print(
                "Finished converting document with standard pipeline:",
                result.input.file.name,
            )
            if result.status == ConversionStatus.SUCCESS:
                standard_success_count += 1
            else:
                standard_failure_count += 1
        standard_time = time.perf_counter() - start_time

        del standard_converter

        print(f"Standard pipeline: {standard_time:.2f} seconds")

    # Verify results
    if run_threaded and run_serial:
        assert standard_success_count == threaded_success_count
        assert standard_failure_count == threaded_failure_count
    if run_serial:
        assert standard_success_count == len(test_files)
        assert standard_failure_count == 0
    if run_threaded:
        assert threaded_success_count == len(test_files)
        assert threaded_failure_count == 0


def test_pipeline_comparison():
    """Compare all three pipeline implementations"""
    test_file = "tests/data/pdf/2206.01062.pdf"

    # Sync pipeline
    sync_converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_cls=StandardPdfPipeline,
            )
        }
    )

    start_time = time.perf_counter()
    sync_results = list(sync_converter.convert_all([test_file]))
    sync_time = time.perf_counter() - start_time

    # Threaded pipeline
    threaded_converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_cls=ThreadedStandardPdfPipeline,
                pipeline_options=ThreadedPdfPipelineOptions(
                    layout_batch_size=1,
                    ocr_batch_size=1,
                    table_batch_size=1,
                ),
            )
        }
    )

    start_time = time.perf_counter()
    threaded_results = list(threaded_converter.convert_all([test_file]))
    threaded_time = time.perf_counter() - start_time

    print("\nPipeline Comparison:")
    print(f"Sync pipeline: {sync_time:.2f} seconds")
    print(f"Threaded pipeline: {threaded_time:.2f} seconds")
    print(f"Speedup: {sync_time / threaded_time:.2f}x")

    # Verify results are equivalent
    assert len(sync_results) == len(threaded_results) == 1
    assert (
        sync_results[0].status == threaded_results[0].status == ConversionStatus.SUCCESS
    )

    # Basic content comparison
    sync_doc = sync_results[0].document
    threaded_doc = threaded_results[0].document

    assert len(sync_doc.pages) == len(threaded_doc.pages)
    assert len(sync_doc.texts) == len(threaded_doc.texts)


if __name__ == "__main__":
    # Run basic performance test
    test_pipeline_comparison()
uv.lock  (generated, 2 lines changed)
@@ -943,7 +943,7 @@ requires-dist = [
     { name = "beautifulsoup4", specifier = ">=4.12.3,<5.0.0" },
     { name = "certifi", specifier = ">=2024.7.4" },
     { name = "docling-core", extras = ["chunking"], specifier = ">=2.42.0,<3.0.0" },
-    { name = "docling-ibm-models", specifier = ">=3.6.0,<4" },
+    { name = "docling-ibm-models", specifier = ">=3.9.0,<4" },
     { name = "docling-parse", specifier = ">=4.0.0,<5.0.0" },
     { name = "easyocr", specifier = ">=1.7,<2.0" },
     { name = "filetype", specifier = ">=1.2.0,<2.0.0" },