threadmessage.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. /*
  2. * Copyright (c) 2014 Nicolas George
  3. *
  4. * This file is part of FFmpeg.
  5. *
  6. * FFmpeg is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public License
  8. * as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * FFmpeg is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public License
  17. * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
  18. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  19. */
  20. #include "fifo.h"
  21. #include "threadmessage.h"
  22. #if HAVE_THREADS
  23. #if HAVE_PTHREADS
  24. #include <pthread.h>
  25. #elif HAVE_W32THREADS
  26. #include "compat/w32pthreads.h"
  27. #elif HAVE_OS2THREADS
  28. #include "compat/os2threads.h"
  29. #else
  30. #error "Unknown threads implementation"
  31. #endif
  32. #endif
  /**
   * Bounded queue of fixed-size messages exchanged between threads.
   *
   * All members of the threaded variant are accessed with `lock` held.
   */
  struct AVThreadMessageQueue {
  #if HAVE_THREADS
      AVFifoBuffer *fifo;     /* storage for the queued messages */
      pthread_mutex_t lock;   /* protects every other member */
      pthread_cond_t cond;    /* notified when a message is written or read,
                                 or when an error state is set */
      int err_send;           /* when non-zero, returned to senders */
      int err_recv;           /* when non-zero, returned to receivers once
                                 the queue holds no complete message */
      unsigned elsize;        /* size of a single message, in bytes */
  #else
      int dummy;              /* only member when built without threads */
  #endif
  };
  45. int av_thread_message_queue_alloc(AVThreadMessageQueue **mq,
  46. unsigned nelem,
  47. unsigned elsize)
  48. {
  49. #if HAVE_THREADS
  50. AVThreadMessageQueue *rmq;
  51. int ret = 0;
  52. if (nelem > INT_MAX / elsize)
  53. return AVERROR(EINVAL);
  54. if (!(rmq = av_mallocz(sizeof(*rmq))))
  55. return AVERROR(ENOMEM);
  56. if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
  57. av_free(rmq);
  58. return AVERROR(ret);
  59. }
  60. if ((ret = pthread_cond_init(&rmq->cond, NULL))) {
  61. pthread_mutex_destroy(&rmq->lock);
  62. av_free(rmq);
  63. return AVERROR(ret);
  64. }
  65. if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
  66. pthread_cond_destroy(&rmq->cond);
  67. pthread_mutex_destroy(&rmq->lock);
  68. av_free(rmq);
  69. return AVERROR(ret);
  70. }
  71. rmq->elsize = elsize;
  72. *mq = rmq;
  73. return 0;
  74. #else
  75. *mq = NULL;
  76. return AVERROR(ENOSYS);
  77. #endif /* HAVE_THREADS */
  78. }
  79. void av_thread_message_queue_free(AVThreadMessageQueue **mq)
  80. {
  81. #if HAVE_THREADS
  82. if (*mq) {
  83. av_fifo_freep(&(*mq)->fifo);
  84. pthread_cond_destroy(&(*mq)->cond);
  85. pthread_mutex_destroy(&(*mq)->lock);
  86. av_freep(mq);
  87. }
  88. #endif
  89. }
  90. #if HAVE_THREADS
  91. static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
  92. void *msg,
  93. unsigned flags)
  94. {
  95. while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
  96. if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
  97. return AVERROR(EAGAIN);
  98. pthread_cond_wait(&mq->cond, &mq->lock);
  99. }
  100. if (mq->err_send)
  101. return mq->err_send;
  102. av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
  103. pthread_cond_signal(&mq->cond);
  104. return 0;
  105. }
  106. static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
  107. void *msg,
  108. unsigned flags)
  109. {
  110. while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
  111. if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
  112. return AVERROR(EAGAIN);
  113. pthread_cond_wait(&mq->cond, &mq->lock);
  114. }
  115. if (av_fifo_size(mq->fifo) < mq->elsize)
  116. return mq->err_recv;
  117. av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
  118. pthread_cond_signal(&mq->cond);
  119. return 0;
  120. }
  121. #endif /* HAVE_THREADS */
  122. int av_thread_message_queue_send(AVThreadMessageQueue *mq,
  123. void *msg,
  124. unsigned flags)
  125. {
  126. #if HAVE_THREADS
  127. int ret;
  128. pthread_mutex_lock(&mq->lock);
  129. ret = av_thread_message_queue_send_locked(mq, msg, flags);
  130. pthread_mutex_unlock(&mq->lock);
  131. return ret;
  132. #else
  133. return AVERROR(ENOSYS);
  134. #endif /* HAVE_THREADS */
  135. }
  136. int av_thread_message_queue_recv(AVThreadMessageQueue *mq,
  137. void *msg,
  138. unsigned flags)
  139. {
  140. #if HAVE_THREADS
  141. int ret;
  142. pthread_mutex_lock(&mq->lock);
  143. ret = av_thread_message_queue_recv_locked(mq, msg, flags);
  144. pthread_mutex_unlock(&mq->lock);
  145. return ret;
  146. #else
  147. return AVERROR(ENOSYS);
  148. #endif /* HAVE_THREADS */
  149. }
  150. void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq,
  151. int err)
  152. {
  153. #if HAVE_THREADS
  154. pthread_mutex_lock(&mq->lock);
  155. mq->err_send = err;
  156. pthread_cond_broadcast(&mq->cond);
  157. pthread_mutex_unlock(&mq->lock);
  158. #endif /* HAVE_THREADS */
  159. }
  160. void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq,
  161. int err)
  162. {
  163. #if HAVE_THREADS
  164. pthread_mutex_lock(&mq->lock);
  165. mq->err_recv = err;
  166. pthread_cond_broadcast(&mq->cond);
  167. pthread_mutex_unlock(&mq->lock);
  168. #endif /* HAVE_THREADS */
  169. }