pubsub_publish.cc 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include <google/pubsub/v1/pubsub.grpc.pb.h>
  3. #include <grpcpp/grpcpp.h>
  4. #include <stdexcept>
  5. #include "pubsub_publish.h"
  6. #define EVENT_CHECK_TIMEOUT 50
  7. struct response {
  8. grpc::ClientContext *context;
  9. google::pubsub::v1::PublishResponse *publish_response;
  10. size_t tag;
  11. grpc::Status *status;
  12. size_t published_metrics;
  13. size_t published_bytes;
  14. };
  15. static inline void copy_error_message(char *error_message_dst, const char *error_message_src)
  16. {
  17. std::strncpy(error_message_dst, error_message_src, ERROR_LINE_MAX);
  18. error_message_dst[ERROR_LINE_MAX] = '\0';
  19. }
  20. /**
  21. * Initialize a Pub/Sub client and a data structure for responses.
  22. *
  23. * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
  24. * @param error_message report error message to a caller.
  25. * @param destination a Pub/Sub service endpoint.
  26. * @param credentials_file a full path for a file with google application credentials.
  27. * @param project_id a project ID.
  28. * @param topic_id a topic ID.
  29. * @return Returns 0 on success, 1 on failure.
  30. */
  31. int pubsub_init(
  32. void *pubsub_specific_data_p, char *error_message, const char *destination, const char *credentials_file,
  33. const char *project_id, const char *topic_id)
  34. {
  35. struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
  36. try {
  37. setenv("GOOGLE_APPLICATION_CREDENTIALS", credentials_file, 0);
  38. std::shared_ptr<grpc::ChannelCredentials> credentials = grpc::GoogleDefaultCredentials();
  39. if (credentials == nullptr) {
  40. copy_error_message(error_message, "Can't load credentials");
  41. return 1;
  42. }
  43. std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(destination, credentials);
  44. google::pubsub::v1::Publisher::Stub *stub = new google::pubsub::v1::Publisher::Stub(channel);
  45. if (!stub) {
  46. copy_error_message(error_message, "Can't create a publisher stub");
  47. return 1;
  48. }
  49. connector_specific_data->stub = stub;
  50. google::pubsub::v1::PublishRequest *request = new google::pubsub::v1::PublishRequest;
  51. connector_specific_data->request = request;
  52. ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))
  53. ->set_topic(std::string("projects/") + project_id + "/topics/" + topic_id);
  54. grpc::CompletionQueue *cq = new grpc::CompletionQueue;
  55. connector_specific_data->completion_queue = cq;
  56. connector_specific_data->responses = new std::list<struct response>;
  57. return 0;
  58. } catch (std::exception const &ex) {
  59. std::string em(std::string("Standard exception raised: ") + ex.what());
  60. copy_error_message(error_message, em.c_str());
  61. return 1;
  62. }
  63. return 0;
  64. }
  65. /**
  66. * Clean the PubSub connector instance specific data
  67. */
  68. void pubsub_cleanup(void *pubsub_specific_data_p)
  69. {
  70. struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
  71. std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
  72. std::list<struct response>::iterator response;
  73. for (response = responses->begin(); response != responses->end(); ++response) {
  74. // TODO: If we do this, there are a huge amount of possibly lost records. We need to find a right way of
  75. // cleaning up contexts
  76. // delete response->context;
  77. delete response->publish_response;
  78. delete response->status;
  79. }
  80. delete responses;
  81. ((grpc::CompletionQueue *)connector_specific_data->completion_queue)->Shutdown();
  82. delete (grpc::CompletionQueue *)connector_specific_data->completion_queue;
  83. delete (google::pubsub::v1::PublishRequest *)connector_specific_data->request;
  84. delete (google::pubsub::v1::Publisher::Stub *)connector_specific_data->stub;
  85. // TODO: Find how to shutdown grpc gracefully. grpc_shutdown() doesn't seem to work.
  86. // grpc_shutdown();
  87. return;
  88. }
  89. /**
  90. * Add data to a Pub/Sub request message.
  91. *
  92. * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
  93. * @param data a text buffer with metrics.
  94. * @return Returns 0 on success, 1 on failure.
  95. */
  96. int pubsub_add_message(void *pubsub_specific_data_p, char *data)
  97. {
  98. struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
  99. try {
  100. google::pubsub::v1::PubsubMessage *message =
  101. ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->add_messages();
  102. if (!message)
  103. return 1;
  104. message->set_data(data);
  105. } catch (std::exception const &ex) {
  106. return 1;
  107. }
  108. return 0;
  109. }
  110. /**
  111. * Send data to the Pub/Sub service
  112. *
  113. * @param pubsub_specific_data_p a pointer to a structure with client and request outcome information.
  114. * @param error_message report error message to a caller.
  115. * @param buffered_metrics the number of metrics we are going to send.
  116. * @param buffered_bytes the number of bytes we are going to send.
  117. * @return Returns 0 on success, 1 on failure.
  118. */
  119. int pubsub_publish(void *pubsub_specific_data_p, char *error_message, size_t buffered_metrics, size_t buffered_bytes)
  120. {
  121. struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
  122. try {
  123. grpc::ClientContext *context = new grpc::ClientContext;
  124. std::unique_ptr<grpc::ClientAsyncResponseReader<google::pubsub::v1::PublishResponse> > rpc(
  125. ((google::pubsub::v1::Publisher::Stub *)(connector_specific_data->stub))
  126. ->AsyncPublish(
  127. context, (*(google::pubsub::v1::PublishRequest *)(connector_specific_data->request)),
  128. ((grpc::CompletionQueue *)(connector_specific_data->completion_queue))));
  129. struct response response;
  130. response.context = context;
  131. response.publish_response = new google::pubsub::v1::PublishResponse;
  132. response.tag = connector_specific_data->last_tag++;
  133. response.status = new grpc::Status;
  134. response.published_metrics = buffered_metrics;
  135. response.published_bytes = buffered_bytes;
  136. rpc->Finish(response.publish_response, response.status, (void *)response.tag);
  137. ((google::pubsub::v1::PublishRequest *)(connector_specific_data->request))->clear_messages();
  138. ((std::list<struct response> *)(connector_specific_data->responses))->push_back(response);
  139. } catch (std::exception const &ex) {
  140. std::string em(std::string("Standard exception raised: ") + ex.what());
  141. copy_error_message(error_message, em.c_str());
  142. return 1;
  143. }
  144. return 0;
  145. }
  146. /**
  147. * Get results from service responses
  148. *
  149. * @param pubsub_specific_data_p a pointer to a structure with instance-wide data.
  150. * @param error_message report error message to a caller.
  151. * @param sent_metrics report to a caller how many metrics was successfully sent.
  152. * @param sent_bytes report to a caller how many bytes was successfully sent.
  153. * @param lost_metrics report to a caller how many metrics was lost during transmission.
  154. * @param lost_bytes report to a caller how many bytes was lost during transmission.
  155. * @return Returns 0 if all data was sent successfully, 1 when data was lost on transmission.
  156. */
  157. int pubsub_get_result(
  158. void *pubsub_specific_data_p, char *error_message,
  159. size_t *sent_metrics, size_t *sent_bytes, size_t *lost_metrics, size_t *lost_bytes)
  160. {
  161. struct pubsub_specific_data *connector_specific_data = (struct pubsub_specific_data *)pubsub_specific_data_p;
  162. std::list<struct response> *responses = (std::list<struct response> *)connector_specific_data->responses;
  163. grpc::CompletionQueue::NextStatus next_status;
  164. *sent_metrics = 0;
  165. *sent_bytes = 0;
  166. *lost_metrics = 0;
  167. *lost_bytes = 0;
  168. try {
  169. do {
  170. std::list<struct response>::iterator response;
  171. void *got_tag;
  172. bool ok = false;
  173. auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(50);
  174. next_status = (*(grpc::CompletionQueue *)(connector_specific_data->completion_queue))
  175. .AsyncNext(&got_tag, &ok, deadline);
  176. if (next_status == grpc::CompletionQueue::GOT_EVENT) {
  177. for (response = responses->begin(); response != responses->end(); ++response) {
  178. if ((void *)response->tag == got_tag)
  179. break;
  180. }
  181. if (response == responses->end()) {
  182. copy_error_message(error_message, "Cannot get Pub/Sub response");
  183. return 1;
  184. }
  185. if (ok && response->publish_response->message_ids_size()) {
  186. *sent_metrics += response->published_metrics;
  187. *sent_bytes += response->published_bytes;
  188. } else {
  189. *lost_metrics += response->published_metrics;
  190. *lost_bytes += response->published_bytes;
  191. response->status->error_message().copy(error_message, ERROR_LINE_MAX);
  192. error_message[ERROR_LINE_MAX] = '\0';
  193. }
  194. delete response->context;
  195. delete response->publish_response;
  196. delete response->status;
  197. responses->erase(response);
  198. }
  199. if (next_status == grpc::CompletionQueue::SHUTDOWN) {
  200. copy_error_message(error_message, "Completion queue shutdown");
  201. return 1;
  202. }
  203. } while (next_status == grpc::CompletionQueue::GOT_EVENT);
  204. } catch (std::exception const &ex) {
  205. std::string em(std::string("Standard exception raised: ") + ex.what());
  206. copy_error_message(error_message, em.c_str());
  207. return 1;
  208. }
  209. if (*lost_metrics) {
  210. return 1;
  211. }
  212. return 0;
  213. }