I like to refer to it as "conditional write" or "conditional request", semantically similar to HTTP's If-Match header.
Ben: I'm adding a comment about per-key checking to your JIRA.

On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <b...@kirw.in> wrote:

> Yeah, it's definitely not a standard CAS, but it feels like the right
> fit for the commit log abstraction -- CAS on a 'current value' does
> seem a bit too key-value-store-ish for Kafka to support natively.
>
> I tried to avoid referring to the check-offset-before-publish
> functionality as a CAS in the ticket because, while they're both types
> of 'optimistic concurrency control', they are a bit different -- and
> the offset check is both easier to implement and handier for the stuff
> I tend to work on. (Though that ticket's about checking the latest
> offset on a whole partition, not the key -- there's a different set of
> tradeoffs for the latter, and I haven't thought it through properly
> yet.)
>
> On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava
> <e...@confluent.io> wrote:
> > If you do CAS where you compare the offset of the current record for the
> > key, then yes. This might work fine for applications that track key, value,
> > and offset. It is not quite the same as doing a normal CAS.
> >
> > On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <
> > daniel.schierb...@gmail.com> wrote:
> >
> >> But wouldn't the key->offset table be enough to accept or reject a write?
> >> I'm not familiar with the exact implementation of Kafka, so I may be wrong.
> >>
> >> On Sat, 13 Jun 2015 at 21:05 Ewen Cheslack-Postava <e...@confluent.io>
> >> wrote:
> >>
> >> > Daniel: By random read, I meant not reading the data sequentially as is the
> >> > norm in Kafka, not necessarily a random disk seek. That in-memory data
> >> > structure is what enables the random read. You're either going to need the
> >> > disk seek if the data isn't in the fs cache or you're trading memory to
> >> > avoid it.
> >> > If it's a full index containing keys and values then you're
> >> > potentially committing to a much larger JVM memory footprint (and all the
> >> > GC issues that come with it) since you'd be storing that data in the JVM
> >> > heap. If you're only storing the keys + offset info, then you potentially
> >> > introduce random disk seeks on any CAS operation (and make page caching
> >> > harder for the OS, etc.).
> >> >
> >> >
> >> > On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <
> >> > daniel.schierb...@gmail.com> wrote:
> >> >
> >> > > Ewen: would single-key CAS necessitate random reads? My idea was to have
> >> > > the broker maintain an in-memory table that could be rebuilt from the log
> >> > > or a snapshot.
> >> > > On Sat, 13 Jun 2015 at 20:26 Ewen Cheslack-Postava <e...@confluent.io>
> >> > > wrote:
> >> > >
> >> > > > Jay - I think you need broker support if you want CAS to work with
> >> > > > compacted topics. With the approach you described you can't turn on
> >> > > > compaction since that would make it last-writer-wins, and using any
> >> > > > non-infinite retention policy would require some external process to
> >> > > > monitor keys that might expire and refresh them by rewriting the data.
> >> > > >
> >> > > > That said, I think any addition like this warrants a lot of discussion
> >> > > > about potential use cases since there are a lot of ways you could go adding
> >> > > > support for something like this. I think this is an obvious next
> >> > > > incremental step, but someone is bound to have a use case that would
> >> > > > require multi-key CAS and would be costly to build atop single key CAS. Or,
> >> > > > since the compare requires a random read anyway, why not throw in
> >> > > > read-by-key rather than sequential log reads, which would allow for
> >> > > > minitransactions a la Sinfonia?
> >> > > >
> >> > > > I'm not convinced trying to make Kafka support traditional key-value store
> >> > > > functionality is a good idea. Compacted topics made it possible to use it a
> >> > > > bit more in that way, but didn't change the public interface, only the way
> >> > > > storage was implemented, and importantly all the potential additional
> >> > > > performance costs & data structures are isolated to background threads.
> >> > > >
> >> > > > -Ewen
> >> > > >
> >> > > > On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <
> >> > > > daniel.schierb...@gmail.com> wrote:
> >> > > >
> >> > > > > @Jay:
> >> > > > >
> >> > > > > Regarding your first proposal: wouldn't that mean that a producer wouldn't
> >> > > > > know whether a write succeeded? In the case of event sourcing, a failed CAS
> >> > > > > may require re-validating the input with the new state. Simply discarding
> >> > > > > the write would be wrong.
> >> > > > >
> >> > > > > As for the second idea: how would a client of the writer service know which
> >> > > > > writer is the leader? For example, how would a load balancer know which web
> >> > > > > app process to route requests to? Ideally, all processes would be able to
> >> > > > > handle requests.
> >> > > > >
> >> > > > > Using conditional writes would allow any producer to write and provide
> >> > > > > synchronous feedback to the producers.
> >> > > > > On Fri, 12 Jun 2015 at 18:41 Jay Kreps <j...@confluent.io> wrote:
> >> > > > >
> >> > > > > > I have been thinking a little about this. I don't think CAS actually
> >> > > > > > requires any particular broker support.
> >> > > > > > Rather the two writers just write
> >> > > > > > messages with some deterministic check-and-set criteria and all the
> >> > > > > > replicas read from the log and check this criteria before applying the
> >> > > > > > write. This mechanism has the downside that it creates additional writes
> >> > > > > > when there is a conflict and requires waiting on the full roundtrip (write
> >> > > > > > and then read) but it has the advantage that it is very flexible as to the
> >> > > > > > criteria you use.
> >> > > > > >
> >> > > > > > An alternative strategy for accomplishing the same thing a bit more
> >> > > > > > efficiently is to elect leaders amongst the writers themselves. This would
> >> > > > > > require broker support for single writer to avoid the possibility of split
> >> > > > > > brain. I like this approach better because the leader for a partition can
> >> > > > > > then do anything they want on their local data to make the decision of what
> >> > > > > > is committed, however the downside is that the mechanism is more involved.
> >> > > > > >
> >> > > > > > -Jay
> >> > > > > >
> >> > > > > > On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <b...@kirw.in> wrote:
> >> > > > > >
> >> > > > > > > Gwen: Right now I'm just looking for feedback -- but yes, if folks are
> >> > > > > > > interested, I do plan to do that implementation work.
> >> > > > > > >
> >> > > > > > > Daniel: Yes, that's exactly right. I haven't thought much about
> >> > > > > > > per-key... it does sound useful, but the implementation seems a bit
> >> > > > > > > more involved. Want to add it to the ticket?
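
A minimal sketch of the first approach Jay describes in the quoted message above (writers append conditional messages, every reader applies the same deterministic check while replaying the log). The message shape, the per-key version counter used as the check-and-set criterion, and the names `ConditionalWrite` and `replay` are assumptions made for illustration; the thread only says "some deterministic check-and-set criteria".

```python
from typing import NamedTuple


class ConditionalWrite(NamedTuple):
    """Hypothetical message format: a write plus the condition it depends on."""
    key: str
    expected_version: int  # version the writer believed the key had when it wrote
    value: str


def replay(log):
    """Fold the log into state, applying each write only if its condition holds.

    Every reader runs the same deterministic check in log order, so all
    readers agree on which writes were applied and which were rejected.
    """
    state = {}     # key -> (version, value)
    rejected = []  # offsets of writes whose condition failed
    for offset, msg in enumerate(log):
        current_version = state.get(msg.key, (0, None))[0]
        if msg.expected_version == current_version:
            state[msg.key] = (current_version + 1, msg.value)
        else:
            rejected.append(offset)
    return state, rejected


# Two writers race for the same seat; both messages land in the log,
# but only the first one passes the check on replay.
log = [
    ConditionalWrite("seat-12A", 0, "reserved by alice"),
    ConditionalWrite("seat-12A", 0, "reserved by bob"),
]
state, rejected = replay(log)
```

Note that the losing write still occupies a slot in the log, and a writer only learns the outcome after reading its own message back, which is the extra roundtrip mentioned in the quoted message.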
> >> > > > > > >
> >> > > > > > > On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
> >> > > > > > > <daniel.schierb...@gmail.com> wrote:
> >> > > > > > > > Ben: your solution seems to focus on partition-wide CAS. Have you
> >> > > > > > > > considered per-key CAS? That would make the feature more useful in my
> >> > > > > > > > opinion, as you'd greatly reduce the contention.
> >> > > > > > > >
> >> > > > > > > > On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira <gshap...@cloudera.com>
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > >> Hi Ben,
> >> > > > > > > >>
> >> > > > > > > >> Thanks for creating the ticket. Having check-and-set capability will be
> >> > > > > > > >> sweet :)
> >> > > > > > > >> Are you planning to implement this yourself? Or is it just an idea for
> >> > > > > > > >> the community?
> >> > > > > > > >>
> >> > > > > > > >> Gwen
> >> > > > > > > >>
> >> > > > > > > >> On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <b...@kirw.in> wrote:
> >> > > > > > > >> > As it happens, I submitted a ticket for this feature a couple days ago:
> >> > > > > > > >> >
> >> > > > > > > >> > https://issues.apache.org/jira/browse/KAFKA-2260
> >> > > > > > > >> >
> >> > > > > > > >> > Couldn't find any existing proposals for similar things, but it's
> >> > > > > > > >> > certainly possible they're out there...
> >> > > > > > > >> >
> >> > > > > > > >> > On the other hand, I think you can solve your particular issue by
> >> > > > > > > >> > reframing the problem: treating the messages as 'requests' or
> >> > > > > > > >> > 'commands' instead of statements of fact.
> >> > > > > > > >> > In your flight-booking
> >> > > > > > > >> > example, the log would correctly reflect that two different people
> >> > > > > > > >> > tried to book the same flight; the stream consumer would be
> >> > > > > > > >> > responsible for finalizing one booking, and notifying the other client
> >> > > > > > > >> > that their request had failed. (In-browser or by email.)
> >> > > > > > > >> >
> >> > > > > > > >> > On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
> >> > > > > > > >> > <daniel.schierb...@gmail.com> wrote:
> >> > > > > > > >> >> I've been working on an application which uses Event Sourcing, and I'd like
> >> > > > > > > >> >> to use Kafka as opposed to, say, a SQL database to store events. This would
> >> > > > > > > >> >> allow me to easily integrate other systems by having them read off the
> >> > > > > > > >> >> Kafka topics.
> >> > > > > > > >> >>
> >> > > > > > > >> >> I do have one concern, though: the consistency of the data can only be
> >> > > > > > > >> >> guaranteed if a command handler has a complete picture of all past events
> >> > > > > > > >> >> pertaining to some entity.
> >> > > > > > > >> >>
> >> > > > > > > >> >> As an example, consider an airline seat reservation system. Each
> >> > > > > > > >> >> reservation command issued by a user is rejected if the seat has already
> >> > > > > > > >> >> been taken. If the seat is available, a record describing the event is
> >> > > > > > > >> >> appended to the log.
> >> > > > > > > >> >> This works great when there's only one producer, but
> >> > > > > > > >> >> in order to scale I may need multiple producer processes. This introduces a
> >> > > > > > > >> >> race condition: two command handlers may simultaneously receive a command
> >> > > > > > > >> >> to reserve the same seat. The event log indicates that the seat is
> >> > > > > > > >> >> available, so each handler will append a reservation event, thus
> >> > > > > > > >> >> double-booking that seat!
> >> > > > > > > >> >>
> >> > > > > > > >> >> I see three ways around that issue:
> >> > > > > > > >> >> 1. Don't use Kafka for this.
> >> > > > > > > >> >> 2. Force a single producer for a given flight. This will impact
> >> > > > > > > >> >> availability and make routing more complex.
> >> > > > > > > >> >> 3. Have a way to do optimistic locking in Kafka.
> >> > > > > > > >> >>
> >> > > > > > > >> >> The latter idea would work either on a per-key basis or globally for a
> >> > > > > > > >> >> partition: when appending to a partition, the producer would indicate in
> >> > > > > > > >> >> its request that the request should be rejected unless the current offset
> >> > > > > > > >> >> of the partition is equal to x. For the per-key setup, Kafka brokers would
> >> > > > > > > >> >> track the offset of the latest message for each unique key, if so
> >> > > > > > > >> >> configured. This would allow the request to specify that it should be
> >> > > > > > > >> >> rejected if the offset for key k is not equal to x.
> >> > > > > > > >> >>
> >> > > > > > > >> >> This way, only one of the command handlers would succeed in writing to
> >> > > > > > > >> >> Kafka, thus ensuring consistency.
> >> > > > > > > >> >>
> >> > > > > > > >> >> There are different levels of complexity associated with implementing this
> >> > > > > > > >> >> in Kafka depending on whether the feature would work per-partition or
> >> > > > > > > >> >> per-key:
> >> > > > > > > >> >> * For the per-partition optimistic locking, the broker would just need to
> >> > > > > > > >> >> keep track of the high water mark for each partition and reject conditional
> >> > > > > > > >> >> requests when the offset doesn't match.
> >> > > > > > > >> >> * For per-key locking, the broker would need to maintain an in-memory table
> >> > > > > > > >> >> mapping keys to the offset of the last message with that key. This should
> >> > > > > > > >> >> be fairly easy to maintain and recreate from the log if necessary. It could
> >> > > > > > > >> >> also be saved to disk as a snapshot from time to time in order to cut down
> >> > > > > > > >> >> the time needed to recreate the table on restart. There's a small
> >> > > > > > > >> >> performance penalty associated with this, but it could be opt-in for a
> >> > > > > > > >> >> topic.
> >> > > > > > > >> >>
> >> > > > > > > >> >> Am I the only one thinking about using Kafka like this? Would this be a
> >> > > > > > > >> >> nice feature to have?
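
To make the two variants in the original proposal concrete, here is a toy in-memory sketch of a partition that rejects conditional appends. It is not Kafka code and does not reflect any existing Kafka API: `PartitionLog`, `OffsetMismatch`, and the parameters `expected_log_end` and `expected_key_offset` are hypothetical names, and the sketch assumes "x" means the offset the new record would receive (partition-wide) or the offset of the latest record with that key (per-key, with `None` meaning the key has never been written).

```python
from dataclasses import dataclass, field

_NO_CHECK = object()  # sentinel: caller did not ask for this check


class OffsetMismatch(Exception):
    """Raised when a conditional append loses the race."""


@dataclass
class PartitionLog:
    """Toy stand-in for a single partition; illustration only."""
    records: list = field(default_factory=list)             # (key, value) tuples
    last_offset_by_key: dict = field(default_factory=dict)  # key -> offset

    def append(self, key, value, *,
               expected_log_end=_NO_CHECK,
               expected_key_offset=_NO_CHECK):
        # Partition-wide check: the new record must land at exactly this offset.
        if expected_log_end is not _NO_CHECK and expected_log_end != len(self.records):
            raise OffsetMismatch(f"log end is {len(self.records)}")
        # Per-key check: the latest record for this key must be at this offset.
        if (expected_key_offset is not _NO_CHECK
                and self.last_offset_by_key.get(key) != expected_key_offset):
            raise OffsetMismatch(f"latest offset for {key!r} changed")
        offset = len(self.records)
        self.records.append((key, value))
        self.last_offset_by_key[key] = offset
        return offset

    def rebuild_key_table(self):
        # Recreate the key -> offset table by scanning the log from the start.
        self.last_offset_by_key = {k: i for i, (k, _) in enumerate(self.records)}


log = PartitionLog()
first = log.append("seat-12A", "reserved by alice", expected_key_offset=None)
try:
    log.append("seat-12A", "reserved by bob", expected_key_offset=None)
    second_succeeded = True
except OffsetMismatch:
    second_succeeded = False  # bob's handler must re-read and re-validate
```

With the per-key check, a concurrent reservation for a different seat would still succeed, which is the reduced contention argued for elsewhere in the thread; with only the partition-wide check, any intervening append causes a rejection.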