I like to refer to it as "conditional write" or "conditional request",
semantically similar to HTTP's If-Match header.

Ben: I'm adding a comment about per-key checking to your JIRA.
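
For concreteness, here is a minimal sketch of the conditional-write idea
discussed in this thread, covering both the partition-wide and the per-key
check. It is a toy in-memory model, not Kafka's API: ConditionalLog,
ConflictError, expected_end and expected_key_offset are all hypothetical
names, and using -1 to mean "this key has never been written" is an
assumption made only for this illustration.

```python
class ConflictError(Exception):
    """Raised when a conditional write's precondition does not hold."""


class ConditionalLog:
    """Toy stand-in for a single partition (hypothetical, not Kafka's API)."""

    def __init__(self):
        self.records = []        # list of (key, value); list index == offset
        self.latest_by_key = {}  # key -> offset of the last record with that key

    def append(self, key, value, expected_end=None, expected_key_offset=None):
        # Partition-wide check: reject unless the log end is where the writer
        # last saw it.
        if expected_end is not None and expected_end != len(self.records):
            raise ConflictError("partition has advanced")
        # Per-key check: reject unless the latest offset for this key is the
        # one the writer based its decision on (-1 means "never written").
        if expected_key_offset is not None:
            current = self.latest_by_key.get(key, -1)
            if current != expected_key_offset:
                raise ConflictError("key has a newer record")
        offset = len(self.records)
        self.records.append((key, value))
        self.latest_by_key[key] = offset
        return offset


# Two command handlers race to reserve the same seat; both read the same state.
log = ConditionalLog()
seen = log.latest_by_key.get("seat-12A", -1)
first = log.append("seat-12A", "reserved by alice", expected_key_offset=seen)
try:
    log.append("seat-12A", "reserved by bob", expected_key_offset=seen)
    second_accepted = True
except ConflictError:
    second_accepted = False  # the loser must re-read and re-validate
```

The losing writer gets synchronous feedback and can re-validate against the
new state, much like a failed If-Match precondition in HTTP.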

On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <b...@kirw.in> wrote:

> Yeah, it's definitely not a standard CAS, but it feels like the right
> fit for the commit log abstraction -- CAS on a 'current value' does
> seem a bit too key-value-store-ish for Kafka to support natively.
>
> I tried to avoid referring to the check-offset-before-publish
> functionality as a CAS in the ticket because, while they're both types
> of 'optimistic concurrency control', they are a bit different -- and
> the offset check is both easier to implement and handier for the stuff
> I tend to work on. (Though that ticket's about checking the latest
> offset on a whole partition, not the key -- there's a different set of
> tradeoffs for the latter, and I haven't thought it through properly
> yet.)
>
> On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava
> <e...@confluent.io> wrote:
> > If you do CAS where you compare the offset of the current record for the
> > key, then yes. This might work fine for applications that track key,
> > value, and offset. It is not quite the same as doing a normal CAS.
> >
> > On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <
> > daniel.schierb...@gmail.com> wrote:
> >
> >> But wouldn't the key->offset table be enough to accept or reject a
> >> write? I'm not familiar with the exact implementation of Kafka, so I may
> >> be wrong.
> >>
> >> On Sat, Jun 13, 2015 at 21.05 Ewen Cheslack-Postava <e...@confluent.io>
> >> wrote:
> >>
> >> > Daniel: By random read, I meant not reading the data sequentially as is
> >> > the norm in Kafka, not necessarily a random disk seek. That in-memory
> >> > data structure is what enables the random read. You're either going to
> >> > need the disk seek if the data isn't in the fs cache or you're trading
> >> > memory to avoid it. If it's a full index containing keys and values then
> >> > you're potentially committing to a much larger JVM memory footprint (and
> >> > all the GC issues that come with it) since you'd be storing that data in
> >> > the JVM heap. If you're only storing the keys + offset info, then you
> >> > potentially introduce random disk seeks on any CAS operation (and make
> >> > page caching harder for the OS, etc.).
> >> >
> >> >
> >> > On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <
> >> > daniel.schierb...@gmail.com> wrote:
> >> >
> >> > > Ewen: would single-key CAS necessitate random reads? My idea was to
> >> > > have the broker maintain an in-memory table that could be rebuilt from
> >> > > the log or a snapshot.
> >> > > On Sat, Jun 13, 2015 at 20.26 Ewen Cheslack-Postava
> >> > > <e...@confluent.io> wrote:
> >> > >
> >> > > > Jay - I think you need broker support if you want CAS to work with
> >> > > > compacted topics. With the approach you described you can't turn on
> >> > > > compaction since that would make it last-writer-wins, and using any
> >> > > > non-infinite retention policy would require some external process to
> >> > > > monitor keys that might expire and refresh them by rewriting the
> >> > > > data.
> >> > > >
> >> > > > That said, I think any addition like this warrants a lot of
> >> > > > discussion about potential use cases since there are a lot of ways
> >> > > > you could go about adding support for something like this. I think
> >> > > > this is an obvious next incremental step, but someone is bound to
> >> > > > have a use case that would require multi-key CAS and would be costly
> >> > > > to build atop single-key CAS. Or, since the compare requires a random
> >> > > > read anyway, why not throw in read-by-key rather than sequential log
> >> > > > reads, which would allow for minitransactions a la Sinfonia?
> >> > > >
> >> > > > I'm not convinced trying to make Kafka support traditional key-value
> >> > > > store functionality is a good idea. Compacted topics made it possible
> >> > > > to use it a bit more in that way, but didn't change the public
> >> > > > interface, only the way storage was implemented, and importantly all
> >> > > > the potential additional performance costs & data structures are
> >> > > > isolated to background threads.
> >> > > >
> >> > > > -Ewen
> >> > > >
> >> > > > On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <
> >> > > > daniel.schierb...@gmail.com> wrote:
> >> > > >
> >> > > > > @Jay:
> >> > > > >
> >> > > > > Regarding your first proposal: wouldn't that mean that a producer
> >> > > > > wouldn't know whether a write succeeded? In the case of event
> >> > > > > sourcing, a failed CAS may require re-validating the input with
> >> > > > > the new state. Simply discarding the write would be wrong.
> >> > > > >
> >> > > > > As for the second idea: how would a client of the writer service
> >> > > > > know which writer is the leader? For example, how would a load
> >> > > > > balancer know which web app process to route requests to? Ideally,
> >> > > > > all processes would be able to handle requests.
> >> > > > >
> >> > > > > Using conditional writes would allow any producer to write and
> >> > > > > provide synchronous feedback to the producers.
> >> > > > > On Fri, Jun 12, 2015 at 18.41 Jay Kreps <j...@confluent.io> wrote:
> >> > > > >
> >> > > > > > I have been thinking a little about this. I don't think CAS
> >> > > > > > actually requires any particular broker support. Rather the two
> >> > > > > > writers just write messages with some deterministic
> >> > > > > > check-and-set criteria and all the replicas read from the log
> >> > > > > > and check these criteria before applying the write. This
> >> > > > > > mechanism has the downside that it creates additional writes
> >> > > > > > when there is a conflict and requires waiting on the full
> >> > > > > > roundtrip (write and then read) but it has the advantage that it
> >> > > > > > is very flexible as to the criteria you use.
> >> > > > > >
> >> > > > > > An alternative strategy for accomplishing the same thing a bit
> >> > > > > > more efficiently is to elect leaders amongst the writers
> >> > > > > > themselves. This would require broker support for a single
> >> > > > > > writer to avoid the possibility of split brain. I like this
> >> > > > > > approach better because the leader for a partition can then do
> >> > > > > > anything they want on their local data to make the decision of
> >> > > > > > what is committed; the downside is that the mechanism is more
> >> > > > > > involved.
> >> > > > > >
> >> > > > > > -Jay
> >> > > > > >
> >> > > > > > On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <b...@kirw.in> wrote:
> >> > > > > >
> >> > > > > > > Gwen: Right now I'm just looking for feedback -- but yes, if
> >> > > > > > > folks are interested, I do plan to do that implementation work.
> >> > > > > > >
> >> > > > > > > Daniel: Yes, that's exactly right. I haven't thought much about
> >> > > > > > > per-key... it does sound useful, but the implementation seems a
> >> > > > > > > bit more involved. Want to add it to the ticket?
> >> > > > > > >
> >> > > > > > > On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
> >> > > > > > > <daniel.schierb...@gmail.com> wrote:
> >> > > > > > > > Ben: your solution seems to focus on partition-wide CAS. Have
> >> > > > > > > > you considered per-key CAS? That would make the feature more
> >> > > > > > > > useful in my opinion, as you'd greatly reduce the contention.
> >> > > > > > > >
> >> > > > > > > > On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira
> >> > > > > > > > <gshap...@cloudera.com> wrote:
> >> > > > > > > >
> >> > > > > > > >> Hi Ben,
> >> > > > > > > >>
> >> > > > > > > >> Thanks for creating the ticket. Having check-and-set
> >> > > > > > > >> capability will be sweet :)
> >> > > > > > > >> Are you planning to implement this yourself? Or is it just
> >> > > > > > > >> an idea for the community?
> >> > > > > > > >>
> >> > > > > > > >> Gwen
> >> > > > > > > >>
> >> > > > > > > >> On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <b...@kirw.in>
> >> > > > > > > >> wrote:
> >> > > > > > > >> > As it happens, I submitted a ticket for this feature a
> >> > > > > > > >> > couple days ago:
> >> > > > > > > >> >
> >> > > > > > > >> > https://issues.apache.org/jira/browse/KAFKA-2260
> >> > > > > > > >> >
> >> > > > > > > >> > Couldn't find any existing proposals for similar things,
> >> > > > > > > >> > but it's certainly possible they're out there...
> >> > > > > > > >> >
> >> > > > > > > >> > On the other hand, I think you can solve your particular
> >> > > > > > > >> > issue by reframing the problem: treating the messages as
> >> > > > > > > >> > 'requests' or 'commands' instead of statements of fact. In
> >> > > > > > > >> > your flight-booking example, the log would correctly
> >> > > > > > > >> > reflect that two different people tried to book the same
> >> > > > > > > >> > flight; the stream consumer would be responsible for
> >> > > > > > > >> > finalizing one booking, and notifying the other client
> >> > > > > > > >> > that their request had failed. (In-browser or by email.)
> >> > > > > > > >> >
> >> > > > > > > >> > On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
> >> > > > > > > >> > <daniel.schierb...@gmail.com> wrote:
> >> > > > > > > >> >> I've been working on an application which uses Event
> >> > > > > > > >> >> Sourcing, and I'd like to use Kafka as opposed to, say, a
> >> > > > > > > >> >> SQL database to store events. This would allow me to
> >> > > > > > > >> >> easily integrate other systems by having them read off
> >> > > > > > > >> >> the Kafka topics.
> >> > > > > > > >> >>
> >> > > > > > > >> >> I do have one concern, though: the consistency of the
> >> > > > > > > >> >> data can only be guaranteed if a command handler has a
> >> > > > > > > >> >> complete picture of all past events pertaining to some
> >> > > > > > > >> >> entity.
> >> > > > > > > >> >>
> >> > > > > > > >> >> As an example, consider an airline seat reservation
> >> > > > > > > >> >> system. Each reservation command issued by a user is
> >> > > > > > > >> >> rejected if the seat has already been taken. If the seat
> >> > > > > > > >> >> is available, a record describing the event is appended
> >> > > > > > > >> >> to the log. This works great when there's only one
> >> > > > > > > >> >> producer, but in order to scale I may need multiple
> >> > > > > > > >> >> producer processes. This introduces a race condition: two
> >> > > > > > > >> >> command handlers may simultaneously receive a command to
> >> > > > > > > >> >> reserve the same seat. The event log indicates that the
> >> > > > > > > >> >> seat is available, so each handler will append a
> >> > > > > > > >> >> reservation event – thus double-booking that seat!
> >> > > > > > > >> >>
> >> > > > > > > >> >> I see three ways around that issue:
> >> > > > > > > >> >> 1. Don't use Kafka for this.
> >> > > > > > > >> >> 2. Force a single producer for a given flight. This will
> >> > > > > > > >> >> impact availability and make routing more complex.
> >> > > > > > > >> >> 3. Have a way to do optimistic locking in Kafka.
> >> > > > > > > >> >>
> >> > > > > > > >> >> The latter idea would work either on a per-key basis or
> >> > > > > > > >> >> globally for a partition: when appending to a partition,
> >> > > > > > > >> >> the producer would indicate in its request that the
> >> > > > > > > >> >> request should be rejected unless the current offset of
> >> > > > > > > >> >> the partition is equal to x. For the per-key setup, Kafka
> >> > > > > > > >> >> brokers would track the offset of the latest message for
> >> > > > > > > >> >> each unique key, if so configured. This would allow the
> >> > > > > > > >> >> request to specify that it should be rejected if the
> >> > > > > > > >> >> offset for key k is not equal to x.
> >> > > > > > > >> >>
> >> > > > > > > >> >> This way, only one of the command handlers would succeed
> >> > > > > > > >> >> in writing to Kafka, thus ensuring consistency.
> >> > > > > > > >> >>
> >> > > > > > > >> >> There are different levels of complexity associated with
> >> > > > > > > >> >> implementing this in Kafka depending on whether the
> >> > > > > > > >> >> feature would work per-partition or per-key:
> >> > > > > > > >> >> * For the per-partition optimistic locking, the broker
> >> > > > > > > >> >> would just need to keep track of the high water mark for
> >> > > > > > > >> >> each partition and reject conditional requests when the
> >> > > > > > > >> >> offset doesn't match.
> >> > > > > > > >> >> * For per-key locking, the broker would need to maintain
> >> > > > > > > >> >> an in-memory table mapping keys to the offset of the last
> >> > > > > > > >> >> message with that key. This should be fairly easy to
> >> > > > > > > >> >> maintain and recreate from the log if necessary. It could
> >> > > > > > > >> >> also be saved to disk as a snapshot from time to time in
> >> > > > > > > >> >> order to cut down the time needed to recreate the table
> >> > > > > > > >> >> on restart. There's a small performance penalty
> >> > > > > > > >> >> associated with this, but it could be opt-in for a topic.
> >> > > > > > > >> >>
> >> > > > > > > >> >> Am I the only one thinking about using Kafka like this?
> >> > > > > > > >> >> Would this be a nice feature to have?
> >> > > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > Thanks,
> >> > > > Ewen
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > Thanks,
> >> > Ewen
> >> >
> >>
> >
> >
> >
> > --
> > Thanks,
> > Ewen
>
