---
Review notes (free-form text in this section is not part of the commit):

This patch moves muxing for every output file into its own thread.
of_submit_packet() now only hands packets to a per-file ThreadQueue;
muxer_thread() takes them off the queue and passes them through
sync_queue_process() to the muxer. transcode() starts the threads with
of_thread_start() and joins them with of_thread_stop(), replacing
flush_sync_queues_mux(). The file size limit is now checked in
write_packet(), which also stores the current size in the atomic
Muxer.last_filesize field that of_filesize() reads; of_finished() is removed.

Changes relative to the patch as originally posted:
 * muxer_thread(): the final flush loop called sync_queue_process() with
   output_streams[of->ost_index] on every iteration, i.e. always for the
   file's first stream; it now uses of->ost_index + i.
 * sync_queue_process(): a failed av_packet_alloc() returns AVERROR(ENOMEM)
   instead of calling abort().
 * sync_queue_process(): the inner "int ret" that shadowed the function-level
   ret is gone.

Still open:
 * sync_queue_process() can be called with pkt == NULL, but the
   check_write_header() failure path calls av_packet_unref(pkt) without a
   NULL check (the sq_send() failure path does check).
 * mux_free() gains an empty "if (mux->tq) { }" block.
 * OutputFile.mux_queue is added but nothing in this patch uses it.

The post-image blob id on the ffmpeg_mux.c "index" line predates the changes
listed above. Line breaks and indentation were restored from a
whitespace-collapsed copy; hunk line counts match the @@ headers, but column
alignment inside context lines may differ from the tree.

 fftools/ffmpeg.c     |  38 +++------
 fftools/ffmpeg.h     |   7 +-
 fftools/ffmpeg_mux.c | 197 +++++++++++++++++++++++++++++++++++--------
 3 files changed, 178 insertions(+), 64 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 9dfbc4216a..8ea27d3422 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1286,10 +1286,7 @@ static void finish_output_stream(OutputStream *ost)
     OutputFile *of = output_files[ost->file_index];
     ost->finished = ENCODER_FINISHED;
 
-    if (ost->sq_idx_mux >= 0)
-        sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
-    else
-        ost->finished |= MUXER_FINISHED;
+    output_packet(of, ost->pkt, ost, 1);
 }
 
 /**
@@ -3421,9 +3418,8 @@ static int need_output(void)
 
     for (i = 0; i < nb_output_streams; i++) {
         OutputStream *ost = output_streams[i];
-        OutputFile *of = output_files[ost->file_index];
 
-        if (ost->finished || of_finished(of))
+        if (ost->finished)
             continue;
 
         return 1;
@@ -4269,26 +4265,6 @@ static int transcode_step(void)
     return reap_filters(0);
 }
 
-static void flush_sync_queues_mux(void)
-{
-    /* mark all queue inputs as done */
-    for (int i = 0; i < nb_output_streams; i++) {
-        OutputStream *ost = output_streams[i];
-        OutputFile *of = output_files[ost->file_index];
-        if (ost->sq_idx_mux >= 0)
-            sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL));
-    }
-
-    /* encode all packets remaining in the sync queues */
-    for (int i = 0; i < nb_output_streams; i++) {
-        OutputStream *ost = output_streams[i];
-        OutputFile *of = output_files[ost->file_index];
-
-        if (!(ost->finished & MUXER_FINISHED))
-            output_packet(of, ost->pkt, ost, 1);
-    }
-}
-
 /*
  * The following code is the main loop of the file converter
  */
@@ -4310,6 +4286,12 @@ static int transcode(void)
 
     timer_start = av_gettime_relative();
 
+    for (i = 0; i < nb_output_files; i++) {
+        ret = of_thread_start(output_files[i]);
+        if (ret < 0)
+            goto fail;
+    }
+
     if ((ret = init_input_threads()) < 0)
         goto fail;
 
@@ -4346,7 +4328,9 @@ static int transcode(void)
         }
     }
     flush_encoders();
-    flush_sync_queues_mux();
+
+    for (i = 0; i < nb_output_files; i++)
+        of_thread_stop(output_files[i]);
 
     term_exit();
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 407342462f..c4a5c2a0a2 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -583,6 +583,8 @@ typedef struct OutputFile {
     const AVOutputFormat *format;
     const char *url;
 
+    AVThreadMessageQueue *mux_queue;
+
     SyncQueue *sq_encode;
     SyncQueue *sq_mux;
 
@@ -697,11 +699,14 @@ int hwaccel_decode_init(AVCodecContext *avctx);
 
 int of_muxer_init(OutputFile *of, AVFormatContext *fc,
                   AVDictionary *opts, int64_t limit_filesize);
+
+int of_thread_start(OutputFile *of);
+void of_thread_stop(OutputFile *of);
+
 int of_write_trailer(OutputFile *of);
 void of_close(OutputFile **pof);
 
 int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof);
-int of_finished(OutputFile *of);
 int64_t of_filesize(OutputFile *of);
 AVChapter * const *
 of_get_chapters(OutputFile *of, unsigned int *nb_chapters);
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index 6ca9a51dd6..f99dd5ec3e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -16,17 +16,20 @@
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  */
 
+#include <stdatomic.h>
 #include <stdio.h>
 #include <string.h>
 
 #include "ffmpeg.h"
 #include "sync_queue.h"
+#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
+#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -46,18 +49,24 @@ typedef struct MuxStream {
     /* dts of the last packet sent to the muxer, in the stream timebase
      * used for making up missing dts values */
     int64_t last_mux_dts;
+
+    /* data (a real or a flush packet) was received for this stream */
+    int got_data;
 } MuxStream;
 
 struct Muxer {
     AVFormatContext *fc;
 
+    pthread_t thread;
+    ThreadQueue *tq;
+
     MuxStream *streams;
 
     AVDictionary *opts;
 
     /* filesize limit expressed in bytes */
     int64_t limit_filesize;
-    int64_t final_filesize;
+    atomic_int_least64_t last_filesize;
 
     int header_written;
 };
@@ -221,13 +230,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
     return 0;
 }
 
+static int64_t filesize(AVIOContext *pb)
+{
+    int64_t ret = -1;
+
+    if (pb) {
+        ret = avio_size(pb);
+        if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
+            ret = avio_tell(pb);
+    }
+
+    return ret;
+}
+
 static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
     MuxStream *ms = &of->mux->streams[ost->index];
     AVFormatContext *s = of->mux->fc;
     AVStream *st = ost->st;
+    int64_t fs;
     int ret;
 
+    fs = filesize(s->pb);
+    atomic_store(&of->mux->last_filesize, fs);
+    if (fs >= of->mux->limit_filesize)
+        return AVERROR_EOF;
+
     if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && video_sync_method == VSYNC_DROP) ||
         (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0))
         pkt->pts = pkt->dts = AV_NOPTS_VALUE;
@@ -333,8 +361,8 @@ static int check_write_header(OutputFile *of)
     int ret, i;
 
     for (i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = output_streams[of->ost_index + i];
-        if (!ost->initialized)
+        MuxStream *ms = &of->mux->streams[i];
+        if (!ms->got_data)
             return 0;
     }
 
@@ -378,12 +406,15 @@ static int check_write_header(OutputFile *of)
     return 0;
 }
 
-int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt)
 {
+    Muxer *mux = of->mux;
+    MuxStream *ms = &mux->streams[ost->index];
     int ret;
 
-    if (!of->mux->header_written) {
-        ret = check_write_header(of);
+    ms->got_data = 1;
+    if (!mux->header_written) {
+        ret = check_write_header(of);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -391,34 +422,102 @@ int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
     }
 
     if (ost->sq_idx_mux >= 0) {
-        ret = sq_send(of->sq_mux, ost->sq_idx_mux,
-                      SQPKT(eof ? NULL: pkt));
+        ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt));
         if (ret < 0) {
-            av_packet_unref(pkt);
-            if (ret == AVERROR_EOF) {
-                ost->finished |= MUXER_FINISHED;
-                return 0;
-            } else
-                return ret;
+            if (pkt)
+                av_packet_unref(pkt);
+            return ret;
         }
 
         while (1) {
+            pkt = av_packet_alloc();
+            if (!pkt) {
+                return AVERROR(ENOMEM);
+            }
+
             ret = sq_receive(of->sq_mux, -1, SQPKT(pkt));
-            if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN))
-                return 0;
-            else if (ret < 0)
-                return ret;
+            if (ret < 0) {
+                av_packet_free(&pkt);
+                return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret;
+            }
 
             ret = submit_packet(of, pkt, output_streams[of->ost_index + ret]);
+            av_packet_free(&pkt);
             if (ret < 0)
                 return ret;
         }
-    } else if (!eof)
+    } else if (pkt)
         return submit_packet(of, pkt, ost);
 
     return 0;
 }
 
+static void *muxer_thread(void *arg)
+{
+    OutputFile *of = arg;
+    Muxer *mux = of->mux;
+
+    while (1) {
+        OutputStream *ost;
+        AVPacket *pkt = NULL;
+        int stream_idx, ret;
+
+        ret = tq_receive(mux->tq, &stream_idx, &pkt);
+        if (stream_idx < 0) {
+            av_log(NULL, AV_LOG_DEBUG,
+                   "All streams finished for output file #%d\n", of->index);
+            break;
+        }
+
+        ost = output_streams[of->ost_index + stream_idx];
+        ret = sync_queue_process(of, ost, ret < 0 ? NULL : pkt);
+        av_packet_free(&pkt);
+        if (ret == AVERROR_EOF)
+            tq_receive_finish(mux->tq, stream_idx);
+        else if (ret < 0) {
+            av_log(NULL, AV_LOG_ERROR,
+                   "Error muxing a packet for output file #%d\n", of->index);
+            break;
+        }
+    }
+
+    for (unsigned int i = 0; i < mux->fc->nb_streams; i++) {
+        sync_queue_process(of, output_streams[of->ost_index + i], NULL);
+        tq_receive_finish(mux->tq, i);
+    }
+
+    av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index);
+
+    return NULL;
+}
+
+int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof)
+{
+    AVPacket *pkt1;
+    int ret = 0;
+
+    if (eof) {
+        tq_send_finish(of->mux->tq, ost->index);
+        return 0;
+    }
+
+    pkt1 = av_packet_alloc();
+    if (!pkt1) {
+        av_packet_unref(pkt);
+        return AVERROR(ENOMEM);
+    }
+
+    av_packet_move_ref(pkt1, pkt);
+
+    ret = tq_send(of->mux->tq, ost->index, &pkt1);
+    if (ret < 0) {
+        av_packet_free(&pkt1);
+        ost->finished |= MUXER_FINISHED;
+    }
+
+    return ret == AVERROR_EOF ? 0 : ret;
+}
+
 int of_write_trailer(OutputFile *of)
 {
     AVFormatContext *fc = of->mux->fc;
@@ -438,7 +537,7 @@ int of_write_trailer(OutputFile *of)
         return ret;
     }
 
-    of->mux->final_filesize = of_filesize(of);
+    of->mux->last_filesize = filesize(fc->pb);
 
     if (!(of->format->flags & AVFMT_NOFILE)) {
         ret = avio_closep(&fc->pb);
@@ -487,6 +586,9 @@ static void mux_free(Muxer **pmux)
     av_freep(&mux->streams);
     av_dict_free(&mux->opts);
 
+    if (mux->tq) {
+    }
+
     fc_close(&mux->fc);
 
     av_freep(pmux);
@@ -558,30 +660,53 @@ fail:
     return ret;
 }
 
-int of_finished(OutputFile *of)
+int64_t of_filesize(OutputFile *of)
 {
-    return of_filesize(of) >= of->mux->limit_filesize;
+    return atomic_load(&of->mux->last_filesize);
 }
 
-int64_t of_filesize(OutputFile *of)
+AVChapter * const *
+of_get_chapters(OutputFile *of, unsigned int *nb_chapters)
 {
-    AVIOContext *pb = of->mux->fc->pb;
-    int64_t ret = -1;
+    *nb_chapters = of->mux->fc->nb_chapters;
+    return of->mux->fc->chapters;
+}
 
-    if (of->mux->final_filesize)
-        ret = of->mux->final_filesize;
-    else if (pb) {
-        ret = avio_size(pb);
-        if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too
-            ret = avio_tell(pb);
+static void pkt_free(void *pkt)
+{
+    av_packet_free((AVPacket**)&pkt);
+}
+
+int of_thread_start(OutputFile *of)
+{
+    Muxer *mux = of->mux;
+    int ret;
+
+    mux->tq = tq_alloc(mux->fc->nb_streams, 8, sizeof(AVPacket*),
+                       pkt_free);
+    if (!mux->tq)
+        return AVERROR(ENOMEM);
+
+    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of);
+    if (ret) {
+        tq_free(&mux->tq);
+        return AVERROR(ret);
     }
 
-    return ret;
+    return 0;
 }
 
-AVChapter * const *
-of_get_chapters(OutputFile *of, unsigned int *nb_chapters)
+void of_thread_stop(OutputFile *of)
 {
-    *nb_chapters = of->mux->fc->nb_chapters;
-    return of->mux->fc->chapters;
+    Muxer *mux = of->mux;
+
+    if (!mux || !mux->tq)
+        return;
+
+    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
+        tq_send_finish(mux->tq, i);
+
+    pthread_join(mux->thread, NULL);
+
+    tq_free(&mux->tq);
 }
-- 
2.34.1

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".