universal.cc 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2011 Data Differential, http://datadifferential.com/
  6. * Copyright (C) 2008 Brian Aker, Eric Day
  7. * All rights reserved.
  8. *
  9. * Redistribution and use in source and binary forms, with or without
  10. * modification, are permitted provided that the following conditions are
  11. * met:
  12. *
  13. * * Redistributions of source code must retain the above copyright
  14. * notice, this list of conditions and the following disclaimer.
  15. *
  16. * * Redistributions in binary form must reproduce the above
  17. * copyright notice, this list of conditions and the following disclaimer
  18. * in the documentation and/or other materials provided with the
  19. * distribution.
  20. *
  21. * * The names of its contributors may not be used to endorse or
  22. * promote products derived from this software without specific prior
  23. * written permission.
  24. *
  25. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  26. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  27. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  28. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  29. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  30. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  31. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  32. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  33. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  34. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  35. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  36. *
  37. */
  38. /**
  39. * @file
  40. * @brief Gearman State Definitions
  41. */
  42. #include <config.h>
  43. #include <libgearman/common.h>
  44. #include <cassert>
  45. #include <cerrno>
  46. #include <cstdarg>
  47. #include <cstdio>
  48. #include <cstdlib>
  49. #include <cstring>
  50. #include <cctype>
  51. #include <unistd.h>
  52. void gearman_universal_initialize(gearman_universal_st &self, const gearman_universal_options_t *options)
  53. {
  54. { // Set defaults on all options.
  55. self.options.dont_track_packets= false;
  56. self.options.non_blocking= false;
  57. self.options.stored_non_blocking= false;
  58. }
  59. if (options)
  60. {
  61. while (*options != GEARMAN_MAX)
  62. {
  63. /**
  64. @note Check for bad value, refactor gearman_add_options().
  65. */
  66. gearman_universal_add_options(self, *options);
  67. options++;
  68. }
  69. }
  70. self.verbose= GEARMAN_VERBOSE_NEVER;
  71. self.con_count= 0;
  72. self.packet_count= 0;
  73. self.pfds_size= 0;
  74. self.sending= 0;
  75. self.timeout= -1;
  76. self.con_list= NULL;
  77. self.packet_list= NULL;
  78. self.pfds= NULL;
  79. self.log_fn= NULL;
  80. self.log_context= NULL;
  81. self.allocator= gearman_default_allocator();
  82. self._namespace= NULL;
  83. self.error.rc= GEARMAN_SUCCESS;
  84. self.error.last_errno= 0;
  85. self.error.last_error[0]= 0;
  86. self.wakeup_fd[0]= INVALID_SOCKET;
  87. self.wakeup_fd[1]= INVALID_SOCKET;
  88. }
  /**
   * Sleep for approximately @p arg microseconds.
   *
   * Values below 1 return immediately without sleeping.
   *
   * The duration is split into whole seconds and a sub-second remainder
   * before it is handed to nanosleep(). The previous code put the whole
   * duration into tv_nsec, which (a) overflowed the int multiplication
   * `arg * 1000` for large values and (b) produced tv_nsec >= 1e9 for any
   * request of one second or more, a value nanosleep() rejects with EINVAL,
   * so no sleep happened at all.
   *
   * @param arg  Requested nap length in microseconds.
   */
  void gearman_nap(int arg)
  {
    if (arg < 1)
    {
      return;
    }

  #ifdef WIN32
    sleep(arg/1000000);
  #else
    struct timespec nap_time;
    nap_time.tv_sec= arg / 1000000;  // whole seconds
    nap_time.tv_nsec= arg % 1000000; // leftover microseconds (0..999999) ...
    nap_time.tv_nsec*= 1000;         // ... scaled to nanoseconds, always < 1e9
    nanosleep(&nap_time, NULL);
  #endif
  }
  103. void gearman_nap(gearman_universal_st &self)
  104. {
  105. gearman_nap(self.timeout);
  106. }
  107. void gearman_universal_clone(gearman_universal_st &destination, const gearman_universal_st &source, bool has_wakeup_fd)
  108. {
  109. int wakeup_fd[2];
  110. if (has_wakeup_fd)
  111. {
  112. wakeup_fd[0]= destination.wakeup_fd[0];
  113. wakeup_fd[1]= destination.wakeup_fd[1];
  114. }
  115. gearman_universal_initialize(destination);
  116. if (has_wakeup_fd)
  117. {
  118. destination.wakeup_fd[0]= wakeup_fd[0];
  119. destination.wakeup_fd[1]= wakeup_fd[1];
  120. }
  121. (void)gearman_universal_set_option(destination, GEARMAN_NON_BLOCKING, source.options.non_blocking);
  122. (void)gearman_universal_set_option(destination, GEARMAN_DONT_TRACK_PACKETS, source.options.dont_track_packets);
  123. destination.timeout= source.timeout;
  124. destination._namespace= gearman_string_clone(source._namespace);
  125. for (gearman_connection_st *con= source.con_list; con; con= con->next)
  126. {
  127. if (gearman_connection_copy(destination, *con) == NULL)
  128. {
  129. gearman_universal_free(destination);
  130. return;
  131. }
  132. }
  133. }
  134. void gearman_universal_free(gearman_universal_st &universal)
  135. {
  136. gearman_free_all_cons(universal);
  137. gearman_free_all_packets(universal);
  138. gearman_string_free(universal._namespace);
  139. if (universal.pfds)
  140. {
  141. // created realloc()
  142. free(universal.pfds);
  143. }
  144. }
  145. gearman_return_t gearman_universal_set_option(gearman_universal_st &self, gearman_universal_options_t option, bool value)
  146. {
  147. switch (option)
  148. {
  149. case GEARMAN_NON_BLOCKING:
  150. self.options.non_blocking= value;
  151. break;
  152. case GEARMAN_DONT_TRACK_PACKETS:
  153. self.options.dont_track_packets= value;
  154. break;
  155. case GEARMAN_MAX:
  156. default:
  157. return GEARMAN_INVALID_COMMAND;
  158. }
  159. return GEARMAN_SUCCESS;
  160. }
  161. int gearman_universal_timeout(gearman_universal_st &self)
  162. {
  163. return self.timeout;
  164. }
  165. void gearman_universal_set_timeout(gearman_universal_st &self, int timeout)
  166. {
  167. self.timeout= timeout;
  168. }
  169. void gearman_set_log_fn(gearman_universal_st &self, gearman_log_fn *function,
  170. void *context, gearman_verbose_t verbose)
  171. {
  172. self.log_fn= function;
  173. self.log_context= context;
  174. self.verbose= verbose;
  175. }
  176. void gearman_set_workload_malloc_fn(gearman_universal_st& universal,
  177. gearman_malloc_fn *function,
  178. void *context)
  179. {
  180. universal.allocator.malloc= function;
  181. universal.allocator.context= context;
  182. }
  183. void gearman_set_workload_free_fn(gearman_universal_st& universal,
  184. gearman_free_fn *function,
  185. void *context)
  186. {
  187. universal.allocator.free= function;
  188. universal.allocator.context= context;
  189. }
  190. void gearman_free_all_cons(gearman_universal_st& universal)
  191. {
  192. while (universal.con_list)
  193. {
  194. delete universal.con_list;
  195. }
  196. }
  197. void gearman_reset(gearman_universal_st& universal)
  198. {
  199. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  200. {
  201. con->close_socket();
  202. }
  203. }
  204. gearman_return_t gearman_flush_all(gearman_universal_st& universal)
  205. {
  206. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  207. {
  208. if (con->events & POLLOUT)
  209. {
  210. continue;
  211. }
  212. gearman_return_t ret= con->flush();
  213. if (gearman_failed(ret) and ret != GEARMAN_IO_WAIT)
  214. {
  215. return ret;
  216. }
  217. }
  218. return GEARMAN_SUCCESS;
  219. }
  220. gearman_return_t gearman_wait(gearman_universal_st& universal)
  221. {
  222. struct pollfd *pfds;
  223. bool have_shutdown_pipe= universal.wakeup_fd[0] == INVALID_SOCKET ? false : true;
  224. size_t con_count= universal.con_count +int(have_shutdown_pipe);
  225. if (universal.pfds_size < con_count)
  226. {
  227. pfds= static_cast<pollfd*>(realloc(universal.pfds, con_count * sizeof(struct pollfd)));
  228. if (pfds == NULL)
  229. {
  230. gearman_perror(universal, "pollfd realloc");
  231. return GEARMAN_MEMORY_ALLOCATION_FAILURE;
  232. }
  233. universal.pfds= pfds;
  234. universal.pfds_size= con_count;
  235. }
  236. else
  237. {
  238. pfds= universal.pfds;
  239. }
  240. nfds_t x= 0;
  241. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  242. {
  243. if (con->events == 0)
  244. continue;
  245. pfds[x].fd= con->fd;
  246. pfds[x].events= con->events;
  247. pfds[x].revents= 0;
  248. x++;
  249. }
  250. if (x == 0)
  251. {
  252. return gearman_error(universal, GEARMAN_NO_ACTIVE_FDS, "no active file descriptors");
  253. }
  254. // Wakeup handling, we only make use of this if we have active connections
  255. size_t pipe_array_iterator= 0;
  256. if (have_shutdown_pipe)
  257. {
  258. pipe_array_iterator= x;
  259. pfds[x].fd= universal.wakeup_fd[0];
  260. pfds[x].events= POLLIN;
  261. pfds[x].revents= 0;
  262. x++;
  263. }
  264. int ret= 0;
  265. while (universal.timeout)
  266. {
  267. ret= poll(pfds, x, universal.timeout);
  268. if (ret == -1)
  269. {
  270. switch(errno)
  271. {
  272. case EINTR:
  273. continue;
  274. default:
  275. return gearman_perror(universal, "poll");
  276. }
  277. }
  278. break;
  279. }
  280. if (ret == 0)
  281. {
  282. gearman_error(universal, GEARMAN_TIMEOUT, "timeout reached, no servers were available");
  283. return GEARMAN_TIMEOUT;
  284. }
  285. x= 0;
  286. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  287. {
  288. if (con->events == 0)
  289. {
  290. continue;
  291. }
  292. int err;
  293. socklen_t len= sizeof (err);
  294. if (getsockopt(con->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
  295. {
  296. con->cached_errno= err;
  297. }
  298. con->set_revents(pfds[x].revents);
  299. x++;
  300. }
  301. if (have_shutdown_pipe and pfds[pipe_array_iterator].revents)
  302. {
  303. char buffer[1];
  304. ssize_t read_length= read(universal.wakeup_fd[0], buffer, sizeof(buffer));
  305. if (read_length > 0)
  306. {
  307. return GEARMAN_SHUTDOWN_GRACEFUL;
  308. }
  309. if (read_length == 0)
  310. {
  311. return GEARMAN_SHUTDOWN;
  312. }
  313. perror("shudown read");
  314. // @todo figure out what happens in an error
  315. }
  316. return GEARMAN_SUCCESS;
  317. }
  318. gearman_connection_st *gearman_ready(gearman_universal_st& universal)
  319. {
  320. /*
  321. We can't keep universal between calls since connections may be removed during
  322. processing. If this list ever gets big, we may want something faster.
  323. */
  324. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  325. {
  326. if (con->options.ready)
  327. {
  328. con->options.ready= false;
  329. return con;
  330. }
  331. }
  332. return NULL;
  333. }
  334. /**
  335. @note _push_blocking is only used for echo (and should be fixed
  336. when tricky flip/flop in IO is fixed).
  337. */
  338. static inline void _push_blocking(gearman_universal_st& universal, bool &orig_block_universal)
  339. {
  340. orig_block_universal= universal.options.non_blocking;
  341. universal.options.non_blocking= false;
  342. }
  343. static inline void _pop_non_blocking(gearman_universal_st& universal, bool orig_block_universal)
  344. {
  345. universal.options.non_blocking= orig_block_universal;
  346. }
  347. gearman_return_t gearman_set_identifier(gearman_universal_st& universal,
  348. const char *id, size_t id_size)
  349. {
  350. bool orig_block_universal;
  351. if (id == NULL)
  352. {
  353. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "id was NULL");
  354. }
  355. if (id_size == 0)
  356. {
  357. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "id_size was 0");
  358. }
  359. if (id_size > GEARMAN_MAX_IDENTIFIER)
  360. {
  361. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "id_size was greater then GEARMAN_MAX_ECHO_SIZE");
  362. }
  363. for (size_t x= 0; x < id_size; x++)
  364. {
  365. if (isgraph(id[x]) == false)
  366. {
  367. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "Invalid character found in identifier");
  368. }
  369. }
  370. gearman_packet_st packet;
  371. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  372. GEARMAN_COMMAND_SET_CLIENT_ID,
  373. (const void**)&id, &id_size, 1);
  374. if (gearman_failed(ret))
  375. {
  376. #if 0
  377. assert_msg(universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  378. #endif
  379. gearman_packet_free(&packet);
  380. return ret;
  381. }
  382. _push_blocking(universal, orig_block_universal);
  383. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  384. {
  385. ret= con->send_packet(packet, true);
  386. if (gearman_failed(ret))
  387. {
  388. #if 0
  389. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  390. #endif
  391. goto exit;
  392. }
  393. con->options.packet_in_use= true;
  394. gearman_packet_st *packet_ptr= con->receiving(con->_packet, ret, true);
  395. if (gearman_failed(ret))
  396. {
  397. #if 0
  398. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  399. #endif
  400. con->free_private_packet();
  401. con->recv_packet= NULL;
  402. goto exit;
  403. }
  404. assert(packet_ptr);
  405. if (con->_packet.data_size != id_size or
  406. memcmp(id, con->_packet.data, id_size))
  407. {
  408. #if 0
  409. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  410. #endif
  411. con->free_private_packet();
  412. con->recv_packet= NULL;
  413. ret= gearman_error(universal, GEARMAN_ECHO_DATA_CORRUPTION, "corruption during echo");
  414. goto exit;
  415. }
  416. con->recv_packet= NULL;
  417. con->free_private_packet();
  418. }
  419. ret= GEARMAN_SUCCESS;
  420. exit:
  421. gearman_packet_free(&packet);
  422. _pop_non_blocking(universal, orig_block_universal);
  423. return ret;
  424. }
  425. gearman_return_t gearman_echo(gearman_universal_st& universal,
  426. const void *workload,
  427. size_t workload_size)
  428. {
  429. bool orig_block_universal;
  430. if (workload == NULL)
  431. {
  432. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload was NULL");
  433. }
  434. if (workload_size == 0)
  435. {
  436. return gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "workload_size was 0");
  437. }
  438. if (workload_size > GEARMAN_MAX_ECHO_SIZE)
  439. {
  440. return gearman_error(universal, GEARMAN_ARGUMENT_TOO_LARGE, "workload_size was greater then GEARMAN_MAX_ECHO_SIZE");
  441. }
  442. gearman_packet_st packet;
  443. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  444. GEARMAN_COMMAND_ECHO_REQ,
  445. &workload, &workload_size, 1);
  446. if (gearman_failed(ret))
  447. {
  448. #if 0
  449. assert_msg(universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  450. #endif
  451. gearman_packet_free(&packet);
  452. return ret;
  453. }
  454. _push_blocking(universal, orig_block_universal);
  455. for (gearman_connection_st *con= universal.con_list; con; con= con->next)
  456. {
  457. ret= con->send_packet(packet, true);
  458. if (gearman_failed(ret))
  459. {
  460. #if 0
  461. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  462. #endif
  463. goto exit;
  464. }
  465. con->options.packet_in_use= true;
  466. gearman_packet_st *packet_ptr= con->receiving(con->_packet, ret, true);
  467. if (gearman_failed(ret))
  468. {
  469. #if 0
  470. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  471. #endif
  472. con->free_private_packet();
  473. con->recv_packet= NULL;
  474. goto exit;
  475. }
  476. assert(packet_ptr);
  477. if (con->_packet.data_size != workload_size or
  478. memcmp(workload, con->_packet.data, workload_size))
  479. {
  480. #if 0
  481. assert_msg(con->universal.error.rc != GEARMAN_SUCCESS, "Programmer error, error returned but not recorded");
  482. #endif
  483. con->free_private_packet();
  484. con->recv_packet= NULL;
  485. ret= gearman_error(universal, GEARMAN_ECHO_DATA_CORRUPTION, "corruption during echo");
  486. goto exit;
  487. }
  488. con->recv_packet= NULL;
  489. con->free_private_packet();
  490. }
  491. ret= GEARMAN_SUCCESS;
  492. exit:
  493. gearman_packet_free(&packet);
  494. _pop_non_blocking(universal, orig_block_universal);
  495. return ret;
  496. }
  497. bool gearman_request_option(gearman_universal_st &universal,
  498. gearman_string_t &option)
  499. {
  500. bool orig_block_universal;
  501. const void *args[]= { gearman_c_str(option) };
  502. size_t args_size[]= { gearman_size(option) };
  503. gearman_packet_st packet;
  504. gearman_return_t ret= gearman_packet_create_args(universal, packet, GEARMAN_MAGIC_REQUEST,
  505. GEARMAN_COMMAND_OPTION_REQ,
  506. args, args_size, 1);
  507. if (gearman_failed(ret))
  508. {
  509. gearman_packet_free(&packet);
  510. gearman_error(universal, GEARMAN_MEMORY_ALLOCATION_FAILURE, "gearman_packet_create_args()");
  511. return ret;
  512. }
  513. _push_blocking(universal, orig_block_universal);
  514. for (gearman_connection_st *con= universal.con_list; con != NULL; con= con->next)
  515. {
  516. ret= con->send_packet(packet, true);
  517. if (gearman_failed(ret))
  518. {
  519. goto exit;
  520. }
  521. gearman_packet_st recv_packet;
  522. assert(con->recv_state == GEARMAN_CON_RECV_UNIVERSAL_NONE);
  523. gearman_packet_st *packet_ptr= con->receiving(recv_packet, ret, true);
  524. if (ret == GEARMAN_NOT_CONNECTED)
  525. {
  526. goto exit;
  527. }
  528. else if (gearman_failed(ret))
  529. {
  530. con->recv_packet= NULL;
  531. gearman_packet_free(&recv_packet);
  532. goto exit;
  533. }
  534. assert(packet_ptr);
  535. if (packet_ptr->command == GEARMAN_COMMAND_ERROR)
  536. {
  537. con->recv_packet= NULL;
  538. gearman_packet_free(&recv_packet);
  539. ret= gearman_error(universal, GEARMAN_INVALID_ARGUMENT, "invalid server option");
  540. goto exit;
  541. }
  542. gearman_packet_free(&recv_packet);
  543. }
  544. ret= GEARMAN_SUCCESS;
  545. exit:
  546. gearman_packet_free(&packet);
  547. _pop_non_blocking(universal, orig_block_universal);
  548. return gearman_success(ret);
  549. }
  550. void gearman_free_all_packets(gearman_universal_st &universal)
  551. {
  552. while (universal.packet_list)
  553. {
  554. gearman_packet_free(universal.packet_list);
  555. }
  556. }
  557. gearman_id_t gearman_universal_id(gearman_universal_st &universal)
  558. {
  559. gearman_id_t handle= { universal.wakeup_fd[0], universal.wakeup_fd[1] };
  560. return handle;
  561. }
  562. /*
  563. * Local Definitions
  564. */
  565. void gearman_universal_set_namespace(gearman_universal_st& universal, const char *namespace_key, size_t namespace_key_size)
  566. {
  567. gearman_string_free(universal._namespace);
  568. universal._namespace= gearman_string_create(NULL, namespace_key, namespace_key_size);
  569. }