Browse Source

Fix bulk upsert rewrite results asan/relwithdevinfo in analytics (#9521)

Kirill Rysin 5 months ago

+ 109 - 92

@@ -32,6 +32,9 @@ def create_tables(pool,  table_path):
                 `date_window` Date NOT NULL,
                 `build_type` Utf8 NOT NULL,
                 `branch` Utf8 NOT NULL,
+                `first_run` Timestamp,
+                `last_run` Timestamp ,
+                `owners` Utf8 ,
                 `days_ago_window` Uint64 NOT NULL,
                 `history` String,
                 `history_class` String,
@@ -39,9 +42,9 @@ def create_tables(pool,  table_path):
                 `mute_count` Uint64,
                 `fail_count` Uint64,
                 `skip_count` Uint64,
-                PRIMARY KEY (`test_name`, `suite_folder`, `full_name`,date_window)
+                PRIMARY KEY (`test_name`, `suite_folder`, `full_name`,date_window, build_type, branch)
-                PARTITION BY HASH(`full_name`)
+                PARTITION BY HASH(`full_name`,build_type,branch)
                 WITH (STORE = COLUMN)
@@ -56,6 +59,8 @@ def bulk_upsert(table_client, table_path, rows):
         .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
         .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
         .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
+        .add_column("first_run", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
+        .add_column("last_run", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
         .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
         .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
         .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
@@ -71,7 +76,8 @@ def bulk_upsert(table_client, table_path, rows):
 def main():
     parser = argparse.ArgumentParser()
-    parser.add_argument('--days-window', default=5, type=int, help='how many days back we collecting history')
+    parser.add_argument('--days-window', default=1, type=int, help='how many days back we collecting history')
     parser.add_argument('--build_type',choices=['relwithdebinfo', 'release-asan'], default='relwithdebinfo', type=str, help='build : relwithdebinfo or release-asan')
     parser.add_argument('--branch', default='main',choices=['main'], type=str, help='branch')
@@ -108,14 +114,15 @@ def main():
         tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
         table_client = ydb.TableClient(driver, tc_settings)
-        table_path = f'test_results/analytics/flaky_tests_history_{history_for_n_day}_days'
-        default_start_date =, 7, 1)
+        table_path = f'test_results/analytics/flaky_tests_window_{history_for_n_day}_days'
+        default_start_date =, 9, 1)
         with ydb.SessionPool(driver) as pool:
             create_tables(pool, table_path)
         # geting last date from history
-        last_date_query = f"select max(date_window) as max_date_window from `{table_path}`"
+        last_date_query = f"""select max(date_window) as max_date_window from `{table_path}`
+            where build_type = '{build_type}' and branch = '{branch}'"""
         query = ydb.ScanQuery(last_date_query, {})
         it = table_client.scan_query(query)
         results = []
@@ -126,119 +133,129 @@ def main():
             except StopIteration:
-        if results[0] and results[0].get( 'max_date_window', default_start_date) is not None:
-            last_date = results[0].get(
-                'max_date_window', default_start_date).strftime('%Y-%m-%d')
+        if results[0] and results[0].get( 'max_date_window', default_start_date) is not None and results[0].get( 'max_date_window', default_start_date) > default_start_date:
+            last_datetime = results[0].get(
+                'max_date_window', default_start_date)
-            last_date = default_start_date.strftime('%Y-%m-%d')
+            last_datetime = default_start_date
+        last_date = last_datetime.strftime('%Y-%m-%d')
         print(f'last hisotry date: {last_date}')
         # getting history for dates >= last_date
-        query_get_history = f"""
-    select
-        full_name,
-        date_base,
-        history_list,
-        dist_hist,
-        suite_folder,
-        test_name,
-        '{build_type}' as  build_type,
-        '{branch}' as  branch
-    from (
+        today =
+        date_list = [today - datetime.timedelta(days=x) for x in range((today - last_datetime).days+1)]
+        for date in sorted(date_list):
+            query_get_history = f"""
-            AGG_LIST(status) as history_list ,
-            String::JoinFromList( AGG_LIST_DISTINCT(status) ,',') as dist_hist,
+            history_list,
+            if(dist_hist = '','no_runs',dist_hist) as dist_hist,
-            test_name
+            test_name,
+            build_type,
+            branch,
+            owners,
+            first_run,
+            last_run
         from (
-            select * from (
+            select
+                full_name,
+                date_base,
+                AGG_LIST(status) as history_list ,
+                String::JoinFromList( ListSort(AGG_LIST_DISTINCT(status)) ,',') as dist_hist,
+                suite_folder,
+                test_name,
+                owners,
+                build_type,
+                branch,
+                min(run_timestamp) as first_run,
+                max(run_timestamp) as last_run
+            from (
                 select * from (
-                    select  DISTINCT
+                    select distinct
-                        test_name
+                        test_name,
+                        owners,
+                        Date('{date}') as date_base,
+                        '{build_type}' as  build_type,
+                        '{branch}' as  branch
                     from  `test_results/analytics/testowners` 
-                    where  run_timestamp_last >= Date('{last_date}') - 3*Interval("P1D") 
-                ) as all_tests
-                cross join (
-                    select 
-                        DISTINCT DateTime::MakeDate(run_timestamp) as date_base
-                    from  `test_results/test_runs_column`
-                    where
-                        (job_name ='Nightly-run' or job_name ='Postcommit_relwithdebinfo' or job_name ='Postcommit_asan')
-                        and run_timestamp>= Date('{last_date}')
-                    ) as date_list
                 ) as test_and_date
-            left JOIN (
-                select * from (
+                left JOIN (
                         suite_folder || '/' || test_name as full_name,
                     from  `test_results/test_runs_column`
-                        run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D") 
+                        run_timestamp <= Date('{date}') + Interval("P1D")
+                        and run_timestamp >= Date('{date}') - {history_for_n_day}*Interval("P1D") 
                         and (job_name ='Nightly-run' or job_name ='Postcommit_relwithdebinfo' or job_name ='Postcommit_asan')
                         and build_type = '{build_type}'
                         and branch = '{branch}'
                     order by full_name,run_timestamp desc
-                )
-            ) as hist
-            ON test_and_date.full_name=hist.full_name
-            where
-                hist.run_timestamp >= test_and_date.date_base -{history_for_n_day}*Interval("P1D") AND
-                hist.run_timestamp < test_and_date.date_base + Interval("P1D")
+                ) as hist
+                ON test_and_date.full_name=hist.full_name
+            )
+            GROUP BY full_name,suite_folder,test_name,date_base,build_type,branch,owners
-        GROUP BY full_name,suite_folder,test_name,date_base
-    )
-        """
-        query = ydb.ScanQuery(query_get_history, {})
-        # start transaction time
-        start_time = time.time()
-        it = driver.table_client.scan_query(query)
-        # end transaction time
+            """
+            query = ydb.ScanQuery(query_get_history, {})
+            # start transaction time
+            start_time = time.time()
+            it = driver.table_client.scan_query(query)
+            # end transaction time
-        results = []
-        prepared_for_update_rows = []
-        while True:
-            try:
-                result = next(it)
-                results = results + result.result_set.rows
-            except StopIteration:
-                break
-        end_time = time.time()
-        print(f'transaction duration: {end_time - start_time}')
-        print(f'history data captured, {len(results)} rows')
-        for row in results:
-            row['count'] = dict(zip(list(row['history_list']), [list(
-                row['history_list']).count(i) for i in list(row['history_list'])]))                
-            prepared_for_update_rows.append({
-                'suite_folder': row['suite_folder'],
-                'test_name': row['test_name'],
-                'full_name': row['full_name'],
-                'date_window': row['date_base'],
-                'days_ago_window': history_for_n_day,
-                'build_type': row['build_type'],
-                'branch': row['branch'],
-                'history': ','.join(row['history_list']).encode('utf8'),
-                'history_class': row['dist_hist'],
-                'pass_count': row['count'].get('passed', 0),
-                'mute_count': row['count'].get('mute', 0),
-                'fail_count': row['count'].get('failure', 0),
-                'skip_count': row['count'].get('skipped', 0),
-            })
-        print('upserting history')
-        with ydb.SessionPool(driver) as pool:
+            results = []
+            prepared_for_update_rows = []
+            while True:
+                try:
+                    result = next(it)
+                    results = results + result.result_set.rows
+                except StopIteration:
+                    break
+            end_time = time.time()
+            print(f'transaction duration: {end_time - start_time}')
-            create_tables(pool, table_path)
-            full_path = posixpath.join(DATABASE_PATH, table_path)
-            bulk_upsert(driver.table_client, full_path,
-                        prepared_for_update_rows)
+            print(f'history data captured, {len(results)} rows')
+            for row in results:
+                row['count'] = dict(zip(list(row['history_list']), [list(
+                    row['history_list']).count(i) for i in list(row['history_list'])]))   
+                prepared_for_update_rows.append({
+                    'suite_folder': row['suite_folder'],
+                    'test_name': row['test_name'],
+                    'full_name': row['full_name'],
+                    'date_window': row['date_base'],
+                    'days_ago_window': history_for_n_day,
+                    'build_type': row['build_type'],
+                    'branch': row['branch'],
+                    'first_run': row['first_run'],
+                    'last_run': row['last_run'],
+                    'history': ','.join(row['history_list']).encode('utf8'),
+                    'history_class': row['dist_hist'],
+                    'pass_count': row['count'].get('passed', 0),
+                    'mute_count': row['count'].get('mute', 0),
+                    'fail_count': row['count'].get('failure', 0),
+                    'skip_count': row['count'].get('skipped', 0),
+                })
+            print(f'upserting history for date {date}')
+            with ydb.SessionPool(driver) as pool:
+                create_tables(pool, table_path)
+                full_path = posixpath.join(DATABASE_PATH, table_path)
+                bulk_upsert(driver.table_client, full_path,
+                            prepared_for_update_rows)
         print('history updated')

+ 1 - 0

@@ -185,6 +185,7 @@ def main():
                                     '{branch}' as  branch
                                 from  `test_results/analytics/testowners` as t1
                                 where  run_timestamp_last >= Date('{date}') - 3*Interval("P1D") 
+                                and run_timestamp_last <= Date('{date}') + Interval("P1D")
                             ) as test_and_date
                         left JOIN (
                             select * from (

+ 0 - 4

@@ -29,10 +29,6 @@ jobs:
         python3 -m pip install ydb ydb[yc] codeowners
     - name: Collect testowners 
       run: python3 .github/scripts/analytics/
-    - name: Collect test history data with window 5 days relwithdebinfo for main
-      run: python3 .github/scripts/analytics/ --days-window=5
-    - name: Collect test history data with window 5 days release-asan for main
-      run: python3 .github/scripts/analytics/ --days-window=5 --build_type=release-asan
     - name: Collect test history data with window 1 days relwithdebinfo for main
       run: python3 .github/scripts/analytics/ --days-window=1
     - name: Collect test history data with window 1 days release-asan for main