@Ewen, good point about batching. Yes, it would be tricky if we want to do
a per-key conditional produce. My understanding is that the prerequisites of
this KIP are:
1. A single producer for each partition.
2. acks=-1, max.in.flight.requests.per.connection=1, retries=SOME_BIG_NUMBER
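
For reference, a rough sketch of that configuration with the Java producer
(the broker address and the concrete retry count are just placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("acks", "all");  // equivalent to acks=-1
    props.put("max.in.flight.requests.per.connection", "1");
    props.put("retries", String.valueOf(Integer.MAX_VALUE));
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);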

The major problem it tries to solve is exactly-once produce, i.e. eliminating
duplicates from the producer side. In that case, a batch is considered
atomic: the only way a batch can be rejected is if it has already been
appended, so the producer should just move on.
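
Concretely, the producer-side handling would look something like the sketch
below (OffsetMismatchException is a hypothetical name for the error the KIP
would introduce; record and producer are assumed to exist already):

    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception instanceof OffsetMismatchException) {
                // Under the single-writer prerequisites above, a mismatch means
                // an earlier attempt already appended this batch: treat it as
                // success and move on rather than retrying.
            } else if (exception != null) {
                // Any other error: normal retry / failure handling.
            }
        }
    });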

It looks to me that even a transient multiple-producer scenario will cause
issues, because the user needs to think about what to do if a request gets
rejected, and the answer varies for different use cases.

Thanks,

Jiangjie (Becket) Qin

On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin <b...@kirw.in> wrote:

> So I had another look at the 'Idempotent Producer' proposal this
> afternoon, and made a few notes on how I think they compare; if I've
> made any mistakes, I'd be delighted if someone with more context on
> the idempotent producer design would correct me.
>
> As a first intuition, you can think of the 'conditional publish'
> proposal as the special case of the 'idempotent producer' idea, where
> there's just a single producer per-partition. The key observation here
> is: if there's only one producer, you can conflate the 'sequence
> number' and the expected offset. The conditional publish proposal uses
> existing Kafka offset APIs for roughly the same things as the
> idempotent producer proposal uses sequence numbers for -- eg. instead
> of having a "lease PID" API that returns the current sequence number,
> we can use the existing 'offset API' to retrieve the upcoming offset.
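>
> To make that concrete, a rough sketch of the lookup (the expected-offset
> argument is the hypothetical KIP-27 addition -- no such record type exists
> today -- and the consumer-based end-offset lookup is purely illustrative,
> given existing consumer and producer instances):
>
>     TopicPartition tp = new TopicPartition("commit-log", 0);
>     // The upcoming offset is simply the partition's current end offset.
>     long expected =
>         consumer.endOffsets(Collections.singletonList(tp)).get(tp);
>     // Hypothetical conditional produce: the broker appends only if the
>     // partition's next offset is still 'expected', and otherwise returns
>     // an offset-mismatch error.
>     producer.send(
>         new ConditionalProducerRecord<>("commit-log", 0, expected, key, value));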
>
> Both proposals attempt to deal with the situation where there are
> transiently multiple publishers for the same partition (and PID). The
> idempotent producer setup tracks a generation id for each pid, and
> discards any writes with a generation id smaller than the latest
> value. Conditional publish is 'first write wins' -- and instead of
> dropping duplicates on the server, it returns an error to the client.
> The duplicate-handling behaviour (dropping vs. erroring) has some
> interesting consequences:
>
> - If all producers are producing the same stream of messages, silently
> dropping duplicates on the server is more convenient. (Suppose we have
> a batch of messages 0-9, and the high-water mark on the server is 7.
> Idempotent producer, as I read it, would append 7-9 to the partition
> and return success; meanwhile, conditional publish would fail the
> entire batch.)
>
> - If producers might be writing different streams of messages, the
> proposed behaviour of the idempotent producer is probably worse --
> since it can silently interleave messages from two different
> producers. This can be a problem for some commit-log style use-cases,
> since it can transform a valid series of operations into an invalid
> one.
>
> - Given the error-on-duplicate behaviour, it's possible to implement
> deduplication on the client. (Sketch: if a publish returns an error
> for some partition, fetch the upcoming offset / sequence number for
> that partition, and discard all messages with a smaller offset on the
> client before republishing.)
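>
> A rough sketch of that client-side dedup (the Message type and the two
> helper methods here are hypothetical placeholders, not part of the
> proposal):
>
>     // On an offset-mismatch error: find out what actually reached the log,
>     // drop those messages, and resend the rest with fresh expected offsets.
>     long upcoming = fetchUpcomingOffset(partition);   // e.g. via the offset API
>     List<Message> remaining = new ArrayList<>();
>     for (Message m : batch) {
>         if (m.expectedOffset() >= upcoming) {         // not yet appended
>             remaining.add(m);
>         }
>     }
>     resendWithExpectedOffsetsFrom(remaining, upcoming);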
>
> I think this makes the erroring behaviour more general, though
> deduplicating saves a roundtrip or two at conflict time.
>
> I'm less clear about the behaviour of the generation id, or what
> happens when (say) two producers with the same generation id are spun
> up at the same time. I'd be interested in hearing other folks'
> comments on this.
>
> Ewen: I'm not sure I understand the questions well enough to answer
> properly, but some quick notes:
> - I don't think it makes sense to assign an expected offset without
> already having assigned a partition. If the producer code is doing the
> partition assignment, it should probably do the offset assignment
> too... or we could just let application code handle both.
> - I'm not aware of any case where reassigning offsets to messages
> automatically after an offset mismatch makes sense: in the cases we've
> discussed, it seems like either it's safe to drop duplicates, or we
> want to handle the error at the application level.
>
> I'm going to try and come up with an idempotent-producer-type example
> that works with the draft patch in the next few days, so hopefully
> we'll have something more concrete to discuss. Otherwise -- if you
> have a clear idea of how eg. sequence number assignment would work in
> the idempotent-producer proposal, we could probably translate that
> over to get the equivalent for the conditional publish API.
>
>
>
> On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
> <e...@confluent.io> wrote:
> > @Becket - for compressed batches, I think this just works out given the KIP
> > as described. Without the change you're referring to, it still only makes
> > sense to batch messages with this KIP if all the expected offsets are
> > sequential (else some messages are guaranteed to fail). I think that
> > probably just works out, but raises an issue I brought up on the KIP call.
> >
> > Batching can be a bit weird with this proposal. If you try to write key A
> > and key B, the second operation is dependent on the first. Which means to
> > make an effective client for this, we need to keep track of per-partition
> > offsets so we can set expected offsets properly. For example, if A was
> > expected to publish at offset 10, then if B was published to the same
> > partition, we need to make sure it's marked as expected offset 11 (assuming
> > no subpartition high water marks). We either need to have the application
> > keep track of this itself and set the offsets, which requires that it know
> > about how keys map to partitions, or the client needs to manage this
> > process. But if the client manages it, I think the client gets quite a bit
> > more complicated. If the produce request containing A fails, what happens
> > to B? Are there retries that somehow update the expected offset, or do we
> > just give up since we know it's always going to fail with the expected
> > offset that was automatically assigned to it?
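> >
> > A minimal sketch of what that client-side bookkeeping might look like
> > (the map and method here are purely illustrative):
> >
> >     Map<TopicPartition, Long> nextExpected = new HashMap<>();
> >
> >     long assignExpectedOffset(TopicPartition tp, long upcomingOffset) {
> >         long expected = nextExpected.getOrDefault(tp, upcomingOffset);
> >         nextExpected.put(tp, expected + 1);   // B after A gets offset + 1
> >         return expected;
> >     }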
> >
> > One way to handle this is to use Yasuhiro's idea of increasing the
> > granularity of high watermarks using subpartitions. But I guess my question
> > is: if one producer client is writing many keys, and some of those keys are
> > produced to the same partition, and those messages are batched, what
> > happens? Do we end up with lots of failed messages? Or do we have
> > complicated logic in the producer to figure out what the right expected
> > offset for each message is? Or do they all share the same base expected
> > offset as in the compressed case, in which case they all share the same
> > fate and subpartitioning doesn't help? Or is there a simpler solution I'm
> > just not seeing? Maybe this just disables batching entirely and throughput
> > isn't an issue in these cases?
> >
> > Sorry, I know that's probably not entirely clear, but that's because I'm
> > very uncertain of how batching works with this KIP.
> >
> >
> > On how this relates to other proposals: I think it might also be helpful to
> > get an overview of all the proposals for relevant modifications to
> > producers/produce requests since many of these proposals are possibly
> > alternatives (though some may not be mutually exclusive). Many people don't
> > have all the context from the past couple of years of the project. Are
> > there any other relevant wikis or docs besides the following?
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >
> > -Ewen
> >
> > On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira <gshap...@cloudera.com>
> > wrote:
> >
> >> Tangent: I think we should complete the move of Produce / Fetch RPC to
> >> the client libraries before we add more revisions to this protocol.
> >>
> >> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
> >> <j...@linkedin.com.invalid> wrote:
> >> > I missed yesterday's KIP hangout. I'm currently working on another KIP
> >> for
> >> > enriched metadata of messages. Guozhang has already created a wiki
> page
> >> > before (
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >> ).
> >> > We plan to fill the relative offset to the offset field in the batch
> sent
> >> > by producer to avoid broker side re-compression. The message offset
> would
> >> > become batch base offset + relative offset. I guess maybe the expected
> >> > offset in KIP-27 can be only set for base offset? Would that affect
> >> certain
> >> > use cases?
> >> >
> >> > For Jun's comments, I am not sure I completely get it. I think the
> >> producer
> >> > only sends one batch per partition in a request. So either that batch
> is
> >> > appended or not. Why a batch would be partially committed?
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> wrote:
> >> >
> >> >> That's a fair point. I've added some imagined job logic to the KIP,
> so
> >> >> we can make sure the proposal stays in sync with the usages we're
> >> >> discussing. (The logic is just a quick sketch for now -- I expect
> I'll
> >> >> need to elaborate it as we get into more detail, or to address other
> >> >> concerns...)
> >> >>
> >> >> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io> wrote:
> >> >> > For 1, yes, when there is a transient leader change, it's
> guaranteed
> >> >> that a
> >> >> > prefix of the messages in a request will be committed. However, it
> >> seems
> >> >> > that the client needs to know what subset of messages are
> committed in
> >> >> > order to resume the sending. Then the question is how.
> >> >> >
> >> >> > As Flavio indicated, for the use cases that you listed, it would be
> >> >> useful
> >> >> > to figure out the exact logic by using this feature. For example,
> in
> >> the
> >> >> > partition K/V store example, when we fail over to a new writer to
> the
> >> >> > commit log, the zombie writer can publish new messages to the log
> >> after
> >> >> the
> >> >> > new writer takes over, but before it publishes any message. We
> >> probably
> >> >> > need to outline how this case can be handled properly.
> >> >> >
> >> >> > Thanks,
> >> >> >
> >> >> > Jun
> >> >> >
> >> >> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> wrote:
> >> >> >
> >> >> >> Hi Jun,
> >> >> >>
> >> >> >> Thanks for the close reading! Responses inline.
> >> >> >>
> >> >> >> > Thanks for the write-up. The single producer use case you
> mentioned
> >> >> makes
> >> >> >> > sense. It would be useful to include that in the KIP wiki.
> >> >> >>
> >> >> >> Great -- I'll make sure that the wiki is clear about this.
> >> >> >>
> >> >> >> > 1. What happens when the leader of the partition changes in the
> >> middle
> >> >> >> of a
> >> >> >> > produce request? In this case, the producer client is not sure
> >> whether
> >> >> >> the
> >> >> >> > request succeeds or not. If there is only a single message in
> the
> >> >> >> request,
> >> >> >> > the producer can just resend the request. If it sees an
> >> OffsetMismatch
> >> >> >> > error, it knows that the previous send actually succeeded and
> can
> >> >> proceed
> >> >> >> > with the next write. This is nice since it not only allows the
> >> >> producer
> >> >> >> to
> >> >> >> > proceed during transient failures in the broker, it also avoids
> >> >> >> duplicates
> >> >> >> > during producer resend. One caveat is when there are multiple
> >> >> messages in
> >> >> >> > the same partition in a produce request. The issue is that in
> our
> >> >> current
> >> >> >> > replication protocol, it's possible for some, but not all
> messages
> >> in
> >> >> the
> >> >> >> > request to be committed. This makes resend a bit harder to deal
> >> with
> >> >> >> since
> >> >> >> > on receiving an OffsetMismatch error, it's not clear which
> messages
> >> >> have
> >> >> >> > been committed. One possibility is to expect that compression is
> >> >> enabled,
> >> >> >> > in which case multiple messages are compressed into a single
> >> message.
> >> >> I
> >> >> >> was
> >> >> >> > thinking that another possibility is for the broker to return
> the
> >> >> current
> >> >> >> > high watermark when sending an OffsetMismatch error. Based on
> this
> >> >> info,
> >> >> >> > the producer can resend the subset of messages that have not
> been
> >> >> >> > committed. However, this may not work in a compacted topic since
> >> there
> >> >> >> can
> >> >> >> > be holes in the offset.
> >> >> >>
> >> >> >> This is an excellent question. It's my understanding that at least
> a
> >> >> >> *prefix* of messages will be committed (right?) -- which seems to
> be
> >> >> >> enough for many cases. I'll try and come up with a more concrete
> >> >> >> answer here.
> >> >> >>
> >> >> >> > 2. Is this feature only intended to be used with ack = all? The
> >> client
> >> >> >> > doesn't get the offset with ack = 0. With ack = 1, it's possible
> >> for a
> >> >> >> > previously acked message to be lost during leader transition,
> which
> >> >> will
> >> >> >> > make the client logic more complicated.
> >> >> >>
> >> >> >> It's true that acks = 0 doesn't seem to be particularly useful; in
> >> all
> >> >> >> the cases I've come across, the client eventually wants to know
> about
> >> >> >> the mismatch error. However, it seems like there are some cases
> where
> >> >> >> acks = 1 would be fine -- eg. in a bulk load of a fixed dataset,
> >> >> >> losing messages during a leader transition just means you need to
> >> >> >> rewind / restart the load, which is not especially catastrophic.
> For
> >> >> >> many other interesting cases, acks = all is probably preferable.
> >> >> >>
> >> >> >> > 3. How does the producer client know the offset to send the
> first
> >> >> >> message?
> >> >> >> > Do we need to expose an API in producer to get the current high
> >> >> >> watermark?
> >> >> >>
> >> >> >> You're right, it might be irritating to have to go through the
> >> >> >> consumer API just for this. There are some cases where the offsets
> >> are
> >> >> >> already available -- like the commit-log-for-KV-store example --
> but
> >> >> >> in general, being able to get the offsets from the producer
> interface
> >> >> >> does sound convenient.
> >> >> >>
> >> >> >> > We plan to have a KIP discussion meeting tomorrow at 11am PST.
> >> Perhaps
> >> >> >> you
> >> >> >> > can describe this KIP a bit then?
> >> >> >>
> >> >> >> Sure, happy to join.
> >> >> >>
> >> >> >> > Thanks,
> >> >> >> >
> >> >> >> > Jun
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <b...@kirw.in>
> wrote:
> >> >> >> >
> >> >> >> >> Just wanted to flag a little discussion that happened on the
> >> ticket:
> >> >> >> >>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
> >> >> >> >>
> >> >> >> >> In particular, Yasuhiro Matsuda proposed an interesting
> variant on
> >> >> >> >> this that performs the offset check on the message key
> (instead of
> >> >> >> >> just the partition), with bounded space requirements, at the
> cost
> >> of
> >> >> >> >> potentially some spurious failures. (ie. the produce request
> may
> >> fail
> >> >> >> >> even if that particular key hasn't been updated recently.) This
> >> >> >> >> addresses a couple of the drawbacks of the per-key approach
> >> mentioned
> >> >> >> >> at the bottom of the KIP.
> >> >> >> >>
> >> >> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <b...@kirw.in>
> wrote:
> >> >> >> >> > Hi all,
> >> >> >> >> >
> >> >> >> >> > So, perhaps it's worth adding a couple specific examples of
> >> where
> >> >> this
> >> >> >> >> > feature is useful, to make this a bit more concrete:
> >> >> >> >> >
> >> >> >> >> > - Suppose I'm using Kafka as a commit log for a partitioned
> KV
> >> >> store,
> >> >> >> >> > like Samza or Pistachio (?) do. We bootstrap the process
> state
> >> by
> >> >> >> >> > reading from that partition, and log all state updates to
> that
> >> >> >> >> > partition when we're running. Now imagine that one of my
> >> processes
> >> >> >> >> > locks up -- GC or similar -- and the system transitions that
> >> >> partition
> >> >> >> >> > over to another node. When the GC is finished, the old
> 'owner'
> >> of
> >> >> that
> >> >> >> >> > partition might still be trying to write to the commit log at
> >> the
> >> >> same time
> >> >> >> >> > as the new one is. A process might detect this by noticing
> that
> >> the
> >> >> >> >> > offset of the published message is bigger than it thought the
> >> >> upcoming
> >> >> >> >> > offset was, which implies someone else has been writing to
> the
> >> >> log...
> >> >> >> >> > but by then it's too late, and the commit log is already
> >> corrupt.
> >> >> With
> >> >> >> >> > a 'conditional produce', one of those processes will have
> its
> >> >> publish
> >> >> >> >> > request refused -- so we've avoided corrupting the state.
> >> >> >> >> >
> >> >> >> >> > - Envision some copycat-like system, where we have some
> sharded
> >> >> >> >> > postgres setup and we're tailing each shard into its own
> >> partition.
> >> >> >> >> > Normally, it's fairly easy to avoid duplicates here: we can
> >> track
> >> >> >> >> > which offset in the WAL corresponds to which offset in Kafka,
> >> and
> >> >> we
> >> >> >> >> > know how many messages we've written to Kafka already, so the
> >> >> state is
> >> >> >> >> > very simple. However, it is possible that for a moment --
> due to
> >> >> >> >> > rebalancing or operator error or some other thing -- two
> >> different
> >> >> >> >> > nodes are tailing the same postgres shard at once! Normally
> this
> >> >> would
> >> >> >> >> > introduce duplicate messages, but by specifying the expected
> >> >> offset,
> >> >> >> >> > we can avoid this.
> >> >> >> >> >
> >> >> >> >> > So perhaps it's better to say that this is useful when a
> single
> >> >> >> >> > producer is *expected*, but multiple producers are
> *possible*?
> >> (In
> >> >> the
> >> >> >> >> > same way that the high-level consumer normally has 1 consumer
> >> in a
> >> >> >> >> > group reading from a partition, but there are small windows
> >> where
> >> >> more
> >> >> >> >> > than one might be reading at the same time.) This is also the
> >> >> spirit
> >> >> >> >> > of the 'runtime cost' comment -- in the common case, where
> >> there is
> >> >> >> >> > little to no contention, there's no performance overhead
> >> either. I
> >> >> >> >> > mentioned this a little in the Motivation section -- maybe I
> >> should
> >> >> >> >> > flesh that out a little bit?
> >> >> >> >> >
> >> >> >> >> > For me, the motivation to work this up was that I kept
> running
> >> into
> >> >> >> >> > cases, like the above, where the existing API was
> >> >> almost-but-not-quite
> >> >> >> >> > enough to give the guarantees I was looking for -- and the
> >> >> extension
> >> >> >> >> > needed to handle those cases too was pretty small and
> >> >> natural-feeling.
> >> >> >> >> >
> >> >> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <
> >> asi...@cloudera.com
> >> >> >
> >> >> >> >> wrote:
> >> >> >> >> >> Good concept. I have a question though.
> >> >> >> >> >>
> >> >> >> >> >> Say there are two producers A and B. Both producers are
> >> producing
> >> >> to
> >> >> >> >> same
> >> >> >> >> >> partition.
> >> >> >> >> >> - A sends a message with expected offset, x1
> >> >> >> >> >> - Broker accepts it and sends an Ack
> >> >> >> >> >> - B sends a message with expected offset, x1
> >> >> >> >> >> - Broker rejects it, sends nack
> >> >> >> >> >> - B sends message again with expected offset, x1+1
> >> >> >> >> >> - Broker accepts it and sends Ack
> >> >> >> >> >> I guess this is what this KIP suggests, right? If yes, then
> how
> >> >> does
> >> >> >> >> this
> >> >> >> >> >> ensure that same message will not be written twice when two
> >> >> producers
> >> >> >> >> are
> >> >> >> >> >> producing to same partition? Producer on receiving a nack
> will
> >> try
> >> >> >> again
> >> >> >> >> >> with next offset and will keep doing so till the message is
> >> >> accepted.
> >> >> >> >> Am I
> >> >> >> >> >> missing something?
> >> >> >> >> >>
> >> >> >> >> >> Also, you have mentioned on KIP, "it imposes little to no
> >> runtime
> >> >> >> cost
> >> >> >> >> in
> >> >> >> >> >> memory or time", I think that is not true for time. With
> this
> >> >> >> approach
> >> >> >> >> >> producers' performance will reduce proportionally to number
> of
> >> >> >> producers
> >> >> >> >> >> writing to same partition. Please correct me if I am missing
> >> out
> >> >> >> >> something.
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
> >> >> >> >> >> gharatmayures...@gmail.com> wrote:
> >> >> >> >> >>
> >> >> >> >> >>> If we have 2 producers producing to a partition, they can
> be
> >> out
> >> >> of
> >> >> >> >> order,
> >> >> >> >> >>> then how does one producer know what offset to expect as it
> >> does
> >> >> not
> >> >> >> >> >>> interact with other producer?
> >> >> >> >> >>>
> >> >> >> >> >>> Can you give an example flow that explains how it works
> with
> >> >> single
> >> >> >> >> >>> producer and with multiple producers?
> >> >> >> >> >>>
> >> >> >> >> >>>
> >> >> >> >> >>> Thanks,
> >> >> >> >> >>>
> >> >> >> >> >>> Mayuresh
> >> >> >> >> >>>
> >> >> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <
> >> >> >> >> >>> fpjunque...@yahoo.com.invalid> wrote:
> >> >> >> >> >>>
> >> >> >> >> >>> > I like this feature, it reminds me of conditional
> updates in
> >> >> >> >> zookeeper.
> >> >> >> >> >>> > I'm not sure if it'd be best to have some mechanism for
> >> fencing
> >> >> >> >> rather
> >> >> >> >> >>> than
> >> >> >> >> >>> > a conditional write like you're proposing. The reason I'm
> >> >> saying
> >> >> >> >> this is
> >> >> >> >> >>> > that the conditional write applies to requests
> individually,
> >> >> >> while it
> >> >> >> >> >>> > sounds like you want to make sure that there is a single
> >> client
> >> >> >> >> writing
> >> >> >> >> >>> so
> >> >> >> >> >>> > over multiple requests.
> >> >> >> >> >>> >
> >> >> >> >> >>> > -Flavio
> >> >> >> >> >>> >
> >> >> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <b...@kirw.in>
> wrote:
> >> >> >> >> >>> > >
> >> >> >> >> >>> > > Hi there,
> >> >> >> >> >>> > >
> >> >> >> >> >>> > > I just added a KIP for a 'conditional publish'
> operation:
> >> a
> >> >> >> simple
> >> >> >> >> >>> > > CAS-like mechanism for the Kafka producer. The wiki
> page
> >> is
> >> >> >> here:
> >> >> >> >> >>> > >
> >> >> >> >> >>> > >
> >> >> >> >> >>> >
> >> >> >> >> >>>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
> >> >> >> >> >>> > >
> >> >> >> >> >>> > > And there's some previous discussion on the ticket and
> the
> >> >> users
> >> >> >> >> list:
> >> >> >> >> >>> > >
> >> >> >> >> >>> > > https://issues.apache.org/jira/browse/KAFKA-2260
> >> >> >> >> >>> > >
> >> >> >> >> >>> > >
> >> >> >> >> >>> >
> >> >> >> >> >>>
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
> >> >> >> >> >>> > >
> >> >> >> >> >>> > > As always, comments and suggestions are very welcome.
> >> >> >> >> >>> > >
> >> >> >> >> >>> > > Thanks,
> >> >> >> >> >>> > > Ben
> >> >> >> >> >>> >
> >> >> >> >> >>> >
> >> >> >> >> >>>
> >> >> >> >> >>>
> >> >> >> >> >>> --
> >> >> >> >> >>> -Regards,
> >> >> >> >> >>> Mayuresh R. Gharat
> >> >> >> >> >>> (862) 250-7125
> >> >> >> >> >>>
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >>
> >> >> >> >> >> --
> >> >> >> >> >>
> >> >> >> >> >> Regards,
> >> >> >> >> >> Ashish
> >> >> >> >>
> >> >> >>
> >> >>
> >>
> >
> >
> >
> > --
> > Thanks,
> > Ewen
>
