This is an automated email from the ASF dual-hosted git repository. hezhangjian pushed a commit to branch fix/consumer-keynotfoundexception in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
commit e0da267ce7a5a37f71023ab8464ab64d2d4cecf3
Author: Zhangjian He <[email protected]>
AuthorDate: Mon Sep 8 11:31:11 2025 +0800

    fix: Replace Dictionary with ConcurrentDictionary in Consumer to fix KeyNotFoundException

    - Replace Dictionary<string, SubConsumer<TMessage>> with ConcurrentDictionary to ensure thread safety
    - Fix KeyNotFoundException that occurs in multi-threaded scenarios when accessing _subConsumers
    - Add System.Collections.Concurrent using directive
    - Minimal change with maximum impact for thread safety

    Fixes: KeyNotFoundException in Consumer.Acknowledge method when multiple threads access the dictionary concurrently
---
 src/DotPulsar/Internal/Consumer.cs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index a89f997..b941f6b 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -18,6 +18,7 @@ using DotPulsar.Abstractions;
 using DotPulsar.Exceptions;
 using DotPulsar.Extensions;
 using DotPulsar.Internal.Abstractions;
+using System.Collections.Concurrent;
 using DotPulsar.Internal.Compression;
 using DotPulsar.Internal.PulsarApi;
@@ -32,7 +33,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
     private readonly Executor _executor;
     private readonly SemaphoreSlim _semaphoreSlim;
     private readonly AsyncLock _lock;
-    private readonly Dictionary<string, SubConsumer<TMessage>> _subConsumers;
+    private readonly ConcurrentDictionary<string, SubConsumer<TMessage>> _subConsumers;
     private readonly TaskCompletionSource<IMessage<TMessage>> _neverEndingTask;
     private SubConsumer<TMessage>[] _receivers;
     private Task<IMessage<TMessage>>[] _receiveTasks;
@@ -79,7 +80,7 @@ public sealed class Consumer<TMessage> : IConsumer<TMessage>
         _exceptionHandler = exceptionHandler;
         _allSubConsumersAreReady = false;
         _isDisposed = 0;
-        _subConsumers = [];
+        _subConsumers = new ConcurrentDictionary<string, SubConsumer<TMessage>>();
         _singleSubConsumer = null;
         _ = Setup();
