123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- # -*- coding: utf-8 -*-
- import json
- from unittest import mock
- from oauthlib import common
- from oauthlib.oauth2.rfc6749 import errors, tokens
- from oauthlib.oauth2.rfc6749.endpoints import Server
- from oauthlib.oauth2.rfc6749.endpoints.authorization import (
- AuthorizationEndpoint,
- )
- from oauthlib.oauth2.rfc6749.endpoints.resource import ResourceEndpoint
- from oauthlib.oauth2.rfc6749.endpoints.token import TokenEndpoint
- from oauthlib.oauth2.rfc6749.grant_types import (
- AuthorizationCodeGrant, ClientCredentialsGrant, ImplicitGrant,
- ResourceOwnerPasswordCredentialsGrant,
- )
- from tests.unittest import TestCase
- class AuthorizationEndpointTest(TestCase):
- def setUp(self):
- self.mock_validator = mock.MagicMock()
- self.mock_validator.get_code_challenge.return_value = None
- self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
- auth_code = AuthorizationCodeGrant(
- request_validator=self.mock_validator)
- auth_code.save_authorization_code = mock.MagicMock()
- implicit = ImplicitGrant(
- request_validator=self.mock_validator)
- implicit.save_token = mock.MagicMock()
- response_types = {
- 'code': auth_code,
- 'token': implicit,
- 'none': auth_code
- }
- self.expires_in = 1800
- token = tokens.BearerToken(
- self.mock_validator,
- expires_in=self.expires_in
- )
- self.endpoint = AuthorizationEndpoint(
- default_response_type='code',
- default_token_type=token,
- response_types=response_types
- )
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_authorization_grant(self):
- uri = 'http://i.b/l?response_type=code&client_id=me&scope=all+of+them&state=xyz'
- uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
- headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], 'http://back.to/me?code=abc&state=xyz')
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_implicit_grant(self):
- uri = 'http://i.b/l?response_type=token&client_id=me&scope=all+of+them&state=xyz'
- uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
- headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], 'http://back.to/me#access_token=abc&expires_in=' + str(self.expires_in) + '&token_type=Bearer&state=xyz&scope=all+of+them', parse_fragment=True)
- def test_none_grant(self):
- uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them&state=xyz'
- uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
- headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], 'http://back.to/me?state=xyz', parse_fragment=True)
- self.assertIsNone(body)
- self.assertEqual(status_code, 302)
- # and without the state parameter
- uri = 'http://i.b/l?response_type=none&client_id=me&scope=all+of+them'
- uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
- headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], 'http://back.to/me', parse_fragment=True)
- self.assertIsNone(body)
- self.assertEqual(status_code, 302)
- def test_missing_type(self):
- uri = 'http://i.b/l?client_id=me&scope=all+of+them'
- uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
- self.mock_validator.validate_request = mock.MagicMock(
- side_effect=errors.InvalidRequestError())
- headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], 'http://back.to/me?error=invalid_request&error_description=Missing+response_type+parameter.')
- def test_invalid_type(self):
- uri = 'http://i.b/l?response_type=invalid&client_id=me&scope=all+of+them'
- uri += '&redirect_uri=http%3A%2F%2Fback.to%2Fme'
- self.mock_validator.validate_request = mock.MagicMock(
- side_effect=errors.UnsupportedResponseTypeError())
- headers, body, status_code = self.endpoint.create_authorization_response(
- uri, scopes=['all', 'of', 'them'])
- self.assertIn('Location', headers)
- self.assertURLEqual(headers['Location'], 'http://back.to/me?error=unsupported_response_type')
- class TokenEndpointTest(TestCase):
- def setUp(self):
- def set_user(request):
- request.user = mock.MagicMock()
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked_client_id'
- return True
- self.mock_validator = mock.MagicMock()
- self.mock_validator.authenticate_client.side_effect = set_user
- self.mock_validator.get_code_challenge.return_value = None
- self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
- auth_code = AuthorizationCodeGrant(
- request_validator=self.mock_validator)
- password = ResourceOwnerPasswordCredentialsGrant(
- request_validator=self.mock_validator)
- client = ClientCredentialsGrant(
- request_validator=self.mock_validator)
- supported_types = {
- 'authorization_code': auth_code,
- 'password': password,
- 'client_credentials': client,
- }
- self.expires_in = 1800
- token = tokens.BearerToken(
- self.mock_validator,
- expires_in=self.expires_in
- )
- self.endpoint = TokenEndpoint(
- 'authorization_code',
- default_token_type=token,
- grant_types=supported_types
- )
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_authorization_grant(self):
- body = 'grant_type=authorization_code&code=abc&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': 'abc',
- 'refresh_token': 'abc',
- 'scope': 'all of them'
- }
- self.assertEqual(json.loads(body), token)
- body = 'grant_type=authorization_code&code=abc'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': 'abc',
- 'refresh_token': 'abc'
- }
- self.assertEqual(json.loads(body), token)
- # try with additional custom variables
- body = 'grant_type=authorization_code&code=abc&state=foobar'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- self.assertEqual(json.loads(body), token)
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_password_grant(self):
- body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': 'abc',
- 'refresh_token': 'abc',
- 'scope': 'all of them',
- }
- self.assertEqual(json.loads(body), token)
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_client_grant(self):
- body = 'grant_type=client_credentials&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': 'abc',
- 'scope': 'all of them',
- }
- self.assertEqual(json.loads(body), token)
- def test_missing_type(self):
- _, body, _ = self.endpoint.create_token_response('', body='')
- token = {'error': 'unsupported_grant_type'}
- self.assertEqual(json.loads(body), token)
- def test_invalid_type(self):
- body = 'grant_type=invalid'
- _, body, _ = self.endpoint.create_token_response('', body=body)
- token = {'error': 'unsupported_grant_type'}
- self.assertEqual(json.loads(body), token)
- class SignedTokenEndpointTest(TestCase):
- def setUp(self):
- self.expires_in = 1800
- def set_user(request):
- request.user = mock.MagicMock()
- request.client = mock.MagicMock()
- request.client.client_id = 'mocked_client_id'
- return True
- self.mock_validator = mock.MagicMock()
- self.mock_validator.get_code_challenge.return_value = None
- self.mock_validator.authenticate_client.side_effect = set_user
- self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
- self.private_pem = """
- -----BEGIN RSA PRIVATE KEY-----
- MIIEpAIBAAKCAQEA6TtDhWGwzEOWZP6m/zHoZnAPLABfetvoMPmxPGjFjtDuMRPv
- EvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo5laMGbj1pPyCPxlVi9LK82HQNX0B
- YK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGIQ0tT7yQcBm2A8FEaP0bZYCLMjwMN
- WfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3NwsFuJgOa/KuodXaAmf8pJnx8t1Wn
- nxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT7xn4K/ghLgNs09v6Yge0pmPl9Oz+
- +bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OXtwIDAQABAoIBAGFTTbXXMkPK4HN8
- oItVdDlrAanG7hECuz3UtFUVE3upS/xG6TjqweVLwRqYCh2ssDXFwjy4mXRGDzF4
- e/e/6s9Txlrlh/w1MtTJ6ZzTdcViR9RKOczysjZ7S5KRlI3KnGFAuWPcG2SuOWjZ
- dZfzcj1Crd/ZHajBAVFHRsCo/ATVNKbTRprFfb27xKpQ2BwH/GG781sLE3ZVNIhs
- aRRaED4622kI1E/WXws2qQMqbFKzo0m1tPbLb3Z89WgZJ/tRQwuDype1Vfm7k6oX
- xfbp3948qSe/yWKRlMoPkleji/WxPkSIalzWSAi9ziN/0Uzhe65FURgrfHL3XR1A
- B8UR+aECgYEA7NPQZV4cAikk02Hv65JgISofqV49P8MbLXk8sdnI1n7Mj10TgzU3
- lyQGDEX4hqvT0bTXe4KAOxQZx9wumu05ejfzhdtSsEm6ptGHyCdmYDQeV0C/pxDX
- JNCK8XgMku2370XG0AnyBCT7NGlgtDcNCQufcesF2gEuoKiXg6Zjo7sCgYEA/Bzs
- 9fWGZZnSsMSBSW2OYbFuhF3Fne0HcxXQHipl0Rujc/9g0nccwqKGizn4fGOE7a8F
- usQgJoeGcinL7E9OEP/uQ9VX1C9RNVjIxP1O5/Guw1zjxQQYetOvbPhN2QhD1Ye7
- 0TRKrW1BapcjwLpFQlVg1ZeTPOi5lv24W/wX9jUCgYEAkrMSX/hPuTbrTNVZ3L6r
- NV/2hN+PaTPeXei/pBuXwOaCqDurnpcUfFcgN/IP5LwDVd+Dq0pHTFFDNv45EFbq
- R77o5n3ZVsIVEMiyJ1XgoK8oLDw7e61+15smtjT69Piz+09pu+ytMcwGn4y3Dmsb
- dALzHYnL8iLRU0ubrz0ec4kCgYAJiVKRTzNBPptQom49h85d9ac3jJCAE8o3WTjh
- Gzt0uHXrWlqgO280EY/DTnMOyXjqwLcXxHlu26uDP/99tdY/IF8z46sJ1KxetzgI
- 84f7kBHLRAU9m5UNeFpnZdEUB5MBTbwWAsNcYgiabpMkpCcghjg+fBhOsoLqqjhC
- CnwhjQKBgQDkv0QTdyBU84TE8J0XY3eLQwXbrvG2yD5A2ntN3PyxGEneX5WTJGMZ
- xJxwaFYQiDS3b9E7b8Q5dg8qa5Y1+epdhx3cuQAWPm+AoHKshDfbRve4txBDQAqh
- c6MxSWgsa+2Ld5SWSNbGtpPcmEM3Fl5ttMCNCKtNc0UE16oHwaPAIw==
- -----END RSA PRIVATE KEY-----
- """
- self.public_pem = """
- -----BEGIN PUBLIC KEY-----
- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA6TtDhWGwzEOWZP6m/zHo
- ZnAPLABfetvoMPmxPGjFjtDuMRPvEvI1sbixZBjBtdnc5rTtHUUQ25Am3JzwPRGo
- 5laMGbj1pPyCPxlVi9LK82HQNX0BYK7tZtVfDHElQA7F4v3j9d3rad4O9/n+lyGI
- Q0tT7yQcBm2A8FEaP0bZYCLMjwMNWfaVLE8eXHyv+MfpNNLI9wttLxygKYM48I3N
- wsFuJgOa/KuodXaAmf8pJnx8t1WnnxvaYXFiUn/TxmhM/qhemPa6+0nqq+aWV5eT
- 7xn4K/ghLgNs09v6Yge0pmPl9Oz++bjJ+aKRnAmwCOY8/5U5EilAiUOeBoO9+8OX
- twIDAQAB
- -----END PUBLIC KEY-----
- """
- signed_token = tokens.signed_token_generator(self.private_pem,
- user_id=123)
- self.endpoint = Server(
- self.mock_validator,
- token_expires_in=self.expires_in,
- token_generator=signed_token,
- refresh_token_generator=tokens.random_token_generator
- )
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_authorization_grant(self):
- body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- body = json.loads(body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': body['access_token'],
- 'refresh_token': 'abc',
- 'scope': 'all of them'
- }
- self.assertEqual(body, token)
- body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- body = json.loads(body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': body['access_token'],
- 'refresh_token': 'abc'
- }
- self.assertEqual(body, token)
- # try with additional custom variables
- body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=authorization_code&code=abc&state=foobar'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- body = json.loads(body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': body['access_token'],
- 'refresh_token': 'abc'
- }
- self.assertEqual(body, token)
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_password_grant(self):
- body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- body = json.loads(body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': body['access_token'],
- 'refresh_token': 'abc',
- 'scope': 'all of them',
- }
- self.assertEqual(body, token)
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_scopes_and_user_id_stored_in_access_token(self):
- body = 'grant_type=password&username=a&password=hello&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- access_token = json.loads(body)['access_token']
- claims = common.verify_signed_token(self.public_pem, access_token)
- self.assertEqual(claims['scope'], 'all of them')
- self.assertEqual(claims['user_id'], 123)
- @mock.patch('oauthlib.common.generate_token', new=lambda: 'abc')
- def test_client_grant(self):
- body = 'grant_type=client_credentials&scope=all+of+them'
- headers, body, status_code = self.endpoint.create_token_response(
- '', body=body)
- body = json.loads(body)
- token = {
- 'token_type': 'Bearer',
- 'expires_in': self.expires_in,
- 'access_token': body['access_token'],
- 'scope': 'all of them',
- }
- self.assertEqual(body, token)
- def test_missing_type(self):
- _, body, _ = self.endpoint.create_token_response('', body='client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&code=abc')
- token = {'error': 'unsupported_grant_type'}
- self.assertEqual(json.loads(body), token)
- def test_invalid_type(self):
- body = 'client_id=me&redirect_uri=http%3A%2F%2Fback.to%2Fme&grant_type=invalid&code=abc'
- _, body, _ = self.endpoint.create_token_response('', body=body)
- token = {'error': 'unsupported_grant_type'}
- self.assertEqual(json.loads(body), token)
- class ResourceEndpointTest(TestCase):
- def setUp(self):
- self.mock_validator = mock.MagicMock()
- self.addCleanup(setattr, self, 'mock_validator', mock.MagicMock())
- token = tokens.BearerToken(request_validator=self.mock_validator)
- self.endpoint = ResourceEndpoint(
- default_token='Bearer',
- token_types={'Bearer': token}
- )
- def test_defaults(self):
- uri = 'http://a.b/path?some=query'
- self.mock_validator.validate_bearer_token.return_value = False
- valid, request = self.endpoint.verify_request(uri)
- self.assertFalse(valid)
- self.assertEqual(request.token_type, 'Bearer')
|