Hi Matthias,

Thanks. The perf test is a good start, but I don't think it goes far enough. 100 partitions is not a lot. What happens when there are thousands of partitions? What is the load on the brokers? How much more memory is used by the Streams app, etc.?
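To put the memory question in perspective, here is a back-of-envelope sketch. It assumes one producer per task and the 64 MB per-producer buffer figure cited later in this thread; the task count and all numbers are purely illustrative, not measurements:

```java
// Rough aggregate producer buffer memory under a producer-per-task design.
// Illustrative only: 1000 tasks, 64 MB buffer.memory per producer.
public class ProducerMemorySketch {

    public static long totalBufferBytes(int numTasks, long bufferMemoryPerProducer) {
        return numTasks * bufferMemoryPerProducer;
    }

    public static void main(String[] args) {
        long perProducer = 64L * 1024 * 1024; // 64 MB, the figure cited in this thread
        long total = totalBufferBytes(1000, perProducer);
        // 1000 tasks * 64 MB = ~62 GB of producer buffers in one Streams instance
        System.out.println(total / (1024L * 1024 * 1024) + " GB");
    }
}
```

Even if `buffer.memory` is tuned well below its default, the total still scales linearly with the task count, which is the core of the concern above.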
Thanks,
Damian

On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> I want to give a first response:
>
> 1. Producer per task:
>
> First, we did some performance tests, indicating that the performance
> penalty is small. Please have a look here:
>
> https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>
> For the test, we ran with a trunk version and a modified version that
> uses a producer per task (of course, no transactions, but at-least-once
> semantics). The scaling factor indicates the number of brokers and
> (single-threaded) Streams instances. We used SimpleBenchmark, which is
> part of the AK code base.
>
> Second, as the design is "producer per task" (and not "producer per
> partition"), it is possible to specify a custom PartitionGrouper that
> assigns multiple partitions to a single task. This makes it possible to
> reduce the number of tasks for scenarios with many partitions. Right
> now, this interface must be implemented solely by the user, but we
> could also add a new config parameter that specifies the
> max.number.of.tasks or partitions.per.task, so that the user can
> configure this instead of implementing the interface.
>
> Third, there is the idea of a "Producer Pool" that would allow sharing
> resources (network connections, memory, etc.) over multiple producers.
> This would allow separating multiple transactions on the producer
> level, while resources are shared. There is no detailed design document
> yet, and there would be a KIP for this feature.
>
> Thus, if there should be any performance problems for high-scale
> scenarios, there are multiple ways to tackle them while keeping the
> "producer per task" design.
>
> Additionally, a "producer per thread" design would be way more complex,
> and I summarized the issues in a separate document. I will share a link
> to the document soon.
>
> 2. StateStore recovery:
>
> In its first design, Streams EoS will not allow exploiting the
> improvements that are being added for 0.11 at the moment. However, as
> 0.10.2 faces the same issue of potentially long recovery, there is no
> regression in this regard. Thus, I see those improvements as orthogonal
> add-ons. Nevertheless, we should try to explore those options and, if
> possible, get them into 0.11, such that Streams with EoS gets the same
> improvements as the at-least-once scenario.
>
> 3. Caching:
>
> We might need to do some experiments to quantify the impact on caching.
> If it's severe, the suggested default commit interval of 100ms could
> also be increased. Also, EoS will not enforce any commit interval, but
> only change the default value. Thus, a user can freely trade off
> latency vs. the caching effect.
>
> Last but not least, there is the idea to allow "read_uncommitted" for
> intermediate topics. This would be an advanced design for Streams EoS
> that allows downstream sub-topologies to read uncommitted data
> optimistically. In case of failure, a cascading abort of transactions
> would be required. This change will need another KIP.
>
> 4. Idempotent Producer:
>
> The transactional part automatically leverages the idempotent
> properties of the producer. Idempotency is a requirement:
>
> > Note that enable.idempotence must be enabled if a TransactionalId is
> > configured.
>
> See
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>
> All idempotent retries are handled by the producer internally (with or
> without transactions) if enable.idempotence is set to true.
>
> -Matthias
>
> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > Another question:
> >
> > The KIP doesn’t exactly spell out how it uses the idempotence
> > guarantee from KIP-98. It seems that only the transactional part is
> > needed.
> > Or is the idempotence guarantee working behind the scenes and
> > helping for some scenarios for which it is not worthwhile aborting a
> > transaction (e.g., retransmitting a record after a temporary network
> > glitch)?
> >
> > Thanks
> > Eno
> >
> >> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> >>
> >> I second the concern with the one-producer-per-task approach. At a
> >> high level it seems to make sense, but I think Damian is exactly
> >> right that it cuts against the general design of the producer. Many
> >> people have high input partition counts and will have high task
> >> counts as a result. I think processing 1000 partitions should not
> >> be an unreasonable thing to want to do.
> >>
> >> The tricky bits will be:
> >>
> >> - Reduced effectiveness of batching (or more latency and memory to
> >> get equivalent batching). This doesn't show up in simple benchmarks
> >> because much of the penalty is I/O and CPU on the broker, and the
> >> additional threads from all the producers can make a
> >> single-threaded benchmark seem faster.
> >> - TCP connection explosion. We maintain one connection per broker.
> >> This is already high since each app instance does this. This
> >> design, though, will add an additional multiplicative factor based
> >> on the partition count of the input.
> >> - Connection and metadata request storms. When an instance with
> >> 1000 tasks starts up, it is going to try to create many thousands
> >> of connections and issue a thousand metadata requests all at once.
> >> - Memory usage. We currently default to 64MB per producer. This can
> >> be tuned down, but the fact that we are spreading the batching over
> >> more producers will fundamentally mean we need a lot more memory to
> >> get good perf, and the memory usage will change as your task
> >> assignment changes, so it will be hard to set correctly unless it
> >> is done automatically.
> >> - Metrics explosion (1000 producer instances, each with their own
> >> metrics to monitor).
> >> - Thread explosion: 1000 background threads, one per producer, each
> >> sending data.
> >>
> >> -Jay
> >>
> >> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> Thanks for the KIP! This is an important feature for Kafka Streams
> >>> and will help to unlock a bunch of use cases.
> >>>
> >>> I have some concerns/questions:
> >>>
> >>> 1. Producer per task: I'm worried about the overhead this is going
> >>> to put on both the streams app and the Kafka brokers. You can
> >>> easily imagine an app consuming thousands of partitions. What load
> >>> will this put on the brokers? Am I correct in assuming that there
> >>> will be metadata requests per producer? The memory overhead in the
> >>> streams app will also increase fairly significantly. Should we
> >>> adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
> >>> 2. State store recovery: As we already know, restoring the entire
> >>> changelog can take an extremely long time. Even with a fairly
> >>> small dataset and an inappropriately tuned segment size, this can
> >>> take way too long. My concern is that failures happen, recovery
> >>> then takes "forever", and we end up in a situation where we need
> >>> to change the max.poll.interval to be some very large number, or
> >>> else we end up in "rebalance hell". I don't think this provides a
> >>> very good user experience. You mention RocksDB checkpointing in
> >>> the doc - should we explore this idea some more? i.e., understand
> >>> the penalty for checkpointing. Maybe checkpoint every *n* commits?
> >>> 3. What does EoS mean for caching? If we set the commit interval
> >>> to 100ms, then the cache is not going to be very effective. Should
> >>> it just be disabled?
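[The commit-interval vs. caching trade-off raised in question 3 above comes down to two Streams configuration knobs. A minimal sketch, assuming the `commit.interval.ms` and `cache.max.bytes.buffering` config keys; the values are illustrative, not recommendations:]

```java
import java.util.Properties;

// Sketch of the latency-vs-caching trade-off: a short commit interval
// flushes the record cache frequently, so its dedup effect shrinks.
public class EosCachingConfigSketch {

    public static Properties streamsProps(long commitIntervalMs, long cacheBytes) {
        Properties props = new Properties();
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        return props;
    }

    public static void main(String[] args) {
        // The EoS default suggested in this thread: commit every 100 ms.
        Properties lowLatency = streamsProps(100L, 10L * 1024 * 1024);
        // A user preferring the caching effect over low latency could raise it.
        Properties cachingFriendly = streamsProps(30_000L, 10L * 1024 * 1024);
        System.out.println(lowLatency.getProperty("commit.interval.ms"));
        System.out.println(cachingFriendly.getProperty("commit.interval.ms"));
    }
}
```

As Matthias notes earlier in the thread, EoS only changes the default commit interval, so this trade-off stays in the user's hands.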
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams
> >>>> and provide exactly-once processing semantics:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
> >>>>
> >>>> This KIP enables Streams users to optionally turn on exactly-once
> >>>> processing semantics without changing their app code at all, by
> >>>> leveraging the transactional messaging features provided in
> >>>> KIP-98.
> >>>>
> >>>> The above wiki page provides a high-level view of the proposed
> >>>> changes, while the detailed implementation design can be found in
> >>>> this Google doc:
> >>>>
> >>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
> >>>>
> >>>> We would love to hear your comments and suggestions.
> >>>>
> >>>> Thanks,
> >>>> -- Guozhang
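[Editor's note: tying the thread's configuration points together, a minimal sketch. It assumes the `processing.guarantee` key proposed by KIP-129 for Streams, and the KIP-98 rule quoted above that `enable.idempotence` must be enabled whenever a TransactionalId is configured; all values are illustrative:]

```java
import java.util.Properties;

// Configuration sketch for the two layers discussed in this thread:
// (1) a Streams app opting into exactly-once semantics, and
// (2) a plain producer using KIP-98 transactions, where idempotence
//     is a hard requirement.
public class ExactlyOnceConfigSketch {

    public static Properties streamsEosProps(String appId) {
        Properties props = new Properties();
        props.put("application.id", appId);
        // Proposed by KIP-129: opts Streams into EoS with no app-code change.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static Properties transactionalProducerProps(String txnId) {
        Properties props = new Properties();
        props.put("transactional.id", txnId);
        // KIP-98: enable.idempotence must be true when a TransactionalId is set.
        props.put("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsEosProps("my-app").getProperty("processing.guarantee"));
        System.out.println(transactionalProducerProps("txn-1").getProperty("enable.idempotence"));
    }
}
```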