Fix remaining mypy errors. (#1132)
This commit is contained in:
@@ -139,7 +139,7 @@ def main():
|
||||
else:
|
||||
charset_hint = None
|
||||
|
||||
stream_info: str | None = None
|
||||
stream_info = None
|
||||
if (
|
||||
extension_hint is not None
|
||||
or mime_type_hint is not None
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import io
|
||||
import re
|
||||
import base64
|
||||
import binascii
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
from typing import Any, BinaryIO, Optional
|
||||
from bs4 import BeautifulSoup
|
||||
@@ -60,6 +61,8 @@ class BingSerpConverter(DocumentConverter):
|
||||
stream_info: StreamInfo,
|
||||
**kwargs: Any, # Options to pass to the converter
|
||||
) -> DocumentConverterResult:
|
||||
assert stream_info.url is not None
|
||||
|
||||
# Parse the query parameters
|
||||
parsed_params = parse_qs(urlparse(stream_info.url).query)
|
||||
query = parsed_params.get("q", [""])[0]
|
||||
@@ -79,6 +82,9 @@ class BingSerpConverter(DocumentConverter):
|
||||
_markdownify = _CustomMarkdownify()
|
||||
results = list()
|
||||
for result in soup.find_all(class_="b_algo"):
|
||||
if not hasattr(result, "find_all"):
|
||||
continue
|
||||
|
||||
# Rewrite redirect urls
|
||||
for a in result.find_all("a", href=True):
|
||||
parsed_href = urlparse(a["href"])
|
||||
|
||||
@@ -9,7 +9,7 @@ from .._exceptions import MissingDependencyException, MISSING_DEPENDENCY_MESSAGE
|
||||
_dependency_exc_info = None
|
||||
olefile = None
|
||||
try:
|
||||
import olefile
|
||||
import olefile # type: ignore[no-redef]
|
||||
except ImportError:
|
||||
# Preserve the error and stack trace for later
|
||||
_dependency_exc_info = sys.exc_info()
|
||||
@@ -56,12 +56,13 @@ class OutlookMsgConverter(DocumentConverter):
|
||||
|
||||
# Brute force, check if it's an Outlook file
|
||||
try:
|
||||
msg = olefile.OleFileIO(file_stream)
|
||||
toc = "\n".join([str(stream) for stream in msg.listdir()])
|
||||
return (
|
||||
"__properties_version1.0" in toc
|
||||
and "__recip_version1.0_#00000000" in toc
|
||||
)
|
||||
if olefile is not None:
|
||||
msg = olefile.OleFileIO(file_stream)
|
||||
toc = "\n".join([str(stream) for stream in msg.listdir()])
|
||||
return (
|
||||
"__properties_version1.0" in toc
|
||||
and "__recip_version1.0_#00000000" in toc
|
||||
)
|
||||
except Exception as e:
|
||||
pass
|
||||
finally:
|
||||
@@ -89,7 +90,11 @@ class OutlookMsgConverter(DocumentConverter):
|
||||
_dependency_exc_info[2]
|
||||
)
|
||||
|
||||
assert (
|
||||
olefile is not None
|
||||
) # If we made it this far, olefile should be available
|
||||
msg = olefile.OleFileIO(file_stream)
|
||||
|
||||
# Extract email metadata
|
||||
md_content = "# Email Message\n\n"
|
||||
|
||||
@@ -121,6 +126,7 @@ class OutlookMsgConverter(DocumentConverter):
|
||||
|
||||
def _get_stream_data(self, msg: Any, stream_path: str) -> Union[str, None]:
|
||||
"""Helper to safely extract and decode stream data from the MSG file."""
|
||||
assert olefile is not None
|
||||
assert isinstance(
|
||||
msg, olefile.OleFileIO
|
||||
) # Ensure msg is of the correct type (type hinting is not possible with the optional olefile package)
|
||||
|
||||
@@ -66,7 +66,7 @@ class RssConverter(DocumentConverter):
|
||||
file_stream.seek(cur_pos)
|
||||
return False
|
||||
|
||||
def _feed_type(self, doc: Any) -> str:
|
||||
def _feed_type(self, doc: Any) -> str | None:
|
||||
if doc.getElementsByTagName("rss"):
|
||||
return "rss"
|
||||
elif doc.getElementsByTagName("feed"):
|
||||
@@ -130,10 +130,10 @@ class RssConverter(DocumentConverter):
|
||||
Returns None if the feed type is not recognized or something goes wrong.
|
||||
"""
|
||||
root = doc.getElementsByTagName("rss")[0]
|
||||
channel = root.getElementsByTagName("channel")
|
||||
if not channel:
|
||||
return None
|
||||
channel = channel[0]
|
||||
channel_list = root.getElementsByTagName("channel")
|
||||
if not channel_list:
|
||||
raise ValueError("No channel found in RSS feed")
|
||||
channel = channel_list[0]
|
||||
channel_title = self._get_data_by_tag_name(channel, "title")
|
||||
channel_description = self._get_data_by_tag_name(channel, "description")
|
||||
items = channel.getElementsByTagName("item")
|
||||
@@ -141,8 +141,6 @@ class RssConverter(DocumentConverter):
|
||||
md_text = f"# {channel_title}\n"
|
||||
if channel_description:
|
||||
md_text += f"{channel_description}\n"
|
||||
if not items:
|
||||
items = []
|
||||
for item in items:
|
||||
title = self._get_data_by_tag_name(item, "title")
|
||||
description = self._get_data_by_tag_name(item, "description")
|
||||
@@ -183,5 +181,6 @@ class RssConverter(DocumentConverter):
|
||||
return None
|
||||
fc = nodes[0].firstChild
|
||||
if fc:
|
||||
return fc.data
|
||||
if hasattr(fc, "data"):
|
||||
return fc.data
|
||||
return None
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import io
|
||||
import re
|
||||
import bs4
|
||||
from typing import Any, BinaryIO, Optional
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .._base_converter import DocumentConverter, DocumentConverterResult
|
||||
from .._stream_info import StreamInfo
|
||||
@@ -57,7 +57,7 @@ class WikipediaConverter(DocumentConverter):
|
||||
) -> DocumentConverterResult:
|
||||
# Parse the stream
|
||||
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
|
||||
soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
|
||||
soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
|
||||
|
||||
# Remove javascript and style blocks
|
||||
for script in soup(["script", "style"]):
|
||||
@@ -72,9 +72,8 @@ class WikipediaConverter(DocumentConverter):
|
||||
|
||||
if body_elm:
|
||||
# What's the title
|
||||
if title_elm and len(title_elm) > 0:
|
||||
main_title = title_elm.string # type: ignore
|
||||
assert isinstance(main_title, str)
|
||||
if title_elm and isinstance(title_elm, bs4.Tag):
|
||||
main_title = title_elm.string
|
||||
|
||||
# Convert the page
|
||||
webpage_text = f"# {main_title}\n\n" + _CustomMarkdownify().convert_soup(
|
||||
|
||||
@@ -3,9 +3,9 @@ import json
|
||||
import time
|
||||
import io
|
||||
import re
|
||||
import bs4
|
||||
from typing import Any, BinaryIO, Optional, Dict, List, Union
|
||||
from urllib.parse import parse_qs, urlparse, unquote
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from .._base_converter import DocumentConverter, DocumentConverterResult
|
||||
from .._stream_info import StreamInfo
|
||||
@@ -72,21 +72,31 @@ class YouTubeConverter(DocumentConverter):
|
||||
) -> DocumentConverterResult:
|
||||
# Parse the stream
|
||||
encoding = "utf-8" if stream_info.charset is None else stream_info.charset
|
||||
soup = BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
|
||||
soup = bs4.BeautifulSoup(file_stream, "html.parser", from_encoding=encoding)
|
||||
|
||||
# Read the meta tags
|
||||
metadata: Dict[str, str] = {"title": soup.title.string}
|
||||
metadata: Dict[str, str] = {}
|
||||
|
||||
if soup.title and soup.title.string:
|
||||
metadata["title"] = soup.title.string
|
||||
|
||||
for meta in soup(["meta"]):
|
||||
if not isinstance(meta, bs4.Tag):
|
||||
continue
|
||||
|
||||
for a in meta.attrs:
|
||||
if a in ["itemprop", "property", "name"]:
|
||||
content = meta.get("content", "")
|
||||
if content: # Only add non-empty content
|
||||
metadata[meta[a]] = content
|
||||
key = str(meta.get(a, ""))
|
||||
content = str(meta.get("content", ""))
|
||||
if key and content: # Only add non-empty content
|
||||
metadata[key] = content
|
||||
break
|
||||
|
||||
# Try reading the description
|
||||
try:
|
||||
for script in soup(["script"]):
|
||||
if not isinstance(script, bs4.Tag):
|
||||
continue
|
||||
if not script.string: # Skip empty scripts
|
||||
continue
|
||||
content = script.string
|
||||
@@ -161,7 +171,7 @@ class YouTubeConverter(DocumentConverter):
|
||||
if transcript_text:
|
||||
webpage_text += f"\n### Transcript\n{transcript_text}\n"
|
||||
|
||||
title = title if title else soup.title.string
|
||||
title = title if title else (soup.title.string if soup.title else "")
|
||||
assert isinstance(title, str)
|
||||
|
||||
return DocumentConverterResult(
|
||||
|
||||
@@ -114,7 +114,9 @@ def test_input_from_stdin_without_hints(shared_tmp_dir, test_vector) -> None:
|
||||
)
|
||||
|
||||
stdout = result.stdout.decode(locale.getpreferredencoding())
|
||||
assert result.returncode == 0, f"CLI exited with error: {result.stderr}"
|
||||
assert (
|
||||
result.returncode == 0
|
||||
), f"CLI exited with error: {result.stderr.decode('utf-8')}"
|
||||
for test_string in test_vector.must_include:
|
||||
assert test_string in stdout
|
||||
for test_string in test_vector.must_not_include:
|
||||
|
||||
Reference in New Issue
Block a user