blankensteiner commented on a change in pull request #71:
URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r613025495



##########
File path: src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Events
+{
+    using Abstractions;
+    using System;
+
+    /// <summary>
+    /// Representation of the sub producer of a partitioned producer state 
change.
+    /// </summary>
+    public class PartitionedSubProducerStateChanged : IEvent

Review comment:
       Let's mark it as sealed

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> 
_producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, 
IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();

Review comment:
       .Wait() and .Result on a task are blocking and as such a no-go.
   We need to rethink how to create a partitioned producer without "Create" 
needing to be an async operation.

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> 
_producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, 
IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();
+        }
+
+        private async Task CreateSubProducers(int startIndex, int count)
+        {
+            await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n =>

Review comment:
       The producers can just be created, no need to create a task for creating 
them

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> 
_producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, 
IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();
+        }
+
+        private async Task CreateSubProducers(int startIndex, int count)
+        {
+            await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n =>
+            {
+                return Task.Run( () =>
+                {
+                    var producer = _pulsarClient.NewProducer(Topic, _options, 
(uint)n, _correlationId);
+                    _producers[n] = producer;
+                }, _cts.Token);
+            }).ToList()).ConfigureAwait(false);
+        }
+
+        public bool IsFinalState()
+            => _state.IsFinalState();
+
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
+
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState 
state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState 
state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
+            _cts.Cancel();
+            _cts.Dispose();
+
+            foreach (var producer in _producers.Values)
+            {
+                await producer.DisposeAsync().ConfigureAwait(false);
+            }
+
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
+        }
+
+        public ValueTask<MessageId> Send(TMessage message, CancellationToken 
cancellationToken = default)
+            => _producers[_messageRouter.ChoosePartition(null, 
_producersCount)].Send(message, cancellationToken);

Review comment:
       Doing this is totally valid, but in the name of consistency let's await 
it

##########
File path: src/DotPulsar/Internal/Events/UpdatePartitions.cs
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Events
+{
+    using Abstractions;
+    using System;
+
+    /// <summary>
+    /// Representation of the partitions count of the partitioned topic 
updating.
+    /// </summary>
+    public class UpdatePartitions : IEvent

Review comment:
       Let's mark it as sealed

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>
+    {
+        private readonly Guid _correlationId;
+        private readonly IRegisterEvent _eventRegister;
+        private readonly IStateChanged<ProducerState> _state;
+        private readonly PulsarClient _pulsarClient;
+        private readonly ProducerOptions<TMessage> _options;
+        private readonly ConcurrentDictionary<int, IProducer<TMessage>> 
_producers;
+        private readonly IMessageRouter _messageRouter;
+        private readonly CancellationTokenSource _cts = new();
+        private int _isDisposed;
+        private int _producersCount;
+        public Uri ServiceUrl { get; }
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            Guid correlationId,
+            Uri serviceUrl,
+            string topic,
+            IRegisterEvent registerEvent,
+            IStateChanged<ProducerState> state,
+            uint partitionsCount,
+            ProducerOptions<TMessage> options,
+            PulsarClient pulsarClient
+        )
+        {
+            _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            Topic = topic;
+            _eventRegister = registerEvent;
+            _state = state;
+            _isDisposed = 0;
+            _options = options;
+            _pulsarClient = pulsarClient;
+            _producersCount = (int) partitionsCount;
+            _messageRouter = options.MessageRouter;
+
+            _producers = new ConcurrentDictionary<int, 
IProducer<TMessage>>(Environment.ProcessorCount, _producersCount);
+            CreateSubProducers(0, _producersCount).Wait();
+        }
+
+        private async Task CreateSubProducers(int startIndex, int count)
+        {
+            await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n =>
+            {
+                return Task.Run( () =>
+                {
+                    var producer = _pulsarClient.NewProducer(Topic, _options, 
(uint)n, _correlationId);
+                    _producers[n] = producer;
+                }, _cts.Token);
+            }).ToList()).ConfigureAwait(false);
+        }
+
+        public bool IsFinalState()
+            => _state.IsFinalState();
+
+        public bool IsFinalState(ProducerState state)
+            => _state.IsFinalState(state);
+
+        public async ValueTask<ProducerState> OnStateChangeTo(ProducerState 
state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedTo(state, 
cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState 
state, CancellationToken cancellationToken = default)
+            => await _state.StateChangedFrom(state, 
cancellationToken).ConfigureAwait(false);
+
+        public async ValueTask DisposeAsync()
+        {
+            if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
+                return;
+
+            _cts.Cancel();
+            _cts.Dispose();
+
+            foreach (var producer in _producers.Values)
+            {
+                await producer.DisposeAsync().ConfigureAwait(false);
+            }
+
+            _eventRegister.Register(new ProducerDisposed(_correlationId));
+        }
+
+        public ValueTask<MessageId> Send(TMessage message, CancellationToken 
cancellationToken = default)
+            => _producers[_messageRouter.ChoosePartition(null, 
_producersCount)].Send(message, cancellationToken);
+
+        public ValueTask<MessageId> Send(MessageMetadata metadata, TMessage 
message, CancellationToken cancellationToken = default)
+            => _producers[_messageRouter.ChoosePartition(metadata, 
_producersCount)].Send(message, cancellationToken);

Review comment:
       Doing this is totally valid, but in the name of consistency let's await 
it

##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,99 @@
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using DotPulsar.Abstractions;
+    using Events;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducer<TMessage> : IProducer<TMessage>

Review comment:
       Let's mark it as sealed

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -37,7 +38,7 @@ public sealed class PulsarClient : IPulsarClient
 
         public Uri ServiceUrl { get; }
 
-        internal PulsarClient(
+        public PulsarClient(

Review comment:
       We can't make this public. If you need to create an instance for 
testing, then (in the internal namespace) you can create a PulsarClientFactory

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, 
CancellationToken cancellationToken)
+        {
+            var connection = await 
_connectionPool.FindConnectionForTopic(topic, 
cancellationToken).ConfigureAwait(false);
+            var commandPartitionedMetadata = new 
CommandPartitionedTopicMetadata() { Topic = topic };
+            var response = await connection.Send(commandPartitionedMetadata, 
cancellationToken).ConfigureAwait(false);
+
+            response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
+
+            if (response.PartitionMetadataResponse.Response == 
CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+                response.PartitionMetadataResponse.Throw();
+
+            return response.PartitionMetadataResponse.Partitions;
+        }
+
         /// <summary>
         /// Create a producer.
         /// </summary>
         public IProducer<TMessage> 
CreateProducer<TMessage>(ProducerOptions<TMessage> options)
         {
             ThrowIfDisposed();
 
+            var partitionsCount = GetNumberOfPartitions(options.Topic, 
default).Result;

Review comment:
       .Result is a no-go, so we need to rethink how the partitioned producer 
is created. Let's have a talk about that on slack maybe?

##########
File path: tests/DotPulsar.Tests/DotPulsar.Tests.csproj
##########
@@ -8,6 +8,7 @@
   <ItemGroup>
     <PackageReference Include="FluentAssertions" Version="5.10.3" />
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
+    <PackageReference Include="Moq" Version="4.16.1" />

Review comment:
       Like we talk about, we'll be using NSubstitute instead

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient
         public static IPulsarClientBuilder Builder()
             => new PulsarClientBuilder();
 
+        public async Task<uint> GetNumberOfPartitions(string topic, 
CancellationToken cancellationToken)

Review comment:
       Intentional that it is public?

##########
File path: src/DotPulsar/RoundRobinPartitionRouter.cs
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using Abstractions;
+    using HashDepot;
+    using System.Text;
+    using System.Threading;
+
+    public class RoundRobinPartitionRouter : IMessageRouter

Review comment:
       Let's make this sealed

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using Moq;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public void TestPartitionedProducerStateManage()
+        {
+            var connectionPoolMock = new 
Mock<IConnectionPool>(MockBehavior.Loose);
+            var connectionPool = connectionPoolMock.Object;
+            var establishNewChannelMock = new 
Mock<IEstablishNewChannel>(MockBehavior.Loose);
+            var establishNewChannel = establishNewChannelMock.Object;
+            var producerMock = new Mock<IProducer>(MockBehavior.Loose);
+            var producer = producerMock.Object;
+
+            var processManager = new ProcessManager(connectionPool);
+
+            var producerProcesses = new Dictionary<uint, ProducerProcess>(3);
+            var producerGuids = new Dictionary<uint, Guid>(3);
+            var producersGroup = new ConcurrentDictionary<uint, 
IProducer>(Environment.ProcessorCount, 3);
+            var partitionedProducerGuid = Guid.NewGuid();
+
+            for (uint i = 0; i < 3; i++)
+            {
+                var stateManager = new 
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted);
+                var correlationId = Guid.NewGuid();
+                var process = new ProducerProcess(correlationId, stateManager, 
establishNewChannel, processManager, partitionedProducerGuid, i);
+                producerGuids[i] = correlationId;
+                producerProcesses[i] = process;
+                producersGroup[i] = producer;
+                processManager.Add(process);
+            }
+
+            var partitionedStateManager =
+                new StateManager<ProducerState>(ProducerState.Disconnected, 
ProducerState.Closed, ProducerState.Faulted);
+            var partitionedProducerProcess = new 
PartitionedProducerProcess(partitionedProducerGuid, partitionedStateManager, 
producersGroup.Count);
+            processManager.Add(partitionedProducerProcess);
+
+            // Test connect
+            Assert.Equal(ProducerState.Disconnected, 
partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[0]));
+            Assert.Equal(ProducerState.PartiallyConnected, 
partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, 
partitionedStateManager.CurrentState);
+            processManager.Register(new ChannelConnected(producerGuids[2]));
+            Assert.Equal(ProducerState.Connected, 
partitionedStateManager.CurrentState);
+
+            // Test disconnect
+            processManager.Register(new ChannelDisconnected(producerGuids[1]));
+            Assert.Equal(ProducerState.PartiallyConnected, 
partitionedStateManager.CurrentState);
+
+            // Test reconnect
+            processManager.Register(new ChannelConnected(producerGuids[1]));
+            Assert.Equal(ProducerState.Connected, 
partitionedStateManager.CurrentState);
+
+            // Test fault
+            processManager.Register(new ExecutorFaulted(producerGuids[1]));
+            Assert.Equal(ProducerState.Faulted, 
partitionedStateManager.CurrentState);
+        }
+
+        [Fact]
+        public void TestUpdatePartitions()

Review comment:
       The test naming convention is 
[Method/Feature]_[Given/When]Something_[Should/Then]ExpectedResult

##########
File path: tests/DotPulsar.Tests/PulsarClientTests.cs
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
+    using Moq;
+    using System;
+    using System.Threading;
+    using Xunit;
+
+    public class PulsarClientTests
+    {
+        [Fact]
+        public async void 
GetPartitions_GivenPartitionedTopic_ShouldReturnPartitionsNumber()

Review comment:
       Use "Task" instead of "void"

##########
File path: src/DotPulsar/Internal/PartitionedProducerProcess.cs
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using Abstractions;
+    using Events;
+    using System;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public class PartitionedProducerProcess : IProcess

Review comment:
       As we talked about, this can just be deleted as you decided to make 
changes to ProducerProcess instead.

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.Events;
+    using Moq;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using Xunit;
+
+    public class PartitionedProducerProcessTests
+    {
+        [Fact]
+        public void TestPartitionedProducerStateManage()

Review comment:
       The test naming convention is 
[Method/Feature]_[Given/When]Something_[Should/Then]ExpectedResult




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to