This is a very nice summary of the consistency / correctness issues
possible with a commit log.

> (assuming it’s publishing asynchronously and in an open loop)

It's perhaps already clear to folks here, but -- if you *don't* do that,
and instead only send one batch of messages at a time and check the result,
you don't have the interleaving issue. (Of course, that means you give up
pipelining batches...)
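
For what it's worth, here's a minimal sketch of that closed-loop pattern with
the Java producer (assuming acks=all so a successful ack really means
"committed"; the batch boundary and retry policy are left to the caller):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Send one batch, then block on every ack before handing over the next batch.
    static void sendBatchAndWait(KafkaProducer<String, String> producer,
                                 List<ProducerRecord<String, String>> batch)
            throws Exception {
        List<Future<RecordMetadata>> acks = new ArrayList<>();
        for (ProducerRecord<String, String> record : batch)
            acks.add(producer.send(record));
        for (Future<RecordMetadata> ack : acks)
            ack.get();  // throws if that publish failed; the caller decides how to retry
        // Only after every message is acked do we move on to the next batch.
    }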
On Aug 10, 2015 2:46 PM, "Flavio Junqueira" <f...@apache.org> wrote:

> I've been trying to understand what is being proposed in this KIP and I've
> put down some notes with some feedback from Ben that I wanted to share for
> feedback. I'm not really following the flow of the thread, since I've read
> a few sources to get to this, and I apologize for that.
>
> Here is how I see it at a high level. There are really two problems being
> discussed in the context of this KIP:
> 1. Single writer with failover
> 2. Consistent logs
>
> Single writer with failover
> The idea is that at any time there must be at most one publisher active.
> To get high availability, we can’t rely on a single process to be such a
> publisher and consequently we need the failover part: if the current active
> publisher crashes, then another publisher takes over and becomes active.
> One important issue with scenarios like this is that during transitions
> from one active publisher to another, there could be races in which two
> publishers end up interleaving messages in a topic/partition/key.
>
> Why is this interleaving bad? This is really application specific, but one
> general way of seeing this is that only one process has the authoritative
> application state to generate messages to publish. Transitioning from one
> active publisher to another typically requires recovering state or
> performing some kind of coordination. If no such recovery is required, then
> we are essentially in the multi-writer space. The commit log use case is a
> general one mentioned in the KIP description.
>
> Consistent logs
> Consistent logs might not be the best term here, but I’m using it to
> describe the need for the messages in a topic/partition/key to
> consistently reflect the state of the application. For example, some
> applications might be OK with a published sequence:
>
> A B B C (e.g., value = 10)
>
> in the case the messages are idempotent operations, but others might
> really require:
>
> A B C (e.g., value += 10)
>
> if they aren’t idempotent operations. Order and gaps are also an issue, so
> some applications might be OK with:
>
> A C B (e.g., value += x)
>
> and skipping B altogether might be OK if B has no side-effects (e.g., the
> operation associated with B has failed).
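
A toy illustration of that distinction (my own sketch, assuming a simple
in-memory key/value view, not part of the KIP):

    import java.util.Map;

    // Duplicate-tolerant: replaying B twice leaves the same state ("value = 10").
    static void applySet(Map<String, Integer> state, String key) {
        state.put(key, 10);
    }

    // Duplicate-sensitive: replaying B twice double-counts ("value += 10").
    static void applyAdd(Map<String, Integer> state, String key) {
        state.merge(key, 10, Integer::sum);
    }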
>
> Putting things together
> The current KIP-27 proposal seems to do a good job with providing a
> consistent log in the absence of concurrency. It enables publishers to
> re-publish messages without duplication, which is one requirement for
> exactly-once semantics. Gaps need to be handled by the publisher. For
> example, if the publisher publishes A B C (assuming it’s publishing
> asynchronously and in an open loop), it could have A succeeding but not B
> and C. In this case, it needs to redo the publish of B and C. It could also
> have B failing and C succeeding, in which case the publisher repeats B and
> C.
>
> A really nice feature of the current proposal is that it is a simple
> primitive that enables the implementation of publishers with different
> delivery guarantees. It doesn’t seem to be well suited to the first problem
> of implementing a single writer with failover, however. It allows runs in
> which two producers interleave messages because the mechanism focuses on a
> single message. The single writer might not even care about duplicates and
> gaps depending on the application, but it might care that there aren’t two
> publishers interleaving messages in the Kafka log.
>
> A typical way of dealing with these cases is to use a token associated with
> a lease to fence off the other publishers. For example, to demote an active
> publisher, another publisher could invoke a demote call and have the ISR
> leader replace the token. The lease of the token could be done directly
> with ZooKeeper or via the ISR leader. The condition to publish a message or
> a batch could be a combination of token verification and offset check.
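
A rough sketch of that combined check -- the lease token, the demote call,
and this broker-side hook are all hypothetical, not an existing Kafka API:

    // Hypothetical broker-side gate combining a fencing token with the
    // KIP-27 offset check. Neither field exists in the produce request today.
    static boolean shouldAppend(long requestToken, long currentLeaseToken,
                                long expectedOffset, long logEndOffset) {
        // Reject writes from a demoted (stale-token) publisher, and writes
        // that would interleave with appends the publisher hasn't seen yet.
        return requestToken == currentLeaseToken && expectedOffset == logEndOffset;
    }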
>
> -Flavio
>
> > On 10 Aug 2015, at 00:15, Jun Rao <j...@confluent.io> wrote:
> >
> > Couple of other things.
> >
> > A. In the discussion, we talked about the usage of getting the latest
> high
> > watermark from the broker. Currently, the high watermark in a partition
> can
> > go back a bit for a short period of time during leader change. So, the
> high
> > watermark returned in the getOffset api is not 100% accurate. There is a
> > jira (KAFKA-2334) to track this issue.
> >
> > B. The proposal in the wiki is to put the expected offset in every
> message,
> > even when the messages are compressed. With Jiangjie's proposal of
> relative
> > offset, the expected offset probably can only be set at the shallow
> > compressed message level. We will need to think this through.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Tue, Aug 4, 2015 at 3:05 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> > wrote:
> >
> >> Jun, I see. So this only applies to uncompressed messages. Maybe that is
> >> fine given most users will probably turn on compression?
> >> I think the first approach is a more general approach but from the
> >> application's point of view it might be harder to implement. I am thinking
> >> whether it is easier for the application to simply have one producer per
> >> partition and hash the messages to producers. In that case, we can use the
> >> second approach but still have multiple producers. The downside might be
> >> potentially more memory footprint? We might also need to think about the
> >> fault tolerance a little bit more.
> >>
> >> Ben, I agree that when everything goes fine, having pipelining turned on is
> >> probably fine. But if we take leader migration, a broker going down, a
> >> message appended to the leader but not the follower, etc. into
> >> consideration, it is not clear to me how the conditional publish will still
> >> provide its guarantee without enforcing those strict settings.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Mon, Aug 3, 2015 at 9:55 PM, Jun Rao <j...@confluent.io> wrote:
> >>
> >>> A couple of thoughts on the commit log use case. Suppose that we want
> to
> >>> maintain multiple replicas of a K/V store backed by a shared Kafka
> >>> topic/partition as a commit log. There are two possible ways to use
> Kafka
> >>> as a commit log.
> >>>
> >>> 1. The first approach allows multiple producers to publish to Kafka.
> Each
> >>> replica of the data store keeps reading from a Kafka topic/partition to
> >>> refresh the replica's view. Every time a replica gets an update to a
> key
> >>> from a client, it combines the update and the current value of the key
> in
> >>> its view and generates a post-value. It then does a conditional publish
> >> to
> >>> Kafka with the post-value. The update is successful if the conditional
> >>> publish succeeds. Otherwise, the replica has to recompute the
> post-value
> >>> (potentially after the replica's view is refreshed) and retry the
> >>> conditional publish. A potential issue with this approach is when there
> >> is
> >>> a transient failure during publishing to Kafka (e.g., the leader of the
> >>> partition changes). When this happens, the conditional publish will get
> >> an
> >>> error. The replica doesn't know whether the publish actually succeeded
> or
> >>> not. If we just blindly retry, it may not give the correct behavior
> >> (e.g.,
> >>> we could be applying +1 twice). So, not sure if conditional publish
> >> itself
> >>> is enough for this approach.
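
Roughly, the retry loop in approach 1 might look like the sketch below;
conditionalPublish, upcomingOffset, and the state helpers are placeholders
for the proposed KIP-27 machinery, and as noted above this alone doesn't
resolve the ambiguity after a transient publish failure:

    // Sketch of approach 1: read-modify-write against the replica's view,
    // retried on an offset mismatch. Every name here is a placeholder.
    void applyUpdate(Update update) throws Exception {
        while (true) {
            catchUpFromLog();                          // refresh the replica's local view
            long expectedOffset = upcomingOffset();    // next offset we expect to write
            Value postValue = combine(currentValue(update.key()), update);
            try {
                conditionalPublish(update.key(), postValue, expectedOffset);
                return;                                // committed at exactly that offset
            } catch (OffsetMismatchException e) {
                // Either another writer got there first, or our own earlier attempt
                // actually succeeded after a transient error -- re-read and recompute.
            }
        }
    }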
> >>>
> >>> 2. The second approach allows only a single producer to publish to
> Kafka.
> >>> We somehow elect one of the replicas to be the master that handles all
> >>> updates. Normally, we don't need conditional publish since there is a
> >>> single producer. Conditional publish can potentially be used to deal
> with
> >>> duplicates. If the master encounters the same transient failure as the
> >>> above, it can get the latest offset from the Kafka topic/partition to
> see
> >>> if the publish actually succeeded or not since it's the only producer.
> A
> >>> potential issue here is to handle the zombie master problem: if the
> >> master
> >>> has a soft failure and another master is elected, we need to prevent
> the
> >>> old master from publishing new data to Kafka. So, for this approach to
> >> work
> >>> properly, we need some kind of support of single writer in addition to
> >>> conditional publish.
> >>>
> >>>
> >>> Jiangjie,
> >>>
> >>> The issue with partial commit is the following. Say we have a batch of
> 10
> >>> uncompressed messages sent to the leader. The followers only fetched
> the
> >>> first 5 messages and then the leader dies. In this case, we only
> >> committed
> >>> 5 out of the 10 messages.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck <
> >>> daniel.schierb...@gmail.com> wrote:
> >>>
> >>>> Jiangjie: I think giving users the possibility of defining a custom
> >>> policy
> >>>> for handling rejections is a good idea. For instance, this will allow
> >>> Kafka
> >>>> to act as an event store in an Event Sourcing application. If the
> >>> event(s)
> >>>> are rejected by the store, the original command may need to be
> >>> re-validated
> >>>> against the new state.
> >>>>
> >>>> On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin
> <j...@linkedin.com.invalid
> >>>
> >>>> wrote:
> >>>>
> >>>>> @Ewen, good point about batching. Yes, it would be tricky if we want
> >> to
> >>>> do
> >>>>> a per-key conditional produce. My understanding is that the
> >>> prerequisite
> >>>> of
> >>>>> this KIP is:
> >>>>> 1. Single producer for each partition.
> >>>>> 2. Acks=-1, max.in.flight.request.per.connection=1,
> >>>> retries=SOME_BIG_NUMBER
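
In producer-config terms, those prerequisites would look roughly like this
(note the actual property name is max.in.flight.requests.per.connection):

    import java.util.Properties;

    Properties props = new Properties();
    props.put("acks", "all");                                  // same as acks=-1
    props.put("max.in.flight.requests.per.connection", "1");   // no pipelining
    props.put("retries", Integer.toString(Integer.MAX_VALUE)); // "SOME_BIG_NUMBER"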
> >>>>>
> >>>>> The major problem it tries to solve is exactly-once produce, i.e.
> >>>>> eliminating duplicates on the producer side. In that case, a batch will
> >>>>> be considered atomic. The only way a batch could get rejected should be
> >>>>> that it is already appended. So the producer should just move on.
> >>>>>
> >>>>> It looks to me like even a transient multiple-producer scenario will
> >>>>> cause issues, because the user needs to think about what to do if a
> >>>>> request gets rejected, and the answer varies for different use cases.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>> On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin <b...@kirw.in> wrote:
> >>>>>
> >>>>>> So I had another look at the 'Idempotent Producer' proposal this
> >>>>>> afternoon, and made a few notes on how I think they compare; if
> >> I've
> >>>>>> made any mistakes, I'd be delighted if someone with more context on
> >>>>>> the idempotent producer design would correct me.
> >>>>>>
> >>>>>> As a first intuition, you can think of the 'conditional publish'
> >>>>>> proposal as the special case of the 'idempotent producer' idea,
> >> where
> >>>>>> there's just a single producer per-partition. The key observation
> >>> here
> >>>>>> is: if there's only one producer, you can conflate the 'sequence
> >>>>>> number' and the expected offset. The conditional publish proposal
> >>> uses
> >>>>>> existing Kafka offset APIs for roughly the same things as the
> >>>>>> idempotent producer proposal uses sequence numbers for -- eg.
> >> instead
> >>>>>> of having a "lease PID" API that returns the current sequence
> >> number,
> >>>>>> we can use the existing 'offset API' to retrieve the upcoming
> >> offset.
> >>>>>>
> >>>>>> Both proposals attempt to deal with the situation where there are
> >>>>>> transiently multiple publishers for the same partition (and PID).
> >> The
> >>>>>> idempotent producer setup tracks a generation id for each pid, and
> >>>>>> discards any writes with a generation id smaller than the latest
> >>>>>> value. Conditional publish is 'first write wins' -- and instead of
> >>>>>> dropping duplicates on the server, it returns an error to the
> >> client.
> >>>>>> The duplicate-handling behaviour (dropping vs. erroring) has some
> >>>>>> interesting consequences:
> >>>>>>
> >>>>>> - If all producers are producing the same stream of messages,
> >>> silently
> >>>>>> dropping duplicates on the server is more convenient. (Suppose we
> >>> have
> >>>>>> a batch of messages 0-9, and the high-water mark on the server is
> >> 7.
> >>>>>> Idempotent producer, as I read it, would append 7-9 to the
> >> partition
> >>>>>> and return success; meanwhile, conditional publish would fail the
> >>>>>> entire batch.)
> >>>>>>
> >>>>>> - If producers might be writing different streams of messages, the
> >>>>>> proposed behaviour of the idempotent producer is probably worse --
> >>>>>> since it can silently interleave messages from two different
> >>>>>> producers. This can be a problem for some commit-log style
> >> use-cases,
> >>>>>> since it can transform a valid series of operations into an invalid
> >>>>>> one.
> >>>>>>
> >>>>>> - Given the error-on-duplicate behaviour, it's possible to
> >> implement
> >>>>>> deduplication on the client. (Sketch: if a publish returns an error
> >>>>>> for some partition, fetch the upcoming offset / sequence number for
> >>>>>> that partition, and discard all messages with a smaller offset on
> >> the
> >>>>>> client before republishing.)
> >>>>>>
> >>>>>> I think this makes the erroring behaviour more general, though
> >>>>>> deduplicating saves a roundtrip or two at conflict time.
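
Spelling that client-side sketch out (PendingMessage, expectedOffset, and the
upcoming-offset lookup are hypothetical client bookkeeping, not existing
producer APIs):

    import java.util.ArrayList;
    import java.util.List;

    // After a mismatch error, keep only the messages that aren't already on the log.
    static List<PendingMessage> resolveMismatch(List<PendingMessage> failedBatch,
                                                long upcomingOffset) {
        List<PendingMessage> toRepublish = new ArrayList<>();
        for (PendingMessage m : failedBatch) {
            if (m.expectedOffset() >= upcomingOffset)
                toRepublish.add(m);   // not yet committed: safe to resend
            // else: a message already sits at that offset, so drop the duplicate
        }
        return toRepublish;
    }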
> >>>>>>
> >>>>>> I'm less clear about the behaviour of the generation id, or what
> >>>>>> happens when (say) two producers with the same generation id are
> >> spun
> >>>>>> up at the same time. I'd be interested in hearing other folks'
> >>>>>> comments on this.
> >>>>>>
> >>>>>> Ewen: I'm not sure I understand the questions well enough to answer
> >>>>>> properly, but some quick notes:
> >>>>>> - I don't think it makes sense to assign an expected offset without
> >>>>>> already having assigned a partition. If the producer code is doing
> >>> the
> >>>>>> partition assignment, it should probably do the offset assignment
> >>>>>> too... or we could just let application code handle both.
> >>>>>> - I'm not aware of any case where reassigning offsets to messages
> >>>>>> automatically after an offset mismatch makes sense: in the cases
> >>> we've
> >>>>>> discussed, it seems like either it's safe to drop duplicates, or we
> >>>>>> want to handle the error at the application level.
> >>>>>>
> >>>>>> I'm going to try and come up with an idempotent-producer-type example
> >>>>>> that works with the draft patch in the next few days, so hopefully
> >>>>>> we'll have something more concrete to discuss. Otherwise -- if you
> >>>>>> have a clear idea of how eg. sequence number assignment would work
> >> in
> >>>>>> the idempotent-producer proposal, we could probably translate that
> >>>>>> over to get the equivalent for the conditional publish API.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
> >>>>>> <e...@confluent.io> wrote:
> >>>>>>> @Becket - for compressed batches, I think this just works out
> >> given
> >>>> the
> >>>>>> KIP
> >>>>>>> as described. Without the change you're referring to, it still
> >> only
> >>>>> makes
> >>>>>>> sense to batch messages with this KIP if all the expected offsets
> >>> are
> >>>>>>> sequential (else some messages are guaranteed to fail). I think
> >>> that
> >>>>>>> probably just works out, but raises an issue I brought up on the
> >>> KIP
> >>>>>> call.
> >>>>>>>
> >>>>>>> Batching can be a bit weird with this proposal. If you try to
> >> write
> >>>>> key A
> >>>>>>> and key B, the second operation is dependent on the first. Which
> >>>> means
> >>>>> to
> >>>>>>> make an effective client for this, we need to keep track of
> >>>>> per-partition
> >>>>>>> offsets so we can set expected offsets properly. For example, if
> >> A
> >>>> was
> >>>>>>> expected to publish at offset 10, then if B was published to the
> >>> same
> >>>>>>> partition, we need to make sure it's marked as expected offset 11
> >>>>>> (assuming
> >>>>>>> no subpartition high water marks). We either need to have the
> >>>>> application
> >>>>>>> keep track of this itself and set the offsets, which requires
> >> that
> >>> it
> >>>>>> know
> >>>>>>> about how keys map to partitions, or the client needs to manage
> >>> this
> >>>>>>> process. But if the client manages it, I think the client gets
> >>> quite
> >>>> a
> >>>>>> bit
> >>>>>>> more complicated. If the produce request containing A fails, what
> >>>>> happens
> >>>>>>> to B? Are there retries that somehow update the expected offset,
> >> or
> >>>> do
> >>>>> we
> >>>>>>> just give up since we know it's always going to fail with the
> >>>> expected
> >>>>>>> offset that was automatically assigned to it?
> >>>>>>>
> >>>>>>> One way to handle this is to use Yasuhiro's idea of increasing
> >> the
> >>>>>>> granularity of high watermarks using subpartitions. But I guess
> >> my
> >>>>>> question
> >>>>>>> is: if one producer client is writing many keys, and some of
> >> those
> >>>> keys
> >>>>>> are
> >>>>>>> produced to the same partition, and those messages are batched,
> >>> what
> >>>>>>> happens? Do we end up with lots of failed messages? Or do we have
> >>>>>>> complicated logic in the producer to figure out what the right
> >>>> expected
> >>>>>>> offset for each message is? Or do they all share the same base
> >>>> expected
> >>>>>>> offset as in the compressed case, in which case they all share
> >> the
> >>>> same
> >>>>>>> fate and subpartitioning doesn't help? Or is there a simpler
> >>> solution
> >>>>> I'm
> >>>>>>> just not seeing? Maybe this just disables batching entirely and
> >>>>>> throughput
> >>>>>>> isn't an issue in these cases?
> >>>>>>>
> >>>>>>> Sorry, I know that's probably not entirely clear, but that's
> >>> because
> >>>>> I'm
> >>>>>>> very uncertain of how batching works with this KIP.
> >>>>>>>
> >>>>>>>
> >>>>>>> On how this relates to other proposals: I think it might also be
> >>>>> helpful
> >>>>>> to
> >>>>>>> get an overview of all the proposals for relevant modifications
> >> to
> >>>>>>> producers/produce requests since many of these proposals are
> >>> possibly
> >>>>>>> alternatives (though some may not be mutually exclusive). Many
> >>> people
> >>>>>> don't
> >>>>>>> have all the context from the past couple of years of the
> >> project.
> >>>> Are
> >>>>>>> there any other relevant wikis or docs besides the following?
> >>>>>>>
> >>>>>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>>>>>>
> >>>>>>> -Ewen
> >>>>>>>
> >>>>>>> On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira <
> >>>> gshap...@cloudera.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Tangent: I think we should complete the move of Produce / Fetch
> >>> RPC
> >>>> to
> >>>>>>>> the client libraries before we add more revisions to this
> >>> protocol.
> >>>>>>>>
> >>>>>>>> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
> >>>>>>>> <j...@linkedin.com.invalid> wrote:
> >>>>>>>>> I missed yesterday's KIP hangout. I'm currently working on
> >>> another
> >>>>> KIP
> >>>>>>>> for
> >>>>>>>>> enriched metadata of messages. Guozhang has already created a
> >>> wiki
> >>>>>> page
> >>>>>>>>> before (
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>>>>>>> ).
> >>>>>>>>> We plan to put the relative offset in the offset field of the batch
> >>>>>>>>> sent by the producer to avoid broker-side re-compression. The message
> >>>>>>>>> offset would become batch base offset + relative offset. I guess maybe
> >>>>>>>>> the expected offset in KIP-27 can only be set for the base offset?
> >>>>>>>>> Would that affect certain use cases?
> >>>>>>>>>
> >>>>>>>>> For Jun's comments, I am not sure I completely get it. I think the
> >>>>>>>>> producer only sends one batch per partition in a request. So either
> >>>>>>>>> that batch is appended or not. Why would a batch be partially
> >>>>>>>>> committed?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jiangjie (Becket) Qin
> >>>>>>>>>
> >>>>>>>>> On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in>
> >>> wrote:
> >>>>>>>>>
> >>>>>>>>>> That's a fair point. I've added some imagined job logic to
> >> the
> >>>> KIP,
> >>>>>> so
> >>>>>>>>>> we can make sure the proposal stays in sync with the usages
> >>> we're
> >>>>>>>>>> discussing. (The logic is just a quick sketch for now -- I
> >>> expect
> >>>>>> I'll
> >>>>>>>>>> need to elaborate it as we get into more detail, or to
> >> address
> >>>>> other
> >>>>>>>>>> concerns...)
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io>
> >>>>> wrote:
> >>>>>>>>>>> For 1, yes, when there is a transient leader change, it's
> >>>>>> guaranteed
> >>>>>>>>>> that a
> >>>>>>>>>>> prefix of the messages in a request will be committed.
> >>> However,
> >>>>> it
> >>>>>>>> seems
> >>>>>>>>>>> that the client needs to know what subset of messages are
> >>>>>> committed in
> >>>>>>>>>>> order to resume the sending. Then the question is how.
> >>>>>>>>>>>
> >>>>>>>>>>> As Flavio indicated, for the use cases that you listed, it
> >>>> would
> >>>>> be
> >>>>>>>>>> useful
> >>>>>>>>>>> to figure out the exact logic by using this feature. For
> >>>> example,
> >>>>>> in
> >>>>>>>> the
> >>>>>>>>>>> partition K/V store example, when we fail over to a new
> >>> writer
> >>>> to
> >>>>>> the
> >>>>>>>>>>> commit log, the zombie writer can publish new messages to
> >> the
> >>>> log
> >>>>>>>> after
> >>>>>>>>>> the
> >>>>>>>>>>> new writer takes over, but before it publishes any message.
> >>> We
> >>>>>>>> probably
> >>>>>>>>>>> need to outline how this case can be handled properly.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> Jun
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in>
> >>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Jun,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the close reading! Responses inline.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the write-up. The single producer use case
> >> you
> >>>>>> mentioned
> >>>>>>>>>> makes
> >>>>>>>>>>>>> sense. It would be useful to include that in the KIP
> >> wiki.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Great -- I'll make sure that the wiki is clear about this.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 1. What happens when the leader of the partition changes
> >>> in
> >>>>> the
> >>>>>>>> middle
> >>>>>>>>>>>> of a
> >>>>>>>>>>>>> produce request? In this case, the producer client is
> >> not
> >>>> sure
> >>>>>>>> whether
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> request succeeds or not. If there is only a single
> >> message
> >>>> in
> >>>>>> the
> >>>>>>>>>>>> request,
> >>>>>>>>>>>>> the producer can just resend the request. If it sees an
> >>>>>>>> OffsetMismatch
> >>>>>>>>>>>>> error, it knows that the previous send actually
> >> succeeded
> >>>> and
> >>>>>> can
> >>>>>>>>>> proceed
> >>>>>>>>>>>>> with the next write. This is nice since it not only
> >> allows
> >>>> the
> >>>>>>>>>> producer
> >>>>>>>>>>>> to
> >>>>>>>>>>>>> proceed during transient failures in the broker, it also
> >>>>> avoids
> >>>>>>>>>>>> duplicates
> >>>>>>>>>>>>> during producer resend. One caveat is when there are
> >>>> multiple
> >>>>>>>>>> messages in
> >>>>>>>>>>>>> the same partition in a produce request. The issue is
> >> that
> >>>> in
> >>>>>> our
> >>>>>>>>>> current
> >>>>>>>>>>>>> replication protocol, it's possible for some, but not
> >> all
> >>>>>> messages
> >>>>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>>>> request to be committed. This makes resend a bit harder
> >> to
> >>>>> deal
> >>>>>>>> with
> >>>>>>>>>>>> since
> >>>>>>>>>>>>> on receiving an OffsetMismatch error, it's not clear
> >> which
> >>>>>> messages
> >>>>>>>>>> have
> >>>>>>>>>>>>> been committed. One possibility is to expect that
> >>>> compression
> >>>>> is
> >>>>>>>>>> enabled,
> >>>>>>>>>>>>> in which case multiple messages are compressed into a
> >>> single
> >>>>>>>> message.
> >>>>>>>>>> I
> >>>>>>>>>>>> was
> >>>>>>>>>>>>> thinking that another possibility is for the broker to
> >>>> return
> >>>>>> the
> >>>>>>>>>> current
> >>>>>>>>>>>>> high watermark when sending an OffsetMismatch error.
> >> Based
> >>>> on
> >>>>>> this
> >>>>>>>>>> info,
> >>>>>>>>>>>>> the producer can resend the subset of messages that have
> >>> not
> >>>>>> been
> >>>>>>>>>>>>> committed. However, this may not work in a compacted
> >> topic
> >>>>> since
> >>>>>>>> there
> >>>>>>>>>>>> can
> >>>>>>>>>>>>> be holes in the offset.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This is an excellent question. It's my understanding that
> >> at
> >>>>> least
> >>>>>> a
> >>>>>>>>>>>> *prefix* of messages will be committed (right?) -- which
> >>> seems
> >>>>> to
> >>>>>> be
> >>>>>>>>>>>> enough for many cases. I'll try and come up with a more
> >>>> concrete
> >>>>>>>>>>>> answer here.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 2. Is this feature only intended to be used with ack =
> >>> all?
> >>>>> The
> >>>>>>>> client
> >>>>>>>>>>>>> doesn't get the offset with ack = 0. With ack = 1, it's
> >>>>> possible
> >>>>>>>> for a
> >>>>>>>>>>>>> previously acked message to be lost during leader
> >>>> transition,
> >>>>>> which
> >>>>>>>>>> will
> >>>>>>>>>>>>> make the client logic more complicated.
> >>>>>>>>>>>>
> >>>>>>>>>>>> It's true that acks = 0 doesn't seem to be particularly
> >>>> useful;
> >>>>> in
> >>>>>>>> all
> >>>>>>>>>>>> the cases I've come across, the client eventually wants to
> >>>> know
> >>>>>> about
> >>>>>>>>>>>> the mismatch error. However, it seems like there are some
> >>>> cases
> >>>>>> where
> >>>>>>>>>>>> acks = 1 would be fine -- eg. in a bulk load of a fixed
> >>>> dataset,
> >>>>>>>>>>>> losing messages during a leader transition just means you
> >>> need
> >>>>> to
> >>>>>>>>>>>> rewind / restart the load, which is not especially
> >>>> catastrophic.
> >>>>>> For
> >>>>>>>>>>>> many other interesting cases, acks = all is probably
> >>>> preferable.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> 3. How does the producer client know the offset to send
> >>> the
> >>>>>> first
> >>>>>>>>>>>> message?
> >>>>>>>>>>>>> Do we need to expose an API in producer to get the
> >> current
> >>>>> high
> >>>>>>>>>>>> watermark?
> >>>>>>>>>>>>
> >>>>>>>>>>>> You're right, it might be irritating to have to go through
> >>> the
> >>>>>>>>>>>> consumer API just for this. There are some cases where the
> >>>>> offsets
> >>>>>>>> are
> >>>>>>>>>>>> already available -- like the commit-log-for-KV-store
> >>> example
> >>>> --
> >>>>>> but
> >>>>>>>>>>>> in general, being able to get the offsets from the
> >> producer
> >>>>>> interface
> >>>>>>>>>>>> does sound convenient.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> We plan to have a KIP discussion meeting tomorrow at
> >> 11am
> >>>> PST.
> >>>>>>>> Perhaps
> >>>>>>>>>>>> you
> >>>>>>>>>>>>> can describe this KIP a bit then?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Sure, happy to join.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Jun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <
> >> b...@kirw.in
> >>>>
> >>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Just wanted to flag a little discussion that happened
> >> on
> >>>> the
> >>>>>>>> ticket:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In particular, Yasuhiro Matsuda proposed an interesting
> >>>>>> variant on
> >>>>>>>>>>>>>> this that performs the offset check on the message key
> >>>>>> (instead of
> >>>>>>>>>>>>>> just the partition), with bounded space requirements,
> >> at
> >>>> the
> >>>>>> cost
> >>>>>>>> of
> >>>>>>>>>>>>>> potentially some spurious failures. (ie. the produce
> >>>> request
> >>>>>> may
> >>>>>>>> fail
> >>>>>>>>>>>>>> even if that particular key hasn't been updated
> >>> recently.)
> >>>>> This
> >>>>>>>>>>>>>> addresses a couple of the drawbacks of the per-key
> >>> approach
> >>>>>>>> mentioned
> >>>>>>>>>>>>>> at the bottom of the KIP.
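
One way to picture that variant is a fixed-size table of per-key-hash high
watermarks on the broker side; this is purely my sketch of the idea, and hash
collisions are exactly what produce the spurious failures:

    import java.util.Arrays;

    // Hypothetical bounded-space, per-key offset check: one slot per key-hash bucket.
    class KeyedOffsetCheck {
        private final long[] lastOffsetByBucket;

        KeyedOffsetCheck(int buckets) {
            lastOffsetByBucket = new long[buckets];
            Arrays.fill(lastOffsetByBucket, -1L);
        }

        // Accept the append only if nothing in this key's bucket was written past
        // the expected offset; a colliding key can reject a key that wasn't touched.
        synchronized boolean checkAndRecord(byte[] key, long expectedOffset, long newOffset) {
            int bucket = Math.floorMod(Arrays.hashCode(key), lastOffsetByBucket.length);
            if (lastOffsetByBucket[bucket] > expectedOffset)
                return false;          // possibly spurious: another key shares this bucket
            lastOffsetByBucket[bucket] = newOffset;
            return true;
        }
    }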
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <
> >> b...@kirw.in
> >>>>
> >>>>>> wrote:
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So, perhaps it's worth adding a couple specific
> >>> examples
> >>>> of
> >>>>>>>> where
> >>>>>>>>>> this
> >>>>>>>>>>>>>>> feature is useful, to make this a bit more concrete:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - Suppose I'm using Kafka as a commit log for a
> >>>> partitioned
> >>>>>> KV
> >>>>>>>>>> store,
> >>>>>>>>>>>>>>> like Samza or Pistachio (?) do. We bootstrap the
> >>> process
> >>>>>> state
> >>>>>>>> by
> >>>>>>>>>>>>>>> reading from that partition, and log all state
> >> updates
> >>> to
> >>>>>> that
> >>>>>>>>>>>>>>> partition when we're running. Now imagine that one of
> >>> my
> >>>>>>>> processes
> >>>>>>>>>>>>>>> locks up -- GC or similar -- and the system
> >> transitions
> >>>>> that
> >>>>>>>>>> partition
> >>>>>>>>>>>>>>> over to another node. When the GC is finished, the
> >> old
> >>>>>> 'owner'
> >>>>>>>> of
> >>>>>>>>>> that
> >>>>>>>>>>>>>>> partition might still be trying to write to the
> >> commit
> >>>> log
> >>>>> at
> >>>>>>>> the
> >>>>>>>>>> same
> >>>>>>>>>>>>>>> as the new one is. A process might detect this by
> >>>> noticing
> >>>>>> that
> >>>>>>>> the
> >>>>>>>>>>>>>>> offset of the published message is bigger than it
> >>> thought
> >>>>> the
> >>>>>>>>>> upcoming
> >>>>>>>>>>>>>>> offset was, which implies someone else has been
> >> writing
> >>>> to
> >>>>>> the
> >>>>>>>>>> log...
> >>>>>>>>>>>>>>> but by then it's too late, and the commit log is
> >>> already
> >>>>>>>> corrupt.
> >>>>>>>>>> With
> >>>>>>>>>>>>>>> a 'conditional produce', one of those processes will have its
> >>>>>>>>>>>>>>> publish request refused -- so we've avoided corrupting the
> >>>>>>>>>>>>>>> state.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> - Envision some copycat-like system, where we have
> >> some
> >>>>>> sharded
> >>>>>>>>>>>>>>> postgres setup and we're tailing each shard into its
> >>> own
> >>>>>>>> partition.
> >>>>>>>>>>>>>>> Normally, it's fairly easy to avoid duplicates here:
> >> we
> >>>> can
> >>>>>>>> track
> >>>>>>>>>>>>>>> which offset in the WAL corresponds to which offset
> >> in
> >>>>> Kafka,
> >>>>>>>> and
> >>>>>>>>>> we
> >>>>>>>>>>>>>>> know how many messages we've written to Kafka
> >> already,
> >>> so
> >>>>> the
> >>>>>>>>>> state is
> >>>>>>>>>>>>>>> very simple. However, it is possible that for a
> >> moment
> >>> --
> >>>>>> due to
> >>>>>>>>>>>>>>> rebalancing or operator error or some other thing --
> >>> two
> >>>>>>>> different
> >>>>>>>>>>>>>>> nodes are tailing the same postgres shard at once!
> >>>> Normally
> >>>>>> this
> >>>>>>>>>> would
> >>>>>>>>>>>>>>> introduce duplicate messages, but by specifying the
> >>>>> expected
> >>>>>>>>>> offset,
> >>>>>>>>>>>>>>> we can avoid this.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So perhaps it's better to say that this is useful
> >> when
> >>> a
> >>>>>> single
> >>>>>>>>>>>>>>> producer is *expected*, but multiple producers are
> >>>>>> *possible*?
> >>>>>>>> (In
> >>>>>>>>>> the
> >>>>>>>>>>>>>>> same way that the high-level consumer normally has 1
> >>>>> consumer
> >>>>>>>> in a
> >>>>>>>>>>>>>>> group reading from a partition, but there are small
> >>>> windows
> >>>>>>>> where
> >>>>>>>>>> more
> >>>>>>>>>>>>>>> than one might be reading at the same time.) This is
> >>> also
> >>>>> the
> >>>>>>>>>> spirit
> >>>>>>>>>>>>>>> of the 'runtime cost' comment -- in the common case,
> >>>> where
> >>>>>>>> there is
> >>>>>>>>>>>>>>> little to no contention, there's no performance
> >>> overhead
> >>>>>>>> either. I
> >>>>>>>>>>>>>>> mentioned this a little in the Motivation section --
> >>>> maybe
> >>>>> I
> >>>>>>>> should
> >>>>>>>>>>>>>>> flesh that out a little bit?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For me, the motivation to work this up was that I
> >> kept
> >>>>>> running
> >>>>>>>> into
> >>>>>>>>>>>>>>> cases, like the above, where the existing API was
> >>>>>>>>>> almost-but-not-quite
> >>>>>>>>>>>>>>> enough to give the guarantees I was looking for --
> >> and
> >>>> the
> >>>>>>>>>> extension
> >>>>>>>>>>>>>>> needed to handle those cases too was pretty small and
> >>>>>>>>>> natural-feeling.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <
> >>>>>>>> asi...@cloudera.com
> >>>>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>> Good concept. I have a question though.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Say there are two producers A and B. Both producers
> >>> are
> >>>>>>>> producing
> >>>>>>>>>> to
> >>>>>>>>>>>>>> same
> >>>>>>>>>>>>>>>> partition.
> >>>>>>>>>>>>>>>> - A sends a message with expected offset, x1
> >>>>>>>>>>>>>>>> - Broker accepts it and sends an Ack
> >>>>>>>>>>>>>>>> - B sends a message with expected offset, x1
> >>>>>>>>>>>>>>>> - Broker rejects it, sends nack
> >>>>>>>>>>>>>>>> - B sends message again with expected offset, x1+1
> >>>>>>>>>>>>>>>> - Broker accepts it and sends Ack
> >>>>>>>>>>>>>>>> I guess this is what this KIP suggests, right? If yes, then
> >>>>>>>>>>>>>>>> how does this ensure that the same message will not be written
> >>>>>>>>>>>>>>>> twice when two producers are producing to the same partition?
> >>>>>>>>>>>>>>>> A producer, on receiving a nack, will try again with the next
> >>>>>>>>>>>>>>>> offset and will keep doing so till the message is accepted. Am
> >>>>>>>>>>>>>>>> I missing something?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Also, you have mentioned on the KIP, "it imposes little to no
> >>>>>>>>>>>>>>>> runtime cost in memory or time"; I think that is not true for
> >>>>>>>>>>>>>>>> time. With this approach, producers' performance will degrade
> >>>>>>>>>>>>>>>> proportionally to the number of producers writing to the same
> >>>>>>>>>>>>>>>> partition. Please correct me if I am missing something.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
> >>>>>>>>>>>>>>>> gharatmayures...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If we have 2 producers producing to a partition, they can be
> >>>>>>>>>>>>>>>>> out of order, so how does one producer know what offset to
> >>>>>>>>>>>>>>>>> expect, as it does not interact with the other producer?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Can you give an example flow that explains how it
> >>> works
> >>>>>> with
> >>>>>>>>>> single
> >>>>>>>>>>>>>>>>> producer and with multiple producers?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Mayuresh
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira
> >> <
> >>>>>>>>>>>>>>>>> fpjunque...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I like this feature, it reminds me of conditional
> >>>>>> updates in
> >>>>>>>>>>>>>> zookeeper.
> >>>>>>>>>>>>>>>>>> I'm not sure if it'd be best to have some
> >> mechanism
> >>>> for
> >>>>>>>> fencing
> >>>>>>>>>>>>>> rather
> >>>>>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>> a conditional write like you're proposing. The
> >>> reason
> >>>>> I'm
> >>>>>>>>>> saying
> >>>>>>>>>>>>>> this is
> >>>>>>>>>>>>>>>>>> that the conditional write applies to requests individually,
> >>>>>>>>>>>>>>>>>> while it sounds like you want to make sure that there is a
> >>>>>>>>>>>>>>>>>> single client writing across multiple requests.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Flavio
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 17 Jul 2015, at 07:30, Ben Kirwin <
> >>> b...@kirw.in>
> >>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi there,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I just added a KIP for a 'conditional publish'
> >>>>>> operation:
> >>>>>>>> a
> >>>>>>>>>>>> simple
> >>>>>>>>>>>>>>>>>>> CAS-like mechanism for the Kafka producer. The
> >>> wiki
> >>>>>> page
> >>>>>>>> is
> >>>>>>>>>>>> here:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> And there's some previous discussion on the
> >>> ticket
> >>>>> and
> >>>>>> the
> >>>>>>>>>> users
> >>>>>>>>>>>>>> list:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >> https://issues.apache.org/jira/browse/KAFKA-2260
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> As always, comments and suggestions are very
> >>>> welcome.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>> Ben
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> -Regards,
> >>>>>>>>>>>>>>>>> Mayuresh R. Gharat
> >>>>>>>>>>>>>>>>> (862) 250-7125
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Ashish
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Thanks,
> >>>>>>> Ewen
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
