Great! I'd love to see this move forward, especially if the design allows
for per-key conditionals sometime in the future – doesn't have to be in the
first iteration.

On Tue, Jul 14, 2015 at 5:26 AM Ben Kirwin <b...@kirw.in> wrote:

> Ah, just saw this. I actually just submitted a patch this evening --
> just for the partitionwide version at the moment, since it turns out
> to be pretty simple to implement. Still very interested in moving
> forward with this stuff, though I don't always have as much time for it
> as I would like...
>
> On Thu, Jul 9, 2015 at 9:39 AM, Daniel Schierbeck
> <daniel.schierb...@gmail.com> wrote:
> > Ben, are you still interested in working on this?
> >
> > On Mon, Jun 15, 2015 at 9:49 AM Daniel Schierbeck <
> > daniel.schierb...@gmail.com> wrote:
> >
> >> I like to refer to it as "conditional write" or "conditional request",
> >> semantically similar to HTTP's If-Match header.
> >>
> >> Ben: I'm adding a comment about per-key checking to your JIRA.
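
(For readers less familiar with the analogy: an HTTP conditional request carries
the ETag the client last saw in an If-Match header, and the server answers
412 Precondition Failed if the resource has changed since. The conditional
write discussed below plays the same game with offsets in place of ETags.
A minimal illustration using Java's built-in HTTP client; the URL and ETag
value are placeholders.)

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class IfMatchExample {
        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();

            // Only apply the update if the resource still carries the ETag we last read.
            HttpRequest request = HttpRequest.newBuilder(URI.create("https://example.com/seats/12A"))
                    .header("If-Match", "\"etag-we-last-saw\"")
                    .PUT(HttpRequest.BodyPublishers.ofString("{\"reservedBy\":\"daniel\"}"))
                    .build();

            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() == 412) {
                // Precondition failed: someone else modified the seat since we read it.
                System.out.println("Lost the race; re-read and re-validate before retrying.");
            }
        }
    }
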
> >>
> >> On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <b...@kirw.in> wrote:
> >>
> >>> Yeah, it's definitely not a standard CAS, but it feels like the right
> >>> fit for the commit log abstraction -- CAS on a 'current value' does
> >>> seem a bit too key-value-store-ish for Kafka to support natively.
> >>>
> >>> I tried to avoid referring to the check-offset-before-publish
> >>> functionality as a CAS in the ticket because, while they're both types
> >>> of 'optimistic concurrency control', they are a bit different -- and
> >>> the offset check is both easier to implement and handier for the stuff
> >>> I tend to work on. (Though that ticket's about checking the latest
> >>> offset on a whole partition, not the key -- there's a different set of
> >>> tradeoffs for the latter, and I haven't thought it through properly
> >>> yet.)
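
To make the partition-versus-key distinction concrete, here is a rough sketch
of what the two flavours of conditional produce could look like from the client
side. Nothing like this exists in the Kafka producer API; the ConditionalProducer
interface and its method names are hypothetical, only meant to show where the
expected offset would be supplied in each variant.

    import java.util.concurrent.CompletableFuture;

    // Hypothetical client-side interface -- not part of any Kafka release.
    // Each future completes with the assigned offset, or fails if the condition
    // did not hold when the broker tried to append.
    interface ConditionalProducer {

        // Partition-wide check (the KAFKA-2260 variant): the append is rejected
        // unless the partition's next offset is exactly expectedPartitionOffset.
        CompletableFuture<Long> sendIfPartitionOffsetIs(
                String topic, int partition, long expectedPartitionOffset,
                byte[] key, byte[] value);

        // Per-key check (the variant discussed in this thread): the append is
        // rejected unless the latest record with this key sits at
        // expectedKeyOffset, which requires the broker to track a key -> offset map.
        CompletableFuture<Long> sendIfKeyOffsetIs(
                String topic, int partition, long expectedKeyOffset,
                byte[] key, byte[] value);
    }
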
> >>>
> >>> On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava
> >>> <e...@confluent.io> wrote:
> >>> > If you do CAS where you compare the offset of the current record for
> >>> > the key, then yes. This might work fine for applications that track
> >>> > key, value, and offset. It is not quite the same as doing a normal CAS.
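
The kind of application described here keeps the offset alongside each
materialized value and treats it as a version number. A small sketch of that
pattern; the types are invented for illustration.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // A materialized view that remembers, for every key, the value and the offset
    // of the record that produced it. The offset doubles as the CAS "version".
    class VersionedView {

        record Versioned(byte[] value, long offset) {}

        private final Map<String, Versioned> view = new ConcurrentHashMap<>();

        // Called for every record consumed from the topic, in log order.
        void apply(String key, byte[] value, long offset) {
            view.put(key, new Versioned(value, offset));
        }

        // The value a writer would pass as the expected offset in a conditional write.
        long expectedOffsetFor(String key) {
            Versioned v = view.get(key);
            return v == null ? -1L : v.offset();
        }
    }
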
> >>> >
> >>> > On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <
> >>> > daniel.schierb...@gmail.com> wrote:
> >>> >
> >>> >> But wouldn't the key->offset table be enough to accept or reject a
> >>> >> write? I'm not familiar with the exact implementation of Kafka, so I
> >>> >> may be wrong.
> >>> >>
> >>> >> On Sat, 13 Jun 2015 at 21.05 Ewen Cheslack-Postava <e...@confluent.io>
> >>> >> wrote:
> >>> >>
> >>> >> > Daniel: By random read, I meant not reading the data sequentially as
> >>> >> > is the norm in Kafka, not necessarily a random disk seek. That
> >>> >> > in-memory data structure is what enables the random read. You're
> >>> >> > either going to need the disk seek if the data isn't in the fs cache
> >>> >> > or you're trading memory to avoid it. If it's a full index containing
> >>> >> > keys and values then you're potentially committing to a much larger
> >>> >> > JVM memory footprint (and all the GC issues that come with it) since
> >>> >> > you'd be storing that data in the JVM heap. If you're only storing
> >>> >> > the keys + offset info, then you potentially introduce random disk
> >>> >> > seeks on any CAS operation (and make page caching harder for the OS,
> >>> >> > etc.).
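
Some back-of-the-envelope numbers make that trade-off concrete. The figures
below are assumptions picked only to show the shape of the calculation, not
measurements of any real deployment.

    public class KeyOffsetTableFootprint {
        public static void main(String[] args) {
            long uniqueKeys = 100_000_000L;  // assumed number of live keys on the broker
            long bytesPerKey = 32;           // assumed average key size
            long mapOverheadPerEntry = 48;   // rough per-entry overhead of a JVM hash map
            long offsetBytes = 8;            // one long per key

            long totalBytes = uniqueKeys * (bytesPerKey + mapOverheadPerEntry + offsetBytes);
            System.out.printf("~%.1f GB of heap for a key -> offset map%n", totalBytes / 1e9);
            // Storing the values as well multiplies this by the average value size,
            // which is the "much larger JVM memory footprint" being pointed at above.
        }
    }
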
> >>> >> >
> >>> >> >
> >>> >> > On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <
> >>> >> > daniel.schierb...@gmail.com> wrote:
> >>> >> >
> >>> >> > > Ewen: would single-key CAS necessitate random reads? My idea was
> >>> >> > > to have the broker maintain an in-memory table that could be
> >>> >> > > rebuilt from the log or a snapshot.
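
A sketch of how such a key -> offset table could be rebuilt by replaying the
log. For illustration it uses the ordinary consumer API against a made-up topic
name; a real broker-side implementation would scan its local log segments
instead.

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class KeyOffsetTableRebuild {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            Map<String, Long> latestOffsetByKey = new HashMap<>();
            TopicPartition tp = new TopicPartition("reservations", 0);  // hypothetical topic

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(List.of(tp));
                consumer.seekToBeginning(List.of(tp));
                long end = consumer.endOffsets(List.of(tp)).get(tp);

                long position = 0;
                while (position < end) {
                    for (ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                        // Later records overwrite earlier ones, so the map ends up holding
                        // the offset of the latest record for each key.
                        latestOffsetByKey.put(record.key(), record.offset());
                        position = record.offset() + 1;
                    }
                }
            }
            System.out.println("Rebuilt table with " + latestOffsetByKey.size() + " keys");
        }
    }
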
> >>> >> > > On Sat, 13 Jun 2015 at 20.26 Ewen Cheslack-Postava <e...@confluent.io>
> >>> >> > > wrote:
> >>> >> > >
> >>> >> > > > Jay - I think you need broker support if you want CAS to work
> >>> >> > > > with compacted topics. With the approach you described you can't
> >>> >> > > > turn on compaction since that would make it last-writer-wins, and
> >>> >> > > > using any non-infinite retention policy would require some
> >>> >> > > > external process to monitor keys that might expire and refresh
> >>> >> > > > them by rewriting the data.
> >>> >> > > >
> >>> >> > > > That said, I think any addition like this warrants a lot of
> >>> >> > > > discussion about potential use cases since there are a lot of
> >>> >> > > > ways you could go adding support for something like this. I think
> >>> >> > > > this is an obvious next incremental step, but someone is bound to
> >>> >> > > > have a use case that would require multi-key CAS and would be
> >>> >> > > > costly to build atop single key CAS. Or, since the compare
> >>> >> > > > requires a random read anyway, why not throw in read-by-key
> >>> >> > > > rather than sequential log reads, which would allow for
> >>> >> > > > minitransactions a la Sinfonia?
> >>> >> > > >
> >>> >> > > > I'm not convinced trying to make Kafka support traditional
> >>> >> > > > key-value store functionality is a good idea. Compacted topics
> >>> >> > > > made it possible to use it a bit more in that way, but didn't
> >>> >> > > > change the public interface, only the way storage was
> >>> >> > > > implemented, and importantly all the potential additional
> >>> >> > > > performance costs & data structures are isolated to background
> >>> >> > > > threads.
> >>> >> > > >
> >>> >> > > > -Ewen
> >>> >> > > >
> >>> >> > > > On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <
> >>> >> > > > daniel.schierb...@gmail.com> wrote:
> >>> >> > > >
> >>> >> > > > > @Jay:
> >>> >> > > > >
> >>> >> > > > > Regarding your first proposal: wouldn't that mean that a
> >>> >> > > > > producer wouldn't know whether a write succeeded? In the case
> >>> >> > > > > of event sourcing, a failed CAS may require re-validating the
> >>> >> > > > > input with the new state. Simply discarding the write would be
> >>> >> > > > > wrong.
> >>> >> > > > >
> >>> >> > > > > As for the second idea: how would a client of the writer
> >>> >> > > > > service know which writer is the leader? For example, how would
> >>> >> > > > > a load balancer know which web app process to route requests
> >>> >> > > > > to? Ideally, all processes would be able to handle requests.
> >>> >> > > > >
> >>> >> > > > > Using conditional writes would allow any producer to write and
> >>> >> > > > > provide synchronous feedback to the producers.
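
That synchronous feedback is the crux for event sourcing: the producer has to
learn that its conditional write was rejected so it can re-read the new state
and re-validate the command, rather than silently dropping it. A sketch of that
retry loop follows; every type in it (the view, the conditional log, the
rejection exception) is hypothetical.

    // Sketch of a command handler's retry loop -- everything here is invented
    // to illustrate the flow, not an existing API.
    class ReserveSeatHandler {

        interface SeatView {          // materialized state, e.g. built from the log
            boolean isFree(String seat);
            long expectedOffsetFor(String seat);
            void refresh();           // re-read the log to pick up new events
        }

        interface ConditionalLog {    // the hoped-for conditional producer
            void appendIfOffsetIs(String key, byte[] event, long expectedOffset)
                    throws ConditionalWriteRejectedException;
        }

        static class ConditionalWriteRejectedException extends Exception {}

        private final SeatView view;
        private final ConditionalLog log;

        ReserveSeatHandler(SeatView view, ConditionalLog log) {
            this.view = view;
            this.log = log;
        }

        boolean reserve(String seat, byte[] reservationEvent) {
            for (int attempt = 0; attempt < 3; attempt++) {
                view.refresh();
                if (!view.isFree(seat)) {
                    return false;     // re-validated: someone else got the seat first
                }
                try {
                    log.appendIfOffsetIs(seat, reservationEvent, view.expectedOffsetFor(seat));
                    return true;      // write accepted
                } catch (ConditionalWriteRejectedException lostRace) {
                    // Our picture of the seat was stale; loop, re-read, and re-validate.
                }
            }
            return false;
        }
    }
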
> >>> >> > > > > On Fri, 12 Jun 2015 at 18.41 Jay Kreps <j...@confluent.io>
> >>> >> > > > > wrote:
> >>> >> > > > >
> >>> >> > > > > > I have been thinking a little about this. I don't think CAS
> >>> >> > > > > > actually requires any particular broker support. Rather the
> >>> >> > > > > > two writers just write messages with some deterministic
> >>> >> > > > > > check-and-set criteria and all the replicas read from the log
> >>> >> > > > > > and check this criteria before applying the write. This
> >>> >> > > > > > mechanism has the downside that it creates additional writes
> >>> >> > > > > > when there is a conflict and requires waiting on the full
> >>> >> > > > > > roundtrip (write and then read), but it has the advantage
> >>> >> > > > > > that it is very flexible as to the criteria you use.
> >>> >> > > > > >
> >>> >> > > > > > An alternative strategy for accomplishing the same thing a
> >>> >> > > > > > bit more efficiently is to elect leaders amongst the writers
> >>> >> > > > > > themselves. This would require broker support for a single
> >>> >> > > > > > writer to avoid the possibility of split brain. I like this
> >>> >> > > > > > approach better because the leader for a partition can then
> >>> >> > > > > > do anything they want on their local data to make the
> >>> >> > > > > > decision of what is committed; however, the downside is that
> >>> >> > > > > > the mechanism is more involved.
> >>> >> > > > > >
> >>> >> > > > > > -Jay
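
Jay's first strategy, seen from the reader's side, might look like the sketch
below: each message carries the writer's expected offset for its key, and every
consumer or replica applies the same deterministic rule in log order, so
conflicting writes are discarded identically everywhere. All names are
illustrative.

    import java.util.HashMap;
    import java.util.Map;

    // Deterministic client-side check-and-set: the check travels with the message
    // and every reader applies it the same way, so no broker support is needed.
    class OptimisticStateMachine {

        record Command(String key, byte[] newValue, long expectedOffset) {}

        private final Map<String, Long> latestAppliedOffset = new HashMap<>();
        private final Map<String, byte[]> state = new HashMap<>();

        // Called for every record in log order; 'offset' is the record's own offset.
        void onRecord(long offset, Command cmd) {
            long current = latestAppliedOffset.getOrDefault(cmd.key(), -1L);
            if (cmd.expectedOffset() != current) {
                return;  // conflict: another write won; every reader discards this one
            }
            state.put(cmd.key(), cmd.newValue());
            latestAppliedOffset.put(cmd.key(), offset);
        }
    }
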
> >>> >> > > > > >
> >>> >> > > > > > On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <b...@kirw.in>
> >>> >> > > > > > wrote:
> >>> >> > > > > >
> >>> >> > > > > > > Gwen: Right now I'm just looking for feedback -- but yes,
> >>> >> > > > > > > if folks are interested, I do plan to do that
> >>> >> > > > > > > implementation work.
> >>> >> > > > > > >
> >>> >> > > > > > > Daniel: Yes, that's exactly right. I haven't thought much
> >>> >> > > > > > > about per-key... it does sound useful, but the
> >>> >> > > > > > > implementation seems a bit more involved. Want to add it to
> >>> >> > > > > > > the ticket?
> >>> >> > > > > > >
> >>> >> > > > > > > On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
> >>> >> > > > > > > <daniel.schierb...@gmail.com> wrote:
> >>> >> > > > > > > > Ben: your solution seems to focus on partition-wide CAS.
> >>> >> > > > > > > > Have you considered per-key CAS? That would make the
> >>> >> > > > > > > > feature more useful in my opinion, as you'd greatly
> >>> >> > > > > > > > reduce the contention.
> >>> >> > > > > > > >
> >>> >> > > > > > > > On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira <gshap...@cloudera.com>
> >>> >> > > > > > > > wrote:
> >>> >> > > > > > > >
> >>> >> > > > > > > >> Hi Ben,
> >>> >> > > > > > > >>
> >>> >> > > > > > > >> Thanks for creating the ticket. Having check-and-set
> >>> >> > > > > > > >> capability will be sweet :)
> >>> >> > > > > > > >> Are you planning to implement this yourself? Or is it
> >>> >> > > > > > > >> just an idea for the community?
> >>> >> > > > > > > >>
> >>> >> > > > > > > >> Gwen
> >>> >> > > > > > > >>
> >>> >> > > > > > > >> On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <b...@kirw.in>
> >>> >> > > > > > > >> wrote:
> >>> >> > > > > > > >> > As it happens, I submitted a ticket for this feature
> >>> >> > > > > > > >> > a couple days ago:
> >>> >> > > > > > > >> >
> >>> >> > > > > > > >> > https://issues.apache.org/jira/browse/KAFKA-2260
> >>> >> > > > > > > >> >
> >>> >> > > > > > > >> > Couldn't find any existing proposals for similar
> >>> >> > > > > > > >> > things, but it's certainly possible they're out
> >>> >> > > > > > > >> > there...
> >>> >> > > > > > > >> >
> >>> >> > > > > > > >> > On the other hand, I think you can solve your
> >>> >> > > > > > > >> > particular issue by reframing the problem: treating
> >>> >> > > > > > > >> > the messages as 'requests' or 'commands' instead of
> >>> >> > > > > > > >> > statements of fact. In your flight-booking example,
> >>> >> > > > > > > >> > the log would correctly reflect that two different
> >>> >> > > > > > > >> > people tried to book the same flight; the stream
> >>> >> > > > > > > >> > consumer would be responsible for finalizing one
> >>> >> > > > > > > >> > booking, and notifying the other client that their
> >>> >> > > > > > > >> > request had failed. (In-browser or by email.)
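
In code, that reframing looks roughly like the following: the topic holds
booking requests rather than facts, and a single consumer per partition decides
outcomes in log order, so no conditional write is needed. The types and the
notification hook are invented for the sketch.

    import java.util.HashMap;
    import java.util.Map;

    // The log stores requests, not facts; this consumer turns them into outcomes.
    class BookingFinalizer {

        record BookingRequest(String seat, String customer) {}

        interface Notifier {
            void accepted(String customer, String seat);
            void rejected(String customer, String seat);  // e.g. in-browser or by email
        }

        private final Map<String, String> seatOwner = new HashMap<>();
        private final Notifier notifier;

        BookingFinalizer(Notifier notifier) {
            this.notifier = notifier;
        }

        // Called once per request, in partition order, so the first request wins.
        void onRequest(BookingRequest request) {
            String existing = seatOwner.putIfAbsent(request.seat(), request.customer());
            if (existing == null) {
                notifier.accepted(request.customer(), request.seat());
            } else {
                notifier.rejected(request.customer(), request.seat());
            }
        }
    }
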
> >>> >> > > > > > > >> >
> >>> >> > > > > > > >> > On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
> >>> >> > > > > > > >> > <daniel.schierb...@gmail.com> wrote:
> >>> >> > > > > > > >> >> I've been working on an application which uses Event
> >>> >> > > > > > > >> >> Sourcing, and I'd like to use Kafka as opposed to,
> >>> >> > > > > > > >> >> say, a SQL database to store events. This would allow
> >>> >> > > > > > > >> >> me to easily integrate other systems by having them
> >>> >> > > > > > > >> >> read off the Kafka topics.
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> I do have one concern, though: the consistency of the
> >>> >> > > > > > > >> >> data can only be guaranteed if a command handler has
> >>> >> > > > > > > >> >> a complete picture of all past events pertaining to
> >>> >> > > > > > > >> >> some entity.
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> As an example, consider an airline seat reservation
> >>> >> > > > > > > >> >> system. Each reservation command issued by a user is
> >>> >> > > > > > > >> >> rejected if the seat has already been taken. If the
> >>> >> > > > > > > >> >> seat is available, a record describing the event is
> >>> >> > > > > > > >> >> appended to the log. This works great when there's
> >>> >> > > > > > > >> >> only one producer, but in order to scale I may need
> >>> >> > > > > > > >> >> multiple producer processes. This introduces a race
> >>> >> > > > > > > >> >> condition: two command handlers may simultaneously
> >>> >> > > > > > > >> >> receive a command to reserve the same seat. The event
> >>> >> > > > > > > >> >> log indicates that the seat is available, so each
> >>> >> > > > > > > >> >> handler will append a reservation event – thus
> >>> >> > > > > > > >> >> double-booking that seat!
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> I see three ways around that issue:
> >>> >> > > > > > > >> >> 1. Don't use Kafka for this.
> >>> >> > > > > > > >> >> 2. Force a single producer for a given flight. This
> >>> >> > > > > > > >> >> will impact availability and make routing more
> >>> >> > > > > > > >> >> complex.
> >>> >> > > > > > > >> >> 3. Have a way to do optimistic locking in Kafka.
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> The latter idea would work either on a per-key basis
> >>> >> > > > > > > >> >> or globally for a partition: when appending to a
> >>> >> > > > > > > >> >> partition, the producer would indicate in its request
> >>> >> > > > > > > >> >> that the request should be rejected unless the
> >>> >> > > > > > > >> >> current offset of the partition is equal to x. For
> >>> >> > > > > > > >> >> the per-key setup, Kafka brokers would track the
> >>> >> > > > > > > >> >> offset of the latest message for each unique key, if
> >>> >> > > > > > > >> >> so configured. This would allow the request to
> >>> >> > > > > > > >> >> specify that it should be rejected if the offset for
> >>> >> > > > > > > >> >> key k is not equal to x.
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> This way, only one of the command handlers would
> >>> >> > > > > > > >> >> succeed in writing to Kafka, thus ensuring
> >>> >> > > > > > > >> >> consistency.
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> There are different levels of complexity associated
> >>> >> > > > > > > >> >> with implementing this in Kafka depending on whether
> >>> >> > > > > > > >> >> the feature would work per-partition or per-key:
> >>> >> > > > > > > >> >> * For the per-partition optimistic locking, the
> >>> >> > > > > > > >> >> broker would just need to keep track of the high
> >>> >> > > > > > > >> >> water mark for each partition and reject conditional
> >>> >> > > > > > > >> >> requests when the offset doesn't match.
> >>> >> > > > > > > >> >> * For per-key locking, the broker would need to
> >>> >> > > > > > > >> >> maintain an in-memory table mapping keys to the
> >>> >> > > > > > > >> >> offset of the last message with that key. This should
> >>> >> > > > > > > >> >> be fairly easy to maintain and recreate from the log
> >>> >> > > > > > > >> >> if necessary. It could also be saved to disk as a
> >>> >> > > > > > > >> >> snapshot from time to time in order to cut down the
> >>> >> > > > > > > >> >> time needed to recreate the table on restart. There's
> >>> >> > > > > > > >> >> a small performance penalty associated with this, but
> >>> >> > > > > > > >> >> it could be opt-in for a topic.
> >>> >> > > > > > > >> >>
> >>> >> > > > > > > >> >> Am I the only one thinking about using Kafka like
> >>> >> > > > > > > >> >> this? Would this be a nice feature to have?
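
For completeness, a toy sketch of the broker-side check described above,
covering both variants. This is purely illustrative and does not reflect how
the Kafka log layer is actually structured.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Illustrative only: a toy partition log with both flavours of conditional append.
    class ConditionalPartitionLog {

        private long nextOffset = 0;  // the partition's next offset ("high water mark" above)
        private final Map<String, Long> latestOffsetByKey = new HashMap<>();  // opt-in per-key table

        // Variant 1: partition-wide condition. Appends only if nothing has been
        // written since the producer observed offset expectedNextOffset.
        synchronized Optional<Long> appendIfPartitionOffsetIs(long expectedNextOffset,
                                                              String key, byte[] value) {
            if (nextOffset != expectedNextOffset) {
                return Optional.empty();  // reject: the partition has moved on
            }
            return Optional.of(doAppend(key, value));
        }

        // Variant 2: per-key condition. Appends only if the latest record for this
        // key is still the one the producer saw (-1 means "key never written").
        synchronized Optional<Long> appendIfKeyOffsetIs(long expectedKeyOffset,
                                                        String key, byte[] value) {
            long current = latestOffsetByKey.getOrDefault(key, -1L);
            if (current != expectedKeyOffset) {
                return Optional.empty();  // reject: the key was updated in the meantime
            }
            return Optional.of(doAppend(key, value));
        }

        private long doAppend(String key, byte[] value) {
            long offset = nextOffset++;
            latestOffsetByKey.put(key, offset);  // the table is rebuildable from the log
            // ... append (key, value) to the log at 'offset' ...
            return offset;
        }
    }
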
