Another question: The KIP doesn’t exactly spell out how it uses the idempotence guarantee from KIP-98. It seems that only the transactional part is needed. Or is the idempotence guarantee working behind the scenes and helping for some scenarios for which it is not worthwhile aborting a transaction (e.g., retransmitting a record after a temporary network glitch)?
Thanks
Eno

> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
>
> I second the concern with the one producer per task approach. At a
> high level it seems to make sense, but I think Damian is exactly right
> that it cuts against the general design of the producer. Many people have
> high input partition counts and will have high task counts as a result. I
> think processing 1000 partitions should not be an unreasonable thing to
> want to do.
>
> The tricky bits will be:
>
> - Reduced effectiveness of batching (or more latency and memory to get
>   equivalent batching). This doesn't show up in simple benchmarks because
>   much of the penalty is I/O and CPU on the broker, and the additional
>   threads from all the producers can make a single-threaded benchmark
>   seem faster.
> - TCP connection explosion. We maintain one connection per broker. This
>   is already high since each app instance does this. This design, though,
>   will add an additional multiplicative factor based on the partition
>   count of the input.
> - Connection and metadata request storms. When an instance with 1000
>   tasks starts up, it is going to try to create many thousands of
>   connections and issue a thousand metadata requests all at once.
> - Memory usage. We currently default to 64MB per producer. This can be
>   tuned down, but the fact that we are spreading the batching over more
>   producers will fundamentally mean we need a lot more memory to get good
>   perf, and the memory usage will change as your task assignment changes,
>   so it will be hard to set correctly unless it is done automatically.
> - Metrics explosion (1000 producer instances, each with their own
>   metrics to monitor).
> - Thread explosion: 1000 background threads, one per producer, each
>   sending data.
>
> -Jay
>
> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Guozhang,
>>
>> Thanks for the KIP! This is an important feature for Kafka Streams and
>> will help to unlock a bunch of use cases.
>>
>> I have some concerns/questions:
>>
>> 1. Producer per task: I'm worried about the overhead this is going to
>>    put on both the streams app and the Kafka brokers. You can easily
>>    imagine an app consuming thousands of partitions. What load will
>>    this put on the brokers? Am I correct in assuming that there will be
>>    metadata requests per Producer? The memory overhead in the streams
>>    app will also increase fairly significantly. Should we adjust
>>    ProducerConfig.BUFFER_MEMORY_CONFIG?
>> 2. State Store recovery: As we already know, restoring the entire
>>    changelog can take an extremely long time. Even with a fairly small
>>    dataset and an inappropriately tuned segment size, this can take way
>>    too long. My concern is that failures happen and then recovery takes
>>    "forever" and we end up in a situation where we need to change the
>>    max.poll.interval to be some very large number or else we end up in
>>    "rebalance hell". I don't think this provides a very good user
>>    experience. You mention RocksDB checkpointing in the doc - should we
>>    explore this idea some more? i.e., understand the penalty for
>>    checkpointing. Maybe checkpoint every *n* commits?
>> 3. What does EoS mean for Caching? If we set the commit interval to
>>    100ms then the cache is not going to be very effective. Should it
>>    just be disabled?
>>
>> Thanks,
>> Damian
>>
>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>> provide exactly-once processing semantics:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
>>>
>>> This KIP enables Streams users to optionally turn on exactly-once
>>> processing semantics without changing their app code at all by
>>> leveraging the transactional messaging features provided in KIP-98.
>>>
>>> The above wiki page provides a high-level view of the proposed changes,
>>> while detailed implementation design can be found in this Google doc:
>>>
>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
>>>
>>> We would love to hear your comments and suggestions.
>>>
>>> Thanks,
>>> -- Guozhang
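
To put rough numbers on the producer-per-task concerns raised in the thread, here is a back-of-envelope sketch in plain Java (no Kafka dependency). It is only an illustration: the class and method names are hypothetical, the broker count of 10 is an assumed example value, and the 64MB per-producer buffer is simply the figure quoted in the thread, so check the actual buffer.memory default for your client version before relying on it. It assumes one producer per task, at most one connection per producer per broker, and one background sender thread per producer.

```java
// Hypothetical helper, not part of Kafka: estimates the per-instance cost of
// running one producer per task, under the assumptions stated above.
final class ProducerPerTaskEstimate {

    // Upper bound on TCP connections: each producer may connect to every broker.
    static long connections(int producers, int brokers) {
        return (long) producers * brokers;
    }

    // Total buffer memory if every producer keeps its own buffer of the given size.
    static long bufferBytes(int producers, long bufferBytesPerProducer) {
        return producers * bufferBytesPerProducer;
    }

    // One background sender thread per producer instance.
    static int senderThreads(int producers) {
        return producers;
    }

    public static void main(String[] args) {
        int tasks = 1000;                        // one producer per task (figure from the thread)
        int brokers = 10;                        // assumed cluster size, for illustration only
        long perProducer = 64L * 1024 * 1024;    // 64MB, the figure quoted in the thread

        System.out.println("connections (upper bound): " + connections(tasks, brokers));
        System.out.println("buffer memory (MB): " + bufferBytes(tasks, perProducer) / (1024 * 1024));
        System.out.println("sender threads: " + senderThreads(tasks));
    }
}
```

With these inputs the estimate is up to 10,000 connections, 64,000 MB of buffer memory, and 1000 sender threads for a single instance, which is the multiplicative effect described above.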