Browse Source

Merge of the Walmartlabs code.

Brian Aker 13 years ago
parent
commit
bd411df0c3

+ 8 - 3
PROTOCOL

@@ -495,12 +495,17 @@ status
 maxqueue
 
     This sets the maximum queue size for a function. If no size is
-    given, the default is used. If the size is negative, then the queue
-    is set to be unlimited. This sends back a single line with "OK".
+    given, the default is used. If one size is given, it is applied to
+    jobs regardless of priority. If three sizes are given, the sizes
+    are used when testing high-priority, normal, and low-priority jobs,
+    respectively. A zero or negative size indicates no limit.  This
+    command sends back a single line with "OK".
 
     Arguments:
     - Function name.
-    - Optional maximum queue size.
+    - Optional maximum queue size (to apply one maximum at all priorities), or
+      three optional maximum queue sizes (to enforce for high-, normal-, and
+      low-priority job submissions).
 
 shutdown
 

+ 2 - 1
libgearman-server/function.cc

@@ -79,7 +79,8 @@ static gearman_server_function_st * gearman_server_function_create(gearman_serve
   function->job_count= 0;
   function->job_total= 0;
   function->job_running= 0;
-  function->max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
+  memset(function->max_queue_size, GEARMAN_DEFAULT_MAX_QUEUE_SIZE,
+         sizeof(uint32_t) * GEARMAND_JOB_PRIORITY_MAX);
   function->function_name_size= 0;
   gearmand_server_list_add(server, function);
   function->function_name= NULL;

+ 5 - 5
libgearman-server/function.h

@@ -29,13 +29,13 @@ extern "C" {
  * @{
  */
 
-/** Add a new function to a server instance.
+/** 
+  Add a new function to a server instance.
  */
 GEARMAN_API
-gearman_server_function_st *
-gearman_server_function_get(gearman_server_st *server,
-                            const char *function_name,
-                            size_t function_name_size);
+  gearman_server_function_st * gearman_server_function_get(gearman_server_st *server,
+                                                           const char *function_name,
+                                                           size_t function_name_size);
 
 /**
  * Free a server function structure.

+ 5 - 2
libgearman-server/job.c

@@ -157,8 +157,11 @@ gearman_server_job_add_reducer(gearman_server_st *server,
 
   if (server_job == NULL)
   {
-    if (server_function->max_queue_size > 0 &&
-        server_function->job_total >= server_function->max_queue_size)
+    gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Comparing queue %u to limit %u for priority %u",
+      server_function->job_total, server_function->max_queue_size[priority],
+      priority);
+    if (server_function->max_queue_size[priority] > 0 &&
+        server_function->job_total >= server_function->max_queue_size[priority])
     {
       *ret_ptr= GEARMAN_JOB_QUEUE_FULL;
       return NULL;

+ 77 - 51
libgearman-server/server.c

@@ -829,18 +829,9 @@ static gearmand_error_t _server_error_packet(gearman_server_con_st *server_con,
 static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
                                          gearmand_packet_st *packet)
 {
-  char *data;
-  char *new_data;
-  size_t size;
   size_t total;
-  int max_queue_size;
-  gearman_server_thread_st *thread;
-  gearman_server_con_st *con;
-  gearman_server_worker_st *worker;
-  gearman_server_packet_st *server_packet;
-  int checked_length;
 
-  data= (char *)(char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
+  char *data= (char *)(char *)malloc(GEARMAN_TEXT_RESPONSE_SIZE);
   if (data == NULL)
   {
     gearmand_perror("malloc");
@@ -858,28 +849,33 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
   {
     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_UNKNOWN_COMMAND, 4, "NULL");
   }
-  else if (!strcasecmp("workers", (char *)(packet->arg[0])))
+  else if (strcasecmp("workers", (char *)(packet->arg[0])) == 0)
   {
-    size= 0;
+    size_t size= 0;
 
-    for (thread= Server->thread_list; thread != NULL;
+    for (gearman_server_thread_st *thread= Server->thread_list;
+         thread != NULL;
          thread= thread->next)
     {
       int error;
       if (! (error= pthread_mutex_lock(&thread->lock)))
       {
-        for (con= thread->con_list; con != NULL; con= con->next)
+        for (gearman_server_con_st *con= thread->con_list; con != NULL; con= con->next)
         {
           if (con->_host == NULL)
+          {
             continue;
+          }
 
           if (size > total)
+          {
             size= total;
+          }
 
           /* Make sure we have at least GEARMAN_TEXT_RESPONSE_SIZE bytes. */
           if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
           {
-            new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
+            char *new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
             if (new_data == NULL)
             {
               (void) pthread_mutex_unlock(&thread->lock);
@@ -893,10 +889,10 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
             total+= GEARMAN_TEXT_RESPONSE_SIZE;
           }
 
-          checked_length= snprintf(data + size, total - size, "%d %s %s :",
-                                   con->con.fd, con->_host, con->id);
+          int sn_checked_length= snprintf(data + size, total - size, "%d %s %s :",
+                                          con->con.fd, con->_host, con->id);
 
-          if ((size_t)checked_length > total - size || checked_length < 0)
+          if ((size_t)sn_checked_length > total - size || sn_checked_length < 0)
           {
             (void) pthread_mutex_unlock(&thread->lock);
             gearmand_debug("free");
@@ -905,13 +901,15 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
             return GEARMAN_MEMORY_ALLOCATION_FAILURE;
           }
 
-          size+= (size_t)checked_length;
+          size+= (size_t)sn_checked_length;
           if (size > total)
+          {
             continue;
+          }
 
-          for (worker= con->worker_list; worker != NULL; worker= worker->con_next)
+          for (gearman_server_worker_st *worker= con->worker_list; worker != NULL; worker= worker->con_next)
           {
-            checked_length= snprintf(data + size, total - size, " %.*s",
+            int checked_length= snprintf(data + size, total - size, " %.*s",
                                      (int)(worker->function->function_name_size),
                                      worker->function->function_name);
 
@@ -930,9 +928,11 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
           }
 
           if (size > total)
+          {
             continue;
+          }
 
-          checked_length= snprintf(data + size, total - size, "\n");
+          int checked_length= snprintf(data + size, total - size, "\n");
           if ((size_t)checked_length > total - size || checked_length < 0)
           {
             (void) pthread_mutex_unlock(&thread->lock);
@@ -956,7 +956,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
 
     if (size < total)
     {
-      checked_length= snprintf(data + size, total - size, ".\n");
+      int checked_length= snprintf(data + size, total - size, ".\n");
       if ((size_t)checked_length > total - size || checked_length < 0)
       {
         gearmand_perror("snprintf");
@@ -964,9 +964,9 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
       }
     }
   }
-  else if (!strcasecmp("status", (char *)(packet->arg[0])))
+  else if (strcasecmp("status", (char *)(packet->arg[0])) == 0)
   {
-    size= 0;
+    size_t size= 0;
 
     gearman_server_function_st *function;
     for (function= Server->function_list; function != NULL;
@@ -974,7 +974,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
     {
       if (size + GEARMAN_TEXT_RESPONSE_SIZE > total)
       {
-        new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
+        char *new_data= (char *)realloc(data, total + GEARMAN_TEXT_RESPONSE_SIZE);
         if (new_data == NULL)
         {
           gearmand_perror("realloc");
@@ -987,10 +987,10 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
         total+= GEARMAN_TEXT_RESPONSE_SIZE;
       }
 
-      checked_length= snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
-                               (int)(function->function_name_size),
-                               function->function_name, function->job_total,
-                               function->job_running, function->worker_count);
+      int checked_length= snprintf(data + size, total - size, "%.*s\t%u\t%u\t%u\n",
+                                   (int)(function->function_name_size),
+                                   function->function_name, function->job_total,
+                                   function->job_running, function->worker_count);
 
       if ((size_t)checked_length > total - size || checked_length < 0)
       {
@@ -1002,12 +1002,14 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
 
       size+= (size_t)checked_length;
       if (size > total)
+      {
         size= total;
+      }
     }
 
     if (size < total)
     {
-      checked_length= snprintf(data + size, total - size, ".\n");
+      int checked_length= snprintf(data + size, total - size, ".\n");
       if ((size_t)checked_length > total - size || checked_length < 0)
       {
         gearmand_perror("snprintf");
@@ -1017,7 +1019,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
       }
     }
   }
-  else if (!strcasecmp("create", (char *)(packet->arg[0])))
+  else if (strcasecmp("create", (char *)(packet->arg[0])) == 0)
   {
     if (packet->argc == 3 && !strcasecmp("function", (char *)(packet->arg[1])))
     {
@@ -1040,7 +1042,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
     }
   }
-  else if (! strcasecmp("drop", (char *)(packet->arg[0])))
+  else if (strcasecmp("drop", (char *)(packet->arg[0])) == 0)
   {
     if (packet->argc == 3 && !strcasecmp("function", (char *)(packet->arg[1])))
     {
@@ -1075,7 +1077,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
     }
   }
-  else if (!strcasecmp("maxqueue", (char *)(packet->arg[0])))
+  else if (strcasecmp("maxqueue", (char *)(packet->arg[0])) == 0)
   {
     if (packet->argc == 1)
     {
@@ -1083,24 +1085,48 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
     }
     else
     {
-      if (packet->argc == 2)
-        max_queue_size= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
-      else
+      uint32_t max_queue_size[GEARMAND_JOB_PRIORITY_MAX];
+
+      for (int priority= 0; priority < GEARMAND_JOB_PRIORITY_MAX; ++priority)
       {
-        max_queue_size= atoi((char *)(packet->arg[2]));
-        if (max_queue_size < 0)
-          max_queue_size= 0;
+        const int argc= priority +2;
+        if (packet->argc > argc)
+        {
+          const int parameter= atoi((char *)(packet->arg[argc]));
+          if (parameter < 0)
+          {
+            max_queue_size[priority]= 0;
+          }
+          else
+          {
+            max_queue_size[priority]= (uint32_t)parameter;
+          }
+        }
+        else
+        {
+          max_queue_size[priority]= GEARMAN_DEFAULT_MAX_QUEUE_SIZE;
+        }
       }
 
-      gearman_server_function_st *function;
-      for (function= Server->function_list;
-           function != NULL; function= function->next)
+      /* 
+        To preserve the existing behavior of maxqueue, ensure that the
+         one-parameter invocation is applied to all priorities.
+      */
+      if (packet->argc <= 3)
+      {
+        for (int priority= 1; priority < GEARMAND_JOB_PRIORITY_MAX; ++priority)
+        {
+          max_queue_size[priority]= max_queue_size[0];
+        }
+      }
+       
+      for (gearman_server_function_st *function= Server->function_list; function != NULL; function= function->next)
       {
         if (strlen((char *)(packet->arg[1])) == function->function_name_size &&
-            !memcmp(packet->arg[1], function->function_name,
-                    function->function_name_size))
+            (memcmp(packet->arg[1], function->function_name, function->function_name_size) == 0))
         {
-          function->max_queue_size= (uint32_t)max_queue_size;
+          gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Applying queue limits to %s", function->function_name);
+          memcpy(function->max_queue_size, max_queue_size, sizeof(uint32_t) * GEARMAND_JOB_PRIORITY_MAX);
         }
       }
 
@@ -1111,7 +1137,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
   {
     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK %d\n", (int)getpid());
   }
-  else if (!strcasecmp("shutdown", (char *)(packet->arg[0])))
+  else if (strcasecmp("shutdown", (char *)(packet->arg[0])) == 0)
   {
     if (packet->argc == 1)
     {
@@ -1119,7 +1145,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
     }
     else if (packet->argc == 2 &&
-             !strcasecmp("graceful", (char *)(packet->arg[1])))
+             strcasecmp("graceful", (char *)(packet->arg[1])) == 0)
     {
       Server->shutdown_graceful= true;
       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_SUCCESS);
@@ -1130,11 +1156,11 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
       snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_ARGS, (int)packet->arg_size[0], (char *)(packet->arg[0]));
     }
   }
-  else if (!strcasecmp("verbose", (char *)(packet->arg[0])))
+  else if (strcasecmp("verbose", (char *)(packet->arg[0])) == 0)
   {
     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK %s\n", gearmand_verbose_name(Gearmand()->verbose));
   }
-  else if (!strcasecmp("version", (char *)(packet->arg[0])))
+  else if (strcasecmp("version", (char *)(packet->arg[0])) == 0)
   {
     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, "OK %s\n", PACKAGE_VERSION);
   }
@@ -1145,7 +1171,7 @@ static gearmand_error_t _server_run_text(gearman_server_con_st *server_con,
     snprintf(data, GEARMAN_TEXT_RESPONSE_SIZE, TEXT_ERROR_UNKNOWN_COMMAND, (int)packet->arg_size[0], (char *)(packet->arg[0]));
   }
 
-  server_packet= gearman_server_packet_create(server_con->thread, false);
+  gearman_server_packet_st *server_packet= gearman_server_packet_create(server_con->thread, false);
   if (server_packet == NULL)
   {
     gearmand_debug("free");

+ 1 - 1
libgearman-server/struct/function.h

@@ -43,7 +43,7 @@ struct gearman_server_function_st
   uint32_t job_count;
   uint32_t job_total;
   uint32_t job_running;
-  uint32_t max_queue_size;
+  uint32_t max_queue_size[GEARMAND_JOB_PRIORITY_MAX];
   size_t function_name_size;
   gearman_server_function_st *next;
   gearman_server_function_st *prev;

+ 1 - 1
libgearman/command.cc

@@ -49,7 +49,7 @@
  */
 gearman_command_info_st gearmand_command_info_list[GEARMAN_COMMAND_MAX]=
 {
-  { "TEXT",               3, false },
+  { "TEXT",               5, false },
   { "CAN_DO",             1, false },
   { "CANT_DO",            1, false },
   { "RESET_ABILITIES",    0, false },