Re: no partition leader yet for partition 0 (0.8.0)
Hi Jun,

No, all 9 brokers are up, and when I look at the files in /opt/kafka-[]-logs there is data for partition 0 of that topic on 3 different brokers.

After confirming this was still happening this morning, I bounced all the brokers, and on restart one of them took over as primary for partition 0. No more errors after the reboot.

However, I now have a different problem. To see if the issue was creating a new topic with all the brokers live, I created a new topic using the same command line as below. The list_topics output shows it was created with primaries on all partitions. However, on one of the machines (the one running brokers 1, 2 & 3) I keep getting the following warning:

[2012-11-28 07:56:46,014] WARN [ReplicaFetcherThread-9-0-on-broker-1], error for test2 2 to broker 9 (kafka.server.ReplicaFetcherThread)
kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
        at java.lang.Class.newInstance0(Class.java:355)
        at java.lang.Class.newInstance(Class.java:308)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5$$anonfun$apply$3.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5$$anonfun$apply$3.apply(AbstractFetcherThread.scala:131)
        at kafka.utils.Logging$class.warn(Logging.scala:88)
        at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:106)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
[2012-11-28 07:56:46,289] WARN [ReplicaFetcherThread-8-0-on-broker-1], error for test2 1 to broker 8 (kafka.server.ReplicaFetcherThread)
kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
        at java.lang.Class.newInstance0(Class.java:355)
        at java.lang.Class.newInstance(Class.java:308)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:70)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5$$anonfun$apply$3.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5$$anonfun$apply$3.apply(AbstractFetcherThread.scala:131)
        at kafka.utils.Logging$class.warn(Logging.scala:88)
        at kafka.utils.ShutdownableThread.warn(ShutdownableThread.scala:23)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:106)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)

(There are 3 brokers on that machine, so I can't easily tell whether the errors on the screen are from one broker or all 3.)

The 2nd set of brokers (4, 5, 6) don't show these messages.

On the 3rd set of brokers (7, 8, 9) I get a different message:

[2012-11-28 07:58:34,180] WARN Replica Manager on Broker 8: While recording the follower position, the partition [test2, 1] hasn't been created, skip updating leader HW (kafka.server.ReplicaManager)
[2012-11-28 07:58:34,180] ERROR [KafkaApi-8] error when processing request (test2,1,0,1048576) (kafka.server.KafkaApis)
kafka.common.UnknownTopicOrPartitionException: Topic test2 partition 1 doesn't exist on 8
        at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:163)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:359)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:325)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:321)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map2.map(M
Re: consumer read process
If you read from offset x last, what information can you get regarding how many messages are left to process?

On Wed, Nov 28, 2012 at 10:13 AM, S Ahmed wrote:
> Can someone go over how a consumer goes about reading from a broker?
>
> example:
>
> 1. connect to zookeeper, get information on the broker that corresponds to
> the topic/partition
>
> Then what does it do, does it register which message_id it is picking up,
> or a group of them?
Re: consumer read process
You can find the information at http://incubator.apache.org/kafka/design.html
Look for the consumer registration algorithm and the consumer rebalancing algorithm.

Thanks,

Jun

On Wed, Nov 28, 2012 at 7:13 AM, S Ahmed wrote:
> Can someone go over how a consumer goes about reading from a broker?
> [...]
Consumer questions: 0.8.0 vs. 0.7.2
Hi,

First, thanks for 0.8.0! I'm really impressed with the redundancy and simplification of the producer and consumer models.

I've upgraded my consumers from 0.7.2 to 0.8.0 and have some questions.

I am using the SimpleConsumer since I need to support replay of messages at the client's request.

In 0.7.2 I connected to each broker and requested messages for a specific partition from all of them, since data was being spread across all the brokers. I see I don't need this in 0.8.0; instead I need to know how to find the primary Broker for a topic/partition. Is there an API for this? I looked at the Zookeeper rebalance logic in the regular consumer, but was wondering if there is somewhere I can just call it instead of duplicating an internal implementation.

Before I figured out the 'primary' Broker, I was able to connect to the other brokers and ask for a topic/partition/offset. I got nothing back at the client, but on the server there was an error about not being the primary. Is there something on the client I can check to see if the Broker I'm talking to is not the Primary (or is no longer the Primary if something changes)? Why doesn't the fetch logic return an error in this case?

It looks like the meaning of the 'offset' passed to the fetch() method has changed: in 0.7.2 passing an offset returned everything SINCE that offset, while in 0.8.0 the FetchRequest returns everything STARTING with that offset. Am I correct that this changed? (Or did I find a bug in my 0.7.2 logic?)

It also looks like the offset is now a sequential number with no gaps in 0.8.0, vs. a byte offset in the file. Is this offset now guaranteed to be unique/incrementing for a topic/partition, even if the primary changes? So message 1000 on each of the replicas is the same? I used to keep a map of Broker/offset in the client to know what data I'd already seen, but now I THINK I can keep only the one offset.

What is the purpose of the 'clientId' in the FetchRequestBuilder? For the SimpleConsumer-based logic can this be anything? Can it be reused by parallel connections, even to the same topic/partition?

Thanks,

Chris
Re: no partition leader yet for partition 0 (0.8.0)
Chris,

Not sure what happened to the WARN logging that you saw. Is that easily reproducible? As for log4j, you just need to change log4j.properties. You can find out on the web how to configure a rolling log file.

Thanks,

Jun

On Wed, Nov 28, 2012 at 5:10 AM, Chris Curtin wrote:
> Hi Jun,
>
> No, all 9 brokers are up, and when I look at the files in /opt/kafka-[]-logs
> there is data for partition 0 of that topic on 3 different brokers.
> [...]
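As a concrete starting point for Jun's suggestion, a rolling-file setup in the broker's log4j.properties could look roughly like the sketch below. It uses the stock log4j 1.x RollingFileAppender; the appender name, file path, and size limits are illustrative, not taken from the Kafka distribution:

    log4j.rootLogger=INFO, rollingFile

    log4j.appender.rollingFile=org.apache.log4j.RollingFileAppender
    log4j.appender.rollingFile.File=/var/log/kafka/server.log
    log4j.appender.rollingFile.MaxFileSize=100MB
    log4j.appender.rollingFile.MaxBackupIndex=10
    log4j.appender.rollingFile.layout=org.apache.log4j.PatternLayout
    log4j.appender.rollingFile.layout.ConversionPattern=[%d] %p %m (%c)%n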
Re: no partition leader yet for partition 0 (0.8.0)
Hi Jun,

Sorry, neither the missing partition 0 leader nor all those WARN messages have been reproducible. I tried several times this morning. I'll be starting from a green-field cluster again this afternoon, so I'll keep an eye out for it happening again.

Thanks,

Chris

On Wed, Nov 28, 2012 at 12:08 PM, Jun Rao wrote:
> Chris,
>
> Not sure what happened to the WARN logging that you saw. Is that easily
> reproducible? As for log4j, you just need to change log4j.properties. You
> can find out on the web how to configure a rolling log file.
> [...]
Re: Consumer questions: 0.8.0 vs. 0.7.2
To use SimpleConsumer, you need to send a TopicMetadataRequest (available in SimpleConsumer) to figure out the leader of each partition before making the fetch requests.

In both 0.7 and 0.8, a fetch request fetches data starting at the provided offset. In 0.8, the offset is a sequential, ever-growing number (i.e., 1, 2, 3, ...).

Thanks,

Jun

On Wed, Nov 28, 2012 at 8:36 AM, Chris Curtin wrote:
> Hi,
>
> First, thanks for 0.8.0! I'm really impressed with the redundancy
> and simplification of the producer and consumer models.
> [...]
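A sketch of the flow Jun describes, using the 0.8 javaapi classes. The broker host/port, topic, and clientId values are placeholders, and the exact constructor and method signatures should be treated as assumptions against the 0.8 branch:

    import java.util.Collections;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.cluster.Broker;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LeaderLookupSketch {
        public static void main(String[] args) {
            String topic = "test2";   // placeholder
            int partition = 0;

            // Step 1: ask any live broker for topic metadata to locate the partition leader.
            SimpleConsumer metadataConsumer =
                new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leaderLookup");
            TopicMetadataRequest metadataRequest =
                new TopicMetadataRequest(Collections.singletonList(topic));
            TopicMetadataResponse metadataResponse = metadataConsumer.send(metadataRequest);
            metadataConsumer.close();

            Broker leader = null;
            for (TopicMetadata tm : metadataResponse.topicsMetadata()) {
                for (PartitionMetadata pm : tm.partitionsMetadata()) {
                    if (pm.partitionId() == partition) {
                        leader = pm.leader();  // may be null if no leader has been elected yet
                    }
                }
            }
            if (leader == null) {
                throw new RuntimeException("no leader for " + topic + "/" + partition);
            }

            // Step 2: fetch from the leader, starting at a logical (not byte) offset.
            SimpleConsumer consumer =
                new SimpleConsumer(leader.host(), leader.port(), 100000, 64 * 1024, "myClient");
            FetchRequest request = new FetchRequestBuilder()
                .clientId("myClient")                    // also tags server-side request logs and mbeans
                .addFetch(topic, partition, 0L, 1048576) // start at offset 0, fetch up to 1 MB
                .build();
            FetchResponse response = consumer.fetch(request);
            consumer.close();
        }
    }

If the leader moves, my understanding is that the 0.8 fetch response carries a per-partition error code rather than silently returning nothing, so the client would repeat the metadata lookup on error; check the FetchResponse API in your build.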
Re: Consumer questions: 0.8.0 vs. 0.7.2
The offset now begins at 0 and increases sequentially for each partition. The offset is identical across all replicas of that partition on different brokers, but amongst different partitions the offsets are independent (as before). The offset of a committed message is unique within that topic/partition.

-Jay

On Wed, Nov 28, 2012 at 8:36 AM, Chris Curtin wrote:
> Hi,
>
> First, thanks for 0.8.0! I'm really impressed with the redundancy
> and simplification of the producer and consumer models.
> [...]
Kafka svn repo removed from incubator
Hi,

It seems that the Kafka svn repository suddenly disappeared today. We officially graduated from the incubator last week, but haven't filed infra tickets to move our repository yet. Was our repository automatically moved somewhere?

    svn ls https://svn.apache.org/repos/asf/incubator/kafka
    svn: URL 'https://svn.apache.org/repos/asf/incubator/kafka' non-existent in that revision

Thanks,

Jun
new location of Kafka repo
It seems that Apache infra already moved the Kafka repo to the top level. The following is the new svn location for the 0.8 branch:

    https://svn.apache.org/repos/asf/kafka/branches/0.8

Thanks,

Jun
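For anyone with an existing checkout of the 0.8 branch, repointing it should just be a URL rewrite; something along these lines, with the old and new URLs taken from the two messages above (on svn 1.7+ clients the equivalent command is "svn relocate"):

    svn switch --relocate \
        https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8 \
        https://svn.apache.org/repos/asf/kafka/branches/0.8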
Re: Consumer questions: 0.8.0 vs. 0.7.2
The clientId is used to identify a particular client application. It is used by the server-side request logging to identify the client sending a particular request, and also to give meaningful names to the mbeans for producer/consumer clients.

Also, there are 2 ways to send the topic metadata request. One way is how SimpleConsumerShell.scala uses ClientUtils.fetchTopicMetadata(). Another way is by using the send() API on SyncProducer.

Thanks,

Neha

On Wed, Nov 28, 2012 at 10:12 AM, Jay Kreps wrote:
> The offset now begins at 0 and increases sequentially for each partition.
> The offset is identical across all replicas of that partition on different
> brokers, but amongst different partitions the offsets are independent (as
> before). The offset of a committed message is unique within that
> topic/partition.
> [...]
Unexpected end of ZLIB input stream
Hi.

In the kafka broker (version 0.7.0) log I occasionally see the following error message:

FATAL Halting due to unrecoverable I/O error while handling producer request: Unexpected end of ZLIB input stream (kafka.server.KafkaRequestHandlers)
java.io.EOFException: Unexpected end of ZLIB input stream
        at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:223)
        at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:141)
        at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:90)
        at java.io.FilterInputStream.read(FilterInputStream.java:90)
        at kafka.message.CompressionUtils$$anonfun$decompress$4.apply$mcI$sp(CompressionUtils.scala:123)
        at kafka.message.CompressionUtils$$anonfun$decompress$4.apply(CompressionUtils.scala:123)
        at kafka.message.CompressionUtils$$anonfun$decompress$4.apply(CompressionUtils.scala:123)
        at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
        at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
        at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
        at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
        at scala.collection.immutable.Stream.foreach(Stream.scala:255)
        at kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:123)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:124)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
        at kafka.message.MessageSet.foreach(MessageSet.scala:87)
        at kafka.log.Log.append(Log.scala:202)
        at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:75)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:68)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:68)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:68)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:46)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:46)
        at kafka.network.Processor.handle(SocketServer.scala:289)
        at kafka.network.Processor.read(SocketServer.scala:312)
        at kafka.network.Processor.run(SocketServer.scala:207)
        at java.lang.Thread.run(Thread.java:662)

At which point the broker actually dies. Shouldn't it keep working even if there is such an error? Also, has anyone else seen this error? And is it fixed in the newer versions?

--
Dmitri Priimak
Zookeeper Configuration Question
All,

We're having an issue with Zookeeper which has nothing to do with Kafka, but my consumers don't appear to be attempting to connect to the two nodes that are up. I specify my zk.connect as: host1:2181,host2:2181,host3:2181. Is this correct? Should this work? I didn't see anything in the Quick Start to contradict this, so I'm a bit confused.

Thanks.
Kafka partitions and physical LB
When using a physical LB on the producer side for Kafka, is it possible to have more than one partition per topic in each broker? Thanks, Riju
Re: Kafka partitions and physical LB
In 0.7, the number of partitions is a per-broker config, and in 0.8, the number of partitions is a per-topic config. In both cases, it does not change whether or not you use a physical load balancer.

Thanks,

Neha

On Wed, Nov 28, 2012 at 12:08 PM, Riju Kallivalappil wrote:
> When using a physical LB on the producer side for Kafka, is it possible to
> have more than one partition per topic in each broker?
>
> Thanks,
> Riju
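Concretely, the two settings would look roughly like this. The 0.7 property is num.partitions in server.properties; the 0.8 create-topic script and its flag names are an assumption against the 0.8 branch, so check the tool's help output in your build:

    # 0.7: per-broker setting in server.properties
    num.partitions=4

    # 0.8: per-topic setting, fixed when the topic is created (flag names assumed)
    bin/kafka-create-topic.sh --zookeeper host1:2181 --topic mytopic --partition 4 --replica 2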
Re: Would this work as a general solution for transactions in Kafka?
If your transaction messages are all in a single data topic, then you could perhaps use a compressed message set for each transaction; that way you don't need control messages, and the write would thus be atomic (message sets are stored at a single physical offset and delivered the same way to consumers). Not sure if this holds for 0.8 sequential offsets, but I saw in another thread that they are still delivered to consumers as a whole and then located, so I guess it would work, or not?

On Nov 16, 2012 5:45 PM, "Jay Kreps" wrote:
> I don't think all messages need to be sequential. You just need to omit
> messages from failed transactions in serving fetch requests, and this
> requires storage proportional to the number of failed transactions. The
> assumption is that failed transactions are very rare (i.e. due to machine
> failures) so this should be small.
>
> WRT client versus server, the assumption is that all control messages are
> useful to some consumer, so reading all of them on the server side should
> not be a limitation.
>
> There are a number of things not worked out here, so I wouldn't take it too
> seriously. I just wanted to throw out the thought experiment, because to
> really be useful I do think it is necessary to allow multiple producers and
> move any complex logic to the server side.
>
> -Jay
>
> On Fri, Nov 16, 2012 at 8:46 AM, Tom Brown wrote:
> > Jay,
> >
> > I'm not sure how you're going to get around the issue of a single
> > producer per partition. For efficient reads, all of the messages from
> > a single transaction have to be sequential, and that only happens if
> > either a) the messages are all written atomically (perhaps from
> > memory, or temporary storage, etc), or b) all messages come from a
> > single producer.
> >
> > If you use a single (internal) control partition for all topics, the
> > server would need to read and ignore irrelevant transaction records
> > from topics the consumer isn't interested in. Also, you would not be
> > able to effectively delete a single partition (though that may only be
> > valuable for developers). That said, the simplicity of a single
> > control partition may outweigh those problems.
> >
> > --Tom
> >
> > On Thu, Nov 15, 2012 at 6:24 PM, Jay Kreps wrote:
> > > Hey Tom,
> > >
> > > Yes, this is very similar to what I had in mind.
> > >
> > > The primary difference is that I want to implement the control on the
> > > server side. That is, rather than having the consumer be smart and use the
> > > control topic directly, it would be preferable to have the server handle
> > > this. This way it would be easy to carry this logic across consumers in a
> > > variety of languages. The implementation would be that we add a new
> > > parameter to the fetch request, read_committed={true, false}. If this
> > > parameter is set to true then we would not hand out messages until we had
> > > the commit message for the requested offset. The other advantage of doing
> > > this on the server side is that I think we could then have only a single
> > > control/commit topic rather than one per data topic.
> > >
> > > I think there might also be an alternative to requiring exclusivity on the
> > > producer side; indeed, requiring this makes the feature a lot less useful.
> > > This requires waiting until all offsets in a given range are committed
> > > before they can be handed out, though this is more complex. The details of my
> > > proposal involved a unique producer id per producer and a generation id
> > > that increased on every "rollback". A commit with a higher generation id
> > > for an existing producer id would implicitly roll back everything that
> > > producer sent since the last commit.
> > >
> > > -Jay
> > >
> > > On Wed, Nov 14, 2012 at 12:12 PM, Tom Brown wrote:
> > >> Just thought of a way to do transactions in Kafka. I think this
> > >> solution would cover the most common types of transactions. However,
> > >> it's often useful to run an idea by a second set of eyes. I am
> > >> interested in knowing where the holes are in this design that I
> > >> haven't been able to see. If you're interested in transactional kafka,
> > >> please review this and let me know any feedback you have.
> > >>
> > >> A transactional topic can be approximated by using a second topic as a
> > >> control stream. Each message in the control topic would contain the
> > >> offset and length (and an optional transaction ID). There is no change
> > >> to the messages written to the data topic. The performance impact
> > >> would generally be low: the larger the transaction size, the less the
> > >> performance impact would be.
> > >>
> > >> To write a transaction to the data partition, note the end offset of
> > >> the partition in memory. Write all your messages to the partition.
> > >> Note the new offset at the end of the partition (to calculate the
> > >> length). Write the transaction offset+length into the control
> > >> partition.
> > >>
> > >> To
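To make the write path in Tom's proposal concrete, here is a toy, in-memory sketch; the "topics" are plain lists, and nothing in it is a real Kafka API:

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the control-topic transaction scheme discussed above.
    public class TxSketch {
        static final List<String> dataTopic = new ArrayList<String>();
        static final List<long[]> controlTopic = new ArrayList<long[]>(); // {offset, length} commit records

        // Writer: append the transaction's messages first, then the commit record.
        static void writeTransaction(List<String> messages) {
            long txStart = dataTopic.size();      // note the end offset before writing
            dataTopic.addAll(messages);           // write all the transaction's messages
            long txLength = dataTopic.size() - txStart;
            controlTopic.add(new long[] { txStart, txLength }); // commit record goes in last
        }

        // Reader in "read committed" mode: only hand out ranges covered by a commit record.
        static List<String> readCommitted() {
            List<String> out = new ArrayList<String>();
            for (long[] commit : controlTopic) {
                for (long o = commit[0]; o < commit[0] + commit[1]; o++) {
                    out.add(dataTopic.get((int) o));
                }
            }
            return out;
        }
    }

A writer that dies before appending its commit record leaves its messages invisible to readCommitted(), which is the implicit-rollback behaviour the thread is debating; it also shows why multiple concurrent writers complicate the scheme, since their offset ranges would interleave.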
RE: producer queue size
Hi,

Thanks for all the replies. It helped me understand the system. I appreciate it.

I tried changing the producer properties to async and also changed queue.enqueueTimeout.ms=-1, but I still get the exception. I then changed the producer queue size to 20K and 30K. My hope was that by making the queue size bigger, it would buy the producer more time to digest the total amount of messages. But that didn't help in either the 20K or 30K case. I then increased the max queue time to 5000 ms in hopes that would hold some of the data. But in all these cases, I still get the exception.

I looked at the console producer code and it uses the synchronous producer class. In this case, does it make any difference if I change the producer's properties (e.g., to async)? I guess if the answer is no, it would explain what I saw above.

Also, is there any definition or explanation of the Java API? As I read the doc page on the site, there is none...

Thanks
Jamie

-----Original Message-----
From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
Sent: Monday, November 26, 2012 4:40 PM
To: kafka-us...@incubator.apache.org
Subject: Re: producer queue size

This will happen if you push data to the producer at a higher rate than it is able to send to the server. queue.size allows you to configure the size of the producer queue (defaults to 10K). Also, queue.enqueueTimeout.ms, if set to -1, will lead to blocking behaviour instead of the producer throwing QueueFullExceptions.

Thanks,
Neha

On Mon, Nov 26, 2012 at 4:06 PM, Joel Koshy wrote:
> To use async, set producer.type to async; The default queue size is 1;
> and the default batch size is 200.
>
> On Mon, Nov 26, 2012 at 2:28 PM, Jamie Wang wrote:
>> Hi, I am running the console demo that comes with 0.7.2 in the getting
>> started guide. All is working fine. Then I use stdio redirect of a file of
>> 30K lines of messages into the producer and I received an error "ERROR
>> Event queue is full of unsent messages, could not send event:..." and the
>> exception stack shows:
>> Exception in thread "main" kafka.producer.async.QueueFullException: Event
>> queue is full of unsent messages, could not send event:...
>>
>> Is there a way in producer.properties to configure the size of the
>> producer queue? I know the producer is running in synchronous mode. How do
>> I solve this problem, or is my usage totally wrong?
>>
>> Thanks
>> Jamie
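As a sketch of wiring up the settings discussed in this thread in your own 0.7 Java producer (rather than the console producer), something like the following; the zk.connect value, topic, and serializer are placeholders:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.javaapi.producer.ProducerData;
    import kafka.producer.ProducerConfig;

    public class AsyncProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");                     // placeholder
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "async");          // switch the client from sync to async
            props.put("queue.size", "30000");             // async queue capacity (thread says default 10K)
            props.put("queue.enqueueTimeout.ms", "-1");   // block when full instead of QueueFullException

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            producer.send(new ProducerData<String, String>("test", "hello"));
            producer.close();
        }
    }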
Re: Unexpected end of ZLIB input stream
Dmitri,

Could you reproduce this easily? Are you using a load balancer? Earlier, another user had the same issue and eventually figured out that the problem was in the network router.

Thanks,

Jun

On Wed, Nov 28, 2012 at 11:34 AM, Dmitri Priimak <prii...@highwire.stanford.edu> wrote:
> Hi.
>
> In the kafka broker (version 0.7.0) log I occasionally see the following
> error message:
>
> FATAL Halting due to unrecoverable I/O error while handling producer
> request: Unexpected end of ZLIB input stream (kafka.server.KafkaRequestHandlers)
> java.io.EOFException: Unexpected end of ZLIB input stream
> [...]
>
> At which point the broker actually dies. Shouldn't it keep working even if
> there is such an error? Also, has anyone else seen this error? And is it
> fixed in the newer versions?
>
> --
> Dmitri Priimak
Re: producer queue size
Is the server busy on I/O? What's log.flush.interval on the broker? For better performance, you need to set it to a few hundred or more.

Thanks,

Jun

On Wed, Nov 28, 2012 at 7:12 PM, Jamie Wang wrote:
> Hi,
>
> Thanks for all the replies. It helped me understand the system. I
> appreciate it.
>
> I tried changing the producer properties to async and also changed
> queue.enqueueTimeout.ms=-1, but I still get the exception.
> [...]
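For reference, the broker-side knob Jun mentions lives in server.properties; a value in line with his "few hundred or more" suggestion might look like this (0.7-era property name; the 500 is only an example):

    # flush a partition's log to disk only after this many messages accumulate
    log.flush.interval=500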