@@ -4,8 +4,10 @@
import boto3
import logging
import pytest
+import time
import ydb.public.api.protos.draft.fq_pb2 as fq
import ydb.public.api.protos.ydb_value_pb2 as ydb
+import ydb.tests.library.common.yatest_common as yatest_common
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all
@@ -215,6 +217,143 @@ Pear,15,33'''
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
@yq_v1
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_checkpoints_on_join_s3_with_yds(self, kikimr, s3, client):
    """Check checkpointing of a streaming query that joins a YDS topic with S3 data.

    Scenario:
      1. Upload three small JSON objects (key -> value) to a fresh S3 bucket
         and register the bucket as the ``s3_dict`` storage connection.
      2. Create the YDS topics and the ``yds`` connection.
      3. Start a streaming query that inner-joins the input topic with the
         S3 dictionary by ``key`` and writes the result to the output topic.
      4. Write nine messages to the input topic and verify the joined output.
      5. Wait until at least three checkpoints are completed, restart
         node 1, and wait for the checkpoints again.
      6. Abort the query and wait for it to finish.
    """
    # Function-scope import: the file's full import list is not visible here.
    import json

    # Prepare S3
    s3_credentials = dict(
        endpoint_url=s3.s3_url,
        aws_access_key_id="key",
        aws_secret_access_key="secret_key",
    )
    resource = boto3.resource("s3", **s3_credentials)
    s3_client = boto3.client("s3", **s3_credentials)

    bucket_name = "join_s3_with_yds"
    bucket = resource.Bucket(bucket_name)
    bucket.create(ACL='public-read')
    # Start from an empty bucket so only the objects below take part in the join.
    bucket.objects.all().delete()

    def put_kv(k, v):
        """Store ``{"key": k, "value": v}`` as the object ``a/b/c/<k>.json``."""
        # json.dumps yields the same text as the hand-written template for
        # these inputs and additionally escapes values correctly.
        payload = json.dumps({"key": k, "value": v})
        s3_client.put_object(
            Body=payload,
            Bucket=bucket_name,
            Key='a/b/c/{}.json'.format(k),
            ContentType='text/json',
        )

    put_kv(1, "one")
    put_kv(2, "two")
    put_kv(3, "three")

    kikimr.control_plane.wait_bootstrap(1)
    client.create_storage_connection("s3_dict", bucket_name)

    # Prepare YDS
    self.init_topics("yds_dict")
    client.create_yds_connection(name="yds", database_id="FakeDatabaseId")

    # Run query
    sql = R'''
        PRAGMA dq.MaxTasksPerStage="2";

        $s3_dict_raw =
            SELECT cast(Data AS json) AS data
            FROM s3_dict.`*`
            WITH (format=raw, SCHEMA (
                Data String NOT NULL
            ));

        $s3_dict =
            SELECT
                cast(JSON_VALUE(data, '$.key') AS int64) AS key,
                cast(JSON_VALUE(data, '$.value') AS String) AS value
            FROM $s3_dict_raw;

        $parsed_yson_topic =
            SELECT
                Yson::LookupInt64(yson_data, "key") AS key,
                Yson::LookupString(yson_data, "val") AS val
            FROM (
                SELECT
                    Yson::Parse(Data) AS yson_data
                FROM yds.`{input_topic}` WITH SCHEMA (Data String NOT NULL));

        $joined_seq =
            SELECT
                s3_dict.value AS num,
                yds_seq.val AS word
            FROM $parsed_yson_topic AS yds_seq
                INNER JOIN $s3_dict AS s3_dict
                    ON yds_seq.key = s3_dict.key;

        INSERT INTO yds.`{output_topic}`
        SELECT
            Yson::SerializeText(Yson::From(TableRow()))
        FROM $joined_seq;
        '''.format(
        input_topic=self.input_topic,
        output_topic=self.output_topic,
    )

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
    kikimr.control_plane.wait_zero_checkpoint(query_id)

    yds_data = [
        '{"key" = 1; "val" = "January";}',
        '{"key" = 2; "val" = "February";}',
        '{"key" = 3; "val" = "March";}',
        '{"key" = 1; "val" = "Monday";}',
        '{"key" = 2; "val" = "Tuesday";}',
        '{"key" = 3; "val" = "Wednesday";}',
        '{"key" = 1; "val" = "Gold";}',
        '{"key" = 2; "val" = "Silver";}',
        '{"key" = 3; "val" = "Bronze";}',
    ]
    self.write_stream(yds_data)

    # Every input key has a match in the S3 dictionary, so each message
    # yields exactly one output row, compared here in input order.
    expected = [
        '{"num" = "one"; "word" = "January"}',
        '{"num" = "two"; "word" = "February"}',
        '{"num" = "three"; "word" = "March"}',
        '{"num" = "one"; "word" = "Monday"}',
        '{"num" = "two"; "word" = "Tuesday"}',
        '{"num" = "three"; "word" = "Wednesday"}',
        '{"num" = "one"; "word" = "Gold"}',
        '{"num" = "two"; "word" = "Silver"}',
        '{"num" = "three"; "word" = "Bronze"}',
    ]
    assert self.read_stream(len(expected)) == expected

    # Check that checkpointing is finished
    def wait_checkpoints(require_query_is_on=False, min_completed=3):
        """Poll until at least ``min_completed`` checkpoints are completed.

        ``require_query_is_on`` is forwarded unchanged to
        ``get_completed_checkpoints``. Fails with an assertion that reports
        the last observed count once the deadline has passed.
        """
        # NOTE(review): plain_or_under_sanitizer presumably picks the second
        # (larger) value for sanitizer builds -- confirm against yatest_common.
        deadline = time.time() + yatest_common.plain_or_under_sanitizer(300, 900)
        poll_interval = yatest_common.plain_or_under_sanitizer(0.5, 2)
        while True:
            completed = kikimr.control_plane.get_completed_checkpoints(query_id, require_query_is_on)
            if completed >= min_completed:
                break
            assert time.time() < deadline, "Completed: {}".format(completed)
            time.sleep(poll_interval)

    logging.debug("Wait checkpoints")
    wait_checkpoints(require_query_is_on=True)
    logging.debug("Wait checkpoints success")

    # Restart the node, then wait for the completed checkpoints again.
    kikimr.control_plane.kikimr_cluster.nodes[1].stop()
    kikimr.control_plane.kikimr_cluster.nodes[1].start()
    kikimr.control_plane.wait_bootstrap(1)

    logging.debug("Wait checkpoints after restore")
    wait_checkpoints(require_query_is_on=False)
    logging.debug("Wait checkpoints after restore success")

    client.abort_query(query_id)
    client.wait_query(query_id)
@yq_v1 # v2 compute with multiple nodes is not supported yet
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
@pytest.mark.parametrize("kikimr", [{"compute": 3}], indirect=True)