Browse Source

First pass on making Clint's patch work.

Brian Aker 13 years ago
parent
commit
f88c689437

+ 100 - 4
libgearman-server/connection.c

@@ -71,8 +71,9 @@ static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thre
       return NULL;
     }
   }
+
   assert(con);
-  if (!con)
+  if (con == NULL)
   {
     gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
     *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
@@ -112,6 +113,7 @@ static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thre
   con->_host= dcon->host;
   con->_port= dcon->port;
   strcpy(con->id, "-");
+  con->timeout_event= NULL;
 
   con->protocol.context= NULL;
   con->protocol.context_free_fn= NULL;
@@ -203,6 +205,11 @@ void gearman_server_con_free(gearman_server_con_st *con)
     gearman_server_client_free(con->client_list);
   }
 
+  if (con->timeout_event != NULL)
+  {
+    event_del(con->timeout_event);
+  }
+
   (void) pthread_mutex_lock(&thread->lock);
   GEARMAN_LIST_DEL(con->thread->con, con,)
   (void) pthread_mutex_unlock(&thread->lock);
@@ -237,7 +244,9 @@ void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
                                size_t size)
 {
   if (size >= GEARMAN_SERVER_CON_ID_SIZE)
+  {
     size= GEARMAN_SERVER_CON_ID_SIZE - 1;
+  }
 
   memcpy(con->id, id, size);
   con->id[size]= 0;
@@ -260,9 +269,13 @@ void gearman_server_con_free_worker(gearman_server_con_st *con,
 
       /* Set worker to the last kept worker, or the beginning of the list. */
       if (prev_worker == NULL)
+      {
         worker= con->worker_list;
+      }
       else
+      {
         worker= prev_worker;
+      }
     }
     else
     {
@@ -276,13 +289,17 @@ void gearman_server_con_free_worker(gearman_server_con_st *con,
 void gearman_server_con_free_workers(gearman_server_con_st *con)
 {
   while (con->worker_list != NULL)
+  {
     gearman_server_worker_free(con->worker_list);
+  }
 }
 
 void gearman_server_con_io_add(gearman_server_con_st *con)
 {
   if (con->io_list)
+  {
     return;
+  }
 
   (void) pthread_mutex_lock(&con->thread->lock);
 
@@ -318,7 +335,9 @@ gearman_server_con_io_next(gearman_server_thread_st *thread)
   gearman_server_con_st *con= thread->io_list;
 
   if (con == NULL)
+  {
     return NULL;
+  }
 
   gearman_server_con_io_remove(con);
 
@@ -328,7 +347,9 @@ gearman_server_con_io_next(gearman_server_thread_st *thread)
 void gearman_server_con_proc_add(gearman_server_con_st *con)
 {
   if (con->proc_list)
+  {
     return;
+  }
 
   (void) pthread_mutex_lock(&con->thread->lock);
   GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
@@ -359,20 +380,22 @@ void gearman_server_con_proc_remove(gearman_server_con_st *con)
 gearman_server_con_st *
 gearman_server_con_proc_next(gearman_server_thread_st *thread)
 {
-  gearman_server_con_st *con;
-
   if (thread->proc_list == NULL)
+  {
     return NULL;
+  }
 
   (void) pthread_mutex_lock(&thread->lock);
 
-  con= thread->proc_list;
+  gearman_server_con_st *con= thread->proc_list;
   while (con != NULL)
   {
     GEARMAN_LIST_DEL(thread->proc, con, proc_)
     con->proc_list= false;
     if (!(con->proc_removed))
+    {
       break;
+    }
     con= thread->proc_list;
   }
 
@@ -397,5 +420,78 @@ void *gearmand_connection_protocol_context(const gearman_server_con_st *connecti
   return connection->protocol.context;
 }
 
+/**
+ * Timeout callback installed by gearman_server_con_add_job_timeout().
+ *
+ * Tries to put the job back on the queue; if that fails the job is logged
+ * as lost and freed.
+ *
+ * @param fd     Unused.
+ * @param event  Unused.
+ * @param arg    The gearman_server_job_st the timeout was armed for.
+ */
+static void _server_job_timeout(int fd, short event, void *arg)
+{
+  gearman_server_job_st *job= (gearman_server_job_st *)arg;
+
+  /* Only the callback argument is used here. */
+  (void)fd;
+  (void)event;
+
+  /* A timeout has occurred on a job, re-queue it */
+  gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
+                    "Worker timeout reached on job, requeueing: %s %s",
+                    job->job_handle, job->unique);
 
+  gearmand_error_t ret= gearman_server_job_queue(job);
+  if (ret != GEARMAN_SUCCESS)
+  {
+    gearmand_log_error(GEARMAN_DEFAULT_LOG_PARAM,
+                       "Failed trying to requeue job after timeout, job lost: %s %s",
+                       job->job_handle, job->unique);
+    /* Requeueing failed, so release the job here. */
+    gearman_server_job_free(job);
+  }
+}
+
+/**
+ * Arm the connection's timeout event for a job that was just assigned.
+ *
+ * Looks up the worker on this connection whose function matches the job's
+ * function. If that worker has a non-zero timeout, the connection's timeout
+ * event is (re)bound to this job and scheduled worker->timeout seconds out.
+ *
+ * @param con  Connection the job was assigned to.
+ * @param job  Assigned job; NULL is accepted and does nothing.
+ * @return GEARMAN_MEMORY_ALLOCATION_FAILURE if the event cannot be
+ *         allocated, GEARMAN_SUCCESS otherwise.
+ */
+gearmand_error_t gearman_server_con_add_job_timeout(gearman_server_con_st *con, gearman_server_job_st *job)
+{
+  if (job == NULL)
+  {
+    return GEARMAN_SUCCESS;
+  }
+
+  gearman_server_worker_st *worker;
+  for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
+  {
+    /* Assumes the functions are always fetched from the same server structure */
+    if (worker->function == job->function)
+    {
+      break;
+    }
+  }
 
+  /* It makes no sense to add a timeout to a connection that has no workers for a job */
+  assert(worker);
+
+  /* Nothing to arm when no worker matched (NDEBUG builds) or it has no timeout. */
+  if (worker == NULL || !worker->timeout)
+  {
+    return GEARMAN_SUCCESS;
+  }
+
+  if (con->timeout_event == NULL)
+  {
+    con->timeout_event= (struct event *)malloc(sizeof(struct event));
+    if (con->timeout_event == NULL)
+    {
+      return GEARMAN_MEMORY_ALLOCATION_FAILURE;
+    }
+  }
+  else
+  {
+    /* The event may still be pending for an earlier job; cancel it before
+      it is re-initialised below. */
+    timeout_del(con->timeout_event);
+  }
+
+  /* Bind the event to the current job on every call. Binding it only when
+    the event is first allocated would leave the callback pointing at the
+    first job ever assigned on this connection. */
+  gearmand_con_st *dcon= (gearmand_con_st *)con->con.context;
+  timeout_set(con->timeout_event, _server_job_timeout, job);
+  event_base_set(dcon->thread->base, con->timeout_event);
+
+  /* XXX Right now, if a worker has diff timeouts for functions I think
+    this will overwrite any existing timeouts on that event. One
+    solution to that would be to record the timeout from last time,
+    and only set this one if it is longer than that one. */
+
+  struct timeval timeout_tv = { 0 , 0 };
+  timeout_tv.tv_sec= worker->timeout;
+  timeout_add(con->timeout_event, &timeout_tv);
+
+  return GEARMAN_SUCCESS;
+}
+
+/**
+ * Cancel and release the connection's timeout event, if it has one.
+ *
+ * @param con  Connection whose timeout event should be removed.
+ */
+void gearman_server_con_delete_timeout(gearman_server_con_st *con)
+{
+  if (con->timeout_event)
+  {
+    timeout_del(con->timeout_event);
+    /* The event is malloc()ed in gearman_server_con_add_job_timeout();
+      dropping the pointer without freeing it would leak the allocation. */
+    free(con->timeout_event);
+    con->timeout_event= NULL;
+  }
+}

+ 16 - 0
libgearman-server/connection.h

@@ -18,6 +18,8 @@
 
 #include <libgearman-server/struct/io.h>
 
+struct gearman_server_job_st;
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -130,6 +132,7 @@ gearman_server_con_proc_next(gearman_server_thread_st *thread);
 
 /**
  * Set protocol context pointer.
+ * Also takes the protocol's packet pack and unpack callbacks.
  */
 GEARMAN_INTERNAL_API
 void gearmand_connection_set_protocol(gearman_server_con_st *connection, 
@@ -138,9 +141,22 @@ void gearmand_connection_set_protocol(gearman_server_con_st *connection,
                                       gearmand_packet_pack_fn *pack,
                                       gearmand_packet_unpack_fn *unpack);
 
+/**
+ * Add a worker timeout to the connection for the given job, using the
+ * timeout of the connection's worker registered for the job's function.
+ */
+GEARMAN_API
+gearmand_error_t gearman_server_con_add_job_timeout(gearman_server_con_st *con, gearman_server_job_st *job);
+
 GEARMAN_INTERNAL_API
 void *gearmand_connection_protocol_context(const gearman_server_con_st *connection);
 
+/**
+ * Delete timeout event for a server con
+ */
+GEARMAN_API
+void gearman_server_con_delete_timeout(gearman_server_con_st *con);
+
 /** @} */
 
 #ifdef __cplusplus

+ 11 - 0
libgearman-server/server.c

@@ -422,6 +422,8 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
     if (server_job == NULL)
     {
       server_con->is_sleeping= true;
+      /* Remove any timeouts while sleeping */
+      gearman_server_con_delete_timeout(server_con);
     }
     else
     {
@@ -519,11 +521,20 @@ gearmand_error_t gearman_server_run_command(gearman_server_con_st *server_con,
       gearmand_gerror("gearman_server_io_packet_add", ret);
 
       if (server_job)
+      {
         return gearman_server_job_queue(server_job);
+      }
 
       return ret;
     }
 
+    /* Since job is assigned, we should respect function timeout */
+    if (server_job != NULL)
+    {
+      gearman_server_con_add_job_timeout(server_con, server_job);
+    }
+
+
     break;
 
   case GEARMAN_COMMAND_WORK_DATA:

+ 1 - 0
libgearman-server/struct/io.h

@@ -127,4 +127,5 @@ struct gearman_server_con_st
     gearmand_packet_pack_fn *packet_pack_fn;
     gearmand_packet_unpack_fn *packet_unpack_fn;
   } protocol;
+  struct event *timeout_event;
 };

+ 1 - 2
libgearman/connection.cc

@@ -966,8 +966,7 @@ size_t gearman_connection_st::recv_socket(void *data, size_t data_size, gearman_
 
         if (gearman_universal_is_non_blocking(universal))
         {
-          gearman_gerror(universal, GEARMAN_IO_WAIT);
-          ret= GEARMAN_IO_WAIT;
+          ret= gearman_gerror(universal, GEARMAN_IO_WAIT);
           return 0;
         }
 

+ 50 - 0
tests/libgearman-1.0/client_test.cc

@@ -250,6 +250,11 @@ extern "C"
   }
 }
 
+static void log_2_stderr(const char *line, gearman_verbose_t verbose, void*) // Log callback passed to gearman_client_set_log_fn() in this file; the unnamed void* argument is unused.
+{
+  Error << line << " " << gearman_verbose_name(verbose); // Writes the message followed by the verbosity name; presumably ends up on stderr (per the function name) -- Error is defined outside this view.
+}
+
 static test_return_t init_test(void *)
 {
   gearman_client_st client;
@@ -476,6 +481,7 @@ static test_return_t submit_job_test(void *object)
 {
   gearman_client_st *client= (gearman_client_st *)object;
   const char *worker_function= (const char *)gearman_client_context(client);
+  test_true(worker_function);
   gearman_string_t value= { test_literal_param("submit_job_test") };
 
   size_t result_length;
@@ -1005,6 +1011,45 @@ static test_return_t regression_785203_do_background_test(void *object)
   return TEST_SUCCESS;
 }
 
+static test_return_t gearman_worker_timeout_TEST(void *object) // Work-in-progress worker timeout test; currently always reports TEST_SKIPPED.
+{
+  return TEST_SKIPPED; // Unconditional early return: nothing below runs until this line is removed.
+
+  gearman_client_st *client= (gearman_client_st *)object;
+  test_truth(client);
+
+  test_truth(WORKER_DEFAULT_SLEEP);
+  int timeout= WORKER_DEFAULT_SLEEP/4;
+  (void)timeout; // timeout is computed but not used yet; the cast silences the unused-variable warning.
+
+  gearman_function_t dreaming_fn= gearman_function_create(echo_or_react_worker_v2);
+  worker_handle_st* worker_handle= test_worker_start(libtest::default_port(), NULL,
+                                                     __func__,
+                                                     dreaming_fn, NULL,
+                                                     gearman_worker_options_t(),
+                                                     0); // Timeout argument of test_worker_start(): 0 here, so the timeout computed above is not applied.
+
+  gearman_client_set_log_fn(client, log_2_stderr, NULL, GEARMAN_VERBOSE_MAX);
+
+  /*
+    The client should get a timeout since the "sleeper" will sleep longer than the timeout.
+  */
+  size_t result_length;
+  gearman_return_t rc;
+  void *job_result= gearman_client_do(client,
+                                      __func__,  // Our sleeper function
+                                      NULL, // No unique
+                                      gearman_literal_param("sleep"), // We send "sleep" to tell the sleeper to sleep
+                                      &result_length, &rc);
+  test_compare(GEARMAN_NO_SERVERS, rc); // NOTE(review): the comment above expects a timeout, yet the asserted code is GEARMAN_NO_SERVERS -- confirm which is intended.
+  test_false(job_result);
+  test_zero(result_length);
+
+  delete worker_handle;
+
+  return TEST_SUCCESS;
+}
+
 static test_return_t submit_log_failure(void *object)
 {
   gearman_client_st *client= (gearman_client_st *)object;
@@ -1365,6 +1410,10 @@ test_st gearman_command_t_tests[] ={
   {0, 0, 0}
 };
 
+test_st gearman_worker_timeout_TESTS[] ={ // Test table for the "gearman_worker_timeout()" collection registered in collection[].
+  {"gearman_worker_timeout()", 0, gearman_worker_timeout_TEST },
+  {0, 0, 0} // All-zero entry closes the table, as in the other test_st arrays in this file.
+};
 
 test_st tests_log[] ={
   {"submit_log_failure", 0, submit_log_failure },
@@ -1508,6 +1557,7 @@ collection_st collection[] ={
   {"regression_tests", 0, 0, regression_tests},
   {"limits", 0, 0, limit_tests},
   {"client-logging", pre_logging, post_logging, tests_log},
+  {"gearman_worker_timeout()", 0, 0, gearman_worker_timeout_TESTS },
   {0, 0, 0, 0}
 };
 

+ 5 - 0
tests/libgearman-1.0/workers.cc

@@ -69,6 +69,11 @@ gearman_return_t echo_or_react_worker_v2(gearman_job_st *job, void *)
   {
     return GEARMAN_FATAL;
   }
+  else if (result_size == test_literal_param_size("sleep") and (not memcmp(workload, test_literal_param("sleep"))))
+  {
+    libtest::dream(WORKER_DEFAULT_SLEEP, 0);
+    return GEARMAN_SUCCESS;
+  }
   else if (result_size == test_literal_param_size("exception") and (not memcmp(workload, test_literal_param("exception"))))
   {
     gearman_return_t rc= gearman_job_send_exception(job, test_literal_param("test exception"));

+ 13 - 4
tests/start_worker.cc

@@ -75,6 +75,7 @@ struct context_st {
   std::string function_name;
   void *context;
   int magic;
+  int _timeout;
   boost::barrier* _sync_point;
 
   context_st(worker_handle_st* handle_arg,
@@ -83,7 +84,8 @@ struct context_st {
              const std::string& namespace_key_arg,
              const std::string& function_name_arg,
              void *context_arg,
-             gearman_worker_options_t& options_arg) :
+             gearman_worker_options_t& options_arg,
+             int timeout_arg) :
     port(port_arg),
     handle(handle_arg),
     options(options_arg),
@@ -92,7 +94,8 @@ struct context_st {
     function_name(function_name_arg),
     context(context_arg),
     _sync_point(handle_arg->sync_point()),
-    magic(CONTEXT_MAGIC_MARKER)
+    magic(CONTEXT_MAGIC_MARKER),
+    _timeout(timeout_arg)
   {
   }
 
@@ -161,6 +164,11 @@ static void thread_runner(context_st* con)
     return;
   }
 
+  if (context->_timeout)
+  {
+    gearman_worker_set_timeout(&worker, context->_timeout);
+  }
+
   // Check for a working server by "asking" it for an option
   {
     size_t count= 5;
@@ -207,7 +215,8 @@ worker_handle_st *test_worker_start(in_port_t port,
                                     const char *function_name,
                                     gearman_function_t &worker_fn,
                                     void *context_arg,
-                                    gearman_worker_options_t options)
+                                    gearman_worker_options_t options,
+                                    int timeout)
 {
   worker_handle_st *handle= new worker_handle_st();
   fatal_assert(handle);
@@ -215,7 +224,7 @@ worker_handle_st *test_worker_start(in_port_t port,
   context_st *context= new context_st(handle, worker_fn, port,
                                       namespace_key ? namespace_key : "",
                                       function_name,
-                                      context_arg, options);
+                                      context_arg, options, timeout);
   fatal_assert(context);
 
   handle->_thread= new boost::thread(thread_runner, context);

+ 2 - 1
tests/start_worker.h

@@ -79,7 +79,8 @@ LIBTEST_API
 					     const char *function_name,
 					     gearman_function_t &worker_fn,
 					     void *context,
-					     gearman_worker_options_t options);
+					     gearman_worker_options_t options,
+               int timeout= 0);
 
 LIBTEST_API
 bool test_worker_stop(struct worker_handle_st *);

+ 2 - 0
tests/workers.h

@@ -37,6 +37,8 @@
 
 #pragma once
 
+#define WORKER_DEFAULT_SLEEP 20
+
 gearman_return_t echo_or_react_worker_v2(gearman_job_st *job, void *context);
 
 gearman_return_t echo_or_react_chunk_worker_v2(gearman_job_st *job, void *context);