@Henry Cai, I am working on a separate KIP on Streams to leverage this KIP to provide exactly-once processing semantics (note that exactly-once processing is a bit different from exactly-once delivery semantics), which should cover your question.
The short answer is that writing the changelog messages needs to be part of the transaction. When a fatal error happens within a transaction, the store updates cannot be rolled back the way the messages can, so in the worst case we need to restore the store from the changelog from scratch, or from a checkpoint with a starting offset in the changelog; the restoring consumer will likewise fetch only committed messages.

Guozhang

On Thu, Dec 1, 2016 at 9:34 AM, Apurva Mehta <apu...@confluent.io> wrote:

Hi Daniel,

That is a very good point. You are correct in saying that one does not need a transaction coordinator to get idempotent semantics.

There are, however, three reasons why we chose this route:

1. The request to find a transaction coordinator is exactly the same as the request consumers use to find the group coordinator. So if clients already implement the new consumer, you should already have the code you need to find the transaction coordinator. I would even go so far as to say that the majority of the coordinator discovery code can be effectively shared between producers and consumers. Jason should correct me on this, however, since he is most familiar with that bit.
2. With this route, the broker-side changes are simpler. In particular, we have to implement the InitPIDRequest only in the coordinator.
3. By always having a transaction coordinator, we can enable applications to use transactions even if they don't specify the AppId. The only thing you lose is transaction recovery across sessions.

Needless to say, we did debate this point extensively. What swung our decision ultimately was the following observation: if the user does not provide a transaction.app.id, the client can generate a UUID and use that as the appId for the rest of the session. This means that there are no branches in the client and server code, and it is overall simpler to maintain. All the producer APIs are also available to the user, and it would be more intuitive.

It also means that clients cannot choose idempotence without transactions, and hence it does place a greater burden on implementors of Kafka clients. But the cost should be minimal given point 1 above, and was deemed worth it.

Thanks once more for your thoughtful comments. It would be great for other client implementors to chime in on this.
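To make the appId fallback above concrete, here is a rough sketch of what it amounts to on the client side (the property name follows the KIP draft and may change; loadProducerConfig() just stands for however the application assembles its config):

    // Sketch only: fall back to a generated, session-scoped appId when none is configured.
    Properties props = loadProducerConfig();   // stand-in for the application's own config loading
    if (props.getProperty("transaction.app.id") == null) {
        // No user-supplied AppId: transactions and idempotence still work for this session,
        // but fencing and transaction recovery across producer sessions are lost.
        props.put("transaction.app.id", "ephemeral-" + UUID.randomUUID());
    }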
Regards,
Apurva

On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck <da...@zendesk.com.invalid> wrote:

Hi there,

I'm the author of ruby-kafka, and as such am slightly biased towards simplicity of implementation :-)

I like the proposal, and would love to use idempotent producer semantics in our projects at Zendesk, but I'm a bit worried about the complexity that would go into the clients; specifically, it sounds to me that in order to get idempotent producer semantics, I'd have to implement the transaction coordinator discovery. I may be wrong, but it would seem that it's not strictly necessary if you're not using transactions – we could just use the topic partition's leader as the coordinator, avoiding the extra discovery. In my experience, most bugs are related to figuring out which broker is the leader of which partition/group/whatever, so minimizing the number of moving parts would be beneficial to me.

I'd also like to point out that I would be reluctant to implement the transaction API in the near future, but would love to implement the idempotency API soon. The former seems only relevant to real stream processing frameworks, which is probably not the best use case for ruby-kafka.

Cheers,
Daniel Schierbeck

On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io> wrote:

Hey Neha,

Thanks for the thoughtful questions. I'll try to address the first question since Apurva addressed the second. Since most readers are probably getting up to speed with this large proposal, let me first take a step back and explain why we need the AppID at all. As Confluent tradition demands, I present you a big wall of text:

Clearly "exactly once" delivery requires resilience to client failures. When a client crashes or turns into a zombie, another client must eventually be started to resume the work. There are two problems: 1) we need to ensure that the old process is actually dead, or at least that it cannot write any more data, and 2) we need to be able to pick up wherever the last process left off. To do either of these, we need some kind of identifier to tie the two instances together.

There are only two choices for where this ID comes from: either the user gives it to us or the server generates it. In the latter case, the user is responsible for fetching it from the client and persisting it somewhere for use after failure. We ultimately felt that the most flexible option is to have the user give it to us. In many applications, there is already a natural identifier which is already used to divide the workload. For example, in Kafka Streams and Kafka Connect, we have a taskId. For applications where there is no natural ID, the user can generate a UUID and persist it locally, which is as good as having the server generate it.

So the AppID is used to provide continuity between the instances of a producer which are handling a certain workload. One of the early design decisions we made in this work was to make the delivery guarantees we provide agnostic of the workload that the producer is assigned. The producer is not in the business of trying to divide up the work among all its peers who are participating in the same duty (unlike the consumer, we don't know anything about where the data comes from). This has huge implications for "exactly-once" delivery because it puts the burden on the user to divide the total workload among producer instances and to assign AppIDs accordingly.

I've been using the term "workload" loosely, but we usually imagine something like Kafka Connect's notion of a "source partition." A source partition could be a topic partition if the source is Kafka, or it could be a database table, a log file, or whatever makes sense for the source of the data. The point is that it's an independent source of data which can be assigned to a producer instance.

If the same source partition is always assigned to the producer with the same AppID, then Kafka transactions will give you "exactly once" delivery without much additional work. On initialization, the producer will ensure that 1) any previous producers using that AppID are "fenced" off, and 2) any transaction which had been started by a previous producer with that AppID has either completed or been aborted.

Based on this, it should be clear that the ideal is to divide the workload so that you have a one-to-one mapping from source partition to AppID. If the source of the data is Kafka, then the source partition is just a topic partition, and the AppID can be generated from the name of the topic and the partition number.
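In code, the per-source-partition setup would look roughly like the following sketch. The config and method names (transaction.app.id, initTransactions, and so on) follow the KIP draft rather than a finished API, and sourceTopic/sourcePartition stand for whatever identifies the assigned source partition:

    // One producer per source partition, with a stable AppID derived from that partition.
    String appId = "my-app-" + sourceTopic + "-" + sourcePartition;   // e.g. "my-app-clicks-3"

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");
    props.put("transaction.app.id", appId);

    KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
    producer.initTransactions();   // fences any previous producer with this AppID and
                                   // completes or aborts the transaction it may have left open
    producer.beginTransaction();
    // ... send the records processed from this source partition ...
    producer.commitTransaction();

Everything sent between beginTransaction() and commitTransaction() for that source partition then becomes visible atomically or not at all.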
To finally get back to your auto-scaling question, let's assume for a moment the ideal mapping of source partition to AppID. The main question is whether the scaling is "horizontal" or "vertical." By horizontal, I mean an increase in the number of source partitions. This case is easy: assign new AppIDs based on the new source partitions and you're done.

But if the scaling is vertical (i.e. an increase in the load on the source partitions), there's not much this proposal can do to help. You're going to have to break the source partition into child partitions, and assign each of the new partitions a new AppID. To preserve "exactly once" delivery, you must make sure that the producers using the AppID assigned to the parent partition have been shut down cleanly. We could provide a way to pass in a "parent AppID" so that the producer could check the appropriate safety conditions, but for the first version we assume that users consider scaling requirements when dividing the workload into source partitions.

Unfortunately, the real world is always falling short of the ideal, and it's not always practical to have a one-to-one mapping of source partition to AppID, since that also implies a one-to-one mapping of source partition to producer instance. If I were a user, I'd push this limit as far as is reasonable, but with enough source partitions, it eventually breaks down. At some point, you need a producer to handle the load of more than one source partition. This is fine in itself if the assignment is sticky: that is, if we can ensure that the same source partition is assigned to the producer using a certain AppID. If not, then the user is responsible for ensuring a clean hand-off. The producer reading from the migrating source partition must stop reading, commit or abort any transaction containing data processed from that source partition, and then signal the producer which is taking over that it is safe to begin.

This burden is a consequence of the decision to keep the producer out of the role of assigning work. We could do more if we forced users to formalize their application-specific notion of a source partition, and if we turned the producer into something like a consumer group, with a rebalance protocol. This would allow the broker to be the one to ensure a clean hand-off of work, but it would be a huge departure from the way the producer currently works, and not all applications have a notion of source partition anyway. So the result is a bit more work for the user, though of course it would be transparent for Kafka Streams users.
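To spell out that hand-off for the case where the source is itself a Kafka topic partition, the outgoing side does roughly the following (purely an illustrative sketch; the signalling mechanism is application-level and not part of Kafka):

    // Illustrative hand-off for a source partition migrating to another producer instance.
    void handOff(KafkaConsumer<byte[], byte[]> consumer,
                 KafkaProducer<byte[], byte[]> producer,
                 TopicPartition migratingSource,
                 Runnable signalSuccessor) {
        consumer.pause(Collections.singleton(migratingSource));  // 1. stop reading from the source
        producer.commitTransaction();                            // 2. commit (or abort) the open transaction
                                                                 //    containing data from that source
        signalSuccessor.run();                                   // 3. tell the successor it is safe to begin
    }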
One final note. I've described above how to get the strongest guarantees that this work is capable of providing in an auto-scaling environment. We also provide weaker guarantees, which are still an improvement over the current state. For example, without specifying any kind of AppID, we provide idempotent production for the lifetime of a producer instance. This ensures reliable delivery without duplicates even with broker failures. It is also possible to use transactions with only an ephemeral AppID: if the application generates a UUID for use as the AppID, and only uses it for the lifetime of a single producer, you can still take advantage of transactional semantics, which allow you to write a set of messages to multiple partitions atomically.

Hope that answers the question and helps others understand the work a bit better!

Thanks,
Jason

On Wed, Nov 30, 2016 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:

Thanks for your comment, I updated the document. Let me know if it is clear now.

Apurva

On Wed, Nov 30, 2016 at 9:42 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

@Apurva yep that's what I was trying to say.

Original message:
"If there is already an entry with the AppID in the mapping, increment the epoch number and go on to the next step. If there is no entry with the AppID in the mapping, construct a PID with initialized epoch number; append an AppID message into the transaction topic, insert into the mapping and reply with the PID / epoch / timestamp."

Just wanted to make it explicit because:
1. The "append an AppID message..." chunk was ambiguous on whether it applied to the "if exists" or "if not exists" condition.
2. I think the google doc is pretty explicit on appending to the log everywhere else.

On Wed, Nov 30, 2016 at 9:36 PM, Apurva Mehta <apu...@confluent.io> wrote:

The first line in step 2 of that section is: "If there is already an entry with the AppID in the mapping, increment the epoch number and go on to the next step."

Are you suggesting that it be made explicit that 'increment the epoch number' includes persisting the updated value to the log?

Thanks,
Apurva

On Wed, Nov 30, 2016 at 9:21 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:

Nice google doc!
Probably need to go over the google doc a few more times, but a minor comment from the first pass:

In Transaction Coordinator Request Handling (https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.jro89lml46du), step 2 mentions that if the Transaction Coordinator doesn't already see a producer with the same app-id, it creates a pid and appends (app-id, pid, epoch) into the transaction log.

What about if the app-id/pid pair already exists and we increment the epoch? Should we append (app-id, pid, epoch++) to the transaction log? I think we should, but step 2 doesn't mention this.
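In sketch form, making the append explicit in both branches would look something like this (illustrative pseudocode of my reading of the doc, not actual coordinator code; nextPid() and appendToTransactionLog() are stand-ins for PID allocation and the write to the transaction topic):

    // Step 2 with the transaction-log append made explicit in BOTH branches.
    final Map<String, PidEntry> mapping = new HashMap<>();

    PidEntry handleInitPid(String appId) {
        PidEntry entry = mapping.get(appId);
        if (entry != null) {
            entry.epoch++;                          // existing app-id: bump the epoch
        } else {
            entry = new PidEntry(nextPid(), 0);     // new app-id: allocate a fresh pid
            mapping.put(appId, entry);
        }
        appendToTransactionLog(appId, entry.pid, entry.epoch);  // persist (app-id, pid, epoch) either way
        return entry;
    }

    static final class PidEntry {
        long pid;
        int epoch;
        PidEntry(long pid, int epoch) { this.pid = pid; this.epoch = epoch; }
    }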
On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <apu...@confluent.io> wrote:

Thanks for your comments, let me deal with your second point regarding merging the __consumer_offsets and transactions topics.

Needless to say, we considered doing this, but chose to keep them separate for the following reasons:

1. Your assumption that group.id and transaction.app.id can be the same does not hold for streams applications. All colocated tasks of a streams application will share the same consumer (and hence implicitly will have the same group.id), but each task will have its own producer instance. The transaction.app.id for each producer instance will still have to be distinct. So to colocate the transaction and consumer group coordinators, we would now have to introduce a 'group.id' config in the producer and require it to be the same as the consumer's. This seemed like a very fragile option.
2. Following on from the above, the transaction coordinator and group coordinator would _have_ to be colocated in order to be the leader for the same TopicPartition, unless we wanted to make even more fundamental changes to Kafka.
3. We don't require that the consumer coordinator and the transaction coordinator have the same view of the current PID/epoch pair. If a producer instance is bounced, the epoch will be bumped. Any transactions initiated by the previous instance would either be fully committed or fully rolled back. Since the writes to the offset topics are just like writes to a regular topic, these would enjoy the same guarantees, and the inconsistency will be eventually resolved.
4. Finally, every application will have consumers, and hence will record consumer offsets. But a very small fraction of applications would use transactions. Blending the two topics would make recovering transaction coordinator state unnecessarily inefficient, since it has to read from the beginning of the topic to reconstruct its data structures -- it would have to inspect and skip a majority of the messages if the offsets were in the same topic.

Thanks,
Apurva

On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <n...@confluent.io> wrote:

Thanks for initiating this KIP! I think it is well written and I'm excited to see the first step towards adding an important feature in Kafka.

I had a few initial thoughts on the KIP, mostly not as deeply thought through as what you've done -

1. Perhaps you’ve thought about how this would work already — since we now require a producer to specify a unique AppID across different instances of an application, how would applications that run in the cloud use this feature with auto scaling?

2. Making it easy for applications to get exactly-once semantics for a consume-process-produce workflow is a great feature to have. To enable this, the proposal now includes letting a producer initiate a write to the offset topic as well (just like consumers do). The consumer coordinator (which could be on a different broker than the txn coordinator) would then validate that the PID and producer epoch are valid before it writes to the offset topic along with the associated PID. This is a great feature, though I see two difficulties:

-- This needs the consumer coordinator to have a consistent view of the PID/epochs that is the same as the view on the txn coordinator. However, as the offset and the transaction topics are different, the two coordinators might live on different brokers.
-- We now also have two internal topics: a transaction topic and the __consumer_offsets topic.
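For reference, the consume-process-produce loop mentioned in point 2 would look roughly like the following sketch (method names follow the KIP draft producer API and are illustrative; consumer and producer are assumed already constructed, running is the application's shutdown flag, and transform() stands for its processing step):

    consumer.subscribe(Collections.singletonList("input-topic"));
    producer.initTransactions();
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.isEmpty()) continue;
        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        producer.sendOffsetsToTransaction(offsets, "my-group");  // offsets commit with the writes, or not at all
        producer.commitTransaction();
    }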
Maybe you’ve thought about this already and discarded it ... let me make a somewhat crazy proposal — why don’t we upgrade the transaction topic to be the new offsets topic as well? For consumers that want EoS guarantees for a consume-process-produce pattern, the group.id is the same as the transaction.app.id set for the producer. Assume that the transaction topic also stores consumer offsets: it stores both the transaction metadata messages and the offset messages, for transactional as well as non-transactional consumers. Since the group.id of the consumer and the app.id of the producer are the same, the offsets associated with a consumer group and topic-partition end up in the same transaction topic partition as the transaction metadata messages. The transaction coordinator and the consumer coordinator always live on the same broker since they both map to the same partition in the transaction topic. Even if there are failures, they end up on the same new broker. Hence, they share the same, consistent view of the PIDs, epochs and AppIDs, whatever it is. The consumer coordinator will skip over the transaction metadata messages when it bootstraps the offsets from this new topic for consumer groups that are not involved in a transaction and don’t have a txn id associated with the offset message in the transaction topic. The consumer coordinator will expose only committed offsets for consumer groups that are involved in a txn. It will also be able to validate the OffsetCommitRequests coming from a transactional producer by ensuring that they come from a valid PID and producer epoch, since it uses the same view of this data created by the transaction coordinator (which lives on the same broker). And we will end up with one internal topic, not two.

This proposal offers better operational simplicity and fewer internal topics, but there are some downsides that come with it — there are two types of messages in one topic (txn metadata ones and offset ones). Since this internal topic serves a dual purpose, it will be harder to name it and also to design a message format that includes the different types of messages that will live in the topic. Though the transaction topic already needs to write five different types of messages (the AppID->PID mapping, the BeginTxn message, InsertTxn, PrepareCommit, Committed/Aborted), so maybe adding the offset message isn't a big deal?
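Just to illustrate what that boils down to, the merged topic's format would need something like a type tag (purely illustrative, using the five types listed above plus offsets):

    // Illustrative only: a type tag for messages in the merged internal topic.
    enum TxnTopicMessageType {
        APP_ID_TO_PID_MAPPING,
        BEGIN_TXN,
        INSERT_TXN,
        PREPARE_COMMIT,
        COMMITTED_OR_ABORTED,
        OFFSET_COMMIT        // the new addition if offsets move into this topic
    }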
Back when we introduced the offsets topic, we had discussed making it more general and allowing the producer to send offset commit messages to it, but ended up creating a specialized topic to allow the consumer coordinator to wall it off and prevent unauthorized writes from consumers outside of a group. Jason can comment on the specifics, but I don't believe that goal of the new consumer protocol was quite achieved.

I have other comments on the message format, request names etc. but wanted to get your thoughts on these two issues first :-)

On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi all,

I have just created KIP-98 to enhance Kafka with exactly once delivery semantics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

This KIP adds a transactional messaging mechanism along with an idempotent producer implementation to make sure that 1) duplicated messages sent from the same identified producer can be detected on the broker side, and 2) a group of messages sent within a transaction will atomically be either reflected and fetchable to consumers or not as a whole.
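On the consumer side, guarantee 2) surfaces as an isolation setting along the lines of the following sketch (the config name follows the proposal and may change):

    // Illustrative sketch: a consumer that only sees messages from committed transactions.
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");
    props.put("group.id", "readers");
    props.put("isolation.level", "read_committed");   // default would remain read_uncommitted
    KafkaConsumer<byte[], byte[]> consumer =
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());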
The above wiki page provides a high-level view of the proposed changes as well as summarized guarantees. Initial draft of the detailed implementation design is described in this Google doc:

https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8

We would love to hear your comments and suggestions.

Thanks,

-- Guozhang

--
Thanks,
Neha

--
Guozhang