* refactor: remove unused imports * fix: replace NotImplemented with NotImplementedError * refactor: resolve E722 (do not use bare 'except') * refactor: remove unused variable * refactor: remove unused imports * refactor: ignore unused imports that will be used in the future * refactor: resolve W293 (blank line contains whitespace) * refactor: resolve F541 (f-string is missing placeholders) --------- Co-authored-by: afourney <adamfo@microsoft.com>
117 lines
3.6 KiB
Python
117 lines
3.6 KiB
Python
import zipfile
|
|
import io
|
|
import os
|
|
|
|
from typing import BinaryIO, Any, TYPE_CHECKING
|
|
|
|
from .._base_converter import DocumentConverter, DocumentConverterResult
|
|
from .._stream_info import StreamInfo
|
|
from .._exceptions import UnsupportedFormatException, FileConversionException
|
|
|
|
# Break otherwise circular import for type hinting
|
|
if TYPE_CHECKING:
|
|
from .._markitdown import MarkItDown
|
|
|
|
ACCEPTED_MIME_TYPE_PREFIXES = [
|
|
"application/zip",
|
|
]
|
|
|
|
ACCEPTED_FILE_EXTENSIONS = [".zip"]
|
|
|
|
|
|
class ZipConverter(DocumentConverter):
|
|
"""Converts ZIP files to markdown by extracting and converting all contained files.
|
|
|
|
The converter extracts the ZIP contents to a temporary directory, processes each file
|
|
using appropriate converters based on file extensions, and then combines the results
|
|
into a single markdown document. The temporary directory is cleaned up after processing.
|
|
|
|
Example output format:
|
|
```markdown
|
|
Content from the zip file `example.zip`:
|
|
|
|
## File: docs/readme.txt
|
|
|
|
This is the content of readme.txt
|
|
Multiple lines are preserved
|
|
|
|
## File: images/example.jpg
|
|
|
|
ImageSize: 1920x1080
|
|
DateTimeOriginal: 2024-02-15 14:30:00
|
|
Description: A beautiful landscape photo
|
|
|
|
## File: data/report.xlsx
|
|
|
|
## Sheet1
|
|
| Column1 | Column2 | Column3 |
|
|
|---------|---------|---------|
|
|
| data1 | data2 | data3 |
|
|
| data4 | data5 | data6 |
|
|
```
|
|
|
|
Key features:
|
|
- Maintains original file structure in headings
|
|
- Processes nested files recursively
|
|
- Uses appropriate converters for each file type
|
|
- Preserves formatting of converted content
|
|
- Cleans up temporary files after processing
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
markitdown: "MarkItDown",
|
|
):
|
|
super().__init__()
|
|
self._markitdown = markitdown
|
|
|
|
def accepts(
|
|
self,
|
|
file_stream: BinaryIO,
|
|
stream_info: StreamInfo,
|
|
**kwargs: Any, # Options to pass to the converter
|
|
) -> bool:
|
|
mimetype = (stream_info.mimetype or "").lower()
|
|
extension = (stream_info.extension or "").lower()
|
|
|
|
if extension in ACCEPTED_FILE_EXTENSIONS:
|
|
return True
|
|
|
|
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
|
|
if mimetype.startswith(prefix):
|
|
return True
|
|
|
|
return False
|
|
|
|
def convert(
|
|
self,
|
|
file_stream: BinaryIO,
|
|
stream_info: StreamInfo,
|
|
**kwargs: Any, # Options to pass to the converter
|
|
) -> DocumentConverterResult:
|
|
file_path = stream_info.url or stream_info.local_path or stream_info.filename
|
|
md_content = f"Content from the zip file `{file_path}`:\n\n"
|
|
|
|
with zipfile.ZipFile(file_stream, "r") as zipObj:
|
|
for name in zipObj.namelist():
|
|
try:
|
|
z_file_stream = io.BytesIO(zipObj.read(name))
|
|
z_file_stream_info = StreamInfo(
|
|
extension=os.path.splitext(name)[1],
|
|
filename=os.path.basename(name),
|
|
)
|
|
result = self._markitdown.convert_stream(
|
|
stream=z_file_stream,
|
|
stream_info=z_file_stream_info,
|
|
)
|
|
if result is not None:
|
|
md_content += f"## File: {name}\n\n"
|
|
md_content += result.markdown + "\n\n"
|
|
except UnsupportedFormatException:
|
|
pass
|
|
except FileConversionException:
|
|
pass
|
|
|
|
return DocumentConverterResult(markdown=md_content.strip())
|