
ref(metrics-indexer): Implement cardinality limiter, take 2 [sns-1651] (#38795)

Markus Unterwaditzer, 2 years ago (commit 31f78c6d15)

+ 273 - 21
src/sentry/ratelimits/cardinality.py

@@ -1,6 +1,8 @@
 import time
-from typing import Any, Collection, NamedTuple, Optional, Sequence, Tuple
+from collections import defaultdict
+from typing import Collection, Iterator, List, NamedTuple, Optional, Sequence, Tuple
 
+from sentry.utils import redis
 from sentry.utils.services import Service
 
 Hash = int
@@ -28,6 +30,30 @@ class Quota(NamedTuple):
     def __post__init__(self) -> None:
         assert self.window_seconds % self.granularity_seconds == 0
 
+    def iter_window(self, request_timestamp: int) -> Iterator[int]:
+        """
+        Iterate over the quota's window, yielding values representing each
+        (absolute) granule.
+
+        This function is used to calculate keys for storing the number of
+        requests made in each granule.
+
+        The iteration is done in reverse-order (newest timestamp to oldest),
+        starting with the key to which a currently-processed request should be
+        added. That request's timestamp is `request_timestamp`.
+
+        * `request_timestamp / self.granularity_seconds - 1`
+        * `request_timestamp / self.granularity_seconds - 2`
+        * `request_timestamp / self.granularity_seconds - 3`
+        * ...
+        """
+        value = request_timestamp // self.granularity_seconds
+
+        for granule_i in range(self.window_seconds // self.granularity_seconds):
+            value -= 1
+            assert value >= 0, value
+            yield value
+
 
 class RequestedQuota(NamedTuple):
     # A string that all redis state is prefixed with. For example
@@ -44,9 +70,9 @@ class RequestedQuota(NamedTuple):
     # ...though you can probably omit org_id if it is already in the prefix.
     unit_hashes: Collection[Hash]
 
-    # Which quotas to check against. The number of not-yet-seen hashes must
-    # "fit" into all quotas.
-    quotas: Sequence[Quota]
+    # Which quota to check against. The number of not-yet-seen hashes must
+    # "fit" into `Quota.limit`.
+    quota: Quota
 
 
 class GrantedQuota(NamedTuple):
@@ -58,7 +84,7 @@ class GrantedQuota(NamedTuple):
 
     # If len(granted_unit_hashes) < len(RequestedQuota.unit_hashes), this
     # contains the quotas that were reached.
-    reached_quotas: Sequence[Quota]
+    reached_quota: Optional[Quota]
 
 
 class CardinalityLimiter(Service):
@@ -97,19 +123,8 @@ class CardinalityLimiter(Service):
             However, consistently providing old timestamps here will work
             correctly.
         """
-        if timestamp is None:
-            timestamp = int(time.time())
-        else:
-            timestamp = int(timestamp)
 
-        grants = [
-            GrantedQuota(
-                request=request, granted_unit_hashes=request.unit_hashes, reached_quotas=[]
-            )
-            for request in requests
-        ]
-
-        return timestamp, grants
+        raise NotImplementedError()
 
     def use_quotas(
         self,
@@ -125,10 +140,247 @@ class CardinalityLimiter(Service):
         :param grants: The return value of `check_within_quotas` which
             indicates how much quota should actually be consumed.
         """
-        pass
+
+        # TODO: we may want to revisit this abstraction once we move the call
+        # to `use_quotas` in the string indexer to a separate thread. In
+        # particular, we are considering writing more recent time windows
+        # synchronously while writing older ones on a separate thread.
+        raise NotImplementedError()
 
 
 class RedisCardinalityLimiter(CardinalityLimiter):
-    # TODO: implement everything
-    def __init__(self, **options: Any) -> None:
-        pass
+    """
+    The Redis cardinality limiter stores a key per unit hash, and adds the unit
+    hash to a set key prefixed with `RequestedQuota.prefix`. The memory usage
+    grows with the dimensionality of `prefix` and the configured quota limit
+    per prefix (i.e. the maximum cardinality of
+    `GrantedQuota.granted_unit_hashes`).
+
+    Many design decisions for this cardinality limiter were copied from the
+    sliding_windows rate limiter. You will find the next couple of paragraphs
+    in its documentation as well.
+
+    Why are checking quotas and using quotas two separate operations?
+    Isn't that a time-of-check-time-of-use bug, and doesn't it allow
+    over-spending quota when requests are happening concurrently?
+
+    1) It's desirable to first check quotas, then do a potentially fallible
+       operation, then consume quotas. This rate limiter is primarily going to
+       be used inside of the metrics string indexer to rate-limit database
+       writes. What we want to do there is: read DB, check rate limits,
+       write to DB, use rate limits.
+
+       If we did it any other way (the obvious one being to read DB,
+       check-and-use rate limits, write DB), crashes while writing to the
+       database can over-consume quotas. This is not a big problem if those
+       crashes are flukes, and especially not a problem if the crashes are
+       a result of an overloaded DB.
+
+       It is however a problem in case the consumer is crash-looping, or
+       crashing (quickly) for 100% of requests (e.g. due to schema
+       mismatches between code and DB that somehow don't surface during the
+       DB read). In that case the quotas would be consumed immediately and
+       incident recovery would require us to reset all quotas manually (or
+       disable rate limiting via some killswitch)
+
+    2) The redis backend (really the only backend we care about) already
+       has some consistency problems.
+
+       a) Redis only provides strong consistency and ability to
+          check-and-increment counters when all involved keys hit the same
+          Redis node. That means that a quota with prefix="org_id:123" can
+          only run on a single redis node. It also means that a global
+          quota (`prefix="global"`) would have to run on a single
+          (unchangeable) redis node to be strongly consistent. That's
+          however a problem for scalability.
+
+          There's no obvious way to make global quotas consistent with
+          per-org quotas this way, so right now it means that requests
+          containing multiple quotas with different `prefixes` cannot be
+          checked-and-incremented atomically even if we were to change the
+          rate-limiter's interface.
+
+       b) This is easily fixable, but because of the above, we
+          currently don't control Redis sharding at all, meaning that even
+          keys within a single quota's window will hit different Redis
+          nodes. This also helps further distribute the load internally.
+
+          Since we have given up on atomic check-and-increments in general
+          anyway, there's no reason to explicitly control sharding.
+    """
+
+    def __init__(
+        self,
+        cluster: str = "default",
+        num_shards: int = 3,
+        num_physical_shards: int = 3,
+    ) -> None:
+        """
+        :param cluster: Name of the redis cluster to use, to be configured with
+            the `redis.clusters` Sentry option (like any other redis cluster in
+            Sentry).
+        :param num_shards: The number of logical shards to have. This
+            controls the average set size in Redis.
+        :param num_physical_shards: The number of actual shards to
+            store. Controls how many keys of type "unordered set" there are in
+            Redis. The ratio `num_physical_shards / num_shards` is a sampling
+            rate; the lower it is, the less precise accounting will be.
+        """
+        self.client = redis.redis_clusters.get(cluster)
+        assert 0 < num_physical_shards <= num_shards
+        self.num_shards = num_shards
+        self.num_physical_shards = num_physical_shards
+        super().__init__()
+
+    @staticmethod
+    def _get_timeseries_key(request: RequestedQuota, hash: Hash) -> str:
+        return f"cardinality:timeseries:{request.prefix}-{hash}"
+
+    def _get_read_sets_keys(self, request: RequestedQuota, timestamp: Timestamp) -> Sequence[str]:
+        oldest_time_bucket = list(request.quota.iter_window(timestamp))[-1]
+        return [
+            f"cardinality:sets:{request.prefix}-{shard}-{oldest_time_bucket}"
+            for shard in range(self.num_physical_shards)
+        ]
+
+    def _get_write_sets_keys(
+        self, request: RequestedQuota, timestamp: Timestamp, hash: Hash
+    ) -> Sequence[str]:
+        shard = hash % self.num_shards
+        if shard < self.num_physical_shards:
+            return [
+                f"cardinality:sets:{request.prefix}-{shard}-{time_bucket}"
+                for time_bucket in request.quota.iter_window(timestamp)
+            ]
+        else:
+            return []
+
+    def _get_set_cardinality_sample_factor(self) -> float:
+        return self.num_shards / self.num_physical_shards
+
+    def check_within_quotas(
+        self, requests: Sequence[RequestedQuota], timestamp: Optional[Timestamp] = None
+    ) -> Tuple[Timestamp, Sequence[GrantedQuota]]:
+        if timestamp is None:
+            timestamp = int(time.time())
+        else:
+            timestamp = int(timestamp)
+
+        unit_keys_to_get: List[str] = []
+        set_keys_to_count: List[str] = []
+
+        for request in requests:
+            for hash in request.unit_hashes:
+                unit_keys_to_get.append(self._get_timeseries_key(request, hash))
+
+            set_keys_to_count.extend(self._get_read_sets_keys(request, timestamp))
+
+        if not unit_keys_to_get and not set_keys_to_count:
+            # If there are no keys to fetch (i.e. there are no quotas to
+            # enforce), we can save the redis call entirely and just grant all
+            # quotas immediately.
+            return timestamp, [
+                GrantedQuota(
+                    request=request, granted_unit_hashes=request.unit_hashes, reached_quota=None
+                )
+                for request in requests
+            ]
+
+        with self.client.pipeline(transaction=False) as pipeline:
+            pipeline.mget(unit_keys_to_get)
+            # O(self.num_physical_shards * len(requests)), assuming there's
+            # only one per-org quota
+            for key in set_keys_to_count:
+                pipeline.scard(key)
+
+            results = iter(pipeline.execute())
+            unit_keys = dict(zip(unit_keys_to_get, next(results)))
+            set_counts = dict(zip(set_keys_to_count, results))
+
+        grants = []
+        cardinality_sample_factor = self._get_set_cardinality_sample_factor()
+        for request in requests:
+            granted_hashes = []
+
+            remaining_limit = max(
+                0,
+                request.quota.limit
+                - cardinality_sample_factor
+                * sum(set_counts[k] for k in self._get_read_sets_keys(request, timestamp)),
+            )
+
+            remaining_limit_running = remaining_limit
+            reached_quota = None
+
+            # for each hash in the request, check if:
+            # 1. the hash is in `unit_keys`. If so, it has already been seen in
+            #    this timewindow, and ingesting additional copies of it comes
+            #    at no cost (= ingesting multiple metric buckets of the same
+            #    timeseries only counts once against quota)
+            #
+            # 2. we still have budget/"remaining_limit". In that case,
+            #    accept/admit the hash as well and reduce the remaining quota.
+            #
+            # 3. we have observed a totally new hash. In that case set
+            #    `reached_quota` for reporting purposes, but don't add the
+            #    hash to `granted_hashes` (which is our return value)
+            for hash in request.unit_hashes:
+                if unit_keys[self._get_timeseries_key(request, hash)]:
+                    granted_hashes.append(hash)
+                elif remaining_limit_running > 0:
+                    granted_hashes.append(hash)
+                    remaining_limit_running -= 1
+                else:
+                    reached_quota = request.quota
+
+            grants.append(
+                GrantedQuota(
+                    request=request,
+                    granted_unit_hashes=granted_hashes,
+                    reached_quota=reached_quota,
+                )
+            )
+
+        return timestamp, grants
+
+    def use_quotas(
+        self,
+        grants: Sequence[GrantedQuota],
+        timestamp: Timestamp,
+    ) -> None:
+        unit_keys_to_set = {}
+        set_keys_to_add = defaultdict(set)
+        set_keys_ttl = {}
+
+        for grant in grants:
+            key_ttl = grant.request.quota.window_seconds
+
+            for hash in grant.granted_unit_hashes:
+                unit_key = self._get_timeseries_key(grant.request, hash)
+                unit_keys_to_set[unit_key] = key_ttl
+
+                for set_key in self._get_write_sets_keys(grant.request, timestamp, hash):
+                    set_keys_ttl[set_key] = key_ttl
+                    set_keys_to_add[set_key].add(hash)
+
+        if not set_keys_to_add and not unit_keys_to_set:
+            # If there are no keys to mutate (i.e. there are no quotas to
+            # enforce), we can save the redis call entirely.
+            return
+
+        with self.client.pipeline(transaction=False) as pipeline:
+            for key, ttl in unit_keys_to_set.items():
+                pipeline.setex(key, ttl, 1)
+
+            for key, items in set_keys_to_add.items():
+                items_list = list(items)
+                while items_list:
+                    # SADD can take multiple arguments, but if you provide too
+                    # many you end up with very long-running redis commands.
+                    pipeline.sadd(key, *items_list[:200])
+                    items_list = items_list[200:]
+
+                pipeline.expire(key, set_keys_ttl[key])
+
+            pipeline.execute()
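
To make the check/use split described in the `RedisCardinalityLimiter` docstring concrete, here is a minimal sketch of the intended call sequence, using only the classes added in this file and assuming a configured "default" redis cluster (illustration, not part of the diff):

    from sentry.ratelimits.cardinality import (
        Quota,
        RedisCardinalityLimiter,
        RequestedQuota,
    )

    # assumes the "default" redis cluster is configured via the `redis.clusters` option
    limiter = RedisCardinalityLimiter()
    quota = Quota(window_seconds=3600, granularity_seconds=60, limit=10)
    request = RequestedQuota(prefix="org_id:123", unit_hashes={11, 12, 13}, quota=quota)

    # 1. check how many of the not-yet-seen hashes still fit into the quota
    timestamp, grants = limiter.check_within_quotas([request])

    # 2. do the fallible work (e.g. the indexer's DB writes) for
    #    grants[0].granted_unit_hashes here

    # 3. only then consume the quota, so a crash in step 2 does not burn it
    limiter.use_quotas(grants, timestamp)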

+ 4 - 2
src/sentry/ratelimits/sliding_windows.py

@@ -114,7 +114,8 @@ class Quota:
 
     def iter_window(self, request_timestamp: int) -> Iterator[int]:
         """
-        Iterate over the quota's window, yielding timestamps representing each granule.
+        Iterate over the quota's window, yielding values representing each
+        (absolute) granule.
 
         This function is used to calculate keys for storing the number of
         requests made in each granule.
@@ -123,11 +124,12 @@ class Quota:
         starting with the key to which a currently-processed request should be
         added. That request's timestamp is `request_timestamp`.
 
-        * `request_timestamp / self.granularity_seconds`
         * `request_timestamp / self.granularity_seconds - 1`
         * `request_timestamp / self.granularity_seconds - 2`
+        * `request_timestamp / self.granularity_seconds - 3`
         * ...
         """
+
         value = request_timestamp // self.granularity_seconds
 
         for granule_i in range(self.window_seconds // self.granularity_seconds):
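
As a quick sanity check of the corrected docstring, here is what the yielded granule indices look like for a one-hour window at one-minute granularity (sketched with the cardinality `Quota` from above, whose `iter_window` is identical; illustration only):

    from sentry.ratelimits.cardinality import Quota

    quota = Quota(window_seconds=3600, granularity_seconds=60, limit=10)

    granules = list(quota.iter_window(7200))
    # newest to oldest, starting one below 7200 // 60 == 120
    assert granules[:3] == [119, 118, 117]
    assert len(granules) == 60  # one index per granule in the window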

+ 35 - 27
src/sentry/sentry_metrics/indexer/limiters/cardinality.py

@@ -25,8 +25,8 @@ OrgId = int
 class CardinalityLimiterState:
     _cardinality_limiter: CardinalityLimiter
     _use_case_id: UseCaseKey
-    _grants: Sequence[GrantedQuota]
-    _timestamp: Timestamp
+    _grants: Optional[Sequence[GrantedQuota]]
+    _timestamp: Optional[Timestamp]
     keys_to_remove: Sequence[PartitionIdxOffset]
 
 
@@ -38,7 +38,7 @@ def _build_quota_key(namespace: str, org_id: Optional[OrgId]) -> str:
 
 
 @metrics.wraps("sentry_metrics.indexer.construct_quotas")
-def _construct_quotas(use_case_id: UseCaseKey) -> Sequence[Quota]:
+def _construct_quotas(use_case_id: UseCaseKey) -> Optional[Quota]:
     """
     Construct write limit's quotas based on current sentry options.
 
@@ -46,20 +46,20 @@ def _construct_quotas(use_case_id: UseCaseKey) -> Sequence[Quota]:
     when sentry.options are.
     """
     if use_case_id == UseCaseKey.PERFORMANCE:
-        return [
-            Quota(**args)
-            for args in options.get("sentry-metrics.cardinality-limiter.limits.performance.per-org")
-        ]
+        quota_args = options.get("sentry-metrics.cardinality-limiter.limits.performance.per-org")
     elif use_case_id == UseCaseKey.RELEASE_HEALTH:
-        return [
-            Quota(**args)
-            for args in options.get(
-                "sentry-metrics.cardinality-limiter.limits.releasehealth.per-org"
-            )
-        ]
+        quota_args = options.get("sentry-metrics.cardinality-limiter.limits.releasehealth.per-org")
     else:
         raise ValueError(use_case_id)
 
+    if quota_args:
+        if len(quota_args) > 1:
+            raise ValueError("multiple quotas are actually unsupported")
+
+        return Quota(**quota_args[0])
+
+    return None
+
 
 class InboundMessage(TypedDict):
     # Note: This is only the subset of fields we access in this file.
@@ -94,22 +94,29 @@ class TimeseriesCardinalityLimiter:
             request_hashes[prefix].add(message_hash)
 
         requested_quotas = []
-        configured_quotas = _construct_quotas(use_case_id)
-        for prefix, hashes in request_hashes.items():
-            requested_quotas.append(
-                RequestedQuota(prefix=prefix, unit_hashes=hashes, quotas=configured_quotas)
-            )
+        configured_quota = _construct_quotas(use_case_id)
+
+        grants = None
+        timestamp = None
+
+        if configured_quota is None:
+            keys_to_remove = {}
+        else:
+            for prefix, hashes in request_hashes.items():
+                requested_quotas.append(
+                    RequestedQuota(prefix=prefix, unit_hashes=hashes, quota=configured_quota)
+                )
 
-        timestamp, grants = self.backend.check_within_quotas(requested_quotas)
+            timestamp, grants = self.backend.check_within_quotas(requested_quotas)
 
-        keys_to_remove = hash_to_offset
-        # make sure that hash_to_offset is no longer used, as the underlying
-        # dict will be mutated
-        del hash_to_offset
+            keys_to_remove = hash_to_offset
+            # make sure that hash_to_offset is no longer used, as the underlying
+            # dict will be mutated
+            del hash_to_offset
 
-        for grant in grants:
-            for hash in grant.granted_unit_hashes:
-                del keys_to_remove[grant.request.prefix, hash]
+            for grant in grants:
+                for hash in grant.granted_unit_hashes:
+                    del keys_to_remove[grant.request.prefix, hash]
 
         return CardinalityLimiterState(
             _cardinality_limiter=self.backend,
@@ -120,7 +127,8 @@ class TimeseriesCardinalityLimiter:
         )
 
     def apply_cardinality_limits(self, state: CardinalityLimiterState) -> None:
-        state._cardinality_limiter.use_quotas(state._grants, state._timestamp)
+        if state._grants is not None and state._timestamp is not None:
+            state._cardinality_limiter.use_quotas(state._grants, state._timestamp)
 
 
 class TimeseriesCardinalityLimiterFactory:
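
For reference, the per-org option consumed by `_construct_quotas` now holds at most one quota definition; a small illustration of the resulting value, using the option shape from the tests in this commit (not part of the diff):

    from sentry.ratelimits.cardinality import Quota

    # option value as used in the tests below; an empty list means "no limiting"
    quota_args = [{"window_seconds": 3600, "granularity_seconds": 60, "limit": 0}]

    if quota_args:
        assert len(quota_args) == 1  # more than one entry raises ValueError
        configured_quota = Quota(**quota_args[0])
    else:
        configured_quota = None  # cardinality limiting is skipped entirely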

+ 227 - 0
tests/sentry/ratelimits/test_cardinality.py

@@ -0,0 +1,227 @@
+from typing import Collection, Optional, Sequence
+
+import pytest
+
+from sentry.ratelimits.cardinality import (
+    GrantedQuota,
+    Quota,
+    RedisCardinalityLimiter,
+    RequestedQuota,
+)
+
+
+@pytest.fixture
+def limiter():
+    return RedisCardinalityLimiter()
+
+
+class LimiterHelper:
+    """
+    Wrapper around the rate limiter with a specialized, stateful and
+    primitive interface for more readable tests.
+    """
+
+    def __init__(self, limiter: RedisCardinalityLimiter):
+        self.limiter = limiter
+        self.quota = Quota(window_seconds=3600, granularity_seconds=60, limit=10)
+        self.timestamp = 3600
+
+    def add_value(self, value: int) -> Optional[int]:
+        values = self.add_values([value])
+        if values:
+            (value,) = values
+            return value
+
+    def add_values(self, values: Sequence[int]) -> Collection[int]:
+        request = RequestedQuota(prefix="hello", unit_hashes=values, quota=self.quota)
+        new_timestamp, grants = self.limiter.check_within_quotas(
+            [request], timestamp=self.timestamp
+        )
+        self.limiter.use_quotas(grants, new_timestamp)
+        (grant,) = grants
+        return grant.granted_unit_hashes
+
+
+def test_basic(limiter: RedisCardinalityLimiter):
+    helper = LimiterHelper(limiter)
+
+    for _ in range(20):
+        assert helper.add_value(1) == 1
+
+    for _ in range(20):
+        assert helper.add_value(2) == 2
+
+    assert [helper.add_value(10 + i) for i in range(100)] == list(range(10, 18)) + [None] * 92
+
+    helper.timestamp += 3600
+
+    # an hour has passed, we should be able to admit 10 new keys
+    #
+    # note: we only virtually advanced the timestamp. The
+    # `cardinality:timeseries` keys for 1, 2 still exist in this test setup
+    # (and we would admit them on top of 10..20), but they won't in a
+    # real-world scenario
+    assert [helper.add_value(10 + i) for i in range(100)] == list(range(10, 20)) + [None] * 90
+
+
+def test_multiple_prefixes(limiter: RedisCardinalityLimiter):
+    """
+    Test multiple prefixes/organizations and just make sure we're not leaking
+    state between prefixes.
+
+    * `a` only consumes 5 of the quota first and runs out of quota in the
+      second `check_within_quotas` call
+    * `b` immediately exceeds the quota.
+    * `c` fits comfortably into the quota at first (fills out the limit exactly)
+    """
+    quota = Quota(window_seconds=3600, granularity_seconds=60, limit=10)
+    requests = [
+        RequestedQuota(prefix="a", unit_hashes={1, 2, 3, 4, 5}, quota=quota),
+        RequestedQuota(prefix="b", unit_hashes={1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, quota=quota),
+        RequestedQuota(
+            prefix="c", unit_hashes={11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, quota=quota
+        ),
+    ]
+    new_timestamp, grants = limiter.check_within_quotas(requests)
+
+    assert grants == [
+        GrantedQuota(request=requests[0], granted_unit_hashes=[1, 2, 3, 4, 5], reached_quota=None),
+        GrantedQuota(
+            request=requests[1],
+            granted_unit_hashes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            reached_quota=quota,
+        ),
+        GrantedQuota(
+            request=requests[2],
+            granted_unit_hashes=[11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
+            reached_quota=None,
+        ),
+    ]
+    limiter.use_quotas(grants, new_timestamp)
+
+    requests = [
+        RequestedQuota(prefix="a", unit_hashes={6, 7, 8, 9, 10, 11}, quota=quota),
+        RequestedQuota(prefix="b", unit_hashes={1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, quota=quota),
+        RequestedQuota(
+            prefix="c", unit_hashes={11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}, quota=quota
+        ),
+    ]
+    new_timestamp, grants = limiter.check_within_quotas(requests)
+
+    assert grants == [
+        GrantedQuota(
+            request=requests[0], granted_unit_hashes=[6, 7, 8, 9, 10], reached_quota=quota
+        ),
+        GrantedQuota(
+            request=requests[1],
+            granted_unit_hashes=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            reached_quota=quota,
+        ),
+        GrantedQuota(
+            request=requests[2],
+            granted_unit_hashes=[11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
+            reached_quota=quota,
+        ),
+    ]
+    limiter.use_quotas(grants, new_timestamp)
+
+
+def test_sliding(limiter: RedisCardinalityLimiter):
+    """
+    Our rate limiter has a sliding window of [now - 1 hour ; now], with a
+    granularity of 1 hour.
+
+    What that means is that, as time moves on, old hashes should be forgotten
+    _one by one_, and the quota budget they occupy should become _gradually_
+    available to newer, never-seen-before items.
+    """
+
+    helper = LimiterHelper(limiter)
+
+    admissions = []
+
+    # We start with a limit of 10 new hashes per hour. We add a new hash and
+    # advance time by 6 minutes, _100 times_
+    for i in range(100):
+        admissions.append(helper.add_value(i))
+        helper.timestamp += 360
+
+    # We assert that _all 100 items_ are admitted/accepted. This is because we
+    # have advanced time between items. We have "slept" for 6 minutes, 100
+    # times, so we actually added 100 hashes over a span of 10 hours. That
+    # easily fits into our limit.
+    assert admissions == list(range(100))
+
+    admissions = []
+    expected = []
+
+    # 100 hashes over 10 hours is "basically" 10 hashes over 1 hour. Since we
+    # added items over a span of 10 hours, the limiter should've forgotten
+    # about 90% of items already, meaning that in a real-world scenario, we
+    # should accept 90 new hashes.
+    #
+    # But since we only advanced time virtually (and not in Redis for TTL
+    # purposes), we actually only accept 10 items... a flaw in this test.
+    #
+    # Anyway, in the previous loop we added an item every 6 minutes. Now we're
+    # adding an item 10 times per 6 minutes. So we should see every 10th item
+    # being admitted.
+    for i in range(100, 200):
+        admissions.append(helper.add_value(i))
+        expected.append(i if i % 10 == 0 else None)
+        helper.timestamp += 36
+
+    assert admissions == expected
+
+
+def test_sampling(limiter: RedisCardinalityLimiter):
+    """
+    Demonstrate behavior when "shard sampling" is active. If only one out of 10
+    shards for an organization is stored, it is still possible to limit exactly
+    the correct number of hashes, for certain hash values.
+    """
+    limiter.num_physical_shards = 1
+    limiter.num_shards = 10
+    helper = LimiterHelper(limiter)
+
+    # when adding "hashes" 9..0 in descending order, only the last hash (0)
+    # lands in the single physical shard, so all ten values are admitted
+    admissions = [helper.add_value(i) for i in reversed(range(10))]
+    assert admissions == list(reversed(range(10)))
+
+    # we have stored only one shard out of 10, meaning the set count reported
+    # from redis is 1, but the total counts are extrapolated correctly. like
+    # without sampling, assert that the limit of 10 hashes is correctly applied
+    # and we no longer accept additional hashes beyond 10.
+    admissions = [helper.add_value(i) for i in range(100, 110)]
+    assert admissions == [None] * 10
+
+
+def test_sampling_going_bad(limiter: RedisCardinalityLimiter):
+    """
+    Test an edge case of set sampling in the cardinality limiter. It is not
+    exactly desired behavior, but a known sampling artifact.
+    """
+    limiter.num_physical_shards = 1
+    limiter.num_shards = 10
+    helper = LimiterHelper(limiter)
+
+    # when adding "hashes" 0..9 in ascending order, the first hash (0) fills
+    # up the physical shard, and a total count of 10 is extrapolated from that
+    admissions = [helper.add_value(i) for i in range(10)]
+    assert admissions == [0] + [None] * 9
+
+
+def test_regression_mixed_order(limiter: RedisCardinalityLimiter):
+    """
+    Regression test to assert we still accept hashes after dropping some
+    within the same request, regardless of set order.
+    """
+
+    helper = LimiterHelper(limiter)
+    # this hash certainly fits into the default limit of 10 hashes
+    assert helper.add_value(5) == 5
+    # here, only 10 should be limited, as it is the 11th distinct hash fed to the limiter.
+    # 5 was admitted in an earlier call, and 0..4 plus 6..9 are admitted right before it.
+    # there used to be a bug where anything after 10 (i.e. 5) was dropped as
+    # well (due to a wrong `break` somewhere in a loop)
+    assert helper.add_values([0, 1, 2, 3, 4, 6, 7, 8, 9, 10, 5]) == [0, 1, 2, 3, 4, 6, 7, 8, 9, 5]
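
The sampling tests above hinge on the extrapolation in `_get_set_cardinality_sample_factor`; a back-of-envelope sketch of why a single stored hash already exhausts the limit when only 1 of 10 shards is physical (illustration, not committed code):

    # values taken from test_sampling / test_sampling_going_bad
    num_shards, num_physical_shards = 10, 1
    limit = 10

    sample_factor = num_shards / num_physical_shards  # == 10.0
    observed_set_count = 1  # only hash 0 maps to the single physical shard

    remaining = max(0, limit - sample_factor * observed_set_count)
    assert remaining == 0  # hence the [None] * 10 and [None] * 9 assertions above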

+ 11 - 7
tests/sentry/sentry_metrics/limiters/test_cardinality_limiter.py

@@ -16,7 +16,7 @@ from sentry.sentry_metrics.indexer.limiters.cardinality import TimeseriesCardina
 class MockCardinalityLimiter(CardinalityLimiter):
     def __init__(self):
         self.grant_hashes = 10
-        self.assert_quotas = []
+        self.assert_quota = None
 
     def check_within_quotas(
         self, requests: Sequence[RequestedQuota], timestamp: Optional[Timestamp] = None
@@ -29,7 +29,7 @@ class MockCardinalityLimiter(CardinalityLimiter):
         grants = []
         granted = 0
         for request in requests:
-            assert request.quotas == self.assert_quotas
+            assert request.quota == self.assert_quota
             granted_hashes = set()
             for hash in request.unit_hashes:
                 if granted < self.grant_hashes:
@@ -38,7 +38,9 @@ class MockCardinalityLimiter(CardinalityLimiter):
 
             # reached_quotas is incorrect, but we don't necessarily need it for testing
             grants.append(
-                GrantedQuota(request=request, granted_unit_hashes=granted_hashes, reached_quotas=[])
+                GrantedQuota(
+                    request=request, granted_unit_hashes=granted_hashes, reached_quota=None
+                )
             )
 
         return timestamp, grants
@@ -52,8 +54,12 @@ class MockCardinalityLimiter(CardinalityLimiter):
 
 
 def test_reject_all(set_sentry_option):
-    set_sentry_option("sentry-metrics.cardinality-limiter.limits.releasehealth.per-org", [])
+    set_sentry_option(
+        "sentry-metrics.cardinality-limiter.limits.releasehealth.per-org",
+        [{"window_seconds": 3600, "granularity_seconds": 60, "limit": 0}],
+    )
     backend = MockCardinalityLimiter()
+    backend.assert_quota = Quota(window_seconds=3600, granularity_seconds=60, limit=0)
     backend.grant_hashes = 0
     limiter = TimeseriesCardinalityLimiter("", backend)
 
@@ -75,9 +81,7 @@ def test_reject_partial(set_sentry_option):
     )
 
     backend = MockCardinalityLimiter()
-    backend.assert_quotas = [
-        Quota(window_seconds=3600, granularity_seconds=60, limit=1),
-    ]
+    backend.assert_quota = Quota(window_seconds=3600, granularity_seconds=60, limit=1)
     backend.grant_hashes = 1
     limiter = TimeseriesCardinalityLimiter("", backend)
 

+ 12 - 1
tests/sentry/sentry_metrics/test_multiprocess_steps.py

@@ -508,17 +508,28 @@ def test_process_messages_rate_limited(caplog, settings) -> None:
     assert "dropped_message" in caplog.text
 
 
-def test_process_messages_cardinality_limited(caplog, settings, monkeypatch) -> None:
+def test_process_messages_cardinality_limited(
+    caplog, settings, monkeypatch, set_sentry_option
+) -> None:
     """
     Test that the message processor correctly calls the cardinality limiter.
     """
     settings.SENTRY_METRICS_INDEXER_DEBUG_LOG_SAMPLE_RATE = 1.0
 
+    # set any limit at all to ensure we actually use the underlying rate limiter
+    set_sentry_option(
+        "sentry-metrics.cardinality-limiter.limits.releasehealth.per-org",
+        [{"window_seconds": 3600, "granularity_seconds": 60, "limit": 0}],
+    )
+
     class MockCardinalityLimiter(CardinalityLimiter):
         def check_within_quotas(self, requested_quotas):
             # Grant nothing, limit everything
             return 123, []
 
+        def use_quotas(self, grants, timestamp):
+            pass
+
     monkeypatch.setitem(
         cardinality_limiter_factory.rate_limiters,
         "releasehealth",