Browse Source

recv(), close()

Brian Aker 14 years ago
parent
commit
6d3143f9e6

+ 2 - 2
libgearman/client.cc

@@ -1145,14 +1145,14 @@ static inline gearman_return_t _client_run_tasks(gearman_client_st *client)
             {
               /* Read the next packet, without buffering the data part. */
               client->task= NULL;
-              (void)gearman_connection_recv(client->con, &(client->con->packet), &ret, false);
+              (void)client->con->recv(&(client->con->packet), &ret, false);
             }
           }
           else
           {
             /* Read the next packet, buffering the data part. */
             client->task= NULL;
-            (void)gearman_connection_recv(client->con, &(client->con->packet), &ret, true);
+            (void)client->con->recv(&(client->con->packet), &ret, true);
           }
 
           if (client->task == NULL)

+ 92 - 96
libgearman/connection.cc

@@ -173,7 +173,7 @@ gearman_connection_st *gearman_connection_copy(gearman_universal_st& universal,
 void gearman_connection_free(gearman_connection_st *connection)
 {
   if (connection->fd != -1)
-    gearman_connection_close(connection);
+    connection->close();
 
   gearman_connection_reset_addrinfo(connection);
 
@@ -239,44 +239,44 @@ void gearman_connection_st::set_host(const char *host_arg, const in_port_t port_
   port= in_port_t(port_arg == 0 ? GEARMAN_DEFAULT_TCP_PORT : port_arg);
 }
 
-void gearman_connection_close(gearman_connection_st *connection)
+void gearman_connection_st::close()
 {
-  if (connection->fd == INVALID_SOCKET)
+  if (fd == INVALID_SOCKET)
     return;
 
   /* in case of death shutdown to avoid blocking at close() */
-  if (shutdown(connection->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
+  if (shutdown(fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
   {
-    gearman_perror(connection->universal, "shutdown");
+    gearman_perror(universal, "shutdown");
     assert(errno != ENOTSOCK);
     return;
   }
 
-  if (closesocket(connection->fd) == SOCKET_ERROR)
+  if (closesocket(fd) == SOCKET_ERROR)
   {
-    gearman_perror(connection->universal, "close");
+    gearman_perror(universal, "close");
   }
 
-  connection->state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
-  connection->fd= INVALID_SOCKET;
-  connection->events= 0;
-  connection->revents= 0;
+  state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
+  fd= INVALID_SOCKET;
+  events= 0;
+  revents= 0;
 
-  connection->send_state= GEARMAN_CON_SEND_STATE_NONE;
-  connection->send_buffer_ptr= connection->send_buffer;
-  connection->send_buffer_size= 0;
-  connection->send_data_size= 0;
-  connection->send_data_offset= 0;
+  send_state= GEARMAN_CON_SEND_STATE_NONE;
+  send_buffer_ptr= send_buffer;
+  send_buffer_size= 0;
+  send_data_size= 0;
+  send_data_offset= 0;
 
-  connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
-  if (connection->recv_packet != NULL)
+  recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
+  if (recv_packet != NULL)
   {
-    gearman_packet_free(connection->recv_packet);
-    connection->recv_packet= NULL;
+    gearman_packet_free(recv_packet);
+    recv_packet= NULL;
   }
 
-  connection->recv_buffer_ptr= connection->recv_buffer;
-  connection->recv_buffer_size= 0;
+  recv_buffer_ptr= recv_buffer;
+  recv_buffer_size= 0;
 }
 
 void gearman_connection_reset_addrinfo(gearman_connection_st *connection)
@@ -412,7 +412,7 @@ gearman_return_t gearman_connection_st::send(const gearman_packet_st *packet_arg
       gearman_return_t ret= flush();
       if (gearman_success(ret) and options.close_after_flush)
       {
-        gearman_connection_close(this);
+        close();
         ret= GEARMAN_LOST_CONNECTION;
       }
       return ret;
@@ -425,7 +425,7 @@ gearman_return_t gearman_connection_st::send(const gearman_packet_st *packet_arg
     gearman_return_t ret= flush();
     if (ret == GEARMAN_SUCCESS && options.close_after_flush)
     {
-      gearman_connection_close(this);
+      close();
       ret= GEARMAN_LOST_CONNECTION;
     }
     return ret;
@@ -491,8 +491,8 @@ gearman_return_t gearman_connection_st::flush()
       }
 
     case GEARMAN_CON_UNIVERSAL_CONNECT:
-      if (fd != -1)
-        gearman_connection_close(this);
+      if (fd != INVALID_SOCKET)
+        close();
 
       if (addrinfo_next == NULL)
       {
@@ -504,7 +504,7 @@ gearman_return_t gearman_connection_st::flush()
       fd= socket(addrinfo_next->ai_family,
                              addrinfo_next->ai_socktype,
                              addrinfo_next->ai_protocol);
-      if (fd == -1)
+      if (fd == INVALID_SOCKET)
       {
         state= GEARMAN_CON_UNIVERSAL_ADDRINFO;
         gearman_perror(universal, "socket");
@@ -515,7 +515,7 @@ gearman_return_t gearman_connection_st::flush()
         gearman_return_t gret= _con_setsockopt(this);
         if (gearman_failed(gret))
         {
-          gearman_connection_close(this);
+          close();
           return gret;
         }
       }
@@ -546,7 +546,7 @@ gearman_return_t gearman_connection_st::flush()
         }
 
         gearman_perror(universal, "connect");
-        gearman_connection_close(this);
+        close();
         return GEARMAN_ERRNO;
       }
 
@@ -640,7 +640,7 @@ gearman_return_t gearman_connection_st::flush()
           {
             gearman_error(&universal, GEARMAN_LOST_CONNECTION, "lost connection to server (EOF)");
           }
-          gearman_connection_close(this);
+          close();
           return GEARMAN_LOST_CONNECTION;
         }
         else if (write_size == -1)
@@ -673,12 +673,12 @@ gearman_return_t gearman_connection_st::flush()
             {
               gearman_perror(universal, "lost connection to server during send");
             }
-            gearman_connection_close(this);
+            close();
             return GEARMAN_LOST_CONNECTION;
           }
 
           gearman_perror(universal, "send");
-          gearman_connection_close(this);
+          close();
           return GEARMAN_ERRNO;
         }
 
@@ -709,39 +709,38 @@ gearman_return_t gearman_connection_st::flush()
   }
 }
 
-gearman_packet_st *gearman_connection_recv(gearman_connection_st *connection,
-                                           gearman_packet_st *packet,
-                                           gearman_return_t *ret_ptr, bool recv_data)
+gearman_packet_st *gearman_connection_st::recv(gearman_packet_st *packet_arg,
+                                               gearman_return_t *ret_ptr, bool recv_data)
 {
-  switch (connection->recv_state)
+  switch (recv_state)
   {
   case GEARMAN_CON_RECV_UNIVERSAL_NONE:
-    if (connection->state != GEARMAN_CON_UNIVERSAL_CONNECTED)
+    if (state != GEARMAN_CON_UNIVERSAL_CONNECTED)
     {
-      gearman_error(&connection->universal, GEARMAN_NOT_CONNECTED, "not connected");
+      gearman_error(&universal, GEARMAN_NOT_CONNECTED, "not connected");
       *ret_ptr= GEARMAN_NOT_CONNECTED;
       return NULL;
     }
 
-    connection->recv_packet= gearman_packet_create(connection->universal, packet);
-    if (not connection->recv_packet)
+    recv_packet= gearman_packet_create(universal, packet_arg);
+    if (not recv_packet)
     {
       *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
       return NULL;
     }
 
-    connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_READ;
+    recv_state= GEARMAN_CON_RECV_UNIVERSAL_READ;
 
   case GEARMAN_CON_RECV_UNIVERSAL_READ:
     while (1)
     {
-      if (connection->recv_buffer_size > 0)
+      if (recv_buffer_size > 0)
       {
-        size_t recv_size= gearman_packet_unpack(*connection->recv_packet,
-                                                connection->recv_buffer_ptr,
-                                                connection->recv_buffer_size, *ret_ptr);
-        connection->recv_buffer_ptr+= recv_size;
-        connection->recv_buffer_size-= recv_size;
+        size_t recv_size= gearman_packet_unpack(*recv_packet,
+                                                recv_buffer_ptr,
+                                                recv_buffer_size, *ret_ptr);
+        recv_buffer_ptr+= recv_size;
+        recv_buffer_size-= recv_size;
 
         if (gearman_success(*ret_ptr))
         {
@@ -749,128 +748,125 @@ gearman_packet_st *gearman_connection_recv(gearman_connection_st *connection,
         }
         else if (*ret_ptr != GEARMAN_IO_WAIT)
         {
-          gearman_connection_close(connection);
+          close();
           return NULL;
         }
       }
 
       /* Shift buffer contents if needed. */
-      if (connection->recv_buffer_size > 0)
-        memmove(connection->recv_buffer, connection->recv_buffer_ptr, connection->recv_buffer_size);
-      connection->recv_buffer_ptr= connection->recv_buffer;
+      if (recv_buffer_size > 0)
+        memmove(recv_buffer, recv_buffer_ptr, recv_buffer_size);
+      recv_buffer_ptr= recv_buffer;
 
-      size_t recv_size= gearman_connection_read(connection, connection->recv_buffer + connection->recv_buffer_size,
-                                                GEARMAN_RECV_BUFFER_SIZE - connection->recv_buffer_size, ret_ptr);
+      size_t recv_size= gearman_connection_read(this, recv_buffer + recv_buffer_size,
+                                                GEARMAN_RECV_BUFFER_SIZE - recv_buffer_size, ret_ptr);
       if (gearman_failed(*ret_ptr))
       {
         return NULL;
       }
 
-      connection->recv_buffer_size+= recv_size;
+      recv_buffer_size+= recv_size;
     }
 
-    if (packet->data_size == 0)
+    if (packet_arg->data_size == 0)
     {
-      connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
+      recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
       break;
     }
 
-    connection->recv_data_size= packet->data_size;
+    recv_data_size= packet_arg->data_size;
 
     if (not recv_data)
     {
-      connection->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
+      recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
       break;
     }
 
-    if (packet->universal->workload_malloc_fn == NULL)
+    if (packet_arg->universal->workload_malloc_fn == NULL)
     {
       // Since it may be C on the other side, don't use new
-      packet->data= malloc(packet->data_size);
+      packet_arg->data= malloc(packet_arg->data_size);
     }
     else
     {
-      packet->data= packet->universal->workload_malloc_fn(packet->data_size,
-                                                        static_cast<void *>(packet->universal->workload_malloc_context));
+      packet_arg->data= packet_arg->universal->workload_malloc_fn(packet_arg->data_size,
+                                                                  static_cast<void *>(packet_arg->universal->workload_malloc_context));
     }
-    if (packet->data == NULL)
+    if (packet_arg->data == NULL)
     {
       *ret_ptr= GEARMAN_MEMORY_ALLOCATION_FAILURE;
-      gearman_connection_close(connection);
+      close();
       return NULL;
     }
 
-    packet->options.free_data= true;
-    connection->recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
+    packet_arg->options.free_data= true;
+    recv_state= GEARMAN_CON_RECV_STATE_READ_DATA;
 
   case GEARMAN_CON_RECV_STATE_READ_DATA:
-    while (connection->recv_data_size != 0)
+    while (recv_data_size)
     {
-      (void)gearman_connection_recv_data(connection,
-                                         static_cast<uint8_t *>(const_cast<void *>(packet->data)) +
-                                         connection->recv_data_offset,
-                                         packet->data_size -
-                                         connection->recv_data_offset, ret_ptr);
+      (void)recv(static_cast<uint8_t *>(const_cast<void *>(packet_arg->data)) +
+                 recv_data_offset,
+                 packet_arg->data_size -recv_data_offset, ret_ptr);
       if (gearman_failed(*ret_ptr))
       {
         return NULL;
       }
     }
 
-    connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
+    recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
     break;
   }
 
-  packet= connection->recv_packet;
-  connection->recv_packet= NULL;
+  packet_arg= recv_packet;
+  recv_packet= NULL;
 
-  return packet;
+  return packet_arg;
 }
 
-size_t gearman_connection_recv_data(gearman_connection_st *connection, void *data, size_t data_size,
-                                    gearman_return_t *ret_ptr)
+size_t gearman_connection_st::recv(void *data, size_t data_size, gearman_return_t *ret_ptr)
 {
   size_t recv_size= 0;
 
-  if (connection->recv_data_size == 0)
+  if (recv_data_size == 0)
   {
     *ret_ptr= GEARMAN_SUCCESS;
     return 0;
   }
 
-  if ((connection->recv_data_size - connection->recv_data_offset) < data_size)
-    data_size= connection->recv_data_size - connection->recv_data_offset;
+  if ((recv_data_size - recv_data_offset) < data_size)
+    data_size= recv_data_size - recv_data_offset;
 
-  if (connection->recv_buffer_size > 0)
+  if (recv_buffer_size > 0)
   {
-    if (connection->recv_buffer_size < data_size)
-      recv_size= connection->recv_buffer_size;
+    if (recv_buffer_size < data_size)
+      recv_size= recv_buffer_size;
     else
       recv_size= data_size;
 
-    memcpy(data, connection->recv_buffer_ptr, recv_size);
-    connection->recv_buffer_ptr+= recv_size;
-    connection->recv_buffer_size-= recv_size;
+    memcpy(data, recv_buffer_ptr, recv_size);
+    recv_buffer_ptr+= recv_size;
+    recv_buffer_size-= recv_size;
   }
 
   if (data_size != recv_size)
   {
-    recv_size+= gearman_connection_read(connection,
+    recv_size+= gearman_connection_read(this,
                                         static_cast<uint8_t *>(const_cast<void *>(data)) + recv_size,
                                         data_size - recv_size, ret_ptr);
-    connection->recv_data_offset+= recv_size;
+    recv_data_offset+= recv_size;
   }
   else
   {
-    connection->recv_data_offset+= recv_size;
+    recv_data_offset+= recv_size;
     *ret_ptr= GEARMAN_SUCCESS;
   }
 
-  if (connection->recv_data_size == connection->recv_data_offset)
+  if (recv_data_size == recv_data_offset)
   {
-    connection->recv_data_size= 0;
-    connection->recv_data_offset= 0;
-    connection->recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
+    recv_data_size= 0;
+    recv_data_offset= 0;
+    recv_state= GEARMAN_CON_RECV_UNIVERSAL_NONE;
   }
 
   return recv_size;
@@ -890,7 +886,7 @@ size_t gearman_connection_read(gearman_connection_st *connection, void *data, si
       {
         gearman_error(&connection->universal, GEARMAN_LOST_CONNECTION, "lost connection to server (EOF)");
       }
-      gearman_connection_close(connection);
+      connection->close();
       *ret_ptr= GEARMAN_LOST_CONNECTION;
       return 0;
     }
@@ -935,7 +931,7 @@ size_t gearman_connection_read(gearman_connection_st *connection, void *data, si
         *ret_ptr= GEARMAN_ERRNO;
       }
 
-      gearman_connection_close(connection);
+      connection->close();
       return 0;
     }
 

+ 0 - 27
libgearman/connection.h

@@ -95,33 +95,6 @@ enum gearman_con_universal_t {
 GEARMAN_INTERNAL_API
 void gearman_connection_free(gearman_connection_st *connection);
 
-/**
- * Close a connection.
- */
-GEARMAN_INTERNAL_API
-void gearman_connection_close(gearman_connection_st *connection);
-
-/**
- * Flush the send buffer.
- */
-GEARMAN_INTERNAL_API
-gearman_return_t gearman_connection_flush(gearman_connection_st *connection);
-
-/**
- * Receive packet from a connection.
- */
-GEARMAN_INTERNAL_API
-gearman_packet_st *gearman_connection_recv(gearman_connection_st *connection,
-                                           gearman_packet_st *packet,
-                                           gearman_return_t *ret_ptr, bool recv_data);
-
-/**
- * Receive packet data from a connection.
- */
-GEARMAN_INTERNAL_API
-size_t gearman_connection_recv_data(gearman_connection_st *connection, void *data, size_t data_size,
-                                    gearman_return_t *ret_ptr);
-
 /**
  * Read data from a connection.
  */

+ 8 - 0
libgearman/connection.hpp

@@ -88,6 +88,14 @@ struct gearman_connection_st
   size_t send(const void *data, size_t data_size, gearman_return_t *ret_ptr);
 
   gearman_return_t flush();
+  void close();
+
+  // Receive packet from a connection.
+  gearman_packet_st *recv(gearman_packet_st *packet,
+                          gearman_return_t *ret_ptr, bool recv_data);
+
+  // Receive packet data from a connection.
+  size_t recv(void *data, size_t data_size, gearman_return_t *ret_ptr);
 };
 
 /**

+ 1 - 1
libgearman/task.cc

@@ -344,7 +344,7 @@ size_t gearman_task_recv_data(gearman_task_st *task, void *data,
     return 0;
   }
 
-  return gearman_connection_recv_data(task->con, data, data_size, ret_ptr);
+  return task->con->recv(data, data_size, ret_ptr);
 }
 
 gearman_return_t gearman_task_error(const gearman_task_st *task)

+ 2 - 2
libgearman/universal.cc

@@ -307,7 +307,7 @@ gearman_return_t gearman_echo(gearman_universal_st *universal,
       goto exit;
     }
 
-    packet_ptr= gearman_connection_recv(con, &(con->packet), &ret, true);
+    packet_ptr= con->recv(&(con->packet), &ret, true);
     if (gearman_failed(ret))
     {
       goto exit;
@@ -370,7 +370,7 @@ bool gearman_request_option(gearman_universal_st &universal,
       goto exit;
     }
 
-    packet_ptr= gearman_connection_recv(con, &(con->packet), &ret, true);
+    packet_ptr= con->recv(&(con->packet), &ret, true);
     if (gearman_failed(ret))
     {
       gearman_packet_free(&(con->packet));

+ 1 - 1
libgearman/worker.cc

@@ -672,7 +672,7 @@ gearman_job_st *gearman_worker_grab_job(gearman_worker_st *worker,
         {
     case GEARMAN_WORKER_STATE_GRAB_JOB_RECV:
           assert(&(worker->job->assigned));
-          (void)gearman_connection_recv(worker->con, &(worker->job->assigned), ret_ptr, true);
+          (void)worker->con->recv(&(worker->job->assigned), ret_ptr, true);
 
           if (gearman_failed(*ret_ptr))
           {

+ 4 - 0
m4/pandora_canonical.m4

@@ -367,7 +367,11 @@ typedef unsigned long int ulong;
 #else
 #define INVALID_SOCKET -1
 #define SOCKET_ERROR -1
+#ifdef __cplusplus
+#define closesocket(a) ::close(a)
+#else
 #define closesocket(a) close(a)
+#endif
 #define get_socket_errno() errno
 #endif // TARGET_OS_WINDOWS
 

+ 2 - 2
tests/worker_test.cc

@@ -354,7 +354,7 @@ static test_return_t abandoned_worker_test(void *)
 
   gearman_packet_free(&packet);
 
-  gearman_connection_recv(worker1, &packet, &ret, false);
+  worker1->recv(&packet, &ret, false);
   test_truth(not (ret != GEARMAN_SUCCESS || packet.command != GEARMAN_COMMAND_JOB_ASSIGN));
 
   test_strcmp(job_handle, packet.arg[0]); // unexepcted job
@@ -389,7 +389,7 @@ static test_return_t abandoned_worker_test(void *)
   gearman_packet_free(&packet);
 
   gearman_universal_set_timeout(universal, 1000);
-  gearman_connection_recv(worker2, &packet, &ret, false);
+  worker2->recv(&packet, &ret, false);
   test_truth(not (ret != GEARMAN_SUCCESS || packet.command != GEARMAN_COMMAND_ERROR));
 
   gearman_connection_free(worker1);