zmqsend.c 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. /*
  2. * Copyright (c) 2013 Stefano Sabatini
  3. *
  4. * This file is part of FFmpeg.
  5. *
  6. * FFmpeg is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * FFmpeg is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with FFmpeg; if not, write to the Free Software
  18. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  19. */
  20. #include "config.h"
  21. #include <stdio.h>
  22. #include <string.h>
  23. #include <zmq.h>
  24. #include "libavutil/log.h"
  25. #include "libavutil/mem.h"
  26. #include "libavutil/bprint.h"
  27. #if HAVE_UNISTD_H
  28. #include <unistd.h> /* getopt */
  29. #endif
  30. #if !HAVE_GETOPT
  31. #include "compat/getopt.c"
  32. #endif
  33. /**
  34. * @file
  35. * zmq message sender example, meant to be used with the zmq filters
  36. */
  /**
   * Print the command-line help for zmqsend to stdout.
   */
  static void usage(void)
  {
      /* The whole help screen is kept as one literal and written in one call. */
      static const char help_text[] =
          "send message to ZMQ recipient, to use with the zmq filters\n"
          "usage: zmqsend [OPTIONS]\n"
          "\n"
          "Options:\n"
          "-b ADDRESS set bind address\n"
          "-h print this help\n"
          "-i INFILE set INFILE as input file, stdin if omitted\n";

      fputs(help_text, stdout);
  }
  47. int main(int argc, char **argv)
  48. {
  49. AVBPrint src;
  50. char *src_buf, *recv_buf;
  51. int c;
  52. int recv_buf_size, ret = 0;
  53. void *zmq_ctx, *socket;
  54. const char *bind_address = "tcp://localhost:5555";
  55. const char *infilename = NULL;
  56. FILE *infile = NULL;
  57. zmq_msg_t msg;
  58. while ((c = getopt(argc, argv, "b:hi:")) != -1) {
  59. switch (c) {
  60. case 'b':
  61. bind_address = optarg;
  62. break;
  63. case 'h':
  64. usage();
  65. return 0;
  66. case 'i':
  67. infilename = optarg;
  68. break;
  69. case '?':
  70. return 1;
  71. }
  72. }
  73. if (!infilename || !strcmp(infilename, "-")) {
  74. infilename = "stdin";
  75. infile = stdin;
  76. } else {
  77. infile = fopen(infilename, "r");
  78. }
  79. if (!infile) {
  80. av_log(NULL, AV_LOG_ERROR,
  81. "Impossible to open input file '%s': %s\n", infilename, strerror(errno));
  82. return 1;
  83. }
  84. zmq_ctx = zmq_ctx_new();
  85. if (!zmq_ctx) {
  86. av_log(NULL, AV_LOG_ERROR,
  87. "Could not create ZMQ context: %s\n", zmq_strerror(errno));
  88. return 1;
  89. }
  90. socket = zmq_socket(zmq_ctx, ZMQ_REQ);
  91. if (!socket) {
  92. av_log(NULL, AV_LOG_ERROR,
  93. "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
  94. ret = 1;
  95. goto end;
  96. }
  97. if (zmq_connect(socket, bind_address) == -1) {
  98. av_log(NULL, AV_LOG_ERROR, "Could not bind ZMQ responder to address '%s': %s\n",
  99. bind_address, zmq_strerror(errno));
  100. ret = 1;
  101. goto end;
  102. }
  103. /* grab the input and store it in src */
  104. av_bprint_init(&src, 1, AV_BPRINT_SIZE_UNLIMITED);
  105. while ((c = fgetc(infile)) != EOF)
  106. av_bprint_chars(&src, c, 1);
  107. av_bprint_chars(&src, 0, 1);
  108. if (!av_bprint_is_complete(&src)) {
  109. av_log(NULL, AV_LOG_ERROR, "Could not allocate a buffer for the source string\n");
  110. av_bprint_finalize(&src, NULL);
  111. ret = 1;
  112. goto end;
  113. }
  114. av_bprint_finalize(&src, &src_buf);
  115. if (zmq_send(socket, src_buf, strlen(src_buf), 0) == -1) {
  116. av_log(NULL, AV_LOG_ERROR, "Could not send message: %s\n", zmq_strerror(errno));
  117. ret = 1;
  118. goto end;
  119. }
  120. if (zmq_msg_init(&msg) == -1) {
  121. av_log(NULL, AV_LOG_ERROR,
  122. "Could not initialize receiving message: %s\n", zmq_strerror(errno));
  123. ret = 1;
  124. goto end;
  125. }
  126. if (zmq_msg_recv(&msg, socket, 0) == -1) {
  127. av_log(NULL, AV_LOG_ERROR,
  128. "Could not receive message: %s\n", zmq_strerror(errno));
  129. zmq_msg_close(&msg);
  130. ret = 1;
  131. goto end;
  132. }
  133. recv_buf_size = zmq_msg_size(&msg) + 1;
  134. recv_buf = av_malloc(recv_buf_size);
  135. if (!recv_buf) {
  136. av_log(NULL, AV_LOG_ERROR,
  137. "Could not allocate receiving message buffer\n");
  138. zmq_msg_close(&msg);
  139. ret = 1;
  140. goto end;
  141. }
  142. memcpy(recv_buf, zmq_msg_data(&msg), recv_buf_size - 1);
  143. recv_buf[recv_buf_size-1] = 0;
  144. printf("%s\n", recv_buf);
  145. zmq_msg_close(&msg);
  146. av_free(recv_buf);
  147. end:
  148. zmq_close(socket);
  149. zmq_ctx_destroy(zmq_ctx);
  150. return ret;
  151. }