Great! I'd love to see this move forward, especially if the design allows for per-key conditionals sometime in the future – doesn't have to be in the first iteration.
On Tue, Jul 14, 2015 at 5:26 AM Ben Kirwin <b...@kirw.in> wrote:

Ah, just saw this. I actually just submitted a patch this evening -- just for the partitionwide version at the moment, since it turns out to be pretty simple to implement. Still very interested in moving forward with this stuff, though not always as much time as I would like...

On Thu, Jul 9, 2015 at 9:39 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

Ben, are you still interested in working on this?

On Mon, Jun 15, 2015 at 9:49 AM Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

I like to refer to it as "conditional write" or "conditional request", semantically similar to HTTP's If-Match header.

Ben: I'm adding a comment about per-key checking to your JIRA.

On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <b...@kirw.in> wrote:

Yeah, it's definitely not a standard CAS, but it feels like the right fit for the commit log abstraction -- CAS on a 'current value' does seem a bit too key-value-store-ish for Kafka to support natively.

I tried to avoid referring to the check-offset-before-publish functionality as a CAS in the ticket because, while they're both types of 'optimistic concurrency control', they are a bit different -- and the offset check is both easier to implement and handier for the stuff I tend to work on. (Though that ticket's about checking the latest offset on a whole partition, not the key -- there's a different set of tradeoffs for the latter, and I haven't thought it through properly yet.)

On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

If you do CAS where you compare the offset of the current record for the key, then yes. This might work fine for applications that track key, value, and offset. It is not quite the same as doing a normal CAS.

On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

But wouldn't the key->offset table be enough to accept or reject a write? I'm not familiar with the exact implementation of Kafka, so I may be wrong.

On Sat, Jun 13, 2015 at 21.05 Ewen Cheslack-Postava <e...@confluent.io> wrote:

Daniel: By random read, I meant not reading the data sequentially as is the norm in Kafka, not necessarily a random disk seek. That in-memory data structure is what enables the random read. You're either going to need the disk seek if the data isn't in the fs cache or you're trading memory to avoid it. If it's a full index containing keys and values then you're potentially committing to a much larger JVM memory footprint (and all the GC issues that come with it) since you'd be storing that data in the JVM heap. If you're only storing the keys + offset info, then you potentially introduce random disk seeks on any CAS operation (and making page caching harder for the OS, etc.).
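A minimal sketch of the keys-plus-offset structure under discussion, assuming a hypothetical broker-side index; the class and method names are invented and this is not existing Kafka code:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the "keys + offset only" structure: the broker keeps the last
    // offset per key, not the values, so a conditional append can be checked
    // without reading the log.
    public class KeyOffsetIndex {
        private final Map<String, Long> lastOffsetByKey = new HashMap<>();
        private long nextOffset = 0L;

        // Accepts the append only if the caller's expected offset for the key
        // matches the last offset seen; -1 means "expect the key to be absent".
        public synchronized boolean tryAppend(String key, long expectedOffset) {
            long current = lastOffsetByKey.getOrDefault(key, -1L);
            if (current != expectedOffset) {
                return false; // conflict: someone else wrote this key since the client read it
            }
            lastOffsetByKey.put(key, nextOffset++);
            return true;
        }

        // Rebuilding after restart would replay the log (or load a snapshot)
        // and call record() for every message; values never need to live in memory.
        public synchronized void record(String key, long offset) {
            lastOffsetByKey.put(key, offset);
            nextOffset = Math.max(nextOffset, offset + 1);
        }
    }

Storing only offsets keeps the heap footprint small, at the cost of a possible disk seek whenever the conflicting record's value is actually needed.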
On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

Ewen: would single-key CAS necessitate random reads? My idea was to have the broker maintain an in-memory table that could be rebuilt from the log or a snapshot.

On Sat, Jun 13, 2015 at 20.26 Ewen Cheslack-Postava <e...@confluent.io> wrote:

Jay - I think you need broker support if you want CAS to work with compacted topics. With the approach you described you can't turn on compaction since that would make it last-writer-wins, and using any non-infinite retention policy would require some external process to monitor keys that might expire and refresh them by rewriting the data.

That said, I think any addition like this warrants a lot of discussion about potential use cases since there are a lot of ways you could go adding support for something like this. I think this is an obvious next incremental step, but someone is bound to have a use case that would require multi-key CAS and would be costly to build atop single-key CAS. Or, since the compare requires a random read anyway, why not throw in read-by-key rather than sequential log reads, which would allow for minitransactions a la Sinfonia?

I'm not convinced trying to make Kafka support traditional key-value store functionality is a good idea. Compacted topics made it possible to use it a bit more in that way, but didn't change the public interface, only the way storage was implemented, and importantly all the potential additional performance costs & data structures are isolated to background threads.

-Ewen

On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

@Jay:

Regarding your first proposal: wouldn't that mean that a producer wouldn't know whether a write succeeded? In the case of event sourcing, a failed CAS may require re-validating the input with the new state. Simply discarding the write would be wrong.

As for the second idea: how would a client of the writer service know which writer is the leader? For example, how would a load balancer know which web app process to route requests to? Ideally, all processes would be able to handle requests.

Using conditional writes would allow any producer to write and provide synchronous feedback to the producers.

On Fri, Jun 12, 2015 at 18.41 Jay Kreps <j...@confluent.io> wrote:

I have been thinking a little about this. I don't think CAS actually requires any particular broker support. Rather, the two writers just write messages with some deterministic check-and-set criteria, and all the replicas read from the log and check this criteria before applying the write. This mechanism has the downside that it creates additional writes when there is a conflict and requires waiting on the full roundtrip (write and then read), but it has the advantage that it is very flexible as to the criteria you use.

An alternative strategy for accomplishing the same thing a bit more efficiently is to elect leaders amongst the writers themselves. This would require broker support for single writer to avoid the possibility of split brain. I like this approach better because the leader for a partition can then do anything they want on their local data to make the decision of what is committed, however the downside is that the mechanism is more involved.

-Jay
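A rough sketch of the broker-free scheme described above, assuming each message embeds the offset its writer observed and every reader applies the same deterministic rule; all types and names here are invented for illustration:

    import java.util.ArrayList;
    import java.util.List;

    // Client-side CAS: writers publish unconditionally but embed the partition
    // offset they based their decision on. Every reader replays the log and
    // applies the same deterministic rule, so conflicting writes are discarded
    // identically everywhere -- no broker changes needed.
    public class ClientSideCas {
        record Message(long basedOnOffset, String payload) {}

        public static List<String> applyLog(List<Message> log) {
            List<String> committed = new ArrayList<>();
            long lastCommittedOffset = -1L;
            for (int offset = 0; offset < log.size(); offset++) {
                Message m = log.get(offset);
                // Accept the write only if nothing was committed after the state
                // the writer observed; otherwise it lost the race and is ignored.
                if (m.basedOnOffset() == lastCommittedOffset) {
                    committed.add(m.payload());
                    lastCommittedOffset = offset;
                }
            }
            return committed;
        }
    }

As noted above, a conflict costs an extra write plus a read round trip before the producer learns whether its message was effectively committed.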
On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <b...@kirw.in> wrote:

Gwen: Right now I'm just looking for feedback -- but yes, if folks are interested, I do plan to do that implementation work.

Daniel: Yes, that's exactly right. I haven't thought much about per-key... it does sound useful, but the implementation seems a bit more involved. Want to add it to the ticket?

On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

Ben: your solution seems to focus on partition-wide CAS. Have you considered per-key CAS? That would make the feature more useful in my opinion, as you'd greatly reduce the contention.

On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira <gshap...@cloudera.com> wrote:

Hi Ben,

Thanks for creating the ticket. Having check-and-set capability will be sweet :)
Are you planning to implement this yourself? Or is it just an idea for the community?

Gwen
On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <b...@kirw.in> wrote:

As it happens, I submitted a ticket for this feature a couple days ago:

https://issues.apache.org/jira/browse/KAFKA-2260

Couldn't find any existing proposals for similar things, but it's certainly possible they're out there...

On the other hand, I think you can solve your particular issue by reframing the problem: treating the messages as 'requests' or 'commands' instead of statements of fact. In your flight-booking example, the log would correctly reflect that two different people tried to book the same flight; the stream consumer would be responsible for finalizing one booking, and notifying the other client that their request had failed. (In-browser or by email.)
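A small sketch of that reframing, assuming a single consumer per partition applies the commands in log order; the class and handler names are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    // "Commands, not facts": producers append reservation *requests*; one
    // consumer per partition reads them in order, accepts the first request
    // per seat, and notifies the others that they lost.
    public class ReservationProcessor {
        private final Map<String, String> seatOwner = new HashMap<>(); // seatId -> customerId

        public void handle(String seatId, String customerId) {
            if (!seatOwner.containsKey(seatId)) {
                seatOwner.put(seatId, customerId);
                notifyConfirmed(seatId, customerId);  // e.g. publish a "booking-confirmed" event
            } else {
                notifyRejected(seatId, customerId);   // e.g. publish a "booking-rejected" event or send an email
            }
        }

        private void notifyConfirmed(String seatId, String customerId) {
            System.out.printf("confirmed %s for %s%n", seatId, customerId);
        }

        private void notifyRejected(String seatId, String customerId) {
            System.out.printf("rejected %s for %s (already taken)%n", seatId, customerId);
        }
    }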
On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck <daniel.schierb...@gmail.com> wrote:

I've been working on an application which uses Event Sourcing, and I'd like to use Kafka as opposed to, say, a SQL database to store events. This would allow me to easily integrate other systems by having them read off the Kafka topics.

I do have one concern, though: the consistency of the data can only be guaranteed if a command handler has a complete picture of all past events pertaining to some entity.

As an example, consider an airline seat reservation system. Each reservation command issued by a user is rejected if the seat has already been taken. If the seat is available, a record describing the event is appended to the log. This works great when there's only one producer, but in order to scale I may need multiple producer processes. This introduces a race condition: two command handlers may simultaneously receive a command to reserve the same seat. The event log indicates that the seat is available, so each handler will append a reservation event – thus double-booking that seat!

I see three ways around that issue:
1. Don't use Kafka for this.
2. Force a single producer for a given flight. This will impact availability and make routing more complex.
3. Have a way to do optimistic locking in Kafka.

The latter idea would work either on a per-key basis or globally for a partition: when appending to a partition, the producer would indicate in its request that the request should be rejected unless the current offset of the partition is equal to x. For the per-key setup, Kafka brokers would track the offset of the latest message for each unique key, if so configured. This would allow the request to specify that it should be rejected if the offset for key k is not equal to x.

This way, only one of the command handlers would succeed in writing to Kafka, thus ensuring consistency.
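As an illustration of the proposed conditional append, here is a hypothetical producer-side loop; the ConditionalLog interface and its tryAppend/endOffset methods are invented for this sketch and are not part of Kafka:

    // Hypothetical client-side loop against a conditional append.
    public class ConditionalWriteExample {
        interface ConditionalLog {
            // Appends the record only if the partition's current end offset
            // equals expectedOffset; returns false if the check failed.
            boolean tryAppend(long expectedOffset, byte[] record);
            long endOffset();
        }

        // Classic optimistic-concurrency loop: read state, decide, append with
        // the observed offset, and retry from the new state if someone got there first.
        static void reserveSeat(ConditionalLog log, byte[] reservationEvent) {
            while (true) {
                long observed = log.endOffset();
                // ... validate the command against all events up to `observed` ...
                if (log.tryAppend(observed, reservationEvent)) {
                    return; // our view was still current; the reservation is recorded
                }
                // Conflict: another handler appended first. Re-read and re-validate.
            }
        }
    }

The same loop would work per key if the expected offset referred to the last offset seen for key k rather than the partition's end offset.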
There are different levels of complexity associated with implementing this in Kafka depending on whether the feature would work per-partition or per-key:

* For the per-partition optimistic locking, the broker would just need to keep track of the high water mark for each partition and reject conditional requests when the offset doesn't match.
* For per-key locking, the broker would need to maintain an in-memory table mapping keys to the offset of the last message with that key. This should be fairly easy to maintain and recreate from the log if necessary. It could also be saved to disk as a snapshot from time to time in order to cut down the time needed to recreate the table on restart. There's a small performance penalty associated with this, but it could be opt-in for a topic.

Am I the only one thinking about using Kafka like this? Would this be a nice feature to have?