From de0d9b50a21c176dedc8f318fa0ca315b1bf1c5b Mon Sep 17 00:00:00 2001
From: Christoph Auer
Date: Wed, 23 Jul 2025 15:52:12 +0200
Subject: [PATCH] Option to enable threadpool with doc_batch_concurrency
 setting

Signed-off-by: Christoph Auer
---
 docling/datamodel/settings.py | 19 ++++++---------
 docling/document_converter.py | 45 ++++++++++++++++++++---------------
 2 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/docling/datamodel/settings.py b/docling/datamodel/settings.py
index 6cfc953b..d4a9bdce 100644
--- a/docling/datamodel/settings.py
+++ b/docling/datamodel/settings.py
@@ -26,18 +26,13 @@ class DocumentLimits(BaseModel):
 
 
 class BatchConcurrencySettings(BaseModel):
-    doc_batch_size: int = 2
-    doc_batch_concurrency: int = 2
-    page_batch_size: int = 4
-    page_batch_concurrency: int = 2
-    elements_batch_size: int = 16
-
-    # doc_batch_size: int = 1
-    # doc_batch_concurrency: int = 1
-    # page_batch_size: int = 1
-    # page_batch_concurrency: int = 1
-
-    # model_concurrency: int = 2
+    doc_batch_size: int = 1  # Number of documents processed in one batch. Should be >= doc_batch_concurrency
+    doc_batch_concurrency: int = 1  # Number of parallel threads processing documents. Warning: Experimental! No benefit expected without free-threaded python.
+    page_batch_size: int = 4  # Number of pages processed in one batch.
+    page_batch_concurrency: int = 1  # Currently unused.
+    elements_batch_size: int = (
+        16  # Number of elements processed in one batch, in enrichment models.
+    )
 
     # To force models into single core: export OMP_NUM_THREADS=1
 
diff --git a/docling/document_converter.py b/docling/document_converter.py
index bbafb304..5ad19c6d 100644
--- a/docling/document_converter.py
+++ b/docling/document_converter.py
@@ -4,6 +4,7 @@ import sys
 import threading
 import time
 from collections.abc import Iterable, Iterator
+from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple, Type, Union
@@ -284,27 +285,33 @@ class DocumentConverter:
             settings.perf.doc_batch_size,  # pass format_options
         ):
             _log.info("Going to convert document batch...")
+            process_func = partial(
+                self._process_document, raises_on_error=raises_on_error
+            )
 
-            # parallel processing only within input_batch
-            # with ThreadPoolExecutor(
-            #     max_workers=settings.perf.doc_batch_concurrency
-            # ) as pool:
-            #     yield from pool.map(
-            #         partial(self._process_document, raises_on_error=raises_on_error),
-            #         input_batch,
-            #     )
-            # Note: PDF backends are not thread-safe, thread pool usage was disabled.
-
-            for item in map(
-                partial(self._process_document, raises_on_error=raises_on_error),
-                input_batch,
+            if (
+                settings.perf.doc_batch_concurrency > 1
+                and settings.perf.doc_batch_size > 1
             ):
-                elapsed = time.monotonic() - start_time
-                start_time = time.monotonic()
-                _log.info(
-                    f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
-                )
-                yield item
+                with ThreadPoolExecutor(
+                    max_workers=settings.perf.doc_batch_concurrency
+                ) as pool:
+                    for item in pool.map(
+                        process_func,
+                        input_batch,
+                    ):
+                        yield item
+            else:
+                for item in map(
+                    process_func,
+                    input_batch,
+                ):
+                    elapsed = time.monotonic() - start_time
+                    start_time = time.monotonic()
+                    _log.info(
+                        f"Finished converting document {item.input.file.name} in {elapsed:.2f} sec."
+                    )
+                    yield item
 
     def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
         """Retrieve or initialize a pipeline, reusing instances based on class and options."""
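
Usage note (not part of the patch): a minimal sketch of how a caller could opt in to the thread pool introduced above, assuming the global `settings` object from docling.datamodel.settings and the existing DocumentConverter.convert_all() API; the input file names are placeholders. Per the new _convert() logic, both doc_batch_size and doc_batch_concurrency must be greater than 1, otherwise the serial map() path is taken.

    from docling.datamodel.settings import settings
    from docling.document_converter import DocumentConverter

    # Opt in to the experimental parallel document processing.
    # Both knobs must be > 1, or _convert() falls back to the serial path.
    settings.perf.doc_batch_size = 4
    settings.perf.doc_batch_concurrency = 4  # Experimental! See warning in settings.py.

    converter = DocumentConverter()
    # Hypothetical inputs; convert_all() yields ConversionResult items as they finish.
    for result in converter.convert_all(["a.pdf", "b.pdf", "c.pdf", "d.pdf"]):
        print(result.input.file.name)

As the comment on doc_batch_concurrency warns, a GIL-enabled CPython build largely serializes these worker threads for CPU-bound work, so speedups are mainly expected on free-threaded Python.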