|
@@ -1,8 +1,12 @@
|
|
|
+import ipaddress
|
|
|
from collections.abc import Callable
|
|
|
from copy import deepcopy
|
|
|
from dataclasses import dataclass
|
|
|
from datetime import UTC, datetime, timedelta, timezone
|
|
|
+from ipaddress import IPv4Address, IPv6Address, ip_address
|
|
|
from random import choice, randint
|
|
|
+from urllib.parse import urlparse, urlunparse
|
|
|
+from uuid import UUID, uuid4
|
|
|
|
|
|
import petname
|
|
|
from dateutil.parser import parse as parse_datetime
|
|
@@ -35,6 +39,17 @@ UPPER_CASE_NON_HEX = {
|
|
|
LOWER_CASE_HEX = {c.lower() for c in UPPER_CASE_HEX}
|
|
|
LOWER_CASE_NON_HEX = {c.lower() for c in UPPER_CASE_NON_HEX}
|
|
|
|
|
|
+MAX_IPV4 = (2**ipaddress.IPV4LENGTH) - 1
|
|
|
+MAX_IPV6 = (2**ipaddress.IPV6LENGTH) - 1
|
|
|
+
|
|
|
+
|
|
|
+def random_ipv4():
|
|
|
+ return str(ipaddress.IPv4Address(randint(0, MAX_IPV4)))
|
|
|
+
|
|
|
+
|
|
|
+def random_ipv6():
|
|
|
+ return str(ipaddress.IPv6Address(randint(0, MAX_IPV6)))
|
|
|
+
|
|
|
|
|
|
class SanitizationError(Exception):
|
|
|
"""
|
|
@@ -64,7 +79,8 @@ class UnrecognizedDatetimeError(SanitizationError):
|
|
|
@dataclass
|
|
|
class SanitizableField:
|
|
|
"""
|
|
|
- A pairing a of a `NormalizedModelName` with a field in that model, specifying the target for a sanitization operation.
|
|
|
+ A pairing a of a `NormalizedModelName` with a field in that model, specifying the target for a
|
|
|
+ sanitization operation.
|
|
|
"""
|
|
|
|
|
|
from sentry.backup.dependencies import NormalizedModelName
|
|
@@ -100,7 +116,8 @@ def _set_field_value(json: JSONData, field: SanitizableField, value: JSONData) -
|
|
|
|
|
|
def default_string_sanitizer(old: str) -> str:
|
|
|
"""
|
|
|
- Default string randomizer. Looks at the characters present in the source string to create a new, random string from a roughly similar set of characters.
|
|
|
+ Default string randomizer. Looks at the characters present in the source string to create a new,
|
|
|
+ random string from a roughly similar set of characters.
|
|
|
"""
|
|
|
|
|
|
has_upper_case_hex = False
|
|
@@ -136,7 +153,7 @@ def default_string_sanitizer(old: str) -> str:
|
|
|
if has_digit:
|
|
|
chars += "0123456789"
|
|
|
|
|
|
- return "".join([choice(list(chars)) for _ in range(3)])
|
|
|
+ return "".join([choice(list(chars)) for _ in range(len(old))])
|
|
|
|
|
|
|
|
|
class Sanitizer:
|
|
@@ -174,7 +191,7 @@ class Sanitizer:
|
|
|
|
|
|
def __init__(self, export: JSONData, datetime_offset: timedelta | None = None):
|
|
|
self.json = export
|
|
|
- self.interned_strings = dict()
|
|
|
+ self.interned_strings = {"": ""} # Always map empty string to itself.
|
|
|
self.interned_datetimes = dict()
|
|
|
|
|
|
# Walk the data once, extracting any dates into a set.
|
|
@@ -234,6 +251,34 @@ class Sanitizer:
|
|
|
|
|
|
return "@".join([self.map_string(p) for p in old.split("@")])
|
|
|
|
|
|
+ def map_ip(self, old: str) -> str:
|
|
|
+ """
|
|
|
+ Maps an IP with some randomly generated alternative. If the `old` IP has already been seen,
|
|
|
+ the already-generated value for that existing key will be used instead. If it has not, we'll
|
|
|
+ generate a new one. This ensures that all identical existing IPs are swapped with identical
|
|
|
+ replacements everywhere they occur.
|
|
|
+
|
|
|
+ If the `old` string is not a valid IP, it will be treated as a regular string.
|
|
|
+
|
|
|
+ If you wish to update an actual JSON model in-place with this newly generated IP, `set_ip()`
|
|
|
+ is the preferred method for doing so.
|
|
|
+ """
|
|
|
+
|
|
|
+ interned = self.interned_strings.get(old)
|
|
|
+ if interned is not None:
|
|
|
+ return interned
|
|
|
+
|
|
|
+ try:
|
|
|
+ old_ip = ip_address(old)
|
|
|
+ except ValueError:
|
|
|
+ return self.map_string(old)
|
|
|
+ else:
|
|
|
+ if isinstance(old_ip, IPv4Address):
|
|
|
+ self.interned_strings[old] = random_ipv4()
|
|
|
+ elif isinstance(old_ip, IPv6Address):
|
|
|
+ self.interned_strings[old] = random_ipv6()
|
|
|
+ return self.interned_strings[old]
|
|
|
+
|
|
|
def map_name(self, old: str) -> str:
|
|
|
"""
|
|
|
Maps a proper noun name with some randomly generated "petname" value (ex: "Hairy Tortoise").
|
|
@@ -285,6 +330,54 @@ class Sanitizer:
|
|
|
self.interned_strings[old] = new
|
|
|
return new
|
|
|
|
|
|
+ def map_url(self, old: str) -> str:
|
|
|
+ """
|
|
|
+ Map an URL in a manner that retains domain relationships - ie, all sanitized URLs from
|
|
|
+ domain `.foo` will now be from `.bar`. Scheme, subdomains, paths, etc will be retained in
|
|
|
+ kind (ie: if we have a path, we will sanitize it with a similar, interned one). This ensures
|
|
|
+ that all identical existing URLs are swapped with identical replacements everywhere they
|
|
|
+ occur.
|
|
|
+
|
|
|
+ If the `old` string is not a valid URL , it will be treated as a regular string.
|
|
|
+
|
|
|
+ If you wish to update an actual JSON model in-place with this newly generated URL,
|
|
|
+ `set_URL()` is the preferred method for doing so.
|
|
|
+ """
|
|
|
+
|
|
|
+ url = urlparse(old)
|
|
|
+ hostname = url.hostname
|
|
|
+ if not hostname:
|
|
|
+ return self.map_string(old)
|
|
|
+
|
|
|
+ new_path = (
|
|
|
+ "" if not url.path else ".".join([self.map_string(p) for p in url.path.split(".")])
|
|
|
+ )
|
|
|
+ new_query = (
|
|
|
+ "" if not url.query else ".".join([self.map_string(q) for q in url.query.split(".")])
|
|
|
+ )
|
|
|
+ return urlunparse(
|
|
|
+ url._replace(
|
|
|
+ scheme=self.map_string(url.scheme) if url.scheme else "",
|
|
|
+ netloc=".".join([self.map_string(d) for d in hostname.split(".")]),
|
|
|
+ path=new_path,
|
|
|
+ query=new_query,
|
|
|
+ fragment=self.map_string(url.fragment) if url.fragment else "",
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ def map_uuid(self, old: str) -> str:
|
|
|
+ """
|
|
|
+ Maps a UUID. If the `old` UUID has already been seen, the already-generated value for that
|
|
|
+ existing key will be used instead. If it has not, we'll generate a new one. This ensures
|
|
|
+ that all identical existing UUIDs are swapped with identical replacements everywhere they
|
|
|
+ occur.
|
|
|
+
|
|
|
+ If you wish to update an actual JSON model in-place with this newly generated name,
|
|
|
+ `set_uuid()` is the preferred method for doing so.
|
|
|
+ """
|
|
|
+
|
|
|
+ return self.map_string(old, lambda _: str(uuid4()))
|
|
|
+
|
|
|
def set_datetime(self, json: JSONData, field: SanitizableField) -> datetime | None:
|
|
|
"""
|
|
|
Replaces a datetime by replacing it with a different, but still correctly ordered,
|
|
@@ -334,6 +427,34 @@ class Sanitizer:
|
|
|
|
|
|
return _set_field_value(json, field, self.map_email(old))
|
|
|
|
|
|
+ def set_ip(
|
|
|
+ self,
|
|
|
+ json: JSONData,
|
|
|
+ field: SanitizableField,
|
|
|
+ ) -> str | None:
|
|
|
+ """
|
|
|
+ Replaces a IP with a randomly generated value. If the existing value of the IP has
|
|
|
+ already been seen, the already-generated value for that existing key will be used instead.
|
|
|
+ If it has not, we'll generate a new one. This ensures that all identical existing IPs are
|
|
|
+ swapped with identical replacements everywhere they occur.
|
|
|
+
|
|
|
+ This method updates the JSON in-place if the specified field is a non-null value, then
|
|
|
+ returns the newly generated replacement. If the specified field could not be found in the
|
|
|
+ supplied JSON model, `None` is returned instead.
|
|
|
+
|
|
|
+ If you wish to merely generate a string without updating the JSON in-place, consider using
|
|
|
+ `map_ip()` instead.
|
|
|
+ """
|
|
|
+
|
|
|
+ field.validate_json_model(json)
|
|
|
+ old = _get_field_value(json, field)
|
|
|
+ if old is None:
|
|
|
+ return None
|
|
|
+ if not isinstance(old, str):
|
|
|
+ raise TypeError("Existing value must be a string")
|
|
|
+
|
|
|
+ return _set_field_value(json, field, self.map_ip(old))
|
|
|
+
|
|
|
def set_name(
|
|
|
self,
|
|
|
json: JSONData,
|
|
@@ -432,6 +553,68 @@ class Sanitizer:
|
|
|
|
|
|
return _set_field_value(json, field, self.map_string(old, generate))
|
|
|
|
|
|
+ def set_url(
|
|
|
+ self,
|
|
|
+ json: JSONData,
|
|
|
+ field: SanitizableField,
|
|
|
+ ) -> str | None:
|
|
|
+ """
|
|
|
+ Replace a URL in a manner that retains domain relationships - ie, all sanitized URLs from
|
|
|
+ domain `.foo` will now be from `.bar`. Scheme, subdomains, paths, etc will be retained in
|
|
|
+ kind (ie: if we have a path, we will sanitize it with a similar, interned one). This ensures
|
|
|
+ that all identical existing URLs are swapped with identical replacements everywhere they
|
|
|
+ occur.
|
|
|
+
|
|
|
+ If the `old` string is not a valid URL , it will be treated as a regular string.
|
|
|
+
|
|
|
+ This method updates the JSON in-place if the specified field is a non-null value, then
|
|
|
+ returns the newly generated replacement. If the specified field could not be found in the
|
|
|
+ supplied JSON model, `None` is returned instead.
|
|
|
+
|
|
|
+ If you wish to merely generate a string without updating the JSON in-place, consider using
|
|
|
+ `map_url()` instead.
|
|
|
+ """
|
|
|
+
|
|
|
+ field.validate_json_model(json)
|
|
|
+ old = _get_field_value(json, field)
|
|
|
+ if old is None:
|
|
|
+ return None
|
|
|
+ if not isinstance(old, str):
|
|
|
+ raise TypeError("Existing value must be a string")
|
|
|
+
|
|
|
+ return _set_field_value(json, field, self.map_url(old))
|
|
|
+
|
|
|
+ def set_uuid(
|
|
|
+ self,
|
|
|
+ json: JSONData,
|
|
|
+ field: SanitizableField,
|
|
|
+ ) -> str | None:
|
|
|
+ """
|
|
|
+ Replaces a UUID with a randomly generated value. If the existing value of the UUID has
|
|
|
+ already been seen, the already-generated value for that existing key will be used instead.
|
|
|
+ If it has not, we'll generate a new one. This ensures that all identical existing UUIDs are
|
|
|
+ swapped with identical replacements everywhere they occur.
|
|
|
+
|
|
|
+ This method updates the JSON in-place if the specified field is a non-null value, then
|
|
|
+ returns the newly generated replacement. If the specified field could not be found in the
|
|
|
+ supplied JSON model, `None` is returned instead.
|
|
|
+
|
|
|
+ If you wish to merely generate a string without updating the JSON in-place, consider using
|
|
|
+ `map_uuid()` instead.
|
|
|
+ """
|
|
|
+
|
|
|
+ field.validate_json_model(json)
|
|
|
+ old = _get_field_value(json, field)
|
|
|
+ if old is None:
|
|
|
+ return None
|
|
|
+ if not isinstance(old, str):
|
|
|
+ raise TypeError("Existing value must be a string")
|
|
|
+
|
|
|
+ # Will throw an error if this is not a valid UUID.
|
|
|
+ UUID(old)
|
|
|
+
|
|
|
+ return _set_field_value(json, field, self.map_uuid(old))
|
|
|
+
|
|
|
|
|
|
def sanitize(export: JSONData, datetime_offset: timedelta | None = None) -> JSONData:
|
|
|
"""
|