usaguerrilla commented on a change in pull request #48: URL: https://github.com/apache/pulsar-dotpulsar/pull/48#discussion_r471829444
########## File path: src/DotPulsar/PulsarClient.cs ########## @@ -53,6 +60,38 @@ public static IPulsarClientBuilder Builder() public IProducer CreateProducer(ProducerOptions options) { ThrowIfDisposed(); + var partitionedTopicMetadata = GetPartitionTopicMetadata(options.Topic).Result; + if (partitionedTopicMetadata.Partitions > 0) + { + var correlationId = Guid.NewGuid(); + var executor = new Executor(correlationId, _processManager, _exceptionHandler); + var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted); + var producers = new ConcurrentDictionary<int, IProducer>(); + var subproducerTasks = new List<Task>(partitionedTopicMetadata.Partitions); + for (int i = 0; i < partitionedTopicMetadata.Partitions; i++) + { + int partID = i; + subproducerTasks.Add(Task.Run(() => + { + var subproducerOption = new ProducerOptions(options) + { + Topic = $"{options.Topic}-partition-{partID}" + }; + producers[partID] = CreateProducerWithoutCheckingPartition(subproducerOption); Review comment: did we lose ```_ = await producer.StateChangedTo(ProducerState.Connected).ConfigureAwait(false)``` here somehow? After: ```producers[partID] = CreateProducerWithoutCheckingPartition(subproducerOption);``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org