Browse Source

[ymq] fix support of old queue format without version & add test

init
alexbogo 2 years ago
parent
commit
4f0101f6bb

+ 0 - 2
ydb/core/ymq/actor/queue_schema.cpp

@@ -1065,8 +1065,6 @@ void TCreateQueueSchemaActorV2::MatchQueueAttributes(
 ) {
     Become(&TCreateQueueSchemaActorV2::MatchAttributes);
 
-    Y_VERIFY(currentVersion != 0);
-
     TDbQueriesMaker queryMaker(
         Cfg().GetRoot(),
         QueuePath_.UserName,

+ 97 - 0
ydb/tests/functional/sqs/common/test_format_without_version.py

@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import pytest
+import uuid
+import time
+
+from hamcrest import assert_that, equal_to
+
+from ydb.tests.library.sqs.test_base import KikimrSqsTestBase, get_test_with_sqs_installation_by_path, get_test_with_sqs_tenant_installation, IS_FIFO_PARAMS
+from ydb.tests.library.sqs.tables import create_queue_tables
+from ydb.tests.library.common.types import TabletTypes
+
+from ydb.tests.library.sqs.matchers import ReadResponseMatcher
+
+
+class QueueWithoutVersionTest(KikimrSqsTestBase):
+    used_tablets = []
+
+    def chose_tablet(self):
+        response = self.cluster.client.tablet_state(TabletTypes.FLAT_DATASHARD)
+        for info in response.TabletStateInfo:
+            if info.TabletId not in self.used_tablets:
+                self.used_tablets.append(info.TabletId)
+                return info.TabletId
+        assert(False)
+
+    def get_table_path(self, table=None):
+        table_path = f'{self.sqs_root}/{self._username}/{self.queue_name}'
+        if table is not None:
+            table_path += f'/{table}'
+        return table_path
+
+    def init_queue(self, is_fifo):
+        self._init_with_params(is_fifo)
+        queue_tables_path = self.get_table_path()
+        session = self._driver.table_client.session().create()
+
+        now = int(1000*time.time())
+        shards = 1 if is_fifo else 2
+        tablet_id = self.chose_tablet()
+        create_queue_tables(queue_tables_path, is_fifo, self._driver, session, shards)
+
+        queues_table = self.sqs_root + '/.Queues'
+        self._execute_yql_query(
+            f'''UPSERT INTO `{queues_table}`
+                (Account, QueueName, QueueId, QueueState, FifoQueue, DeadLetterQueue, Shards, MasterTabletId, CreatedTimestamp, FolderId, DlqName, Partitions)
+                VALUES
+                ("{self._username}", "{self.queue_name}", "{uuid.uuid1()}", 1, {is_fifo}, false, {shards}, {tablet_id}, {now}, "", "", 1);'''
+        )
+
+        for shard in range(shards):
+            self._execute_yql_query(
+                f'''UPSERT INTO `{self.get_table_path("State")}`
+                (State, MessageCount, InflyCount, ReadOffset, WriteOffset, CreatedTimestamp, LastModifiedTimestamp, CleanupTimestamp, RetentionBoundary)
+                VALUES({shard}, 0, 0, 0, 0, {now}, {now}, {now}, {now});'''
+            )
+
+        self._execute_yql_query(
+            f'''UPSERT INTO `{self.get_table_path("Attributes")}`
+            (State, MessageRetentionPeriod, VisibilityTimeout, MaximumMessageSize, FifoQueue, ContentBasedDeduplication, ReceiveMessageWaitTime, DelaySeconds, MaxReceiveCount)
+            VALUES (0, 345600000, 30000, 262144, {is_fifo}, false, 0, 0, 0);'''
+        )
+
+    @pytest.mark.parametrize(**IS_FIFO_PARAMS)
+    def test_common(self, is_fifo):
+        self.init_queue(is_fifo)
+
+        created_queue_url = self._create_queue_and_assert(self.queue_name, is_fifo=is_fifo)
+        got_queue_url = self._sqs_api.get_queue_url(self.queue_name)
+        assert_that(got_queue_url, equal_to(created_queue_url.decode('utf-8')))
+
+        self.seq_no += 1
+        message_id = self._send_message_and_assert(
+            created_queue_url,
+            self._msg_body_template,
+            seq_no=self.seq_no if is_fifo else None,
+            group_id='group' if is_fifo else None
+        )
+        self._read_messages_and_assert(
+            created_queue_url,
+            messages_count=1,
+            visibility_timeout=1000,
+            matcher=ReadResponseMatcher().with_message_ids([message_id, ])
+        )
+
+        self._sqs_api.get_queue_url(self.queue_name)
+        self._sqs_api.get_queue_attributes(created_queue_url)
+        self._sqs_api.set_queue_attributes(created_queue_url, {'MaximumMessageSize': '1024'})
+
+
+class TestQueueWithoutVersionWithTenant(get_test_with_sqs_tenant_installation(QueueWithoutVersionTest)):
+    pass
+
+
+class TestQueueWithoutVersionWithPath(get_test_with_sqs_installation_by_path(QueueWithoutVersionTest)):
+    pass

+ 75 - 43
ydb/tests/library/sqs/tables.py

@@ -11,21 +11,21 @@ class QueueType(Enum):
     FIFO = 2
 
 
-def get_prefix_path(root, queue_type=None):
-    subdir = ('.{}/'.format(queue_type.name)) if queue_type else ''
+def get_prefix_path(root, queue_type=None, common_table=True):
+    subdir = ('.{}/'.format(queue_type.name)) if queue_type and common_table else ''
     return '{}/{}'.format(root, subdir)
 
 
-def get_table_path(root, table_name, queue_type=None):
-    return '{}/{}'.format(get_prefix_path(root, queue_type), table_name)
+def get_table_path(root, table_name, queue_type=None, common_table=True):
+    return '{}/{}'.format(get_prefix_path(root, queue_type, common_table), table_name)
 
 
-def _create_table(root, session, table_name, columns, keys_count, queue_type=None):
-    table_path = get_table_path(root, table_name, queue_type)
+def _create_table(root, session, table_name, columns, keys_count, common_table=True, queue_type=None):
+    table_path = get_table_path(root, table_name, queue_type, common_table)
     keys = [name for name, _ in columns[:keys_count]]
     columns = [ydb.Column(name, ydb.OptionalType(column_type)) for name, column_type in columns]
 
-    if queue_type:
+    if queue_type and common_table:
         ydb.retry_operation_sync(lambda: session.create_table(
             table_path,
             ydb.TableDescription()
@@ -48,7 +48,9 @@ def _create_table(root, session, table_name, columns, keys_count, queue_type=Non
         ))
 
 
-def get_table_keys_for_queue(with_shard=False):
+def get_table_keys_for_queue(common_table, with_shard=False):
+    if not common_table:
+        return []
     if with_shard:
         columns = [
             ('QueueIdNumberAndShardHash', ydb.PrimitiveType.Uint64),
@@ -137,8 +139,13 @@ def create_removed_queues_table(root, session):
     _create_table(root, session, '.RemovedQueues', columns, keys_count=2)
 
 
-def create_attibutes_table(root, session, queue_type):
-    queue_keys = get_table_keys_for_queue()
+def create_attibutes_table(root, session, queue_type, common_table=True):
+    queue_keys = []
+    if common_table:
+        queue_keys = get_table_keys_for_queue(common_table)
+    else:
+        queue_keys.append(('State', ydb.PrimitiveType.Uint64))
+
     columns = queue_keys + [
         ('ContentBasedDeduplication', ydb.PrimitiveType.Bool),
         ('DelaySeconds', ydb.PrimitiveType.Uint64),
@@ -152,16 +159,20 @@ def create_attibutes_table(root, session, queue_type):
         ('MaxReceiveCount', ydb.PrimitiveType.Uint64),
         ('ShowDetailedCountersDeadline', ydb.PrimitiveType.Uint64),
     ]
-    _create_table(root, session, 'Attributes', columns, len(queue_keys), queue_type)
+    _create_table(root, session, 'Attributes', columns, len(queue_keys), common_table, queue_type)
 
 
-def create_state_table(root, session, queue_type):
-    queue_keys = columns = [
-        ('QueueIdNumberHash', ydb.PrimitiveType.Uint64),
-        ('QueueIdNumber', ydb.PrimitiveType.Uint64),
-    ]
-    if queue_type == QueueType.STD:
-        queue_keys.append(('Shard', ydb.PrimitiveType.Uint32))
+def create_state_table(root, session, queue_type, common_table=True):
+    queue_keys = []
+    if common_table:
+        queue_keys = [
+            ('QueueIdNumberHash', ydb.PrimitiveType.Uint64),
+            ('QueueIdNumber', ydb.PrimitiveType.Uint64),
+        ]
+        if queue_type == QueueType.STD:
+            queue_keys.append(('Shard', ydb.PrimitiveType.Uint32))
+    else:
+        queue_keys.append(('State', ydb.PrimitiveType.Uint64))
 
     columns = queue_keys + [
         ('CleanupTimestamp', ydb.PrimitiveType.Uint64),
@@ -175,12 +186,12 @@ def create_state_table(root, session, queue_type):
         ('CleanupVersion', ydb.PrimitiveType.Uint64),
         ('InflyVersion', ydb.PrimitiveType.Uint64),
     ]
-    _create_table(root, session, 'State', columns, len(queue_keys), queue_type)
+    _create_table(root, session, 'State', columns, len(queue_keys), common_table, queue_type)
 
 
-def create_infly_table(root, session):
+def create_infly_table(root, session, common_table=True):
     queue_type = QueueType.STD
-    queue_keys = get_table_keys_for_queue(with_shard=(queue_type == QueueType.STD))
+    queue_keys = get_table_keys_for_queue(common_table, with_shard=(queue_type == QueueType.STD))
     columns = queue_keys + [
         ('Offset', ydb.PrimitiveType.Uint64),
         ('RandomId', ydb.PrimitiveType.Uint64),
@@ -192,12 +203,12 @@ def create_infly_table(root, session):
         ('VisibilityDeadline', ydb.PrimitiveType.Uint64),
         ('DelayDeadline', ydb.PrimitiveType.Uint64),
     ]
-    _create_table(root, session, 'Infly', columns, len(queue_keys) + 1, queue_type)
+    _create_table(root, session, 'Infly', columns, len(queue_keys) + 1, common_table, queue_type)
 
 
-def create_message_data_table(root, session):
+def create_message_data_table(root, session, common_table=True):
     queue_type = QueueType.STD
-    queue_keys = get_table_keys_for_queue(with_shard=(queue_type == QueueType.STD))
+    queue_keys = get_table_keys_for_queue(common_table, with_shard=(queue_type == QueueType.STD))
     columns = queue_keys + [
         ('RandomId', ydb.PrimitiveType.Uint64),
         ('Offset', ydb.PrimitiveType.Uint64),
@@ -206,11 +217,11 @@ def create_message_data_table(root, session):
         ('MessageId', ydb.PrimitiveType.String),
         ('SenderId', ydb.PrimitiveType.String),
     ]
-    _create_table(root, session, 'MessageData', columns, len(queue_keys) + 2, queue_type)
+    _create_table(root, session, 'MessageData', columns, len(queue_keys) + 2, common_table, queue_type)
 
 
-def create_message_table(root, session, queue_type):
-    queue_keys = get_table_keys_for_queue(with_shard=(queue_type == QueueType.STD))
+def create_message_table(root, session, queue_type, common_table=True):
+    queue_keys = get_table_keys_for_queue(common_table, with_shard=(queue_type == QueueType.STD))
     columns = queue_keys + [
         ('Offset', ydb.PrimitiveType.Uint64),
         ('RandomId', ydb.PrimitiveType.Uint64),
@@ -229,11 +240,11 @@ def create_message_table(root, session, queue_type):
             ('FirstReceiveTimestamp', ydb.PrimitiveType.Uint64),
             ('SentTimestamp', ydb.PrimitiveType.Uint64),
         ]
-    _create_table(root, session, 'Messages', columns, len(queue_keys) + 1, queue_type)
+    _create_table(root, session, 'Messages', columns, len(queue_keys) + 1, common_table, queue_type)
 
 
-def create_sent_timestamp_idx_table(root, session, queue_type):
-    queue_keys = get_table_keys_for_queue(with_shard=(queue_type == QueueType.STD))
+def create_sent_timestamp_idx_table(root, session, queue_type, common_table=True):
+    queue_keys = get_table_keys_for_queue(common_table, with_shard=(queue_type == QueueType.STD))
     columns = queue_keys + [
         ('SentTimestamp', ydb.PrimitiveType.Uint64),
         ('Offset', ydb.PrimitiveType.Uint64),
@@ -242,12 +253,12 @@ def create_sent_timestamp_idx_table(root, session, queue_type):
     ]
     if queue_type == QueueType.FIFO:
         columns.append(('GroupId', ydb.PrimitiveType.String))
-    _create_table(root, session, 'SentTimestampIdx', columns, len(queue_keys) + 2, queue_type)
+    _create_table(root, session, 'SentTimestampIdx', columns, len(queue_keys) + 2, common_table, queue_type)
 
 
-def create_data_table(root, session):
+def create_data_table(root, session, common_table=True):
     queue_type = QueueType.FIFO
-    queue_keys = get_table_keys_for_queue()
+    queue_keys = get_table_keys_for_queue(common_table)
     columns = queue_keys + [
         ("RandomId", ydb.PrimitiveType.Uint64),
         ("Offset", ydb.PrimitiveType.Uint64),
@@ -257,24 +268,24 @@ def create_data_table(root, session):
         ("Data", ydb.PrimitiveType.String),
         ("MessageId", ydb.PrimitiveType.String),
     ]
-    _create_table(root, session, 'Data', columns, len(queue_keys) + 2, queue_type)
+    _create_table(root, session, 'Data', columns, len(queue_keys) + 2, common_table, queue_type)
 
 
-def create_deduplication_table(root, session):
+def create_deduplication_table(root, session, common_table=True):
     queue_type = QueueType.FIFO
-    queue_keys = get_table_keys_for_queue()
+    queue_keys = get_table_keys_for_queue(common_table)
     columns = queue_keys + [
         ("DedupId", ydb.PrimitiveType.String),
         ("Deadline", ydb.PrimitiveType.Uint64),
         ("Offset", ydb.PrimitiveType.Uint64),
         ("MessageId", ydb.PrimitiveType.String),
     ]
-    _create_table(root, session, 'Deduplication', columns, len(queue_keys) + 1, queue_type)
+    _create_table(root, session, 'Deduplication', columns, len(queue_keys) + 1, common_table, queue_type)
 
 
-def create_groups_table(root, session):
+def create_groups_table(root, session, common_table=True):
     queue_type = QueueType.FIFO
-    queue_keys = get_table_keys_for_queue()
+    queue_keys = get_table_keys_for_queue(common_table)
     columns = queue_keys + [
         ("GroupId", ydb.PrimitiveType.String),
         ("VisibilityDeadline", ydb.PrimitiveType.Uint64),
@@ -284,17 +295,17 @@ def create_groups_table(root, session):
         ("ReceiveAttemptId", ydb.PrimitiveType.Utf8),
         ("LockTimestamp", ydb.PrimitiveType.Uint64),
     ]
-    _create_table(root, session, 'Groups', columns, len(queue_keys) + 1, queue_type)
+    _create_table(root, session, 'Groups', columns, len(queue_keys) + 1, common_table, queue_type)
 
 
-def create_reads_table(root, session):
+def create_reads_table(root, session, common_table=True):
     queue_type = QueueType.FIFO
-    queue_keys = get_table_keys_for_queue()
+    queue_keys = get_table_keys_for_queue(common_table)
     columns = queue_keys + [
         ("ReceiveAttemptId", ydb.PrimitiveType.Utf8),
         ("Deadline", ydb.PrimitiveType.Uint64),
     ]
-    _create_table(root, session, 'Reads', columns, len(queue_keys) + 1, queue_type)
+    _create_table(root, session, 'Reads', columns, len(queue_keys) + 1, common_table, queue_type)
 
 
 def create_all_tables(root, driver, session):
@@ -319,3 +330,24 @@ def create_all_tables(root, driver, session):
     create_groups_table(root, session)
     create_deduplication_table(root, session)
     create_data_table(root, session)
+
+
+def create_queue_tables(path, if_fifo, driver, session, shards=None):
+    queue_type = QueueType.FIFO if if_fifo else QueueType.STD
+    create_state_table(path, session, queue_type, common_table=False)
+    create_attibutes_table(path, session, queue_type, common_table=False)
+
+    if queue_type == QueueType.STD:
+        for shard in range(shards):
+            shard_path = f'{path}/{shard}'
+            create_infly_table(shard_path, session, common_table=False)
+            create_message_data_table(shard_path, session, common_table=False)
+            create_sent_timestamp_idx_table(shard_path, session, queue_type, common_table=False)
+            create_message_table(shard_path, session, queue_type, common_table=False)
+    else:
+        create_reads_table(path, session, common_table=False)
+        create_groups_table(path, session, common_table=False)
+        create_deduplication_table(path, session, common_table=False)
+        create_data_table(path, session, common_table=False)
+        create_sent_timestamp_idx_table(path, session, queue_type, common_table=False)
+        create_message_table(path, session, queue_type, common_table=False)