Hi there, I'm the author of ruby-kafka, and as such am slightly biased towards simplicity of implementation :-)
I like the proposal, and would love to use idempotent producer semantics in our projects at Zendesk, but I'm a bit worried about the complexity that would go into the clients; specifically: it sounds to me that in order to get idempotent producer semantics, I'd have to implement the transaction coordinator discovery. I may be wrong, but it would seem that it's not strictly necessary if you're not using transactions – we could just use the topic partition's leader as the coordinator, avoiding the extra discovery. In my experience, most bugs are related to figuring out which broker is the leader of which partition/group/whatever, so minimizing the number of moving parts would be beneficial to me.

I'd also like to point out that I would be reluctant to implement the transaction API in the near future, but would love to implement the idempotency API soon. The former seems only relevant to real stream processing frameworks, which is probably not the best use case for ruby-kafka.

Cheers,
Daniel Schierbeck

On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Neha,
>
> Thanks for the thoughtful questions. I'll try to address the first question since Apurva addressed the second. Since most readers are probably getting up to speed with this large proposal, let me first take a step back and explain why we need the AppID at all. As Confluent tradition demands, I present you a big wall of text:
>
> Clearly "exactly once" delivery requires resilience to client failures. When a client crashes or turns into a zombie, another client must eventually be started to resume the work. There are two problems: 1) we need to ensure that the old process is actually dead or at least that it cannot write any more data, and 2) we need to be able to pick up wherever the last process left off. To do either of these, we need some kind of identifier to tie the two instances together.
>
> There are only two choices for where this ID comes from: either the user gives it to us or the server generates it. In the latter case, the user is responsible for fetching it from the client and persisting it somewhere for use after failure. We ultimately felt that the most flexible option is to have the user give it to us. In many applications, there is already a natural identifier which is already used to divide the workload. For example, in Kafka Streams and Kafka Connect, we have a taskId. For applications where there is no natural ID, the user can generate a UUID and persist it locally, which is as good as having the server generate it.
>
> So the AppID is used to provide continuity between the instances of a producer which are handling a certain workload. One of the early design decisions we made in this work was to make the delivery guarantees we provide agnostic of the workload that the producer is assigned. The producer is not in the business of trying to divide up the work among all its peers who are participating in the same duty (unlike the consumer, we don't know anything about where the data comes from). This has huge implications for "exactly-once" delivery because it puts the burden on the user to divide the total workload among producer instances and to assign AppIDs accordingly.
>
> I've been using the term "workload" loosely, but we usually imagine something like Kafka Connect's notion of a "source partition."
> A source partition could be a topic partition if the source is Kafka, or it could be a database table, a log file, or whatever makes sense for the source of the data. The point is that it's an independent source of data which can be assigned to a producer instance.
>
> If the same source partition is always assigned to the producer with the same AppID, then Kafka transactions will give you "exactly once" delivery without much additional work. On initialization, the producer will ensure that 1) any previous producers using that AppID are "fenced" off, and 2) any transactions which had been started by a previous producer with that AppID have either completed or aborted.
>
> Based on this, it should be clear that the ideal is to divide the workload so that you have a one-to-one mapping from the source partition to the AppID. If the source of the data is Kafka, then the source partition is just a topic partition, and the AppID can be generated from the name of the topic and the partition number.
>
> To finally get back to your auto-scaling question, let's assume for a moment the ideal mapping of source partition to AppID. The main question is whether the scaling is "horizontal" or "vertical." By horizontal, I mean an increase in the number of source partitions. This case is easy. Assign new AppIDs based on the new source partitions and you're done.
>
> But if the scaling is vertical (i.e. an increase in the load on the source partitions), there's not much this proposal can do to help. You're going to have to break the source partition into child partitions, and assign each of the new partitions a new AppID. To preserve "exactly once" delivery, you must make sure that the producers using the AppID assigned to the parent partition have been shut down cleanly. We could provide a way to pass in a "parent AppID" so that the producer could check the appropriate safety conditions, but for the first version, we assume that users consider scaling requirements when dividing the workload into source partitions.
>
> Unfortunately, the real world is always falling short of the ideal, and it's not always practical to have a one-to-one mapping of source partition to AppID, since that also implies a one-to-one mapping of source partition to producer instance. If I were a user, I'd push this limit as far as is reasonable, but with enough source partitions, it eventually breaks down. At some point, you need a producer to handle the load of more than one source partition. This is fine in itself if the assignment is sticky: that is, if we can ensure that the same source partition is assigned to the producer using a certain AppID. If not, then the user is responsible for ensuring a clean hand-off. The producer reading from the migrating source partition must stop reading, commit or abort any transaction containing data processed from that source partition, and then signal the producer which is taking over that it is safe to begin.
>
> This burden is a consequence of the decision to keep the producer out of the role of assigning work. We could do more if we forced users to formalize their application-specific notion of a source partition, and if we turned the producer into something like a consumer group, with a rebalance protocol. This would allow the broker to be the one to ensure a clean hand-off of work, but it would be a huge departure from the way the producer currently works, and not all applications have a notion of source partition anyway. So the result is a bit more work for the user, though of course it would be transparent for Kafka Streams users.
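To make that one-to-one mapping concrete from the client side, here is a rough sketch in Java. It assumes the AppID is supplied as an ordinary producer config and that there is an init call which performs the fencing and transaction-recovery step described above; the config and method names below (transactional.id, initTransactions, and so on) are assumptions of the sketch taken from what the released Java client ended up calling them, not from the KIP text itself.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SourcePartitionCopier {
        public static void main(String[] args) {
            String sourceTopic = "clicks";   // hypothetical source topic
            int sourcePartition = 0;         // the one source partition this instance owns

            // One-to-one mapping: the AppID is a pure function of the source partition,
            // so a restarted instance of this workload always presents the same identity.
            String appId = "copier-" + sourceTopic + "-" + sourcePartition;

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.idempotence", "true");
            props.put("transactional.id", appId);   // the KIP's AppID

            KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

            // Fences any zombie producer still using this AppID and ensures transactions
            // left open by a previous instance are completed or aborted before we start.
            producer.initTransactions();

            producer.beginTransaction();
            producer.send(new ProducerRecord<>("clicks-copy", "some-key", "some-value"));
            producer.commitTransaction();
            producer.close();
        }
    }

Under this mapping, horizontal scaling just means new source partitions and therefore new AppIDs, while the vertical case needs the clean hand-off described above.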
> One final note. I've described above how to get the strongest guarantees that this work is capable of providing in an auto-scaling environment. We also provide weaker guarantees, which are still an improvement over the current state. For example, without specifying any kind of AppID, we provide idempotent production for the lifetime of a producer instance. This ensures reliable delivery without duplicates even with broker failures. It is also possible to use transactions with only an ephemeral AppID. If the application generates a UUID for use as the AppID, and only uses it for the lifetime of a single producer, you can still take advantage of transactional semantics, which allows you to write a set of messages to multiple partitions atomically.
>
> Hope that answers the question and helps others understand the work a bit better!
>
> Thanks,
> Jason
>
> On Wed, Nov 30, 2016 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Thanks for your comment, I updated the document. Let me know if it is clear now.
> >
> > Apurva
> >
> > On Wed, Nov 30, 2016 at 9:42 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> >
> > > @Apurva yep that's what I was trying to say.
> > >
> > > Original message:
> > > If there is already an entry with the AppID in the mapping, increment the epoch number and go on to the next step. If there is no entry with the AppID in the mapping, construct a PID with initialized epoch number; append an AppID message into the transaction topic, insert into the mapping and reply with the PID / epoch / timestamp.
> > >
> > > Just wanted to make it explicit because:
> > > 1. The "append an AppID message..." chunk was ambiguous on whether it applied to the "if exists" or "if not exists" condition
> > > 2. I think the google doc is pretty explicit on appending to the log everywhere else.
> > >
> > > On Wed, Nov 30, 2016 at 9:36 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > >
> > > > The first line in step 2 of that section is: "If there is already an entry with the AppID in the mapping, increment the epoch number and go on to the next step."
> > > >
> > > > Are you suggesting that it be made explicit that 'increment the epoch number' includes persisting the updated value to the log?
> > > >
> > > > Thanks,
> > > > Apurva
> > > >
> > > > On Wed, Nov 30, 2016 at 9:21 PM, Onur Karaman <onurkaraman.apa...@gmail.com> wrote:
> > > >
> > > > > Nice google doc!
> > > > >
> > > > > Probably need to go over the google doc a few more times, but a minor comment from the first pass:
> > > > >
> > > > > In Transaction Coordinator Request Handling (https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.jro89lml46du), step 2 mentions that if the Transaction Coordinator doesn't already see a producer with the same app-id, it creates a pid and appends (app-id, pid, epoch) into the transaction log.
> > > > >
> > > > > What about if the app-id/pid pair already exists and we increment the epoch? Should we append (app-id, pid, epoch++) to the transaction log? I think we should, but step 2 doesn't mention this.
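For what it's worth, here is how I read step 2 with Onur's suggestion folded in, as a small illustrative sketch. The types and method names are invented for illustration and are not from the design doc; the point is simply that both branches end with an append to the transaction log before the coordinator replies.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-ins for the coordinator's in-memory AppID -> (PID, epoch)
    // mapping and its transaction log; names are illustrative only.
    class PidEntry {
        final long pid;
        final int epoch;
        PidEntry(long pid, int epoch) { this.pid = pid; this.epoch = epoch; }
    }

    class TransactionCoordinatorSketch {
        private final Map<String, PidEntry> appIdMapping = new HashMap<>();
        private long nextPid = 0L;

        synchronized PidEntry handleAppIdRequest(String appId) {
            PidEntry current = appIdMapping.get(appId);
            PidEntry updated;
            if (current == null) {
                // No entry for this AppID: construct a PID with an initialized epoch.
                updated = new PidEntry(nextPid++, 0);
            } else {
                // Entry exists: keep the PID but increment the epoch, fencing the old instance.
                updated = new PidEntry(current.pid, current.epoch + 1);
            }
            appendToTransactionLog(appId, updated);   // persist in both branches before replying
            appIdMapping.put(appId, updated);
            return updated;                           // reply with PID / epoch (timestamp omitted)
        }

        private void appendToTransactionLog(String appId, PidEntry entry) {
            // Placeholder for the write to the transaction topic.
        }
    }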
> > > > > On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > > > >
> > > > > > Thanks for your comments, let me deal with your second point regarding merging the __consumer-offsets and transactions topic.
> > > > > >
> > > > > > Needless to say, we considered doing this, but chose to keep them separate for the following reasons:
> > > > > >
> > > > > > 1. Your assumption that group.id and transaction.app.id can be the same does not hold for streams applications. All colocated tasks of a streams application will share the same consumer (and hence implicitly will have the same group.id), but each task will have its own producer instance. The transaction.app.id for each producer instance will still have to be distinct. So to colocate the transaction and consumer group coordinators, we will now have to introduce a 'group.id' config in the producer and require it to be the same as the consumer. This seemed like a very fragile option.
> > > > > > 2. Following on from the above, the transaction coordinator and group coordinator would _have_ to be colocated in order to be the leader for the same TopicPartition, unless we wanted to make even more fundamental changes to Kafka.
> > > > > > 3. We don't require that the consumer coordinator and the transaction coordinator have the same view of the current PID/Epoch pair. If a producer instance is bounced, the epoch will be bumped. Any transactions initiated by the previous instance would either be fully committed or fully rolled back. Since the writes to the offset topics are just like writes to a regular topic, these would enjoy the same guarantees, and the inconsistency will be eventually resolved.
> > > > > > 4. Finally, every application will have consumers, and hence record consumer offsets. But a very small fraction of applications would use transactions. Blending the two topics would make recovering transaction coordinator state unnecessarily inefficient since it has to read from the beginning of the topic to reconstruct its data structures -- it would have to inspect and skip a majority of the messages if the offsets were in the same topic.
> > > > > >
> > > > > > Thanks,
> > > > > > Apurva
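Point 3 above is easier to see with a tiny sketch of the fencing rule it relies on: every PID carries an epoch, a bounced instance comes back with a higher epoch, and anything still stamped with the old epoch is rejected. This is purely illustrative pseudocode with invented names, not broker code.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative fencing rule: requests carrying a stale epoch for a PID are
    // treated as coming from a zombie instance and rejected.
    class EpochFencingSketch {
        private final Map<Long, Integer> currentEpochs = new ConcurrentHashMap<>();

        // Called when the transaction coordinator hands out a new PID/epoch pair.
        void onEpochBumped(long pid, int epoch) {
            currentEpochs.merge(pid, epoch, Math::max);
        }

        // Called for each incoming write stamped with a PID/epoch.
        boolean accept(long pid, int epoch) {
            Integer current = currentEpochs.get(pid);
            return current != null && epoch >= current;
        }
    }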
> > > > > > On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > >
> > > > > > > Thanks for initiating this KIP! I think it is well written and I'm excited to see the first step towards adding an important feature in Kafka.
> > > > > > >
> > > > > > > I had a few initial thoughts on the KIP, mostly not as deeply thought through as what you've done -
> > > > > > >
> > > > > > > 1. Perhaps you’ve thought about how this would work already — since we now require a producer to specify a unique AppID across different instances of an application, how would applications that run in the cloud use this feature with auto scaling?
> > > > > > >
> > > > > > > 2. Making it easy for applications to get exactly-once semantics for a consume-process-produce workflow is a great feature to have. To enable this, the proposal now includes letting a producer initiate a write to the offset topic as well (just like consumers do). The consumer coordinator (which could be on a different broker than the txn coordinator) would then validate whether the PID and producer epoch are valid before it writes to the offset topic along with the associated PID. This is a great feature though I see 2 difficulties:
> > > > > > >
> > > > > > > -- This needs the consumer coordinator to have a consistent view of the PID/epochs that is the same as the view on the txn coordinator. However, as the offset and the transaction topics are different, the 2 coordinators might live on different brokers.
> > > > > > > -- We now also have 2 internal topics - a transaction topic and the __consumer_offsets topic.
> > > > > > >
> > > > > > > Maybe you’ve thought about this already and discarded it ... let me make a somewhat crazy proposal — Why don’t we upgrade the transaction topic to be the new offsets topic as well? For consumers that want EoS guarantees for a consume-process-produce pattern, the group.id is the same as the transaction.app.id set for the producer. Assume that the transaction topic also stores consumer offsets. It stores both the transaction metadata messages as well as offset messages, both for transactional as well as non-transactional consumers. Since the group.id of the consumer and the app.id of the producer are the same, the offsets associated with a consumer group and topic-partition end up in the same transaction topic partition as the transaction metadata messages. The transaction coordinator and the consumer coordinator always live on the same broker since they both map to the same partition in the transaction topic. Even if there are failures, they end up on the same new broker. Hence, they share the same, consistent view of the PIDs, epochs and App IDs, whatever it is. The consumer coordinator will skip over the transaction metadata messages when it bootstraps the offsets from this new topic for consumer groups that are not involved in a transaction and don’t have a txn id associated with the offset message in the transaction topic. The consumer coordinator will expose only committed offsets in cases of consumer groups that are involved in a txn. It will also be able to validate the OffsetCommitRequests coming from a transactional producer by ensuring that they come from a valid PID and producer epoch, since it uses the same view of this data created by the transaction coordinator (that lives on the same broker). And we will end up with one internal topic, not two.
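The co-location argument here comes down to partitioning: if the consumer's group.id and the producer's transaction.app.id are the same string, and both the offset messages and the transaction metadata messages are keyed by that string into the same internal topic, they necessarily land in the same partition, whose leader is then the single coordinator for both. A toy illustration follows (the modulo hash is a stand-in, not Kafka's actual partitioning function), including the streams case from Apurva's point 1 earlier in the thread, where the two ids differ and the keys diverge.

    // Toy illustration of the "same key => same partition => same coordinator" argument.
    class CoordinatorColocationSketch {
        static int partitionFor(String id, int numPartitions) {
            return Math.abs(id.hashCode() % numPartitions);   // stand-in hash, not Kafka's
        }

        public static void main(String[] args) {
            int numPartitions = 50;   // partitions of the hypothetical merged internal topic

            // If app.id equals group.id, offsets and txn metadata share a partition,
            // so both coordinators are the leader of that one partition.
            String sharedId = "my-app";
            System.out.println(partitionFor(sharedId, numPartitions));

            // Apurva's point 1: streams tasks share one group.id but need distinct
            // app.ids, so the keys (and hence the coordinators) can diverge.
            String groupId = "my-app";
            String taskAppId = "my-app-task-3";
            System.out.println(partitionFor(groupId, numPartitions));
            System.out.println(partitionFor(taskAppId, numPartitions));
        }
    }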
> > > > > > >
> > > > > > > This proposal offers better operational simplicity and fewer internal topics but there are some downsides that come with it — there are 2 types of messages in one topic (txn metadata ones and offset ones). Since this internal topic serves a dual purpose, it will be harder to name it and also design a message format that includes the different types of messages that will live in the topic. Then again, the transaction topic already needs to write 5 different types of messages (the AppID->PID mapping, the BeginTxn message, InsertTxn, PrepareCommit, Committed/Aborted), so maybe adding the offset message isn't a big deal?
> > > > > > >
> > > > > > > Back when we introduced the offsets topic, we had discussed making it more general and allowing the producer to send offset commit messages to it but ended up creating a specialized topic to allow the consumer coordinator to wall off and prevent unauthorized writes from consumers outside of a group. Jason can comment on the specifics but I don't believe that goal of the new consumer protocol was quite achieved.
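For readers trying to picture point 2 (the consume-process-produce flow with offsets committed through the producer), this is roughly what it looks like with the client API that eventually shipped. The config and method names (isolation.level, sendOffsetsToTransaction, and so on) are taken from the released Java clients and are assumptions relative to this draft of the KIP, not part of it.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeProcessProduceLoop {
        public static void main(String[] args) {
            Properties cprops = new Properties();
            cprops.put("bootstrap.servers", "localhost:9092");
            cprops.put("group.id", "copy-group");
            cprops.put("enable.auto.commit", "false");
            cprops.put("isolation.level", "read_committed");   // only read committed transactional data
            cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties pprops = new Properties();
            pprops.put("bootstrap.servers", "localhost:9092");
            pprops.put("transactional.id", "copy-group-task-0");   // the KIP's AppID
            pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pprops)) {
                consumer.subscribe(Collections.singletonList("input"));
                producer.initTransactions();

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    if (records.isEmpty()) {
                        continue;
                    }
                    producer.beginTransaction();
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
                    }
                    // The consumed offsets are committed inside the same transaction as the
                    // produced records, i.e. the producer-initiated offset write discussed above.
                    producer.sendOffsetsToTransaction(offsets, "copy-group");
                    producer.commitTransaction();
                }
            }
        }
    }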
> > > > > > >
> > > > > > > I have other comments on the message format, request names etc. but wanted to get your thoughts on these 2 issues first :-)
> > > > > > >
> > > > > > > On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I have just created KIP-98 to enhance Kafka with exactly once delivery semantics:
> > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > >
> > > > > > > > This KIP adds a transactional messaging mechanism along with an idempotent producer implementation to make sure that 1) duplicated messages sent from the same identified producer can be detected on the broker side, and 2) a group of messages sent within a transaction will atomically be either reflected and fetchable to consumers as a whole, or not at all.
> > > > > > > >
> > > > > > > > The above wiki page provides a high-level view of the proposed changes as well as summarized guarantees. Initial draft of the detailed implementation design is described in this Google doc:
> > > > > > > >
> > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > > > > > >
> > > > > > > > We would love to hear your comments and suggestions.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > -- Guozhang
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Neha
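Finally, to tie this back to my original question at the top of the thread: the idempotence-only case I care about for ruby-kafka should reduce to a single producer setting and no transactional calls at all. In the Java client that eventually shipped it looks like the sketch below; I'm assuming here that a client which never sets an AppID can skip the transaction coordinator discovery path entirely, which is the behaviour I'm hoping the proposal allows.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentOnlyProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Idempotence without transactions: no AppID / transactional.id is set, and
            // retries or broker failovers no longer introduce duplicates within this
            // producer instance's lifetime.
            props.put("enable.idempotence", "true");
            props.put("acks", "all");

            try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
                producer.send(new ProducerRecord<>("events", "event-key", "event-value"));
                producer.flush();
            }
        }
    }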