Hello,

When I try to run a Storm topology with a Kafka Spout on top of Flink, I get the following NPE:

15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error closing stream operators after an exception.
java.lang.NullPointerException
    at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)
    at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)
15:00:32,855 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - State backend for state checkpoints is set to jobmanager.
15:00:32,855 INFO org.apache.flink.runtime.taskmanager.Task - event_deserializer (5/5) switched to RUNNING
15:00:32,859 INFO org.apache.flink.runtime.taskmanager.Task - Source: ads (1/1) switched to FAILED with exception.
java.lang.NullPointerException
    at java.util.HashMap.putMapEntries(HashMap.java:500)
    at java.util.HashMap.<init>(HashMap.java:489)
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)
    at org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
    at java.lang.Thread.run(Thread.java:745)

Has anyone seen this before, or does anyone have a fix? I am using 0.10beta1 for all Storm packages and a 0.10-snapshot (latest compiled) for all Flink packages.

Sample of the Kafka code I am using:

    Broker broker = new Broker("localhost", 9092);
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, broker);
    StaticHosts hosts = new StaticHosts(partitionInfo);
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff", UUID.randomUUID().toString());
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("kafkaSpout", kafkaSpout, 1);
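
For what it's worth, the second trace fails inside the HashMap copy constructor (HashMap.<init> calling putMapEntries), which throws a NullPointerException when the map it is asked to copy is null. My guess, which I have not confirmed, is that the configuration map handed to KafkaSpout.open is null here, and that the NPE in close() is a follow-on because open() never finished. The snippet below is only a minimal sketch of that HashMap behaviour; the class and method names (NullConfDemo, copyThrowsNpe) are made up for illustration and are not part of Storm or Flink.

```java
import java.util.HashMap;
import java.util.Map;

public class NullConfDemo {

    // Hypothetical helper: returns true if copying the given map with the
    // HashMap copy constructor throws a NullPointerException.
    static boolean copyThrowsNpe(Map<String, Object> conf) {
        try {
            // Same kind of call the trace shows failing: new HashMap(someMap).
            new HashMap<>(conf);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A null map (assumed to be what the spout receives) triggers the NPE.
        System.out.println("null conf throws NPE:  " + copyThrowsNpe(null));
        // An empty, non-null map copies without error.
        System.out.println("empty conf throws NPE: " + copyThrowsNpe(new HashMap<>()));
    }
}
```

If that guess is right, the topology code above is probably not at fault, and the question becomes which configuration map the wrapper passes to the spout.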