123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- #include <config.h>
- #include <libgearman-server/common.h>
- #include <string.h>
- #include <errno.h>
- #include <assert.h>
- static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
- gearmand_error_t *ret);
- gearman_server_con_st *gearman_server_con_add(gearman_server_thread_st *thread, gearmand_con_st *dcon, gearmand_error_t *ret)
- {
- gearman_server_con_st *con;
- con= _server_con_create(thread, dcon, ret);
- if (con == NULL)
- {
- return NULL;
- }
- if ((*ret= gearman_io_set_fd(&(con->con), dcon->fd)) != GEARMAN_SUCCESS)
- {
- gearman_server_con_free(con);
- return NULL;
- }
- *ret= gearmand_io_set_events(con, POLLIN);
- if (*ret != GEARMAN_SUCCESS)
- {
- gearmand_gerror("gearmand_io_set_events", *ret);
- gearman_server_con_free(con);
- return NULL;
- }
- return con;
- }
- static gearman_server_con_st * _server_con_create(gearman_server_thread_st *thread, gearmand_con_st *dcon,
- gearmand_error_t *ret)
- {
- gearman_server_con_st *con;
- if (thread->free_con_count > 0)
- {
- con= thread->free_con_list;
- GEARMAN_LIST_DEL(thread->free_con, con,)
- }
- else
- {
- con= (gearman_server_con_st *)malloc(sizeof(gearman_server_con_st));
- if (con == NULL)
- {
- gearmand_perror("malloc");
- *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
- return NULL;
- }
- }
- assert(con);
- if (!con)
- {
- gearmand_error("Neigther an allocated gearman_server_con_st() or free listed could be found");
- *ret= GEARMAN_MEMORY_ALLOCATION_FAILURE;
- return NULL;
- }
- gearmand_connection_options_t options[]= { GEARMAND_CON_MAX };
- gearmand_connection_init(thread->gearman, &(con->con), dcon, options);
- con->con.root= con;
- con->is_sleeping= false;
- con->is_exceptions= false;
- con->is_dead= false;
- con->is_noop_sent= false;
- con->ret= 0;
- con->io_list= false;
- con->proc_list= false;
- con->proc_removed= false;
- con->io_packet_count= 0;
- con->proc_packet_count= 0;
- con->worker_count= 0;
- con->client_count= 0;
- con->thread= thread;
- con->packet= NULL;
- con->io_packet_list= NULL;
- con->io_packet_end= NULL;
- con->proc_packet_list= NULL;
- con->proc_packet_end= NULL;
- con->io_next= NULL;
- con->io_prev= NULL;
- con->proc_next= NULL;
- con->proc_prev= NULL;
- con->worker_list= NULL;
- con->client_list= NULL;
- con->_host= dcon->host;
- con->_port= dcon->port;
- strcpy(con->id, "-");
- con->protocol.context= NULL;
- con->protocol.context_free_fn= NULL;
- con->protocol.packet_pack_fn= gearmand_packet_pack;
- con->protocol.packet_unpack_fn= gearmand_packet_unpack;
- int error;
- if (! (error= pthread_mutex_lock(&thread->lock)))
- {
- GEARMAN_LIST_ADD(thread->con, con,);
- (void) pthread_mutex_unlock(&thread->lock);
- }
- else
- {
- errno= error;
- gearmand_perror("pthread_mutex_lock");
- gearman_server_con_free(con);
- *ret= GEARMAN_ERRNO;
- return NULL;
- }
- return con;
- }
- void gearman_server_con_free(gearman_server_con_st *con)
- {
- gearman_server_thread_st *thread= con->thread;
- gearman_server_packet_st *packet;
- con->_host= NULL;
- con->_port= NULL;
- if (Server->flags.threaded && !(con->proc_removed) && !(Server->proc_shutdown))
- {
- con->is_dead= true;
- con->is_sleeping= false;
- con->is_exceptions= false;
- con->is_noop_sent= false;
- gearman_server_con_proc_add(con);
- return;
- }
- gearmand_io_free(&(con->con));
- if (con->protocol.context != NULL && con->protocol.context_free_fn != NULL)
- {
- con->protocol.context_free_fn(con, (void *)con->protocol.context);
- }
- if (con->proc_list)
- {
- gearman_server_con_proc_remove(con);
- }
- if (con->io_list)
- {
- gearman_server_con_io_remove(con);
- }
- if (con->packet != NULL)
- {
- if (&(con->packet->packet) != con->con.recv_packet)
- {
- gearmand_packet_free(&(con->packet->packet));
- }
- gearman_server_packet_free(con->packet, con->thread, true);
- }
- while (con->io_packet_list != NULL)
- {
- gearman_server_io_packet_remove(con);
- }
- while (con->proc_packet_list != NULL)
- {
- packet= gearman_server_proc_packet_remove(con);
- gearmand_packet_free(&(packet->packet));
- gearman_server_packet_free(packet, con->thread, true);
- }
- gearman_server_con_free_workers(con);
- while (con->client_list != NULL)
- {
- gearman_server_client_free(con->client_list);
- }
- (void) pthread_mutex_lock(&thread->lock);
- GEARMAN_LIST_DEL(con->thread->con, con,)
- (void) pthread_mutex_unlock(&thread->lock);
- if (thread->free_con_count < GEARMAN_MAX_FREE_SERVER_CON)
- {
- GEARMAN_LIST_ADD(thread->free_con, con,)
- }
- else
- {
- gearmand_debug("free");
- free(con);
- }
- }
- gearmand_io_st *gearman_server_con_con(gearman_server_con_st *con)
- {
- return &con->con;
- }
- gearmand_con_st *gearman_server_con_data(gearman_server_con_st *con)
- {
- return gearman_io_context(&(con->con));
- }
- const char *gearman_server_con_id(gearman_server_con_st *con)
- {
- return con->id;
- }
- void gearman_server_con_set_id(gearman_server_con_st *con, char *id,
- size_t size)
- {
- if (size >= GEARMAN_SERVER_CON_ID_SIZE)
- size= GEARMAN_SERVER_CON_ID_SIZE - 1;
- memcpy(con->id, id, size);
- con->id[size]= 0;
- }
- void gearman_server_con_free_worker(gearman_server_con_st *con,
- char *function_name,
- size_t function_name_size)
- {
- gearman_server_worker_st *worker= con->worker_list;
- gearman_server_worker_st *prev_worker= NULL;
- while (worker != NULL)
- {
- if (worker->function->function_name_size == function_name_size &&
- !memcmp(worker->function->function_name, function_name,
- function_name_size))
- {
- gearman_server_worker_free(worker);
-
- if (prev_worker == NULL)
- worker= con->worker_list;
- else
- worker= prev_worker;
- }
- else
- {
-
- prev_worker= worker;
- worker= worker->con_next;
- }
- }
- }
- void gearman_server_con_free_workers(gearman_server_con_st *con)
- {
- while (con->worker_list != NULL)
- gearman_server_worker_free(con->worker_list);
- }
- void gearman_server_con_io_add(gearman_server_con_st *con)
- {
- if (con->io_list)
- return;
- (void) pthread_mutex_lock(&con->thread->lock);
- GEARMAN_LIST_ADD(con->thread->io, con, io_)
- con->io_list= true;
-
- if (con->thread->io_count == 1 && con->thread->run_fn)
- {
- (void) pthread_mutex_unlock(&con->thread->lock);
- (*con->thread->run_fn)(con->thread, con->thread->run_fn_arg);
- }
- else
- {
- (void) pthread_mutex_unlock(&con->thread->lock);
- }
- }
- void gearman_server_con_io_remove(gearman_server_con_st *con)
- {
- (void) pthread_mutex_lock(&con->thread->lock);
- if (con->io_list)
- {
- GEARMAN_LIST_DEL(con->thread->io, con, io_)
- con->io_list= false;
- }
- (void) pthread_mutex_unlock(&con->thread->lock);
- }
- gearman_server_con_st *
- gearman_server_con_io_next(gearman_server_thread_st *thread)
- {
- gearman_server_con_st *con= thread->io_list;
- if (con == NULL)
- return NULL;
- gearman_server_con_io_remove(con);
- return con;
- }
- void gearman_server_con_proc_add(gearman_server_con_st *con)
- {
- if (con->proc_list)
- return;
- (void) pthread_mutex_lock(&con->thread->lock);
- GEARMAN_LIST_ADD(con->thread->proc, con, proc_)
- con->proc_list= true;
- (void) pthread_mutex_unlock(&con->thread->lock);
- if (! (Server->proc_shutdown) && !(Server->proc_wakeup))
- {
- (void) pthread_mutex_lock(&(Server->proc_lock));
- Server->proc_wakeup= true;
- (void) pthread_cond_signal(&(Server->proc_cond));
- (void) pthread_mutex_unlock(&(Server->proc_lock));
- }
- }
- void gearman_server_con_proc_remove(gearman_server_con_st *con)
- {
- (void) pthread_mutex_lock(&con->thread->lock);
- if (con->proc_list)
- {
- GEARMAN_LIST_DEL(con->thread->proc, con, proc_)
- con->proc_list= false;
- }
- (void) pthread_mutex_unlock(&con->thread->lock);
- }
- gearman_server_con_st *
- gearman_server_con_proc_next(gearman_server_thread_st *thread)
- {
- gearman_server_con_st *con;
- if (thread->proc_list == NULL)
- return NULL;
- (void) pthread_mutex_lock(&thread->lock);
- con= thread->proc_list;
- while (con != NULL)
- {
- GEARMAN_LIST_DEL(thread->proc, con, proc_)
- con->proc_list= false;
- if (!(con->proc_removed))
- break;
- con= thread->proc_list;
- }
- (void) pthread_mutex_unlock(&thread->lock);
- return con;
- }
- void gearmand_connection_set_protocol(gearman_server_con_st *connection, void *context,
- gearmand_connection_protocol_context_free_fn *free_fn,
- gearmand_packet_pack_fn *pack,
- gearmand_packet_unpack_fn *unpack)
- {
- connection->protocol.context= context;
- connection->protocol.context_free_fn= free_fn;
- connection->protocol.packet_pack_fn= pack;
- connection->protocol.packet_unpack_fn= unpack;
- }
- void *gearmand_connection_protocol_context(const gearman_server_con_st *connection)
- {
- return connection->protocol.context;
- }
|