offset storage as kafka with zookeeper 3.4.6
I am trying to migrate the offset storage to kafka (3 brokers of version 0.8.2.1) using the consumer property offsets.storage=kafka. I noticed that a new topic, __consumer_offsets, got created. But nothing is being written to this topic, while the consumer offsets continue to reside on zookeeper.

I am using a 3 node zookeeper ensemble (version 3.4.6), not the one that comes with kafka.

The current consumer.properties now contains:

offsets.storage=kafka
dual.commit.enabled=false
exclude.internal.topics=false

Is it mandatory to use the zookeeper that comes with kafka for the offset storage to be migrated to kafka?

I tried both of these approaches:

1. The procedure listed on slide 34 of http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
2. Deleting the zookeeper data directories and kafka log directories.

Neither worked.

Thanks
Kris
Kafka 0.8.1.1 arbitrarily increased replication factor
Hello,

I've run into a weird situation with Kafka 0.8.1.1. I had an operating cluster which I wanted to extend with new brokers. The sequence was as follows:

1. I added the brokers to the cluster and ensured that they appeared under /brokers/ids.
2. Ran the reassign-partitions tool to redistribute the data and load evenly across all the brokers. There were 2 topics with 200 partitions each, both with replication factor 3.
3. Data transfer between replicas was too slow, so I decided to increase num.replica.fetchers from 1 to 4 to speed up the process. I adjusted the brokers' configuration and began a rolling restart, one broker at a time. Over the course of the restarts I noticed lots of errors in the logs, such as "topic is in the process of being deleted" (which obviously wasn't true) and "incorrect LeaderAndIsr received". Having no idea what to do about them, I repeated the restart for some brokers.
4. Waited for a while so that the replicas caught up.
5. Ran preferred-replica-election and finished the process.

Observations: when I ran kafka-topics.sh --describe during the reassignment, I saw more than 3 replicas for some partitions in the "Replicas" field. I assumed this was expected, since a partition might be assigned to a completely different set of replicas that did not overlap with the original ones. The bad news is that this situation has not changed since. I still see 4-6 replicas in "Replicas" and "ISR" for many partitions, even though kafka-topics.sh --describe --under-replicated-partitions does not show anything. What is worse, kafka-topics.sh --describe shows "Replication factor" changed to 5 for one topic, and 6 for the other!

I wonder how it might happen that the replication factor was increased by Kafka in this way. Any idea on how I can get my topics back to replication factor 3 is appreciated.
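One way to get the replica lists back to three, sketched here with a hypothetical topic name and broker ids, is to run kafka-reassign-partitions.sh again with a hand-written plan: Kafka adopts whatever replica list the JSON specifies, so listing exactly three brokers per partition should drop the extras once the reassignment completes.

    shrink-to-rf3.json:

    {"version":1,
     "partitions":[
       {"topic":"my-topic","partition":0,"replicas":[1,2,3]},
       {"topic":"my-topic","partition":1,"replicas":[2,3,4]}
     ]}

    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --reassignment-json-file shrink-to-rf3.json --execute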
Re: If you run Kafka in AWS or Docker, how do you persist data?
@Jeff Schroeder:

I’m trying to do a similar thing: running Kafka brokers in Docker under Marathon under Mesos. I’m wondering whether you're able to do a rolling deploy in such a way that Marathon waits for the old broker instances to replicate the data to the new ones before killing them?

Best,

Domen
Re: offset storage as kafka with zookeeper 3.4.6
> Is it mandatory to use the zookeeper that comes with kafka for offset
> storage to be migrated to kafka?

If you want to "move" offsets from zookeeper to Kafka then yes, you need to have a phase where all consumers in your group set dual commit to true. If you are starting a fresh consumer group then you can turn off dual-commit.

> But nothing is being written to this topic, while the consumer offsets
> continue to reside on zookeeper.

The zookeeper offsets won't be removed. However, are they changing? How are you verifying that nothing is written to this topic? If you are trying to consume it, then you will need to set exclude.internal.topics=false in your consumer properties. You can also check the consumer mbeans that give the KafkaCommitRate, or enable trace logging in either the consumer or the broker's request logs to check whether offset commit requests are being sent out to the cluster.

On Thu, Jun 11, 2015 at 01:03:09AM -0700, Kris K wrote:
> I am trying to migrate the offset storage to kafka (3 brokers of version
> 0.8.2.1) using the consumer property offsets.storage=kafka. I noticed that
> a new topic, __consumer_offsets got created.
> But nothing is being written to this topic, while the consumer offsets
> continue to reside on zookeeper.
>
> I am using a 3 node zookeeper ensemble (version 3.4.6) and not using the
> one that comes with kafka.
>
> The current config consumer.properties now contains:
>
> offsets.storage=kafka
> dual.commit.enabled=false
> exclude.internal.topics=false
>
> Is it mandatory to use the zookeeper that comes with kafka for offset
> storage to be migrated to kafka?
>
> I tried both the approaches:
>
> 1. As listed on slide 34 of
> http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
> 2. By deleting the zookeeper data directories and kafka log directories.
>
> None of them worked.
>
> Thanks
> Kris
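For the migration phase Joel describes, the consumer.properties would look roughly like this (a sketch based on the dual-commit procedure above; flip dual.commit.enabled back to false only after every consumer in the group has been moved over):

    offsets.storage=kafka
    # Commit to both zookeeper and kafka while old consumers remain in the group
    dual.commit.enabled=true
    # Needed if you later want to consume __consumer_offsets to verify commits
    exclude.internal.topics=false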
Re: How to prevent custom Partitioner from increasing the number of producer's requests?
Thanks Jiangjie. The new producer is exactly what I was looking for, and it works perfectly in production. It deserves more prominent documentation on the official site.

Jason: you're right, I missed a point with my AtomicIntegers.

Regards,
Sebastien

2015-06-04 20:02 GMT+02:00 Jason Rosenberg :

> Sebastien, I think you may have an off-by-1 error (e.g. the batch should
> be 0-199, not 1-200). Thus you are sending 2 batches each time (one for 0,
> another for 1-199).
>
> Jason
>
> On Thu, Jun 4, 2015 at 1:33 PM, Jiangjie Qin wrote:
>
> > From the code you pasted, that is the old producer.
> > The new producer class is org.apache.kafka.clients.producer.KafkaProducer.
> >
> > The new producer does not have the sticky partition behavior. Its default
> > partitioner uses a round-robin-like way to send non-keyed messages to
> > partitions.
> >
> > Jiangjie (Becket) Qin
> >
> > On 6/3/15, 11:35 PM, "Sebastien Falquier" wrote:
> >
> > >I am using this code (from "org.apache.kafka" % "kafka_2.10" % "0.8.2.0"),
> > >no idea if it is the old producer or the new one:
> > >
> > >import kafka.producer.Producer
> > >import kafka.producer.ProducerConfig
> > >val prodConfig : ProducerConfig = new ProducerConfig(properties)
> > >val producer : Producer[Integer,String] = new
> > >Producer[Integer,String](prodConfig)
> > >
> > >How can I know which producer I am using? And what is the behavior of the
> > >new producer?
> > >
> > >Thanks,
> > >Sébastien
> > >
> > >2015-06-03 20:04 GMT+02:00 Jiangjie Qin :
> > >
> > >> Are you using the new producer or the old producer?
> > >> The old producer has the 10 min sticky partition behavior while the new
> > >> producer does not.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 6/2/15, 11:58 PM, "Sebastien Falquier" <sebastien.falqu...@teads.tv>
> > >> wrote:
> > >>
> > >> >Hi Jason,
> > >> >
> > >> >The default partitioner does not do the job, since my producers don't
> > >> >have smooth traffic. What I mean is that they can deliver lots of
> > >> >messages during 10 minutes and fewer during the next 10 minutes; that
> > >> >is to say, the first partition will have stacked most of the messages
> > >> >of the last 20 minutes.
> > >> >
> > >> >By the way, I don't understand your point about breaking a batch into
> > >> >2 separate partitions. With that code, I jump to a new partition on
> > >> >message 201, 401, 601, ... with batch size = 200. Where is my mistake?
> > >> >
> > >> >Thanks for your help,
> > >> >Sébastien
> > >> >
> > >> >2015-06-02 16:55 GMT+02:00 Jason Rosenberg :
> > >> >
> > >> >> Hi Sebastien,
> > >> >>
> > >> >> You might just try using the default partitioner (which is random).
> > >> >> It works by choosing a random partition each time it re-polls the
> > >> >> meta-data for the topic. By default, this happens every 10 minutes
> > >> >> for each topic you produce to (so it evenly distributes load at a
> > >> >> granularity of 10 minutes). This is based on
> > >> >> 'topic.metadata.refresh.interval.ms'.
> > >> >>
> > >> >> I suspect your code is causing double requests for each batch, if
> > >> >> your partitioning is actually breaking up your batches into 2
> > >> >> separate partitions. Could it be an off-by-1 error in your modulo
> > >> >> calculation? Perhaps you need to check for '== 0' instead of '== 1'
> > >> >> there?
> > >> >>
> > >> >> Jason
> > >> >>
> > >> >> On Tue, Jun 2, 2015 at 3:35 AM, Sebastien Falquier <
> > >> >> sebastien.falqu...@teads.tv> wrote:
> > >> >>
> > >> >> > Hi guys,
> > >> >> >
> > >> >> > I am new to Kafka and I am facing a problem I am not able to sort
> > >> >> > out.
> > >> >> >
> > >> >> > To smooth traffic over all my brokers' partitions, I have coded a
> > >> >> > custom Partitioner for my producers, using a simple round robin
> > >> >> > algorithm that jumps from one partition to another on every batch
> > >> >> > of messages (corresponding to the batch.num.messages value). It
> > >> >> > looks like this:
> > >> >> > https://gist.github.com/sfalquier/4c0c7f36dd96d642b416
> > >> >> >
> > >> >> > With that fix, every partition is used equally, but the number of
> > >> >> > requests from the producers to the brokers has been multiplied by
> > >> >> > 2. I do not understand, since all producers are async with
> > >> >> > batch.num.messages=200 and the number of messages processed is
> > >> >> > still the same as before. Why do the producers need more requests
> > >> >> > to do the job? As internal traffic is a bit critical on our
> > >> >> > platform, I would really like to reduce the producers' request
> > >> >> > volume if possible.
> > >> >> >
> > >> >> > Any idea? Any suggestion?
> > >> >> >
> > >> >> > Regards,
> > >> >> > Sébastien
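For reference, a minimal sketch of the new producer that Jiangjie recommends (org.apache.kafka.clients.producer.KafkaProducer, shipped from 0.8.2). The broker address and topic name are placeholders; sending records without a key lets the default partitioner spread them across partitions:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");  // placeholder broker list
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // No key on the record, so the default partitioner distributes
            // messages across partitions instead of sticking to one.
            producer.send(new ProducerRecord<String, String>("my-topic", "hello"));
            producer.close();
        }
    }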
Re: Increased replication factor. Replication didn't happen!
This may help:
http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

On Thu, Jun 11, 2015 at 11:20:05AM +0800, Shady Xu wrote:
> Right now, Kafka topics do not support changing the replication factor or
> partition number after creation. The kafka-reassign-partitions.sh tool can
> only reassign existing partitions.
>
> 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
>
> > What do the logs show?
> >
> > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey wrote:
> > > Ran this:
> > >
> > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > >
> > > But Kafka did not actually do the replication. The topic description
> > > shows the right numbers, but it just didn't replicate.
> > >
> > > What's wrong, and how do I trigger the replication to occur??
> > >
> > > I'm running 0.8.2.0
> > >
> > > thanks
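The procedure from that documentation link, in brief (topic name and broker ids are the docs' illustrative values): write a reassignment JSON naming the desired replica set for each partition, execute it, then verify.

    increase-replication-factor.json:

    {"version":1,
     "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file increase-replication-factor.json --execute

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file increase-replication-factor.json --verify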
Re: offset storage as kafka with zookeeper 3.4.6
>If you want to "move" offsets from zookeeper to Kafka then yes you
>need to have a phase where all consumers in your group set dual commit
>to true. If you are starting a fresh consumer group then you can
>turn off dual-commit.

I followed these steps to move the offsets from zookeeper to kafka:

1. Set dual commit to true, exclude.internal.topics=false, and offset storage to kafka in all 3 consumer.properties files.
2. Rolling restart on all 3 brokers. All the consumers are high-level consumers with auto.commit.enable set to false.
3. Left these settings for about an hour while data kept flowing through some topics (not all).
4. Ran ./kafka-console-consumer.sh --topic __consumer_offsets --zookeeper xxx:2181,yyy:2181,zzz:2181 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config ../config/consumer.properties and found that nothing is written to this topic.
5. Changed dual commit to false, followed by a rolling restart. All the consumers are high-level consumers.
6. Zookeeper offsets kept changing while nothing got written to __consumer_offsets.

To reproduce the issue:

1. Brought down all 3 brokers and all 3 nodes of zookeeper.
2. Deleted all the contents of the snapshot and transaction log directories of zookeeper.
3. Added myid files in the snapshot directories on all zk nodes with the node ids.
4. Deleted all the contents of the kafka log directories on all 3 brokers.
5. Set dual commit to false, exclude.internal.topics=false, and offset storage to kafka in all 3 consumer.properties files.
6. Brought the environment up. All the consumers are high-level consumers with auto.commit.enable set to false.
7. Consumer offsets still got recorded on zookeeper and kept changing, while __consumer_offsets remained empty.

When I did a standalone installation with a single broker and used the zookeeper that comes with kafka, the offsets got written to __consumer_offsets. This is what made me ask the question about using zookeeper 3.4.6 versus the one that comes with kafka.

>You can
>also check consumer mbeans that give the KafkaCommitRate or enable
>trace logging in either the consumer or the broker's request logs to
>check if offset commit requests are getting sent out to the cluster.

I will check on this.

Thanks,
Kris

On Thu, Jun 11, 2015 at 7:45 AM, Joel Koshy wrote:

> > Is it mandatory to use the zookeeper that comes with kafka for offset
> > storage to be migrated to kafka?
>
> If you want to "move" offsets from zookeeper to Kafka then yes you
> need to have a phase where all consumers in your group set dual commit
> to true. If you are starting a fresh consumer group then you can
> turn off dual-commit.
>
> > But nothing is being written to this topic, while the consumer offsets
> > continue to reside on zookeeper.
>
> The zookeeper offsets won't be removed. However, are they changing?
> How are you verifying that nothing is written to this topic? If you
> are trying to consume it, then you will need to set
> exclude.internal.topics=false in your consumer properties. You can
> also check consumer mbeans that give the KafkaCommitRate or enable
> trace logging in either the consumer or the broker's request logs to
> check if offset commit requests are getting sent out to the cluster.
>
> On Thu, Jun 11, 2015 at 01:03:09AM -0700, Kris K wrote:
> > I am trying to migrate the offset storage to kafka (3 brokers of version
> > 0.8.2.1) using the consumer property offsets.storage=kafka. I noticed
> > that a new topic, __consumer_offsets got created.
> > But nothing is being written to this topic, while the consumer offsets
> > continue to reside on zookeeper.
> >
> > I am using a 3 node zookeeper ensemble (version 3.4.6) and not using the
> > one that comes with kafka.
> >
> > The current config consumer.properties now contains:
> >
> > offsets.storage=kafka
> > dual.commit.enabled=false
> > exclude.internal.topics=false
> >
> > Is it mandatory to use the zookeeper that comes with kafka for offset
> > storage to be migrated to kafka?
> >
> > I tried both the approaches:
> >
> > 1. As listed on slide 34 of
> > http://www.slideshare.net/jjkoshy/offset-management-in-kafka.
> > 2. By deleting the zookeeper data directories and kafka log directories.
> >
> > None of them worked.
> >
> > Thanks
> > Kris
Re: If you run Kafka in AWS or Docker, how do you persist data?
If you're running Kafka on Mesos you should use the framework https://github.com/mesos/kafka, as the scheduler is more opinionated than a generic launch on Marathon.

~ Joestein

On Jun 11, 2015 10:26 AM, "Domen Pogacnik" <domen.pogac...@rocket-internet.de> wrote:

> @Jeff Schroeder:
>
> I’m trying to do a similar thing: running Kafka brokers in Docker under
> Marathon under Mesos. I’m wondering whether you're able to do a rolling
> deploy in such a way that Marathon waits for the old broker instances to
> replicate the data to the new ones before killing them?
>
> Best,
>
> Domen
Re: Kafka 0.8.3 - New Consumer - user implemented partition.assignment.strategies?
Thanks Guozhang,

I agree that it seems to be a common reassignment strategy that should be one of the pre-defined strategies. Do you have a Jira ticket for this specific case, and/or a Jira ticket for adding user-defined partition.assignment.strategies that I can watch?

/ Olof

On 10/06/2015 14:35, "Guozhang Wang" wrote:

>Hi Olof,
>
>Yes, we have plans to allow a user-defined partitions.assignment strategy
>to be passed in to the consumer coordinator; I am not sure whether this
>feature will be available in the first release of the new consumer in
>0.8.3, though. Currently users still have to choose one of the
>server-defined strategies upon registering themselves.
>
>On the other hand, I think "rebalance with a minimal number of
>reassignments" should be quite a common reassignment strategy, and I think
>it is possible to just add it to the server-defined strategies list.
>
>Guozhang
>
>On Wed, Jun 10, 2015 at 9:32 AM, Johansson, Olof <
>olof.johans...@thingworx.com> wrote:
>
>> Is it possible to have a consumer rebalancing partition assignment
>> strategy that will rebalance with a minimal number of reassignments?
>>
>> Per the "Kafka 0.9 Consumer Rewrite Design" it should be possible to
>> define my own partition assignment strategy:
>> "partition.assignment.strategies - may take a comma separated list of
>> properties that map the strategy's friendly name to the class that
>> implements the strategy. This is used for any strategy implemented by the
>> user and released to the Kafka cluster. By default, Kafka will include a
>> set of strategies that can be used by the consumer."
>>
>> Is there a Jira ticket that tracks adding user-defined
>> partitions.assignment.strategies? In the latest source, range and
>> roundrobin are still the only possible values (hard-coded).
>>
>> I assume that any user-implemented strategy would have to implement the
>> PartitionAssignor trait. If so, by naively looking at the 0.8.3 source, a
>> strategy that should do a minimal number of partition reassignments would
>> need the ConsumerMetaData. That's not currently available in the
>> PartitionAssignor contract - assign(topicsPerConsumer,
>> partitionsPerTopic). Have there been any discussions about changing the
>> contract to pass ConsumerMetaData?
>
>--
>-- Guozhang
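To illustrate the "minimal number of reassignments" idea discussed above, here is a rough Java sketch of a sticky assignment algorithm. The interface is hypothetical (the actual 0.8.3 PartitionAssignor trait takes different arguments, as the thread notes); the point is the algorithm: keep every previously-owned partition whose consumer is still alive, and hand only orphaned partitions to the least-loaded consumers.

    import java.util.*;

    // Hypothetical contract: previous ownership plus the current member list.
    public class StickyAssignor {
        public static Map<Integer, String> assign(Map<Integer, String> previousOwners,
                                                  List<String> liveConsumers,
                                                  int numPartitions) {
            Map<Integer, String> assignment = new HashMap<>();
            Map<String, Integer> load = new HashMap<>();
            for (String c : liveConsumers) load.put(c, 0);

            List<Integer> orphaned = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++) {
                String owner = previousOwners.get(p);
                if (owner != null && load.containsKey(owner)) {
                    assignment.put(p, owner);              // sticky: keep prior owner
                    load.put(owner, load.get(owner) + 1);
                } else {
                    orphaned.add(p);                       // owner gone: reassign below
                }
            }
            for (int p : orphaned) {
                String target = Collections.min(load.entrySet(),
                        Map.Entry.comparingByValue()).getKey();
                assignment.put(p, target);
                load.put(target, load.get(target) + 1);
            }
            return assignment;
        }

        public static void main(String[] args) {
            Map<Integer, String> previous = new HashMap<>();
            previous.put(0, "c1"); previous.put(1, "c1"); previous.put(2, "c2");
            // c2 left the group and c3 joined: only partition 2 should move.
            System.out.println(assign(previous, Arrays.asList("c1", "c3"), 3));
        }
    }

Note this sketch only places orphaned partitions; a production-grade sticky strategy would also move partitions off overloaded consumers to restore balance.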
Re: How to manage the consumer group id?
When we need to delete a group, we do it in Zookeeper directly. When we need to roll back offsets, we use the Import/Export tool classes to do it, because it's a little more efficient than working in Zookeeper. You can find the details on the tools at https://cwiki.apache.org/confluence/display/KAFKA/System+Tools

I believe that as of right now (and please someone correct me if I'm wrong), the Import/Export tools are not yet available for Kafka-committed offsets. There is a patch for this, and we have a version of it built internally. I think it's waiting on some of the KIP work before it is finalized.

-Todd

On Wed, Jun 10, 2015 at 4:48 PM, James Cheng wrote:

> > On Jun 10, 2015, at 1:26 PM, Todd Palino wrote:
> >
> > For us, group ID is a configuration parameter of the application. So we
> > store it in configuration files (generally on disk) and maintain it there
> > through our configuration and deployment infrastructure. As you pointed
> > out, hard-coding the group ID into the application is not usually a good
> > pattern.
> >
> > If you want to reset, you have a couple choices. One is that you can just
> > switch group names and start fresh. Another is that you can shut down the
> > consumer and delete the existing consumer group, then restart. You could
> > also stop, edit the offsets to set them to something specific (if you
> > need to roll back to a specific point, for example), and restart.
>
> Thanks Todd. That helps. The "on disk" storage doesn't work well if you
> are running consumers on ephemeral nodes like EC2 machines, but in that
> case, I guess you would save the group ID in some other data store ("on
> disk, but elsewhere") associated with your "application cluster" rather
> than any one node of the cluster.
>
> I often hear about people saving their offsets using the consumer, and
> monitoring offsets for lag. I don't hear much about people deleting or
> changing/setting offsets by other means. How is it usually done? Are there
> tools to change the offsets, or do people go into zookeeper to change them
> directly? Or, for broker-stored offsets, use the Kafka APIs?
>
> -James
>
> > -Todd
> >
> > On Wed, Jun 10, 2015 at 1:20 PM, James Cheng wrote:
> >
> >> Hi,
> >>
> >> How are people specifying/persisting/resetting the consumer group
> >> identifier ("group.id") when using the high-level consumer?
> >>
> >> I understand how it works. I specify some string, and all consumers
> >> that use that same string will help consume a topic. The partitions
> >> will be distributed amongst them for consumption. And when they save
> >> their offsets, the offsets will be saved according to the consumer
> >> group. That all makes sense to me.
> >>
> >> What I don't understand is the best way to set and persist them, and
> >> reset them if needed. For example, do I simply hardcode the string in
> >> my code? If so, then all deployed instances will have the same value
> >> (that's good). If I want to bring up a test instance of that code, or a
> >> new installation, though, then it will also share the load (that's bad).
> >>
> >> If I pass in a value to my instances, that lets me have different test
> >> and production instances of the same code (that's good), but then I
> >> have to persist my consumer group id somewhere outside of the process
> >> (on disk, in zookeeper, etc). Which then means I need some way to
> >> manage *that* identifier (that's... just how it is?).
> >>
> >> What if I decide that I want my app to start over? In the case of
> >> log-compacted streams, I want to throw away any processing I did and
> >> start "from the beginning". Do I change my consumer group, which
> >> effectively resets everything? Or do I delete my saved offsets, and
> >> then resume with the same consumer group? The latter is functionally
> >> equivalent to the former.
> >>
> >> Thanks,
> >> -James
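The zookeeper offset Import/Export tools Todd mentions are listed on the System Tools wiki page linked above; the invocation is roughly as follows (group name, file paths, and zookeeper address are placeholders):

    # Export a group's zookeeper-stored offsets to a file
    bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181 \
      --group my-group --output-file /tmp/my-group-offsets

    # Edit the file to the desired offsets, then import it back
    bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect localhost:2181 \
      --input-file /tmp/my-group-offsets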
Re: Increased replication factor. Replication didn't happen!
Oh, hmm. There is even documentation on it:

http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor

I'm having a difficult time sifting through the logs. Is this not a common operation? Do users normally just delete the topic and create a new one??

On Wed, Jun 10, 2015 at 8:20 PM, Shady Xu wrote:

> Right now, Kafka topics do not support changing the replication factor or
> partition number after creation. The kafka-reassign-partitions.sh tool can
> only reassign existing partitions.
>
> 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
>
> > What do the logs show?
> >
> > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey wrote:
> > > Ran this:
> > >
> > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > >
> > > But Kafka did not actually do the replication. The topic description
> > > shows the right numbers, but it just didn't replicate.
> > >
> > > What's wrong, and how do I trigger the replication to occur??
> > >
> > > I'm running 0.8.2.0
> > >
> > > thanks
Re: Increased replication factor. Replication didn't happen!
I just tried it out on my 0.8.2 cluster and it worked just fine - the ISR grew, the replication factor changed, and the data was physically moved to the new brokers. Was there no output / nothing in the logs? I see things like

INFO Created log for partition [topicName,7] in /opt/kafka/ with properties {.. some json}

in server.log on the new brokers.

2015-06-11 21:02 GMT+02:00 Dillian Murphey :

> Oh, hmm. There is even documentation on it:
>
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>
> I'm having a difficult time sifting through the logs. Is this not a common
> operation? Do users normally just delete the topic and create a new one??
>
> On Wed, Jun 10, 2015 at 8:20 PM, Shady Xu wrote:
>
> > Right now, Kafka topics do not support changing the replication factor
> > or partition number after creation. The kafka-reassign-partitions.sh
> > tool can only reassign existing partitions.
> >
> > 2015-06-11 9:31 GMT+08:00 Gwen Shapira :
> >
> > > What do the logs show?
> > >
> > > On Wed, Jun 10, 2015 at 5:07 PM, Dillian Murphey wrote:
> > > > Ran this:
> > > >
> > > > $KAFKA_HOME/bin/kafka-reassign-partitions.sh
> > > >
> > > > But Kafka did not actually do the replication. The topic description
> > > > shows the right numbers, but it just didn't replicate.
> > > >
> > > > What's wrong, and how do I trigger the replication to occur??
> > > >
> > > > I'm running 0.8.2.0
> > > >
> > > > thanks
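A quick way to look for those entries on the new brokers (the log path is a placeholder; adjust to your installation):

    grep "Created log for partition" /opt/kafka/logs/server.log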
Re: Kafka as an event store for Event Sourcing
As it happens, I submitted a ticket for this feature a couple of days ago:

https://issues.apache.org/jira/browse/KAFKA-2260

I couldn't find any existing proposals for similar things, but it's certainly possible they're out there...

On the other hand, I think you can solve your particular issue by reframing the problem: treating the messages as 'requests' or 'commands' instead of statements of fact. In your flight-booking example, the log would correctly reflect that two different people tried to book the same flight; the stream consumer would be responsible for finalizing one booking and notifying the other client that their request had failed. (In-browser or by email.)

On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck wrote:

> I've been working on an application which uses Event Sourcing, and I'd
> like to use Kafka as opposed to, say, a SQL database to store events. This
> would allow me to easily integrate other systems by having them read off
> the Kafka topics.
>
> I do have one concern, though: the consistency of the data can only be
> guaranteed if a command handler has a complete picture of all past events
> pertaining to some entity.
>
> As an example, consider an airline seat reservation system. Each
> reservation command issued by a user is rejected if the seat has already
> been taken. If the seat is available, a record describing the event is
> appended to the log. This works great when there's only one producer, but
> in order to scale I may need multiple producer processes. This introduces
> a race condition: two command handlers may simultaneously receive a
> command to reserve the same seat. The event log indicates that the seat is
> available, so each handler will append a reservation event - thus
> double-booking that seat!
>
> I see three ways around that issue:
> 1. Don't use Kafka for this.
> 2. Force a single producer for a given flight. This will impact
> availability and make routing more complex.
> 3. Have a way to do optimistic locking in Kafka.
>
> The latter idea would work either on a per-key basis or globally for a
> partition: when appending to a partition, the producer would indicate in
> its request that the request should be rejected unless the current offset
> of the partition is equal to x. For the per-key setup, Kafka brokers would
> track the offset of the latest message for each unique key, if so
> configured. This would allow the request to specify that it should be
> rejected if the offset for key k is not equal to x.
>
> This way, only one of the command handlers would succeed in writing to
> Kafka, thus ensuring consistency.
>
> There are different levels of complexity associated with implementing
> this in Kafka, depending on whether the feature would work per-partition
> or per-key:
>
> * For per-partition optimistic locking, the broker would just need to
> keep track of the high water mark for each partition and reject
> conditional requests when the offset doesn't match.
> * For per-key locking, the broker would need to maintain an in-memory
> table mapping keys to the offset of the last message with that key. This
> should be fairly easy to maintain and recreate from the log if necessary.
> It could also be saved to disk as a snapshot from time to time in order to
> cut down the time needed to recreate the table on restart. There's a small
> performance penalty associated with this, but it could be opt-in for a
> topic.
>
> Am I the only one thinking about using Kafka like this? Would this be a
> nice feature to have?
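To make the per-partition check concrete, here is a toy Java model of the compare-and-append semantics described above. It assumes nothing about the eventual KAFKA-2260 API; it only demonstrates why exactly one of two racing writers succeeds:

    import java.util.ArrayList;
    import java.util.List;

    // Toy in-memory model of a partition log with conditional append.
    public class ConditionalLog {
        private final List<String> records = new ArrayList<>();

        /** Append only if the log end offset equals what the writer last observed.
         *  Returns true on success; false means another writer got in first. */
        public synchronized boolean appendIf(long expectedEndOffset, String record) {
            if (records.size() != expectedEndOffset) {
                return false; // condition failed: caller must re-read and retry
            }
            records.add(record);
            return true;
        }

        public synchronized long endOffset() { return records.size(); }

        public static void main(String[] args) {
            ConditionalLog log = new ConditionalLog();
            long seen = log.endOffset();
            System.out.println(log.appendIf(seen, "seat-12A reserved by A")); // true
            // A second handler that observed the same offset now loses the race:
            System.out.println(log.appendIf(seen, "seat-12A reserved by B")); // false
        }
    }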
Re: Kafka as an event store for Event Sourcing
Hi Ben,

Thanks for creating the ticket. Having check-and-set capability will be sweet :)

Are you planning to implement this yourself? Or is it just an idea for the community?

Gwen

On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin wrote:

> As it happens, I submitted a ticket for this feature a couple of days ago:
>
> https://issues.apache.org/jira/browse/KAFKA-2260
>
> I couldn't find any existing proposals for similar things, but it's
> certainly possible they're out there...
>
> On the other hand, I think you can solve your particular issue by
> reframing the problem: treating the messages as 'requests' or
> 'commands' instead of statements of fact. In your flight-booking
> example, the log would correctly reflect that two different people
> tried to book the same flight; the stream consumer would be
> responsible for finalizing one booking and notifying the other client
> that their request had failed. (In-browser or by email.)
>
> On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck wrote:
>> I've been working on an application which uses Event Sourcing, and I'd
>> like to use Kafka as opposed to, say, a SQL database to store events.
>> This would allow me to easily integrate other systems by having them
>> read off the Kafka topics.
>>
>> I do have one concern, though: the consistency of the data can only be
>> guaranteed if a command handler has a complete picture of all past
>> events pertaining to some entity.
>>
>> As an example, consider an airline seat reservation system. Each
>> reservation command issued by a user is rejected if the seat has already
>> been taken. If the seat is available, a record describing the event is
>> appended to the log. This works great when there's only one producer,
>> but in order to scale I may need multiple producer processes. This
>> introduces a race condition: two command handlers may simultaneously
>> receive a command to reserve the same seat. The event log indicates that
>> the seat is available, so each handler will append a reservation event -
>> thus double-booking that seat!
>>
>> I see three ways around that issue:
>> 1. Don't use Kafka for this.
>> 2. Force a single producer for a given flight. This will impact
>> availability and make routing more complex.
>> 3. Have a way to do optimistic locking in Kafka.
>>
>> The latter idea would work either on a per-key basis or globally for a
>> partition: when appending to a partition, the producer would indicate in
>> its request that the request should be rejected unless the current
>> offset of the partition is equal to x. For the per-key setup, Kafka
>> brokers would track the offset of the latest message for each unique
>> key, if so configured. This would allow the request to specify that it
>> should be rejected if the offset for key k is not equal to x.
>>
>> This way, only one of the command handlers would succeed in writing to
>> Kafka, thus ensuring consistency.
>>
>> There are different levels of complexity associated with implementing
>> this in Kafka, depending on whether the feature would work per-partition
>> or per-key:
>>
>> * For per-partition optimistic locking, the broker would just need to
>> keep track of the high water mark for each partition and reject
>> conditional requests when the offset doesn't match.
>> * For per-key locking, the broker would need to maintain an in-memory
>> table mapping keys to the offset of the last message with that key. This
>> should be fairly easy to maintain and recreate from the log if
>> necessary. It could also be saved to disk as a snapshot from time to
>> time in order to cut down the time needed to recreate the table on
>> restart. There's a small performance penalty associated with this, but
>> it could be opt-in for a topic.
>>
>> Am I the only one thinking about using Kafka like this? Would this be a
>> nice feature to have?