Hi Matthias,

Thanks. The perf test is a good start, but I don't think it goes far
enough. 100 partitions is not a lot. What happens when there are thousands
of partitions? What is the load on the brokers? How much more memory is
used by the Streams app, etc.?

Thanks,
Damian

On Tue, 7 Mar 2017 at 03:02 Matthias J. Sax <matth...@confluent.io> wrote:

> Hi,
>
> I want to give a first response:
>
>
>
> 1. Producer per task:
>
> First, we did some performance tests, indicating that the performance
> penalty is small. Please have a look here:
>
> https://docs.google.com/spreadsheets/d/18aGOB13-ibwHJl5VE27HnHBlNkZAIeLF9yQ99FDxkuM/edit?usp=sharing
>
> For the test, we ran with a trunk version and a modified version that
> uses a producer per task (of course, no transactions, but at-least-once
> semantics). The scaling factor indicates the number of brokers and
> (single-threaded) Streams instances. We used SimpleBenchmark, which is
> part of the AK code base.
>
>
> Second, as the design is "producer per task" (and not "producer per
> partition"), it is possible to specify a custom PartitionGrouper that
> assigns multiple partitions to a single task. Thus, it allows reducing
> the number of tasks in scenarios with many partitions. Right now, users
> must implement this interface themselves, but we could also add a new
> config parameter that specifies max.number.of.tasks or
> partitions.per.task so that users can configure this instead of
> implementing the interface.
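>
> To illustrate, a rough sketch of such a grouper (the class name and
> bucket size are hypothetical; the PartitionGrouper interface and the
> partition.grouper config already exist in Streams):
>
>   import java.util.HashMap;
>   import java.util.HashSet;
>   import java.util.Map;
>   import java.util.Set;
>   import org.apache.kafka.common.Cluster;
>   import org.apache.kafka.common.TopicPartition;
>   import org.apache.kafka.streams.processor.PartitionGrouper;
>   import org.apache.kafka.streams.processor.TaskId;
>
>   // Packs up to PARTITIONS_PER_TASK partitions of a topic group into a
>   // single task, shrinking the task (and thus producer) count.
>   public class BucketedPartitionGrouper implements PartitionGrouper {
>       private static final int PARTITIONS_PER_TASK = 10; // illustrative
>
>       @Override
>       public Map<TaskId, Set<TopicPartition>> partitionGroups(
>               final Map<Integer, Set<String>> topicGroups, final Cluster metadata) {
>           final Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
>           for (final Map.Entry<Integer, Set<String>> group : topicGroups.entrySet()) {
>               for (final String topic : group.getValue()) {
>                   final Integer partitions = metadata.partitionCountForTopic(topic);
>                   if (partitions == null) {
>                       continue; // no metadata for this topic yet
>                   }
>                   for (int p = 0; p < partitions; p++) {
>                       // same bucket => same task, so co-partitioned topics
>                       // of the same group stay together
>                       final TaskId id = new TaskId(group.getKey(), p / PARTITIONS_PER_TASK);
>                       groups.computeIfAbsent(id, k -> new HashSet<>())
>                             .add(new TopicPartition(topic, p));
>                   }
>               }
>           }
>           return groups;
>       }
>   }
>
> It would be plugged in via
> props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG,
> BucketedPartitionGrouper.class).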
>
> Third, there is the idea of a "producer pool" that would allow sharing
> resources (network connections, memory, etc.) across multiple producers.
> This would allow separating multiple transactions at the producer level
> while resources are shared. There is no detailed design document yet,
> and this feature would require its own KIP.
>
> Thus, if performance problems do show up in high-scale scenarios, there
> are multiple ways to tackle them while keeping the "producer per task"
> design.
>
> Additionally, a "producer per thread" design would be considerably more
> complex; I summarized the issues in a separate document and will share
> a link to it soon.
>
>
>
> 2. StateStore recovery:
>
> In its first design, Streams EoS will not be able to exploit the
> state-store recovery improvements currently being added for 0.11.
> However, as 0.10.2 faces the same issue of potentially long recovery,
> there is no regression in this regard. Thus, I see those improvements
> as orthogonal add-ons. Nevertheless, we should try to explore those
> options and, if possible, get them into 0.11 so that Streams with EoS
> gets the same improvements as the at-least-once scenario.
>
>
>
> 3. Caching:
>
> We might need to do some experiments to quantify the impact on caching.
> If it's severe, the suggested default commit interval of 100ms could
> also be increased. Also, EoS will not enforce any commit interval, but
> only change the default value. Thus, a user can freely trade off
> latency against the caching effect.
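>
> As a minimal sketch of that trade-off (the processing.guarantee switch
> is only what this KIP proposes and may still change; the commit
> interval config already exists):
>
>   import java.util.Properties;
>   import org.apache.kafka.streams.StreamsConfig;
>
>   final Properties props = new Properties();
>   // proposed EoS switch from this KIP (name/value subject to change)
>   props.put("processing.guarantee", "exactly_once");
>   // commit (i.e., flush caches and publish output) less often,
>   // trading higher end-to-end latency for a better caching effect
>   props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);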
>
> Last but not least, there is the idea to allow "read_uncommitted" for
> intermediate topics. This would be an advanced design for Streams EoS
> that allows downstream sub-topologies to optimistically read
> uncommitted data. In case of failure, a cascading abort of transactions
> would be required. This change would need another KIP.
>
>
>
> 4. Idempotent Producer:
>
> The transactional part automatically leverages the idempotent properties
> of the producer. Idempotency is a requirement:
>
> > Note that enable.idempotence must be enabled if a TransactionalId is
> > configured.
>
> See
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.g2xsf9n49puh
>
> All idempotent retries are handled by the producer internally (with or
> without transactions) if enable.idempotence is set to true.
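>
> For reference, a minimal sketch of how the two configs interact, using
> the producer API as proposed in KIP-98 (names are taken from the design
> doc and may still change):
>
>   import java.util.Properties;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>   import org.apache.kafka.common.KafkaException;
>   import org.apache.kafka.common.serialization.StringSerializer;
>
>   final Properties props = new Properties();
>   props.put("bootstrap.servers", "localhost:9092");  // example address
>   props.put("transactional.id", "my-app-task-0");    // enables transactions
>   props.put("enable.idempotence", "true");           // required with a TransactionalId
>   props.put("key.serializer", StringSerializer.class.getName());
>   props.put("value.serializer", StringSerializer.class.getName());
>
>   final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>   producer.initTransactions();
>   try {
>       producer.beginTransaction();
>       // idempotent retries on transient send errors happen internally
>       producer.send(new ProducerRecord<>("output-topic", "key", "value"));
>       producer.commitTransaction();
>   } catch (final KafkaException e) {
>       // a real app would treat fatal errors (e.g., a fenced producer)
>       // separately by closing instead of aborting
>       producer.abortTransaction();
>   }
>   producer.close();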
>
>
>
> -Matthias
>
>
>
> On 3/3/17 3:34 AM, Eno Thereska wrote:
> > Another question:
> >
> > The KIP doesn’t exactly spell out how it uses the idempotence guarantee
> > from KIP-98. It seems that only the transactional part is needed. Or is
> > the idempotence guarantee working behind the scenes and helping for
> > some scenarios for which it is not worthwhile aborting a transaction
> > (e.g., retransmitting a record after a temporary network glitch)?
> >
> > Thanks
> > Eno
> >
> >> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> >>
> >> I second the concern with the one-producer-per-task approach. At a
> >> high level it seems to make sense, but I think Damian is exactly right
> >> that it cuts against the general design of the producer. Many people
> >> have high input partition counts and will have high task counts as a
> >> result. I think processing 1000 partitions should not be an
> >> unreasonable thing to want to do.
> >> The tricky bits will be:
> >>
> >>   - Reduced effectiveness of batching (or more latency and memory to
> >>   get equivalent batching). This doesn't show up in simple benchmarks
> >>   because much of the penalty is I/O and CPU on the broker, and the
> >>   additional threads from all the producers can make a single-threaded
> >>   benchmark seem faster.
> >>   - TCP connection explosion. We maintain one connection per broker.
> >>   This is already high since each app instance does this. This design,
> >>   though, will add an additional multiplicative factor based on the
> >>   partition count of the input.
> >>   - Connection and metadata request storms. When an instance with 1000
> >>   tasks starts up, it is going to try to create many thousands of
> >>   connections and issue a thousand metadata requests all at once.
> >>   - Memory usage. We currently default to 64MB per producer. This can
> >>   be tuned down, but the fact that we are spreading the batching over
> >>   more producers will fundamentally mean we need a lot more memory to
> >>   get good perf, and the memory usage will change as your task
> >>   assignment changes, so it will be hard to set correctly unless it is
> >>   done automatically (rough numbers below).
> >>   - Metrics explosion (1000 producer instances, each with their own
> >>   metrics to monitor).
> >>   - Thread explosion: 1000 background threads, one per producer, each
> >>   sending data.
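> >>
> >> To put a rough number on the memory point above: at the 64MB default,
> >> an instance running 1000 tasks would reserve on the order of 64GB of
> >> producer buffer memory in aggregate, so buffer.memory would have to
> >> be tuned down dramatically per producer.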
> >>
> >> -Jay
> >>
> >> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> Thanks for the KIP! This is an important feature for Kafka Streams
> >>> and will help to unlock a bunch of use cases.
> >>>
> >>> I have some concerns/questions:
> >>>
> >>>   1. Producer per task: I'm worried about the overhead this is going
> >>>   to put on both the streams app and the Kafka brokers. You can
> >>>   easily imagine an app consuming thousands of partitions. What load
> >>>   will this put on the brokers? Am I correct in assuming that there
> >>>   will be metadata requests per producer? The memory overhead in the
> >>>   streams app will also increase fairly significantly. Should we
> >>>   adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
> >>>   2. State store recovery: As we already know, restoring the entire
> >>>   changelog can take an extremely long time. Even with a fairly small
> >>>   dataset and an inappropriately tuned segment size, this can take
> >>>   way too long. My concern is that failures happen, recovery takes
> >>>   "forever", and we end up in a situation where we need to change the
> >>>   max.poll.interval to some very large number or else we end up in
> >>>   "rebalance hell". I don't think this provides a very good user
> >>>   experience. You mention RocksDB checkpointing in the doc - should
> >>>   we explore this idea some more? i.e., understand the penalty for
> >>>   checkpointing. Maybe checkpoint every *n* commits?
> >>>   3. What does EoS mean for caching? If we set the commit interval to
> >>>   100ms then the cache is not going to be very effective. Should it
> >>>   just be disabled?
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> >>>> provide exactly-once processing semantics:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
> >>>>
> >>>> This KIP enables Streams users to optionally turn on exactly-once
> >>>> processing semantics without changing their app code at all, by
> >>>> leveraging the transactional messaging features provided in KIP-98.
> >>>>
> >>>> The above wiki page provides a high-level view of the proposed
> >>>> changes, while the detailed implementation design can be found in
> >>>> this Google doc:
> >>>>
> >>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
> >>>>
> >>>> We would love to hear your comments and suggestions.
> >>>>
> >>>> Thanks,
> >>>> -- Guozhang
> >>>>
> >>>
> >
>
>
