Hi Ben,

You are right on both counts:

Writing apps to do consume-process-produce batching will be tricky to
program using this API directly. The expectation is that 99% of users would
use the streams API to leverage this functionality, and that API will take
care of the details. This seems fair, since this pattern is at the core of
stream processing.
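To make that concrete, here is a rough sketch of what such a loop would
look like against the producer APIs in the KIP. The method names follow
the current proposal and are illustrative only; the topic name, class
name, and poll timeout are made up for the example:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeProcessProduceLoop {
        // Sketch of the consume-process-produce pattern with transactions.
        public static void run(KafkaConsumer<String, String> consumer,
                               KafkaProducer<String, String> producer,
                               String groupId) {
            producer.initTransactions(); // registers the AppId, fences zombies
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                if (records.isEmpty())
                    continue;
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // "process" step: here just a pass-through to the output
                    producer.send(new ProducerRecord<>("output-topic",
                            record.key(), record.value()));
                    offsets.put(new TopicPartition(record.topic(),
                                    record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                // Consumed offsets commit atomically with produced messages.
                producer.sendOffsetsToTransaction(offsets, groupId);
                producer.commitTransaction();
            }
        }
    }

The streams API would encapsulate exactly this pattern, plus the error
handling (aborting the transaction and rewinding the consumer on failure),
which is why we expect most users to go through it.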
Using an internally generated PID is definitely more a performance than a
correctness thing: we could generate UUIDs in the producer if no AppId is
specified, and that would also work. However, as you may have seen
elsewhere in the thread, there are calls for the PID to be 4 bytes (vs. the
present 8 bytes), so 16 bytes would be really far out. While the cost of a
PID is amortized across the messages in a message set, we still want to
keep it as small as possible to reduce the overhead.

We are thinking about how to recover expired PIDs, which would enable us to
keep it to just 4 bytes (enough to handle about 4 billion concurrently
alive producers). However, this will be very tricky, because a given PID
could produce to multiple topic partitions, and recovering a PID requires
_all_ instances of it across _all_ topic partitions to be expired. That
would be very hard to achieve without invasive machinery like reference
counting. Still, we are searching for a more elegant and lightweight
solution to the problem, and will use a 4-byte PID if we can find an
acceptable solution for PID recovery.
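To make the difficulty concrete, here is a purely hypothetical sketch of
the broker-side bookkeeping that reference counting would imply. None of
these types exist in Kafka; this is a thought experiment, not a proposal:

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical only: illustrates the cross-partition coordination that
    // safe reuse of a 4-byte PID would require. Not a proposed design.
    class PidReclaimSketch {
        // For each PID, every topic partition still holding state for it.
        private final Map<Integer, Set<TopicPartition>> liveRefs =
                new ConcurrentHashMap<>();

        // Every partition that accepts a message from a PID registers it...
        void onProduce(int pid, TopicPartition tp) {
            liveRefs.computeIfAbsent(pid,
                    k -> ConcurrentHashMap.newKeySet()).add(tp);
        }

        // ...and every partition must report expiry before the PID can be
        // handed out again.
        void onExpire(int pid, TopicPartition tp) {
            Set<TopicPartition> refs = liveRefs.get(pid);
            if (refs == null)
                return;
            refs.remove(tp);
            if (refs.isEmpty())
                liveRefs.remove(pid); // only now is the PID safe to recycle
        }
    }

Keeping such a map consistent across brokers, partition reassignments, and
failures is exactly the kind of machinery we would like to avoid.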
Thanks,
Apurva

On Tue, Dec 13, 2016 at 9:30 PM, Ben Kirwin <b...@kirw.in> wrote:

> Hi Apurva,
>
> Thanks for the detailed answers... and sorry for the late reply!
>
> It does sound like, if the input-partitions-to-app-id mapping never
> changes, the existing fencing mechanisms should prevent duplicates.
> Great! I'm a bit concerned the proposed API will be delicate to program
> against successfully -- even in the simple case, we need to create a new
> producer instance per input partition, and anything fancier is going to
> need its own implementation of the Streams/Samza-style 'task' idea -- but
> that may be fine for this sort of advanced feature.
>
> For the second question, I notice that Jason also elaborated on this
> downthread:
>
> > We also looked at removing the producer ID. This was discussed
> > somewhere above, but basically the idea is to store the AppID in the
> > message set header directly and avoid the mapping to producer ID
> > altogether. As long as batching isn't too bad, the impact on total
> > size may not be too bad, but we were ultimately more comfortable with
> > a fixed size ID.
>
> ...which suggests that the distinction is useful for performance, but not
> necessary for correctness, which makes good sense to me. (Would a 128-bit
> ID be a reasonable compromise? That's enough room for a UUID, or a
> reasonable hash of an arbitrary string, and has only a marginal impact on
> the message size.)
>
> On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apu...@confluent.io>
> wrote:
>
> > Hi Ben,
> >
> > Now, on to your first question of how to deal with consumer rebalances.
> > The short answer is that the application needs to ensure that the
> > assignment of input partitions to appId is consistent across
> > rebalances.
> >
> > For Kafka streams, they already ensure that the mapping of input
> > partitions to task Id is invariant across rebalances by implementing a
> > custom sticky assignor. Other non-streams apps can trivially have one
> > producer per input partition and have the appId be the same as the
> > partition number to achieve the same effect.
> >
> > With this precondition in place, we can maintain transactions across
> > rebalances.
> >
> > Hope this answers your question.
> >
> > Thanks,
> > Apurva
> >
> > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> wrote:
> >
> > > Thanks for this! I'm looking forward to going through the full
> > > proposal in detail soon; a few early questions:
> > >
> > > First: what happens when a consumer rebalances in the middle of a
> > > transaction? The full documentation suggests that such a transaction
> > > ought to be rejected:
> > >
> > > > [...] if a rebalance has happened and this consumer instance
> > > > becomes a zombie, even if this offset message is appended in the
> > > > offset topic, the transaction will be rejected later on when it
> > > > tries to commit the transaction via the EndTxnRequest.
> > >
> > > ...but it's unclear to me how we ensure that a transaction can't
> > > complete if a rebalance has happened. (It's quite possible I'm
> > > missing something obvious!)
> > >
> > > As a concrete example: suppose a process with PID 1 adds offsets for
> > > some partition to a transaction; a consumer rebalance happens that
> > > assigns the partition to a process with PID 2, which adds some
> > > offsets to its current transaction; both processes try and commit.
> > > Allowing both commits would cause the messages to be processed
> > > twice -- how is that avoided?
> > >
> > > Second: App IDs normally map to a single PID. It seems like one could
> > > do away with the PID concept entirely, and just use App IDs in most
> > > places that require a PID. This feels like it would be significantly
> > > simpler, though it does increase the message size. Are there other
> > > reasons why the App ID / PID split is necessary?
> > >
> > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have just created KIP-98 to enhance Kafka with exactly once
> > > > delivery semantics:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > >
> > > > This KIP adds a transactional messaging mechanism along with an
> > > > idempotent producer implementation to make sure that 1) duplicated
> > > > messages sent from the same identified producer can be detected on
> > > > the broker side, and 2) a group of messages sent within a
> > > > transaction will atomically be either reflected and fetchable to
> > > > consumers or not as a whole.
> > > >
> > > > The above wiki page provides a high-level view of the proposed
> > > > changes as well as summarized guarantees. An initial draft of the
> > > > detailed implementation design is described in this Google doc:
> > > >
> > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > >
> > > > We would love to hear your comments and suggestions.
> > > >
> > > > Thanks,
> > > >
> > > > -- Guozhang