# Source listing: test/packages/markitdown/src/markitdown/_markitdown.py
# Last modified: 2025-02-27 16:44:50 -05:00
# Size: 480 lines, 18 KiB (Python)

import copy
import mimetypes
import os
import re
import tempfile
import warnings
import traceback
from importlib.metadata import entry_points
from typing import Any, List, Optional, Union
from pathlib import Path
from urllib.parse import urlparse
from warnings import warn
from io import BufferedIOBase, TextIOBase, BytesIO
# File-format detection
import puremagic
import requests
from .converters import (
DocumentConverter,
DocumentConverterResult,
PlainTextConverter,
HtmlConverter,
RssConverter,
WikipediaConverter,
YouTubeConverter,
IpynbConverter,
BingSerpConverter,
PdfConverter,
DocxConverter,
XlsxConverter,
XlsConverter,
PptxConverter,
ImageConverter,
WavConverter,
Mp3Converter,
OutlookMsgConverter,
ZipConverter,
DocumentIntelligenceConverter,
ConverterInput,
)
from ._exceptions import (
FileConversionException,
UnsupportedFormatException,
ConverterPrerequisiteException,
)
# Explicitly map the ".csv" extension to the "text/csv" MIME type in the
# process-wide mimetypes registry. The original note says this fixes an issue
# on Windows; presumably the platform-provided mapping there is missing or
# different -- TODO confirm the exact failure this works around.
mimetypes.add_type("text/csv", ".csv")
_plugins: Union[None | List[Any]] = None
def _load_plugins() -> Union[None | List[Any]]:
"""Lazy load plugins, exiting early if already loaded."""
global _plugins
# Skip if we've already loaded plugins
if _plugins is not None:
return _plugins
# Load plugins
_plugins = []
for entry_point in entry_points(group="markitdown.plugin"):
try:
_plugins.append(entry_point.load())
except Exception:
tb = traceback.format_exc()
warn(f"Plugin '{entry_point.name}' failed to load ... skipping:\n{tb}")
return _plugins
class MarkItDown:
    """(In preview) An extremely simple text-based document reader, suitable for LLM use.
    This reader will convert common file-types or webpages to Markdown."""

    def __init__(
        self,
        *,
        enable_builtins: Union[None, bool] = None,
        enable_plugins: Union[None, bool] = None,
        **kwargs,
    ):
        """Create a converter front-end.

        Args:
            enable_builtins: register the built-in converters. ``None`` (the
                default) is treated as ``True``.
            enable_plugins: register converters provided by plugins. ``None``
                (the default) is treated as ``False``.
            **kwargs: ``requests_session`` is consumed here; all keyword
                arguments are also forwarded to ``enable_builtins`` and
                ``enable_plugins`` (which read ``llm_client``, ``llm_model``,
                ``exiftool_path``, ``style_map`` and ``docintel_endpoint``).
        """
        self._builtins_enabled = False
        self._plugins_enabled = False

        # Use the caller's session when given, otherwise create a private one.
        requests_session = kwargs.get("requests_session")
        if requests_session is None:
            self._requests_session = requests.Session()
        else:
            self._requests_session = requests_session

        # TODO - remove these (see enable_builtins)
        self._llm_client = None
        self._llm_model = None
        self._exiftool_path = None
        self._style_map = None

        # Register the converters
        self._page_converters: List[DocumentConverter] = []

        # Default to True when not specified
        if enable_builtins is None or enable_builtins:
            self.enable_builtins(**kwargs)

        if enable_plugins:
            self.enable_plugins(**kwargs)

    def enable_builtins(self, **kwargs) -> None:
        """
        Enable and register built-in converters.
        Built-in converters are enabled by default.
        This method should only be called once, if built-ins were initially disabled.

        A second call does nothing except emit a ``RuntimeWarning``.
        """
        if self._builtins_enabled:
            warn("Built-in converters are already enabled.", RuntimeWarning)
            return

        # TODO: Move these into converter constructors
        self._llm_client = kwargs.get("llm_client")
        self._llm_model = kwargs.get("llm_model")
        self._exiftool_path = kwargs.get("exiftool_path")
        self._style_map = kwargs.get("style_map")
        if self._exiftool_path is None:
            # Fall back to the environment when no explicit path was given.
            self._exiftool_path = os.getenv("EXIFTOOL_PATH")

        # Register converters for successful browsing operations
        # Later registrations are tried first / take higher priority than earlier registrations
        # To this end, the most specific converters should appear below the most generic converters
        for converter_cls in (
            PlainTextConverter,
            ZipConverter,
            HtmlConverter,
            RssConverter,
            WikipediaConverter,
            YouTubeConverter,
            BingSerpConverter,
            DocxConverter,
            XlsxConverter,
            XlsConverter,
            PptxConverter,
            WavConverter,
            Mp3Converter,
            ImageConverter,
            IpynbConverter,
            PdfConverter,
            OutlookMsgConverter,
        ):
            self.register_converter(converter_cls())

        # Register Document Intelligence converter at the top of the stack if endpoint is provided
        docintel_endpoint = kwargs.get("docintel_endpoint")
        if docintel_endpoint is not None:
            self.register_converter(
                DocumentIntelligenceConverter(endpoint=docintel_endpoint)
            )

        self._builtins_enabled = True

    def enable_plugins(self, **kwargs) -> None:
        """
        Enable and register converters provided by plugins.
        Plugins are disabled by default.
        This method should only be called once, if plugins were initially disabled.

        A second call does nothing except emit a ``RuntimeWarning``.
        """
        if self._plugins_enabled:
            warn("Plugins converters are already enabled.", RuntimeWarning)
            return

        # Load plugins
        for plugin in _load_plugins():
            try:
                plugin.register_converters(self, **kwargs)
            except Exception:
                # One misbehaving plugin must not block the remaining ones.
                tb = traceback.format_exc()
                warn(f"Plugin '{plugin}' failed to register converters:\n{tb}")

        self._plugins_enabled = True

    def convert(
        self,
        source: Union[str, requests.Response, Path, BufferedIOBase, TextIOBase],
        **kwargs: Any,
    ) -> DocumentConverterResult:  # TODO: deal with kwargs
        """Convert ``source`` to Markdown, dispatching on its type.

        Args:
            source: a local path (``str`` or ``pathlib.Path``), a URL string
                starting with ``http://``, ``https://`` or ``file://``, a
                ``requests.Response``, or a file object (binary or text).
            **kwargs: forwarded to the selected ``convert_*`` method.
                ``file_extension`` specifies the file extension to use when
                interpreting the content; if absent it is inferred from the
                source (path, url, content-type, etc.)

        Raises:
            TypeError: if ``source`` is none of the supported types.
        """
        # Local path or url
        if isinstance(source, str):
            if source.startswith(("http://", "https://", "file://")):
                return self.convert_url(source, **kwargs)
            return self.convert_local(source, **kwargs)

        # Request response
        if isinstance(source, requests.Response):
            return self.convert_response(source, **kwargs)

        if isinstance(source, Path):
            return self.convert_local(source, **kwargs)

        # File object
        if isinstance(source, (BufferedIOBase, TextIOBase)):
            return self.convert_file_object(source, **kwargs)

        # Previously this fell through and silently returned None.
        raise TypeError(
            f"Invalid source type: {type(source).__name__}. Expected a str, "
            "pathlib.Path, requests.Response, or a binary/text file object."
        )

    def convert_local(
        self, path: Union[str, Path], **kwargs: Any
    ) -> DocumentConverterResult:  # TODO: deal with kwargs
        """Convert the file at ``path``.

        Candidate extensions are, in priority order: the ``file_extension``
        keyword argument, the extension of ``path``, then puremagic's guesses.
        """
        if isinstance(path, Path):
            path = str(path)

        # Prepare a list of extensions to try (in order of priority)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # Get extension alternatives from the path and puremagic
        _, path_ext = os.path.splitext(path)
        self._append_ext(extensions, path_ext)
        for guess in self._guess_ext_magic(source=path):
            self._append_ext(extensions, guess)

        # Create the ConverterInput object
        converter_input = ConverterInput(input_type="filepath", filepath=path)

        # Convert
        return self._convert(converter_input, extensions, **kwargs)

    def convert_file_object(
        self, file_object: Union[BufferedIOBase, TextIOBase], **kwargs: Any
    ) -> DocumentConverterResult:  # TODO: deal with kwargs
        """Convert an already-open file object without copying it to disk."""
        # Prepare a list of extensions to try (in order of priority)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # TODO: Currently, there are some ongoing issues with passing direct file objects to puremagic (incorrect guesses, unsupported file type errors, etc.)
        # Only use puremagic as a last resort if no extensions were provided
        if not extensions:
            for guess in self._guess_ext_magic(source=file_object):
                self._append_ext(extensions, guess)

        # Create the ConverterInput object
        converter_input = ConverterInput(input_type="object", file_object=file_object)

        # Convert
        return self._convert(converter_input, extensions, **kwargs)

    # TODO what should stream's type be?
    def convert_stream(
        self, stream: Any, **kwargs: Any
    ) -> DocumentConverterResult:  # TODO: deal with kwargs
        """Convert the full contents of ``stream`` via a temporary file.

        ``stream.read()`` may return ``bytes`` or ``str``; text is encoded as
        UTF-8 before being written. The temporary file is removed before this
        method returns, whether or not conversion succeeds.
        """
        # Prepare a list of extensions to try (in order of priority)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # Save the file locally to a temporary file. It will be deleted before this method exits
        handle, temp_path = tempfile.mkstemp()
        try:
            # The with-block closes the file before it is re-opened by path below.
            with os.fdopen(handle, "wb") as fh:
                content = stream.read()
                if isinstance(content, str):
                    content = content.encode("utf-8")
                fh.write(content)

            # Use puremagic to check for more extension options
            for guess in self._guess_ext_magic(source=temp_path):
                self._append_ext(extensions, guess)

            # Create the ConverterInput object
            converter_input = ConverterInput(input_type="filepath", filepath=temp_path)

            # Convert
            return self._convert(converter_input, extensions, **kwargs)
        finally:
            # Clean up
            os.unlink(temp_path)

    def convert_url(
        self, url: str, **kwargs: Any
    ) -> DocumentConverterResult:  # TODO: fix kwargs type
        """Fetch ``url`` with the configured requests session and convert the response.

        Raises whatever ``response.raise_for_status()`` raises for an
        unsuccessful HTTP status.
        """
        # NOTE(review): convert() also routes "file://" URLs here; a stock
        # requests session presumably has no adapter for that scheme -- confirm
        # whether callers are expected to mount one on ``requests_session``.
        # Send a HTTP request to the URL
        response = self._requests_session.get(url, stream=True)
        response.raise_for_status()
        return self.convert_response(response, **kwargs)

    def convert_response(
        self, response: requests.Response, **kwargs: Any
    ) -> DocumentConverterResult:  # TODO fix kwargs type
        """Download the body of ``response`` to a temporary file and convert it.

        Candidate extensions are, in priority order: the ``file_extension``
        keyword argument, the Content-Type header, the Content-Disposition
        filename, the URL path, then puremagic's guesses on the downloaded
        bytes. The final URL is passed to the converters as ``url``.
        """
        # Prepare a list of extensions to try (in order of priority)
        ext = kwargs.get("file_extension")
        extensions = [ext] if ext is not None else []

        # Guess from the mimetype (parameters such as "; charset=..." are dropped)
        content_type = response.headers.get("content-type", "").split(";")[0]
        self._append_ext(extensions, mimetypes.guess_extension(content_type))

        # Read the content disposition if there is one
        content_disposition = response.headers.get("content-disposition", "")
        match = re.search(r"filename=([^;]+)", content_disposition)
        if match:
            _, disposition_ext = os.path.splitext(match.group(1).strip("\"'"))
            self._append_ext(extensions, disposition_ext)

        # Read from the extension from the path
        _, url_ext = os.path.splitext(urlparse(response.url).path)
        self._append_ext(extensions, url_ext)

        # Save the file locally to a temporary file. It will be deleted before this method exits
        handle, temp_path = tempfile.mkstemp()
        try:
            # Download the file; the with-block closes it before it is re-opened by path.
            with os.fdopen(handle, "wb") as fh:
                for chunk in response.iter_content(chunk_size=512):
                    fh.write(chunk)

            # Use puremagic to check for more extension options
            for guess in self._guess_ext_magic(source=temp_path):
                self._append_ext(extensions, guess)

            # Create the ConverterInput object
            converter_input = ConverterInput(input_type="filepath", filepath=temp_path)

            # Convert
            return self._convert(
                converter_input, extensions, url=response.url, **kwargs
            )
        finally:
            # Clean up
            os.unlink(temp_path)

    def _convert(
        self, input: ConverterInput, extensions: List[Union[str, None]], **kwargs
    ) -> DocumentConverterResult:
        """Try every registered converter against ``input`` until one succeeds.

        Each candidate extension is tried in order (and finally no extension
        at all); for each one the converters are tried in ascending
        ``priority`` order. The first non-``None`` result is normalized
        (trailing whitespace stripped per line, runs of 3+ newlines collapsed
        to a blank line) and returned.

        Raises:
            FileConversionException: no converter succeeded and at least one
                raised; the message carries the most recent traceback.
            UnsupportedFormatException: no converter accepted the input and
                none raised.
        """
        error_trace = ""

        # Create a copy of the page_converters list, sorted by priority.
        # We do this with each call to _convert because the priority of converters may change between calls.
        # The sort is guaranteed to be stable, so converters with the same priority will remain in the same order.
        sorted_converters = sorted(self._page_converters, key=lambda x: x.priority)

        # Instance-wide options that are injected unless the caller overrides them.
        global_options = (
            ("llm_client", self._llm_client),
            ("llm_model", self._llm_model),
            ("style_map", self._style_map),
            ("exiftool_path", self._exiftool_path),
        )

        for ext in extensions + [None]:  # Try last with no extension
            for converter in sorted_converters:
                # Fresh copy per attempt so a converter cannot leak mutations.
                _kwargs = copy.deepcopy(kwargs)

                # Overwrite file_extension appropriately
                if ext is None:
                    _kwargs.pop("file_extension", None)
                else:
                    _kwargs["file_extension"] = ext

                # Copy any additional global options
                for option, value in global_options:
                    if option not in _kwargs and value is not None:
                        _kwargs[option] = value

                # Add the list of converters for nested processing
                _kwargs["_parent_converters"] = self._page_converters

                # If we hit an error log it and keep trying.
                # ``res`` must be reset first: otherwise a failure in the very
                # first attempt leaves it unbound and the check below raises
                # UnboundLocalError instead of moving on to the next converter.
                res = None
                try:
                    res = converter.convert(input, **_kwargs)
                except Exception:
                    error_trace = ("\n\n" + traceback.format_exc()).strip()

                if res is not None:
                    # Normalize the content
                    res.text_content = "\n".join(
                        line.rstrip()
                        for line in re.split(r"\r?\n", res.text_content)
                    )
                    res.text_content = re.sub(r"\n{3,}", "\n\n", res.text_content)
                    return res

        # If we got this far without success, report any exceptions
        if len(error_trace) > 0:
            raise FileConversionException(
                f"Could not convert '{input.filepath}' to Markdown. File type was recognized as {extensions}. While converting the file, the following error was encountered:\n\n{error_trace}"
            )

        # Nothing can handle it!
        raise UnsupportedFormatException(
            f"Could not convert '{input.filepath}' to Markdown. The formats {extensions} are not supported."
        )

    def _append_ext(self, extensions, ext):
        """Append a non-None, non-empty extension to a list of extensions.

        ``ext`` is stripped of surrounding whitespace first; ``None`` and
        blank values are ignored. The list is modified in place.
        """
        if ext is None:
            return
        ext = ext.strip()
        if ext == "":
            return
        # NOTE(review): no de-duplication is performed, so the same extension
        # may be attempted more than once by _convert -- confirm this is intended.
        extensions.append(ext)

    def _guess_ext_magic(self, source):
        """Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes.

        Args:
            source: a file path (``str``) or a binary file object
                (``BufferedIOBase``). Any other type yields no guesses.

        Returns:
            A de-duplicated list of extensions, each with a leading dot, in
            the order puremagic reported them. Guessing is best-effort: an
            unreadable path or an unidentifiable stream yields ``[]``.
        """
        try:
            guesses = []

            # Guess extensions for filepaths
            if isinstance(source, str):
                guesses = puremagic.magic_file(source)

                # Fix for: https://github.com/microsoft/markitdown/issues/222
                # If there are no guesses, then try again after trimming leading ASCII whitespaces.
                # ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
                # (space, tab, newline, carriage return, vertical tab, form feed).
                if len(guesses) == 0:
                    with open(source, "rb") as file:
                        while True:
                            char = file.read(1)
                            if not char:  # End of file
                                break
                            if not char.isspace():
                                # Step back so the first non-space byte is re-read.
                                file.seek(file.tell() - 1)
                                break
                        try:
                            guesses = puremagic.magic_stream(file)
                        except puremagic.main.PureError:
                            pass

            # Guess extensions for file objects. Note that the puremagic's magic_stream function requires a BytesIO-like file source
            # TODO: Figure out how to guess extensions for TextIO-like file sources (manually converting to BytesIO does not work)
            elif isinstance(source, BufferedIOBase):
                # Same best-effort handling as the file-path branch above: an
                # unidentifiable stream simply produces no guesses.
                try:
                    guesses = puremagic.magic_stream(source)
                except puremagic.main.PureError:
                    pass

            extensions = []
            for guess in guesses:
                ext = guess.extension.strip()
                if not ext:
                    continue
                if not ext.startswith("."):
                    ext = "." + ext
                if ext not in extensions:
                    extensions.append(ext)
            return extensions
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            # The path cannot be read; fall through and report no guesses.
            pass
        return []

    def register_page_converter(self, converter: DocumentConverter) -> None:
        """DEPRECATED: Use register_converter instead."""
        warn(
            "register_page_converter is deprecated. Use register_converter instead.",
            DeprecationWarning,
        )
        self.register_converter(converter)

    def register_converter(self, converter: DocumentConverter) -> None:
        """Register a page text converter.

        The converter is inserted at the front of the internal list, so among
        converters of equal priority the most recently registered is tried first.
        """
        self._page_converters.insert(0, converter)