Browse Source

ref(sentry-metrics): Add MetricsKeyIndexer table (#28914)

ref(sentry-metrics): Add MetricsKeyIndexer table
This table is used to hold the relationship between a metric string (metric name, tag key or tag value) and its int representation, which is generated from the sequence on this table (the `id` field)
MeredithAnya 3 years ago
parent
commit
669cccc8b3

+ 1 - 1
.github/workflows/snuba-integration-test.yml

@@ -18,7 +18,7 @@ jobs:
     env:
       USE_SNUBA: 1
       MIGRATIONS_TEST_MIGRATE: 1
-      USE_REDIS_INDEXER: 1
+      USE_INDEXER: 1
 
     steps:
       - uses: actions/checkout@v2

+ 1 - 1
migrations_lockfile.txt

@@ -6,5 +6,5 @@ To resolve this, rebase against latest master and regenerate your migration. Thi
 will then be regenerated, and you should be able to merge without conflicts.
 
 nodestore: 0002_nodestore_no_dictfield
-sentry: 0234_grouphistory
+sentry: 0235_add_metricskeyindexer_table
 social_auth: 0001_initial

+ 1 - 0
src/sentry/conf/server.py

@@ -334,6 +334,7 @@ INSTALLED_APPS = (
     "sentry.analytics.events",
     "sentry.nodestore",
     "sentry.search",
+    "sentry.sentry_metrics.indexer",
     "sentry.snuba",
     "sentry.lang.java.apps.Config",
     "sentry.lang.javascript.apps.Config",

+ 55 - 0
src/sentry/migrations/0235_add_metricskeyindexer_table.py

@@ -0,0 +1,55 @@
+# Generated by Django 2.2.24 on 2021-10-04 18:19
+
+import django.utils.timezone
+from django.db import migrations, models
+
+import sentry.db.models.fields.bounded
+
+
+class Migration(migrations.Migration):
+    # This flag is used to mark that a migration shouldn't be automatically run in
+    # production. We set this to True for operations that we think are risky and want
+    # someone from ops to run manually and monitor.
+    # General advice is that if in doubt, mark your migration as `is_dangerous`.
+    # Some things you should always mark as dangerous:
+    # - Large data migrations. Typically we want these to be run manually by ops so that
+    #   they can be monitored. Since data migrations will now hold a transaction open
+    #   this is even more important.
+    # - Adding columns to highly active tables, even ones that are NULL.
+    is_dangerous = False
+
+    # This flag is used to decide whether to run this migration in a transaction or not.
+    # By default we prefer to run in a transaction, but for migrations where you want
+    # to `CREATE INDEX CONCURRENTLY` this needs to be set to False. Typically you'll
+    # want to create an index concurrently when adding one to an existing table.
+    # You'll also usually want to set this to `False` if you're writing a data
+    # migration, since we don't want the entire migration to run in one long-running
+    # transaction.
+    atomic = True
+
+    dependencies = [
+        ("sentry", "0234_grouphistory"),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name="MetricsKeyIndexer",
+            fields=[
+                (
+                    "id",
+                    sentry.db.models.fields.bounded.BoundedBigAutoField(
+                        primary_key=True, serialize=False
+                    ),
+                ),
+                ("string", models.CharField(max_length=200)),
+                ("date_added", models.DateTimeField(default=django.utils.timezone.now)),
+            ],
+            options={
+                "db_table": "sentry_metricskeyindexer",
+            },
+        ),
+        migrations.AddConstraint(
+            model_name="metricskeyindexer",
+            constraint=models.UniqueConstraint(fields=("string",), name="unique_string"),
+        ),
+    ]

+ 4 - 2
src/sentry/new_migrations/monkey/executor.py

@@ -5,7 +5,7 @@ from django.contrib.contenttypes.management import RenameContentType
 from django.db.migrations.executor import MigrationExecutor
 from django.db.migrations.operations import SeparateDatabaseAndState
 from django.db.migrations.operations.fields import FieldOperation
-from django.db.migrations.operations.models import ModelOperation
+from django.db.migrations.operations.models import IndexOperation, ModelOperation
 
 logger = logging.getLogger(__name__)
 
@@ -37,7 +37,9 @@ class SentryMigrationExecutor(MigrationExecutor):
         def _check_operations(operations):
             failed_ops = []
             for operation in operations:
-                if isinstance(operation, (FieldOperation, ModelOperation, RenameContentType)):
+                if isinstance(
+                    operation, (FieldOperation, ModelOperation, RenameContentType, IndexOperation)
+                ):
                     continue
                 elif isinstance(operation, SeparateDatabaseAndState):
                     failed_ops.extend(_check_operations(operation.database_operations))

+ 6 - 6
src/sentry/release_health/metrics.py

@@ -40,32 +40,32 @@ def get_tag_values_list(org_id: int, values: Sequence[str]) -> Sequence[int]:
 
 
 def metric_id(org_id: int, name: str) -> int:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
     if index is None:
         raise MetricIndexNotFound(name)
     return index  # type: ignore
 
 
 def tag_key(org_id: int, name: str) -> str:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
     if index is None:
         raise MetricIndexNotFound(name)
     return f"tags[{index}]"
 
 
 def tag_value(org_id: int, name: str) -> int:
-    index = indexer.resolve(org_id, name)  # type: ignore
+    index = indexer.resolve(name)  # type: ignore
     if index is None:
         raise MetricIndexNotFound(name)
     return index  # type: ignore
 
 
 def try_get_string_index(org_id: int, name: str) -> Optional[int]:
-    return indexer.resolve(org_id, name)  # type: ignore
+    return indexer.resolve(name)  # type: ignore
 
 
 def reverse_tag_value(org_id: int, index: int) -> str:
-    str_value = indexer.reverse_resolve(org_id, index)  # type: ignore
+    str_value = indexer.reverse_resolve(index)  # type: ignore
     # If the value can't be reversed it's very likely a real programming bug
     # instead of something to be caught down: We probably got back a value from
     # Snuba that's not in the indexer => partial data loss
@@ -338,7 +338,7 @@ class MetricsReleaseHealthBackend(ReleaseHealthBackend):
         rv = {}
 
         for project_id, release in project_releases:
-            release_tag_value = indexer.resolve(org_id, release)  # type: ignore
+            release_tag_value = indexer.resolve(release)  # type: ignore
             if release_tag_value is None:
                 # Don't emit empty releases -- for exact compatibility with
                 # sessions table backend.

+ 4 - 4
src/sentry/sentry_metrics/indexer/base.py

@@ -11,10 +11,10 @@ class StringIndexer(Service):  # type: ignore
 
     __all__ = ("record", "resolve", "reverse_resolve", "bulk_record")
 
-    def bulk_record(self, org_id: int, strings: List[str]) -> Dict[str, int]:
+    def bulk_record(self, strings: List[str]) -> Dict[str, int]:
         raise NotImplementedError()
 
-    def record(self, org_id: int, string: str) -> int:
+    def record(self, string: str) -> int:
         """Store a string and return the integer ID generated for it
 
         With every call to this method, the lifetime of the entry will be
@@ -22,7 +22,7 @@ class StringIndexer(Service):  # type: ignore
         """
         raise NotImplementedError()
 
-    def resolve(self, org_id: int, string: str) -> Optional[int]:
+    def resolve(self, string: str) -> Optional[int]:
         """Lookup the integer ID for a string.
 
         Does not affect the lifetime of the entry.
@@ -31,7 +31,7 @@ class StringIndexer(Service):  # type: ignore
         """
         raise NotImplementedError()
 
-    def reverse_resolve(self, org_id: int, id: int) -> Optional[str]:
+    def reverse_resolve(self, id: int) -> Optional[str]:
         """Lookup the stored string for a given integer ID.
 
         Returns None if the entry cannot be found.

+ 1 - 2
src/sentry/sentry_metrics/indexer/indexer_consumer.py

@@ -36,7 +36,6 @@ class MetricsIndexerWorker(AbstractBatchWorker):  # type: ignore
     def process_message(self, message: Any) -> MutableMapping[str, Any]:
         parsed_message: MutableMapping[str, Any] = json.loads(message.value(), use_rapid_json=True)
 
-        org_id = parsed_message["org_id"]
         metric_name = parsed_message["name"]
         tags = parsed_message["tags"]
 
@@ -46,7 +45,7 @@ class MetricsIndexerWorker(AbstractBatchWorker):  # type: ignore
             *tags.values(),
         }
 
-        mapping = indexer.bulk_record(org_id, list(strings))  # type: ignore
+        mapping = indexer.bulk_record(list(strings))  # type: ignore
 
         new_tags = {mapping[k]: mapping[v] for k, v in tags.items()}
 

+ 3 - 3
src/sentry/sentry_metrics/indexer/mock.py

@@ -31,13 +31,13 @@ class SimpleIndexer(StringIndexer):
         self._strings: DefaultDict[str, int] = defaultdict(self._counter.__next__)
         self._reverse: Dict[int, str] = {}
 
-    def record(self, org_id: int, string: str) -> int:
+    def record(self, string: str) -> int:
         return self._record(string)
 
-    def resolve(self, org_id: int, string: str) -> Optional[int]:
+    def resolve(self, string: str) -> Optional[int]:
         return self._strings.get(string)
 
-    def reverse_resolve(self, org_id: int, id: int) -> Optional[str]:
+    def reverse_resolve(self, id: int) -> Optional[str]:
         return self._reverse.get(id)
 
     def _record(self, string: str) -> int:

+ 30 - 0
src/sentry/sentry_metrics/indexer/models.py

@@ -0,0 +1,30 @@
+from typing import Any
+
+from django.db import connections, models, router
+from django.utils import timezone
+
+from sentry.db.models import Model
+
+
+class MetricsKeyIndexer(Model):  # type: ignore
+    __include_in_export__ = False
+
+    string = models.CharField(max_length=200)
+    date_added = models.DateTimeField(default=timezone.now)
+
+    class Meta:
+        db_table = "sentry_metricskeyindexer"
+        app_label = "sentry"
+        constraints = [
+            models.UniqueConstraint(fields=["string"], name="unique_string"),
+        ]
+
+    @classmethod
+    def get_next_values(cls, num: int) -> Any:
+        using = router.db_for_write(cls)
+        connection = connections[using].cursor()
+
+        connection.execute(
+            "SELECT nextval('sentry_metricskeyindexer_id_seq') from generate_series(1,%s)", [num]
+        )
+        return connection.fetchall()

Some files were not shown because too many files changed in this diff