1 Commits

Author SHA1 Message Date
Adam Fourney
f17bc21c9d If files use zip packaging, be smarter about inspecting their types. 2025-03-07 23:06:56 -08:00
58 changed files with 851 additions and 3315 deletions

1
.gitignore vendored
View File

@@ -164,4 +164,3 @@ cython_debug/
#.idea/
src/.DS_Store
.DS_Store
.cursorrules

View File

@@ -4,13 +4,9 @@
![PyPI - Downloads](https://img.shields.io/pypi/dd/markitdown)
[![Built by AutoGen Team](https://img.shields.io/badge/Built%20by-AutoGen%20Team-blue)](https://github.com/microsoft/autogen)
> [!TIP]
> MarkItDown now offers an MCP (Model Context Protocol) server for integration with LLM applications like Claude Desktop. See [markitdown-mcp](https://github.com/microsoft/markitdown/tree/main/packages/markitdown-mcp) for more information.
> [!IMPORTANT]
> Breaking changes between 0.0.1 to 0.1.0:
> * Dependencies are now organized into optional feature-groups (further details below). Use `pip install 'markitdown[all]'` to have backward-compatible behavior.
> * convert\_stream() now requires a binary file-like object (e.g., a file opened in binary mode, or an io.BytesIO object). This is a breaking change from the previous version, where it previously also accepted text file-like objects, like io.StringIO.
> * Dependencies are now organized into optional feature-groups (further details below). Use `pip install 'markitdown[all]~=0.1.0a1'` to have backward-compatible behavior.
> * The DocumentConverter class interface has changed to read from file-like streams rather than file paths. *No temporary files are created anymore*. If you are the maintainer of a plugin, or custom DocumentConverter, you likely need to update your code. Otherwise, if only using the MarkItDown class or CLI (as in these examples), you should not need to change anything.
MarkItDown is a lightweight Python utility for converting various files to Markdown for use with LLMs and related text analysis pipelines. To this end, it is most comparable to [textract](https://github.com/deanmalmgren/textract), but with a focus on preserving important document structure and content as Markdown (including: headings, lists, tables, links, etc.) While the output is often reasonably presentable and human-friendly, it is meant to be consumed by text analysis tools -- and may not be the best option for high-fidelity document conversions for human consumption.
@@ -18,7 +14,7 @@ MarkItDown is a lightweight Python utility for converting various files to Markd
At present, MarkItDown supports:
- PDF
- PowerPoint
- PowerPoint (reading in top-to-bottom, left-to-right order)
- Word
- Excel
- Images (EXIF metadata and OCR)
@@ -27,7 +23,6 @@ At present, MarkItDown supports:
- Text-based formats (CSV, JSON, XML)
- ZIP files (iterates over contents)
- Youtube URLs
- EPubs
- ... and more!
## Why Markdown?
@@ -39,39 +34,14 @@ responses unprompted. This suggests that they have been trained on vast amounts
Markdown-formatted text, and understand it well. As a side benefit, Markdown conventions
are also highly token-efficient.
## Prerequisites
MarkItDown requires Python 3.10 or higher. It is recommended to use a virtual environment to avoid dependency conflicts.
With the standard Python installation, you can create and activate a virtual environment using the following commands:
```bash
python -m venv .venv
source .venv/bin/activate
```
If using `uv`, you can create a virtual environment with:
```bash
uv venv --python=3.12 .venv
source .venv/bin/activate
# NOTE: Be sure to use 'uv pip install' rather than just 'pip install' to install packages in this virtual environment
```
If you are using Anaconda, you can create a virtual environment with:
```bash
conda create -n markitdown python=3.12
conda activate markitdown
```
## Installation
To install MarkItDown, use pip: `pip install 'markitdown[all]'`. Alternatively, you can install it from the source:
To install MarkItDown, use pip: `pip install 'markitdown[all]~=0.1.0a1'`. Alternatively, you can install it from the source:
```bash
git clone git@github.com:microsoft/markitdown.git
cd markitdown
pip install -e 'packages/markitdown[all]'
pip install -e packages/markitdown[all]
```
## Usage
@@ -98,7 +68,7 @@ cat path-to-file.pdf | markitdown
MarkItDown has optional dependencies for activating various file formats. Earlier in this document, we installed all optional dependencies with the `[all]` option. However, you can also install them individually for more control. For example:
```bash
pip install 'markitdown[pdf, docx, pptx]'
pip install markitdown[pdf, docx, pptx]
```
will install only the dependencies for PDF, DOCX, and PPTX files.

View File

@@ -1,26 +0,0 @@
FROM python:3.13-slim-bullseye
ENV DEBIAN_FRONTEND=noninteractive
ENV EXIFTOOL_PATH=/usr/bin/exiftool
ENV FFMPEG_PATH=/usr/bin/ffmpeg
# Runtime dependency
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
exiftool
# Cleanup
RUN rm -rf /var/lib/apt/lists/*
COPY . /app
RUN pip --no-cache-dir install /app
WORKDIR /workdir
# Default USERID and GROUPID
ARG USERID=nobody
ARG GROUPID=nogroup
USER $USERID:$GROUPID
ENTRYPOINT [ "markitdown-mcp" ]

View File

@@ -1,139 +0,0 @@
# MarkItDown-MCP
[![PyPI](https://img.shields.io/pypi/v/markitdown-mcp.svg)](https://pypi.org/project/markitdown-mcp/)
![PyPI - Downloads](https://img.shields.io/pypi/dd/markitdown-mcp)
[![Built by AutoGen Team](https://img.shields.io/badge/Built%20by-AutoGen%20Team-blue)](https://github.com/microsoft/autogen)
The `markitdown-mcp` package provides a lightweight STDIO, SSE and Streamable HTTP MCP server for calling MarkItDown.
It exposes one tool: `convert_to_markdown(uri)`, where uri can be any `http:`, `https:`, `file:`, or `data:` URI.
## Installation
To install the package, use pip:
```bash
pip install markitdown-mcp
```
## Usage
To run the MCP server, using STDIO (default) use the following command:
```bash
markitdown-mcp
```
To run the MCP server, using SSE or Streamable HTTP use the following command:
```bash
markitdown-mcp --sse --host 127.0.0.1 --port 3001
```
## Running in Docker
To run `markitdown-mcp` in Docker, build the Docker image using the provided Dockerfile:
```bash
docker build -t markitdown-mcp:latest .
```
And run it using:
```bash
docker run -it --rm markitdown-mcp:latest
```
This will be sufficient for remote URIs. To access local files, you need to mount the local directory into the container. For example, if you want to access files in `/home/user/data`, you can run:
```bash
docker run -it --rm -v /home/user/data:/workdir markitdown-mcp:latest
```
Once mounted, all files under data will be accessible under `/workdir` in the container. For example, if you have a file `example.txt` in `/home/user/data`, it will be accessible in the container at `/workdir/example.txt`.
## Accessing from Claude Desktop
It is recommended to use the Docker image when running the MCP server for Claude Desktop.
Follow [these instrutions](https://modelcontextprotocol.io/quickstart/user#for-claude-desktop-users) to access Claude's `claude_desktop_config.json` file.
Edit it to include the following JSON entry:
```json
{
"mcpServers": {
"markitdown": {
"command": "docker",
"args": [
"run",
"--rm",
"-i",
"markitdown-mcp:latest"
]
}
}
}
```
If you want to mount a directory, adjust it accordingly:
```json
{
"mcpServers": {
"markitdown": {
"command": "docker",
"args": [
"run",
"--rm",
"-i",
"-v",
"/home/user/data:/workdir",
"markitdown-mcp:latest"
]
}
}
}
```
## Debugging
To debug the MCP server you can use the `mcpinspector` tool.
```bash
npx @modelcontextprotocol/inspector
```
You can then connect to the insepctor through the specified host and port (e.g., `http://localhost:5173/`).
If using STDIO:
* select `STDIO` as the transport type,
* input `markitdown-mcp` as the command, and
* click `Connect`
If using SSE:
* select `SSE` as the transport type,
* input `http://127.0.0.1:3001/sse` as the URL, and
* click `Connect`
If using Streamable HTTP:
* select `Streamable HTTP` as the transport type,
* input `http://127.0.0.1:3001/mcp` as the URL, and
* click `Connect`
Finally:
* click the `Tools` tab,
* click `List Tools`,
* click `convert_to_markdown`, and
* run the tool on any valid URI.
## Security Considerations
The server does not support authentication, and runs with the privileges if the user running it. For this reason, when running in SSE or Streamable HTTP mode, it is recommended to run the server bound to `localhost` (default).
## Trademarks
This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft
trademarks or logos is subject to and must follow
[Microsoft's Trademark & Brand Guidelines](https://www.microsoft.com/en-us/legal/intellectualproperty/trademarks/usage/general).
Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship.
Any use of third-party trademarks or logos are subject to those third-party's policies.

View File

@@ -1,69 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "markitdown-mcp"
dynamic = ["version"]
description = 'An MCP server for the "markitdown" library.'
readme = "README.md"
requires-python = ">=3.10"
license = "MIT"
keywords = []
authors = [
{ name = "Adam Fourney", email = "adamfo@microsoft.com" },
]
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"mcp~=1.8.0",
"markitdown[all]>=0.1.1,<0.2.0",
]
[project.urls]
Documentation = "https://github.com/microsoft/markitdown#readme"
Issues = "https://github.com/microsoft/markitdown/issues"
Source = "https://github.com/microsoft/markitdown"
[tool.hatch.version]
path = "src/markitdown_mcp/__about__.py"
[project.scripts]
markitdown-mcp = "markitdown_mcp.__main__:main"
[tool.hatch.envs.types]
extra-dependencies = [
"mypy>=1.0.0",
]
[tool.hatch.envs.types.scripts]
check = "mypy --install-types --non-interactive {args:src/markitdown_mcp tests}"
[tool.coverage.run]
source_pkgs = ["markitdown-mcp", "tests"]
branch = true
parallel = true
omit = [
"src/markitdown_mcp/__about__.py",
]
[tool.coverage.paths]
markitdown-mcp = ["src/markitdown_mcp", "*/markitdown-mcp/src/markitdown_mcp"]
tests = ["tests", "*/markitdown-mcp/tests"]
[tool.coverage.report]
exclude_lines = [
"no cov",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]
[tool.hatch.build.targets.sdist]
only-include = ["src/markitdown_mcp"]

View File

@@ -1,4 +0,0 @@
# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com>
#
# SPDX-License-Identifier: MIT
__version__ = "0.0.1a4"

View File

@@ -1,9 +0,0 @@
# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com>
#
# SPDX-License-Identifier: MIT
from .__about__ import __version__
__all__ = [
"__version__",
]

View File

@@ -1,109 +0,0 @@
import contextlib
import sys
from collections.abc import AsyncIterator
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from mcp.server.sse import SseServerTransport
from starlette.requests import Request
from starlette.routing import Mount, Route
from starlette.types import Receive, Scope, Send
from mcp.server import Server
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from markitdown import MarkItDown
import uvicorn
# Initialize FastMCP server for MarkItDown (SSE)
mcp = FastMCP("markitdown")
@mcp.tool()
async def convert_to_markdown(uri: str) -> str:
"""Convert a resource described by an http:, https:, file: or data: URI to markdown"""
return MarkItDown().convert_uri(uri).markdown
def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette:
sse = SseServerTransport("/messages/")
session_manager = StreamableHTTPSessionManager(
app=mcp_server,
event_store=None,
json_response=True,
stateless=True,
)
async def handle_sse(request: Request) -> None:
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
async def handle_streamable_http(
scope: Scope, receive: Receive, send: Send
) -> None:
await session_manager.handle_request(scope, receive, send)
@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncIterator[None]:
"""Context manager for session manager."""
async with session_manager.run():
print("Application started with StreamableHTTP session manager!")
try:
yield
finally:
print("Application shutting down...")
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/mcp", app=handle_streamable_http),
Mount("/messages/", app=sse.handle_post_message),
],
lifespan=lifespan,
)
# Main entry point
def main():
import argparse
mcp_server = mcp._mcp_server
parser = argparse.ArgumentParser(description="Run MCP SSE-based MarkItDown server")
parser.add_argument(
"--sse",
action="store_true",
help="Run the server with SSE transport rather than STDIO (default: False)",
)
parser.add_argument(
"--host", default=None, help="Host to bind to (default: 127.0.0.1)"
)
parser.add_argument(
"--port", type=int, default=None, help="Port to listen on (default: 3001)"
)
args = parser.parse_args()
if not args.sse and (args.host or args.port):
parser.error("Host and port arguments are only valid when using SSE transport.")
sys.exit(1)
if args.sse:
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(
starlette_app,
host=args.host if args.host else "127.0.0.1",
port=args.port if args.port else 3001,
)
else:
mcp.run()
if __name__ == "__main__":
main()

View File

@@ -1,3 +0,0 @@
# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com>
#
# SPDX-License-Identifier: MIT

View File

@@ -1,7 +1,7 @@
# MarkItDown Sample Plugin
[![PyPI](https://img.shields.io/pypi/v/markitdown-sample-plugin.svg)](https://pypi.org/project/markitdown-sample-plugin/)
![PyPI - Downloads](https://img.shields.io/pypi/dd/markitdown-sample-plugin)
[![PyPI](https://img.shields.io/pypi/v/markitdown.svg)](https://pypi.org/project/markitdown/)
![PyPI - Downloads](https://img.shields.io/pypi/dd/markitdown)
[![Built by AutoGen Team](https://img.shields.io/badge/Built%20by-AutoGen%20Team-blue)](https://github.com/microsoft/autogen)

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env python3 -m pytest
import os
import pytest
from markitdown import MarkItDown, StreamInfo
from markitdown_sample_plugin import RtfConverter

View File

@@ -1,232 +0,0 @@
# THIRD-PARTY SOFTWARE NOTICES AND INFORMATION
**Do Not Translate or Localize**
This project incorporates components from the projects listed below. The original copyright notices and the licenses
under which MarkItDown received such components are set forth below. MarkItDown reserves all rights not expressly
granted herein, whether by implication, estoppel or otherwise.
1.dwml (https://github.com/xiilei/dwml)
dwml NOTICES AND INFORMATION BEGIN HERE
-----------------------------------------
NOTE 1: What follows is a verbatim copy of dwml's LICENSE file, as it appeared on March 28th, 2025 - including
placeholders for the copyright owner and year.
NOTE 2: The Apache License, Version 2.0, requires that modifications to the dwml source code be documented.
The following section summarizes these changes. The full details are available in the MarkItDown source code
repository under PR #1160 (https://github.com/microsoft/markitdown/pull/1160)
This project incorporates `dwml/latex_dict.py` and `dwml/omml.py` files without any additional logic modifications (which
lives in `packages/markitdown/src/markitdown/converter_utils/docx/math` location). However, we have reformatted the code
according to `black` code formatter. From `tests/docx.py` file, we have used `DOCXML_ROOT` XML namespaces and the rest of
the file is not used.
-----------------------------------------
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-----------------------------------------
END OF dwml NOTICES AND INFORMATION

View File

@@ -27,9 +27,9 @@ dependencies = [
"beautifulsoup4",
"requests",
"markdownify",
"magika~=0.6.1",
"puremagic",
"pathvalidate",
"charset-normalizer",
"defusedxml",
]
[project.optional-dependencies]
@@ -39,17 +39,16 @@ all = [
"pandas",
"openpyxl",
"xlrd",
"lxml",
"pdfminer.six",
"olefile",
"pydub",
"SpeechRecognition",
"youtube-transcript-api~=1.0.0",
"youtube-transcript-api",
"azure-ai-documentintelligence",
"azure-identity"
]
pptx = ["python-pptx"]
docx = ["mammoth", "lxml"]
docx = ["mammoth"]
xlsx = ["pandas", "openpyxl"]
xls = ["pandas", "xlrd"]
pdf = ["pdfminer.six"]

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2024-present Adam Fourney <adamfo@microsoft.com>
#
# SPDX-License-Identifier: MIT
__version__ = "0.1.2a1"
__version__ = "0.1.0a2"

View File

@@ -3,11 +3,10 @@
# SPDX-License-Identifier: MIT
import argparse
import sys
import codecs
from textwrap import dedent
from importlib.metadata import entry_points
from .__about__ import __version__
from ._markitdown import MarkItDown, StreamInfo, DocumentConverterResult
from ._markitdown import MarkItDown, DocumentConverterResult
def main():
@@ -33,13 +32,13 @@ def main():
OR
markitdown < example.pdf
OR to save to a file use
markitdown example.pdf -o example.md
OR
markitdown example.pdf > example.md
"""
).strip(),
@@ -59,24 +58,6 @@ def main():
help="Output file name. If not provided, output is written to stdout.",
)
parser.add_argument(
"-x",
"--extension",
help="Provide a hint about the file extension (e.g., when reading from stdin).",
)
parser.add_argument(
"-m",
"--mime-type",
help="Provide a hint about the file's MIME type.",
)
parser.add_argument(
"-c",
"--charset",
help="Provide a hint about the file's charset (e.g, UTF-8).",
)
parser.add_argument(
"-d",
"--use-docintel",
@@ -104,57 +85,9 @@ def main():
help="List installed 3rd-party plugins. Plugins are loaded when using the -p or --use-plugin option.",
)
parser.add_argument(
"--keep-data-uris",
action="store_true",
help="Keep data URIs (like base64-encoded images) in the output. By default, data URIs are truncated.",
)
parser.add_argument("filename", nargs="?")
args = parser.parse_args()
# Parse the extension hint
extension_hint = args.extension
if extension_hint is not None:
extension_hint = extension_hint.strip().lower()
if len(extension_hint) > 0:
if not extension_hint.startswith("."):
extension_hint = "." + extension_hint
else:
extension_hint = None
# Parse the mime type
mime_type_hint = args.mime_type
if mime_type_hint is not None:
mime_type_hint = mime_type_hint.strip()
if len(mime_type_hint) > 0:
if mime_type_hint.count("/") != 1:
_exit_with_error(f"Invalid MIME type: {mime_type_hint}")
else:
mime_type_hint = None
# Parse the charset
charset_hint = args.charset
if charset_hint is not None:
charset_hint = charset_hint.strip()
if len(charset_hint) > 0:
try:
charset_hint = codecs.lookup(charset_hint).name
except LookupError:
_exit_with_error(f"Invalid charset: {charset_hint}")
else:
charset_hint = None
stream_info = None
if (
extension_hint is not None
or mime_type_hint is not None
or charset_hint is not None
):
stream_info = StreamInfo(
extension=extension_hint, mimetype=mime_type_hint, charset=charset_hint
)
if args.list_plugins:
# List installed plugins, then exit
print("Installed MarkItDown 3rd-party Plugins:\n")
@@ -174,12 +107,11 @@ def main():
if args.use_docintel:
if args.endpoint is None:
_exit_with_error(
raise ValueError(
"Document Intelligence Endpoint is required when using Document Intelligence."
)
elif args.filename is None:
_exit_with_error("Filename is required when using Document Intelligence.")
raise ValueError("Filename is required when using Document Intelligence.")
markitdown = MarkItDown(
enable_plugins=args.use_plugins, docintel_endpoint=args.endpoint
)
@@ -187,15 +119,9 @@ def main():
markitdown = MarkItDown(enable_plugins=args.use_plugins)
if args.filename is None:
result = markitdown.convert_stream(
sys.stdin.buffer,
stream_info=stream_info,
keep_data_uris=args.keep_data_uris,
)
result = markitdown.convert_stream(sys.stdin.buffer)
else:
result = markitdown.convert(
args.filename, stream_info=stream_info, keep_data_uris=args.keep_data_uris
)
result = markitdown.convert(args.filename)
_handle_output(args, result)
@@ -204,19 +130,9 @@ def _handle_output(args, result: DocumentConverterResult):
"""Handle output to stdout or file"""
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(result.markdown)
f.write(result.text_content)
else:
# Handle stdout encoding errors more gracefully
print(
result.markdown.encode(sys.stdout.encoding, errors="replace").decode(
sys.stdout.encoding
)
)
def _exit_with_error(message: str):
print(message)
sys.exit(1)
print(result.text_content)
if __name__ == "__main__":

View File

@@ -1,4 +1,7 @@
from typing import Any, BinaryIO, Optional
import os
import tempfile
from warnings import warn
from typing import Any, Union, BinaryIO, Optional, List
from ._stream_info import StreamInfo

View File

@@ -69,7 +69,7 @@ class FileConversionException(MarkItDownException):
message = f"File conversion failed after {len(attempts)} attempts:\n"
for attempt in attempts:
if attempt.exc_info is None:
message += f" - {type(attempt.converter).__name__} provided no execution info."
message += " - {type(attempt.converter).__name__} provided no execution info."
else:
message += f" - {type(attempt.converter).__name__} threw {attempt.exc_info[0].__name__} with message: {attempt.exc_info[1]}\n"

View File

@@ -1,23 +1,25 @@
import copy
import mimetypes
import os
import re
import sys
import shutil
import tempfile
import warnings
import traceback
import io
from dataclasses import dataclass
from importlib.metadata import entry_points
from typing import Any, List, Dict, Optional, Union, BinaryIO
from typing import Any, List, Optional, Union, BinaryIO
from pathlib import Path
from urllib.parse import urlparse
from warnings import warn
import requests
import magika
import charset_normalizer
import codecs
from ._stream_info import StreamInfo
from ._uri_utils import parse_data_uri, file_uri_to_path
# File-format detection
import puremagic
import requests
from ._stream_info import StreamInfo, _guess_stream_info_from_stream
from .converters import (
PlainTextConverter,
@@ -36,9 +38,7 @@ from .converters import (
AudioConverter,
OutlookMsgConverter,
ZipConverter,
EpubConverter,
DocumentIntelligenceConverter,
CsvConverter,
)
from ._base_converter import DocumentConverter, DocumentConverterResult
@@ -110,8 +110,6 @@ class MarkItDown:
else:
self._requests_session = requests_session
self._magika = magika.Magika()
# TODO - remove these (see enable_builtins)
self._llm_client: Any = None
self._llm_model: Union[str | None] = None
@@ -158,8 +156,7 @@ class MarkItDown:
"/opt",
"/opt/bin",
"/opt/local/bin",
"/opt/homebrew/bin",
"C:\\Windows\\System32",
"/opt/homebrew/bin" "C:\\Windows\\System32",
"C:\\Program Files",
"C:\\Program Files (x86)",
]
@@ -191,29 +188,12 @@ class MarkItDown:
self.register_converter(IpynbConverter())
self.register_converter(PdfConverter())
self.register_converter(OutlookMsgConverter())
self.register_converter(EpubConverter())
self.register_converter(CsvConverter())
# Register Document Intelligence converter at the top of the stack if endpoint is provided
docintel_endpoint = kwargs.get("docintel_endpoint")
if docintel_endpoint is not None:
docintel_args: Dict[str, Any] = {}
docintel_args["endpoint"] = docintel_endpoint
docintel_credential = kwargs.get("docintel_credential")
if docintel_credential is not None:
docintel_args["credential"] = docintel_credential
docintel_types = kwargs.get("docintel_file_types")
if docintel_types is not None:
docintel_args["file_types"] = docintel_types
docintel_version = kwargs.get("docintel_api_version")
if docintel_version is not None:
docintel_args["api_version"] = docintel_version
self.register_converter(
DocumentIntelligenceConverter(**docintel_args),
DocumentIntelligenceConverter(endpoint=docintel_endpoint)
)
self._builtins_enabled = True
@@ -257,19 +237,11 @@ class MarkItDown:
# Local path or url
if isinstance(source, str):
if (
source.startswith("http:")
or source.startswith("https:")
or source.startswith("file:")
or source.startswith("data:")
source.startswith("http://")
or source.startswith("https://")
or source.startswith("file://")
):
# Rename the url argument to mock_url
# (Deprecated -- use stream_info)
_kwargs = {k: v for k, v in kwargs.items()}
if "url" in _kwargs:
_kwargs["mock_url"] = _kwargs["url"]
del _kwargs["url"]
return self.convert_uri(source, stream_info=stream_info, **_kwargs)
return self.convert_url(source, **kwargs)
else:
return self.convert_local(source, stream_info=stream_info, **kwargs)
# Path object
@@ -277,14 +249,14 @@ class MarkItDown:
return self.convert_local(source, stream_info=stream_info, **kwargs)
# Request response
elif isinstance(source, requests.Response):
return self.convert_response(source, stream_info=stream_info, **kwargs)
return self.convert_response(source, **kwargs)
# Binary stream
elif (
hasattr(source, "read")
and callable(source.read)
and not isinstance(source, io.TextIOBase)
):
return self.convert_stream(source, stream_info=stream_info, **kwargs)
return self.convert_stream(source, **kwargs)
else:
raise TypeError(
f"Invalid source type: {type(source)}. Expected str, requests.Response, BinaryIO."
@@ -303,28 +275,33 @@ class MarkItDown:
path = str(path)
# Build a base StreamInfo object from which to start guesses
base_guess = StreamInfo(
base_stream_info = StreamInfo(
local_path=path,
extension=os.path.splitext(path)[1],
filename=os.path.basename(path),
)
# Extend the base_guess with any additional info from the arguments
# Extend the base_stream_info with any additional info from the arguments
if stream_info is not None:
base_guess = base_guess.copy_and_update(stream_info)
base_stream_info = base_stream_info.copy_and_update(stream_info)
if file_extension is not None:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(extension=file_extension)
base_stream_info = base_stream_info.copy_and_update(
extension=file_extension
)
if url is not None:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(url=url)
base_stream_info = base_stream_info.copy_and_update(url=url)
with open(path, "rb") as fh:
guesses = self._get_stream_info_guesses(
file_stream=fh, base_guess=base_guess
)
# Prepare a list of configurations to try, starting with the base_stream_info
guesses: List[StreamInfo] = [base_stream_info]
for guess in _guess_stream_info_from_stream(
file_stream=fh, filename_hint=path
):
guesses.append(base_stream_info.copy_and_update(guess))
return self._convert(file_stream=fh, stream_info_guesses=guesses, **kwargs)
def convert_stream(
@@ -357,6 +334,21 @@ class MarkItDown:
assert base_guess is not None # for mypy
base_guess = base_guess.copy_and_update(url=url)
# Append the base guess, if it's non-trivial
if base_guess is not None:
if base_guess.mimetype is not None or base_guess.extension is not None:
guesses.append(base_guess)
else:
# Create a base guess with no information
base_guess = StreamInfo()
# Create a placeholder filename to help with guessing
placeholder_filename = None
if base_guess.filename is not None:
placeholder_filename = base_guess.filename
elif base_guess.extension is not None:
placeholder_filename = "placeholder" + base_guess.extension
# Check if we have a seekable stream. If not, load the entire stream into memory.
if not stream.seekable():
buffer = io.BytesIO()
@@ -369,90 +361,21 @@ class MarkItDown:
stream = buffer
# Add guesses based on stream content
guesses = self._get_stream_info_guesses(
file_stream=stream, base_guess=base_guess or StreamInfo()
)
for guess in _guess_stream_info_from_stream(
file_stream=stream, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))
# Perform the conversion
return self._convert(file_stream=stream, stream_info_guesses=guesses, **kwargs)
def convert_url(
self,
url: str,
*,
stream_info: Optional[StreamInfo] = None,
file_extension: Optional[str] = None,
mock_url: Optional[str] = None,
**kwargs: Any,
) -> DocumentConverterResult:
"""Alias for convert_uri()"""
# convert_url will likely be deprecated in the future in favor of convert_uri
return self.convert_uri(
url,
stream_info=stream_info,
file_extension=file_extension,
mock_url=mock_url,
**kwargs,
)
def convert_uri(
self,
uri: str,
*,
stream_info: Optional[StreamInfo] = None,
file_extension: Optional[str] = None, # Deprecated -- use stream_info
mock_url: Optional[
str
] = None, # Mock the request as if it came from a different URL
**kwargs: Any,
) -> DocumentConverterResult:
uri = uri.strip()
# File URIs
if uri.startswith("file:"):
netloc, path = file_uri_to_path(uri)
if netloc and netloc != "localhost":
raise ValueError(
f"Unsupported file URI: {uri}. Netloc must be empty or localhost."
)
return self.convert_local(
path,
stream_info=stream_info,
file_extension=file_extension,
url=mock_url,
**kwargs,
)
# Data URIs
elif uri.startswith("data:"):
mimetype, attributes, data = parse_data_uri(uri)
base_guess = StreamInfo(
mimetype=mimetype,
charset=attributes.get("charset"),
)
if stream_info is not None:
base_guess = base_guess.copy_and_update(stream_info)
return self.convert_stream(
io.BytesIO(data),
stream_info=base_guess,
file_extension=file_extension,
url=mock_url,
**kwargs,
)
# HTTP/HTTPS URIs
elif uri.startswith("http:") or uri.startswith("https:"):
response = self._requests_session.get(uri, stream=True)
response.raise_for_status()
return self.convert_response(
response,
stream_info=stream_info,
file_extension=file_extension,
url=mock_url,
**kwargs,
)
else:
raise ValueError(
f"Unsupported URI scheme: {uri.split(':')[0]}. Supported schemes are: file:, data:, http:, https:"
)
self, url: str, **kwargs: Any
) -> DocumentConverterResult: # TODO: fix kwargs type
# Send a HTTP request to the URL
response = self._requests_session.get(url, stream=True)
response.raise_for_status()
return self.convert_response(response, **kwargs)
def convert_response(
self,
@@ -514,16 +437,31 @@ class MarkItDown:
# Deprecated -- use stream_info
base_guess = base_guess.copy_and_update(url=url)
# Add the guess if its non-trivial
guesses: List[StreamInfo] = []
if base_guess.mimetype is not None or base_guess.extension is not None:
guesses.append(base_guess)
# Read into BytesIO
buffer = io.BytesIO()
for chunk in response.iter_content(chunk_size=512):
buffer.write(chunk)
buffer.seek(0)
# Create a placeholder filename to help with guessing
placeholder_filename = None
if base_guess.filename is not None:
placeholder_filename = base_guess.filename
elif base_guess.extension is not None:
placeholder_filename = "placeholder" + base_guess.extension
# Add guesses based on stream content
for guess in _guess_stream_info_from_stream(
file_stream=buffer, filename_hint=placeholder_filename
):
guesses.append(base_guess.copy_and_update(guess))
# Convert
guesses = self._get_stream_info_guesses(
file_stream=buffer, base_guess=base_guess
)
return self._convert(file_stream=buffer, stream_info_guesses=guesses, **kwargs)
def _convert(
@@ -548,7 +486,7 @@ class MarkItDown:
# Sanity check -- make sure the cur_pos is still the same
assert (
cur_pos == file_stream.tell()
), "File stream position should NOT change between guess iterations"
), f"File stream position should NOT change between guess iterations"
_kwargs = {k: v for k, v in kwargs.items()}
@@ -615,7 +553,7 @@ class MarkItDown:
# Nothing can handle it!
raise UnsupportedFormatException(
"Could not convert stream to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported."
f"Could not convert stream to Markdown. No converter attempted a conversion, suggesting that the filetype is simply not supported."
)
def register_page_converter(self, converter: DocumentConverter) -> None:
@@ -657,115 +595,3 @@ class MarkItDown:
self._converters.insert(
0, ConverterRegistration(converter=converter, priority=priority)
)
def _get_stream_info_guesses(
self, file_stream: BinaryIO, base_guess: StreamInfo
) -> List[StreamInfo]:
"""
Given a base guess, attempt to guess or expand on the stream info using the stream content (via magika).
"""
guesses: List[StreamInfo] = []
# Enhance the base guess with information based on the extension or mimetype
enhanced_guess = base_guess.copy_and_update()
# If there's an extension and no mimetype, try to guess the mimetype
if base_guess.mimetype is None and base_guess.extension is not None:
_m, _ = mimetypes.guess_type(
"placeholder" + base_guess.extension, strict=False
)
if _m is not None:
enhanced_guess = enhanced_guess.copy_and_update(mimetype=_m)
# If there's a mimetype and no extension, try to guess the extension
if base_guess.mimetype is not None and base_guess.extension is None:
_e = mimetypes.guess_all_extensions(base_guess.mimetype, strict=False)
if len(_e) > 0:
enhanced_guess = enhanced_guess.copy_and_update(extension=_e[0])
# Call magika to guess from the stream
cur_pos = file_stream.tell()
try:
result = self._magika.identify_stream(file_stream)
if result.status == "ok" and result.prediction.output.label != "unknown":
# If it's text, also guess the charset
charset = None
if result.prediction.output.is_text:
# Read the first 4k to guess the charset
file_stream.seek(cur_pos)
stream_page = file_stream.read(4096)
charset_result = charset_normalizer.from_bytes(stream_page).best()
if charset_result is not None:
charset = self._normalize_charset(charset_result.encoding)
# Normalize the first extension listed
guessed_extension = None
if len(result.prediction.output.extensions) > 0:
guessed_extension = "." + result.prediction.output.extensions[0]
# Determine if the guess is compatible with the base guess
compatible = True
if (
base_guess.mimetype is not None
and base_guess.mimetype != result.prediction.output.mime_type
):
compatible = False
if (
base_guess.extension is not None
and base_guess.extension.lstrip(".")
not in result.prediction.output.extensions
):
compatible = False
if (
base_guess.charset is not None
and self._normalize_charset(base_guess.charset) != charset
):
compatible = False
if compatible:
# Add the compatible base guess
guesses.append(
StreamInfo(
mimetype=base_guess.mimetype
or result.prediction.output.mime_type,
extension=base_guess.extension or guessed_extension,
charset=base_guess.charset or charset,
filename=base_guess.filename,
local_path=base_guess.local_path,
url=base_guess.url,
)
)
else:
# The magika guess was incompatible with the base guess, so add both guesses
guesses.append(enhanced_guess)
guesses.append(
StreamInfo(
mimetype=result.prediction.output.mime_type,
extension=guessed_extension,
charset=charset,
filename=base_guess.filename,
local_path=base_guess.local_path,
url=base_guess.url,
)
)
else:
# There were no other guesses, so just add the base guess
guesses.append(enhanced_guess)
finally:
file_stream.seek(cur_pos)
return guesses
def _normalize_charset(self, charset: str | None) -> str | None:
"""
Normalize a charset string to a canonical form.
"""
if charset is None:
return None
try:
return codecs.lookup(charset).name
except LookupError:
return charset

View File

@@ -1,5 +1,15 @@
import puremagic
import mimetypes
import zipfile
import os
from dataclasses import dataclass, asdict
from typing import Optional
from typing import Optional, BinaryIO, List, Union
# Mimetype substitutions table
MIMETYPE_SUBSTITUTIONS = {
"application/excel": "application/vnd.ms-excel",
"application/mspowerpoint": "application/vnd.ms-powerpoint",
}
@dataclass(kw_only=True, frozen=True)
@@ -30,3 +40,169 @@ class StreamInfo:
new_info.update(kwargs)
return StreamInfo(**new_info)
# Behavior subject to change.
# Do not rely on this outside of this module.
def _guess_stream_info_from_stream(
file_stream: BinaryIO,
*,
filename_hint: Optional[str] = None,
) -> List[StreamInfo]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a stream.
Args:
- stream: The stream to guess the StreamInfo from.
- filename_hint [Optional]: A filename hint to help with the guessing (may be a placeholder, and not actually be the file name)
Returns a list of StreamInfo objects in order of confidence.
"""
guesses: List[StreamInfo] = []
# Add a guess purely based on the filename hint
if filename_hint:
try:
# Requires Python 3.13+
mimetype, _ = mimetypes.guess_file_type(filename_hint) # type: ignore
except AttributeError:
mimetype, _ = mimetypes.guess_type(filename_hint)
if mimetype:
guesses.append(
StreamInfo(
mimetype=mimetype, extension=os.path.splitext(filename_hint)[1]
)
)
# If it looks like a zip use _guess_stream_info_from_zip rather than puremagic
cur_pos = file_stream.tell()
try:
header = file_stream.read(4)
file_stream.seek(cur_pos)
if header == b"PK\x03\x04":
zip_guess = _guess_stream_info_from_zip(file_stream)
if zip_guess:
guesses.append(zip_guess)
return guesses
finally:
file_stream.seek(cur_pos)
# Fall back to using puremagic
def _puremagic(
file_stream, filename_hint
) -> List[puremagic.main.PureMagicWithConfidence]:
"""Wrap guesses to handle exceptions."""
try:
return puremagic.magic_stream(file_stream, filename=filename_hint)
except puremagic.main.PureError as e:
return []
cur_pos = file_stream.tell()
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
if len(type_guesses) == 0:
# Fix for: https://github.com/microsoft/markitdown/issues/222
# If there are no guesses, then try again after trimming leading ASCII whitespaces.
# ASCII whitespace characters are those byte values in the sequence b' \t\n\r\x0b\f'
# (space, tab, newline, carriage return, vertical tab, form feed).
# Eat all the leading whitespace
file_stream.seek(cur_pos)
while True:
char = file_stream.read(1)
if not char: # End of file
break
if not char.isspace():
file_stream.seek(file_stream.tell() - 1)
break
# Try again
type_guesses = _puremagic(file_stream, filename_hint=filename_hint)
file_stream.seek(cur_pos)
# Convert and return the guesses
for guess in type_guesses:
kwargs: dict[str, str] = {}
if guess.extension:
kwargs["extension"] = guess.extension
if guess.mime_type:
kwargs["mimetype"] = MIMETYPE_SUBSTITUTIONS.get(
guess.mime_type, guess.mime_type
)
if len(kwargs) > 0:
# We don't add the filename_hint, because sometimes it's just a placeholder,
# and, in any case, doesn't add new information.
guesses.append(StreamInfo(**kwargs))
return guesses
def _guess_stream_info_from_zip(file_stream: BinaryIO) -> Union[None, StreamInfo]:
"""
Guess StreamInfo properties (mostly mimetype and extension) from a zip stream.
Args:
- stream: The stream to guess the StreamInfo from.
Returns the single best guess, or None if no guess could be made.
"""
cur_pos = file_stream.tell()
try:
with zipfile.ZipFile(file_stream) as z:
table_of_contents = z.namelist()
# OpenPackageFormat (OPF) file
if "[Content_Types].xml" in table_of_contents:
# Word file
if "word/document.xml" in table_of_contents:
return StreamInfo(
mimetype="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
extension=".docx",
)
# Excel file
if "xl/workbook.xml" in table_of_contents:
return StreamInfo(
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
extension=".xlsx",
)
# PowerPoint file
if "ppt/presentation.xml" in table_of_contents:
return StreamInfo(
mimetype="application/vnd.openxmlformats-officedocument.presentationml.presentation",
extension=".pptx",
)
# Visio file
if "visio/document.xml" in table_of_contents:
return StreamInfo(
mimetype="application/vnd.ms-visio.drawing",
extension=".vsd",
)
# XPS file
if "FixedDocSeq.fdseq" in table_of_contents:
return StreamInfo(
mimetype="application/vnd.ms-xpsdocument",
extension=".xps",
)
# EPUB, or similar
if "mimetype" in table_of_contents:
_mimetype = z.read("mimetype").decode("ascii").strip()
_extension = mimetypes.guess_extension(_mimetype)
return StreamInfo(mimetype=_mimetype, extension=_extension)
# JAR
if "META-INF/MANIFEST.MF" in table_of_contents:
return StreamInfo(mimetype="application/java-archive", extension=".jar")
# If we made it this far, we couldn't identify the file
return StreamInfo(mimetype="application/zip", extension=".zip")
except zipfile.BadZipFile:
return None
finally:
file_stream.seek(cur_pos)

View File

@@ -1,52 +0,0 @@
import base64
import os
from typing import Tuple, Dict
from urllib.request import url2pathname
from urllib.parse import urlparse, unquote_to_bytes
def file_uri_to_path(file_uri: str) -> Tuple[str | None, str]:
"""Convert a file URI to a local file path"""
parsed = urlparse(file_uri)
if parsed.scheme != "file":
raise ValueError(f"Not a file URL: {file_uri}")
netloc = parsed.netloc if parsed.netloc else None
path = os.path.abspath(url2pathname(parsed.path))
return netloc, path
def parse_data_uri(uri: str) -> Tuple[str | None, Dict[str, str], bytes]:
if not uri.startswith("data:"):
raise ValueError("Not a data URI")
header, _, data = uri.partition(",")
if not _:
raise ValueError("Malformed data URI, missing ',' separator")
meta = header[5:] # Strip 'data:'
parts = meta.split(";")
is_base64 = False
# Ends with base64?
if parts[-1] == "base64":
parts.pop()
is_base64 = True
mime_type = None # Normally this would default to text/plain but we won't assume
if len(parts) and len(parts[0]) > 0:
# First part is the mime type
mime_type = parts.pop(0)
attributes: Dict[str, str] = {}
for part in parts:
# Handle key=value pairs in the middle
if "=" in part:
key, value = part.split("=", 1)
attributes[key] = value
elif len(part) > 0:
attributes[part] = ""
content = base64.b64decode(data) if is_base64 else unquote_to_bytes(data)
return mime_type, attributes, content

View File

@@ -1,273 +0,0 @@
# -*- coding: utf-8 -*-
"""
Adapted from https://github.com/xiilei/dwml/blob/master/dwml/latex_dict.py
On 25/03/2025
"""
from __future__ import unicode_literals
CHARS = ("{", "}", "_", "^", "#", "&", "$", "%", "~")
BLANK = ""
BACKSLASH = "\\"
ALN = "&"
CHR = {
# Unicode : Latex Math Symbols
# Top accents
"\u0300": "\\grave{{{0}}}",
"\u0301": "\\acute{{{0}}}",
"\u0302": "\\hat{{{0}}}",
"\u0303": "\\tilde{{{0}}}",
"\u0304": "\\bar{{{0}}}",
"\u0305": "\\overbar{{{0}}}",
"\u0306": "\\breve{{{0}}}",
"\u0307": "\\dot{{{0}}}",
"\u0308": "\\ddot{{{0}}}",
"\u0309": "\\ovhook{{{0}}}",
"\u030a": "\\ocirc{{{0}}}}",
"\u030c": "\\check{{{0}}}}",
"\u0310": "\\candra{{{0}}}",
"\u0312": "\\oturnedcomma{{{0}}}",
"\u0315": "\\ocommatopright{{{0}}}",
"\u031a": "\\droang{{{0}}}",
"\u0338": "\\not{{{0}}}",
"\u20d0": "\\leftharpoonaccent{{{0}}}",
"\u20d1": "\\rightharpoonaccent{{{0}}}",
"\u20d2": "\\vertoverlay{{{0}}}",
"\u20d6": "\\overleftarrow{{{0}}}",
"\u20d7": "\\vec{{{0}}}",
"\u20db": "\\dddot{{{0}}}",
"\u20dc": "\\ddddot{{{0}}}",
"\u20e1": "\\overleftrightarrow{{{0}}}",
"\u20e7": "\\annuity{{{0}}}",
"\u20e9": "\\widebridgeabove{{{0}}}",
"\u20f0": "\\asteraccent{{{0}}}",
# Bottom accents
"\u0330": "\\wideutilde{{{0}}}",
"\u0331": "\\underbar{{{0}}}",
"\u20e8": "\\threeunderdot{{{0}}}",
"\u20ec": "\\underrightharpoondown{{{0}}}",
"\u20ed": "\\underleftharpoondown{{{0}}}",
"\u20ee": "\\underledtarrow{{{0}}}",
"\u20ef": "\\underrightarrow{{{0}}}",
# Over | group
"\u23b4": "\\overbracket{{{0}}}",
"\u23dc": "\\overparen{{{0}}}",
"\u23de": "\\overbrace{{{0}}}",
# Under| group
"\u23b5": "\\underbracket{{{0}}}",
"\u23dd": "\\underparen{{{0}}}",
"\u23df": "\\underbrace{{{0}}}",
}
CHR_BO = {
# Big operators,
"\u2140": "\\Bbbsum",
"\u220f": "\\prod",
"\u2210": "\\coprod",
"\u2211": "\\sum",
"\u222b": "\\int",
"\u22c0": "\\bigwedge",
"\u22c1": "\\bigvee",
"\u22c2": "\\bigcap",
"\u22c3": "\\bigcup",
"\u2a00": "\\bigodot",
"\u2a01": "\\bigoplus",
"\u2a02": "\\bigotimes",
}
T = {
"\u2192": "\\rightarrow ",
# Greek letters
"\U0001d6fc": "\\alpha ",
"\U0001d6fd": "\\beta ",
"\U0001d6fe": "\\gamma ",
"\U0001d6ff": "\\theta ",
"\U0001d700": "\\epsilon ",
"\U0001d701": "\\zeta ",
"\U0001d702": "\\eta ",
"\U0001d703": "\\theta ",
"\U0001d704": "\\iota ",
"\U0001d705": "\\kappa ",
"\U0001d706": "\\lambda ",
"\U0001d707": "\\m ",
"\U0001d708": "\\n ",
"\U0001d709": "\\xi ",
"\U0001d70a": "\\omicron ",
"\U0001d70b": "\\pi ",
"\U0001d70c": "\\rho ",
"\U0001d70d": "\\varsigma ",
"\U0001d70e": "\\sigma ",
"\U0001d70f": "\\ta ",
"\U0001d710": "\\upsilon ",
"\U0001d711": "\\phi ",
"\U0001d712": "\\chi ",
"\U0001d713": "\\psi ",
"\U0001d714": "\\omega ",
"\U0001d715": "\\partial ",
"\U0001d716": "\\varepsilon ",
"\U0001d717": "\\vartheta ",
"\U0001d718": "\\varkappa ",
"\U0001d719": "\\varphi ",
"\U0001d71a": "\\varrho ",
"\U0001d71b": "\\varpi ",
# Relation symbols
"\u2190": "\\leftarrow ",
"\u2191": "\\uparrow ",
"\u2192": "\\rightarrow ",
"\u2193": "\\downright ",
"\u2194": "\\leftrightarrow ",
"\u2195": "\\updownarrow ",
"\u2196": "\\nwarrow ",
"\u2197": "\\nearrow ",
"\u2198": "\\searrow ",
"\u2199": "\\swarrow ",
"\u22ee": "\\vdots ",
"\u22ef": "\\cdots ",
"\u22f0": "\\adots ",
"\u22f1": "\\ddots ",
"\u2260": "\\ne ",
"\u2264": "\\leq ",
"\u2265": "\\geq ",
"\u2266": "\\leqq ",
"\u2267": "\\geqq ",
"\u2268": "\\lneqq ",
"\u2269": "\\gneqq ",
"\u226a": "\\ll ",
"\u226b": "\\gg ",
"\u2208": "\\in ",
"\u2209": "\\notin ",
"\u220b": "\\ni ",
"\u220c": "\\nni ",
# Ordinary symbols
"\u221e": "\\infty ",
# Binary relations
"\u00b1": "\\pm ",
"\u2213": "\\mp ",
# Italic, Latin, uppercase
"\U0001d434": "A",
"\U0001d435": "B",
"\U0001d436": "C",
"\U0001d437": "D",
"\U0001d438": "E",
"\U0001d439": "F",
"\U0001d43a": "G",
"\U0001d43b": "H",
"\U0001d43c": "I",
"\U0001d43d": "J",
"\U0001d43e": "K",
"\U0001d43f": "L",
"\U0001d440": "M",
"\U0001d441": "N",
"\U0001d442": "O",
"\U0001d443": "P",
"\U0001d444": "Q",
"\U0001d445": "R",
"\U0001d446": "S",
"\U0001d447": "T",
"\U0001d448": "U",
"\U0001d449": "V",
"\U0001d44a": "W",
"\U0001d44b": "X",
"\U0001d44c": "Y",
"\U0001d44d": "Z",
# Italic, Latin, lowercase
"\U0001d44e": "a",
"\U0001d44f": "b",
"\U0001d450": "c",
"\U0001d451": "d",
"\U0001d452": "e",
"\U0001d453": "f",
"\U0001d454": "g",
"\U0001d456": "i",
"\U0001d457": "j",
"\U0001d458": "k",
"\U0001d459": "l",
"\U0001d45a": "m",
"\U0001d45b": "n",
"\U0001d45c": "o",
"\U0001d45d": "p",
"\U0001d45e": "q",
"\U0001d45f": "r",
"\U0001d460": "s",
"\U0001d461": "t",
"\U0001d462": "u",
"\U0001d463": "v",
"\U0001d464": "w",
"\U0001d465": "x",
"\U0001d466": "y",
"\U0001d467": "z",
}
FUNC = {
"sin": "\\sin({fe})",
"cos": "\\cos({fe})",
"tan": "\\tan({fe})",
"arcsin": "\\arcsin({fe})",
"arccos": "\\arccos({fe})",
"arctan": "\\arctan({fe})",
"arccot": "\\arccot({fe})",
"sinh": "\\sinh({fe})",
"cosh": "\\cosh({fe})",
"tanh": "\\tanh({fe})",
"coth": "\\coth({fe})",
"sec": "\\sec({fe})",
"csc": "\\csc({fe})",
}
FUNC_PLACE = "{fe}"
BRK = "\\\\"
CHR_DEFAULT = {
"ACC_VAL": "\\hat{{{0}}}",
}
POS = {
"top": "\\overline{{{0}}}", # not sure
"bot": "\\underline{{{0}}}",
}
POS_DEFAULT = {
"BAR_VAL": "\\overline{{{0}}}",
}
SUB = "_{{{0}}}"
SUP = "^{{{0}}}"
F = {
"bar": "\\frac{{{num}}}{{{den}}}",
"skw": r"^{{{num}}}/_{{{den}}}",
"noBar": "\\genfrac{{}}{{}}{{0pt}}{{}}{{{num}}}{{{den}}}",
"lin": "{{{num}}}/{{{den}}}",
}
F_DEFAULT = "\\frac{{{num}}}{{{den}}}"
D = "\\left{left}{text}\\right{right}"
D_DEFAULT = {
"left": "(",
"right": ")",
"null": ".",
}
RAD = "\\sqrt[{deg}]{{{text}}}"
RAD_DEFAULT = "\\sqrt{{{text}}}"
ARR = "\\begin{{array}}{{c}}{text}\\end{{array}}"
LIM_FUNC = {
"lim": "\\lim_{{{lim}}}",
"max": "\\max_{{{lim}}}",
"min": "\\min_{{{lim}}}",
}
LIM_TO = ("\\rightarrow", "\\to")
LIM_UPP = "\\overset{{{lim}}}{{{text}}}"
M = "\\begin{{matrix}}{text}\\end{{matrix}}"

View File

@@ -1,400 +0,0 @@
# -*- coding: utf-8 -*-
"""
Office Math Markup Language (OMML)
Adapted from https://github.com/xiilei/dwml/blob/master/dwml/omml.py
On 25/03/2025
"""
from defusedxml import ElementTree as ET
from .latex_dict import (
CHARS,
CHR,
CHR_BO,
CHR_DEFAULT,
POS,
POS_DEFAULT,
SUB,
SUP,
F,
F_DEFAULT,
T,
FUNC,
D,
D_DEFAULT,
RAD,
RAD_DEFAULT,
ARR,
LIM_FUNC,
LIM_TO,
LIM_UPP,
M,
BRK,
BLANK,
BACKSLASH,
ALN,
FUNC_PLACE,
)
OMML_NS = "{http://schemas.openxmlformats.org/officeDocument/2006/math}"
def load(stream):
tree = ET.parse(stream)
for omath in tree.findall(OMML_NS + "oMath"):
yield oMath2Latex(omath)
def load_string(string):
root = ET.fromstring(string)
for omath in root.findall(OMML_NS + "oMath"):
yield oMath2Latex(omath)
def escape_latex(strs):
last = None
new_chr = []
strs = strs.replace(r"\\", "\\")
for c in strs:
if (c in CHARS) and (last != BACKSLASH):
new_chr.append(BACKSLASH + c)
else:
new_chr.append(c)
last = c
return BLANK.join(new_chr)
def get_val(key, default=None, store=CHR):
if key is not None:
return key if not store else store.get(key, key)
else:
return default
class Tag2Method(object):
def call_method(self, elm, stag=None):
getmethod = self.tag2meth.get
if stag is None:
stag = elm.tag.replace(OMML_NS, "")
method = getmethod(stag)
if method:
return method(self, elm)
else:
return None
def process_children_list(self, elm, include=None):
"""
process children of the elm,return iterable
"""
for _e in list(elm):
if OMML_NS not in _e.tag:
continue
stag = _e.tag.replace(OMML_NS, "")
if include and (stag not in include):
continue
t = self.call_method(_e, stag=stag)
if t is None:
t = self.process_unknow(_e, stag)
if t is None:
continue
yield (stag, t, _e)
def process_children_dict(self, elm, include=None):
"""
process children of the elm,return dict
"""
latex_chars = dict()
for stag, t, e in self.process_children_list(elm, include):
latex_chars[stag] = t
return latex_chars
def process_children(self, elm, include=None):
"""
process children of the elm,return string
"""
return BLANK.join(
(
t if not isinstance(t, Tag2Method) else str(t)
for stag, t, e in self.process_children_list(elm, include)
)
)
def process_unknow(self, elm, stag):
return None
class Pr(Tag2Method):
text = ""
__val_tags = ("chr", "pos", "begChr", "endChr", "type")
__innerdict = None # can't use the __dict__
""" common properties of element"""
def __init__(self, elm):
self.__innerdict = {}
self.text = self.process_children(elm)
def __str__(self):
return self.text
def __unicode__(self):
return self.__str__(self)
def __getattr__(self, name):
return self.__innerdict.get(name, None)
def do_brk(self, elm):
self.__innerdict["brk"] = BRK
return BRK
def do_common(self, elm):
stag = elm.tag.replace(OMML_NS, "")
if stag in self.__val_tags:
t = elm.get("{0}val".format(OMML_NS))
self.__innerdict[stag] = t
return None
tag2meth = {
"brk": do_brk,
"chr": do_common,
"pos": do_common,
"begChr": do_common,
"endChr": do_common,
"type": do_common,
}
class oMath2Latex(Tag2Method):
"""
Convert oMath element of omml to latex
"""
_t_dict = T
__direct_tags = ("box", "sSub", "sSup", "sSubSup", "num", "den", "deg", "e")
def __init__(self, element):
self._latex = self.process_children(element)
def __str__(self):
return self.latex
def __unicode__(self):
return self.__str__(self)
def process_unknow(self, elm, stag):
if stag in self.__direct_tags:
return self.process_children(elm)
elif stag[-2:] == "Pr":
return Pr(elm)
else:
return None
@property
def latex(self):
return self._latex
def do_acc(self, elm):
"""
the accent function
"""
c_dict = self.process_children_dict(elm)
latex_s = get_val(
c_dict["accPr"].chr, default=CHR_DEFAULT.get("ACC_VAL"), store=CHR
)
return latex_s.format(c_dict["e"])
def do_bar(self, elm):
"""
the bar function
"""
c_dict = self.process_children_dict(elm)
pr = c_dict["barPr"]
latex_s = get_val(pr.pos, default=POS_DEFAULT.get("BAR_VAL"), store=POS)
return pr.text + latex_s.format(c_dict["e"])
def do_d(self, elm):
"""
the delimiter object
"""
c_dict = self.process_children_dict(elm)
pr = c_dict["dPr"]
null = D_DEFAULT.get("null")
s_val = get_val(pr.begChr, default=D_DEFAULT.get("left"), store=T)
e_val = get_val(pr.endChr, default=D_DEFAULT.get("right"), store=T)
return pr.text + D.format(
left=null if not s_val else escape_latex(s_val),
text=c_dict["e"],
right=null if not e_val else escape_latex(e_val),
)
def do_spre(self, elm):
"""
the Pre-Sub-Superscript object -- Not support yet
"""
pass
def do_sub(self, elm):
text = self.process_children(elm)
return SUB.format(text)
def do_sup(self, elm):
text = self.process_children(elm)
return SUP.format(text)
def do_f(self, elm):
"""
the fraction object
"""
c_dict = self.process_children_dict(elm)
pr = c_dict["fPr"]
latex_s = get_val(pr.type, default=F_DEFAULT, store=F)
return pr.text + latex_s.format(num=c_dict.get("num"), den=c_dict.get("den"))
def do_func(self, elm):
"""
the Function-Apply object (Examples:sin cos)
"""
c_dict = self.process_children_dict(elm)
func_name = c_dict.get("fName")
return func_name.replace(FUNC_PLACE, c_dict.get("e"))
def do_fname(self, elm):
"""
the func name
"""
latex_chars = []
for stag, t, e in self.process_children_list(elm):
if stag == "r":
if FUNC.get(t):
latex_chars.append(FUNC[t])
else:
raise NotImplementedError("Not support func %s" % t)
else:
latex_chars.append(t)
t = BLANK.join(latex_chars)
return t if FUNC_PLACE in t else t + FUNC_PLACE # do_func will replace this
def do_groupchr(self, elm):
"""
the Group-Character object
"""
c_dict = self.process_children_dict(elm)
pr = c_dict["groupChrPr"]
latex_s = get_val(pr.chr)
return pr.text + latex_s.format(c_dict["e"])
def do_rad(self, elm):
"""
the radical object
"""
c_dict = self.process_children_dict(elm)
text = c_dict.get("e")
deg_text = c_dict.get("deg")
if deg_text:
return RAD.format(deg=deg_text, text=text)
else:
return RAD_DEFAULT.format(text=text)
def do_eqarr(self, elm):
"""
the Array object
"""
return ARR.format(
text=BRK.join(
[t for stag, t, e in self.process_children_list(elm, include=("e",))]
)
)
def do_limlow(self, elm):
"""
the Lower-Limit object
"""
t_dict = self.process_children_dict(elm, include=("e", "lim"))
latex_s = LIM_FUNC.get(t_dict["e"])
if not latex_s:
raise NotImplementedError("Not support lim %s" % t_dict["e"])
else:
return latex_s.format(lim=t_dict.get("lim"))
def do_limupp(self, elm):
"""
the Upper-Limit object
"""
t_dict = self.process_children_dict(elm, include=("e", "lim"))
return LIM_UPP.format(lim=t_dict.get("lim"), text=t_dict.get("e"))
def do_lim(self, elm):
"""
the lower limit of the limLow object and the upper limit of the limUpp function
"""
return self.process_children(elm).replace(LIM_TO[0], LIM_TO[1])
def do_m(self, elm):
"""
the Matrix object
"""
rows = []
for stag, t, e in self.process_children_list(elm):
if stag == "mPr":
pass
elif stag == "mr":
rows.append(t)
return M.format(text=BRK.join(rows))
def do_mr(self, elm):
"""
a single row of the matrix m
"""
return ALN.join(
[t for stag, t, e in self.process_children_list(elm, include=("e",))]
)
def do_nary(self, elm):
"""
the n-ary object
"""
res = []
bo = ""
for stag, t, e in self.process_children_list(elm):
if stag == "naryPr":
bo = get_val(t.chr, store=CHR_BO)
else:
res.append(t)
return bo + BLANK.join(res)
def do_r(self, elm):
"""
Get text from 'r' element,And try convert them to latex symbols
@todo text style support , (sty)
@todo \text (latex pure text support)
"""
_str = []
for s in elm.findtext("./{0}t".format(OMML_NS)):
# s = s if isinstance(s,unicode) else unicode(s,'utf-8')
_str.append(self._t_dict.get(s, s))
return escape_latex(BLANK.join(_str))
tag2meth = {
"acc": do_acc,
"r": do_r,
"bar": do_bar,
"sub": do_sub,
"sup": do_sup,
"f": do_f,
"func": do_func,
"fName": do_fname,
"groupChr": do_groupchr,
"d": do_d,
"rad": do_rad,
"eqArr": do_eqarr,
"limLow": do_limlow,
"limUpp": do_limupp,
"lim": do_lim,
"m": do_m,
"mr": do_mr,
"nary": do_nary,
}

View File

@@ -1,156 +0,0 @@
import zipfile
from io import BytesIO
from typing import BinaryIO
from xml.etree import ElementTree as ET
from bs4 import BeautifulSoup, Tag
from .math.omml import OMML_NS, oMath2Latex
MATH_ROOT_TEMPLATE = "".join(
(
"<w:document ",
'xmlns:wpc="http://schemas.microsoft.com/office/word/2010/wordprocessingCanvas" ',
'xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" ',
'xmlns:o="urn:schemas-microsoft-com:office:office" ',
'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships" ',
'xmlns:m="http://schemas.openxmlformats.org/officeDocument/2006/math" ',
'xmlns:v="urn:schemas-microsoft-com:vml" ',
'xmlns:wp14="http://schemas.microsoft.com/office/word/2010/wordprocessingDrawing" ',
'xmlns:wp="http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing" ',
'xmlns:w10="urn:schemas-microsoft-com:office:word" ',
'xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" ',
'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" ',
'xmlns:wpg="http://schemas.microsoft.com/office/word/2010/wordprocessingGroup" ',
'xmlns:wpi="http://schemas.microsoft.com/office/word/2010/wordprocessingInk" ',
'xmlns:wne="http://schemas.microsoft.com/office/word/2006/wordml" ',
'xmlns:wps="http://schemas.microsoft.com/office/word/2010/wordprocessingShape" mc:Ignorable="w14 wp14">',
"{0}</w:document>",
)
)
def _convert_omath_to_latex(tag: Tag) -> str:
"""
Converts an OMML (Office Math Markup Language) tag to LaTeX format.
Args:
tag (Tag): A BeautifulSoup Tag object representing the OMML element.
Returns:
str: The LaTeX representation of the OMML element.
"""
# Format the tag into a complete XML document string
math_root = ET.fromstring(MATH_ROOT_TEMPLATE.format(str(tag)))
# Find the 'oMath' element within the XML document
math_element = math_root.find(OMML_NS + "oMath")
# Convert the 'oMath' element to LaTeX using the oMath2Latex function
latex = oMath2Latex(math_element).latex
return latex
def _get_omath_tag_replacement(tag: Tag, block: bool = False) -> Tag:
"""
Creates a replacement tag for an OMML (Office Math Markup Language) element.
Args:
tag (Tag): A BeautifulSoup Tag object representing the "oMath" element.
block (bool, optional): If True, the LaTeX will be wrapped in double dollar signs for block mode. Defaults to False.
Returns:
Tag: A BeautifulSoup Tag object representing the replacement element.
"""
t_tag = Tag(name="w:t")
t_tag.string = (
f"$${_convert_omath_to_latex(tag)}$$"
if block
else f"${_convert_omath_to_latex(tag)}$"
)
r_tag = Tag(name="w:r")
r_tag.append(t_tag)
return r_tag
def _replace_equations(tag: Tag):
"""
Replaces OMML (Office Math Markup Language) elements with their LaTeX equivalents.
Args:
tag (Tag): A BeautifulSoup Tag object representing the OMML element. Could be either "oMathPara" or "oMath".
Raises:
ValueError: If the tag is not supported.
"""
if tag.name == "oMathPara":
# Create a new paragraph tag
p_tag = Tag(name="w:p")
# Replace each 'oMath' child tag with its LaTeX equivalent as block equations
for child_tag in tag.find_all("oMath"):
p_tag.append(_get_omath_tag_replacement(child_tag, block=True))
# Replace the original 'oMathPara' tag with the new paragraph tag
tag.replace_with(p_tag)
elif tag.name == "oMath":
# Replace the 'oMath' tag with its LaTeX equivalent as inline equation
tag.replace_with(_get_omath_tag_replacement(tag, block=False))
else:
raise ValueError(f"Not supported tag: {tag.name}")
def _pre_process_math(content: bytes) -> bytes:
"""
Pre-processes the math content in a DOCX -> XML file by converting OMML (Office Math Markup Language) elements to LaTeX.
This preprocessed content can be directly replaced in the DOCX file -> XMLs.
Args:
content (bytes): The XML content of the DOCX file as bytes.
Returns:
bytes: The processed content with OMML elements replaced by their LaTeX equivalents, encoded as bytes.
"""
soup = BeautifulSoup(content.decode(), features="xml")
for tag in soup.find_all("oMathPara"):
_replace_equations(tag)
for tag in soup.find_all("oMath"):
_replace_equations(tag)
return str(soup).encode()
def pre_process_docx(input_docx: BinaryIO) -> BinaryIO:
"""
Pre-processes a DOCX file with provided steps.
The process works by unzipping the DOCX file in memory, transforming specific XML files
(such as converting OMML elements to LaTeX), and then zipping everything back into a
DOCX file without writing to disk.
Args:
input_docx (BinaryIO): A binary input stream representing the DOCX file.
Returns:
BinaryIO: A binary output stream representing the processed DOCX file.
"""
output_docx = BytesIO()
# The files that need to be pre-processed from .docx
pre_process_enable_files = [
"word/document.xml",
"word/footnotes.xml",
"word/endnotes.xml",
]
with zipfile.ZipFile(input_docx, mode="r") as zip_input:
files = {name: zip_input.read(name) for name in zip_input.namelist()}
with zipfile.ZipFile(output_docx, mode="w") as zip_output:
zip_output.comment = zip_input.comment
for name, content in files.items():
if name in pre_process_enable_files:
try:
# Pre-process the content
updated_content = _pre_process_math(content)
# In the future, if there are more pre-processing steps, they can be added here
zip_output.writestr(name, updated_content)
except Exception:
# If there is an error in processing the content, write the original content
zip_output.writestr(name, content)
else:
zip_output.writestr(name, content)
output_docx.seek(0)
return output_docx

View File

@@ -17,12 +17,7 @@ from ._image_converter import ImageConverter
from ._audio_converter import AudioConverter
from ._outlook_msg_converter import OutlookMsgConverter
from ._zip_converter import ZipConverter
from ._doc_intel_converter import (
DocumentIntelligenceConverter,
DocumentIntelligenceFileType,
)
from ._epub_converter import EpubConverter
from ._csv_converter import CsvConverter
from ._doc_intel_converter import DocumentIntelligenceConverter
__all__ = [
"PlainTextConverter",
@@ -42,7 +37,4 @@ __all__ = [
"OutlookMsgConverter",
"ZipConverter",
"DocumentIntelligenceConverter",
"DocumentIntelligenceFileType",
"EpubConverter",
"CsvConverter",
]

View File

@@ -1,4 +1,5 @@
from typing import Any, BinaryIO
import io
from typing import Any, BinaryIO, Optional
from ._exiftool import exiftool_metadata
from ._transcribe_audio import transcribe_audio

View File

@@ -1,8 +1,8 @@
import io
import re
import base64
import binascii
from urllib.parse import parse_qs, urlparse
from typing import Any, BinaryIO
from typing import Any, BinaryIO, Optional
from bs4 import BeautifulSoup
from .._base_converter import DocumentConverter, DocumentConverterResult
@@ -60,8 +60,6 @@ class BingSerpConverter(DocumentConverter):
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
assert stream_info.url is not None
# Parse the query parameters
parsed_params = parse_qs(urlparse(stream_info.url).query)
query = parsed_params.get("q", [""])[0]
@@ -78,12 +76,9 @@ class BingSerpConverter(DocumentConverter):
slug.extract()
# Parse the algorithmic results
_markdownify = _CustomMarkdownify(**kwargs)
_markdownify = _CustomMarkdownify()
results = list()
for result in soup.find_all(class_="b_algo"):
if not hasattr(result, "find_all"):
continue
# Rewrite redirect urls
for a in result.find_all("a", href=True):
parsed_href = urlparse(a["href"])

View File

@@ -1,77 +0,0 @@
import csv
import io
from typing import BinaryIO, Any
from charset_normalizer import from_bytes
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/csv",
"application/csv",
]
ACCEPTED_FILE_EXTENSIONS = [".csv"]
class CsvConverter(DocumentConverter):
"""
Converts CSV files to Markdown tables.
"""
def __init__(self):
super().__init__()
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Read the file content
if stream_info.charset:
content = file_stream.read().decode(stream_info.charset)
else:
content = str(from_bytes(file_stream.read()).best())
# Parse CSV content
reader = csv.reader(io.StringIO(content))
rows = list(reader)
if not rows:
return DocumentConverterResult(markdown="")
# Create markdown table
markdown_table = []
# Add header row
markdown_table.append("| " + " | ".join(rows[0]) + " |")
# Add separator row
markdown_table.append("| " + " | ".join(["---"] * len(rows[0])) + " |")
# Add data rows
for row in rows[1:]:
# Make sure row has the same number of columns as header
while len(row) < len(rows[0]):
row.append("")
# Truncate if row has more columns than header
row = row[: len(rows[0])]
markdown_table.append("| " + " | ".join(row) + " |")
result = "\n".join(markdown_table)
return DocumentConverterResult(markdown=result)

View File

@@ -1,12 +1,12 @@
import sys
import re
import os
from typing import BinaryIO, Any, List
from enum import Enum
from typing import BinaryIO, Any, List
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
# Try loading optional (but in this case, required) dependencies
# Save reporting of any exceptions for later
@@ -18,108 +18,49 @@ try:
AnalyzeResult,
DocumentAnalysisFeature,
)
from azure.core.credentials import AzureKeyCredential, TokenCredential
from azure.identity import DefaultAzureCredential
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
# Define these types for type hinting when the package is not available
class AzureKeyCredential:
pass
class TokenCredential:
pass
class DocumentIntelligenceClient:
pass
class AnalyzeDocumentRequest:
pass
class AnalyzeResult:
pass
class DocumentAnalysisFeature:
pass
class DefaultAzureCredential:
pass
# TODO: currently, there is a bug in the document intelligence SDK with importing the "ContentFormat" enum.
# This constant is a temporary fix until the bug is resolved.
CONTENT_FORMAT = "markdown"
class DocumentIntelligenceFileType(str, Enum):
"""Enum of file types supported by the Document Intelligence Converter."""
OFFICE_MIME_TYPE_PREFIXES = [
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.openxmlformats-officedocument.presentationml",
"application/xhtml",
"text/html",
]
# No OCR
DOCX = "docx"
PPTX = "pptx"
XLSX = "xlsx"
HTML = "html"
# OCR
PDF = "pdf"
JPEG = "jpeg"
PNG = "png"
BMP = "bmp"
TIFF = "tiff"
OTHER_MIME_TYPE_PREFIXES = [
"application/pdf",
"application/x-pdf",
"text/html",
"image/",
]
OFFICE_FILE_EXTENSIONS = [
".docx",
".xlsx",
".pptx",
".html",
".htm",
]
def _get_mime_type_prefixes(types: List[DocumentIntelligenceFileType]) -> List[str]:
"""Get the MIME type prefixes for the given file types."""
prefixes: List[str] = []
for type_ in types:
if type_ == DocumentIntelligenceFileType.DOCX:
prefixes.append(
"application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
elif type_ == DocumentIntelligenceFileType.PPTX:
prefixes.append(
"application/vnd.openxmlformats-officedocument.presentationml"
)
elif type_ == DocumentIntelligenceFileType.XLSX:
prefixes.append(
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
elif type_ == DocumentIntelligenceFileType.PDF:
prefixes.append("application/pdf")
prefixes.append("application/x-pdf")
elif type_ == DocumentIntelligenceFileType.JPEG:
prefixes.append("image/jpeg")
elif type_ == DocumentIntelligenceFileType.PNG:
prefixes.append("image/png")
elif type_ == DocumentIntelligenceFileType.BMP:
prefixes.append("image/bmp")
elif type_ == DocumentIntelligenceFileType.TIFF:
prefixes.append("image/tiff")
return prefixes
def _get_file_extensions(types: List[DocumentIntelligenceFileType]) -> List[str]:
"""Get the file extensions for the given file types."""
extensions: List[str] = []
for type_ in types:
if type_ == DocumentIntelligenceFileType.DOCX:
extensions.append(".docx")
elif type_ == DocumentIntelligenceFileType.PPTX:
extensions.append(".pptx")
elif type_ == DocumentIntelligenceFileType.XLSX:
extensions.append(".xlsx")
elif type_ == DocumentIntelligenceFileType.PDF:
extensions.append(".pdf")
elif type_ == DocumentIntelligenceFileType.JPEG:
extensions.append(".jpg")
extensions.append(".jpeg")
elif type_ == DocumentIntelligenceFileType.PNG:
extensions.append(".png")
elif type_ == DocumentIntelligenceFileType.BMP:
extensions.append(".bmp")
elif type_ == DocumentIntelligenceFileType.TIFF:
extensions.append(".tiff")
return extensions
OTHER_FILE_EXTENSIONS = [
".pdf",
".jpeg",
".jpg",
".png",
".bmp",
".tiff",
".heif",
]
class DocumentIntelligenceConverter(DocumentConverter):
@@ -130,30 +71,8 @@ class DocumentIntelligenceConverter(DocumentConverter):
*,
endpoint: str,
api_version: str = "2024-07-31-preview",
credential: AzureKeyCredential | TokenCredential | None = None,
file_types: List[DocumentIntelligenceFileType] = [
DocumentIntelligenceFileType.DOCX,
DocumentIntelligenceFileType.PPTX,
DocumentIntelligenceFileType.XLSX,
DocumentIntelligenceFileType.PDF,
DocumentIntelligenceFileType.JPEG,
DocumentIntelligenceFileType.PNG,
DocumentIntelligenceFileType.BMP,
DocumentIntelligenceFileType.TIFF,
],
):
"""
Initialize the DocumentIntelligenceConverter.
Args:
endpoint (str): The endpoint for the Document Intelligence service.
api_version (str): The API version to use. Defaults to "2024-07-31-preview".
credential (AzureKeyCredential | TokenCredential | None): The credential to use for authentication.
file_types (List[DocumentIntelligenceFileType]): The file types to accept. Defaults to all supported file types.
"""
super().__init__()
self._file_types = file_types
# Raise an error if the dependencies are not available.
# This is different than other converters since this one isn't even instantiated
@@ -167,18 +86,12 @@ class DocumentIntelligenceConverter(DocumentConverter):
_dependency_exc_info[2]
)
if credential is None:
if os.environ.get("AZURE_API_KEY") is None:
credential = DefaultAzureCredential()
else:
credential = AzureKeyCredential(os.environ["AZURE_API_KEY"])
self.endpoint = endpoint
self.api_version = api_version
self.doc_intel_client = DocumentIntelligenceClient(
endpoint=self.endpoint,
api_version=self.api_version,
credential=credential,
credential=DefaultAzureCredential(),
)
def accepts(
@@ -190,10 +103,10 @@ class DocumentIntelligenceConverter(DocumentConverter):
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in _get_file_extensions(self._file_types):
if extension in OFFICE_FILE_EXTENSIONS + OTHER_FILE_EXTENSIONS:
return True
for prefix in _get_mime_type_prefixes(self._file_types):
for prefix in OFFICE_MIME_TYPE_PREFIXES + OTHER_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
@@ -208,18 +121,10 @@ class DocumentIntelligenceConverter(DocumentConverter):
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
# Types that don't support ocr
no_ocr_types = [
DocumentIntelligenceFileType.DOCX,
DocumentIntelligenceFileType.PPTX,
DocumentIntelligenceFileType.XLSX,
DocumentIntelligenceFileType.HTML,
]
if extension in _get_file_extensions(no_ocr_types):
if extension in OFFICE_FILE_EXTENSIONS:
return []
for prefix in _get_mime_type_prefixes(no_ocr_types):
for prefix in OFFICE_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return []

View File

@@ -3,8 +3,7 @@ import sys
from typing import BinaryIO, Any
from ._html_converter import HtmlConverter
from ..converter_utils.docx.pre_process import pre_process_docx
from .._base_converter import DocumentConverterResult
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
@@ -73,8 +72,6 @@ class DocxConverter(HtmlConverter):
)
style_map = kwargs.get("style_map", None)
pre_process_stream = pre_process_docx(file_stream)
return self._html_converter.convert_string(
mammoth.convert_to_html(pre_process_stream, style_map=style_map).value,
**kwargs,
mammoth.convert_to_html(file_stream, style_map=style_map).value
)

View File

@@ -1,146 +0,0 @@
import os
import zipfile
from defusedxml import minidom
from xml.dom.minidom import Document
from typing import BinaryIO, Any, Dict, List
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverterResult
from .._stream_info import StreamInfo
ACCEPTED_MIME_TYPE_PREFIXES = [
"application/epub",
"application/epub+zip",
"application/x-epub+zip",
]
ACCEPTED_FILE_EXTENSIONS = [".epub"]
MIME_TYPE_MAPPING = {
".html": "text/html",
".xhtml": "application/xhtml+xml",
}
class EpubConverter(HtmlConverter):
"""
Converts EPUB files to Markdown. Style information (e.g.m headings) and tables are preserved where possible.
"""
def __init__(self):
super().__init__()
self._html_converter = HtmlConverter()
def accepts(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> bool:
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return True
return False
def convert(
self,
file_stream: BinaryIO,
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
with zipfile.ZipFile(file_stream, "r") as z:
# Extracts metadata (title, authors, language, publisher, date, description, cover) from an EPUB file."""
# Locate content.opf
container_dom = minidom.parse(z.open("META-INF/container.xml"))
opf_path = container_dom.getElementsByTagName("rootfile")[0].getAttribute(
"full-path"
)
# Parse content.opf
opf_dom = minidom.parse(z.open(opf_path))
metadata: Dict[str, Any] = {
"title": self._get_text_from_node(opf_dom, "dc:title"),
"authors": self._get_all_texts_from_nodes(opf_dom, "dc:creator"),
"language": self._get_text_from_node(opf_dom, "dc:language"),
"publisher": self._get_text_from_node(opf_dom, "dc:publisher"),
"date": self._get_text_from_node(opf_dom, "dc:date"),
"description": self._get_text_from_node(opf_dom, "dc:description"),
"identifier": self._get_text_from_node(opf_dom, "dc:identifier"),
}
# Extract manifest items (ID → href mapping)
manifest = {
item.getAttribute("id"): item.getAttribute("href")
for item in opf_dom.getElementsByTagName("item")
}
# Extract spine order (ID refs)
spine_items = opf_dom.getElementsByTagName("itemref")
spine_order = [item.getAttribute("idref") for item in spine_items]
# Convert spine order to actual file paths
base_path = "/".join(
opf_path.split("/")[:-1]
) # Get base directory of content.opf
spine = [
f"{base_path}/{manifest[item_id]}" if base_path else manifest[item_id]
for item_id in spine_order
if item_id in manifest
]
# Extract and convert the content
markdown_content: List[str] = []
for file in spine:
if file in z.namelist():
with z.open(file) as f:
filename = os.path.basename(file)
extension = os.path.splitext(filename)[1].lower()
mimetype = MIME_TYPE_MAPPING.get(extension)
converted_content = self._html_converter.convert(
f,
StreamInfo(
mimetype=mimetype,
extension=extension,
filename=filename,
),
)
markdown_content.append(converted_content.markdown.strip())
# Format and add the metadata
metadata_markdown = []
for key, value in metadata.items():
if isinstance(value, list):
value = ", ".join(value)
if value:
metadata_markdown.append(f"**{key.capitalize()}:** {value}")
markdown_content.insert(0, "\n".join(metadata_markdown))
return DocumentConverterResult(
markdown="\n\n".join(markdown_content), title=metadata["title"]
)
def _get_text_from_node(self, dom: Document, tag_name: str) -> str | None:
"""Convenience function to extract a single occurrence of a tag (e.g., title)."""
texts = self._get_all_texts_from_nodes(dom, tag_name)
if len(texts) > 0:
return texts[0]
else:
return None
def _get_all_texts_from_nodes(self, dom: Document, tag_name: str) -> List[str]:
"""Helper function to extract all occurrences of a tag (e.g., multiple authors)."""
texts: List[str] = []
for node in dom.getElementsByTagName(tag_name):
if node.firstChild and hasattr(node.firstChild, "nodeValue"):
texts.append(node.firstChild.nodeValue.strip())
return texts

View File

@@ -1,6 +1,10 @@
import json
import subprocess
import locale
import sys
import shutil
import os
import warnings
from typing import BinaryIO, Any, Union

View File

@@ -56,9 +56,9 @@ class HtmlConverter(DocumentConverter):
body_elm = soup.find("body")
webpage_text = ""
if body_elm:
webpage_text = _CustomMarkdownify(**kwargs).convert_soup(body_elm)
webpage_text = _CustomMarkdownify().convert_soup(body_elm)
else:
webpage_text = _CustomMarkdownify(**kwargs).convert_soup(soup)
webpage_text = _CustomMarkdownify().convert_soup(soup)
assert isinstance(webpage_text, str)

View File

@@ -50,6 +50,8 @@ class IpynbConverter(DocumentConverter):
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
# Parse and convert the notebook
result = None
encoding = stream_info.charset or "utf-8"
notebook_content = file_stream.read().decode(encoding=encoding)
return self._convert(json.loads(notebook_content))

View File

@@ -1,4 +1,4 @@
from typing import BinaryIO, Union
from typing import BinaryIO, Any, Union
import base64
import mimetypes
from .._stream_info import StreamInfo

View File

@@ -17,7 +17,6 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
def __init__(self, **options: Any):
options["heading_style"] = options.get("heading_style", markdownify.ATX)
options["keep_data_uris"] = options.get("keep_data_uris", False)
# Explicitly cast options to the expected type if necessary
super().__init__(**options)
@@ -102,7 +101,7 @@ class _CustomMarkdownify(markdownify.MarkdownConverter):
return alt
# Remove dataURIs
if src.startswith("data:") and not self.options["keep_data_uris"]:
if src.startswith("data:"):
src = src.split(",")[0] + "..."
return "![%s](%s%s)" % (alt, src, title_part)

View File

@@ -9,7 +9,7 @@ from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
_dependency_exc_info = None
olefile = None
try:
import olefile # type: ignore[no-redef]
import olefile
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
@@ -56,13 +56,12 @@ class OutlookMsgConverter(DocumentConverter):
# Brue force, check if it's an Outlook file
try:
if olefile is not None:
msg = olefile.OleFileIO(file_stream)
toc = "\n".join([str(stream) for stream in msg.listdir()])
return (
"__properties_version1.0" in toc
and "__recip_version1.0_#00000000" in toc
)
msg = olefile.OleFileIO(file_stream)
toc = "\n".join([str(stream) for stream in msg.listdir()])
return (
"__properties_version1.0" in toc
and "__recip_version1.0_#00000000" in toc
)
except Exception as e:
pass
finally:
@@ -90,11 +89,7 @@ class OutlookMsgConverter(DocumentConverter):
_dependency_exc_info[2]
)
assert (
olefile is not None
) # If we made it this far, olefile should be available
msg = olefile.OleFileIO(file_stream)
# Extract email metadata
md_content = "# Email Message\n\n"
@@ -126,7 +121,6 @@ class OutlookMsgConverter(DocumentConverter):
def _get_stream_data(self, msg: Any, stream_path: str) -> Union[str, None]:
"""Helper to safely extract and decode stream data from the MSG file."""
assert olefile is not None
assert isinstance(
msg, olefile.OleFileIO
) # Ensure msg is of the correct type (type hinting is not possible with the optional olefile package)

View File

@@ -4,6 +4,7 @@ import io
from typing import BinaryIO, Any
from ._html_converter import HtmlConverter
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE

View File

@@ -9,7 +9,7 @@ from .._stream_info import StreamInfo
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
import mammoth # noqa: F401
import mammoth
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()
@@ -17,16 +17,12 @@ except ImportError:
ACCEPTED_MIME_TYPE_PREFIXES = [
"text/",
"application/json",
"application/markdown",
]
ACCEPTED_FILE_EXTENSIONS = [
".txt",
".text",
".md",
".markdown",
".json",
".jsonl",
# Mimetypes to ignore (commonly confused extensions)
IGNORE_MIME_TYPE_PREFIXES = [
"text/vnd.in3d.spot", # .spo wich is confused with xls, doc, etc.
"text/vnd.graphviz", # .dot which is confused with xls, doc, etc.
]
@@ -42,14 +38,9 @@ class PlainTextConverter(DocumentConverter):
mimetype = (stream_info.mimetype or "").lower()
extension = (stream_info.extension or "").lower()
# If we have a charset, we can safely assume it's text
# With Magika in the earlier stages, this handles most cases
if stream_info.charset is not None:
return True
# Otherwise, check the mimetype and extension
if extension in ACCEPTED_FILE_EXTENSIONS:
return True
for prefix in IGNORE_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):
return False
for prefix in ACCEPTED_MIME_TYPE_PREFIXES:
if mimetype.startswith(prefix):

View File

@@ -140,20 +140,13 @@ class PptxConverter(DocumentConverter):
alt_text = re.sub(r"[\r\n\[\]]", " ", alt_text)
alt_text = re.sub(r"\s+", " ", alt_text).strip()
# If keep_data_uris is True, use base64 encoding for images
if kwargs.get("keep_data_uris", False):
blob = shape.image.blob
content_type = shape.image.content_type or "image/png"
b64_string = base64.b64encode(blob).decode("utf-8")
md_content += f"\n![{alt_text}](data:{content_type};base64,{b64_string})\n"
else:
# A placeholder name
filename = re.sub(r"\W", "", shape.name) + ".jpg"
md_content += "\n![" + alt_text + "](" + filename + ")\n"
# A placeholder name
filename = re.sub(r"\W", "", shape.name) + ".jpg"
md_content += "\n![" + alt_text + "](" + filename + ")\n"
# Tables
if self._is_table(shape):
md_content += self._convert_table_to_markdown(shape.table, **kwargs)
md_content += self._convert_table_to_markdown(shape.table)
# Charts
if shape.has_chart:
@@ -200,7 +193,7 @@ class PptxConverter(DocumentConverter):
return True
return False
def _convert_table_to_markdown(self, table, **kwargs):
def _convert_table_to_markdown(self, table):
# Write the table as HTML, then convert it to Markdown
html_table = "<html><body><table>"
first_row = True
@@ -215,38 +208,27 @@ class PptxConverter(DocumentConverter):
first_row = False
html_table += "</table></body></html>"
return (
self._html_converter.convert_string(html_table, **kwargs).markdown.strip()
+ "\n"
)
return self._html_converter.convert_string(html_table).markdown.strip() + "\n"
def _convert_chart_to_markdown(self, chart):
try:
md = "\n\n### Chart"
if chart.has_title:
md += f": {chart.chart_title.text_frame.text}"
md += "\n\n"
data = []
category_names = [c.label for c in chart.plots[0].categories]
series_names = [s.name for s in chart.series]
data.append(["Category"] + series_names)
md = "\n\n### Chart"
if chart.has_title:
md += f": {chart.chart_title.text_frame.text}"
md += "\n\n"
data = []
category_names = [c.label for c in chart.plots[0].categories]
series_names = [s.name for s in chart.series]
data.append(["Category"] + series_names)
for idx, category in enumerate(category_names):
row = [category]
for series in chart.series:
row.append(series.values[idx])
data.append(row)
for idx, category in enumerate(category_names):
row = [category]
for series in chart.series:
row.append(series.values[idx])
data.append(row)
markdown_table = []
for row in data:
markdown_table.append("| " + " | ".join(map(str, row)) + " |")
header = markdown_table[0]
separator = "|" + "|".join(["---"] * len(data[0])) + "|"
return md + "\n".join([header, separator] + markdown_table[1:])
except ValueError as e:
# Handle the specific error for unsupported chart types
if "unsupported plot type" in str(e):
return "\n\n[unsupported chart]\n\n"
except Exception:
# Catch any other exceptions that might occur
return "\n\n[unsupported chart]\n\n"
markdown_table = []
for row in data:
markdown_table.append("| " + " | ".join(map(str, row)) + " |")
header = markdown_table[0]
separator = "|" + "|".join(["---"] * len(data[0])) + "|"
return md + "\n".join([header, separator] + markdown_table[1:])

View File

@@ -1,5 +1,4 @@
from defusedxml import minidom
from xml.dom.minidom import Document, Element
from xml.dom import minidom
from typing import BinaryIO, Any, Union
from bs4 import BeautifulSoup
@@ -9,9 +8,7 @@ from .._base_converter import DocumentConverter, DocumentConverterResult
PRECISE_MIME_TYPE_PREFIXES = [
"application/rss",
"application/rss+xml",
"application/atom",
"application/atom+xml",
]
PRECISE_FILE_EXTENSIONS = [".rss", ".atom"]
@@ -29,10 +26,6 @@ CANDIDATE_FILE_EXTENSIONS = [
class RssConverter(DocumentConverter):
"""Convert RSS / Atom type to markdown"""
def __init__(self):
super().__init__()
self._kwargs = {}
def accepts(
self,
file_stream: BinaryIO,
@@ -71,7 +64,7 @@ class RssConverter(DocumentConverter):
file_stream.seek(cur_pos)
return False
def _feed_type(self, doc: Any) -> str | None:
def _feed_type(self, doc: Any) -> str:
if doc.getElementsByTagName("rss"):
return "rss"
elif doc.getElementsByTagName("feed"):
@@ -87,7 +80,6 @@ class RssConverter(DocumentConverter):
stream_info: StreamInfo,
**kwargs: Any, # Options to pass to the converter
) -> DocumentConverterResult:
self._kwargs = kwargs
doc = minidom.parse(file_stream)
feed_type = self._feed_type(doc)
@@ -98,7 +90,7 @@ class RssConverter(DocumentConverter):
else:
raise ValueError("Unknown feed type")
def _parse_atom_type(self, doc: Document) -> DocumentConverterResult:
def _parse_atom_type(self, doc: minidom.Document) -> DocumentConverterResult:
"""Parse the type of an Atom feed.
Returns None if the feed type is not recognized or something goes wrong.
@@ -130,16 +122,16 @@ class RssConverter(DocumentConverter):
title=title,
)
def _parse_rss_type(self, doc: Document) -> DocumentConverterResult:
def _parse_rss_type(self, doc: minidom.Document) -> DocumentConverterResult:
"""Parse the type of an RSS feed.
Returns None if the feed type is not recognized or something goes wrong.
"""
root = doc.getElementsByTagName("rss")[0]
channel_list = root.getElementsByTagName("channel")
if not channel_list:
raise ValueError("No channel found in RSS feed")
channel = channel_list[0]
channel = root.getElementsByTagName("channel")
if not channel:
return None
channel = channel[0]
channel_title = self._get_data_by_tag_name(channel, "title")
channel_description = self._get_data_by_tag_name(channel, "description")
items = channel.getElementsByTagName("item")
@@ -147,6 +139,8 @@ class RssConverter(DocumentConverter):
md_text = f"# {channel_title}\n"
if channel_description:
md_text += f"{channel_description}\n"
if not items:
items = []
for item in items:
title = self._get_data_by_tag_name(item, "title")
description = self._get_data_by_tag_name(item, "description")
@@ -172,12 +166,12 @@ class RssConverter(DocumentConverter):
try:
# using bs4 because many RSS feeds have HTML-styled content
soup = BeautifulSoup(content, "html.parser")
return _CustomMarkdownify(**self._kwargs).convert_soup(soup)
return _CustomMarkdownify().convert_soup(soup)
except BaseException as _:
return content
def _get_data_by_tag_name(
self, element: Element, tag_name: str
self, element: minidom.Element, tag_name: str
) -> Union[str, None]:
"""Get data from first child element with the given tag name.
Returns None when no such element is found.
@@ -187,6 +181,5 @@ class RssConverter(DocumentConverter):
return None
fc = nodes[0].firstChild
if fc:
if hasattr(fc, "data"):
return fc.data
return fc.data
return None

View File

@@ -7,14 +7,8 @@ from .._exceptions import MissingDependencyException
# Save reporting of any exceptions for later
_dependency_exc_info = None
try:
# Suppress some warnings on library import
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=SyntaxWarning)
import speech_recognition as sr
import pydub
import speech_recognition as sr
import pydub
except ImportError:
# Preserve the error and stack trace for later
_dependency_exc_info = sys.exc_info()

View File

@@ -1,6 +1,7 @@
import io
import re
import bs4
from typing import Any, BinaryIO
from typing import Any, BinaryIO, Optional
from bs4 import BeautifulSoup
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
@@ -56,7 +57,7 @@ class WikipediaConverter(DocumentConverter):
) -> DocumentConverterResult:
# Parse the stream
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
# Remove javascript and style blocks
for script in soup(["script", "style"]):
@@ -71,15 +72,16 @@ class WikipediaConverter(DocumentConverter):
if body_elm:
# What's the title
if title_elm and isinstance(title_elm, bs4.Tag):
main_title = title_elm.string
if title_elm and len(title_elm) > 0:
main_title = title_elm.string # type: ignore
assert isinstance(main_title, str)
# Convert the page
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify(
**kwargs
).convert_soup(body_elm)
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
body_elm
)
else:
webpage_text = _CustomMarkdownify(**kwargs).convert_soup(soup)
webpage_text = _CustomMarkdownify().convert_soup(soup)
return DocumentConverterResult(
markdown=webpage_text,

View File

@@ -10,14 +10,14 @@ from .._stream_info import StreamInfo
_xlsx_dependency_exc_info = None
try:
import pandas as pd
import openpyxl # noqa: F401
import openpyxl
except ImportError:
_xlsx_dependency_exc_info = sys.exc_info()
_xls_dependency_exc_info = None
try:
import pandas as pd # noqa: F811
import xlrd # noqa: F401
import pandas as pd
import xlrd
except ImportError:
_xls_dependency_exc_info = sys.exc_info()
@@ -86,9 +86,7 @@ class XlsxConverter(DocumentConverter):
md_content += f"## {s}\n"
html_content = sheets[s].to_html(index=False)
md_content += (
self._html_converter.convert_string(
html_content, **kwargs
).markdown.strip()
self._html_converter.convert_string(html_content).markdown.strip()
+ "\n\n"
)
@@ -148,9 +146,7 @@ class XlsConverter(DocumentConverter):
md_content += f"## {s}\n"
html_content = sheets[s].to_html(index=False)
md_content += (
self._html_converter.convert_string(
html_content, **kwargs
).markdown.strip()
self._html_converter.convert_string(html_content).markdown.strip()
+ "\n\n"
)

View File

@@ -1,22 +1,19 @@
import sys
import json
import time
import io
import re
import bs4
from typing import Any, BinaryIO, Dict, List, Union
from typing import Any, BinaryIO, Optional, Dict, List, Union
from urllib.parse import parse_qs, urlparse, unquote
from bs4 import BeautifulSoup
from .._base_converter import DocumentConverter, DocumentConverterResult
from .._stream_info import StreamInfo
from ._markdownify import _CustomMarkdownify
# Optional YouTube transcription support
try:
# Suppress some warnings on library import
import warnings
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=SyntaxWarning)
# Patch submitted upstream to fix the SyntaxWarning
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api import YouTubeTranscriptApi
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
except ModuleNotFoundError:
@@ -75,31 +72,21 @@ class YouTubeConverter(DocumentConverter):
) -> DocumentConverterResult:
# Parse the stream
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
# Read the meta tags
metadata: Dict[str, str] = {}
if soup.title and soup.title.string:
metadata["title"] = soup.title.string
metadata: Dict[str, str] = {"title": soup.title.string}
for meta in soup(["meta"]):
if not isinstance(meta, bs4.Tag):
continue
for a in meta.attrs:
if a in ["itemprop", "property", "name"]:
key = str(meta.get(a, ""))
content = str(meta.get("content", ""))
if key and content: # Only add non-empty content
metadata[key] = content
content = meta.get("content", "")
if content: # Only add non-empty content
metadata[meta[a]] = content
break
# Try reading the description
try:
for script in soup(["script"]):
if not isinstance(script, bs4.Tag):
continue
if not script.string: # Skip empty scripts
continue
content = script.string
@@ -145,50 +132,36 @@ class YouTubeConverter(DocumentConverter):
webpage_text += f"\n### Description\n{description}\n"
if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
ytt_api = YouTubeTranscriptApi()
transcript_text = ""
parsed_url = urlparse(stream_info.url) # type: ignore
params = parse_qs(parsed_url.query) # type: ignore
if "v" in params and params["v"][0]:
video_id = str(params["v"][0])
transcript_list = ytt_api.list(video_id)
languages = ["en"]
for transcript in transcript_list:
languages.append(transcript.language_code)
break
try:
youtube_transcript_languages = kwargs.get(
"youtube_transcript_languages", languages
"youtube_transcript_languages", ("en",)
)
# Retry the transcript fetching operation
transcript = self._retry_operation(
lambda: ytt_api.fetch(
lambda: YouTubeTranscriptApi.get_transcript(
video_id, languages=youtube_transcript_languages
),
retries=3, # Retry 3 times
delay=2, # 2 seconds delay between retries
)
if transcript:
transcript_text = " ".join(
[part.text for part in transcript]
[part["text"] for part in transcript]
) # type: ignore
# Alternative formatting:
# formatter = TextFormatter()
# formatter.format_transcript(transcript)
except Exception as e:
# No transcript available
if len(languages) == 1:
print(f"Error fetching transcript: {e}")
else:
# Translate transcript into first kwarg
transcript = (
transcript_list.find_transcript(languages)
.translate(youtube_transcript_languages[0])
.fetch()
)
transcript_text = " ".join([part.text for part in transcript])
print(f"Error fetching transcript: {e}")
if transcript_text:
webpage_text += f"\n### Transcript\n{transcript_text}\n"
title = title if title else (soup.title.string if soup.title else "")
title = title if title else soup.title.string
assert isinstance(title, str)
return DocumentConverterResult(

View File

@@ -1,3 +1,4 @@
import sys
import zipfile
import io
import os

View File

@@ -1,279 +0,0 @@
import dataclasses
from typing import List
@dataclasses.dataclass(frozen=True, kw_only=True)
class FileTestVector(object):
filename: str
mimetype: str | None
charset: str | None
url: str | None
must_include: List[str]
must_not_include: List[str]
GENERAL_TEST_VECTORS = [
FileTestVector(
filename="test.docx",
mimetype="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
charset=None,
url=None,
must_include=[
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
"## d666f1f7-46cb-42bd-9a39-9a39cf2a509f",
"# Abstract",
"# Introduction",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
"data:image/png;base64...",
],
must_not_include=[
"",
],
),
FileTestVector(
filename="test.xlsx",
mimetype="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
charset=None,
url=None,
must_include=[
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
],
must_not_include=[],
),
FileTestVector(
filename="test.xls",
mimetype="application/vnd.ms-excel",
charset=None,
url=None,
must_include=[
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
],
must_not_include=[],
),
FileTestVector(
filename="test.pptx",
mimetype="application/vnd.openxmlformats-officedocument.presentationml.presentation",
charset=None,
url=None,
must_include=[
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
"44bf7d06-5e7a-4a40-a2e1-a2e42ef28c8a",
"1b92870d-e3b5-4e65-8153-919f4ff45592",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
"a3f6004b-6f4f-4ea8-bee3-3741f4dc385f", # chart title
"2003", # chart value
"![This phrase of the caption is Human-written.](Picture4.jpg)",
],
must_not_include=[""],
),
FileTestVector(
filename="test_outlook_msg.msg",
mimetype="application/vnd.ms-outlook",
charset=None,
url=None,
must_include=[
"# Email Message",
"**From:** test.sender@example.com",
"**To:** test.recipient@example.com",
"**Subject:** Test Email Message",
"## Content",
"This is the body of the test email message",
],
must_not_include=[],
),
FileTestVector(
filename="test.pdf",
mimetype="application/pdf",
charset=None,
url=None,
must_include=[
"While there is contemporaneous exploration of multi-agent approaches"
],
must_not_include=[],
),
FileTestVector(
filename="test_blog.html",
mimetype="text/html",
charset="utf-8",
url="https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math",
must_include=[
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
],
must_not_include=[],
),
FileTestVector(
filename="test_wikipedia.html",
mimetype="text/html",
charset="utf-8",
url="https://en.wikipedia.org/wiki/Microsoft",
must_include=[
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")',
],
must_not_include=[
"You are encouraged to create an account and log in",
"154 languages",
"move to sidebar",
],
),
FileTestVector(
filename="test_serp.html",
mimetype="text/html",
charset="utf-8",
url="https://www.bing.com/search?q=microsoft+wikipedia",
must_include=[
"](https://en.wikipedia.org/wiki/Microsoft",
"Microsoft Corporation is **an American multinational corporation and technology company headquartered** in Redmond",
"19952007: Foray into the Web, Windows 95, Windows XP, and Xbox",
],
must_not_include=[
"https://www.bing.com/ck/a?!&&p=",
"",
],
must_not_include=[
"data:image/png;base64...",
],
),
FileTestVector(
filename="test.pptx",
mimetype="application/vnd.openxmlformats-officedocument.presentationml.presentation",
charset=None,
url=None,
must_include=[
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
"44bf7d06-5e7a-4a40-a2e1-a2e42ef28c8a",
"1b92870d-e3b5-4e65-8153-919f4ff45592",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
"a3f6004b-6f4f-4ea8-bee3-3741f4dc385f", # chart title
"2003", # chart value
"![This phrase of the caption is Human-written.]", # image caption
"",
],
must_not_include=[
"![This phrase of the caption is Human-written.](Picture4.jpg)",
],
),
]

View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3 -m pytest
import os
import subprocess
import pytest
from markitdown import __version__
try:
from .test_markitdown import TEST_FILES_DIR, DOCX_TEST_STRINGS
except ImportError:
from test_markitdown import TEST_FILES_DIR, DOCX_TEST_STRINGS # type: ignore
@pytest.fixture(scope="session")
def shared_tmp_dir(tmp_path_factory):
return tmp_path_factory.mktemp("pytest_tmp")
def test_version(shared_tmp_dir) -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--version"], capture_output=True, text=True
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert __version__ in result.stdout, f"Version not found in output: {result.stdout}"
def test_invalid_flag(shared_tmp_dir) -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--foobar"], capture_output=True, text=True
)
assert result.returncode != 0, f"CLI exited with error: {result.stderr}"
assert (
"unrecognized arguments" in result.stderr
), f"Expected 'unrecognized arguments' to appear in STDERR"
assert "SYNTAX" in result.stderr, f"Expected 'SYNTAX' to appear in STDERR"
def test_output_to_stdout(shared_tmp_dir) -> None:
# DOC X
result = subprocess.run(
["python", "-m", "markitdown", os.path.join(TEST_FILES_DIR, "test.docx")],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
for test_string in DOCX_TEST_STRINGS:
assert (
test_string in result.stdout
), f"Expected string not found in output: {test_string}"
def test_output_to_file(shared_tmp_dir) -> None:
# DOC X, flag -o at the end
docx_output_file_1 = os.path.join(shared_tmp_dir, "test_docx_1.md")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
os.path.join(TEST_FILES_DIR, "test.docx"),
"-o",
docx_output_file_1,
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(
docx_output_file_1
), f"Output file not created: {docx_output_file_1}"
with open(docx_output_file_1, "r") as f:
output = f.read()
for test_string in DOCX_TEST_STRINGS:
assert (
test_string in output
), f"Expected string not found in output: {test_string}"
# DOC X, flag -o at the beginning
docx_output_file_2 = os.path.join(shared_tmp_dir, "test_docx_2.md")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
"-o",
docx_output_file_2,
os.path.join(TEST_FILES_DIR, "test.docx"),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(
docx_output_file_2
), f"Output file not created: {docx_output_file_2}"
with open(docx_output_file_2, "r") as f:
output = f.read()
for test_string in DOCX_TEST_STRINGS:
assert (
test_string in output
), f"Expected string not found in output: {test_string}"
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
import tempfile
with tempfile.TemporaryDirectory() as tmp_dir:
test_version(tmp_dir)
test_invalid_flag(tmp_dir)
test_output_to_stdout(tmp_dir)
test_output_to_file(tmp_dir)
print("All tests passed!")

View File

@@ -1,34 +0,0 @@
#!/usr/bin/env python3 -m pytest
import subprocess
from markitdown import __version__
# This file contains CLI tests that are not directly tested by the FileTestVectors.
# This includes things like help messages, version numbers, and invalid flags.
def test_version() -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--version"], capture_output=True, text=True
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert __version__ in result.stdout, f"Version not found in output: {result.stdout}"
def test_invalid_flag() -> None:
result = subprocess.run(
["python", "-m", "markitdown", "--foobar"], capture_output=True, text=True
)
assert result.returncode != 0, f"CLI exited with error: {result.stderr}"
assert (
"unrecognized arguments" in result.stderr
), "Expected 'unrecognized arguments' to appear in STDERR"
assert "SYNTAX" in result.stderr, "Expected 'SYNTAX' to appear in STDERR"
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
test_version()
test_invalid_flag()
print("All tests passed!")

View File

@@ -1,217 +0,0 @@
#!/usr/bin/env python3 -m pytest
import os
import time
import pytest
import subprocess
import locale
from typing import List
if __name__ == "__main__":
from _test_vectors import (
GENERAL_TEST_VECTORS,
DATA_URI_TEST_VECTORS,
FileTestVector,
)
else:
from ._test_vectors import (
GENERAL_TEST_VECTORS,
DATA_URI_TEST_VECTORS,
FileTestVector,
)
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
) # Don't run these tests in CI
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "test_files")
TEST_FILES_URL = "https://raw.githubusercontent.com/microsoft/markitdown/refs/heads/main/packages/markitdown/tests/test_files"
# Prepare CLI test vectors (remove vectors that require mockig the url)
CLI_TEST_VECTORS: List[FileTestVector] = []
for test_vector in GENERAL_TEST_VECTORS:
if test_vector.url is not None:
continue
CLI_TEST_VECTORS.append(test_vector)
@pytest.fixture(scope="session")
def shared_tmp_dir(tmp_path_factory):
return tmp_path_factory.mktemp("pytest_tmp")
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_output_to_stdout(shared_tmp_dir, test_vector) -> None:
"""Test that the CLI outputs to stdout correctly."""
result = subprocess.run(
[
"python",
"-m",
"markitdown",
os.path.join(TEST_FILES_DIR, test_vector.filename),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
for test_string in test_vector.must_include:
assert test_string in result.stdout
for test_string in test_vector.must_not_include:
assert test_string not in result.stdout
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_output_to_file(shared_tmp_dir, test_vector) -> None:
"""Test that the CLI outputs to a file correctly."""
output_file = os.path.join(shared_tmp_dir, test_vector.filename + ".output")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
"-o",
output_file,
os.path.join(TEST_FILES_DIR, test_vector.filename),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(output_file), f"Output file not created: {output_file}"
with open(output_file, "r") as f:
output_data = f.read()
for test_string in test_vector.must_include:
assert test_string in output_data
for test_string in test_vector.must_not_include:
assert test_string not in output_data
os.remove(output_file)
assert not os.path.exists(output_file), f"Output file not deleted: {output_file}"
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_input_from_stdin_without_hints(shared_tmp_dir, test_vector) -> None:
"""Test that the CLI readds from stdin correctly."""
test_input = b""
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
test_input = stream.read()
result = subprocess.run(
[
"python",
"-m",
"markitdown",
os.path.join(TEST_FILES_DIR, test_vector.filename),
],
input=test_input,
capture_output=True,
text=False,
)
stdout = result.stdout.decode(locale.getpreferredencoding())
assert (
result.returncode == 0
), f"CLI exited with error: {result.stderr.decode('utf-8')}"
for test_string in test_vector.must_include:
assert test_string in stdout
for test_string in test_vector.must_not_include:
assert test_string not in stdout
@pytest.mark.skipif(
skip_remote,
reason="do not run tests that query external urls",
)
@pytest.mark.parametrize("test_vector", CLI_TEST_VECTORS)
def test_convert_url(shared_tmp_dir, test_vector):
"""Test the conversion of a stream with no stream info."""
# Note: tmp_dir is not used here, but is needed to match the signature
time.sleep(1) # Ensure we don't hit rate limits
result = subprocess.run(
["python", "-m", "markitdown", TEST_FILES_URL + "/" + test_vector.filename],
capture_output=True,
text=False,
)
stdout = result.stdout.decode(locale.getpreferredencoding())
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
for test_string in test_vector.must_include:
assert test_string in stdout
for test_string in test_vector.must_not_include:
assert test_string not in stdout
@pytest.mark.parametrize("test_vector", DATA_URI_TEST_VECTORS)
def test_output_to_file_with_data_uris(shared_tmp_dir, test_vector) -> None:
"""Test CLI functionality when keep_data_uris is enabled"""
output_file = os.path.join(shared_tmp_dir, test_vector.filename + ".output")
result = subprocess.run(
[
"python",
"-m",
"markitdown",
"--keep-data-uris",
"-o",
output_file,
os.path.join(TEST_FILES_DIR, test_vector.filename),
],
capture_output=True,
text=True,
)
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
assert os.path.exists(output_file), f"Output file not created: {output_file}"
with open(output_file, "r") as f:
output_data = f.read()
for test_string in test_vector.must_include:
assert test_string in output_data
for test_string in test_vector.must_not_include:
assert test_string not in output_data
os.remove(output_file)
assert not os.path.exists(output_file), f"Output file not deleted: {output_file}"
if __name__ == "__main__":
import tempfile
"""Runs this file's tests from the command line."""
with tempfile.TemporaryDirectory() as tmp_dir:
# General tests
for test_function in [
test_output_to_stdout,
test_output_to_file,
test_input_from_stdin_without_hints,
test_convert_url,
]:
for test_vector in CLI_TEST_VECTORS:
print(
f"Running {test_function.__name__} on {test_vector.filename}...",
end="",
)
test_function(tmp_dir, test_vector)
print("OK")
# Data URI tests
for test_function in [
test_output_to_file_with_data_uris,
]:
for test_vector in DATA_URI_TEST_VECTORS:
print(
f"Running {test_function.__name__} on {test_vector.filename}...",
end="",
)
test_function(tmp_dir, test_vector)
print("OK")
print("All tests passed!")

BIN
packages/markitdown/tests/test_files/test.docx vendored Executable file → Normal file

Binary file not shown.

Binary file not shown.

View File

@@ -1,11 +1,11 @@
#!/usr/bin/env python3 -m pytest
import io
import os
import re
import shutil
import pytest
import openai
from markitdown._uri_utils import parse_data_uri, file_uri_to_path
import pytest
import requests
from markitdown import (
MarkItDown,
@@ -13,10 +13,7 @@ from markitdown import (
FileConversionException,
StreamInfo,
)
# This file contains module tests that are not directly tested by the FileTestVectors.
# This includes things like helper functions and runtime conversion options
# (e.g., LLM clients, exiftool path, transcription services, etc.)
from markitdown._stream_info import _guess_stream_info_from_stream
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
@@ -63,6 +60,36 @@ YOUTUBE_TEST_STRINGS = [
"the model we're going to be using today is GPT 3.5 turbo", # From the transcript
]
XLSX_TEST_STRINGS = [
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
]
XLS_TEST_STRINGS = [
"## 09060124-b5e7-4717-9d07-3c046eb",
"6ff4173b-42a5-4784-9b19-f49caff4d93d",
"affc7dad-52dc-4b98-9b5d-51e65d8a8ad0",
]
DOCX_TEST_STRINGS = [
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
"## d666f1f7-46cb-42bd-9a39-9a39cf2a509f",
"# Abstract",
"# Introduction",
"AutoGen: Enabling Next-Gen LLM Applications via Multi-Agent Conversation",
]
MSG_TEST_STRINGS = [
"# Email Message",
"**From:** test.sender@example.com",
"**To:** test.recipient@example.com",
"**Subject:** Test Email Message",
"## Content",
"This is the body of the test email message",
]
DOCX_COMMENT_TEST_STRINGS = [
"314b0a30-5b04-470b-b9f7-eed2c2bec74a",
"49e168b7-d2ae-407f-a055-2167576f39a1",
@@ -74,16 +101,6 @@ DOCX_COMMENT_TEST_STRINGS = [
"Yet another comment in the doc. 55yiyi-asd09",
]
BLOG_TEST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_TEST_STRINGS = [
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
]
LLM_TEST_STRINGS = [
"5bda1dd6",
]
PPTX_TEST_STRINGS = [
"2cdda5c8-e50e-4db4-b5f0-9722a649f455",
"04191ea8-5c73-4215-a1d3-1cfb43aaaf12",
@@ -94,6 +111,57 @@ PPTX_TEST_STRINGS = [
"2003", # chart value
]
BLOG_TEST_URL = "https://microsoft.github.io/autogen/blog/2023/04/21/LLM-tuning-math"
BLOG_TEST_STRINGS = [
"Large language models (LLMs) are powerful tools that can generate natural language texts for various applications, such as chatbots, summarization, translation, and more. GPT-4 is currently the state of the art LLM in the world. Is model selection irrelevant? What about inference parameters?",
"an example where high cost can easily prevent a generic complex",
]
RSS_TEST_STRINGS = [
"The Official Microsoft Blog",
"In the case of AI, it is absolutely true that the industry is moving incredibly fast",
]
WIKIPEDIA_TEST_URL = "https://en.wikipedia.org/wiki/Microsoft"
WIKIPEDIA_TEST_STRINGS = [
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
'Microsoft was founded by [Bill Gates](/wiki/Bill_Gates "Bill Gates")',
]
WIKIPEDIA_TEST_EXCLUDES = [
"You are encouraged to create an account and log in",
"154 languages",
"move to sidebar",
]
SERP_TEST_URL = "https://www.bing.com/search?q=microsoft+wikipedia"
SERP_TEST_STRINGS = [
"](https://en.wikipedia.org/wiki/Microsoft",
"Microsoft Corporation is **an American multinational corporation and technology company headquartered** in Redmond",
"19952007: Foray into the Web, Windows 95, Windows XP, and Xbox",
]
SERP_TEST_EXCLUDES = [
"https://www.bing.com/ck/a?!&&p=",
""
mime_type, attributes, data = parse_data_uri(data_uri)
assert mime_type == "text/plain"
assert len(attributes) == 0
assert data == b"Hello, World!"
def test_stream_info_guesses() -> None:
"""Test StreamInfo guesses based on stream content."""
data_uri = "data:base64,SGVsbG8sIFdvcmxkIQ=="
mime_type, attributes, data = parse_data_uri(data_uri)
assert mime_type is None
assert len(attributes) == 0
assert data == b"Hello, World!"
test_tuples = [
(
os.path.join(TEST_FILES_DIR, "test.xlsx"),
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
),
(
os.path.join(TEST_FILES_DIR, "test.docx"),
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
),
(
os.path.join(TEST_FILES_DIR, "test.pptx"),
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
),
(os.path.join(TEST_FILES_DIR, "test.xls"), "application/vnd.ms-excel"),
]
data_uri = "data:text/plain;charset=utf-8;base64,SGVsbG8sIFdvcmxkIQ=="
mime_type, attributes, data = parse_data_uri(data_uri)
assert mime_type == "text/plain"
assert len(attributes) == 1
assert attributes["charset"] == "utf-8"
assert data == b"Hello, World!"
data_uri = "data:,Hello%2C%20World%21"
mime_type, attributes, data = parse_data_uri(data_uri)
assert mime_type is None
assert len(attributes) == 0
assert data == b"Hello, World!"
data_uri = "data:text/plain,Hello%2C%20World%21"
mime_type, attributes, data = parse_data_uri(data_uri)
assert mime_type == "text/plain"
assert len(attributes) == 0
assert data == b"Hello, World!"
data_uri = "data:text/plain;charset=utf-8,Hello%2C%20World%21"
mime_type, attributes, data = parse_data_uri(data_uri)
assert mime_type == "text/plain"
assert len(attributes) == 1
assert attributes["charset"] == "utf-8"
assert data == b"Hello, World!"
def test_file_uris() -> None:
# Test file URI with an empty host
file_uri = "file:///path/to/file.txt"
netloc, path = file_uri_to_path(file_uri)
assert netloc is None
assert path == "/path/to/file.txt"
# Test file URI with no host
file_uri = "file:/path/to/file.txt"
netloc, path = file_uri_to_path(file_uri)
assert netloc is None
assert path == "/path/to/file.txt"
# Test file URI with localhost
file_uri = "file://localhost/path/to/file.txt"
netloc, path = file_uri_to_path(file_uri)
assert netloc == "localhost"
assert path == "/path/to/file.txt"
# Test file URI with query parameters
file_uri = "file:///path/to/file.txt?param=value"
netloc, path = file_uri_to_path(file_uri)
assert netloc is None
assert path == "/path/to/file.txt"
# Test file URI with fragment
file_uri = "file:///path/to/file.txt#fragment"
netloc, path = file_uri_to_path(file_uri)
assert netloc is None
assert path == "/path/to/file.txt"
def test_docx_comments() -> None:
# Test DOCX processing, with comments and setting style_map on init
markitdown_with_style_map = MarkItDown(style_map="comment-reference => ")
result = markitdown_with_style_map.convert(
os.path.join(TEST_FILES_DIR, "test_with_comment.docx")
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
def test_docx_equations() -> None:
markitdown = MarkItDown()
docx_file = os.path.join(TEST_FILES_DIR, "equations.docx")
result = markitdown.convert(docx_file)
# Check for inline equation m=1 (wrapped with single $) is present
assert "$m=1$" in result.text_content, "Inline equation $m=1$ not found"
# Find block equations wrapped with double $$ and check if they are present
block_equations = re.findall(r"\$\$(.+?)\$\$", result.text_content)
assert block_equations, "No block equations found in the document."
def test_input_as_strings() -> None:
markitdown = MarkItDown()
# Test input from a stream
input_data = b"<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
# Test input with leading blank characters
input_data = b" \n\n\n<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
for file_path, expected_mimetype in test_tuples:
with open(file_path, "rb") as f:
guesses = _guess_stream_info_from_stream(
f, filename_hint=os.path.basename(file_path)
)
assert len(guesses) > 0
assert guesses[0].mimetype == expected_mimetype
assert guesses[0].extension == os.path.splitext(file_path)[1]
@pytest.mark.skipif(
@@ -299,12 +287,194 @@ def test_markitdown_remote() -> None:
for test_string in PDF_TEST_STRINGS:
assert test_string in result.text_content
# By stream
response = requests.get(PDF_TEST_URL)
result = markitdown.convert_stream(
io.BytesIO(response.content), file_extension=".pdf", url=PDF_TEST_URL
)
for test_string in PDF_TEST_STRINGS:
assert test_string in result.text_content
# Youtube
result = markitdown.convert(YOUTUBE_TEST_URL)
for test_string in YOUTUBE_TEST_STRINGS:
assert test_string in result.text_content
def test_markitdown_local() -> None:
markitdown = MarkItDown()
# Test PDF processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pdf"))
validate_strings(result, PDF_TEST_STRINGS)
# Test XLSX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.xlsx"))
validate_strings(result, XLSX_TEST_STRINGS)
# Test XLS processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.xls"))
for test_string in XLS_TEST_STRINGS:
text_content = result.text_content.replace("\\", "")
assert test_string in text_content
# Test DOCX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.docx"))
validate_strings(result, DOCX_TEST_STRINGS)
# Test DOCX processing, with comments
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_with_comment.docx"),
style_map="comment-reference => ",
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test DOCX processing, with comments and setting style_map on init
markitdown_with_style_map = MarkItDown(style_map="comment-reference => ")
result = markitdown_with_style_map.convert(
os.path.join(TEST_FILES_DIR, "test_with_comment.docx")
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test PPTX processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.pptx"))
validate_strings(result, PPTX_TEST_STRINGS)
# Test HTML processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_blog.html"), url=BLOG_TEST_URL
)
validate_strings(result, BLOG_TEST_STRINGS)
# Test Wikipedia processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), url=WIKIPEDIA_TEST_URL
)
text_content = result.text_content.replace("\\", "")
validate_strings(result, WIKIPEDIA_TEST_STRINGS, WIKIPEDIA_TEST_EXCLUDES)
# Test Bing processing
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, "test_serp.html"), url=SERP_TEST_URL
)
text_content = result.text_content.replace("\\", "")
validate_strings(result, SERP_TEST_STRINGS, SERP_TEST_EXCLUDES)
# Test RSS processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_rss.xml"))
text_content = result.text_content.replace("\\", "")
for test_string in RSS_TEST_STRINGS:
assert test_string in text_content
# Test MSG (Outlook email) processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_outlook_msg.msg"))
validate_strings(result, MSG_TEST_STRINGS)
# Test non-UTF-8 encoding
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv"))
validate_strings(result, CSV_CP932_TEST_STRINGS)
# Test JSON processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test.json"))
validate_strings(result, JSON_TEST_STRINGS)
# # Test ZIP file processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_files.zip"))
validate_strings(result, DOCX_TEST_STRINGS)
validate_strings(result, XLSX_TEST_STRINGS)
validate_strings(result, BLOG_TEST_STRINGS)
# Test input from a stream
input_data = b"<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
# Test input with leading blank characters
input_data = b" \n\n\n<html><body><h1>Test</h1></body></html>"
result = markitdown.convert_stream(io.BytesIO(input_data))
assert "# Test" in result.text_content
def test_markitdown_streams() -> None:
markitdown = MarkItDown()
# Test PDF processing
with open(os.path.join(TEST_FILES_DIR, "test.pdf"), "rb") as f:
result = markitdown.convert(f, file_extension=".pdf")
validate_strings(result, PDF_TEST_STRINGS)
# Test XLSX processing
with open(os.path.join(TEST_FILES_DIR, "test.xlsx"), "rb") as f:
result = markitdown.convert(f, file_extension=".xlsx")
validate_strings(result, XLSX_TEST_STRINGS)
# Test XLS processing
with open(os.path.join(TEST_FILES_DIR, "test.xls"), "rb") as f:
result = markitdown.convert(f, file_extension=".xls")
for test_string in XLS_TEST_STRINGS:
text_content = result.text_content.replace("\\", "")
assert test_string in text_content
# Test DOCX processing
with open(os.path.join(TEST_FILES_DIR, "test.docx"), "rb") as f:
result = markitdown.convert(f, file_extension=".docx")
validate_strings(result, DOCX_TEST_STRINGS)
# Test DOCX processing, with comments
with open(os.path.join(TEST_FILES_DIR, "test_with_comment.docx"), "rb") as f:
result = markitdown.convert(
f,
file_extension=".docx",
style_map="comment-reference => ",
)
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test DOCX processing, with comments and setting style_map on init
markitdown_with_style_map = MarkItDown(style_map="comment-reference => ")
with open(os.path.join(TEST_FILES_DIR, "test_with_comment.docx"), "rb") as f:
result = markitdown_with_style_map.convert(f, file_extension=".docx")
validate_strings(result, DOCX_COMMENT_TEST_STRINGS)
# Test PPTX processing
with open(os.path.join(TEST_FILES_DIR, "test.pptx"), "rb") as f:
result = markitdown.convert(f, file_extension=".pptx")
validate_strings(result, PPTX_TEST_STRINGS)
# Test HTML processing
with open(os.path.join(TEST_FILES_DIR, "test_blog.html"), "rb") as f:
result = markitdown.convert(f, file_extension=".html", url=BLOG_TEST_URL)
validate_strings(result, BLOG_TEST_STRINGS)
# Test Wikipedia processing
with open(os.path.join(TEST_FILES_DIR, "test_wikipedia.html"), "rb") as f:
result = markitdown.convert(f, file_extension=".html", url=WIKIPEDIA_TEST_URL)
text_content = result.text_content.replace("\\", "")
validate_strings(result, WIKIPEDIA_TEST_STRINGS, WIKIPEDIA_TEST_EXCLUDES)
# Test Bing processing
with open(os.path.join(TEST_FILES_DIR, "test_serp.html"), "rb") as f:
result = markitdown.convert(f, file_extension=".html", url=SERP_TEST_URL)
text_content = result.text_content.replace("\\", "")
validate_strings(result, SERP_TEST_STRINGS, SERP_TEST_EXCLUDES)
# Test RSS processing
with open(os.path.join(TEST_FILES_DIR, "test_rss.xml"), "rb") as f:
result = markitdown.convert(f, file_extension=".xml")
text_content = result.text_content.replace("\\", "")
for test_string in RSS_TEST_STRINGS:
assert test_string in text_content
# Test MSG (Outlook email) processing
with open(os.path.join(TEST_FILES_DIR, "test_outlook_msg.msg"), "rb") as f:
result = markitdown.convert(f, file_extension=".msg")
validate_strings(result, MSG_TEST_STRINGS)
# Test JSON processing
with open(os.path.join(TEST_FILES_DIR, "test.json"), "rb") as f:
result = markitdown.convert(f, file_extension=".json")
validate_strings(result, JSON_TEST_STRINGS)
@pytest.mark.skipif(
skip_remote,
reason="do not run remotely run speech transcription tests",
@@ -398,19 +568,13 @@ def test_markitdown_llm() -> None:
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
for test in [
test_stream_info_operations,
test_data_uris,
test_file_uris,
test_docx_comments,
test_input_as_strings,
test_markitdown_remote,
test_speech_transcription,
test_exceptions,
test_markitdown_exiftool,
test_markitdown_llm,
]:
print(f"Running {test.__name__}...", end="")
test()
print("OK")
test_stream_info_operations()
test_stream_info_guesses()
test_markitdown_remote()
test_markitdown_local()
test_markitdown_streams()
test_speech_transcription()
test_exceptions()
test_markitdown_exiftool()
test_markitdown_llm()
print("All tests passed!")

View File

@@ -1,234 +0,0 @@
#!/usr/bin/env python3 -m pytest
import os
import time
import pytest
import base64
from pathlib import Path
if __name__ == "__main__":
from _test_vectors import GENERAL_TEST_VECTORS, DATA_URI_TEST_VECTORS
else:
from ._test_vectors import GENERAL_TEST_VECTORS, DATA_URI_TEST_VECTORS
from markitdown import (
MarkItDown,
StreamInfo,
)
skip_remote = (
True if os.environ.get("GITHUB_ACTIONS") else False
) # Don't run these tests in CI
TEST_FILES_DIR = os.path.join(os.path.dirname(__file__), "test_files")
TEST_FILES_URL = "https://raw.githubusercontent.com/microsoft/markitdown/refs/heads/main/packages/markitdown/tests/test_files"
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_guess_stream_info(test_vector):
"""Test the ability to guess stream info."""
markitdown = MarkItDown()
local_path = os.path.join(TEST_FILES_DIR, test_vector.filename)
expected_extension = os.path.splitext(test_vector.filename)[1]
with open(local_path, "rb") as stream:
guesses = markitdown._get_stream_info_guesses(
stream,
base_guess=StreamInfo(
filename=os.path.basename(test_vector.filename),
local_path=local_path,
extension=expected_extension,
),
)
# For some limited exceptions, we can't guarantee the exact
# mimetype or extension, so we'll special-case them here.
if test_vector.filename in [
"test_outlook_msg.msg",
]:
return
assert guesses[0].mimetype == test_vector.mimetype
assert guesses[0].extension == expected_extension
assert guesses[0].charset == test_vector.charset
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_local(test_vector):
"""Test the conversion of a local file."""
markitdown = MarkItDown()
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, test_vector.filename), url=test_vector.url
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_stream_with_hints(test_vector):
"""Test the conversion of a stream with full stream info."""
markitdown = MarkItDown()
stream_info = StreamInfo(
extension=os.path.splitext(test_vector.filename)[1],
mimetype=test_vector.mimetype,
charset=test_vector.charset,
)
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
result = markitdown.convert(
stream, stream_info=stream_info, url=test_vector.url
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_stream_without_hints(test_vector):
"""Test the conversion of a stream with no stream info."""
markitdown = MarkItDown()
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
result = markitdown.convert(stream, url=test_vector.url)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.skipif(
skip_remote,
reason="do not run tests that query external urls",
)
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_http_uri(test_vector):
"""Test the conversion of an HTTP:// or HTTPS:// URI."""
markitdown = MarkItDown()
time.sleep(1) # Ensure we don't hit rate limits
result = markitdown.convert(
TEST_FILES_URL + "/" + test_vector.filename,
url=test_vector.url, # Mock where this file would be found
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_file_uri(test_vector):
"""Test the conversion of a file:// URI."""
markitdown = MarkItDown()
result = markitdown.convert(
Path(os.path.join(TEST_FILES_DIR, test_vector.filename)).as_uri(),
url=test_vector.url,
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", GENERAL_TEST_VECTORS)
def test_convert_data_uri(test_vector):
"""Test the conversion of a data URI."""
markitdown = MarkItDown()
data = ""
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
data = base64.b64encode(stream.read()).decode("utf-8")
mimetype = test_vector.mimetype
data_uri = f"data:{mimetype};base64,{data}"
result = markitdown.convert(
data_uri,
url=test_vector.url,
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", DATA_URI_TEST_VECTORS)
def test_convert_keep_data_uris(test_vector):
"""Test API functionality when keep_data_uris is enabled"""
markitdown = MarkItDown()
# Test local file conversion
result = markitdown.convert(
os.path.join(TEST_FILES_DIR, test_vector.filename),
keep_data_uris=True,
url=test_vector.url,
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
@pytest.mark.parametrize("test_vector", DATA_URI_TEST_VECTORS)
def test_convert_stream_keep_data_uris(test_vector):
"""Test the conversion of a stream with no stream info."""
markitdown = MarkItDown()
stream_info = StreamInfo(
extension=os.path.splitext(test_vector.filename)[1],
mimetype=test_vector.mimetype,
charset=test_vector.charset,
)
with open(os.path.join(TEST_FILES_DIR, test_vector.filename), "rb") as stream:
result = markitdown.convert(
stream, stream_info=stream_info, keep_data_uris=True, url=test_vector.url
)
for string in test_vector.must_include:
assert string in result.markdown
for string in test_vector.must_not_include:
assert string not in result.markdown
if __name__ == "__main__":
"""Runs this file's tests from the command line."""
# General tests
for test_function in [
test_guess_stream_info,
test_convert_local,
test_convert_stream_with_hints,
test_convert_stream_without_hints,
test_convert_http_uri,
test_convert_file_uri,
test_convert_data_uri,
]:
for test_vector in GENERAL_TEST_VECTORS:
print(
f"Running {test_function.__name__} on {test_vector.filename}...", end=""
)
test_function(test_vector)
print("OK")
# Data URI tests
for test_function in [
test_convert_keep_data_uris,
test_convert_stream_keep_data_uris,
]:
for test_vector in DATA_URI_TEST_VECTORS:
print(
f"Running {test_function.__name__} on {test_vector.filename}...", end=""
)
test_function(test_vector)
print("OK")
print("All tests passed!")