offset storage as kafka with zookeeper 3.4.6

2015-06-11 Thread Kris K
I am trying to migrate the offset storage to kafka (3 brokers of version
0.8.2.1) using the consumer property offsets.storage=kafka.  I noticed that
a new topic, __consumer_offsets got created.
But nothing is being written to this topic, while the consumer offsets
continue to reside on zookeeper.

I am using a 3 node zookeeper ensemble (version 3.4.6) and not using the
one that comes with kafka.

The current config consumer.properties now contains:

offsets.storage=kafka
dual.commit.enabled=false
exclude.internal.topics=false

Is it mandatory to use the zookeeper that comes with kafka for offset
storage to be migrated to kafka?

I tried both the approaches:

1. As listed on slide 34 of
http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
2. By deleting the zookeeper data directories and kafka log directories.

None of them worked.

Thanks
Kris


Kafka 0.8.1.1 arbitrarily increased replication factor

2015-06-11 Thread Yury Ruchin
Hello,

I've run into a weird situation with Kafka 0.8.1.1. I had an operating
cluster which I wanted to extend with new brokers. The sequence was as
follows:

1. I added the brokers to the cluster and ensured that they appeared under
/brokers/ids.

2. Ran the reassign-partitions tool to redistribute the data and load evenly
between all the brokers. There were 2 topics with 200 partitions each, both
having replication factor 3.

3. Data transfer between replicas was too slow, so I decided to increase
num.replica.fetchers from 1 to 4 to speed up the process. I adjusted the
brokers' configuration and began a rolling restart, one broker at a time. Over
the course of the restarts I noticed lots of errors in the logs, such as "topic
is in the process of being deleted" (which obviously didn't hold true) and
"incorrect LeaderAndIsr received". I had no idea what to do about them, so I
repeated the restart for some brokers.

4. Waited for a while so that replicas caught up

5. Ran preferred-replica-election and finished the process.

Observations: when I ran kafka-topics.sh --describe during the reassignment, I
saw more than 3 replicas for some partitions in the "Replicas" field. I
assumed this was expected, since a partition might be assigned to a
completely different set of replicas which did not overlap with the
original replicas. The bad thing is that this situation has not changed till
now. I still see 4-6 replicas in "Replicas" and "ISR" for many partitions,
even though kafka-topics.sh --under-replicated does not show anything. What
is worse, kafka-topics.sh --describe shows the "Replication factor" changed
to 5 for one topic, and 6 for the other!

I wonder how the replication factor could have been increased by Kafka
in this way. Any idea on how I can get my topics back to replication factor
3 is appreciated.


Re: If you run Kafka in AWS or Docker, how do you persist data?

2015-06-11 Thread Domen Pogacnik
@Jeff Schroeder:

I’m trying to do a similar thing: running Kafka brokers in docker under
marathon under Mesos. I’m wondering if you're able to do a rolling deploy in
a way that Marathon waits for the old broker instances to replicate the
data to the new ones before killing them?

Best,

Domen


Re: offset storage as kafka with zookeeper 3.4.6

2015-06-11 Thread Joel Koshy
> Is it mandatory to use the zookeeper that comes with kafka for offset
> storage to be migrated to kafka?
If you want to "move" offsets from zookeeper to Kafka then yes you
need to have a phase where all consumers in your group set dual commit
to true. If you are starting a fresh consumer group then you can
turn off dual-commit.
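Concretely, the dual-commit phase described here would correspond to a consumer.properties along these lines (a sketch assembled from the settings already discussed in this thread; dual.commit.enabled goes back to false only after every consumer in the group has gone through this phase):

```properties
# Migration phase: commit offsets to both Kafka and Zookeeper
offsets.storage=kafka
dual.commit.enabled=true
exclude.internal.topics=false
```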

> But nothing is being written to this topic, while the consumer offsets
> continue to reside on zookeeper.

The zookeeper offsets won't be removed. However, are they changing?
How are you verifying that nothing is written to this topic? If you
are trying to consume it, then you will need to set
exclude.internal.topics=false in your consumer properties. You can
also check consumer mbeans that give the KafkaCommitRate or enable
trace logging in either the consumer or the broker's request logs to
check if offset commit requests are getting sent out to the cluster.

On Thu, Jun 11, 2015 at 01:03:09AM -0700, Kris K wrote:
> I am trying to migrate the offset storage to kafka (3 brokers of version
> 0.8.2.1) using the consumer property offsets.storage=kafka.  I noticed that
> a new topic, __consumer_offsets got created.
> But nothing is being written to this topic, while the consumer offsets
> continue to reside on zookeeper.
> 
> I am using a 3 node zookeeper ensemble (version 3.4.6) and not using the
> one that comes with kafka.
> 
> The current config consumer.properties now contains:
> 
> offsets.storage=kafka
> dual.commit.enabled=false
> exclude.internal.topics=false
> 
> Is it mandatory to use the zookeeper that comes with kafka for offset
> storage to be migrated to kafka?
> 
> I tried both the approaches:
> 
> 1. As listed on slide 34 of
> http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
> 2. By deleting the zookeeper data directories and kafka log directories.
> 
> None of them worked.
> 
> Thanks
> Kris



Re: How to prevent custom Partitioner from increasing the number of producer's requests?

2015-06-11 Thread Sebastien Falquier
Thanks Jiangjie. The new producer is exactly what I was looking for and it
works perfectly in production. It should be better documented on the official
site.

Jason : you're right, I missed a point with my AtomicIntegers.

Regards,
Sebastien

2015-06-04 20:02 GMT+02:00 Jason Rosenberg :

> Sebastien, I think you may have an off by 1 error (e.g. batch should be
> 0-199, not 1-200).  Thus you are sending 2 batches each time (one for 0,
> another for 1-199).
>
> Jason
>
> On Thu, Jun 4, 2015 at 1:33 PM, Jiangjie Qin 
> wrote:
>
> > From the code you pasted, that is the old producer.
> > The new producer class is org.apache.kafka.clients.producer.KafkaProducer.
> >
> > The new producer does not have sticky partition behavior. The default
> > > partitioner uses a round-robin-like way to send non-keyed messages to
> > partitions.
> >
> > Jiangjie (Becket) Qin
> >
> > On 6/3/15, 11:35 PM, "Sebastien Falquier" 
> > wrote:
> >
> > >I am using this code (from "org.apache.kafka" % "kafka_2.10" % "0.8.2.0"),
> > >no idea if it is the old producer or the new one
> > >
> > >import kafka.producer.Producer
> > >import kafka.producer.ProducerConfig
> > >val prodConfig : ProducerConfig = new ProducerConfig(properties)
> > >val producer : Producer[Integer,String] = new
> > >Producer[Integer,String](prodConfig)
> > >
> > >How can I know which producer I am using? And what is the behavior of the
> > >new producer?
> > >
> > >Thanks,
> > >Sébastien
> > >
> > >
> > >2015-06-03 20:04 GMT+02:00 Jiangjie Qin :
> > >
> > >>
> > >> Are you using new producer or old producer?
> > >> The old producer has 10 min sticky partition behavior while the new
> > >> producer does not.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 6/2/15, 11:58 PM, "Sebastien Falquier" <
> sebastien.falqu...@teads.tv>
> > >> wrote:
> > >>
> > >> >Hi Jason,
> > >> >
> > >> >The default partitioner does not do the job since my producers don't
> > >> >have smooth traffic. What I mean is that they can deliver lots of messages
> > >> >during 10 minutes and less during the next 10 minutes, that is to say the
> > >> >first partition will have stacked most of the messages of the last 20
> > >> >minutes.
> > >> >
> > >> >By the way, I don't understand your point about breaking batch into 2
> > >> >separate partitions. With that code, I jump to a new partition on
> > >>message
> > >> >201, 401, 601, ... with batch size = 200, where is my mistake?
> > >> >
> > >> >Thanks for your help,
> > >> >Sébastien
> > >> >
> > >> >2015-06-02 16:55 GMT+02:00 Jason Rosenberg :
> > >> >
> > >> >> Hi Sebastien,
> > >> >>
> > >> >> You might just try using the default partitioner (which is random).  It
> > >> >> works by choosing a random partition each time it re-polls the meta-data
> > >> >> for the topic.  By default, this happens every 10 minutes for each topic
> > >> >> you produce to (so it evenly distributes load at a granularity of 10
> > >> >> minutes).  This is based on 'topic.metadata.refresh.interval.ms'.
> > >> >>
> > >> >> I suspect your code is causing double requests for each batch, if your
> > >> >> partitioning is actually breaking up your batches into 2 separate
> > >> >> partitions.  Could be an off by 1 error, with your modulo calculation?
> > >> >> Perhaps you need to use '% 0' instead of '% 1' there?
> > >> >>
> > >> >> Jason
> > >> >>
> > >> >>
> > >> >>
> > >> >> On Tue, Jun 2, 2015 at 3:35 AM, Sebastien Falquier <
> > >> >> sebastien.falqu...@teads.tv> wrote:
> > >> >>
> > >> >> > Hi guys,
> > >> >> >
> > >> >> > I am new to Kafka and I am facing a problem I am not able to sort out.
> > >> >> >
> > >> >> > To smooth traffic over all my brokers' partitions, I have coded a custom
> > >> >> > Partitioner for my producers, using a simple round-robin algorithm that
> > >> >> > jumps from one partition to another on every batch of messages
> > >> >> > (corresponding to the batch.num.messages value). It looks like this:
> > >> >> > https://gist.github.com/sfalquier/4c0c7f36dd96d642b416
> > >> >> >
> > >> >> > With that fix, every partition is used equally, but the number of
> > >> >> > requests from the producers to the brokers has been multiplied by 2. I do
> > >> >> > not understand, since all producers are async with batch.num.messages=200
> > >> >> > and the amount of messages processed is still the same as before. Why do
> > >> >> > producers need more requests to do the job? As internal traffic is a bit
> > >> >> > critical on our platform, I would really like to reduce producers'
> > >> >> > request volume if possible.
> > >> >> >
> > >> >> > Any idea? Any suggestion?
> > >> >> >
> > >> >> > Regards,
> > >> >> > Sébastien
> > >> >> >
> > >> >>
> > >>
> > >>
> >
> >
>
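For what it's worth, the round-robin behavior Jiangjie describes for non-keyed messages in the new producer can be modeled in a few lines (a toy sketch for intuition, not the actual Kafka partitioner code):

```python
import itertools


class RoundRobinPartitioner:
    """Toy model: non-keyed messages cycle through all partitions evenly,
    instead of sticking to one partition for 10 minutes like the old producer."""

    def __init__(self, num_partitions):
        self._cycle = itertools.cycle(range(num_partitions))

    def partition(self):
        # Each non-keyed message goes to the next partition in the cycle.
        return next(self._cycle)


partitioner = RoundRobinPartitioner(num_partitions=3)
assignments = [partitioner.partition() for _ in range(6)]
print(assignments)  # -> [0, 1, 2, 0, 1, 2]
```

With this scheme the load is spread evenly at message granularity, which is why the sticky-partition workaround (and its extra requests) becomes unnecessary.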


Re: Increased replication factor. Replication didn't happen!

2015-06-11 Thread Joel Koshy
This may help:
http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
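For reference, the procedure on that page boils down to writing a custom reassignment JSON that lists a larger replica set per partition and feeding it to the tool with --execute (the topic name and broker ids below are placeholders):

```shell
# increase-replication-factor.json, e.g.:
#   {"version":1,
#    "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication-factor.json --execute

# Verify after the reassignment completes:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic foo
```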

On Thu, Jun 11, 2015 at 11:20:05AM +0800, Shady Xu wrote:
> Right now, Kafka topics do not support changing replication factor or
> partition number after creation. The kafka-reassign-partitions.sh tool can
> only reassign existing partitions.
> 
> 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
> 
> > What do the logs show?
> >
> > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey
> >  wrote:
> > > Ran this:
> > >
> > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > >
> > > But Kafka did not actually do the replication. Topic description shows
> > the
> > > right numbers, but it just didn't replicate.
> > >
> > > What's wrong, and how do I trigger the replication to occur??
> > >
> > > I'm running 0.8.2.0
> > >
> > > thanks
> >



Re: offset storage as kafka with zookeeper 3.4.6

2015-06-11 Thread Kris K
>If you want to "move" offsets from zookeeper to Kafka then yes you
>need to have a phase where all consumers in your group set dual commit
>to true. If you are starting a fresh consumer group then you can
>turn off dual-commit.
I followed these steps to move the offsets from zookeeper to kafka:
1. Set dual commit to true, exclude.internal.topics=false and offset
storage to kafka on all 3 consumer.properties files
2. Rolling restart on all 3 brokers. All the consumers are high-level
consumers with auto.commit.enable set to false
3. Left these settings for about an hour while data kept flowing through
some topics (not all)
4. Used the utility ./kafka-console-consumer.sh --topic __consumer_offsets
--zookeeper xxx:2181,yyy:2181,zzz:2181 --formatter
"kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config
../config/consumer.properties and found that nothing is written to this
topic
5. Changed dual commit to false followed by a rolling restart. All the
consumers are high level consumers
6. Zookeeper offsets kept changing while nothing got written to
__consumer_offsets

In order to reproduce the issue:
1. Brought down all 3 brokers and 3 nodes of zookeeper
2. Deleted all the contents of snapshot and transaction log directories of
zookeeper
3. Added myid files in snapshot directories on all zk nodes with node ids
4. Deleted all the contents of kafka log directories on all 3 brokers
5. Set dual commit to false, exclude.internal.topics=false and offset
storage to kafka on all 3 consumer.properties files
6. Brought the environment up. All the consumers are high-level consumers
with auto.commit.enable set to false
7. Consumer offsets still got recorded on zookeeper and kept changing
while __consumer_offsets
was empty

When I did a standalone installation with a single broker and used the
zookeeper that comes with kafka, the offsets got written to __consumer_offsets.
This made me ask the question about using zookeeper 3.4.6 versus the one
that comes with kafka.

>You can
>also check consumer mbeans that give the KafkaCommitRate or enable
>trace logging in either the consumer or the broker's request logs to
>check if offset commit request are getting sent out to the cluster.
I will check on this

Thanks,
Kris


On Thu, Jun 11, 2015 at 7:45 AM, Joel Koshy  wrote:

> > Is it mandatory to use the zookeeper that comes with kafka for offset
> > storage to be migrated to kafka?
> If you want to "move" offsets from zookeeper to Kafka then yes you
> need to have a phase where all consumers in your group set dual commit
> to true. If you are starting a fresh consumer group then you can
> turn off dual-commit.
>
> > But nothing is being written to this topic, while the consumer offsets
> > continue to reside on zookeeper.
>
> The zookeeper offsets won't be removed. However, are they changing?
> How are you verifying that nothing is written to this topic? If you
> are trying to consume it, then you will need to set
> exclude.internal.topics=false in your consumer properties. You can
> also check consumer mbeans that give the KafkaCommitRate or enable
> trace logging in either the consumer or the broker's request logs to
> check if offset commit requests are getting sent out to the cluster.
>
> On Thu, Jun 11, 2015 at 01:03:09AM -0700, Kris K wrote:
> > I am trying to migrate the offset storage to kafka (3 brokers of version
> > 0.8.2.1) using the consumer property offsets.storage=kafka.  I noticed
> that
> > a new topic, __consumer_offsets got created.
> > But nothing is being written to this topic, while the consumer offsets
> > continue to reside on zookeeper.
> >
> > I am using a 3 node zookeeper ensemble (version 3.4.6) and not using the
> > one that comes with kafka.
> >
> > The current config consumer.properties now contains:
> >
> > offsets.storage=kafka
> > dual.commit.enabled=false
> > exclude.internal.topics=false
> >
> > Is it mandatory to use the zookeeper that comes with kafka for offset
> > storage to be migrated to kafka?
> >
> > I tried both the approaches:
> >
> > 1. As listed on slide 34 of
> > http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
> > 2. By deleting the zookeeper data directories and kafka log directories.
> >
> > None of them worked.
> >
> > Thanks
> > Kris
>
>


Re: If you run Kafka in AWS or Docker, how do you persist data?

2015-06-11 Thread Joe Stein
If you're running Kafka on Mesos you should use the framework
https://github.com/mesos/kafka as the scheduler is more opinionated than a
generic launch on Marathon.

~ Joestein
On Jun 11, 2015 10:26 AM, "Domen Pogacnik" <
domen.pogac...@rocket-internet.de> wrote:

> @Jeff Schroeder:
>
> I’m trying to do a similar thing: running Kafka brokers in docker under
> marathon under Mesos. I’m wondering if you're able to do a rolling deploy
> in
> a way that Marathon waits for the old broker instances to replicate the
> data to the new ones before killing them?
>
> Best,
>
> Domen
>


Re: Kafka 0.8.3 - New Consumer - user implemented partition.assignment.strategies?

2015-06-11 Thread Johansson, Olof
Thanks Guozhang,

I agree that it seems to be a common reassignment strategy that should be
one of the pre-defined strategies. Do you have a Jira ticket for this
specific case, and/or a Jira ticket to add user defined
partitions.assignment.strategies that I can watch?

/ Olof

On 10/06/2015 14:35, "Guozhang Wang"  wrote:

>Hi Olof,
>
>Yes, we have plans to allow a user-defined partitions.assignment strategy to
>be passed in to the consumer coordinator; I am not sure whether this feature
>will be available in the first release of the new consumer in 0.8.3 though.
>Currently users still have to choose one from the server-defined strategies
>upon registering themselves.
>
>On the other hand, I think "rebalance with a minimal number of
>reassignments" should be quite a common reassignment strategy, and I think
>it is possible to just add it into the server-defined strategies list.
>
>Guozhang
>
>
>On Wed, Jun 10, 2015 at 9:32 AM, Johansson, Olof <
>olof.johans...@thingworx.com> wrote:
>
>> Is it possible to have a consumer rebalancing partition assignment
>> strategy that will rebalance with a minimal number of reassignments?
>>
>> Per the "Kafka 0.9 Consumer Rewrite Design" it should be possible to
>> define my own partition assignment strategy:
>> "partition.assignment.strategies - may take a comma separated list of
> >> properties that map the strategy's friendly name to the class that
>> implements the strategy. This is used for any strategy implemented by
>>the
>> user and released to the Kafka cluster. By default, Kafka will include a
>> set of strategies that can be used by the consumer."
>>
>> Is there a Jira ticket that tracks adding user defined
>> partitions.assignment.strategies? In the latest source, range, and
>> roundrobin are still the only possible values (hard-coded).
>>
>> I assume that any user implemented strategy would have to implement the
> >> PartitionAssignor trait. If so, by naively looking at the 0.8.3 source, a
> >> strategy that should do a minimal number of partition reassignments would
> >> need the ConsumerMetaData. That's not currently available in the
> >> PartitionAssignor contract - assign(topicsPerConsumer, partitionsPerTopic).
> >> Have there been any discussions to change the contract to pass
> >> ConsumerMetaData?
>>
>
>
>
>-- 
>-- Guozhang
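As a rough illustration, a "minimal number of reassignments" strategy of the kind discussed above could look like this (a simplified sticky-assignment sketch, not code from the Kafka consumer):

```python
def sticky_assign(partitions, consumers, previous):
    """Sketch of a minimal-reassignment strategy: keep each partition on
    its previous owner when that consumer is still alive, and hand only
    the orphaned partitions to the least-loaded surviving consumers."""
    consumers = sorted(consumers)
    assignment = {c: [] for c in consumers}
    orphans = []
    for p in sorted(partitions):
        owner = previous.get(p)
        if owner in assignment:
            assignment[owner].append(p)  # owner survived: no movement
        else:
            orphans.append(p)            # owner left: must be reassigned
    for p in orphans:
        # Give each orphan to whichever consumer currently owns the fewest.
        target = min(consumers, key=lambda c: len(assignment[c]))
        assignment[target].append(p)
    return assignment


# Consumer "b" leaves and "c" joins: only b's partitions (2, 3) move.
prev = {0: "a", 1: "a", 2: "b", 3: "b"}
new = sticky_assign([0, 1, 2, 3], ["a", "c"], prev)
print(new)  # -> {'a': [0, 1], 'c': [2, 3]}
```

Note that a range or round-robin strategy would typically reshuffle partitions 0 and 1 as well, which is exactly the churn the minimal-reassignment idea avoids.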



Re: How to manage the consumer group id?

2015-06-11 Thread Todd Palino
When we need to delete a group, we do it in Zookeeper directly. When we
need to roll back offsets, we use the Import/Export tool classes to do it,
because it's a little more efficient than working in Zookeeper. You can
find the details on the tools at
https://cwiki.apache.org/confluence/display/KAFKA/System+Tools
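For illustration, the Zookeeper offset export/import tools from that page are invoked roughly like this (the group name, file paths, and zkconnect string are placeholders; check the wiki page for the exact options in your version):

```shell
# Export the current Zookeeper-stored offsets of a consumer group to a file
bin/kafka-run-class.sh kafka.tools.ExportZkOffsets \
  --zkconnect localhost:2181 --group my-group --output-file offsets.txt

# Edit offsets.txt to the desired positions, then import it back
bin/kafka-run-class.sh kafka.tools.ImportZkOffsets \
  --zkconnect localhost:2181 --input-file offsets.txt
```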

I believe that as of right now (and please someone correct me if I'm
wrong), the Import/Export tools are not available yet for Kafka-committed
offsets. There is a patch for this, and we have a version of it built
internally. I think it's waiting for some of the KIP work before it is
finalized.

-Todd


On Wed, Jun 10, 2015 at 4:48 PM, James Cheng  wrote:

>
> > On Jun 10, 2015, at 1:26 PM, Todd Palino  wrote:
> >
> > For us, group ID is a configuration parameter of the application. So we
> > store it in configuration files (generally on disk) and maintain it there
> > through our configuration and deployment infrastructure. As you pointed
> > out, hard coding the group ID into the application is not usually a good
> > pattern.
> >
> > If you want to reset, you have a couple choices. One is that you can just
> > switch group names and start fresh. Another is that you can shut down the
> > consumer and delete the existing consumer group, then restart. You could
> > also stop, edit the offsets to set them to something specific (if you
> need
> > to roll back to a specific point, for example), and restart.
> >
>
> Thanks Todd. That helps. The "on disk" storage doesn't work well if you
> are running consumers in ephemeral nodes like EC2 machines, but in that
> case, I guess you would save the group ID in some other data store ("on
> disk, but elsewhere") associated with your "application cluster" rather
> than any one node of the cluster.
>
> I often hear about people saving their offsets using the consumer, and
> monitoring offsets for lag. I don't hear much about people deleting or
> changing/setting offsets by other means. How is it usually done? Are there
> tools to change the offsets, or do people go into zookeeper to change them
> directly? Or, for broker-stored offsets, use the Kafka APIs?
>
> -James
>
> > -Todd
> >
> >
> > On Wed, Jun 10, 2015 at 1:20 PM, James Cheng  wrote:
> >
> >> Hi,
> >>
> >> How are people specifying/persisting/resetting the consumer group
> >> identifier ("group.id") when using the high-level consumer?
> >>
> >> I understand how it works. I specify some string and all consumers that
> >> use that same string will help consume a topic. The partitions will be
> >> distributed amongst them for consumption. And when they save their
> offsets,
> >> the offsets will be saved according to the consumer group. That all
> makes
> >> sense to me.
> >>
> >> What I don't understand is the best way to set and persist them, and
> reset
> >> them if needed. For example, do I simply hardcode the string in my
> code? If
> >> so, then all deployed instances will have the same value (that's good).
> If
> >> I want to bring up a test instance of that code, or a new installation,
> >> though, then it will also share the load (that's bad).
> >>
> >> If I pass in a value to my instances, that lets me have different test
> and
> >> production instances of the same code (that's good), but then I have to
> >> persist my consumer group id somewhere outside of the process (on disk,
> in
> >> zookeeper, etc). Which then means I need some way to manage *that*
> >> identifier (that's... just how it is?).
> >>
> >> What if I decide that I want my app to start over? In the case of
> >> log-compacted streams, I want to throw away any processing I did and
> start
> >> "from the beginning". Do I change my consumer group, which effectively
> resets
> >> everything? Or do I delete my saved offsets, and then resume with the
> same
> >> consumer group? The latter is functionally equivalent to the former.
> >>
> >> Thanks,
> >> -James
> >>
> >>
>
>


Re: Increased replication factor. Replication didn't happen!

2015-06-11 Thread Dillian Murphey
 Oh, hmm.  There is even documentation on it:

http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

Having a difficult time sifting through the logs.  Is this not a common
operation?  Do users normally just delete the topic and create a new one??



On Wed, Jun 10, 2015 at 8:20 PM, Shady Xu  wrote:

> Right now, Kafka topics do not support changing replication factor or
> partition number after creation. The  kafka-reassign-partitions.sh tool can
> only reassign existent partitions.
>
> 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
>
> > What do the logs show?
> >
> > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey
> >  wrote:
> > > Ran this:
> > >
> > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > >
> > > But Kafka did not actually do the replication. Topic description shows
> > the
> > > right numbers, but it just didn't replicate.
> > >
> > > What's wrong, and how do I trigger the replication to occur??
> > >
> > > I'm running 0.8.2.0
> > >
> > > thanks
> >
>


Re: Increased replication factor. Replication didn't happen!

2015-06-11 Thread Adam Dubiel
I just tried it out on my 0.8.2 cluster and it worked just fine - the ISR
grew, the replication factor changed and data was physically moved to new
brokers. Was there no output/no logs? I see things like

INFO Created log for partition [topicName,7] in /opt/kafka/ with
properties {.. some json}

in server.log on new brokers.

2015-06-11 21:02 GMT+02:00 Dillian Murphey :

>  Oh, hmm.  There is even documentation on it:
>
>
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>
> Having a difficult time sifting through the logs.  Is this not a common
> operation?  Do users normally just delete the topic and create a new one??
>
>
>
> On Wed, Jun 10, 2015 at 8:20 PM, Shady Xu  wrote:
>
> > Right now, Kafka topics do not support changing replication factor or
> > partition number after creation. The  kafka-reassign-partitions.sh tool
> can
> > only reassign existent partitions.
> >
> > 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
> >
> > > What do the logs show?
> > >
> > > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey
> > >  wrote:
> > > > Ran this:
> > > >
> > > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > > >
> > > > But Kafka did not actually do the replication. Topic description
> shows
> > > the
> > > > right numbers, but it just didn't replicate.
> > > >
> > > > What's wrong, and how do I trigger the replication to occur??
> > > >
> > > > I'm running 0.8.2.0
> > > >
> > > > thanks
> > >
> >
>


Re: Kafka as an event store for Event Sourcing

2015-06-11 Thread Ben Kirwin
As it happens, I submitted a ticket for this feature a couple days ago:

https://issues.apache.org/jira/browse/KAFKA-2260

Couldn't find any existing proposals for similar things, but it's
certainly possible they're out there...

On the other hand, I think you can solve your particular issue by
reframing the problem: treating the messages as 'requests' or
'commands' instead of statements of fact. In your flight-booking
example, the log would correctly reflect that two different people
tried to book the same flight; the stream consumer would be
responsible for finalizing one booking, and notifying the other client
that their request had failed. (In-browser or by email.)

On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
 wrote:
> I've been working on an application which uses Event Sourcing, and I'd like
> to use Kafka as opposed to, say, a SQL database to store events. This would
> allow me to easily integrate other systems by having them read off the
> Kafka topics.
>
> I do have one concern, though: the consistency of the data can only be
> guaranteed if a command handler has a complete picture of all past events
> pertaining to some entity.
>
> As an example, consider an airline seat reservation system. Each
> reservation command issued by a user is rejected if the seat has already
> been taken. If the seat is available, a record describing the event is
> appended to the log. This works great when there's only one producer, but
> in order to scale I may need multiple producer processes. This introduces a
> race condition: two command handlers may simultaneously receive a command
> to reserve the same seat. The event log indicates that the seat is
> available, so each handler will append a reservation event – thus
> double-booking that seat!
>
> I see three ways around that issue:
> 1. Don't use Kafka for this.
> 2. Force a single producer for a given flight. This will impact
> availability and make routing more complex.
> 3. Have a way to do optimistic locking in Kafka.
>
> The latter idea would work either on a per-key basis or globally for a
> partition: when appending to a partition, the producer would indicate in
> its request that the request should be rejected unless the current offset
> of the partition is equal to x. For the per-key setup, Kafka brokers would
> track the offset of the latest message for each unique key, if so
> configured. This would allow the request to specify that it should be
> rejected if the offset for key k is not equal to x.
>
> This way, only one of the command handlers would succeed in writing to
> Kafka, thus ensuring consistency.
>
> There are different levels of complexity associated with implementing this
> in Kafka depending on whether the feature would work per-partition or
> per-key:
> * For the per-partition optimistic locking, the broker would just need to
> keep track of the high water mark for each partition and reject conditional
> requests when the offset doesn't match.
> * For per-key locking, the broker would need to maintain an in-memory table
> mapping keys to the offset of the last message with that key. This should
> be fairly easy to maintain and recreate from the log if necessary. It could
> also be saved to disk as a snapshot from time to time in order to cut down
> the time needed to recreate the table on restart. There's a small
> performance penalty associated with this, but it could be opt-in for a
> topic.
>
> Am I the only one thinking about using Kafka like this? Would this be a
> nice feature to have?
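The per-partition variant of this proposal can be modeled in a few lines: an append carries the offset the producer believes is the current end of the log, and the broker rejects the write on a mismatch (a toy sketch of the idea, not Kafka code):

```python
class ConditionalLog:
    """Toy model of per-partition optimistic locking (check-and-set append)."""

    def __init__(self):
        self._log = []

    def append(self, event, expected_offset):
        # Reject the write unless the caller has seen the current end of the log.
        if expected_offset != len(self._log):
            return False
        self._log.append(event)
        return True


log = ConditionalLog()
# Two command handlers both read the log at offset 0 and race to book seat 12A.
first = log.append("reserve seat 12A for alice", expected_offset=0)
second = log.append("reserve seat 12A for bob", expected_offset=0)
print(first, second)  # -> True False: only one reservation wins
```

The losing handler would then re-read the log, see the existing reservation, and reject or retry its command, which is exactly the consistency the single-producer setup otherwise provides.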


Re: Kafka as an event store for Event Sourcing

2015-06-11 Thread Gwen Shapira
Hi Ben,

Thanks for creating the ticket. Having check-and-set capability will be sweet :)
Are you planning to implement this yourself? Or is it just an idea for
the community?

Gwen

On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin  wrote:
> As it happens, I submitted a ticket for this feature a couple days ago:
>
> https://issues.apache.org/jira/browse/KAFKA-2260
>
> Couldn't find any existing proposals for similar things, but it's
> certainly possible they're out there...
>
> On the other hand, I think you can solve your particular issue by
> reframing the problem: treating the messages as 'requests' or
> 'commands' instead of statements of fact. In your flight-booking
> example, the log would correctly reflect that two different people
> tried to book the same flight; the stream consumer would be
> responsible for finalizing one booking, and notifying the other client
> that their request had failed. (In-browser or by email.)
>
> On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
>  wrote:
>> I've been working on an application which uses Event Sourcing, and I'd like
>> to use Kafka as opposed to, say, a SQL database to store events. This would
>> allow me to easily integrate other systems by having them read off the
>> Kafka topics.
>>
>> I do have one concern, though: the consistency of the data can only be
>> guaranteed if a command handler has a complete picture of all past events
>> pertaining to some entity.
>>
>> As an example, consider an airline seat reservation system. Each
>> reservation command issued by a user is rejected if the seat has already
>> been taken. If the seat is available, a record describing the event is
>> appended to the log. This works great when there's only one producer, but
>> in order to scale I may need multiple producer processes. This introduces a
>> race condition: two command handlers may simultaneously receive a command
>> to reserve the same seat. The event log indicates that the seat is
>> available, so each handler will append a reservation event – thus
>> double-booking that seat!
>>
>> I see three ways around that issue:
>> 1. Don't use Kafka for this.
>> 2. Force a single producer for a given flight. This will impact
>> availability and make routing more complex.
>> 3. Have a way to do optimistic locking in Kafka.
>>
>> The latter idea would work either on a per-key basis or globally for a
>> partition: when appending to a partition, the producer would indicate in
>> its request that the request should be rejected unless the current offset
>> of the partition is equal to x. For the per-key setup, Kafka brokers would
>> track the offset of the latest message for each unique key, if so
>> configured. This would allow the request to specify that it should be
>> rejected if the offset for key k is not equal to x.
>>
>> This way, only one of the command handlers would succeed in writing to
>> Kafka, thus ensuring consistency.
>>
>> There are different levels of complexity associated with implementing this
>> in Kafka depending on whether the feature would work per-partition or
>> per-key:
>> * For the per-partition optimistic locking, the broker would just need to
>> keep track of the high water mark for each partition and reject conditional
>> requests when the offset doesn't match.
>> * For per-key locking, the broker would need to maintain an in-memory table
>> mapping keys to the offset of the last message with that key. This should
>> be fairly easy to maintain and recreate from the log if necessary. It could
>> also be saved to disk as a snapshot from time to time in order to cut down
>> the time needed to recreate the table on restart. There's a small
>> performance penalty associated with this, but it could be opt-in for a
>> topic.
>>
>> Am I the only one thinking about using Kafka like this? Would this be a
>> nice feature to have?