protocol.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
  2. *
  3. * Gearmand client and server library.
  4. *
  5. * Copyright (C) 2012 Data Differential, http://datadifferential.com/
  6. * All rights reserved.
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions are
  10. * met:
  11. *
  12. * * Redistributions of source code must retain the above copyright
  13. * notice, this list of conditions and the following disclaimer.
  14. *
  15. * * Redistributions in binary form must reproduce the above
  16. * copyright notice, this list of conditions and the following disclaimer
  17. * in the documentation and/or other materials provided with the
  18. * distribution.
  19. *
  20. * * The names of its contributors may not be used to endorse or
  21. * promote products derived from this software without specific prior
  22. * written permission.
  23. *
  24. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  25. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  26. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  27. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  28. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  29. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  30. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  31. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  32. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  33. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  34. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  35. *
  36. */
  37. /**
  38. * @file
  39. * @brief Gear Protocol Definitions
  40. */
  41. #include "gear_config.h"
  42. #include "configmake.h"
  43. #include <libgearman-server/common.h>
  44. #include <libgearman/strcommand.h>
  45. #include <libgearman-server/packet.h>
  46. #include "libgearman/strcommand.h"
  47. #include <cstdio>
  48. #include <cstdlib>
  49. #include "libgearman/ssl.h"
  50. #include <libgearman-server/plugins/protocol/gear/protocol.h>
  51. #include "libgearman/command.h"
  52. static gearmand_error_t gearmand_packet_unpack_header(gearmand_packet_st *packet)
  53. {
  54. uint32_t tmp;
  55. if (memcmp(packet->args, "\0REQ", 4) == 0)
  56. {
  57. packet->magic= GEARMAN_MAGIC_REQUEST;
  58. }
  59. else if (memcmp(packet->args, "\0RES", 4) == 0)
  60. {
  61. packet->magic= GEARMAN_MAGIC_RESPONSE;
  62. }
  63. else
  64. {
  65. gearmand_warning("invalid magic value");
  66. return GEARMAND_INVALID_MAGIC;
  67. }
  68. memcpy(&tmp, packet->args + 4, 4);
  69. packet->command= static_cast<gearman_command_t>(ntohl(tmp));
  70. if (packet->command == GEARMAN_COMMAND_TEXT ||
  71. packet->command >= GEARMAN_COMMAND_MAX)
  72. {
  73. gearmand_error("invalid command value");
  74. return GEARMAND_INVALID_COMMAND;
  75. }
  76. memcpy(&tmp, packet->args + 8, 4);
  77. packet->data_size= ntohl(tmp);
  78. return GEARMAND_SUCCESS;
  79. }
  80. class Geartext : public gearmand::protocol::Context {
  81. public:
  82. ~Geartext()
  83. { }
  84. bool is_owner()
  85. {
  86. return false;
  87. }
  88. void notify(gearman_server_con_st* connection)
  89. {
  90. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Gear connection disconnected: %s:%s", connection->host(), connection->port());
  91. }
  92. size_t unpack(gearmand_packet_st *packet,
  93. gearman_server_con_st *,
  94. const void *data, const size_t data_size,
  95. gearmand_error_t& ret_ptr)
  96. {
  97. size_t used_size;
  98. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Gear unpack");
  99. if (packet->args_size == 0)
  100. {
  101. if (data_size > 0 && ((uint8_t *)data)[0] != 0)
  102. {
  103. /* Try to parse a text-based command. */
  104. uint8_t* ptr= (uint8_t *)memchr(data, '\n', data_size);
  105. if (ptr == NULL)
  106. {
  107. ret_ptr= GEARMAND_IO_WAIT;
  108. return 0;
  109. }
  110. packet->magic= GEARMAN_MAGIC_TEXT;
  111. packet->command= GEARMAN_COMMAND_TEXT;
  112. used_size= size_t(ptr - ((uint8_t *)data)) +1;
  113. *ptr= 0;
  114. if (used_size > 1 && *(ptr - 1) == '\r')
  115. {
  116. *(ptr - 1)= 0;
  117. }
  118. size_t arg_size;
  119. for (arg_size= used_size, ptr= (uint8_t *)data; ptr != NULL; data= ptr)
  120. {
  121. ptr= (uint8_t *)memchr(data, ' ', arg_size);
  122. if (ptr != NULL)
  123. {
  124. *ptr= 0;
  125. ptr++;
  126. while (*ptr == ' ')
  127. {
  128. ptr++;
  129. }
  130. arg_size-= size_t(ptr - ((uint8_t *)data));
  131. }
  132. ret_ptr= gearmand_packet_create(packet, data, ptr == NULL ? arg_size :
  133. size_t(ptr - ((uint8_t *)data)));
  134. if (ret_ptr != GEARMAND_SUCCESS)
  135. {
  136. return used_size;
  137. }
  138. }
  139. return used_size;
  140. }
  141. else if (data_size < GEARMAND_PACKET_HEADER_SIZE)
  142. {
  143. ret_ptr= GEARMAND_IO_WAIT;
  144. return 0;
  145. }
  146. packet->args= packet->args_buffer;
  147. packet->args_size= GEARMAND_PACKET_HEADER_SIZE;
  148. memcpy(packet->args, data, GEARMAND_PACKET_HEADER_SIZE);
  149. if (gearmand_failed(ret_ptr= gearmand_packet_unpack_header(packet)))
  150. {
  151. return 0;
  152. }
  153. used_size= GEARMAND_PACKET_HEADER_SIZE;
  154. }
  155. else
  156. {
  157. used_size= 0;
  158. }
  159. while (packet->argc != gearman_command_info(packet->command)->argc)
  160. {
  161. if (packet->argc != (gearman_command_info(packet->command)->argc - 1) or
  162. gearman_command_info(packet->command)->data)
  163. {
  164. uint8_t* ptr= (uint8_t *)memchr(((uint8_t *)data) +used_size, 0,
  165. data_size -used_size);
  166. if (ptr == NULL)
  167. {
  168. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  169. "Possible protocol error for %s, received only %u args",
  170. gearman_command_info(packet->command)->name, packet->argc);
  171. ret_ptr= GEARMAND_IO_WAIT;
  172. return used_size;
  173. }
  174. size_t arg_size= size_t(ptr - (((uint8_t *)data) + used_size)) +1;
  175. if (gearmand_failed((ret_ptr= gearmand_packet_create(packet, ((uint8_t *)data) + used_size, arg_size))))
  176. {
  177. return used_size;
  178. }
  179. packet->data_size-= arg_size;
  180. used_size+= arg_size;
  181. }
  182. else
  183. {
  184. if ((data_size - used_size) < packet->data_size)
  185. {
  186. ret_ptr= GEARMAND_IO_WAIT;
  187. return used_size;
  188. }
  189. ret_ptr= gearmand_packet_create(packet, ((uint8_t *)data) + used_size, packet->data_size);
  190. if (gearmand_failed(ret_ptr))
  191. {
  192. return used_size;
  193. }
  194. used_size+= packet->data_size;
  195. packet->data_size= 0;
  196. }
  197. }
  198. if (packet->command == GEARMAN_COMMAND_ECHO_REQ and packet->data_size)
  199. {
  200. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  201. "GEAR %s length: %" PRIu64,
  202. gearman_strcommand(packet->command),
  203. uint64_t(packet->data_size));
  204. }
  205. else if (packet->command == GEARMAN_COMMAND_TEXT and packet->data_size)
  206. {
  207. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  208. "GEAR %s text: %.*s",
  209. gearman_strcommand(packet->command),
  210. int(packet->data_size),
  211. packet->data);
  212. }
  213. else if (packet->command == GEARMAN_COMMAND_OPTION_REQ and packet->arg_size[0])
  214. {
  215. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  216. "GEAR %s option: %.*s",
  217. gearman_strcommand(packet->command),
  218. int(packet->arg_size[0]),
  219. packet->arg[0]);
  220. }
  221. else if (packet->command == GEARMAN_COMMAND_WORK_EXCEPTION and packet->data_size)
  222. {
  223. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  224. "GEAR %s handle: %.*s exception: %.*s",
  225. gearman_strcommand(packet->command),
  226. int(packet->arg_size[0]),
  227. packet->arg[0],
  228. int(packet->data_size),
  229. packet->data);
  230. }
  231. else if (packet->command == GEARMAN_COMMAND_WORK_FAIL and packet->arg_size[0])
  232. {
  233. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  234. "GEAR %s handle: %.*s",
  235. gearman_strcommand(packet->command),
  236. int(packet->arg_size[0]),
  237. packet->arg[0]);
  238. }
  239. else if (packet->command == GEARMAN_COMMAND_SET_CLIENT_ID and packet->arg_size[0])
  240. {
  241. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  242. "GEAR %s identifier: %.*s",
  243. gearman_strcommand(packet->command),
  244. int(packet->arg_size[0]),
  245. packet->arg[0]);
  246. }
  247. else
  248. {
  249. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  250. "GEAR %s",
  251. gearman_strcommand(packet->command));
  252. }
  253. ret_ptr= GEARMAND_SUCCESS;
  254. return used_size;
  255. }
  256. size_t pack(const gearmand_packet_st *packet,
  257. gearman_server_con_st*,
  258. void *data, const size_t data_size,
  259. gearmand_error_t& ret_ptr)
  260. {
  261. if (packet->command == GEARMAN_COMMAND_ECHO_RES and packet->data_size)
  262. {
  263. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  264. "GEAR %s length: %" PRIu64,
  265. gearman_strcommand(packet->command),
  266. uint64_t(packet->data_size));
  267. }
  268. else if (packet->command == GEARMAN_COMMAND_OPTION_RES and packet->arg_size[0])
  269. {
  270. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  271. "GEAR %s option: %.*s",
  272. gearman_strcommand(packet->command),
  273. int(packet->arg_size[0]),
  274. packet->arg[0]);
  275. }
  276. else
  277. {
  278. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM,
  279. "GEAR %s",
  280. gearman_strcommand(packet->command));
  281. }
  282. if (packet->args_size == 0)
  283. {
  284. ret_ptr= GEARMAND_SUCCESS;
  285. return 0;
  286. }
  287. if (packet->args_size > data_size)
  288. {
  289. ret_ptr= GEARMAND_FLUSH_DATA;
  290. return 0;
  291. }
  292. memcpy(data, packet->args, packet->args_size);
  293. ret_ptr= GEARMAND_SUCCESS;
  294. return packet->args_size;
  295. }
  296. private:
  297. };
  298. static Geartext gear_context;
  299. static gearmand_error_t _gear_con_remove(gearman_server_con_st* connection)
  300. {
  301. #if defined(HAVE_SSL) && HAVE_SSL
  302. if (connection->_ssl)
  303. {
  304. SSL_shutdown(connection->_ssl);
  305. SSL_free(connection->_ssl);
  306. connection->_ssl= NULL;
  307. }
  308. #else
  309. (void)connection;
  310. #endif
  311. return GEARMAND_SUCCESS;
  312. }
  313. static gearmand_error_t _gear_con_add(gearman_server_con_st *connection)
  314. {
  315. #if defined(HAVE_SSL) && HAVE_SSL
  316. if (Gearmand()->ctx_ssl())
  317. {
  318. if ((connection->_ssl= SSL_new(Gearmand()->ctx_ssl())) == NULL)
  319. {
  320. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_MEMORY_ALLOCATION_FAILURE, "SSL_new() failed to return a valid object");
  321. }
  322. SSL_set_fd(connection->_ssl, connection->con.fd());
  323. while (SSL_accept(connection->_ssl) != SSL_SUCCESS)
  324. {
  325. int cyassl_error= SSL_get_error(connection->_ssl, 0);
  326. switch (cyassl_error)
  327. {
  328. case SSL_ERROR_WANT_READ:
  329. case SSL_ERROR_WANT_WRITE:
  330. continue;
  331. default:
  332. char cyassl_error_buffer[SSL_ERROR_SIZE]= { 0 };
  333. ERR_error_string_n(cyassl_error, cyassl_error_buffer, sizeof(cyassl_error_buffer));
  334. return gearmand_log_gerror(GEARMAN_DEFAULT_LOG_PARAM, GEARMAND_LOST_CONNECTION, "%s:%s %s(%d)",
  335. connection->host(),
  336. connection->port(),
  337. cyassl_error_buffer, cyassl_error);
  338. }
  339. }
  340. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "GearSSL connection made: %s:%s", connection->host(), connection->port());
  341. }
  342. else
  343. #endif
  344. {
  345. gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "Gear connection made: %s:%s", connection->host(), connection->port());
  346. }
  347. connection->set_protocol(&gear_context);
  348. return GEARMAND_SUCCESS;
  349. }
  350. namespace gearmand {
  351. namespace protocol {
  352. Gear::Gear() :
  353. Plugin("Gear"),
  354. _port(GEARMAN_DEFAULT_TCP_PORT_STRING),
  355. _ssl_ca_file(GEARMAND_CA_CERTIFICATE),
  356. _ssl_certificate(GEARMAND_SERVER_PEM),
  357. _ssl_key(GEARMAND_SERVER_KEY),
  358. opt_ssl(false)
  359. {
  360. command_line_options().add_options()
  361. ("port,p", boost::program_options::value(&_port)->default_value(GEARMAN_DEFAULT_TCP_PORT_STRING),
  362. "Port the server should listen on.")
  363. ("ssl", boost::program_options::bool_switch(&opt_ssl)->default_value(false),
  364. "Enable ssl connections.")
  365. ("ssl-ca-file", boost::program_options::value(&_ssl_ca_file),
  366. "CA file.")
  367. ("ssl-certificate", boost::program_options::value(&_ssl_certificate),
  368. "SSL certificate.")
  369. ("ssl-key", boost::program_options::value(&_ssl_key),
  370. "SSL key for certificate.")
  371. ;
  372. }
  373. Gear::~Gear()
  374. {
  375. }
  376. gearmand_error_t Gear::start(gearmand_st *gearmand)
  377. {
  378. gearmand_error_t rc;
  379. if (_port.compare(GEARMAN_DEFAULT_TCP_PORT_STRING) == 0)
  380. {
  381. char* service;
  382. if ((service= getenv("GEARMAND_PORT")) and service[0])
  383. {
  384. _port.clear();
  385. _port.append(service);
  386. }
  387. }
  388. if (_port.empty())
  389. {
  390. const char* service= GEARMAN_DEFAULT_TCP_PORT_STRING;
  391. struct servent *gearman_servent;
  392. if ((gearman_servent= getservbyname(GEARMAN_DEFAULT_TCP_SERVICE, NULL)))
  393. {
  394. if (gearman_servent and gearman_servent->s_name)
  395. {
  396. service= gearman_servent->s_name;
  397. }
  398. }
  399. _port.clear();
  400. _port.append(service);
  401. }
  402. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Initializing Gear on port %s with SSL: %s", _port.c_str(), opt_ssl ? "true" : "false");
  403. #if defined(HAVE_SSL) && HAVE_SSL
  404. if (opt_ssl)
  405. {
  406. if (getenv("GEARMAND_CA_CERTIFICATE"))
  407. {
  408. _ssl_ca_file= getenv("GEARMAND_CA_CERTIFICATE");
  409. }
  410. if (getenv("GEARMAND_SERVER_PEM"))
  411. {
  412. _ssl_certificate= getenv("GEARMAND_SERVER_PEM");
  413. }
  414. if (getenv("GEARMAND_SERVER_KEY"))
  415. {
  416. _ssl_key= getenv("GEARMAND_SERVER_KEY");
  417. }
  418. gearmand->init_ssl();
  419. if (SSL_CTX_load_verify_locations(gearmand->ctx_ssl(), _ssl_ca_file.c_str(), 0) != SSL_SUCCESS)
  420. {
  421. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "SSL_CTX_load_verify_locations() cannot local the ca certificate %s", _ssl_ca_file.c_str());
  422. }
  423. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Loading CA certificate : %s", _ssl_ca_file.c_str());
  424. if (SSL_CTX_use_certificate_file(gearmand->ctx_ssl(), _ssl_certificate.c_str(), SSL_FILETYPE_PEM) != SSL_SUCCESS)
  425. {
  426. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "SSL_CTX_use_certificate_file() cannot obtain certificate %s", _ssl_certificate.c_str());
  427. }
  428. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Loading certificate : %s", _ssl_certificate.c_str());
  429. if (SSL_CTX_use_PrivateKey_file(gearmand->ctx_ssl(), _ssl_key.c_str(), SSL_FILETYPE_PEM) != SSL_SUCCESS)
  430. {
  431. gearmand_log_fatal(GEARMAN_DEFAULT_LOG_PARAM, "SSL_CTX_use_PrivateKey_file() cannot obtain certificate %s", _ssl_key.c_str());
  432. }
  433. gearmand_log_info(GEARMAN_DEFAULT_LOG_PARAM, "Loading certificate key : %s", _ssl_key.c_str());
  434. assert(gearmand->ctx_ssl());
  435. }
  436. #endif
  437. rc= gearmand_port_add(gearmand, _port.c_str(), _gear_con_add, _gear_con_remove);
  438. return rc;
  439. }
  440. } // namespace protocol
  441. } // namespace gearmand