Hi All

I've implemented a typical consumer/producer pattern, using a container holding 
a FIFO list of buffers. This buffer list has functions to store and retrieve 
these buffers concurrently. Consumer and Producer are running in the global 
concurrent dispatch queue.

I'm seeing a huge performance difference between two almost identical 
versions of the code. I've tried to find the cause with Instruments, but 
didn't find any concrete hints. The "xxx_block_invoke" of the consumer part of 
one version just takes considerably more time than the other.




The code basically works as follows:

The producer simply creates a certain number of buffers (NSData objects), 
initializes their content and stores them into the concurrently accessible 
buffer list. The producer part of the code is identical in both versions. The 
concurrently accessible buffer list is the same, too.

In the consumer part, the code simply retrieves buffers and processes them. 
The difference is in the processing of the received buffer content, that is, 
how the bytes of the buffer's content are accessed.
This tiny difference in code shouldn't make a huge difference in runtime, 
but there actually is a huge difference, wherever it comes from!

The first uses a "direct" implementation using pointers accessing the content 
of the buffer.
The second implementation uses a C++ iterator concept. But when compiled with 
-O3, it should produce almost the same code as the former. So, I would expect 
only minor differences in performance.
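
To make the two access styles concrete, here is a minimal sketch over a plain 
in-memory buffer. The type bytes_t and both function names are hypothetical 
stand-ins and not part of my actual code, which reads the bytes of a CFDataRef. 
With optimization enabled I would expect both loops to compile to nearly the 
same machine code.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for one buffer's content.
typedef std::vector<char> bytes_t;

// "Direct" version: walk the bytes with raw pointers.
int sum_with_pointers(const bytes_t& buffer)
{
    int sum = 0;
    const char* p = buffer.data();
    const char* end = p + buffer.size();
    for (; p != end; ++p) {
        sum += *p;
    }
    return sum;
}

// Iterator version: the same loop expressed through the iterator interface.
int sum_with_iterators(const bytes_t& buffer)
{
    int sum = 0;
    for (bytes_t::const_iterator it = buffer.begin(); it != buffer.end(); ++it) {
        sum += *it;
    }
    return sum;
}
```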

However, the difference is about a factor of 3! I have no idea what causes 
this huge difference. The main part of the code is shown below. The source 
is actually C++ and uses just the dispatch library. Not every piece of source 
is shown, but I can provide it if necessary.


Additional Info:

Access is synchronized using dispatch_semaphore objects. Storing a buffer 
into the buffer list may block if the list has reached its maximum number 
of buffers. Retrieving a buffer may block if there is no buffer available.

After testing, the implementation appears to be correct.

Class CFDataConstBuffers<char> is the type of the buffer list. It has two 
principal functions, consume() and produce(), which can be called concurrently.

consume() returns the next buffer in the list (FIFO). It may block until the 
buffer list has one available.

produce() puts a buffer into the list. It may block if the list's capacity 
(max number of buffers) is reached.
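
The blocking behaviour of consume() and produce() can be sketched as follows. 
This is not my CFDataConstBuffers<char> class: the names BoundedBuffers and 
buffer_t are hypothetical, and a std::mutex with two condition variables is 
swapped in for the dispatch semaphores so the sketch is portable. As in the 
real code, consume() takes a timeout in seconds and returns a pair whose bool 
is false on timeout.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical buffer type; the original wraps a CFDataRef.
typedef std::vector<char> buffer_t;

// Sketch of a bounded FIFO buffer list (assumption: standard-library
// primitives instead of dispatch semaphores).
class BoundedBuffers {
public:
    typedef std::pair<buffer_t, bool> result_t;

    explicit BoundedBuffers(std::size_t capacity) : capacity_(capacity)
    {
        assert(capacity > 0);
    }

    // Appends a buffer; blocks while the list already holds capacity_ buffers.
    void produce(const buffer_t& buffer)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
        queue_.push_back(buffer);
        not_empty_.notify_one();
    }

    // Removes and returns the oldest buffer. Waits up to timeout_seconds;
    // the bool is false if no buffer became available in time.
    result_t consume(double timeout_seconds)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool ready = not_empty_.wait_for(
            lock, std::chrono::duration<double>(timeout_seconds),
            [this] { return !queue_.empty(); });
        if (!ready) {
            return result_t(buffer_t(), false);
        }
        result_t result(std::move(queue_.front()), true);
        queue_.pop_front();
        not_full_.notify_one();
        return result;
    }

private:
    std::size_t capacity_;
    std::deque<buffer_t> queue_;
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
};
```

In this sketch an empty buffer can serve as the EOF marker, in the same way 
the producer in my code stores a default-constructed buffer at the end.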


Class CFDataConstBuffer<char>, the buffer class, is basically a thin wrapper 
around a CFDataRef.

Class semaphore is a thin wrapper around a dispatch_semaphore_t.
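
For readers without the source, the interface of such a wrapper can be 
sketched like this. The class name semaphore_sketch is hypothetical, and a 
mutex plus condition variable stands in for the dispatch semaphore, so this 
only illustrates the wait(timeout)/signal() semantics, not my actual 
implementation. wait() returns false when the timeout (in seconds) expires.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Sketch of a counting semaphore with a timed wait (assumption: built on
// the standard library rather than on dispatch semaphores).
class semaphore_sketch {
public:
    explicit semaphore_sketch(long initial) : count_(initial)
    {
        assert(initial >= 0);
    }

    // Increments the count and wakes one waiter, if any.
    void signal()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
        cv_.notify_one();
    }

    // Waits until the count is positive, then decrements it.
    // Returns false if timeout_seconds elapsed first.
    bool wait(double timeout_seconds)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        const bool acquired = cv_.wait_for(
            lock, std::chrono::duration<double>(timeout_seconds),
            [this] { return count_ > 0; });
        if (!acquired) {
            return false;
        }
        --count_;
        return true;
    }

private:
    long count_;
    std::mutex mutex_;
    std::condition_variable cv_;
};
```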



Below are two functions whose runtime duration is measured. Note that the 
consumer part of the function ConcurrentProduceConsume() is written such that 
it mimics the code produced by the compiler in the second function 
ConcurrentProduceConsumeIter() which uses C++ Iterators - hence, it looks a bit 
more complex than necessary. The code for the iterator isn't shown here, though.
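
To give an idea of what such an iterator does, here is a simplified, 
single-threaded sketch. It is not the real CFDataConstBuffers_iterator: 
BufferSource, BuffersIterator and buffer_t are hypothetical names, and the 
source never blocks. The point is only the control flow: dereferencing reads 
the current byte, and incrementing past the last byte of a buffer fetches 
the next buffer, or turns the iterator into the eof iterator.

```cpp
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

typedef std::vector<char> buffer_t;

// Hypothetical, non-blocking stand-in for the buffer list.
struct BufferSource {
    typedef std::pair<buffer_t, bool> result_t;
    std::deque<buffer_t> pending;

    // Returns the next buffer, or (empty, false) when nothing is left.
    result_t consume()
    {
        if (pending.empty()) return result_t(buffer_t(), false);
        result_t r(pending.front(), true);
        pending.pop_front();
        return r;
    }
};

class BuffersIterator {
public:
    // Default-constructed iterator is the eof iterator.
    BuffersIterator() : source_(0), p_(0), back_(0) {}

    explicit BuffersIterator(BufferSource& source)
        : source_(&source), p_(0), back_(0) { fetch(); }

    // Copying would leave p_ pointing into another iterator's buffer,
    // so this sketch simply forbids it.
    BuffersIterator(const BuffersIterator&) = delete;
    BuffersIterator& operator=(const BuffersIterator&) = delete;

    char operator*() const { return *p_; }

    BuffersIterator& operator++()
    {
        if (p_ == 0) return *this;      // already at eof
        if (p_ != back_) ++p_;          // stay within the current buffer
        else fetch();                   // last byte consumed: get next buffer
        return *this;
    }

    bool operator==(const BuffersIterator& other) const { return p_ == other.p_; }
    bool operator!=(const BuffersIterator& other) const { return p_ != other.p_; }

private:
    // Loads the next non-empty buffer, or switches to the eof state.
    void fetch()
    {
        BufferSource::result_t r = source_->consume();
        if (r.second && !r.first.empty()) {
            current_.swap(r.first);
            p_ = current_.data();
            back_ = p_ + current_.size() - 1;
        } else {
            p_ = 0;
            back_ = 0;
        }
    }

    BufferSource* source_;
    buffer_t current_;
    const char* p_;
    const char* back_;
};
```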

The buffer size was varied from 4KB to 32KB, incrementing in steps.
The buffer list's capacity (max number of held buffers) was set to 1 to 8.
LLVM compiler, Xcode 4.02.
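
The timing code itself isn't shown in this post. For anyone who wants to 
reproduce the comparison, one possible harness is sketched below. The name 
min_seconds is hypothetical, and taking the minimum over several runs is my 
assumption about a reasonable way to reduce scheduling noise, not 
necessarily how the numbers in this post were obtained.

```cpp
#include <cassert>
#include <chrono>

// Runs f() `runs` times and returns the shortest wall-clock duration
// of a single run, in seconds.
template <typename F>
double min_seconds(F f, int runs)
{
    assert(runs > 0);
    typedef std::chrono::steady_clock clock_type;
    double best = -1.0;
    for (int i = 0; i < runs; ++i) {
        const clock_type::time_point t0 = clock_type::now();
        f();
        const clock_type::time_point t1 = clock_type::now();
        const double s = std::chrono::duration<double>(t1 - t0).count();
        if (best < 0.0 || s < best) {
            best = s;
        }
    }
    return best;
}
```

Each of the two functions below could then be wrapped in a callable and 
passed to min_seconds with the same C and N for both.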

For no apparent reason ConcurrentProduceConsume() performs significantly faster 
(about 2.5x) than ConcurrentProduceConsumeIter().

Is there a hidden cost for C++ instances in a block, for C++ exception 
handlers, etc.?


Thanks for tips!



// Create a buffers instance with capacity C, then produce N buffers with 
// BUFFER_SIZE bytes and fill them. Concurrently consume the buffers and 
// iterate over the consumed buffer.
// Performs concurrently on two threads. 
//
void ConcurrentProduceConsume(size_t C = 1, size_t N = 100)
{    
    typedef std::pair<CFDataConstBuffer<char>, bool> result_t;
    
    // Create a buffers instance with at max C buffers:
    CFDataConstBuffers<char> buffers(C);
    CFDataConstBuffers<char>* buffersPtr = &buffers;
    
    const size_t TOTAL = BUFFER_SIZE * N;
    
    // Get the global concurrent queue:
    dispatch_queue_t queue =
        dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    
    // Create a group in order to sync the two threads:
    dispatch_group_t group = dispatch_group_create();
    
    dispatch_group_async(group, queue,
    ^{        
        unsigned long k = 0;
        UInt8 data[BUFFER_SIZE];
        for (int i = 0; i < N; ++i) {
            // fill the buffer:
            for (int j = 0; j < BUFFER_SIZE; ++j, ++k) {
                data[j] = char(k);                                     
            }                                 
            CFDataRef d = CFDataCreate(NULL, data, sizeof(data));
            CFDataConstBuffer<char> buffer = d;
            CFRelease(d);
            buffersPtr->produce(buffer);
        }
        
        // put EOF:
        buffersPtr->produce(CFDataConstBuffer<char>());
    });
    
    
    dispatch_group_async(group, queue,
    ^{
        const char* p;
        const char* back;
        
        result_t result = buffersPtr->consume(3);
        if (result.second and result.first.data() and result.first.size() > 0) {
            p = result.first.data();
            back = p + result.first.size() - 1;
        } 
        else {
            // timeout or eof
            p = 0;
        }
             
        size_t total = 0;
        int sum = 0;
        while (p != 0) 
        {   
            ++total;
            sum += *p;
            
            // Increment p. consume a new buffer if required:
            if (p != back) {
                ++p;
            }
            else
            {
                result = buffersPtr->consume(3);
                // Did we receive an EOF or error?
                if (result.second and result.first.data() and
                    result.first.size() > 0) {
                    // no eof
                    p = result.first.data();
                    back = p + result.first.size() - 1;
                }
                else {
                    // eof or timeout
                    p = 0;
                }
            }
        }
    });

    if (dispatch_group_wait(group, dispatch_time(DISPATCH_TIME_NOW, 1e9*10))) {
        std::cout << "ERROR: ConcurrentProduceConsume received timeout."
                  << std::endl;
    }
    dispatch_release(group);
}



// Create a buffers instance with capacity C, then produce N buffers with 
// BUFFER_SIZE bytes. Concurrently consume the buffers and process it with 
// iterating over the buffer's content.
// Performs concurrently on two threads. 

void ConcurrentProduceConsumeIter(size_t C = 1, size_t N = 100)
{    
    typedef std::pair<CFDataConstBuffer<char>, bool> result_t;
    
    // Create a buffers instance with at max C buffers:
    CFDataConstBuffers<char> buffers(C);
    CFDataConstBuffers<char>* buffersPtr = &buffers;
    
    // Get the global concurrent queue:
    dispatch_queue_t queue =
        dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
    
    // Create a semaphore in order to sync on the completion of the two threads:
    // (could use a group as well, doesn't cause differences in runtime)        
    semaphore sem(0);
    semaphore* semPtr = &sem;
        
    dispatch_async(queue,
     ^{  
         unsigned long k = 0;
         UInt8 data[BUFFER_SIZE];
         for (int i = 0; i < N; ++i) {
             // fill the buffer:
             for (int j = 0; j < BUFFER_SIZE; ++j, ++k) {
                 data[j] = char(k);                                     
             }                                 
             CFDataRef d = CFDataCreate(NULL, data, sizeof(data));
             CFDataConstBuffer<char> buffer = d;
             CFRelease(d);
             buffersPtr->produce(buffer);
         }
         
         // put EOF:
         buffersPtr->produce(CFDataConstBuffer<char>());
     });
    
    // We need the eof iterator and an iterator which is initialized with the
    // buffers list.
    // Using pointers to C++ classes defined outside the block seems to
    // increase performance
    CFDataConstBuffers_iterator<char> eof;
    CFDataConstBuffers_iterator<char> iter(*buffersPtr);
    CFDataConstBuffers_iterator<char>* eofPtr = &eof;
    CFDataConstBuffers_iterator<char>* iterPtr = &iter;    

    dispatch_async(queue,
     ^{
         size_t total = 0;
         int sum = 0;
         while ( *iterPtr != *eofPtr)
         {
             sum += *(*iterPtr);
             ++(*iterPtr);   // incrementing may block
             ++total;
         }
         semPtr->signal();
     });
    
    if (!sem.wait(10)) {
        std::cout << "ERROR: ConcurrentProduceConsumeIter received timeout."
                  << std::endl;
    }
}



Regards
Andreas