Hello Sean,

Yes, at the moment we have one producer per task when EOS is turned on, compared to one producer per thread without EOS. There is an ongoing KIP-447 that aims to bring this back to one producer per thread with EOS as well, which involves broker-side changes.
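To make the difference in producer counts concrete, here is a minimal, self-contained Java model. It is not Kafka Streams code: `ProducerAllocationSketch`, `FakeProducer`, `allocate` and the 2-thread / 8-task layout are all hypothetical, and the model only mimics the 2.3.0 rule discussed in this thread (a shared per-thread producer without EOS, a new producer per task when the thread producer is null under EOS).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model, NOT Kafka Streams code: it only mimics the producer
// allocation rule discussed in this thread (2.3.0 behaviour).
public class ProducerAllocationSketch {

    // Stand-in for a KafkaProducer; only object identity matters here.
    static final class FakeProducer {
        final String owner;
        FakeProducer(String owner) { this.owner = owner; }
    }

    // tasksPerThread.get(i) holds the task ids assigned to stream thread i.
    static Map<String, FakeProducer> allocate(List<List<String>> tasksPerThread, boolean eosEnabled) {
        Map<String, FakeProducer> producerByTask = new HashMap<>();
        for (int t = 0; t < tasksPerThread.size(); t++) {
            // Without EOS each thread creates one shared producer up front;
            // with EOS it is left null, mirroring the `threadProducer` check.
            FakeProducer threadProducer = eosEnabled ? null : new FakeProducer("thread-" + t);
            for (String taskId : tasksPerThread.get(t)) {
                FakeProducer producer = threadProducer != null
                        ? threadProducer
                        : new FakeProducer("task-" + taskId); // one producer per task
                producerByTask.put(taskId, producer);
            }
        }
        return producerByTask;
    }

    // Counts distinct producer instances (FakeProducer uses identity equality).
    static int distinctProducers(Map<String, FakeProducer> allocation) {
        Set<FakeProducer> unique = new HashSet<>(allocation.values());
        return unique.size();
    }

    public static void main(String[] args) {
        // Hypothetical layout: one subtopology reading an 8-partition topic,
        // tasks 0_0..0_7 spread over 2 threads (4 tasks each).
        List<List<String>> layout = new ArrayList<>();
        for (int t = 0; t < 2; t++) {
            List<String> tasks = new ArrayList<>();
            for (int p = t * 4; p < (t + 1) * 4; p++) {
                tasks.add("0_" + p);
            }
            layout.add(tasks);
        }
        // prints "without EOS: 2 producers"
        System.out.println("without EOS: " + distinctProducers(allocate(layout, false)) + " producers");
        // prints "with EOS:    8 producers"
        System.out.println("with EOS:    " + distinctProducers(allocate(layout, true)) + " producers");
    }
}
```

In this model the producer count follows the number of threads without EOS but the number of tasks with EOS, so it grows with the number of input partitions per subtopology rather than with the thread count. That multiplication is what drives the batching and connection-count concerns discussed in the answers.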
To answer your questions:

1. Assuming each task has only one input topic, then yes, there is a 1:1 mapping. If a task involves, e.g., joining topic A and topic B, then task 1 would take both A-1 and B-1 as input. The reasons for having one producer per task with EOS, and the proposal to get rid of this for better scalability, are covered in this meetup talk: https://www.youtube.com/watch?v=j0l_zUhQaTc&list=PLa7VYi0yPIH3kDYOrar5B26WkslCGfDy_&index=7

2. The major performance implications are twofold: 1) given a fixed total traffic, the more producers are created, the weaker the batching effect and hence the lower the throughput; 2) on the broker side, the increased number of connections and request rate (i.e. many smaller requests instead of fewer larger ones) adds load overhead. We plan to address KIP-447 in the near term so that we can lift these performance hurdles.

Guozhang

On Tue, Oct 15, 2019 at 12:53 PM Sean Glover <sean.glo...@lightbend.com> wrote:

> Hi,
>
> I would like to understand better how a `KafkaProducer` is used within
> Kafka Streams EoS use cases. In the Streams Exactly Once Design [1]
> document it states that there is one producer per StreamThread:
>
> > Each thread contains one producer client and two consumer clients (one for
> > normal fetching from input topic partitions, and one for fetching from
> > changelog topics for state restoration only). Tasks assigned to the same
> > thread will then share these clients for fetching and producing messages.
>
> But when I look at the 2.3.0 implementation I see that the `threadProducer`
> is only defined when not using EoS [2], otherwise it is `null`. Then in
> `TaskCreator` a `createProducer` method is defined which will create a new
> `KafkaProducer` per task id when the `threadProducer` is null [3]. This
> behaviour seems to be confirmed with the
> `shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable` test
> [4].
>
> I assume this is just a case of the design doc falling out of sync with the
> impl., or maybe I misunderstood its original meaning. Either way, I still
> have some questions:
>
> 1. It's my understanding that a `TaskId` is 1:1 with a partition, so is it
> the case that there is 1 `KafkaProducer` created per partition? Is this to
> make it easier to support transactions across apps that have multiple group
> members, to make rebalancing easier?
> 2. Are there any performance implications to running so many
> KafkaProducers? I assume this impl. could lead to 100-1000s of producer
> instances created across all Kafka Streams app instances.
>
> [1] Streams Exactly Once Design -
> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#heading=h.mki3gltx1zw
> [2] StreamThread.java, creating a `threadProducer` -
> https://github.com/apache/kafka/blob/c55277cd79f103d9c686b9698f9f63208fdee272/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L501..L507
> [3] StreamThread.java, `TaskCreator.producerProvider` -
> https://github.com/apache/kafka/blob/c55277cd79f103d9c686b9698f9f63208fdee272/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L367..L373
> [4] StreamThreadTest.java,
> `shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable` -
> https://github.com/apache/kafka/blob/e3c2148b207a6ca98c89211d12cb47abdfaa70b3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L583
>
> Regards,
> Sean
>
> PS. I wasn't sure if this would be more appropriate for the dev@ list, but
> I thought I would ask here first.
>

--
Guozhang