123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531 |
- /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
- *
- * Gearmand client and server library.
- *
- * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
- * Copyright (C) 2011 Oleksiy Krivoshey
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * The names of its contributors may not be used to endorse or
- * promote products derived from this software without specific prior
- * written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- #include <gear_config.h>
- #include <libgearman-server/common.h>
- #include <libgearman-server/byte.h>
- #include <libgearman-server/plugins/queue/mysql/queue.h>
- #include <libgearman-server/plugins/queue/base.h>
- #include <mysql.h>
- #if !defined(MARIADB_BASE_VERSION) && !defined(MARIADB_VERSION_ID) && \
- MYSQL_VERSION_ID >= 80001 && MYSQL_VERSION_ID != 80002
- typedef bool my_bool;
- #endif
- #include <errmsg.h>
- #include <cerrno>
- /**
- * Default values.
- */
- #define GEARMAND_QUEUE_MYSQL_DEFAULT_TABLE "gearman_queue"
- namespace gearmand { namespace plugins { namespace queue { class MySQL; } } }
- static gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::MySQL *queue);
- namespace gearmand
- {
- namespace plugins
- {
- namespace queue
- {
- class MySQL : public gearmand::plugins::Queue {
- public:
- MySQL();
- ~MySQL();
- gearmand_error_t initialize();
- gearmand_error_t prepareAddStatement();
- gearmand_error_t prepareDoneStatement();
- MYSQL *con;
- MYSQL_STMT *add_stmt;
- MYSQL_STMT *done_stmt;
- std::string mysql_host;
- std::string mysql_user;
- std::string mysql_password;
- std::string mysql_db;
- std::string mysql_table;
- in_port_t port() const
- {
- return _port;
- }
- private:
- in_port_t _port;
- };
- MySQL::MySQL() :
- Queue("MySQL"),
- con(NULL),
- add_stmt(NULL),
- done_stmt(NULL)
- {
- command_line_options().add_options()
- ("mysql-host", boost::program_options::value(&mysql_host)->default_value("localhost"), "MySQL host.")
- ("mysql-port", boost::program_options::value(&_port)->default_value(3306), "Port of server. (by default 3306)")
- ("mysql-user", boost::program_options::value(&mysql_user)->default_value(""), "MySQL user.")
- ("mysql-password", boost::program_options::value(&mysql_password)->default_value(""), "MySQL user password.")
- ("mysql-db", boost::program_options::value(&mysql_db)->default_value(""), "MySQL database.")
- ("mysql-table", boost::program_options::value(&mysql_table)->default_value(GEARMAND_QUEUE_MYSQL_DEFAULT_TABLE), "MySQL table name.");
- }
- MySQL::~MySQL()
- {
- if (add_stmt)
- {
- mysql_stmt_close(add_stmt);
- }
- if (con)
- {
- mysql_close(con);
- }
- }
- gearmand_error_t MySQL::initialize()
- {
- return _initialize(Gearmand()->server, this);
- }
- gearmand_error_t MySQL::prepareAddStatement()
- {
- char query_buffer[1024];
- if ((this->add_stmt= mysql_stmt_init(this->con)) == NULL)
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
- return GEARMAND_QUEUE_ERROR;
- }
- int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
- "INSERT INTO %s "
- "(unique_key, function_name, priority, data, when_to_run) "
- "VALUES(?, ?, ?, ?, ?)", this->mysql_table.c_str());
- if (mysql_stmt_prepare(this->add_stmt, query_buffer, query_buffer_length))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
- return GEARMAND_QUEUE_ERROR;
- }
- return GEARMAND_SUCCESS;
- }
- gearmand_error_t MySQL::prepareDoneStatement()
- {
- char query_buffer[1024];
- if ((this->done_stmt= mysql_stmt_init(this->con)) == NULL)
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_init failed: %s", mysql_error(this->con));
- return GEARMAND_QUEUE_ERROR;
- }
- int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
- "DELETE FROM %s "
- "WHERE unique_key=? "
- "AND function_name=?", this->mysql_table.c_str());
- if (mysql_stmt_prepare(this->done_stmt, query_buffer, query_buffer_length))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_prepare failed: %s", mysql_error(this->con));
- return GEARMAND_QUEUE_ERROR;
- }
- return GEARMAND_SUCCESS;
- }
- void initialize_mysql()
- {
- static MySQL local_instance;
- }
- } // namespace queue
- } // namespace plugin
- } // namespace gearmand
- /* Queue callback functions. */
- static gearmand_error_t _mysql_queue_add(gearman_server_st *server, void *context,
- const char *unique, size_t unique_size,
- const char *function_name,
- size_t function_name_size,
- const void *data, size_t data_size,
- gearman_job_priority_t priority,
- int64_t when);
- static gearmand_error_t _mysql_queue_flush(gearman_server_st *server, void *context);
- static gearmand_error_t _mysql_queue_done(gearman_server_st *server, void *context,
- const char *unique,
- size_t unique_size,
- const char *function_name,
- size_t function_name_size);
- static gearmand_error_t _mysql_queue_replay(gearman_server_st *server, void *context,
- gearman_queue_add_fn *add_fn,
- void *add_context);
- gearmand_error_t _initialize(gearman_server_st& server, gearmand::plugins::queue::MySQL *queue)
- {
- MYSQL_RES * result;
- my_bool my_true= true;
- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"Initializing MySQL module");
- gearman_server_set_queue(server, queue, _mysql_queue_add, _mysql_queue_flush, _mysql_queue_done, _mysql_queue_replay);
- queue->con= mysql_init(queue->con);
- mysql_options(queue->con, MYSQL_READ_DEFAULT_GROUP, "gearmand");
- mysql_options(queue->con, MYSQL_OPT_RECONNECT, &my_true);
- if (!mysql_real_connect(queue->con,
- queue->mysql_host.c_str(),
- queue->mysql_user.c_str(),
- queue->mysql_password.c_str(),
- queue->mysql_db.c_str(),
- queue->port(), NULL, 0))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "Failed to connect to database: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- if (!(result= mysql_list_tables(queue->con, queue->mysql_table.c_str())))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_list_tables failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- if (mysql_num_rows(result) == 0)
- {
- char query_buffer[1024];
- int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
- "CREATE TABLE %s"
- "("
- "unique_key VARCHAR(%d),"
- "function_name VARCHAR(255),"
- "priority INT,"
- "data LONGBLOB,"
- "when_to_run INT,"
- "unique key (unique_key, function_name)"
- ")",
- queue->mysql_table.c_str(), GEARMAN_UNIQUE_SIZE);
- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL module: creating table %s", queue->mysql_table.c_str());
- if (mysql_real_query(queue->con, query_buffer, query_buffer_length))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL module: create table failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- }
- mysql_free_result(result);
- if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR)
- {
- return GEARMAND_QUEUE_ERROR;
- }
- if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR)
- {
- return GEARMAND_QUEUE_ERROR;
- }
- return GEARMAND_SUCCESS;
- }
- /*
- * Static definitions
- */
- static gearmand_error_t _mysql_queue_add(gearman_server_st *, void *context,
- const char *unique, size_t unique_size,
- const char *function_name,
- size_t function_name_size,
- const void *data, size_t data_size,
- gearman_job_priority_t priority,
- int64_t when)
- {
- MYSQL_BIND bind[5];
- gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue add: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
- (uint32_t) function_name_size, (char *) function_name);
- mysql_ping(queue->con);
- bind[0].buffer_type= MYSQL_TYPE_STRING;
- bind[0].buffer= (char *)unique;
- bind[0].buffer_length= unique_size;
- bind[0].is_null= 0;
- bind[0].length= (unsigned long*)&unique_size;
- bind[1].buffer_type= MYSQL_TYPE_STRING;
- bind[1].buffer= (char *)function_name;
- bind[1].buffer_length= function_name_size;
- bind[1].is_null= 0;
- bind[1].length= (unsigned long*)&function_name_size;
- bind[2].buffer_type= MYSQL_TYPE_LONG;
- bind[2].buffer= (char *)&priority;
- bind[2].is_null= 0;
- bind[2].length= 0;
- bind[3].buffer_type= MYSQL_TYPE_LONG_BLOB;
- bind[3].buffer= (char *)data;
- bind[3].buffer_length= data_size;
- bind[3].is_null= 0;
- bind[3].length= (unsigned long*)&data_size;
- bind[4].buffer_type= MYSQL_TYPE_LONG;
- bind[4].buffer= (char *)&when;
- bind[4].is_null= 0;
- bind[4].length= 0;
- while(1)
- {
- if (mysql_stmt_bind_param(queue->add_stmt, bind))
- {
- if ( mysql_stmt_errno(queue->add_stmt) == CR_NO_PREPARE_STMT )
- {
- if (queue->prepareAddStatement() == GEARMAND_QUEUE_ERROR)
- {
- return GEARMAND_QUEUE_ERROR;
- }
- continue;
- }
- else
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- }
- if (mysql_stmt_execute(queue->add_stmt))
- {
- if ( mysql_stmt_errno(queue->add_stmt) == CR_SERVER_LOST )
- {
- mysql_stmt_close(queue->add_stmt);
- if (queue->prepareAddStatement() != GEARMAND_QUEUE_ERROR)
- {
- continue;
- }
- }
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- break;
- }
- return GEARMAND_SUCCESS;
- }
- static gearmand_error_t _mysql_queue_flush(gearman_server_st*, void*)
- {
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue flush");
- return GEARMAND_SUCCESS;
- }
- static gearmand_error_t _mysql_queue_done(gearman_server_st*, void *context,
- const char *unique,
- size_t unique_size,
- const char *function_name,
- size_t function_name_size)
- {
- MYSQL_BIND bind[2];
- gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue done: %.*s %.*s", (uint32_t) unique_size, (char *) unique,
- (uint32_t) function_name_size, (char *) function_name);
- gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
- mysql_ping(queue->con);
- bind[0].buffer_type= MYSQL_TYPE_STRING;
- bind[0].buffer= (char *)unique;
- bind[0].buffer_length= unique_size;
- bind[0].is_null= 0;
- bind[0].length= (unsigned long*)&unique_size;
- bind[1].buffer_type= MYSQL_TYPE_STRING;
- bind[1].buffer= (char *)function_name;
- bind[1].buffer_length= function_name_size;
- bind[1].is_null= 0;
- bind[1].length= (unsigned long*)&function_name_size;
- while(1)
- {
- if (mysql_stmt_bind_param(queue->done_stmt, bind))
- {
- if ( mysql_stmt_errno(queue->done_stmt) == CR_NO_PREPARE_STMT )
- {
- if (queue->prepareDoneStatement() == GEARMAND_QUEUE_ERROR)
- {
- return GEARMAND_QUEUE_ERROR;
- }
- continue;
- }
- else
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_bind_param failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- }
- if (mysql_stmt_execute(queue->done_stmt))
- {
- if ( mysql_stmt_errno(queue->done_stmt) == CR_SERVER_LOST )
- {
- mysql_stmt_close(queue->done_stmt);
- if (queue->prepareDoneStatement() != GEARMAND_QUEUE_ERROR)
- {
- continue;
- }
- }
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_stmt_execute failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- break;
- }
- return GEARMAND_SUCCESS;
- }
- static gearmand_error_t _mysql_queue_replay(gearman_server_st* server, void *context,
- gearman_queue_add_fn *add_fn,
- void *add_context)
- {
- MYSQL_RES * result;
- MYSQL_ROW row;
- char query_buffer[1024];
- gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM,"MySQL queue replay");
- gearmand::plugins::queue::MySQL *queue= (gearmand::plugins::queue::MySQL *)context;
- int query_buffer_length= snprintf(query_buffer, sizeof(query_buffer),
- "SELECT unique_key, function_name, data, priority, when_to_run FROM %s",
- queue->mysql_table.c_str());
- mysql_ping(queue->con);
- if (mysql_real_query(queue->con, query_buffer, query_buffer_length))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_real_query failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- if (!(result= mysql_store_result(queue->con)))
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "mysql_store_result failed: %s", mysql_error(queue->con));
- return GEARMAND_QUEUE_ERROR;
- }
- if (mysql_num_fields(result) < 5)
- {
- gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "MySQL queue: insufficient row fields in queue table");
- return GEARMAND_QUEUE_ERROR;
- }
- gearmand_error_t ret= GEARMAND_SUCCESS;
- while ((row= mysql_fetch_row(result)))
- {
- unsigned long *lengths;
- gearman_job_priority_t priority= (gearman_job_priority_t)0;
- int when= 0;
- lengths= mysql_fetch_lengths(result);
- /* need to make a copy here ... gearman_server_job_free will free it later */
- size_t data_size= lengths[2];
- char * data= (char *)malloc(data_size);
- if (data == NULL)
- {
- return gearmand_perror(errno, "malloc failed");
- }
- memcpy(data, row[2], data_size);
- if (lengths[3])
- {
- priority= (gearman_job_priority_t) atoi(row[3]);
- }
- if (lengths[4])
- {
- when= atoi(row[4]);
- }
- ret= (*add_fn)(server, add_context,
- row[0], (size_t) lengths[0],
- row[1], (size_t) lengths[1],
- data, data_size,
- priority,
- when);
- if (ret != GEARMAND_SUCCESS)
- {
- break;
- }
- }
- mysql_free_result(result);
- return ret;
- }
|