Browse Source

Merge in split of task run out to its own file.

Brian Aker 13 years ago
parent
commit
d7a3987b78
4 changed files with 363 additions and 279 deletions
  1. 1 279
      libgearman/client.cc
  2. 2 0
      libgearman/include.am
  3. 318 0
      libgearman/run.cc
  4. 42 0
      libgearman/run.hpp

+ 1 - 279
libgearman/client.cc

@@ -42,6 +42,7 @@
 #include <libgearman/add.hpp>
 #include <libgearman/connection.h>
 #include <libgearman/packet.hpp>
+#include <libgearman/run.hpp>
 
 #include <cassert>
 #include <cerrno>
@@ -60,12 +61,6 @@ static gearman_client_st *_client_allocate(gearman_client_st *client, bool is_cl
 static gearman_return_t _client_add_server(const char *host, in_port_t port,
                                            void *context);
 
-/**
- * Task state machine.
- */
-static gearman_return_t _client_run_task(gearman_client_st *client,
-                                         gearman_task_st *task);
-
 /**
  * Real do function.
  */
@@ -1110,279 +1105,6 @@ static gearman_return_t _client_add_server(const char *host, in_port_t port,
   return gearman_client_add_server(static_cast<gearman_client_st *>(context), host, port);
 }
 
-static gearman_return_t _client_run_task(gearman_client_st *client, gearman_task_st *task)
-{
-  switch(task->state)
-  {
-  case GEARMAN_TASK_STATE_NEW:
-    if (task->client->universal.con_list == NULL)
-    {
-      client->new_tasks--;
-      client->running_tasks--;
-      gearman_universal_set_error(client->universal, GEARMAN_NO_SERVERS, __func__, AT, "no servers added");
-      return GEARMAN_NO_SERVERS;
-    }
-
-    for (task->con= task->client->universal.con_list; task->con;
-         task->con= task->con->next)
-    {
-      if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
-        break;
-    }
-
-    if (task->con == NULL)
-    {
-      client->options.no_new= true;
-      gearman_gerror(client->universal, GEARMAN_IO_WAIT);
-      return GEARMAN_IO_WAIT;
-    }
-
-    client->new_tasks--;
-
-    if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
-    {
-      task->created_id= task->con->created_id_next;
-      task->con->created_id_next++;
-    }
-
-  case GEARMAN_TASK_STATE_SUBMIT:
-    while (1)
-    {
-      gearman_return_t ret= task->con->send(task->send, client->new_tasks == 0 ? true : false);
-
-      if (gearman_success(ret))
-      {
-        break;
-      }
-      else if (ret == GEARMAN_IO_WAIT)
-      {
-        task->state= GEARMAN_TASK_STATE_SUBMIT;
-        return ret;
-      }
-      else if (gearman_failed(ret))
-      {
-        /* Increment this since the job submission failed. */
-        task->con->created_id++;
-
-        if (ret == GEARMAN_COULD_NOT_CONNECT)
-        {
-          for (task->con= task->con->next; task->con;
-               task->con= task->con->next)
-          {
-            if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
-              break;
-          }
-        }
-        else
-        {
-          task->con= NULL;
-        }
-
-        if (task->con == NULL)
-        {
-          client->running_tasks--;
-          return ret;
-        }
-
-        if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
-        {
-          task->created_id= task->con->created_id_next;
-          task->con->created_id_next++;
-        }
-      }
-    }
-
-    if (task->send.data_size > 0 && task->send.data == NULL)
-    {
-      if (not task->func.workload_fn)
-      {
-        gearman_error(client->universal, GEARMAN_NEED_WORKLOAD_FN,
-                      "workload size > 0, but no data pointer or workload_fn was given");
-        return GEARMAN_NEED_WORKLOAD_FN;
-      }
-
-  case GEARMAN_TASK_STATE_WORKLOAD:
-      gearman_return_t ret= task->func.workload_fn(task);
-      if (gearman_failed(ret))
-      {
-        task->state= GEARMAN_TASK_STATE_WORKLOAD;
-        return ret;
-      }
-    }
-
-    client->options.no_new= false;
-    task->state= GEARMAN_TASK_STATE_WORK;
-    task->con->set_events(POLLIN);
-    return GEARMAN_SUCCESS;
-
-  case GEARMAN_TASK_STATE_WORK:
-    if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
-    {
-      snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
-               int(task->recv->arg_size[0]),
-               static_cast<char *>(task->recv->arg[0]));
-
-  case GEARMAN_TASK_STATE_CREATED:
-      if (task->func.created_fn)
-      {
-        gearman_return_t ret= task->func.created_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_CREATED;
-          return ret;
-        }
-      }
-
-      if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
-          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
-          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
-          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
-          task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
-      {
-        break;
-      }
-    }
-    else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
-    {
-  case GEARMAN_TASK_STATE_DATA:
-      if (task->func.data_fn)
-      {
-        gearman_return_t ret= task->func.data_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_DATA;
-          return ret;
-        }
-      }
-    }
-    else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
-    {
-  case GEARMAN_TASK_STATE_WARNING:
-      if (task->func.warning_fn)
-      {
-        gearman_return_t ret= task->func.warning_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_WARNING;
-          return ret;
-        }
-      }
-    }
-    else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
-             task->recv->command == GEARMAN_COMMAND_STATUS_RES)
-    {
-      uint8_t x;
-
-      if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
-      {
-        if (atoi(static_cast<char *>(task->recv->arg[1])) == 0)
-          task->options.is_known= false;
-        else
-          task->options.is_known= true;
-
-        if (atoi(static_cast<char *>(task->recv->arg[2])) == 0)
-          task->options.is_running= false;
-        else
-          task->options.is_running= true;
-
-        x= 3;
-      }
-      else
-      {
-        x= 1;
-      }
-
-      task->numerator= uint32_t(atoi(static_cast<char *>(task->recv->arg[x])));
-      char status_buffer[11]; /* Max string size to hold a uint32_t. */
-      snprintf(status_buffer, 11, "%.*s",
-               int(task->recv->arg_size[x + 1]),
-               static_cast<char *>(task->recv->arg[x + 1]));
-      task->denominator= uint32_t(atoi(status_buffer));
-
-  case GEARMAN_TASK_STATE_STATUS:
-      if (task->func.status_fn)
-      {
-        gearman_return_t ret= task->func.status_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_STATUS;
-          return ret;
-        }
-      }
-
-      if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
-        break;
-    }
-    else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
-    {
-      task->result_rc= GEARMAN_SUCCESS;
-
-  case GEARMAN_TASK_STATE_COMPLETE:
-      if (task->func.complete_fn)
-      {
-        gearman_return_t ret= task->func.complete_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_COMPLETE;
-          return ret;
-        }
-      }
-
-      break;
-    }
-    else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
-    {
-  case GEARMAN_TASK_STATE_EXCEPTION:
-      if (task->func.exception_fn)
-      {
-        gearman_return_t ret= task->func.exception_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_EXCEPTION;
-          return ret;
-        }
-      }
-    }
-    else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
-    {
-      // If things fail we need to delete the result, and set the result_rc
-      // correctly.
-      delete task->result_ptr;
-      task->result_ptr= NULL;
-      task->result_rc= GEARMAN_WORK_FAIL;
-
-  case GEARMAN_TASK_STATE_FAIL:
-      if (task->func.fail_fn)
-      {
-        gearman_return_t ret= task->func.fail_fn(task);
-        if (gearman_failed(ret))
-        {
-          task->state= GEARMAN_TASK_STATE_FAIL;
-          return ret;
-        }
-      }
-
-      break;
-    }
-
-    task->state= GEARMAN_TASK_STATE_WORK;
-    return GEARMAN_SUCCESS;
-
-  case GEARMAN_TASK_STATE_FINISHED:
-    break;
-  }
-
-  client->running_tasks--;
-  task->state= GEARMAN_TASK_STATE_FINISHED;
-
-  if (client->options.free_tasks)
-  {
-    gearman_task_free(task);
-  }
-
-  return GEARMAN_SUCCESS;
-}
-
 static void *_client_do(gearman_client_st *client, gearman_command_t command,
                         const char *function_name,
                         const char *unique,

+ 2 - 0
libgearman/include.am

@@ -54,6 +54,7 @@ noinst_HEADERS+= \
 		 libgearman/log.h \
 		 libgearman/packet.hpp \
 		 libgearman/result.hpp \
+		 libgearman/run.hpp \
 		 libgearman/strcommand.h \
 		 libgearman/unique.h \
 		 libgearman/universal.hpp \
@@ -89,6 +90,7 @@ libgearman_libgearman_la_SOURCES= \
 				  libgearman/log.cc \
 				  libgearman/packet.cc \
 				  libgearman/result.cc \
+				  libgearman/run.cc \
 				  libgearman/strcommand.cc \
 				  libgearman/strerror.cc \
 				  libgearman/string.cc \

+ 318 - 0
libgearman/run.cc

@@ -0,0 +1,318 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2008 Brian Aker, Eric Day
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <libgearman/common.h>
+
+#include <cstdio>
+#include <cstdlib>
+
+#include <libgearman/run.hpp>
+#include <libgearman/universal.hpp>
+
+gearman_return_t _client_run_task(gearman_client_st *client, gearman_task_st *task)
+{
+  switch(task->state)
+  {
+  case GEARMAN_TASK_STATE_NEW:
+    if (task->client->universal.con_list == NULL)
+    {
+      client->new_tasks--;
+      client->running_tasks--;
+      gearman_universal_set_error(client->universal, GEARMAN_NO_SERVERS, __func__, AT, "no servers added");
+      return GEARMAN_NO_SERVERS;
+    }
+
+    for (task->con= task->client->universal.con_list; task->con;
+         task->con= task->con->next)
+    {
+      if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
+        break;
+    }
+
+    if (not task->con)
+    {
+      client->options.no_new= true;
+      gearman_gerror(client->universal, GEARMAN_IO_WAIT);
+      return GEARMAN_IO_WAIT;
+    }
+
+    client->new_tasks--;
+
+    if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
+    {
+      task->created_id= task->con->created_id_next;
+      task->con->created_id_next++;
+    }
+
+  case GEARMAN_TASK_STATE_SUBMIT:
+    while (1)
+    {
+      gearman_return_t ret= task->con->send(task->send, client->new_tasks == 0 ? true : false);
+
+      if (gearman_success(ret))
+      {
+        break;
+      }
+      else if (ret == GEARMAN_IO_WAIT)
+      {
+        task->state= GEARMAN_TASK_STATE_SUBMIT;
+        return ret;
+      }
+      else if (gearman_failed(ret))
+      {
+        /* Increment this since the job submission failed. */
+        task->con->created_id++;
+
+        if (ret == GEARMAN_COULD_NOT_CONNECT)
+        {
+          for (task->con= task->con->next; task->con;
+               task->con= task->con->next)
+          {
+            if (task->con->send_state == GEARMAN_CON_SEND_STATE_NONE)
+              break;
+          }
+        }
+        else
+        {
+          task->con= NULL;
+        }
+
+        if (task->con == NULL)
+        {
+          client->running_tasks--;
+          return ret;
+        }
+
+        if (task->send.command != GEARMAN_COMMAND_GET_STATUS)
+        {
+          task->created_id= task->con->created_id_next;
+          task->con->created_id_next++;
+        }
+      }
+    }
+
+    if (task->send.data_size > 0 && task->send.data == NULL)
+    {
+      if (not task->func.workload_fn)
+      {
+        gearman_error(client->universal, GEARMAN_NEED_WORKLOAD_FN,
+                      "workload size > 0, but no data pointer or workload_fn was given");
+        return GEARMAN_NEED_WORKLOAD_FN;
+      }
+
+  case GEARMAN_TASK_STATE_WORKLOAD:
+      gearman_return_t ret= task->func.workload_fn(task);
+      if (gearman_failed(ret))
+      {
+        task->state= GEARMAN_TASK_STATE_WORKLOAD;
+        return ret;
+      }
+    }
+
+    client->options.no_new= false;
+    task->state= GEARMAN_TASK_STATE_WORK;
+    task->con->set_events(POLLIN);
+    return GEARMAN_SUCCESS;
+
+  case GEARMAN_TASK_STATE_WORK:
+    if (task->recv->command == GEARMAN_COMMAND_JOB_CREATED)
+    {
+      snprintf(task->job_handle, GEARMAN_JOB_HANDLE_SIZE, "%.*s",
+               int(task->recv->arg_size[0]),
+               static_cast<char *>(task->recv->arg[0]));
+
+  case GEARMAN_TASK_STATE_CREATED:
+      if (task->func.created_fn)
+      {
+        gearman_return_t ret= task->func.created_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_CREATED;
+          return ret;
+        }
+      }
+
+      if (task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_BG ||
+          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG ||
+          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG ||
+          task->send.command == GEARMAN_COMMAND_SUBMIT_JOB_EPOCH ||
+          task->send.command == GEARMAN_COMMAND_SUBMIT_REDUCE_JOB_BACKGROUND)
+      {
+        break;
+      }
+    }
+    else if (task->recv->command == GEARMAN_COMMAND_WORK_DATA)
+    {
+  case GEARMAN_TASK_STATE_DATA:
+      if (task->func.data_fn)
+      {
+        gearman_return_t ret= task->func.data_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_DATA;
+          return ret;
+        }
+      }
+    }
+    else if (task->recv->command == GEARMAN_COMMAND_WORK_WARNING)
+    {
+  case GEARMAN_TASK_STATE_WARNING:
+      if (task->func.warning_fn)
+      {
+        gearman_return_t ret= task->func.warning_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_WARNING;
+          return ret;
+        }
+      }
+    }
+    else if (task->recv->command == GEARMAN_COMMAND_WORK_STATUS ||
+             task->recv->command == GEARMAN_COMMAND_STATUS_RES)
+    {
+      uint8_t x;
+
+      if (task->recv->command == GEARMAN_COMMAND_STATUS_RES)
+      {
+        if (atoi(static_cast<char *>(task->recv->arg[1])) == 0)
+          task->options.is_known= false;
+        else
+          task->options.is_known= true;
+
+        if (atoi(static_cast<char *>(task->recv->arg[2])) == 0)
+          task->options.is_running= false;
+        else
+          task->options.is_running= true;
+
+        x= 3;
+      }
+      else
+      {
+        x= 1;
+      }
+
+      task->numerator= uint32_t(atoi(static_cast<char *>(task->recv->arg[x])));
+      char status_buffer[11]; /* Max string size to hold a uint32_t. */
+      snprintf(status_buffer, 11, "%.*s",
+               int(task->recv->arg_size[x + 1]),
+               static_cast<char *>(task->recv->arg[x + 1]));
+      task->denominator= uint32_t(atoi(status_buffer));
+
+  case GEARMAN_TASK_STATE_STATUS:
+      if (task->func.status_fn)
+      {
+        gearman_return_t ret= task->func.status_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_STATUS;
+          return ret;
+        }
+      }
+
+      if (task->send.command == GEARMAN_COMMAND_GET_STATUS)
+        break;
+    }
+    else if (task->recv->command == GEARMAN_COMMAND_WORK_COMPLETE)
+    {
+      task->result_rc= GEARMAN_SUCCESS;
+
+  case GEARMAN_TASK_STATE_COMPLETE:
+      if (task->func.complete_fn)
+      {
+        gearman_return_t ret= task->func.complete_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_COMPLETE;
+          return ret;
+        }
+      }
+
+      break;
+    }
+    else if (task->recv->command == GEARMAN_COMMAND_WORK_EXCEPTION)
+    {
+  case GEARMAN_TASK_STATE_EXCEPTION:
+      if (task->func.exception_fn)
+      {
+        gearman_return_t ret= task->func.exception_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_EXCEPTION;
+          return ret;
+        }
+      }
+    }
+    else if (task->recv->command == GEARMAN_COMMAND_WORK_FAIL)
+    {
+      // If things fail we need to delete the result, and set the result_rc
+      // correctly.
+      delete task->result_ptr;
+      task->result_ptr= NULL;
+      task->result_rc= GEARMAN_WORK_FAIL;
+
+  case GEARMAN_TASK_STATE_FAIL:
+      if (task->func.fail_fn)
+      {
+        gearman_return_t ret= task->func.fail_fn(task);
+        if (gearman_failed(ret))
+        {
+          task->state= GEARMAN_TASK_STATE_FAIL;
+          return ret;
+        }
+      }
+
+      break;
+    }
+
+    task->state= GEARMAN_TASK_STATE_WORK;
+    return GEARMAN_SUCCESS;
+
+  case GEARMAN_TASK_STATE_FINISHED:
+    break;
+  }
+
+  client->running_tasks--;
+  task->state= GEARMAN_TASK_STATE_FINISHED;
+
+  if (client->options.free_tasks)
+  {
+    gearman_task_free(task);
+  }
+
+  return GEARMAN_SUCCESS;
+}

+ 42 - 0
libgearman/run.hpp

@@ -0,0 +1,42 @@
+/*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ * 
+ *  Gearmand client and server library.
+ *
+ *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ *  Copyright (C) 2008 Brian Aker, Eric Day
+ *  All rights reserved.
+ *
+ *  Redistribution and use in source and binary forms, with or without
+ *  modification, are permitted provided that the following conditions are
+ *  met:
+ *
+ *      * Redistributions of source code must retain the above copyright
+ *  notice, this list of conditions and the following disclaimer.
+ *
+ *      * Redistributions in binary form must reproduce the above
+ *  copyright notice, this list of conditions and the following disclaimer
+ *  in the documentation and/or other materials provided with the
+ *  distribution.
+ *
+ *      * The names of its contributors may not be used to endorse or
+ *  promote products derived from this software without specific prior
+ *  written permission.
+ *
+ *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#pragma once
+
+GEARMAN_LOCAL
+gearman_return_t _client_run_task(gearman_client_st *client, gearman_task_st *task);