diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 4dbd4161..1318ee47 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -20,7 +20,7 @@ env: tests/test_asr_pipeline.py tests/test_threaded_pipeline.py PYTEST_TO_SKIP: |- - EXAMPLES_TO_SKIP: '^(batch_convert|compare_vlm_models|minimal|minimal_vlm_pipeline|minimal_asr_pipeline|export_multimodal|custom_convert|develop_picture_enrichment|rapidocr_with_custom_models|offline_convert|pictures_description|pictures_description_api|vlm_pipeline_api_model|granitedocling_repetition_stopping|mlx_whisper_example|gpu_standard_pipeline|gpu_vlm_pipeline)\.py$' + EXAMPLES_TO_SKIP: '^(batch_convert|compare_vlm_models|minimal|minimal_vlm_pipeline|minimal_asr_pipeline|export_multimodal|custom_convert|develop_picture_enrichment|rapidocr_with_custom_models|offline_convert|pictures_description|pictures_description_api|vlm_pipeline_api_model|granitedocling_repetition_stopping|mlx_whisper_example|gpu_standard_pipeline|gpu_vlm_pipeline|demo_layout_vlm)\.py$' jobs: lint: @@ -28,7 +28,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ['3.12'] + python-version: ["3.12"] steps: - uses: actions/checkout@v5 @@ -58,240 +58,240 @@ jobs: uv run pre-commit run --all-files run-tests-1: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13', '3.14'] - steps: - - uses: actions/checkout@v5 + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + steps: + - uses: actions/checkout@v5 - - name: Grant permissions to APT cache directory # allows restore - run: sudo chown -R $USER:$USER /var/cache/apt/archives + - name: Grant permissions to APT cache directory # allows restore + run: sudo chown -R $USER:$USER /var/cache/apt/archives - - name: Cache APT packages - id: apt-cache - uses: actions/cache@v4 - with: - path: /var/cache/apt/archives - key: apt-packages-${{ runner.os }}-${{ hashFiles('.github/workflows/checks.yml') }} - restore-keys: | - apt-packages-${{ runner.os }}- + - name: Cache APT packages + id: apt-cache + uses: actions/cache@v4 + with: + path: /var/cache/apt/archives + key: apt-packages-${{ runner.os }}-${{ hashFiles('.github/workflows/checks.yml') }} + restore-keys: | + apt-packages-${{ runner.os }}- - - name: Install System Dependencies - run: | - sudo apt-get -qq update - sudo apt-get -qq install -y ffmpeg tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra tesseract-ocr-deu tesseract-ocr-spa tesseract-ocr-script-latn libleptonica-dev libtesseract-dev libreoffice pkg-config + - name: Install System Dependencies + run: | + sudo apt-get -qq update + sudo apt-get -qq install -y ffmpeg tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra tesseract-ocr-deu tesseract-ocr-spa tesseract-ocr-script-latn libleptonica-dev libtesseract-dev libreoffice pkg-config - - name: Set TESSDATA_PREFIX - run: echo "TESSDATA_PREFIX=$(dpkg -L tesseract-ocr-eng | grep tessdata$)" >> "$GITHUB_ENV" + - name: Set TESSDATA_PREFIX + run: echo "TESSDATA_PREFIX=$(dpkg -L tesseract-ocr-eng | grep tessdata$)" >> "$GITHUB_ENV" - - name: Install uv and set the python version - uses: astral-sh/setup-uv@v6 - with: - python-version: ${{ matrix.python-version }} + - name: Install uv and set the python version + uses: astral-sh/setup-uv@v6 + with: + python-version: ${{ matrix.python-version }} - - name: Install Python Dependencies - run: uv sync --frozen --all-extras + - name: Install Python 
Dependencies + run: uv sync --frozen --all-extras - - name: Cache Models - uses: actions/cache@v4 - with: - path: | - ~/.cache/huggingface - ~/.cache/modelscope - ~/.EasyOCR/ - key: models-cache + - name: Cache Models + uses: actions/cache@v4 + with: + path: | + ~/.cache/huggingface + ~/.cache/modelscope + ~/.EasyOCR/ + key: models-cache - - name: Pre-download Models - run: uv run python -c "import easyocr; reader = easyocr.Reader(['en', 'fr', 'de', 'es'])" + - name: Pre-download Models + run: uv run python -c "import easyocr; reader = easyocr.Reader(['en', 'fr', 'de', 'es'])" - - name: Run tests for GROUP1 - run: | - echo "--- Running tests ---" - GROUP1=$(echo "$PYTEST_ML" | sed -e 's/^/--ignore=/' | tr '\n' ' ') - echo "Running tests for GROUP1" - uv run pytest -v --durations=0 --cov=docling --cov-report=xml --cov-context=test $GROUP1 + - name: Run tests for GROUP1 + run: | + echo "--- Running tests ---" + GROUP1=$(echo "$PYTEST_ML" | sed -e 's/^/--ignore=/' | tr '\n' ' ') + echo "Running tests for GROUP1" + uv run pytest -v --durations=0 --cov=docling --cov-report=xml --cov-context=test $GROUP1 - - name: Upload coverage to Codecov - if: inputs.push_coverage - uses: codecov/codecov-action@v5 - with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage.xml - flags: run-tests-1 + - name: Upload coverage to Codecov + if: inputs.push_coverage + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ./coverage.xml + flags: run-tests-1 - - name: Grant permissions to APT cache directory # allows backup - run: sudo chown -R $USER:$USER /var/cache/apt/archives + - name: Grant permissions to APT cache directory # allows backup + run: sudo chown -R $USER:$USER /var/cache/apt/archives run-tests-2: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13', '3.14'] - steps: - - uses: actions/checkout@v5 + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + steps: + - uses: actions/checkout@v5 - - name: Grant permissions to APT cache directory # allows restore - run: sudo chown -R $USER:$USER /var/cache/apt/archives + - name: Grant permissions to APT cache directory # allows restore + run: sudo chown -R $USER:$USER /var/cache/apt/archives - - name: Cache APT packages - id: apt-cache - uses: actions/cache@v4 - with: - path: /var/cache/apt/archives - key: apt-packages-${{ runner.os }}-${{ hashFiles('.github/workflows/checks.yml') }} - restore-keys: | - apt-packages-${{ runner.os }}- + - name: Cache APT packages + id: apt-cache + uses: actions/cache@v4 + with: + path: /var/cache/apt/archives + key: apt-packages-${{ runner.os }}-${{ hashFiles('.github/workflows/checks.yml') }} + restore-keys: | + apt-packages-${{ runner.os }}- - - name: Install System Dependencies - run: | - sudo apt-get -qq update - sudo apt-get -qq install -y ffmpeg tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra tesseract-ocr-deu tesseract-ocr-spa tesseract-ocr-script-latn libleptonica-dev libtesseract-dev libreoffice pkg-config + - name: Install System Dependencies + run: | + sudo apt-get -qq update + sudo apt-get -qq install -y ffmpeg tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra tesseract-ocr-deu tesseract-ocr-spa tesseract-ocr-script-latn libleptonica-dev libtesseract-dev libreoffice pkg-config - - name: Set TESSDATA_PREFIX - run: echo "TESSDATA_PREFIX=$(dpkg -L tesseract-ocr-eng | grep tessdata$)" >> "$GITHUB_ENV" + - name: Set TESSDATA_PREFIX + 
run: echo "TESSDATA_PREFIX=$(dpkg -L tesseract-ocr-eng | grep tessdata$)" >> "$GITHUB_ENV" - - name: Install uv and set the python version - uses: astral-sh/setup-uv@v6 - with: - python-version: ${{ matrix.python-version }} + - name: Install uv and set the python version + uses: astral-sh/setup-uv@v6 + with: + python-version: ${{ matrix.python-version }} - - name: Install Python Dependencies - run: uv sync --frozen --all-extras + - name: Install Python Dependencies + run: uv sync --frozen --all-extras - - name: Cache Models - uses: actions/cache@v4 - with: - path: | - ~/.cache/huggingface - ~/.cache/modelscope - ~/.EasyOCR/ - key: models-cache + - name: Cache Models + uses: actions/cache@v4 + with: + path: | + ~/.cache/huggingface + ~/.cache/modelscope + ~/.EasyOCR/ + key: models-cache - - name: Pre-download Models - run: uv run python -c "import easyocr; reader = easyocr.Reader(['en', 'fr', 'de', 'es'])" + - name: Pre-download Models + run: uv run python -c "import easyocr; reader = easyocr.Reader(['en', 'fr', 'de', 'es'])" - - name: Run tests for GROUP2 - run: | - echo "--- Running tests ---" - GROUP2=$(echo "$PYTEST_ML" | tr '\n' ' ') - echo "Running tests for GROUP2" - DESELECT_OPT="" - if [ -n "$PYTEST_TO_SKIP" ]; then - DESELECT_OPT="--deselect $PYTEST_TO_SKIP" - fi - echo "Running tests for GROUP2" - uv run pytest -v --durations=0 --cov=docling --cov-report=xml --cov-context=test $GROUP2 $DESELECT_OPT + - name: Run tests for GROUP2 + run: | + echo "--- Running tests ---" + GROUP2=$(echo "$PYTEST_ML" | tr '\n' ' ') + echo "Running tests for GROUP2" + DESELECT_OPT="" + if [ -n "$PYTEST_TO_SKIP" ]; then + DESELECT_OPT="--deselect $PYTEST_TO_SKIP" + fi + echo "Running tests for GROUP2" + uv run pytest -v --durations=0 --cov=docling --cov-report=xml --cov-context=test $GROUP2 $DESELECT_OPT - - name: Upload coverage to Codecov - if: inputs.push_coverage - uses: codecov/codecov-action@v5 - with: - token: ${{ secrets.CODECOV_TOKEN }} - files: ./coverage.xml - flags: run-tests-2 + - name: Upload coverage to Codecov + if: inputs.push_coverage + uses: codecov/codecov-action@v5 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: ./coverage.xml + flags: run-tests-2 - - name: Grant permissions to APT cache directory # allows backup - run: sudo chown -R $USER:$USER /var/cache/apt/archives + - name: Grant permissions to APT cache directory # allows backup + run: sudo chown -R $USER:$USER /var/cache/apt/archives run-examples: - runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13', '3.14'] - steps: - - uses: actions/checkout@v5 + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", "3.14"] + steps: + - uses: actions/checkout@v5 - - name: Grant permissions to APT cache directory # allows restore - run: sudo chown -R $USER:$USER /var/cache/apt/archives + - name: Grant permissions to APT cache directory # allows restore + run: sudo chown -R $USER:$USER /var/cache/apt/archives - - name: Cache APT packages - id: apt-cache - uses: actions/cache@v4 - with: - path: /var/cache/apt/archives - key: apt-packages-${{ runner.os }}-${{ hashFiles('.github/workflows/checks.yml') }} - restore-keys: | - apt-packages-${{ runner.os }}- + - name: Cache APT packages + id: apt-cache + uses: actions/cache@v4 + with: + path: /var/cache/apt/archives + key: apt-packages-${{ runner.os }}-${{ hashFiles('.github/workflows/checks.yml') }} + restore-keys: | + apt-packages-${{ runner.os }}- - - 
name: Install System Dependencies - run: | - sudo apt-get -qq update - sudo apt-get -qq install -y ffmpeg tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra tesseract-ocr-deu tesseract-ocr-spa tesseract-ocr-script-latn libleptonica-dev libtesseract-dev libreoffice pkg-config + - name: Install System Dependencies + run: | + sudo apt-get -qq update + sudo apt-get -qq install -y ffmpeg tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra tesseract-ocr-deu tesseract-ocr-spa tesseract-ocr-script-latn libleptonica-dev libtesseract-dev libreoffice pkg-config - - name: Set TESSDATA_PREFIX - run: echo "TESSDATA_PREFIX=$(dpkg -L tesseract-ocr-eng | grep tessdata$)" >> "$GITHUB_ENV" + - name: Set TESSDATA_PREFIX + run: echo "TESSDATA_PREFIX=$(dpkg -L tesseract-ocr-eng | grep tessdata$)" >> "$GITHUB_ENV" - - name: Install uv and set the python version - uses: astral-sh/setup-uv@v6 - with: - python-version: ${{ matrix.python-version }} + - name: Install uv and set the python version + uses: astral-sh/setup-uv@v6 + with: + python-version: ${{ matrix.python-version }} - - name: Install Python Dependencies - run: uv sync --frozen --all-extras + - name: Install Python Dependencies + run: uv sync --frozen --all-extras - - name: Cache Models - uses: actions/cache@v4 - with: - path: | - ~/.cache/huggingface - ~/.cache/modelscope - ~/.EasyOCR/ - key: models-cache + - name: Cache Models + uses: actions/cache@v4 + with: + path: | + ~/.cache/huggingface + ~/.cache/modelscope + ~/.EasyOCR/ + key: models-cache - - name: Free up disk space - run: | - df -h - sudo rm -rf /usr/share/dotnet - sudo rm -rf /usr/local/lib/android - sudo rm -rf /opt/ghc - sudo apt-get clean - df -h + - name: Free up disk space + run: | + df -h + sudo rm -rf /usr/share/dotnet + sudo rm -rf /usr/local/lib/android + sudo rm -rf /opt/ghc + sudo apt-get clean + df -h - - name: Run examples - run: | - echo "--- Creating output directory ---" - mkdir -p scratch - - echo "--- Running examples ---" - - summary_file="runtime_summary.log" - echo "--- Example Runtimes ---" > "$summary_file" - - for file in docs/examples/*.py; do - if [[ "$(basename "$file")" =~ ${EXAMPLES_TO_SKIP} ]]; then - echo "Skipping example: $(basename "$file")" - else - echo "--- Running example $(basename "$file") ---" - - start_time=$SECONDS + - name: Run examples + run: | + echo "--- Creating output directory ---" + mkdir -p scratch - uv run --no-sync python "$file" || exit 1 - duration=$((SECONDS - start_time)) - echo "Finished in ${duration}s." - - echo "$(basename "$file"): ${duration}s" >> "$summary_file" - fi - done - - echo - echo "===================================" - echo " Final Runtime Summary " - echo "===================================" - cat "$summary_file" - echo "===================================" + echo "--- Running examples ---" - - name: Grant permissions to APT cache directory # allows backup - run: sudo chown -R $USER:$USER /var/cache/apt/archives + summary_file="runtime_summary.log" + echo "--- Example Runtimes ---" > "$summary_file" + + for file in docs/examples/*.py; do + if [[ "$(basename "$file")" =~ ${EXAMPLES_TO_SKIP} ]]; then + echo "Skipping example: $(basename "$file")" + else + echo "--- Running example $(basename "$file") ---" + + start_time=$SECONDS + + uv run --no-sync python "$file" || exit 1 + duration=$((SECONDS - start_time)) + echo "Finished in ${duration}s." 
+ + echo "$(basename "$file"): ${duration}s" >> "$summary_file" + fi + done + + echo + echo "===================================" + echo " Final Runtime Summary " + echo "===================================" + cat "$summary_file" + echo "===================================" + + - name: Grant permissions to APT cache directory # allows backup + run: sudo chown -R $USER:$USER /var/cache/apt/archives build-package: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.12'] + python-version: ["3.12"] steps: - uses: actions/checkout@v5 @@ -322,7 +322,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.12'] + python-version: ["3.12"] steps: - name: Download all the dists uses: actions/download-artifact@v4 diff --git a/docling/datamodel/base_models.py b/docling/datamodel/base_models.py index cd826f96..411df4ca 100644 --- a/docling/datamodel/base_models.py +++ b/docling/datamodel/base_models.py @@ -216,6 +216,7 @@ class VlmPrediction(BaseModel): generation_time: float = -1 num_tokens: Optional[int] = None stop_reason: VlmStopReason = VlmStopReason.UNSPECIFIED + input_prompt: Optional[str] = None class ContainerElement( diff --git a/docling/datamodel/pipeline_options_vlm_model.py b/docling/datamodel/pipeline_options_vlm_model.py index 9b03d58a..d90b8d70 100644 --- a/docling/datamodel/pipeline_options_vlm_model.py +++ b/docling/datamodel/pipeline_options_vlm_model.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union from docling_core.types.doc.page import SegmentedPage from pydantic import AnyUrl, BaseModel, ConfigDict @@ -9,6 +9,11 @@ from typing_extensions import deprecated from docling.datamodel.accelerator_options import AcceleratorDevice from docling.models.utils.generation_utils import GenerationStopper +if TYPE_CHECKING: + from docling_core.types.doc.page import SegmentedPage + + from docling.datamodel.base_models import Page + class BaseVlmOptions(BaseModel): kind: str @@ -17,7 +22,22 @@ class BaseVlmOptions(BaseModel): max_size: Optional[int] = None temperature: float = 0.0 - def build_prompt(self, page: Optional[SegmentedPage]) -> str: + def build_prompt( + self, + page: Optional["SegmentedPage"], + *, + _internal_page: Optional["Page"] = None, + ) -> str: + """Build the prompt for VLM inference. + + Args: + page: The parsed/segmented page to process. + _internal_page: Internal parameter for experimental layout-aware pipelines. + Do not rely on this in user code - subject to change. + + Returns: + The formatted prompt string. + """ return self.prompt def decode_response(self, text: str) -> str: @@ -83,6 +103,7 @@ class InlineVlmOptions(BaseVlmOptions): use_kv_cache: bool = True max_new_tokens: int = 4096 track_generated_tokens: bool = False + track_input_prompt: bool = False @property def repo_cache_folder(self) -> str: @@ -110,3 +131,4 @@ class ApiVlmOptions(BaseVlmOptions): stop_strings: List[str] = [] custom_stopping_criteria: List[Union[GenerationStopper]] = [] + track_input_prompt: bool = False diff --git a/docling/experimental/__init__.py b/docling/experimental/__init__.py new file mode 100644 index 00000000..e21e5131 --- /dev/null +++ b/docling/experimental/__init__.py @@ -0,0 +1,5 @@ +"""Experimental modules for Docling. + +This package contains experimental features that are under development +and may change or be removed in future versions. 
+""" diff --git a/docling/experimental/datamodel/__init__.py b/docling/experimental/datamodel/__init__.py new file mode 100644 index 00000000..c76b060a --- /dev/null +++ b/docling/experimental/datamodel/__init__.py @@ -0,0 +1 @@ +"""Experimental datamodel modules.""" diff --git a/docling/experimental/datamodel/threaded_layout_vlm_pipeline_options.py b/docling/experimental/datamodel/threaded_layout_vlm_pipeline_options.py new file mode 100644 index 00000000..533b50fc --- /dev/null +++ b/docling/experimental/datamodel/threaded_layout_vlm_pipeline_options.py @@ -0,0 +1,45 @@ +"""Options for the threaded layout+VLM pipeline.""" + +from typing import Union + +from pydantic import model_validator + +from docling.datamodel.layout_model_specs import DOCLING_LAYOUT_HERON +from docling.datamodel.pipeline_options import LayoutOptions, PaginatedPipelineOptions +from docling.datamodel.pipeline_options_vlm_model import ( + ApiVlmOptions, + InlineVlmOptions, + ResponseFormat, +) +from docling.datamodel.vlm_model_specs import GRANITEDOCLING_TRANSFORMERS + + +class ThreadedLayoutVlmPipelineOptions(PaginatedPipelineOptions): + """Pipeline options for the threaded layout+VLM pipeline.""" + + images_scale: float = 2.0 + + # VLM configuration (will be enhanced with layout awareness by the pipeline) + vlm_options: Union[InlineVlmOptions, ApiVlmOptions] = GRANITEDOCLING_TRANSFORMERS + + # Layout model configuration + layout_options: LayoutOptions = LayoutOptions( + model_spec=DOCLING_LAYOUT_HERON, skip_cell_assignment=True + ) + + # Threading and batching controls + layout_batch_size: int = 4 + vlm_batch_size: int = 4 + batch_timeout_seconds: float = 2.0 + queue_max_size: int = 50 + + @model_validator(mode="after") + def validate_response_format(self): + """Validate that VLM response format is DOCTAGS (required for this pipeline).""" + if self.vlm_options.response_format != ResponseFormat.DOCTAGS: + raise ValueError( + f"ThreadedLayoutVlmPipeline only supports DOCTAGS response format, " + f"but got {self.vlm_options.response_format}. " + f"Please set vlm_options.response_format=ResponseFormat.DOCTAGS" + ) + return self diff --git a/docling/experimental/pipeline/__init__.py b/docling/experimental/pipeline/__init__.py new file mode 100644 index 00000000..9a966763 --- /dev/null +++ b/docling/experimental/pipeline/__init__.py @@ -0,0 +1 @@ +"""Experimental pipeline modules.""" diff --git a/docling/experimental/pipeline/threaded_layout_vlm_pipeline.py b/docling/experimental/pipeline/threaded_layout_vlm_pipeline.py new file mode 100644 index 00000000..92c0c104 --- /dev/null +++ b/docling/experimental/pipeline/threaded_layout_vlm_pipeline.py @@ -0,0 +1,436 @@ +"""Threaded Layout+VLM Pipeline +================================ +A specialized two-stage threaded pipeline that combines layout model preprocessing +with VLM processing. The layout model detects document elements and coordinates, +which are then injected into the VLM prompt for enhanced structured output. 
+""" + +from __future__ import annotations + +import itertools +import logging +from pathlib import Path +from typing import TYPE_CHECKING, List, Optional, Union, cast + +from docling_core.types.doc import DoclingDocument +from docling_core.types.doc.document import DocTagsDocument +from PIL import Image as PILImage + +if TYPE_CHECKING: + from docling_core.types.doc.page import SegmentedPage + +from docling.backend.abstract_backend import AbstractDocumentBackend +from docling.backend.pdf_backend import PdfDocumentBackend +from docling.datamodel.base_models import ConversionStatus, Page +from docling.datamodel.document import ConversionResult +from docling.datamodel.pipeline_options_vlm_model import ( + ApiVlmOptions, + InferenceFramework, + InlineVlmOptions, +) +from docling.datamodel.settings import settings +from docling.experimental.datamodel.threaded_layout_vlm_pipeline_options import ( + ThreadedLayoutVlmPipelineOptions, +) +from docling.models.api_vlm_model import ApiVlmModel +from docling.models.base_model import BaseVlmPageModel +from docling.models.layout_model import LayoutModel +from docling.models.vlm_models_inline.hf_transformers_model import ( + HuggingFaceTransformersVlmModel, +) +from docling.models.vlm_models_inline.mlx_model import HuggingFaceMlxModel +from docling.pipeline.base_pipeline import BasePipeline +from docling.pipeline.standard_pdf_pipeline import ( + ProcessingResult, + RunContext, + ThreadedItem, + ThreadedPipelineStage, + ThreadedQueue, +) +from docling.utils.profiling import ProfilingScope, TimeRecorder + +_log = logging.getLogger(__name__) + + +class ThreadedLayoutVlmPipeline(BasePipeline): + """Two-stage threaded pipeline: Layout Model → VLM Model.""" + + def __init__(self, pipeline_options: ThreadedLayoutVlmPipelineOptions) -> None: + super().__init__(pipeline_options) + self.pipeline_options: ThreadedLayoutVlmPipelineOptions = pipeline_options + self._run_seq = itertools.count(1) # deterministic, monotonic run ids + + # VLM model type (initialized in _init_models) + self.vlm_model: BaseVlmPageModel + + # Initialize models + self._init_models() + + def _init_models(self) -> None: + """Initialize layout and VLM models.""" + art_path = self._resolve_artifacts_path() + + # Layout model + self.layout_model = LayoutModel( + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + options=self.pipeline_options.layout_options, + ) + + # VLM model based on options type + # Create layout-aware VLM options internally + base_vlm_options = self.pipeline_options.vlm_options + + class LayoutAwareVlmOptions(type(base_vlm_options)): # type: ignore[misc] + def build_prompt( + self, + page: Optional[SegmentedPage], + *, + _internal_page: Optional[Page] = None, + ) -> str: + base_prompt = self.prompt + augmented_prompt = base_prompt + + # In this layout-aware pipeline, _internal_page is always provided + if _internal_page is None: + return base_prompt + + if not _internal_page.size: + _log.warning( + f"Page size not available for page {_internal_page.page_no}. Cannot enhance prompt with layout info." 
+ ) + return base_prompt + + if _internal_page.predictions.layout: + from docling_core.types.doc.tokens import DocumentToken + + layout_elements = [] + for cluster in _internal_page.predictions.layout.clusters: + # Get proper tag name from DocItemLabel + tag_name = DocumentToken.create_token_name_from_doc_item_label( + label=cluster.label + ) + + # Convert bbox to tuple and get location tokens + bbox_tuple = cluster.bbox.as_tuple() + location_tokens = DocumentToken.get_location( + bbox=bbox_tuple, + page_w=_internal_page.size.width, + page_h=_internal_page.size.height, + ) + + # Create XML element with DocTags format + xml_element = f"<{tag_name}>{location_tokens}" + layout_elements.append(xml_element) + + if layout_elements: + # Join elements with newlines and wrap in layout tags + layout_xml = ( + "" + "\n".join(layout_elements) + "" + ) + layout_injection = f"{layout_xml}" + + augmented_prompt = base_prompt + layout_injection + + _log.debug( + "Enhanced Prompt with Layout Info: %s\n", augmented_prompt + ) + + return augmented_prompt + + vlm_options = LayoutAwareVlmOptions(**base_vlm_options.model_dump()) + + if isinstance(base_vlm_options, ApiVlmOptions): + self.vlm_model = ApiVlmModel( + enabled=True, + enable_remote_services=self.pipeline_options.enable_remote_services, + vlm_options=vlm_options, + ) + elif isinstance(base_vlm_options, InlineVlmOptions): + if vlm_options.inference_framework == InferenceFramework.TRANSFORMERS: + self.vlm_model = HuggingFaceTransformersVlmModel( + enabled=True, + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + vlm_options=vlm_options, + ) + elif vlm_options.inference_framework == InferenceFramework.MLX: + self.vlm_model = HuggingFaceMlxModel( + enabled=True, + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + vlm_options=vlm_options, + ) + elif vlm_options.inference_framework == InferenceFramework.VLLM: + from docling.models.vlm_models_inline.vllm_model import VllmVlmModel + + self.vlm_model = VllmVlmModel( + enabled=True, + artifacts_path=art_path, + accelerator_options=self.pipeline_options.accelerator_options, + vlm_options=vlm_options, + ) + else: + raise ValueError( + f"Unsupported VLM inference framework: {vlm_options.inference_framework}" + ) + else: + raise ValueError(f"Unsupported VLM options type: {type(base_vlm_options)}") + + def _resolve_artifacts_path(self) -> Optional[Path]: + """Resolve artifacts path from options or settings.""" + if self.pipeline_options.artifacts_path: + p = Path(self.pipeline_options.artifacts_path).expanduser() + elif settings.artifacts_path: + p = Path(settings.artifacts_path).expanduser() + else: + return None + if not p.is_dir(): + raise RuntimeError( + f"{p} does not exist or is not a directory containing the required models" + ) + return p + + def _create_run_ctx(self) -> RunContext: + """Create pipeline stages and wire them together.""" + opts = self.pipeline_options + + # Layout stage + layout_stage = ThreadedPipelineStage( + name="layout", + model=self.layout_model, + batch_size=opts.layout_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + + # VLM stage - now layout-aware through enhanced build_prompt + vlm_stage = ThreadedPipelineStage( + name="vlm", + model=self.vlm_model, + batch_size=opts.vlm_batch_size, + batch_timeout=opts.batch_timeout_seconds, + queue_max_size=opts.queue_max_size, + ) + + # Wire stages + output_q = ThreadedQueue(opts.queue_max_size) + 
layout_stage.add_output_queue(vlm_stage.input_queue) + vlm_stage.add_output_queue(output_q) + + stages = [layout_stage, vlm_stage] + return RunContext( + stages=stages, first_stage=layout_stage, output_queue=output_q + ) + + def _build_document(self, conv_res: ConversionResult) -> ConversionResult: + """Build document using threaded layout+VLM pipeline.""" + run_id = next(self._run_seq) + assert isinstance(conv_res.input._backend, PdfDocumentBackend) + backend = conv_res.input._backend + + # Initialize pages + start_page, end_page = conv_res.input.limits.page_range + pages: List[Page] = [] + for i in range(conv_res.input.page_count): + if start_page - 1 <= i <= end_page - 1: + page = Page(page_no=i) + page._backend = backend.load_page(i) + if page._backend and page._backend.is_valid(): + page.size = page._backend.get_size() + conv_res.pages.append(page) + pages.append(page) + + if not pages: + conv_res.status = ConversionStatus.FAILURE + return conv_res + + total_pages = len(pages) + ctx = self._create_run_ctx() + for st in ctx.stages: + st.start() + + proc = ProcessingResult(total_expected=total_pages) + fed_idx = 0 + batch_size = 32 + + try: + while proc.success_count + proc.failure_count < total_pages: + # Feed pages to first stage + while fed_idx < total_pages: + ok = ctx.first_stage.input_queue.put( + ThreadedItem( + payload=pages[fed_idx], + run_id=run_id, + page_no=pages[fed_idx].page_no, + conv_res=conv_res, + ), + timeout=0.0, + ) + if ok: + fed_idx += 1 + if fed_idx == total_pages: + ctx.first_stage.input_queue.close() + else: + break + + # Drain results from output + out_batch = ctx.output_queue.get_batch(batch_size, timeout=0.05) + for itm in out_batch: + if itm.run_id != run_id: + continue + if itm.is_failed or itm.error: + proc.failed_pages.append( + (itm.page_no, itm.error or RuntimeError("unknown error")) + ) + else: + assert itm.payload is not None + proc.pages.append(itm.payload) + + # Handle early termination + if not out_batch and ctx.output_queue.closed: + missing = total_pages - (proc.success_count + proc.failure_count) + if missing > 0: + proc.failed_pages.extend( + [(-1, RuntimeError("pipeline terminated early"))] * missing + ) + break + finally: + for st in ctx.stages: + st.stop() + ctx.output_queue.close() + + self._integrate_results(conv_res, proc) + return conv_res + + def _integrate_results( + self, conv_res: ConversionResult, proc: ProcessingResult + ) -> None: + """Integrate processing results into conversion result.""" + page_map = {p.page_no: p for p in proc.pages} + + # Track failed pages for cleanup + failed_page_nos = {fp for fp, _ in proc.failed_pages} + + # Collect pages that will be removed (failed pages) for resource cleanup + pages_to_remove = [p for p in conv_res.pages if p.page_no in failed_page_nos] + + conv_res.pages = [ + page_map.get(p.page_no, p) + for p in conv_res.pages + if p.page_no in page_map + or not any(fp == p.page_no for fp, _ in proc.failed_pages) + ] + + if proc.is_complete_failure: + conv_res.status = ConversionStatus.FAILURE + elif proc.is_partial_success: + conv_res.status = ConversionStatus.PARTIAL_SUCCESS + else: + conv_res.status = ConversionStatus.SUCCESS + + # Clean up resources for failed pages that were removed + for p in pages_to_remove: + if p._backend is not None: + p._backend.unload() + p._image_cache = {} + # Clean up parsed_page if it exists (it's Optional[SegmentedPdfPage]) + if p.parsed_page is not None: + del p.parsed_page + p.parsed_page = None + + # Clean up images if not needed for remaining pages + if not 
self.pipeline_options.generate_page_images: + for p in conv_res.pages: + p._image_cache = {} + + def _assemble_document(self, conv_res: ConversionResult) -> ConversionResult: + """Assemble final document from VLM predictions.""" + from docling_core.types.doc import DocItem, ImageRef, PictureItem + + from docling.datamodel.pipeline_options_vlm_model import ResponseFormat + + with TimeRecorder(conv_res, "doc_assemble", scope=ProfilingScope.DOCUMENT): + # Response format validation is done in ThreadedLayoutVlmPipelineOptions + # This check is kept as a safety net, but should never trigger if validation works + if ( + self.pipeline_options.vlm_options.response_format + != ResponseFormat.DOCTAGS + ): + raise RuntimeError( + f"Unsupported VLM response format {self.pipeline_options.vlm_options.response_format}. Only DOCTAGS format is supported." + ) + conv_res.document = self._turn_dt_into_doc(conv_res) + + # Generate images of the requested element types + if self.pipeline_options.generate_picture_images: + # Create mapping from page_no to Page object since pages may be non-continuous + page_map = {p.page_no: p for p in conv_res.pages} + scale = self.pipeline_options.images_scale + for element, _level in conv_res.document.iterate_items(): + if not isinstance(element, DocItem) or len(element.prov) == 0: + continue + if ( + isinstance(element, PictureItem) + and self.pipeline_options.generate_picture_images + ): + page_no = element.prov[0].page_no + page = page_map.get(page_no) + if page is None: + _log.warning( + f"Page {page_no} not found in conversion result for picture element. Skipping image generation." + ) + continue + assert page.size is not None + assert page.image is not None + + crop_bbox = ( + element.prov[0] + .bbox.scaled(scale=scale) + .to_top_left_origin(page_height=page.size.height * scale) + ) + + cropped_im = page.image.crop(crop_bbox.as_tuple()) + element.image = ImageRef.from_pil( + cropped_im, dpi=int(72 * scale) + ) + + return conv_res + + def _turn_dt_into_doc(self, conv_res: ConversionResult) -> DoclingDocument: + """Convert DOCTAGS response format to DoclingDocument.""" + doctags_list = [] + image_list = [] + for page in conv_res.pages: + # Only include pages that have both an image and VLM predictions + if page.image and page.predictions.vlm_response: + predicted_doctags = page.predictions.vlm_response.text + image_list.append(page.image) + doctags_list.append(predicted_doctags) + + doctags_list_c = cast(List[Union[Path, str]], doctags_list) + image_list_c = cast(List[Union[Path, PILImage.Image]], image_list) + doctags_doc = DocTagsDocument.from_doctags_and_image_pairs( + doctags_list_c, image_list_c + ) + document = DoclingDocument.load_from_doctags(doctag_document=doctags_doc) + + return document + + @classmethod + def get_default_options(cls) -> ThreadedLayoutVlmPipelineOptions: + return ThreadedLayoutVlmPipelineOptions() + + @classmethod + def is_backend_supported(cls, backend: AbstractDocumentBackend) -> bool: + return isinstance(backend, PdfDocumentBackend) + + def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: + return conv_res.status + + def _unload(self, conv_res: ConversionResult) -> None: + for p in conv_res.pages: + if p._backend is not None: + p._backend.unload() + if conv_res.input._backend: + conv_res.input._backend.unload() diff --git a/docling/models/api_vlm_model.py b/docling/models/api_vlm_model.py index 2c9a1f9a..d33130b4 100644 --- a/docling/models/api_vlm_model.py +++ b/docling/models/api_vlm_model.py @@ -1,13 +1,15 @@ from 
collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor +from typing import Union -from transformers import StoppingCriteria +import numpy as np +from PIL.Image import Image from docling.datamodel.base_models import Page, VlmPrediction, VlmStopReason from docling.datamodel.document import ConversionResult from docling.datamodel.pipeline_options_vlm_model import ApiVlmOptions from docling.exceptions import OperationNotAllowed -from docling.models.base_model import BasePageModel +from docling.models.base_model import BaseVlmPageModel from docling.models.utils.generation_utils import GenerationStopper from docling.utils.api_image_request import ( api_image_request, @@ -16,7 +18,10 @@ from docling.utils.api_image_request import ( from docling.utils.profiling import TimeRecorder -class ApiVlmModel(BasePageModel): +class ApiVlmModel(BaseVlmPageModel): + # Override the vlm_options type annotation from BaseVlmPageModel + vlm_options: ApiVlmOptions # type: ignore[assignment] + def __init__( self, enabled: bool, @@ -43,66 +48,133 @@ class ApiVlmModel(BasePageModel): def __call__( self, conv_res: ConversionResult, page_batch: Iterable[Page] ) -> Iterable[Page]: - def _vlm_request(page): + page_list = list(page_batch) + if not page_list: + return + + original_order = page_list[:] + valid_pages = [] + + for page in page_list: assert page._backend is not None - if not page._backend.is_valid(): - return page + if page._backend.is_valid(): + valid_pages.append(page) + # Process valid pages in batch + if valid_pages: with TimeRecorder(conv_res, "vlm"): - assert page.size is not None + # Prepare images and prompts for batch processing + images = [] + prompts = [] + pages_with_images = [] - hi_res_image = page.get_image( - scale=self.vlm_options.scale, max_size=self.vlm_options.max_size - ) - assert hi_res_image is not None - if hi_res_image and hi_res_image.mode != "RGB": - hi_res_image = hi_res_image.convert("RGB") + for page in valid_pages: + assert page.size is not None + hi_res_image = page.get_image( + scale=self.vlm_options.scale, max_size=self.vlm_options.max_size + ) - prompt = self.vlm_options.build_prompt(page.parsed_page) - stop_reason = VlmStopReason.UNSPECIFIED + # Only process pages with valid images + if hi_res_image is not None: + images.append(hi_res_image) + prompt = self._build_prompt_safe(page) + prompts.append(prompt) + pages_with_images.append(page) - if self.vlm_options.custom_stopping_criteria: - # Instantiate any GenerationStopper classes before passing to streaming - instantiated_stoppers = [] - for criteria in self.vlm_options.custom_stopping_criteria: - if isinstance(criteria, GenerationStopper): - instantiated_stoppers.append(criteria) - elif isinstance(criteria, type) and issubclass( - criteria, GenerationStopper - ): - instantiated_stoppers.append(criteria()) - # Skip non-GenerationStopper criteria (should have been caught in validation) - - # Streaming path with early abort support + # Use process_images for the actual inference + if images: # Only if we have valid images with TimeRecorder(conv_res, "vlm_inference"): - page_tags, num_tokens = api_image_request_streaming( - image=hi_res_image, - prompt=prompt, - url=self.vlm_options.url, - timeout=self.timeout, - headers=self.vlm_options.headers, - generation_stoppers=instantiated_stoppers, - **self.params, - ) - page_tags = self.vlm_options.decode_response(page_tags) + predictions = list(self.process_images(images, prompts)) + + # Attach results to pages + for page, prediction in 
zip(pages_with_images, predictions): + page.predictions.vlm_response = prediction + + # Yield pages preserving original order + for page in original_order: + yield page + + def process_images( + self, + image_batch: Iterable[Union[Image, np.ndarray]], + prompt: Union[str, list[str]], + ) -> Iterable[VlmPrediction]: + """Process raw images without page metadata.""" + images = list(image_batch) + + # Handle prompt parameter + if isinstance(prompt, str): + prompts = [prompt] * len(images) + elif isinstance(prompt, list): + if len(prompt) != len(images): + raise ValueError( + f"Prompt list length ({len(prompt)}) must match image count ({len(images)})" + ) + prompts = prompt + + def _process_single_image(image_prompt_pair): + image, prompt_text = image_prompt_pair + + # Convert numpy array to PIL Image if needed + if isinstance(image, np.ndarray): + if image.ndim == 3 and image.shape[2] in [3, 4]: + from PIL import Image as PILImage + + image = PILImage.fromarray(image.astype(np.uint8)) + elif image.ndim == 2: + from PIL import Image as PILImage + + image = PILImage.fromarray(image.astype(np.uint8), mode="L") else: - # Non-streaming fallback (existing behavior) - with TimeRecorder(conv_res, "vlm_inference"): - page_tags, num_tokens, stop_reason = api_image_request( - image=hi_res_image, - prompt=prompt, - url=self.vlm_options.url, - timeout=self.timeout, - headers=self.vlm_options.headers, - **self.params, - ) + raise ValueError(f"Unsupported numpy array shape: {image.shape}") - page_tags = self.vlm_options.decode_response(page_tags) + # Ensure image is in RGB mode + if image.mode != "RGB": + image = image.convert("RGB") - page.predictions.vlm_response = VlmPrediction( - text=page_tags, num_tokens=num_tokens, stop_reason=stop_reason + stop_reason = VlmStopReason.UNSPECIFIED + + if self.vlm_options.custom_stopping_criteria: + # Instantiate any GenerationStopper classes before passing to streaming + instantiated_stoppers = [] + for criteria in self.vlm_options.custom_stopping_criteria: + if isinstance(criteria, GenerationStopper): + instantiated_stoppers.append(criteria) + elif isinstance(criteria, type) and issubclass( + criteria, GenerationStopper + ): + instantiated_stoppers.append(criteria()) + # Skip non-GenerationStopper criteria (should have been caught in validation) + + # Streaming path with early abort support + page_tags, num_tokens = api_image_request_streaming( + image=image, + prompt=prompt_text, + url=self.vlm_options.url, + timeout=self.timeout, + headers=self.vlm_options.headers, + generation_stoppers=instantiated_stoppers, + **self.params, ) - return page + else: + # Non-streaming fallback (existing behavior) + page_tags, num_tokens, stop_reason = api_image_request( + image=image, + prompt=prompt_text, + url=self.vlm_options.url, + timeout=self.timeout, + headers=self.vlm_options.headers, + **self.params, + ) + + page_tags = self.vlm_options.decode_response(page_tags) + input_prompt = prompt_text if self.vlm_options.track_input_prompt else None + return VlmPrediction( + text=page_tags, + num_tokens=num_tokens, + stop_reason=stop_reason, + input_prompt=input_prompt, + ) with ThreadPoolExecutor(max_workers=self.concurrency) as executor: - yield from executor.map(_vlm_request, page_batch) + yield from executor.map(_process_single_image, zip(images, prompts)) diff --git a/docling/models/base_model.py b/docling/models/base_model.py index 45f1503f..c69b5018 100644 --- a/docling/models/base_model.py +++ b/docling/models/base_model.py @@ -76,6 +76,24 @@ class 
BaseVlmPageModel(BasePageModel, BaseVlmModel): vlm_options: InlineVlmOptions processor: Any + def _build_prompt_safe(self, page: Page) -> str: + """Build prompt with backward compatibility for user overrides. + + Tries to call build_prompt with _internal_page parameter (for layout-aware + pipelines). Falls back to basic call if user override doesn't accept it. + + Args: + page: The full Page object with layout predictions and parsed_page. + + Returns: + The formatted prompt string. + """ + try: + return self.vlm_options.build_prompt(page.parsed_page, _internal_page=page) + except TypeError: + # User override doesn't accept _internal_page - fall back to basic call + return self.vlm_options.build_prompt(page.parsed_page) + @abstractmethod def __call__( self, conv_res: ConversionResult, page_batch: Iterable[Page] diff --git a/docling/models/vlm_models_inline/hf_transformers_model.py b/docling/models/vlm_models_inline/hf_transformers_model.py index f9aefcb8..15b57dfa 100644 --- a/docling/models/vlm_models_inline/hf_transformers_model.py +++ b/docling/models/vlm_models_inline/hf_transformers_model.py @@ -176,14 +176,15 @@ class HuggingFaceTransformersVlmModel(BaseVlmPageModel, HuggingFaceModelDownload images.append(hi_res_image) # Define prompt structure - user_prompt = self.vlm_options.build_prompt(page.parsed_page) + user_prompt = self._build_prompt_safe(page) user_prompts.append(user_prompt) pages_with_images.append(page) # Use process_images for the actual inference if images: # Only if we have valid images - predictions = list(self.process_images(images, user_prompts)) + with TimeRecorder(conv_res, "vlm_inference"): + predictions = list(self.process_images(images, user_prompts)) # Attach results to pages for page, prediction in zip(pages_with_images, predictions): @@ -375,7 +376,10 @@ class HuggingFaceTransformersVlmModel(BaseVlmPageModel, HuggingFaceModelDownload f"for batch size {generated_ids.shape[0]}." 
) - for text in decoded_texts: + for i, text in enumerate(decoded_texts): + input_prompt = ( + prompts[i] if self.vlm_options.track_input_prompt and prompts else None + ) # Apply decode_response to the output text decoded_text = self.vlm_options.decode_response(text) yield VlmPrediction( @@ -383,4 +387,5 @@ class HuggingFaceTransformersVlmModel(BaseVlmPageModel, HuggingFaceModelDownload generation_time=generation_time, num_tokens=num_tokens, stop_reason=VlmStopReason.UNSPECIFIED, + input_prompt=input_prompt, ) diff --git a/docling/models/vlm_models_inline/mlx_model.py b/docling/models/vlm_models_inline/mlx_model.py index 871c19ba..e6f0a2fb 100644 --- a/docling/models/vlm_models_inline/mlx_model.py +++ b/docling/models/vlm_models_inline/mlx_model.py @@ -134,10 +134,7 @@ class HuggingFaceMlxModel(BaseVlmPageModel, HuggingFaceModelDownloadMixin): images.append(hi_res_image) # Define prompt structure - if callable(self.vlm_options.prompt): - user_prompt = self.vlm_options.prompt(page.parsed_page) - else: - user_prompt = self.vlm_options.prompt + user_prompt = self._build_prompt_safe(page) user_prompts.append(user_prompt) pages_with_images.append(page) @@ -319,11 +316,15 @@ class HuggingFaceMlxModel(BaseVlmPageModel, HuggingFaceModelDownloadMixin): # Apply decode_response to the output before yielding decoded_output = self.vlm_options.decode_response(output) + input_prompt = ( + formatted_prompt if self.vlm_options.track_input_prompt else None + ) yield VlmPrediction( text=decoded_output, generation_time=generation_time, generated_tokens=tokens, num_tokens=len(tokens), stop_reason=VlmStopReason.UNSPECIFIED, + input_prompt=input_prompt, ) _log.debug("MLX model: Released global lock") diff --git a/docling/models/vlm_models_inline/vllm_model.py b/docling/models/vlm_models_inline/vllm_model.py index 9254f4d5..b4e4ece4 100644 --- a/docling/models/vlm_models_inline/vllm_model.py +++ b/docling/models/vlm_models_inline/vllm_model.py @@ -233,7 +233,7 @@ class VllmVlmModel(BaseVlmPageModel, HuggingFaceModelDownloadMixin): images.append(hi_res_image) # Define prompt structure - user_prompt = self.vlm_options.build_prompt(page.parsed_page) + user_prompt = self._build_prompt_safe(page) user_prompts.append(user_prompt) pages_with_images.append(page) @@ -314,19 +314,25 @@ class VllmVlmModel(BaseVlmPageModel, HuggingFaceModelDownloadMixin): num_tokens_within_batch = 0 # Emit predictions - for output in outputs: + for i, output in enumerate(outputs): text = output.outputs[0].text if output.outputs else "" stop_reason = ( VlmStopReason.END_OF_SEQUENCE if output.outputs[0].stop_reason else VlmStopReason.LENGTH ) - generated_tokens = ( - [VlmPredictionToken(token=int(t)) for t in output.outputs[0].token_ids] - if self.vlm_options.track_generated_tokens - else [] - ) + + generated_tokens = [ + VlmPredictionToken(token=int(t)) for t in output.outputs[0].token_ids + ] num_tokens = len(generated_tokens) + + if not self.vlm_options.track_generated_tokens: + generated_tokens = [] + + input_prompt = prompts[i] if self.vlm_options.track_input_prompt else None + _log.debug(f"VLM generated response carries input prompt: {input_prompt}") + decoded_text = self.vlm_options.decode_response(text) yield VlmPrediction( text=decoded_text, @@ -334,4 +340,5 @@ class VllmVlmModel(BaseVlmPageModel, HuggingFaceModelDownloadMixin): num_tokens=num_tokens, stop_reason=stop_reason, generated_tokens=generated_tokens, + input_prompt=input_prompt, ) diff --git a/docs/examples/demo_layout_vlm.py b/docs/examples/demo_layout_vlm.py new file mode 
100644 index 00000000..18eb4aa1 --- /dev/null +++ b/docs/examples/demo_layout_vlm.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +"""Demo script for the new ThreadedLayoutVlmPipeline. + +This script demonstrates the usage of the experimental ThreadedLayoutVlmPipeline pipeline +that combines layout model preprocessing with VLM processing in a threaded manner. +""" + +import argparse +import logging +import traceback +from pathlib import Path + +from docling.datamodel.base_models import ConversionStatus, InputFormat +from docling.datamodel.pipeline_options_vlm_model import ApiVlmOptions, ResponseFormat +from docling.datamodel.vlm_model_specs import GRANITEDOCLING_TRANSFORMERS +from docling.document_converter import DocumentConverter, PdfFormatOption +from docling.experimental.datamodel.threaded_layout_vlm_pipeline_options import ( + ThreadedLayoutVlmPipelineOptions, +) +from docling.experimental.pipeline.threaded_layout_vlm_pipeline import ( + ThreadedLayoutVlmPipeline, +) + +_log = logging.getLogger(__name__) + + +def _parse_args(): + parser = argparse.ArgumentParser( + description="Demo script for the experimental ThreadedLayoutVlmPipeline" + ) + parser.add_argument( + "--input-file", + type=str, + default="tests/data/pdf/code_and_formula.pdf", + help="Path to a PDF file", + ) + parser.add_argument( + "--output-dir", + type=str, + default="scratch/demo_layout_vlm/", + help="Output directory for converted files", + ) + return parser.parse_args() + + +# Can be used to read multiple pdf files under a folder +# def _get_docs(input_doc_path): +# """Yield DocumentStream objects from list of input document paths""" +# for path in input_doc_path: +# buf = BytesIO(path.read_bytes()) +# stream = DocumentStream(name=path.name, stream=buf) +# yield stream + + +def openai_compatible_vlm_options( + model: str, + prompt: str, + format: ResponseFormat, + hostname_and_port, + temperature: float = 0.7, + max_tokens: int = 4096, + api_key: str = "", + skip_special_tokens=False, +): + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + options = ApiVlmOptions( + url=f"http://{hostname_and_port}/v1/chat/completions", # LM studio defaults to port 1234, VLLM to 8000 + params=dict( + model=model, + max_tokens=max_tokens, + skip_special_tokens=skip_special_tokens, # needed for VLLM + ), + headers=headers, + prompt=prompt, + timeout=90, + scale=2.0, + temperature=temperature, + response_format=format, + ) + + return options + + +def demo_threaded_layout_vlm_pipeline( + input_doc_path: Path, out_dir_layout_aware: Path, use_api_vlm: bool +): + """Demonstrate the threaded layout+VLM pipeline.""" + + vlm_options = GRANITEDOCLING_TRANSFORMERS.model_copy() + + if use_api_vlm: + vlm_options = openai_compatible_vlm_options( + model="granite-docling-258m-mlx", # For VLLM use "ibm-granite/granite-docling-258M" + hostname_and_port="localhost:1234", # LM studio defaults to port 1234, VLLM to 8000 + prompt="Convert this page to docling.", + format=ResponseFormat.DOCTAGS, + api_key="", + ) + vlm_options.track_input_prompt = True + + # Configure pipeline options + print("Configuring pipeline options...") + pipeline_options_layout_aware = ThreadedLayoutVlmPipelineOptions( + # VLM configuration - defaults to GRANITEDOCLING_TRANSFORMERS + vlm_options=vlm_options, + # Layout configuration - defaults to DOCLING_LAYOUT_HERON + # Batch sizes for parallel processing + layout_batch_size=2, + vlm_batch_size=1, + # Queue configuration + queue_max_size=10, + # Image processing + images_scale=2.0, + 
generate_page_images=True, + enable_remote_services=use_api_vlm, + ) + + # Create converter with the new pipeline + print("Initializing DocumentConverter (this may take a while - loading models)...") + doc_converter_layout_enhanced = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=ThreadedLayoutVlmPipeline, + pipeline_options=pipeline_options_layout_aware, + ) + } + ) + + result_layout_aware = doc_converter_layout_enhanced.convert( + source=input_doc_path, raises_on_error=False + ) + + if result_layout_aware.status == ConversionStatus.FAILURE: + _log.error(f"Conversion failed: {result_layout_aware.status}") + + doc_filename = result_layout_aware.input.file.stem + result_layout_aware.document.save_as_json( + out_dir_layout_aware / f"{doc_filename}.json" + ) + + result_layout_aware.document.save_as_html( + out_dir_layout_aware / f"{doc_filename}.html" + ) + for page in result_layout_aware.pages: + _log.info("Page %s of VLM response:", page.page_no) + if page.predictions.vlm_response: + _log.info(page.predictions.vlm_response) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + try: + args = _parse_args() + _log.info( + f"Parsed arguments: input={args.input_file}, output={args.output_dir}" + ) + + input_path = Path(args.input_file) + + if not input_path.exists(): + raise FileNotFoundError(f"Input file does not exist: {input_path}") + + if input_path.suffix.lower() != ".pdf": + raise ValueError(f"Input file must be a PDF: {input_path}") + + out_dir_layout_aware = Path(args.output_dir) / "layout_aware/" + out_dir_layout_aware.mkdir(parents=True, exist_ok=True) + + use_api_vlm = False # Set to False to use inline VLM model + + demo_threaded_layout_vlm_pipeline(input_path, out_dir_layout_aware, use_api_vlm) + except Exception: + traceback.print_exc() + raise
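
# --- Usage sketch (not part of the patch above) ---------------------------
# A minimal, hypothetical illustration of how a user-defined prompt override
# interacts with the backward-compatible prompt building added in this diff:
# BaseVlmPageModel._build_prompt_safe() first calls
# build_prompt(page.parsed_page, _internal_page=page) and, if the override does
# not accept the keyword, catches the TypeError and falls back to
# build_prompt(page.parsed_page). The subclass name and prompt text below are
# made up for illustration; only the imports and fields come from the diff.
from typing import Optional

from docling_core.types.doc.page import SegmentedPage

from docling.datamodel.pipeline_options_vlm_model import ApiVlmOptions


class LegacyPromptOptions(ApiVlmOptions):
    """Legacy-style override that does not know about _internal_page."""

    def build_prompt(self, page: Optional[SegmentedPage]) -> str:
        # _build_prompt_safe() retries without the _internal_page keyword,
        # so pre-existing overrides like this one keep working unchanged.
        return f"{self.prompt} Respond in DocTags."

# With track_input_prompt=True on the options, the prompt that was actually
# sent (including any layout augmentation) is echoed back on
# VlmPrediction.input_prompt for inspection.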