|
@@ -4,7 +4,7 @@ import json
|
|
|
import random
|
|
|
from hashlib import sha512
|
|
|
from base64 import b64encode
|
|
|
-from typing import Optional
|
|
|
+from typing import Dict, Optional
|
|
|
|
|
|
import requests
|
|
|
|
|
@@ -24,37 +24,39 @@ class AuthorizationHelpers:
|
|
|
def settings(self) -> "OAuth2Settings":
|
|
|
return self._settings
|
|
|
|
|
|
+ # Gets a dictionary with data that need to be used for any HTTP authorization request.
|
|
|
+ def getCommonRequestDataDict(self) -> Dict[str, str]:
|
|
|
+ data_dict = {"client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
|
|
|
+ "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
|
|
|
+ "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
|
|
|
+ }
|
|
|
+ return data_dict
|
|
|
+
|
|
|
# Request the access token from the authorization server.
|
|
|
# \param authorization_code: The authorization code from the 1st step.
|
|
|
# \param verification_code: The verification code needed for the PKCE extension.
|
|
|
# \return: An AuthenticationResponse object.
|
|
|
- def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str)-> "AuthenticationResponse":
|
|
|
- return self.parseTokenResponse(requests.post(self._token_url, data={
|
|
|
- "client_id": self._settings.CLIENT_ID,
|
|
|
- "redirect_uri": self._settings.CALLBACK_URL,
|
|
|
- "grant_type": "authorization_code",
|
|
|
- "code": authorization_code,
|
|
|
- "code_verifier": verification_code,
|
|
|
- "scope": self._settings.CLIENT_SCOPES
|
|
|
- })) # type: ignore
|
|
|
+ def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str) -> "AuthenticationResponse":
|
|
|
+ data = self.getCommonRequestDataDict()
|
|
|
+ data["grant_type"] = "authorization_code"
|
|
|
+ data["code"] = authorization_code
|
|
|
+ data["code_verifier"] = verification_code
|
|
|
+ return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
|
|
|
|
|
|
# Request the access token from the authorization server using a refresh token.
|
|
|
# \param refresh_token:
|
|
|
# \return: An AuthenticationResponse object.
|
|
|
- def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> AuthenticationResponse:
|
|
|
- return self.parseTokenResponse(requests.post(self._token_url, data={
|
|
|
- "client_id": self._settings.CLIENT_ID,
|
|
|
- "redirect_uri": self._settings.CALLBACK_URL,
|
|
|
- "grant_type": "refresh_token",
|
|
|
- "refresh_token": refresh_token,
|
|
|
- "scope": self._settings.CLIENT_SCOPES
|
|
|
- })) # type: ignore
|
|
|
+ def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> "AuthenticationResponse":
|
|
|
+ data = self.getCommonRequestDataDict()
|
|
|
+ data["grant_type"] = "refresh_token"
|
|
|
+ data["refresh_token"] = refresh_token
|
|
|
+ return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
|
|
|
|
|
|
@staticmethod
|
|
|
# Parse the token response from the authorization server into an AuthenticationResponse object.
|
|
|
# \param token_response: The JSON string data response from the authorization server.
|
|
|
# \return: An AuthenticationResponse object.
|
|
|
- def parseTokenResponse(token_response: requests.models.Response) -> AuthenticationResponse:
|
|
|
+ def parseTokenResponse(token_response: requests.models.Response) -> "AuthenticationResponse":
|
|
|
token_data = None
|
|
|
|
|
|
try:
|