This is a very nice summary of the consistency / correctness issues possible with a commit log.
> (assuming it’s publishing asynchronously and in an open loop) It's perhaps already clear to folks here, but -- if you *don't* do that, and instead only send one batch of messages at a time and check the result, you don't have the interleaving issue. (Of course, that means you give up pipelining batches...) On Aug 10, 2015 2:46 PM, "Flavio Junqueira" <f...@apache.org> wrote: > I've been trying to understand what is being proposed in this KIP and I've > put down some notes with some feedback from Ben that I wanted to share for > feedback. I'm not really following the flow of the thread, since I've read > a few sources to get to this, and I apologize for that. > > Here is how I see it t a high level. There are really two problems being > discussed in the context of this KIP: > Single writer with failover: > Consistent logs > > Single writer with failover > The idea is that at any time there must be at most one publisher active. > To get high availability, we can’t rely on a single process to be such a > publisher and consequently we need the failover part: if the current active > publisher crashes, then another publisher takes over and becomes active. > One important issue with scenarios like this is that during transitions > from one active publisher to another, there could be races and two > publishers end up interleaving messages in a topic/partition/key. > > Why is this interleaving bad? This is really application specific, but one > general way of seeing this is that only one process has the authoritative > application state to generate messages to publish. Transitioning from an > active publisher to another, typically requires recovering state or > performing some kind of coordination. If no such recovery is required, then > we are essentially in the multi-writer space. The commit log use case is a > general one mentioned in the KIP description. > > Consistent logs > Consistent logs might not be the best term here, but I’m using it to > describe the need of having the messages in a topic/partition/key > reflecting consistently the state of the application. For example, some > applications might be OK with a published sequence: > > A B B C (e.g., value = 10) > > in the case the messages are idempotent operations, but others might > really require: > > A B C (e.g., value += 10) > > if they aren’t idempotent operations. Order and gaps are also an issue, so > some applications might be OK with: > > A C B (e.g., value += x) > > and skipping B altogether might be ok if B has no side-effects (e.g., > operation associated to B has failed). > > Putting things together > The current KIP-27 proposal seems to do a good job with providing a > consistent log in the absence of concurrency. It enables publishers to > re-publish messages without duplication, which is one requirement for > exactly-once semantics. Gaps need to be handled by the publisher. For > example, if the publisher publishes A B C (assuming it’s publishing > asynchronously and in an open loop), it could have A succeeding but not B > and C. In this case, it needs to redo the publish of B and C. It could also > have B failing and C succeeding, in which case the publisher repeats B and > C. > > A really nice feature of the current proposal is that it is a simple > primitive that enables the implementation of publishers with different > delivery guarantees. It doesn’t seem to be well suited to the first problem > of implementing a single writer with failover, however. 
It allows runs in > which two producers interleave messages because the mechanism focuses on a > single message. The single writer might not even care about duplicates and > gaps depending on the application, but it might care that there aren’t two > publishers interleaving messages in the Kafka log. > > A typical way of dealing with these cases is to use a token associated to > a lease to fence off the other publishers. For example, to demote an active > publisher, another publisher could invoke a demote call and have the ISR > leader replace the token. The lease of the token could be done directly > with ZooKeeper or via the ISR leader. The condition to publish a message or > a batch could be a combination of token verification and offset check. > > -Flavio > > > On 10 Aug 2015, at 00:15, Jun Rao <j...@confluent.io> wrote: > > > > Couple of other things. > > > > A. In the discussion, we talked about the usage of getting the latest > high > > watermark from the broker. Currently, the high watermark in a partition > can > > go back a bit for a short period of time during leader change. So, the > high > > watermark returned in the getOffset api is not 100% accurate. There is a > > jira (KAFKA-2334) to track this issue. > > > > B. The proposal in the wiki is to put the expected offset in every > message, > > even when the messages are compressed. With Jiangjie's proposal of > relative > > offset, the expected offset probably can only be set at the shallow > > compressed message level. We will need to think this through. > > > > Thanks, > > > > Jun > > > > > > > > On Tue, Aug 4, 2015 at 3:05 PM, Jiangjie Qin <j...@linkedin.com.invalid> > > wrote: > > > >> Jun, I see. So this only applies to uncompressed messages. Maybe that is > >> fine given most user will probably turn on compression? > >> I think the first approach is a more general approach but from > application > >> point of view might harder to implement. I am thinking is it easier for > the > >> application simply have one producer for a partition and hash the > message > >> to producer. In that case, we can use the second approach but still have > >> multiple producers. The downside might be potentially more memory > >> footprint? We might also need to think about the fault tolerance a > little > >> bit more. > >> > >> Ben, I agree when everything goes fine, having pipeline turned on is > >> probably fine. But if we take leader migration, broker down, message > >> appended to leader but not follower, etc, etc into consideration, it is > not > >> clear to me how the conditional publish will still provide its guarantee > >> without enforcing those strict settings. > >> > >> Thanks, > >> > >> Jiangjie (Becket) Qin > >> > >> > >> > >> On Mon, Aug 3, 2015 at 9:55 PM, Jun Rao <j...@confluent.io> wrote: > >> > >>> A couple of thoughts on the commit log use case. Suppose that we want > to > >>> maintain multiple replicas of a K/V store backed by a shared Kafka > >>> topic/partition as a commit log. There are two possible ways to use > Kafka > >>> as a commit log. > >>> > >>> 1. The first approach allows multiple producers to publish to Kafka. > Each > >>> replica of the data store keeps reading from a Kafka topic/partition to > >>> refresh the replica's view. Every time a replica gets an update to a > key > >>> from a client, it combines the update and the current value of the key > in > >>> its view and generates a post-value. It then does a conditional publish > >> to > >>> Kafka with the post-value. 
The update is successful if the conditional > >>> publish succeeds. Otherwise, the replica has to recompute the > post-value > >>> (potentially after the replica's view is refreshed) and retry the > >>> conditional publish. A potential issue with this approach is when there > >> is > >>> a transient failure during publishing to Kafka (e.g., the leader of the > >>> partition changes). When this happens, the conditional publish will get > >> an > >>> error. The replica doesn't know whether the publish actually succeeded > or > >>> not. If we just blindly retry, it may not give the correct behavior > >> (e.g., > >>> we could be applying +1 twice). So, not sure if conditional publish > >> itself > >>> is enough for this approach. > >>> > >>> 2. The second approach allows only a single producer to publish to > Kafka. > >>> We somehow elect one of the replicas to be the master that handles all > >>> updates. Normally, we don't need conditional publish since there is a > >>> single producer. Conditional publish can potentially be used to deal > with > >>> duplicates. If the master encounters the same transient failure as the > >>> above, it can get the latest offset from the Kafka topic/partition to > see > >>> if the publish actually succeeded or not since it's the only producer. > A > >>> potential issue here is to handle the zombie master problem: if the > >> master > >>> has a soft failure and another master is elected, we need to prevent > the > >>> old master from publishing new data to Kafka. So, for this approach to > >> work > >>> properly, we need some kind of support of single writer in addition to > >>> conditional publish. > >>> > >>> > >>> Jiangjie, > >>> > >>> The issue with partial commit is the following. Say we have a batch of > 10 > >>> uncompressed messages sent to the leader. The followers only fetched > the > >>> first 5 messages and then the leader dies. In this case, we only > >> committed > >>> 5 out of the 10 messages. > >>> > >>> Thanks, > >>> > >>> Jun > >>> > >>> > >>> On Tue, Jul 28, 2015 at 1:16 AM, Daniel Schierbeck < > >>> daniel.schierb...@gmail.com> wrote: > >>> > >>>> Jiangjie: I think giving users the possibility of defining a custom > >>> policy > >>>> for handling rejections is a good idea. For instance, this will allow > >>> Kafka > >>>> to act as an event store in an Event Sourcing application. If the > >>> event(s) > >>>> are rejected by the store, the original command may need to be > >>> re-validated > >>>> against the new state. > >>>> > >>>> On Tue, Jul 28, 2015 at 1:27 AM Jiangjie Qin > <j...@linkedin.com.invalid > >>> > >>>> wrote: > >>>> > >>>>> @Ewen, good point about batching. Yes, it would be tricky if we want > >> to > >>>> do > >>>>> a per-key conditional produce. My understanding is that the > >>> prerequisite > >>>> of > >>>>> this KIP is: > >>>>> 1. Single producer for each partition. > >>>>> 2. Acks=-1, max.in.flight.request.per.connection=1, > >>>> retries=SOME_BIG_NUMBER > >>>>> > >>>>> The major problem it tries to solve is exact once produce, i.e. solve > >>> the > >>>>> duplicates from producer side. In that case, a batch will be > >> considered > >>>> as > >>>>> atomic. The only possibility of a batch got rejected should be it is > >>>>> already appended. So the producer should just move on. > >>>>> > >>>>> It looks to me even a transient multiple producer scenario will cause > >>>> issue > >>>>> because user need to think about what should do if a request got > >>> rejected > >>>>> and the answer varies for different use cases. 
> >>>>> > >>>>> Thanks, > >>>>> > >>>>> Jiangjie (Becket) Qin > >>>>> > >>>>> On Sun, Jul 26, 2015 at 11:54 AM, Ben Kirwin <b...@kirw.in> wrote: > >>>>> > >>>>>> So I had another look at the 'Idempotent Producer' proposal this > >>>>>> afternoon, and made a few notes on how I think they compare; if > >> I've > >>>>>> made any mistakes, I'd be delighted if someone with more context on > >>>>>> the idempotent producer design would correct me. > >>>>>> > >>>>>> As a first intuition, you can think of the 'conditional publish' > >>>>>> proposal as the special case of the 'idempotent producer' idea, > >> where > >>>>>> there's just a single producer per-partition. The key observation > >>> here > >>>>>> is: if there's only one producer, you can conflate the 'sequence > >>>>>> number' and the expected offset. The conditional publish proposal > >>> uses > >>>>>> existing Kafka offset APIs for roughly the same things as the > >>>>>> idempotent producer proposal uses sequence numbers for -- eg. > >> instead > >>>>>> of having a "lease PID" API that returns the current sequence > >> number, > >>>>>> we can use the existing 'offset API' to retrieve the upcoming > >> offset. > >>>>>> > >>>>>> Both proposals attempt to deal with the situation where there are > >>>>>> transiently multiple publishers for the same partition (and PID). > >> The > >>>>>> idempotent producer setup tracks a generation id for each pid, and > >>>>>> discards any writes with a generation id smaller than the latest > >>>>>> value. Conditional publish is 'first write wins' -- and instead of > >>>>>> dropping duplicates on the server, it returns an error to the > >> client. > >>>>>> The duplicate-handling behaviour (dropping vs. erroring) has some > >>>>>> interesting consequences: > >>>>>> > >>>>>> - If all producers are producing the same stream of messages, > >>> silently > >>>>>> dropping duplicates on the server is more convenient. (Suppose we > >>> have > >>>>>> a batch of messages 0-9, and the high-water mark on the server is > >> 7. > >>>>>> Idempotent producer, as I read it, would append 7-9 to the > >> partition > >>>>>> and return success; meanwhile, conditional publish would fail the > >>>>>> entire batch.) > >>>>>> > >>>>>> - If producers might be writing different streams of messages, the > >>>>>> proposed behaviour of the idempotent producer is probably worse -- > >>>>>> since it can silently interleave messages from two different > >>>>>> producers. This can be a problem for some commit-log style > >> use-cases, > >>>>>> since it can transform a valid series of operations into an invalid > >>>>>> one. > >>>>>> > >>>>>> - Given the error-on-duplicate behaviour, it's possible to > >> implement > >>>>>> deduplication on the client. (Sketch: if a publish returns an error > >>>>>> for some partition, fetch the upcoming offset / sequence number for > >>>>>> that partition, and discard all messages with a smaller offset on > >> the > >>>>>> client before republishing.) > >>>>>> > >>>>>> I think this makes the erroring behaviour more general, though > >>>>>> deduplicating saves a roundtrip or two at conflict time. > >>>>>> > >>>>>> I'm less clear about the behaviour of the generation id, or what > >>>>>> happens when (say) two producers with the same generation id are > >> spun > >>>>>> up at the same time. I'd be interested in hearing other folks' > >>>>>> comments on this. 
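
To make the client-side dedup sketched just above concrete: below is a minimal Java sketch of that error-then-republish loop. Every type in it (ConditionalProducer, OffsetMismatchException, upcomingOffset) is a hypothetical stand-in for the KIP-27 surface area rather than anything in the current producer API, so read it as pseudocode for the control flow only.

import java.io.IOException;
import java.util.List;

// Hypothetical client-side surface for a KIP-27 style conditional publish.
// None of these types exist in the real producer API today.
interface ConditionalProducer {
    // Append the messages starting at expectedOffset; fails with
    // OffsetMismatchException if the partition's upcoming offset differs.
    void send(String topic, int partition, long expectedOffset, List<byte[]> messages)
            throws OffsetMismatchException, IOException;

    // The offset that the next message appended to this partition would get.
    long upcomingOffset(String topic, int partition) throws IOException;
}

class OffsetMismatchException extends Exception {}

class DedupPublisher {
    private final ConditionalProducer producer;
    private final String topic;
    private final int partition;

    DedupPublisher(ConditionalProducer producer, String topic, int partition) {
        this.producer = producer;
        this.topic = topic;
        this.partition = partition;
    }

    // Publish messages.get(i) at offset baseOffset + i, dropping any prefix
    // that turns out to be in the log already before republishing the rest.
    void publish(List<byte[]> messages, long baseOffset) throws IOException {
        int sent = 0;
        while (sent < messages.size()) {
            try {
                producer.send(topic, partition, baseOffset + sent,
                              messages.subList(sent, messages.size()));
                return;                                   // all appended
            } catch (OffsetMismatchException e) {
                long upcoming = producer.upcomingOffset(topic, partition);
                if (upcoming > baseOffset + sent) {
                    // Everything below the upcoming offset is presumed to be in
                    // the log already -- only safe if any competing writer was
                    // producing the identical stream -- so skip past it.
                    sent = (int) Math.min(messages.size(), upcoming - baseOffset);
                } else if (upcoming < baseOffset + sent) {
                    // The log is behind where we thought we were; surface it
                    // rather than guessing.
                    throw new IOException("log truncated below the expected offset", e);
                }
                // upcoming == baseOffset + sent: the mismatch raced one of our
                // own retries; just resend from the same position.
            }
        }
    }
}

As noted above, silently skipping the prefix like this only emulates the idempotent producer's drop-duplicates behaviour when every producer is writing the same stream of messages; in the commit-log cases the application would want to surface the mismatch instead.
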
> >>>>>> > >>>>>> Ewen: I'm not sure I understand the questions well enough to answer > >>>>>> properly, but some quick notes: > >>>>>> - I don't think it makes sense to assign an expected offset without > >>>>>> already having assigned a partition. If the producer code is doing > >>> the > >>>>>> partition assignment, it should probably do the offset assignment > >>>>>> too... or we could just let application code handle both. > >>>>>> - I'm not aware of any case where reassigning offsets to messages > >>>>>> automatically after an offset mismatch makes sense: in the cases > >>> we've > >>>>>> discussed, it seems like either it's safe to drop duplicates, or we > >>>>>> want to handle the error at the application level. > >>>>>> > >>>>>> I'm going to try and come with an idempotent-producer-type example > >>>>>> that works with the draft patch in the next few days, so hopefully > >>>>>> we'll have something more concrete to discuss. Otherwise -- if you > >>>>>> have a clear idea of how eg. sequence number assignment would work > >> in > >>>>>> the idempotent-producer proposal, we could probably translate that > >>>>>> over to get the equivalent for the conditional publish API. > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Fri, Jul 24, 2015 at 2:16 AM, Ewen Cheslack-Postava > >>>>>> <e...@confluent.io> wrote: > >>>>>>> @Becket - for compressed batches, I think this just works out > >> given > >>>> the > >>>>>> KIP > >>>>>>> as described. Without the change you're referring to, it still > >> only > >>>>> makes > >>>>>>> sense to batch messages with this KIP if all the expected offsets > >>> are > >>>>>>> sequential (else some messages are guaranteed to fail). I think > >>> that > >>>>>>> probably just works out, but raises an issue I brought up on the > >>> KIP > >>>>>> call. > >>>>>>> > >>>>>>> Batching can be a bit weird with this proposal. If you try to > >> write > >>>>> key A > >>>>>>> and key B, the second operation is dependent on the first. Which > >>>> means > >>>>> to > >>>>>>> make an effective client for this, we need to keep track of > >>>>> per-partition > >>>>>>> offsets so we can set expected offsets properly. For example, if > >> A > >>>> was > >>>>>>> expected to publish at offset 10, then if B was published to the > >>> same > >>>>>>> partition, we need to make sure it's marked as expected offset 11 > >>>>>> (assuming > >>>>>>> no subpartition high water marks). We either need to have the > >>>>> application > >>>>>>> keep track of this itself and set the offsets, which requires > >> that > >>> it > >>>>>> know > >>>>>>> about how keys map to partitions, or the client needs to manage > >>> this > >>>>>>> process. But if the client manages it, I think the client gets > >>> quite > >>>> a > >>>>>> bit > >>>>>>> more complicated. If the produce request containing A fails, what > >>>>> happens > >>>>>>> to B? Are there retries that somehow update the expected offset, > >> or > >>>> do > >>>>> we > >>>>>>> just give up since we know it's always going to fail with the > >>>> expected > >>>>>>> offset that was automatically assigned to it? > >>>>>>> > >>>>>>> One way to handle this is to use Yasuhiro's idea of increasing > >> the > >>>>>>> granularity of high watermarks using subpartitions. But I guess > >> my > >>>>>> question > >>>>>>> is: if one producer client is writing many keys, and some of > >> those > >>>> keys > >>>>>> are > >>>>>>> produced to the same partition, and those messages are batched, > >>> what > >>>>>>> happens? Do we end up with lots of failed messages? 
Or do we have > >>>>>>> complicated logic in the producer to figure out what the right > >>>> expected > >>>>>>> offset for each message is? Or do they all share the same base > >>>> expected > >>>>>>> offset as in the compressed case, in which case they all share > >> the > >>>> same > >>>>>>> fate and subpartitioning doesn't help? Or is there a simpler > >>> solution > >>>>> I'm > >>>>>>> just not seeing? Maybe this just disables batching entirely and > >>>>>> throughput > >>>>>>> isn't an issue in these cases? > >>>>>>> > >>>>>>> Sorry, I know that's probably not entirely clear, but that's > >>> because > >>>>> I'm > >>>>>>> very uncertain of how batching works with this KIP. > >>>>>>> > >>>>>>> > >>>>>>> On how this relates to other proposals: I think it might also be > >>>>> helpful > >>>>>> to > >>>>>>> get an overview of all the proposals for relevant modifications > >> to > >>>>>>> producers/produce requests since many of these proposals are > >>> possibly > >>>>>>> alternatives (though some may not be mutually exclusive). Many > >>> people > >>>>>> don't > >>>>>>> have all the context from the past couple of years of the > >> project. > >>>> Are > >>>>>>> there any other relevant wikis or docs besides the following? > >>>>>>> > >>>>>>> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > >>>>>>> > >>>>>>> -Ewen > >>>>>>> > >>>>>>> On Wed, Jul 22, 2015 at 11:18 AM, Gwen Shapira < > >>>> gshap...@cloudera.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Tangent: I think we should complete the move of Produce / Fetch > >>> RPC > >>>> to > >>>>>>>> the client libraries before we add more revisions to this > >>> protocol. > >>>>>>>> > >>>>>>>> On Wed, Jul 22, 2015 at 11:02 AM, Jiangjie Qin > >>>>>>>> <j...@linkedin.com.invalid> wrote: > >>>>>>>>> I missed yesterday's KIP hangout. I'm currently working on > >>> another > >>>>> KIP > >>>>>>>> for > >>>>>>>>> enriched metadata of messages. Guozhang has already created a > >>> wiki > >>>>>> page > >>>>>>>>> before ( > >>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > >>>>>>>> ). > >>>>>>>>> We plan to fill the relative offset to the offset field in the > >>>> batch > >>>>>> sent > >>>>>>>>> by producer to avoid broker side re-compression. The message > >>>> offset > >>>>>> would > >>>>>>>>> become batch base offset + relative offset. I guess maybe the > >>>>> expected > >>>>>>>>> offset in KIP-27 can be only set for base offset? Would that > >>>> affect > >>>>>>>> certain > >>>>>>>>> use cases? > >>>>>>>>> > >>>>>>>>> For Jun's comments, I am not sure I completely get it. I think > >>> the > >>>>>>>> producer > >>>>>>>>> only sends one batch per partition in a request. So either > >> that > >>>>> batch > >>>>>> is > >>>>>>>>> appended or not. Why a batch would be partially committed? > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> > >>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>> > >>>>>>>>> On Tue, Jul 21, 2015 at 10:42 AM, Ben Kirwin <b...@kirw.in> > >>> wrote: > >>>>>>>>> > >>>>>>>>>> That's a fair point. 
I've added some imagined job logic to > >> the > >>>> KIP, > >>>>>> so > >>>>>>>>>> we can make sure the proposal stays in sync with the usages > >>> we're > >>>>>>>>>> discussing. (The logic is just a quick sketch for now -- I > >>> expect > >>>>>> I'll > >>>>>>>>>> need to elaborate it as we get into more detail, or to > >> address > >>>>> other > >>>>>>>>>> concerns...) > >>>>>>>>>> > >>>>>>>>>> On Tue, Jul 21, 2015 at 11:45 AM, Jun Rao <j...@confluent.io> > >>>>> wrote: > >>>>>>>>>>> For 1, yes, when there is a transient leader change, it's > >>>>>> guaranteed > >>>>>>>>>> that a > >>>>>>>>>>> prefix of the messages in a request will be committed. > >>> However, > >>>>> it > >>>>>>>> seems > >>>>>>>>>>> that the client needs to know what subset of messages are > >>>>>> committed in > >>>>>>>>>>> order to resume the sending. Then the question is how. > >>>>>>>>>>> > >>>>>>>>>>> As Flavio indicated, for the use cases that you listed, it > >>>> would > >>>>> be > >>>>>>>>>> useful > >>>>>>>>>>> to figure out the exact logic by using this feature. For > >>>> example, > >>>>>> in > >>>>>>>> the > >>>>>>>>>>> partition K/V store example, when we fail over to a new > >>> writer > >>>> to > >>>>>> the > >>>>>>>>>>> commit log, the zombie writer can publish new messages to > >> the > >>>> log > >>>>>>>> after > >>>>>>>>>> the > >>>>>>>>>>> new writer takes over, but before it publishes any message. > >>> We > >>>>>>>> probably > >>>>>>>>>>> need to outline how this case can be handled properly. > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Jun > >>>>>>>>>>> > >>>>>>>>>>> On Mon, Jul 20, 2015 at 12:05 PM, Ben Kirwin <b...@kirw.in> > >>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Jun, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the close reading! Responses inline. > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the write-up. The single producer use case > >> you > >>>>>> mentioned > >>>>>>>>>> makes > >>>>>>>>>>>>> sense. It would be useful to include that in the KIP > >> wiki. > >>>>>>>>>>>> > >>>>>>>>>>>> Great -- I'll make sure that the wiki is clear about this. > >>>>>>>>>>>> > >>>>>>>>>>>>> 1. What happens when the leader of the partition changes > >>> in > >>>>> the > >>>>>>>> middle > >>>>>>>>>>>> of a > >>>>>>>>>>>>> produce request? In this case, the producer client is > >> not > >>>> sure > >>>>>>>> whether > >>>>>>>>>>>> the > >>>>>>>>>>>>> request succeeds or not. If there is only a single > >> message > >>>> in > >>>>>> the > >>>>>>>>>>>> request, > >>>>>>>>>>>>> the producer can just resend the request. If it sees an > >>>>>>>> OffsetMismatch > >>>>>>>>>>>>> error, it knows that the previous send actually > >> succeeded > >>>> and > >>>>>> can > >>>>>>>>>> proceed > >>>>>>>>>>>>> with the next write. This is nice since it not only > >> allows > >>>> the > >>>>>>>>>> producer > >>>>>>>>>>>> to > >>>>>>>>>>>>> proceed during transient failures in the broker, it also > >>>>> avoids > >>>>>>>>>>>> duplicates > >>>>>>>>>>>>> during producer resend. One caveat is when there are > >>>> multiple > >>>>>>>>>> messages in > >>>>>>>>>>>>> the same partition in a produce request. The issue is > >> that > >>>> in > >>>>>> our > >>>>>>>>>> current > >>>>>>>>>>>>> replication protocol, it's possible for some, but not > >> all > >>>>>> messages > >>>>>>>> in > >>>>>>>>>> the > >>>>>>>>>>>>> request to be committed. 
This makes resend a bit harder > >> to > >>>>> deal > >>>>>>>> with > >>>>>>>>>>>> since > >>>>>>>>>>>>> on receiving an OffsetMismatch error, it's not clear > >> which > >>>>>> messages > >>>>>>>>>> have > >>>>>>>>>>>>> been committed. One possibility is to expect that > >>>> compression > >>>>> is > >>>>>>>>>> enabled, > >>>>>>>>>>>>> in which case multiple messages are compressed into a > >>> single > >>>>>>>> message. > >>>>>>>>>> I > >>>>>>>>>>>> was > >>>>>>>>>>>>> thinking that another possibility is for the broker to > >>>> return > >>>>>> the > >>>>>>>>>> current > >>>>>>>>>>>>> high watermark when sending an OffsetMismatch error. > >> Based > >>>> on > >>>>>> this > >>>>>>>>>> info, > >>>>>>>>>>>>> the producer can resend the subset of messages that have > >>> not > >>>>>> been > >>>>>>>>>>>>> committed. However, this may not work in a compacted > >> topic > >>>>> since > >>>>>>>> there > >>>>>>>>>>>> can > >>>>>>>>>>>>> be holes in the offset. > >>>>>>>>>>>> > >>>>>>>>>>>> This is a excellent question. It's my understanding that > >> at > >>>>> least > >>>>>> a > >>>>>>>>>>>> *prefix* of messages will be committed (right?) -- which > >>> seems > >>>>> to > >>>>>> be > >>>>>>>>>>>> enough for many cases. I'll try and come up with a more > >>>> concrete > >>>>>>>>>>>> answer here. > >>>>>>>>>>>> > >>>>>>>>>>>>> 2. Is this feature only intended to be used with ack = > >>> all? > >>>>> The > >>>>>>>> client > >>>>>>>>>>>>> doesn't get the offset with ack = 0. With ack = 1, it's > >>>>> possible > >>>>>>>> for a > >>>>>>>>>>>>> previously acked message to be lost during leader > >>>> transition, > >>>>>> which > >>>>>>>>>> will > >>>>>>>>>>>>> make the client logic more complicated. > >>>>>>>>>>>> > >>>>>>>>>>>> It's true that acks = 0 doesn't seem to be particularly > >>>> useful; > >>>>> in > >>>>>>>> all > >>>>>>>>>>>> the cases I've come across, the client eventually wants to > >>>> know > >>>>>> about > >>>>>>>>>>>> the mismatch error. However, it seems like there are some > >>>> cases > >>>>>> where > >>>>>>>>>>>> acks = 1 would be fine -- eg. in a bulk load of a fixed > >>>> dataset, > >>>>>>>>>>>> losing messages during a leader transition just means you > >>> need > >>>>> to > >>>>>>>>>>>> rewind / restart the load, which is not especially > >>>> catastrophic. > >>>>>> For > >>>>>>>>>>>> many other interesting cases, acks = all is probably > >>>> preferable. > >>>>>>>>>>>> > >>>>>>>>>>>>> 3. How does the producer client know the offset to send > >>> the > >>>>>> first > >>>>>>>>>>>> message? > >>>>>>>>>>>>> Do we need to expose an API in producer to get the > >> current > >>>>> high > >>>>>>>>>>>> watermark? > >>>>>>>>>>>> > >>>>>>>>>>>> You're right, it might be irritating to have to go through > >>> the > >>>>>>>>>>>> consumer API just for this. There are some cases where the > >>>>> offsets > >>>>>>>> are > >>>>>>>>>>>> already available -- like the commit-log-for-KV-store > >>> example > >>>> -- > >>>>>> but > >>>>>>>>>>>> in general, being able to get the offsets from the > >> producer > >>>>>> interface > >>>>>>>>>>>> does sound convenient. > >>>>>>>>>>>> > >>>>>>>>>>>>> We plan to have a KIP discussion meeting tomorrow at > >> 11am > >>>> PST. > >>>>>>>> Perhaps > >>>>>>>>>>>> you > >>>>>>>>>>>>> can describe this KIP a bit then? > >>>>>>>>>>>> > >>>>>>>>>>>> Sure, happy to join. 
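
For the commit-log-for-KV-store case mentioned just above, the bootstrap read really does make the first expected offset available for free. Here is a rough sketch of that takeover flow, reusing the hypothetical ConditionalProducer and OffsetMismatchException from the earlier sketch; LogReader and StateMachine are likewise invented for illustration, not real APIs.

import java.io.IOException;
import java.util.Collections;

interface LogReader {
    // Replay committed records in [from, end of log) into the state machine
    // and return the offset immediately after the last record applied.
    long replay(String topic, int partition, long from, StateMachine state)
            throws IOException;
}

interface StateMachine {
    void apply(byte[] record);        // fold one committed record into local state
    byte[] toRecord(byte[] command);  // turn a client command into the next record
}

class SingleWriter {
    private final ConditionalProducer producer;
    private final LogReader reader;
    private final StateMachine state;
    private final String topic;
    private final int partition;
    private long upcoming;            // offset we expect our next write to get

    SingleWriter(ConditionalProducer producer, LogReader reader, StateMachine state,
                 String topic, int partition) {
        this.producer = producer;
        this.reader = reader;
        this.state = state;
        this.topic = topic;
        this.partition = partition;
    }

    // Called when this replica is elected as the active writer: rebuilding the
    // state also yields the expected offset for the first write.
    void takeOver() throws IOException {
        upcoming = reader.replay(topic, partition, 0L, state);
    }

    // Called for each client command while this replica believes it is active.
    void handle(byte[] command) throws IOException {
        byte[] record = state.toRecord(command);
        try {
            producer.send(topic, partition, upcoming, Collections.singletonList(record));
            state.apply(record);
            upcoming++;
        } catch (OffsetMismatchException e) {
            // Someone else appended since we last looked. The offset check
            // alone cannot tell a freshly elected writer from a zombie, so the
            // safe reaction is to stop writing and go back through election
            // and takeOver() rather than clobber the log.
            throw new IllegalStateException("lost ownership of " + topic + "-" + partition, e);
        }
    }
}

One caveat from Jun's note above (KAFKA-2334): the high watermark can briefly go backwards during a leader change, so the offset learned from the bootstrap read is not 100% accurate today.
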
> >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Jun > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Sat, Jul 18, 2015 at 10:37 AM, Ben Kirwin < > >> b...@kirw.in > >>>> > >>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Just wanted to flag a little discussion that happened > >> on > >>>> the > >>>>>>>> ticket: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://issues.apache.org/jira/browse/KAFKA-2260?focusedCommentId=14632259&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14632259 > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> In particular, Yasuhiro Matsuda proposed an interesting > >>>>>> variant on > >>>>>>>>>>>>>> this that performs the offset check on the message key > >>>>>> (instead of > >>>>>>>>>>>>>> just the partition), with bounded space requirements, > >> at > >>>> the > >>>>>> cost > >>>>>>>> of > >>>>>>>>>>>>>> potentially some spurious failures. (ie. the produce > >>>> request > >>>>>> may > >>>>>>>> fail > >>>>>>>>>>>>>> even if that particular key hasn't been updated > >>> recently.) > >>>>> This > >>>>>>>>>>>>>> addresses a couple of the drawbacks of the per-key > >>> approach > >>>>>>>> mentioned > >>>>>>>>>>>>>> at the bottom of the KIP. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 6:47 PM, Ben Kirwin < > >> b...@kirw.in > >>>> > >>>>>> wrote: > >>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> So, perhaps it's worth adding a couple specific > >>> examples > >>>> of > >>>>>>>> where > >>>>>>>>>> this > >>>>>>>>>>>>>>> feature is useful, to make this a bit more concrete: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - Suppose I'm using Kafka as a commit log for a > >>>> partitioned > >>>>>> KV > >>>>>>>>>> store, > >>>>>>>>>>>>>>> like Samza or Pistachio (?) do. We bootstrap the > >>> process > >>>>>> state > >>>>>>>> by > >>>>>>>>>>>>>>> reading from that partition, and log all state > >> updates > >>> to > >>>>>> that > >>>>>>>>>>>>>>> partition when we're running. Now imagine that one of > >>> my > >>>>>>>> processes > >>>>>>>>>>>>>>> locks up -- GC or similar -- and the system > >> transitions > >>>>> that > >>>>>>>>>> partition > >>>>>>>>>>>>>>> over to another node. When the GC is finished, the > >> old > >>>>>> 'owner' > >>>>>>>> of > >>>>>>>>>> that > >>>>>>>>>>>>>>> partition might still be trying to write to the > >> commit > >>>> log > >>>>> at > >>>>>>>> the > >>>>>>>>>> same > >>>>>>>>>>>>>>> as the new one is. A process might detect this by > >>>> noticing > >>>>>> that > >>>>>>>> the > >>>>>>>>>>>>>>> offset of the published message is bigger than it > >>> thought > >>>>> the > >>>>>>>>>> upcoming > >>>>>>>>>>>>>>> offset was, which implies someone else has been > >> writing > >>>> to > >>>>>> the > >>>>>>>>>> log... > >>>>>>>>>>>>>>> but by then it's too late, and the commit log is > >>> already > >>>>>>>> corrupt. > >>>>>>>>>> With > >>>>>>>>>>>>>>> a 'conditional produce', one of those processes will > >>> have > >>>>>> it's > >>>>>>>>>> publish > >>>>>>>>>>>>>>> request refused -- so we've avoided corrupting the > >>> state. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - Envision some copycat-like system, where we have > >> some > >>>>>> sharded > >>>>>>>>>>>>>>> postgres setup and we're tailing each shard into its > >>> own > >>>>>>>> partition. 
> >>>>>>>>>>>>>>> Normally, it's fairly easy to avoid duplicates here: > >> we > >>>> can > >>>>>>>> track > >>>>>>>>>>>>>>> which offset in the WAL corresponds to which offset > >> in > >>>>> Kafka, > >>>>>>>> and > >>>>>>>>>> we > >>>>>>>>>>>>>>> know how many messages we've written to Kafka > >> already, > >>> so > >>>>> the > >>>>>>>>>> state is > >>>>>>>>>>>>>>> very simple. However, it is possible that for a > >> moment > >>> -- > >>>>>> due to > >>>>>>>>>>>>>>> rebalancing or operator error or some other thing -- > >>> two > >>>>>>>> different > >>>>>>>>>>>>>>> nodes are tailing the same postgres shard at once! > >>>> Normally > >>>>>> this > >>>>>>>>>> would > >>>>>>>>>>>>>>> introduce duplicate messages, but by specifying the > >>>>> expected > >>>>>>>>>> offset, > >>>>>>>>>>>>>>> we can avoid this. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> So perhaps it's better to say that this is useful > >> when > >>> a > >>>>>> single > >>>>>>>>>>>>>>> producer is *expected*, but multiple producers are > >>>>>> *possible*? > >>>>>>>> (In > >>>>>>>>>> the > >>>>>>>>>>>>>>> same way that the high-level consumer normally has 1 > >>>>> consumer > >>>>>>>> in a > >>>>>>>>>>>>>>> group reading from a partition, but there are small > >>>> windows > >>>>>>>> where > >>>>>>>>>> more > >>>>>>>>>>>>>>> than one might be reading at the same time.) This is > >>> also > >>>>> the > >>>>>>>>>> spirit > >>>>>>>>>>>>>>> of the 'runtime cost' comment -- in the common case, > >>>> where > >>>>>>>> there is > >>>>>>>>>>>>>>> little to no contention, there's no performance > >>> overhead > >>>>>>>> either. I > >>>>>>>>>>>>>>> mentioned this a little in the Motivation section -- > >>>> maybe > >>>>> I > >>>>>>>> should > >>>>>>>>>>>>>>> flesh that out a little bit? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> For me, the motivation to work this up was that I > >> kept > >>>>>> running > >>>>>>>> into > >>>>>>>>>>>>>>> cases, like the above, where the existing API was > >>>>>>>>>> almost-but-not-quite > >>>>>>>>>>>>>>> enough to give the guarantees I was looking for -- > >> and > >>>> the > >>>>>>>>>> extension > >>>>>>>>>>>>>>> needed to handle those cases too was pretty small and > >>>>>>>>>> natural-feeling. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 4:49 PM, Ashish Singh < > >>>>>>>> asi...@cloudera.com > >>>>>>>>>>> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> Good concept. I have a question though. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Say there are two producers A and B. Both producers > >>> are > >>>>>>>> producing > >>>>>>>>>> to > >>>>>>>>>>>>>> same > >>>>>>>>>>>>>>>> partition. > >>>>>>>>>>>>>>>> - A sends a message with expected offset, x1 > >>>>>>>>>>>>>>>> - Broker accepts is and sends an Ack > >>>>>>>>>>>>>>>> - B sends a message with expected offset, x1 > >>>>>>>>>>>>>>>> - Broker rejects it, sends nack > >>>>>>>>>>>>>>>> - B sends message again with expected offset, x1+1 > >>>>>>>>>>>>>>>> - Broker accepts it and sends Ack > >>>>>>>>>>>>>>>> I guess this is what this KIP suggests, right? If > >> yes, > >>>>> then > >>>>>> how > >>>>>>>>>> does > >>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>> ensure that same message will not be written twice > >>> when > >>>>> two > >>>>>>>>>> producers > >>>>>>>>>>>>>> are > >>>>>>>>>>>>>>>> producing to same partition? Producer on receiving a > >>>> nack > >>>>>> will > >>>>>>>> try > >>>>>>>>>>>> again > >>>>>>>>>>>>>>>> with next offset and will keep doing so till the > >>> message > >>>>> is > >>>>>>>>>> accepted. 
> >>>>>>>>>>>>>> Am I > >>>>>>>>>>>>>>>> missing something? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Also, you have mentioned on KIP, "it imposes little > >> to > >>>> no > >>>>>>>> runtime > >>>>>>>>>>>> cost > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>> memory or time", I think that is not true for time. > >>> With > >>>>>> this > >>>>>>>>>>>> approach > >>>>>>>>>>>>>>>> producers' performance will reduce proportionally to > >>>>> number > >>>>>> of > >>>>>>>>>>>> producers > >>>>>>>>>>>>>>>> writing to same partition. Please correct me if I am > >>>>> missing > >>>>>>>> out > >>>>>>>>>>>>>> something. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 11:32 AM, Mayuresh Gharat < > >>>>>>>>>>>>>>>> gharatmayures...@gmail.com> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> If we have 2 producers producing to a partition, > >> they > >>>> can > >>>>>> be > >>>>>>>> out > >>>>>>>>>> of > >>>>>>>>>>>>>> order, > >>>>>>>>>>>>>>>>> then how does one producer know what offset to > >> expect > >>>> as > >>>>> it > >>>>>>>> does > >>>>>>>>>> not > >>>>>>>>>>>>>>>>> interact with other producer? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Can you give an example flow that explains how it > >>> works > >>>>>> with > >>>>>>>>>> single > >>>>>>>>>>>>>>>>> producer and with multiple producers? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Mayuresh > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, Jul 17, 2015 at 10:28 AM, Flavio Junqueira > >> < > >>>>>>>>>>>>>>>>> fpjunque...@yahoo.com.invalid> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> I like this feature, it reminds me of conditional > >>>>>> updates in > >>>>>>>>>>>>>> zookeeper. > >>>>>>>>>>>>>>>>>> I'm not sure if it'd be best to have some > >> mechanism > >>>> for > >>>>>>>> fencing > >>>>>>>>>>>>>> rather > >>>>>>>>>>>>>>>>> than > >>>>>>>>>>>>>>>>>> a conditional write like you're proposing. The > >>> reason > >>>>> I'm > >>>>>>>>>> saying > >>>>>>>>>>>>>> this is > >>>>>>>>>>>>>>>>>> that the conditional write applies to requests > >>>>>> individually, > >>>>>>>>>>>> while it > >>>>>>>>>>>>>>>>>> sounds like you want to make sure that there is a > >>>>> single > >>>>>>>> client > >>>>>>>>>>>>>> writing > >>>>>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>>>> over multiple requests. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> -Flavio > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On 17 Jul 2015, at 07:30, Ben Kirwin < > >>> b...@kirw.in> > >>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi there, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I just added a KIP for a 'conditional publish' > >>>>>> operation: > >>>>>>>> a > >>>>>>>>>>>> simple > >>>>>>>>>>>>>>>>>>> CAS-like mechanism for the Kafka producer. 
The > >>> wiki > >>>>>> page > >>>>>>>> is > >>>>>>>>>>>> here: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> And there's some previous discussion on the > >>> ticket > >>>>> and > >>>>>> the > >>>>>>>>>> users > >>>>>>>>>>>>>> list: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >> https://issues.apache.org/jira/browse/KAFKA-2260 > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > https://mail-archives.apache.org/mod_mbox/kafka-users/201506.mbox/%3CCAAeOB6ccyAA13YNPqVQv2o-mT5r=c9v7a+55sf2wp93qg7+...@mail.gmail.com%3E > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> As always, comments and suggestions are very > >>>> welcome. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>> Ben > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>>> -Regards, > >>>>>>>>>>>>>>>>> Mayuresh R. Gharat > >>>>>>>>>>>>>>>>> (862) 250-7125 > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> -- > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Ashish > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> Thanks, > >>>>>>> Ewen > >>>>>> > >>>>> > >>>> > >>> > >> > >
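
Going back to Flavio's fencing suggestion near the top of the thread: here is a minimal sketch of how a broker-side check might combine a leased writer token with the expected-offset comparison. PartitionState, replaceToken, and the result codes are all invented for illustration; this is not the KIP-27 proposal as written, just one way the token verification and the offset check could compose.

class PartitionState {
    private long upcomingOffset;      // offset the next append will receive
    private long writerToken;         // token held by the currently leased writer

    enum AppendResult { OK, OFFSET_MISMATCH, FENCED }

    // Called by a demote/failover path to invalidate the old writer's lease.
    synchronized void replaceToken(long newToken) {
        writerToken = newToken;
    }

    synchronized AppendResult tryAppend(long token, long expectedOffset, int batchSize) {
        if (token != writerToken) {
            return AppendResult.FENCED;          // stale lease: fenced-off writer
        }
        if (expectedOffset >= 0 && expectedOffset != upcomingOffset) {
            // In this sketch a negative expected offset means "no condition".
            return AppendResult.OFFSET_MISMATCH;
        }
        upcomingOffset += batchSize;             // the actual log append is elided
        return AppendResult.OK;
    }
}

Flavio leaves open whether the lease is granted directly through ZooKeeper or via the ISR leader; the sketch is agnostic to that choice, since it only assumes the leader learns about a token replacement before serving further appends.
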