Actor: Always output a zip

Signed-off-by: Adam Kliment <adam@netmilk.net>
This commit is contained in:
Adam Kliment 2025-03-13 09:37:39 +01:00
parent 7cd1f06868
commit 1c9d8e29b0
4 changed files with 54 additions and 471 deletions

View File

@ -7,7 +7,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
&& rm -rf /var/lib/apt/lists/* \
&& mkdir -p /build/bin /build/lib/node_modules \
&& cp /usr/local/bin/node /build/bin/
# Set working directory
WORKDIR /build
@ -37,8 +36,11 @@ ENV PYTHONUNBUFFERED=1 \
USER root
WORKDIR /app
# Create all required directories and fix permissions in a single layer
RUN mkdir -p /build-files \
# Install required tools and create directories in a single layer
RUN dnf install -y \
jq \
&& dnf clean all \
&& mkdir -p /build-files \
/tmp \
/tmp/actor-input \
/tmp/actor-output \
@ -68,6 +70,7 @@ RUN chmod +x .actor/actor.sh
# Copy the build files from builder
COPY --from=builder --chown=1000:1000 /build /build-files
# Switch to non-root user
USER 1000

View File

@ -75,8 +75,11 @@ cleanup_temp_environment() {
# Function to push data to Apify dataset
push_to_dataset() {
local document_url="$1"
local result_url="$2"
# Example usage: push_to_dataset "$RESULT_URL" "$OUTPUT_SIZE" "zip"
local result_url="$1"
local size="$2"
local format="$3"
# Find Apify CLI command
find_apify_cmd
@ -88,14 +91,14 @@ push_to_dataset() {
# Use the --no-update-notifier flag if available
if $apify_cmd --help | grep -q "\--no-update-notifier"; then
if $apify_cmd --no-update-notifier actor:push-data "{\"url\": \"${document_url}\", \"output_file\": \"${result_url}\", \"status\": \"success\"}"; then
if $apify_cmd --no-update-notifier actor:push-data "{\"output_file\": \"${result_url}\", \"format\": \"${format}\", \"size\": \"${size}\", \"status\": \"success\"}"; then
echo "Successfully added record to dataset"
else
echo "Warning: Failed to add record to dataset"
fi
else
# Fall back to regular command
if $apify_cmd actor:push-data "{\"url\": \"${document_url}\", \"output_file\": \"${result_url}\", \"status\": \"success\"}"; then
if $apify_cmd actor:push-data "{\"output_file\": \"${result_url}\", \"format\": \"${format}\", \"size\": \"${size}\", \"status\": \"success\"}"; then
echo "Successfully added record to dataset"
else
echo "Warning: Failed to add record to dataset"
@ -133,7 +136,7 @@ echo "Working directory: $(pwd)"
# --- Get input ---
echo "Getting Apify ActorInput"
echo "Getting Apify Actor Input"
INPUT=$(apify actor get-input 2>/dev/null)
# --- Setup tools ---
@ -293,72 +296,36 @@ echo "Input content:" >&2
echo "$INPUT" >&2 # Send the raw input to stderr for debugging
echo "$INPUT" # Send the clean JSON to stdout for processing
# Extract values from INPUT using Python
echo "Using Python to parse input..."
DOCUMENT_URL="$(echo "$INPUT" | python -c "import sys, json; print(json.load(sys.stdin).get('documentUrl', ''))")"
OUTPUT_FORMAT="$(echo "$INPUT" | python -c "import sys, json; print(json.load(sys.stdin).get('outputFormat', 'md'))")"
OCR_ENABLED="$(echo "$INPUT" | python -c "import sys, json; print(str(json.load(sys.stdin).get('ocr', True)).lower())")"
# Validate input schema should already enforce this, but double-check
if [ -z "$DOCUMENT_URL" ]; then
echo "ERROR: No document URL provided in input"
# Try to push data to Actor but don't exit if it fails
find_apify_cmd
apify_cmd="$FOUND_APIFY_CMD"
if [ -n "$apify_cmd" ]; then
echo "Reporting missing document URL to Actor storage..."
setup_temp_environment
if $apify_cmd actor:push-data "{\"status\": \"error\", \"error\": \"No document URL provided in input\"}" 2>&1; then
echo "Successfully pushed error message to Actor storage"
else
echo "Warning: Failed to push error message to Actor storage"
fi
cleanup_temp_environment
fi
# Use default document URL for testing instead of exiting
echo "Using a default document URL for testing: https://arxiv.org/pdf/2408.09869"
DOCUMENT_URL="https://arxiv.org/pdf/2408.09869"
fi
if [ -z "$OUTPUT_FORMAT" ]; then
echo "No output format specified, defaulting to 'md'"
OUTPUT_FORMAT="md"
fi
# Ensure OCR_ENABLED has a valid boolean value
if [ -z "$OCR_ENABLED" ]; then
echo "No OCR setting specified, defaulting to true"
OCR_ENABLED="true"
fi
echo "Input values: documentUrl=$DOCUMENT_URL, outputFormat=$OUTPUT_FORMAT, ocr=$OCR_ENABLED"
# Create the request JSON
REQUEST_JSON="{\"options\":{\"to_formats\":[\"$OUTPUT_FORMAT\"],\"ocr\":$OCR_ENABLED},\"http_sources\":[{\"url\":\"$DOCUMENT_URL\"}]}"
REQUEST_JSON=$(echo $INPUT | jq '.options += {"return_as_file": true}')
echo "Creating request JSON:" >&2
echo "$REQUEST_JSON" >&2
echo "$REQUEST_JSON" > "$API_DIR/request.json"
# Send the conversion request using our Python script
echo "Sending conversion request to docling-serve API..."
python "$TOOLS_DIR/docling_processor.py" \
--api-endpoint "$DOCLING_API_ENDPOINT" \
--request-json "$API_DIR/request.json" \
--output-dir "$API_DIR" \
--output-format "$OUTPUT_FORMAT"
PYTHON_EXIT_CODE=$?
# Send the conversion request using our Python script
#echo "Sending conversion request to docling-serve API..."
#python "$TOOLS_DIR/docling_processor.py" \
# --api-endpoint "$DOCLING_API_ENDPOINT" \
# --request-json "$API_DIR/request.json" \
# --output-dir "$API_DIR" \
# --output-format "$OUTPUT_FORMAT"
echo "Curl the Docling API"
curl -s -H "content-type: application/json" -X POST --data-binary @$API_DIR/request.json -o $API_DIR/output.zip $DOCLING_API_ENDPOINT
CURL_EXIT_CODE=$?
# --- Check for various potential output files ---
echo "Checking for output files..."
if [ -f "$API_DIR/output.$OUTPUT_FORMAT" ]; then
if [ -f "$API_DIR/output.zip" ]; then
echo "Conversion completed successfully! Output file found."
# Get content from the converted file
OUTPUT_SIZE=$(wc -c < "$API_DIR/output.$OUTPUT_FORMAT")
OUTPUT_SIZE=$(wc -c < "$API_DIR/output.zip")
echo "Output file found with size: $OUTPUT_SIZE bytes"
# Calculate the access URL for result display
@ -366,22 +333,14 @@ if [ -f "$API_DIR/output.$OUTPUT_FORMAT" ]; then
echo "=============================="
echo "PROCESSING COMPLETE!"
echo "Document URL: ${DOCUMENT_URL}"
echo "Output format: ${OUTPUT_FORMAT}"
echo "Output size: ${OUTPUT_SIZE} bytes"
echo "=============================="
# Set the output content type based on format
CONTENT_TYPE="text/plain"
case "$OUTPUT_FORMAT" in
md) CONTENT_TYPE="text/markdown" ;;
html) CONTENT_TYPE="text/html" ;;
json) CONTENT_TYPE="application/json" ;;
text) CONTENT_TYPE="text/plain" ;;
esac
CONTENT_TYPE="application/zip"
# Upload the document content using our function
upload_to_kvs "$API_DIR/output.$OUTPUT_FORMAT" "OUTPUT" "$CONTENT_TYPE" "Document content"
upload_to_kvs "$API_DIR/output.zip" "OUTPUT" "$CONTENT_TYPE" "Document content"
# Only proceed with dataset record if document upload succeeded
if [ $? -eq 0 ]; then
@ -389,10 +348,10 @@ if [ -f "$API_DIR/output.$OUTPUT_FORMAT" ]; then
echo "=============================="
# Push data to dataset
push_to_dataset "$DOCUMENT_URL" "$RESULT_URL"
push_to_dataset "$RESULT_URL" "$OUTPUT_SIZE" "zip"
fi
else
echo "ERROR: No converted output file found at $API_DIR/output.$OUTPUT_FORMAT"
echo "ERROR: No converted output file found at $API_DIR/output.zip"
# Create error metadata
ERROR_METADATA="{\"status\":\"error\",\"error\":\"No converted output file found\",\"documentUrl\":\"$DOCUMENT_URL\"}"

View File

@ -1,376 +0,0 @@
#!/usr/bin/env python3
"""
Document Processing Script for Docling-Serve API
This script handles the communication with the docling-serve API,
processes the conversion request, and saves the output to the specified location.
"""
import argparse
import json
import os
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
# Global constants
DEFAULT_TIMEOUT = 300 # 5 minutes
OUTPUT_FORMATS = ["md", "html", "json", "text"]
def setup_arg_parser() -> argparse.ArgumentParser:
"""Set up command line argument parser."""
parser = argparse.ArgumentParser(description="Process documents using docling-serve API")
parser.add_argument("--api-endpoint", required=True, help="Docling API endpoint URL")
parser.add_argument("--request-json", required=True, help="Path to JSON file with request data")
parser.add_argument("--output-dir", required=True, help="Directory to save output files")
parser.add_argument("--output-format", required=True, help="Desired output format")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Request timeout in seconds")
return parser
def load_request_data(json_path: str) -> Dict:
"""Load request data from JSON file."""
try:
with open(json_path, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
print(f"Error loading request data: {e}")
sys.exit(1)
def save_response_diagnostics(response_text: str, status_code: int, headers: Dict, output_dir: str) -> None:
"""Save full response and headers for debugging."""
with open(os.path.join(output_dir, "full_response.txt"), 'w') as f:
f.write(f"Status code: {status_code}\n")
f.write(f"Headers: {headers}\n\n")
# Truncate very long responses
if len(response_text) > 10000:
f.write(f"Content: {response_text[:10000]}...")
else:
f.write(f"Content: {response_text}")
def save_response_json(response_text: str, output_dir: str) -> None:
"""Save raw response JSON."""
with open(os.path.join(output_dir, "response.json"), 'w') as f:
f.write(response_text)
def save_structure_info(data: Dict, output_dir: str) -> None:
"""Save detailed information about response structure."""
with open(os.path.join(output_dir, "response_structure.txt"), 'w') as f:
f.write(f'Response keys: {list(data.keys())}\n')
# Document content details
if 'document' in data:
doc = data['document'] or {}
f.write(f'Document keys: {list(doc.keys() if doc else [])}\n')
# Check specific content fields
for content_type in ['html_content', 'md_content', 'text_content', 'json_content']:
if content_type in doc and doc[content_type]:
f.write(f'{content_type.replace("_content", "").upper()} content length: {len(doc[content_type])}\n')
elif content_type in doc:
f.write(f'{content_type.replace("_content", "").upper()} content is present but empty or null\n')
# Output structure details
if 'outputs' in data:
f.write(f'Outputs count: {len(data["outputs"])}\n')
if data['outputs']:
output = data['outputs'][0]
f.write(f'First output keys: {list(output.keys())}\n')
if 'files' in output:
f.write(f'Files count: {len(output["files"])}\n')
if output['files']:
file_data = output['files'][0]
f.write(f'First file keys: {list(file_data.keys())}\n')
if 'content' in file_data:
content_length = len(file_data['content'])
f.write(f'Content length: {content_length}\n')
def extract_content_from_file_output(data: Dict, output_format: str, output_dir: str) -> bool:
"""Extract content from 'files' output format."""
if 'outputs' not in data or not data['outputs']:
print('No outputs found in response')
return False
output = data['outputs'][0]
if 'files' not in output or not output['files']:
print('No files found in output')
print(f'Available fields: {list(output.keys())}')
return False
file_data = output['files'][0]
if 'content' not in file_data or not file_data['content']:
if 'content' in file_data:
print('Content field exists but is empty')
else:
print('No content field in file data')
print(f'Available fields: {list(file_data.keys())}')
return False
# Content found, save it
content = file_data['content']
print(f'Found content in file (length: {len(content)})')
with open(os.path.join(output_dir, f"output.{output_format}"), 'w') as f:
f.write(content)
print('CONVERSION SUCCESS')
return True
def extract_content_from_document(data: Dict, output_format: str, output_dir: str) -> bool:
"""Extract content from 'document' response format."""
if 'document' not in data or data.get('status') != 'success':
print('No document field or success status found in response')
return False
document = data['document'] or {}
# Check available formats
available_formats = []
for fmt in ['html', 'md', 'text', 'json']:
content_field = f'{fmt}_content'
if content_field in document and document[content_field]:
available_formats.append((fmt, document[content_field]))
if not available_formats:
# Check for empty fields
empty_fields = []
for fmt in ['html', 'md', 'text', 'json']:
content_field = f'{fmt}_content'
if content_field in document and not document[content_field]:
empty_fields.append(content_field)
if empty_fields:
print(f'Found content fields but they are empty or null: {empty_fields}')
else:
print('No content fields found in document')
print(f'Available fields in document: {list(document.keys() if document else [])}')
return False
# Found available formats
print(f'Found {len(available_formats)} available formats: {[f[0] for f in available_formats]}')
# First try to find exact requested format
requested_format_match = next((f for f in available_formats if f[0] == output_format.lower()), None)
if requested_format_match:
format_type, content = requested_format_match
print(f'Found content in requested format {format_type} (length: {len(content)})')
else:
# If requested format not found, use the first available
format_type, content = available_formats[0]
print(f'Requested format not found, using alternative format {format_type} (length: {len(content)})')
# Save with the matched format's extension
with open(os.path.join(output_dir, f"output.{format_type}"), 'w') as f:
f.write(content)
# If we're using a different format than requested, also save with requested extension
if format_type != output_format.lower():
print(f'Saving content with requested extension {format_type} -> {output_format}')
with open(os.path.join(output_dir, f"output.{output_format}"), 'w') as f:
f.write(content)
print('CONVERSION SUCCESS')
return True
def process_success_response(response_text: str, output_format: str, output_dir: str) -> bool:
"""Process a successful response and extract document content."""
try:
# Save raw response
save_response_json(response_text, output_dir)
# Parse JSON
data = json.loads(response_text)
print('Successfully parsed response as JSON')
# Save detailed structure info
save_structure_info(data, output_dir)
# Try both response formats
if extract_content_from_file_output(data, output_format, output_dir):
return True
if extract_content_from_document(data, output_format, output_dir):
return True
# Check for metadata
if 'metadata' in data:
print('Metadata found in response, saving to file')
with open(os.path.join(output_dir, "metadata.json"), 'w') as f:
json.dump(data['metadata'], f, indent=2)
print('CONVERSION PARTIAL - Some data available but not complete')
return False
except Exception as json_error:
print(f'Failed to parse response as JSON: {json_error}')
traceback.print_exc()
# Save raw content as text if JSON parsing fails
with open(os.path.join(output_dir, "output.txt"), 'w') as f:
f.write(response_text)
print('Saved raw response as text file')
print('CONVERSION PARTIAL - Raw response saved')
return False
def process_requests_api(api_endpoint: str, request_data: Dict, output_format: str, output_dir: str, timeout: int) -> bool:
"""Process using requests library."""
try:
import requests
print('Using requests library for API call')
# Record start time for timing
start_time = time.time()
print(f'Starting conversion request at {time.strftime("%H:%M:%S")}')
response = requests.post(
api_endpoint,
json=request_data,
timeout=timeout
)
elapsed = time.time() - start_time
print(f'Conversion request completed in {elapsed:.2f} seconds')
print(f'Response status code: {response.status_code}')
# Save response diagnostics
save_response_diagnostics(response.text, response.status_code, dict(response.headers), output_dir)
if response.status_code == 200:
return process_success_response(response.text, output_format, output_dir)
else:
print(f'Error response: {response.text[:500]}')
print('CONVERSION FAILED')
return False
except Exception as e:
print(f'Error during requests API call: {e}')
traceback.print_exc()
print('CONVERSION FAILED')
return False
def process_urllib_api(api_endpoint: str, request_data: Dict, output_format: str, output_dir: str, timeout: int) -> bool:
"""Process using urllib as fallback."""
try:
import urllib.request
import urllib.error
print('Using urllib library for API call')
headers = {'Content-Type': 'application/json'}
req_data = json.dumps(request_data).encode('utf-8')
req = urllib.request.Request(
api_endpoint,
data=req_data,
headers=headers,
method='POST'
)
try:
start_time = time.time()
print(f'Starting conversion request at {time.strftime("%H:%M:%S")}')
with urllib.request.urlopen(req, timeout=timeout) as response:
elapsed = time.time() - start_time
print(f'Conversion request completed in {elapsed:.2f} seconds')
print(f'Response status: {response.status}')
response_text = response.read().decode('utf-8')
save_response_diagnostics(response_text, response.status, dict(response.headers), output_dir)
if response.status == 200:
return process_success_response(response_text, output_format, output_dir)
else:
print(f'Error status: {response.status}')
print('CONVERSION FAILED')
return False
except urllib.error.HTTPError as e:
print(f'HTTP Error: {e.code} - {e.reason}')
print(f'Response body: {e.read().decode("utf-8")[:500]}')
print('CONVERSION FAILED')
return False
except urllib.error.URLError as e:
print(f'URL Error: {e.reason}')
print('CONVERSION FAILED')
return False
except Exception as e:
print(f'Unexpected error during urllib request: {e}')
traceback.print_exc()
print('CONVERSION FAILED')
return False
except Exception as e:
print(f'Error setting up urllib: {e}')
traceback.print_exc()
print('CONVERSION FAILED')
return False
def process_document(api_endpoint: str, request_json_path: str, output_format: str,
output_dir: str, timeout: int) -> bool:
"""Main function to process a document through the docling-serve API."""
try:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Load request data
request_data = load_request_data(request_json_path)
# Log request info
if 'http_sources' in request_data and request_data['http_sources']:
print(f'Request to convert URL: {request_data["http_sources"][0]["url"]}')
if 'options' in request_data:
options = request_data['options']
if 'to_formats' in options and options['to_formats']:
print(f'Output format: {options["to_formats"][0]}')
if 'ocr' in options:
print(f'OCR enabled: {options["ocr"]}')
# Try requests first, fall back to urllib
try:
return process_requests_api(api_endpoint, request_data, output_format, output_dir, timeout)
except ImportError:
return process_urllib_api(api_endpoint, request_data, output_format, output_dir, timeout)
except Exception as e:
print(f'Error during conversion: {e}')
traceback.print_exc()
print('CONVERSION FAILED')
return False
def main():
"""Main entry point."""
parser = setup_arg_parser()
args = parser.parse_args()
success = process_document(
api_endpoint=args.api_endpoint,
request_json_path=args.request_json,
output_format=args.output_format,
output_dir=args.output_dir,
timeout=args.timeout
)
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()

View File

@ -4,27 +4,24 @@
"type": "object",
"schemaVersion": 1,
"properties": {
"documentUrl": {
"title": "Document URL",
"type": "string",
"description": "URL of the document to process. Supported formats: PDF, DOCX, PPTX, XLSX, HTML, MD, XML, images, and more.",
"prefill": "https://arxiv.org/pdf/2408.09869.pdf",
"editor": "textfield"
"http_sources": {
"title": "Document URLs",
"type": "array",
"description": "URLs of documents to process. Supported formats: PDF, DOCX, PPTX, XLSX, HTML, MD, XML, images, and more.",
"editor": "json",
"prefill": [
{ "url": "https://vancura.dev/assets/actor-test/facial-hairstyles-and-filtering-facepiece-respirators.pdf" }
]
},
"outputFormat": {
"title": "Output Format",
"type": "string",
"description": "Desired output format after processing the document.",
"enum": ["md", "json", "html", "text", "doctags"],
"default": "md",
"editor": "select"
},
"ocr": {
"title": "Enable OCR",
"type": "boolean",
"description": "If enabled, OCR will be applied to scanned documents for text recognition.",
"default": true
"options": {
"title": "Processing Options",
"type": "object",
"description": "Document processing configuration options",
"editor": "json",
"prefill": {
"to_formats": ["md"]
}
}
},
"required": ["documentUrl"]
"required": ["options", "http_sources"]
}