
ref(dynamic-sampling): Cleanup tasks for dynamic sampling (#50721)

Riccardo Busetti 1 year ago
parent commit 2f146322fc

+ 0 - 17
pyproject.toml

@@ -499,13 +499,6 @@ module = [
     "sentry.discover.endpoints.discover_saved_queries",
     "sentry.discover.endpoints.serializers",
     "sentry.discover.tasks",
-    "sentry.dynamic_sampling.prioritise_projects",
-    "sentry.dynamic_sampling.prioritise_transactions",
-    "sentry.dynamic_sampling.recalibrate_transactions",
-    "sentry.dynamic_sampling.rules.base",
-    "sentry.dynamic_sampling.sliding_window",
-    "sentry.dynamic_sampling.snuba_utils",
-    "sentry.dynamic_sampling.tasks",
     "sentry.event_manager",
     "sentry.eventstore.base",
     "sentry.eventstore.compressor",
@@ -1386,16 +1379,6 @@ module = [
     "tests.sentry.digests.backends.test_redis",
     "tests.sentry.digests.test_notifications",
     "tests.sentry.digests.test_utilities",
-    "tests.sentry.dynamic_sampling.models.test_projects_rebalancing",
-    "tests.sentry.dynamic_sampling.models.test_transactions_rebalancing",
-    "tests.sentry.dynamic_sampling.rules.biases.test_boost_latest_releases_bias",
-    "tests.sentry.dynamic_sampling.test_generate_rules",
-    "tests.sentry.dynamic_sampling.test_logging",
-    "tests.sentry.dynamic_sampling.test_prioritise_projects",
-    "tests.sentry.dynamic_sampling.test_prioritise_transactions",
-    "tests.sentry.dynamic_sampling.test_recalibrate_transactions",
-    "tests.sentry.dynamic_sampling.test_sliding_window",
-    "tests.sentry.dynamic_sampling.test_tasks",
     "tests.sentry.event_manager.interfaces.test_breadcrumbs",
     "tests.sentry.event_manager.interfaces.test_contexts",
     "tests.sentry.event_manager.interfaces.test_csp",

+ 1 - 1
src/sentry/api/serializers/models/organization.py

@@ -556,7 +556,7 @@ class DetailedOrganizationSerializer(OrganizationSerializer):
             team__organization=obj
         ).count()
         context["onboardingTasks"] = serialize(tasks_to_serialize, user)
-        sample_rate = quotas.get_blended_sample_rate(organization_id=obj.id)
+        sample_rate = quotas.get_blended_sample_rate(organization_id=obj.id)  # type:ignore
         context["isDynamicallySampled"] = sample_rate is not None and sample_rate < 1.0
 
         return context

+ 15 - 10
src/sentry/conf/server.py

@@ -733,7 +733,12 @@ CELERY_IMPORTS = (
     "sentry.tasks.user_report",
     "sentry.profiles.task",
     "sentry.release_health.tasks",
-    "sentry.dynamic_sampling.tasks",
+    "sentry.dynamic_sampling.tasks.boost_low_volume_projects",
+    "sentry.dynamic_sampling.tasks.boost_low_volume_transactions",
+    "sentry.dynamic_sampling.tasks.recalibrate_orgs",
+    "sentry.dynamic_sampling.tasks.sliding_window",
+    "sentry.dynamic_sampling.tasks.sliding_window_org",
+    "sentry.dynamic_sampling.tasks.utils",
     "sentry.utils.suspect_resolutions.get_suspect_resolutions",
     "sentry.utils.suspect_resolutions_releases.get_suspect_resolutions_releases",
     "sentry.tasks.derive_code_mappings",
@@ -1055,13 +1060,18 @@ CELERYBEAT_SCHEDULE_REGION = {
         "schedule": crontab(minute=30, hour="0"),
         "options": {"expires": 3600},
     },
-    "dynamic-sampling-prioritize-projects": {
-        "task": "sentry.dynamic_sampling.tasks.prioritise_projects",
+    "dynamic-sampling-boost-low-volume-projects": {
+        "task": "sentry.dynamic_sampling.tasks.boost_low_volume_projects",
         # Run every 5 minutes
         "schedule": crontab(minute="*/5"),
     },
-    "dynamic-sampling-prioritize-transactions": {
-        "task": "sentry.dynamic_sampling.tasks.prioritise_transactions",
+    "dynamic-sampling-boost-low-volume-transactions": {
+        "task": "sentry.dynamic_sampling.tasks.boost_low_volume_transactions",
+        # Run every 5 minutes
+        "schedule": crontab(minute="*/5"),
+    },
+    "dynamic-sampling-recalibrate-orgs": {
+        "task": "sentry.dynamic_sampling.tasks.recalibrate_orgs",
         # Run every 5 minutes
         "schedule": crontab(minute="*/5"),
     },
@@ -1082,11 +1092,6 @@ CELERYBEAT_SCHEDULE_REGION = {
         # TODO: Increase expiry time to x4 once we change this to run weekly
         "options": {"expires": 60 * 60 * 3},
     },
-    "dynamic-sampling-recalibrate-orgs": {
-        "task": "sentry.dynamic_sampling.tasks.recalibrate_orgs",
-        # Run every 5 minutes
-        "schedule": crontab(minute="*/5"),
-    },
     "schedule_auto_transition_new": {
         "task": "sentry.tasks.schedule_auto_transition_new",
         # Run job every 6 hours

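The Celery wiring above follows a standard pattern: each task module has to be listed in CELERY_IMPORTS so workers register it, and a CELERYBEAT_SCHEDULE entry points at the fully qualified task name with a crontab schedule. A minimal, self-contained sketch of that pattern (the module and task names below are hypothetical placeholders, not the ones added by this commit):

from celery import Celery
from celery.schedules import crontab

app = Celery("example")

# Hypothetical task module; in Sentry this would go into CELERY_IMPORTS.
app.conf.imports = ("myapp.tasks.example_cleanup",)

app.conf.beat_schedule = {
    "example-cleanup": {
        # Fully qualified task name, mirroring the CELERYBEAT_SCHEDULE_REGION entries above.
        "task": "myapp.tasks.example_cleanup",
        # Run every 5 minutes, like the dynamic sampling tasks.
        "schedule": crontab(minute="*/5"),
        # Drop the run if it has not started within an hour.
        "options": {"expires": 3600},
    },
}
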
+ 0 - 145
src/sentry/dynamic_sampling/prioritise_projects.py

@@ -1,145 +0,0 @@
-import logging
-import time
-from collections import defaultdict
-from datetime import datetime, timedelta
-from typing import List, Mapping, Optional, Sequence, Tuple
-
-from snuba_sdk import (
-    Column,
-    Condition,
-    Direction,
-    Entity,
-    Function,
-    Granularity,
-    LimitBy,
-    Op,
-    OrderBy,
-    Query,
-    Request,
-)
-
-from sentry import options
-from sentry.dynamic_sampling.rules.utils import (
-    DecisionDropCount,
-    DecisionKeepCount,
-    OrganizationId,
-    ProjectId,
-)
-from sentry.dynamic_sampling.snuba_utils import MAX_TRANSACTIONS_PER_PROJECT
-from sentry.sentry_metrics import indexer
-from sentry.snuba.dataset import Dataset, EntityKey
-from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
-from sentry.snuba.referrer import Referrer
-from sentry.utils.snuba import raw_snql_query
-
-logger = logging.getLogger(__name__)
-MAX_SECONDS = 60
-CHUNK_SIZE = 9998  # Snuba's limit is 10000, and we fetch CHUNK_SIZE+1
-
-
-def fetch_projects_with_total_volumes(
-    org_ids: List[int],
-    granularity: Optional[Granularity] = None,
-    query_interval: Optional[timedelta] = None,
-) -> Mapping[OrganizationId, Sequence[Tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount]]]:
-    """
-    This function fetch with pagination orgs and projects with count per root project
-    and also calculates decision count keep/drop per project
-    """
-    if query_interval is None:
-        query_interval = timedelta(hours=1)
-        granularity = Granularity(3600)
-    aggregated_projects = defaultdict(list)
-    start_time = time.time()
-    offset = 0
-    org_ids = list(org_ids)
-    transaction_string_id = indexer.resolve_shared_org("decision")
-    transaction_tag = f"tags_raw[{transaction_string_id}]"
-    sample_rate = int(options.get("dynamic-sampling.prioritise_projects.sample_rate") * 100)
-    metric_id = indexer.resolve_shared_org(str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value))
-    where = [
-        Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - query_interval),
-        Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
-        Condition(Column("metric_id"), Op.EQ, metric_id),
-        Condition(Column("org_id"), Op.IN, org_ids),
-    ]
-    if sample_rate != 100:
-        where += [Condition(Function("modulo", [Column("org_id"), 100]), Op.LT, sample_rate)]
-
-    keep_count = Function(
-        "countIf",
-        [
-            Function(
-                "equals",
-                [Column(transaction_tag), "keep"],
-            )
-        ],
-        alias="keep_count",
-    )
-    drop_count = Function(
-        "countIf",
-        [
-            Function(
-                "equals",
-                [Column(transaction_tag), "drop"],
-            )
-        ],
-        alias="drop_count",
-    )
-
-    while (time.time() - start_time) < MAX_SECONDS:
-        query = (
-            Query(
-                match=Entity(EntityKey.GenericOrgMetricsCounters.value),
-                select=[
-                    Function("sum", [Column("value")], "root_count_value"),
-                    Column("org_id"),
-                    Column("project_id"),
-                    keep_count,
-                    drop_count,
-                ],
-                groupby=[Column("org_id"), Column("project_id")],
-                where=where,
-                granularity=granularity,
-                orderby=[
-                    OrderBy(Column("org_id"), Direction.ASC),
-                    OrderBy(Column("project_id"), Direction.ASC),
-                ],
-            )
-            .set_limitby(
-                LimitBy(
-                    columns=[Column("org_id"), Column("project_id")],
-                    count=MAX_TRANSACTIONS_PER_PROJECT,
-                )
-            )
-            .set_limit(CHUNK_SIZE + 1)
-            .set_offset(offset)
-        )
-        request = Request(
-            dataset=Dataset.PerformanceMetrics.value, app_id="dynamic_sampling", query=query
-        )
-        data = raw_snql_query(
-            request,
-            referrer=Referrer.DYNAMIC_SAMPLING_DISTRIBUTION_FETCH_PROJECTS_WITH_COUNT_PER_ROOT.value,
-        )["data"]
-        count = len(data)
-        more_results = count > CHUNK_SIZE
-        offset += CHUNK_SIZE
-
-        if more_results:
-            data = data[:-1]
-
-        for row in data:
-            aggregated_projects[row["org_id"]].append(
-                (row["project_id"], row["root_count_value"], row["keep_count"], row["drop_count"])
-            )
-
-        if not more_results:
-            break
-    else:
-        logger.error(
-            "",
-            extra={"offset": offset},
-        )
-
-    return aggregated_projects

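The deleted fetch_projects_with_total_volumes paginates Snuba by requesting CHUNK_SIZE + 1 rows: receiving the extra row proves another page exists, and that sentinel row is dropped before processing. The same pattern presumably lives on in the new tasks modules, which are not shown in this diff. A stripped-down sketch of just that loop, with a stand-in for raw_snql_query:

import time
from typing import Any, Callable, Dict, Iterator, List

MAX_SECONDS = 60
CHUNK_SIZE = 9998  # fetch CHUNK_SIZE + 1 rows to detect whether another page exists


def paginate(run_query: Callable[[int, int], List[Dict[str, Any]]]) -> Iterator[Dict[str, Any]]:
    """Yield rows page by page until no sentinel row comes back or the time budget runs out."""
    start_time = time.time()
    offset = 0
    while (time.time() - start_time) < MAX_SECONDS:
        data = run_query(CHUNK_SIZE + 1, offset)
        more_results = len(data) > CHUNK_SIZE
        offset += CHUNK_SIZE
        if more_results:
            # The extra row only proves a next page exists; don't process it twice.
            data = data[:-1]
        yield from data
        if not more_results:
            break


# Usage with an in-memory stand-in for the Snuba query:
rows = [{"org_id": 1, "project_id": i, "root_count_value": i * 10} for i in range(5)]
fetch_page = lambda limit, offset: rows[offset : offset + limit]
assert list(paginate(fetch_page)) == rows
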
+ 0 - 103
src/sentry/dynamic_sampling/recalibrate_transactions.py

@@ -1,103 +0,0 @@
-import logging
-from dataclasses import dataclass
-from datetime import datetime, timedelta
-from typing import List
-
-from snuba_sdk import (
-    Column,
-    Condition,
-    Direction,
-    Entity,
-    Function,
-    Granularity,
-    Op,
-    OrderBy,
-    Query,
-    Request,
-)
-
-from sentry.sentry_metrics import indexer
-from sentry.snuba.dataset import Dataset, EntityKey
-from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
-from sentry.snuba.referrer import Referrer
-from sentry.utils.snuba import raw_snql_query
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass(frozen=True)
-class OrganizationDataVolume:
-    """
-    Represents the total and indexed number of transactions received by an organisation
-    (in a particular interval of time).
-    """
-
-    # organisation id
-    org_id: int
-    # total number of transactions
-    total: int
-    # number of transactions indexed ( i.e. kept)
-    indexed: int
-
-
-def fetch_org_volumes(
-    org_ids: List[int], query_interval: timedelta
-) -> List[OrganizationDataVolume]:
-    """
-    Returns the number of total and indexed transactions received by all organisations in the
-    specified interval
-    """
-    transaction_string_id = indexer.resolve_shared_org("decision")
-    transaction_tag = f"tags_raw[{transaction_string_id}]"
-    metric_id = indexer.resolve_shared_org(str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value))
-    where = [
-        Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - query_interval),
-        Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
-        Condition(Column("metric_id"), Op.EQ, metric_id),
-        Condition(Column("org_id"), Op.IN, org_ids),
-    ]
-
-    keep_count = Function(
-        "sumIf",
-        [
-            Column("value"),
-            Function(
-                "equals",
-                [Column(transaction_tag), "keep"],
-            ),
-        ],
-        alias="keep_count",
-    )
-
-    ret_val: List[OrganizationDataVolume] = []
-
-    query = Query(
-        match=Entity(EntityKey.GenericOrgMetricsCounters.value),
-        select=[
-            Function("sum", [Column("value")], "total_count"),
-            Column("org_id"),
-            keep_count,
-        ],
-        groupby=[Column("org_id")],
-        where=where,
-        granularity=Granularity(60),
-        orderby=[
-            OrderBy(Column("org_id"), Direction.ASC),
-        ],
-    )
-    request = Request(
-        dataset=Dataset.PerformanceMetrics.value, app_id="dynamic_sampling", query=query
-    )
-    data = raw_snql_query(
-        request,
-        referrer=Referrer.DYNAMIC_SAMPLING_COUNTERS_GET_ORG_TRANSACTION_VOLUMES.value,
-    )["data"]
-
-    for row in data:
-        ret_val.append(
-            OrganizationDataVolume(
-                org_id=row["org_id"], total=row["total_count"], indexed=row["keep_count"]
-            )
-        )
-
-    return ret_val

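fetch_org_volumes feeds the recalibration task in the deleted tasks.py further down: the previously observed sample rate is indexed / total, and it is compared with the desired blended rate to derive a correction factor that is kept only while it stays inside a guard range. The exact adjusted_factor implementation is not part of this diff; the sketch below assumes the simplest form, scaling the previous factor by desired_rate / observed_rate and clamping to the [0.1, 10] range used by recalibrate_orgs:

from typing import Optional

MIN_REBALANCE_FACTOR = 0.1
MAX_REBALANCE_FACTOR = 10


def next_rebalance_factor(
    previous_factor: float,
    total: int,
    indexed: int,
    desired_sample_rate: float,
) -> Optional[float]:
    """Return the new correction factor, or None when it should be discarded."""
    if total == 0 or indexed == 0:
        # Not enough data to adjust anything; keep the factor that is already stored.
        return previous_factor

    observed_rate = indexed / total
    # Assumption: the adjustment scales the previous factor by how far the
    # observed rate is from the desired one.
    new_factor = previous_factor * (desired_sample_rate / observed_rate)

    if new_factor < MIN_REBALANCE_FACTOR or new_factor > MAX_REBALANCE_FACTOR:
        # Outside the guard range: the real task deletes the stored factor and gives up.
        return None
    return new_factor


# Example: 100k transactions observed, 40k kept, desired rate 0.2 -> factor halves.
assert next_rebalance_factor(1.0, 100_000, 40_000, 0.2) == 0.5
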
+ 3 - 3
src/sentry/dynamic_sampling/rules/base.py

@@ -10,7 +10,7 @@ from sentry.db.models import Model
 from sentry.dynamic_sampling.rules.biases.base import Bias
 from sentry.dynamic_sampling.rules.combine import get_relay_biases_combinator
 from sentry.dynamic_sampling.rules.helpers.prioritise_project import (
-    get_prioritise_by_project_sample_rate,
+    get_boost_low_volume_projects_sample_rate,
 )
 from sentry.dynamic_sampling.rules.helpers.sliding_window import get_sliding_window_sample_rate
 from sentry.dynamic_sampling.rules.logging import log_rules
@@ -61,7 +61,7 @@ def can_boost_new_projects(organization: Organization) -> bool:
 
 
 def get_guarded_blended_sample_rate(organization: Organization, project: Project) -> float:
-    sample_rate = quotas.get_blended_sample_rate(organization_id=organization.id)
+    sample_rate = quotas.get_blended_sample_rate(organization_id=organization.id)  # type:ignore
 
     # If the sample rate is None, it means that dynamic sampling rules shouldn't be generated.
     if sample_rate is None:
@@ -91,7 +91,7 @@ def get_guarded_blended_sample_rate(organization: Organization, project: Project
     else:
         # In case we use the prioritise by project, we want to fall back to the original sample rate in case there are
         # any issues.
-        sample_rate = get_prioritise_by_project_sample_rate(
+        sample_rate = get_boost_low_volume_projects_sample_rate(
             org_id=organization.id, project_id=project.id, error_sample_rate_fallback=sample_rate
         )
 

+ 4 - 3
src/sentry/dynamic_sampling/rules/helpers/prioritise_project.py

@@ -4,15 +4,16 @@ from sentry.dynamic_sampling.rules.helpers.sliding_window import was_sliding_win
 from sentry.dynamic_sampling.rules.utils import get_redis_client_for_ds
 
 
-def generate_prioritise_by_project_cache_key(org_id: int) -> str:
+def generate_boost_low_volume_projects_cache_key(org_id: int) -> str:
+    # The cache key name wasn't changed, since changing it would require a migration, which is not worth it.
     return f"ds::o:{org_id}:prioritise_projects"
 
 
-def get_prioritise_by_project_sample_rate(
+def get_boost_low_volume_projects_sample_rate(
     org_id: int, project_id: int, error_sample_rate_fallback: float
 ) -> float:
     redis_client = get_redis_client_for_ds()
-    cache_key = generate_prioritise_by_project_cache_key(org_id=org_id)
+    cache_key = generate_boost_low_volume_projects_cache_key(org_id=org_id)
 
     try:
         return float(redis_client.hget(cache_key, project_id))

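The renamed helper keeps the old Redis key ds::o:{org_id}:prioritise_projects, a hash mapping project ids to the rebalanced sample rates written by the boost-low-volume-projects task; when the value is missing or unparsable it presumably falls back to error_sample_rate_fallback, as the comment in rules/base.py suggests. A small sketch of that read path using a plain dict in place of the Redis hash:

from typing import Mapping


def boosted_sample_rate(cache: Mapping[str, str], project_id: int, fallback: float) -> float:
    """Read the per-project rebalanced rate, falling back to the blended rate."""
    try:
        # Redis stores hash values as strings, hence the float() conversion.
        return float(cache[str(project_id)])
    except (KeyError, TypeError, ValueError):
        return fallback


# Hash contents as the rebalancing task would have written them.
fake_hash = {"42": "0.25"}
assert boosted_sample_rate(fake_hash, 42, fallback=0.5) == 0.25
assert boosted_sample_rate(fake_hash, 7, fallback=0.5) == 0.5  # unknown project
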
+ 0 - 173
src/sentry/dynamic_sampling/sliding_window.py

@@ -1,173 +0,0 @@
-import logging
-import time
-from collections import defaultdict
-from datetime import datetime, timedelta
-from typing import List, Mapping, Sequence, Tuple
-
-from snuba_sdk import (
-    Column,
-    Condition,
-    Direction,
-    Entity,
-    Function,
-    Granularity,
-    Op,
-    OrderBy,
-    Query,
-    Request,
-)
-
-from sentry.dynamic_sampling.rules.utils import OrganizationId, ProjectId
-from sentry.sentry_metrics import indexer
-from sentry.snuba.dataset import Dataset, EntityKey
-from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
-from sentry.snuba.referrer import Referrer
-from sentry.utils.snuba import raw_snql_query
-
-logger = logging.getLogger(__name__)
-MAX_SECONDS = 60
-CHUNK_SIZE = 9998  # Snuba's limit is 10000, and we fetch CHUNK_SIZE+1
-
-
-def fetch_projects_with_total_root_transactions_count(
-    org_ids: List[int], window_size: int
-) -> Mapping[OrganizationId, Sequence[Tuple[ProjectId, int]]]:
-    """
-    Fetches tuples of (org_id, project_id) and the respective root transaction counts.
-    """
-    query_interval = timedelta(hours=window_size)
-    granularity = Granularity(3600)
-
-    count_per_root_metric_id = indexer.resolve_shared_org(
-        str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
-    )
-    where = [
-        Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - query_interval),
-        Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
-        Condition(Column("metric_id"), Op.EQ, count_per_root_metric_id),
-        Condition(Column("org_id"), Op.IN, list(org_ids)),
-    ]
-
-    start_time = time.time()
-    offset = 0
-    aggregated_projects = defaultdict(list)
-    while (time.time() - start_time) < MAX_SECONDS:
-        query = (
-            Query(
-                match=Entity(EntityKey.GenericOrgMetricsCounters.value),
-                select=[
-                    Function("sum", [Column("value")], "root_count_value"),
-                    Column("org_id"),
-                    Column("project_id"),
-                ],
-                where=where,
-                groupby=[Column("org_id"), Column("project_id")],
-                orderby=[
-                    OrderBy(Column("org_id"), Direction.ASC),
-                    OrderBy(Column("project_id"), Direction.ASC),
-                ],
-                granularity=granularity,
-            )
-            .set_limit(CHUNK_SIZE + 1)
-            .set_offset(offset)
-        )
-
-        request = Request(
-            dataset=Dataset.PerformanceMetrics.value, app_id="dynamic_sampling", query=query
-        )
-
-        data = raw_snql_query(
-            request,
-            referrer=Referrer.DYNAMIC_SAMPLING_DISTRIBUTION_FETCH_PROJECTS_WITH_COUNT_PER_ROOT.value,
-        )["data"]
-
-        count = len(data)
-        more_results = count > CHUNK_SIZE
-        offset += CHUNK_SIZE
-
-        if more_results:
-            data = data[:-1]
-
-        for row in data:
-            aggregated_projects[row["org_id"]].append((row["project_id"], row["root_count_value"]))
-
-        if not more_results:
-            break
-    else:
-        logger.error(
-            f"Fetching the transaction root count of multiple orgs took more than {MAX_SECONDS} seconds.",
-            extra={"offset": offset},
-        )
-
-    return aggregated_projects
-
-
-def fetch_orgs_with_total_root_transactions_count(
-    org_ids: List[int], window_size: int
-) -> Mapping[OrganizationId, int]:
-    """
-    Fetches for each org the total root transaction count.
-    """
-    query_interval = timedelta(hours=window_size)
-    granularity = Granularity(3600)
-
-    count_per_root_metric_id = indexer.resolve_shared_org(
-        str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value)
-    )
-    where = [
-        Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - query_interval),
-        Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
-        Condition(Column("metric_id"), Op.EQ, count_per_root_metric_id),
-        Condition(Column("org_id"), Op.IN, list(org_ids)),
-    ]
-
-    start_time = time.time()
-    offset = 0
-    aggregated_projects = {}
-    while (time.time() - start_time) < MAX_SECONDS:
-        query = (
-            Query(
-                match=Entity(EntityKey.GenericOrgMetricsCounters.value),
-                select=[
-                    Function("sum", [Column("value")], "root_count_value"),
-                    Column("org_id"),
-                ],
-                where=where,
-                groupby=[Column("org_id")],
-                orderby=[
-                    OrderBy(Column("org_id"), Direction.ASC),
-                ],
-                granularity=granularity,
-            )
-            .set_limit(CHUNK_SIZE + 1)
-            .set_offset(offset)
-        )
-
-        request = Request(
-            dataset=Dataset.PerformanceMetrics.value, app_id="dynamic_sampling", query=query
-        )
-
-        data = raw_snql_query(
-            request,
-            referrer=Referrer.DYNAMIC_SAMPLING_DISTRIBUTION_FETCH_ORGS_WITH_COUNT_PER_ROOT.value,
-        )["data"]
-
-        count = len(data)
-        more_results = count > CHUNK_SIZE
-        offset += CHUNK_SIZE
-
-        if more_results:
-            data = data[:-1]
-
-        for row in data:
-            aggregated_projects[row["org_id"]] = row["root_count_value"]
-
-        if not more_results:
-            break
-    else:
-        logger.error(
-            f"Fetching the transaction root count of multiple orgs took more than {MAX_SECONDS} seconds.",
-            extra={"offset": offset},
-        )
-
-    return aggregated_projects

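These fetch helpers supply the root transaction counts that the sliding window tasks turn into a sample rate: the count over the last window_size hours is extrapolated to a monthly volume and matched against a sampling tier (see compute_sliding_window_sample_rate in the deleted tasks.py below). Neither the extrapolation nor the tier table is shown in this diff, so the sketch below assumes simple linear scaling to a 30-day month and an invented tier table:

from typing import List, Optional, Tuple

HOURS_PER_MONTH = 30 * 24

# Hypothetical (minimum monthly volume, sample rate) tiers, highest volume first.
SAMPLING_TIERS: List[Tuple[int, float]] = [
    (100_000_000, 0.01),
    (10_000_000, 0.05),
    (1_000_000, 0.25),
    (0, 1.0),
]


def extrapolate_monthly_volume(volume: int, hours: int) -> Optional[int]:
    # Assumption: plain linear scaling of the observed window to a 30-day month.
    if hours <= 0:
        return None
    return int(volume * (HOURS_PER_MONTH / hours))


def sliding_window_sample_rate(root_count: int, window_size: int) -> Optional[float]:
    monthly = extrapolate_monthly_volume(root_count, window_size)
    if monthly is None:
        return None
    for tier_volume, rate in SAMPLING_TIERS:
        if monthly >= tier_volume:
            return rate
    return None


# 50k root transactions in a 24h window extrapolate to 1.5M per month -> 0.25.
assert sliding_window_sample_rate(50_000, 24) == 0.25
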
+ 0 - 158
src/sentry/dynamic_sampling/snuba_utils.py

@@ -1,158 +0,0 @@
-import logging
-import time
-from datetime import datetime, timedelta
-from typing import Generator, List, Tuple
-
-from snuba_sdk import (
-    Column,
-    Condition,
-    Direction,
-    Entity,
-    Function,
-    Granularity,
-    Op,
-    OrderBy,
-    Query,
-    Request,
-)
-
-from sentry.sentry_metrics import indexer
-from sentry.snuba.dataset import Dataset, EntityKey
-from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
-from sentry.snuba.referrer import Referrer
-from sentry.utils.snuba import raw_snql_query
-
-logger = logging.getLogger(__name__)
-MAX_SECONDS = 60
-CHUNK_SIZE = 9998  # Snuba's limit is 10000 and we fetch CHUNK_SIZE+1
-MAX_ORGS_PER_QUERY = 100
-MAX_PROJECTS_PER_QUERY = 5000
-MAX_TRANSACTIONS_PER_PROJECT = 20
-
-
-def get_orgs_with_project_counts_without_modulo(
-    max_orgs: int, max_projects: int
-) -> Generator[List[int], None, None]:
-    """
-    Fetch organisations in batches.
-    A batch will return at max max_orgs elements
-    It will accumulate org ids in the list until either it accumulates max_orgs or the
-    number of projects in the already accumulated orgs is more than max_projects or there
-    are no more orgs
-    """
-    start_time = time.time()
-    metric_id = indexer.resolve_shared_org(str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value))
-    offset = 0
-    last_result: List[Tuple[int, int]] = []
-    while (time.time() - start_time) < MAX_SECONDS:
-        query = (
-            Query(
-                match=Entity(EntityKey.GenericOrgMetricsCounters.value),
-                select=[
-                    Function("uniq", [Column("project_id")], "num_projects"),
-                    Column("org_id"),
-                ],
-                groupby=[
-                    Column("org_id"),
-                ],
-                where=[
-                    Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - timedelta(hours=1)),
-                    Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
-                    Condition(Column("metric_id"), Op.EQ, metric_id),
-                ],
-                granularity=Granularity(3600),
-                orderby=[
-                    OrderBy(Column("org_id"), Direction.ASC),
-                ],
-            )
-            .set_limit(CHUNK_SIZE + 1)
-            .set_offset(offset)
-        )
-        request = Request(
-            dataset=Dataset.PerformanceMetrics.value, app_id="dynamic_sampling", query=query
-        )
-        data = raw_snql_query(
-            request,
-            referrer=Referrer.DYNAMIC_SAMPLING_COUNTERS_FETCH_PROJECTS_WITH_COUNT_PER_TRANSACTION.value,
-        )["data"]
-        count = len(data)
-        more_results = count > CHUNK_SIZE
-        offset += CHUNK_SIZE
-        if more_results:
-            data = data[:-1]
-        for row in data:
-            last_result.append((row["org_id"], row["num_projects"]))
-
-        first_idx = 0
-        count_projects = 0
-        for idx, (org_id, num_projects) in enumerate(last_result):
-            count_projects += num_projects
-            if idx - first_idx >= max_orgs - 1 or count_projects >= max_projects:
-                # we got to the number of elements desired
-                yield [o for o, _ in last_result[first_idx : idx + 1]]
-                first_idx = idx + 1
-                count_projects = 0
-
-        # keep what is left unused from last_result for the next iteration or final result
-        last_result = last_result[first_idx:]
-        if not more_results:
-            break
-    if len(last_result) > 0:
-        yield [org_id for org_id, _ in last_result]
-
-
-def get_active_orgs(max_orgs: int, time_interval: timedelta) -> Generator[List[int], None, None]:
-    """
-    Fetch organisations in batches.
-    A batch will return at max max_orgs elements
-    """
-    start_time = time.time()
-    metric_id = indexer.resolve_shared_org(str(TransactionMRI.COUNT_PER_ROOT_PROJECT.value))
-    offset = 0
-
-    while (time.time() - start_time) < MAX_SECONDS:
-        query = (
-            Query(
-                match=Entity(EntityKey.GenericOrgMetricsCounters.value),
-                select=[
-                    Function("uniq", [Column("project_id")], "num_projects"),
-                    Column("org_id"),
-                ],
-                groupby=[
-                    Column("org_id"),
-                ],
-                where=[
-                    Condition(Column("timestamp"), Op.GTE, datetime.utcnow() - time_interval),
-                    Condition(Column("timestamp"), Op.LT, datetime.utcnow()),
-                    Condition(Column("metric_id"), Op.EQ, metric_id),
-                ],
-                granularity=Granularity(60),
-                orderby=[
-                    OrderBy(Column("org_id"), Direction.ASC),
-                ],
-            )
-            .set_limit(max_orgs + 1)
-            .set_offset(offset)
-        )
-        request = Request(
-            dataset=Dataset.PerformanceMetrics.value, app_id="dynamic_sampling", query=query
-        )
-        data = raw_snql_query(
-            request,
-            referrer=Referrer.DYNAMIC_SAMPLING_COUNTERS_GET_ACTIVE_ORGS.value,
-        )["data"]
-        count = len(data)
-        more_results = count > max_orgs
-        offset += max_orgs
-        if more_results:
-            data = data[:-1]
-
-        ret_val = []
-
-        for row in data:
-            ret_val.append(row["org_id"])
-
-        yield ret_val
-
-        if not more_results:
-            return

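get_orgs_with_project_counts_without_modulo batches org ids so that no batch exceeds max_orgs organisations or max_projects accumulated projects, carrying leftovers into the next iteration. The batching itself is plain Python and easiest to see in isolation; here is a sketch over an in-memory list of (org_id, num_projects) pairs, with the Snuba paging and time guard omitted:

from typing import Iterator, List, Tuple


def batch_orgs(
    org_project_counts: List[Tuple[int, int]], max_orgs: int, max_projects: int
) -> Iterator[List[int]]:
    """Yield lists of org ids, closing a batch on max_orgs or max_projects."""
    batch: List[int] = []
    project_count = 0
    for org_id, num_projects in org_project_counts:
        batch.append(org_id)
        project_count += num_projects
        if len(batch) >= max_orgs or project_count >= max_projects:
            yield batch
            batch, project_count = [], 0
    if batch:
        # Orgs that never filled a batch are still emitted at the end.
        yield batch


counts = [(1, 10), (2, 4990), (3, 7), (4, 1), (5, 2)]
assert list(batch_orgs(counts, max_orgs=3, max_projects=5000)) == [[1, 2], [3, 4, 5]]
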
+ 0 - 695
src/sentry/dynamic_sampling/tasks.py

@@ -1,695 +0,0 @@
-import logging
-import math
-from datetime import timedelta
-from typing import Dict, Optional, Sequence, Tuple
-
-import sentry_sdk
-
-from sentry import options, quotas
-from sentry.dynamic_sampling.models.base import ModelType
-from sentry.dynamic_sampling.models.common import RebalancedItem, guarded_run
-from sentry.dynamic_sampling.models.factory import model_factory
-from sentry.dynamic_sampling.models.projects_rebalancing import ProjectsRebalancingInput
-from sentry.dynamic_sampling.models.transactions_rebalancing import TransactionsRebalancingInput
-from sentry.dynamic_sampling.prioritise_projects import fetch_projects_with_total_volumes
-from sentry.dynamic_sampling.prioritise_transactions import (
-    ProjectTransactions,
-    fetch_project_transaction_totals,
-    fetch_transactions_with_total_volumes,
-    get_orgs_with_project_counts,
-    transactions_zip,
-)
-from sentry.dynamic_sampling.recalibrate_transactions import (
-    OrganizationDataVolume,
-    fetch_org_volumes,
-)
-from sentry.dynamic_sampling.rules.base import (
-    is_sliding_window_enabled,
-    is_sliding_window_org_enabled,
-)
-from sentry.dynamic_sampling.rules.helpers.prioritise_project import (
-    generate_prioritise_by_project_cache_key,
-    get_prioritise_by_project_sample_rate,
-)
-from sentry.dynamic_sampling.rules.helpers.prioritize_transactions import (
-    set_transactions_resampling_rates,
-)
-from sentry.dynamic_sampling.rules.helpers.sliding_window import (
-    SLIDING_WINDOW_CALCULATION_ERROR,
-    extrapolate_monthly_volume,
-    generate_sliding_window_cache_key,
-    generate_sliding_window_org_cache_key,
-    get_sliding_window_org_sample_rate,
-    get_sliding_window_sample_rate,
-    get_sliding_window_size,
-    mark_sliding_window_executed,
-    mark_sliding_window_org_executed,
-)
-from sentry.dynamic_sampling.rules.utils import (
-    DecisionDropCount,
-    DecisionKeepCount,
-    OrganizationId,
-    ProjectId,
-    adjusted_factor,
-    generate_cache_key_rebalance_factor,
-    get_redis_client_for_ds,
-)
-from sentry.dynamic_sampling.sliding_window import (
-    fetch_orgs_with_total_root_transactions_count,
-    fetch_projects_with_total_root_transactions_count,
-)
-from sentry.dynamic_sampling.snuba_utils import (
-    get_active_orgs,
-    get_orgs_with_project_counts_without_modulo,
-)
-from sentry.models import Organization, Project
-from sentry.tasks.base import instrumented_task
-from sentry.tasks.relay import schedule_invalidate_project_config
-from sentry.utils import metrics
-
-CACHE_KEY_TTL = 24 * 60 * 60 * 1000  # in milliseconds
-
-MAX_ORGS_PER_QUERY = 100
-MAX_PROJECTS_PER_QUERY = 5000
-MAX_TRANSACTIONS_PER_PROJECT = 20
-
-# MIN and MAX rebalance factor ( make sure we don't go crazy when rebalancing)
-MIN_REBALANCE_FACTOR = 0.1
-MAX_REBALANCE_FACTOR = 10
-
-logger = logging.getLogger(__name__)
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.tasks.recalibrate_orgs",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=2 * 60 * 60,  # 2hours
-    time_limit=2 * 60 * 60 + 5,
-)
-def recalibrate_orgs() -> None:
-    query_interval = timedelta(minutes=5)
-    metrics.incr("sentry.tasks.dynamic_sampling.recalibrate_orgs.start", sample_rate=1.0)
-
-    # use a dict instead of a list to easily pass it to the logger in the extra field
-    errors: Dict[str, str] = {}
-
-    with metrics.timer("sentry.tasks.dynamic_sampling.recalibrate_orgs", sample_rate=1.0):
-        for orgs in get_active_orgs(1000, query_interval):
-            for org_volume in fetch_org_volumes(orgs, query_interval):
-                error = rebalance_org(org_volume)
-                if error and len(errors) < 100:
-                    error_message = f"organisation:{org_volume.org_id} with {org_volume.total} transactions from which {org_volume.indexed} indexed, generated error:{error}"
-                    errors[str(org_volume.org_id)] = error_message
-
-        if errors:
-            logger.info("Dynamic sampling organization recalibration failed", extra=errors)
-
-
-def rebalance_org(org_volume: OrganizationDataVolume) -> Optional[str]:
-    """
-    Calculates the rebalancing factor for an org
-
-    It takes the last interval total number of transactions and kept transactions, and
-    it figures out how far it is from the desired rate ( i.e. the blended rate)
-    """
-
-    redis_client = get_redis_client_for_ds()
-    factor_key = generate_cache_key_rebalance_factor(org_volume.org_id)
-
-    desired_sample_rate = quotas.get_blended_sample_rate(organization_id=org_volume.org_id)
-    if desired_sample_rate is None:
-        return f"Organisation with desired_sample_rate==None org_id={org_volume.org_id}"
-
-    if org_volume.total == 0 or org_volume.indexed == 0:
-        # not enough info to make adjustments ( we don't consider this an error)
-        return None
-
-    previous_interval_sample_rate = org_volume.indexed / org_volume.total
-    try:
-        previous_factor = float(redis_client.get(factor_key))
-    except (TypeError, ValueError):
-        previous_factor = 1.0
-
-    new_factor = adjusted_factor(
-        previous_factor, previous_interval_sample_rate, desired_sample_rate
-    )
-
-    if new_factor < MIN_REBALANCE_FACTOR or new_factor > MAX_REBALANCE_FACTOR:
-        # whatever we did before didn't help, give up
-        redis_client.delete(factor_key)
-        return f"factor:{new_factor} outside of the acceptable range [{MIN_REBALANCE_FACTOR}..{MAX_REBALANCE_FACTOR}]"
-
-    if new_factor != 1.0:
-        # Finally got a good key, save it to be used in rule generation
-        redis_client.set(factor_key, new_factor)
-    else:
-        # we are either at 1.0 no point creating an adjustment rule
-        redis_client.delete(factor_key)
-    return None
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.tasks.prioritise_transactions",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=2 * 60 * 60,  # 2hours
-    time_limit=2 * 60 * 60 + 5,
-)
-def prioritise_transactions() -> None:
-    """
-    A task that retrieves all relative transaction counts from all projects in all orgs
-    and invokes a task for rebalancing transaction sampling rates within each project
-    """
-    metrics.incr("sentry.tasks.dynamic_sampling.prioritise_transactions.start", sample_rate=1.0)
-
-    num_big_trans = int(
-        options.get("dynamic-sampling.prioritise_transactions.num_explicit_large_transactions")
-    )
-    num_small_trans = int(
-        options.get("dynamic-sampling.prioritise_transactions.num_explicit_small_transactions")
-    )
-
-    with metrics.timer("sentry.tasks.dynamic_sampling.prioritise_transactions", sample_rate=1.0):
-        for orgs in get_orgs_with_project_counts(MAX_ORGS_PER_QUERY, MAX_PROJECTS_PER_QUERY):
-            # get the low and high transactions
-            for project_transactions in transactions_zip(
-                fetch_project_transaction_totals(orgs),
-                fetch_transactions_with_total_volumes(
-                    orgs,
-                    large_transactions=True,
-                    max_transactions=num_big_trans,
-                ),
-                fetch_transactions_with_total_volumes(
-                    orgs,
-                    large_transactions=False,
-                    max_transactions=num_small_trans,
-                ),
-            ):
-                process_transaction_biases.delay(project_transactions)
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.process_transaction_biases",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=25 * 60,  # 25 mins
-    time_limit=2 * 60 + 5,
-)
-def process_transaction_biases(project_transactions: ProjectTransactions) -> None:
-    """
-    A task that given a project relative transaction counts calculates rebalancing
-    sampling rates based on the overall desired project sampling rate.
-    """
-    org_id = project_transactions["org_id"]
-    project_id = project_transactions["project_id"]
-    total_num_transactions = project_transactions.get("total_num_transactions")
-    total_num_classes = project_transactions.get("total_num_classes")
-    transactions = [
-        RebalancedItem(id=id, count=count)
-        for id, count in project_transactions["transaction_counts"]
-    ]
-
-    try:
-        organization = Organization.objects.get_from_cache(id=org_id)
-    except Organization.DoesNotExist:
-        organization = None
-
-    # By default, this bias uses the blended sample rate.
-    sample_rate = quotas.get_blended_sample_rate(organization_id=org_id)
-
-    # In case we have specific feature flags enabled, we will change the sample rate either basing ourselves
-    # on sliding window per project or per org.
-    if organization is not None and is_sliding_window_enabled(organization):
-        sample_rate = get_sliding_window_sample_rate(
-            org_id=org_id, project_id=project_id, error_sample_rate_fallback=sample_rate
-        )
-        log_sample_rate_source(
-            org_id, project_id, "prioritise_by_transaction", "sliding_window", sample_rate
-        )
-    elif organization is not None and is_sliding_window_org_enabled(organization):
-        sample_rate = get_prioritise_by_project_sample_rate(
-            org_id=org_id, project_id=project_id, error_sample_rate_fallback=sample_rate
-        )
-        log_sample_rate_source(
-            org_id, project_id, "prioritise_by_transaction", "prioritise_by_project", sample_rate
-        )
-    else:
-        log_sample_rate_source(
-            org_id, project_id, "prioritise_by_transaction", "blended_sample_rate", sample_rate
-        )
-
-    if sample_rate is None or sample_rate == 1.0:
-        # no sampling => no rebalancing
-        return
-
-    intensity = options.get("dynamic-sampling.prioritise_transactions.rebalance_intensity", 1.0)
-
-    model = model_factory(ModelType.TRANSACTIONS_REBALANCING)
-    rebalanced_transactions = guarded_run(
-        model,
-        TransactionsRebalancingInput(
-            classes=transactions,
-            sample_rate=sample_rate,
-            total_num_classes=total_num_classes,
-            total=total_num_transactions,
-            intensity=intensity,
-        ),
-    )
-    # In case the result of the model is None, it means that an error occurred, thus we want to early return.
-    if rebalanced_transactions is None:
-        return
-
-    # Only after checking the nullability of rebalanced_transactions, we want to unpack the tuple.
-    named_rates, implicit_rate = rebalanced_transactions
-    set_transactions_resampling_rates(
-        org_id=org_id,
-        proj_id=project_id,
-        named_rates=named_rates,
-        default_rate=implicit_rate,
-        ttl_ms=CACHE_KEY_TTL,
-    )
-
-    schedule_invalidate_project_config(
-        project_id=project_id, trigger="dynamic_sampling_prioritise_transaction_bias"
-    )
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.tasks.prioritise_projects",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=2 * 60 * 60,  # 2hours
-    time_limit=2 * 60 * 60 + 5,
-)
-def prioritise_projects() -> None:
-    metrics.incr("sentry.tasks.dynamic_sampling.prioritise_projects.start", sample_rate=1.0)
-    with metrics.timer("sentry.tasks.dynamic_sampling.prioritise_projects", sample_rate=1.0):
-        for orgs in get_orgs_with_project_counts_without_modulo(
-            MAX_ORGS_PER_QUERY, MAX_PROJECTS_PER_QUERY
-        ):
-            for org_id, projects_with_tx_count_and_rates in fetch_projects_with_total_volumes(
-                org_ids=orgs
-            ).items():
-                process_projects_sample_rates.delay(org_id, projects_with_tx_count_and_rates)
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.process_projects_sample_rates",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=25 * 60,  # 25 mins
-    time_limit=2 * 60 + 5,
-)
-def process_projects_sample_rates(
-    org_id: OrganizationId,
-    projects_with_tx_count_and_rates: Sequence[
-        Tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount]
-    ],
-) -> None:
-    """
-    Takes a single org id and a list of project ids
-    """
-    with metrics.timer("sentry.tasks.dynamic_sampling.process_projects_sample_rates.core"):
-        adjust_sample_rates(org_id, projects_with_tx_count_and_rates)
-
-
-def adjust_sample_rates(
-    org_id: int,
-    projects_with_tx_count: Sequence[Tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount]],
-) -> None:
-    """
-    This function apply model and adjust sample rate per project in org
-    and store it in DS redis cluster, then we invalidate project config
-    so relay can reread it, and we'll inject it from redis cache.
-    """
-    try:
-        # We need the organization object for the feature flag.
-        organization = Organization.objects.get_from_cache(id=org_id)
-    except Organization.DoesNotExist:
-        # In case an org is not found, it might be that it has been deleted in the time between
-        # the query triggering this job and the actual execution of the job.
-        organization = None
-
-    # We get the sample rate either directly from quotas or from the new sliding window org mechanism.
-    if organization is not None and is_sliding_window_org_enabled(organization):
-        sample_rate = get_adjusted_base_rate_from_cache_or_compute(org_id)
-        log_sample_rate_source(
-            org_id, None, "prioritise_by_project", "sliding_window_org", sample_rate
-        )
-    else:
-        sample_rate = quotas.get_blended_sample_rate(organization_id=org_id)
-        log_sample_rate_source(
-            org_id, None, "prioritise_by_project", "blended_sample_rate", sample_rate
-        )
-
-    # If we didn't find any sample rate, it doesn't make sense to run the adjustment model.
-    if sample_rate is None:
-        return
-
-    projects_with_counts = {
-        project_id: count_per_root for project_id, count_per_root, _, _ in projects_with_tx_count
-    }
-    # Since we don't mind about strong consistency, we query a replica of the main database with the possibility of
-    # having out of date information. This is a trade-off we accept, since we work under the assumption that eventually
-    # the projects of an org will be replicated consistently across replicas, because no org should continue to create
-    # new projects.
-    all_projects_ids = (
-        Project.objects.using_replica()
-        .filter(organization=organization)
-        .values_list("id", flat=True)
-    )
-    for project_id in all_projects_ids:
-        # In case a specific project has not been considered in the count query, it means that no metrics were extracted
-        # for it, thus we consider it as having 0 transactions for the query's time window.
-        if project_id not in projects_with_counts:
-            projects_with_counts[project_id] = 0
-
-    projects = []
-    for project_id, count_per_root in projects_with_counts.items():
-        projects.append(
-            RebalancedItem(
-                id=project_id,
-                count=count_per_root,
-            )
-        )
-
-    model = model_factory(ModelType.PROJECTS_REBALANCING)
-    rebalanced_projects = guarded_run(
-        model, ProjectsRebalancingInput(classes=projects, sample_rate=sample_rate)
-    )
-    # In case the result of the model is None, it means that an error occurred, thus we want to early return.
-    if rebalanced_projects is None:
-        return
-
-    redis_client = get_redis_client_for_ds()
-    with redis_client.pipeline(transaction=False) as pipeline:
-        for rebalanced_project in rebalanced_projects:
-            cache_key = generate_prioritise_by_project_cache_key(org_id=org_id)
-            # We want to get the old sample rate, which will be None in case it was not set.
-            old_sample_rate = sample_rate_to_float(
-                redis_client.hget(cache_key, rebalanced_project.id)
-            )
-
-            # We want to store the new sample rate as a string.
-            pipeline.hset(
-                cache_key,
-                rebalanced_project.id,
-                rebalanced_project.new_sample_rate,  # redis stores is as string
-            )
-            pipeline.pexpire(cache_key, CACHE_KEY_TTL)
-
-            # We invalidate the caches only if there was a change in the sample rate. This is to avoid flooding the
-            # system with project config invalidations, especially for projects with no volume.
-            if not are_equal_with_epsilon(old_sample_rate, rebalanced_project.new_sample_rate):
-                schedule_invalidate_project_config(
-                    project_id=rebalanced_project.id,
-                    trigger="dynamic_sampling_prioritise_project_bias",
-                )
-
-        pipeline.execute()
-
-
-def get_adjusted_base_rate_from_cache_or_compute(org_id: int) -> Optional[float]:
-    """
-    Gets the adjusted base sample rate from the sliding window directly from the Redis cache or tries to compute
-    it synchronously.
-    """
-    # We first try to get from cache the sliding window org sample rate.
-    sample_rate = get_sliding_window_org_sample_rate(org_id)
-    if sample_rate is not None:
-        return sample_rate
-
-    # In case we didn't find the value in cache, we want to compute it synchronously.
-    window_size = get_sliding_window_size()
-    # In case the size is None it means that we disabled the sliding window entirely.
-    if window_size is not None:
-        # We want to synchronously fetch the orgs and compute the sliding window org sample rate.
-        orgs_with_counts = fetch_orgs_with_total_root_transactions_count(
-            org_ids=[org_id], window_size=window_size
-        )
-        if (org_total_root_count := orgs_with_counts.get(org_id)) is not None:
-            return compute_guarded_sliding_window_sample_rate(
-                org_id, None, org_total_root_count, window_size
-            )
-
-    return None
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.tasks.sliding_window",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=2 * 60 * 60,  # 2 hours
-    time_limit=2 * 60 * 60 + 5,
-)
-def sliding_window() -> None:
-    window_size = get_sliding_window_size()
-    # In case the size is None it means that we disabled the sliding window entirely.
-    if window_size is not None:
-        metrics.incr("sentry.dynamic_sampling.tasks.sliding_window.start", sample_rate=1.0)
-        with metrics.timer("sentry.dynamic_sampling.tasks.sliding_window", sample_rate=1.0):
-            # It is important to note that this query will return orgs that in the last hour have had at least 1
-            # transaction.
-            for orgs in get_orgs_with_project_counts_without_modulo(
-                MAX_ORGS_PER_QUERY, MAX_PROJECTS_PER_QUERY
-            ):
-                # This query on the other hand, fetches with a dynamic window size because we care about being able
-                # to extrapolate monthly volume with a bigger window than the hour used in the orgs query. Thus, it can
-                # be that an org is not detected because it didn't have traffic for this hour but its projects have
-                # traffic in the last window_size, however this isn't a big deal since we cache the sample rate and if
-                # not found we fall back to 100% (only if the sliding window has run).
-                for (
-                    org_id,
-                    projects_with_total_root_count,
-                ) in fetch_projects_with_total_root_transactions_count(
-                    org_ids=orgs, window_size=window_size
-                ).items():
-                    with metrics.timer(
-                        "sentry.dynamic_sampling.tasks.sliding_window.adjust_base_sample_rate_per_project"
-                    ):
-                        adjust_base_sample_rate_per_project(
-                            org_id, projects_with_total_root_count, window_size
-                        )
-
-            # Due to the synchronous nature of the sliding window, when we arrived here, we can confidently say that the
-            # execution of the sliding window was successful. We will keep this state for 1 hour.
-            mark_sliding_window_executed()
-
-
-def adjust_base_sample_rate_per_project(
-    org_id: int, projects_with_total_root_count: Sequence[Tuple[ProjectId, int]], window_size: int
-) -> None:
-    """
-    Adjusts the base sample rate per project by computing the sliding window sample rate, considering the total
-    volume of root transactions started from each project in the org.
-    """
-    projects_with_rebalanced_sample_rate = []
-
-    for project_id, total_root_count in projects_with_total_root_count:
-        sample_rate = compute_guarded_sliding_window_sample_rate(
-            org_id, project_id, total_root_count, window_size
-        )
-
-        # If the sample rate is None, we want to add a sentinel value into Redis, the goal being that when generating
-        # rules we can distinguish between:
-        # 1. Value in the cache
-        # 2. No value in the cache
-        # 3. Error happened
-        projects_with_rebalanced_sample_rate.append(
-            (
-                project_id,
-                str(sample_rate) if sample_rate is not None else SLIDING_WINDOW_CALCULATION_ERROR,
-            )
-        )
-
-    redis_client = get_redis_client_for_ds()
-    cache_key = generate_sliding_window_cache_key(org_id=org_id)
-
-    # We want to get all the old sample rates in memory because we will remove the entire hash in the next step.
-    old_sample_rates = redis_client.hgetall(cache_key)
-
-    # For efficiency reasons, we start a pipeline that will apply a set of operations without multiple round trips.
-    with redis_client.pipeline(transaction=False) as pipeline:
-        # We want to delete the Redis hash before adding new sample rate since we don't back-fill projects that have no
-        # root count metrics in the considered window.
-        pipeline.delete(cache_key)
-
-        # For each project we want to now save the new sample rate.
-        for project_id, sample_rate in projects_with_rebalanced_sample_rate:  # type:ignore
-            # We store the new updated sample rate.
-            pipeline.hset(cache_key, project_id, sample_rate)
-            pipeline.pexpire(cache_key, CACHE_KEY_TTL)
-
-            # We want to get the old sample rate, which will be None in case it was not set.
-            old_sample_rate = sample_rate_to_float(old_sample_rates.get(str(project_id), ""))
-            # We also get the new sample rate, which will be None in case we stored a SLIDING_WINDOW_CALCULATION_ERROR.
-            sample_rate = sample_rate_to_float(sample_rate)  # type:ignore
-            # We invalidate the caches only if there was a change in the sample rate. This is to avoid flooding the
-            # system with project config invalidations.
-            if not are_equal_with_epsilon(old_sample_rate, sample_rate):
-                schedule_invalidate_project_config(
-                    project_id=project_id, trigger="dynamic_sampling_sliding_window"
-                )
-
-        pipeline.execute()
-
-
-@instrumented_task(
-    name="sentry.dynamic_sampling.tasks.sliding_window_org",
-    queue="dynamicsampling",
-    default_retry_delay=5,
-    max_retries=5,
-    soft_time_limit=2 * 60 * 60,  # 2 hours
-    time_limit=2 * 60 * 60 + 5,
-)
-def sliding_window_org() -> None:
-    window_size = get_sliding_window_size()
-    # In case the size is None it means that we disabled the sliding window entirely.
-    if window_size is not None:
-        metrics.incr("sentry.dynamic_sampling.tasks.sliding_window_org.start", sample_rate=1.0)
-        with metrics.timer("sentry.dynamic_sampling.tasks.sliding_window_org", sample_rate=1.0):
-            for orgs in get_orgs_with_project_counts_without_modulo(
-                MAX_ORGS_PER_QUERY, MAX_PROJECTS_PER_QUERY
-            ):
-                for (org_id, total_root_count,) in fetch_orgs_with_total_root_transactions_count(
-                    org_ids=orgs, window_size=window_size
-                ).items():
-                    adjust_base_sample_rate_per_org(org_id, total_root_count, window_size)
-
-            # Due to the synchronous nature of the sliding window org, when we arrived here, we can confidently say
-            # that the execution of the sliding window org was successful. We will keep this state for 1 hour.
-            mark_sliding_window_org_executed()
-
-
-def adjust_base_sample_rate_per_org(org_id: int, total_root_count: int, window_size: int) -> None:
-    """
-    Adjusts the base sample rate per org by considering its volume and how it fits w.r.t. to the sampling tiers.
-    """
-    sample_rate = compute_guarded_sliding_window_sample_rate(
-        org_id, None, total_root_count, window_size
-    )
-    # If the sample rate is None, we don't want to store a value into Redis, but we prefer to keep the system
-    # with the old value.
-    if sample_rate is None:
-        return
-
-    redis_client = get_redis_client_for_ds()
-    with redis_client.pipeline(transaction=False) as pipeline:
-        cache_key = generate_sliding_window_org_cache_key(org_id=org_id)
-        pipeline.set(cache_key, sample_rate)
-        pipeline.pexpire(cache_key, CACHE_KEY_TTL)
-        pipeline.execute()
-
-
-def compute_guarded_sliding_window_sample_rate(
-    org_id: int, project_id: Optional[int], total_root_count: int, window_size: int
-) -> Optional[float]:
-    try:
-        # We want to compute the sliding window sample rate by considering a window of time.
-        # This piece of code is very delicate, thus we want to guard it properly and capture any errors.
-        return compute_sliding_window_sample_rate(org_id, project_id, total_root_count, window_size)
-    except Exception as e:
-        sentry_sdk.capture_exception(e)
-        return None
-
-
-def compute_sliding_window_sample_rate(
-    org_id: int, project_id: Optional[int], total_root_count: int, window_size: int
-) -> Optional[float]:
-    """
-    Computes the actual sample rate for the sliding window given the total root count and the size of the
-    window that was used for computing the root count.
-
-    The org_id is used only because it is required on the quotas side to determine whether dynamic sampling is
-    enabled in the first place for that project.
-    """
-    extrapolated_volume = extrapolate_monthly_volume(volume=total_root_count, hours=window_size)
-    if extrapolated_volume is None:
-        with sentry_sdk.push_scope() as scope:
-            scope.set_extra("org_id", org_id)
-            scope.set_extra("window_size", window_size)
-            sentry_sdk.capture_message("The volume of the current month can't be extrapolated.")
-
-        return None
-
-    # We want to log the monthly volume for observability purposes.
-    log_extrapolated_monthly_volume(
-        org_id, project_id, total_root_count, extrapolated_volume, window_size
-    )
-
-    sampling_tier = quotas.get_transaction_sampling_tier_for_volume(org_id, extrapolated_volume)
-    if sampling_tier is None:
-        return None
-
-    # We unpack the tuple containing the sampling tier information in the form (volume, sample_rate). This is done
-    # under the assumption that the sampling_tier tuple contains both non-null values.
-    _, sample_rate = sampling_tier
-
-    # We assume that the sample_rate is a float.
-    return float(sample_rate)
-
-
-def log_extrapolated_monthly_volume(
-    org_id: int, project_id: Optional[int], volume: int, extrapolated_volume: int, window_size: int
-) -> None:
-    extra = {
-        "org_id": org_id,
-        "volume": volume,
-        "extrapolated_monthly_volume": extrapolated_volume,
-        "window_size_in_hours": window_size,
-    }
-
-    if project_id is not None:
-        extra["project_id"] = project_id
-
-    logger.info(
-        "compute_sliding_window_sample_rate.extrapolate_monthly_volume",
-        extra=extra,
-    )
-
-
-def log_sample_rate_source(
-    org_id: int, project_id: Optional[int], used_for: str, source: str, sample_rate: Optional[float]
-) -> None:
-    extra = {"org_id": org_id, "sample_rate": sample_rate, "source": source, "used_for": used_for}
-
-    if project_id is not None:
-        extra["project_id"] = project_id
-
-    logger.info(
-        "dynamic_sampling.sample_rate_source",
-        extra=extra,
-    )
-
-
-def sample_rate_to_float(sample_rate: Optional[str]) -> Optional[float]:
-    """
-    Converts a sample rate to a float or returns None in case the conversion failed.
-    """
-    if sample_rate is None:
-        return None
-
-    try:
-        return float(sample_rate)
-    except (TypeError, ValueError):
-        return None
-
-
-def are_equal_with_epsilon(a: Optional[float], b: Optional[float]) -> bool:
-    """
-    Checks if two floating point numbers are equal within an error boundary.
-    """
-    if a is None and b is None:
-        return True
-
-    if a is None or b is None:
-        return False
-
-    return math.isclose(a, b)

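Both adjust_sample_rates and adjust_base_sample_rate_per_project in the deleted tasks.py end with the same pattern: compare each project's new rate against the value already cached in Redis and only schedule a project config invalidation when the rate actually changed, so Relay is not flooded with no-op invalidations. A standalone sketch of that comparison step, leaving out the Redis pipeline and the schedule_invalidate_project_config call:

import math
from typing import Dict, List, Optional


def sample_rate_to_float(value: Optional[str]) -> Optional[float]:
    """Redis stores rates as strings; treat anything unparsable as 'not set'."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def projects_needing_invalidation(
    old_rates: Dict[str, str], new_rates: Dict[str, str]
) -> List[str]:
    """Return project ids whose effective sample rate actually changed."""
    changed = []
    for project_id, new_value in new_rates.items():
        old = sample_rate_to_float(old_rates.get(project_id))
        new = sample_rate_to_float(new_value)
        unchanged = (old is None and new is None) or (
            old is not None and new is not None and math.isclose(old, new)
        )
        if not unchanged:
            changed.append(project_id)
    return changed


old = {"1": "0.5", "2": "0.25"}
new = {"1": "0.5", "2": "0.3", "3": "1.0"}
assert projects_needing_invalidation(old, new) == ["2", "3"]
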
Some files were not shown because too many files changed in this diff