threadmessage.c 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. /*
  2. * Copyright (c) 2014 Nicolas George
  3. *
  4. * This file is part of FFmpeg.
  5. *
  6. * FFmpeg is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public License
  8. * as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * FFmpeg is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  19. */
  20. #include "fifo.h"
  21. #include "threadmessage.h"
  22. #include "thread.h"
  23. struct AVThreadMessageQueue {
  24. #if HAVE_THREADS
  25. AVFifoBuffer *fifo;
  26. pthread_mutex_t lock;
  27. pthread_cond_t cond_recv;
  28. pthread_cond_t cond_send;
  29. int err_send;
  30. int err_recv;
  31. unsigned elsize;
  32. void (*free_func)(void *msg);
  33. #else
  34. int dummy;
  35. #endif
  36. };
  37. int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
  38. unsigned nelem,
  39. unsigned elsize)
  40. {
  41. #if HAVE_THREADS
  42. AVThreadMessageQueue *rmq;
  43. int ret = 0;
  44. if (nelem > INT_MAX / elsize)
  45. return AVERROR(EINVAL);
  46. if (!(rmq = av_mallocz(sizeof(*rmq))))
  47. return AVERROR(ENOMEM);
  48. if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
  49. av_free(rmq);
  50. return AVERROR(ret);
  51. }
  52. if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
  53. pthread_mutex_destroy(&rmq->lock);
  54. av_free(rmq);
  55. return AVERROR(ret);
  56. }
  57. if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
  58. pthread_cond_destroy(&rmq->cond_recv);
  59. pthread_mutex_destroy(&rmq->lock);
  60. av_free(rmq);
  61. return AVERROR(ret);
  62. }
  63. if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
  64. pthread_cond_destroy(&rmq->cond_send);
  65. pthread_cond_destroy(&rmq->cond_recv);
  66. pthread_mutex_destroy(&rmq->lock);
  67. av_free(rmq);
  68. return AVERROR(ENOMEM);
  69. }
  70. rmq->elsize = elsize;
  71. *mq = rmq;
  72. return 0;
  73. #else
  74. *mq = NULL;
  75. return AVERROR(ENOSYS);
  76. #endif /* HAVE_THREADS */
  77. }
  78. void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq,
  79. void (*free_func)(void *msg))
  80. {
  81. #if HAVE_THREADS
  82. mq->free_func = free_func;
  83. #endif
  84. }
  85. void av_thread_message_queue_free(AVThreadMessageQueue **mq)
  86. {
  87. #if HAVE_THREADS
  88. if (*mq) {
  89. av_thread_message_flush(*mq);
  90. av_fifo_freep(&(*mq)->fifo);
  91. pthread_cond_destroy(&(*mq)->cond_send);
  92. pthread_cond_destroy(&(*mq)->cond_recv);
  93. pthread_mutex_destroy(&(*mq)->lock);
  94. av_freep(mq);
  95. }
  96. #endif
  97. }
  98. #if HAVE_THREADS
  99. static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
  100. void *msg,
  101. unsigned flags)
  102. {
  103. while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
  104. if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
  105. return AVERROR(EAGAIN);
  106. pthread_cond_wait(&mq->cond_send, &mq->lock);
  107. }
  108. if (mq->err_send)
  109. return mq->err_send;
  110. av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
  111. /* one message is sent, signal one receiver */
  112. pthread_cond_signal(&mq->cond_recv);
  113. return 0;
  114. }
  115. static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
  116. void *msg,
  117. unsigned flags)
  118. {
  119. while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
  120. if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
  121. return AVERROR(EAGAIN);
  122. pthread_cond_wait(&mq->cond_recv, &mq->lock);
  123. }
  124. if (av_fifo_size(mq->fifo) < mq->elsize)
  125. return mq->err_recv;
  126. av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
  127. /* one message space appeared, signal one sender */
  128. pthread_cond_signal(&mq->cond_send);
  129. return 0;
  130. }
  131. #endif /* HAVE_THREADS */
  132. int av_thread_message_queue_send(AVThreadMessageQueue *mq,
  133. void *msg,
  134. unsigned flags)
  135. {
  136. #if HAVE_THREADS
  137. int ret;
  138. pthread_mutex_lock(&mq->lock);
  139. ret = av_thread_message_queue_send_locked(mq, msg, flags);
  140. pthread_mutex_unlock(&mq->lock);
  141. return ret;
  142. #else
  143. return AVERROR(ENOSYS);
  144. #endif /* HAVE_THREADS */
  145. }
  146. int av_thread_message_queue_recv(AVThreadMessageQueue *mq,
  147. void *msg,
  148. unsigned flags)
  149. {
  150. #if HAVE_THREADS
  151. int ret;
  152. pthread_mutex_lock(&mq->lock);
  153. ret = av_thread_message_queue_recv_locked(mq, msg, flags);
  154. pthread_mutex_unlock(&mq->lock);
  155. return ret;
  156. #else
  157. return AVERROR(ENOSYS);
  158. #endif /* HAVE_THREADS */
  159. }
  160. void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
  161. int err)
  162. {
  163. #if HAVE_THREADS
  164. pthread_mutex_lock(&mq->lock);
  165. mq->err_send = err;
  166. pthread_cond_broadcast(&mq->cond_send);
  167. pthread_mutex_unlock(&mq->lock);
  168. #endif /* HAVE_THREADS */
  169. }
  170. void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
  171. int err)
  172. {
  173. #if HAVE_THREADS
  174. pthread_mutex_lock(&mq->lock);
  175. mq->err_recv = err;
  176. pthread_cond_broadcast(&mq->cond_recv);
  177. pthread_mutex_unlock(&mq->lock);
  178. #endif /* HAVE_THREADS */
  179. }
  180. #if HAVE_THREADS
  181. static void free_func_wrap(void *arg, void *msg, int size)
  182. {
  183. AVThreadMessageQueue *mq = arg;
  184. mq->free_func(msg);
  185. }
  186. #endif
  187. void av_thread_message_flush(AVThreadMessageQueue *mq)
  188. {
  189. #if HAVE_THREADS
  190. int used, off;
  191. void *free_func = mq->free_func;
  192. pthread_mutex_lock(&mq->lock);
  193. used = av_fifo_size(mq->fifo);
  194. if (free_func)
  195. for (off = 0; off < used; off += mq->elsize)
  196. av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
  197. av_fifo_drain(mq->fifo, used);
  198. /* only the senders need to be notified since the queue is empty and there
  199. * is nothing to read */
  200. pthread_cond_broadcast(&mq->cond_send);
  201. pthread_mutex_unlock(&mq->lock);
  202. #endif /* HAVE_THREADS */
  203. }