123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import datetime as dt
- import hashlib
- import hmac
- import urllib.parse
- from .common import InfoExtractor
class AWSIE(InfoExtractor):  # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
    """Helper base class for calling JSON APIs that require AWS Signature Version 4.

    ``_aws_execute_api`` reads ``self._AWS_PROXY_HOST`` and ``self._AWS_API_KEY``,
    neither of which is defined in this class; presumably subclasses are
    expected to provide them (verify against the concrete extractors).
    """

    _AWS_ALGORITHM = 'AWS4-HMAC-SHA256'
    _AWS_REGION = 'us-east-1'

    def _aws_execute_api(self, aws_dict, video_id, query=None):
        """Perform a SigV4-signed GET request and return the decoded JSON.

        @param aws_dict  Mapping with the keys ``uri`` (request path),
                         ``access_key`` and ``secret_key``; an optional
                         ``session_token`` is sent as ``X-Amz-Security-Token``.
        @param video_id  Passed through to ``_download_json``.
        @param query     Optional query parameters (mapping or iterable of
                         ``(name, value)`` pairs). They are percent-encoded and
                         sorted as SigV4 requires, and the very same string is
                         used both for signing and in the request URL.
        @returns         Whatever ``_download_json`` returns for the signed URL.
        """
        query = query or {}
        uri = aws_dict['uri']
        amz_date = dt.datetime.now(dt.timezone.utc).strftime('%Y%m%dT%H%M%SZ')
        date = amz_date[:8]  # YYYYMMDD part, used in the credential scope
        headers = {
            'Accept': 'application/json',
            'Host': self._AWS_PROXY_HOST,
            'X-Amz-Date': amz_date,
            'X-Api-Key': self._AWS_API_KEY,
        }
        session_token = aws_dict.get('session_token')
        if session_token:
            headers['X-Amz-Security-Token'] = session_token

        def aws_hash(s):
            return hashlib.sha256(s.encode()).hexdigest()

        def aws_quote(value):
            # SigV4 URI-encoding: everything except A-Z a-z 0-9 - _ . ~ is
            # percent-encoded; in particular a space is '%20', never '+'
            if not isinstance(value, bytes):
                value = str(value)
            return urllib.parse.quote(value, safe='')

        # Task 1: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
        # Parameters must be sorted by (encoded) name; urlencode() alone would
        # keep the caller's order and encode spaces as '+', both of which
        # produce a canonical request the service cannot reproduce
        query_items = query.items() if hasattr(query, 'items') else query
        canonical_querystring = '&'.join(
            f'{name}={value}'
            for name, value in sorted(
                (aws_quote(name), aws_quote(value)) for name, value in query_items))

        # Header names are lower-cased *before* sorting, as the spec orders
        # them by their lower-case form
        signing_headers = sorted((name.lower(), value) for name, value in headers.items())
        canonical_headers = ''.join(f'{name}:{value}\n' for name, value in signing_headers)
        signed_headers = ';'.join(name for name, _ in signing_headers)
        canonical_request = '\n'.join([
            'GET',
            uri,
            canonical_querystring,
            canonical_headers,
            signed_headers,
            aws_hash(''),  # hash of the (empty) GET payload
        ])

        # Task 2: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
        credential_scope_list = [date, self._AWS_REGION, 'execute-api', 'aws4_request']
        credential_scope = '/'.join(credential_scope_list)
        string_to_sign = '\n'.join([
            self._AWS_ALGORITHM, amz_date, credential_scope, aws_hash(canonical_request)])

        # Task 3: http://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
        def aws_hmac(key, msg):
            return hmac.new(key, msg.encode(), hashlib.sha256)

        # The signing key is derived by chaining HMACs over each scope component
        k_signing = ('AWS4' + aws_dict['secret_key']).encode()
        for value in credential_scope_list:
            k_signing = aws_hmac(k_signing, value).digest()

        signature = aws_hmac(k_signing, string_to_sign).hexdigest()

        # Task 4: http://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
        access_key = aws_dict['access_key']
        headers['Authorization'] = ', '.join([
            f'{self._AWS_ALGORITHM} Credential={access_key}/{credential_scope}',
            f'SignedHeaders={signed_headers}',
            f'Signature={signature}',
        ])

        query_suffix = f'?{canonical_querystring}' if canonical_querystring else ''
        return self._download_json(
            f'https://{self._AWS_PROXY_HOST}{uri}{query_suffix}',
            video_id, headers=headers)
|