fix: Implement retry logic for YouTube transcript fetching and fix URL decoding issue (#1035)

* fix: add error handling, refactor _findKey to use json.items()

* fix: improve metadata and description extraction logic

* fix: improve YouTube transcript extraction reliability

* fix: implement retry logic for YouTube transcript fetching and fix URL decoding issue

* fix(readme): add youtube URLs as markitdown supports
This commit is contained in:
Nima Akbarzadeh
2025-02-28 08:17:54 +01:00
committed by GitHub
parent a87fbf01ee
commit a394cc7c27
3 changed files with 93 additions and 55 deletions

View File

@@ -1,4 +1,7 @@
import re
import json
import urllib.parse
import time
from typing import Any, Union, Dict, List
from urllib.parse import parse_qs, urlparse
@@ -13,7 +16,7 @@ try:
IS_YOUTUBE_TRANSCRIPT_CAPABLE = True
except ModuleNotFoundError:
pass
IS_YOUTUBE_TRANSCRIPT_CAPABLE = False
class YouTubeConverter(DocumentConverter):
@@ -24,6 +27,20 @@ class YouTubeConverter(DocumentConverter):
):
super().__init__(priority=priority)
def retry_operation(self, operation, retries=3, delay=2):
    """Run *operation* (a zero-argument callable), retrying on failure.

    Args:
        operation: Callable taking no arguments; its return value is
            passed through on the first successful attempt.
        retries: Maximum number of attempts (default 3).
        delay: Seconds to sleep between attempts (default 2).

    Returns:
        Whatever ``operation()`` returns on the first successful attempt.

    Raises:
        Exception: If every attempt fails. The last underlying error is
            attached as ``__cause__`` so the traceback is not lost.
    """
    last_error: Union[Exception, None] = None
    for attempt in range(max(retries, 1)):
        try:
            return operation()
        except Exception as e:
            last_error = e
            print(f"Attempt {attempt + 1} failed: {e}")
            # Don't sleep after the final attempt — we're about to give up.
            if attempt < retries - 1:
                time.sleep(delay)
    # Keep the generic Exception type for backward compatibility with
    # callers that catch it, but chain the real failure for debugging.
    raise Exception(f"Operation failed after {retries} attempts.") from last_error
def convert(
self, local_path: str, **kwargs: Any
) -> Union[None, DocumentConverterResult]:
@@ -32,38 +49,50 @@ class YouTubeConverter(DocumentConverter):
if extension.lower() not in [".html", ".htm"]:
return None
url = kwargs.get("url", "")
url = urllib.parse.unquote(url)
url = url.replace(r"\?", "?").replace(r"\=", "=")
if not url.startswith("https://www.youtube.com/watch?"):
return None
# Parse the file
soup = None
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
# Parse the file with error handling
try:
with open(local_path, "rt", encoding="utf-8") as fh:
soup = BeautifulSoup(fh.read(), "html.parser")
except Exception as e:
print(f"Error reading YouTube page: {e}")
return None
if not soup.title or not soup.title.string:
return None
# Read the meta tags
assert soup.title is not None and soup.title.string is not None
metadata: Dict[str, str] = {"title": soup.title.string}
for meta in soup(["meta"]):
for a in meta.attrs:
if a in ["itemprop", "property", "name"]:
metadata[meta[a]] = meta.get("content", "")
content = meta.get("content", "")
if content: # Only add non-empty content
metadata[meta[a]] = content
break
# We can also try to read the full description. This is more prone to breaking, since it reaches into the page implementation
# Try reading the description
try:
for script in soup(["script"]):
content = script.text
if not script.string: # Skip empty scripts
continue
content = script.string
if "ytInitialData" in content:
lines = re.split(r"\r?\n", content)
obj_start = lines[0].find("{")
obj_end = lines[0].rfind("}")
if obj_start >= 0 and obj_end >= 0:
data = json.loads(lines[0][obj_start : obj_end + 1])
attrdesc = self._findKey(data, "attributedDescriptionBodyText") # type: ignore
if attrdesc:
metadata["description"] = str(attrdesc["content"])
match = re.search(r"var ytInitialData = ({.*?});", content)
if match:
data = json.loads(match.group(1))
attrdesc = self._findKey(data, "attributedDescriptionBodyText")
if attrdesc and isinstance(attrdesc, dict):
metadata["description"] = str(attrdesc.get("content", ""))
break
except Exception:
except Exception as e:
print(f"Error extracting description: {e}")
pass
# Start preparing the page
@@ -99,21 +128,29 @@ class YouTubeConverter(DocumentConverter):
transcript_text = ""
parsed_url = urlparse(url) # type: ignore
params = parse_qs(parsed_url.query) # type: ignore
if "v" in params:
assert isinstance(params["v"][0], str)
if "v" in params and params["v"][0]:
video_id = str(params["v"][0])
try:
youtube_transcript_languages = kwargs.get(
"youtube_transcript_languages", ("en",)
)
# Must be a single transcript.
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=youtube_transcript_languages) # type: ignore
transcript_text = " ".join([part["text"] for part in transcript]) # type: ignore
# Retry the transcript fetching operation
transcript = self.retry_operation(
lambda: YouTubeTranscriptApi.get_transcript(
video_id, languages=youtube_transcript_languages
),
retries=3, # Retry 3 times
delay=2, # 2 seconds delay between retries
)
if transcript:
transcript_text = " ".join(
[part["text"] for part in transcript]
) # type: ignore
# Alternative formatting:
# formatter = TextFormatter()
# formatter.format_transcript(transcript)
except Exception:
pass
except Exception as e:
print(f"Error fetching transcript: {e}")
if transcript_text:
webpage_text += f"\n### Transcript\n{transcript_text}\n"
@@ -131,23 +168,23 @@ class YouTubeConverter(DocumentConverter):
keys: List[str],
default: Union[str, None] = None,
) -> Union[str, None]:
"""Get first non-empty value from metadata matching given keys."""
for k in keys:
if k in metadata:
return metadata[k]
return default
def _findKey(self, json: Any, key: str) -> Union[str, None]: # TODO: Fix json type
"""Recursively search for a key in nested dictionary/list structures."""
if isinstance(json, list):
for elm in json:
ret = self._findKey(elm, key)
if ret is not None:
return ret
elif isinstance(json, dict):
for k in json:
for k, v in json.items():
if k == key:
return json[k]
else:
ret = self._findKey(json[k], key)
if ret is not None:
return ret
if result := self._findKey(v, key):
return result
return None

View File

@@ -184,9 +184,9 @@ def test_markitdown_remote() -> None:
# Youtube
# NOTE: Previously disabled because it failed intermittently; re-enabled now that transcript fetching retries on transient failures.
# result = markitdown.convert(YOUTUBE_TEST_URL)
# for test_string in YOUTUBE_TEST_STRINGS:
# assert test_string in result.text_content
result = markitdown.convert(YOUTUBE_TEST_URL)
for test_string in YOUTUBE_TEST_STRINGS:
assert test_string in result.text_content
def test_markitdown_local() -> None: