feat: add rss converter
This commit is contained in:
@@ -13,6 +13,7 @@ import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
import zipfile
|
||||
from xml.dom import minidom
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from urllib.parse import parse_qs, quote, unquote, urlparse, urlunparse
|
||||
from warnings import catch_warnings
|
||||
@@ -219,6 +220,83 @@ class HtmlConverter(DocumentConverter):
|
||||
text_content=webpage_text,
|
||||
)
|
||||
|
||||
class RSSConverter(DocumentConverter):
|
||||
"""Convert RSS type to markdown"""
|
||||
|
||||
def convert(
|
||||
self, local_path: str, **kwargs
|
||||
) -> Union[None, DocumentConverterResult]:
|
||||
# Bail if not RSS type
|
||||
extension = kwargs.get("file_extension", "")
|
||||
if extension.lower() not in [".xml", ".rss"]:
|
||||
return None
|
||||
try:
|
||||
doc = minidom.parse(local_path)
|
||||
except BaseException as _:
|
||||
return None
|
||||
if not doc.getElementsByTagName("rss"):
|
||||
# A RSS feed must have a root element of <rss>
|
||||
return None
|
||||
|
||||
try:
|
||||
root = doc.getElementsByTagName("rss")[0]
|
||||
channel = root.getElementsByTagName("channel")
|
||||
if not channel:
|
||||
return None
|
||||
channel = channel[0]
|
||||
channel_title = self._get_data_by_tag_name(channel, "title")
|
||||
channel_description = self._get_data_by_tag_name(channel, "description")
|
||||
items = channel.getElementsByTagName("item")
|
||||
if channel_title:
|
||||
md_text = f"# {channel_title}\n"
|
||||
if channel_description:
|
||||
md_text += f"{channel_description}\n"
|
||||
for item in items:
|
||||
title = self._get_data_by_tag_name(item, "title")
|
||||
description = self._get_data_by_tag_name(item, "description")
|
||||
pubDate = self._get_data_by_tag_name(item, "pubDate")
|
||||
content = self._get_data_by_tag_name(item, "content:encoded")
|
||||
|
||||
if title:
|
||||
md_text += f"## {title}\n"
|
||||
if pubDate:
|
||||
md_text += f"Published on: {pubDate}\n"
|
||||
if description:
|
||||
md_text += self._parse_content(description)
|
||||
if content:
|
||||
md_text += self._parse_content(content)
|
||||
|
||||
with open("rss.md", "wt", encoding="utf-8") as f:
|
||||
f.write(md_text)
|
||||
|
||||
return DocumentConverterResult(
|
||||
title=channel_title,
|
||||
text_content=md_text,
|
||||
)
|
||||
except BaseException as _:
|
||||
print(traceback.format_exc())
|
||||
return None
|
||||
|
||||
def _parse_content(self, content: str) -> str:
|
||||
"""Parse the content of an RSS feed item"""
|
||||
try:
|
||||
# using bs4 because many RSS feeds have HTML-styled content
|
||||
soup = BeautifulSoup(content, "html.parser")
|
||||
return _CustomMarkdownify().convert_soup(soup)
|
||||
except BaseException as _:
|
||||
return content
|
||||
|
||||
def _get_data_by_tag_name(self, element: minidom.Element, tag_name: str) -> Union[str, None]:
|
||||
"""Get data from first child element with the given tag name.
|
||||
Returns None when no such element is found.
|
||||
"""
|
||||
nodes = element.getElementsByTagName(tag_name)
|
||||
if not nodes:
|
||||
return None
|
||||
fc = nodes[0].firstChild
|
||||
if fc:
|
||||
return fc.data
|
||||
return None
|
||||
|
||||
class WikipediaConverter(DocumentConverter):
|
||||
"""Handle Wikipedia pages separately, focusing only on the main document content."""
|
||||
@@ -1029,6 +1107,7 @@ class MarkItDown:
|
||||
# To this end, the most specific converters should appear below the most generic converters
|
||||
self.register_page_converter(PlainTextConverter())
|
||||
self.register_page_converter(HtmlConverter())
|
||||
self.register_page_converter(RSSConverter())
|
||||
self.register_page_converter(WikipediaConverter())
|
||||
self.register_page_converter(YouTubeConverter())
|
||||
self.register_page_converter(BingSerpConverter())
|
||||
|
||||
Reference in New Issue
Block a user