added conversion path for file object in central class

This commit is contained in:
Kenny Zhang
2025-02-19 17:02:51 -05:00
parent e75f3f6f5b
commit 808401a331

View File

@@ -10,6 +10,7 @@ from typing import Any, List, Optional, Union
from pathlib import Path
from urllib.parse import urlparse
from warnings import warn
from io import BufferedIOBase, TextIOBase
# File-format detection
import puremagic
@@ -174,11 +175,11 @@ class MarkItDown:
warn("Plugins converters are already enabled.", RuntimeWarning)
def convert(
self, source: Union[str, requests.Response, Path], **kwargs: Any
self, source: Union[str, requests.Response, Path, BufferedIOBase, TextIOBase], **kwargs: Any
) -> DocumentConverterResult: # TODO: deal with kwargs
"""
Args:
- source: can be a string representing a path either as string pathlib path object or url, or a requests.response object
- source: can be a string representing a path either as string pathlib path object or url, a requests.response object, or a file object (TextIO or BinaryIO)
- extension: specifies the file extension to use when interpreting the file. If None, infer from source (path, uri, content-type, etc.)
"""
@@ -211,7 +212,7 @@ class MarkItDown:
base, ext = os.path.splitext(path)
self._append_ext(extensions, ext)
for g in self._guess_ext_magic(path):
for g in self._guess_ext_magic(source=path):
self._append_ext(extensions, g)
# Create the ConverterInput object
@@ -219,6 +220,23 @@ class MarkItDown:
# Convert
return self._convert(input, extensions, **kwargs)
def convert_file_object(
self, file_object: Union[BufferedIOBase, TextIOBase], **kwargs: Any
) -> DocumentConverterResult: #TODO: deal with kwargs
# Prepare a list of extensions to try (in order of priority)
ext = kwargs.get("file_extension")
extensions = [ext] if ext is not None else []
# Get extension alternatives from puremagic
for g in self._guess_ext_magic(source=file_object):
self._append_ext(extensions, g)
# Create the ConverterInput object
input = ConverterInput(input_type="object", file_object=file_object)
# Convert
return self._convert(input, extensions, **kwargs)
# TODO what should stream's type be?
def convert_stream(
@@ -242,7 +260,7 @@ class MarkItDown:
fh.close()
# Use puremagic to check for more extension options
for g in self._guess_ext_magic(temp_path):
for g in self._guess_ext_magic(source=temp_path):
self._append_ext(extensions, g)
# Create the ConverterInput object
@@ -301,7 +319,7 @@ class MarkItDown:
fh.close()
# Use puremagic to check for more extension options
for g in self._guess_ext_magic(temp_path):
for g in self._guess_ext_magic(source=temp_path):
self._append_ext(extensions, g)
# Create the ConverterInput object
@@ -393,29 +411,37 @@ class MarkItDown:
# if ext not in extensions:
extensions.append(ext)
def _guess_ext_magic(self, path):
def _guess_ext_magic(self, source):
"""Use puremagic (a Python implementation of libmagic) to guess a file's extension based on the first few bytes."""
# Use puremagic to guess
try:
guesses = puremagic.magic_file(path)
guesses = None
# Guess extensions for filepaths
if isinstance(source, str):
guesses = puremagic.magic_file(source)
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
if len(guesses) == 0:
with open(path, "rb") as file:
while True:
char = file.read(1)
if not char: # End of file
break
if not char.isspace():
file.seek(file.tell() - 1)
break
try:
guesses = puremagic.magic_stream(file)
except puremagic.main.PureError:
pass
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
if len(guesses) == 0:
with open(source, "rb") as file:
while True:
char = file.read(1)
if not char: # End of file
break
if not char.isspace():
file.seek(file.tell() - 1)
break
try:
guesses = puremagic.magic_stream(file)
except puremagic.main.PureError:
pass
# Guess extensions for file objects
elif isinstance(source, BufferedIOBase) or isinstance(source, TextIOBase):
guesses = puremagic.magic_stream(source)
extensions = list()
for g in guesses: