Browse Source

Rollback ML transaction on failure. (#15013)

vkalintiris 1 year ago
parent
commit
d631ee80ed
1 changed files with 40 additions and 18 deletions
  1. 40 18
      ml/ml.cc

+ 40 - 18
ml/ml.cc

@@ -490,12 +490,16 @@ ml_dimension_add_model(const uuid_t *metric_uuid, const ml_kmeans_t *km)
     }
 
     rc = execute_insert(res);
-    if (unlikely(rc != SQLITE_DONE))
+    if (unlikely(rc != SQLITE_DONE)) {
         error_report("Failed to store model, rc = %d", rc);
+        return rc;
+    }
 
     rc = sqlite3_reset(res);
-    if (unlikely(rc != SQLITE_OK))
+    if (unlikely(rc != SQLITE_OK)) {
         error_report("Failed to reset statement when storing model, rc = %d", rc);
+        return rc;
+    }
 
     return 0;
 
@@ -504,7 +508,7 @@ bind_fail:
     rc = sqlite3_reset(res);
     if (unlikely(rc != SQLITE_OK))
         error_report("Failed to reset statement to store model, rc = %d", rc);
-    return 1;
+    return rc;
 }
 
 static int
@@ -523,7 +527,7 @@ ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
         rc = prepare_statement(db, db_models_delete, &res);
         if (unlikely(rc != SQLITE_OK)) {
             error_report("Failed to prepare statement to delete models, rc = %d", rc);
-            return 1;
+            return rc;
         }
     }
 
@@ -536,12 +540,16 @@ ml_dimension_delete_models(const uuid_t *metric_uuid, time_t before)
         goto bind_fail;
 
     rc = execute_insert(res);
-    if (unlikely(rc != SQLITE_DONE))
+    if (unlikely(rc != SQLITE_DONE)) {
         error_report("Failed to delete models, rc = %d", rc);
+        return rc;
+    }
 
     rc = sqlite3_reset(res);
-    if (unlikely(rc != SQLITE_OK))
+    if (unlikely(rc != SQLITE_OK)) {
         error_report("Failed to reset statement when deleting models, rc = %d", rc);
+        return rc;
+    }
 
     return 0;
 
@@ -550,7 +558,7 @@ bind_fail:
     rc = sqlite3_reset(res);
     if (unlikely(rc != SQLITE_OK))
         error_report("Failed to reset statement to delete models, rc = %d", rc);
-    return 1;
+    return rc;
 }
 
 int ml_dimension_load_models(RRDDIM *rd) {
@@ -1370,23 +1378,37 @@ bool ml_dimension_is_anomalous(RRDDIM *rd, time_t curr_time, double value, bool
     return is_anomalous;
 }
 
-static int ml_flush_pending_models(ml_training_thread_t *training_thread) {
-    (void) db_execute(db, "BEGIN TRANSACTION;");
+static void ml_flush_pending_models(ml_training_thread_t *training_thread) {
+    int rc = db_execute(db, "BEGIN TRANSACTION;");
+    int op_no = 1;
 
-    for (const auto &pending_model: training_thread->pending_model_info) {
-        int rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans);
-        if (rc)
-            return rc;
+    if (!rc) {
+        op_no++;
 
-        rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
-        if (rc)
-            return rc;
+        for (const auto &pending_model: training_thread->pending_model_info) {
+            if (!rc)
+                rc = ml_dimension_add_model(&pending_model.metric_uuid, &pending_model.kmeans);
+
+            if (!rc)
+                rc = ml_dimension_delete_models(&pending_model.metric_uuid, pending_model.kmeans.before - (Cfg.num_models_to_use * Cfg.train_every));
+        }
     }
 
-    (void) db_execute(db, "COMMIT TRANSACTION;");
+    if (!rc) {
+        op_no++;
+        rc = db_execute(db, "COMMIT TRANSACTION;");
+    }
+
+    // try to rollback transaction if we got any failures
+    if (rc) {
+        error("Trying to rollback ML transaction because it failed with rc=%d, op_no=%d", rc, op_no);
+        op_no++;
+        rc = db_execute(db, "ROLLBACK;");
+        if (rc)
+            error("ML transaction rollback failed with rc=%d", rc);
+    }
 
     training_thread->pending_model_info.clear();
-    return 0;
 }
 
 static void *ml_train_main(void *arg) {