usaguerrilla commented on a change in pull request #48:
URL: https://github.com/apache/pulsar-dotpulsar/pull/48#discussion_r471829444



##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -53,6 +60,38 @@ public static IPulsarClientBuilder Builder()
         public IProducer CreateProducer(ProducerOptions options)
         {
             ThrowIfDisposed();
+            var partitionedTopicMetadata = 
GetPartitionTopicMetadata(options.Topic).Result;
+            if (partitionedTopicMetadata.Partitions > 0)
+            {
+                var correlationId = Guid.NewGuid();
+                var executor = new Executor(correlationId, _processManager, 
_exceptionHandler);
+                var stateManager = new 
StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, 
ProducerState.Faulted);
+                var producers = new ConcurrentDictionary<int, IProducer>();
+                var subproducerTasks = new 
List<Task>(partitionedTopicMetadata.Partitions);
+                for (int i = 0; i < partitionedTopicMetadata.Partitions; i++)
+                {
+                    int partID = i;
+                    subproducerTasks.Add(Task.Run(() =>
+                    {
+                        var subproducerOption = new ProducerOptions(options)
+                        {
+                            Topic = $"{options.Topic}-partition-{partID}"
+                        };
+                        producers[partID] = 
CreateProducerWithoutCheckingPartition(subproducerOption);

Review comment:
       did we lose ```_ = await 
producer.StateChangedTo(ProducerState.Connected).ConfigureAwait(false)``` here 
somehow? After:
   ```producers[partID] = 
CreateProducerWithoutCheckingPartition(subproducerOption);```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to