Just sent an updated patch here: 

> On Dec 13, 2021, at 3:12 PM, Marvin Scholz <epira...@gmail.com> wrote:
> On 13 Dec 2021, at 21:29, Romain Beauxis wrote:
>>> On Dec 13, 2021, at 12:56 PM, Marvin Scholz <epira...@gmail.com> wrote:
>>> On 13 Dec 2021, at 17:39, Romain Beauxis wrote:
>>>> This is the second patch of a series of 3 that clean up and enhance the
>>>> AVFoundation implementation for libavdevice.
>>>> This patch fixes the concurrency model. AVFoundation runs its own producing thread
>>>> to send produced frames and ffmpeg runs its own thread to consume them.
>>>> The existing implementation stores the last transmitted frame and uses a mutex
>>>> to avoid concurrent access. However, this leads to situations where upcoming frames
>>>> can be dropped if the ffmpeg thread is accessing the latest frame. This happens
>>>> even when the thread would otherwise catch up and process frames fast enough.
>>>> This patch changes this implementation to use a buffer queue with a max queue length
>>>> and encapsulated thread-safety. This greatly simplifies the logic of the calling code
>>>> and gives the consuming thread a chance to process all frames concurrently with the producing
>>>> thread while avoiding memory leaks.
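The queue described in the commit message keeps its length bounded, drops the oldest entry when full, and keeps its locking inside the queue. It can be sketched in plain C with pthreads. This is a minimal sketch, not the patch's Objective-C code. `frame_queue`, `QUEUE_CAPACITY` and the `release_fn` callback are hypothetical names. In libavdevice the callback could be a small wrapper around `CFRelease` for `CMSampleBufferRef` items. The sketch releases each evicted item, because an entry that was retained on enqueue would otherwise never be released.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical capacity, playing the role of MAX_QUEUED_OBJECTS. */
#define QUEUE_CAPACITY 10

/* Called on items the queue discards. For heap-allocated items this can be
 * free(); for CMSampleBufferRef items it would be a wrapper around CFRelease. */
typedef void (*release_fn)(void *item);

typedef struct {
    pthread_mutex_t lock;          /* guards every field below */
    void *items[QUEUE_CAPACITY];   /* ring buffer of owned, non-NULL items */
    size_t head;                   /* index of the oldest item */
    size_t count;                  /* number of queued items */
    release_fn release;
} frame_queue;

void frame_queue_init(frame_queue *q, release_fn release)
{
    assert(release != NULL);
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
    q->release = release;
}

/* Producer side: the queue takes ownership of item (must be non-NULL).
 * When the queue is full, the oldest item is evicted and released. */
void frame_queue_push(frame_queue *q, void *item)
{
    void *evicted = NULL;

    pthread_mutex_lock(&q->lock);
    if (q->count == QUEUE_CAPACITY) {
        evicted = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
    }
    q->items[(q->head + q->count) % QUEUE_CAPACITY] = item;
    q->count++;
    pthread_mutex_unlock(&q->lock);

    /* Release outside the lock so the mutex is not held during cleanup. */
    if (evicted)
        q->release(evicted);
}

/* Consumer side: returns the oldest item, whose ownership passes to the
 * caller, or NULL if the queue is empty. Check and removal are atomic. */
void *frame_queue_pop(frame_queue *q)
{
    void *item = NULL;

    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        item = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
    }
    pthread_mutex_unlock(&q->lock);
    return item;
}

/* Releases whatever is still queued, then destroys the lock. */
void frame_queue_destroy(frame_queue *q)
{
    void *item;

    while ((item = frame_queue_pop(q)) != NULL)
        q->release(item);
    pthread_mutex_destroy(&q->lock);
}
```

Running the release callback after unlocking keeps a potentially expensive release from extending the time the mutex is held. Returning NULL from `frame_queue_pop` lets the consumer test for an empty queue and take the item in a single locked step.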
>>> Couldn't this just use CMSimpleQueue 
>>> https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
>>> or CMBufferQueue?
>> I’m happy to switch to this one, which seems more directly related to the task at hand, if you think it is a better primitive.
> I did not check in detail, but if either of the existing implementations referred to above does the task you need here,
> I would prefer if it is used instead of writing your own implementation.
>>> The implementation of the queue in this patch does not seem right; see review below.
>>>> Signed-off-by: Romain Beauxis <to...@rastageeks.org>
>>>> ---
>>>> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
>>>> 1 file changed, 127 insertions(+), 93 deletions(-)
>>>> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
>>>> index 79c9207cfa..95414fd16a 100644
>>>> --- a/libavdevice/avfoundation.m
>>>> +++ b/libavdevice/avfoundation.m
>>>> @@ -26,7 +26,6 @@
>>>> */
>>>> #import <AVFoundation/AVFoundation.h>
>>>> -#include <pthread.h>
>>>> #include "libavutil/channel_layout.h"
>>>> #include "libavutil/pixdesc.h"
>>>> @@ -80,13 +79,97 @@
>>>>   { AV_PIX_FMT_NONE, 0 }
>>>> };
>>>> +#define MAX_QUEUED_OBJECTS 10
>>>> +
>>>> +@interface AvdeviceAvfoundationBuffer : NSObject
>>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
>>>> +- (CMSampleBufferRef) getCMSampleBuffer;
>>>> +@end
>>>> +
>>>> +@implementation AvdeviceAvfoundationBuffer {
>>>> +    CMSampleBufferRef sampleBuffer;
>>>> +}
>>>> +
>>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
>>>> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
>>>> +}
>>>> +
>>>> +- (id) init:(CMSampleBufferRef)buffer {
>>>> +    sampleBuffer = buffer;
>>>> +    return self;
>>>> +}
>>>> +
>>>> +- (CMSampleBufferRef) getCMSampleBuffer {
>>>> +    return sampleBuffer;
>>>> +}
>>>> +@end
>>>> +
>>>> +@interface AvdeviceAvfoundationBufferQueue : NSObject
>>>> +- (CMSampleBufferRef) dequeue;
>>>> +- (NSUInteger) count;
>>>> +- (void) enqueue:(CMSampleBufferRef)obj;
>>>> +@end
>>>> +
>>>> +@implementation AvdeviceAvfoundationBufferQueue {
>>>> +    NSLock *mutex;
>>>> +    NSMutableArray *queue;
>>>> +}
>>>> +
>>>> +- (id) init {
>>>> +    mutex = [[[NSLock alloc] init] retain];
>>>> +    queue = [[[NSMutableArray alloc] init] retain];
>>>> +    return self;
>>>> +}
>>>> +
>>>> +- (oneway void) release {
>>>> +    NSEnumerator *enumerator = [queue objectEnumerator];
>>>> +    AvdeviceAvfoundationBuffer *buffer;
>>>> +
>>>> +    while (buffer = [enumerator nextObject]) {
>>>> +        CFRelease([buffer getCMSampleBuffer]);
>>>> +    }
>>>> +
>>>> +    [mutex release];
>>>> +    [queue release];
>>>> +}
>>> Shouldn't this be done in dealloc instead of release?
>>> Especially as retain is not subclassed, so this seems
>>> like it could lead to over-releasing resources.
>> I’m fairly new to Objective-C’s memory model; I’ll double-check those.
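Under manual reference counting, `release` only decrements the retain count. The runtime calls `dealloc` once the count reaches zero, so per-object cleanup belongs in `- (void)dealloc`, which ends with `[super dealloc]`. Overriding `release` as the patch does runs the cleanup on every call, and because it never calls `[super release]`, the object itself is never freed.

The C sketch below models that split between release and dealloc. `object`, `object_new`, `object_retain`, `object_release` and `object_dealloc` are hypothetical names chosen for illustration, not Objective-C runtime APIs.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical C model of manual retain/release. */
typedef struct {
    int refcount;
    int *resource; /* stands in for what the queue owns (lock, array) */
} object;

/* Like +alloc/-init: the caller owns one reference. */
object *object_new(void)
{
    object *o = malloc(sizeof *o);
    if (!o)
        return NULL;
    o->resource = malloc(sizeof *o->resource);
    if (!o->resource) {
        free(o);
        return NULL;
    }
    o->refcount = 1;
    return o;
}

/* Like -retain: one more owner. */
object *object_retain(object *o)
{
    o->refcount++;
    return o;
}

/* Like -dealloc: frees owned resources, then the object itself.
 * Only object_release calls this, once the last reference is gone. */
static void object_dealloc(object *o)
{
    free(o->resource);
    free(o);
}

/* Like -release: drops one reference; cleanup happens only at zero.
 * Returns the remaining count (0 means the object has been freed). */
int object_release(object *o)
{
    assert(o->refcount > 0);
    int remaining = --o->refcount;
    if (remaining == 0)
        object_dealloc(o);
    return remaining;
}
```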
>>>> +
>>>> +- (NSUInteger) count {
>>>> +    [mutex lock];
>>>> +    NSUInteger c = [queue count];
>>>> +    [mutex unlock];
>>>> +    return c;
>>>> +}
>>> This does not look right, the count can change after it is returned
>>> and the caller does not hold a lock to prevent this.
>> For a generic queue it is indeed. However, here, it is used in a monotonic fashion only:
>> * One thread only increases the frame count by pushing new ones
>> * One thread only decreases the frame count by pulling existing ones
> OK, but then this should at least be clarified in a comment, so that it is clear how it is
> intended to be used and nobody breaks its assumptions and introduces bugs in the future when
> working on this.
>> Frame count is only used by the consuming thread and only to check if there are available frames;
>> therefore, the logic is sound as long as the frame count never decreases concurrently.
>> This being said, after writing that, it seems that this could be simplified into a Boolean check
>> that simply returns true when frames are available, along with a code comment restating the above.
>> I’ll do that in the next iteration of the patch set.
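One way to avoid relying on the single-consumer argument is to fold the emptiness check into the dequeue. The patch's `dequeue` already does this by returning `nil` on an empty queue, so the consumer can call it and test the result instead of calling `count` first. In the patch, `enqueue` also removes entries when the queue is over capacity, so the consumer is not strictly the only thread that shrinks the queue. The producer never shrinks it below `MAX_QUEUED_OBJECTS`, however.

The C sketch below illustrates this try-pop pattern with one producer thread and one consumer thread. `int_queue`, `int_queue_try_push`, `int_queue_try_pop`, `RING_SIZE`, `N_ITEMS` and `run_demo` are hypothetical. Unlike the patch, the demo producer retries when the queue is full instead of evicting the oldest entry, so that every item can be checked on arrival.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 16  /* hypothetical capacity */
#define N_ITEMS 1000  /* items sent by the demo producer */

typedef struct {
    pthread_mutex_t lock;
    int items[RING_SIZE];
    size_t head, count;
} int_queue;

/* Returns false when full; the caller decides whether to retry or drop. */
bool int_queue_try_push(int_queue *q, int v)
{
    bool ok = false;
    pthread_mutex_lock(&q->lock);
    if (q->count < RING_SIZE) {
        q->items[(q->head + q->count) % RING_SIZE] = v;
        q->count++;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Emptiness check and removal happen under one lock, so the caller never
 * needs a separate count that could be stale by the time it is used. */
bool int_queue_try_pop(int_queue *q, int *out)
{
    bool ok = false;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % RING_SIZE;
        q->count--;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

static void *producer(void *arg)
{
    int_queue *q = arg;
    for (int i = 0; i < N_ITEMS; i++)
        while (!int_queue_try_push(q, i))
            sched_yield(); /* full: let the consumer catch up */
    return NULL;
}

/* Consumer loop; returns how many items arrived, checking their order. */
int run_demo(void)
{
    int_queue q = { .head = 0, .count = 0 };
    pthread_t t;
    int expected = 0;

    pthread_mutex_init(&q.lock, NULL);
    pthread_create(&t, NULL, producer, &q);
    while (expected < N_ITEMS) {
        int v;
        if (int_queue_try_pop(&q, &v)) {
            assert(v == expected);
            expected++;
        } else {
            sched_yield(); /* empty: avf_read_packet returns EAGAIN here */
        }
    }
    pthread_join(t, NULL);
    pthread_mutex_destroy(&q.lock);
    return expected;
}
```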
>>>> +
>>>> +- (CMSampleBufferRef) dequeue {
>>>> +    [mutex lock];
>>>> +
>>>> +    if ([queue count] < 1) {
>>>> +      [mutex unlock];
>>>> +      return nil;
>>>> +    }
>>>> +
>>>> +    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
>>>> +    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
>>>> +    [queue removeObjectAtIndex:0];
>>>> +    [mutex unlock];
>>>> +
>>>> +    return sampleBuffer;
>>>> +}
>>>> +
>>>> +- (void) enqueue:(CMSampleBufferRef)buffer {
>>>> +    [mutex lock];
>>>> +    while (MAX_QUEUED_OBJECTS < [queue count]) {
>>>> +      [queue removeObjectAtIndex:0];
>>>> +    }
>>>> +    [queue addObject:[AvdeviceAvfoundationBuffer fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
>>>> +    [mutex unlock];
>>>> +}
>>>> +@end
>>>> +
>>>> typedef struct
>>>> {
>>>>   AVClass*        class;
>>>> -    int             frames_captured;
>>>> -    int             audio_frames_captured;
>>>> -    pthread_mutex_t frame_lock;
>>>>   id              avf_delegate;
>>>>   id              avf_audio_delegate;
>>>> @@ -121,8 +204,8 @@
>>>>   AVCaptureSession         *capture_session;
>>>>   AVCaptureVideoDataOutput *video_output;
>>>>   AVCaptureAudioDataOutput *audio_output;
>>>> -    CMSampleBufferRef         current_frame;
>>>> -    CMSampleBufferRef         current_audio_frame;
>>>> +    AvdeviceAvfoundationBufferQueue *audio_frames;
>>>> +    AvdeviceAvfoundationBufferQueue *video_frames;
>>>>   AVCaptureDevice          *observed_device;
>>>> @@ -131,16 +214,6 @@
>>>>   int                      observed_quit;
>>>> } AVFContext;
>>>> -static void lock_frames(AVFContext* ctx)
>>>> -{
>>>> -    pthread_mutex_lock(&ctx->frame_lock);
>>>> -}
>>>> -
>>>> -static void unlock_frames(AVFContext* ctx)
>>>> -{
>>>> -    pthread_mutex_unlock(&ctx->frame_lock);
>>>> -}
>>>> -
>>>> /** FrameReciever class - delegate for AVCaptureSession
>>>> */
>>>> @interface AVFFrameReceiver : NSObject
>>>> @@ -218,17 +291,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>>>> didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
>>>>        fromConnection:(AVCaptureConnection *)connection
>>>> {
>>>> -    lock_frames(_context);
>>>> -
>>>> -    if (_context->current_frame != nil) {
>>>> -        CFRelease(_context->current_frame);
>>>> -    }
>>>> -
>>>> -    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
>>>> -
>>>> -    unlock_frames(_context);
>>>> -
>>>> -    ++_context->frames_captured;
>>>> +    [_context->video_frames enqueue:videoFrame];
>>>> }
>>>> @end
>>>> @@ -262,17 +325,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>>>> didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
>>>>        fromConnection:(AVCaptureConnection *)connection
>>>> {
>>>> -    lock_frames(_context);
>>>> -
>>>> -    if (_context->current_audio_frame != nil) {
>>>> -        CFRelease(_context->current_audio_frame);
>>>> -    }
>>>> -
>>>> -    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
>>>> -
>>>> -    unlock_frames(_context);
>>>> -
>>>> -    ++_context->audio_frames_captured;
>>>> +    [_context->audio_frames enqueue:audioFrame];
>>>> }
>>>> @end
>>>> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
>>>>   [ctx->capture_session release];
>>>>   [ctx->video_output    release];
>>>>   [ctx->audio_output    release];
>>>> +    [ctx->video_frames    release];
>>>> +    [ctx->audio_frames    release];
>>>>   [ctx->avf_delegate    release];
>>>>   [ctx->avf_audio_delegate release];
>>>>   ctx->capture_session = NULL;
>>>>   ctx->video_output    = NULL;
>>>>   ctx->audio_output    = NULL;
>>>> +    ctx->video_frames    = NULL;
>>>> +    ctx->audio_frames    = NULL;
>>>>   ctx->avf_delegate    = NULL;
>>>>   ctx->avf_audio_delegate = NULL;
>>>> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
>>>>     AudioConverterDispose(ctx->audio_converter);
>>>>     ctx->audio_converter = NULL;
>>>>   }
>>>> -
>>>> -    pthread_mutex_destroy(&ctx->frame_lock);
>>>> -
>>>> -    if (ctx->current_frame) {
>>>> -        CFRelease(ctx->current_frame);
>>>> -    }
>>>> }
>>>> static void parse_device_name(AVFormatContext *s)
>>>> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
>>>>   }
>>>>   // Take stream info from the first frame.
>>>> -    while (ctx->frames_captured < 1) {
>>>> +    while ([ctx->video_frames count] < 1) {
>>>>       CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>>   }
>>>> -    lock_frames(ctx);
>>>> +    CMSampleBufferRef frame = [ctx->video_frames dequeue];
>>>>   ctx->video_stream_index = stream->index;
>>>>   avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>> -    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>>> -    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>>> +    image_buffer = CMSampleBufferGetImageBuffer(frame);
>>>> +    block_buffer = CMSampleBufferGetDataBuffer(frame);
>>>>   if (image_buffer) {
>>>>       image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
>>>> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
>>>>       stream->codecpar->format     = ctx->pixel_format;
>>>>   }
>>>> -    CFRelease(ctx->current_frame);
>>>> -    ctx->current_frame = nil;
>>>> -
>>>> -    unlock_frames(ctx);
>>>> +    CFRelease(frame);
>>>>   return 0;
>>>> }
>>>> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
>>>>   }
>>>>   // Take stream info from the first frame.
>>>> -    while (ctx->audio_frames_captured < 1) {
>>>> +    while ([ctx->audio_frames count] < 1) {
>>>>       CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>>   }
>>>> -    lock_frames(ctx);
>>>> +    CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>>>>   ctx->audio_stream_index = stream->index;
>>>>   avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>> -    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
>>>> +    format_desc = CMSampleBufferGetFormatDescription(frame);
>>>>   const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>>>>   if (!input_format) {
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(frame);
>>>>       av_log(s, AV_LOG_ERROR, "audio format not available\n");
>>>>       return 1;
>>>>   }
>>>>   if (input_format->mFormatID != kAudioFormatLinearPCM) {
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(frame);
>>>>       av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
>>>>       return 1;
>>>>   }
>>>> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
>>>>   if (must_convert) {
>>>>       OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
>>>>       if (ret != noErr) {
>>>> -            unlock_frames(ctx);
>>>> +            CFRelease(frame);
>>>>           av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
>>>>           return 1;
>>>>       }
>>>>   }
>>>> -    CFRelease(ctx->current_audio_frame);
>>>> -    ctx->current_audio_frame = nil;
>>>> -
>>>> -    unlock_frames(ctx);
>>>> +    CFRelease(frame);
>>>>   return 0;
>>>> }
>>>> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>>>>   ctx->num_video_devices = [devices count] + [devices_muxed count];
>>>> -    pthread_mutex_init(&ctx->frame_lock, NULL);
>>>> -
>>>>   CGGetActiveDisplayList(0, NULL, &num_screens);
>>>> #endif
>>>> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>>>>   // Initialize capture session
>>>>   ctx->capture_session = [[AVCaptureSession alloc] init];
>>>> +    ctx->video_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>> +    ctx->audio_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>>   if (video_device && add_video_device(s, video_device)) {
>>>>       goto fail;
>>>> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>   AVFContext* ctx = (AVFContext*)s->priv_data;
>>>>   do {
>>>> -        CVImageBufferRef image_buffer;
>>>> -        CMBlockBufferRef block_buffer;
>>>> -        lock_frames(ctx);
>>>> -
>>>> -        if (ctx->current_frame != nil) {
>>>> +        if (1 <= [ctx->video_frames count]) {
>>>>           int status;
>>>>           int length = 0;
>>>> -
>>>> -            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>>> -            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>>> +            CMSampleBufferRef video_frame = [ctx->video_frames dequeue];
>>>> +            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
>>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);
>>>>           if (image_buffer != nil) {
>>>>               length = (int)CVPixelBufferGetDataSize(image_buffer);
>>>>           } else if (block_buffer != nil) {
>>>>               length = (int)CMBlockBufferGetDataLength(block_buffer);
>>>>           } else  {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(video_frame);
>>>>               return AVERROR(EINVAL);
>>>>           }
>>>>           if (av_new_packet(pkt, length) < 0) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(video_frame);
>>>>               return AVERROR(EIO);
>>>>           }
>>>>           CMItemCount count;
>>>>           CMSampleTimingInfo timing_info;
>>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
>>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
>>>>               AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>>               pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>>           }
>>>> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>                   status = AVERROR(EIO);
>>>>               }
>>>>            }
>>>> -            CFRelease(ctx->current_frame);
>>>> -            ctx->current_frame = nil;
>>>> +            CFRelease(video_frame);
>>>>           if (status < 0) {
>>>> -                unlock_frames(ctx);
>>>>               return status;
>>>>           }
>>>> -        } else if (ctx->current_audio_frame != nil) {
>>>> -            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
>>>> +        } else if (1 <= [ctx->audio_frames count]) {
>>>> +            CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
>>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);
>>>>           size_t input_size = CMBlockBufferGetDataLength(block_buffer);
>>>>           int buffer_size = input_size / ctx->audio_buffers;
>>>> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           UInt32 size = sizeof(output_size);
>>>>           ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
>>>>           if (ret != noErr) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>               return AVERROR(EIO);
>>>>           }
>>>>           if (av_new_packet(pkt, output_size) < 0) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>               return AVERROR(EIO);
>>>>           }
>>>> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>                   if (ret != kCMBlockBufferNoErr) {
>>>>                       av_free(input_buffer);
>>>> -                        unlock_frames(ctx);
>>>> +                        CFRelease(audio_frame);
>>>>                       return AVERROR(EIO);
>>>>                   }
>>>>               }
>>>> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>               av_free(input_buffer);
>>>>               if (ret != noErr) {
>>>> -                    unlock_frames(ctx);
>>>> +                    CFRelease(audio_frame);
>>>>                   return AVERROR(EIO);
>>>>               }
>>>> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           } else {
>>>>                ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
>>>>                if (ret != kCMBlockBufferNoErr) {
>>>> -                     unlock_frames(ctx);
>>>> +                     CFRelease(audio_frame);
>>>>                    return AVERROR(EIO);
>>>>                }
>>>>           }
>>>> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           CMItemCount count;
>>>>           CMSampleTimingInfo timing_info;
>>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
>>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
>>>>               AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>>               pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>>           }
>>>> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           pkt->stream_index  = ctx->audio_stream_index;
>>>>           pkt->flags        |= AV_PKT_FLAG_KEY;
>>>> -            CFRelease(ctx->current_audio_frame);
>>>> -            ctx->current_audio_frame = nil;
>>>> -
>>>> -            unlock_frames(ctx);
>>>> +            CFRelease(audio_frame);
>>>>       } else {
>>>>           pkt->data = NULL;
>>>> -            unlock_frames(ctx);
>>>>           if (ctx->observed_quit) {
>>>>               return AVERROR_EOF;
>>>>           } else {
>>>>               return AVERROR(EAGAIN);
>>>>           }
>>>>       }
>>>> -
>>>> -        unlock_frames(ctx);
>>>>   } while (!pkt->data);
>>>>   return 0;
>>>> -- 
>>>> 2.30.1 (Apple Git-130)