Hi Jay,

I like the idea of having a single `init`, but I am not sure about the
specifics of the metadata initialisation (as Jason alluded to). More inline.

On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:

>    1. Add a generic init() call which initializes both transactions and
>    metadata
>

Would this initialise metadata for all topics? One advantage of doing the
metadata call during `send` is that we only retrieve metadata for the
subset of topics that you are producing to. For large clusters, retrieving
the metadata for all the topics is relatively expensive and I think users
would prefer to avoid that unless there are some concrete benefits. We
could pass the topics to `init`, but that seems a bit clunky.


>    2. If you don't call init(), metadata is initialized on the first send
>    (as now)


We need to maintain the logic to refresh the metadata on `send` anyway if
you try to send to a topic that is missing from the metadata (e.g. if it's
added after the `init` method is called, assuming that we don't expect
people to call `init` more than once) so that seems fine.


> and transactions are lazily initialized at the first beginTransaction()
> call.


I'll leave it to Jason to say if this is feasible. However, if it is, it
seems like we can just do this and avoid the `init` method altogether?
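
To make the lazy alternative concrete, here is a rough sketch of the behaviour being discussed: metadata is fetched per topic on the first `send`, and transactions are initialised on the first `beginTransaction()`. The class and method names are hypothetical stand-ins, not the actual producer API, and the comments mark where a real client would talk to the brokers.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a producer; not the real KafkaProducer API.
class LazyInitProducerSketch {
    private final Set<String> topicsWithMetadata = new HashSet<>();
    private int metadataFetches = 0;
    private boolean transactionsInitialized = false;
    private boolean inTransaction = false;

    // Metadata is fetched only for topics that are actually produced to.
    void send(String topic, String value) {
        boolean firstUse = topicsWithMetadata.add(topic);
        if (firstUse) {
            // Assumption: a real client would block here on a metadata
            // request covering just this topic.
            metadataFetches++;
        }
        // Assumption: a real client would now append the record to a batch.
    }

    // Transactions are set up on the first call, so no explicit init() is needed.
    void beginTransaction() {
        if (!transactionsInitialized) {
            // Assumption: a real client would register with the
            // transaction coordinator here.
            transactionsInitialized = true;
        }
        if (inTransaction) {
            throw new IllegalStateException("transaction already in progress");
        }
        inTransaction = true;
    }

    void commitTransaction() {
        if (!inTransaction) {
            throw new IllegalStateException("no transaction in progress");
        }
        inTransaction = false;
    }

    int metadataFetches() {
        return metadataFetches;
    }

    boolean transactionsInitialized() {
        return transactionsInitialized;
    }

    public static void main(String[] args) {
        LazyInitProducerSketch producer = new LazyInitProducerSketch();
        producer.send("orders", "a");
        producer.send("orders", "b");   // metadata for "orders" is reused
        producer.beginTransaction();    // transactions initialised here
        producer.send("payments", "c"); // a new topic triggers one more fetch
        producer.commitTransaction();
        System.out.println("metadata fetches: " + producer.metadataFetches());
    }
}
```

With this shape, a non-transactional user never pays for transaction setup, and nobody pays for metadata on topics they do not produce to. The cost is that the first `send` to each topic and the first `beginTransaction()` may block.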

Ismael

On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Jason/Neha,
>
> Yeah, clearly having a mandatory, generic init() method that initializes
> both transactions and topic metadata would be the ideal solution. This
> would solve the occasional complaint about blocking behavior during
> initialization of metadata (or at least shift it to a new complaint about
> an inability to initialize when the cluster isn't up in test environments).
> The challenge is that we can't do this because it isn't backwards
> compatible with existing apps that don't call init.
>
> The alternative of having an optional generic init() call is a bit odd
> because to figure out if you need to call it you need to discover what it
> does, which is not generic: it initializes transactions. We can't really
> add more logic to init because it only gets invoked by transaction users, so
> it doesn't really function as a generic init.
>
> What do you think of this solution:
>
>    1. Add a generic init() call which initializes both transactions and
>    metadata
>    2. If you don't call init(), metadata is initialized on the first send
>    (as now) and transactions are lazily initialized at the first
>    beginTransaction() call.
>
> -Jay
>
>
>
>
>
>
>
> On Mon, Dec 12, 2016 at 9:17 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > @Neha
> >
> >
> > > 1. I think we should consider renaming initTransactions to just init()
> > > and moving the metadata initialization there. Let's make sure we don't
> > > add APIs that are relevant to this proposal only. Instead, try to think
> > > what we'd propose if we were writing the producer from scratch today. I
> > > suspect we would end up with an init() API that would do the metadata
> > > initialization as well as the transaction stuff lazily. If so, let's
> > > make that change now.
> >
> >
> > I think the only awkwardness with `init()` is that it would probably have
> > to be an optional API for non-transactional usage to support existing
> > code. I'm also not sure what metadata we can actually initialize at that
> > point since we don't know which topics will be produced to. That said,
> > I'm also not fond of the `initTransactions` name, and we may find other
> > uses for a generic `init()` in the future, so I'm in favor of this
> > renaming.
> >
> >
> > > 2. Along the same lines, let's think about the role of each id that the
> > > producer will have and see if everything still makes sense. For
> > > instance, we have quite a few per-producer-instance notions -- client.id,
> > > a producer id and a transaction.app.id, some set via config and some
> > > generated on-the-fly. What role does each play, how do they relate to
> > > each other and is there an opportunity to get rid of any?
> >
> >
> > The abundance of ids is super annoying. The producer ID is not actually
> > exposed in either the producer or consumer, but I'm not sure how
> > successful we'll be in hiding its existence from the user (you probably
> > need to know about it for debugging and administrative purposes at
> > least). This issue has been a continual thorn and I'm not sure I have a
> > great answer. We have been tempted to use client.id as the AppID at one
> > point or another, but its current usage is to have the same value for
> > all producers in an application. The lack of an AppID meant that we
> > would have to expose the producer ID and the application would be
> > responsible for persisting it. In the use cases we looked at, it was
> > simpler to let the application provide its own ID through configuration.
> > And in use cases where there was no obvious ID to serve as the AppID, it
> > seemed simple enough to let the application generate its own. We also
> > looked at removing the producer ID. This was discussed somewhere above,
> > but basically the idea is to store the AppID in the message set header
> > directly and avoid the mapping to producer ID altogether. As long as
> > batching isn't too bad, the impact on total size may not be too bad, but
> > we were ultimately more comfortable with a fixed size ID.
> >
> > > 3. I think we should definitely consider renaming transaction.app.id to
> > > something else. Given that we already have a notion of application.id
> > > and it represents the entire Streams application, having
> > > transaction.app.id that represents a producer instance is confusing. I
> > > do understand that, for Streams, the user doesn't have to set
> > > transaction.app.id as it will likely be application.id+taskId (am I
> > > understanding that correctly?)
> >
> >
> > Your understanding is correct. The "transaction" prefix was intended to
> > make it clear that it was only needed for transactional usage. We've also
> > referred to the AppID as a producer "instance ID." This is more
> > suggestive of the fact that it needs to be unique within the producers
> > of a particular application. Maybe we could drop the "transaction" and
> > use "instance.id" or "app.instance.id"? Not sure that's any better, but
> > perhaps it avoids the confusion with application.id?
> >
> > Thanks,
> > Jason
> >
> > On Mon, Dec 12, 2016 at 8:37 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > @Becket
> > >
> > >> It has been a pain in many cases that we do not know the number of
> > >> messages in a message set, not sure if the OffsetDelta field in the
> > >> wrapper message will address this.
> > >
> > >
> > > Interestingly, we had this in one of the design iterations, but we
> > > found in the prototype that we weren't really using it. Did you have a
> > > particular use case in mind? I share the intuition that it may be
> > > helpful to know, but I don't have a clear example in mind. In fact, in
> > > the initial version, we attempted to let the message set always
> > > represent a contiguous sequence of messages. In that case, the message
> > > set only needed a base offset and a count of the number of messages, and
> > > the individual messages no longer needed the offset delta. We ultimately
> > > abandoned that because we were uncomfortable with its impact on
> > > compaction.
> > >
> > > -Jason
> > >
> > > On Mon, Dec 12, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Andrew,
> > >>
> > >> As I mentioned above, in Kafka durability is supported via data
> > >> replication instead of sync-flushing to disks. KIP-98 does not try to
> > >> change that part of Kafka: if all your replicas are gone at the same
> > >> time before the data was ever flushed to disks, then your data is lost
> > >> today, and it will still be the case after KIP-98.
> > >>
> > >> As for atomicity, KIP-98 does provide an all-or-nothing guarantee for
> > >> writes to multiple partitions, and it is based on its existing
> > >> durability guarantees. So it is possible that if your durability
> > >> breaks, then atomicity will be violated: some of the committed
> > >> transaction's messages could be lost if the above scenarios happen
> > >> while others can be successfully appended. My take is that, if you have
> > >> concerns that Kafka's replication mechanism is not good enough for your
> > >> durability requirements as of today, then you should have the same
> > >> level of concerns with durability if you want to use Kafka with KIP-98
> > >> as your transactional queuing system as well.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Dec 12, 2016 at 1:49 AM, Andrew Schofield <
> > >> andrew_schofi...@live.com
> > >> > wrote:
> > >>
> > >> > Guozhang,
> > >> > Exactly. This is the crux of the matter. Because it's async, the log
> > >> > is basically slightly out of date with respect to the run-time state
> > >> > and a failure of all replicas might take the data slightly back in
> > >> > time.
> > >> >
> > >> > Given this, do you think that KIP-98 gives an all-or-nothing,
> > >> > no-matter-what guarantee for Kafka transactions? I think the key is
> > >> > whether the data which is asynchronously flushed is guaranteed to be
> > >> > recovered atomically in all cases. Asynchronous but atomic would be
> > >> > good.
> > >> >
> > >> > Andrew Schofield
> > >> > IBM Watson and Cloud Platform
> > >> >
> > >> >
> > >> > >
> > >> > > From: Guozhang Wang <wangg...@gmail.com>
> > >> > > Sent: 09 December 2016 22:59
> > >> > > To: dev@kafka.apache.org
> > >> > > Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and
> > >> > > Transactional Messaging
> > >> > >
> > >> > > Onur,
> > >> > >
> > >> > > I understand your question now. So it is indeed possible that
> > >> > > after commitTxn() returned the messages could still be lost
> > >> > > permanently if all replicas failed before the data was flushed to
> > >> > > disk. This is by virtue of Kafka's design to rely on replication
> > >> > > (probably in memory) for high availability, hence async flushing.
> > >> > > This scenario already exists today and KIP-98 did not intend to
> > >> > > change this in any way.
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
