api-threadmessage-test.c 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. /*
  2. * Permission is hereby granted, free of charge, to any person obtaining a copy
  3. * of this software and associated documentation files (the "Software"), to deal
  4. * in the Software without restriction, including without limitation the rights
  5. * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  6. * copies of the Software, and to permit persons to whom the Software is
  7. * furnished to do so, subject to the following conditions:
  8. *
  9. * The above copyright notice and this permission notice shall be included in
  10. * all copies or substantial portions of the Software.
  11. *
  12. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  13. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  14. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  15. * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  16. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  17. * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  18. * THE SOFTWARE.
  19. */
  20. /**
  21. * Thread message API test
  22. */
  23. #include "libavutil/avassert.h"
  24. #include "libavutil/avstring.h"
  25. #include "libavutil/frame.h"
  26. #include "libavutil/threadmessage.h"
  27. #include "libavutil/thread.h" // not public
  28. struct sender_data {
  29. int id;
  30. pthread_t tid;
  31. int workload;
  32. AVThreadMessageQueue *queue;
  33. };
  34. /* same as sender_data but shuffled for testing purpose */
  35. struct receiver_data {
  36. pthread_t tid;
  37. int workload;
  38. int id;
  39. AVThreadMessageQueue *queue;
  40. };
  41. struct message {
  42. AVFrame *frame;
  43. // we add some junk in the message to make sure the message size is >
  44. // sizeof(void*)
  45. int magic;
  46. };
  47. #define MAGIC 0xdeadc0de
  48. static void free_frame(void *arg)
  49. {
  50. struct message *msg = arg;
  51. av_assert0(msg->magic == MAGIC);
  52. av_frame_free(&msg->frame);
  53. }
  54. static void *sender_thread(void *arg)
  55. {
  56. int i, ret = 0;
  57. struct sender_data *wd = arg;
  58. av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload);
  59. for (i = 0; i < wd->workload; i++) {
  60. if (rand() % wd->workload < wd->workload / 10) {
  61. av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id);
  62. av_thread_message_flush(wd->queue);
  63. } else {
  64. char *val;
  65. AVDictionary *meta = NULL;
  66. struct message msg = {
  67. .magic = MAGIC,
  68. .frame = av_frame_alloc(),
  69. };
  70. if (!msg.frame) {
  71. ret = AVERROR(ENOMEM);
  72. break;
  73. }
  74. /* we add some metadata to identify the frames */
  75. val = av_asprintf("frame %d/%d from sender %d",
  76. i + 1, wd->workload, wd->id);
  77. if (!val) {
  78. av_frame_free(&msg.frame);
  79. ret = AVERROR(ENOMEM);
  80. break;
  81. }
  82. ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL);
  83. if (ret < 0) {
  84. av_frame_free(&msg.frame);
  85. break;
  86. }
  87. msg.frame->metadata = meta;
  88. /* allocate a real frame in order to simulate "real" work */
  89. msg.frame->format = AV_PIX_FMT_RGBA;
  90. msg.frame->width = 320;
  91. msg.frame->height = 240;
  92. ret = av_frame_get_buffer(msg.frame, 0);
  93. if (ret < 0) {
  94. av_frame_free(&msg.frame);
  95. break;
  96. }
  97. /* push the frame in the common queue */
  98. av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n",
  99. wd->id, i + 1, wd->workload, msg.frame);
  100. ret = av_thread_message_queue_send(wd->queue, &msg, 0);
  101. if (ret < 0) {
  102. av_frame_free(&msg.frame);
  103. break;
  104. }
  105. }
  106. }
  107. av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n",
  108. wd->id, av_err2str(ret));
  109. av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF);
  110. return NULL;
  111. }
  112. static void *receiver_thread(void *arg)
  113. {
  114. int i, ret = 0;
  115. struct receiver_data *rd = arg;
  116. for (i = 0; i < rd->workload; i++) {
  117. if (rand() % rd->workload < rd->workload / 10) {
  118. av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, "
  119. "discarding %d message(s)\n", rd->id,
  120. av_thread_message_queue_nb_elems(rd->queue));
  121. av_thread_message_flush(rd->queue);
  122. } else {
  123. struct message msg;
  124. AVDictionary *meta;
  125. AVDictionaryEntry *e;
  126. ret = av_thread_message_queue_recv(rd->queue, &msg, 0);
  127. if (ret < 0)
  128. break;
  129. av_assert0(msg.magic == MAGIC);
  130. meta = msg.frame->metadata;
  131. e = av_dict_get(meta, "sig", NULL, 0);
  132. av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame);
  133. av_frame_free(&msg.frame);
  134. }
  135. }
  136. av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i);
  137. av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF);
  138. return NULL;
  139. }
  140. static int get_workload(int minv, int maxv)
  141. {
  142. return maxv == minv ? maxv : rand() % (maxv - minv) + minv;
  143. }
  144. int main(int ac, char **av)
  145. {
  146. int i, ret = 0;
  147. int max_queue_size;
  148. int nb_senders, sender_min_load, sender_max_load;
  149. int nb_receivers, receiver_min_load, receiver_max_load;
  150. struct sender_data *senders;
  151. struct receiver_data *receivers;
  152. AVThreadMessageQueue *queue = NULL;
  153. if (ac != 8) {
  154. av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> "
  155. "<nb_senders> <sender_min_send> <sender_max_send> "
  156. "<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]);
  157. return 1;
  158. }
  159. max_queue_size = atoi(av[1]);
  160. nb_senders = atoi(av[2]);
  161. sender_min_load = atoi(av[3]);
  162. sender_max_load = atoi(av[4]);
  163. nb_receivers = atoi(av[5]);
  164. receiver_min_load = atoi(av[6]);
  165. receiver_max_load = atoi(av[7]);
  166. if (max_queue_size <= 0 ||
  167. nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 ||
  168. nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) {
  169. av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n");
  170. return 1;
  171. }
  172. av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / "
  173. "%d receivers receiving [%d-%d]\n", max_queue_size,
  174. nb_senders, sender_min_load, sender_max_load,
  175. nb_receivers, receiver_min_load, receiver_max_load);
  176. senders = av_mallocz_array(nb_senders, sizeof(*senders));
  177. receivers = av_mallocz_array(nb_receivers, sizeof(*receivers));
  178. if (!senders || !receivers) {
  179. ret = AVERROR(ENOMEM);
  180. goto end;
  181. }
  182. ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message));
  183. if (ret < 0)
  184. goto end;
  185. av_thread_message_queue_set_free_func(queue, free_frame);
  186. #define SPAWN_THREADS(type) do { \
  187. for (i = 0; i < nb_##type##s; i++) { \
  188. struct type##_data *td = &type##s[i]; \
  189. \
  190. td->id = i; \
  191. td->queue = queue; \
  192. td->workload = get_workload(type##_min_load, type##_max_load); \
  193. \
  194. ret = pthread_create(&td->tid, NULL, type##_thread, td); \
  195. if (ret) { \
  196. const int err = AVERROR(ret); \
  197. av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \
  198. " thread: %s\n", av_err2str(err)); \
  199. goto end; \
  200. } \
  201. } \
  202. } while (0)
  203. #define WAIT_THREADS(type) do { \
  204. for (i = 0; i < nb_##type##s; i++) { \
  205. struct type##_data *td = &type##s[i]; \
  206. \
  207. ret = pthread_join(td->tid, NULL); \
  208. if (ret) { \
  209. const int err = AVERROR(ret); \
  210. av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \
  211. " thread: %s\n", av_err2str(err)); \
  212. goto end; \
  213. } \
  214. } \
  215. } while (0)
  216. SPAWN_THREADS(receiver);
  217. SPAWN_THREADS(sender);
  218. WAIT_THREADS(sender);
  219. WAIT_THREADS(receiver);
  220. end:
  221. av_thread_message_queue_free(&queue);
  222. av_freep(&senders);
  223. av_freep(&receivers);
  224. if (ret < 0 && ret != AVERROR_EOF) {
  225. av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret));
  226. return 1;
  227. }
  228. return 0;
  229. }