feat: Threaded PDF pipeline (#1951)

* Initial async pdf pipeline

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* UpstreamAwareQueue

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Refactoring into async pipeline primitives and graph

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Cleanups and safety improvements

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Better threaded PDF pipeline

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Pin docling-ibm-models

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Remove unused args

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Add test

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Revise pipeline

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Unload doc backend

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Revert "Unload doc backend"

This reverts commit 01066f0b6e.

* Remove redundant method

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Update threaded test

Signed-off-by: Ubuntu <ubuntu@ip-172-31-30-253.eu-central-1.compute.internal>

* Stop accumulating docs in test run

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Fix: don't starve on docs with > max_queue_size pages

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Fix: don't starve on docs with > max_queue_size pages

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* DCO Remediation Commit for Christoph Auer <cau@zurich.ibm.com>

I, Christoph Auer <cau@zurich.ibm.com>, hereby add my Signed-off-by to this commit: fa71cde950
I, Ubuntu <ubuntu@ip-172-31-30-253.eu-central-1.compute.internal>, hereby add my Signed-off-by to this commit: d66da87d96

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Fix: python3.9 compat

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Option to enable threadpool with doc_batch_concurrency setting

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Clean up unused code

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Fix settings defaults expectations

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Use released docling-ibm-models

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

* Remove ignores for typing/linting

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>

---------

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Signed-off-by: Ubuntu <ubuntu@ip-172-31-30-253.eu-central-1.compute.internal>
Co-authored-by: Ubuntu <ubuntu@ip-172-31-30-253.eu-central-1.compute.internal>
Author: Christoph Auer
Date: 2025-07-26 11:49:37 +02:00
Committed by: GitHub
Parent: aec29a7315
Commit: aed772ab33
9 changed files with 908 additions and 91 deletions
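
Note on the two "Fix: don't starve on docs with > max_queue_size pages" commits above: they address a capacity hazard in bounded inter-stage queues. When a single document yields more pages than the queue bound, the producer can block on a full queue while the consumer is still waiting for the remaining pages of that same document, and the pipeline stalls. The sketch below is a hypothetical illustration of that remedy only; it uses a plain queue.Queue stand-in, and the safe_capacity helper is invented for illustration, not docling's actual UpstreamAwareQueue logic.

# Hypothetical illustration of the starvation fix; not docling's real code.
import queue

MAX_QUEUE_SIZE = 8  # assumed default bound between pipeline stages

def safe_capacity(num_pages: int) -> int:
    # Never bound the queue below the page count of a single document, so
    # one oversized document cannot fill the queue and block its own tail.
    return max(MAX_QUEUE_SIZE, num_pages)

pages_in_doc = 20  # a document larger than the default bound
q: queue.Queue = queue.Queue(maxsize=safe_capacity(pages_in_doc))

# With maxsize=8 and a consumer that waits for the complete document, the
# ninth put() would block forever; with the widened bound, all pages fit.
for page_no in range(pages_in_doc):
    q.put(page_no)
print(f"buffered all {q.qsize()} pages without starving")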


@@ -4,6 +4,7 @@ import sys
 import threading
 import time
 from collections.abc import Iterable, Iterator
+from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Type, Union
@@ -284,24 +285,33 @@ class DocumentConverter:
             settings.perf.doc_batch_size,  # pass format_options
         ):
             _log.info("Going to convert document batch...")
+            process_func = partial(
+                self._process_document, raises_on_error=raises_on_error
+            )
 
-            # parallel processing only within input_batch
-            # with ThreadPoolExecutor(
-            #    max_workers=settings.perf.doc_batch_concurrency
-            # ) as pool:
-            #   yield from pool.map(self.process_document, input_batch)
-            # Note: PDF backends are not thread-safe, thread pool usage was disabled.
-
-            for item in map(
-                partial(self._process_document, raises_on_error=raises_on_error),
-                input_batch,
+            if (
+                settings.perf.doc_batch_concurrency > 1
+                and settings.perf.doc_batch_size > 1
             ):
-                elapsed = time.monotonic() - start_time
-                start_time = time.monotonic()
-                _log.info(
-                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
-                )
-                yield item
+                with ThreadPoolExecutor(
+                    max_workers=settings.perf.doc_batch_concurrency
+                ) as pool:
+                    for item in pool.map(
+                        process_func,
+                        input_batch,
+                    ):
+                        yield item
+            else:
+                for item in map(
+                    process_func,
+                    input_batch,
+                ):
+                    elapsed = time.monotonic() - start_time
+                    start_time = time.monotonic()
+                    _log.info(
+                        f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
+                    )
+                    yield item
 
     def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
         """Retrieve or initialize a pipeline, reusing instances based on class and options."""
@@ -330,7 +340,7 @@ class DocumentConverter:
                     f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}"
                 )
 
-        return self.initialized_pipelines[cache_key]
+            return self.initialized_pipelines[cache_key]
 
     def _process_document(
         self, in_doc: InputDocument, raises_on_error: bool
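
Usage note: the converter diff above only takes the ThreadPoolExecutor branch when both performance knobs are greater than one; otherwise it falls back to the sequential map() path, which is also the only branch that emits the per-document timing log. A short sketch of opting in, assuming the public docling API at this commit (DocumentConverter, convert_all, and the global settings object); the sample file names are placeholders.

# Opt in to the threaded batch conversion added by this PR (illustrative).
from docling.datamodel.settings import settings
from docling.document_converter import DocumentConverter

# Both values must be > 1, or the sequential path is used instead.
settings.perf.doc_batch_size = 4
settings.perf.doc_batch_concurrency = 4

converter = DocumentConverter()
for result in converter.convert_all(
    ["a.pdf", "b.pdf", "c.pdf", "d.pdf"],  # placeholder inputs
    raises_on_error=False,
):
    print(result.input.file.name, result.status)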