sync_queue.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. /*
  2. * This file is part of FFmpeg.
  3. *
  4. * FFmpeg is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * FFmpeg is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with FFmpeg; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include <stdint.h>
  19. #include <string.h>
  20. #include "libavutil/avassert.h"
  21. #include "libavutil/error.h"
  22. #include "libavutil/fifo.h"
  23. #include "libavutil/mathematics.h"
  24. #include "libavutil/mem.h"
  25. #include "objpool.h"
  26. #include "sync_queue.h"
  27. typedef struct SyncQueueStream {
  28. AVFifo *fifo;
  29. AVRational tb;
  30. /* stream head: largest timestamp seen */
  31. int64_t head_ts;
  32. /* no more frames will be sent for this stream */
  33. int finished;
  34. } SyncQueueStream;
  35. struct SyncQueue {
  36. enum SyncQueueType type;
  37. /* no more frames will be sent for any stream */
  38. int finished;
  39. /* sync head: the stream with the _smallest_ head timestamp
  40. * this stream determines which frames can be output */
  41. int head_stream;
  42. /* the finished stream with the smallest finish timestamp or -1 */
  43. int head_finished_stream;
  44. // maximum buffering duration in microseconds
  45. int64_t buf_size_us;
  46. SyncQueueStream *streams;
  47. unsigned int nb_streams;
  48. // pool of preallocated frames to avoid constant allocations
  49. ObjPool *pool;
  50. };
  51. static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
  52. SyncQueueFrame src)
  53. {
  54. if (sq->type == SYNC_QUEUE_PACKETS)
  55. av_packet_move_ref(dst.p, src.p);
  56. else
  57. av_frame_move_ref(dst.f, src.f);
  58. }
  59. static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
  60. {
  61. return (sq->type == SYNC_QUEUE_PACKETS) ?
  62. frame.p->pts + frame.p->duration :
  63. frame.f->pts + frame.f->pkt_duration;
  64. }
  65. static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
  66. {
  67. return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
  68. }
  69. static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
  70. {
  71. SyncQueueStream *st = &sq->streams[stream_idx];
  72. st->finished = 1;
  73. if (st->head_ts != AV_NOPTS_VALUE) {
  74. /* check if this stream is the new finished head */
  75. if (sq->head_finished_stream < 0 ||
  76. av_compare_ts(st->head_ts, st->tb,
  77. sq->streams[sq->head_finished_stream].head_ts,
  78. sq->streams[sq->head_finished_stream].tb) < 0) {
  79. sq->head_finished_stream = stream_idx;
  80. }
  81. /* mark as finished all streams that should no longer receive new frames,
  82. * due to them being ahead of some finished stream */
  83. st = &sq->streams[sq->head_finished_stream];
  84. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  85. SyncQueueStream *st1 = &sq->streams[i];
  86. if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
  87. av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
  88. st1->finished = 1;
  89. }
  90. }
  91. /* mark the whole queue as finished if all streams are finished */
  92. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  93. if (!sq->streams[i].finished)
  94. return;
  95. }
  96. sq->finished = 1;
  97. }
  98. static void queue_head_update(SyncQueue *sq)
  99. {
  100. if (sq->head_stream < 0) {
  101. /* wait for one timestamp in each stream before determining
  102. * the queue head */
  103. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  104. SyncQueueStream *st = &sq->streams[i];
  105. if (st->head_ts == AV_NOPTS_VALUE)
  106. return;
  107. }
  108. // placeholder value, correct one will be found below
  109. sq->head_stream = 0;
  110. }
  111. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  112. SyncQueueStream *st_head = &sq->streams[sq->head_stream];
  113. SyncQueueStream *st_other = &sq->streams[i];
  114. if (st_other->head_ts != AV_NOPTS_VALUE &&
  115. av_compare_ts(st_other->head_ts, st_other->tb,
  116. st_head->head_ts, st_head->tb) < 0)
  117. sq->head_stream = i;
  118. }
  119. }
  120. /* update this stream's head timestamp */
  121. static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
  122. {
  123. SyncQueueStream *st = &sq->streams[stream_idx];
  124. if (ts == AV_NOPTS_VALUE ||
  125. (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
  126. return;
  127. st->head_ts = ts;
  128. /* if this stream is now ahead of some finished stream, then
  129. * this stream is also finished */
  130. if (sq->head_finished_stream >= 0 &&
  131. av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
  132. sq->streams[sq->head_finished_stream].tb,
  133. ts, st->tb) <= 0)
  134. finish_stream(sq, stream_idx);
  135. /* update the overall head timestamp if it could have changed */
  136. if (sq->head_stream < 0 || sq->head_stream == stream_idx)
  137. queue_head_update(sq);
  138. }
  139. /* If the queue for the given stream (or all streams when stream_idx=-1)
  140. * is overflowing, trigger a fake heartbeat on lagging streams.
  141. *
  142. * @return 1 if heartbeat triggered, 0 otherwise
  143. */
  144. static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
  145. {
  146. SyncQueueStream *st;
  147. SyncQueueFrame frame;
  148. int64_t tail_ts = AV_NOPTS_VALUE;
  149. /* if no stream specified, pick the one that is most ahead */
  150. if (stream_idx < 0) {
  151. int64_t ts = AV_NOPTS_VALUE;
  152. for (int i = 0; i < sq->nb_streams; i++) {
  153. st = &sq->streams[i];
  154. if (st->head_ts != AV_NOPTS_VALUE &&
  155. (ts == AV_NOPTS_VALUE ||
  156. av_compare_ts(ts, sq->streams[stream_idx].tb,
  157. st->head_ts, st->tb) < 0)) {
  158. ts = st->head_ts;
  159. stream_idx = i;
  160. }
  161. }
  162. /* no stream has a timestamp yet -> nothing to do */
  163. if (stream_idx < 0)
  164. return 0;
  165. }
  166. st = &sq->streams[stream_idx];
  167. /* get the chosen stream's tail timestamp */
  168. for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
  169. av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
  170. tail_ts = frame_ts(sq, frame);
  171. /* overflow triggers when the tail is over specified duration behind the head */
  172. if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
  173. av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
  174. return 0;
  175. /* signal a fake timestamp for all streams that prevent tail_ts from being output */
  176. tail_ts++;
  177. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  178. SyncQueueStream *st1 = &sq->streams[i];
  179. int64_t ts;
  180. if (st == st1 || st1->finished ||
  181. (st1->head_ts != AV_NOPTS_VALUE &&
  182. av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
  183. continue;
  184. ts = av_rescale_q(tail_ts, st->tb, st1->tb);
  185. if (st1->head_ts != AV_NOPTS_VALUE)
  186. ts = FFMAX(st1->head_ts + 1, ts);
  187. stream_update_ts(sq, i, ts);
  188. }
  189. return 1;
  190. }
  191. int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
  192. {
  193. SyncQueueStream *st;
  194. SyncQueueFrame dst;
  195. int64_t ts;
  196. int ret;
  197. av_assert0(stream_idx < sq->nb_streams);
  198. st = &sq->streams[stream_idx];
  199. av_assert0(st->tb.num > 0 && st->tb.den > 0);
  200. if (frame_null(sq, frame)) {
  201. finish_stream(sq, stream_idx);
  202. return 0;
  203. }
  204. if (st->finished)
  205. return AVERROR_EOF;
  206. ret = objpool_get(sq->pool, (void**)&dst);
  207. if (ret < 0)
  208. return ret;
  209. frame_move(sq, dst, frame);
  210. ts = frame_ts(sq, dst);
  211. ret = av_fifo_write(st->fifo, &dst, 1);
  212. if (ret < 0) {
  213. frame_move(sq, frame, dst);
  214. objpool_release(sq->pool, (void**)&dst);
  215. return ret;
  216. }
  217. stream_update_ts(sq, stream_idx, ts);
  218. return 0;
  219. }
  220. static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
  221. SyncQueueFrame frame)
  222. {
  223. SyncQueueStream *st_head = sq->head_stream >= 0 ?
  224. &sq->streams[sq->head_stream] : NULL;
  225. SyncQueueStream *st;
  226. av_assert0(stream_idx < sq->nb_streams);
  227. st = &sq->streams[stream_idx];
  228. if (av_fifo_can_read(st->fifo)) {
  229. SyncQueueFrame peek;
  230. int64_t ts;
  231. int cmp = 1;
  232. av_fifo_peek(st->fifo, &peek, 1, 0);
  233. ts = frame_ts(sq, peek);
  234. /* check if this stream's tail timestamp does not overtake
  235. * the overall queue head */
  236. if (ts != AV_NOPTS_VALUE && st_head)
  237. cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
  238. /* We can release frames that do not end after the queue head.
  239. * Frames with no timestamps are just passed through with no conditions.
  240. */
  241. if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
  242. frame_move(sq, frame, peek);
  243. objpool_release(sq->pool, (void**)&peek);
  244. av_fifo_drain2(st->fifo, 1);
  245. return 0;
  246. }
  247. }
  248. return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
  249. AVERROR_EOF : AVERROR(EAGAIN);
  250. }
  251. static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
  252. {
  253. int nb_eof = 0;
  254. int ret;
  255. /* read a frame for a specific stream */
  256. if (stream_idx >= 0) {
  257. ret = receive_for_stream(sq, stream_idx, frame);
  258. return (ret < 0) ? ret : stream_idx;
  259. }
  260. /* read a frame for any stream with available output */
  261. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  262. ret = receive_for_stream(sq, i, frame);
  263. if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
  264. nb_eof += (ret == AVERROR_EOF);
  265. continue;
  266. }
  267. return (ret < 0) ? ret : i;
  268. }
  269. return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
  270. }
  271. int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
  272. {
  273. int ret = receive_internal(sq, stream_idx, frame);
  274. /* try again if the queue overflowed and triggered a fake heartbeat
  275. * for lagging streams */
  276. if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
  277. ret = receive_internal(sq, stream_idx, frame);
  278. return ret;
  279. }
  280. int sq_add_stream(SyncQueue *sq)
  281. {
  282. SyncQueueStream *tmp, *st;
  283. tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
  284. if (!tmp)
  285. return AVERROR(ENOMEM);
  286. sq->streams = tmp;
  287. st = &sq->streams[sq->nb_streams];
  288. memset(st, 0, sizeof(*st));
  289. st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
  290. if (!st->fifo)
  291. return AVERROR(ENOMEM);
  292. /* we set a valid default, so that a pathological stream that never
  293. * receives even a real timebase (and no frames) won't stall all other
  294. * streams forever; cf. overflow_heartbeat() */
  295. st->tb = (AVRational){ 1, 1 };
  296. st->head_ts = AV_NOPTS_VALUE;
  297. return sq->nb_streams++;
  298. }
  299. void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
  300. {
  301. SyncQueueStream *st;
  302. av_assert0(stream_idx < sq->nb_streams);
  303. st = &sq->streams[stream_idx];
  304. av_assert0(!av_fifo_can_read(st->fifo));
  305. if (st->head_ts != AV_NOPTS_VALUE)
  306. st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
  307. st->tb = tb;
  308. }
  309. SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
  310. {
  311. SyncQueue *sq = av_mallocz(sizeof(*sq));
  312. if (!sq)
  313. return NULL;
  314. sq->type = type;
  315. sq->buf_size_us = buf_size_us;
  316. sq->head_stream = -1;
  317. sq->head_finished_stream = -1;
  318. sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() :
  319. objpool_alloc_frames();
  320. if (!sq->pool) {
  321. av_freep(&sq);
  322. return NULL;
  323. }
  324. return sq;
  325. }
  326. void sq_free(SyncQueue **psq)
  327. {
  328. SyncQueue *sq = *psq;
  329. if (!sq)
  330. return;
  331. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  332. SyncQueueFrame frame;
  333. while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
  334. objpool_release(sq->pool, (void**)&frame);
  335. av_fifo_freep2(&sq->streams[i].fifo);
  336. }
  337. av_freep(&sq->streams);
  338. objpool_free(&sq->pool);
  339. av_freep(psq);
  340. }