This is an automated email from the ASF dual-hosted git repository.

hezhangjian pushed a commit to branch fix/consumer-keynotfoundexception
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git

commit e0da267ce7a5a37f71023ab8464ab64d2d4cecf3
Author: Zhangjian He <[email protected]>
AuthorDate: Mon Sep 8 11:31:11 2025 +0800

    fix: Replace Dictionary with ConcurrentDictionary in Consumer to fix 
KeyNotFoundException
    
    - Replace Dictionary<string, SubConsumer<TMessage>> with 
ConcurrentDictionary to ensure thread safety
    - Fix KeyNotFoundException that occurs in multi-threaded scenarios when 
accessing _subConsumers
    - Add System.Collections.Concurrent using directive
    - Minimal change with maximum impact for thread safety
    
    Fixes: KeyNotFoundException in Consumer.Acknowledge method when multiple 
threads access the dictionary concurrently
---
 src/DotPulsar/Internal/Consumer.cs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/DotPulsar/Internal/Consumer.cs 
b/src/DotPulsar/Internal/Consumer.cs
index a89f997..b941f6b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -18,6 +18,7 @@ using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
+using System.Collections.Concurrent;
 using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
 
@@ -32,7 +33,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private readonly Executor _executor;
     private readonly SemaphoreSlim _semaphoreSlim;
     private readonly AsyncLock _lock;
-    private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
+    private readonly ConcurrentDictionary<string, SubConsumer<TMessage>> 
_subConsumers;
     private readonly TaskCompletionSource<IMessage<TMessage>> _neverEndingTask;
     private SubConsumer<TMessage>[] _receivers;
     private Task<IMessage<TMessage>>[] _receiveTasks;
@@ -79,7 +80,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _exceptionHandler = exceptionHandler;
         _allSubConsumersAreReady = false;
         _isDisposed = 0;
-        _subConsumers = [];
+        _subConsumers = new ConcurrentDictionary<string, 
SubConsumer<TMessage>>();
         _singleSubConsumer = null;
 
         _ = Setup();

Reply via email to