Browse Source

feat: improved user auth tokens (#68148)

Supports https://github.com/getsentry/rfcs/pull/32. 

- Newly created _user auth tokens_ will be prefixed with `sntryu_`. 
- Introduce a custom model manager for `ApiToken` to handle the unique
creation logic where we need to hash the token values and store them.
- Use the `token_type` (currently optional) parameter/field on
`ApiToken` when creating user auth tokens to let the model do the heavy
lifting on generating and hashing the values. This will keep the logic
out of views and simplify calls to just `new_token =
ApiToken.objects.create(user=user, token_type=AuthTokenType.USER)`.
- I've [changed some
behavior](https://github.com/getsentry/sentry/pull/68148/files#diff-e68bf726258b71dbfe6c6a3dcb9a959faba4e9585762067078a38bed5bad4812R36-R37)
where we only return the `refreshToken` on applicable token types.

I introduce a "read once" pattern in this PR for the token secrets to
prevent leaking of them in logs, exceptions, etc. It works like this
when creating a new `ApiToken`:

1. The model manager sets temporary fields `__plaintext_token` and
`__plaintext_refresh_token` that store the respective plaintext values
for temporary reading.
2. When reading the value through the `plaintext_token` property on
`ApiToken` (notice the single prepended underscore) the string value is
returned and `__plaintext_token` is immediately set to `None`.
3. If you attempt to read the `_plaintext_token` property again, an
exception will be raised, `PlaintextSecretAlreadyRead`.
Matthew T 11 months ago
parent
commit
c1d6984aa3

+ 2 - 1
src/sentry/api/endpoints/api_tokens.py

@@ -19,6 +19,7 @@ from sentry.auth.elevated_mode import has_elevated_mode
 from sentry.models.apitoken import ApiToken
 from sentry.models.apitoken import ApiToken
 from sentry.models.outbox import outbox_context
 from sentry.models.outbox import outbox_context
 from sentry.security.utils import capture_security_activity
 from sentry.security.utils import capture_security_activity
+from sentry.types.token import AuthTokenType
 
 
 
 
 class ApiTokenSerializer(serializers.Serializer):
 class ApiTokenSerializer(serializers.Serializer):
@@ -78,8 +79,8 @@ class ApiTokensEndpoint(Endpoint):
             token = ApiToken.objects.create(
             token = ApiToken.objects.create(
                 user_id=request.user.id,
                 user_id=request.user.id,
                 name=result.get("name", None),
                 name=result.get("name", None),
+                token_type=AuthTokenType.USER,
                 scope_list=result["scopes"],
                 scope_list=result["scopes"],
-                refresh_token=None,
                 expires_at=None,
                 expires_at=None,
             )
             )
 
 

+ 4 - 2
src/sentry/api/serializers/models/apitoken.py

@@ -1,5 +1,6 @@
 from sentry.api.serializers import Serializer, register, serialize
 from sentry.api.serializers import Serializer, register, serialize
 from sentry.models.apitoken import ApiToken
 from sentry.models.apitoken import ApiToken
+from sentry.types.token import AuthTokenType
 
 
 
 
 @register(ApiToken)
 @register(ApiToken)
@@ -30,9 +31,10 @@ class ApiTokenSerializer(Serializer):
         if not attrs["application"]:
         if not attrs["application"]:
             include_token = kwargs.get("include_token", True)
             include_token = kwargs.get("include_token", True)
             if include_token:
             if include_token:
-                data["token"] = obj.token
+                data["token"] = obj.plaintext_token
 
 
-            data["refreshToken"] = obj.refresh_token
+                if not obj.token_type == AuthTokenType.USER:
+                    data["refreshToken"] = obj.plaintext_refresh_token
 
 
         """
         """
         While this is a nullable column at the db level, this should never be empty. If it is, it's a sign that the
         While this is a nullable column at the db level, this should never be empty. If it is, it's a sign that the

+ 184 - 6
src/sentry/models/apitoken.py

@@ -1,5 +1,6 @@
 from __future__ import annotations
 from __future__ import annotations
 
 
+import hashlib
 import secrets
 import secrets
 from collections.abc import Collection
 from collections.abc import Collection
 from datetime import timedelta
 from datetime import timedelta
@@ -22,16 +23,82 @@ from sentry.types.region import find_all_region_names
 from sentry.types.token import AuthTokenType
 from sentry.types.token import AuthTokenType
 
 
 DEFAULT_EXPIRATION = timedelta(days=30)
 DEFAULT_EXPIRATION = timedelta(days=30)
+TOKEN_REDACTED = "***REDACTED***"
 
 
 
 
 def default_expiration():
 def default_expiration():
     return timezone.now() + DEFAULT_EXPIRATION
     return timezone.now() + DEFAULT_EXPIRATION
 
 
 
 
-def generate_token():
+def generate_token(token_type: AuthTokenType | str | None = AuthTokenType.__empty__) -> str:
+    if token_type:
+        return f"{token_type}{secrets.token_hex(nbytes=32)}"
+
     return secrets.token_hex(nbytes=32)
     return secrets.token_hex(nbytes=32)
 
 
 
 
+class PlaintextSecretAlreadyRead(Exception):
+    """the secret you are trying to read is read-once and cannot be accessed directly again"""
+
+    pass
+
+
+class NotSupported(Exception):
+    """the method you called is not supported by this token type"""
+
+    pass
+
+
+class ApiTokenManager(ControlOutboxProducingManager):
+    def create(self, *args, **kwargs):
+        token_type: AuthTokenType | None = kwargs.get("token_type", None)
+
+        # Typically the .create() method is called with `refresh_token=None` as an
+        # argument when we specifically do not want a refresh_token.
+        #
+        # But if it is not None or not specified, we should generate a token since
+        # that is the expected behavior... the refresh_token field on ApiToken has
+        # a default of generate_token()
+        #
+        # TODO(mdtro): All of these if/else statements will be cleaned up at a later time
+        #   to use a match statment on the AuthTokenType. Move each of the various token type
+        #   create calls one at a time.
+        if "refresh_token" in kwargs:
+            plaintext_refresh_token = kwargs["refresh_token"]
+        else:
+            plaintext_refresh_token = generate_token()
+
+        if token_type == AuthTokenType.USER:
+            plaintext_token = generate_token(token_type=AuthTokenType.USER)
+            plaintext_refresh_token = None  # user auth tokens do not have refresh tokens
+        else:
+            # to maintain compatibility with current
+            # code that currently calls create with token= specified
+            if "token" in kwargs:
+                plaintext_token = kwargs["token"]
+            else:
+                plaintext_token = generate_token()
+
+        if options.get("apitoken.save-hash-on-create"):
+            kwargs["hashed_token"] = hashlib.sha256(plaintext_token.encode()).hexdigest()
+
+            if plaintext_refresh_token:
+                kwargs["hashed_refresh_token"] = hashlib.sha256(
+                    plaintext_refresh_token.encode()
+                ).hexdigest()
+
+        kwargs["token"] = plaintext_token
+        kwargs["refresh_token"] = plaintext_refresh_token
+
+        api_token = super().create(*args, **kwargs)
+
+        # Store the plaintext tokens for one-time retrieval
+        api_token._set_plaintext_token(token=plaintext_token)
+        api_token._set_plaintext_refresh_token(token=plaintext_refresh_token)
+
+        return api_token
+
+
 @control_silo_only_model
 @control_silo_only_model
 class ApiToken(ReplicatedControlModel, HasApiScopes):
 class ApiToken(ReplicatedControlModel, HasApiScopes):
     __relocation_scope__ = {RelocationScope.Global, RelocationScope.Config}
     __relocation_scope__ = {RelocationScope.Global, RelocationScope.Config}
@@ -50,7 +117,7 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
     expires_at = models.DateTimeField(null=True, default=default_expiration)
     expires_at = models.DateTimeField(null=True, default=default_expiration)
     date_added = models.DateTimeField(default=timezone.now)
     date_added = models.DateTimeField(default=timezone.now)
 
 
-    objects: ClassVar[ControlOutboxProducingManager[ApiToken]] = ControlOutboxProducingManager(
+    objects: ClassVar[ControlOutboxProducingManager[ApiToken]] = ApiTokenManager(
         cache_fields=("token",)
         cache_fields=("token",)
     )
     )
 
 
@@ -63,12 +130,117 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
     def __str__(self):
     def __str__(self):
         return force_str(self.token)
         return force_str(self.token)
 
 
+    def _set_plaintext_token(self, token: str) -> None:
+        """Set the plaintext token for one-time reading
+        This function should only be called from the model's
+        manager class.
+
+        :param token: A plaintext string of the token
+        :raises PlaintextSecretAlreadyRead: when the token has already been read once
+        """
+        existing_token: str | None = None
+        try:
+            existing_token = self.__plaintext_token
+        except AttributeError:
+            self.__plaintext_token: str = token
+
+        if existing_token == TOKEN_REDACTED:
+            raise PlaintextSecretAlreadyRead()
+
+    def _set_plaintext_refresh_token(self, token: str) -> None:
+        """Set the plaintext refresh token for one-time reading
+        This function should only be called from the model's
+        manager class.
+
+        :param token: A plaintext string of the refresh token
+        :raises PlaintextSecretAlreadyRead: if the token has already been read once
+        """
+        existing_refresh_token: str | None = None
+        try:
+            existing_refresh_token = self.__plaintext_refresh_token
+        except AttributeError:
+            self.__plaintext_refresh_token: str = token
+
+        if existing_refresh_token == TOKEN_REDACTED:
+            raise PlaintextSecretAlreadyRead()
+
+    @property
+    def plaintext_token(self) -> str:
+        """The plaintext value of the token
+        To be called immediately after creation of a new `ApiToken` to return the
+        plaintext token to the user. After reading the token, the plaintext token
+        string will be set to `TOKEN_REDACTED` to prevent future accidental leaking
+        of the token in logs, exceptions, etc.
+
+        :raises PlaintextSecretAlreadyRead: if the token has already been read once
+        :return: the plaintext value of the token
+        """
+        token = self.__plaintext_token
+        if token == TOKEN_REDACTED:
+            raise PlaintextSecretAlreadyRead()
+
+        self.__plaintext_token = TOKEN_REDACTED
+
+        return token
+
+    @property
+    def plaintext_refresh_token(self) -> str:
+        """The plaintext value of the refresh token
+        To be called immediately after creation of a new `ApiToken` to return the
+        plaintext token to the user. After reading the token, the plaintext token
+        string will be set to `TOKEN_REDACTED` to prevent future accidental leaking
+        of the token in logs, exceptions, etc.
+
+        :raises PlaintextSecretAlreadyRead: if the refresh token has already been read once
+        :raises NotSupported: if called on a User Auth Token
+        :return: the plaintext value of the refresh token
+        """
+        if not self.refresh_token and not self.hashed_refresh_token:
+            raise NotSupported("This API token type does not support refresh tokens")
+
+        token = self.__plaintext_refresh_token
+        if token == TOKEN_REDACTED:
+            raise PlaintextSecretAlreadyRead()
+
+        self.__plaintext_refresh_token = TOKEN_REDACTED
+
+        return token
+
     def save(self, *args: Any, **kwargs: Any) -> None:
     def save(self, *args: Any, **kwargs: Any) -> None:
+        if options.get("apitoken.save-hash-on-create"):
+            self.hashed_token = hashlib.sha256(self.token.encode()).hexdigest()
+
+            if self.refresh_token:
+                self.hashed_refresh_token = hashlib.sha256(self.refresh_token.encode()).hexdigest()
+            else:
+                # The backup tests create a token with a refresh_token and then clear it out.
+                # So if the refresh_token is None, wipe out any hashed value that may exist too.
+                # https://github.com/getsentry/sentry/blob/1fc699564e79c62bff6cc3c168a49bfceadcac52/tests/sentry/backup/test_imports.py#L1306
+                self.hashed_refresh_token = None
+
         if options.get("apitoken.auto-add-last-chars"):
         if options.get("apitoken.auto-add-last-chars"):
             token_last_characters = self.token[-4:]
             token_last_characters = self.token[-4:]
             self.token_last_characters = token_last_characters
             self.token_last_characters = token_last_characters
 
 
-        return super().save(**kwargs)
+        return super().save(*args, **kwargs)
+
+    def update(self, *args: Any, **kwargs: Any) -> int:
+        # if the token or refresh_token was updated, we need to
+        # re-calculate the hashed values
+        if options.get("apitoken.save-hash-on-create"):
+            if "token" in kwargs:
+                kwargs["hashed_token"] = hashlib.sha256(kwargs["token"].encode()).hexdigest()
+
+            if "refresh_token" in kwargs:
+                kwargs["hashed_refresh_token"] = hashlib.sha256(
+                    kwargs["refresh_token"].encode()
+                ).hexdigest()
+
+        if options.get("apitoken.auto-add-last-chars"):
+            if "token" in kwargs:
+                kwargs["token_last_characters"] = kwargs["token"][-4:]
+
+        return super().update(*args, **kwargs)
 
 
     def outbox_region_names(self) -> Collection[str]:
     def outbox_region_names(self) -> Collection[str]:
         return list(find_all_region_names())
         return list(find_all_region_names())
@@ -104,10 +276,16 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
         return ()
         return ()
 
 
     def refresh(self, expires_at=None):
     def refresh(self, expires_at=None):
+        if self.token_type == AuthTokenType.USER:
+            raise NotSupported("User auth tokens do not support refreshing the token")
+
         if expires_at is None:
         if expires_at is None:
             expires_at = timezone.now() + DEFAULT_EXPIRATION
             expires_at = timezone.now() + DEFAULT_EXPIRATION
 
 
-        self.update(token=generate_token(), refresh_token=generate_token(), expires_at=expires_at)
+        new_token = generate_token(token_type=self.token_type)
+        new_refresh_token = generate_token(token_type=self.token_type)
+
+        self.update(token=new_token, refresh_token=new_refresh_token, expires_at=expires_at)
 
 
     def get_relocation_scope(self) -> RelocationScope:
     def get_relocation_scope(self) -> RelocationScope:
         if self.application_id is not None:
         if self.application_id is not None:
@@ -125,9 +303,9 @@ class ApiToken(ReplicatedControlModel, HasApiScopes):
         )
         )
         existing = self.__class__.objects.filter(query).first()
         existing = self.__class__.objects.filter(query).first()
         if existing:
         if existing:
-            self.token = generate_token()
+            self.token = generate_token(token_type=self.token_type)
             if self.refresh_token is not None:
             if self.refresh_token is not None:
-                self.refresh_token = generate_token()
+                self.refresh_token = generate_token(token_type=self.token_type)
             if self.expires_at is not None:
             if self.expires_at is not None:
                 self.expires_at = timezone.now() + DEFAULT_EXPIRATION
                 self.expires_at = timezone.now() + DEFAULT_EXPIRATION
 
 

+ 2 - 0
src/sentry/testutils/factories.py

@@ -148,6 +148,7 @@ from sentry.testutils.silo import assume_test_silo_mode
 from sentry.types.activity import ActivityType
 from sentry.types.activity import ActivityType
 from sentry.types.integrations import ExternalProviders
 from sentry.types.integrations import ExternalProviders
 from sentry.types.region import Region, get_local_region, get_region_by_name
 from sentry.types.region import Region, get_local_region, get_region_by_name
+from sentry.types.token import AuthTokenType
 from sentry.utils import json, loremipsum
 from sentry.utils import json, loremipsum
 from sentry.utils.performance_issues.performance_problem import PerformanceProblem
 from sentry.utils.performance_issues.performance_problem import PerformanceProblem
 from social_auth.models import UserSocialAuth
 from social_auth.models import UserSocialAuth
@@ -423,6 +424,7 @@ class Factories:
         return ApiToken.objects.create(
         return ApiToken.objects.create(
             user=user,
             user=user,
             scope_list=scope_list,
             scope_list=scope_list,
+            token_type=AuthTokenType.USER,
             **kwargs,
             **kwargs,
         )
         )
 
 

+ 5 - 1
src/sentry/testutils/helpers/backups.py

@@ -99,6 +99,7 @@ from sentry.silo.base import SiloMode
 from sentry.testutils.cases import TransactionTestCase
 from sentry.testutils.cases import TransactionTestCase
 from sentry.testutils.factories import get_fixture_path
 from sentry.testutils.factories import get_fixture_path
 from sentry.testutils.silo import assume_test_silo_mode
 from sentry.testutils.silo import assume_test_silo_mode
+from sentry.types.token import AuthTokenType
 from sentry.utils import json
 from sentry.utils import json
 from sentry.utils.json import JSONData
 from sentry.utils.json import JSONData
 
 
@@ -632,7 +633,10 @@ class BackupTestCase(TransactionTestCase):
         ControlOption.objects.create(key="bar", value="b")
         ControlOption.objects.create(key="bar", value="b")
         ApiAuthorization.objects.create(user=owner)
         ApiAuthorization.objects.create(user=owner)
         ApiToken.objects.create(
         ApiToken.objects.create(
-            user=owner, expires_at=None, name="create_exhaustive_global_configs"
+            user=owner,
+            expires_at=None,
+            name="create_exhaustive_global_configs",
+            token_type=AuthTokenType.USER,
         )
         )
 
 
     @assume_test_silo_mode(SiloMode.REGION)
     @assume_test_silo_mode(SiloMode.REGION)

+ 2 - 1
src/sentry/web/frontend/setup_wizard.py

@@ -24,6 +24,7 @@ from sentry.services.hybrid_cloud.project.service import project_service
 from sentry.services.hybrid_cloud.project_key.model import ProjectKeyRole
 from sentry.services.hybrid_cloud.project_key.model import ProjectKeyRole
 from sentry.services.hybrid_cloud.project_key.service import project_key_service
 from sentry.services.hybrid_cloud.project_key.service import project_key_service
 from sentry.services.hybrid_cloud.user.model import RpcUser
 from sentry.services.hybrid_cloud.user.model import RpcUser
+from sentry.types.token import AuthTokenType
 from sentry.utils.http import absolute_uri
 from sentry.utils.http import absolute_uri
 from sentry.utils.security.orgauthtoken_token import (
 from sentry.utils.security.orgauthtoken_token import (
     SystemUrlPrefixMissingException,
     SystemUrlPrefixMissingException,
@@ -159,7 +160,7 @@ def get_token(mappings: list[OrganizationMapping], user: RpcUser):
         token = ApiToken.objects.create(
         token = ApiToken.objects.create(
             user_id=user.id,
             user_id=user.id,
             scope_list=["project:releases"],
             scope_list=["project:releases"],
-            refresh_token=None,
+            token_type=AuthTokenType.USER,
             expires_at=None,
             expires_at=None,
         )
         )
     return serialize(token)
     return serialize(token)

+ 31 - 0
tests/sentry/api/serializers/test_apitoken.py

@@ -1,5 +1,9 @@
 from sentry.api.serializers import ApiTokenSerializer
 from sentry.api.serializers import ApiTokenSerializer
+from sentry.models.apitoken import ApiToken
+from sentry.silo.base import SiloMode
 from sentry.testutils.cases import TestCase
 from sentry.testutils.cases import TestCase
+from sentry.testutils.helpers.options import override_options
+from sentry.testutils.silo import assume_test_silo_mode
 
 
 
 
 class TestApiTokenSerializer(TestCase):
 class TestApiTokenSerializer(TestCase):
@@ -38,6 +42,33 @@ class TestIncludeTokenFlag(TestApiTokenSerializer):
         assert "token" not in serialized_object
         assert "token" not in serialized_object
 
 
 
 
+class TestRefreshTokens(TestApiTokenSerializer):
+    def setUp(self) -> None:
+        super().setUp()
+        attrs = self._serializer.get_attrs(item_list=[self._token], user=self._user)
+        attrs["application"] = None
+        self._attrs = attrs
+
+    def test_no_refresh_token_on_user_token(self) -> None:
+        serialized_object = self._serializer.serialize(
+            obj=self._token, user=self._user, attrs=self._attrs
+        )
+
+        assert "refreshToken" not in serialized_object
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_refresh_token_on_non_user_token(self) -> None:
+        with assume_test_silo_mode(SiloMode.CONTROL):
+            token = ApiToken.objects.create(user=self._user)
+            assert token.hashed_refresh_token is not None
+
+            serialized_object = self._serializer.serialize(
+                obj=token, user=self._user, attrs=self._attrs
+            )
+
+            assert "refreshToken" in serialized_object
+
+
 class TestLastTokenCharacters(TestApiTokenSerializer):
 class TestLastTokenCharacters(TestApiTokenSerializer):
     def test_field_is_returned(self) -> None:
     def test_field_is_returned(self) -> None:
         attrs = self._serializer.get_attrs(item_list=[self._token], user=self._user)
         attrs = self._serializer.get_attrs(item_list=[self._token], user=self._user)

+ 2 - 2
tests/sentry/api/test_authentication.py

@@ -181,11 +181,11 @@ class TestTokenAuthentication(TestCase):
 
 
         self.auth = UserAuthTokenAuthentication()
         self.auth = UserAuthTokenAuthentication()
         self.org = self.create_organization(owner=self.user)
         self.org = self.create_organization(owner=self.user)
-        self.token = "abc123"
         self.api_token = ApiToken.objects.create(
         self.api_token = ApiToken.objects.create(
-            token=self.token,
+            token_type=AuthTokenType.USER,
             user=self.user,
             user=self.user,
         )
         )
+        self.token = self.api_token.plaintext_token
 
 
     def test_authenticate(self):
     def test_authenticate(self):
         request = HttpRequest()
         request = HttpRequest()

File diff suppressed because it is too large
+ 204 - 204
tests/sentry/backup/snapshots/ReleaseTests/test_at_head.pysnap


+ 101 - 1
tests/sentry/models/test_apitoken.py

@@ -1,10 +1,12 @@
+import hashlib
 from datetime import timedelta
 from datetime import timedelta
 
 
+import pytest
 from django.utils import timezone
 from django.utils import timezone
 
 
 from sentry.conf.server import SENTRY_SCOPE_HIERARCHY_MAPPING, SENTRY_SCOPES
 from sentry.conf.server import SENTRY_SCOPE_HIERARCHY_MAPPING, SENTRY_SCOPES
 from sentry.hybridcloud.models import ApiTokenReplica
 from sentry.hybridcloud.models import ApiTokenReplica
-from sentry.models.apitoken import ApiToken
+from sentry.models.apitoken import ApiToken, NotSupported, PlaintextSecretAlreadyRead
 from sentry.models.integrations.sentry_app_installation import SentryAppInstallation
 from sentry.models.integrations.sentry_app_installation import SentryAppInstallation
 from sentry.models.integrations.sentry_app_installation_token import SentryAppInstallationToken
 from sentry.models.integrations.sentry_app_installation_token import SentryAppInstallationToken
 from sentry.silo import SiloMode
 from sentry.silo import SiloMode
@@ -12,6 +14,7 @@ from sentry.testutils.cases import TestCase
 from sentry.testutils.helpers import override_options
 from sentry.testutils.helpers import override_options
 from sentry.testutils.outbox import outbox_runner
 from sentry.testutils.outbox import outbox_runner
 from sentry.testutils.silo import assume_test_silo_mode, control_silo_test
 from sentry.testutils.silo import assume_test_silo_mode, control_silo_test
+from sentry.types.token import AuthTokenType
 
 
 
 
 @control_silo_test
 @control_silo_test
@@ -74,6 +77,103 @@ class ApiTokenTest(TestCase):
         token = ApiToken.objects.create(user_id=user.id)
         token = ApiToken.objects.create(user_id=user.id)
         assert token.token_last_characters is None
         assert token.token_last_characters is None
 
 
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_hash_exists_on_token(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id)
+        assert token.hashed_token is not None
+        assert token.hashed_refresh_token is not None
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_hash_exists_on_user_token(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id, token_type=AuthTokenType.USER)
+        assert token.hashed_token is not None
+        assert len(token.hashed_token) == 64  # sha256 hash
+        assert token.hashed_refresh_token is None  # user auth tokens don't have refresh tokens
+
+    @override_options({"apitoken.save-hash-on-create": False})
+    def test_hash_does_not_exist_on_user_token_with_option_off(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id, token_type=AuthTokenType.USER)
+        assert token.hashed_token is None
+        assert token.hashed_refresh_token is None  # user auth tokens don't have refresh tokens
+
+    @override_options({"apitoken.save-hash-on-create": False})
+    def test_can_access_read_once_tokens_with_option_off(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id)
+        assert token.hashed_token is None
+        assert token.hashed_refresh_token is None
+
+        assert token.plaintext_token is not None
+        assert token.plaintext_refresh_token is not None
+
+        # we accessed the tokens above when we asserted it was not None
+        # accessing them again should throw an exception
+        with pytest.raises(PlaintextSecretAlreadyRead):
+            _ = token.plaintext_token
+
+        with pytest.raises(PlaintextSecretAlreadyRead):
+            _ = token.plaintext_refresh_token
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_plaintext_values_only_available_immediately_after_create(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id)
+        assert token.plaintext_token is not None
+        assert token.plaintext_refresh_token is not None
+
+        # we accessed the tokens above when we asserted it was not None
+        # accessing them again should throw an exception
+        with pytest.raises(PlaintextSecretAlreadyRead):
+            _ = token.plaintext_token
+
+        with pytest.raises(PlaintextSecretAlreadyRead):
+            _ = token.plaintext_refresh_token
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_error_when_accessing_refresh_token_on_user_token(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id, token_type=AuthTokenType.USER)
+
+        with pytest.raises(NotSupported):
+            assert token.plaintext_refresh_token is not None
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_user_auth_token_refresh_raises_error(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id, token_type=AuthTokenType.USER)
+
+        with pytest.raises(NotSupported):
+            token.refresh()
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_user_auth_token_sha256_hash(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id, token_type=AuthTokenType.USER)
+        expected_hash = hashlib.sha256(token.plaintext_token.encode()).hexdigest()
+        assert expected_hash == token.hashed_token
+
+    @override_options({"apitoken.save-hash-on-create": True})
+    def test_hash_updated_when_calling_update(self):
+        user = self.create_user()
+        token = ApiToken.objects.create(user_id=user.id)
+        initial_expected_hash = hashlib.sha256(token.plaintext_token.encode()).hexdigest()
+        assert initial_expected_hash == token.hashed_token
+
+        new_token = "abc1234"
+        new_token_expected_hash = hashlib.sha256(new_token.encode()).hexdigest()
+
+        with assume_test_silo_mode(SiloMode.CONTROL):
+            with outbox_runner():
+                token.update(token=new_token)
+
+        token.refresh_from_db()
+
+        assert token.token_last_characters == "1234"
+        assert token.hashed_token == new_token_expected_hash
+
 
 
 @control_silo_test
 @control_silo_test
 class ApiTokenInternalIntegrationTest(TestCase):
 class ApiTokenInternalIntegrationTest(TestCase):

Some files were not shown because too many files changed in this diff