123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- # Copyright (c) 2021 Ultimaker B.V.
- # Cura is released under the terms of the LGPLv3 or higher.
- from base64 import b64encode
- from datetime import datetime
- from hashlib import sha512
- from PyQt5.QtNetwork import QNetworkReply
- import secrets
- from threading import Lock
- from typing import Callable, Optional
- import requests
- import urllib.parse
- from UM.i18n import i18nCatalog
- from UM.Logger import Logger
- from UM.TaskManagement.HttpRequestManager import HttpRequestManager # To download log-in tokens.
- from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
# Translation catalog used for the user-facing error messages produced in this module.
catalog = i18nCatalog("cura")

# strftime() format of the "received_at" timestamp that is stored with a successful token response.
TOKEN_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
class AuthorizationHelpers:
    """Class containing several helpers to deal with the authorization flow.

    Token requests are sent through the ``HttpRequestManager``. The requesting thread is then blocked until the
    reply has been parsed by :py:meth:`parseTokenResponse`, or until ``_REQUEST_TIMEOUT`` seconds have passed.
    ``_request_lock`` is the hand-over signal between the two sides: the requester acquires it before sending the
    request and the reply handler releases it once ``_auth_response`` has been filled in.
    """

    _REQUEST_TIMEOUT = 60  # Seconds to wait for a reply from the token endpoint before giving up.

    def __init__(self, settings: "OAuth2Settings") -> None:
        """Create the helpers for one OAuth2 configuration.

        :param settings: The OAuth2 settings (server URL, client ID, callback URL, scopes) to use.
        """
        self._settings = settings
        self._token_url = "{}/token".format(self._settings.OAUTH_SERVER_URL)
        self._request_lock = Lock()
        self._auth_response = None  # type: Optional[AuthenticationResponse]
        # Only ever reset by checkToken(); nothing in this class reads it. Kept so the attribute always exists.
        self._user_profile = None  # type: Optional[UserProfile]

    @property
    def settings(self) -> "OAuth2Settings":
        """The OAuth2 settings object."""
        return self._settings

    def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str) -> "AuthenticationResponse":
        """Request the access token from the authorization server.

        This call blocks until the server replied or the request timed out.

        :param authorization_code: The authorization code from the 1st step.
        :param verification_code: The verification code needed for the PKCE extension.
        :return: An AuthenticationResponse object. Its ``success`` is False if the request failed or timed out.
        """
        data = {
            "client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
            "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
            "grant_type": "authorization_code",
            "code": authorization_code,
            "code_verifier": verification_code,
            "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
        }
        return self._requestToken(data)

    def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> "AuthenticationResponse":
        """Request the access token from the authorization server using a refresh token.

        This call blocks until the server replied or the request timed out.

        :param refresh_token: The refresh token that was handed out together with an earlier access token.
        :return: An AuthenticationResponse object. Its ``success`` is False if the request failed or timed out.
        """
        Logger.log("d", "Refreshing the access token for [%s]", self._settings.OAUTH_SERVER_URL)
        data = {
            "client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
            "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
        }
        return self._requestToken(data)

    def _requestToken(self, data: dict) -> "AuthenticationResponse":
        """POST the given form fields to the token endpoint and wait for the parsed reply.

        :param data: The form fields of the token request. They are URL-encoded into the request body.
        :return: The parsed response, or an unsuccessful AuthenticationResponse if no reply was parsed in time.
        """
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        self._request_lock.acquire()  # Held until a reply handler releases it (or until we give up below).
        # Drop whatever a late reply to an earlier, timed-out request may have left behind.
        # NOTE: such a late reply can still arrive while this request is in flight and would then be returned
        # as the answer to this request; the single shared lock cannot tell the two replies apart.
        self._auth_response = None
        try:
            HttpRequestManager.getInstance().post(
                self._token_url,
                data = urllib.parse.urlencode(data).encode("UTF-8"),
                headers_dict = headers,
                callback = self.parseTokenResponse,
                # Without an error callback a failed request would never wake us up before the timeout.
                error_callback = self._onTokenRequestError
            )
        except Exception:
            # The request was never scheduled, so no reply handler is going to release the lock for us.
            self._releaseRequestLock()
            raise

        # Block until a reply handler released the lock, or until the timeout expires.
        completed = self._request_lock.acquire(timeout = self._REQUEST_TIMEOUT)
        response = self._auth_response
        self._auth_response = None
        self._releaseRequestLock()

        if not completed:
            Logger.warning(f"Timed out while waiting for a token response from [{self._token_url}]")
        if response is None:
            # Never hand None to the caller; the return type promises an AuthenticationResponse.
            response = AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response."))
        return response

    def _releaseRequestLock(self) -> None:
        """Release the request lock, tolerating the case where it has already been released.

        The requester (after a timeout) and a late reply handler can both try to release the lock. Releasing an
        unlocked ``Lock`` raises a RuntimeError, which must not escape from a network callback.
        """
        try:
            self._request_lock.release()
        except RuntimeError:
            pass  # Already released by the other side; nothing left to do.

    def _onTokenRequestError(self, reply: Optional["QNetworkReply"], error: object = None) -> None:
        """Handle a failed token request so that the waiting requester is woken up right away.

        :param reply: The reply of the failed request, if there is one.
        :param error: The network error that occurred. Unused; the error is read from the reply instead.
        """
        if reply is None:
            self._auth_response = AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response."))
            self._releaseRequestLock()
            return
        self.parseTokenResponse(reply)

    def parseTokenResponse(self, token_response: "QNetworkReply") -> None:
        """Parse the token response from the authorization server into an AuthenticationResponse object.

        The result is not returned. It is stored for the thread that is waiting in the token request, and that
        thread is then woken up by releasing the request lock.

        :param token_response: The network reply of the authorization server to a token request.
        """
        try:
            self._auth_response = self._buildAuthResponse(token_response)
        finally:
            # Always wake up the requester, even if parsing failed unexpectedly.
            self._releaseRequestLock()

    @staticmethod
    def _buildAuthResponse(token_response: "QNetworkReply") -> "AuthenticationResponse":
        """Translate a reply of the token endpoint into an AuthenticationResponse.

        :param token_response: The network reply of the authorization server to a token request.
        :return: A successful response holding the tokens, or an unsuccessful one holding an error message.
        """
        token_data = HttpRequestManager.readJSON(token_response)
        if not token_data or not isinstance(token_data, dict):
            return AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response."))

        if token_response.error() != QNetworkReply.NetworkError.NoError:
            # Fall back to Qt's description if the server did not explain the error itself.
            error_description = token_data.get("error_description", token_response.errorString())
            return AuthenticationResponse(success = False, err_message = error_description)

        try:
            token_fields = {key: token_data[key] for key in ("token_type", "access_token", "refresh_token", "expires_in", "scope")}
        except KeyError as missing_field:
            Logger.warning(f"Token response is missing required field {missing_field}")
            return AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response."))

        return AuthenticationResponse(success = True,
                                      received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
                                      **token_fields)

    def checkToken(self, access_token: str, callback: Optional[Callable[["UserProfile"], None]] = None) -> None:
        """Calls the authentication API endpoint to get the token data.

        The API is called asynchronously. When a response is given, the callback is called with the user's profile.

        :param access_token: The encoded JWT token.
        :param callback: When a response is given, this function will be called with a user profile. If None, there will
            not be a callback. If the token failed to give/parse a user profile, the callback will not be called either.
        """
        self._user_profile = None
        check_token_url = "{}/check-token".format(self._settings.OAUTH_SERVER_URL)
        Logger.log("d", "Checking the access token for [%s]", check_token_url)
        headers = {
            "Authorization": f"Bearer {access_token}"
        }
        HttpRequestManager.getInstance().get(
            check_token_url,
            headers_dict = headers,
            callback = lambda reply: self._parseUserProfile(reply, callback)
        )

    def _parseUserProfile(self, reply: "QNetworkReply", callback: Optional[Callable[["UserProfile"], None]]) -> None:
        """Parses the user profile from a reply to /check-token.

        If the response is valid, the callback will be called to return the user profile to the caller.

        :param reply: A network reply to a request to the /check-token URL.
        :param callback: A function to call once a user profile was successfully obtained. May be None, in which
            case the reply is still validated (and problems are logged) but nothing is called.
        """
        if reply.error() != QNetworkReply.NetworkError.NoError:
            Logger.warning(f"Could not access account information. QNetworkError {reply.errorString()}")
            return

        profile_data = HttpRequestManager.getInstance().readJSON(reply)
        if not isinstance(profile_data, dict) or not isinstance(profile_data.get("data"), dict):
            Logger.warning("Could not parse user data from token.")
            return
        profile_data = profile_data["data"]

        required_fields = {"user_id", "username"}
        missing_fields = required_fields - set(profile_data.keys())
        if missing_fields:
            Logger.warning(f"User data missing required field(s): {missing_fields}")
            return

        if callback is None:
            return  # Nobody is interested in the profile.

        # The "organization" entry can be absent or null; treat both as "no organization".
        organization = profile_data.get("organization") or {}
        callback(UserProfile(
            user_id = profile_data["user_id"],
            username = profile_data["username"],
            profile_image_url = profile_data.get("profile_image_url", ""),
            organization_id = organization.get("organization_id"),
            subscriptions = profile_data.get("subscriptions", [])
        ))

    @staticmethod
    def generateVerificationCode(code_length: int = 32) -> str:
        """Generate a random verification code.

        :param code_length: How long should the code be in bytes? This should never be lower than 16, but it's
            probably better to leave it at 32.
        :return: The code as a hexadecimal string, so it is ``2 * code_length`` characters long.
        """
        return secrets.token_hex(code_length)

    @staticmethod
    def generateVerificationCodeChallenge(verification_code: str) -> str:
        """Generates the base64 encoded SHA-512 hash of a given string.

        :param verification_code: The verification code to derive the challenge from.
        :return: The hash of the code in base64 format, with ``+`` replaced by ``_`` and ``/`` replaced by ``-``.
        """
        # NOTE(review): this alphabet is swapped compared to RFC 4648 URL-safe base64 ("+" -> "-", "/" -> "_").
        # Presumably this is what the authorization server expects; confirm before changing it.
        encoded = sha512(verification_code.encode()).digest()
        return b64encode(encoded, altchars = b"_-").decode()
|