#!/usr/bin/env python3
import argparse
import configparser
import datetime
import os
import posixpath
import time
from multiprocessing import Pool, cpu_count

import pandas as pd
import ydb

script_dir = os.path.dirname(__file__)
config = configparser.ConfigParser()
config_file_path = f"{script_dir}/../../config/ydb_qa_db.ini"
config.read(config_file_path)

DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]

def create_tables(pool, table_path):
    print(f"> create table if not exists: '{table_path}'")

    def callee(session):
        session.execute_scheme(
            f"""
            CREATE TABLE IF NOT EXISTS `{table_path}` (
                `test_name` Utf8 NOT NULL,
                `suite_folder` Utf8 NOT NULL,
                `full_name` Utf8 NOT NULL,
                `date_window` Date NOT NULL,
                `build_type` Utf8 NOT NULL,
                `branch` Utf8 NOT NULL,
                `days_ago_window` Uint64 NOT NULL,
                `history` Utf8,
                `history_class` Utf8,
                `pass_count` Uint64,
                `mute_count` Uint64,
                `fail_count` Uint64,
                `skip_count` Uint64,
                `success_rate` Uint64,
                `summary` Utf8,
                `owner` Utf8,
                `is_muted` Uint32,
                `is_test_chunk` Uint32,
                `state` Utf8,
                `previous_state` Utf8,
                `state_change_date` Date,
                `days_in_state` Uint64,
                `previous_state_filtered` Utf8,
                `state_change_date_filtered` Date,
                `days_in_state_filtered` Uint64,
                `state_filtered` Utf8,
                PRIMARY KEY (`test_name`, `suite_folder`, `full_name`, `date_window`, `build_type`, `branch`)
            )
            PARTITION BY HASH(`build_type`, `branch`)
            WITH (STORE = COLUMN)
            """
        )

    return pool.retry_operation_sync(callee)

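# Note: the column set in bulk_upsert below mirrors the CREATE TABLE schema above;
# keep the two in sync when adding or removing fields.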
def bulk_upsert(table_client, table_path, rows):
    print(f"> bulk upsert: {table_path}")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("history", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("success_rate", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("summary", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("owner", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("is_muted", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("is_test_chunk", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("state", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("previous_state", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("state_change_date", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_in_state", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("previous_state_filtered", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("state_change_date_filtered", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_in_state_filtered", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("state_filtered", ydb.OptionalType(ydb.PrimitiveType.Utf8))
    )
    table_client.bulk_upsert(table_path, rows, column_types)

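# Each test (grouped by full_name) is processed independently of the others,
# which is what allows the work to be fanned out across processes in
# concurrent mode (see main below).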
def process_test_group(name, group, last_day_data, default_start_date):
    """Processes data for a single test group (by full_name)."""
    state_list_for_filter = ['Muted', 'Muted Flaky', 'Muted Stable', 'Flaky', 'Passed']
    previous_state_list = []
    state_change_date_list = []
    days_in_state_list = []
    previous_state_filtered_list = []
    state_change_date_filtered_list = []
    days_in_state_filtered_list = []
    state_filtered_list = []

    # Seed the running state from the last existing day for the current test
    test_data = last_day_data[last_day_data['full_name'] == name] if last_day_data is not None else None
    if test_data is not None and not test_data.empty:
        prev_state = test_data['state'].iloc[0]
        prev_date = test_data['state_change_date'].iloc[0]
        current_days_in_state = test_data['days_in_state'].iloc[0]
        prev_state_filtered = test_data['state_filtered'].iloc[0]
        prev_date_filtered = test_data['state_change_date_filtered'].iloc[0]
        current_days_in_state_filtered = test_data['days_in_state_filtered'].iloc[0]
        saved_prev_state = test_data['previous_state'].iloc[0]
        saved_prev_state_filtered = test_data['previous_state_filtered'].iloc[0]
    else:
        prev_state = 'no_runs'
        prev_date = datetime.datetime(default_start_date.year, default_start_date.month, default_start_date.day)
        current_days_in_state = 0
        prev_state_filtered = 'no_runs'
        prev_date_filtered = datetime.datetime(
            default_start_date.year, default_start_date.month, default_start_date.day
        )
        current_days_in_state_filtered = 0
        saved_prev_state = prev_state
        saved_prev_state_filtered = prev_state_filtered

    for index, row in group.iterrows():
        current_days_in_state += 1
        if row['state'] != prev_state:
            saved_prev_state = prev_state
            prev_state = row['state']
            prev_date = row['date_window']
            current_days_in_state = 1
        previous_state_list.append(saved_prev_state)
        state_change_date_list.append(prev_date)
        days_in_state_list.append(current_days_in_state)

        # Process filtered states: days whose state is not in state_list_for_filter
        # carry the previous filtered state forward instead.
        if row['state'] not in state_list_for_filter:
            state_filtered = prev_state_filtered
        else:
            state_filtered = row['state']
        current_days_in_state_filtered += 1
        if state_filtered != prev_state_filtered:
            saved_prev_state_filtered = prev_state_filtered
            prev_state_filtered = state_filtered
            prev_date_filtered = row['date_window']
            current_days_in_state_filtered = 1
        state_filtered_list.append(state_filtered)
        previous_state_filtered_list.append(saved_prev_state_filtered)
        state_change_date_filtered_list.append(prev_date_filtered)
        days_in_state_filtered_list.append(current_days_in_state_filtered)

    return {
        'previous_state': previous_state_list,
        'state_change_date': state_change_date_list,
        'days_in_state': days_in_state_list,
        'previous_state_filtered': previous_state_filtered_list,
        'state_change_date_filtered': state_change_date_filtered_list,
        'days_in_state_filtered': days_in_state_filtered_list,
        'state_filtered': state_filtered_list,
    }

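# State classification: a test's state is derived from the aggregated
# history_class string ('mute'/'pass'/'failure'/'skipped') combined with the
# current is_muted flag.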
def determine_state(row):
    """Maps a row's history_class and mute flag to a monitor state."""
    history_class = row['history_class']
    is_muted = row['is_muted']
    if is_muted == 1:
        if 'mute' in history_class:
            return 'Muted Flaky'
        elif 'pass' in history_class:
            return 'Muted Stable'
        elif 'skipped' in history_class or not history_class:
            return 'Skipped'
        else:
            return history_class
    else:
        if 'failure' in history_class and 'mute' not in history_class:
            return 'Flaky'
        elif 'mute' in history_class:
            return 'Muted'
        elif 'skipped' in history_class or not history_class:
            return 'Skipped'
        elif 'pass' in history_class:
            return 'Passed'
        else:
            return history_class

def calculate_success_rate(row):
    total_count = row['pass_count'] + row['mute_count'] + row['fail_count']
    if total_count == 0:
        return 0.0
    return (row['pass_count'] / total_count) * 100


def calculate_summary(row):
    return f"Pass:{row['pass_count']} Fail:{row['fail_count']} Mute:{row['mute_count']} Skip:{row['skip_count']}"

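# Owner strings may carry a team and an individual separated by ';;'
# (illustrative example: "TEAM:some-team;;username"); the TEAM entry wins
# when present.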
def compute_owner(owner):
    if not owner:
        return 'Unknown'
    elif ';;' in owner:
        parts = owner.split(';;', 1)
        if 'TEAM' in parts[0]:
            return parts[0]
        else:
            return parts[1]
    else:
        return owner

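# main() pipeline: read the last collected day from the monitor table, pull the
# aggregated flaky_tests_window history for every missing day, recompute
# per-test state transitions, and bulk-upsert the result back into YDB.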
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--days-window', default=1, type=int, help='how many days back to collect history')
    parser.add_argument(
        '--build_type',
        choices=['relwithdebinfo', 'release-asan'],
        default='relwithdebinfo',
        type=str,
        help='build type: relwithdebinfo or release-asan',
    )
    parser.add_argument('--branch', default='main', choices=['main'], type=str, help='branch')
    parser.add_argument(
        '--concurrent',
        dest='concurrent_mode',
        action='store_true',
        default=True,
        help='Set concurrent mode to true (default).',
    )
    parser.add_argument(
        '--no-concurrent', dest='concurrent_mode', action='store_false', help='Set concurrent mode to false.'
    )
    args, unknown = parser.parse_known_args()
    history_for_n_day = args.days_window
    build_type = args.build_type
    branch = args.branch
    concurrent_mode = args.concurrent_mode

    if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
        print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
        return 1
    else:
        os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
            "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
        ]
    with ydb.Driver(
        endpoint=DATABASE_ENDPOINT,
        database=DATABASE_PATH,
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        driver.wait(timeout=10, fail_fast=True)
        # Native dates are disabled, so Date columns come back as integer days since epoch.
        tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=False)
        table_client = ydb.TableClient(driver, tc_settings)
        base_date = datetime.datetime(1970, 1, 1)
        default_start_date = datetime.date(2024, 8, 1)
        today = datetime.date.today()
        table_path = 'test_results/analytics/tests_monitor_test_with_filtered_states'

        # Get last existing day
        print("Getting date of last collected monitor data")
        query_last_exist_day = f"""
            SELECT MAX(date_window) AS last_exist_day
            FROM `{table_path}`
            WHERE build_type = '{build_type}'
                AND branch = '{branch}'
        """
        query = ydb.ScanQuery(query_last_exist_day, {})
        it = driver.table_client.scan_query(query)
        last_exist_day = None
        while True:
            try:
                result = next(it)
                last_exist_day = result.result_set.rows[0]['last_exist_day']
                break
            except StopIteration:
                break
            except Exception as e:
                print(f"Error during fetching last existing day: {e}")
                break

        last_exist_df = None
        last_day_data = None

        # If no data exists, set last_exist_day to a default start date
        if last_exist_day is None:
            last_exist_day = default_start_date
            last_exist_day_str = last_exist_day.strftime('%Y-%m-%d')
            date_list = [today - datetime.timedelta(days=x) for x in range((today - last_exist_day).days + 1)]
            print(f"Monitor data does not exist - init new monitor collecting from default date {last_exist_day_str}")
        else:
            # Get data from tests_monitor for the last existing day
            # (last_exist_day is an integer day offset; see tc_settings above)
            last_exist_day = (base_date + datetime.timedelta(days=last_exist_day)).date()
            if last_exist_day == today:  # recalculate data for today
                last_exist_day = last_exist_day - datetime.timedelta(days=1)
            last_exist_day_str = last_exist_day.strftime('%Y-%m-%d')
            print(f"Monitor data exists - getting data for date {last_exist_day_str}")
            date_list = [today - datetime.timedelta(days=x) for x in range((today - last_exist_day).days)]
            query_last_exist_data = f"""
                SELECT *
                FROM `{table_path}`
                WHERE build_type = '{build_type}'
                    AND branch = '{branch}'
                    AND date_window = Date('{last_exist_day_str}')
            """
            query = ydb.ScanQuery(query_last_exist_data, {})
            it = driver.table_client.scan_query(query)
            last_exist_data = []
            while True:
                try:
                    result = next(it)
                    # Convert each row to a dictionary with consistent keys
                    for row in result.result_set.rows:
                        row_dict = {
                            'test_name': row['test_name'],
                            'suite_folder': row['suite_folder'],
                            'full_name': row['full_name'],
                            'date_window': base_date + datetime.timedelta(days=row['date_window']),
                            'build_type': row['build_type'],
                            'branch': row['branch'],
                            'days_ago_window': row['days_ago_window'],
                            'history': row['history'],
                            'history_class': row['history_class'],
                            'pass_count': row['pass_count'],
                            'mute_count': row['mute_count'],
                            'fail_count': row['fail_count'],
                            'skip_count': row['skip_count'],
                            'success_rate': row['success_rate'],
                            'summary': row['summary'],
                            'owners': row['owner'],
                            'is_muted': row['is_muted'],
                            'is_test_chunk': row['is_test_chunk'],
                            'state': row['state'],
                            'previous_state': row['previous_state'],
                            'state_change_date': base_date + datetime.timedelta(days=row['state_change_date']),
                            'days_in_state': row['days_in_state'],
                            'previous_state_filtered': row['previous_state_filtered'],
                            'state_change_date_filtered': base_date
                            + datetime.timedelta(days=row['state_change_date_filtered']),
                            'days_in_state_filtered': row['days_in_state_filtered'],
                            'state_filtered': row['state_filtered'],
                        }
                        last_exist_data.append(row_dict)
                except StopIteration:
                    break
            last_exist_df = pd.DataFrame(last_exist_data)
        # Get data from the flaky_tests_window table for dates after the last existing day
        data = {
            'test_name': [],
            'suite_folder': [],
            'full_name': [],
            'date_window': [],
            'build_type': [],
            'branch': [],
            'owners': [],
            'days_ago_window': [],
            'history': [],
            'history_class': [],
            'pass_count': [],
            'mute_count': [],
            'fail_count': [],
            'skip_count': [],
            'is_muted': [],
        }
        print(f'Getting aggregated history in window {history_for_n_day} days')
        for date in sorted(date_list):
            # Query data from flaky_tests_window with date_window >= last existing day
            query_get_history = f"""
                SELECT
                    hist.branch AS branch,
                    hist.build_type AS build_type,
                    hist.date_window AS date_window,
                    hist.days_ago_window AS days_ago_window,
                    hist.fail_count AS fail_count,
                    hist.full_name AS full_name,
                    hist.history AS history,
                    hist.history_class AS history_class,
                    hist.mute_count AS mute_count,
                    COALESCE(owners_t.owners, fallback_t.owners) AS owners,
                    hist.pass_count AS pass_count,
                    COALESCE(owners_t.run_timestamp_last, NULL) AS run_timestamp_last,
                    COALESCE(owners_t.is_muted, NULL) AS is_muted,
                    hist.skip_count AS skip_count,
                    hist.suite_folder AS suite_folder,
                    hist.test_name AS test_name
                FROM (
                    SELECT * FROM
                    `test_results/analytics/flaky_tests_window_{history_for_n_day}_days`
                    WHERE
                        date_window = Date('{date}')
                        AND build_type = '{build_type}'
                        AND branch = '{branch}'
                ) AS hist
                LEFT JOIN (
                    SELECT
                        test_name,
                        suite_folder,
                        owners,
                        run_timestamp_last,
                        is_muted,
                        date
                    FROM
                        `test_results/all_tests_with_owner_and_mute`
                    WHERE
                        branch = '{branch}'
                        AND date = Date('{date}')
                ) AS owners_t
                ON
                    hist.test_name = owners_t.test_name
                    AND hist.suite_folder = owners_t.suite_folder
                    AND hist.date_window = owners_t.date
                LEFT JOIN (
                    SELECT
                        test_name,
                        suite_folder,
                        owners
                    FROM
                        `test_results/analytics/testowners`
                ) AS fallback_t
                ON
                    hist.test_name = fallback_t.test_name
                    AND hist.suite_folder = fallback_t.suite_folder
                WHERE
                    owners_t.test_name IS NOT NULL OR fallback_t.test_name IS NOT NULL;
            """
            query = ydb.ScanQuery(query_get_history, {})
            start_time = time.time()
            it = driver.table_client.scan_query(query)
            results = []
            while True:
                try:
                    result = next(it)
                    results.extend(result.result_set.rows)
                except StopIteration:
                    break
            end_time = time.time()
            print(f'Captured raw data for {date} duration: {end_time - start_time}')
            start_time = time.time()
            # Check if new data was found
            if results:
                for row in results:
                    data['test_name'].append(row['test_name'])
                    data['suite_folder'].append(row['suite_folder'])
                    data['full_name'].append(row['full_name'])
                    data['date_window'].append(base_date + datetime.timedelta(days=row['date_window']))
                    data['build_type'].append(row['build_type'])
                    data['branch'].append(row['branch'])
                    data['owners'].append(row['owners'])
                    data['days_ago_window'].append(row['days_ago_window'])
                    data['history'].append(
                        row['history'].decode('utf-8') if isinstance(row['history'], bytes) else row['history']
                    )
                    data['history_class'].append(
                        row['history_class'].decode('utf-8')
                        if isinstance(row['history_class'], bytes)
                        else row['history_class']
                    )
                    data['pass_count'].append(row['pass_count'])
                    data['mute_count'].append(row['mute_count'])
                    data['fail_count'].append(row['fail_count'])
                    data['skip_count'].append(row['skip_count'])
                    data['is_muted'].append(row['is_muted'])
            else:
                print(
                    f"Warning: No data found in flaky_tests_window for date {date}, build_type='{build_type}', branch='{branch}'"
                )

        start_time = time.time()
        df = pd.DataFrame(data)
        # Keep only the state columns of the last existing day for seeding
        if last_exist_df is not None and last_exist_df.shape[0] > 0:
            last_day_data = last_exist_df[
                [
                    'full_name',
                    'days_in_state',
                    'state',
                    'previous_state',
                    'state_change_date',
                    'days_in_state_filtered',
                    'state_change_date_filtered',
                    'previous_state_filtered',
                    'state_filtered',
                ]
            ]

        end_time = time.time()
        print(f'Dataframe initialized: {end_time - start_time}')
        start_time = time.time()
        df = df.sort_values(by=['full_name', 'date_window'])
        end_time = time.time()
        print(f'Dataframe sorted: {end_time - start_time}')
        start_time = time.time()
        df['success_rate'] = df.apply(calculate_success_rate, axis=1).astype(int)
        df['summary'] = df.apply(calculate_summary, axis=1)
        df['owner'] = df['owners'].apply(compute_owner)
        df['is_test_chunk'] = df['full_name'].str.contains(r'chunk chunk|chunk\+chunk', regex=True).astype(int)
        df['is_muted'] = df['is_muted'].fillna(0).astype(int)
        df['state'] = df.apply(determine_state, axis=1)
        end_time = time.time()
        print(f'Computed base params: {end_time - start_time}')
        start_time = time.time()
        if concurrent_mode:
            with Pool(processes=cpu_count()) as pool:
                results = pool.starmap(
                    process_test_group,
                    [(name, group, last_day_data, default_start_date) for name, group in df.groupby('full_name')],
                )
            end_time = time.time()
            print(
                f'Computed days_in_state, state_change_date, previous_state and other params: {end_time - start_time}'
            )
            start_time = time.time()
            # Apply results to the DataFrame
            for i, (name, group) in enumerate(df.groupby('full_name')):
                df.loc[group.index, 'previous_state'] = results[i]['previous_state']
                df.loc[group.index, 'state_change_date'] = results[i]['state_change_date']
                df.loc[group.index, 'days_in_state'] = results[i]['days_in_state']
                df.loc[group.index, 'previous_state_filtered'] = results[i]['previous_state_filtered']
                df.loc[group.index, 'state_change_date_filtered'] = results[i]['state_change_date_filtered']
                df.loc[group.index, 'days_in_state_filtered'] = results[i]['days_in_state_filtered']
                df.loc[group.index, 'state_filtered'] = results[i]['state_filtered']
        else:
            previous_state_list = []
            state_change_date_list = []
            days_in_state_list = []
            previous_state_filtered_list = []
            state_change_date_filtered_list = []
            days_in_state_filtered_list = []
            state_filtered_list = []
            for name, group in df.groupby('full_name'):
                result = process_test_group(name, group, last_day_data, default_start_date)
                previous_state_list += result['previous_state']
                state_change_date_list += result['state_change_date']
                days_in_state_list += result['days_in_state']
                previous_state_filtered_list += result['previous_state_filtered']
                state_change_date_filtered_list += result['state_change_date_filtered']
                days_in_state_filtered_list += result['days_in_state_filtered']
                state_filtered_list += result['state_filtered']
            end_time = time.time()
            print(
                f'Computed days_in_state, state_change_date, previous_state and other params: {end_time - start_time}'
            )
            start_time = time.time()
            # Apply results to the DataFrame: df is sorted by full_name, so the
            # concatenated per-group lists line up with the row order.
            df['previous_state'] = previous_state_list
            df['state_change_date'] = state_change_date_list
            df['days_in_state'] = days_in_state_list
            df['previous_state_filtered'] = previous_state_filtered_list
            df['state_change_date_filtered'] = state_change_date_filtered_list
            df['days_in_state_filtered'] = days_in_state_filtered_list
            df['state_filtered'] = state_filtered_list
        end_time = time.time()
        print(f'Saving computed result in dataframe: {end_time - start_time}')
        start_time = time.time()
        df['state_change_date'] = df['state_change_date'].dt.date
        df['date_window'] = df['date_window'].dt.date
        df['days_in_state'] = df['days_in_state'].astype(int)
        df['state_change_date_filtered'] = df['state_change_date_filtered'].dt.date
        df['days_in_state_filtered'] = df['days_in_state_filtered'].astype(int)
        end_time = time.time()
        print(f'Converting types of columns: {end_time - start_time}')
        start_time = time.time()
        result = df[
            [
                'full_name',
                'date_window',
                'suite_folder',
                'test_name',
                'days_ago_window',
                'build_type',
                'branch',
                'history',
                'history_class',
                'pass_count',
                'mute_count',
                'fail_count',
                'skip_count',
                'summary',
                'owner',
                'is_test_chunk',
                'is_muted',
                'state',
                'previous_state',
                'state_change_date',
                'days_in_state',
                'previous_state_filtered',
                'state_change_date_filtered',
                'days_in_state_filtered',
                'state_filtered',
                'success_rate',
            ]
        ]
        end_time = time.time()
        print(f'Dataframe prepared: {end_time - start_time}')
        print(f'Data collected, {len(result)} rows')
        start_time = time.time()
        prepared_for_update_rows = result.to_dict('records')
        end_time = time.time()
        print(f'Data converted to dict for upsert: {end_time - start_time}')
        start_upsert_time = time.time()
        with ydb.SessionPool(driver) as pool:
            create_tables(pool, table_path)
            full_path = posixpath.join(DATABASE_PATH, table_path)

            def chunk_data(data, chunk_size):
                for i in range(0, len(data), chunk_size):
                    yield data[i : i + chunk_size]

            chunk_size = 40000
            total_chunks = len(prepared_for_update_rows) // chunk_size + (
                1 if len(prepared_for_update_rows) % chunk_size != 0 else 0
            )
            for i, chunk in enumerate(chunk_data(prepared_for_update_rows, chunk_size), start=1):
                start_time = time.time()
                print(f"Uploading chunk {i}/{total_chunks}")
                bulk_upsert(driver.table_client, full_path, chunk)
                end_time = time.time()
                print(f'Chunk upserted in: {end_time - start_time}')
            end_time = time.time()
            print(f'Monitor data upserted in: {end_time - start_upsert_time}')


if __name__ == "__main__":
    main()