universal.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearman State Definitions
  41. */
  42. #include "gear_config.h"
  43. #include <libgearman/common.h>
  44. #include "libgearman/assert.hpp"
  45. #include "libgearman/interface/push.hpp"
  46. #include "libgearman/server_options.hpp"
  47. #include <cerrno>
  48. #include <cstdarg>
  49. #include <cstdio>
  50. #include <cstdlib>
  51. #include <cstring>
  52. #include <cctype>
  53. #include <unistd.h>
  54. #include <memory>
  55. void gearman_nap(int arg)
  56. {
  57. if (arg < 1)
  58. { }
  59. else
  60. {
  61. #ifdef WIN32
  62. sleep(arg/1000000);
  63. #else
  64. struct timespec global_sleep_value= { 0, static_cast<long>(arg * 1000)};
  65. nanosleep(&global_sleep_value, NULL);
  66. #endif
  67. }
  68. }
  69. void gearman_nap(gearman_universal_st &self)
  70. {
  71. gearman_nap(self.timeout);
  72. }
  73. void gearman_universal_clone(gearman_universal_st &destination, const gearman_universal_st &source, bool has_wakeup_fd)
  74. {
  75. int wakeup_fd[2];
  76. if (has_wakeup_fd)
  77. {
  78. wakeup_fd[0]= destination.wakeup_fd[0];
  79. wakeup_fd[1]= destination.wakeup_fd[1];
  80. }
  81. if (has_wakeup_fd)
  82. {
  83. destination.wakeup_fd[0]= wakeup_fd[0];
  84. destination.wakeup_fd[1]= wakeup_fd[1];
  85. }
  86. (void)gearman_universal_set_option(destination, GEARMAN_NON_BLOCKING, source.options.non_blocking);
  87. destination.timeout= source.timeout;
  88. destination._namespace= gearman_string_clone(source._namespace);
  89. for (gearman_connection_st *con= source.con_list; con; con= con->next)
  90. {
  91. if (gearman_connection_copy(destination, *con) == NULL)
  92. {
  93. gearman_universal_free(destination);
  94. return;
  95. }
  96. }
  97. }
  98. void gearman_universal_free(gearman_universal_st &universal)
  99. {
  100. gearman_free_all_cons(universal);
  101. gearman_free_all_packets(universal);
  102. gearman_string_free(universal._namespace);
  103. if (universal.pfds)
  104. {
  105. // created realloc()
  106. free(universal.pfds);
  107. universal.pfds= NULL;
  108. }
  109. // clean-up server options
  110. while (universal.server_options_list)
  111. {
  112. delete universal.server_options_list;
  113. }
  114. }
  115. gearman_return_t gearman_universal_set_option(gearman_universal_st &self, gearman_universal_options_t option, bool value)
  116. {
  117. switch (option)
  118. {
  119. case GEARMAN_NON_BLOCKING:
  120. self.options.non_blocking= value;
  121. break;
  122. case GEARMAN_DONT_TRACK_PACKETS:
  123. break;
  124. case GEARMAN_MAX:
  125. default:
  126. return GEARMAN_INVALID_COMMAND;
  127. }
  128. return GEARMAN_SUCCESS;
  129. }
  130. int gearman_universal_timeout(gearman_universal_st &self)
  131. {
  132. return self.timeout;
  133. }
  134. void gearman_universal_set_timeout(gearman_universal_st &self, int timeout)
  135. {
  136. self.timeout= timeout;
  137. }
  138. void gearman_set_log_fn(gearman_universal_st &self, gearman_log_fn *function,
  139. void *context, gearman_verbose_t verbose)
  140. {
  141. self.log_fn= function;
  142. self.log_context= context;
  143. self.verbose= verbose;
  144. }
  145. void gearman_set_workload_malloc_fn(gearman_universal_st& universal,
  146. gearman_malloc_fn *function,
  147. void *context)
  148. {
  149. universal.allocator.malloc= function;
  150. universal.allocator.context= context;
  151. }
  152. void gearman_set_workload_free_fn(gearman_universal_st& universal,
  153. gearman_free_fn *function,
  154. void *context)
  155. {
  156. universal.allocator.free= function;
  157. universal.allocator.context= context;
  158. }
  159. void gearman_free_all_cons(gearman_universal_st& universal)
  160. {
  161. while (universal.con_list)
  162. {
  163. delete universal.con_list;
  164. }
  165. }
  166. void gearman_reset(gearman_universal_st& universal)
  167. {
  168. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  169. {
  170. con->close_socket();
  171. }
  172. }
  173. /*
  174. * Flush all shouldn't return any error, because there's no way to indicate
  175. * which connection experienced an issue. Error detection is better done in gearman_wait()
  176. * after flushing all the connections here.
  177. */
  178. void gearman_flush_all(gearman_universal_st& universal)
  179. {
  180. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  181. {
  182. if (con->events & POLLOUT)
  183. {
  184. continue;
  185. }
  186. con->flush();
  187. }
  188. }
  189. gearman_return_t gearman_wait(gearman_universal_st& universal)
  190. {
  191. struct pollfd *pfds;
  192. bool have_shutdown_pipe= universal.wakeup_fd[0] == INVALID_SOCKET ? false : true;
  193. size_t con_count= universal.con_count +int(have_shutdown_pipe);
  194. if (universal.pfds_size < con_count)
  195. {
  196. pfds= static_cast<pollfd*>(realloc(universal.pfds, con_count * sizeof(struct pollfd)));
  197. if (pfds == NULL)
  198. {
  199. gearman_perror(universal, "pollfd realloc");
  200. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  201. }
  202. universal.pfds= pfds;
  203. universal.pfds_size= int(con_count);
  204. }
  205. else
  206. {
  207. pfds= universal.pfds;
  208. }
  209. nfds_t x= 0;
  210. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  211. {
  212. if (con->events == 0)
  213. {
  214. continue;
  215. }
  216. pfds[x].fd= con->fd;
  217. pfds[x].events= con->events;
  218. pfds[x].revents= 0;
  219. x++;
  220. }
  221. if (x == 0)
  222. {
  223. return gearman_error(universal, GEARMAN_NO_ACTIVE_FDS, "no active file descriptors");
  224. }
  225. // Wakeup handling, we only make use of this if we have active connections
  226. size_t pipe_array_iterator= 0;
  227. if (have_shutdown_pipe)
  228. {
  229. pipe_array_iterator= x;
  230. pfds[x].fd= universal.wakeup_fd[0];
  231. pfds[x].events= POLLIN;
  232. pfds[x].revents= 0;
  233. x++;
  234. }
  235. int ret= 0;
  236. while (universal.timeout)
  237. {
  238. ret= poll(pfds, x, universal.timeout);
  239. if (ret == -1)
  240. {
  241. switch(errno)
  242. {
  243. case EINTR:
  244. continue;
  245. case EINVAL:
  246. return gearman_perror(universal, "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid");
  247. default:
  248. return gearman_perror(universal, "poll");
  249. }
  250. }
  251. break;
  252. }
  253. if (ret == 0)
  254. {
  255. return gearman_universal_set_error(universal, GEARMAN_TIMEOUT, GEARMAN_AT,
  256. "timeout reached, %u servers were poll(), no servers were available, pipe:%s",
  257. uint32_t(x), have_shutdown_pipe ? "true" : "false");
  258. }
  259. x= 0;
  260. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  261. {
  262. if (con->events == 0)
  263. {
  264. continue;
  265. }
  266. if (pfds[x].revents & (POLLERR | POLLHUP | POLLNVAL))
  267. {
  268. int err;
  269. socklen_t len= sizeof (err);
  270. if (getsockopt(pfds[x].fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
  271. {
  272. con->cached_errno= err;
  273. }
  274. }
  275. con->set_revents(pfds[x].revents);
  276. x++;
  277. }
  278. if (have_shutdown_pipe and pfds[pipe_array_iterator].revents)
  279. {
  280. char buffer[1];
  281. ssize_t read_length= read(universal.wakeup_fd[0], buffer, sizeof(buffer));
  282. if (read_length > 0)
  283. {
  284. gearman_return_t local_ret= gearman_kill(gearman_universal_id(universal), GEARMAN_INTERRUPT);
  285. if (gearman_failed(local_ret))
  286. {
  287. return GEARMAN_SHUTDOWN;
  288. }
  289. return GEARMAN_SHUTDOWN_GRACEFUL;
  290. }
  291. if (read_length == 0)
  292. {
  293. return GEARMAN_SHUTDOWN;
  294. }
  295. #if 0
  296. perror("shudown read");
  297. #endif
  298. // @todo figure out what happens in an error
  299. }
  300. return GEARMAN_SUCCESS;
  301. }
  302. gearman_connection_st *gearman_ready(gearman_universal_st& universal)
  303. {
  304. /*
  305. We can't keep universal between calls since connections may be removed during
  306. processing. If this list ever gets big, we may want something faster.
  307. */
  308. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  309. {
  310. if (con->options.ready)
  311. {
  312. con->options.ready= false;
  313. return con;
  314. }
  315. }
  316. return NULL;
  317. }
  318. gearman_return_t gearman_set_identifier(gearman_universal_st& universal,
  319. const char *id, size_t id_size)
  320. {
  321. if (id == NULL)
  322. {
  323. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "id was NULL");
  324. }
  325. if (id_size == 0)
  326. {
  327. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "id_size was 0");
  328. }
  329. if (id_size > GEARMAN_MAX_IDENTIFIER)
  330. {
  331. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "id_size was greater then GEARMAN_MAX_ECHO_SIZE");
  332. }
  333. for (size_t x= 0; x < id_size; x++)
  334. {
  335. if (isgraph(id[x]) == false)
  336. {
  337. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "Invalid character found in identifier");
  338. }
  339. }
  340. gearman_packet_st packet;
  341. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  342. GEARMAN_COMMAND_SET_CLIENT_ID,
  343. (const void**)&id, &id_size, 1);
  344. if (gearman_success(ret))
  345. {
  346. PUSH_BLOCKING(universal);
  347. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  348. {
  349. gearman_return_t local_ret= con->send_packet(packet, true);
  350. if (gearman_failed(local_ret))
  351. {
  352. ret= local_ret;
  353. }
  354. }
  355. }
  356. gearman_packet_free(&packet);
  357. return ret;
  358. }
  359. EchoCheck::EchoCheck(gearman_universal_st& universal_,
  360. const void *workload_, const size_t workload_size_) :
  361. _universal(universal_),
  362. _workload(workload_),
  363. _workload_size(workload_size_)
  364. {
  365. }
  366. gearman_return_t EchoCheck::success(gearman_connection_st* con)
  367. {
  368. if (con->_packet.command != GEARMAN_COMMAND_ECHO_RES)
  369. {
  370. return gearman_error(_universal, GEARMAN_INVALID_COMMAND, "Wrong command sent in response to ECHO request");
  371. }
  372. if (con->_packet.data_size != _workload_size or
  373. memcmp(_workload, con->_packet.data, _workload_size))
  374. {
  375. return gearman_error(_universal, GEARMAN_ECHO_DATA_CORRUPTION, "corruption during echo");
  376. }
  377. return GEARMAN_SUCCESS;
  378. }
  379. OptionCheck::OptionCheck(gearman_universal_st& universal_):
  380. _universal(universal_)
  381. {
  382. }
  383. gearman_return_t OptionCheck::success(gearman_connection_st* con)
  384. {
  385. if (con->_packet.command == GEARMAN_COMMAND_ERROR)
  386. {
  387. return gearman_error(_universal, GEARMAN_INVALID_SERVER_OPTION, "invalid server option");
  388. }
  389. return GEARMAN_SUCCESS;
  390. }
  391. static gearman_return_t connection_loop(gearman_universal_st& universal,
  392. const gearman_packet_st& message,
  393. Check& check)
  394. {
  395. gearman_return_t ret= GEARMAN_SUCCESS;
  396. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  397. {
  398. ret= con->send_packet(message, true);
  399. if (gearman_failed(ret))
  400. {
  401. #if 0
  402. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  403. #endif
  404. break;
  405. }
  406. con->options.packet_in_use= true;
  407. gearman_packet_st *packet_ptr= con->receiving(con->_packet, ret, true);
  408. if (packet_ptr == NULL)
  409. {
  410. assert(&con->_packet == universal.packet_list);
  411. con->options.packet_in_use= false;
  412. break;
  413. }
  414. assert(packet_ptr == &con->_packet);
  415. if (gearman_failed(ret))
  416. {
  417. #if 0
  418. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  419. #endif
  420. con->free_private_packet();
  421. con->reset_recv_packet();
  422. break;
  423. }
  424. assert(packet_ptr);
  425. if (gearman_failed(ret= check.success(con)))
  426. {
  427. #if 0
  428. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  429. #endif
  430. con->free_private_packet();
  431. con->reset_recv_packet();
  432. break;
  433. }
  434. con->reset_recv_packet();
  435. con->free_private_packet();
  436. }
  437. return ret;
  438. }
  439. gearman_return_t gearman_echo(gearman_universal_st& universal,
  440. const void *workload,
  441. size_t workload_size)
  442. {
  443. if (workload == NULL)
  444. {
  445. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload was NULL");
  446. }
  447. if (workload_size == 0)
  448. {
  449. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload_size was 0");
  450. }
  451. if (workload_size > GEARMAN_MAX_ECHO_SIZE)
  452. {
  453. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "workload_size was greater then GEARMAN_MAX_ECHO_SIZE");
  454. }
  455. gearman_packet_st message;
  456. gearman_return_t ret= gearman_packet_create_args(universal, message, GEARMAN_MAGIC_REQUEST,
  457. GEARMAN_COMMAND_ECHO_REQ,
  458. &workload, &workload_size, 1);
  459. if (gearman_success(ret))
  460. {
  461. PUSH_BLOCKING(universal);
  462. EchoCheck check(universal, workload, workload_size);
  463. ret= connection_loop(universal, message, check);
  464. }
  465. else
  466. {
  467. gearman_packet_free(&message);
  468. gearman_error(universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "gearman_packet_create_args()");
  469. return ret;
  470. }
  471. gearman_packet_free(&message);
  472. return ret;
  473. }
  474. void gearman_free_all_packets(gearman_universal_st &universal)
  475. {
  476. while (universal.packet_list)
  477. {
  478. gearman_packet_free(universal.packet_list);
  479. }
  480. }
  481. gearman_id_t gearman_universal_id(gearman_universal_st &universal)
  482. {
  483. gearman_id_t handle= { universal.wakeup_fd[0], universal.wakeup_fd[1] };
  484. return handle;
  485. }
  486. /*
  487. * Local Definitions
  488. */
  489. void gearman_universal_set_namespace(gearman_universal_st& universal, const char *namespace_key, size_t namespace_key_size)
  490. {
  491. gearman_string_free(universal._namespace);
  492. if (namespace_key)
  493. {
  494. universal._namespace= gearman_string_create(NULL, namespace_key, namespace_key_size);
  495. }
  496. else
  497. {
  498. universal._namespace= NULL;
  499. }
  500. }
  501. const char *gearman_univeral_namespace(gearman_universal_st& universal)
  502. {
  503. return gearman_string_value(universal._namespace);
  504. }