Revise pipeline

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Christoph Auer 2025-07-18 14:33:03 +02:00
parent 9fd01f3399
commit 33a24848a0
3 changed files with 455 additions and 529 deletions


@@ -1,8 +1,10 @@
import hashlib
import logging
import sys
+import threading
import time
from collections.abc import Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
+from functools import partial
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Type, Union
@@ -50,6 +52,9 @@ from docling.utils.utils import chunkify
_log = logging.getLogger(__name__)

+# Module-level lock for pipeline cache
+_pipeline_cache_lock = threading.Lock()

class FormatOption(BaseModel):
    pipeline_cls: Type[BasePipeline]
@@ -284,10 +289,13 @@ class DocumentConverter:
        _log.info("Going to convert document batch...")

        # parallel processing only within input_batch
-        # with ThreadPoolExecutor(
+        #with ThreadPoolExecutor(
        #    max_workers=settings.perf.doc_batch_concurrency
-        # ) as pool:
-        #    yield from pool.map(self.process_document, input_batch)
+        #) as pool:
+        #    yield from pool.map(
+        #        partial(self._process_document, raises_on_error=raises_on_error),
+        #        input_batch,
+        #    )
        # Note: PDF backends are not thread-safe, thread pool usage was disabled.

        for item in map(
@@ -315,19 +323,20 @@ class DocumentConverter:
        # Use a composite key to cache pipelines
        cache_key = (pipeline_class, options_hash)
-        if cache_key not in self.initialized_pipelines:
-            _log.info(
-                f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}"
-            )
-            self.initialized_pipelines[cache_key] = pipeline_class(
-                pipeline_options=pipeline_options
-            )
-        else:
-            _log.debug(
-                f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}"
-            )
+        with _pipeline_cache_lock:
+            if cache_key not in self.initialized_pipelines:
+                _log.info(
+                    f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}"
+                )
+                self.initialized_pipelines[cache_key] = pipeline_class(
+                    pipeline_options=pipeline_options
+                )
+            else:
+                _log.debug(
+                    f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}"
+                )

-        return self.initialized_pipelines[cache_key]
+            return self.initialized_pipelines[cache_key]

    def _process_document(
        self, in_doc: InputDocument, raises_on_error: bool
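
For readers skimming the hunk above: the cache membership test and the pipeline construction now happen under one module-level lock, so two threads converting documents concurrently cannot both miss the cache and build duplicate pipelines. A minimal standalone sketch of that pattern, where get_pipeline and _options_hash are illustrative stand-ins rather than docling's actual internals:

import hashlib
import threading
from typing import Any, Dict, Tuple

_pipeline_cache_lock = threading.Lock()
_pipeline_cache: Dict[Tuple[type, str], Any] = {}


def _options_hash(options: Any) -> str:
    # Illustrative: hash a stable serialization of the options object.
    return hashlib.sha256(repr(options).encode("utf-8")).hexdigest()


def get_pipeline(pipeline_cls: type, options: Any) -> Any:
    key = (pipeline_cls, _options_hash(options))
    # Check and construct under the same lock so concurrent callers with the
    # same configuration never race and create two pipelines.
    with _pipeline_cache_lock:
        if key not in _pipeline_cache:
            _pipeline_cache[key] = pipeline_cls(options)  # potentially expensive
        return _pipeline_cache[key]

Holding the lock across construction serializes pipeline creation, but since the result is cached per (class, options hash), that cost is paid at most once per configuration.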

File diff suppressed because it is too large


@@ -1,56 +1,104 @@
import logging
import time
from pathlib import Path
from typing import List
import pytest
-from docling.document_converter import DocumentConverter, PdfFormatOption
-from docling.datamodel.base_models import InputFormat, ConversionStatus
+from docling.datamodel.base_models import ConversionStatus, InputFormat
from docling.datamodel.document import ConversionResult
from docling.datamodel.pipeline_options import (
    PdfPipelineOptions,
-    ThreadedPdfPipelineOptions
+    ThreadedPdfPipelineOptions,
)
+from docling.document_converter import DocumentConverter, PdfFormatOption
+from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline
def test_threaded_pipeline_multiple_documents():
"""Test threaded pipeline with multiple documents"""
converter = DocumentConverter(
"""Test threaded pipeline with multiple documents and compare with standard pipeline"""
test_files = [
"tests/data/pdf/2203.01017v2.pdf",
"tests/data/pdf/2206.01062.pdf",
"tests/data/pdf/2305.03393v1.pdf"
]
# Standard pipeline
standard_converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(
pipeline_cls=StandardPdfPipeline,
pipeline_options=PdfPipelineOptions(
do_table_structure=True,
do_ocr=True,
),
)
}
)
# Threaded pipeline
threaded_converter = DocumentConverter(
format_options={
InputFormat.PDF: PdfFormatOption(
pipeline_cls=ThreadedStandardPdfPipeline,
pipeline_options=ThreadedPdfPipelineOptions(
layout_batch_size=48,
ocr_batch_size=24,
layout_batch_size=1,
table_batch_size=1,
ocr_batch_size=1,
batch_timeout_seconds=1.0,
)
do_table_structure=True,
do_ocr=True,
),
)
}
)
# Test threaded pipeline with multiple documents
results = []
# Test standard pipeline
standard_results = []
start_time = time.perf_counter()
for result in converter.convert_all([
"tests/data/pdf/2206.01062.pdf",
"tests/data/pdf/2305.03393v1.pdf"
]):
results.append(result)
end_time = time.perf_counter()
conversion_duration = end_time - start_time
print(f"Threaded multi-doc conversion took {conversion_duration:.2f} seconds")
assert len(results) == 2
for result in results:
for result in standard_converter.convert_all(test_files, raises_on_error=True):
print("Finished converting document with standard pipeline:", result.input.file.name)
standard_results.append(result)
standard_time = time.perf_counter() - start_time
del standard_converter
# Test threaded pipeline
threaded_results = []
start_time = time.perf_counter()
for result in threaded_converter.convert_all(test_files, raises_on_error=True):
print("Finished converting document with threaded pipeline:", result.input.file.name)
threaded_results.append(result)
threaded_time = time.perf_counter() - start_time
del threaded_converter
print("\nMulti-document Pipeline Comparison:")
print(f"Standard pipeline: {standard_time:.2f} seconds")
print(f"Threaded pipeline: {threaded_time:.2f} seconds")
print(f"Speedup: {standard_time / threaded_time:.2f}x")
# Verify results
assert len(standard_results) == len(threaded_results)
for result in standard_results:
assert result.status == ConversionStatus.SUCCESS
for result in threaded_results:
assert result.status == ConversionStatus.SUCCESS
# Basic content comparison
for i, (standard_result, threaded_result) in enumerate(zip(standard_results, threaded_results)):
standard_doc = standard_result.document
threaded_doc = threaded_result.document
assert len(standard_doc.pages) == len(threaded_doc.pages), f"Document {i} page count mismatch"
assert len(standard_doc.texts) == len(threaded_doc.texts), f"Document {i} text count mismatch"
def test_pipeline_comparison():
"""Compare all three pipeline implementations"""
test_file = "tests/data/pdf/2206.01062.pdf"
# Sync pipeline
sync_converter = DocumentConverter(
format_options={
@@ -59,11 +107,11 @@ def test_pipeline_comparison():
            )
        }
    )

    start_time = time.perf_counter()
    sync_results = list(sync_converter.convert_all([test_file]))
    sync_time = time.perf_counter() - start_time

    # Threaded pipeline
    threaded_converter = DocumentConverter(
        format_options={

@@ -73,35 +121,34 @@ def test_pipeline_comparison():
                    layout_batch_size=1,
                    ocr_batch_size=1,
                    table_batch_size=1,
-                )
+                ),
            )
        }
    )

    start_time = time.perf_counter()
    threaded_results = list(threaded_converter.convert_all([test_file]))
    threaded_time = time.perf_counter() - start_time

-    print(f"\nPipeline Comparison:")
+    print("\nPipeline Comparison:")
    print(f"Sync pipeline: {sync_time:.2f} seconds")
    print(f"Threaded pipeline: {threaded_time:.2f} seconds")
-    print(f"Speedup: {sync_time/threaded_time:.2f}x")
+    print(f"Speedup: {sync_time / threaded_time:.2f}x")

    # Verify results are equivalent
    assert len(sync_results) == len(threaded_results) == 1
-    assert sync_results[0].status == threaded_results[0].status == ConversionStatus.SUCCESS
+    assert (
+        sync_results[0].status == threaded_results[0].status == ConversionStatus.SUCCESS
+    )

    # Basic content comparison
    sync_doc = sync_results[0].document
    threaded_doc = threaded_results[0].document
    assert len(sync_doc.pages) == len(threaded_doc.pages)
    assert len(sync_doc.texts) == len(threaded_doc.texts)


if __name__ == "__main__":
    # Run basic performance test
-    test_pipeline_comparison()
\ No newline at end of file
+    test_pipeline_comparison()
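
Taken together, the tests above also double as a usage reference for the new pipeline. A condensed sketch of how a caller opts into ThreadedStandardPdfPipeline; the input path comes from the test data, and the batch-size values are placeholders rather than recommended defaults:

from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline

converter = DocumentConverter(
    format_options={
        InputFormat.PDF: PdfFormatOption(
            pipeline_cls=ThreadedStandardPdfPipeline,
            pipeline_options=ThreadedPdfPipelineOptions(
                layout_batch_size=4,  # placeholder batch sizes, not tuned values
                table_batch_size=4,
                ocr_batch_size=4,
                batch_timeout_seconds=1.0,
            ),
        )
    }
)

# raises_on_error=True surfaces per-document failures immediately, as in the tests.
for result in converter.convert_all(["tests/data/pdf/2206.01062.pdf"], raises_on_error=True):
    print(result.input.file.name, result.status)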