Hey Jason/Neha,

Yeah, clearly having a mandatory, generic init() method that initializes both transactions and topic metadata would be the ideal solution. This would solve the occasional complaint about blocking behavior during initialization of metadata (or at least shift it to a new complaint about an inability to initialize when the cluster isn't up in test environments). The challenge is that we can't do this because it isn't backwards compatible with existing apps that don't call init.
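To make the compatibility concern concrete, here is a rough sketch of the two call patterns involved. The init() call is hypothetical (the proposed API below, not an existing producer method), and the config values and topic name are made up:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class InitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // made-up example values
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            Producer<String, String> producer = new KafkaProducer<>(props);

            // Today: no explicit initialization call. Topic metadata is fetched
            // lazily on the first send(), which is where the blocking (or the
            // failure against an unreachable test cluster) currently shows up.
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));

            // Hypothetical, per the proposal below: a generic init() that warms up
            // metadata and initializes transactions. Making it mandatory would
            // break the code path above, hence the backwards-compatibility concern.
            // producer.init();

            producer.close();
        }
    }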
The alternative of having an optional generic init() call is a bit odd, because to figure out if you need to call it you have to discover what it does, and what it does is not generic: it initializes transactions. We can't really add more logic to init() because it only gets invoked by transaction users, so it doesn't really function as a generic init. What do you think of this solution:

1. Add a generic init() call which initializes both transactions and metadata.
2. If you don't call init(), metadata is initialized on the first send (as now) and transactions are lazily initialized at the first beginTransaction() call.

-Jay

On Mon, Dec 12, 2016 at 9:17 PM, Jason Gustafson <ja...@confluent.io> wrote:

> @Neha
>
> > 1. I think we should consider renaming initTransactions to just init() and moving the metadata initialization there. Let's make sure we don't add APIs that are relevant to this proposal only. Instead, try to think what we'd propose if we were writing the producer from scratch today. I suspect we would end up with an init() API that would do the metadata initialization as well as the transaction stuff lazily. If so, let's make that change now.
>
> I think the only awkwardness with `init()` is that it would probably have to be an optional API for non-transactional usage to support existing code. I'm also not sure what metadata we can actually initialize at that point since we don't know which topics will be produced to. That said, I'm also not fond of the `initTransactions` name, and we may find other uses for a generic `init()` in the future, so I'm in favor of this renaming.
>
> > 2. Along the same lines, let's think about the role of each id that the producer will have and see if everything still makes sense. For instance, we have quite a few per-producer-instance notions -- client.id, a producer id and a transaction.app.id, some set via config and some generated on-the-fly. What role does each play, how do they relate to each other and is there an opportunity to get rid of any.
>
> The abundance of ids is super annoying. The producer ID is not actually exposed in either the producer or consumer, but I'm not sure how successful we'll be in hiding its existence from the user (you probably need to know about it for debugging and administrative purposes at least). This issue has been a continual thorn and I'm not sure I have a great answer. We have been tempted to use client.id as the AppID at one point or another, but its current usage is to have the same value for all producers in an application. The lack of an AppID meant that we would have to expose the producer ID and the application would be responsible for persisting it. In the use cases we looked at, it was simpler to let the application provide its own ID through configuration. And in use cases where there was no obvious ID to serve as the AppID, it seemed simple enough to let the application generate its own. We also looked at removing the producer ID. This was discussed somewhere above, but basically the idea is to store the AppID in the message set header directly and avoid the mapping to producer ID altogether. As long as batching is decent, the impact on total size may not be too bad, but we were ultimately more comfortable with a fixed-size ID.
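For reference, a rough sketch of where each of these ids shows up from the application's point of view. The names and values are illustrative only; transaction.app.id is the config name from the KIP draft, and the numeric producer ID is broker-assigned rather than user-configured:

    import java.util.Properties;

    public class ProducerIdsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();

            // client.id: typically shared by every producer in an application (used
            // for metrics, logging and quotas), so not unique per producer instance.
            props.put("client.id", "orders-service");

            // transaction.app.id (the "AppID", using the KIP-draft config name):
            // supplied by the application and expected to be unique among its
            // producer instances, e.g. a Streams-style application.id + taskId
            // as discussed below.
            props.put("transaction.app.id", "orders-service-task-7");

            // The numeric producer ID (PID) is assigned by the broker and mapped
            // from the AppID; it is not set in configuration and is not exposed
            // in the producer or consumer APIs.
        }
    }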
> > 3. I think we should definitely consider renaming transaction.app.id to something else. Given that we already have a notion of application.id and it represents the entire Streams application, having transaction.app.id that represents a producer instance is confusing. I do understand that, for Streams, the user doesn't have to set transaction.app.id as it will likely be application.id+taskId (am I understanding that correctly?)
>
> Your understanding is correct. The "transaction" prefix was intended to make it clear that it was only needed for transactional usage. We've also referred to the AppID as a producer "instance ID." This is more suggestive of the fact that it needs to be unique within the producers of a particular application. Maybe we could drop the "transaction" and use "instance.id" or "app.instance.id"? Not sure that's any better, but perhaps it avoids the confusion with application.id?
>
> Thanks,
> Jason
>
> On Mon, Dec 12, 2016 at 8:37 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > @Becket
> >
> >> It has been a pain in many cases that we do not know the number of messages in a message set, not sure if the OffsetDelta field in the wrapper message will address this.
> >
> > Interestingly, we had this in one of the design iterations, but we found in the prototype that we weren't really using it. Did you have a particular use case in mind? I share the intuition that it may be helpful to know, but I don't have a clear example in mind. In fact, in the initial version, we attempted to let the message set always represent a contiguous sequence of messages. In that case, the message set only needed a base offset and a count of the number of messages, and the individual messages no longer needed the offset delta. We ultimately abandoned that because we were uncomfortable with its impact on compaction.
> >
> > -Jason
> >
> > On Mon, Dec 12, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Andrew,
> >>
> >> As I mentioned above, in Kafka durability is supported via data replication instead of sync-flushing to disks. KIP-98 does not try to change that part of Kafka: if all your replicas are gone at the same time before the data was ever flushed to disks, then your data is lost today, and it will still be the case after KIP-98.
> >>
> >> As for atomicity, KIP-98 does provide an all-or-nothing guarantee for writes to multiple partitions, and it is based on the existing durability guarantees. So it is possible that if your durability breaks, then atomicity will be violated: some of a committed transaction's messages could be lost if the above scenario happens while others are successfully appended. My take is that, if you have concerns that Kafka's replication mechanism is not good enough for your durability requirements as of today, then you should have the same level of concerns with durability if you want to use Kafka with KIP-98 as your transactional queuing system as well.
> >>
> >> Guozhang
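To make the guarantee concrete, a rough sketch of the transactional write path under discussion. Method and config names follow the KIP-98 draft discussed in this thread and may differ in the final API; the broker address, AppID and topic names are made up. Durability for the committed messages rests on replication (acks=all plus the topic's replication settings), not on a synchronous flush:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TransactionalWriteSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");       // illustrative values
            props.put("acks", "all");                                // durability via replication: wait for the in-sync replicas
            props.put("transaction.app.id", "payments-instance-1"); // KIP-draft name for the AppID
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();   // or init(), per the renaming discussion above

            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("topic-a", "k1", "v1"));
                producer.send(new ProducerRecord<>("topic-b", "k2", "v2"));
                // The commit makes the writes to both partitions visible atomically,
                // but the data may still only be in the replicas' page cache: if
                // every replica is lost before flushing, committed messages can be lost.
                producer.commitTransaction();
            } catch (Exception e) {
                // Simplified error handling: abort so none of the writes become visible.
                producer.abortTransaction();
            } finally {
                producer.close();
            }
        }
    }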
> >> On Mon, Dec 12, 2016 at 1:49 AM, Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>
> >> > Guozhang,
> >> > Exactly. This is the crux of the matter. Because it's async, the log is basically slightly out of date wrt the run-time state, and a failure of all replicas might take the data slightly back in time.
> >> >
> >> > Given this, do you think that KIP-98 gives an all-or-nothing, no-matter-what guarantee for Kafka transactions? I think the key is whether the data which is asynchronously flushed is guaranteed to be recovered atomically in all cases. Asynchronous but atomic would be good.
> >> >
> >> > Andrew Schofield
> >> > IBM Watson and Cloud Platform
> >> >
> >> > > From: Guozhang Wang <wangg...@gmail.com>
> >> > > Sent: 09 December 2016 22:59
> >> > > To: dev@kafka.apache.org
> >> > > Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
> >> > >
> >> > > Onur,
> >> > >
> >> > > I understand your question now. So it is indeed possible that after commitTxn() returned, the messages could still be lost permanently if all replicas failed before the data was flushed to disk. This is the virtue of Kafka's design to rely on replication (probably in memory) for high availability, hence async flushing. This scenario already exists today and KIP-98 did not intend to change this factor in any way.
> >> > >
> >> > > Guozhang
> >>
> >> --
> >> -- Guozhang
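For reference, a rough sketch of the knobs this trade-off maps to in practice. The topic name and values are made up, and the Java admin client is used purely for illustration: durability is tuned through replication (replication factor and min.insync.replicas, paired with acks=all on the producer), while per-topic flush settings exist for anyone who wants synchronous flushing at a latency cost.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class DurabilityConfigSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // illustrative

            Map<String, String> configs = new HashMap<>();
            // Replication-based durability: with acks=all on the producer, a write
            // is acknowledged only once two in-sync replicas have it.
            configs.put("min.insync.replicas", "2");
            // Opt-in synchronous flushing for fsync-level durability at a latency
            // cost; by default Kafka leaves flushing to the OS and relies on
            // replication instead.
            configs.put("flush.messages", "1");

            try (AdminClient admin = AdminClient.create(props)) {
                NewTopic topic = new NewTopic("payments", 6, (short) 3)  // replication factor 3
                        .configs(configs);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }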