The branch, master has been updated
       via  fd1fd5850de4f3beb240422b67889bb776b26c77 (commit)
       via  d43fd5b3321cc1ece25835e2b44936880ee05328 (commit)
       via  56d9ca69d7f229dccee6ad47c67a37f558196fb7 (commit)
       via  9d0b88feb17dabfebcb10b801045c1285fa5e4bc (commit)
       via  59a847a23718a87a2bd1e2eae893d9784ed34b0f (commit)
       via  5f4cbb5617ba52fb0b5bce36eb36e4cd7bdaec70 (commit)
       via  23f1f094f854bdeaae4225c1876fd2b985e977f4 (commit)
       via  fd4b5b24cedac1f7bae6792cbe4216f3e30a2cae (commit)
       via  15407cf90bb4fd9f47f85f078e5689b2593ccbc3 (commit)
      from  1608aa38a2ca9bd49debc5577ee38e2726303eda (commit)


- Log -----------------------------------------------------------------
commit fd1fd5850de4f3beb240422b67889bb776b26c77
Author:     Niklas Haas <[email protected]>
AuthorDate: Mon Sep 8 19:11:29 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: unchoke upstream nodes on recv-closed filter inputs
    
    This allows upstream filters to observe EOF on their corresponding outputs
    and terminate early, which is particularly useful for decoders and demuxers
    that may then gracefully release their resources.
    
    Prevents "leaking" memory for previously used, but now unused, filter inputs.

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index dbe337ca16..d08f4a061d 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1368,6 +1368,18 @@ static void schedule_update_locked(Scheduler *sch)
         }
     }
 
+    // also unchoke any sources feeding into closed filter graph inputs, so
+    // that they can observe the downstream EOF
+    for (unsigned i = 0; i < sch->nb_filters; i++) {
+        SchFilterGraph *fg = &sch->filters[i];
+
+        for (unsigned j = 0; j < fg->nb_inputs; j++) {
+            SchFilterIn *fi = &fg->inputs[j];
+            if (fi->receive_finished && !fi->send_finished)
+                unchoke_for_stream(sch, fi->src);
+        }
+    }
+
     // make sure to unchoke at least one source, if still available
     for (unsigned type = 0; !have_unchoked && type < 2; type++)
         for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); 
i++) {
@@ -2478,6 +2490,8 @@ void sch_filter_receive_finish(Scheduler *sch, unsigned 
fg_idx, unsigned in_idx)
     av_assert0(in_idx < fg->nb_inputs);
     fi = &fg->inputs[in_idx];
 
+    pthread_mutex_lock(&sch->schedule_lock);
+
     if (!fi->receive_finished) {
         fi->receive_finished = 1;
         tq_receive_finish(fg->queue, in_idx);
@@ -2485,7 +2499,11 @@ void sch_filter_receive_finish(Scheduler *sch, unsigned 
fg_idx, unsigned in_idx)
         // close the control stream when all actual inputs are done
         if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
             tq_receive_finish(fg->queue, fg->nb_inputs);
+
+        schedule_update_locked(sch);
     }
+
+    pthread_mutex_unlock(&sch->schedule_lock);
 }
 
 int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame 
*frame)

commit d43fd5b3321cc1ece25835e2b44936880ee05328
Author:     Niklas Haas <[email protected]>
AuthorDate: Tue Sep 9 13:10:53 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: close stream when sch_filter_send receives EOF
    
    This is currently done by sch_demux_send() (via demux_stream_send_to_dst()),
    sch_enc_send() (via enc_send_to_dst()), and sch_dec_send() (via
    dec_send_to_dst()), but not by sch_filter_send().
    
    Implement the same queue-closing logic for them. The main benefit here is that
    this will allow them to mark downstream inputs as send-done (in addition
    to received-done), which is useful for a following commit.

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 589f5360f2..dbe337ca16 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -2492,6 +2492,7 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx, 
unsigned out_idx, AVFrame *
 {
     SchFilterGraph *fg;
     SchedulerNode  dst;
+    int ret;
 
     av_assert0(fg_idx < sch->nb_filters);
     fg = &sch->filters[fg_idx];
@@ -2499,9 +2500,16 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx, 
unsigned out_idx, AVFrame *
     av_assert0(out_idx < fg->nb_outputs);
     dst = fg->outputs[out_idx].dst;
 
-    return (dst.type == SCH_NODE_TYPE_ENC)                                    ?
-           send_to_enc   (sch, &sch->enc[dst.idx],                     frame) :
-           send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, frame);
+    if (dst.type == SCH_NODE_TYPE_ENC) {
+        ret = send_to_enc(sch, &sch->enc[dst.idx], frame);
+        if (ret == AVERROR_EOF)
+            send_to_enc(sch, &sch->enc[dst.idx], NULL);
+    } else {
+        ret = send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, 
frame);
+        if (ret == AVERROR_EOF)
+            send_to_filter(sch, &sch->filters[dst.idx], dst.idx_stream, NULL);
+    }
+    return ret;
 }
 
 static int filter_done(Scheduler *sch, unsigned fg_idx)

commit 56d9ca69d7f229dccee6ad47c67a37f558196fb7
Author:     Niklas Haas <[email protected]>
AuthorDate: Mon Sep 8 14:00:53 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_dec: free decoder ctx after EOF
    
    The codec context is no longer used after the decoder thread exits, but
    still idly sticks around until program exit, wasting memory.

diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 521bd657f3..66c58c1c3c 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -1018,6 +1018,7 @@ static int decoder_thread(void *arg)
 
 finish:
     dec_thread_uninit(&dt);
+    avcodec_free_context(&dp->dec_ctx);
 
     return ret;
 }

commit 9d0b88feb17dabfebcb10b801045c1285fa5e4bc
Author:     Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 18:22:27 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: forward demuxer choke status to dst queues
    
    Cut off a choked demuxer's output codec/filter queues, effectively preventing
    them from processing packets while the demuxer is choked. Prevents downstream
    nodes from piling up extra input that a demuxer shouldn't currently be
    sending.
    
    The main benefit of this is to avoid queuing up excess packets that don't want
    to be decoded yet, reducing memory consumption for idle inputs by preventing
    them from being read earlier than needed.

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 039cd1c9aa..589f5360f2 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1294,6 +1294,36 @@ static void unchoke_for_stream(Scheduler *sch, 
SchedulerNode src)
     }
 }
 
+static void choke_demux(const Scheduler *sch, int demux_id, int choked)
+{
+    av_assert1(demux_id < sch->nb_demux);
+    SchDemux *demux = &sch->demux[demux_id];
+
+    for (int i = 0; i < demux->nb_streams; i++) {
+        SchedulerNode *dst = demux->streams[i].dst;
+        SchFilterGraph *fg;
+
+        switch (dst->type) {
+        case SCH_NODE_TYPE_DEC:
+            tq_choke(sch->dec[dst->idx].queue, choked);
+            break;
+        case SCH_NODE_TYPE_ENC:
+            tq_choke(sch->enc[dst->idx].queue, choked);
+            break;
+        case SCH_NODE_TYPE_MUX:
+            break;
+        case SCH_NODE_TYPE_FILTER_IN:
+            fg = &sch->filters[dst->idx];
+            if (fg->nb_inputs == 1)
+                tq_choke(fg->queue, choked);
+            break;
+        default:
+            av_unreachable("Invalid destination node type?");
+            break;
+        }
+    }
+}
+
 static void schedule_update_locked(Scheduler *sch)
 {
     int64_t dts;
@@ -1350,13 +1380,16 @@ static void schedule_update_locked(Scheduler *sch)
             }
         }
 
-
-    for (unsigned type = 0; type < 2; type++)
+    for (unsigned type = 0; type < 2; type++) {
         for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); 
i++) {
             SchWaiter *w = type ? &sch->filters[i].waiter : 
&sch->demux[i].waiter;
-            if (w->choked_prev != w->choked_next)
+            if (w->choked_prev != w->choked_next) {
                 waiter_set(w, w->choked_next);
+                if (!type)
+                    choke_demux(sch, i, w->choked_next);
+            }
         }
+    }
 
 }
 
@@ -2595,6 +2628,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
         for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); 
i++) {
             SchWaiter *w = type ? &sch->demux[i].waiter : 
&sch->filters[i].waiter;
             waiter_set(w, 1);
+            if (type)
+                choke_demux(sch, i, 0); // unfreeze to allow draining
         }
 
     for (unsigned i = 0; i < sch->nb_demux; i++) {

commit 59a847a23718a87a2bd1e2eae893d9784ed34b0f
Author:     Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 18:09:31 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/thread_queue: allow choking thread queues directly
    
    Currently, when a demuxer thread is choked, it will avoid queuing more
    packets, but any packets already present on the thread queue will still be
    processed.
    
    This can be quite wasteful if the choke is due to e.g. decoder not being
    needed yet, such as in a filter graph involving concatenation-style filters.
    Adding the ability to propagate the choke status to the thread queue directly
    allows downstream decoders and filter graphs to avoid unnecessary work and
    buffering.
    
    Reduces the effective latency between scheduler updates and changes in the
    thread workload.

diff --git a/fftools/thread_queue.c b/fftools/thread_queue.c
index b035ffe11d..eb33431c98 100644
--- a/fftools/thread_queue.c
+++ b/fftools/thread_queue.c
@@ -38,6 +38,7 @@ enum {
 };
 
 struct ThreadQueue {
+    int             choked;
     int              *finished;
     unsigned int    nb_streams;
 
@@ -157,6 +158,9 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx,
 {
     unsigned int nb_finished = 0;
 
+    if (tq->choked)
+        return AVERROR(EAGAIN);
+
     while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
         unsigned idx;
         int ret;
@@ -230,6 +234,7 @@ void tq_send_finish(ThreadQueue *tq, unsigned int 
stream_idx)
      * next time the consumer thread tries to read this stream it will get
      * an EOF and recv-finished flag will be set */
     tq->finished[stream_idx] |= FINISHED_SEND;
+    tq->choked = 0;
     pthread_cond_broadcast(&tq->cond);
 
     pthread_mutex_unlock(&tq->lock);
@@ -249,3 +254,15 @@ void tq_receive_finish(ThreadQueue *tq, unsigned int 
stream_idx)
 
     pthread_mutex_unlock(&tq->lock);
 }
+
+void tq_choke(ThreadQueue *tq, int choked)
+{
+    pthread_mutex_lock(&tq->lock);
+
+    int prev_choked = tq->choked;
+    tq->choked = choked;
+    if (choked != prev_choked)
+        pthread_cond_broadcast(&tq->cond);
+
+    pthread_mutex_unlock(&tq->lock);
+}
diff --git a/fftools/thread_queue.h b/fftools/thread_queue.h
index cc01c8a2c9..ad7669f131 100644
--- a/fftools/thread_queue.h
+++ b/fftools/thread_queue.h
@@ -58,6 +58,15 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void 
*data);
  */
 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx);
 
+/**
+ * Prevent further reads from the thread queue until it is unchoked. Threads
+ * attempting to read from the queue will block, similar to when the queue is
+ * empty.
+ *
+ * @param choked 1 to choke, 0 to unchoke
+ */
+void tq_choke(ThreadQueue *tq, int choked);
+
 /**
  * Read the next item from the queue.
  *

commit 5f4cbb5617ba52fb0b5bce36eb36e4cd7bdaec70
Author:     Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 15:11:46 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: choke inputs during filtergraph configuration
    
    Currently, while the filter graph is being initially created, the scheduler
    continues demuxing frames on the last input that happened to be active before
    the filter graph was complete.
    
    This can lead to an excess number of decoded frames "piling" up on this input,
    regardless of whether or not it will actually be requested by the configured
    filter graph. Suspending the filter graph during this initialization phase
    reduces the amount of wasted memory.

diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 2dae6400c8..c1c8eeb2d8 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -2878,6 +2878,7 @@ static const char *unknown_if_null(const char *str)
 static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
                       InputFilter *ifilter, AVFrame *frame)
 {
+    FilterGraphPriv *fgp = fgp_from_fg(fg);
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     FrameData       *fd;
     AVFrameSideData *sd;
@@ -2986,6 +2987,11 @@ static int send_frame(FilterGraph *fg, FilterGraphThread 
*fgt,
             if (reason.len > 1)
                 reason.str[reason.len - 2] = '\0'; // remove last comma
             av_log(fg, AV_LOG_INFO, "Reconfiguring filter graph%s%s\n", 
reason.len ? " because " : "", reason.str);
+        } else {
+            /* Choke all input to avoid buffering excessive frames while the
+             * initial filter graph is being configured, and before we have a
+             * preferred input */
+            sch_filter_choke_inputs(fgp->sch, fgp->sch_idx);
         }
 
         ret = configure_filtergraph(fg, fgt);
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 3c5cffa594..039cd1c9aa 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -2510,6 +2510,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, 
AVFrame *frame)
     return send_to_filter(sch, fg, fg->nb_inputs, frame);
 }
 
+void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
+{
+    SchFilterGraph *fg;
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    pthread_mutex_lock(&sch->schedule_lock);
+    fg->best_input = fg->nb_inputs;
+    schedule_update_locked(sch);
+    pthread_mutex_unlock(&sch->schedule_lock);
+}
+
 static int task_cleanup(Scheduler *sch, SchedulerNode node)
 {
     switch (node.type) {
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index 24ad37b778..0c01f558e4 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -443,6 +443,13 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx, 
unsigned out_idx,
 
 int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);
 
+/**
+ * Called by filtergraph tasks to choke all filter inputs, preventing them from
+ * receiving more frames until woken up again by the scheduler. Used during
+ * initial graph configuration to avoid unnecessary buffering.
+ */
+void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx);
+
 /**
  * Called by encoder tasks to obtain frames for encoding. Will wait for a frame
  * to become available and return it in frame.

commit 23f1f094f854bdeaae4225c1876fd2b985e977f4
Author:     Niklas Haas <[email protected]>
AuthorDate: Thu Sep 4 16:53:28 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: get rid of src_sched
    
    This field is just saving (typically) a single pointer indirection; and IMO
    makes the logic and graph relations unnecessarily complicated. I am also
    considering adding choking logic to decoders and encoders as well, which this
    field would get in the way of.
    
    Apart from the unchoking logic in unchoke_for_input(), the only other place
    that uses this field is the (cold) function check_acyclic(), which can be
    served just as well with a simple function to do the graph traversal there.

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4f0d446007..3c5cffa594 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -189,7 +189,6 @@ typedef struct PreMuxQueue {
 
 typedef struct SchMuxStream {
     SchedulerNode       src;
-    SchedulerNode       src_sched;
 
     unsigned           *sub_heartbeat_dst;
     unsigned         nb_sub_heartbeat_dst;
@@ -235,7 +234,6 @@ typedef struct SchMux {
 
 typedef struct SchFilterIn {
     SchedulerNode       src;
-    SchedulerNode       src_sched;
     int                 send_finished;
     int                 receive_finished;
 } SchFilterIn;
@@ -1268,24 +1266,31 @@ static void unchoke_for_stream(Scheduler *sch, 
SchedulerNode src)
 {
     while (1) {
         SchFilterGraph *fg;
-
-        // fed directly by a demuxer (i.e. not through a filtergraph)
-        if (src.type == SCH_NODE_TYPE_DEMUX) {
+        switch (src.type) {
+        case SCH_NODE_TYPE_DEMUX:
+            // fed directly by a demuxer (i.e. not through a filtergraph)
             sch->demux[src.idx].waiter.choked_next = 0;
             return;
-        }
-
-        av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
-        fg = &sch->filters[src.idx];
-
-        // the filtergraph contains internal sources and
-        // requested to be scheduled directly
-        if (fg->best_input == fg->nb_inputs) {
-            fg->waiter.choked_next = 0;
+        case SCH_NODE_TYPE_DEC:
+            src = sch->dec[src.idx].src;
+            continue;
+        case SCH_NODE_TYPE_ENC:
+            src = sch->enc[src.idx].src;
+            continue;
+        case SCH_NODE_TYPE_FILTER_OUT:
+            fg = &sch->filters[src.idx];
+            // the filtergraph contains internal sources and
+            // requested to be scheduled directly
+            if (fg->best_input == fg->nb_inputs) {
+                fg->waiter.choked_next = 0;
+                return;
+            }
+            src = fg->inputs[fg->best_input].src;
+            continue;
+        default:
+            av_unreachable("Invalid source node type?");
             return;
         }
-
-        src = fg->inputs[fg->best_input].src_sched;
     }
 }
 
@@ -1328,7 +1333,7 @@ static void schedule_update_locked(Scheduler *sch)
                 continue;
 
             // resolve the source to unchoke
-            unchoke_for_stream(sch, ms->src_sched);
+            unchoke_for_stream(sch, ms->src);
             have_unchoked = 1;
         }
     }
@@ -1361,6 +1366,27 @@ enum {
     CYCLE_NODE_DONE,
 };
 
+// Finds the filtergraph or muxer upstream of a scheduler node
+static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
+{
+    while (1) {
+        switch (src.type) {
+        case SCH_NODE_TYPE_DEMUX:
+        case SCH_NODE_TYPE_FILTER_OUT:
+            return src;
+        case SCH_NODE_TYPE_DEC:
+            src = sch->dec[src.idx].src;
+            continue;
+        case SCH_NODE_TYPE_ENC:
+            src = sch->enc[src.idx].src;
+            continue;
+        default:
+            av_unreachable("Invalid source node type?");
+            return (SchedulerNode) {0};
+        }
+    }
+}
+
 static int
 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
                          uint8_t *filters_visited, SchedulerNode 
*filters_stack)
@@ -1377,22 +1403,23 @@ check_acyclic_for_output(const Scheduler *sch, 
SchedulerNode src,
         // descend into every input, depth first
         if (src.idx_stream < fg->nb_inputs) {
             const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
+            SchedulerNode node = src_filtergraph(sch, fi->src);
 
             // connected to demuxer, no cycles possible
-            if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
+            if (node.type == SCH_NODE_TYPE_DEMUX)
                 continue;
 
             // otherwise connected to another filtergraph
-            av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
+            av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT);
 
             // found a cycle
-            if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
+            if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
                 return AVERROR(EINVAL);
 
             // place current position on stack and descend
             av_assert0(nb_filters_stack < sch->nb_filters);
             filters_stack[nb_filters_stack++] = src;
-            src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
+            src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
             continue;
         }
 
@@ -1514,22 +1541,7 @@ static int start_prepare(Scheduler *sch)
         for (unsigned j = 0; j < mux->nb_streams; j++) {
             SchMuxStream *ms = &mux->streams[j];
 
-            switch (ms->src.type) {
-            case SCH_NODE_TYPE_ENC: {
-                SchEnc *enc = &sch->enc[ms->src.idx];
-                if (enc->src.type == SCH_NODE_TYPE_DEC) {
-                    ms->src_sched = sch->dec[enc->src.idx].src;
-                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
-                } else {
-                    ms->src_sched = enc->src;
-                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
-                }
-                break;
-                }
-            case SCH_NODE_TYPE_DEMUX:
-                ms->src_sched = ms->src;
-                break;
-            default:
+            if (!ms->src.type) {
                 av_log(mux, AV_LOG_ERROR,
                        "Muxer stream #%u not connected to a source\n", j);
                 return AVERROR(EINVAL);
@@ -1547,26 +1559,12 @@ static int start_prepare(Scheduler *sch)
 
         for (unsigned j = 0; j < fg->nb_inputs; j++) {
             SchFilterIn *fi = &fg->inputs[j];
-            SchDec     *dec;
 
             if (!fi->src.type) {
                 av_log(fg, AV_LOG_ERROR,
                        "Filtergraph input %u not connected to a source\n", j);
                 return AVERROR(EINVAL);
             }
-
-            if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
-                fi->src_sched = fi->src;
-            else {
-                av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
-                dec = &sch->dec[fi->src.idx];
-
-                switch (dec->src.type) {
-                case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;            
       break;
-                case SCH_NODE_TYPE_ENC:   fi->src_sched = 
sch->enc[dec->src.idx].src; break;
-                default: av_assert0(0);
-                }
-            }
         }
 
         for (unsigned j = 0; j < fg->nb_outputs; j++) {

commit fd4b5b24cedac1f7bae6792cbe4216f3e30a2cae
Author:     Niklas Haas <[email protected]>
AuthorDate: Wed Sep 3 14:38:38 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: lower default frame queue size
    
    I tested this extensively under different conditions and could not come up
    with any scenario where using a larger queue size was actually beneficial.
    Moreover, having such a large default queue is very wasteful especially
    for larger frame sizes; and can in the worst case lead to an extra ~50% memory
    footprint per input (with the default 16 threads), regardless of whether that
    input is currently in use or not.
    
    My methodology was to add logging in the event of a queue underrun/overrun,
    and then observe the frequency of such events in practice,
    as well as the impact on performance. I came up with an example filter graph
    involving decoding, filtering and encoding with several input files and
    various changes to move the bottleneck around.
    
    I found that, in all configurations I tested, with all thread counts and
    bottlenecks, using a queue size of 2 frames yielded practically identical
    performance to a queue size of 8 frames. I was only able to consistently
    measure a slowdown when restricting the queue to a single frame, where the
    underruns ended up making up almost 1.1% of frame events in the worst case.
    
    A summary of my test log follows:
    
    = Bottleneck in decoder =
    
    ffmpeg -i A -i B -i C -filter_complex "concat=n=3" -f null -
    
    == 16 threads ==
    
    === Queue statistics (dec -> filtergraph) ===
    - 8 frames = 91355 underruns, 1 overrun
    - 4 frames = 91381 underruns, 2 overruns
    - 2 frames = 91326 underruns, 21 overruns
    - 1 frame  = 91284 underruns, 102 overruns
    
    === Time elapsed ===
    - 8 frames = 14.37s
    - 4 frames = 14.28s
    - 2 frames = 14.27s
    - 1 frame  = 14.35s
    
    == 1 thread ==
    
    === Queue statistics (dec -> filtergraph) ===
    - 8 frames = 91801 underruns, 0 overruns
    - 4 frames = 91929 underruns, 1 overrun
    - 2 frames = 91854 underruns, 7 overruns
    - 1 frame  = 91745 underruns, 83 overruns
    
    === Time elapsed ===
    - 8 frames = 39.51s
    - 4 frames = 39.94s
    - 2 frames = 39.91s
    - 1 frame  = 41.69s
    
    = Bottleneck in filter graph: =
    
    ffmpeg -i A -i B -i C -filter_complex "concat=n=3,scale=3840x2160" -f null -
    
    == 16 threads ==
    
    === Queue statistics (dec -> filtergraph) ===
    - 8 frames =  277 underruns, 84673 overruns
    - 4 frames =  640 underruns, 86523 overruns
    - 2 frames =  850 underruns, 88751 overruns
    - 1 frame  = 1028 underruns, 89957 overruns
    
    === Time elapsed ===
    - 8 frames = 26.35s
    - 4 frames = 26.31s
    - 2 frames = 26.38s
    - 1 frame  = 26.55s
    
    == 1 thread ==
    
    === Queue statistics (dec -> filtergraph) ===
    - 8 frames = 29746 underruns, 57033 overruns
    - 4 frames = 29940 underruns, 58948 overruns
    - 2 frames = 30160 underruns, 60185 overruns
    - 1 frame  = 30259 underruns, 61126 overruns
    
    === Time elapsed ===
    - 8 frames = 52.08s
    - 4 frames = 52.49s
    - 2 frames = 52.25s
    - 1 frame  = 52.69s
    
    = Bottleneck in encoder: =
    
    ffmpeg -i A -i B -i C -filter_complex "concat=n=3" -c:v libx264 -preset veryfast -f null -
    
    == 1 thread ==
    
    == Queue statistics (filtergraph -> enc) ==
    - 8 frames = 26763 underruns, 63535 overruns
    - 4 frames = 26863 underruns, 63810 overruns
    - 2 frames = 27243 underruns, 63839 overruns
    - 1 frame  = 27670 underruns, 63953 overruns
    
    == Time elapsed ==
    - 8 frames = 89.45s
    - 4 frames = 89.04s
    - 2 frames = 89.24s
    - 1 frame  = 90.26s

diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index fb7a77ddfc..24ad37b778 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -257,7 +257,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int 
(*init)(void *),
 /**
  * Default size of a frame thread queue.
  */
-#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 8
+#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 2
 
 /**
  * Add a muxed stream for a previously added muxer.

commit 15407cf90bb4fd9f47f85f078e5689b2593ccbc3
Author:     Niklas Haas <[email protected]>
AuthorDate: Wed Sep 3 14:10:55 2025 +0200
Commit:     Niklas Haas <[email protected]>
CommitDate: Tue Sep 30 13:16:59 2025 +0200

    fftools/ffmpeg_sched: relax queue size assertion
    
    The code in the decoder just cares about allocating enough extra hw frames
    to cover the size of the queue; but there's no reason we actually *have* to
    use this many. We can safely relax the assertion to a <= check.

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 3180367576..4f0d446007 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -389,7 +389,7 @@ static int queue_alloc(ThreadQueue **ptq, unsigned 
nb_streams, unsigned queue_si
         // for frames held in queues inside the ffmpeg utility.  If this
         // can ever dynamically change then the corresponding decode
         // code needs to be updated as well.
-        av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
+        av_assert0(queue_size <= DEFAULT_FRAME_THREAD_QUEUE_SIZE);
     }
 
     tq = tq_alloc(nb_streams, queue_size,

-----------------------------------------------------------------------

Summary of changes:
 fftools/ffmpeg_dec.c    |   1 +
 fftools/ffmpeg_filter.c |   6 ++
 fftools/ffmpeg_sched.c  | 183 +++++++++++++++++++++++++++++++++---------------
 fftools/ffmpeg_sched.h  |   9 ++-
 fftools/thread_queue.c  |  17 +++++
 fftools/thread_queue.h  |   9 +++
 6 files changed, 168 insertions(+), 57 deletions(-)


hooks/post-receive
-- 

_______________________________________________
ffmpeg-cvslog mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to