Merge pull request #97 from Soulter/main

feat: Add RSSConverter
This commit is contained in:
afourney
2024-12-17 15:34:22 -08:00
committed by GitHub
3 changed files with 153 additions and 0 deletions

View File

@@ -13,6 +13,7 @@ import sys
import tempfile import tempfile
import traceback import traceback
import zipfile import zipfile
from xml.dom import minidom
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse
from warnings import warn, resetwarnings, catch_warnings from warnings import warn, resetwarnings, catch_warnings
@@ -222,6 +223,143 @@ class HtmlConverter(DocumentConverter):
) )
class RSSConverter(DocumentConverter):
"""Convert RSS / Atom type to markdown"""
def convert(
self, local_path: str, **kwargs
) -> Union[None, DocumentConverterResult]:
# Bail if not RSS type
extension = kwargs.get("file_extension", "")
if extension.lower() not in [".xml", ".rss", ".atom"]:
return None
try:
doc = minidom.parse(local_path)
except BaseException as _:
return None
result = None
if doc.getElementsByTagName("rss"):
# A RSS feed must have a root element of <rss>
result = self._parse_rss_type(doc)
elif doc.getElementsByTagName("feed"):
root = doc.getElementsByTagName("feed")[0]
if root.getElementsByTagName("entry"):
# An Atom feed must have a root element of <feed> and at least one <entry>
result = self._parse_atom_type(doc)
else:
return None
else:
# not rss or atom
return None
return result
def _parse_atom_type(
self, doc: minidom.Document
) -> Union[None, DocumentConverterResult]:
"""Parse the type of an Atom feed.
Returns None if the feed type is not recognized or something goes wrong.
"""
try:
root = doc.getElementsByTagName("feed")[0]
title = self._get_data_by_tag_name(root, "title")
subtitle = self._get_data_by_tag_name(root, "subtitle")
entries = root.getElementsByTagName("entry")
md_text = f"# {title}\n"
if subtitle:
md_text += f"{subtitle}\n"
for entry in entries:
entry_title = self._get_data_by_tag_name(entry, "title")
entry_summary = self._get_data_by_tag_name(entry, "summary")
entry_updated = self._get_data_by_tag_name(entry, "updated")
entry_content = self._get_data_by_tag_name(entry, "content")
if entry_title:
md_text += f"\n## {entry_title}\n"
if entry_updated:
md_text += f"Updated on: {entry_updated}\n"
if entry_summary:
md_text += self._parse_content(entry_summary)
if entry_content:
md_text += self._parse_content(entry_content)
return DocumentConverterResult(
title=title,
text_content=md_text,
)
except BaseException as _:
return None
def _parse_rss_type(
self, doc: minidom.Document
) -> Union[None, DocumentConverterResult]:
"""Parse the type of an RSS feed.
Returns None if the feed type is not recognized or something goes wrong.
"""
try:
root = doc.getElementsByTagName("rss")[0]
channel = root.getElementsByTagName("channel")
if not channel:
return None
channel = channel[0]
channel_title = self._get_data_by_tag_name(channel, "title")
channel_description = self._get_data_by_tag_name(channel, "description")
items = channel.getElementsByTagName("item")
if channel_title:
md_text = f"# {channel_title}\n"
if channel_description:
md_text += f"{channel_description}\n"
if not items:
items = []
for item in items:
title = self._get_data_by_tag_name(item, "title")
description = self._get_data_by_tag_name(item, "description")
pubDate = self._get_data_by_tag_name(item, "pubDate")
content = self._get_data_by_tag_name(item, "content:encoded")
if title:
md_text += f"\n## {title}\n"
if pubDate:
md_text += f"Published on: {pubDate}\n"
if description:
md_text += self._parse_content(description)
if content:
md_text += self._parse_content(content)
return DocumentConverterResult(
title=channel_title,
text_content=md_text,
)
except BaseException as _:
print(traceback.format_exc())
return None
def _parse_content(self, content: str) -> str:
"""Parse the content of an RSS feed item"""
try:
# using bs4 because many RSS feeds have HTML-styled content
soup = BeautifulSoup(content, "html.parser")
return _CustomMarkdownify().convert_soup(soup)
except BaseException as _:
return content
def _get_data_by_tag_name(
self, element: minidom.Element, tag_name: str
) -> Union[str, None]:
"""Get data from first child element with the given tag name.
Returns None when no such element is found.
"""
nodes = element.getElementsByTagName(tag_name)
if not nodes:
return None
fc = nodes[0].firstChild
if fc:
return fc.data
return None
class WikipediaConverter(DocumentConverter): class WikipediaConverter(DocumentConverter):
"""Handle Wikipedia pages separately, focusing only on the main document content.""" """Handle Wikipedia pages separately, focusing only on the main document content."""
@@ -1061,6 +1199,7 @@ class MarkItDown:
# To this end, the most specific converters should appear below the most generic converters # To this end, the most specific converters should appear below the most generic converters
self.register_page_converter(PlainTextConverter()) self.register_page_converter(PlainTextConverter())
self.register_page_converter(HtmlConverter()) self.register_page_converter(HtmlConverter())
self.register_page_converter(RSSConverter())
self.register_page_converter(WikipediaConverter()) self.register_page_converter(WikipediaConverter())
self.register_page_converter(YouTubeConverter()) self.register_page_converter(YouTubeConverter())
self.register_page_converter(BingSerpConverter()) self.register_page_converter(BingSerpConverter())

1
tests/test_files/test_rss.xml vendored Normal file

File diff suppressed because one or more lines are too long

View File

@@ -90,6 +90,13 @@ BLOG_TEST_STRINGS = [
"an example where high cost can easily prevent a generic complex", "an example where high cost can easily prevent a generic complex",
] ]
RSS_TEST_STRINGS = [
"The Official Microsoft Blog",
"In the case of AI, it is absolutely true that the industry is moving incredibly fast",
]
WIKIPEDIA_TEST_URL = "https://en.wikipedia.org/wiki/Microsoft" WIKIPEDIA_TEST_URL = "https://en.wikipedia.org/wiki/Microsoft"
WIKIPEDIA_TEST_STRINGS = [ WIKIPEDIA_TEST_STRINGS = [
"Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]", "Microsoft entered the operating system (OS) business in 1980 with its own version of [Unix]",
@@ -224,6 +231,12 @@ def test_markitdown_local() -> None:
for test_string in SERP_TEST_STRINGS: for test_string in SERP_TEST_STRINGS:
assert test_string in text_content assert test_string in text_content
# Test RSS processing
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_rss.xml"))
text_content = result.text_content.replace("\\", "")
for test_string in RSS_TEST_STRINGS:
assert test_string in text_content
## Test non-UTF-8 encoding ## Test non-UTF-8 encoding
result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv")) result = markitdown.convert(os.path.join(TEST_FILES_DIR, "test_mskanji.csv"))
text_content = result.text_content.replace("\\", "") text_content = result.text_content.replace("\\", "")