Browse Source

Performance and general cleanup of SQLite driver

Brian Aker 12 years ago
parent
commit
c0bca418e0

+ 1 - 0
gearmand/gearmand.cc

@@ -343,6 +343,7 @@ int main(int argc, char *argv[])
     return EXIT_FAILURE;
   }
 
+  assert(queue_type.size());
   if (queue_type.empty() == false)
   {
     gearmand_error_t rc;

+ 9 - 1
libgearman-server/gearmand.cc

@@ -142,13 +142,20 @@ static void gearman_server_free(gearman_server_st& server)
     delete worker;
   }
 
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "removing queue: %s", (server.queue_version == QUEUE_VERSION_CLASS) ? "CLASS" : "FUNCTION");
   if (server.queue_version == QUEUE_VERSION_CLASS)
   {
     delete server.queue.object;
+    assert(server.queue.functions == NULL);
   }
   else if (server.queue_version == QUEUE_VERSION_FUNCTION)
   {
     delete server.queue.functions;
+    assert(server.queue.object == NULL);
+  }
+  else
+  {
+    gearmand_debug("Unknown queue type in removal");
   }
 }
 
@@ -1116,8 +1123,9 @@ static bool gearman_server_create(gearman_server_st& server,
   server.free_client_list= NULL;
   server.free_worker_list= NULL;
 
-  server.queue_version= QUEUE_VERSION_FUNCTION;
+  server.queue_version= QUEUE_VERSION_NONE;
   server.queue.object= NULL;
+  server.queue.functions= NULL;
 
   memset(server.job_hash, 0,
          sizeof(gearman_server_job_st *) * GEARMAND_JOB_HASH_SIZE);

+ 1 - 1
libgearman-server/io.cc

@@ -639,7 +639,7 @@ gearmand_error_t gearman_io_recv(gearman_server_con_st *con, bool recv_data)
         }
         else if (ret != GEARMAN_IO_WAIT)
         {
-	  gearmand_gerror_warn("protocol failure, closing connection", ret);
+          gearmand_gerror_warn("protocol failure, closing connection", ret);
           _connection_close(connection);
           return ret;
         }

+ 54 - 0
libgearman-server/plugins/base.h

@@ -64,6 +64,24 @@ public:
 
   boost::program_options::options_description &command_line_options();
 
+  bool has_error()
+  {
+    return _error_string.size();
+  }
+
+  const std::string& error_string()
+  {
+    return _error_string;
+  }
+
+protected:
+  void reset_error()
+  {
+    _error_string.clear();
+  }
+
+  std::string _error_string;
+
 private:
   boost::program_options::options_description _command_line_options;
   std::string _name;
@@ -103,6 +121,24 @@ public:
                                      const void *data, size_t data_size,
                                      gearman_job_priority_t priority,
                                      int64_t when);
+
+  bool has_error()
+  {
+    return _error_string.size();
+  }
+
+  const std::string& error_string()
+  {
+    return _error_string;
+  }
+
+protected:
+  void reset_error()
+  {
+    _error_string.clear();
+  }
+
+  std::string _error_string;
 };
 
 } // namespace queue
@@ -135,6 +171,24 @@ public:
                         const void *data,
                         const size_t data_size,
                         gearmand_error_t& ret_ptr)= 0;
+
+  bool has_error()
+  {
+    return _error_string.size();
+  }
+
+  const std::string& error_string()
+  {
+    return _error_string;
+  }
+
+protected:
+  void reset_error()
+  {
+    _error_string.clear();
+  }
+
+  std::string _error_string;
 };
 
 } // namespace protocol

+ 1 - 1
libgearman-server/plugins/protocol/gear/protocol.cc

@@ -67,7 +67,7 @@ static gearmand_error_t gearmand_packet_unpack_header(gearmand_packet_st *packet
   }
   else
   {
-    gearmand_error("invalid magic value");
+    gearmand_warning("invalid magic value");
     return GEARMAN_INVALID_MAGIC;
   }
 

+ 281 - 248
libgearman-server/plugins/queue/sqlite/instance.cc

@@ -47,8 +47,12 @@ namespace queue {
 
 Instance::Instance(const std::string& schema_, const std::string& table_):
   _epoch_support(true),
+  _check_replay(false),
   _in_trans(0),
   _db(NULL),
+  delete_sth(NULL),
+  insert_sth(NULL),
+  replay_sth(NULL),
   _schema(schema_),
   _table(table_)
   { 
@@ -72,9 +76,22 @@ Instance::Instance(const std::string& schema_, const std::string& table_):
 
 Instance::~Instance()
 {
+  _sqlite3_finalize(delete_sth);
+  delete_sth= NULL;
+
+  _sqlite3_finalize(insert_sth);
+  insert_sth= NULL;
+
+  _sqlite3_finalize(replay_sth);
+  replay_sth= NULL;
+
+  assert(_db);
   if (_db)
   {
-    sqlite3_close(_db);
+    if (sqlite3_close(_db) != SQLITE_OK)
+    {
+      gearmand_error(sqlite3_errmsg(_db));
+    }
     _db= NULL;
     gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite shutdown database");
   }
@@ -82,93 +99,131 @@ Instance::~Instance()
   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite shutdown");
 }
 
-int Instance::_sqlite_query(const char *query, size_t query_size,
-                            sqlite3_stmt ** sth)
+void Instance::_sqlite3_finalize(sqlite3_stmt* sth)
+{
+  if (sth)
+  {
+    if (sqlite3_finalize(sth) != SQLITE_OK )
+    {
+      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "finalize error: %s", sqlite3_errmsg(_db));
+    }
+  }
+}
+
+bool Instance::_sqlite_prepare(const char *query, size_t query_size, sqlite3_stmt ** sth)
 {
+  reset_error();
   if (query_size > UINT32_MAX)
   {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "query size too big [%u]", (uint32_t)query_size);
-    return SQLITE_ERROR;
+    _error_string= "query size too big";
+    return false;
   }
 
   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite query: %s", query);
-  int ret= sqlite3_prepare(_db, query, (int)query_size, sth, NULL);
-  if (ret  != SQLITE_OK)
+  if (sqlite3_prepare_v2(_db, query, int(query_size), sth, NULL) != SQLITE_OK)
   {
-    if (*sth)
-    {
-      sqlite3_finalize(*sth);
-    }
-    *sth= NULL;
-    gearmand_log_error(AT, "sqlite_prepare:%s", sqlite3_errmsg(_db));
+    _error_string= sqlite3_errmsg(_db);
+    return false;
   }
 
-  return ret;
+  return true;
 }
 
-int Instance::_sqlite_lock()
+bool Instance::_sqlite_lock()
 {
   if (_in_trans)
   {
     /* already in transaction */
-    return SQLITE_OK;
+    return true;
   }
 
-  char* error= NULL;
-  sqlite3_exec(_db, "BEGIN TRANSACTION", NULL, NULL, &error);
-  if (error != NULL)
+  if (_sqlite_dispatch("BEGIN TRANSACTION") == false)
   {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                       "failed to begin transaction: %s",
-                       error);
-    return sqlite3_errcode(_db);
+    return false;
   }
 
   _in_trans++;
 
-  return SQLITE_OK;
+  return true;
+}
+
+static int sql_count(void * rows_, int argc, char **, char **)
+{
+  int *rows= (int*)rows_;
+  assert(argc == 1);
+  (void)argc;
+  assert(rows);
+  *rows= *rows +1;
+
+  return 0;
 }
 
-gearmand_error_t Instance::_sqlite_dispatch(const std::string& arg, bool send_error)
+bool Instance::_sqlite_dispatch(const char* arg)
 {
+  int count;
+  return _sqlite_count(arg, count);
+}
+
+bool Instance::_sqlite_count(const char* arg, int& count)
+{
+  reset_error();
+  count= 0;
+
   char* error= NULL;
-  sqlite3_exec(_db, arg.c_str(), NULL, NULL, &error);
+  int errcode= sqlite3_exec(_db, arg, sql_count, &count, &error);
+  if (error != NULL or errcode != SQLITE_OK)
+  {
+    assert(errcode != SQLITE_OK);
+    _error_string= error;
+    sqlite3_free(error);
+
+    return false;
+  }
+
+  return true;
+}
 
-  if (error != NULL)
+bool Instance::_sqlite_count(const std::string& arg, int& count)
+{
+  reset_error();
+  count= 0;
+
+  char* error= NULL;
+  int errcode= sqlite3_exec(_db, arg.c_str(), sql_count, &count, &error);
+  if (error != NULL or errcode != SQLITE_OK)
   {
-    if (send_error)
-    {
-      return gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                                "%s", error);
-    }
+    assert(errcode != SQLITE_OK);
+    _error_string= error;
+    sqlite3_free(error);
 
-    return GEARMAN_UNKNOWN_OPTION;
+    return false;
   }
 
-  return GEARMAN_SUCCESS;
+  return true;
+}
+
+bool Instance::_sqlite_dispatch(const std::string& arg)
+{
+  int count;
+  return _sqlite_count(arg, count);
 }
 
-int Instance::_sqlite_commit()
+bool Instance::_sqlite_commit()
 {
   if (_in_trans == 0)
   {
     /* not in transaction */
-    return SQLITE_OK;
+    return true;
   }
 
-  char* error= NULL;
-  sqlite3_exec(_db, "COMMIT", NULL, NULL, &error);
-
-  if (error != NULL)
+  if (_sqlite_dispatch("COMMIT") == false)
   {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                       "failed to commit transaction: %s", error);
-    return sqlite3_errcode(_db);
+    return false;
   }
 
   _in_trans= 0;
 
-  return SQLITE_OK;
+  return true;
 }
 
 gearmand_error_t Instance::init()
@@ -180,61 +235,57 @@ gearmand_error_t Instance::init()
     return gearmand_gerror("missing required --libsqlite3-db=<dbfile> argument", GEARMAN_QUEUE_ERROR);
   }
 
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite open: %s", _schema.c_str());
+
   assert(_db == NULL);
-  if (sqlite3_open(_schema.c_str(), &_db) != SQLITE_OK)
+  if (sqlite3_open_v2(_schema.c_str(), &_db, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK)
   {
-    _error_string= "sqlite3_open failed with: ";
-    _error_string+= sqlite3_errmsg(_db);
-    return gearmand_gerror(_error_string.c_str(), GEARMAN_QUEUE_ERROR);
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, "sqlite3_open failed with: %s", sqlite3_errmsg(_db));
   }
 
   if (_db == NULL)
   {
-    _error_string= "Unknown error while opening up sqlite file";
-    return gearmand_gerror(_error_string.c_str(), GEARMAN_QUEUE_ERROR);
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, "Unknown error while opening up sqlite file");
   }
 
-  sqlite3_stmt* sth;
-  if (_sqlite_query(gearman_literal_param("SELECT name FROM sqlite_master WHERE type='table'"), &sth) != SQLITE_OK)
-  {
-    _error_string= "Unknown error while calling SELECT on sqlite file ";
-    _error_string+= _schema;
-    _error_string+= " :";
-    _error_string+= sqlite3_errmsg(_db);
+  int rows;
+  std::string check_table_str("SELECT 1 FROM sqlite_master WHERE type='table' AND name='");
+  check_table_str+= _table;
+  check_table_str+= "'";
 
+  if (_sqlite_count(check_table_str, rows) == false)
+  {
     return gearmand_gerror(_error_string.c_str(), GEARMAN_QUEUE_ERROR);
   }
 
-  bool found= false;
-  while (sqlite3_step(sth) == SQLITE_ROW)
+  if (rows)
   {
-    char *table= NULL;
-
-    if (sqlite3_column_type(sth, 0) == SQLITE_TEXT)
-    {
-      table= (char*)sqlite3_column_text(sth, 0);
-    }
-    else
-    {
-      std::string error_string("Column `name` from sqlite_master is not type TEXT");
-      sqlite3_finalize(sth);
-      return gearmand_gerror(error_string.c_str(), GEARMAN_QUEUE_ERROR);
-    }
-
-    if (_table.compare(table) == 0)
+    std::string query("SELECT when_to_run FROM ");
+    query+= _table;
+    sqlite3_stmt *select_sth;
+    if (_sqlite_prepare(query.c_str(), _delete_query.size(), &select_sth) == false)
     {
-      gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "sqlite module using table '%s'", table);
-      found= true;
-      break;
+      gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                          "Error from '%s': %s",
+                          query.c_str(),
+                          _error_string.c_str());
+      query.clear();
+      query+= "ALTER TABLE ";
+      query+= _table;
+      query+= " ADD COLUMN when_to_run INTEGER";
+      if (_sqlite_dispatch(query) == false)
+      {
+        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
+                           "Error from '%s': %s",
+                           query.c_str(),
+                           _error_string.c_str());
+        gearmand_info("No epoch support in sqlite queue");
+        _epoch_support= false;
+      }
     }
+    _sqlite3_finalize(select_sth);
   }
-
-  if (sqlite3_finalize(sth) != SQLITE_OK)
-  {
-    return gearmand_gerror(sqlite3_errmsg(_db), GEARMAN_QUEUE_ERROR);
-  }
-
-  if (found == false)
+  else
   {
     std::string query("CREATE TABLE ");
     query+= _table;
@@ -242,51 +293,67 @@ gearmand_error_t Instance::init()
 
     gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "sqlite module creating table '%s'", _table.c_str());
 
-    gearmand_error_t ret= _sqlite_dispatch(query);
-    if (ret != GEARMAN_SUCCESS)
+    if (_sqlite_dispatch(query) == false)
     {
-      return ret;
+      return gearmand_gerror(_error_string.c_str(), GEARMAN_QUEUE_ERROR);
     }
   }
-  else
+
+  if (_sqlite_prepare(_delete_query.c_str(), _delete_query.size(), &delete_sth) == false)
   {
-    std::string query("SELECT when_to_run FROM ");
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "DELETE PREPARE error: %s",
+                               _error_string.c_str());
+  }
+
+  if (_sqlite_prepare(_insert_query.c_str(), _insert_query.size(), &insert_sth) == false)
+  {
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "INSERT PREPARE: %s",  _error_string.c_str());
+  }
+
+  {
+    std::string query;
+    if (_epoch_support)
+    {
+      query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM ";
+    }
+    else
+    {
+      query+= "SELECT unique_key,function_name,priority,data FROM ";
+    }
     query+= _table;
 
-    gearmand_error_t ret= _sqlite_dispatch(query);
-    if (ret != GEARMAN_SUCCESS)
+    if (_sqlite_prepare(query.c_str(), query.size(), &replay_sth) == false)
     {
-      gearmand_info("No epoch support in sqlite queue");
-      _epoch_support= false;
+      return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                                 "REPLAY PREPARE: %s", _error_string.c_str());
     }
   }
 
+
   return GEARMAN_SUCCESS;
 }
 
-int Instance::_sqlite_rollback()
+bool Instance::_sqlite_rollback()
 {
   if (_in_trans == 0)
   {
     /* not in transaction */
-    return SQLITE_OK;
+    return true;
   }
 
-  char* error= NULL;
-  sqlite3_exec(_db, "ROLLBACK", NULL, NULL, &error);
-
-  if (error != NULL)
+  if (_sqlite_dispatch("ROLLBACK") == false)
   {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                       "failed to commit transaction: %s", error);
-    return sqlite3_errcode(_db);
+    return false;
   }
+
   _in_trans= 0;
 
-  return SQLITE_OK;
+  return true;
 }
 
-gearmand_error_t Instance::add(gearman_server_st *server,
+gearmand_error_t Instance::add(gearman_server_st*,
                                   const char *unique, size_t unique_size,
                                   const char *function_name,
                                   size_t function_name_size,
@@ -294,156 +361,130 @@ gearmand_error_t Instance::add(gearman_server_st *server,
                                   gearman_job_priority_t priority,
                                   int64_t when)
 {
-  (void)server;
-  sqlite3_stmt* sth;
-
+  assert(_check_replay == false);
   if (when and _epoch_support == false)
   {
     return gearmand_gerror("Table lacks when_to_run field", GEARMAN_QUEUE_ERROR);
   }
 
-  if (unique_size > UINT32_MAX || function_name_size > UINT32_MAX ||
-      data_size > UINT32_MAX)
-  {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "size too big [%u]", (uint32_t)unique_size);
-    return GEARMAN_QUEUE_ERROR;
-  }
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite add: %.*s %.*s at %ld",
+                     uint32_t(unique_size), (char *)unique,
+                     uint32_t(function_name_size), (char *)function_name,
+                     (long int)when);
 
-  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite add: %.*s at %ld", (uint32_t)unique_size, (char *)unique, (long int)when);
-
-  if (_sqlite_lock() !=  SQLITE_OK)
+  if (_sqlite_lock() ==  false)
   {
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_gerror(_error_string.c_str(), GEARMAN_QUEUE_ERROR);
   }
 
-  if (_sqlite_query(insert_query().c_str(), insert_query().size(), &sth) != SQLITE_OK)
+  if (sqlite3_reset(insert_sth) != SQLITE_OK)
   {
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to reset INSERT prep statement: %s", sqlite3_errmsg(_db));
   }
 
-  if (sqlite3_bind_int(sth,  1, priority) != SQLITE_OK)
+  if (sqlite3_bind_int(insert_sth,  1, priority) != SQLITE_OK)
   {
-    _sqlite_rollback();
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to bind int [%d]: %s", priority, sqlite3_errmsg(_db));
-    sqlite3_finalize(sth);
-
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to bind priority [%d]: %s", priority, sqlite3_errmsg(_db));
   }
 
-  if (sqlite3_bind_text(sth, 2, (const char *)unique, (int)unique_size,
-                        SQLITE_TRANSIENT) != SQLITE_OK)
+  if (sqlite3_bind_text(insert_sth, 2, (const char *)unique, (int)unique_size, SQLITE_TRANSIENT) != SQLITE_OK)
   {
-    _sqlite_rollback();
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to bind text [%.*s]: %s", (uint32_t)unique_size, (char*)unique, sqlite3_errmsg(_db));
-    sqlite3_finalize(sth);
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to bind unique [%.*s]: %s", (uint32_t)unique_size, (char*)unique, sqlite3_errmsg(_db));
   }
 
-  if (sqlite3_bind_text(sth, 3, (const char *)function_name, (int)function_name_size,
-                        SQLITE_TRANSIENT) != SQLITE_OK)
+  if (sqlite3_bind_text(insert_sth, 3, (const char *)function_name, (int)function_name_size, SQLITE_TRANSIENT) != SQLITE_OK)
   {
-    _sqlite_rollback();
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to bind text [%.*s]: %s", (uint32_t)function_name_size, (char*)function_name, sqlite3_errmsg(_db));
-    sqlite3_finalize(sth);
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to bind function [%.*s]: %s", (uint32_t)function_name_size, (char*)function_name, sqlite3_errmsg(_db));
   }
 
-  if (sqlite3_bind_blob(sth, 4, data, (int)data_size,
-                        SQLITE_TRANSIENT) != SQLITE_OK)
+  if (sqlite3_bind_blob(insert_sth, 4, data, (int)data_size, SQLITE_TRANSIENT) != SQLITE_OK)
   {
-    _sqlite_rollback();
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to bind blob: %s", sqlite3_errmsg(_db));
-    sqlite3_finalize(sth);
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, 
+                               "failed to bind data blob: %s", sqlite3_errmsg(_db));
   }
 
   // epoch data
-  if (sqlite3_bind_int64(sth,  5, when) != SQLITE_OK)
+  if (sqlite3_bind_int64(insert_sth,  5, when) != SQLITE_OK)
   {
-    _sqlite_rollback();
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to bind int64_t(%ld): %s", (long int)when, sqlite3_errmsg(_db));
-    sqlite3_finalize(sth);
-    return GEARMAN_QUEUE_ERROR;
-  }
-
-  if (sqlite3_step(sth) != SQLITE_DONE)
-  {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "insert error: %s", sqlite3_errmsg(_db));
-    if (sqlite3_finalize(sth) != SQLITE_OK )
-    {
-      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "finalize error: %s", sqlite3_errmsg(_db));
-    }
-
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to bind epoch int64_t(%ld): %s", (long int)when, sqlite3_errmsg(_db));
   }
 
   gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
                      "sqlite data: priority: %d, unique_key: %s, function_name: %s",
                      priority, (char*)unique, (char*)function_name);
 
-  sqlite3_finalize(sth);
 
-  if (_sqlite_commit() !=  SQLITE_OK)
+  // INSERT happens here
+  if (sqlite3_step(insert_sth) != SQLITE_DONE)
   {
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "INSERT error: %s", sqlite3_errmsg(_db));
   }
 
   return GEARMAN_SUCCESS;
 }
 
-gearmand_error_t Instance::flush(gearman_server_st *server)
+gearmand_error_t Instance::flush(gearman_server_st*)
 {
-  (void)server;
   gearmand_debug("sqlite flush");
 
+  if (_sqlite_commit() == false)
+  {
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "COMMIT called on FLUSH error: %s",
+                               _error_string.c_str());
+  }
+
   return GEARMAN_SUCCESS;
 }
 
-gearmand_error_t Instance::done(gearman_server_st *server,
+gearmand_error_t Instance::done(gearman_server_st*,
                                    const char *unique,
                                    size_t unique_size,
                                    const char *function_name,
                                    size_t function_name_size)
 {
-  (void)server;
-  sqlite3_stmt* sth;
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite done: %.*s", uint32_t(unique_size), (char *)unique);
 
-  if (unique_size > UINT32_MAX)
+  if (_sqlite_lock() == false)
   {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                       "unique key size too big [%u]", (uint32_t)unique_size);
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_gerror(_error_string.c_str(), GEARMAN_QUEUE_ERROR);
   }
 
-  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite done: %.*s", (uint32_t)unique_size, (char *)unique);
-
-  if (_sqlite_lock() !=  SQLITE_OK)
+  if (sqlite3_reset(delete_sth) != SQLITE_OK)
   {
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to reset DELETE prep statement: %s", sqlite3_errmsg(_db));
   }
 
-  if (_sqlite_query(delete_query().c_str(), delete_query().size(), &sth) != SQLITE_OK)
+  if (sqlite3_bind_text(delete_sth, 1, (const char *)unique, int(unique_size), SQLITE_TRANSIENT) != SQLITE_OK)
   {
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to bind unique [%.*s]: %s", uint32_t(unique_size), (char*)unique, sqlite3_errmsg(_db));
   }
 
-  sqlite3_bind_text(sth, 1, (const char *)unique, (int)unique_size, SQLITE_TRANSIENT);
-  sqlite3_bind_text(sth, 2, (const char *)function_name, (int)function_name_size, SQLITE_TRANSIENT);
-
-  if (sqlite3_step(sth) != SQLITE_DONE)
+  if (sqlite3_bind_text(delete_sth, 2, (const char *)function_name, int(function_name_size), SQLITE_TRANSIENT) != SQLITE_OK)
   {
-    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                       "delete error: %s",
-                       sqlite3_errmsg(_db));
-    sqlite3_finalize(sth);
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to bind function [%.*s]: %s", uint32_t(function_name_size), (char*)function_name, sqlite3_errmsg(_db));
   }
 
-  sqlite3_finalize(sth);
+  // DELETE happens here
+  if (sqlite3_step(delete_sth) != SQLITE_DONE)
+  {
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "DELETE error: %s",
+                               sqlite3_errmsg(_db));
+  }
 
-  if (_sqlite_commit() !=  SQLITE_OK)
+  if (_sqlite_commit() == false)
   {
-    return GEARMAN_QUEUE_ERROR;
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, "DELETE error: %s", _error_string.c_str());
   }
 
   return GEARMAN_SUCCESS;
@@ -451,99 +492,87 @@ gearmand_error_t Instance::done(gearman_server_st *server,
 
 gearmand_error_t Instance::replay(gearman_server_st *server)
 {
-  gearmand_info("sqlite replay start");
+  gearmand_error_t ret;
+  _check_replay= true;
 
-  std::string query;
-  if (_epoch_support)
-  {
-    query+= "SELECT unique_key,function_name,priority,data,when_to_run FROM ";
-  }
-  else
+  if (gearmand_failed(ret= replay_loop(server)))
   {
-    query+= "SELECT unique_key,function_name,priority,data FROM ";
+    if (_sqlite_rollback() == false)
+    {
+      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "failed to rollback sqlite from failed replay  error: %s", _error_string.c_str());
+    }
   }
-  query+= _table;
+  _check_replay= false;
 
-  sqlite3_stmt* sth;
-  if (_sqlite_query(query.c_str(), query.size(), &sth) != SQLITE_OK)
-  {
-    return GEARMAN_QUEUE_ERROR;
-  }
+  return ret;
+}
 
-  while (sqlite3_step(sth) == SQLITE_ROW)
+gearmand_error_t Instance::replay_loop(gearman_server_st *server)
+{
+  gearmand_info("sqlite replay start");
+
+  gearmand_error_t gret;
+  while (sqlite3_step(replay_sth) == SQLITE_ROW)
   {
     const char *unique, *function_name;
     size_t unique_size, function_name_size;
 
-    if (sqlite3_column_type(sth,0) == SQLITE_TEXT)
+    if (sqlite3_column_type(replay_sth, 0) == SQLITE_TEXT)
     {
-      unique= (char *)sqlite3_column_text(sth,0);
-      unique_size= (size_t) sqlite3_column_bytes(sth,0);
+      unique= (char *)sqlite3_column_text(replay_sth, 0);
+      unique_size= size_t(sqlite3_column_bytes(replay_sth, 0));
     }
     else
     {
-      sqlite3_finalize(sth);
-      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                         "column %d is not type TEXT", 0);
-      return GEARMAN_QUEUE_ERROR;
+      return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, "column %d is not type TEXT: %d", 0, int(sqlite3_column_type(replay_sth, 0)));
     }
 
-    if (sqlite3_column_type(sth,1) == SQLITE_TEXT)
+    if (sqlite3_column_type(replay_sth, 1) == SQLITE_TEXT)
     {
-      function_name= (char *)sqlite3_column_text(sth,1);
-      function_name_size= (size_t)sqlite3_column_bytes(sth,1);
+      function_name= (char *)sqlite3_column_text(replay_sth, 1);
+      function_name_size= size_t(sqlite3_column_bytes(replay_sth, 1));
     }
     else
     {
-      sqlite3_finalize(sth);
-      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                         "column %d is not type TEXT", 1);
-      return GEARMAN_QUEUE_ERROR;
+      return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                                 "column %d is not type TEXT", 1);
     }
 
     gearman_job_priority_t priority;
-    if (sqlite3_column_type(sth,2) == SQLITE_INTEGER)
+    if (sqlite3_column_type(replay_sth, 2) == SQLITE_INTEGER)
     {
-      priority= (gearman_job_priority_t)sqlite3_column_int64(sth,2);
+      priority= (gearman_job_priority_t)sqlite3_column_int64(replay_sth, 2);
     }
     else
     {
-      sqlite3_finalize(sth);
-      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
-                         "column %d is not type INTEGER", 2);
-      return GEARMAN_QUEUE_ERROR;
+      return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                                 "column %d is not type INTEGER", 2);
     }
 
-    if (sqlite3_column_type(sth,3) != SQLITE_BLOB)
+    if (sqlite3_column_type(replay_sth, 3) != SQLITE_BLOB)
     {
-      sqlite3_finalize(sth);
-      gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "column %d is not type TEXT", 3);
-      return GEARMAN_QUEUE_ERROR;
+      return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, "column %d is not type TEXT", 3);
     }
 
-    size_t data_size= (size_t)sqlite3_column_bytes(sth,3);
+    size_t data_size= (size_t)sqlite3_column_bytes(replay_sth, 3);
     char* data= (char*)malloc(data_size);
     /* need to make a copy here ... gearman_server_job_free will free it later */
     if (data == NULL)
     {
-      sqlite3_finalize(sth);
-      gearmand_perror("malloc");
-      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
+      return gearmand_perror("malloc");
     }
-    memcpy(data, sqlite3_column_blob(sth,3), data_size);
+    memcpy(data, sqlite3_column_blob(replay_sth, 3), data_size);
     
     int64_t when;
     if (_epoch_support)
     {
-      if (sqlite3_column_type(sth, 4) == SQLITE_INTEGER)
+      if (sqlite3_column_type(replay_sth, 4) == SQLITE_INTEGER)
       {
-        when= (int64_t)sqlite3_column_int64(sth, 4);
+        when= int64_t(sqlite3_column_int64(replay_sth, 4));
       }
       else
       {
-        sqlite3_finalize(sth);
-        gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "column %d is not type INTEGER", 3);
-        return GEARMAN_QUEUE_ERROR;
+        return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR, "column %d is not type INTEGER", 3);
       }
     }
     else
@@ -551,24 +580,28 @@ gearmand_error_t Instance::replay(gearman_server_st *server)
       when= 0;
     }
 
-    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite replay: %s", (char*)function_name);
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "sqlite replay: %s %s", (char*)unique, (char*)function_name);
+
+    gret= Instance::replay_add(server,
+                               NULL,
+                               unique, unique_size,
+                               function_name, function_name_size,
+                               data, data_size,
+                               priority, when);
 
-    gearmand_error_t gret= Instance::replay_add(server,
-                                                NULL,
-                                                unique, unique_size,
-                                                function_name, function_name_size,
-                                                data, data_size,
-                                                priority, when);
     if (gearmand_failed(gret))
     {
-      sqlite3_finalize(sth);
-      return gret;
+      break;
     }
   }
 
-  sqlite3_finalize(sth);
+  if (sqlite3_reset(replay_sth) != SQLITE_OK)
+  {
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_QUEUE_ERROR,
+                               "failed to reset REPLAY prep statement: %s", sqlite3_errmsg(_db));
+  }
 
-  return GEARMAN_SUCCESS;
+  return gret;
 }
 
 } // namespace queue

+ 25 - 16
libgearman-server/plugins/queue/sqlite/instance.hpp

@@ -53,17 +53,6 @@ public:
 
   ~Instance();
 
-  const std::string &insert_query() const
-  {
-    return _insert_query;
-  }
-
-  const std::string &delete_query() const
-  {
-    return _delete_query;
-  }
-
-
   gearmand_error_t init();
 
   gearmand_error_t add(gearman_server_st *server,
@@ -81,17 +70,37 @@ public:
 
   gearmand_error_t replay(gearman_server_st *server);
 
+  bool has_error()
+  {
+    return _error_string.size();
+  }
+
 private:
-  gearmand_error_t _sqlite_dispatch(const std::string& arg, bool send_error= true);
-  int _sqlite_query(const char *query, size_t query_size, sqlite3_stmt ** sth);
-  int _sqlite_commit();
-  int _sqlite_rollback();
-  int _sqlite_lock();
+  gearmand_error_t replay_loop(gearman_server_st *server);
+
+  void reset_error()
+  {
+    _error_string.clear();
+  }
+
+  bool _sqlite_count(const std::string& arg, int& count);
+  bool _sqlite_dispatch(const std::string& arg);
+  bool _sqlite_dispatch(const char* arg);
+  bool _sqlite_count(const char* arg, int& count);
+  bool _sqlite_prepare(const char *query, size_t query_size, sqlite3_stmt ** sth);
+  bool _sqlite_commit();
+  bool _sqlite_rollback();
+  bool _sqlite_lock();
+  void _sqlite3_finalize(sqlite3_stmt*);
 
 private:
   bool _epoch_support;
+  bool _check_replay;
   int _in_trans;
   sqlite3 *_db;
+  sqlite3_stmt* delete_sth;
+  sqlite3_stmt* insert_sth;
+  sqlite3_stmt* replay_sth;
   std::string _error_string;
   std::string _schema;
   std::string _table;

+ 12 - 7
libgearman-server/queue.cc

@@ -123,6 +123,11 @@ void gearman_server_set_queue(gearman_server_st& server,
                               gearman_queue_done_fn *done,
                               gearman_queue_replay_fn *replay)
 {
+  delete server.queue.functions;
+  server.queue.functions= NULL;
+  delete server.queue.object;
+  server.queue.object= NULL;
+
   server.queue_version= QUEUE_VERSION_FUNCTION;
   server.queue.functions= new queue_st();
   if (server.queue.functions)
@@ -139,6 +144,10 @@ void gearman_server_set_queue(gearman_server_st& server,
 void gearman_server_set_queue(gearman_server_st& server,
                               gearmand::queue::Context* context)
 {
+  delete server.queue.functions;
+  server.queue.functions= NULL;
+  delete server.queue.object;
+  server.queue.object= NULL;
   assert(context);
   {
     server.queue_version= QUEUE_VERSION_CLASS;
@@ -192,10 +201,8 @@ gearmand_error_t initialize(gearmand_st *, std::string name)
       gearmand_error_t rc;
       if (gearmand_failed(rc= (*iter)->initialize()))
       {
-        std::string error_string("Failed to initialize ");
-        error_string+= name;
-
-        return gearmand_gerror(error_string.c_str(), rc);
+        return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, rc,
+                                   "Failed to initialize %s: %s", name.c_str(), (*iter)->error_string().c_str());
       }
 
       launched= true;
@@ -204,9 +211,7 @@ gearmand_error_t initialize(gearmand_st *, std::string name)
 
   if (launched == false)
   {
-    std::string error_string("Unknown queue ");
-    error_string+= name;
-    return gearmand_gerror(error_string.c_str(), GEARMAN_UNKNOWN_OPTION);
+    return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAN_UNKNOWN_OPTION, "Unknown queue %s", name.c_str());
   }
 
   return GEARMAN_SUCCESS;

+ 3 - 2
libgearman-server/struct/server.h

@@ -57,6 +57,7 @@ struct queue_st {
 };
 
 enum queue_version_t {
+  QUEUE_VERSION_NONE,
   QUEUE_VERSION_FUNCTION,
   QUEUE_VERSION_CLASS
 };
@@ -65,7 +66,7 @@ enum queue_version_t {
 namespace gearmand { namespace queue { class Context; } }
 #endif
 
-union queue_un {
+struct Queue_st {
   struct queue_st* functions;
 #ifdef __cplusplus
   gearmand::queue::Context* object;
@@ -110,7 +111,7 @@ struct gearman_server_st
   gearman_server_client_st *free_client_list;
   gearman_server_worker_st *free_worker_list;
   enum queue_version_t queue_version;
-  union queue_un queue;
+  struct Queue_st queue;
   pthread_mutex_t proc_lock;
   pthread_cond_t proc_cond;
   pthread_t proc_id;

+ 5 - 2
libtest/client.cc

@@ -139,8 +139,11 @@ SimpleClient::~SimpleClient()
 
 void SimpleClient::close_socket()
 {
-  close(sock_fd);
-  sock_fd= INVALID_SOCKET;
+  if (sock_fd != INVALID_SOCKET)
+  {
+    close(sock_fd);
+    sock_fd= INVALID_SOCKET;
+  }
 }
 
 bool SimpleClient::instance_connect()

Some files were not shown because too many files changed in this diff