Hi Apurva,

Thanks for the detailed answers... and sorry for the late reply!
It does sound like, if the input-partitions-to-app-id mapping never changes, the existing fencing mechanisms should prevent duplicates. Great! I'm a bit concerned the proposed API will be delicate to program against successfully -- even in the simple case, we need to create a new producer instance per input partition, and anything fancier is going to need its own implementation of the Streams/Samza-style 'task' idea -- but that may be fine for this sort of advanced feature.

For the second question, I notice that Jason also elaborated on this downthread:

> We also looked at removing the producer ID.
> This was discussed somewhere above, but basically the idea is to store the
> AppID in the message set header directly and avoid the mapping to producer
> ID altogether. As long as batching isn't too bad, the impact on total size
> may not be too bad, but we were ultimately more comfortable with a fixed
> size ID.

...which suggests that the distinction is useful for performance, but not necessary for correctness, which makes good sense to me. (Would a 128-bit ID be a reasonable compromise? That's enough room for a UUID, or a reasonable hash of an arbitrary string, and it only marginally increases the message size.)

On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Hi Ben,
>
> Now, on to your first question of how to deal with consumer rebalances. The
> short answer is that the application needs to ensure that the
> assignment of input partitions to appId is consistent across rebalances.
>
> For Kafka Streams, they already ensure that the mapping of input partitions
> to task Id is invariant across rebalances by implementing a custom sticky
> assignor. Other non-Streams apps can trivially have one producer per input
> partition and have the appId be the same as the partition number to achieve
> the same effect.
>
> With this precondition in place, we can maintain transactions across
> rebalances.
>
> Hope this answers your question.
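To make the precondition concrete, here is a toy Python model of how a stable partition-to-AppID mapping lets fencing reject the zombie in Ben's rebalance example. Everything in it is an assumption for illustration: `app_id_for`, `Coordinator`, `register`, and `commit` are hypothetical names, and the single integer epoch per AppID is a simplified stand-in for whatever fencing state the KIP's coordinator actually keeps.

```python
from dataclasses import dataclass, field


def app_id_for(topic: str, partition: int) -> str:
    # Hypothetical naming scheme: one AppID per input partition, so whichever
    # process owns the partition after a rebalance reuses the same AppID.
    return f"{topic}-{partition}"


@dataclass
class Coordinator:
    """Toy stand-in for a transaction coordinator; not the KIP-98 wire protocol."""

    epochs: dict = field(default_factory=dict)  # AppID -> latest epoch

    def register(self, app_id: str) -> int:
        # A new producer instance for this AppID gets a higher epoch,
        # which fences every older instance using the same AppID.
        self.epochs[app_id] = self.epochs.get(app_id, 0) + 1
        return self.epochs[app_id]

    def commit(self, app_id: str, epoch: int) -> bool:
        # Only the most recently registered instance may commit.
        return self.epochs.get(app_id) == epoch


coordinator = Coordinator()
app_id = app_id_for("input", 0)
first = coordinator.register(app_id)   # first process owns input-0
second = coordinator.register(app_id)  # rebalance: second process takes input-0
print(coordinator.commit(app_id, first))   # False: the zombie is fenced
print(coordinator.commit(app_id, second))  # True
```

If the two processes had registered unrelated AppIDs, both epochs would be current and both commits would pass this check, which is exactly the double-processing case the precondition rules out.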
>
> Thanks,
> Apurva
>
> On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> wrote:
>
> > Thanks for this! I'm looking forward to going through the full proposal in
> > detail soon; a few early questions:
> >
> > First: what happens when a consumer rebalances in the middle of a
> > transaction? The full documentation suggests that such a transaction ought
> > to be rejected:
> >
> > > [...] if a rebalance has happened and this consumer
> > > instance becomes a zombie, even if this offset message is appended in the
> > > offset topic, the transaction will be rejected later on when it tries to
> > > commit the transaction via the EndTxnRequest.
> >
> > ...but it's unclear to me how we ensure that a transaction can't complete
> > if a rebalance has happened. (It's quite possible I'm missing something
> > obvious!)
> >
> > As a concrete example: suppose a process with PID 1 adds offsets for some
> > partition to a transaction; a consumer rebalance happens that assigns the
> > partition to a process with PID 2, which adds some offsets to its current
> > transaction; both processes try and commit. Allowing both commits would
> > cause the messages to be processed twice -- how is that avoided?
> >
> > Second: App IDs normally map to a single PID. It seems like one could do
> > away with the PID concept entirely, and just use App IDs in most places
> > that require a PID. This feels like it would be significantly simpler,
> > though it does increase the message size. Are there other reasons why the
> > App ID / PID split is necessary?
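Ben's later suggestion of a 128-bit ID can be sketched as hashing an arbitrary AppID string down to a fixed 16 bytes. This is a minimal illustration assuming SHA-256 truncated to 128 bits; neither the function names (`fixed_size_id`, `as_uuid`) nor the choice of hash come from the KIP.

```python
import hashlib
import uuid


def fixed_size_id(app_id: str) -> bytes:
    # Hash an arbitrary-length AppID and keep the first 16 bytes (128 bits).
    # SHA-256 is an assumption here; any well-distributed hash would do.
    return hashlib.sha256(app_id.encode("utf-8")).digest()[:16]


def as_uuid(app_id: str) -> uuid.UUID:
    # The same 16 bytes, viewed as a UUID for logging or display.
    return uuid.UUID(bytes=fixed_size_id(app_id))


long_app_id = "orders-enricher-production-input-partition-17"
print(len(long_app_id.encode("utf-8")))  # 45 bytes if stored verbatim
print(len(fixed_size_id(long_app_id)))   # 16 bytes, whatever the AppID length
```

Collisions are possible in principle, but at 128 bits they are vanishingly unlikely for any realistic number of AppIDs, so the cost Jason describes becomes a fixed 16 bytes per message set header rather than a variable-length string.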
> >
> > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-98 to enhance Kafka with exactly once delivery
> > > semantics:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > This KIP adds a transactional messaging mechanism along with an idempotent
> > > producer implementation to make sure that 1) duplicated messages sent from
> > > the same identified producer can be detected on the broker side, and 2) a
> > > group of messages sent within a transaction will atomically be either
> > > reflected and fetchable to consumers or not as a whole.
> > >
> > > The above wiki page provides a high-level view of the proposed changes as
> > > well as summarized guarantees. Initial draft of the detailed implementation
> > > design is described in this Google doc:
> > >
> > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > >
> > > We would love to hear your comments and suggestions.
> > >
> > > Thanks,
> > >
> > > -- Guozhang
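The broker-side duplicate detection in item 1) can be pictured with a per-producer sequence number: for each partition, the broker remembers the last sequence it appended from each producer ID and refuses anything at or below it. This is a simplified sketch under that assumption; `PartitionLog` and its return strings are hypothetical, and the actual protocol details are in the design doc linked above.

```python
class PartitionLog:
    """Toy single-partition log that drops retried messages by (producer id, sequence)."""

    def __init__(self):
        self.records = []   # (producer id, sequence, value) in append order
        self.last_seq = {}  # producer id -> last sequence number appended

    def append(self, pid: int, seq: int, value: str) -> str:
        last = self.last_seq.get(pid, -1)
        if seq <= last:
            return "duplicate"     # already appended, e.g. a retry after a lost ack
        if seq != last + 1:
            return "out_of_order"  # gap: an earlier message from this producer is missing
        self.records.append((pid, seq, value))
        self.last_seq[pid] = seq
        return "appended"


log = PartitionLog()
print(log.append(1, 0, "a"))  # appended
print(log.append(1, 0, "a"))  # duplicate (producer retried)
print(log.append(1, 2, "c"))  # out_of_order
print(log.append(1, 1, "b"))  # appended
print(len(log.records))       # 2
```

Keying the state on a compact producer ID rather than the full AppID is one way to keep this per-partition bookkeeping small, which is the performance side of the App ID / PID split discussed above.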