Nice google doc! I probably need to go over it a few more times, but here is a minor comment from the first pass:

In Transaction Coordinator Request Handling (https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.jro89lml46du), step 2 says that if the Transaction Coordinator doesn't already see a producer with the same app-id, it creates a pid and appends (app-id, pid, epoch) to the transaction log. What about the case where the app-id/pid pair already exists and we increment the epoch? Should we append (app-id, pid, epoch++) to the transaction log as well? I think we should, but step 2 doesn't mention this.
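Concretely, I am imagining something like the following on the coordinator side. This is only a rough sketch of the idea; the class and field names below are mine, not from the design doc:

import java.util.HashMap;
import java.util.Map;

// Rough sketch of InitPID handling, assuming step 2 also logs the epoch bump.
// PidMapping, pidCache, nextPid and appendToTransactionLog are made up for
// illustration; they are not taken from the design doc.
final class TransactionCoordinatorSketch {

    static final class PidMapping {
        final String appId;
        final long pid;
        final short epoch;

        PidMapping(String appId, long pid, short epoch) {
            this.appId = appId;
            this.pid = pid;
            this.epoch = epoch;
        }
    }

    private final Map<String, PidMapping> pidCache = new HashMap<>();
    private long nextPid = 0L;

    PidMapping handleInitPid(String appId) {
        PidMapping current = pidCache.get(appId);
        PidMapping updated;
        if (current == null) {
            // Step 2 as written: unknown app-id, so allocate a new pid and
            // append (app-id, pid, epoch=0) to the transaction log.
            updated = new PidMapping(appId, nextPid++, (short) 0);
        } else {
            // The case in question: the app-id/pid pair already exists, so bump
            // the epoch and append (app-id, pid, epoch + 1) as well, so that the
            // bump survives a transaction coordinator failover.
            updated = new PidMapping(appId, current.pid, (short) (current.epoch + 1));
        }
        appendToTransactionLog(updated);
        pidCache.put(appId, updated);
        return updated;
    }

    private void appendToTransactionLog(PidMapping mapping) {
        // Stand-in for producing the (app-id, pid, epoch) message to the
        // transaction log partition owned by this coordinator.
        System.out.printf("append (%s, %d, %d)%n", mapping.appId, mapping.pid, mapping.epoch);
    }
}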
On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for your comments, let me deal with your second point regarding merging the __consumer_offsets and transactions topic.
>
> Needless to say, we considered doing this, but chose to keep them separate for the following reasons:
>
> 1. Your assumption that group.id and transaction.app.id can be the same does not hold for streams applications. All colocated tasks of a streams application will share the same consumer (and hence implicitly will have the same group.id), but each task will have its own producer instance. The transaction.app.id for each producer instance will still have to be distinct. So to colocate the transaction and consumer group coordinators, we would have to introduce a 'group.id' config in the producer and require it to be the same as the consumer's. This seemed like a very fragile option.
> 2. Following on from the above, the transaction coordinator and group coordinator would _have_ to be colocated in order to be the leader for the same TopicPartition, unless we wanted to make even more fundamental changes to Kafka.
> 3. We don't require that the consumer coordinator and the transaction coordinator have the same view of the current PID/epoch pair. If a producer instance is bounced, the epoch will be bumped. Any transactions initiated by the previous instance would either be fully committed or fully rolled back. Since the writes to the offset topics are just like writes to a regular topic, these would enjoy the same guarantees, and the inconsistency will be eventually resolved.
> 4. Finally, every application will have consumers, and hence record consumer offsets. But a very small fraction of applications would use transactions. Blending the two topics would make recovering transaction coordinator state unnecessarily inefficient, since the coordinator has to read from the beginning of the topic to reconstruct its data structures -- it would have to inspect and skip a majority of the messages if the offsets were in the same topic.
>
> Thanks,
> Apurva
>
> On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> > Thanks for initiating this KIP! I think it is well written and I'm excited to see the first step towards adding an important feature in Kafka.
> >
> > I had a few initial thoughts on the KIP, mostly not as deeply thought through as what you've done -
> >
> > 1. Perhaps you've thought about how this would work already -- since we now require a producer to specify a unique AppID across different instances of an application, how would applications that run in the cloud use this feature with auto scaling?
> >
> > 2. Making it easy for applications to get exactly-once semantics for a consume-process-produce workflow is a great feature to have.
> > To enable this, the proposal now includes letting a producer initiate a write to the offset topic as well (just like consumers do). The consumer coordinator (which could be on a different broker than the txn coordinator) would then validate whether the PID and producer epoch are valid before it writes to the offset topic along with the associated PID. This is a great feature, though I see 2 difficulties:
> >
> > -- This needs the consumer coordinator to have a consistent view of the PID/epochs that is the same as the view on the txn coordinator. However, as the offset and the transaction topics are different, the 2 coordinators might live on different brokers.
> > -- We now also have 2 internal topics - a transaction topic and the __consumer_offsets topic.
> >
> > Maybe you've thought about this already and discarded it ... let me make a somewhat crazy proposal -- why don't we upgrade the transaction topic to be the new offsets topic as well? For consumers that want EoS guarantees for a consume-process-produce pattern, the group.id is the same as the transaction.app.id set for the producer. Assume that the transaction topic also stores consumer offsets. It stores both the transaction metadata messages as well as offset messages, both for transactional as well as non-transactional consumers. Since the group.id of the consumer and the app.id of the producer are the same, the offsets associated with a consumer group and topic-partition end up in the same transaction topic partition as the transaction metadata messages. The transaction coordinator and the consumer coordinator always live on the same broker since they both map to the same partition in the transaction topic. Even if there are failures, they end up on the same new broker. Hence, they share the same and consistent view of the PIDs, epochs and App IDs, whatever it is. The consumer coordinator will skip over the transaction metadata messages when it bootstraps the offsets from this new topic for consumer groups that are not involved in a transaction and don't have a txn id associated with the offset message in the transaction topic. The consumer coordinator will expose only committed offsets in the case of consumer groups that are involved in a txn. It will also be able to validate the OffsetCommitRequests coming from a transactional producer by ensuring that they come from a valid PID and producer epoch, since it uses the same view of this data created by the transaction coordinator (which lives on the same broker). And we will end up with one internal topic, not two.
> >
> > This proposal offers better operational simplicity and fewer internal topics, but there are some downsides that come with it -- there are 2 types of messages in one topic (txn metadata ones and offset ones). Since this internal topic serves a dual purpose, it will be harder to name it and also to design a message format that includes the different types of messages that will live in the topic. Though the transaction topic already needs to write 5 different types of messages (the AppID->PID mapping, the BeginTxn message, InsertTxn, PrepareCommit, Committed/Aborted), so maybe adding the offset message isn't a big deal?
> > Back when we introduced the offsets topic, we had discussed making it more general and allowing the producer to send offset commit messages to it, but ended up creating a specialized topic to allow the consumer coordinator to wall off and prevent unauthorized writes from consumers outside of a group. Jason can comment on the specifics, but I don't believe that goal of the new consumer protocol was quite achieved.
> >
> > I have other comments on the message format, request names etc. but wanted to get your thoughts on these 2 issues first :-)
> >
> > On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-98 to enhance Kafka with exactly once delivery semantics:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > This KIP adds a transactional messaging mechanism along with an idempotent producer implementation to make sure that 1) duplicated messages sent from the same identified producer can be detected on the broker side, and 2) a group of messages sent within a transaction will atomically be either reflected and fetchable to consumers or not, as a whole.
> > >
> > > The above wiki page provides a high-level view of the proposed changes as well as the summarized guarantees. An initial draft of the detailed implementation design is described in this Google doc:
> > >
> > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > >
> > > We would love to hear your comments and suggestions.
> > >
> > > Thanks,
> > >
> > > -- Guozhang
> > >
> >
> > --
> > Thanks,
> > Neha
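Coming back to the co-location point in Neha's proposal above: as I understand it, the argument is that if offsets and transaction metadata share one internal topic and group.id equals transaction.app.id, both coordinators reduce to "the leader of the partition that id hashes to". A tiny sketch to illustrate; the method name and hashing here are only illustrative, in the same spirit as the mapping the consumer coordinator uses for __consumer_offsets today:

// Illustrative only: with one shared internal topic, both coordinators derive
// their partition from the same id, so they always live on the same broker
// (the leader of that partition), even across failovers.
final class CoordinatorColocationSketch {
    static int coordinatorPartition(String id, int numPartitions) {
        // Non-negative hash mod partition count.
        return (id.hashCode() & 0x7fffffff) % numPartitions;
    }
    // If group.id == transaction.app.id, then
    //   coordinatorPartition(groupId, n) == coordinatorPartition(appId, n),
    // so one broker owns both the group coordinator and the transaction
    // coordinator for that application.
}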