Hey Jun,

Yeah, I think Ben is right: both these cases are covered. The algorithm is something like:
  while (true) {
    v = get_local_val(key)
    v' = modify(v)
    try {
      log_to_kafka(v')
      put_local_val(key, v')
      break
    } catch (CasFailureException e) {
      warn("optimistic lock failure")
    }
  }

What I have yet to see is a complete design that would make use of this. I think we have two use cases this might (or might not) cover: a distributed log-centric data system and an event-sourced app. Both are actually kind of the same. What I haven't really seen is a working-through of the details.

I think it is important to think through this use case end-to-end, i.e.: how is concurrency handled? What about other errors? What about request pipelining? How would queries work? Basically, someone should write out notes on how to implement a key-value store with a Kafka commit log, assuming the existence of this feature and making use of something like RocksDB. I think that would uncover any problems that remain.

-Jay

On Tue, Aug 4, 2015 at 12:46 AM, Ben Kirwin <b...@kirw.in> wrote: > This is a great summary of the commit log options. Some additional > comments: > > 1. One way to handle a transient failure is to treat it the same way > as a conditional publish failure: recompute the post-value before > retrying. (I believe this is enough to make this approach work > correctly.) > > 2. You can think of 2 as an 'optimization' of 1: the election > mechanism is there to ensure that the conditional publish failures > happen very rarely. When there are no conflicts, the conditional > publish is essentially free. > > I guess I think of the zombie master problem like this: given some > window of time where two nodes both think they are the master, > conditional publish is enough to ensure that only one of the two will > successfully publish. However, it's not enough to ensure that the > 'new' master is the successful one. This might cause the leadership > transition to happen a bit later than it would otherwise, but it > doesn't seem to actually impact correctness. 
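To make the loop above concrete, here is a minimal in-memory sketch of that read-modify-write cycle against a log with conditional-publish semantics. Every name here (ConditionalLog, Replica, publish, etc.) is a hypothetical illustration, not the proposed Kafka API; the only behaviour it models is that an append succeeds iff the expected offset equals the current log-end offset.

```python
# In-memory model of the proposed conditional-publish semantics: an append
# succeeds only when the expected offset matches the current log end.
# All class and method names are hypothetical, for illustration only.

class CasFailure(Exception):
    pass

class ConditionalLog:
    def __init__(self):
        self.entries = []

    def publish(self, record, expected_offset):
        # The CAS: reject unless the caller's expectation is exactly right.
        if expected_offset != len(self.entries):
            raise CasFailure()
        self.entries.append(record)

class Replica:
    """One replica of a K/V store backed by the shared commit log."""
    def __init__(self, log):
        self.log = log
        self.view = {}      # local materialized state
        self.applied = 0    # offset up to which the log has been applied

    def refresh(self):
        # Catch up on entries appended by other replicas.
        for key, value in self.log.entries[self.applied:]:
            self.view[key] = value
        self.applied = len(self.log.entries)

    def update(self, key, modify):
        # Jay's loop: read local value, compute the post-value, publish
        # conditionally, and on CAS failure refresh and retry.
        while True:
            self.refresh()
            post = modify(self.view.get(key, 0))
            try:
                self.log.publish((key, post), self.applied)
                self.view[key] = post
                self.applied += 1
                return post
            except CasFailure:
                pass  # someone else won the race; recompute and retry

log = ConditionalLog()
a, b = Replica(log), Replica(log)
a.update("counter", lambda v: v + 1)
b.update("counter", lambda v: v + 1)  # b observes a's write before applying its own
```

The point of the retry-after-refresh structure is exactly the "+1 applied twice" concern discussed below: the post-value is always recomputed from a view that includes every committed entry, so no increment is lost or duplicated.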
> > On Tue, Aug 4, 2015 at 12:55 AM, Jun Rao <j...@confluent.io> wrote: > > A couple of thoughts on the commit log use case. Suppose that we want to > > maintain multiple replicas of a K/V store backed by a shared Kafka > > topic/partition as a commit log. There are two possible ways to use Kafka > > as a commit log. > > > > 1. The first approach allows multiple producers to publish to Kafka. Each > > replica of the data store keeps reading from a Kafka topic/partition to > > refresh the replica's view. Every time a replica gets an update to a key > > from a client, it combines the update and the current value of the key in > > its view and generates a post-value. It then does a conditional publish > to > > Kafka with the post-value. The update is successful if the conditional > > publish succeeds. Otherwise, the replica has to recompute the post-value > > (potentially after the replica's view is refreshed) and retry the > > conditional publish. A potential issue with this approach is when there > is > > a transient failure during publishing to Kafka (e.g., the leader of the > > partition changes). When this happens, the conditional publish will get > an > > error. The replica doesn't know whether the publish actually succeeded or > > not. If we just blindly retry, it may not give the correct behavior > (e.g., > > we could be applying +1 twice). So, not sure if conditional publish > itself > > is enough for this approach. > > > > 2. The second approach allows only a single producer to publish to Kafka. > > We somehow elect one of the replicas to be the master that handles all > > updates. Normally, we don't need conditional publish since there is a > > single producer. Conditional publish can potentially be used to deal with > > duplicates. If the master encounters the same transient failure as the > > above, it can get the latest offset from the Kafka topic/partition to see > > if the publish actually succeeded or not since it's the only producer. 
A > > potential issue here is to handle the zombie master problem: if the > master > > has a soft failure and another master is elected, we need to prevent the > > old master from publishing new data to Kafka. So, for this approach to > work > > properly, we need some kind of single-writer support in addition to > > conditional publish. > > > > > > Jiangjie, > > > > The issue with partial commit is the following. Say we have a batch of 10 > > uncompressed messages sent to the leader. The followers only fetched the > > first 5 messages and then the leader dies. In this case, we only > committed > > 5 out of the 10 messages. > > > > Thanks, > > > > Jun > > > > > > On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck < > > daniel.schierb...@gmail.com> wrote: > > > >> Jiangjie: I think giving users the possibility of defining a custom > policy > >> for handling rejections is a good idea. For instance, this will allow > Kafka > >> to act as an event store in an Event Sourcing application. If the > event(s) > >> are rejected by the store, the original command may need to be > re-validated > >> against the new state. > >> > >> On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin <j...@linkedin.com.invalid > > > >> wrote: > >> > >> > @Ewen, good point about batching. Yes, it would be tricky if we want > to > >> do > >> > a per-key conditional produce. My understanding is that the > prerequisite > >> of > >> > this KIP is: > >> > 1. Single producer for each partition. > >> > 2. Acks=-1, max.in.flight.requests.per.connection=1, > >> retries=SOME_BIG_NUMBER > >> > > >> > The major problem it tries to solve is exactly-once produce, i.e. > eliminating > >> > duplicates from the producer side. In that case, a batch will be > considered > >> as > >> > atomic. The only way a batch can get rejected should be that it is > >> > already appended. So the producer should just move on. 
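Jiangjie's "the producer should just move on" resend rule can be sketched in a few lines, under the stated single-producer prerequisite. The broker model and names below are hypothetical, not real Kafka client APIs; the scenario simulated is Jun's transient failure where the append lands but the ack is lost.

```python
# Sketch of the exactly-once resend logic under the single-producer
# assumption: on retry, an offset mismatch can only mean our own earlier
# attempt was appended, so we treat it as success and move on.
# All names here are hypothetical models, not real Kafka APIs.

class OffsetMismatch(Exception):
    pass

class Broker:
    def __init__(self):
        self.log = []

    def append(self, batch, expected_offset, lose_ack=False):
        if expected_offset != len(self.log):
            raise OffsetMismatch()
        self.log.extend(batch)       # batch is appended atomically
        if lose_ack:
            raise TimeoutError()     # transient failure: ack lost in transit

def send_exactly_once(broker, batch, expected_offset, fail_first_ack=False):
    lose = fail_first_ack
    while True:
        try:
            broker.append(batch, expected_offset, lose_ack=lose)
            return expected_offset
        except TimeoutError:
            lose = False             # ambiguous outcome: just retry
        except OffsetMismatch:
            return expected_offset   # sole producer => it must be ours

broker = Broker()
send_exactly_once(broker, ["a", "b"], 0)
# Append succeeds but the ack is lost; the retry hits OffsetMismatch and
# the producer correctly concludes the batch is already in the log.
send_exactly_once(broker, ["c", "d"], 2, fail_first_ack=True)
```

Note that the `except OffsetMismatch: return` step is precisely where the single-writer assumption bites: with a second producer, the mismatch could equally mean someone else's write landed, which is Jun's zombie-master concern above.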
> >> > > >> > > >> > It looks to me like even a transient multiple-producer scenario will cause > >> issues, > >> > because the user needs to think about what to do if a request gets > rejected, > >> > and the answer varies for different use cases. > >> > > >> > Thanks, > >> > > >> > Jiangjie (Becket) Qin > >> > > >> > On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin <b...@kirw.in> wrote: > >> > > >> > > So I had another look at the 'Idempotent Producer' proposal this > >> > > afternoon, and made a few notes on how I think they compare; if I've > >> > > made any mistakes, I'd be delighted if someone with more context on > >> > > the idempotent producer design would correct me. > >> > > > >> > > As a first intuition, you can think of the 'conditional publish' > >> > > proposal as the special case of the 'idempotent producer' idea, > where > >> > > there's just a single producer per-partition. The key observation > here > >> > > is: if there's only one producer, you can conflate the 'sequence > >> > > number' and the expected offset. The conditional publish proposal > uses > >> > > existing Kafka offset APIs for roughly the same things as the > >> > > idempotent producer proposal uses sequence numbers for -- eg. > instead > >> > > of having a "lease PID" API that returns the current sequence > number, > >> > > we can use the existing 'offset API' to retrieve the upcoming > offset. > >> > > > >> > > Both proposals attempt to deal with the situation where there are > >> > > transiently multiple publishers for the same partition (and PID). > The > >> > > idempotent producer setup tracks a generation id for each pid, and > >> > > discards any writes with a generation id smaller than the latest > >> > > value. Conditional publish is 'first write wins' -- and instead of > >> > > dropping duplicates on the server, it returns an error to the > client. > >> > > The duplicate-handling behaviour (dropping vs. 
erroring) has some > >> > > interesting consequences: > >> > > > >> > > - If all producers are producing the same stream of messages, > silently > >> > > dropping duplicates on the server is more convenient. (Suppose we > have > >> > > a batch of messages 0-9, and the high-water mark on the server is 7. > >> > > Idempotent producer, as I read it, would append 7-9 to the partition > >> > > and return success; meanwhile, conditional publish would fail the > >> > > entire batch.) > >> > > > >> > > - If producers might be writing different streams of messages, the > >> > > proposed behaviour of the idempotent producer is probably worse -- > >> > > since it can silently interleave messages from two different > >> > > producers. This can be a problem for some commit-log style > use-cases, > >> > > since it can transform a valid series of operations into an invalid > >> > > one. > >> > > > >> > > - Given the error-on-duplicate behaviour, it's possible to implement > >> > > deduplication on the client. (Sketch: if a publish returns an error > >> > > for some partition, fetch the upcoming offset / sequence number for > >> > > that partition, and discard all messages with a smaller offset on > the > >> > > client before republishing.) > >> > > > >> > > I think this makes the erroring behaviour more general, though > >> > > deduplicating saves a roundtrip or two at conflict time. > >> > > > >> > > I'm less clear about the behaviour of the generation id, or what > >> > > happens when (say) two producers with the same generation id are > spun > >> > > up at the same time. I'd be interested in hearing other folks' > >> > > comments on this. > >> > > > >> > > Ewen: I'm not sure I understand the questions well enough to answer > >> > > properly, but some quick notes: > >> > > - I don't think it makes sense to assign an expected offset without > >> > > already having assigned a partition. 
If the producer code is doing > the > >> > > partition assignment, it should probably do the offset assignment > >> > > too... or we could just let application code handle both. > >> > > - I'm not aware of any case where reassigning offsets to messages > >> > > automatically after an offset mismatch makes sense: in the cases > we've > >> > > discussed, it seems like either it's safe to drop duplicates, or we > >> > > want to handle the error at the application level. > >> > > > >> > > I'm going to try and come with an idempotent-producer-type example > >> > > that works with the draft patch in the next few days, so hopefully > >> > > we'll have something more concrete to discuss. Otherwise -- if you > >> > > have a clear idea of how eg. sequence number assignment would work > in > >> > > the idempotent-producer proposal, we could probably translate that > >> > > over to get the equivalent for the conditional publish API. > >> > > > >> > > > >> > > > >> > > On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava > >> > > <e...@confluent.io> wrote: > >> > > > @Becket - for compressed batches, I think this just works out > given > >> the > >> > > KIP > >> > > > as described. Without the change you're referring to, it still > only > >> > makes > >> > > > sense to batch messages with this KIP if all the expected offsets > are > >> > > > sequential (else some messages are guaranteed to fail). I think > that > >> > > > probably just works out, but raises an issue I brought up on the > KIP > >> > > call. > >> > > > > >> > > > Batching can be a bit weird with this proposal. If you try to > write > >> > key A > >> > > > and key B, the second operation is dependent on the first. Which > >> means > >> > to > >> > > > make an effective client for this, we need to keep track of > >> > per-partition > >> > > > offsets so we can set expected offsets properly. 
For example, if A > >> was > >> > > > expected to publish at offset 10, then if B was published to the > same > >> > > > partition, we need to make sure it's marked as expected offset 11 > >> > > (assuming > >> > > > no subpartition high water marks). We either need to have the > >> > application > >> > > > keep track of this itself and set the offsets, which requires > that it > >> > > know > >> > > > about how keys map to partitions, or the client needs to manage > this > >> > > > process. But if the client manages it, I think the client gets > quite > >> a > >> > > bit > >> > > > more complicated. If the produce request containing A fails, what > >> > happens > >> > > > to B? Are there retries that somehow update the expected offset, > or > >> do > >> > we > >> > > > just give up since we know it's always going to fail with the > >> expected > >> > > > offset that was automatically assigned to it? > >> > > > > >> > > > One way to handle this is to use Yasuhiro's idea of increasing the > >> > > > granularity of high watermarks using subpartitions. But I guess my > >> > > question > >> > > > is: if one producer client is writing many keys, and some of those > >> keys > >> > > are > >> > > > produced to the same partition, and those messages are batched, > what > >> > > > happens? Do we end up with lots of failed messages? Or do we have > >> > > > complicated logic in the producer to figure out what the right > >> expected > >> > > > offset for each message is? Or do they all share the same base > >> expected > >> > > > offset as in the compressed case, in which case they all share the > >> same > >> > > > fate and subpartitioning doesn't help? Or is there a simpler > solution > >> > I'm > >> > > > just not seeing? Maybe this just disables batching entirely and > >> > > throughput > >> > > > isn't an issue in these cases? 
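One way to picture the client-side bookkeeping Ewen is asking about, combined with the resend-the-suffix recovery Ben sketched earlier (fetch the upcoming offset, drop everything below it, republish the rest), is the following in-memory model. All names are hypothetical; this is a sketch of the semantics under discussion, not a real client.

```python
# Model of a client batching to one partition with sequential expected
# offsets, recovering from a mismatch by refetching the upcoming offset
# and resending only the suffix that is not yet committed.
# Hypothetical sketch, not a real Kafka client.

class OffsetMismatch(Exception):
    pass

class Partition:
    def __init__(self):
        self.log = []

    def upcoming_offset(self):
        return len(self.log)

    def append_batch(self, batch, base_expected_offset):
        # The whole batch carries sequential expected offsets from the base,
        # so it succeeds or fails as a unit.
        if base_expected_offset != len(self.log):
            raise OffsetMismatch()
        self.log.extend(batch)

def publish_batch(partition, batch, next_expected):
    """Client-side dedup: on mismatch, drop the already-committed prefix."""
    try:
        partition.append_batch(batch, next_expected)
        return next_expected + len(batch)
    except OffsetMismatch:
        upcoming = partition.upcoming_offset()
        already = upcoming - next_expected   # messages [0, already) made it
        if 0 <= already <= len(batch):
            partition.append_batch(batch[already:], upcoming)
            return upcoming + len(batch) - already
        raise  # someone else must have written: surface to the application

p = Partition()
nxt = publish_batch(p, ["m0", "m1", "m2"], 0)
# Simulate a lost ack on a partially visible earlier attempt: the first two
# messages of the next batch are already in the log when we (re)send it.
p.log.extend(["m3", "m4"])
nxt = publish_batch(p, ["m3", "m4", "m5"], nxt)   # resends only "m5"
```

This mirrors Jun's 10-message example (followers committed a prefix of 5): the client resends the uncommitted suffix only. As noted in the thread, interpreting the gap as "our own prefix" is only safe under the single-writer assumption, or with an extra content check.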
> >> > > > > >> > > > Sorry, I know that's probably not entirely clear, but that's > because > >> > I'm > >> > > > very uncertain of how batching works with this KIP. > >> > > > > >> > > > > >> > > > On how this relates to other proposals: I think it might also be > >> > helpful > >> > > to > >> > > > get an overview of all the proposals for relevant modifications to > >> > > > producers/produce requests since many of these proposals are > possibly > >> > > > alternatives (though some may not be mutually exclusive). Many > people > >> > > don't > >> > > > have all the context from the past couple of years of the project. > >> Are > >> > > > there any other relevant wikis or docs besides the following? > >> > > > > >> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer > >> > > > > >> > > > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka > >> > > > > >> > > > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > >> > > > > >> > > > -Ewen > >> > > > > >> > > > On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira < > >> gshap...@cloudera.com> > >> > > > wrote: > >> > > > > >> > > >> Tangent: I think we should complete the move of Produce / Fetch > RPC > >> to > >> > > >> the client libraries before we add more revisions to this > protocol. > >> > > >> > >> > > >> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin > >> > > >> <j...@linkedin.com.invalid> wrote: > >> > > >> > I missed yesterday's KIP hangout. I'm currently working on > another > >> > KIP > >> > > >> for > >> > > >> > enriched metadata of messages. Guozhang has already created a > wiki > >> > > page > >> > > >> > before ( > >> > > >> > > >> > > >> > >> > > > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > >> > > >> ). 
> >> > > >> > We plan to fill the relative offset into the offset field in the > >> batch > > sent > >> > > >> > by the producer to avoid broker-side re-compression. The message > >> offset > > would > >> > > >> > become batch base offset + relative offset. I guess maybe the > >> > expected > >> > > >> > offset in KIP-27 can only be set for the base offset? Would that > >> affect > >> > > >> certain > >> > > >> > use cases? > >> > > >> > > >> > > >> > For Jun's comments, I am not sure I completely get it. I think > the > >> > > >> producer > >> > > >> > only sends one batch per partition in a request. So either that > >> > batch > >> > > is > >> > > >> > appended or not. Why would a batch be partially committed? > >> > > >> > > >> > > >> > Thanks, > >> > > >> > > >> > > >> > Jiangjie (Becket) Qin > >> > > >> > > >> > > >> > On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> > wrote: > >> > > >> > > >> > > >> >> That's a fair point. I've added some imagined job logic to the > >> KIP, > > so > >> > > >> >> we can make sure the proposal stays in sync with the usages > we're > >> > > >> >> discussing. (The logic is just a quick sketch for now -- I > expect > >> > > I'll > >> > > >> >> need to elaborate it as we get into more detail, or to address > >> > other > >> > > >> >> concerns...) > >> > > >> >> > >> > > >> >> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io> > >> > wrote: > >> > > >> >> > For 1, yes, when there is a transient leader change, it's > >> > > guaranteed > >> > > >> >> that a > >> > > >> >> > prefix of the messages in a request will be committed. > However, > >> > it > >> > > >> seems > >> > > >> >> > that the client needs to know what subset of messages are > >> > > committed in > >> > > >> >> > order to resume the sending. Then the question is how. 
> >> > > >> >> > > >> > > >> >> > As Flavio indicated, for the use cases that you listed, it > >> would > >> > be > >> > > >> >> useful > >> > > >> >> > to figure out the exact logic by using this feature. For > >> example, > >> > > in > >> > > >> the > >> > > >> >> > partition K/V store example, when we fail over to a new > writer > >> to > >> > > the > >> > > >> >> > commit log, the zombie writer can publish new messages to > the > >> log > >> > > >> after > >> > > >> >> the > >> > > >> >> > new writer takes over, but before it publishes any message. > We > >> > > >> probably > >> > > >> >> > need to outline how this case can be handled properly. > >> > > >> >> > > >> > > >> >> > Thanks, > >> > > >> >> > > >> > > >> >> > Jun > >> > > >> >> > > >> > > >> >> > On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> > >> > wrote: > >> > > >> >> > > >> > > >> >> >> Hi Jun, > >> > > >> >> >> > >> > > >> >> >> Thanks for the close reading! Responses inline. > >> > > >> >> >> > >> > > >> >> >> > Thanks for the write-up. The single producer use case you > >> > > mentioned > >> > > >> >> makes > >> > > >> >> >> > sense. It would be useful to include that in the KIP > wiki. > >> > > >> >> >> > >> > > >> >> >> Great -- I'll make sure that the wiki is clear about this. > >> > > >> >> >> > >> > > >> >> >> > 1. What happens when the leader of the partition changes > in > >> > the > >> > > >> middle > >> > > >> >> >> of a > >> > > >> >> >> > produce request? In this case, the producer client is not > >> sure > >> > > >> whether > >> > > >> >> >> the > >> > > >> >> >> > request succeeds or not. If there is only a single > message > >> in > >> > > the > >> > > >> >> >> request, > >> > > >> >> >> > the producer can just resend the request. If it sees an > >> > > >> OffsetMismatch > >> > > >> >> >> > error, it knows that the previous send actually succeeded > >> and > >> > > can > >> > > >> >> proceed > >> > > >> >> >> > with the next write. 
This is nice since it not only > allows > >> the > >> > > >> >> producer > >> > > >> >> >> to > >> > > >> >> >> > proceed during transient failures in the broker, it also > >> > avoids > >> > > >> >> >> duplicates > >> > > >> >> >> > during producer resend. One caveat is when there are > >> multiple > >> > > >> >> messages in > >> > > >> >> >> > the same partition in a produce request. The issue is > that > >> in > >> > > our > >> > > >> >> current > >> > > >> >> >> > replication protocol, it's possible for some, but not all > >> > > messages > >> > > >> in > >> > > >> >> the > >> > > >> >> >> > request to be committed. This makes resend a bit harder > to > >> > deal > >> > > >> with > >> > > >> >> >> since > >> > > >> >> >> > on receiving an OffsetMismatch error, it's not clear > which > >> > > messages > >> > > >> >> have > >> > > >> >> >> > been committed. One possibility is to expect that > >> compression > >> > is > >> > > >> >> enabled, > >> > > >> >> >> > in which case multiple messages are compressed into a > single > >> > > >> message. > >> > > >> >> I > >> > > >> >> >> was > >> > > >> >> >> > thinking that another possibility is for the broker to > >> return > >> > > the > >> > > >> >> current > >> > > >> >> >> > high watermark when sending an OffsetMismatch error. > Based > >> on > >> > > this > >> > > >> >> info, > >> > > >> >> >> > the producer can resend the subset of messages that have > not > >> > > been > >> > > >> >> >> > committed. However, this may not work in a compacted > topic > >> > since > >> > > >> there > >> > > >> >> >> can > >> > > >> >> >> > be holes in the offset. > >> > > >> >> >> > >> > > >> >> >> This is an excellent question. It's my understanding that at > >> > least > >> > > a > >> > > >> >> >> *prefix* of messages will be committed (right?) -- which > seems > >> > to > >> > > be > >> > > >> >> >> enough for many cases. I'll try and come up with a more > >> concrete > >> > > >> >> >> answer here. 
> >> > > >> >> >> > >> > > >> >> >> > 2. Is this feature only intended to be used with ack = > all? > >> > The > >> > > >> client > >> > > >> >> >> > doesn't get the offset with ack = 0. With ack = 1, it's > >> > possible > >> > > >> for a > >> > > >> >> >> > previously acked message to be lost during leader > >> transition, > >> > > which > >> > > >> >> will > >> > > >> >> >> > make the client logic more complicated. > >> > > >> >> >> > >> > > >> >> >> It's true that acks = 0 doesn't seem to be particularly > >> useful; > >> > in > >> > > >> all > >> > > >> >> >> the cases I've come across, the client eventually wants to > >> know > >> > > about > >> > > >> >> >> the mismatch error. However, it seems like there are some > >> cases > >> > > where > >> > > >> >> >> acks = 1 would be fine -- eg. in a bulk load of a fixed > >> dataset, > >> > > >> >> >> losing messages during a leader transition just means you > need > >> > to > >> > > >> >> >> rewind / restart the load, which is not especially > >> catastrophic. > >> > > For > >> > > >> >> >> many other interesting cases, acks = all is probably > >> preferable. > >> > > >> >> >> > >> > > >> >> >> > 3. How does the producer client know the offset to send > the > >> > > first > >> > > >> >> >> message? > >> > > >> >> >> > Do we need to expose an API in producer to get the > current > >> > high > >> > > >> >> >> watermark? > >> > > >> >> >> > >> > > >> >> >> You're right, it might be irritating to have to go through > the > >> > > >> >> >> consumer API just for this. There are some cases where the > >> > offsets > >> > > >> are > >> > > >> >> >> already available -- like the commit-log-for-KV-store > example > >> -- > >> > > but > >> > > >> >> >> in general, being able to get the offsets from the producer > >> > > interface > >> > > >> >> >> does sound convenient. > >> > > >> >> >> > >> > > >> >> >> > We plan to have a KIP discussion meeting tomorrow at 11am > >> PST. 
> >> > > >> Perhaps > >> > > >> >> >> you > >> > > >> >> >> > can describe this KIP a bit then? > >> > > >> >> >> > >> > > >> >> >> Sure, happy to join. > >> > > >> >> >> > >> > > >> >> >> > Thanks, > >> > > >> >> >> > > >> > > >> >> >> > Jun > >> > > >> >> >> > > >> > > >> >> >> > > >> > > >> >> >> > > >> > > >> >> >> > On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin < > b...@kirw.in> > >> > > wrote: > >> > > >> >> >> > > >> > > >> >> >> >> Just wanted to flag a little discussion that happened on > >> the > >> > > >> ticket: > >> > > >> >> >> >> > >> > > >> >> >> >> > >> > > >> >> >> > >> > > >> >> > >> > > >> > >> > > > >> > > >> > https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 > >> > > >> >> >> >> > >> > > >> >> >> >> In particular, Yasuhiro Matsuda proposed an interesting > >> > > variant on > >> > > >> >> >> >> this that performs the offset check on the message key > >> > > (instead of > >> > > >> >> >> >> just the partition), with bounded space requirements, at > >> the > >> > > cost > >> > > >> of > >> > > >> >> >> >> potentially some spurious failures. (ie. the produce > >> request > >> > > may > >> > > >> fail > >> > > >> >> >> >> even if that particular key hasn't been updated > recently.) > >> > This > >> > > >> >> >> >> addresses a couple of the drawbacks of the per-key > approach > >> > > >> mentioned > >> > > >> >> >> >> at the bottom of the KIP. 
> >> > > >> >> >> >> > >> > > >> >> >> >> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin < b...@kirw.in> > >> > > wrote: > >> > > >> >> >> >> > Hi all, > >> > > >> >> >> >> > > >> > > >> >> >> >> > So, perhaps it's worth adding a couple specific > examples > >> of > >> > > >> where > >> > > >> >> this > >> > > >> >> >> >> > feature is useful, to make this a bit more concrete: > >> > > >> >> >> >> > > >> > > >> >> >> >> > - Suppose I'm using Kafka as a commit log for a > >> partitioned > >> > > KV > >> > > >> >> store, > >> > > >> >> >> >> > like Samza or Pistachio (?) do. We bootstrap the > process > >> > > state > >> > > >> by > >> > > >> >> >> >> > reading from that partition, and log all state > updates to > >> > > that > >> > > >> >> >> >> > partition when we're running. Now imagine that one of > my > >> > > >> processes > >> > > >> >> >> >> > locks up -- GC or similar -- and the system > transitions > >> > that > >> > > >> >> partition > >> > > >> >> >> >> > over to another node. When the GC is finished, the old > >> > > 'owner' > >> > > >> of > >> > > >> >> that > >> > > >> >> >> >> > partition might still be trying to write to the commit > >> log > >> > at > >> > > >> the > >> > > >> >> same > >> > > >> >> >> >> > time as the new one is. A process might detect this by > >> noticing > >> > > that > >> > > >> the > >> > > >> >> >> >> > offset of the published message is bigger than it > thought > >> > the > >> > > >> >> upcoming > >> > > >> >> >> >> > offset was, which implies someone else has been > writing > >> to > >> > > the > >> > > >> >> log... > >> > > >> >> >> >> > but by then it's too late, and the commit log is > already > >> > > >> corrupt. > >> > > >> >> With > >> > > >> >> >> >> > a 'conditional produce', one of those processes will > have > >> > > its > >> > > >> >> publish > >> > > >> >> >> >> > request refused -- so we've avoided corrupting the > state. 
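The failover scenario in that first bullet can be played out in a few lines. The names below (CommitLog, publish, the "zombie" labels) are purely illustrative; the model only captures the behaviour the bullet relies on: an append at a stale expected offset is refused, so the corrupting write never lands.

```python
# Illustrative model of the GC-pause / failover scenario: the new owner
# advances the log, so the old owner's write with a stale expected offset
# is refused. Hypothetical sketch, not a real Kafka API.

class CasFailure(Exception):
    pass

class CommitLog:
    def __init__(self):
        self.entries = []

    def publish(self, entry, expected_offset):
        if expected_offset != len(self.entries):
            raise CasFailure()
        self.entries.append(entry)

log = CommitLog()
log.entries.extend(["state-0", "state-1"])    # bootstrap: existing history

old_owner_offset = len(log.entries)  # old owner's view, then it GC-pauses
# Meanwhile the partition fails over; the new owner catches up and writes:
log.publish("state-2", expected_offset=len(log.entries))

# The old owner wakes up and tries to append with its stale expectation:
refused = False
try:
    log.publish("zombie-write", expected_offset=old_owner_offset)
except CasFailure:
    refused = True   # detected *before* the log is corrupted
```

The contrast with the detect-by-reading approach in the same bullet is that the refusal happens at write time, not after the fact when the log is already interleaved.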
> >> > > >> >> >> >> > > >> > > >> >> >> >> > - Envision some copycat-like system, where we have > some > >> > > sharded > >> > > >> >> >> >> > postgres setup and we're tailing each shard into its > own > >> > > >> partition. > >> > > >> >> >> >> > Normally, it's fairly easy to avoid duplicates here: > we > >> can > >> > > >> track > >> > > >> >> >> >> > which offset in the WAL corresponds to which offset in > >> > Kafka, > >> > > >> and > >> > > >> >> we > >> > > >> >> >> >> > know how many messages we've written to Kafka > already, so > >> > the > >> > > >> >> state is > >> > > >> >> >> >> > very simple. However, it is possible that for a > moment -- > >> > > due to > >> > > >> >> >> >> > rebalancing or operator error or some other thing -- > two > >> > > >> different > >> > > >> >> >> >> > nodes are tailing the same postgres shard at once! > >> Normally > >> > > this > >> > > >> >> would > >> > > >> >> >> >> > introduce duplicate messages, but by specifying the > >> > expected > >> > > >> >> offset, > >> > > >> >> >> >> > we can avoid this. > >> > > >> >> >> >> > > >> > > >> >> >> >> > So perhaps it's better to say that this is useful > when a > >> > > single > >> > > >> >> >> >> > producer is *expected*, but multiple producers are > >> > > *possible*? > >> > > >> (In > >> > > >> >> the > >> > > >> >> >> >> > same way that the high-level consumer normally has 1 > >> > consumer > >> > > >> in a > >> > > >> >> >> >> > group reading from a partition, but there are small > >> windows > >> > > >> where > >> > > >> >> more > >> > > >> >> >> >> > than one might be reading at the same time.) This is > also > >> > the > >> > > >> >> spirit > >> > > >> >> >> >> > of the 'runtime cost' comment -- in the common case, > >> where > >> > > >> there is > >> > > >> >> >> >> > little to no contention, there's no performance > overhead > >> > > >> either. 
I > >> > > >> >> >> >> > mentioned this a little in the Motivation section -- > >> maybe > >> > I > >> > > >> should > >> > > >> >> >> >> > flesh that out a little bit? > >> > > >> >> >> >> > > >> > > >> >> >> >> > For me, the motivation to work this up was that I kept > >> > > running > >> > > >> into > >> > > >> >> >> >> > cases, like the above, where the existing API was > >> > > >> >> almost-but-not-quite > >> > > >> >> >> >> > enough to give the guarantees I was looking for -- and > >> the > >> > > >> >> extension > >> > > >> >> >> >> > needed to handle those cases too was pretty small and > >> > > >> >> natural-feeling. > >> > > >> >> >> >> > > >> > > >> >> >> >> > On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh < > >> > > >> asi...@cloudera.com > >> > > >> >> > > >> > > >> >> >> >> wrote: > >> > > >> >> >> >> >> Good concept. I have a question though. > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> Say there are two producers A and B. Both producers > are > >> > > >> producing > >> > > >> >> to > >> > > >> >> >> >> same > >> > > >> >> >> >> >> partition. > >> > > >> >> >> >> >> - A sends a message with expected offset, x1 > >> > > >> >> >> >> >> - Broker accepts it and sends an Ack > >> > > >> >> >> >> >> - B sends a message with expected offset, x1 > >> > > >> >> >> >> >> - Broker rejects it, sends nack > >> > > >> >> >> >> >> - B sends message again with expected offset, x1+1 > >> > > >> >> >> >> >> - Broker accepts it and sends Ack > >> > > >> >> >> >> >> I guess this is what this KIP suggests, right? If > yes, > >> > then > >> > > how > >> > > >> >> does > >> > > >> >> >> >> this > >> > > >> >> >> >> >> ensure that same message will not be written twice > when > >> > two > >> > > >> >> producers > >> > > >> >> >> >> are > >> > > >> >> >> >> >> producing to same partition? 
Producer on receiving a > >> nack > > will > >> try > >> > > >> >> >> again > >> > > >> >> >> >> >> with next offset and will keep doing so till the > message > >> > is > >> > > >> >> accepted. > >> > > >> >> >> >> Am I > >> > > >> >> >> >> >> missing something? > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> Also, you have mentioned on KIP, "it imposes little > to > >> no > >> > > >> runtime > >> > > >> >> >> cost > >> > > >> >> >> >> in > >> > > >> >> >> >> >> memory or time", I think that is not true for time. With > > this > >> > > >> >> >> approach, > >> > > >> >> >> >> >> producers' performance will degrade in proportion to the > >> > number > >> > > of > >> > > >> >> >> producers > >> > > >> >> >> >> >> writing to the same partition. Please correct me if I am > >> > missing > >> > > >> out > >> > > >> >> >> >> something. > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat < > >> > > >> >> >> >> >> gharatmayures...@gmail.com> wrote: > >> > > >> >> >> >> >> > >> > > >> >> >> >> >>> If we have 2 producers producing to a partition, > they > >> can > >> > > be > >> > > >> out > >> > > >> >> of > >> > > >> >> >> >> order, > >> > > >> >> >> >> >>> then how does one producer know what offset to > expect > >> as > >> > it > >> > > >> does > >> > > >> >> not > >> > > >> >> >> >> >>> interact with other producer? > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> Can you give an example flow that explains how it > works > >> > > with > >> > > >> >> single > >> > > >> >> >> >> >>> producer and with multiple producers? 
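The multi-producer flow Ashish walks through can be made concrete with a small simulation (hypothetical names, not a real Kafka API). It shows exactly his point: conditional publish only *detects* the interleaving; a B that blindly bumps its expected offset and retries will still get its message in, so whether the retry is safe has to be decided by application logic (e.g. the command re-validation Daniel describes for event sourcing, or client-side dedup).

```python
# Model of Ashish's flow: A and B both target expected offset x1; the
# broker acks A and nacks B; a B that blindly retries at x1+1 still gets
# its message appended. Conditional publish detects contention -- it does
# not by itself decide whether the retry should happen.
# Hypothetical sketch, not a real Kafka API.

class Nack(Exception):
    pass

class Partition:
    def __init__(self):
        self.log = []

    def produce(self, msg, expected_offset):
        if expected_offset != len(self.log):
            raise Nack(len(self.log))   # nack carries the upcoming offset
        self.log.append(msg)

p = Partition()
x1 = len(p.log)              # both producers expect offset x1 == 0

p.produce("from-A", x1)      # A: accepted, acked

blind_retries = 0
try:
    p.produce("from-B", x1)  # B: rejected, nacked
except Nack as n:
    upcoming = n.args[0]
    # A *blind* retry (which the KIP does not prescribe) just writes anyway;
    # a careful client would first re-validate or dedup at this point.
    p.produce("from-B", upcoming)
    blind_retries += 1
```

So Ashish is right that a retry loop which always bumps to the next offset defeats the purpose on its own; the value of the nack is that B *learns* it lost the race at x1, and gets a chance to recheck state before deciding to resend.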
> >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> Thanks, > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> Mayuresh > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira < > >> > > >> >> >> >> >>> fpjunque...@yahoo.com.invalid> wrote: > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> > I like this feature, it reminds me of conditional > >> > > updates in > >> > > >> >> >> >> zookeeper. > >> > > >> >> >> >> >>> > I'm not sure if it'd be best to have some > mechanism > >> for > >> > > >> fencing > >> > > >> >> >> >> rather > >> > > >> >> >> >> >>> than > >> > > >> >> >> >> >>> > a conditional write like you're proposing. The > reason > >> > I'm > >> > > >> >> saying > >> > > >> >> >> >> this is > >> > > >> >> >> >> >>> > that the conditional write applies to requests > >> > > individually, > >> > > >> >> >> while it > >> > > >> >> >> >> >>> > sounds like you want to make sure that there is a > >> > single > >> > > >> client > >> > > >> >> >> >> writing > >> > > >> >> >> >> >>> so > >> > > >> >> >> >> >>> > over multiple requests. > >> > > >> >> >> >> >>> > > >> > > >> >> >> >> >>> > -Flavio > >> > > >> >> >> >> >>> > > >> > > >> >> >> >> >>> > > On 17 Jul 2015, at 07:30, Ben Kirwin < > b...@kirw.in> > >> > > wrote: > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > Hi there, > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > I just added a KIP for a 'conditional publish' > >> > > operation: > >> > > >> a > >> > > >> >> >> simple > >> > > >> >> >> >> >>> > > CAS-like mechanism for the Kafka producer. 
The > wiki > >> > > page > >> > > >> is > >> > > >> >> >> here: > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > >> > > >> >> >> >> >>> > >> > > >> >> >> >> > >> > > >> >> >> > >> > > >> >> > >> > > >> > >> > > > >> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > And there's some previous discussion on the > ticket > >> > and > >> > > the > >> > > >> >> users > >> > > >> >> >> >> list: > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > > https://issues.apache.org/jira/browse/KAFKA-2260 > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > >> > > >> >> >> >> >>> > >> > > >> >> >> >> > >> > > >> >> >> > >> > > >> >> > >> > > >> > >> > > > >> > > >> > https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > As always, comments and suggestions are very > >> welcome. > >> > > >> >> >> >> >>> > > > >> > > >> >> >> >> >>> > > Thanks, > >> > > >> >> >> >> >>> > > Ben > >> > > >> >> >> >> >>> > > >> > > >> >> >> >> >>> > > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >>> -- > >> > > >> >> >> >> >>> -Regards, > >> > > >> >> >> >> >>> Mayuresh R. Gharat > >> > > >> >> >> >> >>> (862) 250-7125 > >> > > >> >> >> >> >>> > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> -- > >> > > >> >> >> >> >> > >> > > >> >> >> >> >> Regards, > >> > > >> >> >> >> >> Ashish > >> > > >> >> >> >> > >> > > >> >> >> > >> > > >> >> > >> > > >> > >> > > > > >> > > > > >> > > > > >> > > > -- > >> > > > Thanks, > >> > > > Ewen > >> > > > >> > > >> >