0.8 beta release status
We are investigating a metadata-related issue that shows up when there are a large number of clients (1000+). If not resolved, this issue may make the whole cluster unavailable. We are testing a fix. Once the issue is resolved, we can start the release process.

Thanks,

Jun
[jira] [Commented] (KAFKA-897) NullPointerException in ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652035#comment-13652035 ]

Jun Rao commented on KAFKA-897:
-------------------------------

Thanks for the patch. Actually, I think the wiki is inaccurate. The following is the comment on Message in the code. It seems that we only allow the key to be null, but not the value (since its length is always >= 0).

/**
 * A message. The format of an N byte message is the following:
 *
 * 1. 4 byte CRC32 of the message
 * 2. 1 byte "magic" identifier to allow format changes, value is 2 currently
 * 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
 *    (e.g. compression enabled, type of codec used)
 * 4. 4 byte key length, containing length K
 * 5. K byte key
 * 6. (N - K - 10) byte payload
 */

> NullPointerException in ConsoleConsumer
> ----------------------------------------
>
>                 Key: KAFKA-897
>                 URL: https://issues.apache.org/jira/browse/KAFKA-897
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8
>            Reporter: Colin B.
>            Assignee: Neha Narkhede
>            Priority: Minor
>         Attachments: Kafka897-v1.patch
>
> The protocol document [1] mentions that keys and values in message sets can
> be null. However, the ConsoleConsumer throws an NPE when a null is passed for
> the value.
>
> java.lang.NullPointerException
>         at kafka.utils.Utils$.readBytes(Utils.scala:141)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:106)
>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:61)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:53)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>         at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
>         at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:195)
>         at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
>
> [1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
[jira] [Commented] (KAFKA-897) NullPointerException in ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652041#comment-13652041 ]

Jay Kreps commented on KAFKA-897:
----------------------------------

Yup, I think we are confusing the protocol and the code. The protocol allows null values. The code didn't handle this until we added log compaction, which is on trunk. In other words, this was intentional sequencing so we wouldn't change the protocol again.
[jira] [Commented] (KAFKA-897) NullPointerException in ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652087#comment-13652087 ]

Jun Rao commented on KAFKA-897:
-------------------------------

Ok. So the confusing part is that the comment in the code is inaccurate. We actually explicitly store the size of the value in the binary representation. So we should change 6. in the above comment to the following:

 * 6. 4 byte payload length, containing length V
 * 7. V byte payload

Colin, it seems that our code only allows null key/value in trunk. So, we can fix this in trunk. It seems that ConsumerIterator is already fixed in trunk. For the changes in DefaultMessageFormatter, could you verify whether FilterOutputStream.write() supports null input? If so, we don't need to patch it either.
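[Editor's note] Putting the two comments together, the corrected wire layout is length-prefixed for both key and value. A minimal Scala sketch of a reader for that layout follows; the "-1 means null" encoding is the protocol guide's convention rather than something stated in the quoted code comment, and readMessage is a hypothetical helper, not Kafka's actual parser:

    import java.nio.ByteBuffer

    object MessageLayout {
      def readMessage(buf: ByteBuffer): (Option[Array[Byte]], Option[Array[Byte]]) = {
        val crc        = buf.getInt  // 1. 4 byte CRC32 of the message
        val magic      = buf.get     // 2. 1 byte "magic" identifier
        val attributes = buf.get     // 3. 1 byte "attributes"

        // 4./6. 4 byte length prefix; a negative length signals null
        def lengthPrefixed(): Option[Array[Byte]] = {
          val len = buf.getInt
          if (len < 0) None
          else {
            val bytes = new Array[Byte](len)
            buf.get(bytes)
            Some(bytes)
          }
        }

        (lengthPrefixed(), lengthPrefixed())  // 5. key, then 7. value
      }
    }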
[jira] [Commented] (KAFKA-897) NullPointerException in ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652152#comment-13652152 ]

Colin B. commented on KAFKA-897:
---------------------------------

I had been testing on 0.8 and did not notice that KAFKA-739 (Handle null values in Message payload) had been committed. I will test the DefaultMessageFormatter with nulls on trunk. I think, however, that the write calls do not support null.
[jira] [Commented] (KAFKA-897) NullPointerException in ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652229#comment-13652229 ]

Colin B. commented on KAFKA-897:
---------------------------------

Looks like FilterOutputStream does not handle nulls.

java.lang.NullPointerException
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at kafka.consumer.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:288)
        ...
        at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:195)

Attached is patch v2, which only touches ConsoleConsumer and allows the output of null values as "null".
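[Editor's note] The attached patch itself is not inlined in this thread. As a rough illustration of the approach it describes, a null-safe formatter might look like the following sketch; the writeTo(key, value, output) shape mirrors 0.8's DefaultMessageFormatter, and NullSafeFormatter is a hypothetical stand-in, not the actual patch:

    import java.io.PrintStream

    class NullSafeFormatter {
      // Substitute the literal "null" rather than handing a null
      // array to the underlying stream (which NPEs, as shown above).
      private def orNullLiteral(bytes: Array[Byte]): Array[Byte] =
        if (bytes == null) "null".getBytes else bytes

      def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
        output.write(orNullLiteral(key))
        output.write('\t')
        output.write(orNullLiteral(value))
        output.write('\n')
      }
    }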
[jira] [Updated] (KAFKA-897) NullPointerException in ConsoleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin B. updated KAFKA-897:
----------------------------

    Attachment: KAFKA-897-v2.patch
[jira] [Updated] (KAFKA-898) Add a KafkaMetricsReporter that wraps Librato's reporter
[ https://issues.apache.org/jira/browse/KAFKA-898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Scott Clasen updated KAFKA-898:
--------------------------------

    Status: Patch Available  (was: Open)

Subject: [PATCH] add optional librato metrics reporter
---
 config/server.properties                       |  9 +++++++++
 core/build.sbt                                 |  1 +
 .../metrics/KafkaLibratoMetricsReporter.scala  | 26 ++++++++++++++++++++++++++
 3 files changed, 36 insertions(+)
 create mode 100644 core/src/main/scala/kafka/metrics/KafkaLibratoMetricsReporter.scala

diff --git a/config/server.properties b/config/server.properties
index bc6a521..a48951c 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -104,6 +104,7 @@ log.cleanup.interval.mins=1

 # root directory for all kafka znodes.
 zookeeper.connect=localhost:2181
+
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=100

@@ -114,3 +115,11 @@ kafka.csv.metrics.dir=/tmp/kafka_metrics

 # Disable csv reporting by default.
 kafka.csv.metrics.reporter.enabled=false
+#kafka.metrics.reporters=kafka.metrics.KafkaLibratoMetricsReporter
+#kafka.librato.metrics.reporter.enabled=true
+kafka.librato.metrics.reporter.account=y...@librato.account.com
+kafka.librato.metrics.reporter.token=somelibratoapikey
+kafka.librato.metrics.reporter.prefix=testing123
+kafka.librato.metrics.reporter.source=testingsource
+kafka.librato.metrics.reporter.interval.secs=5
+
diff --git a/core/build.sbt b/core/build.sbt
index 405ea55..e12fa4b 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -16,6 +16,7 @@ libraryDependencies ++= Seq(
   "org.xerial.snappy"   % "snappy-java"        % "1.0.4.1",
   "com.yammer.metrics"  % "metrics-core"       % "2.2.0",
   "com.yammer.metrics"  % "metrics-annotation" % "2.2.0",
+  "com.librato.metrics" % "metrics-librato"    % "2.2.0.0",
   "org.easymock"        % "easymock"           % "3.0" % "test",
   "junit"               % "junit"              % "4.1" % "test"
 )
diff --git a/core/src/main/scala/kafka/metrics/KafkaLibratoMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaLibratoMetricsReporter.scala
new file mode 100644
index 0000000..5070b98
--- /dev/null
+++ b/core/src/main/scala/kafka/metrics/KafkaLibratoMetricsReporter.scala
@@ -0,0 +1,26 @@
+package kafka.metrics
+
+import kafka.utils.VerifiableProperties
+import com.librato.metrics.{APIUtil, LibratoReporter}
+import java.util.concurrent.TimeUnit
+
+
+class KafkaLibratoMetricsReporter extends KafkaMetricsReporter {
+  def init(props: VerifiableProperties) {
+    val enabled = props.getBoolean("kafka.librato.metrics.reporter.enabled", false)
+    if (enabled) {
+      val account  = props.getString("kafka.librato.metrics.reporter.account")
+      val token    = props.getString("kafka.librato.metrics.reporter.token")
+      val source   = props.getString("kafka.librato.metrics.reporter.source")
+      val prefix   = Option(props.getString("kafka.librato.metrics.reporter.prefix", null))
+      val interval = props.getInt("kafka.librato.metrics.reporter.interval.secs", 30)
+      var builder  = LibratoReporter.builder(account, token, source)
+      prefix.foreach(p => builder = builder.setSanitizer(new PrefixingSanitizer(p)))
+      LibratoReporter.enable(builder, interval, TimeUnit.SECONDS)
+    }
+  }
+}
+
+class PrefixingSanitizer(prefix: String) extends APIUtil.Sanitizer {
+  def apply(name: String): String = prefix + "." + name
+}
--
1.8.0.1

> Add a KafkaMetricsReporter that wraps Librato's reporter
> ---------------------------------------------------------
>
>                 Key: KAFKA-898
>                 URL: https://issues.apache.org/jira/browse/KAFKA-898
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Scott Clasen
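[Editor's note] For anyone trying the patch: the reporter stays disabled unless it is switched on explicitly. Going by the diff above, the two commented-out properties would need to be uncommented, so a minimal server.properties fragment (values are the placeholders from the patch itself) would look like:

    kafka.metrics.reporters=kafka.metrics.KafkaLibratoMetricsReporter
    kafka.librato.metrics.reporter.enabled=true
    kafka.librato.metrics.reporter.account=y...@librato.account.com
    kafka.librato.metrics.reporter.token=somelibratoapikey
    kafka.librato.metrics.reporter.source=testingsource
    kafka.librato.metrics.reporter.interval.secs=5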
[jira] [Created] (KAFKA-898) Add a KafkaMetricsReporter that wraps Librato's reporter
Scott Clasen created KAFKA-898:
-----------------------------------

             Summary: Add a KafkaMetricsReporter that wraps Librato's reporter
                 Key: KAFKA-898
                 URL: https://issues.apache.org/jira/browse/KAFKA-898
             Project: Kafka
          Issue Type: Improvement
            Reporter: Scott Clasen
[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours
[ https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652447#comment-13652447 ]

Dan F commented on KAFKA-881:
------------------------------

We are going to roll this out. Is anyone willing to look at the patch any time soon? It would be nice to increase our confidence that it is the right thing, and that it will show up in future versions.

> Kafka broker not respecting log.roll.hours
> -------------------------------------------
>
>                 Key: KAFKA-881
>                 URL: https://issues.apache.org/jira/browse/KAFKA-881
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.7.2
>            Reporter: Dan F
>            Assignee: Jay Kreps
>         Attachments: kafka_roll.patch
>
> We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs
> would be rolled every hour, or more often. Only, sometimes logs that are many
> hours (sometimes days) old have more data added to them. This perturbs our
> systems for reasons I won't get into.
> I don't know Scala or Kafka well, but I have a proposal for why this might
> happen: upon restart, a broker forgets when its log files were last appended
> to ("firstAppendTime"). Then, a potentially infinite amount of time later, the
> restarted broker receives another message for the particular (topic,
> partition) and starts the clock again. It will then roll over that log after
> an hour.
> https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala
> says:
>   /* the maximum time before a new log segment is rolled out */
>   val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))
> https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala
> has maybeRoll, which needs segment.firstAppendTime defined. It also has
> updateFirstAppendTime(), which sets it if it is empty.
> If my hypothesis about why this happens is correct, here is a case where
> rolling takes longer than an hour, even on a high-volume topic:
> - write to a topic for 20 minutes
> - restart the broker
> - wait for 5 days
> - write to a topic for 20 minutes
> - restart the broker
> - write to a topic for an hour
> The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long
> as you want.
> Proposed solution:
> The very easiest thing to do would be to have Kafka re-initialize
> firstAppendTime with the file creation time. Unfortunately, there is no file
> creation time in UNIX. There is ctime, change time, which is updated when a
> file's inode information is changed.
> One solution is to embed the firstAppendTime in the filename (say, seconds
> since epoch). Then when you open it you could reset firstAppendTime to
> exactly what it really was. This ignores clock drift or resetting. One could
> set firstAppendTime to min(filename-based time, current time).
> A second solution is to make the Kafka log roll over at specific times,
> regardless of when the file was created. Conceptually, time can be divided
> into windows of size log.roll.hours since the epoch (UNIX time 0, 1970). So,
> when firstAppendTime is empty, compute the next rollover time (say, next =
> (hours since epoch) - ((hours since epoch) % log.roll.hours) +
> log.roll.hours). If the file mtime (last modified) is before the current
> rollover window ( (next - log.roll.hours) .. next ), roll it over right
> away. Otherwise, roll over when you cross next, and reset next.
> A third solution (not perfect, but an approximation at least) would be not
> to write to a segment if firstAppendTime is not defined and the timestamp on
> the file is more than log.roll.hours old.
> There are probably other solutions.
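[Editor's note] To make the arithmetic in the second proposal concrete, here is a small Scala sketch of fixed-window rolling. RollWindow and its method names are hypothetical, and this is not the attached kafka_roll.patch:

    import java.util.concurrent.TimeUnit

    // Time is divided into windows of log.roll.hours since the epoch; a
    // segment rolls as soon as "now" crosses the next window boundary,
    // regardless of when the file was created or the broker restarted.
    object RollWindow {
      // Start of the window that "now" falls in.
      def windowStartMs(nowMs: Long, rollHours: Int): Long = {
        val rollMs = TimeUnit.HOURS.toMillis(rollHours)
        nowMs - (nowMs % rollMs)
      }

      // The "next" rollover time from the proposal above.
      def nextRolloverMs(nowMs: Long, rollHours: Int): Long =
        windowStartMs(nowMs, rollHours) + TimeUnit.HOURS.toMillis(rollHours)

      // A segment whose mtime predates the current window is overdue
      // and should be rolled right away.
      def shouldRollNow(mtimeMs: Long, nowMs: Long, rollHours: Int): Boolean =
        mtimeMs < windowStartMs(nowMs, rollHours)
    }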
[jira] [Commented] (KAFKA-294) "Path length must be > 0" error during startup
[ https://issues.apache.org/jira/browse/KAFKA-294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13652744#comment-13652744 ]

Jason Rosenberg commented on KAFKA-294:
----------------------------------------

This issue also happens in 0.8.0. It would be even better if, when the chroot is not present in zk, it were automatically created, thus avoiding this issue altogether.

> "Path length must be > 0" error during startup
> ------------------------------------------------
>
>                 Key: KAFKA-294
>                 URL: https://issues.apache.org/jira/browse/KAFKA-294
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Thomas Dudziak
>
> When starting Kafka 0.7.0 using zkclient-0.1.jar, I get this error:
> INFO 2012-03-06 02:39:04,072 main kafka.server.KafkaZooKeeper Registering broker /brokers/ids/1
> FATAL 2012-03-06 02:39:04,111 main kafka.server.KafkaServer Fatal error during startup.
> java.lang.IllegalArgumentException: Path length must be > 0
>         at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
>         at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
>         at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:620)
>         at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
>         at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
>         at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>         at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
>         at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
>         at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>         at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
>         at kafka.utils.ZkUtils$.createParentPath(ZkUtils.scala:48)
>         at kafka.utils.ZkUtils$.createEphemeralPath(ZkUtils.scala:60)
>         at kafka.utils.ZkUtils$.createEphemeralPathExpectConflict(ZkUtils.scala:72)
>         at kafka.server.KafkaZooKeeper.registerBrokerInZk(KafkaZooKeeper.scala:57)
>         at kafka.log.LogManager.startup(LogManager.scala:124)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:80)
>         at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:47)
>         at kafka.Kafka$.main(Kafka.scala:60)
>         at kafka.Kafka.main(Kafka.scala)
> The problem seems to be this code in ZkClient's createPersistent method:
>   String parentDir = path.substring(0, path.lastIndexOf('/'));
>   createPersistent(parentDir, createParents);
>   createPersistent(path, createParents);
> which doesn't check whether parentDir is an empty string, which it will
> become for /brokers/ids/1 after two recursions.
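[Editor's note] Along the lines of the auto-create suggestion, a hypothetical guard (an illustration only, not ZkClient's or Kafka's actual code) would stop the parent recursion before the path becomes empty and create any missing chroot levels on the way down:

    import org.I0Itec.zkclient.ZkClient

    object ZkPaths {
      // Recursively ensure every ancestor of `path` exists, stopping at
      // the root so ZooKeeper is never handed a zero-length path.
      def createPersistentSafe(zk: ZkClient, path: String) {
        val parent = path.substring(0, path.lastIndexOf('/'))
        if (parent.nonEmpty && !zk.exists(parent))
          createPersistentSafe(zk, parent)
        if (!zk.exists(path))
          zk.createPersistent(path)
      }
    }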
[jira] [Created] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
Jason Rosenberg created KAFKA-899:
--------------------------------------

             Summary: LeaderNotAvailableException the first time a new message for a partition is processed.
                 Key: KAFKA-899
                 URL: https://issues.apache.org/jira/browse/KAFKA-899
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.8
            Reporter: Jason Rosenberg

I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, all embedded in the same java process:

-- spins up a zk instance
-- spins up a kafka server using a fresh log directory
-- creates a producer and sends a message
-- creates a high-level consumer and verifies that it can consume the message
-- shuts down the consumer
-- stops the kafka server
-- stops zk

The test seems to be working fine now; however, I consistently see the following exceptions (which, from poking around the mailing list, seem to be expected?). If these are expected, can we suppress the logging of these exceptions, since it clutters the output of tests, and presumably clutters the logs of the running server/consumers, during clean startup and shutdown?

When I call producer.send(), I get:

1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching metadata partition 0 leader: none replicas: isr: isUnderReplicated: false for topic partition [test-topic,0]: [class kafka.common.LeaderNotAvailableException]
1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to collate messages by topic,partition due to
kafka.common.LeaderNotAvailableException: No leader for any partition
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:74)
        at kafka.javaapi.producer.Producer.send(Producer.scala:32)
        at com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
1133 [kafka-request-handler-1] WARN kafka.server.HighwaterMarkCheckpoint - No highwatermark file is found. Returning 0 as the highwatermark for partition [test-topic,0]
...
...

It would be great if instead of this exception, it would just log a meaningful message, like: "No leader was available for partition X, one will now be created"

Jason
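[Editor's note] As context for why the test still passes despite the warnings: the 0.8 producer refreshes metadata and retries the send internally, and the first send triggers leader election for the auto-created partition. A sketch of the producer-side knobs involved follows; the property names are believed to match the 0.8 producer config but should be verified against ProducerConfig, and the values are illustrative:

    import java.util.Properties
    import kafka.producer.ProducerConfig

    object FirstSendRetryConfig {
      def config(): ProducerConfig = {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092")
        props.put("message.send.max.retries", "5") // attempts before giving up
        props.put("retry.backoff.ms", "200")       // pause before each retry
        new ProducerConfig(props)
      }
    }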
[jira] [Created] (KAFKA-900) ClosedByInterruptException when high-level consumer shutdown normally
Jason Rosenberg created KAFKA-900:
--------------------------------------

             Summary: ClosedByInterruptException when high-level consumer shutdown normally
                 Key: KAFKA-900
                 URL: https://issues.apache.org/jira/browse/KAFKA-900
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8
            Reporter: Jason Rosenberg
            Assignee: Neha Narkhede

I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the following, all embedded in the same java process:

-- spins up a zk instance
-- spins up a kafka server using a fresh log directory
-- creates a producer and sends a message
-- creates a high-level consumer and verifies that it can consume the message
-- shuts down the consumer
-- stops the kafka server
-- stops zk

The test seems to be working fine now; however, I consistently see the following exception when the consumer connector is shut down:

1699 [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683] WARN kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: group1-ConsumerFetcherThread-group1_square-1a7ac0.local-1368076598439-d66bb2eb-0-1946108683; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test-topic,0] -> PartitionFetchInfo(1,1048576)
java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:543)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:47)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:60)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
1721 [Thread-12] INFO com.squareup.kafka.server.KafkaServer - Shutting down KafkaServer
2030 [main] INFO com.squareup.kafka.server.KafkaServer - Shut down complete for KafkaServer
Disconnected from the target VM, address: '127.0.0.1:49243', transport: 'socket'

It would be great if, instead, something meaningful was logged, like: "Consumer connector has been shutdown"
[jira] [Updated] (KAFKA-865) Mavenize and separate the client.
[ https://issues.apache.org/jira/browse/KAFKA-865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashwanth Fernando updated KAFKA-865:
-------------------------------------

    Description:

It seems that the java client for Kafka is also bundled with the server JAR file, and this is generated using sbt package. This is difficult for java folks to work with because:

1) Many java shops use maven, and they want to specify the GAV of kafka in their pom and, bang, the client jar and all its dependencies should be added to the application's classpath. I can't do that right now, because I need to run ./sbt eclipse, get the .JAR, add that to my classpath, and add a whole lot of dependencies (log4j, slf4j, zkClient and so on) manually, which is a pain. There were 90 million Maven Central uploads/downloads in 2012 alone. Almost all the java shops out there have maven (either central or in-house sonatype).

2) Separation of concerns - keeping the server (core) and the client's classes together in the same jar file increases the size of the bundle for a client, and every time the server's code changes and a release is performed, the client also needs to update their .JAR file, which is not great. We don't want a ton of clients to update their .JAR file just because a faster replication strategy for the kafka server cluster changed in a new release.

Action items are to separate the client and server portions of Kafka, add the client in a pom along with its compile-time dependencies, and upload it to Maven Central (or, if you have a LinkedIn externally exposed Nexus, over there). This will increase adoption of the Kafka framework.

    was:

It seems that the java client for Kafka is also bundled with the server JAR file, and this is generated using sbt package. This is difficult for java folks to work with because:

1) Many java shops use maven (and a lot of them have a Sonatype Nexus repository in house) for dependency management. They want to specify the GAV and, bang, the client jar and all its dependencies should be added to the application's classpath. I can't do that right now, because I need to run ./sbt eclipse, get the .JAR, add that to my classpath, and add a whole lot of dependencies (log4j, slf4j, zkClient and so on) manually, which is a pain. There were 90 million Maven Central uploads/downloads in 2012 alone. Almost all the java shops out there have maven (either central or in-house sonatype).

2) Separation of concerns - keeping the server (core) and the client's classes together increases the size of the bundle for the client, and every time the server's code changes and a release is performed, the client also needs to update their .JAR file, which is not great. We don't want a ton of clients to update their .JAR file just because a faster replication strategy for my kafka cluster changed in a new release.

Action items are to separate the client portion of Kafka, add it in a pom along with its compile-time dependencies, and upload it to Maven Central (or, if you have a LinkedIn externally exposed Nexus, over there). This will increase adoption of the Kafka framework.

> Mavenize and separate the client.
> ----------------------------------
>
>                 Key: KAFKA-865
>                 URL: https://issues.apache.org/jira/browse/KAFKA-865
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8
>            Reporter: Ashwanth Fernando
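[Editor's note] Concretely, the ask boils down to a single consumer-side declaration like this pom fragment; the coordinates here are purely hypothetical, since no official Kafka artifact was on Maven Central at the time:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-client</artifactId>
      <version>0.8.0</version>
    </dependency>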