test_service_application.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding: utf-8 -*-
  2. import os
  3. from time import time
  4. from unittest.mock import patch
  5. import jwt
  6. from oauthlib.common import Request
  7. from oauthlib.oauth2 import ServiceApplicationClient
  8. from tests.unittest import TestCase
  9. class ServiceApplicationClientTest(TestCase):
  10. gt = ServiceApplicationClient.grant_type
  11. private_key = """
  12. -----BEGIN RSA PRIVATE KEY-----
  13. MIICXgIBAAKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJopeQRRKKpZI4s5i+UPwVpupG
  14. AlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98gj0x7zamu0Ck1LtQ4c7pFMVah
  15. 5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8mfvGGg3xNjTMO7IdrwIDAQAB
  16. AoGBAOQ2KuH8S5+OrsL4K+wfjoCi6MfxCUyqVU9GxocdM1m30WyWRFMEz2nKJ8fR
  17. p3vTD4w8yplTOhcoXdQZl0kRoaDzrcYkm2VvJtQRrX7dKFT8dR8D/Tr7dNQLOXfC
  18. DY6xveQczE7qt7Vk7lp4FqmxBsaaEuokt78pOOjywZoInjZhAkEA9wz3zoZNT0/i
  19. rf6qv2qTIeieUB035N3dyw6f1BGSWYaXSuerDCD/J1qZbAPKKhyHZbVawFt3UMhe
  20. 542UftBaxQJBAO0iJy1I8GQjGnS7B3yvyH3CcLYGy296+XO/2xKp/d/ty1OIeovx
  21. C60pLNwuFNF3z9d2GVQAdoQ89hUkOtjZLeMCQQD0JO6oPHUeUjYT+T7ImAv7UKVT
  22. Suy30sKjLzqoGw1kR+wv7C5PeDRvscs4wa4CW9s6mjSrMDkDrmCLuJDtmf55AkEA
  23. kmaMg2PNrjUR51F0zOEFycaaqXbGcFwe1/xx9zLmHzMDXd4bsnwt9kk+fe0hQzVS
  24. JzatanQit3+feev1PN3QewJAWv4RZeavEUhKv+kLe95Yd0su7lTLVduVgh4v5yLT
  25. Ga6FHdjGPcfajt+nrpB1n8UQBEH9ZxniokR/IPvdMlxqXA==
  26. -----END RSA PRIVATE KEY-----
  27. """
  28. public_key = """
  29. -----BEGIN PUBLIC KEY-----
  30. MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDk1/bxyS8Q8jiheHeYYp/4rEKJ
  31. opeQRRKKpZI4s5i+UPwVpupGAlwXWfzXwSMaKPAoKJNdu7tqKRniqst5uoHXw98g
  32. j0x7zamu0Ck1LtQ4c7pFMVah5IYGhBi2E9ycNS329W27nJPWNCbESTu7snVlG8V8
  33. mfvGGg3xNjTMO7IdrwIDAQAB
  34. -----END PUBLIC KEY-----
  35. """
  36. subject = 'resource-owner@provider.com'
  37. issuer = 'the-client@provider.com'
  38. audience = 'https://provider.com/token'
  39. client_id = "someclientid"
  40. scope = ["/profile"]
  41. kwargs = {
  42. "some": "providers",
  43. "require": "extra arguments"
  44. }
  45. body = "isnot=empty"
  46. body_up = "not=empty&grant_type=%s" % gt
  47. body_kwargs = body_up + "&some=providers&require=extra+arguments"
  48. token_json = ('{ "access_token":"2YotnFZFEjr1zCsicMWpAA",'
  49. ' "token_type":"example",'
  50. ' "expires_in":3600,'
  51. ' "scope":"/profile",'
  52. ' "example_parameter":"example_value"}')
  53. token = {
  54. "access_token": "2YotnFZFEjr1zCsicMWpAA",
  55. "token_type": "example",
  56. "expires_in": 3600,
  57. "scope": ["/profile"],
  58. "example_parameter": "example_value"
  59. }
  60. @patch('time.time')
  61. def test_request_body(self, t):
  62. t.return_value = time()
  63. self.token['expires_at'] = self.token['expires_in'] + t.return_value
  64. client = ServiceApplicationClient(
  65. self.client_id, private_key=self.private_key)
  66. # Basic with min required params
  67. body = client.prepare_request_body(issuer=self.issuer,
  68. subject=self.subject,
  69. audience=self.audience,
  70. body=self.body)
  71. r = Request('https://a.b', body=body)
  72. self.assertEqual(r.isnot, 'empty')
  73. self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
  74. claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
  75. self.assertEqual(claim['iss'], self.issuer)
  76. # audience verification is handled during decode now
  77. self.assertEqual(claim['sub'], self.subject)
  78. self.assertEqual(claim['iat'], int(t.return_value))
  79. self.assertNotIn('nbf', claim)
  80. self.assertNotIn('jti', claim)
  81. # Missing issuer parameter
  82. self.assertRaises(ValueError, client.prepare_request_body,
  83. issuer=None, subject=self.subject, audience=self.audience, body=self.body)
  84. # Missing subject parameter
  85. self.assertRaises(ValueError, client.prepare_request_body,
  86. issuer=self.issuer, subject=None, audience=self.audience, body=self.body)
  87. # Missing audience parameter
  88. self.assertRaises(ValueError, client.prepare_request_body,
  89. issuer=self.issuer, subject=self.subject, audience=None, body=self.body)
  90. # Optional kwargs
  91. not_before = time() - 3600
  92. jwt_id = '8zd15df4s35f43sd'
  93. body = client.prepare_request_body(issuer=self.issuer,
  94. subject=self.subject,
  95. audience=self.audience,
  96. body=self.body,
  97. not_before=not_before,
  98. jwt_id=jwt_id)
  99. r = Request('https://a.b', body=body)
  100. self.assertEqual(r.isnot, 'empty')
  101. self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
  102. claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
  103. self.assertEqual(claim['iss'], self.issuer)
  104. # audience verification is handled during decode now
  105. self.assertEqual(claim['sub'], self.subject)
  106. self.assertEqual(claim['iat'], int(t.return_value))
  107. self.assertEqual(claim['nbf'], not_before)
  108. self.assertEqual(claim['jti'], jwt_id)
  109. @patch('time.time')
  110. def test_request_body_no_initial_private_key(self, t):
  111. t.return_value = time()
  112. self.token['expires_at'] = self.token['expires_in'] + t.return_value
  113. client = ServiceApplicationClient(
  114. self.client_id, private_key=None)
  115. # Basic with private key provided
  116. body = client.prepare_request_body(issuer=self.issuer,
  117. subject=self.subject,
  118. audience=self.audience,
  119. body=self.body,
  120. private_key=self.private_key)
  121. r = Request('https://a.b', body=body)
  122. self.assertEqual(r.isnot, 'empty')
  123. self.assertEqual(r.grant_type, ServiceApplicationClient.grant_type)
  124. claim = jwt.decode(r.assertion, self.public_key, audience=self.audience, algorithms=['RS256'])
  125. self.assertEqual(claim['iss'], self.issuer)
  126. # audience verification is handled during decode now
  127. self.assertEqual(claim['sub'], self.subject)
  128. self.assertEqual(claim['iat'], int(t.return_value))
  129. # No private key provided
  130. self.assertRaises(ValueError, client.prepare_request_body,
  131. issuer=self.issuer, subject=self.subject, audience=self.audience, body=self.body)
  132. @patch('time.time')
  133. def test_parse_token_response(self, t):
  134. t.return_value = time()
  135. self.token['expires_at'] = self.token['expires_in'] + t.return_value
  136. client = ServiceApplicationClient(self.client_id)
  137. # Parse code and state
  138. response = client.parse_request_body_response(self.token_json, scope=self.scope)
  139. self.assertEqual(response, self.token)
  140. self.assertEqual(client.access_token, response.get("access_token"))
  141. self.assertEqual(client.refresh_token, response.get("refresh_token"))
  142. self.assertEqual(client.token_type, response.get("token_type"))
  143. # Mismatching state
  144. self.assertRaises(Warning, client.parse_request_body_response, self.token_json, scope="invalid")
  145. os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '2'
  146. token = client.parse_request_body_response(self.token_json, scope="invalid")
  147. self.assertTrue(token.scope_changed)
  148. del os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE']