Signed-off-by: Stephan Holljes <klaxa1...@googlemail.com> --- ffserver.c | 248 ++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 172 insertions(+), 76 deletions(-)
diff --git a/ffserver.c b/ffserver.c index b80a7f8..1363cdc 100644 --- a/ffserver.c +++ b/ffserver.c @@ -38,6 +38,7 @@ #include "segment.h" #include "publisher.h" #include "httpd.h" +#include "configreader.h" #define BUFFER_SECS 30 #define LISTEN_TIMEOUT_MSEC 1000 @@ -54,9 +55,11 @@ struct WriteInfo { }; struct AcceptInfo { - struct PublisherContext *pub; + struct PublisherContext **pubs; struct HTTPDInterface *httpd; - AVFormatContext *ifmt_ctx; + AVFormatContext **ifmt_ctxs; + struct HTTPDConfig *config; + int nb_pub; /* number of publishers (streams) equal to number of ifmt_ctx */ }; @@ -286,52 +289,77 @@ void *accept_thread(void *arg) { struct AcceptInfo *info = (struct AcceptInfo*) arg; struct FFServerInfo *ffinfo = NULL; + struct PublisherContext *pub; char status[4096]; + char *stream_name; struct HTTPClient *client = NULL; void *server = NULL; AVIOContext *client_ctx = NULL; AVFormatContext *ofmt_ctx = NULL; + AVFormatContext *ifmt_ctx; unsigned char *avio_buffer; AVOutputFormat *ofmt; AVDictionary *mkvopts = NULL; AVStream *in_stream, *out_stream; int ret, i, reply_code; - struct HTTPDConfig config = { - .bind_address = "0", - .port = 8080, - .accept_timeout = LISTEN_TIMEOUT_MSEC, - }; - - info->httpd->init(&server, config); - - + int shutdown; + struct HTTPDConfig *config = info->config; + + info->httpd->init(&server, *config); + for (;;) { - if (info->pub->shutdown) + shutdown = 1; + for (i = 0; i < config->nb_streams; i++) { + if (info->pubs[i] && !info->pubs[i]->shutdown) + shutdown = 0; + } + if (shutdown) break; - publisher_gen_status_json(info->pub, status); - av_log(server, AV_LOG_INFO, status); + for (i = 0; i < config->nb_streams; i++) { + publisher_gen_status_json(info->pubs[i], status); + av_log(server, AV_LOG_INFO, status); + } client = NULL; av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n"); reply_code = 200; - if (publisher_reserve_client(info->pub)) { - av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); - reply_code = 503; - } - + if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) { if (ret == HTTPD_LISTEN_TIMEOUT) { - publisher_cancel_reserve(info->pub); continue; } else if (ret == HTTPD_CLIENT_ERROR) { info->httpd->close(server, client); } av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n"); - publisher_cancel_reserve(info->pub); continue; } - + + pub = NULL; + ifmt_ctx = NULL; + for (i = 0; i < config->nb_streams; i++) { + stream_name = info->pubs[i]->stream_name; + // skip leading '/' ---v + if(!strncmp(client->resource + 1, stream_name, strlen(stream_name))) { + pub = info->pubs[i]; + ifmt_ctx = info->ifmt_ctxs[i]; + break; + } + } + + if (!pub || !ifmt_ctx) { + av_log(client_ctx, AV_LOG_WARNING, "No suitable publisher found for resource: %s.\n", + client->resource ? client->resource : "(null)"); + reply_code = 404; + } + + + if (pub && ifmt_ctx && publisher_reserve_client(pub)) { + av_log(client_ctx, AV_LOG_WARNING, "No more client slots free, Returning 503.\n"); + reply_code = 503; + } + if (reply_code != 200) { - publisher_cancel_reserve(info->pub); + if (pub && ifmt_ctx) + publisher_cancel_reserve(pub); info->httpd->close(server, client); continue; } @@ -344,7 +372,7 @@ void *accept_thread(void *arg) client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL); if (!client_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); av_free(client_ctx->buffer); avio_context_free(&client_ctx); @@ -354,7 +382,7 @@ void *accept_thread(void *arg) avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL); if (!ofmt_ctx) { av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -364,7 +392,7 @@ void *accept_thread(void *arg) } if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) { av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -376,13 +404,13 @@ void *accept_thread(void *arg) ofmt = ofmt_ctx->oformat; ofmt->flags |= AVFMT_NOFILE | AVFMT_FLAG_AUTO_BSF; - for (i = 0; i < info->ifmt_ctx->nb_streams; i++) { - in_stream = info->ifmt_ctx->streams[i]; + for (i = 0; i < ifmt_ctx->nb_streams; i++) { + in_stream = ifmt_ctx->streams[i]; out_stream = avformat_new_stream(ofmt_ctx, NULL); if (!out_stream) { av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n"); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -394,7 +422,7 @@ void *accept_thread(void *arg) ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -411,12 +439,12 @@ void *accept_thread(void *arg) } av_dict_copy(&out_stream->metadata, in_stream->metadata, 0); } - av_dict_copy(&info->ifmt_ctx->metadata, ofmt_ctx->metadata, 0); + av_dict_copy(&ifmt_ctx->metadata, ofmt_ctx->metadata, 0); ofmt_ctx->pb = client_ctx; ret = avformat_write_header(ofmt_ctx, &mkvopts); if (ret < 0) { av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret)); - publisher_cancel_reserve(info->pub); + publisher_cancel_reserve(pub); info->httpd->close(server, client); avformat_free_context(ofmt_ctx); av_free(client_ctx->buffer); @@ -424,7 +452,7 @@ void *accept_thread(void *arg) av_free(ffinfo); continue; } - publisher_add_client(info->pub, ofmt_ctx, ffinfo); + publisher_add_client(pub, ofmt_ctx, ffinfo); ofmt_ctx = NULL; } @@ -466,59 +494,127 @@ void *write_thread(void *arg) return NULL; } - -int main(int argc, char *argv[]) -{ - struct ReadInfo rinfo; +void *run_server(void *arg) { struct AcceptInfo ainfo; - struct WriteInfo *winfos; - struct PublisherContext *pub; - int ret, i; - pthread_t r_thread, a_thread; - pthread_t *w_threads; + struct ReadInfo *rinfos; + struct WriteInfo **winfos_p; + struct HTTPDConfig *config = (struct HTTPDConfig*) arg; + struct PublisherContext **pubs; + AVFormatContext **ifmt_ctxs; + int ret, i, stream_index; + pthread_t *r_threads; + pthread_t **w_threads_p; - AVFormatContext *ifmt_ctx = NULL; - - rinfo.in_filename = "pipe:0"; - if (argc > 1) - rinfo.in_filename = argv[1]; + pubs = av_mallocz(config->nb_streams * sizeof(struct PublisherContext*)); + ifmt_ctxs = av_mallocz(config->nb_streams * sizeof(AVFormatContext*)); av_log_set_level(AV_LOG_INFO); - if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) { - av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n"); - return 1; - } - - publisher_init(&pub); - - rinfo.ifmt_ctx = ifmt_ctx; - rinfo.pub = pub; - ainfo.ifmt_ctx = ifmt_ctx; - ainfo.pub = pub; + ainfo.pubs = pubs; + ainfo.ifmt_ctxs = ifmt_ctxs; + ainfo.nb_pub = config->nb_streams; ainfo.httpd = &lavfhttpd; + ainfo.config = config; - w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads); - winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + rinfos = av_mallocz(config->nb_streams * sizeof(struct ReadInfo)); + winfos_p = av_mallocz(config->nb_streams * sizeof(struct WriteInfo*)); + r_threads = av_mallocz(config->nb_streams * sizeof(pthread_t)); + w_threads_p = av_mallocz(config->nb_streams * sizeof(pthread_t*)); - for (i = 0; i < pub->nb_threads; i++) { - winfos[i].pub = pub; - winfos[i].thread_id = i; - pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + struct PublisherContext *pub = NULL; + struct AVFormatContext *ifmt_ctx = NULL; + struct ReadInfo rinfo; + struct WriteInfo *winfos = NULL; + pthread_t *w_threads = NULL; + pthread_t r_thread; + rinfo.input_uri = config->streams[stream_index].input_uri; + + if ((ret = avformat_open_input(&ifmt_ctx, rinfo.input_uri, NULL, NULL))) { + av_log(NULL, AV_LOG_ERROR, "run_server: Could not open input\n"); + continue; + } + + ifmt_ctxs[stream_index] = ifmt_ctx; + + publisher_init(&pub, config->streams[stream_index].stream_name); + pubs[stream_index] = pub; + + rinfo.ifmt_ctx = ifmt_ctx; + rinfo.pub = pub; + + rinfos[stream_index] = rinfo; + + w_threads = av_malloc(sizeof(pthread_t) * pub->nb_threads); + winfos = av_malloc(sizeof(struct WriteInfo) * pub->nb_threads); + + w_threads_p[stream_index] = w_threads; + winfos_p[stream_index] = winfos; + + for (i = 0; i < pub->nb_threads; i++) { + winfos[i].pub = pub; + winfos[i].thread_id = i; + pthread_create(&w_threads[i], NULL, write_thread, &winfos_p[stream_index][i]); + } + w_threads_p[stream_index] = w_threads; + pthread_create(&r_thread, NULL, read_thread, &rinfos[stream_index]); + r_threads[stream_index] = r_thread; } - - pthread_create(&r_thread, NULL, read_thread, &rinfo); - + + + //pthread_create(&a_thread, NULL, accept_thread, &ainfo); accept_thread(&ainfo); - - pthread_join(r_thread, NULL); - - for (i = 0; i < pub->nb_threads; i++) { - pthread_join(w_threads[i], NULL); + for (stream_index = 0; stream_index < config->nb_streams; stream_index++) { + pthread_join(r_threads[stream_index], NULL); + if (pubs[stream_index]) { + for (i = 0; i < pubs[stream_index]->nb_threads; i++) { + pthread_join(w_threads_p[stream_index][i], NULL); + } + } + av_free(winfos_p[stream_index]); + av_free(w_threads_p[stream_index]); + // pubs[stream_index] could be null if the file could not be opened + if (pubs[stream_index]) + publisher_free(pubs[stream_index]); } - av_free(w_threads); - av_free(winfos); - - publisher_freep(&pub); + av_free(rinfos); + av_free(winfos_p); + av_free(r_threads); + av_free(w_threads_p); + av_free(pubs); + av_free(ifmt_ctxs); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct HTTPDConfig *configs; + int nb_configs; + pthread_t *server_threads; + int i; + + if (argc < 2) { + printf("Usage: %s config.lua\n", argv[0]); + return 1; + } + + nb_configs = configs_read(&configs, argv[1]); + if (nb_configs <= 0) { + printf("No valid configurations parsed.\n"); + return 1; + } + server_threads = av_malloc(nb_configs * sizeof(pthread_t)); + for (i = 0; i < nb_configs; i++) { + config_dump(configs + i); + pthread_create(&server_threads[i], NULL, run_server, configs + i); + } + + for (i = 0; i < nb_configs; i++) { + pthread_join(server_threads[i], NULL); + config_free(configs + i); + } + av_free(configs); + av_free(server_threads); return 0; } -- 2.16.2 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org http://ffmpeg.org/mailman/listinfo/ffmpeg-devel