Update threaded test

Signed-off-by: Ubuntu <ubuntu@ip-172-31-30-253.eu-central-1.compute.internal>
Ubuntu authored 2025-07-18 15:18:21 +00:00
parent 988db91bff
commit 89acdb5db2


@@ -23,93 +23,104 @@ def test_threaded_pipeline_multiple_documents():
         "tests/data/pdf/2206.01062.pdf",
         "tests/data/pdf/2305.03393v1.pdf",
     ]
 
-    # Threaded pipeline
-    threaded_converter = DocumentConverter(
-        format_options={
-            InputFormat.PDF: PdfFormatOption(
-                pipeline_cls=ThreadedStandardPdfPipeline,
-                pipeline_options=ThreadedPdfPipelineOptions(
-                    layout_batch_size=1,
-                    table_batch_size=1,
-                    ocr_batch_size=1,
-                    batch_timeout_seconds=1.0,
-                    do_table_structure=True,
-                    do_ocr=True,
-                ),
-            )
-        }
-    )
-    threaded_converter.initialize_pipeline(InputFormat.PDF)
-
-    # Test threaded pipeline
-    threaded_results = []
-    start_time = time.perf_counter()
-    for result in threaded_converter.convert_all(test_files, raises_on_error=True):
-        print(
-            "Finished converting document with threaded pipeline:",
-            result.input.file.name,
-        )
-        threaded_results.append(result)
-    threaded_time = time.perf_counter() - start_time
-    del threaded_converter
-
-    print("\nMulti-document Pipeline Comparison:")
-    print(f"Threaded pipeline: {threaded_time:.2f} seconds")
-
-    # Standard pipeline
-    standard_converter = DocumentConverter(
-        format_options={
-            InputFormat.PDF: PdfFormatOption(
-                pipeline_cls=StandardPdfPipeline,
-                pipeline_options=PdfPipelineOptions(
-                    do_table_structure=True,
-                    do_ocr=True,
-                ),
-            )
-        }
-    )
-    standard_converter.initialize_pipeline(InputFormat.PDF)
-    # Test standard pipeline
-    standard_results = []
-    start_time = time.perf_counter()
-    for result in standard_converter.convert_all(test_files, raises_on_error=True):
-        print(
-            "Finished converting document with standard pipeline:",
-            result.input.file.name,
-        )
-        standard_results.append(result)
-    standard_time = time.perf_counter() - start_time
-    del standard_converter
-    print(f"Standard pipeline: {standard_time:.2f} seconds")
-    print(f"Speedup: {standard_time / threaded_time:.2f}x")
+    # test_files = [str(f) for f in Path("/home/ubuntu/datasets/flat_bench_set").rglob("*.pdf")]
+    do_ts = False
+    do_ocr = False
+
+    run_threaded = True
+    run_serial = True
+
+    if run_threaded:
+        # Threaded pipeline
+        threaded_converter = DocumentConverter(
+            format_options={
+                InputFormat.PDF: PdfFormatOption(
+                    pipeline_cls=ThreadedStandardPdfPipeline,
+                    pipeline_options=ThreadedPdfPipelineOptions(
+                        layout_batch_size=1,
+                        table_batch_size=1,
+                        ocr_batch_size=1,
+                        batch_timeout_seconds=1.0,
+                        do_table_structure=do_ts,
+                        do_ocr=do_ocr,
+                    ),
+                )
+            }
+        )
+        threaded_converter.initialize_pipeline(InputFormat.PDF)
+
+        # Test threaded pipeline
+        threaded_results = []
+        start_time = time.perf_counter()
+        for result in threaded_converter.convert_all(test_files, raises_on_error=True):
+            print(
+                "Finished converting document with threaded pipeline:",
+                result.input.file.name,
+            )
+            threaded_results.append(result)
+        threaded_time = time.perf_counter() - start_time
+        del threaded_converter
+        print(f"Threaded pipeline: {threaded_time:.2f} seconds")
+
+    if run_serial:
+        # Standard pipeline
+        standard_converter = DocumentConverter(
+            format_options={
+                InputFormat.PDF: PdfFormatOption(
+                    pipeline_cls=StandardPdfPipeline,
+                    pipeline_options=PdfPipelineOptions(
+                        do_table_structure=do_ts,
+                        do_ocr=do_ocr,
+                    ),
+                )
+            }
+        )
+        standard_converter.initialize_pipeline(InputFormat.PDF)
+        # Test standard pipeline
+        standard_results = []
+        start_time = time.perf_counter()
+        for result in standard_converter.convert_all(test_files, raises_on_error=True):
+            print(
+                "Finished converting document with standard pipeline:",
+                result.input.file.name,
+            )
+            standard_results.append(result)
+        standard_time = time.perf_counter() - start_time
+        del standard_converter
+        print(f"Standard pipeline: {standard_time:.2f} seconds")
 
     # Verify results
-    assert len(standard_results) == len(threaded_results)
-    for result in standard_results:
-        assert result.status == ConversionStatus.SUCCESS
-    for result in threaded_results:
-        assert result.status == ConversionStatus.SUCCESS
+    if run_threaded and run_serial:
+        assert len(standard_results) == len(threaded_results)
+    if run_serial:
+        for result in standard_results:
+            assert result.status == ConversionStatus.SUCCESS
+    if run_threaded:
+        for result in threaded_results:
+            assert result.status == ConversionStatus.SUCCESS
 
-    # Basic content comparison
-    for i, (standard_result, threaded_result) in enumerate(
-        zip(standard_results, threaded_results)
-    ):
-        standard_doc = standard_result.document
-        threaded_doc = threaded_result.document
+    if run_serial and run_threaded:
+        # Basic content comparison
+        for i, (standard_result, threaded_result) in enumerate(
+            zip(standard_results, threaded_results)
+        ):
+            standard_doc = standard_result.document
+            threaded_doc = threaded_result.document
 
-        assert len(standard_doc.pages) == len(threaded_doc.pages), (
-            f"Document {i} page count mismatch"
-        )
-        assert len(standard_doc.texts) == len(threaded_doc.texts), (
-            f"Document {i} text count mismatch"
-        )
+            assert len(standard_doc.pages) == len(threaded_doc.pages), (
+                f"Document {i} page count mismatch"
+            )
+            assert len(standard_doc.texts) == len(threaded_doc.texts), (
+                f"Document {i} text count mismatch"
+            )
 
 
 def test_pipeline_comparison():
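
For context, the snippet below is a minimal sketch of driving the threaded pipeline the way the updated test does, reduced to a single document. It only uses names that appear in the diff; the import paths for ThreadedStandardPdfPipeline and ThreadedPdfPipelineOptions are assumptions based on docling's usual module layout and may need adjusting.

    from docling.datamodel.base_models import ConversionStatus, InputFormat
    from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions  # assumed path
    from docling.document_converter import DocumentConverter, PdfFormatOption
    from docling.pipeline.threaded_standard_pdf_pipeline import ThreadedStandardPdfPipeline  # assumed path

    converter = DocumentConverter(
        format_options={
            InputFormat.PDF: PdfFormatOption(
                pipeline_cls=ThreadedStandardPdfPipeline,
                pipeline_options=ThreadedPdfPipelineOptions(
                    layout_batch_size=1,        # per-stage batch sizes; the test uses 1
                    table_batch_size=1,
                    ocr_batch_size=1,
                    batch_timeout_seconds=1.0,  # batch collection timeout, as set in the test
                    do_table_structure=False,   # mirrors do_ts = False in the test
                    do_ocr=False,               # mirrors do_ocr = False in the test
                ),
            )
        }
    )
    converter.initialize_pipeline(InputFormat.PDF)  # build the pipeline up front, as the test does

    for result in converter.convert_all(
        ["tests/data/pdf/2206.01062.pdf"], raises_on_error=True
    ):
        assert result.status == ConversionStatus.SUCCESS
        print(result.input.file.name, "->", len(result.document.pages), "pages")

With do_table_structure and do_ocr disabled, the comparison in the test exercises only the layout stage, which keeps the threaded-versus-serial timing focused on pipeline overhead rather than model cost.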