universal.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearman State Definitions
  41. */
  42. #include "gear_config.h"
  43. #include <libgearman/common.h>
  44. #include "libgearman/assert.hpp"
  45. #include "libgearman/server_options.hpp"
  46. #include <cerrno>
  47. #include <cstdarg>
  48. #include <cstdio>
  49. #include <cstdlib>
  50. #include <cstring>
  51. #include <cctype>
  52. #include <unistd.h>
  53. #include <memory>
  54. void gearman_universal_initialize(gearman_universal_st &self, const gearman_universal_options_t *options)
  55. {
  56. { // Set defaults on all options.
  57. self.options.non_blocking= false;
  58. }
  59. if (options)
  60. {
  61. while (*options != GEARMAN_MAX)
  62. {
  63. /**
  64. @note Check for bad value, refactor gearman_add_options().
  65. */
  66. gearman_universal_add_options(self, *options);
  67. options++;
  68. }
  69. }
  70. self.verbose= GEARMAN_VERBOSE_NEVER;
  71. self.con_count= 0;
  72. self.packet_count= 0;
  73. self.pfds_size= 0;
  74. self.sending= 0;
  75. self.timeout= -1;
  76. self.con_list= NULL;
  77. self.packet_list= NULL;
  78. self.server_options_list= NULL;
  79. self.pfds= NULL;
  80. self.log_fn= NULL;
  81. self.log_context= NULL;
  82. self.allocator= gearman_default_allocator();
  83. self._namespace= NULL;
  84. self.error.rc= GEARMAN_SUCCESS;
  85. self.error.last_errno= 0;
  86. self.error.last_error[0]= 0;
  87. self.wakeup_fd[0]= INVALID_SOCKET;
  88. self.wakeup_fd[1]= INVALID_SOCKET;
  89. }
  /**
   * Sleep for approximately @p arg microseconds.
   *
   * Values below 1 return immediately without sleeping.
   *
   * The duration is split into whole seconds and a sub-second remainder:
   * nanosleep() rejects a tv_nsec of one second or more with EINVAL, so putting
   * the entire duration into tv_nsec would silently skip any nap of a second or
   * longer (and the multiplication could overflow int).
   *
   * @param arg  Nap length in microseconds.
   */
  void gearman_nap(int arg)
  {
    if (arg < 1)
    {
      return;
    }

  #ifdef WIN32
    sleep(arg / 1000000);
  #else
    struct timespec nap_time;
    nap_time.tv_sec = arg / 1000000;
    // The remainder is below 1,000,000 us, so the product stays below 1e9 ns.
    nap_time.tv_nsec = (arg % 1000000) * 1000L;
    nanosleep(&nap_time, NULL);
  #endif
  }
  104. void gearman_nap(gearman_universal_st &self)
  105. {
  106. gearman_nap(self.timeout);
  107. }
  108. void gearman_universal_clone(gearman_universal_st &destination, const gearman_universal_st &source, bool has_wakeup_fd)
  109. {
  110. int wakeup_fd[2];
  111. if (has_wakeup_fd)
  112. {
  113. wakeup_fd[0]= destination.wakeup_fd[0];
  114. wakeup_fd[1]= destination.wakeup_fd[1];
  115. }
  116. gearman_universal_initialize(destination);
  117. if (has_wakeup_fd)
  118. {
  119. destination.wakeup_fd[0]= wakeup_fd[0];
  120. destination.wakeup_fd[1]= wakeup_fd[1];
  121. }
  122. (void)gearman_universal_set_option(destination, GEARMAN_NON_BLOCKING, source.options.non_blocking);
  123. destination.timeout= source.timeout;
  124. destination._namespace= gearman_string_clone(source._namespace);
  125. for (gearman_connection_st *con= source.con_list; con; con= con->next)
  126. {
  127. if (gearman_connection_copy(destination, *con) == NULL)
  128. {
  129. gearman_universal_free(destination);
  130. return;
  131. }
  132. }
  133. }
  134. void gearman_universal_free(gearman_universal_st &universal)
  135. {
  136. gearman_free_all_cons(universal);
  137. gearman_free_all_packets(universal);
  138. gearman_string_free(universal._namespace);
  139. if (universal.pfds)
  140. {
  141. // created realloc()
  142. free(universal.pfds);
  143. universal.pfds= NULL;
  144. }
  145. // clean-up server options
  146. while (universal.server_options_list)
  147. {
  148. delete universal.server_options_list;
  149. }
  150. }
  151. gearman_return_t gearman_universal_set_option(gearman_universal_st &self, gearman_universal_options_t option, bool value)
  152. {
  153. switch (option)
  154. {
  155. case GEARMAN_NON_BLOCKING:
  156. self.options.non_blocking= value;
  157. break;
  158. case GEARMAN_DONT_TRACK_PACKETS:
  159. break;
  160. case GEARMAN_MAX:
  161. default:
  162. return GEARMAN_INVALID_COMMAND;
  163. }
  164. return GEARMAN_SUCCESS;
  165. }
  166. int gearman_universal_timeout(gearman_universal_st &self)
  167. {
  168. return self.timeout;
  169. }
  170. void gearman_universal_set_timeout(gearman_universal_st &self, int timeout)
  171. {
  172. self.timeout= timeout;
  173. }
  174. void gearman_set_log_fn(gearman_universal_st &self, gearman_log_fn *function,
  175. void *context, gearman_verbose_t verbose)
  176. {
  177. self.log_fn= function;
  178. self.log_context= context;
  179. self.verbose= verbose;
  180. }
  181. void gearman_set_workload_malloc_fn(gearman_universal_st& universal,
  182. gearman_malloc_fn *function,
  183. void *context)
  184. {
  185. universal.allocator.malloc= function;
  186. universal.allocator.context= context;
  187. }
  188. void gearman_set_workload_free_fn(gearman_universal_st& universal,
  189. gearman_free_fn *function,
  190. void *context)
  191. {
  192. universal.allocator.free= function;
  193. universal.allocator.context= context;
  194. }
  195. void gearman_free_all_cons(gearman_universal_st& universal)
  196. {
  197. while (universal.con_list)
  198. {
  199. delete universal.con_list;
  200. }
  201. }
  202. void gearman_reset(gearman_universal_st& universal)
  203. {
  204. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  205. {
  206. con->close_socket();
  207. }
  208. }
  209. /*
  210. * Flush all shouldn't return any error, because there's no way to indicate
  211. * which connection experienced an issue. Error detection is better done in gearman_wait()
  212. * after flushing all the connections here.
  213. */
  214. void gearman_flush_all(gearman_universal_st& universal)
  215. {
  216. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  217. {
  218. if (con->events & POLLOUT)
  219. {
  220. continue;
  221. }
  222. con->flush();
  223. }
  224. }
  225. gearman_return_t gearman_wait(gearman_universal_st& universal)
  226. {
  227. struct pollfd *pfds;
  228. bool have_shutdown_pipe= universal.wakeup_fd[0] == INVALID_SOCKET ? false : true;
  229. size_t con_count= universal.con_count +int(have_shutdown_pipe);
  230. if (universal.pfds_size < con_count)
  231. {
  232. pfds= static_cast<pollfd*>(realloc(universal.pfds, con_count * sizeof(struct pollfd)));
  233. if (pfds == NULL)
  234. {
  235. gearman_perror(universal, "pollfd realloc");
  236. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  237. }
  238. universal.pfds= pfds;
  239. universal.pfds_size= int(con_count);
  240. }
  241. else
  242. {
  243. pfds= universal.pfds;
  244. }
  245. nfds_t x= 0;
  246. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  247. {
  248. if (con->events == 0)
  249. {
  250. continue;
  251. }
  252. pfds[x].fd= con->fd;
  253. pfds[x].events= con->events;
  254. pfds[x].revents= 0;
  255. x++;
  256. }
  257. if (x == 0)
  258. {
  259. return gearman_error(universal, GEARMAN_NO_ACTIVE_FDS, "no active file descriptors");
  260. }
  261. // Wakeup handling, we only make use of this if we have active connections
  262. size_t pipe_array_iterator= 0;
  263. if (have_shutdown_pipe)
  264. {
  265. pipe_array_iterator= x;
  266. pfds[x].fd= universal.wakeup_fd[0];
  267. pfds[x].events= POLLIN;
  268. pfds[x].revents= 0;
  269. x++;
  270. }
  271. int ret= 0;
  272. while (universal.timeout)
  273. {
  274. ret= poll(pfds, x, universal.timeout);
  275. if (ret == -1)
  276. {
  277. switch(errno)
  278. {
  279. case EINTR:
  280. continue;
  281. case EINVAL:
  282. return gearman_perror(universal, "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid");
  283. default:
  284. return gearman_perror(universal, "poll");
  285. }
  286. }
  287. break;
  288. }
  289. if (ret == 0)
  290. {
  291. return gearman_universal_set_error(universal, GEARMAN_TIMEOUT, GEARMAN_AT,
  292. "timeout reached, %u servers were poll(), no servers were available, pipe:%s",
  293. uint32_t(x), have_shutdown_pipe ? "true" : "false");
  294. }
  295. x= 0;
  296. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  297. {
  298. if (con->events == 0)
  299. {
  300. continue;
  301. }
  302. if (pfds[x].revents & (POLLERR | POLLHUP | POLLNVAL))
  303. {
  304. int err;
  305. socklen_t len= sizeof (err);
  306. if (getsockopt(pfds[x].fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
  307. {
  308. con->cached_errno= err;
  309. }
  310. }
  311. con->set_revents(pfds[x].revents);
  312. x++;
  313. }
  314. if (have_shutdown_pipe and pfds[pipe_array_iterator].revents)
  315. {
  316. char buffer[1];
  317. ssize_t read_length= read(universal.wakeup_fd[0], buffer, sizeof(buffer));
  318. if (read_length > 0)
  319. {
  320. gearman_return_t local_ret= gearman_kill(gearman_universal_id(universal), GEARMAN_INTERRUPT);
  321. if (gearman_failed(local_ret))
  322. {
  323. return GEARMAN_SHUTDOWN;
  324. }
  325. return GEARMAN_SHUTDOWN_GRACEFUL;
  326. }
  327. if (read_length == 0)
  328. {
  329. return GEARMAN_SHUTDOWN;
  330. }
  331. #if 0
  332. perror("shudown read");
  333. #endif
  334. // @todo figure out what happens in an error
  335. }
  336. return GEARMAN_SUCCESS;
  337. }
  338. gearman_connection_st *gearman_ready(gearman_universal_st& universal)
  339. {
  340. /*
  341. We can't keep universal between calls since connections may be removed during
  342. processing. If this list ever gets big, we may want something faster.
  343. */
  344. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  345. {
  346. if (con->options.ready)
  347. {
  348. con->options.ready= false;
  349. return con;
  350. }
  351. }
  352. return NULL;
  353. }
  354. gearman_return_t gearman_set_identifier(gearman_universal_st& universal,
  355. const char *id, size_t id_size)
  356. {
  357. if (id == NULL)
  358. {
  359. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "id was NULL");
  360. }
  361. if (id_size == 0)
  362. {
  363. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "id_size was 0");
  364. }
  365. if (id_size > GEARMAN_MAX_IDENTIFIER)
  366. {
  367. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "id_size was greater then GEARMAN_MAX_ECHO_SIZE");
  368. }
  369. for (size_t x= 0; x < id_size; x++)
  370. {
  371. if (isgraph(id[x]) == false)
  372. {
  373. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "Invalid character found in identifier");
  374. }
  375. }
  376. gearman_packet_st packet;
  377. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  378. GEARMAN_COMMAND_SET_CLIENT_ID,
  379. (const void**)&id, &id_size, 1);
  380. if (gearman_success(ret))
  381. {
  382. PUSH_BLOCKING(universal);
  383. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  384. {
  385. gearman_return_t local_ret= con->send_packet(packet, true);
  386. if (gearman_failed(local_ret))
  387. {
  388. ret= local_ret;
  389. }
  390. }
  391. }
  392. gearman_packet_free(&packet);
  393. return ret;
  394. }
  395. EchoCheck::EchoCheck(gearman_universal_st& universal_,
  396. const void *workload_, const size_t workload_size_) :
  397. _universal(universal_),
  398. _workload(workload_),
  399. _workload_size(workload_size_)
  400. {
  401. }
  402. gearman_return_t EchoCheck::success(gearman_connection_st* con)
  403. {
  404. if (con->_packet.command != GEARMAN_COMMAND_ECHO_RES)
  405. {
  406. return gearman_error(_universal, GEARMAN_INVALID_COMMAND, "Wrong command sent in response to ECHO request");
  407. }
  408. if (con->_packet.data_size != _workload_size or
  409. memcmp(_workload, con->_packet.data, _workload_size))
  410. {
  411. return gearman_error(_universal, GEARMAN_ECHO_DATA_CORRUPTION, "corruption during echo");
  412. }
  413. return GEARMAN_SUCCESS;
  414. }
  415. OptionCheck::OptionCheck(gearman_universal_st& universal_):
  416. _universal(universal_)
  417. {
  418. }
  419. gearman_return_t OptionCheck::success(gearman_connection_st* con)
  420. {
  421. if (con->_packet.command == GEARMAN_COMMAND_ERROR)
  422. {
  423. return gearman_error(_universal, GEARMAN_INVALID_SERVER_OPTION, "invalid server option");
  424. }
  425. return GEARMAN_SUCCESS;
  426. }
  427. static gearman_return_t connection_loop(gearman_universal_st& universal,
  428. const gearman_packet_st& message,
  429. Check& check)
  430. {
  431. gearman_return_t ret= GEARMAN_SUCCESS;
  432. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  433. {
  434. ret= con->send_packet(message, true);
  435. if (gearman_failed(ret))
  436. {
  437. #if 0
  438. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  439. #endif
  440. break;
  441. }
  442. con->options.packet_in_use= true;
  443. gearman_packet_st *packet_ptr= con->receiving(con->_packet, ret, true);
  444. if (packet_ptr == NULL)
  445. {
  446. assert(&con->_packet == universal.packet_list);
  447. con->options.packet_in_use= false;
  448. break;
  449. }
  450. assert(packet_ptr == &con->_packet);
  451. if (gearman_failed(ret))
  452. {
  453. #if 0
  454. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  455. #endif
  456. con->free_private_packet();
  457. con->reset_recv_packet();
  458. break;
  459. }
  460. assert(packet_ptr);
  461. if (gearman_failed(ret= check.success(con)))
  462. {
  463. #if 0
  464. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  465. #endif
  466. con->free_private_packet();
  467. con->reset_recv_packet();
  468. break;
  469. }
  470. con->reset_recv_packet();
  471. con->free_private_packet();
  472. }
  473. return ret;
  474. }
  475. gearman_return_t gearman_echo(gearman_universal_st& universal,
  476. const void *workload,
  477. size_t workload_size)
  478. {
  479. if (workload == NULL)
  480. {
  481. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload was NULL");
  482. }
  483. if (workload_size == 0)
  484. {
  485. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload_size was 0");
  486. }
  487. if (workload_size > GEARMAN_MAX_ECHO_SIZE)
  488. {
  489. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "workload_size was greater then GEARMAN_MAX_ECHO_SIZE");
  490. }
  491. gearman_packet_st message;
  492. gearman_return_t ret= gearman_packet_create_args(universal, message, GEARMAN_MAGIC_REQUEST,
  493. GEARMAN_COMMAND_ECHO_REQ,
  494. &workload, &workload_size, 1);
  495. if (gearman_success(ret))
  496. {
  497. PUSH_BLOCKING(universal);
  498. EchoCheck check(universal, workload, workload_size);
  499. ret= connection_loop(universal, message, check);
  500. }
  501. else
  502. {
  503. gearman_packet_free(&message);
  504. gearman_error(universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "gearman_packet_create_args()");
  505. return ret;
  506. }
  507. gearman_packet_free(&message);
  508. return ret;
  509. }
  510. void gearman_free_all_packets(gearman_universal_st &universal)
  511. {
  512. while (universal.packet_list)
  513. {
  514. gearman_packet_free(universal.packet_list);
  515. }
  516. }
  517. gearman_id_t gearman_universal_id(gearman_universal_st &universal)
  518. {
  519. gearman_id_t handle= { universal.wakeup_fd[0], universal.wakeup_fd[1] };
  520. return handle;
  521. }
  522. /*
  523. * Local Definitions
  524. */
  525. void gearman_universal_set_namespace(gearman_universal_st& universal, const char *namespace_key, size_t namespace_key_size)
  526. {
  527. gearman_string_free(universal._namespace);
  528. if (namespace_key)
  529. {
  530. universal._namespace= gearman_string_create(NULL, namespace_key, namespace_key_size);
  531. }
  532. else
  533. {
  534. universal._namespace= NULL;
  535. }
  536. }
  537. const char *gearman_univeral_namespace(gearman_universal_st& universal)
  538. {
  539. return gearman_string_value(universal._namespace);
  540. }