123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- #!/usr/bin/env python3
- import argparse
- import configparser
- import datetime
- import os
- import posixpath
- import traceback
- import time
- import ydb
- from collections import Counter
- dir = os.path.dirname(__file__)
- config = configparser.ConfigParser()
- config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
- config.read(config_file_path)
- DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
- DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
- def create_tables(pool, table_path):
- print(f"> create table if not exists:'{table_path}'")
- def callee(session):
- session.execute_scheme(f"""
- CREATE table IF NOT EXISTS `{table_path}` (
- `test_name` Utf8 NOT NULL,
- `suite_folder` Utf8 NOT NULL,
- `full_name` Utf8 NOT NULL,
- `run_timestamp_last` Timestamp NOT NULL,
- `owners` Utf8 ,
- PRIMARY KEY (`test_name`, `suite_folder`, `full_name`)
- )
- PARTITION BY HASH(suite_folder,`full_name`)
- WITH (STORE = COLUMN)
- """)
- return pool.retry_operation_sync(callee)
- def bulk_upsert(table_client, table_path, rows):
- print(f"> bulk upsert: {table_path}")
- column_types = (
- ydb.BulkUpsertColumns()
- .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
- .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
- .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
- .add_column("run_timestamp_last", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
- .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
- )
- table_client.bulk_upsert(table_path, rows, column_types)
- def main():
- if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
- print(
- "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
- )
- return 1
- else:
- # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
- # So, set up it locally
- os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
- "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
- ]
- with ydb.Driver(
- endpoint=DATABASE_ENDPOINT,
- database=DATABASE_PATH,
- credentials=ydb.credentials_from_env_variables(),
- ) as driver:
- driver.wait(timeout=10, fail_fast=True)
- session = ydb.retry_operation_sync(
- lambda: driver.table_client.session().create()
- )
-
- # settings, paths, consts
- tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
- table_client = ydb.TableClient(driver, tc_settings)
-
- table_path = f'test_results/analytics/testowners'
- query_get_owners = f"""
- select
- DISTINCT test_name,
- suite_folder,
- suite_folder || '/' || test_name as full_name,
- FIRST_VALUE(owners) OVER w AS owners,
- FIRST_VALUE (run_timestamp) OVER w AS run_timestamp_last
- FROM
- `test_results/test_runs_column`
- WHERE
- run_timestamp >= CurrentUtcDate()- Interval("P10D")
- AND branch = 'main'
- and job_name in (
- 'Nightly-run', 'Postcommit_relwithdebinfo',
- 'Postcommit_asan'
- )
- WINDOW w AS (
- PARTITION BY test_name,
- suite_folder
- ORDER BY
- run_timestamp DESC
- )
- order by
- run_timestamp_last desc
-
- """
- query = ydb.ScanQuery(query_get_owners, {})
- # start transaction time
- start_time = time.time()
- it = driver.table_client.scan_query(query)
- # end transaction time
- results = []
- test_list = []
- while True:
- try:
- result = next(it)
- results = results + result.result_set.rows
- except StopIteration:
- break
- end_time = time.time()
- print(f'transaction duration: {end_time - start_time}')
- print(f'testowners data captured, {len(results)} rows')
- for row in results:
- test_list.append({
- 'suite_folder': row['suite_folder'],
- 'test_name': row['test_name'],
- 'full_name': row['full_name'],
- 'owners': row['owners'],
- 'run_timestamp_last': row['run_timestamp_last'],
- })
- print('upserting testowners')
- with ydb.SessionPool(driver) as pool:
- create_tables(pool, table_path)
- full_path = posixpath.join(DATABASE_PATH, table_path)
- bulk_upsert(driver.table_client, full_path,
- test_list)
- print('testowners updated')
- if __name__ == "__main__":
- main()
|