refactor: address several input formats with same mime type

Signed-off-by: Cesar Berrospi Ramis <75900930+ceberam@users.noreply.github.com>
This commit is contained in:
Cesar Berrospi Ramis 2024-12-13 17:00:29 +01:00
parent c957901239
commit 8ee1ba455c
4 changed files with 175 additions and 25 deletions

View File

@ -89,7 +89,10 @@ class PatentUsptoDocumentBackend(DeclarativeDocumentBackend):
self.parser = PatentUsptoIce() self.parser = PatentUsptoIce()
elif "us-grant-025" in doctype_line: elif "us-grant-025" in doctype_line:
self.parser = PatentUsptoGrantV2() self.parser = PatentUsptoGrantV2()
elif "pap-v1" in doctype_line: elif all(
item in doctype_line
for item in ("patent-application-publication", "pap-v1")
):
self.parser = PatentUsptoAppV1() self.parser = PatentUsptoAppV1()
else: else:
self.parser = None self.parser = None

View File

@ -13,6 +13,7 @@ from docling_core.types.io import ( # DO NOT REMOVE; explicitly exposed from
) )
from PIL.Image import Image from PIL.Image import Image
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from typing_extensions import Self, override
if TYPE_CHECKING: if TYPE_CHECKING:
from docling.backend.pdf_backend import PdfPageBackend from docling.backend.pdf_backend import PdfPageBackend
@ -28,15 +29,31 @@ class ConversionStatus(str, Enum):
class InputFormat(str, Enum): class InputFormat(str, Enum):
DOCX = "docx" """A document format supported by document backend parsers.
PPTX = "pptx"
HTML = "html" The field `is_custom` indicates whether the document format is more specific than
IMAGE = "image" the standard and content formats, typically defined by MIME types.
PDF = "pdf" """
ASCIIDOC = "asciidoc"
MD = "md" DOCX = "docx", False
XLSX = "xlsx" PPTX = "pptx", False
XML_USPTO = "uspto" HTML = "html", False
IMAGE = "image", False
PDF = "pdf", False
ASCIIDOC = "asciidoc", False
MD = "md", False
XLSX = "xlsx", False
XML_USPTO = "uspto", True
@override
def __new__(cls, value: str, _) -> Self:
obj = str.__new__(cls, [value])
obj._value_ = value
return obj
@override
def __init__(self, _, is_custom: bool) -> None:
self.is_custom: bool = is_custom
class OutputFormat(str, Enum): class OutputFormat(str, Enum):
@ -86,8 +103,10 @@ FormatToMimeType: Dict[InputFormat, List[str]] = {
InputFormat.XML_USPTO: ["application/xml", "text/plain"], InputFormat.XML_USPTO: ["application/xml", "text/plain"],
} }
# Reverse lookup of FormatToMimeType. Several input formats can declare the same
# MIME type, so each MIME type maps to the list of every format that declares it.
MimeTypeToFormat: dict[str, list[InputFormat]] = {
    mime: [fmt for fmt, fmt_mimes in FormatToMimeType.items() if mime in fmt_mimes]
    for mimes in FormatToMimeType.values()
    for mime in mimes
}

View File

@ -3,7 +3,17 @@ import re
from enum import Enum from enum import Enum
from io import BytesIO from io import BytesIO
from pathlib import Path, PurePath from pathlib import Path, PurePath
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Type, Union from typing import (
TYPE_CHECKING,
Dict,
Iterable,
List,
Literal,
Optional,
Set,
Type,
Union,
)
import filetype import filetype
from docling_core.types.doc import ( from docling_core.types.doc import (
@ -235,7 +245,7 @@ class _DocumentConversionInput(BaseModel):
if isinstance(obj, Path): if isinstance(obj, Path):
yield InputDocument( yield InputDocument(
path_or_stream=obj, path_or_stream=obj,
format=format, format=format, # type: ignore[arg-type]
filename=obj.name, filename=obj.name,
limits=self.limits, limits=self.limits,
backend=backend, backend=backend,
@ -243,7 +253,7 @@ class _DocumentConversionInput(BaseModel):
elif isinstance(obj, DocumentStream): elif isinstance(obj, DocumentStream):
yield InputDocument( yield InputDocument(
path_or_stream=obj.stream, path_or_stream=obj.stream,
format=format, format=format, # type: ignore[arg-type]
filename=obj.name, filename=obj.name,
limits=self.limits, limits=self.limits,
backend=backend, backend=backend,
@ -251,15 +261,15 @@ class _DocumentConversionInput(BaseModel):
else: else:
raise RuntimeError(f"Unexpected obj type in iterator: {type(obj)}") raise RuntimeError(f"Unexpected obj type in iterator: {type(obj)}")
def _guess_format(self, obj: Union[Path, DocumentStream]): def _guess_format(self, obj: Union[Path, DocumentStream]) -> Optional[InputFormat]:
content = b"" # empty binary blob content = b"" # empty binary blob
format = None formats: list[InputFormat] = []
if isinstance(obj, Path): if isinstance(obj, Path):
mime = filetype.guess_mime(str(obj)) mime = filetype.guess_mime(str(obj))
if mime is None: if mime is None:
ext = obj.suffix[1:] ext = obj.suffix[1:]
mime = self._mime_from_extension(ext) mime = _DocumentConversionInput._mime_from_extension(ext)
if mime is None: # must guess from if mime is None: # must guess from
with obj.open("rb") as f: with obj.open("rb") as f:
content = f.read(1024) # Read first 1KB content = f.read(1024) # Read first 1KB
@ -274,15 +284,52 @@ class _DocumentConversionInput(BaseModel):
if ("." in obj.name and not obj.name.startswith(".")) if ("." in obj.name and not obj.name.startswith("."))
else "" else ""
) )
mime = self._mime_from_extension(ext) mime = _DocumentConversionInput._mime_from_extension(ext)
mime = mime or self._detect_html_xhtml(content) mime = mime or _DocumentConversionInput._detect_html_xhtml(content)
mime = mime or "text/plain" mime = mime or "text/plain"
formats = MimeTypeToFormat.get(mime, [])
if formats:
if len(formats) == 1 and not formats[0].is_custom:
return formats[0]
else: # ambiguity or custom cases
return _DocumentConversionInput._guess_from_content(
content, mime, formats
)
else:
return None
@staticmethod
def _guess_from_content(
    content: bytes, mime: str, formats: list[InputFormat]
) -> Optional[InputFormat]:
    """Guess the input format of a document by checking part of its content.

    Args:
        content: The leading bytes of the document. It may be empty or cut off
            at an arbitrary byte (callers read e.g. only the first 1KB).
        mime: The MIME type detected for the document.
        formats: The candidate input formats associated with that MIME type.

    Returns:
        The candidate format identified by the content, or None if the content
        does not match any of the candidates.
    """
    # The prefix can end in the middle of a multi-byte character, or the file
    # may not be UTF-8 at all. Only ASCII markers are searched below, so
    # undecodable bytes are dropped instead of raising UnicodeDecodeError.
    content_str = content.decode("utf-8", errors="ignore")

    if mime == "application/xml":
        match_doctype = re.search(r"<!DOCTYPE [^>]+>", content_str)
        if match_doctype and InputFormat.XML_USPTO in formats:
            xml_doctype = match_doctype.group()
            # DOCTYPE names used by the supported USPTO XML flavors
            uspto_markers = (
                "us-patent-application-v4",
                "us-patent-grant-v4",
                "us-grant-025",
                "patent-application-publication",
            )
            if any(marker in xml_doctype for marker in uspto_markers):
                return InputFormat.XML_USPTO

    elif mime == "text/plain":
        # Plain-text USPTO files are recognized by a leading "PATN" line
        if InputFormat.XML_USPTO in formats and content_str.startswith("PATN\r\n"):
            return InputFormat.XML_USPTO

    return None
@staticmethod
def _mime_from_extension(ext):
mime = None mime = None
if ext in FormatToExtensions[InputFormat.ASCIIDOC]: if ext in FormatToExtensions[InputFormat.ASCIIDOC]:
mime = FormatToMimeType[InputFormat.ASCIIDOC][0] mime = FormatToMimeType[InputFormat.ASCIIDOC][0]
@ -293,7 +340,19 @@ class _DocumentConversionInput(BaseModel):
return mime return mime
def _detect_html_xhtml(self, content): @staticmethod
def _detect_html_xhtml(
content: bytes,
) -> Optional[Literal["application/xhtml+xml", "application/xml", "text/html"]]:
"""Guess the mime type of an XHTML, HTML, or XML file from its content.
Args:
content: A short piece of a document from its beginning.
Returns:
The mime type of an XHTML, HTML, or XML file, or None if the content does
not match any of these formats.
"""
content_str = content.decode("ascii", errors="ignore").lower() content_str = content.decode("ascii", errors="ignore").lower()
# Remove XML comments # Remove XML comments
content_str = re.sub(r"<!--(.*?)-->", "", content_str, flags=re.DOTALL) content_str = re.sub(r"<!--(.*?)-->", "", content_str, flags=re.DOTALL)
@ -302,6 +361,8 @@ class _DocumentConversionInput(BaseModel):
if re.match(r"<\?xml", content_str): if re.match(r"<\?xml", content_str):
if "xhtml" in content_str[:1000]: if "xhtml" in content_str[:1000]:
return "application/xhtml+xml" return "application/xhtml+xml"
else:
return "application/xml"
if re.match(r"<!doctype\s+html|<html|<head|<body", content_str): if re.match(r"<!doctype\s+html|<html|<head|<body", content_str):
return "text/html" return "text/html"

View File

@ -3,7 +3,7 @@ from pathlib import Path
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.base_models import DocumentStream, InputFormat from docling.datamodel.base_models import DocumentStream, InputFormat
from docling.datamodel.document import InputDocument from docling.datamodel.document import InputDocument, _DocumentConversionInput
def test_in_doc_from_valid_path(): def test_in_doc_from_valid_path():
@ -39,6 +39,73 @@ def test_in_doc_from_invalid_buf():
assert doc.valid == False assert doc.valid == False
def test_guess_format(tmp_path):
    """Test docling.datamodel.document._DocumentConversionInput._guess_format"""
    dci = _DocumentConversionInput(path_or_stream_iterator=[])
    temp_dir = tmp_path / "test_guess_format"
    temp_dir.mkdir()

    def check(doc_path, expected, stream_name=None):
        """Assert the guessed format of `doc_path`, as a stream and as a path."""
        # read_bytes() closes the file handle, unlike a bare open().read()
        stream = DocumentStream(
            name=stream_name or doc_path.name,
            stream=BytesIO(doc_path.read_bytes()),
        )
        # Enum members and None are singletons, so identity is the exact check
        assert dci._guess_format(stream) is expected
        assert dci._guess_format(doc_path) is expected

    # Valid PDF
    check(Path("./tests/data/2206.01062.pdf"), InputFormat.PDF, "my_doc.pdf")

    # Valid MS Office
    check(Path("./tests/data/docx/lorem_ipsum.docx"), InputFormat.DOCX)

    # Valid HTML
    check(Path("./tests/data/html/wiki_duck.html"), InputFormat.HTML)

    # Valid MD
    check(Path("./tests/data/md/wiki.md"), InputFormat.MD)

    # Valid XML USPTO patent
    check(Path("./tests/data/uspto/ipa20110039701.xml"), InputFormat.XML_USPTO)
    check(Path("./tests/data/uspto/pftaps057006474.txt"), InputFormat.XML_USPTO)

    # Valid XML, non-supported flavor
    xml_content = (
        '<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE docling_test SYSTEM '
        '"test.dtd"><docling>Docling parses documents</docling>'
    )
    doc_path = temp_dir / "docling_test.xml"
    doc_path.write_text(xml_content, encoding="utf-8")
    check(doc_path, None)

    # Invalid USPTO patent (as plain text)
    stream = DocumentStream(name="pftaps057006474.txt", stream=BytesIO(b"xyz"))
    assert dci._guess_format(stream) is None
    doc_path = temp_dir / "pftaps_wrong.txt"
    doc_path.write_text("xyz", encoding="utf-8")
    assert dci._guess_format(doc_path) is None
def _make_input_doc(path): def _make_input_doc(path):
in_doc = InputDocument( in_doc = InputDocument(
path_or_stream=path, path_or_stream=path,