blankensteiner commented on a change in pull request #71: URL: https://github.com/apache/pulsar-dotpulsar/pull/71#discussion_r613025495
########## File path: src/DotPulsar/Internal/Events/PartitionedSubProducerStateChanged.cs ########## @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.Events +{ + using Abstractions; + using System; + + /// <summary> + /// Representation of the sub producer of a partitioned producer state change. + /// </summary> + public class PartitionedSubProducerStateChanged : IEvent Review comment: Let's mark it as sealed ########## File path: src/DotPulsar/Internal/PartitionedProducer.cs ########## @@ -0,0 +1,99 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using Events; + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + + public class PartitionedProducer<TMessage> : IProducer<TMessage> + { + private readonly Guid _correlationId; + private readonly IRegisterEvent _eventRegister; + private readonly IStateChanged<ProducerState> _state; + private readonly PulsarClient _pulsarClient; + private readonly ProducerOptions<TMessage> _options; + private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers; + private readonly IMessageRouter _messageRouter; + private readonly CancellationTokenSource _cts = new(); + private int _isDisposed; + private int _producersCount; + public Uri ServiceUrl { get; } + public string Topic { get; } + + public PartitionedProducer( + Guid correlationId, 
+ Uri serviceUrl, + string topic, + IRegisterEvent registerEvent, + IStateChanged<ProducerState> state, + uint partitionsCount, + ProducerOptions<TMessage> options, + PulsarClient pulsarClient + ) + { + _correlationId = correlationId; + ServiceUrl = serviceUrl; + Topic = topic; + _eventRegister = registerEvent; + _state = state; + _isDisposed = 0; + _options = options; + _pulsarClient = pulsarClient; + _producersCount = (int) partitionsCount; + _messageRouter = options.MessageRouter; + + _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount); + CreateSubProducers(0, _producersCount).Wait(); Review comment: .Wait() and .Result on a task are blocking and as such a no-go. We need to rethink how to create a partitioned producer without "Create" needing to be an async operation. ########## File path: src/DotPulsar/Internal/PartitionedProducer.cs ########## @@ -0,0 +1,99 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using Events; + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + + public class PartitionedProducer<TMessage> : IProducer<TMessage> + { + private readonly Guid _correlationId; + private readonly IRegisterEvent _eventRegister; + private readonly IStateChanged<ProducerState> _state; + private readonly PulsarClient _pulsarClient; + private readonly ProducerOptions<TMessage> _options; + private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers; + private readonly IMessageRouter _messageRouter; + private readonly CancellationTokenSource _cts = new(); + private int _isDisposed; + private int _producersCount; + public Uri ServiceUrl { get; } + public string Topic { get; } + + public PartitionedProducer( + Guid correlationId, + Uri serviceUrl, + string topic, + IRegisterEvent registerEvent, + IStateChanged<ProducerState> state, + uint partitionsCount, + 
ProducerOptions<TMessage> options, + PulsarClient pulsarClient + ) + { + _correlationId = correlationId; + ServiceUrl = serviceUrl; + Topic = topic; + _eventRegister = registerEvent; + _state = state; + _isDisposed = 0; + _options = options; + _pulsarClient = pulsarClient; + _producersCount = (int) partitionsCount; + _messageRouter = options.MessageRouter; + + _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount); + CreateSubProducers(0, _producersCount).Wait(); + } + + private async Task CreateSubProducers(int startIndex, int count) + { + await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n => Review comment: The producers can just be created, no need to create a task for creating them ########## File path: src/DotPulsar/Internal/PartitionedProducer.cs ########## @@ -0,0 +1,99 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using Events; + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + + public class PartitionedProducer<TMessage> : IProducer<TMessage> + { + private readonly Guid _correlationId; + private readonly IRegisterEvent _eventRegister; + private readonly IStateChanged<ProducerState> _state; + private readonly PulsarClient _pulsarClient; + private readonly ProducerOptions<TMessage> _options; + private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers; + private readonly IMessageRouter _messageRouter; + private readonly CancellationTokenSource _cts = new(); + private int _isDisposed; + private int _producersCount; + public Uri ServiceUrl { get; } + public string Topic { get; } + + public PartitionedProducer( + Guid correlationId, + Uri serviceUrl, + string topic, + IRegisterEvent registerEvent, + IStateChanged<ProducerState> state, + uint partitionsCount, + ProducerOptions<TMessage> options, + PulsarClient pulsarClient + ) + { + _correlationId = 
correlationId; + ServiceUrl = serviceUrl; + Topic = topic; + _eventRegister = registerEvent; + _state = state; + _isDisposed = 0; + _options = options; + _pulsarClient = pulsarClient; + _producersCount = (int) partitionsCount; + _messageRouter = options.MessageRouter; + + _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount); + CreateSubProducers(0, _producersCount).Wait(); + } + + private async Task CreateSubProducers(int startIndex, int count) + { + await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n => + { + return Task.Run( () => + { + var producer = _pulsarClient.NewProducer(Topic, _options, (uint)n, _correlationId); + _producers[n] = producer; + }, _cts.Token); + }).ToList()).ConfigureAwait(false); + } + + public bool IsFinalState() + => _state.IsFinalState(); + + public bool IsFinalState(ProducerState state) + => _state.IsFinalState(state); + + public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default) + => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false); + + public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default) + => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false); + + public async ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref _isDisposed, 1) != 0) + return; + + _cts.Cancel(); + _cts.Dispose(); + + foreach (var producer in _producers.Values) + { + await producer.DisposeAsync().ConfigureAwait(false); + } + + _eventRegister.Register(new ProducerDisposed(_correlationId)); + } + + public ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default) + => _producers[_messageRouter.ChoosePartition(null, _producersCount)].Send(message, cancellationToken); Review comment: Doing this is totally valid, but in the name of consistency let's await it ########## File path: 
src/DotPulsar/Internal/Events/UpdatePartitions.cs ########## @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal.Events +{ + using Abstractions; + using System; + + /// <summary> + /// Representation of the partitions count of the partitioned topic updating. + /// </summary> + public class UpdatePartitions : IEvent Review comment: Let's mark it as sealed ########## File path: src/DotPulsar/Internal/PartitionedProducer.cs ########## @@ -0,0 +1,99 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using Events; + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + + public class PartitionedProducer<TMessage> : IProducer<TMessage> + { + private readonly Guid _correlationId; + private readonly IRegisterEvent _eventRegister; + private readonly IStateChanged<ProducerState> _state; + private readonly PulsarClient _pulsarClient; + private readonly ProducerOptions<TMessage> _options; + private readonly ConcurrentDictionary<int, IProducer<TMessage>> _producers; + private readonly IMessageRouter _messageRouter; + private readonly CancellationTokenSource _cts = new(); + private int _isDisposed; + private int _producersCount; + public Uri ServiceUrl { get; } + public string Topic { get; } + + public PartitionedProducer( + Guid correlationId, + Uri serviceUrl, + string topic, + IRegisterEvent 
registerEvent, + IStateChanged<ProducerState> state, + uint partitionsCount, + ProducerOptions<TMessage> options, + PulsarClient pulsarClient + ) + { + _correlationId = correlationId; + ServiceUrl = serviceUrl; + Topic = topic; + _eventRegister = registerEvent; + _state = state; + _isDisposed = 0; + _options = options; + _pulsarClient = pulsarClient; + _producersCount = (int) partitionsCount; + _messageRouter = options.MessageRouter; + + _producers = new ConcurrentDictionary<int, IProducer<TMessage>>(Environment.ProcessorCount, _producersCount); + CreateSubProducers(0, _producersCount).Wait(); + } + + private async Task CreateSubProducers(int startIndex, int count) + { + await Task.WhenAll(Enumerable.Range(startIndex, count).Select(n => + { + return Task.Run( () => + { + var producer = _pulsarClient.NewProducer(Topic, _options, (uint)n, _correlationId); + _producers[n] = producer; + }, _cts.Token); + }).ToList()).ConfigureAwait(false); + } + + public bool IsFinalState() + => _state.IsFinalState(); + + public bool IsFinalState(ProducerState state) + => _state.IsFinalState(state); + + public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken = default) + => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false); + + public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken = default) + => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false); + + public async ValueTask DisposeAsync() + { + if (Interlocked.Exchange(ref _isDisposed, 1) != 0) + return; + + _cts.Cancel(); + _cts.Dispose(); + + foreach (var producer in _producers.Values) + { + await producer.DisposeAsync().ConfigureAwait(false); + } + + _eventRegister.Register(new ProducerDisposed(_correlationId)); + } + + public ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken = default) + => _producers[_messageRouter.ChoosePartition(null, 
_producersCount)].Send(message, cancellationToken); + + public ValueTask<MessageId> Send(MessageMetadata metadata, TMessage message, CancellationToken cancellationToken = default) + => _producers[_messageRouter.ChoosePartition(metadata, _producersCount)].Send(message, cancellationToken); Review comment: Doing this is totally valid, but in the name of consistency let's await it ########## File path: src/DotPulsar/Internal/PartitionedProducer.cs ########## @@ -0,0 +1,99 @@ +namespace DotPulsar.Internal +{ + using Abstractions; + using DotPulsar.Abstractions; + using Events; + using System; + using System.Collections.Concurrent; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + + public class PartitionedProducer<TMessage> : IProducer<TMessage> Review comment: Let's mark it as sealed ########## File path: src/DotPulsar/PulsarClient.cs ########## @@ -37,7 +38,7 @@ public sealed class PulsarClient : IPulsarClient public Uri ServiceUrl { get; } - internal PulsarClient( + public PulsarClient( Review comment: We can't make this public. 
If you need to create an instance for testing, then (in the internal namespace) you can create a PulsarClientFactory ########## File path: src/DotPulsar/PulsarClient.cs ########## @@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient public static IPulsarClientBuilder Builder() => new PulsarClientBuilder(); + public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken) + { + var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false); + var commandPartitionedMetadata = new CommandPartitionedTopicMetadata() { Topic = topic }; + var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false); + + response.Expect(BaseCommand.Type.PartitionedMetadataResponse); + + if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed) + response.PartitionMetadataResponse.Throw(); + + return response.PartitionMetadataResponse.Partitions; + } + /// <summary> /// Create a producer. /// </summary> public IProducer<TMessage> CreateProducer<TMessage>(ProducerOptions<TMessage> options) { ThrowIfDisposed(); + var partitionsCount = GetNumberOfPartitions(options.Topic, default).Result; Review comment: .Result is a no-go, so we need to rethink how the partitioned producer is created. Let's have a talk about that on slack maybe? 
########## File path: tests/DotPulsar.Tests/DotPulsar.Tests.csproj ########## @@ -8,6 +8,7 @@ <ItemGroup> <PackageReference Include="FluentAssertions" Version="5.10.3" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" /> + <PackageReference Include="Moq" Version="4.16.1" /> Review comment: Like we talked about, we'll be using NSubstitute instead ########## File path: src/DotPulsar/PulsarClient.cs ########## @@ -57,26 +58,84 @@ public sealed class PulsarClient : IPulsarClient public static IPulsarClientBuilder Builder() => new PulsarClientBuilder(); + public async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken) Review comment: Is it intentional that this is public? ########## File path: src/DotPulsar/RoundRobinPartitionRouter.cs ########## @@ -0,0 +1,36 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar +{ + using Abstractions; + using HashDepot; + using System.Text; + using System.Threading; + + public class RoundRobinPartitionRouter : IMessageRouter Review comment: Let's make this sealed ########## File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs ########## @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal +{ + using Abstractions; + using DotPulsar.Internal; + using DotPulsar.Internal.Abstractions; + using DotPulsar.Internal.Events; + using Moq; + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using Xunit; + + public class PartitionedProducerProcessTests + { + [Fact] + public void TestPartitionedProducerStateManage() + { + var connectionPoolMock = new Mock<IConnectionPool>(MockBehavior.Loose); + var connectionPool = connectionPoolMock.Object; + var establishNewChannelMock = new Mock<IEstablishNewChannel>(MockBehavior.Loose); + var establishNewChannel = establishNewChannelMock.Object; + var producerMock = new Mock<IProducer>(MockBehavior.Loose); + var producer = producerMock.Object; + + var processManager = new ProcessManager(connectionPool); + + var producerProcesses = new Dictionary<uint, ProducerProcess>(3); + var producerGuids = new Dictionary<uint, Guid>(3); + var producersGroup = new ConcurrentDictionary<uint, IProducer>(Environment.ProcessorCount, 3); + var partitionedProducerGuid = Guid.NewGuid(); + + for (uint i = 0; i < 3; i++) + { + var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted); + var correlationId = Guid.NewGuid(); + var process = new ProducerProcess(correlationId, stateManager, establishNewChannel, processManager, partitionedProducerGuid, i); + producerGuids[i] = correlationId; + producerProcesses[i] = process; + producersGroup[i] = producer; + processManager.Add(process); + } 
+ + var partitionedStateManager = + new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted); + var partitionedProducerProcess = new PartitionedProducerProcess(partitionedProducerGuid, partitionedStateManager, producersGroup.Count); + processManager.Add(partitionedProducerProcess); + + // Test connect + Assert.Equal(ProducerState.Disconnected, partitionedStateManager.CurrentState); + processManager.Register(new ChannelConnected(producerGuids[0])); + Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState); + processManager.Register(new ChannelConnected(producerGuids[1])); + Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState); + processManager.Register(new ChannelConnected(producerGuids[2])); + Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState); + + // Test disconnect + processManager.Register(new ChannelDisconnected(producerGuids[1])); + Assert.Equal(ProducerState.PartiallyConnected, partitionedStateManager.CurrentState); + + // Test reconnect + processManager.Register(new ChannelConnected(producerGuids[1])); + Assert.Equal(ProducerState.Connected, partitionedStateManager.CurrentState); + + // Test fault + processManager.Register(new ExecutorFaulted(producerGuids[1])); + Assert.Equal(ProducerState.Faulted, partitionedStateManager.CurrentState); + } + + [Fact] + public void TestUpdatePartitions() Review comment: The test naming convention is [Method/Feature]_[Given/When]Something_[Should/Then]ExpectedResult ########## File path: tests/DotPulsar.Tests/PulsarClientTests.cs ########## @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests +{ + using Abstractions; + using DotPulsar.Internal; + using DotPulsar.Internal.Abstractions; + using DotPulsar.Internal.PulsarApi; + using Moq; + using System; + using System.Threading; + using Xunit; + + public class PulsarClientTests + { + [Fact] + public async void GetPartitions_GivenPartitionedTopic_ShouldReturnPartitionsNumber() Review comment: Use "Task" instead of "void" ########## File path: src/DotPulsar/Internal/PartitionedProducerProcess.cs ########## @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Internal +{ + using Abstractions; + using Events; + using System; + using System.Threading; + using System.Threading.Tasks; + + public class PartitionedProducerProcess : IProcess Review comment: As we talked about, this can just be deleted as you decided to make changes to ProducerProcess instead. 
########## File path: tests/DotPulsar.Tests/Internal/PartitionedProducerProcessTests.cs ########## @@ -0,0 +1,105 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace DotPulsar.Tests.Internal +{ + using Abstractions; + using DotPulsar.Internal; + using DotPulsar.Internal.Abstractions; + using DotPulsar.Internal.Events; + using Moq; + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using Xunit; + + public class PartitionedProducerProcessTests + { + [Fact] + public void TestPartitionedProducerStateManage() Review comment: The test naming convention is [Method/Feature]_[Given/When]Something_[Should/Then]ExpectedResult -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org