[ie/youtube] Refactor cookie auth (#11989)
Some checks are pending
CodeQL / Analyze (python) (push) Waiting to run
Download Tests / Quick Download Tests (push) Waiting to run
Download Tests / Full Download Tests (ubuntu-latest, 3.10) (push) Waiting to run
Download Tests / Full Download Tests (ubuntu-latest, 3.11) (push) Waiting to run
Download Tests / Full Download Tests (ubuntu-latest, 3.12) (push) Waiting to run
Download Tests / Full Download Tests (ubuntu-latest, 3.13) (push) Waiting to run
Download Tests / Full Download Tests (ubuntu-latest, pypy-3.10) (push) Waiting to run
Download Tests / Full Download Tests (windows-latest, 3.9) (push) Waiting to run
Download Tests / Full Download Tests (windows-latest, pypy-3.10) (push) Waiting to run
Quick Test / Core Test (push) Waiting to run
Quick Test / Code check (push) Waiting to run
Release (master) / release (push) Waiting to run
Release (master) / publish_pypi (push) Blocked by required conditions

Authored by: coletdjnz
This commit is contained in:
coletdjnz 2025-01-12 15:02:57 +13:00 committed by GitHub
parent 712d2abb32
commit 75079f4e3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -32,7 +32,6 @@
classproperty,
clean_html,
datetime_from_str,
dict_get,
filesize_from_tbr,
filter_dict,
float_or_none,
@ -568,9 +567,15 @@ def _initialize_pref(self):
pref.update({'hl': self._preferred_lang or 'en', 'tz': 'UTC'})
self._set_cookie('.youtube.com', name='PREF', value=urllib.parse.urlencode(pref))
def _initialize_cookie_auth(self):
yt_sapisid, yt_1psapisid, yt_3psapisid = self._get_sid_cookies()
if yt_sapisid or yt_1psapisid or yt_3psapisid:
self.write_debug('Found YouTube account cookies')
def _real_initialize(self):
self._initialize_pref()
self._initialize_consent()
self._initialize_cookie_auth()
self._check_login_required()
def _perform_login(self, username, password):
@ -628,32 +633,63 @@ def _extract_context(self, ytcfg=None, default_client='web'):
client_context.update({'hl': self._preferred_lang or 'en', 'timeZone': 'UTC', 'utcOffsetMinutes': 0})
return context
_SAPISID = None
@staticmethod
def _make_sid_authorization(scheme, sid, origin, additional_parts):
timestamp = str(round(time.time()))
def _generate_sapisidhash_header(self, origin='https://www.youtube.com'):
time_now = round(time.time())
if self._SAPISID is None:
yt_cookies = self._get_cookies('https://www.youtube.com')
# Sometimes SAPISID cookie isn't present but __Secure-3PAPISID is.
# See: https://github.com/yt-dlp/yt-dlp/issues/393
sapisid_cookie = dict_get(
yt_cookies, ('__Secure-3PAPISID', 'SAPISID'))
if sapisid_cookie and sapisid_cookie.value:
self._SAPISID = sapisid_cookie.value
self.write_debug('Extracted SAPISID cookie')
# SAPISID cookie is required if not already present
if not yt_cookies.get('SAPISID'):
self.write_debug('Copying __Secure-3PAPISID cookie to SAPISID cookie')
self._set_cookie(
'.youtube.com', 'SAPISID', self._SAPISID, secure=True, expire_time=time_now + 3600)
else:
self._SAPISID = False
if not self._SAPISID:
hash_parts = []
if additional_parts:
hash_parts.append(':'.join(additional_parts.values()))
hash_parts.extend([timestamp, sid, origin])
sidhash = hashlib.sha1(' '.join(hash_parts).encode()).hexdigest()
parts = [timestamp, sidhash]
if additional_parts:
parts.append(''.join(additional_parts))
return f'{scheme} {"_".join(parts)}'
def _get_sid_cookies(self):
"""
Get SAPISID, 1PSAPISID, 3PSAPISID cookie values
@returns sapisid, 1psapisid, 3psapisid
"""
yt_cookies = self._get_cookies('https://www.youtube.com')
yt_sapisid = try_call(lambda: yt_cookies['SAPISID'].value)
yt_3papisid = try_call(lambda: yt_cookies['__Secure-3PAPISID'].value)
yt_1papisid = try_call(lambda: yt_cookies['__Secure-1PAPISID'].value)
# Sometimes SAPISID cookie isn't present but __Secure-3PAPISID is.
# YouTube also falls back to __Secure-3PAPISID if SAPISID is missing.
# See: https://github.com/yt-dlp/yt-dlp/issues/393
return yt_sapisid or yt_3papisid, yt_1papisid, yt_3papisid
def _get_sid_authorization_header(self, origin='https://www.youtube.com', user_session_id=None):
"""
Generate API Session ID Authorization for Innertube requests. Assumes all requests are secure (https).
@param origin: Origin URL
@param user_session_id: Optional User Session ID
@return: Authorization header value
"""
authorizations = []
additional_parts = {}
if user_session_id:
additional_parts['u'] = user_session_id
yt_sapisid, yt_1psapisid, yt_3psapisid = self._get_sid_cookies()
for scheme, sid in (('SAPISIDHASH', yt_sapisid),
('SAPISID1PHASH', yt_1psapisid),
('SAPISID3PHASH', yt_3psapisid)):
if sid:
authorizations.append(self._make_sid_authorization(scheme, sid, origin, additional_parts))
if not authorizations:
return None
# SAPISIDHASH algorithm from https://stackoverflow.com/a/32065323
sapisidhash = hashlib.sha1(
f'{time_now} {self._SAPISID} {origin}'.encode()).hexdigest()
return f'SAPISIDHASH {time_now}_{sapisidhash}'
return ' '.join(authorizations)
def _call_api(self, ep, query, video_id, fatal=True, headers=None,
note='Downloading API JSON', errnote='Unable to download API page',
@ -689,26 +725,48 @@ def _extract_session_index(*data):
if session_index is not None:
return session_index
def _data_sync_id_to_delegated_session_id(self, data_sync_id):
if not data_sync_id:
return
# datasyncid is of the form "channel_syncid||user_syncid" for secondary channel
# and just "user_syncid||" for primary channel. We only want the channel_syncid
channel_syncid, _, user_syncid = data_sync_id.partition('||')
if user_syncid:
return channel_syncid
def _extract_account_syncid(self, *args):
@staticmethod
def _parse_data_sync_id(data_sync_id):
"""
Extract current session ID required to download private playlists of secondary channels
Parse data_sync_id into delegated_session_id and user_session_id.
data_sync_id is of the form "delegated_session_id||user_session_id" for secondary channel
and just "user_session_id||" for primary channel.
@param data_sync_id: data_sync_id string
@return: Tuple of (delegated_session_id, user_session_id)
"""
if not data_sync_id:
return None, None
first, _, second = data_sync_id.partition('||')
if second:
return first, second
return None, first
def _extract_delegated_session_id(self, *args):
"""
Extract current delegated session ID required to download private playlists of secondary channels
@params response and/or ytcfg
@return: delegated session ID
"""
# ytcfg includes channel_syncid if on secondary channel
if delegated_sid := traverse_obj(args, (..., 'DELEGATED_SESSION_ID', {str}, any)):
return delegated_sid
data_sync_id = self._extract_data_sync_id(*args)
return self._data_sync_id_to_delegated_session_id(data_sync_id)
return self._parse_data_sync_id(data_sync_id)[0]
def _extract_user_session_id(self, *args):
"""
Extract current user session ID
@params response and/or ytcfg
@return: user session ID
"""
if user_sid := traverse_obj(args, (..., 'USER_SESSION_ID', {str}, any)):
return user_sid
data_sync_id = self._extract_data_sync_id(*args)
return self._parse_data_sync_id(data_sync_id)[1]
def _extract_data_sync_id(self, *args):
"""
@ -735,7 +793,7 @@ def _extract_visitor_data(self, *args):
@functools.cached_property
def is_authenticated(self):
return bool(self._generate_sapisidhash_header())
return bool(self._get_sid_authorization_header())
def extract_ytcfg(self, video_id, webpage):
if not webpage:
@ -745,25 +803,28 @@ def extract_ytcfg(self, video_id, webpage):
r'ytcfg\.set\s*\(\s*({.+?})\s*\)\s*;', webpage, 'ytcfg',
default='{}'), video_id, fatal=False) or {}
def _generate_cookie_auth_headers(self, *, ytcfg=None, account_syncid=None, session_index=None, origin=None, **kwargs):
def _generate_cookie_auth_headers(self, *, ytcfg=None, delegated_session_id=None, user_session_id=None, session_index=None, origin=None, **kwargs):
headers = {}
account_syncid = account_syncid or self._extract_account_syncid(ytcfg)
if account_syncid:
headers['X-Goog-PageId'] = account_syncid
delegated_session_id = delegated_session_id or self._extract_delegated_session_id(ytcfg)
if delegated_session_id:
headers['X-Goog-PageId'] = delegated_session_id
if session_index is None:
session_index = self._extract_session_index(ytcfg)
if account_syncid or session_index is not None:
if delegated_session_id or session_index is not None:
headers['X-Goog-AuthUser'] = session_index if session_index is not None else 0
auth = self._generate_sapisidhash_header(origin)
auth = self._get_sid_authorization_header(origin, user_session_id=user_session_id or self._extract_user_session_id(ytcfg))
if auth is not None:
headers['Authorization'] = auth
headers['X-Origin'] = origin
if traverse_obj(ytcfg, 'LOGGED_IN', expected_type=bool):
headers['X-Youtube-Bootstrap-Logged-In'] = 'true'
return headers
def generate_api_headers(
self, *, ytcfg=None, account_syncid=None, session_index=None,
self, *, ytcfg=None, delegated_session_id=None, user_session_id=None, session_index=None,
visitor_data=None, api_hostname=None, default_client='web', **kwargs):
origin = 'https://' + (self._select_api_hostname(api_hostname, default_client))
@ -774,7 +835,12 @@ def generate_api_headers(
'Origin': origin,
'X-Goog-Visitor-Id': visitor_data or self._extract_visitor_data(ytcfg),
'User-Agent': self._ytcfg_get_safe(ytcfg, lambda x: x['INNERTUBE_CONTEXT']['client']['userAgent'], default_client=default_client),
**self._generate_cookie_auth_headers(ytcfg=ytcfg, account_syncid=account_syncid, session_index=session_index, origin=origin),
**self._generate_cookie_auth_headers(
ytcfg=ytcfg,
delegated_session_id=delegated_session_id,
user_session_id=user_session_id,
session_index=session_index,
origin=origin),
}
return filter_dict(headers)
@ -3837,9 +3903,13 @@ def _extract_player_response(self, client, video_id, master_ytcfg, player_ytcfg,
default_client=client,
visitor_data=visitor_data,
session_index=self._extract_session_index(master_ytcfg, player_ytcfg),
account_syncid=(
self._data_sync_id_to_delegated_session_id(data_sync_id)
or self._extract_account_syncid(master_ytcfg, initial_pr, player_ytcfg)
delegated_session_id=(
self._parse_data_sync_id(data_sync_id)[0]
or self._extract_delegated_session_id(master_ytcfg, initial_pr, player_ytcfg)
),
user_session_id=(
self._parse_data_sync_id(data_sync_id)[1]
or self._extract_user_session_id(master_ytcfg, initial_pr, player_ytcfg)
),
)
@ -5351,7 +5421,7 @@ def _extract_entries(self, parent_renderer, continuation_list):
if not continuation_list[0]:
continuation_list[0] = self._extract_continuation(parent_renderer)
def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
def _entries(self, tab, item_id, ytcfg, delegated_session_id, visitor_data):
continuation_list = [None]
extract_entries = lambda x: self._extract_entries(x, continuation_list)
tab_content = try_get(tab, lambda x: x['content'], dict)
@ -5372,7 +5442,7 @@ def _entries(self, tab, item_id, ytcfg, account_syncid, visitor_data):
break
seen_continuations.add(continuation_token)
headers = self.generate_api_headers(
ytcfg=ytcfg, account_syncid=account_syncid, visitor_data=visitor_data)
ytcfg=ytcfg, delegated_session_id=delegated_session_id, visitor_data=visitor_data)
response = self._extract_response(
item_id=f'{item_id} page {page_num}',
query=continuation, headers=headers, ytcfg=ytcfg,
@ -5442,7 +5512,7 @@ def _extract_from_tabs(self, item_id, ytcfg, data, tabs):
return self.playlist_result(
self._entries(
selected_tab, metadata['id'], ytcfg,
self._extract_account_syncid(ytcfg, data),
self._extract_delegated_session_id(ytcfg, data),
self._extract_visitor_data(data, ytcfg)),
**metadata)
@ -5594,7 +5664,7 @@ def _extract_inline_playlist(self, playlist, playlist_id, data, ytcfg):
watch_endpoint = try_get(
playlist, lambda x: x['contents'][-1]['playlistPanelVideoRenderer']['navigationEndpoint']['watchEndpoint'])
headers = self.generate_api_headers(
ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
ytcfg=ytcfg, delegated_session_id=self._extract_delegated_session_id(ytcfg, data),
visitor_data=self._extract_visitor_data(response, data, ytcfg))
query = {
'playlistId': playlist_id,
@ -5692,7 +5762,7 @@ def _reload_with_unavailable_videos(self, item_id, data, ytcfg):
if not is_playlist:
return
headers = self.generate_api_headers(
ytcfg=ytcfg, account_syncid=self._extract_account_syncid(ytcfg, data),
ytcfg=ytcfg, delegated_session_id=self._extract_delegated_session_id(ytcfg, data),
visitor_data=self._extract_visitor_data(data, ytcfg))
query = {
'params': 'wgYCCAA=',