Ben, are you still interested in working on this?

On Mon, Jun 15, 2015 at 9:49 AM Daniel Schierbeck <
daniel.schierb...@gmail.com> wrote:

> I like to refer to it as "conditional write" or "conditional request",
> semantically similar to HTTP's If-Match header.
>
> Ben: I'm adding a comment about per-key checking to your JIRA.
>
> On Mon, Jun 15, 2015 at 4:06 AM Ben Kirwin <b...@kirw.in> wrote:
>
>> Yeah, it's definitely not a standard CAS, but it feels like the right
>> fit for the commit log abstraction -- CAS on a 'current value' does
>> seem a bit too key-value-store-ish for Kafka to support natively.
>>
>> I tried to avoid referring to the check-offset-before-publish
>> functionality as a CAS in the ticket because, while they're both types
>> of 'optimistic concurrency control', they are a bit different -- and
>> the offset check is both easier to implement and handier for the stuff
>> I tend to work on. (Though that ticket's about checking the latest
>> offset on a whole partition, not the key -- there's a different set of
>> tradeoffs for the latter, and I haven't thought it through properly
>> yet.)
>>
>> On Sat, Jun 13, 2015 at 3:35 PM, Ewen Cheslack-Postava
>> <e...@confluent.io> wrote:
>> > If you do CAS where you compare the offset of the current record for the
>> > key, then yes. This might work fine for applications that track key,
>> > value, and offset. It is not quite the same as doing a normal CAS.
>> >
>> > On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck <
>> > daniel.schierb...@gmail.com> wrote:
>> >
>> >> But wouldn't the key->offset table be enough to accept or reject a write?
>> >> I'm not familiar with the exact implementation of Kafka, so I may be wrong.
>> >>
>> >> On Sat, Jun 13, 2015 at 9:05 PM Ewen Cheslack-Postava <e...@confluent.io>
>> >> wrote:
>> >>
>> >> > Daniel: By random read, I meant not reading the data sequentially as is
>> >> > the norm in Kafka, not necessarily a random disk seek. That in-memory data
>> >> > structure is what enables the random read. You're either going to need the
>> >> > disk seek if the data isn't in the fs cache or you're trading memory to
>> >> > avoid it. If it's a full index containing keys and values then you're
>> >> > potentially committing to a much larger JVM memory footprint (and all the
>> >> > GC issues that come with it) since you'd be storing that data in the JVM
>> >> > heap. If you're only storing the keys + offset info, then you potentially
>> >> > introduce random disk seeks on any CAS operation (and make page caching
>> >> > harder for the OS, etc.).
>> >> >
>> >> >
>> >> > On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck <
>> >> > daniel.schierb...@gmail.com> wrote:
>> >> >
>> >> > > Ewen: would single-key CAS necessitate random reads? My idea was to
>> >> have
>> >> > > the broker maintain an in-memory table that could be rebuilt from
>> the
>> >> log
>> >> > > or a snapshot.
>> >> > > On Sat, Jun 13, 2015 at 8:26 PM Ewen Cheslack-Postava <e...@confluent.io>
>> >> > > wrote:
>> >> > >
>> >> > > > Jay - I think you need broker support if you want CAS to work
>> with
>> >> > > > compacted topics. With the approach you described you can't turn
>> on
>> >> > > > compaction since that would make it last-writer-wins, and using
>> any
>> >> > > > non-infinite retention policy would require some external
>> process to
>> >> > > > monitor keys that might expire and refresh them by rewriting the
>> >> data.
>> >> > > >
>> >> > > > That said, I think any addition like this warrants a lot of
>> >> discussion
>> >> > > > about potential use cases since there are a lot of ways you
>> could go
>> >> > > adding
>> >> > > > support for something like this. I think this is an obvious next
>> >> > > > incremental step, but someone is bound to have a use case that
>> would
>> >> > > > require multi-key CAS and would be costly to build atop single
>> key
>> >> CAS.
>> >> > > Or,
>> >> > > > since the compare requires a random read anyway, why not throw in
>> >> > > > read-by-key rather than sequential log reads, which would allow
>> for
>> >> > > > minitransactions a la Sinfonia?
>> >> > > >
>> >> > > > I'm not convinced trying to make Kafka support traditional
>> key-value
>> >> > > store
>> >> > > > functionality is a good idea. Compacted topics made it possible
>> to
>> >> use
>> >> > > it a
>> >> > > > bit more in that way, but didn't change the public interface,
>> only
>> >> the
>> >> > > way
>> >> > > > storage was implemented, and importantly all the potential
>> additional
>> >> > > > performance costs & data structures are isolated to background
>> >> threads.
>> >> > > >
>> >> > > > -Ewen
>> >> > > >
>> >> > > > On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck <
>> >> > > > daniel.schierb...@gmail.com> wrote:
>> >> > > >
>> >> > > > > @Jay:
>> >> > > > >
>> >> > > > > Regarding your first proposal: wouldn't that mean that a
>> producer
>> >> > > > wouldn't
>> >> > > > > know whether a write succeeded? In the case of event sourcing,
>> a
>> >> > failed
>> >> > > > CAS
>> >> > > > > may require re-validating the input with the new state. Simply
>> >> > > discarding
>> >> > > > > the write would be wrong.
>> >> > > > >
>> >> > > > > As for the second idea: how would a client of the writer
>> service
>> >> know
>> >> > > > which
>> >> > > > > writer is the leader? For example, how would a load balancer
>> know
>> >> > which
>> >> > > > web
>> >> > > > > app process to route requests to? Ideally, all processes would
>> be
>> >> > able
>> >> > > to
>> >> > > > > handle requests.
>> >> > > > >
>> >> > > > > Using conditional writes would allow any producer to write and
>> >> > provide
>> >> > > > > synchronous feedback to the producers.
>> >> > > > > On Fri, Jun 12, 2015 at 6:41 PM Jay Kreps <j...@confluent.io> wrote:
>> >> > > > >
>> >> > > > > > I have been thinking a little about this. I don't think CAS
>> >> > actually
>> >> > > > > > requires any particular broker support. Rather the two
>> writers
>> >> just
>> >> > > > write
>> >> > > > > > messages with some deterministic check-and-set criteria and
>> all
>> >> the
>> >> > > > > > replicas read from the log and check these criteria before
>> >> applying
>> >> > > the
>> >> > > > > > write. This mechanism has the downside that it creates
>> additional
>> >> > > > writes
>> >> > > > > > when there is a conflict and requires waiting on the full
>> >> roundtrip
>> >> > > > > (write
>> >> > > > > > and then read) but it has the advantage that it is very
>> flexible
>> >> as
>> >> > > to
>> >> > > > > the
>> >> > > > > > criteria you use.
>> >> > > > > >
>> >> > > > > > An alternative strategy for accomplishing the same thing a
>> bit
>> >> more
>> >> > > > > > efficiently is to elect leaders amongst the writers
>> themselves.
>> >> > This
>> >> > > > > would
>> >> > > > > > require broker support for single writer to avoid the
>> possibility
>> >> > of
>> >> > > > > split
>> >> > > > > > brain. I like this approach better because the leader for a
>> >> > partition
>> >> > > > can
>> >> > > > > > then do anything they want on their local data to make the
>> >> decision
>> >> > > of
>> >> > > > > what
>> >> > > > > > is committed, however the downside is that the mechanism is
>> more
>> >> > > > > involved.
>> >> > > > > >
>> >> > > > > > -Jay
>> >> > > > > >
>> >> > > > > > On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin <b...@kirw.in>
>> wrote:
>> >> > > > > >
>> >> > > > > > > Gwen: Right now I'm just looking for feedback -- but yes,
>> if
>> >> > folks
>> >> > > > are
>> >> > > > > > > interested, I do plan to do that implementation work.
>> >> > > > > > >
>> >> > > > > > > Daniel: Yes, that's exactly right. I haven't thought much
>> about
>> >> > > > > > > per-key... it does sound useful, but the implementation
>> seems a
>> >> > bit
>> >> > > > > > > more involved. Want to add it to the ticket?
>> >> > > > > > >
>> >> > > > > > > On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
>> >> > > > > > > <daniel.schierb...@gmail.com> wrote:
>> >> > > > > > > > Ben: your solution seems to focus on partition-wide CAS.
>> >> Have
>> >> > > you
>> >> > > > > > > > considered per-key CAS? That would make the feature more
>> >> useful
>> >> > > in
>> >> > > > my
>> >> > > > > > > > opinion, as you'd greatly reduce the contention.
>> >> > > > > > > >
>> >> > > > > > > > On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira <
>> >> > > > gshap...@cloudera.com>
>> >> > > > > > > wrote:
>> >> > > > > > > >
>> >> > > > > > > >> Hi Ben,
>> >> > > > > > > >>
>> >> > > > > > > >> Thanks for creating the ticket. Having check-and-set
>> >> > capability
>> >> > > > will
>> >> > > > > > be
>> >> > > > > > > >> sweet :)
>> >> > > > > > > >> Are you planning to implement this yourself? Or is it
>> just
>> >> an
>> >> > > idea
>> >> > > > > for
>> >> > > > > > > >> the community?
>> >> > > > > > > >>
>> >> > > > > > > >> Gwen
>> >> > > > > > > >>
>> >> > > > > > > >> On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin <
>> b...@kirw.in>
>> >> > > wrote:
>> >> > > > > > > >> > As it happens, I submitted a ticket for this feature a
>> >> > couple
>> >> > > > days
>> >> > > > > > > ago:
>> >> > > > > > > >> >
>> >> > > > > > > >> > https://issues.apache.org/jira/browse/KAFKA-2260
>> >> > > > > > > >> >
>> >> > > > > > > >> > Couldn't find any existing proposals for similar
>> things,
>> >> but
>> >> > > > it's
>> >> > > > > > > >> > certainly possible they're out there...
>> >> > > > > > > >> >
>> >> > > > > > > >> > On the other hand, I think you can solve your
>> particular
>> >> > issue
>> >> > > > by
>> >> > > > > > > >> > reframing the problem: treating the messages as
>> 'requests'
>> >> > or
>> >> > > > > > > >> > 'commands' instead of statements of fact. In your
>> >> > > flight-booking
>> >> > > > > > > >> > example, the log would correctly reflect that two
>> >> different
>> >> > > > people
>> >> > > > > > > >> > tried to book the same flight; the stream consumer
>> would
>> >> be
>> >> > > > > > > >> > responsible for finalizing one booking, and notifying
>> the
>> >> > > other
>> >> > > > > > client
>> >> > > > > > > >> > that their request had failed. (In-browser or by
>> email.)
>> >> > > > > > > >> >
>> >> > > > > > > >> > On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
>> >> > > > > > > >> > <daniel.schierb...@gmail.com> wrote:
>> >> > > > > > > >> >> I've been working on an application which uses Event
>> >> > > Sourcing,
>> >> > > > > and
>> >> > > > > > > I'd
>> >> > > > > > > >> like
>> >> > > > > > > >> >> to use Kafka as opposed to, say, a SQL database to
>> store
>> >> > > > events.
>> >> > > > > > This
>> >> > > > > > > >> would
>> >> > > > > > > >> >> allow me to easily integrate other systems by having
>> them
>> >> > > read
>> >> > > > > off
>> >> > > > > > > the
>> >> > > > > > > >> >> Kafka topics.
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> I do have one concern, though: the consistency of the
>> >> data
>> >> > > can
>> >> > > > > only
>> >> > > > > > > be
>> >> > > > > > > >> >> guaranteed if a command handler has a complete
>> picture of
>> >> > all
>> >> > > > > past
>> >> > > > > > > >> events
>> >> > > > > > > >> >> pertaining to some entity.
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> As an example, consider an airline seat reservation
>> >> system.
>> >> > > > Each
>> >> > > > > > > >> >> reservation command issued by a user is rejected if
>> the
>> >> > seat
>> >> > > > has
>> >> > > > > > > already
>> >> > > > > > > >> >> been taken. If the seat is available, a record
>> describing
>> >> > the
>> >> > > > > event
>> >> > > > > > > is
>> >> > > > > > > >> >> appended to the log. This works great when there's
>> only
>> >> one
>> >> > > > > > producer,
>> >> > > > > > > >> but
>> >> > > > > > > >> >> in order to scale I may need multiple producer
>> processes.
>> >> > > This
>> >> > > > > > > >> introduces a
>> >> > > > > > > >> >> race condition: two command handlers may
>> simultaneously
>> >> > > > receive a
>> >> > > > > > > >> command
>> >> > > > > > > >> >> to reserve the same seat. The event log indicates
>> that
>> >> the
>> >> > > > seat
>> >> > > > > is
>> >> > > > > > > >> >> available, so each handler will append a reservation
>> >> event
>> >> > –
>> >> > > > thus
>> >> > > > > > > >> >> double-booking that seat!
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> I see three ways around that issue:
>> >> > > > > > > >> >> 1. Don't use Kafka for this.
>> >> > > > > > > >> >> 2. Force a single producer for a given flight. This
>> will
>> >> > > > impact
>> >> > > > > > > >> >> availability and make routing more complex.
>> >> > > > > > > >> >> 3. Have a way to do optimistic locking in Kafka.
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> The last idea would work either on a per-key basis
>> or
>> >> > > > globally
>> >> > > > > > for
>> >> > > > > > > a
>> >> > > > > > > >> >> partition: when appending to a partition, the
>> producer
>> >> > would
>> >> > > > > > > indicate in
>> >> > > > > > > >> >> its request that the request should be rejected
>> unless
>> >> the
>> >> > > > > current
>> >> > > > > > > >> offset
>> >> > > > > > > >> >> of the partition is equal to x. For the per-key
>> setup,
>> >> > Kafka
>> >> > > > > > brokers
>> >> > > > > > > >> would
>> >> > > > > > > >> >> track the offset of the latest message for each
>> unique
>> >> key,
>> >> > > if
>> >> > > > so
>> >> > > > > > > >> >> configured. This would allow the request to specify
>> that
>> >> it
>> >> > > > > should
>> >> > > > > > be
>> >> > > > > > > >> >> rejected if the offset for key k is not equal to x.
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> This way, only one of the command handlers would
>> succeed
>> >> in
>> >> > > > > writing
>> >> > > > > > > to
>> >> > > > > > > >> >> Kafka, thus ensuring consistency.
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> There are different levels of complexity associated
>> with
>> >> > > > > > implementing
>> >> > > > > > > >> this
>> >> > > > > > > >> >> in Kafka depending on whether the feature would work
>> >> > > > > per-partition
>> >> > > > > > or
>> >> > > > > > > >> >> per-key:
>> >> > > > > > > >> >> * For the per-partition optimistic locking, the
>> broker
>> >> > would
>> >> > > > just
>> >> > > > > > > need
>> >> > > > > > > >> to
>> >> > > > > > > >> >> keep track of the high water mark for each partition
>> and
>> >> > > reject
>> >> > > > > > > >> conditional
>> >> > > > > > > >> >> requests when the offset doesn't match.
>> >> > > > > > > >> >> * For per-key locking, the broker would need to
>> maintain
>> >> an
>> >> > > > > > in-memory
>> >> > > > > > > >> table
>> >> > > > > > > >> >> mapping keys to the offset of the last message with
>> that
>> >> > key.
>> >> > > > > This
>> >> > > > > > > >> should
>> >> > > > > > > >> >> be fairly easy to maintain and recreate from the log
>> if
>> >> > > > > necessary.
>> >> > > > > > It
>> >> > > > > > > >> could
>> >> > > > > > > >> >> also be saved to disk as a snapshot from time to
>> time in
>> >> > > order
>> >> > > > to
>> >> > > > > > cut
>> >> > > > > > > >> down
>> >> > > > > > > >> >> the time needed to recreate the table on restart.
>> >> There's a
>> >> > > > small
>> >> > > > > > > >> >> performance penalty associated with this, but it
>> could be
>> >> > > > opt-in
>> >> > > > > > for
>> >> > > > > > > a
>> >> > > > > > > >> >> topic.
>> >> > > > > > > >> >>
>> >> > > > > > > >> >> Am I the only one thinking about using Kafka like
>> this?
>> >> > Would
>> >> > > > > this
>> >> > > > > > > be a
>> >> > > > > > > >> >> nice feature to have?
>> >> > > > > > > >>
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > > >
>> >> > > >
>> >> > > > --
>> >> > > > Thanks,
>> >> > > > Ewen
>> >> > > >
>> >> > >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Thanks,
>> >> > Ewen
>> >> >
>> >>
>> >
>> >
>> >
>> > --
>> > Thanks,
>> > Ewen
>>
>
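
For anyone skimming the thread, here is a rough sketch of the two checks
being discussed: the partition-wide offset check and the per-key offset
check. It is a toy in-memory model, not Kafka code and not the design in
KAFKA-2260. The names PartitionLog, OffsetMismatch, expected_log_end and
expected_key_offset are all made up for illustration, and it assumes the
"current offset" of a partition means the offset the next record would get.

```python
from dataclasses import dataclass, field

# Sentinel meaning "the caller did not ask for a per-key check".
_NO_CHECK = object()


class OffsetMismatch(Exception):
    """Raised when a conditional append is rejected (hypothetical error type)."""


@dataclass
class PartitionLog:
    """Toy stand-in for one partition; an assumption, not a Kafka API."""

    records: list = field(default_factory=list)
    # key -> offset of the latest record with that key. This is the
    # in-memory table from the thread; it can be rebuilt by replaying records.
    last_offset_by_key: dict = field(default_factory=dict)

    def log_end_offset(self) -> int:
        """Offset that the next appended record will receive."""
        return len(self.records)

    def append(self, key, value, *, expected_log_end=None,
               expected_key_offset=_NO_CHECK) -> int:
        """Append a record, optionally guarded by an offset check.

        expected_log_end: reject unless the partition's log end offset matches.
        expected_key_offset: reject unless the latest offset for `key` matches;
            pass None to require that the key has never been written.
        """
        if expected_log_end is not None and expected_log_end != self.log_end_offset():
            raise OffsetMismatch(
                f"partition at {self.log_end_offset()}, expected {expected_log_end}")
        if expected_key_offset is not _NO_CHECK:
            current = self.last_offset_by_key.get(key)  # None if key is unseen
            if current != expected_key_offset:
                raise OffsetMismatch(
                    f"key {key!r} at {current}, expected {expected_key_offset}")
        offset = self.log_end_offset()
        self.records.append((key, value))
        self.last_offset_by_key[key] = offset
        return offset
```

In the seat-reservation example, both command handlers would see no record
for the seat and both would call append(..., expected_key_offset=None). The
first call succeeds; the second raises OffsetMismatch, so that handler gets
synchronous feedback and can re-validate against the new state. With only
the partition-wide check, an unrelated write to the same partition would
also cause a rejection, which is the contention concern raised above.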
