|
@@ -1,8 +1,11 @@
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
from dataclasses import dataclass
|
|
|
from datetime import timedelta
|
|
|
from typing import Any, Dict
|
|
|
|
|
|
from django.urls import reverse
|
|
|
+from rb.clients import LocalClient
|
|
|
|
|
|
from sentry import options
|
|
|
from sentry.models import AuthProvider, Organization, OrganizationMember, User
|
|
@@ -22,7 +25,7 @@ def send_one_time_account_confirm_link(
|
|
|
provider: AuthProvider,
|
|
|
email: str,
|
|
|
identity_id: str,
|
|
|
-) -> "AccountConfirmLink":
|
|
|
+) -> AccountConfirmLink:
|
|
|
"""Store and email a verification key for IdP migration.
|
|
|
|
|
|
Create a one-time verification key for a user whose SSO identity
|
|
@@ -42,7 +45,7 @@ def send_one_time_account_confirm_link(
|
|
|
return link
|
|
|
|
|
|
|
|
|
-def get_redis_cluster():
|
|
|
+def get_redis_cluster() -> LocalClient:
|
|
|
return redis.clusters.get("default").get_local_client_for_key(_REDIS_KEY)
|
|
|
|
|
|
|
|
@@ -54,7 +57,7 @@ class AccountConfirmLink:
|
|
|
email: str
|
|
|
identity_id: str
|
|
|
|
|
|
- def __post_init__(self):
|
|
|
+ def __post_init__(self) -> None:
|
|
|
self.verification_code = get_secure_token()
|
|
|
self.verification_key = f"auth:one-time-key:{self.verification_code}"
|
|
|
|
|
@@ -105,17 +108,18 @@ class AccountConfirmLink:
|
|
|
)
|
|
|
|
|
|
|
|
|
-def get_verification_value_from_key(key: str) -> Dict[str, Any]:
|
|
|
+def get_verification_value_from_key(key: str) -> Dict[str, Any] | None:
|
|
|
cluster = get_redis_cluster()
|
|
|
verification_key = f"auth:one-time-key:{key}"
|
|
|
- verification_value = cluster.get(verification_key)
|
|
|
- if verification_value:
|
|
|
- verification_value = json.loads(verification_value)
|
|
|
- metrics.incr(
|
|
|
- "idpmigration.confirmation_success",
|
|
|
- tags={key: verification_value.get(key) for key in ("provider", "organization_id")},
|
|
|
- sample_rate=1.0,
|
|
|
- )
|
|
|
- else:
|
|
|
+ verification_str = cluster.get(verification_key)
|
|
|
+ if verification_str is None:
|
|
|
metrics.incr("idpmigration.confirmation_failure", sample_rate=1.0)
|
|
|
+ return None
|
|
|
+
|
|
|
+ verification_value: Dict[str, Any] = json.loads(verification_str)
|
|
|
+ metrics.incr(
|
|
|
+ "idpmigration.confirmation_success",
|
|
|
+ tags={key: verification_value.get(key) for key in ("provider", "organization_id")},
|
|
|
+ sample_rate=1.0,
|
|
|
+ )
|
|
|
return verification_value
|