test_app_engine.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. # Copyright 2016 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import datetime
  15. import mock
  16. import pytest
  17. from google.auth import app_engine
  18. class _AppIdentityModule(object):
  19. """The interface of the App Idenity app engine module.
  20. See https://cloud.google.com/appengine/docs/standard/python/refdocs
  21. /google.appengine.api.app_identity.app_identity
  22. """
  23. def get_application_id(self):
  24. raise NotImplementedError()
  25. def sign_blob(self, bytes_to_sign, deadline=None):
  26. raise NotImplementedError()
  27. def get_service_account_name(self, deadline=None):
  28. raise NotImplementedError()
  29. def get_access_token(self, scopes, service_account_id=None):
  30. raise NotImplementedError()
  31. @pytest.fixture
  32. def app_identity(monkeypatch):
  33. """Mocks the app_identity module for google.auth.app_engine."""
  34. app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True)
  35. monkeypatch.setattr(app_engine, "app_identity", app_identity_module)
  36. yield app_identity_module
  37. def test_get_project_id(app_identity):
  38. app_identity.get_application_id.return_value = mock.sentinel.project
  39. assert app_engine.get_project_id() == mock.sentinel.project
  40. @mock.patch.object(app_engine, "app_identity", new=None)
  41. def test_get_project_id_missing_apis():
  42. with pytest.raises(EnvironmentError) as excinfo:
  43. assert app_engine.get_project_id()
  44. assert excinfo.match(r"App Engine APIs are not available")
  45. class TestSigner(object):
  46. def test_key_id(self, app_identity):
  47. app_identity.sign_blob.return_value = (
  48. mock.sentinel.key_id,
  49. mock.sentinel.signature,
  50. )
  51. signer = app_engine.Signer()
  52. assert signer.key_id is None
  53. def test_sign(self, app_identity):
  54. app_identity.sign_blob.return_value = (
  55. mock.sentinel.key_id,
  56. mock.sentinel.signature,
  57. )
  58. signer = app_engine.Signer()
  59. to_sign = b"123"
  60. signature = signer.sign(to_sign)
  61. assert signature == mock.sentinel.signature
  62. app_identity.sign_blob.assert_called_with(to_sign)
  63. class TestCredentials(object):
  64. @mock.patch.object(app_engine, "app_identity", new=None)
  65. def test_missing_apis(self):
  66. with pytest.raises(EnvironmentError) as excinfo:
  67. app_engine.Credentials()
  68. assert excinfo.match(r"App Engine APIs are not available")
  69. def test_default_state(self, app_identity):
  70. credentials = app_engine.Credentials()
  71. # Not token acquired yet
  72. assert not credentials.valid
  73. # Expiration hasn't been set yet
  74. assert not credentials.expired
  75. # Scopes are required
  76. assert not credentials.scopes
  77. assert not credentials.default_scopes
  78. assert credentials.requires_scopes
  79. assert not credentials.quota_project_id
  80. def test_with_scopes(self, app_identity):
  81. credentials = app_engine.Credentials()
  82. assert not credentials.scopes
  83. assert credentials.requires_scopes
  84. scoped_credentials = credentials.with_scopes(["email"])
  85. assert scoped_credentials.has_scopes(["email"])
  86. assert not scoped_credentials.requires_scopes
  87. def test_with_default_scopes(self, app_identity):
  88. credentials = app_engine.Credentials()
  89. assert not credentials.scopes
  90. assert not credentials.default_scopes
  91. assert credentials.requires_scopes
  92. scoped_credentials = credentials.with_scopes(
  93. scopes=None, default_scopes=["email"]
  94. )
  95. assert scoped_credentials.has_scopes(["email"])
  96. assert not scoped_credentials.requires_scopes
  97. def test_with_quota_project(self, app_identity):
  98. credentials = app_engine.Credentials()
  99. assert not credentials.scopes
  100. assert not credentials.quota_project_id
  101. quota_project_creds = credentials.with_quota_project("project-foo")
  102. assert quota_project_creds.quota_project_id == "project-foo"
  103. def test_service_account_email_implicit(self, app_identity):
  104. app_identity.get_service_account_name.return_value = (
  105. mock.sentinel.service_account_email
  106. )
  107. credentials = app_engine.Credentials()
  108. assert credentials.service_account_email == mock.sentinel.service_account_email
  109. assert app_identity.get_service_account_name.called
  110. def test_service_account_email_explicit(self, app_identity):
  111. credentials = app_engine.Credentials(
  112. service_account_id=mock.sentinel.service_account_email
  113. )
  114. assert credentials.service_account_email == mock.sentinel.service_account_email
  115. assert not app_identity.get_service_account_name.called
  116. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  117. def test_refresh(self, utcnow, app_identity):
  118. token = "token"
  119. ttl = 643942923
  120. app_identity.get_access_token.return_value = token, ttl
  121. credentials = app_engine.Credentials(
  122. scopes=["email"], default_scopes=["profile"]
  123. )
  124. credentials.refresh(None)
  125. app_identity.get_access_token.assert_called_with(
  126. credentials.scopes, credentials._service_account_id
  127. )
  128. assert credentials.token == token
  129. assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
  130. assert credentials.valid
  131. assert not credentials.expired
  132. @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
  133. def test_refresh_with_default_scopes(self, utcnow, app_identity):
  134. token = "token"
  135. ttl = 643942923
  136. app_identity.get_access_token.return_value = token, ttl
  137. credentials = app_engine.Credentials(default_scopes=["email"])
  138. credentials.refresh(None)
  139. app_identity.get_access_token.assert_called_with(
  140. credentials.default_scopes, credentials._service_account_id
  141. )
  142. assert credentials.token == token
  143. assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
  144. assert credentials.valid
  145. assert not credentials.expired
  146. def test_sign_bytes(self, app_identity):
  147. app_identity.sign_blob.return_value = (
  148. mock.sentinel.key_id,
  149. mock.sentinel.signature,
  150. )
  151. credentials = app_engine.Credentials()
  152. to_sign = b"123"
  153. signature = credentials.sign_bytes(to_sign)
  154. assert signature == mock.sentinel.signature
  155. app_identity.sign_blob.assert_called_with(to_sign)
  156. def test_signer(self, app_identity):
  157. credentials = app_engine.Credentials()
  158. assert isinstance(credentials.signer, app_engine.Signer)
  159. def test_signer_email(self, app_identity):
  160. credentials = app_engine.Credentials()
  161. assert credentials.signer_email == credentials.service_account_email