aws.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import datetime as dt
  2. import hashlib
  3. import hmac
  4. import urllib.parse
  5. from .common import InfoExtractor
  6. class AWSIE(InfoExtractor): # XXX: Conventionally, base classes should end with BaseIE/InfoExtractor
  7. _AWS_ALGORITHM = 'AWS4-HMAC-SHA256'
  8. _AWS_REGION = 'us-east-1'
  9. def _aws_execute_api(self, aws_dict, video_id, query=None):
  10. query = query or {}
  11. amz_date = dt.datetime.now(dt.timezone.utc).strftime('%Y%m%dT%H%M%SZ')
  12. date = amz_date[:8]
  13. headers = {
  14. 'Accept': 'application/json',
  15. 'Host': self._AWS_PROXY_HOST,
  16. 'X-Amz-Date': amz_date,
  17. 'X-Api-Key': self._AWS_API_KEY,
  18. }
  19. session_token = aws_dict.get('session_token')
  20. if session_token:
  21. headers['X-Amz-Security-Token'] = session_token
  22. def aws_hash(s):
  23. return hashlib.sha256(s.encode()).hexdigest()
  24. # Task 1: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
  25. canonical_querystring = urllib.parse.urlencode(query)
  26. canonical_headers = ''
  27. for header_name, header_value in sorted(headers.items()):
  28. canonical_headers += f'{header_name.lower()}:{header_value}\n'
  29. signed_headers = ';'.join([header.lower() for header in sorted(headers.keys())])
  30. canonical_request = '\n'.join([
  31. 'GET',
  32. aws_dict['uri'],
  33. canonical_querystring,
  34. canonical_headers,
  35. signed_headers,
  36. aws_hash(''),
  37. ])
  38. # Task 2: http://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
  39. credential_scope_list = [date, self._AWS_REGION, 'execute-api', 'aws4_request']
  40. credential_scope = '/'.join(credential_scope_list)
  41. string_to_sign = '\n'.join([self._AWS_ALGORITHM, amz_date, credential_scope, aws_hash(canonical_request)])
  42. # Task 3: http://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
  43. def aws_hmac(key, msg):
  44. return hmac.new(key, msg.encode(), hashlib.sha256)
  45. def aws_hmac_digest(key, msg):
  46. return aws_hmac(key, msg).digest()
  47. def aws_hmac_hexdigest(key, msg):
  48. return aws_hmac(key, msg).hexdigest()
  49. k_signing = ('AWS4' + aws_dict['secret_key']).encode()
  50. for value in credential_scope_list:
  51. k_signing = aws_hmac_digest(k_signing, value)
  52. signature = aws_hmac_hexdigest(k_signing, string_to_sign)
  53. # Task 4: http://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
  54. headers['Authorization'] = ', '.join([
  55. '{} Credential={}/{}'.format(self._AWS_ALGORITHM, aws_dict['access_key'], credential_scope),
  56. f'SignedHeaders={signed_headers}',
  57. f'Signature={signature}',
  58. ])
  59. return self._download_json(
  60. 'https://{}{}{}'.format(self._AWS_PROXY_HOST, aws_dict['uri'], '?' + canonical_querystring if canonical_querystring else ''),
  61. video_id, headers=headers)