Update document_converter.py

Fix caching of the same pipeline class with different options by using a composite key (class, options hash).

This resolves the TODO at line 292:

# TODO this will ignore if different options have been defined for the same pipeline class.

Signed-off-by: mislavmartinic <mislav.martinic@pontistechnology.com>
mislavmartinic 2025-03-13 14:48:17 +13:00 committed by GitHub
parent 17c5bf1242
commit 955bed5c5c

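For illustration only (this is not part of the commit), here is a minimal, self-contained sketch of the caching pattern the change adopts: pipelines are cached under a (class, options hash) key instead of the class alone, so two configurations of the same pipeline class no longer evict each other. FakePipeline and FakeOptions are hypothetical stand-ins for docling's BasePipeline and PipelineOptions.

import hashlib
from typing import Dict, Tuple, Type

from pydantic import BaseModel


class FakeOptions(BaseModel):
    # Hypothetical stand-in for a PipelineOptions subclass.
    do_ocr: bool = True


class FakePipeline:
    # Hypothetical stand-in for a BasePipeline subclass.
    def __init__(self, pipeline_options: FakeOptions):
        self.pipeline_options = pipeline_options


def _options_hash(options: BaseModel) -> str:
    # Fingerprint the serialized options so equal configurations share a key.
    return hashlib.md5(str(options.model_dump()).encode("utf-8")).hexdigest()


_cache: Dict[Tuple[Type[FakePipeline], str], FakePipeline] = {}


def get_pipeline(cls: Type[FakePipeline], options: FakeOptions) -> FakePipeline:
    key = (cls, _options_hash(options))  # composite key: (class, options hash)
    if key not in _cache:
        _cache[key] = cls(pipeline_options=options)
    return _cache[key]


a = get_pipeline(FakePipeline, FakeOptions(do_ocr=True))
b = get_pipeline(FakePipeline, FakeOptions(do_ocr=False))
c = get_pipeline(FakePipeline, FakeOptions(do_ocr=True))
assert a is not b  # different options -> distinct pipeline instances
assert a is c      # identical options -> cached instance is reused

Keyed on the class alone (the pre-fix behavior), the second call would have replaced the first instance, and the third call would silently return a pipeline built with the wrong options.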
@@ -5,6 +5,7 @@ import time
 from functools import partial
 from pathlib import Path
 from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Type, Union
+import hashlib
 from pydantic import BaseModel, ConfigDict, model_validator, validate_call
@@ -181,7 +182,12 @@ class DocumentConverter:
             )
             for format in self.allowed_formats
         }
-        self.initialized_pipelines: Dict[Type[BasePipeline], BasePipeline] = {}
+        self.initialized_pipelines: Dict[Tuple[Type[BasePipeline], str], BasePipeline] = {}
+
+    def _get_pipeline_options_hash(self, pipeline_options: PipelineOptions) -> str:
+        """Generate a hash of pipeline options to use as part of the cache key."""
+        options_str = str(pipeline_options.model_dump() if hasattr(pipeline_options, 'model_dump') else pipeline_options)
+        return hashlib.md5(options_str.encode('utf-8')).hexdigest()

     def initialize_pipeline(self, format: InputFormat):
         """Initialize the conversion pipeline for the selected format."""
@@ -279,31 +285,30 @@
             yield item

     def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]:
+        """Retrieve or initialize a pipeline, reusing instances based on class and options."""
         fopt = self.format_to_options.get(doc_format)

-        if fopt is None:
+        if fopt is None or fopt.pipeline_options is None:
             return None
-        else:
-            pipeline_class = fopt.pipeline_cls
-            pipeline_options = fopt.pipeline_options
-        if pipeline_options is None:
-            return None
-        # TODO this will ignore if different options have been defined for the same pipeline class.
-        if (
-            pipeline_class not in self.initialized_pipelines
-            or self.initialized_pipelines[pipeline_class].pipeline_options
-            != pipeline_options
-        ):
-            self.initialized_pipelines[pipeline_class] = pipeline_class(
-                pipeline_options=pipeline_options
-            )
-        return self.initialized_pipelines[pipeline_class]
+
+        pipeline_class = fopt.pipeline_cls
+        pipeline_options = fopt.pipeline_options
+        options_hash = self._get_pipeline_options_hash(pipeline_options)
+
+        # Use a composite key to cache pipelines
+        cache_key = (pipeline_class, options_hash)
+        if cache_key not in self.initialized_pipelines:
+            _log.info(f"Initializing pipeline for {pipeline_class.__name__} with options hash {options_hash}")
+            self.initialized_pipelines[cache_key] = pipeline_class(pipeline_options=pipeline_options)
+        else:
+            _log.debug(f"Reusing cached pipeline for {pipeline_class.__name__} with options hash {options_hash}")
+
+        return self.initialized_pipelines[cache_key]

     def _process_document(
         self, in_doc: InputDocument, raises_on_error: bool
     ) -> ConversionResult:
         valid = (
             self.allowed_formats is not None and in_doc.format in self.allowed_formats
         )
@@ -345,7 +350,6 @@
         else:
             if raises_on_error:
                 raise ConversionError(f"Input document {in_doc.file} is not valid.")
-
             else:
                 # invalid doc or not of desired format
                 conv_res = ConversionResult(