From 47193e0298200d89e8c4fc8d8886c50d05458278 Mon Sep 17 00:00:00 2001 From: coletdjnz Date: Thu, 15 Jul 2021 14:42:30 +1200 Subject: [PATCH] [youtube:tab] Extract playlist availability (#504) Authored by: colethedj --- yt_dlp/extractor/youtube.py | 224 ++++++++++++++++++++++-------------- 1 file changed, 136 insertions(+), 88 deletions(-) diff --git a/yt_dlp/extractor/youtube.py b/yt_dlp/extractor/youtube.py index a2abdc5036..d0056203f1 100644 --- a/yt_dlp/extractor/youtube.py +++ b/yt_dlp/extractor/youtube.py @@ -645,6 +645,28 @@ def _report_alerts(self, alerts, expected=True): def _extract_and_report_alerts(self, data, *args, **kwargs): return self._report_alerts(self._extract_alerts(data), *args, **kwargs) + def _extract_badges(self, renderer: dict): + badges = set() + for badge in try_get(renderer, lambda x: x['badges'], list) or []: + label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label'], compat_str) + if label: + badges.add(label.lower()) + return badges + + @staticmethod + def _join_text_entries(runs): + text = None + for run in runs: + if not isinstance(run, dict): + continue + sub_text = try_get(run, lambda x: x['text'], compat_str) + if sub_text: + if not text: + text = sub_text + continue + text += sub_text + return text + def _extract_response(self, item_id, query, note='Downloading API JSON', headers=None, ytcfg=None, check_get_keys=None, ep='browse', fatal=True, api_hostname=None, default_client='WEB'): @@ -1971,20 +1993,6 @@ def parse_time_text(time_text): if len(time_text_split) >= 3: return datetime_from_str('now-%s%s' % (time_text_split[0], time_text_split[1]), precision='auto') - @staticmethod - def _join_text_entries(runs): - text = None - for run in runs: - if not isinstance(run, dict): - continue - sub_text = try_get(run, lambda x: x['text'], compat_str) - if sub_text: - if not text: - text = sub_text - continue - text += sub_text - return text - def _extract_comment(self, comment_renderer, parent=None): comment_id = comment_renderer.get('commentId') if not comment_id: @@ -2959,21 +2967,20 @@ def chapter_time(mmlir): if initial_data and is_private is not None: is_membersonly = False is_premium = False - contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list) - for content in contents or []: - badges = try_get(content, lambda x: x['videoPrimaryInfoRenderer']['badges'], list) - for badge in badges or []: - label = try_get(badge, lambda x: x['metadataBadgeRenderer']['label']) or '' - if label.lower() == 'members only': - is_membersonly = True - break - elif label.lower() == 'premium': - is_premium = True - break - if is_membersonly or is_premium: - break + contents = try_get(initial_data, lambda x: x['contents']['twoColumnWatchNextResults']['results']['results']['contents'], list) or [] + badge_labels = set() + for content in contents: + if not isinstance(content, dict): + continue + badge_labels.update(self._extract_badges(content.get('videoPrimaryInfoRenderer'))) + for badge_label in badge_labels: + if badge_label.lower() == 'members only': + is_membersonly = True + elif badge_label.lower() == 'premium': + is_premium = True + elif badge_label.lower() == 'unlisted': + is_unlisted = True - # TODO: Add this for playlists info['availability'] = self._availability( is_private=is_private, needs_premium=is_premium, @@ -3447,6 +3454,17 @@ class YoutubeTabIE(YoutubeBaseInfoExtractor): 'title': 'Album - Royalty Free Music Library V2 (50 Songs)', }, 'playlist_count': 50, + }, { + 'note': 'unlisted single video playlist', + 'url': 'https://www.youtube.com/playlist?list=PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf', + 'info_dict': { + 'uploader_id': 'UC9zHu_mHU96r19o-wV5Qs1Q', + 'uploader': 'colethedj', + 'id': 'PLwL24UFy54GrB3s2KMMfjZscDi1x5Dajf', + 'title': 'yt-dlp unlisted playlist test', + 'availability': 'unlisted' + }, + 'playlist_count': 1, }] @classmethod @@ -3768,27 +3786,19 @@ def _extract_selected_tab(tabs): else: raise ExtractorError('Unable to find selected tab') - @staticmethod - def _extract_uploader(data): + @classmethod + def _extract_uploader(cls, data): uploader = {} - sidebar_renderer = try_get( - data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) - if sidebar_renderer: - for item in sidebar_renderer: - if not isinstance(item, dict): - continue - renderer = item.get('playlistSidebarSecondaryInfoRenderer') - if not isinstance(renderer, dict): - continue - owner = try_get( - renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict) - if owner: - uploader['uploader'] = owner.get('text') - uploader['uploader_id'] = try_get( - owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str) - uploader['uploader_url'] = urljoin( - 'https://www.youtube.com/', - try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str)) + renderer = cls._extract_sidebar_info_renderer(data, 'playlistSidebarSecondaryInfoRenderer') or {} + owner = try_get( + renderer, lambda x: x['videoOwner']['videoOwnerRenderer']['title']['runs'][0], dict) + if owner: + uploader['uploader'] = owner.get('text') + uploader['uploader_id'] = try_get( + owner, lambda x: x['navigationEndpoint']['browseEndpoint']['browseId'], compat_str) + uploader['uploader_url'] = urljoin( + 'https://www.youtube.com/', + try_get(owner, lambda x: x['navigationEndpoint']['browseEndpoint']['canonicalBaseUrl'], compat_str)) return {k: v for k, v in uploader.items() if v is not None} def _extract_from_tabs(self, item_id, webpage, data, tabs): @@ -3814,8 +3824,8 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs): thumbnails_list = ( try_get(renderer, lambda x: x['avatar']['thumbnails'], list) or try_get( - data, - lambda x: x['sidebar']['playlistSidebarRenderer']['items'][0]['playlistSidebarPrimaryInfoRenderer']['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'], + self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer'), + lambda x: x['thumbnailRenderer']['playlistVideoThumbnailRenderer']['thumbnail']['thumbnails'], list) or []) @@ -3839,7 +3849,6 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs): or playlist_id) title += format_field(selected_tab, 'title', ' - %s') title += format_field(selected_tab, 'expandedText', ' - %s') - metadata = { 'playlist_id': playlist_id, 'playlist_title': title, @@ -3850,6 +3859,9 @@ def _extract_from_tabs(self, item_id, webpage, data, tabs): 'thumbnails': thumbnails, 'tags': tags, } + availability = self._extract_availability(data) + if availability: + metadata['availability'] = availability if not channel_id: metadata.update(self._extract_uploader(data)) metadata.update({ @@ -3921,49 +3933,86 @@ def _extract_from_playlist(self, item_id, url, data, playlist, webpage): self._extract_mix_playlist(playlist, playlist_id, data, webpage), playlist_id=playlist_id, playlist_title=title) + def _extract_availability(self, data): + """ + Gets the availability of a given playlist/tab. + Note: Unless YouTube tells us explicitly, we do not assume it is public + @param data: response + """ + is_private = is_unlisted = None + renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') or {} + badge_labels = self._extract_badges(renderer) + + # Personal playlists, when authenticated, have a dropdown visibility selector instead of a badge + privacy_dropdown_entries = try_get( + renderer, lambda x: x['privacyForm']['dropdownFormFieldRenderer']['dropdown']['dropdownRenderer']['entries'], list) or [] + for renderer_dict in privacy_dropdown_entries: + is_selected = try_get( + renderer_dict, lambda x: x['privacyDropdownItemRenderer']['isSelected'], bool) or False + if not is_selected: + continue + label = self._join_text_entries( + try_get(renderer_dict, lambda x: x['privacyDropdownItemRenderer']['label']['runs'], list) or []) + if label: + badge_labels.add(label.lower()) + break + + for badge_label in badge_labels: + if badge_label == 'unlisted': + is_unlisted = True + elif badge_label == 'private': + is_private = True + elif badge_label == 'public': + is_unlisted = is_private = False + return self._availability(is_private, False, False, False, is_unlisted) + + @staticmethod + def _extract_sidebar_info_renderer(data, info_renderer, expected_type=dict): + sidebar_renderer = try_get( + data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) or [] + for item in sidebar_renderer: + renderer = try_get(item, lambda x: x[info_renderer], expected_type) + if renderer: + return renderer + def _reload_with_unavailable_videos(self, item_id, data, webpage): """ Get playlist with unavailable videos if the 'show unavailable videos' button exists. """ - sidebar_renderer = try_get( - data, lambda x: x['sidebar']['playlistSidebarRenderer']['items'], list) - if not sidebar_renderer: - return browse_id = params = None - for item in sidebar_renderer: - if not isinstance(item, dict): + renderer = self._extract_sidebar_info_renderer(data, 'playlistSidebarPrimaryInfoRenderer') + if not renderer: + return + menu_renderer = try_get( + renderer, lambda x: x['menu']['menuRenderer']['items'], list) or [] + for menu_item in menu_renderer: + if not isinstance(menu_item, dict): continue - renderer = item.get('playlistSidebarPrimaryInfoRenderer') - menu_renderer = try_get( - renderer, lambda x: x['menu']['menuRenderer']['items'], list) or [] - for menu_item in menu_renderer: - if not isinstance(menu_item, dict): - continue - nav_item_renderer = menu_item.get('menuNavigationItemRenderer') - text = try_get( - nav_item_renderer, lambda x: x['text']['simpleText'], compat_str) - if not text or text.lower() != 'show unavailable videos': - continue - browse_endpoint = try_get( - nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {} - browse_id = browse_endpoint.get('browseId') - params = browse_endpoint.get('params') - break + nav_item_renderer = menu_item.get('menuNavigationItemRenderer') + text = try_get( + nav_item_renderer, lambda x: x['text']['simpleText'], compat_str) + if not text or text.lower() != 'show unavailable videos': + continue + browse_endpoint = try_get( + nav_item_renderer, lambda x: x['navigationEndpoint']['browseEndpoint'], dict) or {} + browse_id = browse_endpoint.get('browseId') + params = browse_endpoint.get('params') + break - ytcfg = self._extract_ytcfg(item_id, webpage) - headers = self._generate_api_headers( - ytcfg, account_syncid=self._extract_account_syncid(ytcfg), - identity_token=self._extract_identity_token(webpage, item_id=item_id), - visitor_data=try_get( - self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str)) - query = { - 'params': params or 'wgYCCAA=', - 'browseId': browse_id or 'VL%s' % item_id - } - return self._extract_response( - item_id=item_id, headers=headers, query=query, - check_get_keys='contents', fatal=False, - note='Downloading API JSON with unavailable videos') + ytcfg = self._extract_ytcfg(item_id, webpage) + headers = self._generate_api_headers( + ytcfg, account_syncid=self._extract_account_syncid(ytcfg), + identity_token=self._extract_identity_token(webpage, item_id=item_id), + visitor_data=try_get( + self._extract_context(ytcfg), lambda x: x['client']['visitorData'], compat_str)) + query = { + 'params': params or 'wgYCCAA=', + 'browseId': browse_id or 'VL%s' % item_id + } + return self._extract_response( + item_id=item_id, headers=headers, query=query, + check_get_keys='contents', fatal=False, + note='Downloading API JSON with unavailable videos') def _extract_webpage(self, url, item_id): retries = self.get_param('extractor_retries', 3) @@ -4100,7 +4149,6 @@ def get_mobj(url): if 'no-youtube-unavailable-videos' not in compat_opts: data = self._reload_with_unavailable_videos(item_id, data, webpage) or data self._extract_and_report_alerts(data) - tabs = try_get( data, lambda x: x['contents']['twoColumnBrowseResultsRenderer']['tabs'], list) if tabs: