Browse Source

Merge in current gearman work (need valgrind).

Brian Aker 14 years ago
parent
commit
8808dae4a2

+ 1 - 3
examples/reverse_client_bg.cc

@@ -126,9 +126,7 @@ int main(int args, char *argv[])
     gearman_client_set_timeout(&client, timeout);
 
 
-  gearman_workload_t workload= gearman_workload_make();
-  gearman_workload_set_background(&workload, true);
-
+  gearman_work_t workload= gearman_work(GEARMAN_JOB_PRIORITY_NORMAL);
 
   gearman_task_st *task;
   gearman_argument_t values[]= {

+ 2 - 3
examples/reverse_client_epoch.cc

@@ -126,8 +126,7 @@ int main(int args, char *argv[])
   if (timeout >= 0)
     gearman_client_set_timeout(&client, timeout);
 
-  gearman_workload_t workload= gearman_workload_make();
-  gearman_workload_set_epoch(&workload, time(NULL) +epoch);
+  gearman_work_t workload= gearman_work_epoch(time(NULL) +epoch);
 
   gearman_task_st *task;
   gearman_argument_t value= gearman_argument_make(text_to_echo.c_str(), text_to_echo.size());
@@ -150,7 +149,7 @@ int main(int args, char *argv[])
     ret= gearman_client_job_status(&client, gearman_task_job_handle(task),
                                    &is_known, &is_running,
                                    &numerator, &denominator);
-    if (ret != GEARMAN_SUCCESS)
+    if (gearman_failed(ret))
     {
       std::cerr << gearman_client_error(&client) << std::endl;
       exit_code= EXIT_FAILURE;

+ 4 - 1
libgearman-server/client.cc

@@ -57,9 +57,12 @@ gearman_server_client_add(gearman_server_con_st *con)
 
 void gearman_server_client_free(gearman_server_client_st *client)
 {
+  if (not client)
+    return;
+
   GEARMAN_LIST_DEL(client->con->client, client, con_)
 
-  if (client->job != NULL)
+  if (client->job)
   {
     GEARMAN_LIST_DEL(client->job->client, client, job_)
 

+ 1 - 0
libgearman-server/client.h

@@ -11,6 +11,7 @@
  * @brief Client Declarations
  */
 
+#pragma once
 #ifndef __GEARMAN_SERVER_CLIENT_H__
 #define __GEARMAN_SERVER_CLIENT_H__
 

+ 1 - 5
libgearman-server/constants.h

@@ -42,7 +42,7 @@ extern "C" {
 #define GEARMAND_JOB_HANDLE_SIZE 64
 #define GEARMAND_JOB_HASH_SIZE 383
 #define GEARMAN_MAX_COMMAND_ARGS 8
-#define GEARMAN_MAX_ERROR_SIZE 1024
+#define GEARMAN_MAX_ERROR_SIZE 2048
 #define GEARMAN_MAX_FREE_SERVER_CLIENT 1000
 #define GEARMAN_MAX_FREE_SERVER_CON 1000
 #define GEARMAN_MAX_FREE_SERVER_JOB 1000
@@ -105,10 +105,6 @@ typedef enum
  */
 
 /* Types. */
-typedef struct gearman_command_info_st gearman_command_info_st;
-typedef struct gearman_conf_module_st gearman_conf_module_st;
-typedef struct gearman_conf_option_st gearman_conf_option_st;
-typedef struct gearman_conf_st gearman_conf_st;
 typedef struct gearman_server_client_st gearman_server_client_st;
 typedef struct gearman_server_con_st gearman_server_con_st;
 typedef struct gearman_server_function_st gearman_server_function_st;

+ 1 - 0
libgearman-server/error/type.h

@@ -95,4 +95,5 @@ typedef enum
   GEARMAN_MAX_RETURN /* Always add new error code before */
 } gearmand_error_t;
 
+#define gearmand_success(X) ((X) == GEARMAN_SUCCESS)
 #define gearmand_failed(X) ((X) != GEARMAN_SUCCESS)

+ 1 - 0
libgearman-server/function.h

@@ -11,6 +11,7 @@
  * @brief Function Declarations
  */
 
+#pragma once
 #ifndef __GEARMAN_SERVER_FUNCTION_H__
 #define __GEARMAN_SERVER_FUNCTION_H__
 

+ 41 - 16
libgearman-server/job.c

@@ -86,19 +86,35 @@ _server_job_get_unique(gearman_server_st *server, uint32_t unique_key,
 /*
  * Public definitions
  */
+gearman_server_job_st * gearman_server_job_add(gearman_server_st *server,
+                                               const char *function_name, size_t function_name_size,
+                                               const char *unique, size_t unique_size,
+                                               const void *data, size_t data_size,
+                                               gearmand_job_priority_t priority,
+                                               gearman_server_client_st *server_client,
+                                               gearmand_error_t *ret_ptr,
+                                               int64_t when)
+{
+  return gearman_server_job_add_reducer(server,
+                                        function_name, function_name_size,
+                                        NULL, 0,
+                                        unique, unique_size, 
+                                        data, data_size,
+                                        priority, server_client, ret_ptr, when);
+}
 
 gearman_server_job_st *
-gearman_server_job_add(gearman_server_st *server,
-                       const char *function_name, size_t function_name_size,
-                       const char *unique, size_t unique_size,
-                       const void *data, size_t data_size,
-                       gearmand_job_priority_t priority,
-                       gearman_server_client_st *server_client,
-                       gearmand_error_t *ret_ptr,
-                       int64_t when)
+gearman_server_job_add_reducer(gearman_server_st *server,
+                               const char *function_name, size_t function_name_size,
+                               const char *reducer_name, size_t reducer_size,
+                               const char *unique, size_t unique_size,
+                               const void *data, size_t data_size,
+                               gearmand_job_priority_t priority,
+                               gearman_server_client_st *server_client,
+                               gearmand_error_t *ret_ptr,
+                               int64_t when)
 {
-  gearman_server_function_st *server_function= gearman_server_function_get(server, function_name,
-                                                                           function_name_size);
+  gearman_server_function_st *server_function= gearman_server_function_get(server, function_name, function_name_size);
   if (server_function == NULL)
   {
     *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
@@ -180,6 +196,15 @@ gearman_server_job_add(gearman_server_st *server,
     server_job->data= data;
     server_job->data_size= data_size;
 		server_job->when= when; 
+
+    if (reducer_size)
+    {
+      strncpy(server_job->reducer, reducer_name, reducer_size);
+    }
+    else
+    {
+      server_job->reducer[0]= 0;
+    }
 		
     server_job->unique_key= key;
     key= key % GEARMAND_JOB_HASH_SIZE;
@@ -205,7 +230,7 @@ gearman_server_job_add(gearman_server_st *server,
                                            function_name_size,
                                            data, data_size, priority, 
                                            when);
-      if (*ret_ptr != GEARMAN_SUCCESS)
+      if (gearmand_failed(*ret_ptr))
       {
         server_job->data= NULL;
         gearman_server_job_free(server_job);
@@ -228,7 +253,7 @@ gearman_server_job_add(gearman_server_st *server,
     }
 
     *ret_ptr= gearman_server_job_queue(server_job);
-    if (*ret_ptr != GEARMAN_SUCCESS)
+    if (gearmand_failed(*ret_ptr))
     {
       if (server_client == NULL && server->queue._done_fn != NULL)
       {
@@ -249,7 +274,7 @@ gearman_server_job_add(gearman_server_st *server,
     *ret_ptr= GEARMAN_JOB_EXISTS;
   }
 
-  if (server_client != NULL)
+  if (server_client)
   {
     server_client->job= server_job;
     GEARMAN_LIST_ADD(server_job->client, server_client, job_)
@@ -384,9 +409,9 @@ gearman_server_job_peek(gearman_server_con_st *server_con)
 
         int64_t current_time= (int64_t)time(NULL);
 
-        while(server_job != NULL && 
-             server_job->when != 0 && 
-             server_job->when > current_time)
+        while(server_job && 
+              server_job->when != 0 && 
+              server_job->when > current_time)
         {
           server_job = server_job->function_next;  
         }

+ 26 - 14
libgearman-server/job.h

@@ -11,16 +11,9 @@
  * @brief Job Declarations
  */
 
-#ifndef __GEARMAN_SERVER_JOB_H__
-#define __GEARMAN_SERVER_JOB_H__
+#pragma once
 
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/**
- * @addtogroup gearman_server_job Job Declarations
- * @ingroup gearman_server
+/** @addtogroup gearman_server_job Job Declarations @ingroup gearman_server
  *
  * This is a low level interface for gearman server jobs. This is used
  * internally by the server interface, so you probably want to look there first.
@@ -54,21 +47,42 @@ struct gearman_server_job_st
   gearman_server_worker_st *worker;
   char job_handle[GEARMAND_JOB_HANDLE_SIZE];
   char unique[GEARMAN_UNIQUE_SIZE];
+  char reducer[GEARMAN_UNIQUE_SIZE];
 };
 
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
 /**
  * Add a new job to a server instance.
  */
 GEARMAN_API
 gearman_server_job_st *
-gearman_server_job_add(gearman_server_st *server, const char *function_name,
-                       size_t function_name_size, const char *unique,
-                       size_t unique_size, const void *data, size_t data_size,
+gearman_server_job_add(gearman_server_st *server,
+                       const char *function_name, size_t function_name_size,
+                       const char *unique, size_t unique_size,
+                       const void *data, size_t data_size,
                        gearmand_job_priority_t priority,
                        gearman_server_client_st *server_client,
                        gearmand_error_t *ret_ptr,
                        int64_t when);
 
+GEARMAN_API
+gearman_server_job_st *
+gearman_server_job_add_reducer(gearman_server_st *server,
+                               const char *function_name, size_t function_name_size, 
+                               const char *reducer, size_t reducer_name_size, 
+                               const char *unique, size_t unique_size, 
+                               const void *data, size_t data_size,
+                               gearmand_job_priority_t priority,
+                               gearman_server_client_st *server_client,
+                               gearmand_error_t *ret_ptr,
+                               int64_t when);
+
+
+
 /**
  * Initialize a server job structure.
  */
@@ -115,5 +129,3 @@ gearmand_error_t gearman_server_job_queue(gearman_server_job_st *server_job);
 #ifdef __cplusplus
 }
 #endif
-
-#endif /* __GEARMAN_SERVER_JOB_H__ */

+ 2 - 2
libgearman-server/log.h

@@ -23,8 +23,8 @@ extern "C" {
 #define AT __FILE__ ":" TOSTRING(__LINE__)
 
 #ifdef __cplusplus
-#define STRING_WITH_LEN(X) (X), (static_cast<size_t>((sizeof(X) - 1)))
-#define gearman_literal_param(X) (X), (static_cast<size_t>((sizeof(X) - 1)))
+#define STRING_WITH_LEN(X) (X), (size_t((sizeof(X) - 1)))
+#define gearman_literal_param(X) (X), (size_t(sizeof(X) - 1))
 #else
 #define STRING_WITH_LEN(X) (X), ((size_t)((sizeof(X) - 1)))
 #define gearman_literal_param(X) (X), ((size_t)((sizeof(X) - 1)))

Some files were not shown because too many files changed in this diff