Hello Sean,

Yes, at the moment we have one producer per task when EOS is turned on,
compared to one producer per thread without EOS. There is an ongoing KIP,
KIP-447, that aims to bring this back to one producer per thread with EOS as
well; it involves broker-side changes.
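As a rough illustration of that difference, here is a hypothetical model (not
Kafka Streams code; `ProducerCountModel`, `assign` and `producerCount` are
made-up names) that counts producer clients for a given layout of active tasks
over stream threads: one shared producer per thread without EOS, one producer
per task with EOS. The round-robin placement of tasks is an assumption for
illustration only; the real assignment is done by the Streams partition
assignor.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of how many producer clients a Kafka Streams instance
 * creates: one per stream thread without EOS, one per active task with EOS.
 * Illustrative only, not Kafka Streams code.
 */
final class ProducerCountModel {

    /** Each inner list holds the active task ids owned by one thread. */
    static int producerCount(List<List<String>> tasksPerThread, boolean eosEnabled) {
        if (!eosEnabled) {
            // Without EOS: a single shared producer per stream thread.
            return tasksPerThread.size();
        }
        // With EOS: no thread-level producer; every task gets its own.
        int count = 0;
        for (List<String> tasks : tasksPerThread) {
            count += tasks.size();
        }
        return count;
    }

    /** Round-robin numTasks illustrative task ids ("0_0", "0_1", ...) over numThreads threads. */
    static List<List<String>> assign(int numTasks, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < numTasks; p++) {
            threads.get(p % numThreads).add("0_" + p);
        }
        return threads;
    }

    public static void main(String[] args) {
        List<List<String>> layout = assign(100, 8);
        System.out.println("without EOS: " + producerCount(layout, false)); // 8
        System.out.println("with EOS:    " + producerCount(layout, true));  // 100
    }
}
```

With 100 input partitions spread over 8 threads, that is 8 producers without
EOS versus 100 with EOS, before multiplying by the number of application
instances.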

To answer your questions:

1. Assuming each task has only one input topic, then yes, there is a 1:1
mapping between tasks and partitions; if a task involves e.g. joining topic A
and topic B, then task 1 would be composed of partitions A-1 and B-1 as
input. The reason for having one producer per task with EOS, and the
proposal to get rid of it for better scalability, are covered in this
meetup talk:
https://www.youtube.com/watch?v=j0l_zUhQaTc&list=PLa7VYi0yPIH3kDYOrar5B26WkslCGfDy_&index=7

2. The major performance implications are twofold: 1) given a fixed total
traffic, the more producers are created, the weaker the batching effect and
hence the lower the throughput; 2) on the broker side, the increased number
of connections and the higher request rate (i.e. more, smaller requests
instead of fewer, larger ones) add load overhead.
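For point 1, the following simplified model shows how tasks relate to
partitions when a sub-topology reads more than one topic: one task per
partition number, where task `g_p` receives partition p of every input topic.
It assumes the input topics are co-partitioned, as a join requires;
`TaskGroupingSketch` and `tasksFor` are hypothetical names, and the
`<subtopology>_<partition>` ids only mimic the style of Streams task ids.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of grouping input partitions into tasks
 * for one sub-topology. Not Kafka Streams code.
 */
final class TaskGroupingSketch {

    /**
     * Creates one task per partition number. Task "g_p" gets partition p of
     * every input topic that has at least p + 1 partitions.
     */
    static Map<String, List<String>> tasksFor(int subtopologyId,
                                              Map<String, Integer> partitionsPerTopic) {
        int maxPartitions = 0;
        for (int n : partitionsPerTopic.values()) {
            maxPartitions = Math.max(maxPartitions, n);
        }
        // LinkedHashMap keeps tasks in partition order for readable output.
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> inputs = new ArrayList<>();
            for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
                if (p < e.getValue()) {
                    inputs.add(e.getKey() + "-" + p);
                }
            }
            tasks.put(subtopologyId + "_" + p, inputs);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Topics A and B are co-partitioned (3 partitions each), e.g. for a join.
        Map<String, Integer> inputs = new LinkedHashMap<>();
        inputs.put("A", 3);
        inputs.put("B", 3);
        // Prints {0_0=[A-0, B-0], 0_1=[A-1, B-1], 0_2=[A-2, B-2]}
        System.out.println(tasksFor(0, inputs));
    }
}
```

So with a join there are still three tasks (and, with EOS, three producers),
not six: each task consumes one partition from each side of the join.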

We plan to address KIP-447 in the near term so that we can lift these
performance hurdles.
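To put rough numbers on the batching effect from point 2, here is a
back-of-the-envelope sketch. It assumes output traffic is spread evenly over
producers, treats each batch as its own produce request (in reality one
request can carry several batches for the same broker), and ignores
compression and per-partition batching. `batch.size` and `linger.ms` are real
producer settings, but the 10 MB/s load, the 16384-byte batch size and the
100 ms linger are illustrative values; `BatchingEstimate` is a hypothetical
helper.

```java
/**
 * Back-of-the-envelope model of producer batching under a fixed total load.
 * Hypothetical helper, not part of Kafka.
 */
final class BatchingEstimate {

    /**
     * Average bytes per batch for one producer, assuming traffic is spread
     * evenly over producers and a batch is sent once batch.size bytes are
     * buffered or linger.ms has elapsed, whichever happens first.
     */
    static double avgBatchBytes(double totalBytesPerSec, int producers,
                                int batchSizeBytes, int lingerMs) {
        if (producers <= 0 || lingerMs <= 0) {
            throw new IllegalArgumentException("producers and lingerMs must be positive");
        }
        double perProducerBytesPerSec = totalBytesPerSec / producers;
        double bytesPerLingerWindow = perProducerBytesPerSec * lingerMs / 1000.0;
        return Math.min(batchSizeBytes, bytesPerLingerWindow);
    }

    /** Produce requests per second across all producers, counting one batch per request. */
    static double requestsPerSec(double totalBytesPerSec, int producers,
                                 int batchSizeBytes, int lingerMs) {
        return totalBytesPerSec
                / avgBatchBytes(totalBytesPerSec, producers, batchSizeBytes, lingerMs);
    }

    public static void main(String[] args) {
        double total = 10_000_000; // 10 MB/s of output traffic (made-up figure)
        for (int producers : new int[] {8, 1000}) {
            // 8 producers:    ~16384 bytes/batch, ~610 requests/s
            // 1000 producers: ~1000 bytes/batch, ~10000 requests/s
            System.out.printf("%d producers: ~%.0f bytes/batch, ~%.0f requests/s%n",
                    producers,
                    avgBatchBytes(total, producers, 16384, 100),
                    requestsPerSec(total, producers, 16384, 100));
        }
    }
}
```

Under these assumptions, going from 8 producers (one per thread) to 1000
producers (one per task) shrinks the average batch from about 16 KB to about
1 KB and raises the request rate from roughly 610 to 10,000 per second, which
is the broker-side overhead described in point 2.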

Guozhang

On Tue, Oct 15, 2019 at 12:53 PM Sean Glover <sean.glo...@lightbend.com>
wrote:

> Hi,
>
> I would like to understand better how a `KafkaProducer` is used within
> Kafka Streams EoS use cases.  In the Streams Exactly Once Design [1]
> document it states that there is one producer per StreamThread:
>
> > Each thread contains one producer client and two consumer clients (one for
> > normal fetching from input topic partitions, and one for fetching from
> > changelog topics for state restoration only). Tasks assigned to the same
> > thread will then share these clients for fetching and producing messages.
>
>
> But when I look at the 2.3.0 implementation I see that the `threadProducer`
> is only defined when not using EoS [2], otherwise it is `null`. Then in
> `TaskCreator` a `createProducer` method is defined which will create a new
> `KafkaProducer` per task id when the `threadProducer` is null [3].  This
> behaviour seems to be confirmed with the
> `shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable` test
> [4].
>
> I assume this is just a case of the design doc falling out of sync with the
> implementation, or maybe I misunderstood its original meaning. Either way, I
> still have some questions:
>
> 1. It's my understanding that a `TaskId` is 1:1 with a partition, so is it
> the case that there is 1 `KafkaProducer` created per partition?  Is this to
> make it easier to support transactions across apps that have multiple group
> members, to make rebalancing easier?
> 2. Are there any performance implications to running so many
> `KafkaProducer`s? I assume this implementation could lead to hundreds or
> thousands of producer instances across all Kafka Streams app instances.
>
> [1] Streams Exactly Once Design -
>
> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#heading=h.mki3gltx1zw
> [2] StreamThread.java, creating a `threadProducer` -
>
> https://github.com/apache/kafka/blob/c55277cd79f103d9c686b9698f9f63208fdee272/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L501..L507
> [3] StreamThread.java, `TaskCreator.producerProvider` -
>
> https://github.com/apache/kafka/blob/c55277cd79f103d9c686b9698f9f63208fdee272/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L367..L373
> [4] StreamThreadTest.java,
> `shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable` -
>
> https://github.com/apache/kafka/blob/e3c2148b207a6ca98c89211d12cb47abdfaa70b3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L583
>
> Regards,
> Sean
>
> PS. I wasn't sure if this would be more appropriate for the dev@ list, but
> I thought I would ask here first.
>


-- 
-- Guozhang
