Ping... Any thoughts on this?
Seems like a bug, but then again, we're not sure what the expected behavior for regexes should be here. For example, is there a way to whitelist topics with a filter that looks for a leading substring but then blocks certain subsequent substrings, i.e. apply a blacklist to a whitelist? :)

Jason

On Thu, Dec 12, 2013 at 1:01 PM, Jason Rosenberg <j...@squareup.com> wrote:

> All, I've filed: https://issues.apache.org/jira/browse/KAFKA-1180
>
> We need to create a stream selector that essentially combines the
> logic of the BlackList and WhiteList classes. That is, we want to select a
> topic that contains a certain prefix, as long as it doesn't also contain a
> secondary string.
>
> This should be easy to do with ordinary Java regexes, but we're running
> into some issues trying to do this with the WhiteList class only.
>
> We have a pattern that uses negative lookahead, like this:
>
> "test-(?!bad\\b)[\\w]+"
>
> So this should select a topic like "test-good", but exclude a topic like
> "test-bad", and also exclude a topic without the "test" prefix, like
> "foo-bar".
>
> Instead, what we see is a NullPointerException in the ConsumerIterator,
> and the consumer just hangs, after sending a topic of 'test-topic' followed
> by 'test-bad':
>
> 21700 [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683] ERROR kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-group1_square-1a7ac0.local-1386869343370-dc19c7dc-0-1946108683], Error due to
> kafka.common.KafkaException: error processing data for partition [test-bad,0] offset 0
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:137)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
>     at kafka.utils.Utils$.inLock(Utils.scala:565)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Caused by: java.lang.NullPointerException
>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
>     ... 9 more
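
To help separate the regex question from the NullPointerException, here is a minimal sketch that checks the quoted pattern against the topic names mentioned in the thread, using only java.util.regex. It assumes the topic filter applies a full-string match (the semantics of String.matches); that is an assumption about the filter, not something confirmed in this thread. The class name TopicPatternCheck and the method isSelected are hypothetical names made up for this illustration and are not part of Kafka.

```java
import java.util.regex.Pattern;

public class TopicPatternCheck {
    // Pattern quoted in the thread: "test-" prefix, then a word that is
    // not exactly "bad" (negative lookahead with a trailing word boundary).
    static final Pattern TOPIC_PATTERN = Pattern.compile("test-(?!bad\\b)[\\w]+");

    // Hypothetical helper: full-string match, the same semantics as
    // String.matches(regex). Assumed, not taken from Kafka's source.
    public static boolean isSelected(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] topics = {"test-good", "test-topic", "test-bad", "foo-bar"};
        for (String topic : topics) {
            System.out.println(topic + " -> " + isSelected(topic));
        }
        // Prints:
        // test-good -> true
        // test-topic -> true
        // test-bad -> false
        // foo-bar -> false
    }
}
```

If the pattern behaves this way in isolation, the regex itself rejects "test-bad" as intended, which would suggest the exception comes from how the consumer handles a topic the filter did not select rather than from the lookahead syntax.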