Improve the performance of the 1 decode + N filter graphs (adaptive
bitrate) scenario.

With the new option "-abr_pipeline":
1. Multiple filter graphs now run concurrently, which brings roughly
   5%~20% improvement in some 1:N scenarios with either CPU or GPU
   acceleration (a sketch of the threading model follows this list).
2. As a next step, the concurrency inside complex filter graphs will
   be improved as well, so that large filter networks run efficiently.
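
For reference, below is a minimal sketch of the threading model this option
assumes: one worker thread per filter graph, a one-slot frame handoff, and a
decoder loop that publishes each decoded frame to every graph before waiting
for all of them. The names AbrFilterCtx, abr_worker, feed_graph and reap_graph
are illustrative placeholders only; the actual patch extends InputFilter and
reuses ifilter_send_frame() plus a per-graph variant of reap_filters().

/* Sketch only: one worker thread per filter graph with a one-slot mailbox.
 * feed_graph() and reap_graph() stand in for pushing a frame into a graph
 * and draining/encoding its output. */
#include <pthread.h>
#include <stdio.h>

typedef struct AbrFilterCtx {
    pthread_t       thread;
    pthread_mutex_t lock;
    pthread_cond_t  frame_ready;  /* decoder -> worker: a frame is waiting */
    pthread_cond_t  frame_done;   /* worker -> decoder: the slot is free   */
    const int      *pending;      /* one-slot mailbox (AVFrame* in real code) */
    int             stop;
    int             id;
} AbrFilterCtx;

static void feed_graph(AbrFilterCtx *c, int frame) { (void)c; (void)frame; }
static void reap_graph(AbrFilterCtx *c) { printf("graph %d drained a frame\n", c->id); }

static void *abr_worker(void *arg)
{
    AbrFilterCtx *c = arg;
    for (;;) {
        pthread_mutex_lock(&c->lock);
        while (!c->pending && !c->stop)
            pthread_cond_wait(&c->frame_ready, &c->lock);
        if (c->stop) {
            pthread_mutex_unlock(&c->lock);
            break;
        }
        int frame = *c->pending;
        pthread_mutex_unlock(&c->lock);

        feed_graph(c, frame);                /* filter this graph's copy    */
        reap_graph(c);                       /* and reap/encode its output  */

        pthread_mutex_lock(&c->lock);
        c->pending = NULL;                   /* release the slot and wake   */
        pthread_cond_signal(&c->frame_done); /* the decoder thread          */
        pthread_mutex_unlock(&c->lock);
    }
    return NULL;
}

int main(void)
{
    enum { N_GRAPHS = 2, N_FRAMES = 5 };
    AbrFilterCtx ctx[N_GRAPHS];
    int slot[N_GRAPHS];

    for (int i = 0; i < N_GRAPHS; i++) {
        ctx[i] = (AbrFilterCtx){ .id = i };
        pthread_mutex_init(&ctx[i].lock, NULL);
        pthread_cond_init(&ctx[i].frame_ready, NULL);
        pthread_cond_init(&ctx[i].frame_done, NULL);
        pthread_create(&ctx[i].thread, NULL, abr_worker, &ctx[i]);
    }

    for (int f = 0; f < N_FRAMES; f++) {       /* the "decoder" loop         */
        for (int i = 0; i < N_GRAPHS; i++) {   /* publish the frame to every */
            pthread_mutex_lock(&ctx[i].lock);  /* graph without waiting...   */
            slot[i] = f;
            ctx[i].pending = &slot[i];
            pthread_cond_signal(&ctx[i].frame_ready);
            pthread_mutex_unlock(&ctx[i].lock);
        }
        for (int i = 0; i < N_GRAPHS; i++) {   /* ...then wait for all of    */
            pthread_mutex_lock(&ctx[i].lock);  /* them before the next       */
            while (ctx[i].pending)             /* decode                     */
                pthread_cond_wait(&ctx[i].frame_done, &ctx[i].lock);
            pthread_mutex_unlock(&ctx[i].lock);
        }
    }

    for (int i = 0; i < N_GRAPHS; i++) {       /* shut the workers down      */
        pthread_mutex_lock(&ctx[i].lock);
        ctx[i].stop = 1;
        pthread_cond_signal(&ctx[i].frame_ready);
        pthread_mutex_unlock(&ctx[i].lock);
        pthread_join(ctx[i].thread, NULL);
    }
    return 0;
}

The per-graph condition-variable pair means a slow graph never blocks its
siblings while filtering; the final wait only keeps frame ownership simple,
so the decoder never reuses a frame that a worker is still reading.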

Below are some test cases and results for reference.
(Hardware platform: Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz)
(Software: Intel iHD driver - 16.9.00100, CentOS 7)

Command for the Intel GPU acceleration case, 1 decode to N scaling:
ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
    -hwaccel_output_format vaapi \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_vaapi=1280:720:format=nv12,hwdownload" \
    -pix_fmt nv12 -f null /dev/null \
    -vf "scale_vaapi=720:480:format=nv12,hwdownload" \
    -pix_fmt nv12 -f null /dev/null \
    -abr_pipeline

    Test result (2 scaling outputs): ~34% improvement

Command for the CPU-only case, 1 decode to N scaling:
ffmpeg -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale=1280:720" -pix_fmt nv12 -f null /dev/null \
    -vf "scale=720:480" -pix_fmt nv12 -f null /dev/null \
    -abr_pipeline

    Test result (2 scaling outputs): ~25% improvement

Command for 1:N transcode with GPU acceleration:
ffmpeg -vaapi_device /dev/dri/renderD128 -hwaccel vaapi \
    -hwaccel_output_format vaapi \
    -i ~/Videos/1920x1080p_30.00_x264_qp28.h264 \
    -vf "scale_vaapi=1280:720" -c:v h264_vaapi -f null /dev/null \
    -vf "scale_vaapi=720:480" -c:v h264_vaapi -f null /dev/null \
    -abr_pipeline

    Test result (2 scale+encode outputs): ~6.1% improvement

Signed-off-by: Wang, Shaofei <shaofei.w...@intel.com>
Signed-off-by: Jun Zhao <jun.z...@intel.com>
---
 fftools/ffmpeg.c        |  236 ++++++++++++++++++++++++++++++++++++++++++++---
 fftools/ffmpeg.h        |   12 +++
 fftools/ffmpeg_filter.c |    6 +
 fftools/ffmpeg_opt.c    |    6 +-
 4 files changed, 246 insertions(+), 14 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 38c21e9..5dc80fd 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1523,6 +1523,110 @@ static int reap_filters(int flush)
     return 0;
 }
 
+static int pipeline_reap_filters(int flush, InputFilter * ifilter)
+{
+    AVFrame *filtered_frame = NULL;
+    int i;
+
+    for (i = 0; i < nb_output_streams; i++) {
+        if (ifilter == output_streams[i]->filter->graph->inputs[0]) break;
+    }
+    OutputStream *ost = output_streams[i];
+    OutputFile    *of = output_files[ost->file_index];
+    AVFilterContext *filter;
+    AVCodecContext *enc = ost->enc_ctx;
+    int ret = 0;
+
+    if (!ost->filter || !ost->filter->graph->graph)
+        return 0;
+    filter = ost->filter->filter;
+
+    if (!ost->initialized) {
+        char error[1024] = "";
+        ret = init_output_stream(ost, error, sizeof(error));
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR, "Error initializing output stream %d:%d -- %s\n",
+                   ost->file_index, ost->index, error);
+            exit_program(1);
+        }
+    }
+
+    if (!ost->filtered_frame && !(ost->filtered_frame = av_frame_alloc())) {
+        return AVERROR(ENOMEM);
+    }
+    filtered_frame = ost->filtered_frame;
+
+    while (1) {
+        double float_pts = AV_NOPTS_VALUE; // this is identical to filtered_frame.pts but with higher precision
+        ret = av_buffersink_get_frame_flags(filter, filtered_frame,
+                                           AV_BUFFERSINK_FLAG_NO_REQUEST);
+        if (ret < 0) {
+            if (ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) {
+                av_log(NULL, AV_LOG_WARNING,
+                       "Error in av_buffersink_get_frame_flags(): %s\n", av_err2str(ret));
+            } else if (flush && ret == AVERROR_EOF) {
+                if (av_buffersink_get_type(filter) == AVMEDIA_TYPE_VIDEO)
+                    do_video_out(of, ost, NULL, AV_NOPTS_VALUE);
+            }
+            break;
+        }
+        if (ost->finished) {
+            av_frame_unref(filtered_frame);
+            continue;
+        }
+        if (filtered_frame->pts != AV_NOPTS_VALUE) {
+            int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
+            AVRational filter_tb = av_buffersink_get_time_base(filter);
+            AVRational tb = enc->time_base;
+            int extra_bits = av_clip(29 - av_log2(tb.den), 0, 16);
+
+            tb.den <<= extra_bits;
+            float_pts =
+                av_rescale_q(filtered_frame->pts, filter_tb, tb) -
+                av_rescale_q(start_time, AV_TIME_BASE_Q, tb);
+            float_pts /= 1 << extra_bits;
+            // avoid exact midpoints to reduce the chance of rounding differences, this can be removed in case the fps code is changed to work with integers
+            float_pts += FFSIGN(float_pts) * 1.0 / (1<<17);
+
+            filtered_frame->pts =
+                av_rescale_q(filtered_frame->pts, filter_tb, enc->time_base) -
+                av_rescale_q(start_time, AV_TIME_BASE_Q, enc->time_base);
+        }
+
+        switch (av_buffersink_get_type(filter)) {
+        case AVMEDIA_TYPE_VIDEO:
+            if (!ost->frame_aspect_ratio.num)
+                enc->sample_aspect_ratio = filtered_frame->sample_aspect_ratio;
+
+            if (debug_ts) {
+                av_log(NULL, AV_LOG_INFO, "filter -> pts:%s pts_time:%s exact:%f time_base:%d/%d\n",
+                        av_ts2str(filtered_frame->pts), av_ts2timestr(filtered_frame->pts, &enc->time_base),
+                        float_pts,
+                        enc->time_base.num, enc->time_base.den);
+            }
+
+            do_video_out(of, ost, filtered_frame, float_pts);
+            break;
+        case AVMEDIA_TYPE_AUDIO:
+            if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) &&
+                enc->channels != filtered_frame->channels) {
+                av_log(NULL, AV_LOG_ERROR,
+                       "Audio filter graph output is not normalized and encoder does not support parameter changes\n");
+                break;
+            }
+            do_audio_out(of, ost, filtered_frame);
+            break;
+        default:
+            // TODO support subtitle filters
+            av_assert0(0);
+        }
+
+        av_frame_unref(filtered_frame);
+    }
+
+    return 0;
+}
+
 static void print_final_stats(int64_t total_size)
 {
     uint64_t video_size = 0, audio_size = 0, extra_size = 0, other_size = 0;
@@ -2175,7 +2279,15 @@ static int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame)
             }
         }
 
+#if HAVE_THREADS
+        if (!abr_pipeline) {
+            ret = reap_filters(1);
+        } else {
+            ret = pipeline_reap_filters(1, ifilter);
+        }
+#else
         ret = reap_filters(1);
+#endif
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", 
av_err2str(ret));
             return ret;
@@ -2204,6 +2316,16 @@ static int ifilter_send_eof(InputFilter *ifilter, int64_t pts)
 
     ifilter->eof = 1;
 
+#if HAVE_THREADS
+    if (abr_pipeline) {
+        ifilter->waited_frm = NULL;
+        pthread_mutex_lock(&ifilter->process_mutex);
+        ifilter->t_end = 1;
+        pthread_cond_signal(&ifilter->process_cond);
+        pthread_mutex_unlock(&ifilter->process_mutex);
+        pthread_join(ifilter->f_thread, NULL);
+    }
+#endif
     if (ifilter->filter) {
         ret = av_buffersrc_close(ifilter->filter, pts, AV_BUFFERSRC_FLAG_PUSH);
         if (ret < 0)
@@ -2248,6 +2370,41 @@ static int decode(AVCodecContext *avctx, AVFrame *frame, int *got_frame, AVPacke
     return 0;
 }
 
+#if HAVE_THREADS
+static void *filter_pipeline(void *arg)
+{
+    InputFilter *fl = arg;
+    AVFrame *frm;
+    int ret;
+    while(1) {
+        pthread_mutex_lock(&fl->process_mutex);
+        while (fl->waited_frm == NULL && !fl->t_end)
+            pthread_cond_wait(&fl->process_cond, &fl->process_mutex);
+        pthread_mutex_unlock(&fl->process_mutex);
+
+        if (fl->t_end) break;
+
+        frm = fl->waited_frm;
+        ret = ifilter_send_frame(fl, frm);
+        if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+        } else
+            ret = pipeline_reap_filters(0, fl);
+
+        fl->t_error = ret;
+
+        pthread_mutex_lock(&fl->finish_mutex);
+        fl->waited_frm = NULL;
+        pthread_cond_signal(&fl->finish_cond);
+        pthread_mutex_unlock(&fl->finish_mutex);
+
+        if (ret < 0)
+            break;
+    }
+    return NULL;
+}
+#endif
 static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 {
     int i, ret;
@@ -2255,22 +2412,72 @@ static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
 
     av_assert1(ist->nb_filters > 0); /* ensure ret is initialized */
     for (i = 0; i < ist->nb_filters; i++) {
-        if (i < ist->nb_filters - 1) {
-            f = ist->filter_frame;
-            ret = av_frame_ref(f, decoded_frame);
-            if (ret < 0)
+#if HAVE_THREADS
+        if (!abr_pipeline) {
+#endif
+            if (i < ist->nb_filters - 1) {
+                f = ist->filter_frame;
+                ret = av_frame_ref(f, decoded_frame);
+                if (ret < 0)
+                    break;
+            } else
+                f = decoded_frame;
+
+            ret = ifilter_send_frame(ist->filters[i], f);
+            if (ret == AVERROR_EOF)
+                ret = 0; /* ignore */
+            if (ret < 0) {
+                av_log(NULL, AV_LOG_ERROR,
+                       "Failed to inject frame into filter network: %s\n", av_err2str(ret));
+                break;
+            }
+#if HAVE_THREADS
+        } else {
+            if (i < ist->nb_filters - 1) {
+                f = &ist->filters[i]->input_frm;
+                ret = av_frame_ref(f, decoded_frame);
+                if (ret < 0)
+                    break;
+            } else
+                f = decoded_frame;
+
+            if(ist->filters[i]->f_thread == 0) {
+                if ((ret = pthread_create(&ist->filters[i]->f_thread, NULL, filter_pipeline, ist->filters[i]))) {
+                    av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
+                    return AVERROR(ret);
+                }
+                pthread_mutex_init(&ist->filters[i]->process_mutex, NULL);
+                pthread_mutex_init(&ist->filters[i]->finish_mutex, NULL);
+                pthread_cond_init(&ist->filters[i]->process_cond, NULL);
+                pthread_cond_init(&ist->filters[i]->finish_cond, NULL);
+                ist->filters[i]->t_end = 0;
+                ist->filters[i]->t_error = 0;
+            }
+
+            pthread_mutex_lock(&ist->filters[i]->process_mutex);
+            ist->filters[i]->waited_frm = f;
+            pthread_cond_signal(&ist->filters[i]->process_cond);
+            pthread_mutex_unlock(&ist->filters[i]->process_mutex);
+        }
+#endif
+    }
+#if HAVE_THREADS
+    if (abr_pipeline) {
+        for (i = 0; i < ist->nb_filters; i++) {
+            pthread_mutex_lock(&ist->filters[i]->finish_mutex);
+            while(ist->filters[i]->waited_frm != NULL)
+                pthread_cond_wait(&ist->filters[i]->finish_cond, &ist->filters[i]->finish_mutex);
+            pthread_mutex_unlock(&ist->filters[i]->finish_mutex);
+        }
+        for (i = 0; i < ist->nb_filters; i++) {
+            if (ist->filters[i]->t_error < 0) {
+                ret = ist->filters[i]->t_error;
                 break;
-        } else
-            f = decoded_frame;
-        ret = ifilter_send_frame(ist->filters[i], f);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", 
av_err2str(ret));
-            break;
+            }
         }
     }
+#endif
+
     return ret;
 }
 
@@ -4635,6 +4842,9 @@ static int transcode_step(void)
     if (ret < 0)
         return ret == AVERROR_EOF ? 0 : ret;
 
+#if HAVE_THREADS
+    if (abr_pipeline) return 0;
+#endif
     return reap_filters(0);
 }
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index eb1eaf6..436e428 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -253,6 +253,17 @@ typedef struct InputFilter {
 
     AVBufferRef *hw_frames_ctx;
 
+    // for abr pipeline
+    AVFrame *waited_frm;
+    AVFrame input_frm;
+    pthread_t f_thread;
+    pthread_cond_t process_cond;
+    pthread_cond_t finish_cond;
+    pthread_mutex_t process_mutex;
+    pthread_mutex_t finish_mutex;
+    int t_end;
+    int t_error;
+
     int eof;
 } InputFilter;
 
@@ -606,6 +617,7 @@ extern int frame_bits_per_raw_sample;
 extern AVIOContext *progress_avio;
 extern float max_error_rate;
 extern char *videotoolbox_pixfmt;
+extern int abr_pipeline;
 
 extern int filter_nbthreads;
 extern int filter_complex_nbthreads;
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 6518d50..0323b10 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -197,6 +197,7 @@ DEF_CHOOSE_FORMAT(channel_layouts, uint64_t, channel_layout, channel_layouts, 0,
 int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
 {
     FilterGraph *fg = av_mallocz(sizeof(*fg));
+    int i;
 
     if (!fg)
         exit_program(1);
@@ -225,6 +226,11 @@ int init_simple_filtergraph(InputStream *ist, OutputStream *ost)
     GROW_ARRAY(ist->filters, ist->nb_filters);
     ist->filters[ist->nb_filters - 1] = fg->inputs[0];
 
+    if (abr_pipeline) {
+        for (i = 0; i < ist->nb_filters; i++) {
+            ist->filters[i]->f_thread = 0;
+        }
+    }
     GROW_ARRAY(filtergraphs, nb_filtergraphs);
     filtergraphs[nb_filtergraphs - 1] = fg;
 
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d4851a2..fa5a556 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -110,6 +110,7 @@ float max_error_rate  = 2.0/3;
 int filter_nbthreads = 0;
 int filter_complex_nbthreads = 0;
 int vstats_version = 2;
+int abr_pipeline      = 0;
 
 
 static int intra_only         = 0;
@@ -3502,7 +3503,10 @@ const OptionDef options[] = {
         "set the maximum number of queued packets from the demuxer" },
     { "find_stream_info", OPT_BOOL | OPT_PERFILE | OPT_INPUT | OPT_EXPERT, { 
&find_stream_info },
         "read and decode the streams to fill missing information with 
heuristics" },
-
+#if HAVE_THREADS
+    { "abr_pipeline",    OPT_BOOL,                                    { &abr_pipeline },
+        "adaptive bitrate pipeline (1 decode to N filter graphs, and 1 to N transcode)" },
+#endif
     /* video options */
     { "vframes",      OPT_VIDEO | HAS_ARG  | OPT_PERFILE | OPT_OUTPUT,         
  { .func_arg = opt_video_frames },
         "set the number of video frames to output", "number" },
-- 
1.7.1
