test_parameters.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. from unittest.mock import patch
  2. from oauthlib import signals
  3. from oauthlib.oauth2.rfc6749.errors import *
  4. from oauthlib.oauth2.rfc6749.parameters import *
  5. from tests.unittest import TestCase
  6. @patch('time.time', new=lambda: 1000)
  7. class ParameterTests(TestCase):
  8. state = 'xyz'
  9. auth_base = {
  10. 'uri': 'https://server.example.com/authorize',
  11. 'client_id': 's6BhdRkqt3',
  12. 'redirect_uri': 'https://client.example.com/cb',
  13. 'state': state,
  14. 'scope': 'photos'
  15. }
  16. list_scope = ['list', 'of', 'scopes']
  17. auth_grant = {'response_type': 'code'}
  18. auth_grant_pkce = {'response_type': 'code', 'code_challenge': "code_challenge",
  19. 'code_challenge_method': 'code_challenge_method'}
  20. auth_grant_list_scope = {}
  21. auth_implicit = {'response_type': 'token', 'extra': 'extra'}
  22. auth_implicit_list_scope = {}
  23. def setUp(self):
  24. self.auth_grant.update(self.auth_base)
  25. self.auth_grant_pkce.update(self.auth_base)
  26. self.auth_implicit.update(self.auth_base)
  27. self.auth_grant_list_scope.update(self.auth_grant)
  28. self.auth_grant_list_scope['scope'] = self.list_scope
  29. self.auth_implicit_list_scope.update(self.auth_implicit)
  30. self.auth_implicit_list_scope['scope'] = self.list_scope
  31. auth_base_uri = ('https://server.example.com/authorize?response_type={0}'
  32. '&client_id=s6BhdRkqt3&redirect_uri=https%3A%2F%2F'
  33. 'client.example.com%2Fcb&scope={1}&state={2}{3}')
  34. auth_base_uri_pkce = ('https://server.example.com/authorize?response_type={0}'
  35. '&client_id=s6BhdRkqt3&redirect_uri=https%3A%2F%2F'
  36. 'client.example.com%2Fcb&scope={1}&state={2}{3}&code_challenge={4}'
  37. '&code_challenge_method={5}')
  38. auth_grant_uri = auth_base_uri.format('code', 'photos', state, '')
  39. auth_grant_uri_pkce = auth_base_uri_pkce.format('code', 'photos', state, '', 'code_challenge',
  40. 'code_challenge_method')
  41. auth_grant_uri_list_scope = auth_base_uri.format('code', 'list+of+scopes', state, '')
  42. auth_implicit_uri = auth_base_uri.format('token', 'photos', state, '&extra=extra')
  43. auth_implicit_uri_list_scope = auth_base_uri.format('token', 'list+of+scopes', state, '&extra=extra')
  44. grant_body = {
  45. 'grant_type': 'authorization_code',
  46. 'code': 'SplxlOBeZQQYbYS6WxSbIA',
  47. 'redirect_uri': 'https://client.example.com/cb'
  48. }
  49. grant_body_pkce = {
  50. 'grant_type': 'authorization_code',
  51. 'code': 'SplxlOBeZQQYbYS6WxSbIA',
  52. 'redirect_uri': 'https://client.example.com/cb',
  53. 'code_verifier': 'code_verifier'
  54. }
  55. grant_body_scope = {'scope': 'photos'}
  56. grant_body_list_scope = {'scope': list_scope}
  57. auth_grant_body = ('grant_type=authorization_code&'
  58. 'code=SplxlOBeZQQYbYS6WxSbIA&'
  59. 'redirect_uri=https%3A%2F%2Fclient.example.com%2Fcb')
  60. auth_grant_body_pkce = ('grant_type=authorization_code&'
  61. 'code=SplxlOBeZQQYbYS6WxSbIA&'
  62. 'redirect_uri=https%3A%2F%2Fclient.example.com%2Fcb'
  63. '&code_verifier=code_verifier')
  64. auth_grant_body_scope = auth_grant_body + '&scope=photos'
  65. auth_grant_body_list_scope = auth_grant_body + '&scope=list+of+scopes'
  66. pwd_body = {
  67. 'grant_type': 'password',
  68. 'username': 'johndoe',
  69. 'password': 'A3ddj3w'
  70. }
  71. password_body = 'grant_type=password&username=johndoe&password=A3ddj3w'
  72. cred_grant = {'grant_type': 'client_credentials'}
  73. cred_body = 'grant_type=client_credentials'
  74. grant_response = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=xyz'
  75. grant_dict = {'code': 'SplxlOBeZQQYbYS6WxSbIA', 'state': state}
  76. error_nocode = 'https://client.example.com/cb?state=xyz'
  77. error_nostate = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA'
  78. error_wrongstate = 'https://client.example.com/cb?code=SplxlOBeZQQYbYS6WxSbIA&state=abc'
  79. error_denied = 'https://client.example.com/cb?error=access_denied&state=xyz'
  80. error_invalid = 'https://client.example.com/cb?error=invalid_request&state=xyz'
  81. implicit_base = 'https://example.com/cb#access_token=2YotnFZFEjr1zCsicMWpAA&scope=abc&'
  82. implicit_response = implicit_base + 'state={}&token_type=example&expires_in=3600'.format(state)
  83. implicit_notype = implicit_base + 'state={}&expires_in=3600'.format(state)
  84. implicit_wrongstate = implicit_base + 'state={}&token_type=exampleexpires_in=3600'.format('invalid')
  85. implicit_nostate = implicit_base + 'token_type=example&expires_in=3600'
  86. implicit_notoken = 'https://example.com/cb#state=xyz&token_type=example&expires_in=3600'
  87. implicit_dict = {
  88. 'access_token': '2YotnFZFEjr1zCsicMWpAA',
  89. 'state': state,
  90. 'token_type': 'example',
  91. 'expires_in': 3600,
  92. 'expires_at': 4600,
  93. 'scope': ['abc']
  94. }
  95. json_response = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
  96. ' "token_type": "example",'
  97. ' "expires_in": 3600,'
  98. ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
  99. ' "example_parameter": "example_value",'
  100. ' "scope":"abc def"}')
  101. json_response_noscope = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
  102. ' "token_type": "example",'
  103. ' "expires_in": 3600,'
  104. ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
  105. ' "example_parameter": "example_value" }')
  106. json_response_noexpire = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
  107. ' "token_type": "example",'
  108. ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
  109. ' "example_parameter": "example_value"}')
  110. json_response_expirenull = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
  111. ' "token_type": "example",'
  112. ' "expires_in": null,'
  113. ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
  114. ' "example_parameter": "example_value"}')
  115. json_custom_error = '{ "error": "incorrect_client_credentials" }'
  116. json_error = '{ "error": "access_denied" }'
  117. json_notoken = ('{ "token_type": "example",'
  118. ' "expires_in": 3600,'
  119. ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
  120. ' "example_parameter": "example_value" }')
  121. json_notype = ('{ "access_token": "2YotnFZFEjr1zCsicMWpAA",'
  122. ' "expires_in": 3600,'
  123. ' "refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",'
  124. ' "example_parameter": "example_value" }')
  125. json_dict = {
  126. 'access_token': '2YotnFZFEjr1zCsicMWpAA',
  127. 'token_type': 'example',
  128. 'expires_in': 3600,
  129. 'expires_at': 4600,
  130. 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
  131. 'example_parameter': 'example_value',
  132. 'scope': ['abc', 'def']
  133. }
  134. json_noscope_dict = {
  135. 'access_token': '2YotnFZFEjr1zCsicMWpAA',
  136. 'token_type': 'example',
  137. 'expires_in': 3600,
  138. 'expires_at': 4600,
  139. 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
  140. 'example_parameter': 'example_value'
  141. }
  142. json_noexpire_dict = {
  143. 'access_token': '2YotnFZFEjr1zCsicMWpAA',
  144. 'token_type': 'example',
  145. 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
  146. 'example_parameter': 'example_value'
  147. }
  148. json_notype_dict = {
  149. 'access_token': '2YotnFZFEjr1zCsicMWpAA',
  150. 'expires_in': 3600,
  151. 'expires_at': 4600,
  152. 'refresh_token': 'tGzv3JOkF0XG5Qx2TlKWIA',
  153. 'example_parameter': 'example_value',
  154. }
  155. url_encoded_response = ('access_token=2YotnFZFEjr1zCsicMWpAA'
  156. '&token_type=example'
  157. '&expires_in=3600'
  158. '&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA'
  159. '&example_parameter=example_value'
  160. '&scope=abc def')
  161. url_encoded_error = 'error=access_denied'
  162. url_encoded_notoken = ('token_type=example'
  163. '&expires_in=3600'
  164. '&refresh_token=tGzv3JOkF0XG5Qx2TlKWIA'
  165. '&example_parameter=example_value')
  166. def test_prepare_grant_uri(self):
  167. """Verify correct authorization URI construction."""
  168. self.assertURLEqual(prepare_grant_uri(**self.auth_grant), self.auth_grant_uri)
  169. self.assertURLEqual(prepare_grant_uri(**self.auth_grant_list_scope), self.auth_grant_uri_list_scope)
  170. self.assertURLEqual(prepare_grant_uri(**self.auth_implicit), self.auth_implicit_uri)
  171. self.assertURLEqual(prepare_grant_uri(**self.auth_implicit_list_scope), self.auth_implicit_uri_list_scope)
  172. self.assertURLEqual(prepare_grant_uri(**self.auth_grant_pkce), self.auth_grant_uri_pkce)
  173. def test_prepare_token_request(self):
  174. """Verify correct access token request body construction."""
  175. self.assertFormBodyEqual(prepare_token_request(**self.grant_body), self.auth_grant_body)
  176. self.assertFormBodyEqual(prepare_token_request(**self.pwd_body), self.password_body)
  177. self.assertFormBodyEqual(prepare_token_request(**self.cred_grant), self.cred_body)
  178. self.assertFormBodyEqual(prepare_token_request(**self.grant_body_pkce), self.auth_grant_body_pkce)
  179. def test_grant_response(self):
  180. """Verify correct parameter parsing and validation for auth code responses."""
  181. params = parse_authorization_code_response(self.grant_response)
  182. self.assertEqual(params, self.grant_dict)
  183. params = parse_authorization_code_response(self.grant_response, state=self.state)
  184. self.assertEqual(params, self.grant_dict)
  185. self.assertRaises(MissingCodeError, parse_authorization_code_response,
  186. self.error_nocode)
  187. self.assertRaises(AccessDeniedError, parse_authorization_code_response,
  188. self.error_denied)
  189. self.assertRaises(InvalidRequestFatalError, parse_authorization_code_response,
  190. self.error_invalid)
  191. self.assertRaises(MismatchingStateError, parse_authorization_code_response,
  192. self.error_nostate, state=self.state)
  193. self.assertRaises(MismatchingStateError, parse_authorization_code_response,
  194. self.error_wrongstate, state=self.state)
  195. def test_implicit_token_response(self):
  196. """Verify correct parameter parsing and validation for implicit responses."""
  197. self.assertEqual(parse_implicit_response(self.implicit_response),
  198. self.implicit_dict)
  199. self.assertRaises(MissingTokenError, parse_implicit_response,
  200. self.implicit_notoken)
  201. self.assertRaises(ValueError, parse_implicit_response,
  202. self.implicit_nostate, state=self.state)
  203. self.assertRaises(ValueError, parse_implicit_response,
  204. self.implicit_wrongstate, state=self.state)
  205. def test_custom_json_error(self):
  206. self.assertRaises(CustomOAuth2Error, parse_token_response, self.json_custom_error)
  207. def test_json_token_response(self):
  208. """Verify correct parameter parsing and validation for token responses. """
  209. self.assertEqual(parse_token_response(self.json_response), self.json_dict)
  210. self.assertRaises(AccessDeniedError, parse_token_response, self.json_error)
  211. self.assertRaises(MissingTokenError, parse_token_response, self.json_notoken)
  212. self.assertEqual(parse_token_response(self.json_response_noscope,
  213. scope=['all', 'the', 'scopes']), self.json_noscope_dict)
  214. self.assertEqual(parse_token_response(self.json_response_noexpire), self.json_noexpire_dict)
  215. self.assertEqual(parse_token_response(self.json_response_expirenull), self.json_noexpire_dict)
  216. scope_changes_recorded = []
  217. def record_scope_change(sender, message, old, new):
  218. scope_changes_recorded.append((message, old, new))
  219. os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
  220. signals.scope_changed.connect(record_scope_change)
  221. try:
  222. parse_token_response(self.json_response, scope='aaa')
  223. self.assertEqual(len(scope_changes_recorded), 1)
  224. message, old, new = scope_changes_recorded[0]
  225. for scope in new + old:
  226. self.assertIn(scope, message)
  227. self.assertEqual(old, ['aaa'])
  228. self.assertEqual(set(new), {'abc', 'def'})
  229. finally:
  230. signals.scope_changed.disconnect(record_scope_change)
  231. del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']
  232. def test_json_token_notype(self):
  233. """Verify strict token type parsing only when configured. """
  234. self.assertEqual(parse_token_response(self.json_notype), self.json_notype_dict)
  235. try:
  236. os.environ['OAUTHLIB_STRICT_TOKEN_TYPE'] = '1'
  237. self.assertRaises(MissingTokenTypeError, parse_token_response, self.json_notype)
  238. finally:
  239. del os.environ['OAUTHLIB_STRICT_TOKEN_TYPE']
  240. def test_url_encoded_token_response(self):
  241. """Verify fallback parameter parsing and validation for token responses. """
  242. self.assertEqual(parse_token_response(self.url_encoded_response), self.json_dict)
  243. self.assertRaises(AccessDeniedError, parse_token_response, self.url_encoded_error)
  244. self.assertRaises(MissingTokenError, parse_token_response, self.url_encoded_notoken)
  245. scope_changes_recorded = []
  246. def record_scope_change(sender, message, old, new):
  247. scope_changes_recorded.append((message, old, new))
  248. os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1'
  249. signals.scope_changed.connect(record_scope_change)
  250. try:
  251. token = parse_token_response(self.url_encoded_response, scope='aaa')
  252. self.assertEqual(len(scope_changes_recorded), 1)
  253. message, old, new = scope_changes_recorded[0]
  254. for scope in new + old:
  255. self.assertIn(scope, message)
  256. self.assertEqual(old, ['aaa'])
  257. self.assertEqual(set(new), {'abc', 'def'})
  258. finally:
  259. signals.scope_changed.disconnect(record_scope_change)
  260. del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']