Ben, are you still interested in working on this?

On Mon, Jun 15, 2015 at 9:49 AM Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:
I like to refer to it as "conditional write" or "conditional request", semantically similar to HTTP's If-Match header.

Ben: I'm adding a comment about per-key checking to your JIRA.

On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <b...@kirw.in> wrote:

Yeah, it's definitely not a standard CAS, but it feels like the right fit for the commit log abstraction -- CAS on a 'current value' does seem a bit too key-value-store-ish for Kafka to support natively.

I tried to avoid referring to the check-offset-before-publish functionality as a CAS in the ticket because, while they're both types of 'optimistic concurrency control', they are a bit different -- and the offset check is both easier to implement and handier for the stuff I tend to work on. (Though that ticket's about checking the latest offset on a whole partition, not the key -- there's a different set of tradeoffs for the latter, and I haven't thought it through properly yet.)

On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

If you do CAS where you compare the offset of the current record for the key, then yes. This might work fine for applications that track key, value, and offset. It is not quite the same as doing a normal CAS.
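To make that concrete, a conditional produce might look something like this from the application's side. This is only a sketch: sendConditional, OffsetMismatchException, and the helper around them are made up, and nothing like them exists in the current producer API.

    // Hypothetical conditional produce, analogous to HTTP If-Match: the broker
    // rejects the write unless the latest offset for the key (or the partition)
    // still equals the offset the producer observed while validating the command.
    long observed = offsetOfLastEventFor(seatId);          // hypothetical helper
    ProducerRecord<String, Booking> record =
        new ProducerRecord<>("bookings", seatId, booking); // Booking is application code
    try {
        producer.sendConditional(record, observed).get();  // hypothetical API
    } catch (OffsetMismatchException e) {                  // hypothetical exception
        // Another producer won the race: re-read the new events, re-validate
        // the command against the updated state, then retry or reject it.
    }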
On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

But wouldn't the key->offset table be enough to accept or reject a write? I'm not familiar with the exact implementation of Kafka, so I may be wrong.

On Sat, Jun 13, 2015 at 9:05 PM Ewen Cheslack-Postava <e...@confluent.io> wrote:

Daniel: By random read, I meant not reading the data sequentially as is the norm in Kafka, not necessarily a random disk seek. That in-memory data structure is what enables the random read. You're either going to need the disk seek if the data isn't in the fs cache, or you're trading memory to avoid it. If it's a full index containing keys and values, then you're potentially committing to a much larger JVM memory footprint (and all the GC issues that come with it), since you'd be storing that data in the JVM heap. If you're only storing the keys + offset info, then you potentially introduce random disk seeks on any CAS operation (and make page caching harder for the OS, etc.).

On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

Ewen: would single-key CAS necessitate random reads? My idea was to have the broker maintain an in-memory table that could be rebuilt from the log or a snapshot.

On Sat, Jun 13, 2015 at 8:26 PM Ewen Cheslack-Postava <e...@confluent.io> wrote:

Jay - I think you need broker support if you want CAS to work with compacted topics. With the approach you described you can't turn on compaction, since that would make it last-writer-wins, and using any non-infinite retention policy would require some external process to monitor keys that might expire and refresh them by rewriting the data.

That said, I think any addition like this warrants a lot of discussion about potential use cases, since there are a lot of ways you could go about adding support for something like this. I think this is an obvious next incremental step, but someone is bound to have a use case that would require multi-key CAS and would be costly to build atop single-key CAS. Or, since the compare requires a random read anyway, why not throw in read-by-key rather than sequential log reads, which would allow for minitransactions a la Sinfonia?

I'm not convinced trying to make Kafka support traditional key-value store functionality is a good idea. Compacted topics made it possible to use it a bit more in that way, but didn't change the public interface, only the way storage was implemented -- and, importantly, all the potential additional performance costs & data structures are isolated to background threads.

-Ewen

On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

@Jay:

Regarding your first proposal: wouldn't that mean that a producer wouldn't know whether a write succeeded? In the case of event sourcing, a failed CAS may require re-validating the input with the new state. Simply discarding the write would be wrong.

As for the second idea: how would a client of the writer service know which writer is the leader? For example, how would a load balancer know which web app process to route requests to? Ideally, all processes would be able to handle requests.

Using conditional writes would allow any producer to write and provide synchronous feedback to the producers.

On Fri, Jun 12, 2015 at 6:41 PM Jay Kreps <j...@confluent.io> wrote:

I have been thinking a little about this. I don't think CAS actually requires any particular broker support. Rather, the two writers just write messages with some deterministic check-and-set criteria, and all the replicas read from the log and check this criteria before applying the write. This mechanism has the downside that it creates additional writes when there is a conflict and requires waiting on the full roundtrip (write and then read), but it has the advantage that it is very flexible as to the criteria you use.

An alternative strategy for accomplishing the same thing a bit more efficiently is to elect leaders amongst the writers themselves. This would require broker support for single writer to avoid the possibility of split brain. I like this approach better because the leader for a partition can then do anything they want on their local data to make the decision of what is committed; however, the downside is that the mechanism is more involved.
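As a minimal sketch of the first approach, using per-key expected offsets as the check criteria: every reader replays the same log and runs the same deterministic check, so all readers agree on which writes "won". CasMessage here is an application-defined envelope (payload plus the offset the writer last observed for the key), not a Kafka type, and applyToLocalState is application code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // key -> offset of the last write we applied for that key
    Map<String, Long> lastApplied = new HashMap<>();

    void replay(Iterable<ConsumerRecord<String, CasMessage>> log) {
        for (ConsumerRecord<String, CasMessage> rec : log) {
            Long current = lastApplied.get(rec.key()); // null if key unseen so far
            if (Objects.equals(current, rec.value().expectedOffset())) {
                applyToLocalState(rec.key(), rec.value().payload());
                lastApplied.put(rec.key(), rec.offset());
            }
            // else: a conflicting write got in first, and every reader drops
            // this one in exactly the same way; the producer only learns of
            // the failure by reading the log back -- the extra roundtrip
            // mentioned above.
        }
    }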
-Jay

On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <b...@kirw.in> wrote:

Gwen: Right now I'm just looking for feedback -- but yes, if folks are interested, I do plan to do that implementation work.

Daniel: Yes, that's exactly right. I haven't thought much about per-key... it does sound useful, but the implementation seems a bit more involved. Want to add it to the ticket?

On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

Ben: your solution seems to focus on partition-wide CAS. Have you considered per-key CAS? That would make the feature more useful in my opinion, as you'd greatly reduce the contention.

On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira <gshap...@cloudera.com> wrote:

Hi Ben,

Thanks for creating the ticket. Having check-and-set capability will be sweet :)
Are you planning to implement this yourself? Or is it just an idea for the community?

Gwen

On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <b...@kirw.in> wrote:

As it happens, I submitted a ticket for this feature a couple days ago:

https://issues.apache.org/jira/browse/KAFKA-2260

Couldn't find any existing proposals for similar things, but it's certainly possible they're out there...

On the other hand, I think you can solve your particular issue by reframing the problem: treating the messages as 'requests' or 'commands' instead of statements of fact. In your flight-booking example, the log would correctly reflect that two different people tried to book the same flight; the stream consumer would be responsible for finalizing one booking, and notifying the other client that their request had failed. (In-browser or by email.)
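Roughly, on the consumer side -- a sketch, where SeatRequest, confirmBooking, and notifyRejected stand in for application code:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // The log records every booking *attempt*; a single consumer walks it in
    // order and deterministically picks a winner for each seat.
    Set<String> takenSeats = new HashSet<>();

    void arbitrate(Iterable<ConsumerRecord<String, SeatRequest>> requests) {
        for (ConsumerRecord<String, SeatRequest> rec : requests) {
            SeatRequest req = rec.value();
            if (takenSeats.add(req.seatId())) {
                confirmBooking(req);  // first request in log order wins the seat
            } else {
                notifyRejected(req);  // later requests learn the seat is gone
            }
        }
    }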
On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

I've been working on an application which uses Event Sourcing, and I'd like to use Kafka as opposed to, say, a SQL database to store events. This would allow me to easily integrate other systems by having them read off the Kafka topics.

I do have one concern, though: the consistency of the data can only be guaranteed if a command handler has a complete picture of all past events pertaining to some entity.

As an example, consider an airline seat reservation system. Each reservation command issued by a user is rejected if the seat has already been taken. If the seat is available, a record describing the event is appended to the log. This works great when there's only one producer, but in order to scale I may need multiple producer processes. This introduces a race condition: two command handlers may simultaneously receive a command to reserve the same seat. The event log indicates that the seat is available, so each handler will append a reservation event -- thus double-booking that seat!

I see three ways around that issue:
1. Don't use Kafka for this.
2. Force a single producer for a given flight. This will impact availability and make routing more complex.
3. Have a way to do optimistic locking in Kafka.

The latter idea would work either on a per-key basis or globally for a partition: when appending to a partition, the producer would indicate in its request that the request should be rejected unless the current offset of the partition is equal to x. For the per-key setup, Kafka brokers would track the offset of the latest message for each unique key, if so configured. This would allow the request to specify that it should be rejected if the offset for key k is not equal to x.
This way, only one of the command handlers would succeed in writing to Kafka, thus ensuring consistency.

There are different levels of complexity associated with implementing this in Kafka, depending on whether the feature would work per-partition or per-key:
* For the per-partition optimistic locking, the broker would just need to keep track of the high water mark for each partition and reject conditional requests when the offset doesn't match.
* For per-key locking, the broker would need to maintain an in-memory table mapping keys to the offset of the last message with that key. This should be fairly easy to maintain and recreate from the log if necessary. It could also be saved to disk as a snapshot from time to time in order to cut down the time needed to recreate the table on restart. There's a small performance penalty associated with this, but it could be opt-in for a topic.

Am I the only one thinking about using Kafka like this? Would this be a nice feature to have?
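To sketch the kind of per-key table I have in mind (names are purely illustrative and don't reflect actual broker internals; how a producer would write a brand-new key is elided):

    import java.util.HashMap;
    import java.util.Map;

    // key -> offset of the last message with that key, rebuilt by scanning the
    // log on startup (or loaded from a periodic snapshot and then caught up).
    Map<String, Long> latestOffsetByKey = new HashMap<>();

    // LogEntry is a stand-in for however the broker iterates its segments.
    void rebuild(Iterable<LogEntry> log) {
        for (LogEntry entry : log) {
            latestOffsetByKey.put(entry.key(), entry.offset());
        }
    }

    // A conditional append for key k with expected offset x is accepted only
    // if the last write to k is still the one the producer saw.
    boolean accept(String k, long x) {
        Long current = latestOffsetByKey.get(k);
        return current != null && current == x; // reject on mismatch
    }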