ffmpeg_sched.h 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. /*
  2. * Inter-thread scheduling/synchronization.
  3. * Copyright (c) 2023 Anton Khirnov
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public
  9. * License as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public
  18. * License along with FFmpeg; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. */
  21. #ifndef FFTOOLS_FFMPEG_SCHED_H
  22. #define FFTOOLS_FFMPEG_SCHED_H
  23. #include <stddef.h>
  24. #include <stdint.h>
  25. #include "ffmpeg_utils.h"
  26. /*
  27. * This file contains the API for the transcode scheduler.
  28. *
  29. * Overall architecture of the transcoding process involves instances of the
  30. * following components:
  31. * - demuxers, each containing any number of demuxed streams; demuxed packets
  32. * belonging to some stream are sent to any number of decoders (transcoding)
  33. * and/or muxers (streamcopy);
  34. * - decoders, which receive encoded packets from some demuxed stream or
  35. * encoder, decode them, and send decoded frames to any number of filtergraph
  36. * inputs (audio/video) or encoders (subtitles);
  37. * - filtergraphs, each containing zero or more inputs (0 in case the
  38. * filtergraph contains a lavfi source filter), and one or more outputs; the
  39. * inputs and outputs need not have matching media types;
  40. * each filtergraph input receives decoded frames from some decoder or another
  41. * filtergraph output;
  42. * filtered frames from each output are sent to some encoder;
  43. * - encoders, which receive decoded frames from some decoder (subtitles) or
  44. * some filtergraph output (audio/video), encode them, and send encoded
  45. * packets to any number of muxed streams or decoders;
  46. * - muxers, each containing any number of muxed streams; each muxed stream
  47. * receives encoded packets from some demuxed stream (streamcopy) or some
  48. * encoder (transcoding); those packets are interleaved and written out by the
  49. * muxer.
  50. *
  51. * The structure formed by the above components is a directed acyclic graph
  52. * (absence of cycles is checked at startup).
  53. *
  54. * There must be at least one muxer instance, otherwise the transcode produces
  55. * no output and is meaningless. Beyond that, in a generic transcoding scenario
  56. * there may be an arbitrary number of instances of any of the above components,
  57. * interconnected in various ways.
  58. *
  59. * The code tries to keep all the output streams across all the muxers in sync
  60. * (i.e. at the same DTS), which is accomplished by varying the rates at which
  61. * packets are read from different demuxers and lavfi sources. Note that the
  62. * degree of control we have over synchronization is fundamentally limited - if
  63. * some demuxed streams in the same input are interleaved at different rates
  64. * than that at which they are to be muxed (e.g. because an input file is badly
  65. * interleaved, or the user changed their speed by mismatching amounts), then
  66. * there will be increasing amounts of buffering followed by eventual
  67. * transcoding failure.
  68. *
  69. * N.B. 1: there are meaningful transcode scenarios with no demuxers, e.g.
  70. * - encoding and muxing output from filtergraph(s) that have no inputs;
  71. * - creating a file that contains nothing but attachments and/or metadata.
  72. *
  73. * N.B. 2: a filtergraph output could, in principle, feed multiple encoders, but
  74. * this is unnecessary because the (a)split filter provides the same
  75. * functionality.
  76. *
  77. * The scheduler, in the above model, is the master object that oversees and
  78. * facilitates the transcoding process. The basic idea is that all instances
  79. * of the abovementioned components communicate only with the scheduler and not
  80. * with each other. The scheduler is then the single place containing the
  81. * knowledge about the whole transcoding pipeline.
  82. */
  83. struct AVFrame;
  84. struct AVPacket;
  85. typedef struct Scheduler Scheduler;
  /** Kind of transcoding component that a SchedulerNode refers to. */
  86. enum SchedulerNodeType {
  87. SCH_NODE_TYPE_NONE = 0, /* zero value, distinct from every real node type */
  88. SCH_NODE_TYPE_DEMUX, /* a demuxed stream, see SCH_DSTREAM() */
  89. SCH_NODE_TYPE_MUX, /* a muxed stream, see SCH_MSTREAM() */
  90. SCH_NODE_TYPE_DEC, /* a decoder input or output, see SCH_DEC_IN()/SCH_DEC_OUT() */
  91. SCH_NODE_TYPE_ENC, /* an encoder, see SCH_ENC() */
  92. SCH_NODE_TYPE_FILTER_IN, /* a filtergraph input, see SCH_FILTER_IN() */
  93. SCH_NODE_TYPE_FILTER_OUT, /* a filtergraph output, see SCH_FILTER_OUT() */
  94. };
  /**
   * Identifies one endpoint of a connection between components; values are
   * built with the SCH_*() macros in this header and passed to sch_connect().
   */
  95. typedef struct SchedulerNode {
  96. enum SchedulerNodeType type; /* which kind of component the node belongs to */
  97. unsigned idx; /* component index: file (demux/mux), decoder, encoder or filtergraph */
  98. unsigned idx_stream; /* stream/input/output index within the component; zero when the macro omits it (SCH_DEC_IN, SCH_ENC) */
  99. } SchedulerNode;
  /**
   * Signature of a task function registered through the sch_add_*() calls; it
   * is invoked with the ctx pointer supplied at registration. The meaning of
   * the return value is not specified in this header.
   */
  100. typedef int (*SchThreadFunc)(void *arg);
  /*
   * Convenience constructors producing SchedulerNode compound literals.
   * Members that an initializer does not name are zero-initialized.
   */
  /* Stream number `stream` of demuxer `file`. */
  101. #define SCH_DSTREAM(file, stream) \
  102. (SchedulerNode){ .type = SCH_NODE_TYPE_DEMUX, \
  103. .idx = file, .idx_stream = stream }
  /* Stream number `stream` of muxer `file`. */
  104. #define SCH_MSTREAM(file, stream) \
  105. (SchedulerNode){ .type = SCH_NODE_TYPE_MUX, \
  106. .idx = file, .idx_stream = stream }
  /* Decoder `decoder`, with idx_stream left at zero. */
  107. #define SCH_DEC_IN(decoder) \
  108. (SchedulerNode){ .type = SCH_NODE_TYPE_DEC, \
  109. .idx = decoder }
  /*
   * Output number `out_idx` of decoder `decoder`. Uses the same node type as
   * SCH_DEC_IN(); NOTE(review): presumably input and output are told apart by
   * whether the node is used as source or destination - confirm.
   */
  110. #define SCH_DEC_OUT(decoder, out_idx) \
  111. (SchedulerNode){ .type = SCH_NODE_TYPE_DEC, \
  112. .idx = decoder, .idx_stream = out_idx }
  /* Encoder `encoder`, with idx_stream left at zero. */
  113. #define SCH_ENC(encoder) \
  114. (SchedulerNode){ .type = SCH_NODE_TYPE_ENC, \
  115. .idx = encoder }
  /* Input number `input` of filtergraph `filter`. */
  116. #define SCH_FILTER_IN(filter, input) \
  117. (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_IN, \
  118. .idx = filter, .idx_stream = input }
  /* Output number `output` of filtergraph `filter`. */
  119. #define SCH_FILTER_OUT(filter, output) \
  120. (SchedulerNode){ .type = SCH_NODE_TYPE_FILTER_OUT, \
  121. .idx = filter, .idx_stream = output }
  /*
   * Scheduler lifecycle entry points. They are not documented in this header;
   * the notes below are reviewer assumptions drawn from the signatures only.
   */
  /* Create a scheduler instance. NOTE(review): presumably returns NULL on failure - confirm. */
  122. Scheduler *sch_alloc(void);
  /* Destroy a scheduler; takes the address of the handle. NOTE(review): presumably resets *sch to NULL - confirm. */
  123. void sch_free(Scheduler **sch);
  /* NOTE(review): presumably launches the registered tasks and returns 0 or a negative error code - confirm. */
  124. int sch_start(Scheduler *sch);
  /* NOTE(review): presumably terminates the tasks and reports a final timestamp through finish_ts - confirm semantics and units. */
  125. int sch_stop(Scheduler *sch, int64_t *finish_ts);
  126. /**
  127. * Wait until transcoding terminates or the specified timeout elapses.
  128. *
  129. * @param timeout_us Amount of time in microseconds after which this function
  130. * will timeout.
  131. * @param transcode_ts Current transcode timestamp in AV_TIME_BASE_Q, for
  132. * informational purposes only.
  133. *
  134. * @retval 0 waiting timed out, transcoding is not finished
  135. * @retval 1 transcoding is finished
  136. */
  137. int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts);
  138. /**
  139. * Add a demuxer to the scheduler.
  140. *
  141. * @param func Function executed as the demuxer task.
  142. * @param ctx Demuxer state; will be passed to func and used for logging.
  143. *
  144. * @retval ">=0" Index of the newly-created demuxer.
  145. * @retval "<0" Error code.
  146. */
  147. int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx);
  148. /**
  149. * Add a demuxed stream for a previously added demuxer.
  150. *
  151. * @param demux_idx index previously returned by sch_add_demux()
  152. *
  153. * @retval ">=0" Index of the newly-created demuxed stream.
  154. * @retval "<0" Error code.
  155. */
  156. int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx);
  157. /**
  158. * Add a decoder to the scheduler.
  159. *
  160. * @param func Function executed as the decoder task.
  161. * @param ctx Decoder state; will be passed to func and used for logging.
  162. * @param send_end_ts The decoder will return an end timestamp after flush packets
  163. * are delivered to it. See documentation for
  164. * sch_dec_receive() for more details.
  165. *
  166. * @retval ">=0" Index of the newly-created decoder.
  167. * @retval "<0" Error code.
  168. */
  169. int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts);
  170. /**
  171. * Add another output to decoder (e.g. for multiview video).
  172. *
  173. * @retval ">=0" Index of the newly-added decoder output.
  174. * @retval "<0" Error code.
  175. */
  176. int sch_add_dec_output(Scheduler *sch, unsigned dec_idx);
  177. /**
  178. * Add a filtergraph to the scheduler.
  179. *
  180. * @param nb_inputs Number of filtergraph inputs.
  181. * @param nb_outputs number of filtergraph outputs
  182. * @param func Function executed as the filtering task.
  183. * @param ctx Filter state; will be passed to func and used for logging.
  184. *
  185. * @retval ">=0" Index of the newly-created filtergraph.
  186. * @retval "<0" Error code.
  187. */
  188. int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
  189. SchThreadFunc func, void *ctx);
  190. /**
  191. * Add a muxer to the scheduler.
  192. *
  193. * Note that muxer thread startup is more complicated than for other components,
  194. * because
  195. * - muxer streams fed by audio/video encoders become initialized dynamically at
  196. * runtime, after those encoders receive their first frame and initialize
  197. * themselves, followed by calling sch_mux_stream_ready()
  198. * - the header can be written after all the streams for a muxer are initialized
  199. * - we may need to write an SDP, which must happen
  200. * - AFTER all the headers are written
  201. * - BEFORE any packets are written by any muxer
  202. * - with all the muxers quiescent
  203. * To avoid complicated muxer-thread synchronization dances, we postpone
  204. * starting the muxer threads until after the SDP is written. The sequence of
  205. * events is then as follows:
  206. * - After sch_mux_stream_ready() is called for all the streams in a given muxer,
  207. * the header for that muxer is written (care is taken that headers for
  208. * different muxers are not written concurrently, since they write file
  209. * information to stderr). If SDP is not wanted, the muxer thread then starts
  210. * and muxing begins.
  211. * - When SDP _is_ wanted, no muxer threads start until the header for the last
  212. * muxer is written. After that, the SDP is written, after which all the muxer
  213. * threads are started at once.
  214. *
  215. * In order for the above to work, the scheduler needs to be able to invoke
  216. * just writing the header, which is the reason the init parameter exists.
  217. *
  218. * @param func Function executed as the muxing task.
  219. * @param init Callback that is called to initialize the muxer and write the
  220. * header. Called after sch_mux_stream_ready() is called for all the
  221. * streams in the muxer.
  222. * @param ctx Muxer state; will be passed to func/init and used for logging.
  223. * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
  224. * @param thread_queue_size number of packets that can be buffered before
  225. * sending to the muxer blocks
  226. *
  227. * @retval ">=0" Index of the newly-created muxer.
  228. * @retval "<0" Error code.
  229. */
  230. int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
  231. void *ctx, int sdp_auto, unsigned thread_queue_size);
  232. /**
  233. * Default size of a packet thread queue. For muxing this can be overridden by
  234. * the thread_queue_size option as passed to a call to sch_add_mux().
  235. */
  236. #define DEFAULT_PACKET_THREAD_QUEUE_SIZE 8
  237. /**
  238. * Default size of a frame thread queue.
  239. */
  240. #define DEFAULT_FRAME_THREAD_QUEUE_SIZE 8
  241. /**
  242. * Add a muxed stream for a previously added muxer.
  243. *
  244. * @param mux_idx index previously returned by sch_add_mux()
  245. *
  246. * @retval ">=0" Index of the newly-created muxed stream.
  247. * @retval "<0" Error code.
  248. */
  249. int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx);
  250. /**
  251. * Configure limits on packet buffering performed before the muxer task is
  252. * started.
  253. *
  254. * @param mux_idx index previously returned by sch_add_mux()
  255. * @param stream_idx index previously returned by sch_add_mux_stream()
  256. * @param data_threshold Total size of the buffered packets' data after which
  257. * max_packets applies.
  258. * @param max_packets Maximum number of buffered packets after
  259. * data_threshold is reached.
  260. */
  261. void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
  262. size_t data_threshold, int max_packets);
  263. /**
  264. * Signal to the scheduler that the specified muxed stream is initialized and
  265. * ready. Muxing is started once all the streams are ready.
  266. */
  267. int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
  268. /**
  269. * Set the file path for the SDP.
  270. *
  271. * The SDP is written when either of the following is true:
  272. * - this function is called at least once
  273. * - sdp_auto=1 is passed to EVERY call of sch_add_mux()
  274. */
  275. int sch_sdp_filename(Scheduler *sch, const char *sdp_filename);
  276. /**
  277. * Add an encoder to the scheduler.
  278. *
  279. * @param func Function executed as the encoding task.
  280. * @param ctx Encoder state; will be passed to func and used for logging.
  281. * @param open_cb This callback, if specified, will be called when the first
  282. * frame is obtained for this encoder. For audio encoders with a
  283. * fixed frame size (which use a sync queue in the scheduler to
  284. * rechunk frames), it must return that frame size on success.
  285. * Otherwise (non-audio, variable frame size) it should return 0.
  286. *
  287. * @retval ">=0" Index of the newly-created encoder.
  288. * @retval "<0" Error code.
  289. */
  290. int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
  291. int (*open_cb)(void *func_arg, const struct AVFrame *frame));
  292. /**
  293. * Add a pre-encoding sync queue to the scheduler.
  294. *
  295. * @param buf_size_us Sync queue buffering size, passed to sq_alloc().
  296. * @param logctx Logging context for the sync queue, passed to sq_alloc().
  297. *
  298. * @retval ">=0" Index of the newly-created sync queue.
  299. * @retval "<0" Error code.
  300. */
  301. int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx);
  /**
   * NOTE(review): not documented in this header. Judging by the parameters,
   * this associates the encoder enc_idx (cf. sch_add_enc()) with the sync
   * queue sq_idx (cf. sch_add_sq_enc()); the meaning of limiting and
   * max_frames and the return convention should be confirmed against the
   * implementation.
   */
  302. int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx,
  303. int limiting, uint64_t max_frames);
  /**
   * Connect the node src to the node dst. Nodes are normally built with the
   * SCH_*() macros in this header.
   *
   * NOTE(review): the accepted src/dst type combinations and the return
   * convention (presumably 0 or a negative error code) are not stated in this
   * header - confirm against the implementation.
   */
  304. int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst);
  305. enum DemuxSendFlags {
  306. /**
  307. * Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations;
  308. * send it normally to destinations of other types.
  309. */
  310. DEMUX_SEND_STREAMCOPY_EOF = (1 << 0),
  311. };
  312. /**
  313. * Called by demuxer tasks to communicate with their downstreams. The following
  314. * may be sent:
  315. * - a demuxed packet for the stream identified by pkt->stream_index;
  316. * - demuxer discontinuity/reset (e.g. after a seek) - this is signalled by an
  317. * empty packet with stream_index=-1.
  318. *
  319. * @param demux_idx demuxer index
  320. * @param pkt A demuxed packet to send.
  321. * When flushing (i.e. pkt->stream_index=-1 on entry to this
  322. * function), on successful return pkt->pts/pkt->time_base will be
  323. * set to the maximum end timestamp of any decoded audio stream, or
  324. * AV_NOPTS_VALUE if no decoded audio streams are present.
  325. *
  326. * @retval "non-negative value" success
  327. * @retval AVERROR_EOF all consumers for the stream are done
  328. * @retval AVERROR_EXIT all consumers are done, should terminate demuxing
  329. * @retval "another negative error code" other failure
  330. */
  331. int sch_demux_send(Scheduler *sch, unsigned demux_idx, struct AVPacket *pkt,
  332. unsigned flags);
  333. /**
  334. * Called by decoder tasks to receive a packet for decoding.
  335. *
  336. * @param dec_idx decoder index
  337. * @param pkt Input packet will be written here on success.
  338. *
  339. * An empty packet signals that the decoder should be flushed, but
  340. * more packets will follow (e.g. after seeking). When a decoder
  341. * created with send_end_ts=1 receives a flush packet, it must write
  342. * the end timestamp of the stream after flushing to
  343. * pkt->pts/time_base on the next call to this function (if any).
  344. *
  345. * @retval "non-negative value" success
  346. * @retval AVERROR_EOF no more packets will arrive, should terminate decoding
  347. * @retval "another negative error code" other failure
  348. */
  349. int sch_dec_receive(Scheduler *sch, unsigned dec_idx, struct AVPacket *pkt);
  350. /**
  351. * Called by decoder tasks to send a decoded frame downstream.
  352. *
  353. * @param dec_idx Decoder index previously returned by sch_add_dec().
  354. * @param out_idx Index of the decoder output to send the frame on (cf. sch_add_dec_output()).
  355. * @param frame Decoded frame; consumed and cleared by this function on success
  356. *
  357. * @retval ">=0" success
  358. * @retval AVERROR_EOF all consumers are done, should terminate decoding
  359. * @retval "another negative error code" other failure
  360. */
  361. int sch_dec_send(Scheduler *sch, unsigned dec_idx,
  362. unsigned out_idx, struct AVFrame *frame);
  363. /**
  364. * Called by filtergraph tasks to obtain frames for filtering. Will wait for a
  365. * frame to become available and return it in frame.
  366. *
  367. * Filtergraphs that contain lavfi sources and do not currently require new
  368. * input frames should call this function as a means of rate control - then
  369. * in_idx should be set equal to nb_inputs on entry to this function.
  370. *
  371. * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
  372. * @param[in,out] in_idx On input contains the index of the input on which a frame
  373. * is most desired. May be set to nb_inputs to signal that
  374. * the filtergraph does not need more input currently.
  375. *
  376. * On success, will be replaced with the input index of
  377. * the actually returned frame or EOF timestamp.
  378. *
  379. * @retval ">=0" Frame data or EOF timestamp was delivered into frame, in_idx
  380. * contains the index of the input it belongs to.
  381. * @retval AVERROR(EAGAIN) No frame was returned, the filtergraph should
  382. * resume filtering. May only be returned when
  383. * in_idx=nb_inputs on entry to this function.
  384. * @retval AVERROR_EOF No more frames will arrive, should terminate filtering.
  385. */
  386. int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
  387. unsigned *in_idx, struct AVFrame *frame);
  388. /**
  389. * Called by filter tasks to signal that a filter input will no longer accept input.
  390. *
  391. * @param fg_idx Filtergraph index previously returned from sch_add_filtergraph().
  392. * @param in_idx Index of the input to finish.
  393. */
  394. void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx);
  395. /**
  396. * Called by filtergraph tasks to send a filtered frame or EOF to consumers.
  397. *
  398. * @param fg_idx Filtergraph index previously returned by sch_add_filtergraph().
  399. * @param out_idx Index of the output which produced the frame.
  400. * @param frame The frame to send to consumers. When NULL, signals that no more
  401. * frames will be produced for the specified output. When non-NULL,
  402. * the frame is consumed and cleared by this function on success.
  403. *
  404. * @retval "non-negative value" success
  405. * @retval AVERROR_EOF all consumers are done
  406. * @retval "another negative error code" other failure
  407. */
  408. int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx,
  409. struct AVFrame *frame);
  /**
   * NOTE(review): not documented in this header. Presumably delivers a command,
   * carried in frame, to the filtergraph fg_idx (an index as returned by
   * sch_add_filtergraph()) - confirm against the implementation.
   */
  410. int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);
  411. /**
  412. * Called by encoder tasks to obtain frames for encoding. Will wait for a frame
  413. * to become available and return it in frame.
  414. *
  415. * @param enc_idx Encoder index previously returned by sch_add_enc().
  416. * @param frame Newly-received frame will be stored here on success. Must be
  417. * clean on entrance to this function.
  418. *
  419. * @retval 0 A frame was successfully delivered into frame.
  420. * @retval AVERROR_EOF No more frames will be delivered, the encoder should
  421. * flush everything and terminate.
  422. *
  423. */
  424. int sch_enc_receive(Scheduler *sch, unsigned enc_idx, struct AVFrame *frame);
  425. /**
  426. * Called by encoder tasks to send encoded packets downstream.
  427. *
  428. * @param enc_idx Encoder index previously returned by sch_add_enc().
  429. * @param pkt An encoded packet; it will be consumed and cleared by this
  430. * function on success.
  431. *
  432. * @retval 0 success
  433. * @retval "<0" Error code.
  434. */
  435. int sch_enc_send (Scheduler *sch, unsigned enc_idx, struct AVPacket *pkt);
  436. /**
  437. * Called by muxer tasks to obtain packets for muxing. Will wait for a packet
  438. * for any muxed stream to become available and return it in pkt.
  439. *
  440. * @param mux_idx Muxer index previously returned by sch_add_mux().
  441. * @param pkt Newly-received packet will be stored here on success. Must be
  442. * clean on entrance to this function.
  443. *
  444. * @retval 0 A packet was successfully delivered into pkt. Its stream_index
  445. * corresponds to a stream index previously returned from
  446. * sch_add_mux_stream().
  447. * @retval AVERROR_EOF When pkt->stream_index is non-negative, this signals that
  448. * no more packets will be delivered for this stream index.
  449. * Otherwise this indicates that no more packets will be
  450. * delivered for any stream and the muxer should therefore
  451. * flush everything and terminate.
  452. */
  453. int sch_mux_receive(Scheduler *sch, unsigned mux_idx, struct AVPacket *pkt);
  454. /**
  455. * Called by muxer tasks to signal that a stream will no longer accept input.
  456. *
  457. * @param stream_idx Stream index previously returned from sch_add_mux_stream().
  458. */
  459. void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx);
  /**
   * NOTE(review): not documented in this header. Judging by the parameters,
   * this registers the decoder dec_idx with the muxed stream
   * (mux_idx, stream_idx) for use by sch_mux_sub_heartbeat() - confirm
   * against the implementation.
   */
  460. int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
  461. unsigned dec_idx);
  /**
   * NOTE(review): not documented in this header. Presumably called with a
   * packet belonging to the muxed stream (mux_idx, stream_idx) so that the
   * decoders registered through sch_mux_sub_heartbeat_add() are notified -
   * confirm against the implementation.
   *
   * pkt is spelled `const struct AVPacket *`, like the other prototypes in
   * this header, so that the forward declaration of struct AVPacket is
   * sufficient and no typedef from another header is required.
   */
  462. int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx,
  463. const struct AVPacket *pkt);
  464. #endif /* FFTOOLS_FFMPEG_SCHED_H */