diff --git a/tests/test_threaded_pipeline.py b/tests/test_threaded_pipeline.py index b431c508..29796f65 100644 --- a/tests/test_threaded_pipeline.py +++ b/tests/test_threaded_pipeline.py @@ -23,93 +23,104 @@ def test_threaded_pipeline_multiple_documents(): "tests/data/pdf/2206.01062.pdf", "tests/data/pdf/2305.03393v1.pdf", ] + # test_files = [str(f) for f in Path("/home/ubuntu/datasets/flat_bench_set").rglob("*.pdf")] - # Threaded pipeline - threaded_converter = DocumentConverter( - format_options={ - InputFormat.PDF: PdfFormatOption( - pipeline_cls=ThreadedStandardPdfPipeline, - pipeline_options=ThreadedPdfPipelineOptions( - layout_batch_size=1, - table_batch_size=1, - ocr_batch_size=1, - batch_timeout_seconds=1.0, - do_table_structure=True, - do_ocr=True, - ), - ) - } - ) + do_ts = False + do_ocr = False - threaded_converter.initialize_pipeline(InputFormat.PDF) + run_threaded = True + run_serial = True - # Test threaded pipeline - threaded_results = [] - start_time = time.perf_counter() - for result in threaded_converter.convert_all(test_files, raises_on_error=True): - print( - "Finished converting document with threaded pipeline:", - result.input.file.name, + if run_threaded: + # Threaded pipeline + threaded_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=ThreadedStandardPdfPipeline, + pipeline_options=ThreadedPdfPipelineOptions( + layout_batch_size=1, + table_batch_size=1, + ocr_batch_size=1, + batch_timeout_seconds=1.0, + do_table_structure=do_ts, + do_ocr=do_ocr, + ), + ) + } ) - threaded_results.append(result) - threaded_time = time.perf_counter() - start_time - del threaded_converter + threaded_converter.initialize_pipeline(InputFormat.PDF) - print("\nMulti-document Pipeline Comparison:") - print(f"Threaded pipeline: {threaded_time:.2f} seconds") - - # Standard pipeline - standard_converter = DocumentConverter( - format_options={ - InputFormat.PDF: PdfFormatOption( - pipeline_cls=StandardPdfPipeline, - pipeline_options=PdfPipelineOptions( - do_table_structure=True, - do_ocr=True, - ), + # Test threaded pipeline + threaded_results = [] + start_time = time.perf_counter() + for result in threaded_converter.convert_all(test_files, raises_on_error=True): + print( + "Finished converting document with threaded pipeline:", + result.input.file.name, ) - } - ) + threaded_results.append(result) + threaded_time = time.perf_counter() - start_time - standard_converter.initialize_pipeline(InputFormat.PDF) + del threaded_converter - # Test standard pipeline - standard_results = [] - start_time = time.perf_counter() - for result in standard_converter.convert_all(test_files, raises_on_error=True): - print( - "Finished converting document with standard pipeline:", - result.input.file.name, + print(f"Threaded pipeline: {threaded_time:.2f} seconds") + + if run_serial: + # Standard pipeline + standard_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption( + pipeline_cls=StandardPdfPipeline, + pipeline_options=PdfPipelineOptions( + do_table_structure=do_ts, + do_ocr=do_ocr, + ), + ) + } ) - standard_results.append(result) - standard_time = time.perf_counter() - start_time - del standard_converter + standard_converter.initialize_pipeline(InputFormat.PDF) - print(f"Standard pipeline: {standard_time:.2f} seconds") - print(f"Speedup: {standard_time / threaded_time:.2f}x") + # Test standard pipeline + standard_results = [] + start_time = time.perf_counter() + for result in standard_converter.convert_all(test_files, raises_on_error=True): + print( + "Finished converting document with standard pipeline:", + result.input.file.name, + ) + standard_results.append(result) + standard_time = time.perf_counter() - start_time + + del standard_converter + + print(f"Standard pipeline: {standard_time:.2f} seconds") # Verify results - assert len(standard_results) == len(threaded_results) - for result in standard_results: - assert result.status == ConversionStatus.SUCCESS - for result in threaded_results: - assert result.status == ConversionStatus.SUCCESS + if run_threaded and run_serial: + assert len(standard_results) == len(threaded_results) + if run_serial: + for result in standard_results: + assert result.status == ConversionStatus.SUCCESS + if run_threaded: + for result in threaded_results: + assert result.status == ConversionStatus.SUCCESS - # Basic content comparison - for i, (standard_result, threaded_result) in enumerate( - zip(standard_results, threaded_results) - ): - standard_doc = standard_result.document - threaded_doc = threaded_result.document + if run_serial and run_threaded: + # Basic content comparison + for i, (standard_result, threaded_result) in enumerate( + zip(standard_results, threaded_results) + ): + standard_doc = standard_result.document + threaded_doc = threaded_result.document - assert len(standard_doc.pages) == len(threaded_doc.pages), ( - f"Document {i} page count mismatch" - ) - assert len(standard_doc.texts) == len(threaded_doc.texts), ( - f"Document {i} text count mismatch" - ) + assert len(standard_doc.pages) == len(threaded_doc.pages), ( + f"Document {i} page count mismatch" + ) + assert len(standard_doc.texts) == len(threaded_doc.texts), ( + f"Document {i} text count mismatch" + ) def test_pipeline_comparison():