Browse Source

feat(indexer-spanner): Introduce basic model for indexer on spanner (#37726)

Added a model for the indexer on spanner with some api's which can be used for performing INSERTs into spanner.
Nikhar Saxena 2 years ago
parent
commit
c7f2a28ec5

+ 83 - 0
src/sentry/sentry_metrics/indexer/cloudspanner/cloudspanner_model.py

@@ -0,0 +1,83 @@
+from collections import namedtuple
+from enum import Enum
+from typing import Any, Sequence
+
+_COLUMNS = [
+    "id",
+    "decoded_id",
+    "string",
+    "organization_id",
+    "date_added",
+    "last_seen",
+    "retention_days",
+]
+
+SpannerIndexerModel = namedtuple(
+    "SpannerIndexerModel",
+    [
+        "id",
+        "decoded_id",
+        "string",
+        "organization_id",
+        "date_added",
+        "last_seen",
+        "retention_days",
+    ],
+)
+
+
+def get_column_names() -> Sequence[str]:
+    return _COLUMNS
+
+
+class CloudSpannerInsertMode(Enum):
+    """
+    The method to use when inserting data into CloudSpanner.
+    """
+
+    DML = 1
+    MUTATION = 2
+
+
+class CloudSpannerDBAccessor:
+    """
+    Provides methods to perform INSERT's and READ's on CloudSpanner.
+    """
+
+    def __init__(self, database: Any, table_name: str, insert_mode: CloudSpannerInsertMode) -> None:
+        self.__database = database
+        self.__table_name = table_name
+        self.__insert = (
+            self.__insert_using_mutation
+            if insert_mode == CloudSpannerInsertMode.MUTATION
+            else self.__insert_using_dml
+        )
+
+    def __insert_using_dml(self, models: Sequence[SpannerIndexerModel]) -> None:
+        """
+        Insert data using DML. Raise any errors that occur so that the
+        application can handle them.
+        """
+
+        def insert_dml(transaction: Any) -> None:
+            """
+            Inserts data on a database table in a transaction context.
+            """
+            transaction.insert(self.__table_name, columns=get_column_names(), values=models)
+
+        self.__database.run_in_transaction(insert_dml)
+
+    def __insert_using_mutation(self, models: Sequence[SpannerIndexerModel]) -> None:
+        """
+        Insert data using Mutation. Raise any errors that occur so that the
+        application can handle them.
+        """
+        with self.__database.batch() as batch:
+            batch.insert(
+                table=self.__table_name,
+                columns=get_column_names(),
+                values=models,
+            )
+
+    def insert(self, models: Sequence[SpannerIndexerModel]) -> None:
+        return self.__insert(models)

+ 0 - 1
tests/sentry/sentry_metrics/test_cloudspanner.py

@@ -27,5 +27,4 @@ def test_id_codec(value) -> None:
 def test_spanner_indexer_service():
     # TODO: Provide instance_id and database_id when running the test
     span_indexer = CloudSpannerIndexer(instance_id="", database_id="")
-    span_indexer.setup()
     span_indexer.validate()

+ 145 - 0
tests/sentry/sentry_metrics/test_cloudspanner_model.py

@@ -0,0 +1,145 @@
+import random
+import string
+import uuid
+from datetime import datetime
+from typing import Sequence
+
+import pytest
+
+from sentry.sentry_metrics.indexer.cloudspanner.cloudspanner import RawCloudSpannerIndexer
+from sentry.sentry_metrics.indexer.cloudspanner.cloudspanner_model import (
+    CloudSpannerDBAccessor,
+    CloudSpannerInsertMode,
+    SpannerIndexerModel,
+    get_column_names,
+)
+
+
+def test_cloudspanner_model_column_names() -> None:
+    assert get_column_names() == [
+        "id",
+        "decoded_id",
+        "string",
+        "organization_id",
+        "date_added",
+        "last_seen",
+        "retention_days",
+    ]
+
+
+def get_random_string(length: int) -> str:
+    return "".join(random.choice(string.ascii_letters) for _ in range(length))
+
+
+@pytest.mark.skip(reason="TODO: Implement it correctly")
+@pytest.mark.parametrize(
+    "mode,models",
+    [
+        pytest.param(
+            CloudSpannerInsertMode.MUTATION,
+            [
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                )
+            ],
+            id="mutation single write",
+        ),
+        pytest.param(
+            CloudSpannerInsertMode.MUTATION,
+            [
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                ),
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                ),
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                ),
+            ],
+            id="mutation multi write",
+        ),
+        pytest.param(
+            CloudSpannerInsertMode.DML,
+            [
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                )
+            ],
+            id="dml single write",
+        ),
+        pytest.param(
+            CloudSpannerInsertMode.DML,
+            [
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                ),
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                ),
+                SpannerIndexerModel(
+                    id=(uuid.uuid4().int & (1 << 64) - 1) >> 1,
+                    decoded_id=12345,
+                    string=get_random_string(100),
+                    organization_id=1,
+                    retention_days=90,
+                    date_added=datetime.now(),
+                    last_seen=datetime.now(),
+                ),
+            ],
+            id="dml multi write",
+        ),
+    ],
+)
+def test_spanner_indexer_write(mode: CloudSpannerInsertMode, models: Sequence[SpannerIndexerModel]):
+    # TODO: Provide instance_id and database_id when running the test
+    spanner_indexer = RawCloudSpannerIndexer(instance_id="", database_id="")
+    spanner_indexer.validate()
+
+    writer = CloudSpannerDBAccessor(spanner_indexer.database, "perfstringindexer", mode)
+    try:
+        writer.insert(models)
+    except Exception as exc:
+        assert False, f"spanner writer raised an exception {exc} for {mode}"