|
@@ -226,6 +226,7 @@ typedef struct SchFilterIn {
|
|
SchedulerNode src;
|
|
SchedulerNode src;
|
|
SchedulerNode src_sched;
|
|
SchedulerNode src_sched;
|
|
int send_finished;
|
|
int send_finished;
|
|
|
|
+ int receive_finished;
|
|
} SchFilterIn;
|
|
} SchFilterIn;
|
|
|
|
|
|
typedef struct SchFilterOut {
|
|
typedef struct SchFilterOut {
|
|
@@ -237,7 +238,8 @@ typedef struct SchFilterGraph {
|
|
|
|
|
|
SchFilterIn *inputs;
|
|
SchFilterIn *inputs;
|
|
unsigned nb_inputs;
|
|
unsigned nb_inputs;
|
|
- atomic_uint nb_inputs_finished;
|
|
|
|
|
|
+ atomic_uint nb_inputs_finished_send;
|
|
|
|
+ unsigned nb_inputs_finished_receive;
|
|
|
|
|
|
SchFilterOut *outputs;
|
|
SchFilterOut *outputs;
|
|
unsigned nb_outputs;
|
|
unsigned nb_outputs;
|
|
@@ -1959,7 +1961,7 @@ static int send_to_filter(Scheduler *sch, SchFilterGraph *fg,
|
|
tq_send_finish(fg->queue, in_idx);
|
|
tq_send_finish(fg->queue, in_idx);
|
|
|
|
|
|
// close the control stream when all actual inputs are done
|
|
// close the control stream when all actual inputs are done
|
|
- if (atomic_fetch_add(&fg->nb_inputs_finished, 1) == fg->nb_inputs - 1)
|
|
|
|
|
|
+ if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
|
|
tq_send_finish(fg->queue, fg->nb_inputs);
|
|
tq_send_finish(fg->queue, fg->nb_inputs);
|
|
}
|
|
}
|
|
return 0;
|
|
return 0;
|
|
@@ -2143,6 +2145,27 @@ int sch_filter_receive(Scheduler *sch, unsigned fg_idx,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
|
|
|
|
+{
|
|
|
|
+ SchFilterGraph *fg;
|
|
|
|
+ SchFilterIn *fi;
|
|
|
|
+
|
|
|
|
+ av_assert0(fg_idx < sch->nb_filters);
|
|
|
|
+ fg = &sch->filters[fg_idx];
|
|
|
|
+
|
|
|
|
+ av_assert0(in_idx < fg->nb_inputs);
|
|
|
|
+ fi = &fg->inputs[in_idx];
|
|
|
|
+
|
|
|
|
+ if (!fi->receive_finished) {
|
|
|
|
+ fi->receive_finished = 1;
|
|
|
|
+ tq_receive_finish(fg->queue, in_idx);
|
|
|
|
+
|
|
|
|
+ // close the control stream when all actual inputs are done
|
|
|
|
+ if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
|
|
|
|
+ tq_receive_finish(fg->queue, fg->nb_inputs);
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
|
|
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
|
|
{
|
|
{
|
|
SchFilterGraph *fg;
|
|
SchFilterGraph *fg;
|