Just sent an updated patch here: http://ffmpeg.org/pipermail/ffmpeg-devel/2021-December/289684.html
> On Dec 13, 2021, at 3:12 PM, Marvin Scholz <epira...@gmail.com> wrote:
>
>
>
> On 13 Dec 2021, at 21:29, Romain Beauxis wrote:
>
>>> On Dec 13, 2021, at 12:56 PM, Marvin Scholz <epira...@gmail.com> wrote:
>>>
>>>
>>>
>>> On 13 Dec 2021, at 17:39, Romain Beauxis wrote:
>>>
>>>> This is the second patch of a series of 3 that clean up and enhance
>>>> the avfoundation implementation for libavdevice.
>>>>
>>>> This patch fixes the concurrency model. AVFoundation runs its own
>>>> producing thread to send produced frames, and ffmpeg runs its own
>>>> thread to consume them.
>>>>
>>>> The existing implementation stores the last transmitted frame and
>>>> uses a mutex to avoid concurrent access. However, this leads to
>>>> situations where upcoming frames can be dropped if the ffmpeg thread
>>>> is accessing the latest frame. This happens even when the thread
>>>> would otherwise catch up and process frames fast enough.
>>>>
>>>> This patch changes the implementation to use a buffer queue with a
>>>> maximum queue length and encapsulated thread-safety. This greatly
>>>> simplifies the logic of the calling code and gives the consuming
>>>> thread a chance to process all frames concurrently with the
>>>> producing thread, while avoiding memory leaks.
>>>
>>> Couldn't this just use CMSimpleQueue
>>> https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
>>> or CMBufferQueue?
>>
>> I'm happy to switch to this one, which seems more directly related to
>> the task at hand, if you think it is a better primitive.
>>
>
> I did not check in detail, but if either of the existing implementations
> referred to above does the task you need here, I would prefer that it be
> used instead of writing your own implementation.
>
>>> The implementation of the queue in this patch does not seem right, see
>>> review below.
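For reference, here is a rough sketch of what the CMSimpleQueue-based
approach could look like (untested; the helper names and the capacity are
illustrative only, and this assumes CMSimpleQueue's documented safety for
one enqueueing thread plus one dequeueing thread). CMSimpleQueue stores bare
pointers and does not retain them, so the producer retains each sample
buffer before enqueueing and the consumer releases it after dequeueing:

    #include <CoreMedia/CMSimpleQueue.h>

    /* Fixed-capacity FIFO; 10 mirrors MAX_QUEUED_OBJECTS from the patch. */
    static CMSimpleQueueRef create_frame_queue(void)
    {
        CMSimpleQueueRef queue = NULL;
        if (CMSimpleQueueCreate(kCFAllocatorDefault, 10, &queue) != noErr)
            return NULL;
        return queue;
    }

    /* Producer side, called from the AVFoundation capture callback. */
    static void enqueue_frame(CMSimpleQueueRef queue, CMSampleBufferRef frame)
    {
        if (CMSimpleQueueEnqueue(queue, CFRetain(frame)) != noErr) {
            /* Queue full: drop the frame rather than block the capture thread. */
            CFRelease(frame);
        }
    }

    /* Consumer side, called from avf_read_packet; returns NULL when empty.
     * The caller owns the returned buffer and must CFRelease() it. */
    static CMSampleBufferRef dequeue_frame(CMSimpleQueueRef queue)
    {
        return (CMSampleBufferRef)CMSimpleQueueDequeue(queue);
    }

Note that, unlike the enqueue in the patch (which evicts the oldest frames
when full), CMSimpleQueueEnqueue simply fails on a full queue, so this
sketch drops the incoming frame instead.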
>>>
>>>>
>>>> Signed-off-by: Romain Beauxis <to...@rastageeks.org>
>>>> ---
>>>>  libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
>>>>  1 file changed, 127 insertions(+), 93 deletions(-)
>>>>
>>>> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
>>>> index 79c9207cfa..95414fd16a 100644
>>>> --- a/libavdevice/avfoundation.m
>>>> +++ b/libavdevice/avfoundation.m
>>>> @@ -26,7 +26,6 @@
>>>>   */
>>>>
>>>>  #import <AVFoundation/AVFoundation.h>
>>>> -#include <pthread.h>
>>>>
>>>>  #include "libavutil/channel_layout.h"
>>>>  #include "libavutil/pixdesc.h"
>>>> @@ -80,13 +79,97 @@
>>>>      { AV_PIX_FMT_NONE, 0 }
>>>>  };
>>>>
>>>> +#define MAX_QUEUED_OBJECTS 10
>>>> +
>>>> +@interface AvdeviceAvfoundationBuffer : NSObject
>>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
>>>> +- (CMSampleBufferRef) getCMSampleBuffer;
>>>> +@end
>>>> +
>>>> +@implementation AvdeviceAvfoundationBuffer {
>>>> +    CMSampleBufferRef sampleBuffer;
>>>> +}
>>>> +
>>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
>>>> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
>>>> +}
>>>> +
>>>> +- (id) init:(CMSampleBufferRef)buffer {
>>>> +    sampleBuffer = buffer;
>>>> +    return self;
>>>> +}
>>>> +
>>>> +- (CMSampleBufferRef) getCMSampleBuffer {
>>>> +    return sampleBuffer;
>>>> +}
>>>> +@end
>>>> +
>>>> +@interface AvdeviceAvfoundationBufferQueue : NSObject
>>>> +- (CMSampleBufferRef) dequeue;
>>>> +- (NSUInteger) count;
>>>> +- (void) enqueue:(CMSampleBufferRef)obj;
>>>> +@end
>>>> +
>>>> +@implementation AvdeviceAvfoundationBufferQueue {
>>>> +    NSLock *mutex;
>>>> +    NSMutableArray *queue;
>>>> +}
>>>> +
>>>> +- (id) init {
>>>> +    mutex = [[[NSLock alloc] init] retain];
>>>> +    queue = [[[NSMutableArray alloc] init] retain];
>>>> +    return self;
>>>> +}
>>>> +
>>>> +- (oneway void) release {
>>>> +    NSEnumerator *enumerator = [queue objectEnumerator];
>>>> +    AvdeviceAvfoundationBuffer *buffer;
>>>> +
>>>> +    while (buffer = [enumerator nextObject]) {
>>>> +        CFRelease([buffer getCMSampleBuffer]);
>>>> +    }
>>>> +
>>>> +    [mutex release];
>>>> +    [queue release];
>>>> +}
>>>
>>> Shouldn't this be done in dealloc instead of release?
>>> Especially as retain is not subclassed, so this seems
>>> like it could lead to over-releasing resources.
>>
>> I'm fairly new to Objective-C's memory model, I'll double check those.
>>
>>>
>>>> +
>>>> +- (NSUInteger) count {
>>>> +    [mutex lock];
>>>> +    NSUInteger c = [queue count];
>>>> +    [mutex unlock];
>>>> +    return c;
>>>> +}
>>>
>>> This does not look right, the count can change after it is returned
>>> and the caller does not hold a lock to prevent this.
>>
>> For a generic queue, it is indeed. However, here it is used in a
>> monotonic fashion only:
>> * One thread only increases the frame count by pushing new frames
>> * One thread only decreases the frame count by pulling existing ones
>>
>
> Ok, but then this should be clarified in a comment at least, so that it
> is clear how it is intended to be used, to not break its assumptions and
> introduce bugs in the future when someone else is working on this.
>
>> The frame count is only used by the consuming thread, and only to check
>> whether frames are available; therefore the logic is always sound: the
>> frame count never decreases concurrently.
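To illustrate the dealloc point raised above: under manual retain/release,
NSObject's -release decrements the retain count and invokes -dealloc once
it reaches zero, so cleanup belongs in -dealloc and -release itself should
not be overridden. (As written, the override frees the queue's contents on
every release call and never lets NSObject deallocate the object.) A
minimal, untested sketch of the corrected cleanup:

    - (void) dealloc {
        /* Release the sample buffers still held by the queue. */
        for (AvdeviceAvfoundationBuffer *buffer in queue) {
            CFRelease([buffer getCMSampleBuffer]);
        }
        [queue release];
        [mutex release];
        [super dealloc]; /* required under manual retain/release */
    }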
>>
>> This being said, after writing that, it seems this could be simplified
>> into a boolean check that simply returns true when frames are available,
>> along with a code comment restating the above. I'll do that with the
>> next iteration of the patch set.
>>
>>>
>>>> +
>>>> +- (CMSampleBufferRef) dequeue {
>>>> +    [mutex lock];
>>>> +
>>>> +    if ([queue count] < 1) {
>>>> +        [mutex unlock];
>>>> +        return nil;
>>>> +    }
>>>> +
>>>> +    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
>>>> +    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
>>>> +    [queue removeObjectAtIndex:0];
>>>> +    [mutex unlock];
>>>> +
>>>> +    return sampleBuffer;
>>>> +}
>>>> +
>>>> +- (void) enqueue:(CMSampleBufferRef)buffer {
>>>> +    [mutex lock];
>>>> +    while (MAX_QUEUED_OBJECTS < [queue count]) {
>>>> +        [queue removeObjectAtIndex:0];
>>>> +    }
>>>> +    [queue addObject:[AvdeviceAvfoundationBuffer fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
>>>> +    [mutex unlock];
>>>> +}
>>>> +@end
>>>> +
>>>>  typedef struct
>>>>  {
>>>>      AVClass*        class;
>>>>
>>>> -    int             frames_captured;
>>>> -    int             audio_frames_captured;
>>>> -    pthread_mutex_t frame_lock;
>>>>      id              avf_delegate;
>>>>      id              avf_audio_delegate;
>>>>
>>>> @@ -121,8 +204,8 @@
>>>>      AVCaptureSession         *capture_session;
>>>>      AVCaptureVideoDataOutput *video_output;
>>>>      AVCaptureAudioDataOutput *audio_output;
>>>> -    CMSampleBufferRef         current_frame;
>>>> -    CMSampleBufferRef         current_audio_frame;
>>>> +    AvdeviceAvfoundationBufferQueue *audio_frames;
>>>> +    AvdeviceAvfoundationBufferQueue *video_frames;
>>>>
>>>>      AVCaptureDevice          *observed_device;
>>>>  #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>>>> @@ -131,16 +214,6 @@
>>>>      int             observed_quit;
>>>>  } AVFContext;
>>>>
>>>> -static void lock_frames(AVFContext* ctx)
>>>> -{
>>>> -    pthread_mutex_lock(&ctx->frame_lock);
>>>> -}
>>>> -
>>>> -static void unlock_frames(AVFContext* ctx)
>>>> -{
>>>> -    pthread_mutex_unlock(&ctx->frame_lock);
>>>> -}
>>>> -
>>>>  /** FrameReciever class - delegate for AVCaptureSession
>>>>   */
>>>>  @interface AVFFrameReceiver : NSObject
>>>> @@ -218,17 +291,7 @@ - (void) captureOutput:(AVCaptureOutput *)captureOutput
>>>>    didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
>>>>           fromConnection:(AVCaptureConnection *)connection
>>>>  {
>>>> -    lock_frames(_context);
>>>> -
>>>> -    if (_context->current_frame != nil) {
>>>> -        CFRelease(_context->current_frame);
>>>> -    }
>>>> -
>>>> -    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
>>>> -
>>>> -    unlock_frames(_context);
>>>> -
>>>> -    ++_context->frames_captured;
>>>> +    [_context->video_frames enqueue:videoFrame];
>>>>  }
>>>>
>>>>  @end
>>>> @@ -262,17 +325,7 @@ - (void) captureOutput:(AVCaptureOutput *)captureOutput
>>>>    didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
>>>>           fromConnection:(AVCaptureConnection *)connection
>>>>  {
>>>> -    lock_frames(_context);
>>>> -
>>>> -    if (_context->current_audio_frame != nil) {
>>>> -        CFRelease(_context->current_audio_frame);
>>>> -    }
>>>> -
>>>> -    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
>>>> -
>>>> -    unlock_frames(_context);
>>>> -
>>>> -    ++_context->audio_frames_captured;
>>>> +    [_context->audio_frames enqueue:audioFrame];
>>>>  }
>>>>
>>>>  @end
>>>> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
>>>>      [ctx->capture_session release];
>>>>      [ctx->video_output    release];
>>>>      [ctx->audio_output    release];
>>>> +    [ctx->video_frames    release];
>>>> +    [ctx->audio_frames    release];
>>>>      [ctx->avf_delegate    release];
>>>>      [ctx->avf_audio_delegate release];
>>>>
>>>>      ctx->capture_session = NULL;
>>>>      ctx->video_output    = NULL;
>>>>      ctx->audio_output    = NULL;
>>>> +    ctx->video_frames    = NULL;
>>>> +    ctx->audio_frames    = NULL;
>>>>      ctx->avf_delegate    = NULL;
>>>>      ctx->avf_audio_delegate = NULL;
>>>>
>>>> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
>>>>          AudioConverterDispose(ctx->audio_converter);
>>>>          ctx->audio_converter = NULL;
>>>>      }
>>>> -
>>>> -    pthread_mutex_destroy(&ctx->frame_lock);
>>>> -
>>>> -    if (ctx->current_frame) {
>>>> -        CFRelease(ctx->current_frame);
>>>> -    }
>>>>  }
>>>>
>>>>  static void parse_device_name(AVFormatContext *s)
>>>> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
>>>>      }
>>>>
>>>>      // Take stream info from the first frame.
>>>> -    while (ctx->frames_captured < 1) {
>>>> +    while ([ctx->video_frames count] < 1) {
>>>>          CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>>      }
>>>>
>>>> -    lock_frames(ctx);
>>>> +    CMSampleBufferRef frame = [ctx->video_frames dequeue];
>>>>
>>>>      ctx->video_stream_index = stream->index;
>>>>
>>>>      avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>>
>>>> -    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>>> -    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>>> +    image_buffer = CMSampleBufferGetImageBuffer(frame);
>>>> +    block_buffer = CMSampleBufferGetDataBuffer(frame);
>>>>
>>>>      if (image_buffer) {
>>>>          image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
>>>> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
>>>>          stream->codecpar->format     = ctx->pixel_format;
>>>>      }
>>>>
>>>> -    CFRelease(ctx->current_frame);
>>>> -    ctx->current_frame = nil;
>>>> -
>>>> -    unlock_frames(ctx);
>>>> +    CFRelease(frame);
>>>>
>>>>      return 0;
>>>>  }
>>>> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
>>>>      }
>>>>
>>>>      // Take stream info from the first frame.
>>>> -    while (ctx->audio_frames_captured < 1) {
>>>> +    while ([ctx->audio_frames count] < 1) {
>>>>          CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>>      }
>>>>
>>>> -    lock_frames(ctx);
>>>> +    CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>>>>
>>>>      ctx->audio_stream_index = stream->index;
>>>>
>>>>      avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>>
>>>> -    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
>>>> +    format_desc = CMSampleBufferGetFormatDescription(frame);
>>>>      const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>>>>
>>>>      if (!input_format) {
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(frame);
>>>>          av_log(s, AV_LOG_ERROR, "audio format not available\n");
>>>>          return 1;
>>>>      }
>>>>
>>>>      if (input_format->mFormatID != kAudioFormatLinearPCM) {
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(frame);
>>>>          av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
>>>>          return 1;
>>>>      }
>>>> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
>>>>      if (must_convert) {
>>>>          OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
>>>>          if (ret != noErr) {
>>>> -            unlock_frames(ctx);
>>>> +            CFRelease(frame);
>>>>              av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
>>>>              return 1;
>>>>          }
>>>>      }
>>>>
>>>> -    CFRelease(ctx->current_audio_frame);
>>>> -    ctx->current_audio_frame = nil;
>>>> -
>>>> -    unlock_frames(ctx);
>>>> +    CFRelease(frame);
>>>>
>>>>      return 0;
>>>>  }
>>>> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>>>>
>>>>      ctx->num_video_devices = [devices count] + [devices_muxed count];
>>>>
>>>> -    pthread_mutex_init(&ctx->frame_lock, NULL);
>>>> -
>>>>  #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>>>>      CGGetActiveDisplayList(0, NULL, &num_screens);
>>>>  #endif
>>>> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>>>>
>>>>      // Initialize capture session
>>>>      ctx->capture_session = [[AVCaptureSession alloc] init];
>>>> +    ctx->video_frames = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>> +    ctx->audio_frames = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>>
>>>>      if (video_device && add_video_device(s, video_device)) {
>>>>          goto fail;
>>>> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>      AVFContext* ctx = (AVFContext*)s->priv_data;
>>>>
>>>>      do {
>>>> -        CVImageBufferRef image_buffer;
>>>> -        CMBlockBufferRef block_buffer;
>>>> -        lock_frames(ctx);
>>>> -
>>>> -        if (ctx->current_frame != nil) {
>>>> +        if (1 <= [ctx->video_frames count]) {
>>>>              int status;
>>>>              int length = 0;
>>>> -
>>>> -            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>>> -            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>>> +            CMSampleBufferRef video_frame = [ctx->video_frames dequeue];
>>>> +            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
>>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);
>>>>
>>>>              if (image_buffer != nil) {
>>>>                  length = (int)CVPixelBufferGetDataSize(image_buffer);
>>>>              } else if (block_buffer != nil) {
>>>>                  length = (int)CMBlockBufferGetDataLength(block_buffer);
>>>>              } else {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(video_frame);
>>>>                  return AVERROR(EINVAL);
>>>>              }
>>>>
>>>>              if (av_new_packet(pkt, length) < 0) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(video_frame);
>>>>                  return AVERROR(EIO);
>>>>              }
>>>>
>>>>              CMItemCount count;
>>>>              CMSampleTimingInfo timing_info;
>>>>
>>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
>>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
>>>>                  AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>>                  pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>>              }
>>>> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>                  status = AVERROR(EIO);
>>>>              }
>>>>          }
>>>> -        CFRelease(ctx->current_frame);
>>>> -        ctx->current_frame = nil;
>>>> +        CFRelease(video_frame);
>>>>
>>>>          if (status < 0) {
>>>> -            unlock_frames(ctx);
>>>>              return status;
>>>>          }
>>>> -    } else if (ctx->current_audio_frame != nil) {
>>>> -        CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
>>>> +    } else if (1 <= [ctx->audio_frames count]) {
>>>> +        CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
>>>> +        CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);
>>>>
>>>>          size_t input_size = CMBlockBufferGetDataLength(block_buffer);
>>>>          int buffer_size = input_size / ctx->audio_buffers;
>>>> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>              UInt32 size = sizeof(output_size);
>>>>              ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
>>>>              if (ret != noErr) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>                  return AVERROR(EIO);
>>>>              }
>>>>
>>>>              if (av_new_packet(pkt, output_size) < 0) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>                  return AVERROR(EIO);
>>>>              }
>>>>
>>>> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>
>>>>              if (ret != kCMBlockBufferNoErr) {
>>>>                  av_free(input_buffer);
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>                  return AVERROR(EIO);
>>>>              }
>>>>          }
>>>> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>          av_free(input_buffer);
>>>>
>>>>          if (ret != noErr) {
>>>> -            unlock_frames(ctx);
>>>> +            CFRelease(audio_frame);
>>>>              return AVERROR(EIO);
>>>>          }
>>>>
>>>> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>          } else {
>>>>              ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
>>>>              if (ret != kCMBlockBufferNoErr) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>                  return AVERROR(EIO);
>>>>              }
>>>>          }
>>>> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>          CMItemCount count;
>>>>          CMSampleTimingInfo timing_info;
>>>>
>>>> -        if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
>>>> +        if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
>>>>              AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>>              pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>>          }
>>>> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>          pkt->stream_index = ctx->audio_stream_index;
>>>>          pkt->flags        |= AV_PKT_FLAG_KEY;
>>>>
>>>> -        CFRelease(ctx->current_audio_frame);
>>>> -        ctx->current_audio_frame = nil;
>>>> -
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(audio_frame);
>>>>      } else {
>>>>          pkt->data = NULL;
>>>> -        unlock_frames(ctx);
>>>>          if (ctx->observed_quit) {
>>>>              return AVERROR_EOF;
>>>>          } else {
>>>>              return AVERROR(EAGAIN);
>>>>          }
>>>>      }
>>>> -
>>>> -    unlock_frames(ctx);
>>>>      } while (!pkt->data);
>>>>
>>>>      return 0;
>>>> --
>>>> 2.30.1 (Apple Git-130)
>>>>

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-requ...@ffmpeg.org with subject "unsubscribe".