19 Commits

Author SHA1 Message Date
afourney
cd6aa41361 Adjust warning filters and update dependencies (#1143)
Adjusts warning filters to be more contextual
Updates dependencies for magika and youtube-transcript-api
Updates the version to 0.1.0a5 in __about__.py
2025-03-19 22:09:14 -07:00
afourney
716f74dcb9 Consider anything with a charset as plain text-convertible. (#1142) 2025-03-19 20:46:35 -07:00
afourney
a93e0567e6 EPub Support. Adapted #123 to not use epublib. (#1131)
* Adapted #123 to not use epublib.
* Updated README.md
2025-03-17 07:48:15 -07:00
afourney
c5f70b904f Have magika read from the stream. (#1136) 2025-03-17 07:39:19 -07:00
afourney
53834fdd24 Investigate and silence warnings. (#1133) 2025-03-15 23:41:35 -07:00
afourney
5c565b7d79 Fix remaining mypy errors. (#1132) 2025-03-15 23:12:48 -07:00
afourney
a78857bd43 Added epub test file. (#1130) 2025-03-15 18:34:51 -07:00
afourney
09df7fe8df Small fixes for autogen integration. (#1124) 2025-03-12 19:18:11 -07:00
Adam Fourney
6a9f09b153 Updated Magika dependency. 2025-03-12 16:15:33 -07:00
afourney
0b815fb916 Bumping version to 0.1.0a2 (#1123) 2025-03-12 11:44:19 -07:00
Emanuele Meazzo
12620f1545 Handle not supported plot type in pptx (#1122)
* Handle not supported plot type in pptx
* Fixed formatting.
2025-03-12 11:26:23 -07:00
afourney
5f75e16d20 Refactored tests. (#1120)
* Refactored tests.
* Fixed CI errors, and included misc tests.
* Omit mskanji from streaminfo test.
* Omit mskanji from no hints test.
* Log results of debugging in comments (linked to Magika issue)
* Added docs as to when to use misc tests.
2025-03-12 11:08:06 -07:00
yushihang
75140a90e2 fix: correct f-string formatting in FileConversionException (#1121) 2025-03-12 10:15:09 -07:00
afourney
af1be36e0c Added CLI options for extension, mimetypes, and charset. (#1115) 2025-03-11 13:16:33 -07:00
Adam Fourney
2a2ccc86aa Added mimetypes to _rss_converter 2025-03-10 16:17:41 -07:00
Adam Fourney
2e51ba22e7 Enhance type guessing. 2025-03-10 16:05:41 -07:00
afourney
8f8e58c9bb Minimize guesses when guesses are compatible. (#1114)
* Minimize guesses when guesses are compatible.
2025-03-10 15:30:44 -07:00
afourney
8e73a325c6 Switch from puremagic to magika. (#1108) 2025-03-10 12:49:52 -07:00
Mohit Agarwal
2405f201af fix typo in well-known path list (#1109) 2025-03-08 19:32:44 -08:00
24 changed files with 1131 additions and 645 deletions

View File

@@ -14,7 +14,7 @@ MarkItDown is a lightweight Python utility for converting various files to Markdown
 At present, MarkItDown supports:
 - PDF
-- PowerPoint (reading in top-to-bottom, left-to-right order)
+- PowerPoint
 - Word
 - Excel
 - Images (EXIF metadata and OCR)
@@ -23,6 +23,7 @@ At present, MarkItDown supports:
 - Text-based formats (CSV, JSON, XML)
 - ZIP files (iterates over contents)
 - Youtube URLs
+- EPubs
 - ... and more!
 ## Why Markdown?

View File

@@ -27,8 +27,7 @@ dependencies = [
   "beautifulsoup4",
   "requests",
   "markdownify",
-  "puremagic",
-  "pathvalidate",
+  "magika~=0.6.1",
   "charset-normalizer",
 ]
@@ -43,7 +42,7 @@ all = [
   "olefile",
   "pydub",
   "SpeechRecognition",
-  "youtube-transcript-api",
+  "youtube-transcript-api~=1.0.0",
   "azure-ai-documentintelligence",
   "azure-identity"
 ]
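The magika dependency replaces puremagic for content-based type detection. A minimal sketch (illustrative only, mirroring the calls used later in this diff) of what magika reports for a stream:

import io
import magika

m = magika.Magika()
result = m.identify_stream(io.BytesIO(b'{"key": "value"}'))
if result.status == "ok" and result.prediction.output.label != "unknown":
    # A MIME type, candidate extensions, and whether the content is text
    print(result.prediction.output.mime_type)
    print(result.prediction.output.extensions)
    print(result.prediction.output.is_text)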

View File

@@ -1,4 +1,4 @@
 # SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com>
 #
 # SPDX-License-Identifier: MIT
-__version__ = "0.1.0a1"
+__version__ = "0.1.0a5"

View File

@@ -3,10 +3,11 @@
 # SPDX-License-Identifier: MIT
 import argparse
 import sys
+import codecs
 from textwrap import dedent
 from importlib.metadata import entry_points
 from .__about__ import __version__
-from ._markitdown import MarkItDown, DocumentConverterResult
+from ._markitdown import MarkItDown, StreamInfo, DocumentConverterResult
 def main():
@@ -58,6 +59,24 @@ def main():
         help="Output file name. If not provided, output is written to stdout.",
     )
+    parser.add_argument(
+        "-x",
+        "--extension",
+        help="Provide a hint about the file extension (e.g., when reading from stdin).",
+    )
+    parser.add_argument(
+        "-m",
+        "--mime-type",
+        help="Provide a hint about the file's MIME type.",
+    )
+    parser.add_argument(
+        "-c",
+        "--charset",
+        help="Provide a hint about the file's charset (e.g, UTF-8).",
+    )
     parser.add_argument(
         "-d",
         "--use-docintel",
@@ -88,6 +107,48 @@ def main():
     parser.add_argument("filename", nargs="?")
     args = parser.parse_args()
+    # Parse the extension hint
+    extension_hint = args.extension
+    if extension_hint is not None:
+        extension_hint = extension_hint.strip().lower()
+        if len(extension_hint) > 0:
+            if not extension_hint.startswith("."):
+                extension_hint = "." + extension_hint
+        else:
+            extension_hint = None
+    # Parse the mime type
+    mime_type_hint = args.mime_type
+    if mime_type_hint is not None:
+        mime_type_hint = mime_type_hint.strip()
+        if len(mime_type_hint) > 0:
+            if mime_type_hint.count("/") != 1:
+                _exit_with_error(f"Invalid MIME type: {mime_type_hint}")
+        else:
+            mime_type_hint = None
+    # Parse the charset
+    charset_hint = args.charset
+    if charset_hint is not None:
+        charset_hint = charset_hint.strip()
+        if len(charset_hint) > 0:
+            try:
+                charset_hint = codecs.lookup(charset_hint).name
+            except LookupError:
+                _exit_with_error(f"Invalid charset: {charset_hint}")
+        else:
+            charset_hint = None
+    stream_info = None
+    if (
+        extension_hint is not None
+        or mime_type_hint is not None
+        or charset_hint is not None
+    ):
+        stream_info = StreamInfo(
+            extension=extension_hint, mimetype=mime_type_hint, charset=charset_hint
+        )
     if args.list_plugins:
         # List installed plugins, then exit
         print("Installed MarkItDown 3rd-party Plugins:\n")
@@ -107,11 +168,12 @@ def main():
     if args.use_docintel:
         if args.endpoint is None:
-            raise ValueError(
+            _exit_with_error(
                 "Document Intelligence Endpoint is required when using Document Intelligence."
             )
         elif args.filename is None:
-            raise ValueError("Filename is required when using Document Intelligence.")
+            _exit_with_error("Filename is required when using Document Intelligence.")
         markitdown = MarkItDown(
             enable_plugins=args.use_plugins, docintel_endpoint=args.endpoint
         )
@@ -119,9 +181,9 @@ def main():
         markitdown = MarkItDown(enable_plugins=args.use_plugins)
     if args.filename is None:
-        result = markitdown.convert_stream(sys.stdin.buffer)
+        result = markitdown.convert_stream(sys.stdin.buffer, stream_info=stream_info)
     else:
-        result = markitdown.convert(args.filename)
+        result = markitdown.convert(args.filename, stream_info=stream_info)
     _handle_output(args, result)
@@ -135,5 +197,10 @@ def _handle_output(args, result: DocumentConverterResult):
     print(result.text_content)
+def _exit_with_error(message: str):
+    print(message)
+    sys.exit(1)
 if __name__ == "__main__":
     main()
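The new -x/--extension, -m/--mime-type, and -c/--charset flags let stdin conversions carry type hints. A hedged sketch of the equivalent programmatic call (the piped CSV is illustrative):

import sys
from markitdown import MarkItDown, StreamInfo

# Roughly what `python -m markitdown -x csv -c utf-8 < data.csv` resolves to:
md = MarkItDown(enable_plugins=False)
hints = StreamInfo(extension=".csv", mimetype=None, charset="utf-8")
result = md.convert_stream(sys.stdin.buffer, stream_info=hints)
print(result.text_content)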

View File

@@ -69,7 +69,7 @@ class FileConversionException(MarkItDownException):
         message = f"File conversion failed after {len(attempts)} attempts:\n"
         for attempt in attempts:
             if attempt.exc_info is None:
-                message += " - {type(attempt.converter).__name__} provided no execution info."
+                message += f" - {type(attempt.converter).__name__} provided no execution info."
             else:
                 message += f" - {type(attempt.converter).__name__} threw {attempt.exc_info[0].__name__} with message: {attempt.exc_info[1]}\n"

View File

@@ -14,12 +14,12 @@ from typing import Any, List, Optional, Union, BinaryIO
 from pathlib import Path
 from urllib.parse import urlparse
 from warnings import warn
-# File-format detection
-import puremagic
 import requests
+import magika
+import charset_normalizer
+import codecs
-from ._stream_info import StreamInfo, _guess_stream_info_from_stream
+from ._stream_info import StreamInfo
 from .converters import (
     PlainTextConverter,
@@ -38,6 +38,7 @@ from .converters import (
     AudioConverter,
     OutlookMsgConverter,
     ZipConverter,
+    EpubConverter,
     DocumentIntelligenceConverter,
 )
@@ -110,6 +111,8 @@ class MarkItDown:
         else:
             self._requests_session = requests_session
+        self._magika = magika.Magika()
         # TODO - remove these (see enable_builtins)
         self._llm_client: Any = None
         self._llm_model: Union[str | None] = None
@@ -156,7 +159,8 @@ class MarkItDown:
             "/opt",
             "/opt/bin",
             "/opt/local/bin",
-            "/opt/homebrew/bin" "C:\\Windows\\System32",
+            "/opt/homebrew/bin",
+            "C:\\Windows\\System32",
             "C:\\Program Files",
             "C:\\Program Files (x86)",
         ]
@@ -188,6 +192,7 @@ class MarkItDown:
         self.register_converter(IpynbConverter())
         self.register_converter(PdfConverter())
         self.register_converter(OutlookMsgConverter())
+        self.register_converter(EpubConverter())
         # Register Document Intelligence converter at the top of the stack if endpoint is provided
         docintel_endpoint = kwargs.get("docintel_endpoint")
@@ -241,7 +246,14 @@ class MarkItDown:
or source.startswith("https://") or source.startswith("https://")
or source.startswith("file://") or source.startswith("file://")
): ):
return self.convert_url(source, **kwargs) # Rename the url argument to mock_url
# (Deprecated -- use stream_info)
_kwargs = {k: v for k, v in kwargs.items()}
if "url" in _kwargs:
_kwargs["mock_url"] = _kwargs["url"]
del _kwargs["url"]
return self.convert_url(source, stream_info=stream_info, **_kwargs)
else: else:
return self.convert_local(source, stream_info=stream_info, **kwargs) return self.convert_local(source, stream_info=stream_info, **kwargs)
# Path object # Path object
@@ -249,14 +261,14 @@ class MarkItDown:
             return self.convert_local(source, stream_info=stream_info, **kwargs)
         # Request response
         elif isinstance(source, requests.Response):
-            return self.convert_response(source, **kwargs)
+            return self.convert_response(source, stream_info=stream_info, **kwargs)
         # Binary stream
         elif (
             hasattr(source, "read")
             and callable(source.read)
             and not isinstance(source, io.TextIOBase)
         ):
-            return self.convert_stream(source, **kwargs)
+            return self.convert_stream(source, stream_info=stream_info, **kwargs)
         else:
             raise TypeError(
                 f"Invalid source type: {type(source)}. Expected str, requests.Response, BinaryIO."
@@ -275,33 +287,28 @@ class MarkItDown:
             path = str(path)
         # Build a base StreamInfo object from which to start guesses
-        base_stream_info = StreamInfo(
+        base_guess = StreamInfo(
             local_path=path,
             extension=os.path.splitext(path)[1],
             filename=os.path.basename(path),
         )
-        # Extend the base_stream_info with any additional info from the arguments
+        # Extend the base_guess with any additional info from the arguments
         if stream_info is not None:
-            base_stream_info = base_stream_info.copy_and_update(stream_info)
+            base_guess = base_guess.copy_and_update(stream_info)
         if file_extension is not None:
             # Deprecated -- use stream_info
-            base_stream_info = base_stream_info.copy_and_update(
-                extension=file_extension
-            )
+            base_guess = base_guess.copy_and_update(extension=file_extension)
         if url is not None:
             # Deprecated -- use stream_info
-            base_stream_info = base_stream_info.copy_and_update(url=url)
+            base_guess = base_guess.copy_and_update(url=url)
         with open(path, "rb") as fh:
-            # Prepare a list of configurations to try, starting with the base_stream_info
-            guesses: List[StreamInfo] = [base_stream_info]
-            for guess in _guess_stream_info_from_stream(
-                file_stream=fh, filename_hint=path
-            ):
-                guesses.append(base_stream_info.copy_and_update(guess))
+            guesses = self._get_stream_info_guesses(
+                file_stream=fh, base_guess=base_guess
+            )
             return self._convert(file_stream=fh, stream_info_guesses=guesses, **kwargs)
     def convert_stream(
@@ -334,21 +341,6 @@ class MarkItDown:
             assert base_guess is not None  # for mypy
             base_guess = base_guess.copy_and_update(url=url)
-        # Append the base guess, if it's non-trivial
-        if base_guess is not None:
-            if base_guess.mimetype is not None or base_guess.extension is not None:
-                guesses.append(base_guess)
-        else:
-            # Create a base guess with no information
-            base_guess = StreamInfo()
-        # Create a placeholder filename to help with guessing
-        placeholder_filename = None
-        if base_guess.filename is not None:
-            placeholder_filename = base_guess.filename
-        elif base_guess.extension is not None:
-            placeholder_filename = "placeholder" + base_guess.extension
         # Check if we have a seekable stream. If not, load the entire stream into memory.
         if not stream.seekable():
             buffer = io.BytesIO()
@@ -361,21 +353,32 @@ class MarkItDown:
             stream = buffer
         # Add guesses based on stream content
-        for guess in _guess_stream_info_from_stream(
-            file_stream=stream, filename_hint=placeholder_filename
-        ):
-            guesses.append(base_guess.copy_and_update(guess))
-        # Perform the conversion
+        guesses = self._get_stream_info_guesses(
+            file_stream=stream, base_guess=base_guess or StreamInfo()
+        )
         return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs)
     def convert_url(
-        self, url: str, **kwargs: Any
+        self,
+        url: str,
+        *,
+        stream_info: Optional[StreamInfo] = None,
+        file_extension: Optional[str] = None,  # Deprecated -- use stream_info
+        mock_url: Optional[
+            str
+        ] = None,  # Mock the request as if it came from a different URL
+        **kwargs: Any,
     ) -> DocumentConverterResult:  # TODO: fix kwargs type
         # Send a HTTP request to the URL
         response = self._requests_session.get(url, stream=True)
         response.raise_for_status()
-        return self.convert_response(response, **kwargs)
+        return self.convert_response(
+            response,
+            stream_info=stream_info,
+            file_extension=file_extension,
+            url=mock_url,
+            **kwargs,
+        )
     def convert_response(
         self,
@@ -437,31 +440,16 @@ class MarkItDown:
             # Deprecated -- use stream_info
             base_guess = base_guess.copy_and_update(url=url)
-        # Add the guess if its non-trivial
-        guesses: List[StreamInfo] = []
-        if base_guess.mimetype is not None or base_guess.extension is not None:
-            guesses.append(base_guess)
         # Read into BytesIO
         buffer = io.BytesIO()
         for chunk in response.iter_content(chunk_size=512):
             buffer.write(chunk)
         buffer.seek(0)
-        # Create a placeholder filename to help with guessing
-        placeholder_filename = None
-        if base_guess.filename is not None:
-            placeholder_filename = base_guess.filename
-        elif base_guess.extension is not None:
-            placeholder_filename = "placeholder" + base_guess.extension
-        # Add guesses based on stream content
-        for guess in _guess_stream_info_from_stream(
-            file_stream=buffer, filename_hint=placeholder_filename
-        ):
-            guesses.append(base_guess.copy_and_update(guess))
         # Convert
+        guesses = self._get_stream_info_guesses(
+            file_stream=buffer, base_guess=base_guess
+        )
         return self._convert(file_stream=buffer, stream_info_guesses=guesses, **kwargs)
     def _convert(
@@ -595,3 +583,115 @@ class MarkItDown:
         self._converters.insert(
             0, ConverterRegistration(converter=converter, priority=priority)
         )
def _get_stream_info_guesses(
self, file_stream: BinaryIO, base_guess: StreamInfo
) -> List[StreamInfo]:
"""
Given a base guess, attempt to guess or expand on the stream info using the stream content (via magika).
"""
guesses: List[StreamInfo] = []
# Enhance the base guess with information based on the extension or mimetype
enhanced_guess = base_guess.copy_and_update()
# If there's an extension and no mimetype, try to guess the mimetype
if base_guess.mimetype is None and base_guess.extension is not None:
_m, _ = mimetypes.guess_type(
"placeholder" + base_guess.extension, strict=False
)
if _m is not None:
enhanced_guess = enhanced_guess.copy_and_update(mimetype=_m)
# If there's a mimetype and no extension, try to guess the extension
if base_guess.mimetype is not None and base_guess.extension is None:
_e = mimetypes.guess_all_extensions(base_guess.mimetype, strict=False)
if len(_e) > 0:
enhanced_guess = enhanced_guess.copy_and_update(extension=_e[0])
# Call magika to guess from the stream
cur_pos = file_stream.tell()
try:
result = self._magika.identify_stream(file_stream)
if result.status == "ok" and result.prediction.output.label != "unknown":
# If it's text, also guess the charset
charset = None
if result.prediction.output.is_text:
# Read the first 4k to guess the charset
file_stream.seek(cur_pos)
stream_page = file_stream.read(4096)
charset_result = charset_normalizer.from_bytes(stream_page).best()
if charset_result is not None:
charset = self._normalize_charset(charset_result.encoding)
# Normalize the first extension listed
guessed_extension = None
if len(result.prediction.output.extensions) > 0:
guessed_extension = "." + result.prediction.output.extensions[0]
# Determine if the guess is compatible with the base guess
compatible = True
if (
base_guess.mimetype is not None
and base_guess.mimetype != result.prediction.output.mime_type
):
compatible = False
if (
base_guess.extension is not None
and base_guess.extension.lstrip(".")
not in result.prediction.output.extensions
):
compatible = False
if (
base_guess.charset is not None
and self._normalize_charset(base_guess.charset) != charset
):
compatible = False
if compatible:
# Add the compatible base guess
guesses.append(
StreamInfo(
mimetype=base_guess.mimetype
or result.prediction.output.mime_type,
extension=base_guess.extension or guessed_extension,
charset=base_guess.charset or charset,
filename=base_guess.filename,
local_path=base_guess.local_path,
url=base_guess.url,
)
)
else:
# The magika guess was incompatible with the base guess, so add both guesses
guesses.append(enhanced_guess)
guesses.append(
StreamInfo(
mimetype=result.prediction.output.mime_type,
extension=guessed_extension,
charset=charset,
filename=base_guess.filename,
local_path=base_guess.local_path,
url=base_guess.url,
)
)
else:
# There were no other guesses, so just add the base guess
guesses.append(enhanced_guess)
finally:
file_stream.seek(cur_pos)
return guesses
def _normalize_charset(self, charset: str | None) -> str | None:
"""
Normalize a charset string to a canonical form.
"""
if charset is None:
return None
try:
return codecs.lookup(charset).name
except LookupError:
return charset
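As a rough illustration of how these guesses reach callers (a sketch, not part of the diff): an extension-only hint gets a mimetype from the mimetypes module, magika adds a content-based guess, and charset_normalizer fills in the charset for text:

import io
from markitdown import MarkItDown, StreamInfo

md = MarkItDown()
payload = io.BytesIO("name,age\nalice,30\n".encode("utf-8"))
# Only an extension hint is supplied; the remaining fields are filled in by
# _get_stream_info_guesses before the converters are tried.
result = md.convert_stream(payload, stream_info=StreamInfo(extension=".csv"))
print(result.markdown)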

View File

@@ -1,14 +1,5 @@
-import puremagic
-import mimetypes
-import os
 from dataclasses import dataclass, asdict
-from typing import Optional, BinaryIO, List, TypeVar, Type
+from typing import Optional
-# Mimetype substitutions table
-MIMETYPE_SUBSTITUTIONS = {
-    "application/excel": "application/vnd.ms-excel",
-    "application/mspowerpoint": "application/vnd.ms-powerpoint",
-}
 @dataclass(kw_only=True, frozen=True)
@@ -39,84 +30,3 @@ class StreamInfo:
         new_info.update(kwargs)
         return StreamInfo(**new_info)
# Behavior subject to change.
# Do not rely on this outside of this module.
def _guess_stream_info_from_stream(
file_stream: BinaryIO,
*,
filename_hint: Optional[str] = None,
) -> List[StreamInfo]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a stream.
Args:
- stream: The stream to guess the StreamInfo from.
- filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name)
Returns a list of StreamInfo objects in order of confidence.
"""
guesses: List[StreamInfo] = []
# Add a guess purely based on the filename hint
if filename_hint:
try:
# Requires Python 3.13+
mimetype, _ = mimetypes.guess_file_type(filename_hint) # type: ignore
except AttributeError:
mimetype, _ = mimetypes.guess_type(filename_hint)
if mimetype:
guesses.append(
StreamInfo(
mimetype=mimetype, extension=os.path.splitext(filename_hint)[1]
)
)
def _puremagic(
file_stream, filename_hint
) -> List[puremagic.main.PureMagicWithConfidence]:
"""Wrap guesses to handle exceptions."""
try:
return puremagic.magic_stream(file_stream, filename=filename_hint)
except puremagic.main.PureError as e:
return []
cur_pos = file_stream.tell()
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
if len(type_guesses) == 0:
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
# Eat all the leading whitespace
file_stream.seek(cur_pos)
while True:
char = file_stream.read(1)
if not char: # End of file
break
if not char.isspace():
file_stream.seek(file_stream.tell() - 1)
break
# Try again
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
file_stream.seek(cur_pos)
# Convert and return the guesses
for guess in type_guesses:
kwargs: dict[str, str] = {}
if guess.extension:
kwargs["extension"] = guess.extension
if guess.mime_type:
kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get(
guess.mime_type, guess.mime_type
)
if len(kwargs) > 0:
# We don't add the filename_hint, because sometimes it's just a placeholder,
# and, in any case, doesn't add new information.
guesses.append(StreamInfo(**kwargs))
return guesses
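What remains of _stream_info.py is the immutable StreamInfo hint container; guessing now lives in MarkItDown._get_stream_info_guesses. A small sketch of the copy_and_update pattern the new code leans on (field values are illustrative):

from markitdown import StreamInfo

base = StreamInfo(extension=".html", filename="page.html")
# copy_and_update returns a new frozen instance; unspecified fields carry over.
refined = base.copy_and_update(mimetype="text/html", charset="utf-8")
print(refined.extension, refined.mimetype, refined.charset)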

View File

@@ -18,6 +18,7 @@ from ._audio_converter import AudioConverter
 from ._outlook_msg_converter import OutlookMsgConverter
 from ._zip_converter import ZipConverter
 from ._doc_intel_converter import DocumentIntelligenceConverter
+from ._epub_converter import EpubConverter
 __all__ = [
     "PlainTextConverter",
@@ -37,4 +38,5 @@ __all__ = [
     "OutlookMsgConverter",
     "ZipConverter",
     "DocumentIntelligenceConverter",
+    "EpubConverter",
 ]

View File

@@ -1,6 +1,7 @@
 import io
 import re
 import base64
+import binascii
 from urllib.parse import parse_qs, urlparse
 from typing import Any, BinaryIO, Optional
 from bs4 import BeautifulSoup
@@ -60,6 +61,8 @@ class BingSerpConverter(DocumentConverter):
         stream_info: StreamInfo,
         **kwargs: Any,  # Options to pass to the converter
     ) -> DocumentConverterResult:
+        assert stream_info.url is not None
         # Parse the query parameters
         parsed_params = parse_qs(urlparse(stream_info.url).query)
         query = parsed_params.get("q", [""])[0]
@@ -79,6 +82,9 @@ class BingSerpConverter(DocumentConverter):
         _markdownify = _CustomMarkdownify()
         results = list()
         for result in soup.find_all(class_="b_algo"):
+            if not hasattr(result, "find_all"):
+                continue
             # Rewrite redirect urls
             for a in result.find_all("a", href=True):
                 parsed_href = urlparse(a["href"])

View File

@@ -0,0 +1,147 @@
import os
import zipfile
import xml.dom.minidom as minidom
from typing import BinaryIO, Any, Dict, List
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/epub",
"application/epub+zip",
"application/x-epub+zip",
]
ACCEPTED_FILE_EXTENSIONS = [".epub"]
MIME_TYPE_MAPPING = {
".html": "text/html",
".xhtml": "application/xhtml+xml",
}
class EpubConverter(HtmlConverter):
"""
Converts EPUB files to Markdown. Style information (e.g., headings) and tables are preserved where possible.
"""
def __init__(self):
super().__init__()
self._html_converter = HtmlConverter()
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
with zipfile.ZipFile(file_stream, "r") as z:
# Extracts metadata (title, authors, language, publisher, date, description, cover) from an EPUB file."""
# Locate content.opf
container_dom = minidom.parse(z.open("META-INF/container.xml"))
opf_path = container_dom.getElementsByTagName("rootfile")[0].getAttribute(
"full-path"
)
# Parse content.opf
opf_dom = minidom.parse(z.open(opf_path))
metadata: Dict[str, Any] = {
"title": self._get_text_from_node(opf_dom, "dc:title"),
"authors": self._get_all_texts_from_nodes(opf_dom, "dc:creator"),
"language": self._get_text_from_node(opf_dom, "dc:language"),
"publisher": self._get_text_from_node(opf_dom, "dc:publisher"),
"date": self._get_text_from_node(opf_dom, "dc:date"),
"description": self._get_text_from_node(opf_dom, "dc:description"),
"identifier": self._get_text_from_node(opf_dom, "dc:identifier"),
}
# Extract manifest items (ID → href mapping)
manifest = {
item.getAttribute("id"): item.getAttribute("href")
for item in opf_dom.getElementsByTagName("item")
}
# Extract spine order (ID refs)
spine_items = opf_dom.getElementsByTagName("itemref")
spine_order = [item.getAttribute("idref") for item in spine_items]
# Convert spine order to actual file paths
base_path = "/".join(
opf_path.split("/")[:-1]
) # Get base directory of content.opf
spine = [
f"{base_path}/{manifest[item_id]}" if base_path else manifest[item_id]
for item_id in spine_order
if item_id in manifest
]
# Extract and convert the content
markdown_content: List[str] = []
for file in spine:
if file in z.namelist():
with z.open(file) as f:
filename = os.path.basename(file)
extension = os.path.splitext(filename)[1].lower()
mimetype = MIME_TYPE_MAPPING.get(extension)
converted_content = self._html_converter.convert(
f,
StreamInfo(
mimetype=mimetype,
extension=extension,
filename=filename,
),
)
markdown_content.append(converted_content.markdown.strip())
# Format and add the metadata
metadata_markdown = []
for key, value in metadata.items():
if isinstance(value, list):
value = ", ".join(value)
if value:
metadata_markdown.append(f"**{key.capitalize()}:** {value}")
markdown_content.insert(0, "\n".join(metadata_markdown))
return DocumentConverterResult(
markdown="\n\n".join(markdown_content), title=metadata["title"]
)
def _get_text_from_node(self, dom: minidom.Document, tag_name: str) -> str | None:
"""Convenience function to extract a single occurrence of a tag (e.g., title)."""
texts = self._get_all_texts_from_nodes(dom, tag_name)
if len(texts) > 0:
return texts[0]
else:
return None
def _get_all_texts_from_nodes(
self, dom: minidom.Document, tag_name: str
) -> List[str]:
"""Helper function to extract all occurrences of a tag (e.g., multiple authors)."""
texts: List[str] = []
for node in dom.getElementsByTagName(tag_name):
if node.firstChild and hasattr(node.firstChild, "nodeValue"):
texts.append(node.firstChild.nodeValue.strip())
return texts
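Because EpubConverter is registered in the default converter stack, EPUB input needs no special handling from callers; a hedged usage sketch (the path is illustrative):

from markitdown import MarkItDown

md = MarkItDown()
result = md.convert("book.epub")  # hypothetical local file
# The metadata block ("**Authors:** ...", etc.) precedes the converted chapters.
print(result.title)
print(result.markdown[:500])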

View File

@@ -9,7 +9,7 @@ from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
 _dependency_exc_info = None
 olefile = None
 try:
-    import olefile
+    import olefile  # type: ignore[no-redef]
 except ImportError:
     # Preserve the error and stack trace for later
     _dependency_exc_info = sys.exc_info()
@@ -56,12 +56,13 @@ class OutlookMsgConverter(DocumentConverter):
         # Brue force, check if it's an Outlook file
         try:
-            msg = olefile.OleFileIO(file_stream)
-            toc = "\n".join([str(stream) for stream in msg.listdir()])
-            return (
-                "__properties_version1.0" in toc
-                and "__recip_version1.0_#00000000" in toc
-            )
+            if olefile is not None:
+                msg = olefile.OleFileIO(file_stream)
+                toc = "\n".join([str(stream) for stream in msg.listdir()])
+                return (
+                    "__properties_version1.0" in toc
+                    and "__recip_version1.0_#00000000" in toc
+                )
         except Exception as e:
             pass
         finally:
@@ -89,7 +90,11 @@ class OutlookMsgConverter(DocumentConverter):
                 _dependency_exc_info[2]
             )
+        assert (
+            olefile is not None
+        )  # If we made it this far, olefile should be available
         msg = olefile.OleFileIO(file_stream)
         # Extract email metadata
         md_content = "# Email Message\n\n"
@@ -121,6 +126,7 @@ class OutlookMsgConverter(DocumentConverter):
     def _get_stream_data(self, msg: Any, stream_path: str) -> Union[str, None]:
         """Helper to safely extract and decode stream data from the MSG file."""
+        assert olefile is not None
         assert isinstance(
             msg, olefile.OleFileIO
         )  # Ensure msg is of the correct type (type hinting is not possible with the optional olefile package)

View File

@@ -17,12 +17,16 @@ except ImportError:
 ACCEPTED_MIME_TYPE_PREFIXES = [
     "text/",
     "application/json",
+    "application/markdown",
 ]
-# Mimetypes to ignore (commonly confused extensions)
-IGNORE_MIME_TYPE_PREFIXES = [
-    "text/vnd.in3d.spot",  # .spo wich is confused with xls, doc, etc.
-    "text/vnd.graphviz",  # .dot which is confused with xls, doc, etc.
-]
+ACCEPTED_FILE_EXTENSIONS = [
+    ".txt",
+    ".text",
+    ".md",
+    ".markdown",
+    ".json",
+    ".jsonl",
+]
@@ -38,9 +42,14 @@ class PlainTextConverter(DocumentConverter):
         mimetype = (stream_info.mimetype or "").lower()
         extension = (stream_info.extension or "").lower()
-        for prefix in IGNORE_MIME_TYPE_PREFIXES:
-            if mimetype.startswith(prefix):
-                return False
+        # If we have a charset, we can safely assume it's text
+        # With Magika in the earlier stages, this handles most cases
+        if stream_info.charset is not None:
+            return True
+        # Otherwise, check the mimetype and extension
+        if extension in ACCEPTED_FILE_EXTENSIONS:
+            return True
         for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
             if mimetype.startswith(prefix):
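In practice the relaxed accepts() means any stream that arrives with a detected or hinted charset is treated as plain-text convertible, even without a recognized extension or mimetype; a brief sketch under that assumption:

import io
from markitdown import MarkItDown, StreamInfo

md = MarkItDown()
stream = io.BytesIO("log line one\nlog line two\n".encode("utf-8"))
# No extension or mimetype hint; the charset alone satisfies PlainTextConverter.
result = md.convert_stream(stream, stream_info=StreamInfo(charset="utf-8"))
print(result.markdown)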

View File

@@ -211,24 +211,32 @@ class PptxConverter(DocumentConverter):
         return self._html_converter.convert_string(html_table).markdown.strip() + "\n"
     def _convert_chart_to_markdown(self, chart):
-        md = "\n\n### Chart"
-        if chart.has_title:
-            md += f": {chart.chart_title.text_frame.text}"
-        md += "\n\n"
-        data = []
-        category_names = [c.label for c in chart.plots[0].categories]
-        series_names = [s.name for s in chart.series]
-        data.append(["Category"] + series_names)
-        for idx, category in enumerate(category_names):
-            row = [category]
-            for series in chart.series:
-                row.append(series.values[idx])
-            data.append(row)
-        markdown_table = []
-        for row in data:
-            markdown_table.append("| " + " | ".join(map(str, row)) + " |")
-        header = markdown_table[0]
-        separator = "|" + "|".join(["---"] * len(data[0])) + "|"
-        return md + "\n".join([header, separator] + markdown_table[1:])
+        try:
+            md = "\n\n### Chart"
+            if chart.has_title:
+                md += f": {chart.chart_title.text_frame.text}"
+            md += "\n\n"
+            data = []
+            category_names = [c.label for c in chart.plots[0].categories]
+            series_names = [s.name for s in chart.series]
+            data.append(["Category"] + series_names)
+            for idx, category in enumerate(category_names):
+                row = [category]
+                for series in chart.series:
+                    row.append(series.values[idx])
+                data.append(row)
+            markdown_table = []
+            for row in data:
+                markdown_table.append("| " + " | ".join(map(str, row)) + " |")
+            header = markdown_table[0]
+            separator = "|" + "|".join(["---"] * len(data[0])) + "|"
+            return md + "\n".join([header, separator] + markdown_table[1:])
+        except ValueError as e:
+            # Handle the specific error for unsupported chart types
+            if "unsupported plot type" in str(e):
+                return "\n\n[unsupported chart]\n\n"
+        except Exception:
+            # Catch any other exceptions that might occur
+            return "\n\n[unsupported chart]\n\n"

View File

@@ -8,7 +8,9 @@ from .._base_converter import DocumentConverter, DocumentConverterResult
 PRECISE_MIME_TYPE_PREFIXES = [
     "application/rss",
+    "application/rss+xml",
     "application/atom",
+    "application/atom+xml",
 ]
 PRECISE_FILE_EXTENSIONS = [".rss", ".atom"]
@@ -64,7 +66,7 @@ class RssConverter(DocumentConverter):
         file_stream.seek(cur_pos)
         return False
-    def _feed_type(self, doc: Any) -> str:
+    def _feed_type(self, doc: Any) -> str | None:
         if doc.getElementsByTagName("rss"):
             return "rss"
         elif doc.getElementsByTagName("feed"):
@@ -128,10 +130,10 @@ class RssConverter(DocumentConverter):
         Returns None if the feed type is not recognized or something goes wrong.
         """
         root = doc.getElementsByTagName("rss")[0]
-        channel = root.getElementsByTagName("channel")
-        if not channel:
-            return None
-        channel = channel[0]
+        channel_list = root.getElementsByTagName("channel")
+        if not channel_list:
+            raise ValueError("No channel found in RSS feed")
+        channel = channel_list[0]
         channel_title = self._get_data_by_tag_name(channel, "title")
         channel_description = self._get_data_by_tag_name(channel, "description")
         items = channel.getElementsByTagName("item")
@@ -139,8 +141,6 @@ class RssConverter(DocumentConverter):
         md_text = f"# {channel_title}\n"
         if channel_description:
             md_text += f"{channel_description}\n"
-        if not items:
-            items = []
         for item in items:
             title = self._get_data_by_tag_name(item, "title")
             description = self._get_data_by_tag_name(item, "description")
@@ -181,5 +181,6 @@ class RssConverter(DocumentConverter):
             return None
         fc = nodes[0].firstChild
         if fc:
-            return fc.data
+            if hasattr(fc, "data"):
+                return fc.data
         return None

View File

@@ -7,8 +7,14 @@ from .._exceptions import MissingDependencyException
 # Save reporting of any exceptions for later
 _dependency_exc_info = None
 try:
-    import speech_recognition as sr
-    import pydub
+    # Suppress some warnings on library import
+    import warnings
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=DeprecationWarning)
+        warnings.filterwarnings("ignore", category=SyntaxWarning)
+        import speech_recognition as sr
+        import pydub
 except ImportError:
     # Preserve the error and stack trace for later
     _dependency_exc_info = sys.exc_info()

View File

@@ -1,7 +1,7 @@
 import io
 import re
+import bs4
 from typing import Any, BinaryIO, Optional
-from bs4 import BeautifulSoup
 from .._base_converter import DocumentConverter, DocumentConverterResult
 from .._stream_info import StreamInfo
@@ -57,7 +57,7 @@ class WikipediaConverter(DocumentConverter):
     ) -> DocumentConverterResult:
         # Parse the stream
         encoding = "utf-8" if stream_info.charset is None else stream_info.charset
-        soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
+        soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
         # Remove javascript and style blocks
         for script in soup(["script", "style"]):
@@ -72,9 +72,8 @@ class WikipediaConverter(DocumentConverter):
         if body_elm:
             # What's the title
-            if title_elm and len(title_elm) > 0:
-                main_title = title_elm.string  # type: ignore
-                assert isinstance(main_title, str)
+            if title_elm and isinstance(title_elm, bs4.Tag):
+                main_title = title_elm.string
             # Convert the page
             webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(

View File

@@ -3,17 +3,22 @@ import json
 import time
 import io
 import re
+import bs4
 from typing import Any, BinaryIO, Optional, Dict, List, Union
 from urllib.parse import parse_qs, urlparse, unquote
-from bs4 import BeautifulSoup
 from .._base_converter import DocumentConverter, DocumentConverterResult
 from .._stream_info import StreamInfo
-from ._markdownify import _CustomMarkdownify
 # Optional YouTube transcription support
 try:
-    from youtube_transcript_api import YouTubeTranscriptApi
+    # Suppress some warnings on library import
+    import warnings
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=SyntaxWarning)
+        # Patch submitted upstream to fix the SyntaxWarning
+        from youtube_transcript_api import YouTubeTranscriptApi
     IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
 except ModuleNotFoundError:
@@ -72,21 +77,31 @@ class YouTubeConverter(DocumentConverter):
     ) -> DocumentConverterResult:
         # Parse the stream
         encoding = "utf-8" if stream_info.charset is None else stream_info.charset
-        soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
+        soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
         # Read the meta tags
-        metadata: Dict[str, str] = {"title": soup.title.string}
+        metadata: Dict[str, str] = {}
+        if soup.title and soup.title.string:
+            metadata["title"] = soup.title.string
         for meta in soup(["meta"]):
+            if not isinstance(meta, bs4.Tag):
+                continue
             for a in meta.attrs:
                 if a in ["itemprop", "property", "name"]:
-                    content = meta.get("content", "")
-                    if content:  # Only add non-empty content
-                        metadata[meta[a]] = content
+                    key = str(meta.get(a, ""))
+                    content = str(meta.get("content", ""))
+                    if key and content:  # Only add non-empty content
+                        metadata[key] = content
                     break
         # Try reading the description
         try:
             for script in soup(["script"]):
+                if not isinstance(script, bs4.Tag):
+                    continue
                 if not script.string:  # Skip empty scripts
                     continue
                 content = script.string
@@ -132,6 +147,7 @@ class YouTubeConverter(DocumentConverter):
             webpage_text += f"\n### Description\n{description}\n"
         if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
+            ytt_api = YouTubeTranscriptApi()
             transcript_text = ""
             parsed_url = urlparse(stream_info.url)  # type: ignore
             params = parse_qs(parsed_url.query)  # type: ignore
@@ -143,7 +159,7 @@ class YouTubeConverter(DocumentConverter):
                     )
                     # Retry the transcript fetching operation
                     transcript = self._retry_operation(
-                        lambda: YouTubeTranscriptApi.get_transcript(
+                        lambda: ytt_api.fetch(
                             video_id, languages=youtube_transcript_languages
                         ),
                         retries=3,  # Retry 3 times
@@ -151,17 +167,14 @@ class YouTubeConverter(DocumentConverter):
                     )
                     if transcript:
                         transcript_text = " ".join(
-                            [part["text"] for part in transcript]
+                            [part.text for part in transcript]
                         )  # type: ignore
-                    # Alternative formatting:
-                    # formatter = TextFormatter()
-                    # formatter.format_transcript(transcript)
             except Exception as e:
                 print(f"Error fetching transcript: {e}")
         if transcript_text:
             webpage_text += f"\n### Transcript\n{transcript_text}\n"
-        title = title if title else soup.title.string
+        title = title if title else (soup.title.string if soup.title else "")
         assert isinstance(title, str)
         return DocumentConverterResult(
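The youtube-transcript-api ~=1.0.0 pin goes with an API change: the class-level get_transcript call is replaced by an instance-based fetch that returns snippet objects. A sketch mirroring the usage in the diff (the video id and language list are placeholders):

from youtube_transcript_api import YouTubeTranscriptApi

ytt_api = YouTubeTranscriptApi()
# fetch() yields snippet objects exposing .text rather than dicts.
transcript = ytt_api.fetch("VIDEO_ID", languages=["en"])
transcript_text = " ".join(part.text for part in transcript)
print(transcript_text[:200])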

View File

@@ -0,0 +1,232 @@
import dataclasses
from typing import List
@dataclasses.dataclass(frozen=True, kw_only=True)
class FileTestVector(object):
filename: str
mimetype: str | None
charset: str | None
url: str | None
must_include: List[str]
must_not_include: List[str]
GENERAL_TEST_VECTORS = [
FileTestVector(
filename="test.docx",
mimetype="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
charset=None,
url=None,
must_include=[
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
"## d666f1f7-46cb-42bd-9a39-9a39cf2a509f",
"# Abstract",
"# Introduction",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
],
must_not_include=[],
),
FileTestVector(
filename="test.xlsx",
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
charset=None,
url=None,
must_include=[
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
],
must_not_include=[],
),
FileTestVector(
filename="test.xls",
mimetype="application/vnd.ms-excel",
charset=None,
url=None,
must_include=[
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
],
must_not_include=[],
),
FileTestVector(
filename="test.pptx",
mimetype="application/vnd.openxmlformats-officedocument.presentationml.presentation",
charset=None,
url=None,
must_include=[
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
"44bf7d06-5e7a-4a40-a2e1-a2e42ef28c8a",
"1b92870d-e3b5-4e65-8153-919f4ff45592",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
"a3f6004b-6f4f-4ea8-bee3-3741f4dc385f", # chart title
"2003", # chart value
],
must_not_include=[],
),
FileTestVector(
filename="test_outlook_msg.msg",
mimetype="application/vnd.ms-outlook",
charset=None,
url=None,
must_include=[
"# Email Message",
"**From:** test.sender@example.com",
"**To:** test.recipient@example.com",
"**Subject:** Test Email Message",
"## Content",
"This is the body of the test email message",
],
must_not_include=[],
),
FileTestVector(
filename="test.pdf",
mimetype="application/pdf",
charset=None,
url=None,
must_include=[
"While there is contemporaneous exploration of multi-agent approaches"
],
must_not_include=[],
),
FileTestVector(
filename="test_blog.html",
mimetype="text/html",
charset="utf-8",
url="https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math",
must_include=[
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
],
must_not_include=[],
),
FileTestVector(
filename="test_wikipedia.html",
mimetype="text/html",
charset="utf-8",
url="https://en.wikipedia.org/wiki/Microsoft",
must_include=[
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")',
],
must_not_include=[
"You are encouraged to create an account and log in",
"154 languages",
"move to sidebar",
],
),
FileTestVector(
filename="test_serp.html",
mimetype="text/html",
charset="utf-8",
url="https://www.bing.com/search?q=microsoft+wikipedia",
must_include=[
"](https://en.wikipedia.org/wiki/Microsoft",
"Microsoft Corporation is **an American multinational corporation and technology company headquartered** in Redmond",
"19952007: Foray into the Web, Windows 95, Windows XP, and Xbox",
],
must_not_include=[
"https://www.bing.com/ck/a?!&&p=",
"data:image/svg+xml,%3Csvg%20width%3D",
],
),
FileTestVector(
filename="test_mskanji.csv",
mimetype="text/csv",
charset="cp932",
url=None,
must_include=[
"名前,年齢,住所",
"佐藤太郎,30,東京",
"三木英子,25,大阪",
"髙橋淳,35,名古屋",
],
must_not_include=[],
),
FileTestVector(
filename="test.json",
mimetype="application/json",
charset="ascii",
url=None,
must_include=[
"5b64c88c-b3c3-4510-bcb8-da0b200602d8",
"9700dc99-6685-40b4-9a3a-5e406dcb37f3",
],
must_not_include=[],
),
FileTestVector(
filename="test_rss.xml",
mimetype="text/xml",
charset="utf-8",
url=None,
must_include=[
"# The Official Microsoft Blog",
"## Ignite 2024: Why nearly 70% of the Fortune 500 now use Microsoft 365 Copilot",
"In the case of AI, it is absolutely true that the industry is moving incredibly fast",
],
must_not_include=["<rss", "<feed"],
),
FileTestVector(
filename="test_notebook.ipynb",
mimetype="application/json",
charset="ascii",
url=None,
must_include=[
"# Test Notebook",
"```python",
'print("markitdown")',
"```",
"## Code Cell Below",
],
must_not_include=[
"nbformat",
"nbformat_minor",
],
),
FileTestVector(
filename="test_files.zip",
mimetype="application/zip",
charset=None,
url=None,
must_include=[
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
"## d666f1f7-46cb-42bd-9a39-9a39cf2a509f",
"# Abstract",
"# Introduction",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
"44bf7d06-5e7a-4a40-a2e1-a2e42ef28c8a",
"1b92870d-e3b5-4e65-8153-919f4ff45592",
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")',
],
must_not_include=[],
),
FileTestVector(
filename="test.epub",
mimetype="application/epub+zip",
charset=None,
url=None,
must_include=[
"**Authors:** Test Author",
"A test EPUB document for MarkItDown testing",
"# Chapter 1: Test Content",
"This is a **test** paragraph with some formatting",
"* A bullet point",
"* Another point",
"# Chapter 2: More Content",
"*different* style",
"> This is a blockquote for testing",
],
must_not_include=[],
),
]

View File

@@ -1,119 +0,0 @@
#!/usr/bin/env python3 -m pytest
import os
import subprocess
import pytest
from markitdown import __version__
try:
from .test_markitdown import TEST_FILES_DIR, DOCX_TEST_STRINGS
except ImportError:
from test_markitdown import TEST_FILES_DIR, DOCX_TEST_STRINGS # type: ignore
@pytest.fixture(scope="session")
def shared_tmp_dir(tmp_path_factory):
return tmp_path_factory.mktemp("pytest_tmp")
def test_version(shared_tmp_dir) -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--version"], capture_output=True, text=True
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert __version__ in result.stdout, f"Version not found in output: {result.stdout}"
def test_invalid_flag(shared_tmp_dir) -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--foobar"], capture_output=True, text=True
)
assert result.returncode != 0, f"CLI exited with error: {result.stderr}"
assert (
"unrecognized arguments" in result.stderr
), f"Expected 'unrecognized arguments' to appear in STDERR"
assert "SYNTAX" in result.stderr, f"Expected 'SYNTAX' to appear in STDERR"
def test_output_to_stdout(shared_tmp_dir) -> None:
# DOC X
result = subprocess.run(
["python", "-m", "markitdown", os.path.join(TEST_FILES_DIR, "test.docx")],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
for test_string in DOCX_TEST_STRINGS:
assert (
test_string in result.stdout
), f"Expected string not found in output: {test_string}"
def test_output_to_file(shared_tmp_dir) -> None:
# DOC X, flag -o at the end
docx_output_file_1 = os.path.join(shared_tmp_dir, "test_docx_1.md")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
os.path.join(TEST_FILES_DIR, "test.docx"),
"-o",
docx_output_file_1,
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(
docx_output_file_1
), f"Output file not created: {docx_output_file_1}"
with open(docx_output_file_1, "r") as f:
output = f.read()
for test_string in DOCX_TEST_STRINGS:
assert (
test_string in output
), f"Expected string not found in output: {test_string}"
# DOC X, flag -o at the beginning
docx_output_file_2 = os.path.join(shared_tmp_dir, "test_docx_2.md")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
"-o",
docx_output_file_2,
os.path.join(TEST_FILES_DIR, "test.docx"),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(
docx_output_file_2
), f"Output file not created: {docx_output_file_2}"
with open(docx_output_file_2, "r") as f:
output = f.read()
for test_string in DOCX_TEST_STRINGS:
assert (
test_string in output
), f"Expected string not found in output: {test_string}"
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
import tempfile
with tempfile.TemporaryDirectory() as tmp_dir:
test_version(tmp_dir)
test_invalid_flag(tmp_dir)
test_output_to_stdout(tmp_dir)
test_output_to_file(tmp_dir)
print("All tests passed!")

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python3 -m pytest
import subprocess
import pytest
from markitdown import __version__
# This file contains CLI tests that are not directly tested by the FileTestVectors.
# This includes things like help messages, version numbers, and invalid flags.
def test_version() -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--version"], capture_output=True, text=True
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert __version__ in result.stdout, f"Version not found in output: {result.stdout}"
def test_invalid_flag() -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--foobar"], capture_output=True, text=True
)
assert result.returncode != 0, f"CLI exited with error: {result.stderr}"
assert (
"unrecognized arguments" in result.stderr
), f"Expected 'unrecognized arguments' to appear in STDERR"
assert "SYNTAX" in result.stderr, f"Expected 'SYNTAX' to appear in STDERR"
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
test_version()
test_invalid_flag()
print("All tests passed!")

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3 -m pytest
import os
import time
import pytest
import subprocess
import locale
from typing import List
if __name__ == "__main__":
from _test_vectors import GENERAL_TEST_VECTORS, FileTestVector
else:
from ._test_vectors import GENERAL_TEST_VECTORS, FileTestVector
from markitdown import (
MarkItDown,
UnsupportedFormatException,
FileConversionException,
StreamInfo,
)
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
) # Don't run these tests in CI
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "test_files")
TEST_FILES_URL = "https://raw.githubusercontent.com/microsoft/markitdown/refs/heads/main/packages/markitdown/tests/test_files"
# Prepare CLI test vectors (remove vectors that require mocking the url)
CLI_TEST_VECTORS: List[FileTestVector] = []
for test_vector in GENERAL_TEST_VECTORS:
if test_vector.url is not None:
continue
CLI_TEST_VECTORS.append(test_vector)
@pytest.fixture(scope="session")
def shared_tmp_dir(tmp_path_factory):
return tmp_path_factory.mktemp("pytest_tmp")
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_output_to_stdout(shared_tmp_dir, test_vector) -> None:
"""Test that the CLI outputs to stdout correctly."""
result = subprocess.run(
[
"python",
"-m",
"markitdown",
os.path.join(TEST_FILES_DIR, test_vector.filename),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
for test_string in test_vector.must_include:
assert test_string in result.stdout
for test_string in test_vector.must_not_include:
assert test_string not in result.stdout
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_output_to_file(shared_tmp_dir, test_vector) -> None:
"""Test that the CLI outputs to a file correctly."""
output_file = os.path.join(shared_tmp_dir, test_vector.filename + ".output")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
"-o",
output_file,
os.path.join(TEST_FILES_DIR, test_vector.filename),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(output_file), f"Output file not created: {output_file}"
with open(output_file, "r") as f:
output_data = f.read()
for test_string in test_vector.must_include:
assert test_string in output_data
for test_string in test_vector.must_not_include:
assert test_string not in output_data
os.remove(output_file)
assert not os.path.exists(output_file), f"Output file not deleted: {output_file}"
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_input_from_stdin_without_hints(shared_tmp_dir, test_vector) -> None:
"""Test that the CLI readds from stdin correctly."""
test_input = b""
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
test_input = stream.read()
result = subprocess.run(
[
"python",
"-m",
"markitdown",
],
input=test_input,
capture_output=True,
text=False,
)
stdout = result.stdout.decode(locale.getpreferredencoding())
assert (
result.returncode == 0
), f"CLI exited with error: {result.stderr.decode('utf-8')}"
for test_string in test_vector.must_include:
assert test_string in stdout
for test_string in test_vector.must_not_include:
assert test_string not in stdout
@pytest.mark.skipif(
skip_remote,
reason="do not run tests that query external urls",
)
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_convert_url(shared_tmp_dir, test_vector):
"""Test the conversion of a stream with no stream info."""
# Note: tmp_dir is not used here, but is needed to match the signature
markitdown = MarkItDown()
time.sleep(1) # Ensure we don't hit rate limits
result = subprocess.run(
["python", "-m", "markitdown", TEST_FILES_URL + "/" + test_vector.filename],
capture_output=True,
text=False,
)
stdout = result.stdout.decode(locale.getpreferredencoding())
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
for test_string in test_vector.must_include:
assert test_string in stdout
for test_string in test_vector.must_not_include:
assert test_string not in stdout
if __name__ == "__main__":
import sys
import tempfile
"""Runs this file's tests from the command line."""
with tempfile.TemporaryDirectory() as tmp_dir:
for test_function in [
test_output_to_stdout,
test_output_to_file,
test_input_from_stdin_without_hints,
test_convert_url,
]:
for test_vector in CLI_TEST_VECTORS:
print(
f"Running {test_function.__name__} on {test_vector.filename}...",
end="",
)
test_function(tmp_dir, test_vector)
print("OK")
print("All tests passed!")

Binary file not shown.

View File

@@ -3,9 +3,7 @@ import io
import os
import shutil
import openai
import pytest
import requests
from markitdown import (
MarkItDown,
@@ -13,7 +11,10 @@ from markitdown import (
FileConversionException,
StreamInfo,
)
from markitdown._stream_info import _guess_stream_info_from_stream
# This file contains module tests that are not directly tested by the FileTestVectors.
# This includes things like helper functions and runtime conversion options
# (e.g., LLM clients, exiftool path, transcription services, etc.)
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
@@ -60,36 +61,6 @@ YOUTUBE_TEST_STRINGS = [
"the model we're going to be using today is GPT 3.5 turbo", # From the transcript "the model we're going to be using today is GPT 3.5 turbo", # From the transcript
] ]
XLSX_TEST_STRINGS = [
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
]
XLS_TEST_STRINGS = [
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
]
DOCX_TEST_STRINGS = [
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
"## d666f1f7-46cb-42bd-9a39-9a39cf2a509f",
"# Abstract",
"# Introduction",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
]
MSG_TEST_STRINGS = [
"# Email Message",
"**From:** test.sender@example.com",
"**To:** test.recipient@example.com",
"**Subject:** Test Email Message",
"## Content",
"This is the body of the test email message",
]
DOCX_COMMENT_TEST_STRINGS = [
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
@@ -101,6 +72,16 @@ DOCX_COMMENT_TEST_STRINGS = [
"Yet another comment in the doc. 55yiyi-asd09", "Yet another comment in the doc. 55yiyi-asd09",
] ]
BLOG_TEST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_TEST_STRINGS = [
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
]
LLM_TEST_STRINGS = [
"5bda1dd6",
]
PPTX_TEST_STRINGS = [
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
@@ -111,57 +92,6 @@ PPTX_TEST_STRINGS = [
"2003", # chart value "2003", # chart value
] ]
BLOG_TEST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_TEST_STRINGS = [
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
]
RSS_TEST_STRINGS = [
"The Official Microsoft Blog",
"In the case of AI, it is absolutely true that the industry is moving incredibly fast",
]
WIKIPEDIA_TEST_URL = "https://en.wikipedia.org/wiki/Microsoft"
WIKIPEDIA_TEST_STRINGS = [
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")',
]
WIKIPEDIA_TEST_EXCLUDES = [
"You are encouraged to create an account and log in",
"154 languages",
"move to sidebar",
]
SERP_TEST_URL = "https://www.bing.com/search?q=microsoft+wikipedia"
SERP_TEST_STRINGS = [
"](https://en.wikipedia.org/wiki/Microsoft",
"Microsoft Corporation is **an American multinational corporation and technology company headquartered** in Redmond",
"19952007: Foray into the Web, Windows 95, Windows XP, and Xbox",
]
SERP_TEST_EXCLUDES = [
"https://www.bing.com/ck/a?!&&p=",
"data:image/svg+xml,%3Csvg%20width%3D",
]
CSV_CP932_TEST_STRINGS = [
"名前,年齢,住所",
"佐藤太郎,30,東京",
"三木英子,25,大阪",
"髙橋淳,35,名古屋",
]
LLM_TEST_STRINGS = [
"5bda1dd6",
]
JSON_TEST_STRINGS = [
"5b64c88c-b3c3-4510-bcb8-da0b200602d8",
"9700dc99-6685-40b4-9a3a-5e406dcb37f3",
]
# --- Helper Functions ---
def validate_strings(result, expected_strings, exclude_strings=None):
@@ -246,33 +176,29 @@ def test_stream_info_operations() -> None:
assert updated_stream_info.url == "url.1"

def test_stream_info_guesses() -> None:
"""Test StreamInfo guesses based on stream content."""
test_tuples = [
(
os.path.join(TEST_FILES_DIR, "test.xlsx"),
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
),
(
os.path.join(TEST_FILES_DIR, "test.docx"),
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
),
(
os.path.join(TEST_FILES_DIR, "test.pptx"),
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
),
(os.path.join(TEST_FILES_DIR, "test.xls"), "application/vnd.ms-excel"),
]
for file_path, expected_mimetype in test_tuples:
with open(file_path, "rb") as f:
guesses = _guess_stream_info_from_stream(
f, filename_hint=os.path.basename(file_path)
)
assert len(guesses) > 0
assert guesses[0].mimetype == expected_mimetype
assert guesses[0].extension == os.path.splitext(file_path)[1]

def test_docx_comments() -> None:
markitdown = MarkItDown()
# Test DOCX processing, with comments and setting style_map on init
markitdown_with_style_map = MarkItDown(style_map="comment-reference => ")
result = markitdown_with_style_map.convert(
os.path.join(TEST_FILES_DIR, "test_with_comment.docx")
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)

def test_input_as_strings() -> None:
markitdown = MarkItDown()
# Test input from a stream
input_data = b"<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
# Test input with leading blank characters
input_data = b" \n\n\n<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
@pytest.mark.skipif(
@@ -287,194 +213,12 @@ def test_markitdown_remote() -> None:
for test_string in PDF_TEST_STRINGS:
assert test_string in result.text_content
# By stream
response = requests.get(PDF_TEST_URL)
result = markitdown.convert_stream(
io.BytesIO(response.content), file_extension=".pdf", url=PDF_TEST_URL
)
for test_string in PDF_TEST_STRINGS:
assert test_string in result.text_content
# Youtube
result = markitdown.convert(YOUTUBE_TEST_URL)
for test_string in YOUTUBE_TEST_STRINGS:
assert test_string in result.text_content
def test_markitdown_local() -> None:
markitdown = MarkItDown()
# Test PDF processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pdf"))
validate_strings(result, PDF_TEST_STRINGS)
# Test XLSX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.xlsx"))
validate_strings(result, XLSX_TEST_STRINGS)
# Test XLS processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.xls"))
for test_string in XLS_TEST_STRINGS:
text_content = result.text_content.replace("\\", "")
assert test_string in text_content
# Test DOCX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.docx"))
validate_strings(result, DOCX_TEST_STRINGS)
# Test DOCX processing, with comments
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_with_comment.docx"),
style_map="comment-reference => ",
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test DOCX processing, with comments and setting style_map on init
markitdown_with_style_map = MarkItDown(style_map="comment-reference => ")
result = markitdown_with_style_map.convert(
os.path.join(TEST_FILES_DIR, "test_with_comment.docx")
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test PPTX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pptx"))
validate_strings(result, PPTX_TEST_STRINGS)
# Test HTML processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_blog.html"), url=BLOG_TEST_URL
)
validate_strings(result, BLOG_TEST_STRINGS)
# Test Wikipedia processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), url=WIKIPEDIA_TEST_URL
)
text_content = result.text_content.replace("\\", "")
validate_strings(result, WIKIPEDIA_TEST_STRINGS, WIKIPEDIA_TEST_EXCLUDES)
# Test Bing processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_serp.html"), url=SERP_TEST_URL
)
text_content = result.text_content.replace("\\", "")
validate_strings(result, SERP_TEST_STRINGS, SERP_TEST_EXCLUDES)
# Test RSS processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_rss.xml"))
text_content = result.text_content.replace("\\", "")
for test_string in RSS_TEST_STRINGS:
assert test_string in text_content
# Test MSG (Outlook email) processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_outlook_msg.msg"))
validate_strings(result, MSG_TEST_STRINGS)
# Test non-UTF-8 encoding
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv"))
validate_strings(result, CSV_CP932_TEST_STRINGS)
# Test JSON processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.json"))
validate_strings(result, JSON_TEST_STRINGS)
# # Test ZIP file processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_files.zip"))
validate_strings(result, DOCX_TEST_STRINGS)
validate_strings(result, XLSX_TEST_STRINGS)
validate_strings(result, BLOG_TEST_STRINGS)
# Test input from a stream
input_data = b"<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
# Test input with leading blank characters
input_data = b" \n\n\n<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
def test_markitdown_streams() -> None:
markitdown = MarkItDown()
# Test PDF processing
with open(os.path.join(TEST_FILES_DIR, "test.pdf"), "rb") as f:
result = markitdown.convert(f, file_extension=".pdf")
validate_strings(result, PDF_TEST_STRINGS)
# Test XLSX processing
with open(os.path.join(TEST_FILES_DIR, "test.xlsx"), "rb") as f:
result = markitdown.convert(f, file_extension=".xlsx")
validate_strings(result, XLSX_TEST_STRINGS)
# Test XLS processing
with open(os.path.join(TEST_FILES_DIR, "test.xls"), "rb") as f:
result = markitdown.convert(f, file_extension=".xls")
for test_string in XLS_TEST_STRINGS:
text_content = result.text_content.replace("\\", "")
assert test_string in text_content
# Test DOCX processing
with open(os.path.join(TEST_FILES_DIR, "test.docx"), "rb") as f:
result = markitdown.convert(f, file_extension=".docx")
validate_strings(result, DOCX_TEST_STRINGS)
# Test DOCX processing, with comments
with open(os.path.join(TEST_FILES_DIR, "test_with_comment.docx"), "rb") as f:
result = markitdown.convert(
f,
file_extension=".docx",
style_map="comment-reference => ",
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test DOCX processing, with comments and setting style_map on init
markitdown_with_style_map = MarkItDown(style_map="comment-reference => ")
with open(os.path.join(TEST_FILES_DIR, "test_with_comment.docx"), "rb") as f:
result = markitdown_with_style_map.convert(f, file_extension=".docx")
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test PPTX processing
with open(os.path.join(TEST_FILES_DIR, "test.pptx"), "rb") as f:
result = markitdown.convert(f, file_extension=".pptx")
validate_strings(result, PPTX_TEST_STRINGS)
# Test HTML processing
with open(os.path.join(TEST_FILES_DIR, "test_blog.html"), "rb") as f:
result = markitdown.convert(f, file_extension=".html", url=BLOG_TEST_URL)
validate_strings(result, BLOG_TEST_STRINGS)
# Test Wikipedia processing
with open(os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), "rb") as f:
result = markitdown.convert(f, file_extension=".html", url=WIKIPEDIA_TEST_URL)
text_content = result.text_content.replace("\\", "")
validate_strings(result, WIKIPEDIA_TEST_STRINGS, WIKIPEDIA_TEST_EXCLUDES)
# Test Bing processing
with open(os.path.join(TEST_FILES_DIR, "test_serp.html"), "rb") as f:
result = markitdown.convert(f, file_extension=".html", url=SERP_TEST_URL)
text_content = result.text_content.replace("\\", "")
validate_strings(result, SERP_TEST_STRINGS, SERP_TEST_EXCLUDES)
# Test RSS processing
with open(os.path.join(TEST_FILES_DIR, "test_rss.xml"), "rb") as f:
result = markitdown.convert(f, file_extension=".xml")
text_content = result.text_content.replace("\\", "")
for test_string in RSS_TEST_STRINGS:
assert test_string in text_content
# Test MSG (Outlook email) processing
with open(os.path.join(TEST_FILES_DIR, "test_outlook_msg.msg"), "rb") as f:
result = markitdown.convert(f, file_extension=".msg")
validate_strings(result, MSG_TEST_STRINGS)
# Test JSON processing
with open(os.path.join(TEST_FILES_DIR, "test.json"), "rb") as f:
result = markitdown.convert(f, file_extension=".json")
validate_strings(result, JSON_TEST_STRINGS)
@pytest.mark.skipif(
skip_remote,
reason="do not run remotely run speech transcription tests",
@@ -568,13 +312,17 @@ def test_markitdown_llm() -> None:
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
test_stream_info_operations()
test_stream_info_guesses()
test_markitdown_remote()
test_markitdown_local()
test_markitdown_streams()
test_speech_transcription()
test_exceptions()
test_markitdown_exiftool()
test_markitdown_llm()

for test in [
test_stream_info_operations,
test_docx_comments,
test_input_as_strings,
test_markitdown_remote,
test_speech_transcription,
test_exceptions,
test_markitdown_exiftool,
test_markitdown_llm,
]:
print(f"Running {test.__name__}...", end="")
test()
print("OK")
print("All tests passed!")

View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3 -m pytest
import os
import time
import pytest
import codecs
if __name__ == "__main__":
from _test_vectors import GENERAL_TEST_VECTORS
else:
from ._test_vectors import GENERAL_TEST_VECTORS
from markitdown import (
MarkItDown,
UnsupportedFormatException,
FileConversionException,
StreamInfo,
)
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
) # Don't run these tests in CI
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "test_files")
TEST_FILES_URL = "https://raw.githubusercontent.com/microsoft/markitdown/refs/heads/main/packages/markitdown/tests/test_files"
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_guess_stream_info(test_vector):
"""Test the ability to guess stream info."""
markitdown = MarkItDown()
local_path = os.path.join(TEST_FILES_DIR, test_vector.filename)
expected_extension = os.path.splitext(test_vector.filename)[1]
with open(local_path, "rb") as stream:
guesses = markitdown._get_stream_info_guesses(
stream,
base_guess=StreamInfo(
filename=os.path.basename(test_vector.filename),
local_path=local_path,
extension=expected_extension,
),
)
# For some limited exceptions, we can't guarantee the exact
# mimetype or extension, so we'll special-case them here.
if test_vector.filename in [
"test_outlook_msg.msg",
]:
return
assert guesses[0].mimetype == test_vector.mimetype
assert guesses[0].extension == expected_extension
assert guesses[0].charset == test_vector.charset
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_local(test_vector):
"""Test the conversion of a local file."""
markitdown = MarkItDown()
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, test_vector.filename), url=test_vector.url
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_stream_with_hints(test_vector):
"""Test the conversion of a stream with full stream info."""
markitdown = MarkItDown()
stream_info = StreamInfo(
extension=os.path.splitext(test_vector.filename)[1],
mimetype=test_vector.mimetype,
charset=test_vector.charset,
)
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
result = markitdown.convert(
stream, stream_info=stream_info, url=test_vector.url
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_stream_without_hints(test_vector):
"""Test the conversion of a stream with no stream info."""
markitdown = MarkItDown()
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
result = markitdown.convert(stream, url=test_vector.url)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.skipif(
skip_remote,
reason="do not run tests that query external urls",
)
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_url(test_vector):
"""Test the conversion of a stream with no stream info."""
markitdown = MarkItDown()
time.sleep(1) # Ensure we don't hit rate limits
result = markitdown.convert(
TEST_FILES_URL + "/" + test_vector.filename,
url=test_vector.url, # Mock where this file would be found
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
if __name__ == "__main__":
import sys
"""Runs this file's tests from the command line."""
for test_function in [
test_guess_stream_info,
test_convert_local,
test_convert_stream_with_hints,
test_convert_stream_without_hints,
test_convert_url,
]:
for test_vector in GENERAL_TEST_VECTORS:
print(
f"Running {test_function.__name__} on {test_vector.filename}...", end=""
)
test_function(test_vector)
print("OK")
print("All tests passed!")