threadmessage.c 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /*
  2. * Copyright (c) 2014 Nicolas George
  3. *
  4. * This file is part of FFmpeg.
  5. *
  6. * FFmpeg is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public License
  8. * as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * FFmpeg is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  19. */
  20. #include "fifo.h"
  21. #include "threadmessage.h"
  22. #include "thread.h"
  23. struct AVThreadMessageQueue {
  24. #if HAVE_THREADS
  25. AVFifoBuffer *fifo;
  26. pthread_mutex_t lock;
  27. pthread_cond_t cond_recv;
  28. pthread_cond_t cond_send;
  29. int err_send;
  30. int err_recv;
  31. unsigned elsize;
  32. void (*free_func)(void *msg);
  33. #else
  34. int dummy;
  35. #endif
  36. };
  37. int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
  38. unsigned nelem,
  39. unsigned elsize)
  40. {
  41. #if HAVE_THREADS
  42. AVThreadMessageQueue *rmq;
  43. int ret = 0;
  44. if (nelem > INT_MAX / elsize)
  45. return AVERROR(EINVAL);
  46. if (!(rmq = av_mallocz(sizeof(*rmq))))
  47. return AVERROR(ENOMEM);
  48. if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
  49. av_free(rmq);
  50. return AVERROR(ret);
  51. }
  52. if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
  53. pthread_mutex_destroy(&rmq->lock);
  54. av_free(rmq);
  55. return AVERROR(ret);
  56. }
  57. if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
  58. pthread_cond_destroy(&rmq->cond_recv);
  59. pthread_mutex_destroy(&rmq->lock);
  60. av_free(rmq);
  61. return AVERROR(ret);
  62. }
  63. if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
  64. pthread_cond_destroy(&rmq->cond_send);
  65. pthread_cond_destroy(&rmq->cond_recv);
  66. pthread_mutex_destroy(&rmq->lock);
  67. av_free(rmq);
  68. return AVERROR(ENOMEM);
  69. }
  70. rmq->elsize = elsize;
  71. *mq = rmq;
  72. return 0;
  73. #else
  74. *mq = NULL;
  75. return AVERROR(ENOSYS);
  76. #endif /* HAVE_THREADS */
  77. }
  78. void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq,
  79. void (*free_func)(void *msg))
  80. {
  81. #if HAVE_THREADS
  82. mq->free_func = free_func;
  83. #endif
  84. }
  85. void av_thread_message_queue_free(AVThreadMessageQueue **mq)
  86. {
  87. #if HAVE_THREADS
  88. if (*mq) {
  89. av_thread_message_flush(*mq);
  90. av_fifo_freep(&(*mq)->fifo);
  91. pthread_cond_destroy(&(*mq)->cond_send);
  92. pthread_cond_destroy(&(*mq)->cond_recv);
  93. pthread_mutex_destroy(&(*mq)->lock);
  94. av_freep(mq);
  95. }
  96. #endif
  97. }
  98. int av_thread_message_queue_nb_elems(AVThreadMessageQueue *mq)
  99. {
  100. #if HAVE_THREADS
  101. int ret;
  102. pthread_mutex_lock(&mq->lock);
  103. ret = av_fifo_size(mq->fifo);
  104. pthread_mutex_unlock(&mq->lock);
  105. return ret / mq->elsize;
  106. #else
  107. return AVERROR(ENOSYS);
  108. #endif
  109. }
  110. #if HAVE_THREADS
  111. static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
  112. void *msg,
  113. unsigned flags)
  114. {
  115. while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
  116. if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
  117. return AVERROR(EAGAIN);
  118. pthread_cond_wait(&mq->cond_send, &mq->lock);
  119. }
  120. if (mq->err_send)
  121. return mq->err_send;
  122. av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
  123. /* one message is sent, signal one receiver */
  124. pthread_cond_signal(&mq->cond_recv);
  125. return 0;
  126. }
  127. static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
  128. void *msg,
  129. unsigned flags)
  130. {
  131. while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
  132. if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
  133. return AVERROR(EAGAIN);
  134. pthread_cond_wait(&mq->cond_recv, &mq->lock);
  135. }
  136. if (av_fifo_size(mq->fifo) < mq->elsize)
  137. return mq->err_recv;
  138. av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
  139. /* one message space appeared, signal one sender */
  140. pthread_cond_signal(&mq->cond_send);
  141. return 0;
  142. }
  143. #endif /* HAVE_THREADS */
  144. int av_thread_message_queue_send(AVThreadMessageQueue *mq,
  145. void *msg,
  146. unsigned flags)
  147. {
  148. #if HAVE_THREADS
  149. int ret;
  150. pthread_mutex_lock(&mq->lock);
  151. ret = av_thread_message_queue_send_locked(mq, msg, flags);
  152. pthread_mutex_unlock(&mq->lock);
  153. return ret;
  154. #else
  155. return AVERROR(ENOSYS);
  156. #endif /* HAVE_THREADS */
  157. }
  158. int av_thread_message_queue_recv(AVThreadMessageQueue *mq,
  159. void *msg,
  160. unsigned flags)
  161. {
  162. #if HAVE_THREADS
  163. int ret;
  164. pthread_mutex_lock(&mq->lock);
  165. ret = av_thread_message_queue_recv_locked(mq, msg, flags);
  166. pthread_mutex_unlock(&mq->lock);
  167. return ret;
  168. #else
  169. return AVERROR(ENOSYS);
  170. #endif /* HAVE_THREADS */
  171. }
  172. void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
  173. int err)
  174. {
  175. #if HAVE_THREADS
  176. pthread_mutex_lock(&mq->lock);
  177. mq->err_send = err;
  178. pthread_cond_broadcast(&mq->cond_send);
  179. pthread_mutex_unlock(&mq->lock);
  180. #endif /* HAVE_THREADS */
  181. }
  182. void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
  183. int err)
  184. {
  185. #if HAVE_THREADS
  186. pthread_mutex_lock(&mq->lock);
  187. mq->err_recv = err;
  188. pthread_cond_broadcast(&mq->cond_recv);
  189. pthread_mutex_unlock(&mq->lock);
  190. #endif /* HAVE_THREADS */
  191. }
  192. #if HAVE_THREADS
  193. static void free_func_wrap(void *arg, void *msg, int size)
  194. {
  195. AVThreadMessageQueue *mq = arg;
  196. mq->free_func(msg);
  197. }
  198. #endif
  199. void av_thread_message_flush(AVThreadMessageQueue *mq)
  200. {
  201. #if HAVE_THREADS
  202. int used, off;
  203. void *free_func = mq->free_func;
  204. pthread_mutex_lock(&mq->lock);
  205. used = av_fifo_size(mq->fifo);
  206. if (free_func)
  207. for (off = 0; off < used; off += mq->elsize)
  208. av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
  209. av_fifo_drain(mq->fifo, used);
  210. /* only the senders need to be notified since the queue is empty and there
  211. * is nothing to read */
  212. pthread_cond_broadcast(&mq->cond_send);
  213. pthread_mutex_unlock(&mq->lock);
  214. #endif /* HAVE_THREADS */
  215. }