feat: support convert atom to markdown
This commit is contained in:
@@ -221,23 +221,76 @@ class HtmlConverter(DocumentConverter):
|
|||||||
)
|
)
|
||||||
|
|
||||||
class RSSConverter(DocumentConverter):
|
class RSSConverter(DocumentConverter):
|
||||||
"""Convert RSS type to markdown"""
|
"""Convert RSS / Atom type to markdown"""
|
||||||
|
|
||||||
def convert(
|
def convert(
|
||||||
self, local_path: str, **kwargs
|
self, local_path: str, **kwargs
|
||||||
) -> Union[None, DocumentConverterResult]:
|
) -> Union[None, DocumentConverterResult]:
|
||||||
# Bail if not RSS type
|
# Bail if not RSS type
|
||||||
extension = kwargs.get("file_extension", "")
|
extension = kwargs.get("file_extension", "")
|
||||||
if extension.lower() not in [".xml", ".rss"]:
|
if extension.lower() not in [".xml", ".rss", ".atom"]:
|
||||||
return None
|
return None
|
||||||
try:
|
try:
|
||||||
doc = minidom.parse(local_path)
|
doc = minidom.parse(local_path)
|
||||||
except BaseException as _:
|
except BaseException as _:
|
||||||
return None
|
return None
|
||||||
if not doc.getElementsByTagName("rss"):
|
result = None
|
||||||
|
if doc.getElementsByTagName("rss"):
|
||||||
# A RSS feed must have a root element of <rss>
|
# A RSS feed must have a root element of <rss>
|
||||||
|
result = self._parse_rss_type(doc)
|
||||||
|
elif doc.getElementsByTagName("feed"):
|
||||||
|
root = doc.getElementsByTagName("feed")[0]
|
||||||
|
if root.getElementsByTagName("entry"):
|
||||||
|
# An Atom feed must have a root element of <feed> and at least one <entry>
|
||||||
|
result = self._parse_atom_type(doc)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
# not rss or atom
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _parse_atom_type(self, doc: minidom.Document) -> Union[None, DocumentConverterResult]:
|
||||||
|
"""Parse the type of an Atom feed.
|
||||||
|
|
||||||
|
Returns None if the feed type is not recognized or something goes wrong.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
root = doc.getElementsByTagName("feed")[0]
|
||||||
|
title = self._get_data_by_tag_name(root, "title")
|
||||||
|
subtitle = self._get_data_by_tag_name(root, "subtitle")
|
||||||
|
entries = root.getElementsByTagName("entry")
|
||||||
|
md_text = f"# {title}\n"
|
||||||
|
if subtitle:
|
||||||
|
md_text += f"{subtitle}\n"
|
||||||
|
for entry in entries:
|
||||||
|
entry_title = self._get_data_by_tag_name(entry, "title")
|
||||||
|
entry_summary = self._get_data_by_tag_name(entry, "summary")
|
||||||
|
entry_updated = self._get_data_by_tag_name(entry, "updated")
|
||||||
|
entry_content = self._get_data_by_tag_name(entry, "content")
|
||||||
|
|
||||||
|
if entry_title:
|
||||||
|
md_text += f"## {entry_title}\n"
|
||||||
|
if entry_updated:
|
||||||
|
md_text += f"Updated on: {entry_updated}\n"
|
||||||
|
if entry_summary:
|
||||||
|
md_text += self._parse_content(entry_summary)
|
||||||
|
if entry_content:
|
||||||
|
md_text += self._parse_content(entry_content)
|
||||||
|
|
||||||
|
return DocumentConverterResult(
|
||||||
|
title=title,
|
||||||
|
text_content=md_text,
|
||||||
|
)
|
||||||
|
except BaseException as _:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _parse_rss_type(self, doc: minidom.Document) -> Union[None, DocumentConverterResult]:
|
||||||
|
"""Parse the type of an RSS feed.
|
||||||
|
|
||||||
|
Returns None if the feed type is not recognized or something goes wrong.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
root = doc.getElementsByTagName("rss")[0]
|
root = doc.getElementsByTagName("rss")[0]
|
||||||
channel = root.getElementsByTagName("channel")
|
channel = root.getElementsByTagName("channel")
|
||||||
@@ -251,6 +304,8 @@ class RSSConverter(DocumentConverter):
|
|||||||
md_text = f"# {channel_title}\n"
|
md_text = f"# {channel_title}\n"
|
||||||
if channel_description:
|
if channel_description:
|
||||||
md_text += f"{channel_description}\n"
|
md_text += f"{channel_description}\n"
|
||||||
|
if not items:
|
||||||
|
items = []
|
||||||
for item in items:
|
for item in items:
|
||||||
title = self._get_data_by_tag_name(item, "title")
|
title = self._get_data_by_tag_name(item, "title")
|
||||||
description = self._get_data_by_tag_name(item, "description")
|
description = self._get_data_by_tag_name(item, "description")
|
||||||
@@ -266,9 +321,6 @@ class RSSConverter(DocumentConverter):
|
|||||||
if content:
|
if content:
|
||||||
md_text += self._parse_content(content)
|
md_text += self._parse_content(content)
|
||||||
|
|
||||||
with open("rss.md", "wt", encoding="utf-8") as f:
|
|
||||||
f.write(md_text)
|
|
||||||
|
|
||||||
return DocumentConverterResult(
|
return DocumentConverterResult(
|
||||||
title=channel_title,
|
title=channel_title,
|
||||||
text_content=md_text,
|
text_content=md_text,
|
||||||
@@ -276,7 +328,7 @@ class RSSConverter(DocumentConverter):
|
|||||||
except BaseException as _:
|
except BaseException as _:
|
||||||
print(traceback.format_exc())
|
print(traceback.format_exc())
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _parse_content(self, content: str) -> str:
|
def _parse_content(self, content: str) -> str:
|
||||||
"""Parse the content of an RSS feed item"""
|
"""Parse the content of an RSS feed item"""
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user