The branch, master has been updated
via fd1fd5850de4f3beb240422b67889bb776b26c77 (commit)
via d43fd5b3321cc1ece25835e2b44936880ee05328 (commit)
via 56d9ca69d7f229dccee6ad47c67a37f558196fb7 (commit)
via 9d0b88feb17dabfebcb10b801045c1285fa5e4bc (commit)
via 59a847a23718a87a2bd1e2eae893d9784ed34b0f (commit)
via 5f4cbb5617ba52fb0b5bce36eb36e4cd7bdaec70 (commit)
via 23f1f094f854bdeaae4225c1876fd2b985e977f4 (commit)
via fd4b5b24cedac1f7bae6792cbe4216f3e30a2cae (commit)
via 15407cf90bb4fd9f47f85f078e5689b2593ccbc3 (commit)
from 1608aa38a2ca9bd49debc5577ee38e2726303eda (commit)
- Log -----------------------------------------------------------------
commit fd1fd5850de4f3beb240422b67889bb776b26c77
Author: Niklas Haas <[email protected]>
AuthorDate: Mon Sep 8 19:11:29 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: unchoke upstream nodes on recv-closed filter inputs
This allows upstream filters to observe EOF on their corresponding outputs
and terminate early, which is particularly useful for decoders and demuxers
that may then gracefully release their resources.
Prevents "leaking" memory for previously used, but now unused, filter
inputs.
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index dbe337ca16..d08f4a061d 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1368,6 +1368,18 @@ static void schedule_update_locked(Scheduler *sch)
}
}
+ // also unchoke any sources feeding into closed filter graph inputs, so
+ // that they can observe the downstream EOF
+ for (unsigned i = 0; i < sch->nb_filters; i++) {
+ SchFilterGraph *fg = &sch->filters[i];
+
+ for (unsigned j = 0; j < fg->nb_inputs; j++) {
+ SchFilterIn *fi = &fg->inputs[j];
+ if (fi->receive_finished && !fi->send_finished)
+ unchoke_for_stream(sch, fi->src);
+ }
+ }
+
// make sure to unchoke at least one source, if still available
for (unsigned type = 0; !have_unchoked && type < 2; type++)
for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux);
i++) {
@@ -2478,6 +2490,8 @@ void sch_filter_receive_finish(Scheduler *sch, unsigned
fg_idx, unsigned in_idx)
av_assert0(in_idx < fg->nb_inputs);
fi = &fg->inputs[in_idx];
+ pthread_mutex_lock(&sch->schedule_lock);
+
if (!fi->receive_finished) {
fi->receive_finished = 1;
tq_receive_finish(fg->queue, in_idx);
@@ -2485,7 +2499,11 @@ void sch_filter_receive_finish(Scheduler *sch, unsigned
fg_idx, unsigned in_idx)
// close the control stream when all actual inputs are done
if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
tq_receive_finish(fg->queue, fg->nb_inputs);
+
+ schedule_update_locked(sch);
}
+
+ pthread_mutex_unlock(&sch->schedule_lock);
}
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame
*frame)
commit d43fd5b3321cc1ece25835e2b44936880ee05328
Author: Niklas Haas <[email protected]>
AuthorDate: Tue Sep 9 13:10:53 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: close stream when sch_filter_send receives EOF
THis is currently done by sch_demux_send() (via demux_stream_send_to_dst()),
sch_enc_send() (via enc_send_to_dst()), and sch_dec_send() (via
dec_send_to_dst()), but not by sch_filter_send().
Implement the same queue-closing logic for them. The main benefit here is
that
this will allow them to mark downstream inputs as send-done (in addition
to received-done), which is useful for a following commit.
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 589f5360f2..dbe337ca16 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -2492,6 +2492,7 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx,
unsigned out_idx, AVFrame *
{
SchFilterGraph *fg;
SchedulerNode dst;
+ int ret;
av_assert0(fg_idx < sch->nb_filters);
fg = &sch->filters[fg_idx];
@@ -2499,9 +2500,16 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx,
unsigned out_idx, AVFrame *
av_assert0(out_idx < fg->nb_outputs);
dst = fg->outputs[out_idx].dst;
- return (dst.type == SCH_NODE_TYPE_ENC) ?
- send_to_enc (sch, &sch->enc[dst.idx], frame) :
- send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
+ if (dst.type == SCH_NODE_TYPE_ENC) {
+ ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
+ if (ret == AVERROR_EOF)
+ send_to_enc(sch, &sch->enc[dst.idx], NULL);
+ } else {
+ ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream,
frame);
+ if (ret == AVERROR_EOF)
+ send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
+ }
+ return ret;
}
static int filter_done(Scheduler *sch, unsigned fg_idx)
commit 56d9ca69d7f229dccee6ad47c67a37f558196fb7
Author: Niklas Haas <[email protected]>
AuthorDate: Mon Sep 8 14:00:53 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_dec: free decoder ctx after EOF
The codec context is no longer used after the decoder thread exits, but
still idly sticks around until program exit, wasting memory.
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 521bd657f3..66c58c1c3c 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -1018,6 +1018,7 @@ static int decoder_thread(void *arg)
finish:
dec_thread_uninit(&dt);
+ avcodec_free_context(&dp->dec_ctx);
return ret;
}
commit 9d0b88feb17dabfebcb10b801045c1285fa5e4bc
Author: Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 18:22:27 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: forward demuxer choke status to dst queues
Cut off a choked demuxer's output codec/filter queues, effectively
preventing
them from processing packets while the demuxer is choked. Avoids downstream
nodes from piling up extra input that a demuxer shouldn't currently be
sending.
The main benefit of this is to avoid queuing up excess packets that don't
want
to be decoded yet, reducing memory consumption for idle inputs by preventing
them from being read earlier than needed.
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 039cd1c9aa..589f5360f2 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1294,6 +1294,36 @@ static void unchoke_for_stream(Scheduler *sch,
SchedulerNode src)
}
}
+static void choke_demux(const Scheduler *sch, int demux_id, int choked)
+{
+ av_assert1(demux_id < sch->nb_demux);
+ SchDemux *demux = &sch->demux[demux_id];
+
+ for (int i = 0; i < demux->nb_streams; i++) {
+ SchedulerNode *dst = demux->streams[i].dst;
+ SchFilterGraph *fg;
+
+ switch (dst->type) {
+ case SCH_NODE_TYPE_DEC:
+ tq_choke(sch->dec[dst->idx].queue, choked);
+ break;
+ case SCH_NODE_TYPE_ENC:
+ tq_choke(sch->enc[dst->idx].queue, choked);
+ break;
+ case SCH_NODE_TYPE_MUX:
+ break;
+ case SCH_NODE_TYPE_FILTER_IN:
+ fg = &sch->filters[dst->idx];
+ if (fg->nb_inputs == 1)
+ tq_choke(fg->queue, choked);
+ break;
+ default:
+ av_unreachable("Invalid destination node type?");
+ break;
+ }
+ }
+}
+
static void schedule_update_locked(Scheduler *sch)
{
int64_t dts;
@@ -1350,13 +1380,16 @@ static void schedule_update_locked(Scheduler *sch)
}
}
-
- for (unsigned type = 0; type < 2; type++)
+ for (unsigned type = 0; type < 2; type++) {
for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux);
i++) {
SchWaiter *w = type ? &sch->filters[i].waiter :
&sch->demux[i].waiter;
- if (w->choked_prev != w->choked_next)
+ if (w->choked_prev != w->choked_next) {
waiter_set(w, w->choked_next);
+ if (!type)
+ choke_demux(sch, i, w->choked_next);
+ }
}
+ }
}
@@ -2595,6 +2628,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters);
i++) {
SchWaiter *w = type ? &sch->demux[i].waiter :
&sch->filters[i].waiter;
waiter_set(w, 1);
+ if (type)
+ choke_demux(sch, i, 0); // unfreeze to allow draining
}
for (unsigned i = 0; i < sch->nb_demux; i++) {
commit 59a847a23718a87a2bd1e2eae893d9784ed34b0f
Author: Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 18:09:31 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/thread_queue: allow choking thread queues directly
Currently, when a demuxer thread is choked, it will avoid queuing more
packets, but any packets already present on the thread queue will still be
processed.
This can be quite wasteful if the choke is due to e.g. decoder not being
needed yet, such as in a filter graph involving concatenation-style filters.
Adding the ability to propagate the choke status to the thread queue
directly
allows downstream decoders and filter graphs to avoid unnecessary work and
buffering.
Reduces the effective latency between scheduler updates and changes in the
thread workfload.
diff --git a/fftools/thread_queue.c b/fftools/thread_queue.c
index b035ffe11d..eb33431c98 100644
--- a/fftools/thread_queue.c
+++ b/fftools/thread_queue.c
@@ -38,6 +38,7 @@ enum {
};
struct ThreadQueue {
+ int choked;
int *finished;
unsigned int nb_streams;
@@ -157,6 +158,9 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx,
{
unsigned int nb_finished = 0;
+ if (tq->choked)
+ return AVERROR(EAGAIN);
+
while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
unsigned idx;
int ret;
@@ -230,6 +234,7 @@ void tq_send_finish(ThreadQueue *tq, unsigned int
stream_idx)
* next time the consumer thread tries to read this stream it will get
* an EOF and recv-finished flag will be set */
tq->finished[stream_idx] |= FINISHED_SEND;
+ tq->choked = 0;
pthread_cond_broadcast(&tq->cond);
pthread_mutex_unlock(&tq->lock);
@@ -249,3 +254,15 @@ void tq_receive_finish(ThreadQueue *tq, unsigned int
stream_idx)
pthread_mutex_unlock(&tq->lock);
}
+
+void tq_choke(ThreadQueue *tq, int choked)
+{
+ pthread_mutex_lock(&tq->lock);
+
+ int prev_choked = tq->choked;
+ tq->choked = choked;
+ if (choked != prev_choked)
+ pthread_cond_broadcast(&tq->cond);
+
+ pthread_mutex_unlock(&tq->lock);
+}
diff --git a/fftools/thread_queue.h b/fftools/thread_queue.h
index cc01c8a2c9..ad7669f131 100644
--- a/fftools/thread_queue.h
+++ b/fftools/thread_queue.h
@@ -58,6 +58,15 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void
*data);
*/
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx);
+/**
+ * Prevent further reads from the thread queue until it is unchoked. Threads
+ * attempting to read from the queue will block, similar to when the queue is
+ * empty.
+ *
+ * @param choked 1 to choke, 0 to unchoke
+ */
+void tq_choke(ThreadQueue *tq, int choked);
+
/**
* Read the next item from the queue.
*
commit 5f4cbb5617ba52fb0b5bce36eb36e4cd7bdaec70
Author: Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 15:11:46 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: choke inputs during filtergraph configuration
Currently, while the filter graph is being initially created, the scheduler
continues demuxing frames on the last input that happened to be active
before
the filter graph was complete.
This can lead to an excess number of decoded frames "piling" up on this
input,
regardless of whether or not it will actually be requested by the configured
filter graph. Suspending the filter graph during this initialization phase
reduces the amount of wasted memory.
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 2dae6400c8..c1c8eeb2d8 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -2878,6 +2878,7 @@ static const char *unknown_if_null(const char *str)
static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
InputFilter *ifilter, AVFrame *frame)
{
+ FilterGraphPriv *fgp = fgp_from_fg(fg);
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
FrameData *fd;
AVFrameSideData *sd;
@@ -2986,6 +2987,11 @@ static int send_frame(FilterGraph *fg, FilterGraphThread
*fgt,
if (reason.len > 1)
reason.str[reason.len - 2] = '\0'; // remove last comma
av_log(fg, AV_LOG_INFO, "Reconfiguring filter graph%s%s\n",
reason.len ? " because " : "", reason.str);
+ } else {
+ /* Choke all input to avoid buffering excessive frames while the
+ * initial filter graph is being configured, and before we have a
+ * preferred input */
+ sch_filter_choke_inputs(fgp->sch, fgp->sch_idx);
}
ret = configure_filtergraph(fg, fgt);
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 3c5cffa594..039cd1c9aa 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -2510,6 +2510,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx,
AVFrame *frame)
return send_to_filter(sch, fg, fg->nb_inputs, frame);
}
+void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
+{
+ SchFilterGraph *fg;
+ av_assert0(fg_idx < sch->nb_filters);
+ fg = &sch->filters[fg_idx];
+
+ pthread_mutex_lock(&sch->schedule_lock);
+ fg->best_input = fg->nb_inputs;
+ schedule_update_locked(sch);
+ pthread_mutex_unlock(&sch->schedule_lock);
+}
+
static int task_cleanup(Scheduler *sch, SchedulerNode node)
{
switch (node.type) {
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index 24ad37b778..0c01f558e4 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -443,6 +443,13 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx,
unsigned out_idx,
int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);
+/**
+ * Called by filtergraph tasks to choke all filter inputs, preventing them from
+ * receiving more frames until woken up again by the scheduler. Used during
+ * initial graph configuration to avoid unnecessary buffering.
+ */
+void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx);
+
/**
* Called by encoder tasks to obtain frames for encoding. Will wait for a frame
* to become available and return it in frame.
commit 23f1f094f854bdeaae4225c1876fd2b985e977f4
Author: Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 16:53:28 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: get rid of src_sched
This field is just saving (typically) a single pointer indirection; and IMO
makes the logic and graph relations unnecessarily complicated. I am also
considering adding choking logic to decoders and encoders as well, which
this
field would get in the way of.
Apart from the unchoking logic in unchoke_for_input(), the only other place
that uses this field is the (cold) function check_acyclic(), which can be
served just as well with a simple function to do the graph traversal there.
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4f0d446007..3c5cffa594 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -189,7 +189,6 @@ typedef struct PreMuxQueue {
typedef struct SchMuxStream {
SchedulerNode src;
- SchedulerNode src_sched;
unsigned *sub_heartbeat_dst;
unsigned nb_sub_heartbeat_dst;
@@ -235,7 +234,6 @@ typedef struct SchMux {
typedef struct SchFilterIn {
SchedulerNode src;
- SchedulerNode src_sched;
int send_finished;
int receive_finished;
} SchFilterIn;
@@ -1268,24 +1266,31 @@ static void unchoke_for_stream(Scheduler *sch,
SchedulerNode src)
{
while (1) {
SchFilterGraph *fg;
-
- // fed directly by a demuxer (i.e. not through a filtergraph)
- if (src.type == SCH_NODE_TYPE_DEMUX) {
+ switch (src.type) {
+ case SCH_NODE_TYPE_DEMUX:
+ // fed directly by a demuxer (i.e. not through a filtergraph)
sch->demux[src.idx].waiter.choked_next = 0;
return;
- }
-
- av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
- fg = &sch->filters[src.idx];
-
- // the filtergraph contains internal sources and
- // requested to be scheduled directly
- if (fg->best_input == fg->nb_inputs) {
- fg->waiter.choked_next = 0;
+ case SCH_NODE_TYPE_DEC:
+ src = sch->dec[src.idx].src;
+ continue;
+ case SCH_NODE_TYPE_ENC:
+ src = sch->enc[src.idx].src;
+ continue;
+ case SCH_NODE_TYPE_FILTER_OUT:
+ fg = &sch->filters[src.idx];
+ // the filtergraph contains internal sources and
+ // requested to be scheduled directly
+ if (fg->best_input == fg->nb_inputs) {
+ fg->waiter.choked_next = 0;
+ return;
+ }
+ src = fg->inputs[fg->best_input].src;
+ continue;
+ default:
+ av_unreachable("Invalid source node type?");
return;
}
-
- src = fg->inputs[fg->best_input].src_sched;
}
}
@@ -1328,7 +1333,7 @@ static void schedule_update_locked(Scheduler *sch)
continue;
// resolve the source to unchoke
- unchoke_for_stream(sch, ms->src_sched);
+ unchoke_for_stream(sch, ms->src);
have_unchoked = 1;
}
}
@@ -1361,6 +1366,27 @@ enum {
CYCLE_NODE_DONE,
};
+// Finds the filtergraph or muxer upstream of a scheduler node
+static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
+{
+ while (1) {
+ switch (src.type) {
+ case SCH_NODE_TYPE_DEMUX:
+ case SCH_NODE_TYPE_FILTER_OUT:
+ return src;
+ case SCH_NODE_TYPE_DEC:
+ src = sch->dec[src.idx].src;
+ continue;
+ case SCH_NODE_TYPE_ENC:
+ src = sch->enc[src.idx].src;
+ continue;
+ default:
+ av_unreachable("Invalid source node type?");
+ return (SchedulerNode) {0};
+ }
+ }
+}
+
static int
check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
uint8_t *filters_visited, SchedulerNode
*filters_stack)
@@ -1377,22 +1403,23 @@ check_acyclic_for_output(const Scheduler *sch,
SchedulerNode src,
// descend into every input, depth first
if (src.idx_stream < fg->nb_inputs) {
const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
+ SchedulerNode node = src_filtergraph(sch, fi->src);
// connected to demuxer, no cycles possible
- if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
+ if (node.type == SCH_NODE_TYPE_DEMUX)
continue;
// otherwise connected to another filtergraph
- av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
+ av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT);
// found a cycle
- if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
+ if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
return AVERROR(EINVAL);
// place current position on stack and descend
av_assert0(nb_filters_stack < sch->nb_filters);
filters_stack[nb_filters_stack++] = src;
- src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
+ src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
continue;
}
@@ -1514,22 +1541,7 @@ static int start_prepare(Scheduler *sch)
for (unsigned j = 0; j < mux->nb_streams; j++) {
SchMuxStream *ms = &mux->streams[j];
- switch (ms->src.type) {
- case SCH_NODE_TYPE_ENC: {
- SchEnc *enc = &sch->enc[ms->src.idx];
- if (enc->src.type == SCH_NODE_TYPE_DEC) {
- ms->src_sched = sch->dec[enc->src.idx].src;
- av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
- } else {
- ms->src_sched = enc->src;
- av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
- }
- break;
- }
- case SCH_NODE_TYPE_DEMUX:
- ms->src_sched = ms->src;
- break;
- default:
+ if (!ms->src.type) {
av_log(mux, AV_LOG_ERROR,
"Muxer stream #%u not connected to a source\n", j);
return AVERROR(EINVAL);
@@ -1547,26 +1559,12 @@ static int start_prepare(Scheduler *sch)
for (unsigned j = 0; j < fg->nb_inputs; j++) {
SchFilterIn *fi = &fg->inputs[j];
- SchDec *dec;
if (!fi->src.type) {
av_log(fg, AV_LOG_ERROR,
"Filtergraph input %u not connected to a source\n", j);
return AVERROR(EINVAL);
}
-
- if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
- fi->src_sched = fi->src;
- else {
- av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
- dec = &sch->dec[fi->src.idx];
-
- switch (dec->src.type) {
- case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;
break;
- case SCH_NODE_TYPE_ENC: fi->src_sched =
sch->enc[dec->src.idx].src; break;
- default: av_assert0(0);
- }
- }
}
for (unsigned j = 0; j < fg->nb_outputs; j++) {
commit fd4b5b24cedac1f7bae6792cbe4216f3e30a2cae
Author: Niklas Haas <[email protected]>
AuthorDate: Wed Sep 3 14:38:38 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: lower default frame queue size
I tested this extensively under different conditions and could not come up
with any scenario where using a larger queue size was actually beneficial.
Moreover, having such a large default queue is very wasteful especially
for larger frame sizes; and can in the worst case lead to an extra ~50%
memory
footprint per input (with the default 16 threads), regardless of whether
that
input is currently in use or not.
My methodology was to add logging in the event of a queue underrun/overrun,
and then observe and then observe the frequency of such events in practice,
as well as the impact on performance. I came up with an example filter graph
involving decoding, filtering and encoding with several input files and
various changes to move the bottleneck around.
I found that, in all configurations I tested, with all thread counts and
bottlenecks, using a queue size of 2 frames yielded practically identical
performance to a queue size of 8 frames. I was only able to consistently
measure a slowdown when restricting the queue to a single frame, where the
underruns ended up making up almost 1.1% of frame events in the worst case.
A summary of my test log follows:
= Bottleneck in decoder =
ffmpeg -i A -i B -i C -filter_complex "concat=n=3" -f null -
== 16 threads ==
=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 91355 underruns, 1 overrun
- 4 frames = 91381 underruns, 2 overruns
- 2 frames = 91326 underruns, 21 overruns
- 1 frame = 91284 underruns, 102 overruns
=== Time elapsed ===
- 8 frames = 14.37s
- 4 frames = 14.28s
- 2 frames = 14.27s
- 1 frame = 14.35s
== 1 thread ==
=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 91801 underruns, 0 overruns
- 4 frames = 91929 underruns, 1 overrun
- 2 frames = 91854 underruns, 7 overruns
- 1 frame = 91745 underrons, 83 overruns
=== Time elapsed ===
- 8 frames = 39.51s
- 4 frames = 39.94s
- 2 frames = 39.91s
- 1 frame = 41.69s
= Bottleneck in filter graph: =
ffmpeg -i A -i B -i C -filter_complex "concat=n=3,scale=3840x2160" -f null -
== 16 threads ==
=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 277 underruns, 84673 overruns
- 4 frames = 640 underruns, 86523 overruns
- 2 frames = 850 underruns, 88751 overruns
- 1 frame = 1028 underruns, 89957 overruns
=== Time elapsed ===
- 8 frames = 26.35s
- 4 frames = 26.31s
- 2 frames = 26.38s
- 1 frame = 26.55s
== 1 thread ==
=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 29746 underruns, 57033 overruns
- 4 frames = 29940 underruns, 58948 overruns
- 2 frames = 30160 underruns, 60185 overruns
- 1 frame = 30259 underruns, 61126 overruns
=== Time elapsed ===
- 8 frames = 52.08s
- 4 frames = 52.49s
- 2 frames = 52.25s
- 1 frame = 52.69s
= Bottleneck in encoder: =
ffmpeg -i A -i B -i C -filter_complex "concat=n=3" -c:v libx264 -preset
veryfast -f null -
== 1 thread ==
== Queue statistics (filtergraph -> enc) ==
- 8 frames = 26763 underruns, 63535 overruns
- 4 frames = 26863 underruns, 63810 overruns
- 2 frames = 27243 underruns, 63839 overruns
- 1 frame = 27670 underruns, 63953 overruns
== Time elapsed ==
- 8 frames = 89.45s
- 4 frames = 89.04s
- 2 frames = 89.24s
- 1 frame = 90.26s
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index fb7a77ddfc..24ad37b778 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -257,7 +257,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int
(*init)(void *),
/**
* Default size of a frame thread queue.
*/
-#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 8
+#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 2
/**
* Add a muxed stream for a previously added muxer.
commit 15407cf90bb4fd9f47f85f078e5689b2593ccbc3
Author: Niklas Haas <[email protected]>
AuthorDate: Wed Sep 3 14:10:55 2025 +0200
Commit: Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200
fftools/ffmpeg_sched: relax queue size assertion
The code in the decoder just cares about allocating enough extra hw frames
to cover the size of the queue; but there's no reason we actually *have* to
use this many. We can safely relax the assertion to a <= check.
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 3180367576..4f0d446007 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -389,7 +389,7 @@ static int queue_alloc(ThreadQueue **ptq, unsigned
nb_streams, unsigned queue_si
// for frames held in queues inside the ffmpeg utility. If this
// can ever dynamically change then the corresponding decode
// code needs to be updated as well.
- av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
+ av_assert0(queue_size <= DEFAULT_FRAME_THREAD_QUEUE_SIZE);
}
tq = tq_alloc(nb_streams, queue_size,
-----------------------------------------------------------------------
Summary of changes:
fftools/ffmpeg_dec.c | 1 +
fftools/ffmpeg_filter.c | 6 ++
fftools/ffmpeg_sched.c | 183 +++++++++++++++++++++++++++++++++---------------
fftools/ffmpeg_sched.h | 9 ++-
fftools/thread_queue.c | 17 +++++
fftools/thread_queue.h | 9 +++
6 files changed, 168 insertions(+), 57 deletions(-)
hooks/post-receive
--
_______________________________________________
ffmpeg-cvslog mailing list -- [email protected]
To unsubscribe send an email to [email protected]