Merge pull request #151 from wlritchi/youtube-playlist-polymer

RFC: youtube: Polymer UI and JSON endpoints for playlists
This commit is contained in:
Tom-Oliver Heidel 2020-11-11 00:05:27 +01:00 committed by GitHub
commit 00c38ef28d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -36,6 +36,7 @@
get_element_by_attribute, get_element_by_attribute,
get_element_by_id, get_element_by_id,
int_or_none, int_or_none,
js_to_json,
mimetype2ext, mimetype2ext,
orderedSet, orderedSet,
parse_codecs, parse_codecs,
@ -70,6 +71,8 @@ class YoutubeBaseInfoExtractor(InfoExtractor):
_LOGIN_REQUIRED = False _LOGIN_REQUIRED = False
_PLAYLIST_ID_RE = r'(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}' _PLAYLIST_ID_RE = r'(?:PL|LL|EC|UU|FL|RD|UL|TL|PU|OLAK5uy_)[0-9A-Za-z-_]{10,}'
_INITIAL_DATA_RE = r'(?:window\["ytInitialData"\]|ytInitialData)\W?=\W?({.*?});'
_YTCFG_DATA_RE = r"ytcfg.set\(({.*?})\)"
_YOUTUBE_CLIENT_HEADERS = { _YOUTUBE_CLIENT_HEADERS = {
'x-youtube-client-name': '1', 'x-youtube-client-name': '1',
@ -274,7 +277,6 @@ def warn(message):
def _download_webpage_handle(self, *args, **kwargs): def _download_webpage_handle(self, *args, **kwargs):
query = kwargs.get('query', {}).copy() query = kwargs.get('query', {}).copy()
query['disable_polymer'] = 'true'
kwargs['query'] = query kwargs['query'] = query
return super(YoutubeBaseInfoExtractor, self)._download_webpage_handle( return super(YoutubeBaseInfoExtractor, self)._download_webpage_handle(
*args, **compat_kwargs(kwargs)) *args, **compat_kwargs(kwargs))
@ -297,16 +299,61 @@ def _real_initialize(self):
class YoutubeEntryListBaseInfoExtractor(YoutubeBaseInfoExtractor): class YoutubeEntryListBaseInfoExtractor(YoutubeBaseInfoExtractor):
# Extract entries from page with "Load more" button
def _entries(self, page, playlist_id): def _find_entries_in_json(self, extracted):
more_widget_html = content_html = page entries = []
mobj_reg = r'(?:(?:data-uix-load-more-href="[^"]+?;continuation=)|(?:"continuation":"))(?P<more>[^"]+)"' c = {}
for page_num in itertools.count(1):
for entry in self._process_page(content_html): def _real_find(obj):
if obj is None or isinstance(obj, str):
return
if type(obj) is list:
for elem in obj:
_real_find(elem)
if type(obj) is dict:
if self._is_entry(obj):
entries.append(obj)
return
if 'continuationCommand' in obj:
c['continuation'] = obj
return
for _, o in obj.items():
_real_find(o)
_real_find(extracted)
return entries, try_get(c, lambda x: x["continuation"])
def _entries(self, page, playlist_id, max_pages=None):
seen = []
yt_conf = {}
for m in re.finditer(self._YTCFG_DATA_RE, page):
parsed = self._parse_json(m.group(1), playlist_id,
transform_source=js_to_json, fatal=False)
if parsed:
yt_conf.update(parsed)
data_json = self._parse_json(self._search_regex(self._INITIAL_DATA_RE, page, 'ytInitialData'), None)
for page_num in range(1, max_pages + 1) if max_pages is not None else itertools.count(1):
entries, continuation = self._find_entries_in_json(data_json)
processed = self._process_entries(entries, seen)
if not processed:
break
for entry in processed:
yield entry yield entry
mobj = re.search(mobj_reg, more_widget_html) if not continuation or not yt_conf:
if not mobj: break
continuation_token = try_get(continuation, lambda x: x['continuationCommand']['token'])
continuation_url = try_get(continuation, lambda x: x['commandMetadata']['webCommandMetadata']['apiUrl'])
if not continuation_token or not continuation_url:
break break
count = 0 count = 0
@ -315,12 +362,23 @@ def _entries(self, page, playlist_id):
try: try:
# Downloading page may result in intermittent 5xx HTTP error # Downloading page may result in intermittent 5xx HTTP error
# that is usually worked around with a retry # that is usually worked around with a retry
more = self._download_json( data_json = self._download_json(
'https://www.youtube.com/browse_ajax?ctoken=%s' % mobj.group('more'), playlist_id, 'https://www.youtube.com%s' % continuation_url,
'Downloading page #%s%s' playlist_id,
% (page_num, ' (retry #%d)' % count if count else ''), 'Downloading continuation page #%s%s' % (page_num, ' (retry #%d)' % count if count else ''),
transform_source=uppercase_escape, transform_source=uppercase_escape,
headers=self._YOUTUBE_CLIENT_HEADERS) query={
'key': try_get(yt_conf, lambda x: x['INNERTUBE_API_KEY'])
},
data=bytes(json.dumps({
'context': try_get(yt_conf, lambda x: x['INNERTUBE_CONTEXT']),
'continuation': continuation_token
}), encoding='utf-8'),
headers={
'Content-Type': 'application/json'
}
)
break break
except ExtractorError as e: except ExtractorError as e:
if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503): if isinstance(e.cause, compat_HTTPError) and e.cause.code in (500, 503):
@ -329,31 +387,30 @@ def _entries(self, page, playlist_id):
continue continue
raise raise
content_html = more['content_html'] def _extract_title(self, renderer):
if not content_html.strip(): title = try_get(renderer, lambda x: x['title']['runs'][0]['text'], compat_str)
# Some webpages show a "Load more" button but they don't if title:
# have more videos return title
break return try_get(renderer, lambda x: x['title']['simpleText'], compat_str)
more_widget_html = more['load_more_widget_html']
class YoutubePlaylistBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor): class YoutubePlaylistBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor):
def _process_page(self, content): def _is_entry(self, obj):
for video_id, video_title in self.extract_videos_from_page(content): return 'videoId' in obj
yield self.url_result(video_id, 'Youtube', video_id, video_title)
def extract_videos_from_page_impl(self, video_re, page, ids_in_page, titles_in_page): def _process_entries(self, entries, seen):
for mobj in re.finditer(video_re, page): ids_in_page = []
# The link with index 0 is not the first video of the playlist (not sure if still actual) titles_in_page = []
if 'index' in mobj.groupdict() and mobj.group('id') == '0': for renderer in entries:
video_id = try_get(renderer, lambda x: x['videoId'])
video_title = self._extract_title(renderer)
if video_id is None or video_title is None:
# we do not have a videoRenderer or title extraction broke
continue continue
video_id = mobj.group('id')
video_title = unescapeHTML( video_title = video_title.strip()
mobj.group('title')) if 'title' in mobj.groupdict() else None
if video_title:
video_title = video_title.strip()
if video_title == '► Play all':
video_title = None
try: try:
idx = ids_in_page.index(video_id) idx = ids_in_page.index(video_id)
if video_title and not titles_in_page[idx]: if video_title and not titles_in_page[idx]:
@ -362,19 +419,17 @@ def extract_videos_from_page_impl(self, video_re, page, ids_in_page, titles_in_p
ids_in_page.append(video_id) ids_in_page.append(video_id)
titles_in_page.append(video_title) titles_in_page.append(video_title)
def extract_videos_from_page(self, page): for video_id, video_title in zip(ids_in_page, titles_in_page):
ids_in_page = [] yield self.url_result(video_id, 'Youtube', video_id, video_title)
titles_in_page = []
self.extract_videos_from_page_impl(
self._VIDEO_RE, page, ids_in_page, titles_in_page)
return zip(ids_in_page, titles_in_page)
class YoutubePlaylistsBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor): class YoutubePlaylistsBaseInfoExtractor(YoutubeEntryListBaseInfoExtractor):
def _process_page(self, content): def _is_entry(self, obj):
for playlist_id in orderedSet(re.findall( return 'playlistId' in obj
r'"/?playlist\?list=([0-9A-Za-z-_]{10,})"',
content)): def _process_entries(self, entries, seen):
for playlist_id in orderedSet(try_get(r, lambda x: x['playlistId']) for r in entries):
yield self.url_result( yield self.url_result(
'https://www.youtube.com/playlist?list=%s' % playlist_id, 'YoutubePlaylist') 'https://www.youtube.com/playlist?list=%s' % playlist_id, 'YoutubePlaylist')
@ -3241,11 +3296,7 @@ class YoutubePlaylistsIE(YoutubePlaylistsBaseInfoExtractor):
}] }]
class YoutubeSearchBaseInfoExtractor(YoutubePlaylistBaseInfoExtractor): class YoutubeSearchIE(SearchInfoExtractor, YoutubePlaylistBaseInfoExtractor):
_VIDEO_RE = r'href="\s*/watch\?v=(?P<id>[0-9A-Za-z_-]{11})(?:[^"]*"[^>]+\btitle="(?P<title>[^"]+))?'
class YoutubeSearchIE(SearchInfoExtractor, YoutubeSearchBaseInfoExtractor):
IE_DESC = 'YouTube.com searches' IE_DESC = 'YouTube.com searches'
# there doesn't appear to be a real limit, for example if you search for # there doesn't appear to be a real limit, for example if you search for
# 'python' you get more than 8.000.000 results # 'python' you get more than 8.000.000 results
@ -3342,11 +3393,10 @@ class YoutubeSearchDateIE(YoutubeSearchIE):
_SEARCH_PARAMS = 'CAI%3D' _SEARCH_PARAMS = 'CAI%3D'
class YoutubeSearchURLIE(YoutubeSearchBaseInfoExtractor): class YoutubeSearchURLIE(YoutubePlaylistBaseInfoExtractor):
IE_DESC = 'YouTube.com search URLs' IE_DESC = 'YouTube.com search URLs'
IE_NAME = 'youtube:search_url' IE_NAME = 'youtube:search_url'
_VALID_URL = r'https?://(?:www\.)?youtube\.com/results\?(.*?&)?(?:search_query|q)=(?P<query>[^&]+)(?:[&]|$)' _VALID_URL = r'https?://(?:www\.)?youtube\.com/results\?(.*?&)?(?:search_query|q)=(?P<query>[^&]+)(?:[&]|$)'
_SEARCH_DATA = r'(?:window\["ytInitialData"\]|ytInitialData)\W?=\W?({.*?});'
_TESTS = [{ _TESTS = [{
'url': 'https://www.youtube.com/results?baz=bar&search_query=youtube-dl+test+video&filters=video&lclk=video', 'url': 'https://www.youtube.com/results?baz=bar&search_query=youtube-dl+test+video&filters=video&lclk=video',
'playlist_mincount': 5, 'playlist_mincount': 5,
@ -3358,63 +3408,20 @@ class YoutubeSearchURLIE(YoutubeSearchBaseInfoExtractor):
'only_matching': True, 'only_matching': True,
}] }]
def _find_videos_in_json(self, extracted): def _process_json_dict(self, obj, videos, c):
videos = [] if "videoId" in obj:
videos.append(obj)
return
def _real_find(obj): if "nextContinuationData" in obj:
if obj is None or isinstance(obj, str): c["continuation"] = obj["nextContinuationData"]
return return
if type(obj) is list:
for elem in obj:
_real_find(elem)
if type(obj) is dict:
if "videoId" in obj:
videos.append(obj)
return
for _, o in obj.items():
_real_find(o)
_real_find(extracted)
return videos
def extract_videos_from_page_impl(self, page, ids_in_page, titles_in_page):
search_response = self._parse_json(self._search_regex(self._SEARCH_DATA, page, 'ytInitialData'), None)
result_items = self._find_videos_in_json(search_response)
for renderer in result_items:
video_id = try_get(renderer, lambda x: x['videoId'])
video_title = try_get(renderer, lambda x: x['title']['runs'][0]['text']) or try_get(renderer, lambda x: x['title']['simpleText'])
if video_id is None or video_title is None:
# we do not have a videoRenderer or title extraction broke
continue
video_title = video_title.strip()
try:
idx = ids_in_page.index(video_id)
if video_title and not titles_in_page[idx]:
titles_in_page[idx] = video_title
except ValueError:
ids_in_page.append(video_id)
titles_in_page.append(video_title)
def extract_videos_from_page(self, page):
ids_in_page = []
titles_in_page = []
self.extract_videos_from_page_impl(page, ids_in_page, titles_in_page)
return zip(ids_in_page, titles_in_page)
def _real_extract(self, url): def _real_extract(self, url):
mobj = re.match(self._VALID_URL, url) mobj = re.match(self._VALID_URL, url)
query = compat_urllib_parse_unquote_plus(mobj.group('query')) query = compat_urllib_parse_unquote_plus(mobj.group('query'))
webpage = self._download_webpage(url, query) webpage = self._download_webpage(url, query)
return self.playlist_result(self._process_page(webpage), playlist_title=query) return self.playlist_result(self._entries(webpage, query, max_pages=5), playlist_title=query)
class YoutubeShowIE(YoutubePlaylistsBaseInfoExtractor): class YoutubeShowIE(YoutubePlaylistsBaseInfoExtractor):
@ -3436,14 +3443,12 @@ def _real_extract(self, url):
'https://www.youtube.com/show/%s/playlists' % playlist_id) 'https://www.youtube.com/show/%s/playlists' % playlist_id)
class YoutubeFeedsInfoExtractor(YoutubeBaseInfoExtractor): class YoutubeFeedsInfoExtractor(YoutubePlaylistBaseInfoExtractor):
""" """
Base class for feed extractors Base class for feed extractors
Subclasses must define the _FEED_NAME and _PLAYLIST_TITLE properties. Subclasses must define the _FEED_NAME and _PLAYLIST_TITLE properties.
""" """
_LOGIN_REQUIRED = True _LOGIN_REQUIRED = True
_FEED_DATA = r'(?:window\["ytInitialData"\]|ytInitialData)\W?=\W?({.*?});'
_YTCFG_DATA = r"ytcfg.set\(({.*?})\)"
@property @property
def IE_NAME(self): def IE_NAME(self):
@ -3452,96 +3457,35 @@ def IE_NAME(self):
def _real_initialize(self): def _real_initialize(self):
self._login() self._login()
def _find_videos_in_json(self, extracted): def _process_entries(self, entries, seen):
videos = [] new_info = []
c = {} for v in entries:
v_id = try_get(v, lambda x: x['videoId'])
if not v_id:
continue
def _real_find(obj): have_video = False
if obj is None or isinstance(obj, str): for old in seen:
return if old['videoId'] == v_id:
have_video = True
break
if type(obj) is list: if not have_video:
for elem in obj: new_info.append(v)
_real_find(elem)
if type(obj) is dict: if not new_info:
if "videoId" in obj: return
videos.append(obj)
return
if "nextContinuationData" in obj: seen.extend(new_info)
c["continuation"] = obj["nextContinuationData"] for video in new_info:
return yield self.url_result(try_get(video, lambda x: x['videoId']), YoutubeIE.ie_key(), video_title=self._extract_title(video))
for _, o in obj.items():
_real_find(o)
_real_find(extracted)
return videos, try_get(c, lambda x: x["continuation"])
def _entries(self, page):
info = []
yt_conf = self._parse_json(self._search_regex(self._YTCFG_DATA, page, 'ytcfg.set', default="null"), None, fatal=False)
search_response = self._parse_json(self._search_regex(self._FEED_DATA, page, 'ytInitialData'), None)
for page_num in itertools.count(1):
video_info, continuation = self._find_videos_in_json(search_response)
new_info = []
for v in video_info:
v_id = try_get(v, lambda x: x['videoId'])
if not v_id:
continue
have_video = False
for old in info:
if old['videoId'] == v_id:
have_video = True
break
if not have_video:
new_info.append(v)
if not new_info:
break
info.extend(new_info)
for video in new_info:
yield self.url_result(try_get(video, lambda x: x['videoId']), YoutubeIE.ie_key(), video_title=try_get(video, lambda x: x['title']['runs'][0]['text']) or try_get(video, lambda x: x['title']['simpleText']))
if not continuation or not yt_conf:
break
search_response = self._download_json(
'https://www.youtube.com/browse_ajax', self._PLAYLIST_TITLE,
'Downloading page #%s' % page_num,
transform_source=uppercase_escape,
query={
"ctoken": try_get(continuation, lambda x: x["continuation"]),
"continuation": try_get(continuation, lambda x: x["continuation"]),
"itct": try_get(continuation, lambda x: x["clickTrackingParams"])
},
headers={
"X-YouTube-Client-Name": try_get(yt_conf, lambda x: x["INNERTUBE_CONTEXT_CLIENT_NAME"]),
"X-YouTube-Client-Version": try_get(yt_conf, lambda x: x["INNERTUBE_CONTEXT_CLIENT_VERSION"]),
"X-Youtube-Identity-Token": try_get(yt_conf, lambda x: x["ID_TOKEN"]),
"X-YouTube-Device": try_get(yt_conf, lambda x: x["DEVICE"]),
"X-YouTube-Page-CL": try_get(yt_conf, lambda x: x["PAGE_CL"]),
"X-YouTube-Page-Label": try_get(yt_conf, lambda x: x["PAGE_BUILD_LABEL"]),
"X-YouTube-Variants-Checksum": try_get(yt_conf, lambda x: x["VARIANTS_CHECKSUM"]),
})
def _real_extract(self, url): def _real_extract(self, url):
page = self._download_webpage( page = self._download_webpage(
'https://www.youtube.com/feed/%s' % self._FEED_NAME, 'https://www.youtube.com/feed/%s' % self._FEED_NAME,
self._PLAYLIST_TITLE) self._PLAYLIST_TITLE)
return self.playlist_result( return self.playlist_result(self._entries(page, self._PLAYLIST_TITLE),
self._entries(page), playlist_title=self._PLAYLIST_TITLE) playlist_title=self._PLAYLIST_TITLE)
class YoutubeWatchLaterIE(YoutubePlaylistIE): class YoutubeWatchLaterIE(YoutubePlaylistIE):