sync_queue.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. /*
  2. * This file is part of FFmpeg.
  3. *
  4. * FFmpeg is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * FFmpeg is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with FFmpeg; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include <stdint.h>
  19. #include <string.h>
  20. #include "libavutil/avassert.h"
  21. #include "libavutil/error.h"
  22. #include "libavutil/fifo.h"
  23. #include "libavutil/mathematics.h"
  24. #include "libavutil/mem.h"
  25. #include "objpool.h"
  26. #include "sync_queue.h"
  27. typedef struct SyncQueueStream {
  28. AVFifo *fifo;
  29. AVRational tb;
  30. /* stream head: largest timestamp seen */
  31. int64_t head_ts;
  32. int limiting;
  33. /* no more frames will be sent for this stream */
  34. int finished;
  35. uint64_t frames_sent;
  36. uint64_t frames_max;
  37. } SyncQueueStream;
  38. struct SyncQueue {
  39. enum SyncQueueType type;
  40. /* no more frames will be sent for any stream */
  41. int finished;
  42. /* sync head: the stream with the _smallest_ head timestamp
  43. * this stream determines which frames can be output */
  44. int head_stream;
  45. /* the finished stream with the smallest finish timestamp or -1 */
  46. int head_finished_stream;
  47. // maximum buffering duration in microseconds
  48. int64_t buf_size_us;
  49. SyncQueueStream *streams;
  50. unsigned int nb_streams;
  51. // pool of preallocated frames to avoid constant allocations
  52. ObjPool *pool;
  53. };
  54. static void frame_move(const SyncQueue *sq, SyncQueueFrame dst,
  55. SyncQueueFrame src)
  56. {
  57. if (sq->type == SYNC_QUEUE_PACKETS)
  58. av_packet_move_ref(dst.p, src.p);
  59. else
  60. av_frame_move_ref(dst.f, src.f);
  61. }
  62. static int64_t frame_ts(const SyncQueue *sq, SyncQueueFrame frame)
  63. {
  64. return (sq->type == SYNC_QUEUE_PACKETS) ?
  65. frame.p->pts + frame.p->duration :
  66. frame.f->pts + frame.f->duration;
  67. }
  68. static int frame_null(const SyncQueue *sq, SyncQueueFrame frame)
  69. {
  70. return (sq->type == SYNC_QUEUE_PACKETS) ? (frame.p == NULL) : (frame.f == NULL);
  71. }
  72. static void finish_stream(SyncQueue *sq, unsigned int stream_idx)
  73. {
  74. SyncQueueStream *st = &sq->streams[stream_idx];
  75. st->finished = 1;
  76. if (st->limiting && st->head_ts != AV_NOPTS_VALUE) {
  77. /* check if this stream is the new finished head */
  78. if (sq->head_finished_stream < 0 ||
  79. av_compare_ts(st->head_ts, st->tb,
  80. sq->streams[sq->head_finished_stream].head_ts,
  81. sq->streams[sq->head_finished_stream].tb) < 0) {
  82. sq->head_finished_stream = stream_idx;
  83. }
  84. /* mark as finished all streams that should no longer receive new frames,
  85. * due to them being ahead of some finished stream */
  86. st = &sq->streams[sq->head_finished_stream];
  87. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  88. SyncQueueStream *st1 = &sq->streams[i];
  89. if (st != st1 && st1->head_ts != AV_NOPTS_VALUE &&
  90. av_compare_ts(st->head_ts, st->tb, st1->head_ts, st1->tb) <= 0)
  91. st1->finished = 1;
  92. }
  93. }
  94. /* mark the whole queue as finished if all streams are finished */
  95. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  96. if (!sq->streams[i].finished)
  97. return;
  98. }
  99. sq->finished = 1;
  100. }
  101. static void queue_head_update(SyncQueue *sq)
  102. {
  103. if (sq->head_stream < 0) {
  104. /* wait for one timestamp in each stream before determining
  105. * the queue head */
  106. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  107. SyncQueueStream *st = &sq->streams[i];
  108. if (st->limiting && st->head_ts == AV_NOPTS_VALUE)
  109. return;
  110. }
  111. // placeholder value, correct one will be found below
  112. sq->head_stream = 0;
  113. }
  114. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  115. SyncQueueStream *st_head = &sq->streams[sq->head_stream];
  116. SyncQueueStream *st_other = &sq->streams[i];
  117. if (st_other->limiting && st_other->head_ts != AV_NOPTS_VALUE &&
  118. av_compare_ts(st_other->head_ts, st_other->tb,
  119. st_head->head_ts, st_head->tb) < 0)
  120. sq->head_stream = i;
  121. }
  122. }
  123. /* update this stream's head timestamp */
  124. static void stream_update_ts(SyncQueue *sq, unsigned int stream_idx, int64_t ts)
  125. {
  126. SyncQueueStream *st = &sq->streams[stream_idx];
  127. if (ts == AV_NOPTS_VALUE ||
  128. (st->head_ts != AV_NOPTS_VALUE && st->head_ts >= ts))
  129. return;
  130. st->head_ts = ts;
  131. /* if this stream is now ahead of some finished stream, then
  132. * this stream is also finished */
  133. if (sq->head_finished_stream >= 0 &&
  134. av_compare_ts(sq->streams[sq->head_finished_stream].head_ts,
  135. sq->streams[sq->head_finished_stream].tb,
  136. ts, st->tb) <= 0)
  137. finish_stream(sq, stream_idx);
  138. /* update the overall head timestamp if it could have changed */
  139. if (st->limiting &&
  140. (sq->head_stream < 0 || sq->head_stream == stream_idx))
  141. queue_head_update(sq);
  142. }
  143. /* If the queue for the given stream (or all streams when stream_idx=-1)
  144. * is overflowing, trigger a fake heartbeat on lagging streams.
  145. *
  146. * @return 1 if heartbeat triggered, 0 otherwise
  147. */
  148. static int overflow_heartbeat(SyncQueue *sq, int stream_idx)
  149. {
  150. SyncQueueStream *st;
  151. SyncQueueFrame frame;
  152. int64_t tail_ts = AV_NOPTS_VALUE;
  153. /* if no stream specified, pick the one that is most ahead */
  154. if (stream_idx < 0) {
  155. int64_t ts = AV_NOPTS_VALUE;
  156. for (int i = 0; i < sq->nb_streams; i++) {
  157. st = &sq->streams[i];
  158. if (st->head_ts != AV_NOPTS_VALUE &&
  159. (ts == AV_NOPTS_VALUE ||
  160. av_compare_ts(ts, sq->streams[stream_idx].tb,
  161. st->head_ts, st->tb) < 0)) {
  162. ts = st->head_ts;
  163. stream_idx = i;
  164. }
  165. }
  166. /* no stream has a timestamp yet -> nothing to do */
  167. if (stream_idx < 0)
  168. return 0;
  169. }
  170. st = &sq->streams[stream_idx];
  171. /* get the chosen stream's tail timestamp */
  172. for (size_t i = 0; tail_ts == AV_NOPTS_VALUE &&
  173. av_fifo_peek(st->fifo, &frame, 1, i) >= 0; i++)
  174. tail_ts = frame_ts(sq, frame);
  175. /* overflow triggers when the tail is over specified duration behind the head */
  176. if (tail_ts == AV_NOPTS_VALUE || tail_ts >= st->head_ts ||
  177. av_rescale_q(st->head_ts - tail_ts, st->tb, AV_TIME_BASE_Q) < sq->buf_size_us)
  178. return 0;
  179. /* signal a fake timestamp for all streams that prevent tail_ts from being output */
  180. tail_ts++;
  181. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  182. SyncQueueStream *st1 = &sq->streams[i];
  183. int64_t ts;
  184. if (st == st1 || st1->finished ||
  185. (st1->head_ts != AV_NOPTS_VALUE &&
  186. av_compare_ts(tail_ts, st->tb, st1->head_ts, st1->tb) <= 0))
  187. continue;
  188. ts = av_rescale_q(tail_ts, st->tb, st1->tb);
  189. if (st1->head_ts != AV_NOPTS_VALUE)
  190. ts = FFMAX(st1->head_ts + 1, ts);
  191. stream_update_ts(sq, i, ts);
  192. }
  193. return 1;
  194. }
  195. int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
  196. {
  197. SyncQueueStream *st;
  198. SyncQueueFrame dst;
  199. int64_t ts;
  200. int ret;
  201. av_assert0(stream_idx < sq->nb_streams);
  202. st = &sq->streams[stream_idx];
  203. av_assert0(st->tb.num > 0 && st->tb.den > 0);
  204. if (frame_null(sq, frame)) {
  205. finish_stream(sq, stream_idx);
  206. return 0;
  207. }
  208. if (st->finished)
  209. return AVERROR_EOF;
  210. ret = objpool_get(sq->pool, (void**)&dst);
  211. if (ret < 0)
  212. return ret;
  213. frame_move(sq, dst, frame);
  214. ts = frame_ts(sq, dst);
  215. ret = av_fifo_write(st->fifo, &dst, 1);
  216. if (ret < 0) {
  217. frame_move(sq, frame, dst);
  218. objpool_release(sq->pool, (void**)&dst);
  219. return ret;
  220. }
  221. stream_update_ts(sq, stream_idx, ts);
  222. st->frames_sent++;
  223. if (st->frames_sent >= st->frames_max)
  224. finish_stream(sq, stream_idx);
  225. return 0;
  226. }
  227. static int receive_for_stream(SyncQueue *sq, unsigned int stream_idx,
  228. SyncQueueFrame frame)
  229. {
  230. SyncQueueStream *st_head = sq->head_stream >= 0 ?
  231. &sq->streams[sq->head_stream] : NULL;
  232. SyncQueueStream *st;
  233. av_assert0(stream_idx < sq->nb_streams);
  234. st = &sq->streams[stream_idx];
  235. if (av_fifo_can_read(st->fifo)) {
  236. SyncQueueFrame peek;
  237. int64_t ts;
  238. int cmp = 1;
  239. av_fifo_peek(st->fifo, &peek, 1, 0);
  240. ts = frame_ts(sq, peek);
  241. /* check if this stream's tail timestamp does not overtake
  242. * the overall queue head */
  243. if (ts != AV_NOPTS_VALUE && st_head)
  244. cmp = av_compare_ts(ts, st->tb, st_head->head_ts, st_head->tb);
  245. /* We can release frames that do not end after the queue head.
  246. * Frames with no timestamps are just passed through with no conditions.
  247. */
  248. if (cmp <= 0 || ts == AV_NOPTS_VALUE) {
  249. frame_move(sq, frame, peek);
  250. objpool_release(sq->pool, (void**)&peek);
  251. av_fifo_drain2(st->fifo, 1);
  252. return 0;
  253. }
  254. }
  255. return (sq->finished || (st->finished && !av_fifo_can_read(st->fifo))) ?
  256. AVERROR_EOF : AVERROR(EAGAIN);
  257. }
  258. static int receive_internal(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
  259. {
  260. int nb_eof = 0;
  261. int ret;
  262. /* read a frame for a specific stream */
  263. if (stream_idx >= 0) {
  264. ret = receive_for_stream(sq, stream_idx, frame);
  265. return (ret < 0) ? ret : stream_idx;
  266. }
  267. /* read a frame for any stream with available output */
  268. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  269. ret = receive_for_stream(sq, i, frame);
  270. if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) {
  271. nb_eof += (ret == AVERROR_EOF);
  272. continue;
  273. }
  274. return (ret < 0) ? ret : i;
  275. }
  276. return (nb_eof == sq->nb_streams) ? AVERROR_EOF : AVERROR(EAGAIN);
  277. }
  278. int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
  279. {
  280. int ret = receive_internal(sq, stream_idx, frame);
  281. /* try again if the queue overflowed and triggered a fake heartbeat
  282. * for lagging streams */
  283. if (ret == AVERROR(EAGAIN) && overflow_heartbeat(sq, stream_idx))
  284. ret = receive_internal(sq, stream_idx, frame);
  285. return ret;
  286. }
  287. int sq_add_stream(SyncQueue *sq, int limiting)
  288. {
  289. SyncQueueStream *tmp, *st;
  290. tmp = av_realloc_array(sq->streams, sq->nb_streams + 1, sizeof(*sq->streams));
  291. if (!tmp)
  292. return AVERROR(ENOMEM);
  293. sq->streams = tmp;
  294. st = &sq->streams[sq->nb_streams];
  295. memset(st, 0, sizeof(*st));
  296. st->fifo = av_fifo_alloc2(1, sizeof(SyncQueueFrame), AV_FIFO_FLAG_AUTO_GROW);
  297. if (!st->fifo)
  298. return AVERROR(ENOMEM);
  299. /* we set a valid default, so that a pathological stream that never
  300. * receives even a real timebase (and no frames) won't stall all other
  301. * streams forever; cf. overflow_heartbeat() */
  302. st->tb = (AVRational){ 1, 1 };
  303. st->head_ts = AV_NOPTS_VALUE;
  304. st->frames_max = UINT64_MAX;
  305. st->limiting = limiting;
  306. return sq->nb_streams++;
  307. }
  308. void sq_set_tb(SyncQueue *sq, unsigned int stream_idx, AVRational tb)
  309. {
  310. SyncQueueStream *st;
  311. av_assert0(stream_idx < sq->nb_streams);
  312. st = &sq->streams[stream_idx];
  313. av_assert0(!av_fifo_can_read(st->fifo));
  314. if (st->head_ts != AV_NOPTS_VALUE)
  315. st->head_ts = av_rescale_q(st->head_ts, st->tb, tb);
  316. st->tb = tb;
  317. }
  318. void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
  319. {
  320. SyncQueueStream *st;
  321. av_assert0(stream_idx < sq->nb_streams);
  322. st = &sq->streams[stream_idx];
  323. st->frames_max = frames;
  324. if (st->frames_sent >= st->frames_max)
  325. finish_stream(sq, stream_idx);
  326. }
  327. SyncQueue *sq_alloc(enum SyncQueueType type, int64_t buf_size_us)
  328. {
  329. SyncQueue *sq = av_mallocz(sizeof(*sq));
  330. if (!sq)
  331. return NULL;
  332. sq->type = type;
  333. sq->buf_size_us = buf_size_us;
  334. sq->head_stream = -1;
  335. sq->head_finished_stream = -1;
  336. sq->pool = (type == SYNC_QUEUE_PACKETS) ? objpool_alloc_packets() :
  337. objpool_alloc_frames();
  338. if (!sq->pool) {
  339. av_freep(&sq);
  340. return NULL;
  341. }
  342. return sq;
  343. }
  344. void sq_free(SyncQueue **psq)
  345. {
  346. SyncQueue *sq = *psq;
  347. if (!sq)
  348. return;
  349. for (unsigned int i = 0; i < sq->nb_streams; i++) {
  350. SyncQueueFrame frame;
  351. while (av_fifo_read(sq->streams[i].fifo, &frame, 1) >= 0)
  352. objpool_release(sq->pool, (void**)&frame);
  353. av_fifo_freep2(&sq->streams[i].fifo);
  354. }
  355. av_freep(&sq->streams);
  356. objpool_free(&sq->pool);
  357. av_freep(psq);
  358. }