123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- # Copyright 2021 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import copy
- import mock
- import pytest # type: ignore
- from google.auth import exceptions
- from google.oauth2 import reauth
- MOCK_REQUEST = mock.Mock()
- CHALLENGES_RESPONSE_TEMPLATE = {
- "status": "CHALLENGE_REQUIRED",
- "sessionId": "123",
- "challenges": [
- {
- "status": "READY",
- "challengeId": 1,
- "challengeType": "PASSWORD",
- "securityKey": {},
- }
- ],
- }
- CHALLENGES_RESPONSE_AUTHENTICATED = {
- "status": "AUTHENTICATED",
- "sessionId": "123",
- "encodedProofOfReauthToken": "new_rapt_token",
- }
- REAUTH_START_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 auth-request-type/re-start"
- REAUTH_CONTINUE_METRICS_HEADER_VALUE = (
- "gl-python/3.7 auth/1.1 auth-request-type/re-cont"
- )
- TOKEN_REQUEST_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1 cred-type/u"
- class MockChallenge(object):
- def __init__(self, name, locally_eligible, challenge_input):
- self.name = name
- self.is_locally_eligible = locally_eligible
- self.challenge_input = challenge_input
- def obtain_challenge_input(self, metadata):
- return self.challenge_input
- def _test_is_interactive():
- with mock.patch("sys.stdin.isatty", return_value=True):
- assert reauth.is_interactive()
- @mock.patch(
- "google.auth.metrics.reauth_start", return_value=REAUTH_START_METRICS_HEADER_VALUE
- )
- def test__get_challenges(mock_metrics_header_value):
- with mock.patch(
- "google.oauth2._client._token_endpoint_request"
- ) as mock_token_endpoint_request:
- reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token")
- mock_token_endpoint_request.assert_called_with(
- MOCK_REQUEST,
- reauth._REAUTH_API + ":start",
- {"supportedChallengeTypes": ["SAML"]},
- access_token="token",
- use_json=True,
- headers={"x-goog-api-client": REAUTH_START_METRICS_HEADER_VALUE},
- )
- @mock.patch(
- "google.auth.metrics.reauth_start", return_value=REAUTH_START_METRICS_HEADER_VALUE
- )
- def test__get_challenges_with_scopes(mock_metrics_header_value):
- with mock.patch(
- "google.oauth2._client._token_endpoint_request"
- ) as mock_token_endpoint_request:
- reauth._get_challenges(
- MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"]
- )
- mock_token_endpoint_request.assert_called_with(
- MOCK_REQUEST,
- reauth._REAUTH_API + ":start",
- {
- "supportedChallengeTypes": ["SAML"],
- "oauthScopesForDomainPolicyLookup": ["scope"],
- },
- access_token="token",
- use_json=True,
- headers={"x-goog-api-client": REAUTH_START_METRICS_HEADER_VALUE},
- )
- @mock.patch(
- "google.auth.metrics.reauth_continue",
- return_value=REAUTH_CONTINUE_METRICS_HEADER_VALUE,
- )
- def test__send_challenge_result(mock_metrics_header_value):
- with mock.patch(
- "google.oauth2._client._token_endpoint_request"
- ) as mock_token_endpoint_request:
- reauth._send_challenge_result(
- MOCK_REQUEST, "123", "1", {"credential": "password"}, "token"
- )
- mock_token_endpoint_request.assert_called_with(
- MOCK_REQUEST,
- reauth._REAUTH_API + "/123:continue",
- {
- "sessionId": "123",
- "challengeId": "1",
- "action": "RESPOND",
- "proposalResponse": {"credential": "password"},
- },
- access_token="token",
- use_json=True,
- headers={"x-goog-api-client": REAUTH_CONTINUE_METRICS_HEADER_VALUE},
- )
- def test__run_next_challenge_not_ready():
- challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
- challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED"
- assert (
- reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None
- )
- def test__run_next_challenge_not_supported():
- challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
- challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED"
- with pytest.raises(exceptions.ReauthFailError) as excinfo:
- reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token")
- assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED")
- def test__run_next_challenge_not_locally_eligible():
- mock_challenge = MockChallenge("PASSWORD", False, "challenge_input")
- with mock.patch(
- "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
- ):
- with pytest.raises(exceptions.ReauthFailError) as excinfo:
- reauth._run_next_challenge(
- CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
- )
- assert excinfo.match(r"Challenge PASSWORD is not locally eligible")
- def test__run_next_challenge_no_challenge_input():
- mock_challenge = MockChallenge("PASSWORD", True, None)
- with mock.patch(
- "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
- ):
- assert (
- reauth._run_next_challenge(
- CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
- )
- is None
- )
- def test__run_next_challenge_success():
- mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"})
- with mock.patch(
- "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
- ):
- with mock.patch(
- "google.oauth2.reauth._send_challenge_result"
- ) as mock_send_challenge_result:
- reauth._run_next_challenge(
- CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
- )
- mock_send_challenge_result.assert_called_with(
- MOCK_REQUEST, "123", 1, {"credential": "password"}, "token"
- )
- def test__obtain_rapt_authenticated():
- with mock.patch(
- "google.oauth2.reauth._get_challenges",
- return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
- ):
- assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
- def test__obtain_rapt_authenticated_after_run_next_challenge():
- with mock.patch(
- "google.oauth2.reauth._get_challenges",
- return_value=CHALLENGES_RESPONSE_TEMPLATE,
- ):
- with mock.patch(
- "google.oauth2.reauth._run_next_challenge",
- side_effect=[
- CHALLENGES_RESPONSE_TEMPLATE,
- CHALLENGES_RESPONSE_AUTHENTICATED,
- ],
- ):
- with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
- assert (
- reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
- )
- def test__obtain_rapt_unsupported_status():
- challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
- challenges_response["status"] = "STATUS_UNSPECIFIED"
- with mock.patch(
- "google.oauth2.reauth._get_challenges", return_value=challenges_response
- ):
- with pytest.raises(exceptions.ReauthFailError) as excinfo:
- reauth._obtain_rapt(MOCK_REQUEST, "token", None)
- assert excinfo.match(r"API error: STATUS_UNSPECIFIED")
- def test__obtain_rapt_no_challenge_output():
- challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
- with mock.patch(
- "google.oauth2.reauth._get_challenges", return_value=challenges_response
- ):
- with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
- with mock.patch(
- "google.oauth2.reauth._run_next_challenge", return_value=None
- ):
- with pytest.raises(exceptions.ReauthFailError) as excinfo:
- reauth._obtain_rapt(MOCK_REQUEST, "token", None)
- assert excinfo.match(r"Failed to obtain rapt token")
- def test__obtain_rapt_not_interactive():
- with mock.patch(
- "google.oauth2.reauth._get_challenges",
- return_value=CHALLENGES_RESPONSE_TEMPLATE,
- ):
- with mock.patch("google.oauth2.reauth.is_interactive", return_value=False):
- with pytest.raises(exceptions.ReauthFailError) as excinfo:
- reauth._obtain_rapt(MOCK_REQUEST, "token", None)
- assert excinfo.match(r"not in an interactive session")
- def test__obtain_rapt_not_authenticated():
- with mock.patch(
- "google.oauth2.reauth._get_challenges",
- return_value=CHALLENGES_RESPONSE_TEMPLATE,
- ):
- with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
- with pytest.raises(exceptions.ReauthFailError) as excinfo:
- reauth._obtain_rapt(MOCK_REQUEST, "token", None)
- assert excinfo.match(r"Reauthentication failed")
- def test_get_rapt_token():
- with mock.patch(
- "google.oauth2._client.refresh_grant", return_value=("token", None, None, None)
- ) as mock_refresh_grant:
- with mock.patch(
- "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token"
- ) as mock_obtain_rapt:
- assert (
- reauth.get_rapt_token(
- MOCK_REQUEST,
- "client_id",
- "client_secret",
- "refresh_token",
- "token_uri",
- )
- == "new_rapt_token"
- )
- mock_refresh_grant.assert_called_with(
- request=MOCK_REQUEST,
- client_id="client_id",
- client_secret="client_secret",
- refresh_token="refresh_token",
- token_uri="token_uri",
- scopes=[reauth._REAUTH_SCOPE],
- )
- mock_obtain_rapt.assert_called_with(
- MOCK_REQUEST, "token", requested_scopes=None
- )
- @mock.patch(
- "google.auth.metrics.token_request_user",
- return_value=TOKEN_REQUEST_METRICS_HEADER_VALUE,
- )
- def test_refresh_grant_failed(mock_metrics_header_value):
- with mock.patch(
- "google.oauth2._client._token_endpoint_request_no_throw"
- ) as mock_token_request:
- mock_token_request.return_value = (False, {"error": "Bad request"}, False)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- reauth.refresh_grant(
- MOCK_REQUEST,
- "token_uri",
- "refresh_token",
- "client_id",
- "client_secret",
- scopes=["foo", "bar"],
- rapt_token="rapt_token",
- enable_reauth_refresh=True,
- )
- assert excinfo.match(r"Bad request")
- assert not excinfo.value.retryable
- mock_token_request.assert_called_with(
- MOCK_REQUEST,
- "token_uri",
- {
- "grant_type": "refresh_token",
- "client_id": "client_id",
- "client_secret": "client_secret",
- "refresh_token": "refresh_token",
- "scope": "foo bar",
- "rapt": "rapt_token",
- },
- headers={"x-goog-api-client": TOKEN_REQUEST_METRICS_HEADER_VALUE},
- )
- def test_refresh_grant_failed_with_string_type_response():
- with mock.patch(
- "google.oauth2._client._token_endpoint_request_no_throw"
- ) as mock_token_request:
- mock_token_request.return_value = (False, "string type error", False)
- with pytest.raises(exceptions.RefreshError) as excinfo:
- reauth.refresh_grant(
- MOCK_REQUEST,
- "token_uri",
- "refresh_token",
- "client_id",
- "client_secret",
- scopes=["foo", "bar"],
- rapt_token="rapt_token",
- enable_reauth_refresh=True,
- )
- assert excinfo.match(r"string type error")
- assert not excinfo.value.retryable
- def test_refresh_grant_success():
- with mock.patch(
- "google.oauth2._client._token_endpoint_request_no_throw"
- ) as mock_token_request:
- mock_token_request.side_effect = [
- (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}, True),
- (True, {"access_token": "access_token"}, None),
- ]
- with mock.patch(
- "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token"
- ):
- assert reauth.refresh_grant(
- MOCK_REQUEST,
- "token_uri",
- "refresh_token",
- "client_id",
- "client_secret",
- enable_reauth_refresh=True,
- ) == (
- "access_token",
- "refresh_token",
- None,
- {"access_token": "access_token"},
- "new_rapt_token",
- )
- def test_refresh_grant_reauth_refresh_disabled():
- with mock.patch(
- "google.oauth2._client._token_endpoint_request_no_throw"
- ) as mock_token_request:
- mock_token_request.side_effect = [
- (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}, True),
- (True, {"access_token": "access_token"}, None),
- ]
- with pytest.raises(exceptions.RefreshError) as excinfo:
- reauth.refresh_grant(
- MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret"
- )
- assert excinfo.match(r"Reauthentication is needed")
|