@Neha
> 1. I think we should consider renaming initTransactions to just init() and
> moving the metadata initialization there. Let's make sure we don't add APIs
> that are relevant to this proposal only. Instead, try to think what we'd
> propose if we were writing the producer from scratch today. I suspect we
> would end up with an init() API that would do the metadata initialization
> as well as the transaction stuff lazily. If so, let's make that change now.

I think the only awkwardness with `init()` is that it would probably have to
be an optional API for non-transactional usage to support existing code. I'm
also not sure what metadata we can actually initialize at that point, since
we don't know which topics will be produced to. That said, I'm also not fond
of the `initTransactions` name, and we may find other uses for a generic
`init()` in the future, so I'm in favor of this renaming.

> 2. Along the same lines, let's think about the role of each id that the
> producer will have and see if everything still makes sense. For instance,
> we have quite a few per-producer-instance notions -- client.id, a producer
> id and a transaction.app.id, some set via config and some generated
> on-the-fly. What role does each play, how do they relate to each other and
> is there an opportunity to get rid of any.

The abundance of ids is super annoying. The producer ID is not actually
exposed in either the producer or the consumer, but I'm not sure how
successful we'll be in hiding its existence from the user (you probably need
to know about it for debugging and administrative purposes at least). This
issue has been a continual thorn and I'm not sure I have a great answer. We
have been tempted to use client.id as the AppID at one point or another, but
its current usage is to have the same value for all producers in an
application. The lack of an AppID meant that we would have to expose the
producer ID and the application would be responsible for persisting it.
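For illustration, the generic `init()` flow discussed in point 1 might look
roughly like this from application code. This is a sketch only: `MockProducer`
and its method bodies are invented here so the example is self-contained, and
only the call pattern (one `init()` replacing `initTransactions()`) reflects
the idea under discussion.

```java
import java.util.ArrayList;
import java.util.List;

public class InitApiSketch {
    // Invented stand-in for the producer; not the proposal's actual API.
    static class MockProducer {
        boolean initialized = false;
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        // Hypothetical generic init(): metadata setup plus, for
        // transactional producers, what initTransactions() does today.
        // It would have to stay optional for non-transactional usage.
        void init() { initialized = true; }

        void beginTransaction() {
            if (!initialized) throw new IllegalStateException("init() not called");
            pending.clear();
        }

        void send(String record) { pending.add(record); }

        void commitTransaction() {
            committed.addAll(pending);
            pending.clear();
        }
    }

    public static void main(String[] args) {
        MockProducer producer = new MockProducer();
        producer.init();               // replaces initTransactions()
        producer.beginTransaction();
        producer.send("a");
        producer.send("b");
        producer.commitTransaction();  // all-or-nothing across the sends
        System.out.println(producer.committed); // prints [a, b]
    }
}
```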
In the use cases we looked at, it was simpler to let the application provide
its own ID through configuration. And in use cases where there was no obvious
ID to serve as the AppID, it seemed simple enough to let the application
generate its own.

We also looked at removing the producer ID. This was discussed somewhere
above, but basically the idea is to store the AppID in the message set header
directly and avoid the mapping to producer ID altogether. As long as batching
is reasonable, the impact on total size may not be too bad, but we were
ultimately more comfortable with a fixed-size ID.

> 3. I think we should definitely consider renaming transaction.app.id to
> something else. Given that we already have a notion of application.id and
> it represents the entire Streams application, having transaction.app.id
> that represents a producer instance is confusing. I do understand that, for
> Streams, the user doesn't have to set transaction.app.id as it will likely
> be application.id+taskId (am I understanding that correctly?)

Your understanding is correct. The "transaction" prefix was intended to make
it clear that it was only needed for transactional usage. We've also referred
to the AppID as a producer "instance ID." This is more suggestive of the fact
that it needs to be unique within the producers of a particular application.
Maybe we could drop the "transaction" and use "instance.id" or
"app.instance.id"? Not sure that's any better, but perhaps it avoids the
confusion with application.id?

Thanks,
Jason

On Mon, Dec 12, 2016 at 8:37 PM, Jason Gustafson <ja...@confluent.io> wrote:

> @Becket
>
>> It has been a pain in many cases that we do not know the number of
>> messages in a message set, not sure if the OffsetDelta field in the
>> wrapper message will address this.
>
> Interestingly, we had this in one of the design iterations, but we found
> in the prototype that we weren't really using it. Did you have a particular
> use case in mind?
> I share the intuition that it may be helpful to know, but I don't have a
> clear example in mind. In fact, in the initial version, we attempted to let
> the message set always represent a contiguous sequence of messages. In that
> case, the message set only needed a base offset and a count of the number
> of messages, and the individual messages no longer needed the offset delta.
> We ultimately abandoned that because we were uncomfortable with its impact
> on compaction.
>
> -Jason
>
> On Mon, Dec 12, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Andrew,
>>
>> As I mentioned above, in Kafka durability is supported via data
>> replication instead of sync-flushing to disk. KIP-98 does not try to
>> change that part of Kafka: if all your replicas are gone at the same time
>> before the data was ever flushed to disk, then your data is lost today,
>> and it will still be the case after KIP-98.
>>
>> As for atomicity, KIP-98 does provide an all-or-nothing guarantee for
>> writes to multiple partitions, and it is based on Kafka's existing
>> durability guarantees. So it is possible that if your durability breaks,
>> then atomicity will be violated: some of a committed transaction's
>> messages could be lost if the above scenario happens while others are
>> successfully appended. My take is that if you have concerns that Kafka's
>> replication mechanism is not good enough for your durability requirements
>> as of today, then you should have the same level of concern about
>> durability if you want to use Kafka with KIP-98 as your transactional
>> queuing system as well.
>>
>> Guozhang
>>
>> On Mon, Dec 12, 2016 at 1:49 AM, Andrew Schofield
>> <andrew_schofi...@live.com> wrote:
>>
>>> Guozhang,
>>> Exactly. This is the crux of the matter. Because it's async, the log is
>>> basically slightly out of date wrt the run-time state, and a failure of
>>> all replicas might take the data slightly back in time.
>>>
>>> Given this, do you think that KIP-98 gives an all-or-nothing,
>>> no-matter-what guarantee for Kafka transactions? I think the key is
>>> whether the data which is asynchronously flushed is guaranteed to be
>>> recovered atomically in all cases. Asynchronous but atomic would be good.
>>>
>>> Andrew Schofield
>>> IBM Watson and Cloud Platform
>>>
>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>> Sent: 09 December 2016 22:59
>>>> To: dev@kafka.apache.org
>>>> Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional
>>>> Messaging
>>>>
>>>> Onur,
>>>>
>>>> I understand your question now. So it is indeed possible that after
>>>> commitTxn() returned, the messages could still be lost permanently if
>>>> all replicas failed before the data was flushed to disk. This is a
>>>> consequence of Kafka's design to rely on replication (probably in
>>>> memory) for high availability, hence async flushing. This scenario
>>>> already exists today, and KIP-98 did not intend to change this factor
>>>> in any way.
>>>>
>>>> Guozhang
>>
>> --
>> -- Guozhang
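The two message-set representations Jason contrasts above (per-message offset
deltas versus a base offset plus a count for a contiguous set) can be sketched
as follows. The names and layout here are invented for illustration only and
are not KIP-98's actual wire format.

```java
public class MessageSetSketch {
    // Contiguous representation: a base offset plus a message count.
    // The i-th message implicitly sits at baseOffset + i, so per-message
    // offset deltas are unnecessary -- but compaction, which removes
    // arbitrary messages, breaks the contiguity assumption.
    static long offsetOfMessage(long baseOffset, int index) {
        return baseOffset + index;
    }

    // Delta representation: each message carries its own offset delta,
    // which survives compaction but does not directly give a count.
    static long offsetOfMessage(long baseOffset, int[] offsetDeltas, int index) {
        return baseOffset + offsetDeltas[index];
    }

    public static void main(String[] args) {
        // Contiguous set starting at offset 100: the 5th message is at 104.
        System.out.println(offsetOfMessage(100L, 4));                     // 104
        // After compaction removed messages, deltas still resolve offsets.
        System.out.println(offsetOfMessage(100L, new int[]{0, 3, 4}, 1)); // 103
    }
}
```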