Hi Jay,

I like the idea of having a single `init`, but I am not sure about the specifics of the metadata initialisation (as Jason alluded to). More inline.
On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:

> 1. Add a generic init() call which initializes both transactions and metadata

Would this initialise metadata for all topics? One advantage of doing the metadata call during `send` is that we only retrieve metadata for the subset of topics that you are producing to. For large clusters, retrieving the metadata for all the topics is relatively expensive, and I think users would prefer to avoid that unless there are some concrete benefits. We could pass the topics to `init`, but that seems a bit clunky.

> 2. If you don't call init(), metadata is initialized on the first send (as now)

We need to maintain the logic to refresh the metadata on `send` anyway if you try to send to a topic that is missing from the metadata (e.g. if it's added after the `init` method is called, assuming that we don't expect people to call `init` more than once), so that seems fine.

> and transactions are lazily initialized at the first beginTransaction() call.

I'll leave it to Jason to say if this is feasible. However, if it is, it seems like we can just do this and avoid the `init` method altogether?

Ismael

On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Jason/Neha,
>
> Yeah, clearly having a mandatory, generic init() method that initializes both transactions and topic metadata would be the ideal solution. This would solve the occasional complaint about blocking behavior during initialization of metadata (or at least shift it to a new complaint about an inability to initialize when the cluster isn't up in test environments). The challenge is that we can't do this because it isn't backwards compatible with existing apps that don't call init.
>
> The alternative of having an optional generic init() call is a bit odd, because to figure out if you need to call it you need to discover what it does, and what it does is not generic: it initializes transactions. We can't really add more logic to init because it only gets invoked by transaction users, so it doesn't really function as a generic init.
>
> What do you think of this solution:
>
> 1. Add a generic init() call which initializes both transactions and metadata.
> 2. If you don't call init(), metadata is initialized on the first send (as now) and transactions are lazily initialized at the first beginTransaction() call.
>
> -Jay
>
> On Mon, Dec 12, 2016 at 9:17 PM, Jason Gustafson <ja...@confluent.io> wrote:
>
> > @Neha
> >
> > > 1. I think we should consider renaming initTransactions to just init() and moving the metadata initialization there. Let's make sure we don't add APIs that are relevant to this proposal only. Instead, try to think about what we'd propose if we were writing the producer from scratch today. I suspect we would end up with an init() API that would do the metadata initialization as well as the transaction stuff lazily. If so, let's make that change now.
> >
> > I think the only awkwardness with `init()` is that it would probably have to be an optional API for non-transactional usage to support existing code. I'm also not sure what metadata we can actually initialize at that point, since we don't know which topics will be produced to. That said, I'm also not fond of the `initTransactions` name, and we may find other uses for a generic `init()` in the future, so I'm in favor of this renaming.
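[For illustration, a rough sketch of how options (1) and (2) above might look from the application's side. init(), beginTransaction(), commitTransaction(), and the transaction.app.id config are all still proposals under discussion, and the class name, broker address, topic, and id value below are made up, so treat this as a hypothetical sketch rather than the actual API.]

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class InitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("transaction.app.id", "my-app-instance-1"); // proposed config, name still under discussion

            KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

            // Option 1: a generic, optional init() that eagerly initializes topic metadata
            // and transactional state (hypothetical method from the proposal above).
            // producer.init();

            // Option 2: call nothing up front; transactional state is set up lazily on the
            // first beginTransaction() and topic metadata on the first send(), as today.
            producer.beginTransaction();  // proposed KIP-98 API
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction(); // proposed KIP-98 API
            producer.close();
        }
    }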
> > > 2. Along the same lines, let's think about the role of each id that the producer will have and see if everything still makes sense. For instance, we have quite a few per-producer-instance notions -- client.id, a producer id and a transaction.app.id, some set via config and some generated on-the-fly. What role does each play, how do they relate to each other, and is there an opportunity to get rid of any?
> >
> > The abundance of ids is super annoying. The producer ID is not actually exposed in either the producer or consumer, but I'm not sure how successful we'll be in hiding its existence from the user (you probably need to know about it for debugging and administrative purposes at least). This issue has been a continual thorn and I'm not sure I have a great answer. We have been tempted to use client.id as the AppID at one point or another, but its current usage is to have the same value for all producers in an application. The lack of an AppID meant that we would have to expose the producer ID and the application would be responsible for persisting it. In the use cases we looked at, it was simpler to let the application provide its own ID through configuration. And in use cases where there was no obvious ID to serve as the AppID, it seemed simple enough to let the application generate its own. We also looked at removing the producer ID. This was discussed somewhere above, but basically the idea is to store the AppID in the message set header directly and avoid the mapping to producer ID altogether. As long as batching isn't too sparse, the impact on total size may not be too bad, but we were ultimately more comfortable with a fixed-size ID.
> >
> > > 3. I think we should definitely consider renaming transaction.app.id to something else. Given that we already have a notion of application.id and it represents the entire Streams application, having transaction.app.id represent a producer instance is confusing. I do understand that, for Streams, the user doesn't have to set transaction.app.id as it will likely be application.id+taskId (am I understanding that correctly?)
> >
> > Your understanding is correct. The "transaction" prefix was intended to make it clear that it was only needed for transactional usage. We've also referred to the AppID as a producer "instance ID." This is more suggestive of the fact that it needs to be unique within the producers of a particular application. Maybe we could drop the "transaction" and use "instance.id" or "app.instance.id"? Not sure that's any better, but perhaps it avoids the confusion with application.id?
> >
> > Thanks,
> > Jason
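[For illustration, a sketch of how the different ids discussed above might be supplied by an application. client.id is an existing producer config; transaction.app.id (and the application.id + taskId derivation for Streams) is only a proposal at this point, so that property name is hypothetical, as are the "orders-app" and taskId values. The numeric producer ID is broker-assigned under the proposal and never set by the user.]

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class IdSketch {
        public static void main(String[] args) {
            String applicationId = "orders-app"; // Streams-style application.id, shared by the whole application
            int taskId = 3;                      // per-task / per-instance component

            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // client.id: existing config; its current usage is the same value for all
            // producers in an application (metrics, quotas, logging).
            props.put(ProducerConfig.CLIENT_ID_CONFIG, applicationId);

            // transaction.app.id (name under discussion): proposed config that must be unique
            // per producer instance; for Streams it would likely be application.id + taskId.
            props.put("transaction.app.id", applicationId + "-" + taskId);

            // The producer ID (PID) does not appear here at all: it is assigned by the broker
            // and mapped from the AppID internally.
            KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
            producer.close();
        }
    }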
> > On Mon, Dec 12, 2016 at 8:37 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > @Becket
> > >
> > >> It has been a pain in many cases that we do not know the number of messages in a message set, not sure if the OffsetDelta field in the wrapper message will address this.
> > >
> > > Interestingly, we had this in one of the design iterations, but we found in the prototype that we weren't really using it. Did you have a particular use case in mind? I share the intuition that it may be helpful to know, but I don't have a clear example in mind. In fact, in the initial version, we attempted to let the message set always represent a contiguous sequence of messages. In that case, the message set only needed a base offset and a count of the number of messages, and the individual messages no longer needed the offset delta. We ultimately abandoned that because we were uncomfortable with its impact on compaction.
> > >
> > > -Jason
> > >
> > > On Mon, Dec 12, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Andrew,
> > >>
> > >> As I mentioned above, in Kafka durability is supported via data replication instead of sync-flushing to disk. KIP-98 does not try to change that part of Kafka: if all your replicas are gone at the same time before the data was ever flushed to disk, then your data is lost today, and that will still be the case after KIP-98.
> > >>
> > >> As for atomicity, KIP-98 does provide an all-or-nothing guarantee for writes to multiple partitions, and it is based on Kafka's existing durability guarantees. So it is possible that if your durability breaks, then atomicity will be violated: some of a committed transaction's messages could be lost if the above scenarios happen, while others are successfully appended. My take is that if you have concerns that Kafka's replication mechanism is not good enough for your durability requirements as of today, then you should have the same level of concern about durability if you want to use Kafka with KIP-98 as your transactional queuing system as well.
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, Dec 12, 2016 at 1:49 AM, Andrew Schofield <andrew_schofi...@live.com> wrote:
> > >>
> > >> > Guozhang,
> > >> > Exactly. This is the crux of the matter. Because it's async, the log is basically slightly out of date with respect to the run-time state, and a failure of all replicas might take the data slightly back in time.
> > >> >
> > >> > Given this, do you think that KIP-98 gives an all-or-nothing, no-matter-what guarantee for Kafka transactions? I think the key is whether the data which is asynchronously flushed is guaranteed to be recovered atomically in all cases. Asynchronous but atomic would be good.
> > >> >
> > >> > Andrew Schofield
> > >> > IBM Watson and Cloud Platform
> > >> >
> > >> > > From: Guozhang Wang <wangg...@gmail.com>
> > >> > > Sent: 09 December 2016 22:59
> > >> > > To: dev@kafka.apache.org
> > >> > > Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
> > >> > >
> > >> > > Onur,
> > >> > >
> > >> > > I understand your question now. So it is indeed possible that after commitTxn() returned, the messages could still be lost permanently if all replicas failed before the data was flushed to disk. This is a consequence of Kafka's design of relying on replication (probably in memory) for high availability, hence the async flushing. This scenario already exists today, and KIP-98 does not intend to change it in any way.
> > >> > >
> > >> > > Guozhang
> > >>
> > >> --
> > >> -- Guozhang
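[For readers less familiar with the replication-versus-flushing trade-off discussed above, here is a sketch of the existing, pre-KIP-98 knobs that control it. The class name, broker address, and values are illustrative only; the broker/topic settings in the comment are set out of band, not through the producer.]

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class DurabilitySketch {
        public static void main(String[] args) {
            // Producer side: wait for the full in-sync replica set to acknowledge a write
            // before considering it successful.
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all");

            KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

            // Broker/topic side (shown only as a comment):
            //   min.insync.replicas=2                 -- reject acks=all writes if too few replicas are in sync
            //   unclean.leader.election.enable=false  -- never elect a replica that may have missed data
            //   flush.messages / flush.ms             -- optional forced fsync, at a significant throughput cost
            //
            // As described above, KIP-98's atomicity sits on top of this replication-based
            // durability; it does not add synchronous flushing of its own.

            producer.close();
        }
    }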