Hey Jun,

Yeah I think Ben is right, both these cases are covered. The algorithm is
something like

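while (true) {
  v = get_local_val(key)
  v' = modify(v)
  try {
    // conditional publish: fails if another writer got there first
    log_to_kafka(v')
    put_local_val(key, v')
    break
  } catch (CasFailureException e) {
    // local view was stale; loop to re-read and recompute
    warn("optimistic lock failure")
  }
}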
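
To make that concrete, here is a rough, runnable Python version of the
loop. Everything in it is made up for illustration: InMemoryLog stands in
for a Kafka partition with conditional publish, append_if is a hypothetical
name for the expected-offset write, and catch_up is an assumption about how
a stale local view gets refreshed before the retry.

```python
class CasFailure(Exception):
    """Raised when the expected offset does not match the end of the log."""


class InMemoryLog:
    """Hypothetical stand-in for one Kafka partition with conditional publish."""

    def __init__(self):
        self.records = []

    def append_if(self, expected_offset, record):
        # Accept the write only if the caller's view of the log end is current.
        if expected_offset != len(self.records):
            raise CasFailure(
                f"expected offset {expected_offset}, log end is {len(self.records)}"
            )
        self.records.append(record)
        return expected_offset


class Store:
    """Local key-value view that is kept in step with the shared log."""

    def __init__(self, log):
        self.log = log
        self.local = {}
        self.next_offset = 0  # offset of the next log record to apply locally

    def catch_up(self):
        # Apply every record this store has not seen yet (assumed refresh step).
        for key, value in self.log.records[self.next_offset:]:
            self.local[key] = value
        self.next_offset = len(self.log.records)

    def update(self, key, modify, max_attempts=10):
        for _ in range(max_attempts):
            new_value = modify(self.local.get(key))
            try:
                self.log.append_if(self.next_offset, (key, new_value))
            except CasFailure:
                # Optimistic lock failure: refresh the view, then recompute.
                self.catch_up()
                continue
            self.local[key] = new_value
            self.next_offset += 1
            return new_value
        raise RuntimeError("gave up after repeated optimistic lock failures")
```

Two Store instances sharing one log serialize their updates: the loser of
each race catches up and recomputes, so a +1 applied alternately from both
sides produces 1, 2, 3 rather than a lost update.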

What I have yet to see is a complete design that would make use of this. I
think we have two use cases this might (or might not) cover: a distributed
log-centric data system and an event-sourced app. Both are actually kind of
the same. What's missing is someone working through the details.

I think it is important to think through this use case end-to-end, i.e. how
is concurrency handled? What about other errors? What about request
pipelining? How would queries work?

Basically someone should write out notes on how to implement a key-value
store with a Kafka commit log, assuming the existence of this feature and
making use of something like RocksDB. I think that would uncover any
problems that remain.

-Jay

On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin <b...@kirw.in> wrote:

> This is a great summary of the commit log options. Some additional
> comments:
>
> 1. One way to handle a transient failure is to treat it the same way
> as a conditional publish failure: recompute the post-value before
> retrying. (I believe this is enough to make this approach work
> correctly.)
>
> 2. You can think of 2 as an 'optimization' of 1: the election
> mechanism is there to ensure that the conditional publish failures
> happen very rarely. When there are no conflicts, the conditional
> publish is essentially free.
>
> I guess I think of the zombie master problem like this: given some
> window of time where two nodes both think they are the master,
> conditional publish is enough to ensure that only one of the two will
> successfully publish. However, it's not enough to ensure that the
> 'new' master is the successful one. This might cause the leadership
> transition to happen a bit later than it would otherwise, but it
> doesn't seem to actually impact correctness.
>
> On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao <j...@confluent.io> wrote:
> > A couple of thoughts on the commit log use case. Suppose that we want to
> > maintain multiple replicas of a K/V store backed by a shared Kafka
> > topic/partition as a commit log. There are two possible ways to use Kafka
> > as a commit log.
> >
> > 1. The first approach allows multiple producers to publish to Kafka. Each
> > replica of the data store keeps reading from a Kafka topic/partition to
> > refresh the replica's view. Every time a replica gets an update to a key
> > from a client, it combines the update and the current value of the key in
> > its view and generates a post-value. It then does a conditional publish to
> > Kafka with the post-value. The update is successful if the conditional
> > publish succeeds. Otherwise, the replica has to recompute the post-value
> > (potentially after the replica's view is refreshed) and retry the
> > conditional publish. A potential issue with this approach is when there is
> > a transient failure during publishing to Kafka (e.g., the leader of the
> > partition changes). When this happens, the conditional publish will get an
> > error. The replica doesn't know whether the publish actually succeeded or
> > not. If we just blindly retry, it may not give the correct behavior (e.g.,
> > we could be applying +1 twice). So, not sure if conditional publish itself
> > is enough for this approach.
> >
> > 2. The second approach allows only a single producer to publish to Kafka.
> > We somehow elect one of the replicas to be the master that handles all
> > updates. Normally, we don't need conditional publish since there is a
> > single producer. Conditional publish can potentially be used to deal with
> > duplicates. If the master encounters the same transient failure as the
> > above, it can get the latest offset from the Kafka topic/partition to see
> > if the publish actually succeeded or not since it's the only producer. A
> > potential issue here is to handle the zombie master problem: if the master
> > has a soft failure and another master is elected, we need to prevent the
> > old master from publishing new data to Kafka. So, for this approach to work
> > properly, we need some kind of support of single writer in addition to
> > conditional publish.
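
For the single-producer case in approach 2, the "check the latest offset"
step could look roughly like the sketch below. FlakyPartition,
TransientError and publish_once are hypothetical names rather than Kafka
APIs, and the check is only sound under the stated assumption that nothing
else writes to the partition.

```python
class TransientError(Exception):
    """The outcome of the publish is unknown (e.g. the partition leader changed)."""


class FlakyPartition:
    """Hypothetical partition whose first publish call can fail ambiguously."""

    def __init__(self, first_call="ok"):
        self.records = []
        self.first_call = first_call  # "ok", "ack_lost" or "write_lost"
        self.calls = 0

    def end_offset(self):
        return len(self.records)

    def publish(self, record):
        self.calls += 1
        failing = self.calls == 1 and self.first_call != "ok"
        # "ack_lost": the record lands but the caller still sees an error.
        # "write_lost": the record never lands.
        if not (failing and self.first_call == "write_lost"):
            self.records.append(record)
        if failing:
            raise TransientError(self.first_call)
        return len(self.records) - 1


def publish_once(partition, record, max_attempts=5):
    """Publish without duplicates, assuming this is the only producer."""
    expected = partition.end_offset()
    for _ in range(max_attempts):
        try:
            return partition.publish(record)
        except TransientError:
            # As the sole writer, any growth of the log must be our record.
            if partition.end_offset() > expected:
                return expected
    raise RuntimeError("publish did not succeed")
```

With a zombie master still writing, the end-offset check can no longer tell
whose record advanced the log, which is why this approach also needs some
form of single-writer support.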
> >
> >
> > Jiangjie,
> >
> > The issue with partial commit is the following. Say we have a batch of 10
> > uncompressed messages sent to the leader. The followers only fetched the
> > first 5 messages and then the leader dies. In this case, we only committed
> > 5 out of the 10 messages.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck <
> > daniel.schierb...@gmail.com> wrote:
> >
> >> Jiangjie: I think giving users the possibility of defining a custom policy
> >> for handling rejections is a good idea. For instance, this will allow Kafka
> >> to act as an event store in an Event Sourcing application. If the event(s)
> >> are rejected by the store, the original command may need to be re-validated
> >> against the new state.
> >>
> >> On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin <j...@linkedin.com.invalid>
> >> wrote:
> >>
> >> > @Ewen, good point about batching. Yes, it would be tricky if we want to
> >> > do a per-key conditional produce. My understanding is that the
> >> > prerequisites of this KIP are:
> >> > 1. Single producer for each partition.
> >> > 2. acks=-1, max.in.flight.requests.per.connection=1,
> >> > retries=SOME_BIG_NUMBER
> >> >
> >> > The major problem it tries to solve is exactly-once produce, i.e. solving
> >> > duplicates from the producer side. In that case, a batch will be
> >> > considered atomic. The only way a batch can get rejected should be that
> >> > it is already appended. So the producer should just move on.
> >> >
> >> > It looks to me like even a transient multiple-producer scenario will cause
> >> > issues, because the user needs to think about what to do if a request gets
> >> > rejected, and the answer varies for different use cases.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin <b...@kirw.in> wrote:
> >> >
> >> > > So I had another look at the 'Idempotent Producer' proposal this
> >> > > afternoon, and made a few notes on how I think they compare; if I've
> >> > > made any mistakes, I'd be delighted if someone with more context on
> >> > > the idempotent producer design would correct me.
> >> > >
> >> > > As a first intuition, you can think of the 'conditional publish'
> >> > > proposal as the special case of the 'idempotent producer' idea, where
> >> > > there's just a single producer per-partition. The key observation here
> >> > > is: if there's only one producer, you can conflate the 'sequence
> >> > > number' and the expected offset. The conditional publish proposal uses
> >> > > existing Kafka offset APIs for roughly the same things as the
> >> > > idempotent producer proposal uses sequence numbers for -- eg. instead
> >> > > of having a "lease PID" API that returns the current sequence number,
> >> > > we can use the existing 'offset API' to retrieve the upcoming offset.
> >> > >
> >> > > Both proposals attempt to deal with the situation where there are
> >> > > transiently multiple publishers for the same partition (and PID). The
> >> > > idempotent producer setup tracks a generation id for each pid, and
> >> > > discards any writes with a generation id smaller than the latest
> >> > > value. Conditional publish is 'first write wins' -- and instead of
> >> > > dropping duplicates on the server, it returns an error to the client.
> >> > > The duplicate-handling behaviour (dropping vs. erroring) has some
> >> > > interesting consequences:
> >> > >
> >> > > - If all producers are producing the same stream of messages, silently
> >> > > dropping duplicates on the server is more convenient. (Suppose we have
> >> > > a batch of messages 0-9, and the high-water mark on the server is 7.
> >> > > Idempotent producer, as I read it, would append 7-9 to the partition
> >> > > and return success; meanwhile, conditional publish would fail the
> >> > > entire batch.)
> >> > >
> >> > > - If producers might be writing different streams of messages, the
> >> > > proposed behaviour of the idempotent producer is probably worse --
> >> > > since it can silently interleave messages from two different
> >> > > producers. This can be a problem for some commit-log style use-cases,
> >> > > since it can transform a valid series of operations into an invalid
> >> > > one.
> >> > >
> >> > > - Given the error-on-duplicate behaviour, it's possible to implement
> >> > > deduplication on the client. (Sketch: if a publish returns an error
> >> > > for some partition, fetch the upcoming offset / sequence number for
> >> > > that partition, and discard all messages with a smaller offset on the
> >> > > client before republishing.)
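
The client-side deduplication sketch in the last bullet might look like
this in Python. InMemoryPartition, publish_batch and upcoming_offset are
invented stand-ins for the proposed conditional publish and the existing
offset API, and dropping the already-written prefix is only safe under the
assumption that every producer is writing the same stream of messages.

```python
class OffsetMismatch(Exception):
    """Raised when a batch's expected offset is not the current log end."""


class InMemoryPartition:
    """Hypothetical stand-in for a partition that supports conditional publish."""

    def __init__(self):
        self.records = []

    def upcoming_offset(self):
        return len(self.records)

    def publish_batch(self, expected_offset, batch):
        # The whole batch is rejected unless it starts exactly at the log end.
        if expected_offset != len(self.records):
            raise OffsetMismatch(expected_offset)
        self.records.extend(batch)


def publish_with_dedup(partition, base_offset, batch):
    """Publish `batch`, whose first message is intended for `base_offset`.

    Returns the number of messages actually appended by this call.
    """
    try:
        partition.publish_batch(base_offset, batch)
        return len(batch)
    except OffsetMismatch:
        upcoming = partition.upcoming_offset()
        already_written = upcoming - base_offset
        if already_written < 0:
            # The log is behind our expectation; dropping messages cannot help.
            raise
        # Discard messages whose offsets are already taken, republish the rest.
        remaining = batch[already_written:]
        if remaining:
            partition.publish_batch(upcoming, remaining)
        return len(remaining)
```

For the example in the first bullet (a batch of messages 0-9 against a log
whose upcoming offset is 7), the first attempt is rejected and the retry
appends only messages 7-9.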
> >> > >
> >> > > I think this makes the erroring behaviour more general, though
> >> > > deduplicating saves a roundtrip or two at conflict time.
> >> > >
> >> > > I'm less clear about the behaviour of the generation id, or what
> >> > > happens when (say) two producers with the same generation id are spun
> >> > > up at the same time. I'd be interested in hearing other folks'
> >> > > comments on this.
> >> > >
> >> > > Ewen: I'm not sure I understand the questions well enough to answer
> >> > > properly, but some quick notes:
> >> > > - I don't think it makes sense to assign an expected offset without
> >> > > already having assigned a partition. If the producer code is doing the
> >> > > partition assignment, it should probably do the offset assignment
> >> > > too... or we could just let application code handle both.
> >> > > - I'm not aware of any case where reassigning offsets to messages
> >> > > automatically after an offset mismatch makes sense: in the cases we've
> >> > > discussed, it seems like either it's safe to drop duplicates, or we
> >> > > want to handle the error at the application level.
> >> > >
> >> > > I'm going to try and come up with an idempotent-producer-type example
> >> > > that works with the draft patch in the next few days, so hopefully
> >> > > we'll have something more concrete to discuss. Otherwise -- if you
> >> > > have a clear idea of how eg. sequence number assignment would work in
> >> > > the idempotent-producer proposal, we could probably translate that
> >> > > over to get the equivalent for the conditional publish API.
> >> > >
> >> > >
> >> > >
> >> > > On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava
> >> > > <e...@confluent.io> wrote:
> >> > > > @Becket - for compressed batches, I think this just works out
> given
> >> the
> >> > > KIP
> >> > > > as described. Without the change you're referring to, it still
> only
> >> > makes
> >> > > > sense to batch messages with this KIP if all the expected offsets
> are
> >> > > > sequential (else some messages are guaranteed to fail). I think
> that
> >> > > > probably just works out, but raises an issue I brought up on the
> KIP
> >> > > call.
> >> > > >
> >> > > > Batching can be a bit weird with this proposal. If you try to
> write
> >> > key A
> >> > > > and key B, the second operation is dependent on the first. Which
> >> means
> >> > to
> >> > > > make an effective client for this, we need to keep track of
> >> > per-partition
> >> > > > offsets so we can set expected offsets properly. For example, if A
> >> was
> >> > > > expected to publish at offset 10, then if B was published to the
> same
> >> > > > partition, we need to make sure it's marked as expected offset 11
> >> > > (assuming
> >> > > > no subpartition high water marks). We either need to have the
> >> > application
> >> > > > keep track of this itself and set the offsets, which requires
> that it
> >> > > know
> >> > > > about how keys map to partitions, or the client needs to manage
> this
> >> > > > process. But if the client manages it, I think the client gets
> quite
> >> a
> >> > > bit
> >> > > > more complicated. If the produce request containing A fails, what
> >> > happens
> >> > > > to B? Are there retries that somehow update the expected offset,
> or
> >> do
> >> > we
> >> > > > just give up since we know it's always going to fail with the
> >> expected
> >> > > > offset that was automatically assigned to it?
> >> > > >
> >> > > > One way to handle this is to use Yasuhiro's idea of increasing the
> >> > > > granularity of high watermarks using subpartitions. But I guess my
> >> > > question
> >> > > > is: if one producer client is writing many keys, and some of those
> >> keys
> >> > > are
> >> > > > produced to the same partition, and those messages are batched,
> what
> >> > > > happens? Do we end up with lots of failed messages? Or do we have
> >> > > > complicated logic in the producer to figure out what the right
> >> expected
> >> > > > offset for each message is? Or do they all share the same base
> >> expected
> >> > > > offset as in the compressed case, in which case they all share the
> >> same
> >> > > > fate and subpartitioning doesn't help? Or is there a simpler
> solution
> >> > I'm
> >> > > > just not seeing? Maybe this just disables batching entirely and
> >> > > throughput
> >> > > > isn't an issue in these cases?
> >> > > >
> >> > > > Sorry, I know that's probably not entirely clear, but that's
> because
> >> > I'm
> >> > > > very uncertain of how batching works with this KIP.
> >> > > >
> >> > > >
> >> > > > On how this relates to other proposals: I think it might also be
> >> > helpful
> >> > > to
> >> > > > get an overview of all the proposals for relevant modifications to
> >> > > > producers/produce requests since many of these proposals are
> possibly
> >> > > > alternatives (though some may not be mutually exclusive). Many
> people
> >> > > don't
> >> > > > have all the context from the past couple of years of the project.
> >> Are
> >> > > > there any other relevant wikis or docs besides the following?
> >> > > >
> >> > > >
> >> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >> > > >
> >> > > > -Ewen
> >> > > >
> >> > > > On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira <
> >> gshap...@cloudera.com>
> >> > > > wrote:
> >> > > >
> >> > > >> Tangent: I think we should complete the move of Produce / Fetch
> RPC
> >> to
> >> > > >> the client libraries before we add more revisions to this
> protocol.
> >> > > >>
> >> > > >> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin
> >> > > >> <j...@linkedin.com.invalid> wrote:
> >> > > >> > I missed yesterday's KIP hangout. I'm currently working on
> another
> >> > KIP
> >> > > >> for
> >> > > >> > enriched metadata of messages. Guozhang has already created a
> wiki
> >> > > page
> >> > > >> > before (
> >> > > >> >
> >> > > >>
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >> > > >> ).
> >> > > >> > We plan to fill the relative offset to the offset field in the
> >> batch
> >> > > sent
> >> > > >> > by producer to avoid broker side re-compression. The message
> >> offset
> >> > > would
> >> > > >> > become batch base offset + relative offset. I guess maybe the
> >> > expected
> >> > > >> > offset in KIP-27 can be only set for base offset? Would that
> >> affect
> >> > > >> certain
> >> > > >> > use cases?
> >> > > >> >
> >> > > >> > For Jun's comments, I am not sure I completely get it. I think
> the
> >> > > >> producer
> >> > > >> > only sends one batch per partition in a request. So either that
> >> > batch
> >> > > is
> >> > > >> > appended or not. Why a batch would be partially committed?
> >> > > >> >
> >> > > >> > Thanks,
> >> > > >> >
> >> > > >> > Jiangjie (Becket) Qin
> >> > > >> >
> >> > > >> > On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in>
> wrote:
> >> > > >> >
> >> > > >> >> That's a fair point. I've added some imagined job logic to the
> >> KIP,
> >> > > so
> >> > > >> >> we can make sure the proposal stays in sync with the usages
> we're
> >> > > >> >> discussing. (The logic is just a quick sketch for now -- I
> expect
> >> > > I'll
> >> > > >> >> need to elaborate it as we get into more detail, or to address
> >> > other
> >> > > >> >> concerns...)
> >> > > >> >>
> >> > > >> >> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io>
> >> > wrote:
> >> > > >> >> > For 1, yes, when there is a transient leader change, it's
> >> > > guaranteed
> >> > > >> >> that a
> >> > > >> >> > prefix of the messages in a request will be committed.
> However,
> >> > it
> >> > > >> seems
> >> > > >> >> > that the client needs to know what subset of messages are
> >> > > committed in
> >> > > >> >> > order to resume the sending. Then the question is how.
> >> > > >> >> >
> >> > > >> >> > As Flavio indicated, for the use cases that you listed, it
> >> would
> >> > be
> >> > > >> >> useful
> >> > > >> >> > to figure out the exact logic by using this feature. For
> >> example,
> >> > > in
> >> > > >> the
> >> > > >> >> > partition K/V store example, when we fail over to a new
> writer
> >> to
> >> > > the
> >> > > >> >> > commit log, the zombie writer can publish new messages to
> the
> >> log
> >> > > >> after
> >> > > >> >> the
> >> > > >> >> > new writer takes over, but before it publishes any message.
> We
> >> > > >> probably
> >> > > >> >> > need to outline how this case can be handled properly.
> >> > > >> >> >
> >> > > >> >> > Thanks,
> >> > > >> >> >
> >> > > >> >> > Jun
> >> > > >> >> >
> >> > > >> >> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in>
> >> > wrote:
> >> > > >> >> >
> >> > > >> >> >> Hi Jun,
> >> > > >> >> >>
> >> > > >> >> >> Thanks for the close reading! Responses inline.
> >> > > >> >> >>
> >> > > >> >> >> > Thanks for the write-up. The single producer use case you
> >> > > mentioned
> >> > > >> >> makes
> >> > > >> >> >> > sense. It would be useful to include that in the KIP
> wiki.
> >> > > >> >> >>
> >> > > >> >> >> Great -- I'll make sure that the wiki is clear about this.
> >> > > >> >> >>
> >> > > >> >> >> > 1. What happens when the leader of the partition changes
> in
> >> > the
> >> > > >> middle
> >> > > >> >> >> of a
> >> > > >> >> >> > produce request? In this case, the producer client is not
> >> sure
> >> > > >> whether
> >> > > >> >> >> the
> >> > > >> >> >> > request succeeds or not. If there is only a single
> message
> >> in
> >> > > the
> >> > > >> >> >> request,
> >> > > >> >> >> > the producer can just resend the request. If it sees an
> >> > > >> OffsetMismatch
> >> > > >> >> >> > error, it knows that the previous send actually succeeded
> >> and
> >> > > can
> >> > > >> >> proceed
> >> > > >> >> >> > with the next write. This is nice since it not only
> allows
> >> the
> >> > > >> >> producer
> >> > > >> >> >> to
> >> > > >> >> >> > proceed during transient failures in the broker, it also
> >> > avoids
> >> > > >> >> >> duplicates
> >> > > >> >> >> > during producer resend. One caveat is when there are
> >> multiple
> >> > > >> >> messages in
> >> > > >> >> >> > the same partition in a produce request. The issue is
> that
> >> in
> >> > > our
> >> > > >> >> current
> >> > > >> >> >> > replication protocol, it's possible for some, but not all
> >> > > messages
> >> > > >> in
> >> > > >> >> the
> >> > > >> >> >> > request to be committed. This makes resend a bit harder
> to
> >> > deal
> >> > > >> with
> >> > > >> >> >> since
> >> > > >> >> >> > on receiving an OffsetMismatch error, it's not clear
> which
> >> > > messages
> >> > > >> >> have
> >> > > >> >> >> > been committed. One possibility is to expect that
> >> compression
> >> > is
> >> > > >> >> enabled,
> >> > > >> >> >> > in which case multiple messages are compressed into a
> single
> >> > > >> message.
> >> > > >> >> I
> >> > > >> >> >> was
> >> > > >> >> >> > thinking that another possibility is for the broker to
> >> return
> >> > > the
> >> > > >> >> current
> >> > > >> >> >> > high watermark when sending an OffsetMismatch error.
> Based
> >> on
> >> > > this
> >> > > >> >> info,
> >> > > >> >> >> > the producer can resend the subset of messages that have
> not
> >> > > been
> >> > > >> >> >> > committed. However, this may not work in a compacted
> topic
> >> > since
> >> > > >> there
> >> > > >> >> >> can
> >> > > >> >> >> > be holes in the offset.
> >> > > >> >> >>
> >> > > >> >> >> This is an excellent question. It's my understanding that at
> >> > least
> >> > > a
> >> > > >> >> >> *prefix* of messages will be committed (right?) -- which
> seems
> >> > to
> >> > > be
> >> > > >> >> >> enough for many cases. I'll try and come up with a more
> >> concrete
> >> > > >> >> >> answer here.
> >> > > >> >> >>
> >> > > >> >> >> > 2. Is this feature only intended to be used with ack =
> all?
> >> > The
> >> > > >> client
> >> > > >> >> >> > doesn't get the offset with ack = 0. With ack = 1, it's
> >> > possible
> >> > > >> for a
> >> > > >> >> >> > previously acked message to be lost during leader
> >> transition,
> >> > > which
> >> > > >> >> will
> >> > > >> >> >> > make the client logic more complicated.
> >> > > >> >> >>
> >> > > >> >> >> It's true that acks = 0 doesn't seem to be particularly
> >> useful;
> >> > in
> >> > > >> all
> >> > > >> >> >> the cases I've come across, the client eventually wants to
> >> know
> >> > > about
> >> > > >> >> >> the mismatch error. However, it seems like there are some
> >> cases
> >> > > where
> >> > > >> >> >> acks = 1 would be fine -- eg. in a bulk load of a fixed
> >> dataset,
> >> > > >> >> >> losing messages during a leader transition just means you
> need
> >> > to
> >> > > >> >> >> rewind / restart the load, which is not especially
> >> catastrophic.
> >> > > For
> >> > > >> >> >> many other interesting cases, acks = all is probably
> >> preferable.
> >> > > >> >> >>
> >> > > >> >> >> > 3. How does the producer client know the offset to send
> the
> >> > > first
> >> > > >> >> >> message?
> >> > > >> >> >> > Do we need to expose an API in producer to get the
> current
> >> > high
> >> > > >> >> >> watermark?
> >> > > >> >> >>
> >> > > >> >> >> You're right, it might be irritating to have to go through
> the
> >> > > >> >> >> consumer API just for this. There are some cases where the
> >> > offsets
> >> > > >> are
> >> > > >> >> >> already available -- like the commit-log-for-KV-store
> example
> >> --
> >> > > but
> >> > > >> >> >> in general, being able to get the offsets from the producer
> >> > > interface
> >> > > >> >> >> does sound convenient.
> >> > > >> >> >>
> >> > > >> >> >> > We plan to have a KIP discussion meeting tomorrow at 11am
> >> PST.
> >> > > >> Perhaps
> >> > > >> >> >> you
> >> > > >> >> >> > can describe this KIP a bit then?
> >> > > >> >> >>
> >> > > >> >> >> Sure, happy to join.
> >> > > >> >> >>
> >> > > >> >> >> > Thanks,
> >> > > >> >> >> >
> >> > > >> >> >> > Jun
> >> > > >> >> >> >
> >> > > >> >> >> >
> >> > > >> >> >> >
> >> > > >> >> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin <
> b...@kirw.in>
> >> > > wrote:
> >> > > >> >> >> >
> >> > > >> >> >> >> Just wanted to flag a little discussion that happened on
> >> the
> >> > > >> ticket:
> >> > > >> >> >> >>
> >> > > >> >> >> >>
> >> > > >> >> >>
> >> > > >> >>
> >> > > >>
> >> > >
> >> >
> >>
> https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259
> >> > > >> >> >> >>
> >> > > >> >> >> >> In particular, Yasuhiro Matsuda proposed an interesting
> >> > > variant on
> >> > > >> >> >> >> this that performs the offset check on the message key
> >> > > (instead of
> >> > > >> >> >> >> just the partition), with bounded space requirements, at
> >> the
> >> > > cost
> >> > > >> of
> >> > > >> >> >> >> potentially some spurious failures. (ie. the produce
> >> request
> >> > > may
> >> > > >> fail
> >> > > >> >> >> >> even if that particular key hasn't been updated
> recently.)
> >> > This
> >> > > >> >> >> >> addresses a couple of the drawbacks of the per-key
> approach
> >> > > >> mentioned
> >> > > >> >> >> >> at the bottom of the KIP.
> >> > > >> >> >> >>
> >> > > >> >> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin <
> b...@kirw.in>
> >> > > wrote:
> >> > > >> >> >> >> > Hi all,
> >> > > >> >> >> >> >
> >> > > >> >> >> >> > So, perhaps it's worth adding a couple specific
> examples
> >> of
> >> > > >> where
> >> > > >> >> this
> >> > > >> >> >> >> > feature is useful, to make this a bit more concrete:
> >> > > >> >> >> >> >
> >> > > >> >> >> >> > - Suppose I'm using Kafka as a commit log for a partitioned
> >> > > >> >> >> >> > KV store, like Samza or Pistachio (?) do. We bootstrap the
> >> > > >> >> >> >> > process state by reading from that partition, and log all
> >> > > >> >> >> >> > state updates to that partition when we're running. Now
> >> > > >> >> >> >> > imagine that one of my processes locks up -- GC or similar
> >> > > >> >> >> >> > -- and the system transitions that partition over to
> >> > > >> >> >> >> > another node. When the GC is finished, the old 'owner' of
> >> > > >> >> >> >> > that partition might still be trying to write to the commit
> >> > > >> >> >> >> > log at the same time as the new one is. A process might
> >> > > >> >> >> >> > detect this by noticing that the offset of the published
> >> > > >> >> >> >> > message is bigger than it thought the upcoming offset was,
> >> > > >> >> >> >> > which implies someone else has been writing to the log...
> >> > > >> >> >> >> > but by then it's too late, and the commit log is already
> >> > > >> >> >> >> > corrupt. With a 'conditional produce', one of those
> >> > > >> >> >> >> > processes will have its publish request refused -- so
> >> > > >> >> >> >> > we've avoided corrupting the state.
> >> > > >> >> >> >> >
> >> > > >> >> >> >> > - Envision some copycat-like system, where we have
> some
> >> > > sharded
> >> > > >> >> >> >> > postgres setup and we're tailing each shard into its
> own
> >> > > >> partition.
> >> > > >> >> >> >> > Normally, it's fairly easy to avoid duplicates here:
> we
> >> can
> >> > > >> track
> >> > > >> >> >> >> > which offset in the WAL corresponds to which offset in
> >> > Kafka,
> >> > > >> and
> >> > > >> >> we
> >> > > >> >> >> >> > know how many messages we've written to Kafka
> already, so
> >> > the
> >> > > >> >> state is
> >> > > >> >> >> >> > very simple. However, it is possible that for a
> moment --
> >> > > due to
> >> > > >> >> >> >> > rebalancing or operator error or some other thing --
> two
> >> > > >> different
> >> > > >> >> >> >> > nodes are tailing the same postgres shard at once!
> >> Normally
> >> > > this
> >> > > >> >> would
> >> > > >> >> >> >> > introduce duplicate messages, but by specifying the
> >> > expected
> >> > > >> >> offset,
> >> > > >> >> >> >> > we can avoid this.
> >> > > >> >> >> >> >
> >> > > >> >> >> >> > So perhaps it's better to say that this is useful
> when a
> >> > > single
> >> > > >> >> >> >> > producer is *expected*, but multiple producers are
> >> > > *possible*?
> >> > > >> (In
> >> > > >> >> the
> >> > > >> >> >> >> > same way that the high-level consumer normally has 1
> >> > consumer
> >> > > >> in a
> >> > > >> >> >> >> > group reading from a partition, but there are small
> >> windows
> >> > > >> where
> >> > > >> >> more
> >> > > >> >> >> >> > than one might be reading at the same time.) This is
> also
> >> > the
> >> > > >> >> spirit
> >> > > >> >> >> >> > of the 'runtime cost' comment -- in the common case,
> >> where
> >> > > >> there is
> >> > > >> >> >> >> > little to no contention, there's no performance
> overhead
> >> > > >> either. I
> >> > > >> >> >> >> > mentioned this a little in the Motivation section --
> >> maybe
> >> > I
> >> > > >> should
> >> > > >> >> >> >> > flesh that out a little bit?
> >> > > >> >> >> >> >
> >> > > >> >> >> >> > For me, the motivation to work this up was that I kept
> >> > > running
> >> > > >> into
> >> > > >> >> >> >> > cases, like the above, where the existing API was
> >> > > >> >> almost-but-not-quite
> >> > > >> >> >> >> > enough to give the guarantees I was looking for -- and
> >> the
> >> > > >> >> extension
> >> > > >> >> >> >> > needed to handle those cases too was pretty small and
> >> > > >> >> natural-feeling.
> >> > > >> >> >> >> >
> >> > > >> >> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh <
> >> > > >> asi...@cloudera.com
> >> > > >> >> >
> >> > > >> >> >> >> wrote:
> >> > > >> >> >> >> >> Good concept. I have a question though.
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >> Say there are two producers A and B. Both producers
> are
> >> > > >> producing
> >> > > >> >> to
> >> > > >> >> >> >> same
> >> > > >> >> >> >> >> partition.
> >> > > >> >> >> >> >> - A sends a message with expected offset, x1
> >> > > >> >> >> >> >> - Broker accepts it and sends an Ack
> >> > > >> >> >> >> >> - B sends a message with expected offset, x1
> >> > > >> >> >> >> >> - Broker rejects it, sends nack
> >> > > >> >> >> >> >> - B sends message again with expected offset, x1+1
> >> > > >> >> >> >> >> - Broker accepts it and sends Ack
> >> > > >> >> >> >> >> I guess this is what this KIP suggests, right? If
> yes,
> >> > then
> >> > > how
> >> > > >> >> does
> >> > > >> >> >> >> this
> >> > > >> >> >> >> >> ensure that same message will not be written twice
> when
> >> > two
> >> > > >> >> producers
> >> > > >> >> >> >> are
> >> > > >> >> >> >> >> producing to same partition? Producer on receiving a
> >> nack
> >> > > will
> >> > > >> try
> >> > > >> >> >> again
> >> > > >> >> >> >> >> with next offset and will keep doing so till the
> message
> >> > is
> >> > > >> >> accepted.
> >> > > >> >> >> >> Am I
> >> > > >> >> >> >> >> missing something?
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >> Also, you have mentioned on KIP, "it imposes little
> to
> >> no
> >> > > >> runtime
> >> > > >> >> >> cost
> >> > > >> >> >> >> in
> >> > > >> >> >> >> >> memory or time", I think that is not true for time.
> With
> >> > > this
> >> > > >> >> >> approach
> >> > > >> >> >> >> >> producers' performance will reduce proportionally to
> >> > number
> >> > > of
> >> > > >> >> >> producers
> >> > > >> >> >> >> >> writing to same partition. Please correct me if I am
> >> > missing
> >> > > >> out
> >> > > >> >> >> >> something.
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat <
> >> > > >> >> >> >> >> gharatmayures...@gmail.com> wrote:
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >>> If we have 2 producers producing to a partition,
> they
> >> can
> >> > > be
> >> > > >> out
> >> > > >> >> of
> >> > > >> >> >> >> order,
> >> > > >> >> >> >> >>> then how does one producer know what offset to
> expect
> >> as
> >> > it
> >> > > >> does
> >> > > >> >> not
> >> > > >> >> >> >> >>> interact with other producer?
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>> Can you give an example flow that explains how it
> works
> >> > > with
> >> > > >> >> single
> >> > > >> >> >> >> >>> producer and with multiple producers?
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>> Thanks,
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>> Mayuresh
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira <
> >> > > >> >> >> >> >>> fpjunque...@yahoo.com.invalid> wrote:
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>> > I like this feature, it reminds me of conditional
> >> > > updates in
> >> > > >> >> >> >> zookeeper.
> >> > > >> >> >> >> >>> > I'm not sure if it'd be best to have some
> mechanism
> >> for
> >> > > >> fencing
> >> > > >> >> >> >> rather
> >> > > >> >> >> >> >>> than
> >> > > >> >> >> >> >>> > a conditional write like you're proposing. The
> reason
> >> > I'm
> >> > > >> >> saying
> >> > > >> >> >> >> this is
> >> > > >> >> >> >> >>> > that the conditional write applies to requests
> >> > > individually,
> >> > > >> >> >> while it
> >> > > >> >> >> >> >>> > sounds like you want to make sure that there is a
> >> > single
> >> > > >> client
> >> > > >> >> >> >> writing
> >> > > >> >> >> >> >>> so
> >> > > >> >> >> >> >>> > over multiple requests.
> >> > > >> >> >> >> >>> >
> >> > > >> >> >> >> >>> > -Flavio
> >> > > >> >> >> >> >>> >
> >> > > >> >> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin <
> b...@kirw.in>
> >> > > wrote:
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > > Hi there,
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > > I just added a KIP for a 'conditional publish'
> >> > > operation:
> >> > > >> a
> >> > > >> >> >> simple
> >> > > >> >> >> >> >>> > > CAS-like mechanism for the Kafka producer. The
> wiki
> >> > > page
> >> > > >> is
> >> > > >> >> >> here:
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> >
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >>
> >> > > >> >> >>
> >> > > >> >>
> >> > > >>
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > > And there's some previous discussion on the
> ticket
> >> > and
> >> > > the
> >> > > >> >> users
> >> > > >> >> >> >> list:
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > >
> https://issues.apache.org/jira/browse/KAFKA-2260
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> >
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >>
> >> > > >> >> >>
> >> > > >> >>
> >> > > >>
> >> > >
> >> >
> >>
> https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > > As always, comments and suggestions are very
> >> welcome.
> >> > > >> >> >> >> >>> > >
> >> > > >> >> >> >> >>> > > Thanks,
> >> > > >> >> >> >> >>> > > Ben
> >> > > >> >> >> >> >>> >
> >> > > >> >> >> >> >>> >
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>> --
> >> > > >> >> >> >> >>> -Regards,
> >> > > >> >> >> >> >>> Mayuresh R. Gharat
> >> > > >> >> >> >> >>> (862) 250-7125
> >> > > >> >> >> >> >>>
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >> --
> >> > > >> >> >> >> >>
> >> > > >> >> >> >> >> Regards,
> >> > > >> >> >> >> >> Ashish
> >> > > >> >> >> >>
> >> > > >> >> >>
> >> > > >> >>
> >> > > >>
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > Thanks,
> >> > > > Ewen
> >> > >
> >> >
> >>
>
