Jun, you are right that Jay's example omits the consuming part; the local state should be maintained only from consumed messages, so that all replicas are eventually consistent. Given that, the beauty of conditional push is that it is safe for clients to do this independently: it guarantees that a successful update is always based on the most recent value. I think the fact that no master is needed is the greatest advantage.
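To make that concrete, here is a minimal sketch of the loop a replica might run. Every name in it (conditionalPublish, OffsetMismatchException, GiveUpException, and so on) is a hypothetical stand-in for illustration, not part of the existing producer API, and whether to retry or give up after a rejection is an application decision, as discussed below.

    // Rough sketch only: all names here are hypothetical, not an existing Kafka API.
    abstract class ReplicaSketch {
        static class Value {}
        static class Update {}
        static class OffsetMismatchException extends Exception {}
        static class GiveUpException extends Exception {}

        abstract Value localValue(String key);      // local view, built only from consumed messages
        abstract long upcomingOffset();             // next offset of the partition, per the local view
        abstract void consumeUntil(long offset);    // refresh the local view from the log
        abstract Value modify(Value current, Update u) throws GiveUpException;
        abstract void conditionalPublish(String key, Value v, long expectedOffset)
                throws OffsetMismatchException;

        void update(String key, Update u) throws GiveUpException {
            while (true) {
                long expected = upcomingOffset();
                Value next = modify(localValue(key), u);   // may throw if the application gives up
                try {
                    conditionalPublish(key, next, expected);
                    return;                                // accepted: based on the most recent value
                } catch (OffsetMismatchException e) {
                    consumeUntil(expected);                // another client won the race; catch up first
                }
            }
        }
    }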
I think there shouldn't be any automatic retry. Application logic should handle the case where a conditional push fails; what to do on failure is very application-dependent. In Jay's example, it looks like the code keeps modifying and pushing until it succeeds, but I don't think that is what Jay meant: modify(v) may throw an exception if the application decides to give up the update. Regarding transient failures, isn't it reasonable to assume an application can tell success/failure from the data itself? In any case, this KIP is not trying to solve the transient-failure problem; that should be left to the idempotent producer.
On Tue, Aug 4, 2015 at 9:42 AM, Jun Rao <j...@confluent.io> wrote: > Jay, > > The code that you put there seems to be designed for the second case where > there is a master. In the first case when there is no master, updates can > happen from multiple replicas. Therefore, to maintain the local view, each > replica can't just update the local view using only the updates sent to it > directly. Instead, it has to read from the Kafka log to see updates from > other replicas. > > Then the question is what happens when you get a > LeaderNotAvailableException/IOException when calling log_to_kafka(v'). That > call may or may not have succeeded. In the first case, it's hard for a > replica to figure that out since there are other producers to the Kafka > log. If you just retry, you may have modified the value incorrectly. For > example, the conditional publish may have actually succeeded and the local > view will at some point reflect that change. By retrying, we will be > modifying the local view again (e.g., applying +1 twice to a value). > > I agree that it would be good to see a more detailed end-to-end design to > determine how conditional publish can be used and how much of the problem > it solves. > > Thanks, > > Jun > > On Tue, Aug 4, 2015 at 9:18 AM, Jay Kreps <j...@confluent.io> wrote: > > > Hey Jun, > > > > Yeah I think Ben is right, both these cases are covered. The algorithm is > > something like > > > > while(true) { > > v = get_local_val(key) > > v' = modify(v) > > try { > > log_to_kafka(v') > > put_local_val(key, v') > > break > > } catch(CasFailureException e) { > > warn("optimistic lock failure") > > } > > > > } > > > > What I have yet to see is a complete design that would make use of this. > I > > think we have two use cases this might (or might not) cover, a > distributed > > log-centric data system and an event-sourced app. Both are actually kind > of > > the same. I think what I haven't really seen is a working through of the > > details. > > > > I think it is important to think through this use case end-to-end. i.e. > how > > is concurrency handled? what about other errors? what about request > > pipelining? how would queries work? > > > > Basically someone should write out notes on how to implement a key-value > > store with a kafka commit log assuming the existence of this feature and > > making use of something like RocksDB. I think that would uncover any > > problems that remain. > > > > -Jay > > > > On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin <b...@kirw.in> wrote: > > > > > This is a great summary of the commit log options. Some additional > > > comments: > > > > > > 1. One way to handle a transient failure is to treat it the same way > > > as a conditional publish failure: recompute the post-value before > > > retrying. (I believe this is enough to make this approach work > > > correctly.) > > > > > > 2.
You can think of 2 as an 'optimization' of 1: the election > > > mechanism is there to ensure that the conditional publish failures > > > happen very rarely. When there are no conflicts, the conditional > > > publish is essentially free. > > > > > > I guess I think of the zombie master problem like this: given some > > > window of time where two nodes both think they are the master, > > > conditional publish is enough to ensure that only one of the two will > > > successfully publish. However, it's not enough to ensure that the > > > 'new' master is the successful one. This might cause the leadership > > > transition to happen a bit later than it would otherwise, but it > > > doesn't seem to actually impact correctness. > > > > > > On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao <j...@confluent.io> wrote: > > > > A couple of thoughts on the commit log use case. Suppose that we want > > to > > > > maintain multiple replicas of a K/V store backed by a shared Kafka > > > > topic/partition as a commit log. There are two possible ways to use > > Kafka > > > > as a commit log. > > > > > > > > 1. The first approach allows multiple producers to publish to Kafka. > > Each > > > > replica of the data store keeps reading from a Kafka topic/partition > to > > > > refresh the replica's view. Every time a replica gets an update to a > > key > > > > from a client, it combines the update and the current value of the > key > > in > > > > its view and generates a post-value. It then does a conditional > publish > > > to > > > > Kafka with the post-value. The update is successful if the > conditional > > > > publish succeeds. Otherwise, the replica has to recompute the > > post-value > > > > (potentially after the replica's view is refreshed) and retry the > > > > conditional publish. A potential issue with this approach is when > there > > > is > > > > a transient failure during publishing to Kafka (e.g., the leader of > the > > > > partition changes). When this happens, the conditional publish will > get > > > an > > > > error. The replica doesn't know whether the publish actually > succeeded > > or > > > > not. If we just blindly retry, it may not give the correct behavior > > > (e.g., > > > > we could be applying +1 twice). So, not sure if conditional publish > > > itself > > > > is enough for this approach. > > > > > > > > 2. The second approach allows only a single producer to publish to > > Kafka. > > > > We somehow elect one of the replicas to be the master that handles > all > > > > updates. Normally, we don't need conditional publish since there is a > > > > single producer. Conditional publish can potentially be used to deal > > with > > > > duplicates. If the master encounters the same transient failure as > the > > > > above, it can get the latest offset from the Kafka topic/partition to > > see > > > > if the publish actually succeeded or not since it's the only > producer. > > A > > > > potential issue here is to handle the zombie master problem: if the > > > master > > > > has a soft failure and another master is elected, we need to prevent > > the > > > > old master from publishing new data to Kafka. So, for this approach > to > > > work > > > > properly, we need some kind of support of single writer in addition > to > > > > conditional publish. > > > > > > > > > > > > Jiangjie, > > > > > > > > The issue with partial commit is the following. Say we have a batch > of > > 10 > > > > uncompressed messages sent to the leader. 
The followers only fetched > > the > > > > first 5 messages and then the leader dies. In this case, we only > > > committed > > > > 5 out of the 10 messages. > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > > > > > On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck < > > > > daniel.schierb...@gmail.com> wrote: > > > > > > > >> Jiangjie: I think giving users the possibility of defining a custom > > > policy > > > >> for handling rejections is a good idea. For instance, this will > allow > > > Kafka > > > >> to act as an event store in an Event Sourcing application. If the > > > event(s) > > > >> are rejected by the store, the original command may need to be > > > re-validated > > > >> against the new state. > > > >> > > > >> On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin > > <j...@linkedin.com.invalid > > > > > > > >> wrote: > > > >> > > > >> > @Ewen, good point about batching. Yes, it would be tricky if we > want > > > to > > > >> do > > > >> > a per-key conditional produce. My understanding is that the > > > prerequisite > > > >> of > > > >> > this KIP is: > > > >> > 1. Single producer for each partition. > > > >> > 2. Acks=-1, max.in.flight.request.per.connection=1, > > > >> retries=SOME_BIG_NUMBER > > > >> > > > > >> > The major problem it tries to solve is exact once produce, i.e. > > solve > > > the > > > >> > duplicates from producer side. In that case, a batch will be > > > considered > > > >> as > > > >> > atomic. The only possibility of a batch got rejected should be it > is > > > >> > already appended. So the producer should just move on. > > > >> > > > > >> > It looks to me even a transient multiple producer scenario will > > cause > > > >> issue > > > >> > because user need to think about what should do if a request got > > > rejected > > > >> > and the answer varies for different use cases. > > > >> > > > > >> > Thanks, > > > >> > > > > >> > Jiangjie (Becket) Qin > > > >> > > > > >> > On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin <b...@kirw.in> wrote: > > > >> > > > > >> > > So I had another look at the 'Idempotent Producer' proposal this > > > >> > > afternoon, and made a few notes on how I think they compare; if > > I've > > > >> > > made any mistakes, I'd be delighted if someone with more context > > on > > > >> > > the idempotent producer design would correct me. > > > >> > > > > > >> > > As a first intuition, you can think of the 'conditional publish' > > > >> > > proposal as the special case of the 'idempotent producer' idea, > > > where > > > >> > > there's just a single producer per-partition. The key > observation > > > here > > > >> > > is: if there's only one producer, you can conflate the 'sequence > > > >> > > number' and the expected offset. The conditional publish > proposal > > > uses > > > >> > > existing Kafka offset APIs for roughly the same things as the > > > >> > > idempotent producer proposal uses sequence numbers for -- eg. > > > instead > > > >> > > of having a "lease PID" API that returns the current sequence > > > number, > > > >> > > we can use the existing 'offset API' to retrieve the upcoming > > > offset. > > > >> > > > > > >> > > Both proposals attempt to deal with the situation where there > are > > > >> > > transiently multiple publishers for the same partition (and > PID). > > > The > > > >> > > idempotent producer setup tracks a generation id for each pid, > and > > > >> > > discards any writes with a generation id smaller than the latest > > > >> > > value. 
Conditional publish is 'first write wins' -- and instead > of > > > >> > > dropping duplicates on the server, it returns an error to the > > > client. > > > >> > > The duplicate-handling behaviour (dropping vs. erroring) has > some > > > >> > > interesting consequences: > > > >> > > > > > >> > > - If all producers are producing the same stream of messages, > > > silently > > > >> > > dropping duplicates on the server is more convenient. (Suppose > we > > > have > > > >> > > a batch of messages 0-9, and the high-water mark on the server > is > > 7. > > > >> > > Idempotent producer, as I read it, would append 7-9 to the > > partition > > > >> > > and return success; meanwhile, conditional publish would fail > the > > > >> > > entire batch.) > > > >> > > > > > >> > > - If producers might be writing different streams of messages, > the > > > >> > > proposed behaviour of the idempotent producer is probably worse > -- > > > >> > > since it can silently interleave messages from two different > > > >> > > producers. This can be a problem for some commit-log style > > > use-cases, > > > >> > > since it can transform a valid series of operations into an > > invalid > > > >> > > one. > > > >> > > > > > >> > > - Given the error-on-duplicate behaviour, it's possible to > > implement > > > >> > > deduplication on the client. (Sketch: if a publish returns an > > error > > > >> > > for some partition, fetch the upcoming offset / sequence number > > for > > > >> > > that partition, and discard all messages with a smaller offset > on > > > the > > > >> > > client before republishing.) > > > >> > > > > > >> > > I think this makes the erroring behaviour more general, though > > > >> > > deduplicating saves a roundtrip or two at conflict time. > > > >> > > > > > >> > > I'm less clear about the behaviour of the generation id, or what > > > >> > > happens when (say) two producers with the same generation id are > > > spun > > > >> > > up at the same time. I'd be interested in hearing other folks' > > > >> > > comments on this. > > > >> > > > > > >> > > Ewen: I'm not sure I understand the questions well enough to > > answer > > > >> > > properly, but some quick notes: > > > >> > > - I don't think it makes sense to assign an expected offset > > without > > > >> > > already having assigned a partition. If the producer code is > doing > > > the > > > >> > > partition assignment, it should probably do the offset > assignment > > > >> > > too... or we could just let application code handle both. > > > >> > > - I'm not aware of any case where reassigning offsets to > messages > > > >> > > automatically after an offset mismatch makes sense: in the cases > > > we've > > > >> > > discussed, it seems like either it's safe to drop duplicates, or > > we > > > >> > > want to handle the error at the application level. > > > >> > > > > > >> > > I'm going to try and come with an idempotent-producer-type > example > > > >> > > that works with the draft patch in the next few days, so > hopefully > > > >> > > we'll have something more concrete to discuss. Otherwise -- if > you > > > >> > > have a clear idea of how eg. sequence number assignment would > work > > > in > > > >> > > the idempotent-producer proposal, we could probably translate > that > > > >> > > over to get the equivalent for the conditional publish API. 
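For what it's worth, the client-side deduplication sketched above might look roughly like this (a rough sketch only: PendingMessage, fetchUpcomingOffset, and republish are made-up names, and it assumes all producers are writing the same stream, so anything below the broker's upcoming offset is already in the log):

    import java.util.Deque;

    // Sketch of client-side deduplication after a conditional-publish failure.
    class DedupSketch {
        static class PendingMessage { long expectedOffset; byte[] payload; }

        void onOffsetMismatch(String topic, int partition, Deque<PendingMessage> pending) {
            long upcoming = fetchUpcomingOffset(topic, partition);     // ask the broker for the next offset
            while (!pending.isEmpty() && pending.peekFirst().expectedOffset < upcoming) {
                pending.removeFirst();                                 // already committed: drop, don't resend
            }
            republish(topic, partition, pending);                      // resend only the remainder
        }

        long fetchUpcomingOffset(String topic, int partition) { /* e.g. via the existing offset API */ return 0L; }
        void republish(String topic, int partition, Deque<PendingMessage> pending) { /* resend */ }
    }

The extra round trip to fetch the offset only happens on conflict, which should be rare in the single-expected-producer case.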
> > > >> > > > > > >> > > > > > >> > > > > > >> > > On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava > > > >> > > <e...@confluent.io> wrote: > > > >> > > > @Becket - for compressed batches, I think this just works out > > > given > > > >> the > > > >> > > KIP > > > >> > > > as described. Without the change you're referring to, it still > > > only > > > >> > makes > > > >> > > > sense to batch messages with this KIP if all the expected > > offsets > > > are > > > >> > > > sequential (else some messages are guaranteed to fail). I > think > > > that > > > >> > > > probably just works out, but raises an issue I brought up on > the > > > KIP > > > >> > > call. > > > >> > > > > > > >> > > > Batching can be a bit weird with this proposal. If you try to > > > write > > > >> > key A > > > >> > > > and key B, the second operation is dependent on the first. > Which > > > >> means > > > >> > to > > > >> > > > make an effective client for this, we need to keep track of > > > >> > per-partition > > > >> > > > offsets so we can set expected offsets properly. For example, > > if A > > > >> was > > > >> > > > expected to publish at offset 10, then if B was published to > the > > > same > > > >> > > > partition, we need to make sure it's marked as expected offset > > 11 > > > >> > > (assuming > > > >> > > > no subpartition high water marks). We either need to have the > > > >> > application > > > >> > > > keep track of this itself and set the offsets, which requires > > > that it > > > >> > > know > > > >> > > > about how keys map to partitions, or the client needs to > manage > > > this > > > >> > > > process. But if the client manages it, I think the client gets > > > quite > > > >> a > > > >> > > bit > > > >> > > > more complicated. If the produce request containing A fails, > > what > > > >> > happens > > > >> > > > to B? Are there retries that somehow update the expected > offset, > > > or > > > >> do > > > >> > we > > > >> > > > just give up since we know it's always going to fail with the > > > >> expected > > > >> > > > offset that was automatically assigned to it? > > > >> > > > > > > >> > > > One way to handle this is to use Yasuhiro's idea of increasing > > the > > > >> > > > granularity of high watermarks using subpartitions. But I > guess > > my > > > >> > > question > > > >> > > > is: if one producer client is writing many keys, and some of > > those > > > >> keys > > > >> > > are > > > >> > > > produced to the same partition, and those messages are > batched, > > > what > > > >> > > > happens? Do we end up with lots of failed messages? Or do we > > have > > > >> > > > complicated logic in the producer to figure out what the right > > > >> expected > > > >> > > > offset for each message is? Or do they all share the same base > > > >> expected > > > >> > > > offset as in the compressed case, in which case they all share > > the > > > >> same > > > >> > > > fate and subpartitioning doesn't help? Or is there a simpler > > > solution > > > >> > I'm > > > >> > > > just not seeing? Maybe this just disables batching entirely > and > > > >> > > throughput > > > >> > > > isn't an issue in these cases? > > > >> > > > > > > >> > > > Sorry, I know that's probably not entirely clear, but that's > > > because > > > >> > I'm > > > >> > > > very uncertain of how batching works with this KIP. 
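The per-partition bookkeeping described above might be as simple as the following sketch (hypothetical names; assumes no sub-partition high-water marks). It also makes the coupling visible: if the request carrying the record at offset 10 fails, every later expected offset recorded for that partition is stale and has to be invalidated.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: assigning expected offsets to records batched to the same partition.
    class ExpectedOffsetTracker {
        private final Map<Integer, Long> nextExpected = new HashMap<>();   // partition -> next expected offset

        long assign(int partition, long upcomingOffsetFromBroker) {
            long expected = nextExpected.getOrDefault(partition, upcomingOffsetFromBroker);
            nextExpected.put(partition, expected + 1);   // the next record to this partition depends on this one
            return expected;
        }

        void invalidate(int partition) {
            nextExpected.remove(partition);              // a failed request leaves later assignments stale
        }
    }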
> > > >> > > > > > > >> > > > > > > >> > > > On how this relates to other proposals: I think it might also > be > > > >> > helpful > > > >> > > to > > > >> > > > get an overview of all the proposals for relevant > modifications > > to > > > >> > > > producers/produce requests since many of these proposals are > > > possibly > > > >> > > > alternatives (though some may not be mutually exclusive). Many > > > people > > > >> > > don't > > > >> > > > have all the context from the past couple of years of the > > project. > > > >> Are > > > >> > > > there any other relevant wikis or docs besides the following? > > > >> > > > > > > >> > > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > > > >> > > > > > > >> > > > -Ewen > > > >> > > > > > > >> > > > On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira < > > > >> gshap...@cloudera.com> > > > >> > > > wrote: > > > >> > > > > > > >> > > >> Tangent: I think we should complete the move of Produce / > Fetch > > > RPC > > > >> to > > > >> > > >> the client libraries before we add more revisions to this > > > protocol. > > > >> > > >> > > > >> > > >> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin > > > >> > > >> <j...@linkedin.com.invalid> wrote: > > > >> > > >> > I missed yesterday's KIP hangout. I'm currently working on > > > another > > > >> > KIP > > > >> > > >> for > > > >> > > >> > enriched metadata of messages. Guozhang has already > created a > > > wiki > > > >> > > page > > > >> > > >> > before ( > > > >> > > >> > > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > > > >> > > >> ). > > > >> > > >> > We plan to fill the relative offset to the offset field in > > the > > > >> batch > > > >> > > sent > > > >> > > >> > by producer to avoid broker side re-compression. The > message > > > >> offset > > > >> > > would > > > >> > > >> > become batch base offset + relative offset. I guess maybe > the > > > >> > expected > > > >> > > >> > offset in KIP-27 can be only set for base offset? Would > that > > > >> affect > > > >> > > >> certain > > > >> > > >> > use cases? > > > >> > > >> > > > > >> > > >> > For Jun's comments, I am not sure I completely get it. I > > think > > > the > > > >> > > >> producer > > > >> > > >> > only sends one batch per partition in a request. So either > > that > > > >> > batch > > > >> > > is > > > >> > > >> > appended or not. Why a batch would be partially committed? > > > >> > > >> > > > > >> > > >> > Thanks, > > > >> > > >> > > > > >> > > >> > Jiangjie (Becket) Qin > > > >> > > >> > > > > >> > > >> > On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> > > > wrote: > > > >> > > >> > > > > >> > > >> >> That's a fair point. I've added some imagined job logic to > > the > > > >> KIP, > > > >> > > so > > > >> > > >> >> we can make sure the proposal stays in sync with the > usages > > > we're > > > >> > > >> >> discussing. (The logic is just a quick sketch for now -- I > > > expect > > > >> > > I'll > > > >> > > >> >> need to elaborate it as we get into more detail, or to > > address > > > >> > other > > > >> > > >> >> concerns...) 
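If the expected offset were only checked against the batch's base offset, as Jiangjie suggests above, the broker-side check might reduce to something like this sketch (hypothetical names; with producer-assigned relative offsets, message i then lands at base + relativeOffset(i) automatically):

    // Sketch: compare only the batch's expected base offset with the partition's log end offset.
    class BatchCheckSketch {
        boolean acceptBatch(long expectedBaseOffset, long logEndOffset) {
            return expectedBaseOffset == logEndOffset;   // the whole batch then shares the same fate
        }
    }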
> > > >> > > >> >> > > > >> > > >> >> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao < > j...@confluent.io > > > > > > >> > wrote: > > > >> > > >> >> > For 1, yes, when there is a transient leader change, > it's > > > >> > > guaranteed > > > >> > > >> >> that a > > > >> > > >> >> > prefix of the messages in a request will be committed. > > > However, > > > >> > it > > > >> > > >> seems > > > >> > > >> >> > that the client needs to know what subset of messages > are > > > >> > > committed in > > > >> > > >> >> > order to resume the sending. Then the question is how. > > > >> > > >> >> > > > > >> > > >> >> > As Flavio indicated, for the use cases that you listed, > it > > > >> would > > > >> > be > > > >> > > >> >> useful > > > >> > > >> >> > to figure out the exact logic by using this feature. For > > > >> example, > > > >> > > in > > > >> > > >> the > > > >> > > >> >> > partition K/V store example, when we fail over to a new > > > writer > > > >> to > > > >> > > the > > > >> > > >> >> > commit log, the zombie writer can publish new messages > to > > > the > > > >> log > > > >> > > >> after > > > >> > > >> >> the > > > >> > > >> >> > new writer takes over, but before it publishes any > > message. > > > We > > > >> > > >> probably > > > >> > > >> >> > need to outline how this case can be handled properly. > > > >> > > >> >> > > > > >> > > >> >> > Thanks, > > > >> > > >> >> > > > > >> > > >> >> > Jun > > > >> > > >> >> > > > > >> > > >> >> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin < > b...@kirw.in > > > > > > >> > wrote: > > > >> > > >> >> > > > > >> > > >> >> >> Hi Jun, > > > >> > > >> >> >> > > > >> > > >> >> >> Thanks for the close reading! Responses inline. > > > >> > > >> >> >> > > > >> > > >> >> >> > Thanks for the write-up. The single producer use case > > you > > > >> > > mentioned > > > >> > > >> >> makes > > > >> > > >> >> >> > sense. It would be useful to include that in the KIP > > > wiki. > > > >> > > >> >> >> > > > >> > > >> >> >> Great -- I'll make sure that the wiki is clear about > > this. > > > >> > > >> >> >> > > > >> > > >> >> >> > 1. What happens when the leader of the partition > > changes > > > in > > > >> > the > > > >> > > >> middle > > > >> > > >> >> >> of a > > > >> > > >> >> >> > produce request? In this case, the producer client is > > not > > > >> sure > > > >> > > >> whether > > > >> > > >> >> >> the > > > >> > > >> >> >> > request succeeds or not. If there is only a single > > > message > > > >> in > > > >> > > the > > > >> > > >> >> >> request, > > > >> > > >> >> >> > the producer can just resend the request. If it sees > an > > > >> > > >> OffsetMismatch > > > >> > > >> >> >> > error, it knows that the previous send actually > > succeeded > > > >> and > > > >> > > can > > > >> > > >> >> proceed > > > >> > > >> >> >> > with the next write. This is nice since it not only > > > allows > > > >> the > > > >> > > >> >> producer > > > >> > > >> >> >> to > > > >> > > >> >> >> > proceed during transient failures in the broker, it > > also > > > >> > avoids > > > >> > > >> >> >> duplicates > > > >> > > >> >> >> > during producer resend. One caveat is when there are > > > >> multiple > > > >> > > >> >> messages in > > > >> > > >> >> >> > the same partition in a produce request. The issue is > > > that > > > >> in > > > >> > > our > > > >> > > >> >> current > > > >> > > >> >> >> > replication protocol, it's possible for some, but not > > all > > > >> > > messages > > > >> > > >> in > > > >> > > >> >> the > > > >> > > >> >> >> > request to be committed. 
This makes resend a bit > harder > > > to > > > >> > deal > > > >> > > >> with > > > >> > > >> >> >> since > > > >> > > >> >> >> > on receiving an OffsetMismatch error, it's not clear > > > which > > > >> > > messages > > > >> > > >> >> have > > > >> > > >> >> >> > been committed. One possibility is to expect that > > > >> compression > > > >> > is > > > >> > > >> >> enabled, > > > >> > > >> >> >> > in which case multiple messages are compressed into a > > > single > > > >> > > >> message. > > > >> > > >> >> I > > > >> > > >> >> >> was > > > >> > > >> >> >> > thinking that another possibility is for the broker > to > > > >> return > > > >> > > the > > > >> > > >> >> current > > > >> > > >> >> >> > high watermark when sending an OffsetMismatch error. > > > Based > > > >> on > > > >> > > this > > > >> > > >> >> info, > > > >> > > >> >> >> > the producer can resend the subset of messages that > > have > > > not > > > >> > > been > > > >> > > >> >> >> > committed. However, this may not work in a compacted > > > topic > > > >> > since > > > >> > > >> there > > > >> > > >> >> >> can > > > >> > > >> >> >> > be holes in the offset. > > > >> > > >> >> >> > > > >> > > >> >> >> This is a excellent question. It's my understanding > that > > at > > > >> > least > > > >> > > a > > > >> > > >> >> >> *prefix* of messages will be committed (right?) -- > which > > > seems > > > >> > to > > > >> > > be > > > >> > > >> >> >> enough for many cases. I'll try and come up with a more > > > >> concrete > > > >> > > >> >> >> answer here. > > > >> > > >> >> >> > > > >> > > >> >> >> > 2. Is this feature only intended to be used with ack > = > > > all? > > > >> > The > > > >> > > >> client > > > >> > > >> >> >> > doesn't get the offset with ack = 0. With ack = 1, > it's > > > >> > possible > > > >> > > >> for a > > > >> > > >> >> >> > previously acked message to be lost during leader > > > >> transition, > > > >> > > which > > > >> > > >> >> will > > > >> > > >> >> >> > make the client logic more complicated. > > > >> > > >> >> >> > > > >> > > >> >> >> It's true that acks = 0 doesn't seem to be particularly > > > >> useful; > > > >> > in > > > >> > > >> all > > > >> > > >> >> >> the cases I've come across, the client eventually wants > > to > > > >> know > > > >> > > about > > > >> > > >> >> >> the mismatch error. However, it seems like there are > some > > > >> cases > > > >> > > where > > > >> > > >> >> >> acks = 1 would be fine -- eg. in a bulk load of a fixed > > > >> dataset, > > > >> > > >> >> >> losing messages during a leader transition just means > you > > > need > > > >> > to > > > >> > > >> >> >> rewind / restart the load, which is not especially > > > >> catastrophic. > > > >> > > For > > > >> > > >> >> >> many other interesting cases, acks = all is probably > > > >> preferable. > > > >> > > >> >> >> > > > >> > > >> >> >> > 3. How does the producer client know the offset to > send > > > the > > > >> > > first > > > >> > > >> >> >> message? > > > >> > > >> >> >> > Do we need to expose an API in producer to get the > > > current > > > >> > high > > > >> > > >> >> >> watermark? > > > >> > > >> >> >> > > > >> > > >> >> >> You're right, it might be irritating to have to go > > through > > > the > > > >> > > >> >> >> consumer API just for this. 
There are some cases where > > the > > > >> > offsets > > > >> > > >> are > > > >> > > >> >> >> already available -- like the commit-log-for-KV-store > > > example > > > >> -- > > > >> > > but > > > >> > > >> >> >> in general, being able to get the offsets from the > > producer > > > >> > > interface > > > >> > > >> >> >> does sound convenient. > > > >> > > >> >> >> > > > >> > > >> >> >> > We plan to have a KIP discussion meeting tomorrow at > > 11am > > > >> PST. > > > >> > > >> Perhaps > > > >> > > >> >> >> you > > > >> > > >> >> >> > can describe this KIP a bit then? > > > >> > > >> >> >> > > > >> > > >> >> >> Sure, happy to join. > > > >> > > >> >> >> > > > >> > > >> >> >> > Thanks, > > > >> > > >> >> >> > > > > >> > > >> >> >> > Jun > > > >> > > >> >> >> > > > > >> > > >> >> >> > > > > >> > > >> >> >> > > > > >> > > >> >> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin < > > > b...@kirw.in> > > > >> > > wrote: > > > >> > > >> >> >> > > > > >> > > >> >> >> >> Just wanted to flag a little discussion that > happened > > on > > > >> the > > > >> > > >> ticket: > > > >> > > >> >> >> >> > > > >> > > >> >> >> >> > > > >> > > >> >> >> > > > >> > > >> >> > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > > > > https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 > > > >> > > >> >> >> >> > > > >> > > >> >> >> >> In particular, Yasuhiro Matsuda proposed an > > interesting > > > >> > > variant on > > > >> > > >> >> >> >> this that performs the offset check on the message > key > > > >> > > (instead of > > > >> > > >> >> >> >> just the partition), with bounded space > requirements, > > at > > > >> the > > > >> > > cost > > > >> > > >> of > > > >> > > >> >> >> >> potentially some spurious failures. (ie. the produce > > > >> request > > > >> > > may > > > >> > > >> fail > > > >> > > >> >> >> >> even if that particular key hasn't been updated > > > recently.) > > > >> > This > > > >> > > >> >> >> >> addresses a couple of the drawbacks of the per-key > > > approach > > > >> > > >> mentioned > > > >> > > >> >> >> >> at the bottom of the KIP. > > > >> > > >> >> >> >> > > > >> > > >> >> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin < > > > b...@kirw.in> > > > >> > > wrote: > > > >> > > >> >> >> >> > Hi all, > > > >> > > >> >> >> >> > > > > >> > > >> >> >> >> > So, perhaps it's worth adding a couple specific > > > examples > > > >> of > > > >> > > >> where > > > >> > > >> >> this > > > >> > > >> >> >> >> > feature is useful, to make this a bit more > concrete: > > > >> > > >> >> >> >> > > > > >> > > >> >> >> >> > - Suppose I'm using Kafka as a commit log for a > > > >> partitioned > > > >> > > KV > > > >> > > >> >> store, > > > >> > > >> >> >> >> > like Samza or Pistachio (?) do. We bootstrap the > > > process > > > >> > > state > > > >> > > >> by > > > >> > > >> >> >> >> > reading from that partition, and log all state > > > updates to > > > >> > > that > > > >> > > >> >> >> >> > partition when we're running. Now imagine that one > > of > > > my > > > >> > > >> processes > > > >> > > >> >> >> >> > locks up -- GC or similar -- and the system > > > transitions > > > >> > that > > > >> > > >> >> partition > > > >> > > >> >> >> >> > over to another node. 
When the GC is finished, the > > old > > > >> > > 'owner' > > > >> > > >> of > > > >> > > >> >> that > > > >> > > >> >> >> >> > partition might still be trying to write to the > > commit > > > >> log > > > >> > at > > > >> > > >> the > > > >> > > >> >> same > > > >> > > >> >> >> >> > as the new one is. A process might detect this by > > > >> noticing > > > >> > > that > > > >> > > >> the > > > >> > > >> >> >> >> > offset of the published message is bigger than it > > > thought > > > >> > the > > > >> > > >> >> upcoming > > > >> > > >> >> >> >> > offset was, which implies someone else has been > > > writing > > > >> to > > > >> > > the > > > >> > > >> >> log... > > > >> > > >> >> >> >> > but by then it's too late, and the commit log is > > > already > > > >> > > >> corrupt. > > > >> > > >> >> With > > > >> > > >> >> >> >> > a 'conditional produce', one of those processes > will > > > have > > > >> > > it's > > > >> > > >> >> publish > > > >> > > >> >> >> >> > request refused -- so we've avoided corrupting the > > > state. > > > >> > > >> >> >> >> > > > > >> > > >> >> >> >> > - Envision some copycat-like system, where we have > > > some > > > >> > > sharded > > > >> > > >> >> >> >> > postgres setup and we're tailing each shard into > its > > > own > > > >> > > >> partition. > > > >> > > >> >> >> >> > Normally, it's fairly easy to avoid duplicates > here: > > > we > > > >> can > > > >> > > >> track > > > >> > > >> >> >> >> > which offset in the WAL corresponds to which > offset > > in > > > >> > Kafka, > > > >> > > >> and > > > >> > > >> >> we > > > >> > > >> >> >> >> > know how many messages we've written to Kafka > > > already, so > > > >> > the > > > >> > > >> >> state is > > > >> > > >> >> >> >> > very simple. However, it is possible that for a > > > moment -- > > > >> > > due to > > > >> > > >> >> >> >> > rebalancing or operator error or some other thing > -- > > > two > > > >> > > >> different > > > >> > > >> >> >> >> > nodes are tailing the same postgres shard at once! > > > >> Normally > > > >> > > this > > > >> > > >> >> would > > > >> > > >> >> >> >> > introduce duplicate messages, but by specifying > the > > > >> > expected > > > >> > > >> >> offset, > > > >> > > >> >> >> >> > we can avoid this. > > > >> > > >> >> >> >> > > > > >> > > >> >> >> >> > So perhaps it's better to say that this is useful > > > when a > > > >> > > single > > > >> > > >> >> >> >> > producer is *expected*, but multiple producers are > > > >> > > *possible*? > > > >> > > >> (In > > > >> > > >> >> the > > > >> > > >> >> >> >> > same way that the high-level consumer normally > has 1 > > > >> > consumer > > > >> > > >> in a > > > >> > > >> >> >> >> > group reading from a partition, but there are > small > > > >> windows > > > >> > > >> where > > > >> > > >> >> more > > > >> > > >> >> >> >> > than one might be reading at the same time.) This > is > > > also > > > >> > the > > > >> > > >> >> spirit > > > >> > > >> >> >> >> > of the 'runtime cost' comment -- in the common > case, > > > >> where > > > >> > > >> there is > > > >> > > >> >> >> >> > little to no contention, there's no performance > > > overhead > > > >> > > >> either. I > > > >> > > >> >> >> >> > mentioned this a little in the Motivation section > -- > > > >> maybe > > > >> > I > > > >> > > >> should > > > >> > > >> >> >> >> > flesh that out a little bit? 
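A sketch of that log-tailing case, just to make the mechanics concrete (every name here -- WalEntry, tailWal, conditionalPublish -- is a stand-in for illustration, not a real Kafka or Copycat API):

    // Sketch: tailing a database WAL into a Kafka partition. Each entry is published with
    // the offset we believe comes next; if another tailer has written in the meantime,
    // the publish is rejected instead of introducing duplicates.
    abstract class WalTailerSketch {
        static class WalEntry { long position; byte[] record; }
        static class OffsetMismatchException extends Exception {}

        abstract long upcomingOffset(int partition);                // next offset of the Kafka partition
        abstract Iterable<WalEntry> tailWal(long fromPosition);     // read the database WAL
        abstract void conditionalPublish(int partition, byte[] record, long expectedOffset)
                throws OffsetMismatchException;

        long lastShippedPosition;

        void ship(int partition) {
            long nextOffset = upcomingOffset(partition);
            for (WalEntry entry : tailWal(lastShippedPosition)) {
                try {
                    conditionalPublish(partition, entry.record, nextOffset++);
                    lastShippedPosition = entry.position;           // exactly one copy of this entry is in the log
                } catch (OffsetMismatchException e) {
                    // Some other tailer has written to this partition: stop and re-resolve
                    // ownership instead of producing duplicates or an interleaved stream.
                    return;
                }
            }
        }
    }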
> > > >> > > >> >> >> >> > > > > >> > > >> >> >> >> > For me, the motivation to work this up was that I > > kept > > > >> > > running > > > >> > > >> into > > > >> > > >> >> >> >> > cases, like the above, where the existing API was > > > >> > > >> >> almost-but-not-quite > > > >> > > >> >> >> >> > enough to give the guarantees I was looking for -- > > and > > > >> the > > > >> > > >> >> extension > > > >> > > >> >> >> >> > needed to handle those cases too was pretty small > > and > > > >> > > >> >> natural-feeling. > > > >> > > >> >> >> >> > > > > >> > > >> >> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh < > > > >> > > >> asi...@cloudera.com > > > >> > > >> >> > > > > >> > > >> >> >> >> wrote: > > > >> > > >> >> >> >> >> Good concept. I have a question though. > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> Say there are two producers A and B. Both > producers > > > are > > > >> > > >> producing > > > >> > > >> >> to > > > >> > > >> >> >> >> same > > > >> > > >> >> >> >> >> partition. > > > >> > > >> >> >> >> >> - A sends a message with expected offset, x1 > > > >> > > >> >> >> >> >> - Broker accepts is and sends an Ack > > > >> > > >> >> >> >> >> - B sends a message with expected offset, x1 > > > >> > > >> >> >> >> >> - Broker rejects it, sends nack > > > >> > > >> >> >> >> >> - B sends message again with expected offset, > x1+1 > > > >> > > >> >> >> >> >> - Broker accepts it and sends Ack > > > >> > > >> >> >> >> >> I guess this is what this KIP suggests, right? If > > > yes, > > > >> > then > > > >> > > how > > > >> > > >> >> does > > > >> > > >> >> >> >> this > > > >> > > >> >> >> >> >> ensure that same message will not be written > twice > > > when > > > >> > two > > > >> > > >> >> producers > > > >> > > >> >> >> >> are > > > >> > > >> >> >> >> >> producing to same partition? Producer on > receiving > > a > > > >> nack > > > >> > > will > > > >> > > >> try > > > >> > > >> >> >> again > > > >> > > >> >> >> >> >> with next offset and will keep doing so till the > > > message > > > >> > is > > > >> > > >> >> accepted. > > > >> > > >> >> >> >> Am I > > > >> > > >> >> >> >> >> missing something? > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> Also, you have mentioned on KIP, "it imposes > little > > > to > > > >> no > > > >> > > >> runtime > > > >> > > >> >> >> cost > > > >> > > >> >> >> >> in > > > >> > > >> >> >> >> >> memory or time", I think that is not true for > time. > > > With > > > >> > > this > > > >> > > >> >> >> approach > > > >> > > >> >> >> >> >> producers' performance will reduce proportionally > > to > > > >> > number > > > >> > > of > > > >> > > >> >> >> producers > > > >> > > >> >> >> >> >> writing to same partition. Please correct me if I > > am > > > >> > missing > > > >> > > >> out > > > >> > > >> >> >> >> something. > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh > Gharat < > > > >> > > >> >> >> >> >> gharatmayures...@gmail.com> wrote: > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >>> If we have 2 producers producing to a partition, > > > they > > > >> can > > > >> > > be > > > >> > > >> out > > > >> > > >> >> of > > > >> > > >> >> >> >> order, > > > >> > > >> >> >> >> >>> then how does one producer know what offset to > > > expect > > > >> as > > > >> > it > > > >> > > >> does > > > >> > > >> >> not > > > >> > > >> >> >> >> >>> interact with other producer? 
> > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> Can you give an example flow that explains how > it > > > works > > > >> > > with > > > >> > > >> >> single > > > >> > > >> >> >> >> >>> producer and with multiple producers? > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> Thanks, > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> Mayuresh > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio > > Junqueira < > > > >> > > >> >> >> >> >>> fpjunque...@yahoo.com.invalid> wrote: > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > I like this feature, it reminds me of > > conditional > > > >> > > updates in > > > >> > > >> >> >> >> zookeeper. > > > >> > > >> >> >> >> >>> > I'm not sure if it'd be best to have some > > > mechanism > > > >> for > > > >> > > >> fencing > > > >> > > >> >> >> >> rather > > > >> > > >> >> >> >> >>> than > > > >> > > >> >> >> >> >>> > a conditional write like you're proposing. The > > > reason > > > >> > I'm > > > >> > > >> >> saying > > > >> > > >> >> >> >> this is > > > >> > > >> >> >> >> >>> > that the conditional write applies to requests > > > >> > > individually, > > > >> > > >> >> >> while it > > > >> > > >> >> >> >> >>> > sounds like you want to make sure that there > is > > a > > > >> > single > > > >> > > >> client > > > >> > > >> >> >> >> writing > > > >> > > >> >> >> >> >>> so > > > >> > > >> >> >> >> >>> > over multiple requests. > > > >> > > >> >> >> >> >>> > > > > >> > > >> >> >> >> >>> > -Flavio > > > >> > > >> >> >> >> >>> > > > > >> > > >> >> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin < > > > b...@kirw.in> > > > >> > > wrote: > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > Hi there, > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > I just added a KIP for a 'conditional > publish' > > > >> > > operation: > > > >> > > >> a > > > >> > > >> >> >> simple > > > >> > > >> >> >> >> >>> > > CAS-like mechanism for the Kafka producer. > The > > > wiki > > > >> > > page > > > >> > > >> is > > > >> > > >> >> >> here: > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> > > > >> > > >> >> >> > > > >> > > >> >> > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > And there's some previous discussion on the > > > ticket > > > >> > and > > > >> > > the > > > >> > > >> >> users > > > >> > > >> >> >> >> list: > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > > > > https://issues.apache.org/jira/browse/KAFKA-2260 > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> > > > >> > > >> >> >> > > > >> > > >> >> > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > > > > https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E > > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > As always, comments and suggestions are very > > > >> welcome. 
> > > >> > > >> >> >> >> >>> > > > > > >> > > >> >> >> >> >>> > > Thanks, > > > >> > > >> >> >> >> >>> > > Ben > > > >> > > >> >> >> >> >>> > > > > >> > > >> >> >> >> >>> > > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> -- > > > >> > > >> >> >> >> >>> -Regards, > > > >> > > >> >> >> >> >>> Mayuresh R. Gharat > > > >> > > >> >> >> >> >>> (862) 250-7125 > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> -- > > > >> > > >> >> >> >> >> > > > >> > > >> >> >> >> >> Regards, > > > >> > > >> >> >> >> >> Ashish > > > >> > > >> >> >> >> > > > >> > > >> >> >> > > > >> > > >> >> > > > >> > > >> > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > -- > > > >> > > > Thanks, > > > >> > > > Ewen > > > >> > > > > > >> > > > > >> > > > > > >