Option to enable threadpool with doc_batch_concurrency setting

Signed-off-by: Christoph Auer <cau@zurich.ibm.com>
Author: Christoph Auer <cau@zurich.ibm.com>
Date: 2025-07-23 15:52:12 +02:00
Parent: 7b4db1940d
Commit: de0d9b50a2
2 changed files with 33 additions and 31 deletions

docling/datamodel/settings.py

@@ -26,18 +26,13 @@ class DocumentLimits(BaseModel):
 class BatchConcurrencySettings(BaseModel):
-    doc_batch_size: int = 2
-    doc_batch_concurrency: int = 2
-    page_batch_size: int = 4
-    page_batch_concurrency: int = 2
-    elements_batch_size: int = 16
-    # doc_batch_size: int = 1
-    # doc_batch_concurrency: int = 1
-    # page_batch_size: int = 1
-    # page_batch_concurrency: int = 1
-    # model_concurrency: int = 2
+    doc_batch_size: int = 1  # Number of documents processed in one batch. Should be >= doc_batch_concurrency
+    doc_batch_concurrency: int = 1  # Number of parallel threads processing documents. Warning: Experimental! No benefit expected without free-threaded python.
+    page_batch_size: int = 4  # Number of pages processed in one batch.
+    page_batch_concurrency: int = 1  # Currently unused.
+    elements_batch_size: int = (
+        16  # Number of elements processed in one batch, in enrichment models.
+    )

     # To force models into single core: export OMP_NUM_THREADS=1
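
With the new defaults both knobs sit at 1, so the executor path stays disabled until a caller raises them. A minimal sketch of opting in, assuming docling's global settings object and the field names from the hunk above (the exact import paths are an assumption); on a GIL-bound CPython build the CPU-bound work largely serializes, which is why the comment above expects no benefit without free-threaded Python:

    # Sketch: opting into the experimental threaded document batching.
    # Assumes docling's `settings` singleton and DocumentConverter, as in this diff.
    from docling.datamodel.settings import settings
    from docling.document_converter import DocumentConverter

    settings.perf.doc_batch_size = 4         # keep >= doc_batch_concurrency
    settings.perf.doc_batch_concurrency = 4  # > 1 selects the ThreadPoolExecutor path

    converter = DocumentConverter()
    for result in converter.convert_all(["a.pdf", "b.pdf", "c.pdf", "d.pdf"]):
        print(result.input.file.name, result.status)

Per the comment above, `export OMP_NUM_THREADS=1` pins each model to a single core so the document threads do not oversubscribe the CPU.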

docling/document_converter.py

@@ -4,6 +4,7 @@ import sys
 import threading
 import time
 from collections.abc import Iterable, Iterator
+from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Type, Union
@@ -284,27 +285,33 @@ class DocumentConverter:
             settings.perf.doc_batch_size,  # pass format_options
         ):
             _log.info("Going to convert document batch...")
-            # parallel processing only within input_batch
-            # with ThreadPoolExecutor(
-            #     max_workers=settings.perf.doc_batch_concurrency
-            # ) as pool:
-            #     yield from pool.map(
-            #         partial(self._process_document, raises_on_error=raises_on_error),
-            #         input_batch,
-            #     )
-            # Note: PDF backends are not thread-safe, thread pool usage was disabled.
-            for item in map(
-                partial(self._process_document, raises_on_error=raises_on_error),
-                input_batch,
-            ):
-                elapsed = time.monotonic() - start_time
-                start_time = time.monotonic()
-                _log.info(
-                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
-                )
-                yield item
+            process_func = partial(
+                self._process_document, raises_on_error=raises_on_error
+            )
+
+            if (
+                settings.perf.doc_batch_concurrency > 1
+                and settings.perf.doc_batch_size > 1
+            ):
+                with ThreadPoolExecutor(
+                    max_workers=settings.perf.doc_batch_concurrency
+                ) as pool:
+                    for item in pool.map(
+                        process_func,
+                        input_batch,
+                    ):
+                        yield item
+            else:
+                for item in map(
+                    process_func,
+                    input_batch,
+                ):
+                    elapsed = time.monotonic() - start_time
+                    start_time = time.monotonic()
+                    _log.info(
+                        f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
+                    )
+                    yield item

     def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
         """Retrieve or initialize a pipeline, reusing instances based on class and options."""