|
@@ -99,7 +99,7 @@ Postgres::~Postgres ()
|
|
|
gearmand_error_t Postgres::initialize()
|
|
|
{
|
|
|
_create_query+= "CREATE TABLE " +table +" (unique_key VARCHAR" +"(" + TOSTRING(GEARMAN_UNIQUE_SIZE) +"), ";
|
|
|
- _create_query+= "function_name VARCHAR(255), priority INTEGER, data BYTEA, when_to_run INTEGER, UNIQUE KEY (unique_key, function_name))";
|
|
|
+ _create_query+= "function_name VARCHAR(255), priority INTEGER, data BYTEA, when_to_run INTEGER, UNIQUE (unique_key, function_name))";
|
|
|
|
|
|
gearmand_error_t ret= _initialize(&Gearmand()->server, this);
|
|
|
|
|
@@ -232,19 +232,18 @@ static gearmand_error_t _libpq_add(gearman_server_st *server, void *context,
|
|
|
(void)server;
|
|
|
(void)when;
|
|
|
gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
|
|
|
- PGresult *result;
|
|
|
|
|
|
char buffer[22];
|
|
|
snprintf(buffer, sizeof(buffer), "%u", static_cast<uint32_t>(priority));
|
|
|
|
|
|
- const char *param_values[5]= {
|
|
|
+ const char *param_values[]= {
|
|
|
(char *)buffer,
|
|
|
(char *)unique,
|
|
|
(char *)function_name,
|
|
|
(char *)data,
|
|
|
(char *)when };
|
|
|
|
|
|
- int param_lengths[5]= {
|
|
|
+ int param_lengths[]= {
|
|
|
(int)strlen(buffer),
|
|
|
(int)unique_size,
|
|
|
(int)function_name_size,
|
|
@@ -253,7 +252,9 @@ static gearmand_error_t _libpq_add(gearman_server_st *server, void *context,
|
|
|
|
|
|
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "libpq add: %.*s", (uint32_t)unique_size, (char *)unique);
|
|
|
|
|
|
- result= PQexecParams(queue->con, queue->insert().c_str(), 3, NULL, param_values, param_lengths, NULL, 0);
|
|
|
+ PGresult *result= PQexecParams(queue->con, queue->insert().c_str(),
|
|
|
+ gearmand_array_size(param_lengths),
|
|
|
+ NULL, param_values, param_lengths, NULL, 0);
|
|
|
if (result == NULL || PQresultStatus(result) != PGRES_COMMAND_OK)
|
|
|
{
|
|
|
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexec:%s", PQerrorMessage(queue->con));
|
|
@@ -266,10 +267,8 @@ static gearmand_error_t _libpq_add(gearman_server_st *server, void *context,
|
|
|
return GEARMAN_SUCCESS;
|
|
|
}
|
|
|
|
|
|
-static gearmand_error_t _libpq_flush(gearman_server_st *server,
|
|
|
- void *context __attribute__((unused)))
|
|
|
+static gearmand_error_t _libpq_flush(gearman_server_st *, void *)
|
|
|
{
|
|
|
- (void)server;
|
|
|
gearmand_debug("libpq flush");
|
|
|
|
|
|
return GEARMAN_SUCCESS;
|
|
@@ -315,11 +314,10 @@ static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
|
|
|
void *add_context)
|
|
|
{
|
|
|
gearmand::plugins::queue::Postgres *queue= (gearmand::plugins::queue::Postgres *)context;
|
|
|
- PGresult *result;
|
|
|
|
|
|
gearmand_info("libpq replay start");
|
|
|
|
|
|
- result= PQexecParams(queue->con, queue->create().c_str(), 0, NULL, NULL, NULL, NULL, 1);
|
|
|
+ PGresult *result= PQexecParams(queue->con, queue->create().c_str(), 0, NULL, NULL, NULL, NULL, 1);
|
|
|
if (result == NULL || PQresultStatus(result) != PGRES_TUPLES_OK)
|
|
|
{
|
|
|
gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM, "PQexecParams:%s", PQerrorMessage(queue->con));
|
|
@@ -330,27 +328,28 @@ static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
|
|
|
for (int row= 0; row < PQntuples(result); row++)
|
|
|
{
|
|
|
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
|
|
|
- "libpq replay: %.*s",
|
|
|
- PQgetlength(result, row, 0),
|
|
|
- PQgetvalue(result, row, 0));
|
|
|
+ "libpq replay: %.*s",
|
|
|
+ PQgetlength(result, row, 0),
|
|
|
+ PQgetvalue(result, row, 0));
|
|
|
|
|
|
+ size_t data_length;
|
|
|
char *data;
|
|
|
if (PQgetlength(result, row, 3) == 0)
|
|
|
{
|
|
|
data= NULL;
|
|
|
+ data_length= 0;
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- data= (char *)malloc((size_t)PQgetlength(result, row, 3));
|
|
|
- if (data == NULL)
|
|
|
+ data_length= size_t(PQgetlength(result, row, 3));
|
|
|
+ data= (char *)malloc(data_length);
|
|
|
+ if (not data)
|
|
|
{
|
|
|
PQclear(result);
|
|
|
- gearmand_perror("malloc");
|
|
|
- return GEARMAN_MEMORY_ALLOCATION_FAILURE;
|
|
|
+ return gearmand_perror("malloc");
|
|
|
}
|
|
|
|
|
|
- memcpy(data, PQgetvalue(result, row, 3),
|
|
|
- (size_t)PQgetlength(result, row, 3));
|
|
|
+ memcpy(data, PQgetvalue(result, row, 3), data_length);
|
|
|
}
|
|
|
|
|
|
gearmand_error_t ret;
|
|
@@ -358,10 +357,10 @@ static gearmand_error_t _libpq_replay(gearman_server_st *server, void *context,
|
|
|
(size_t)PQgetlength(result, row, 0),
|
|
|
PQgetvalue(result, row, 1),
|
|
|
(size_t)PQgetlength(result, row, 1),
|
|
|
- data, (size_t)PQgetlength(result, row, 3),
|
|
|
+ data, data_length,
|
|
|
(gearmand_job_priority_t)atoi(PQgetvalue(result, row, 2)),
|
|
|
atoll(PQgetvalue(result, row, 4)));
|
|
|
- if (ret != GEARMAN_SUCCESS)
|
|
|
+ if (gearmand_failed(ret))
|
|
|
{
|
|
|
PQclear(result);
|
|
|
return ret;
|