Hello,
When I try to run a storm topology with a Kafka Spout on top of Flink, I
get an NPE at:
15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask
- Error closing stream operators after an exception.
java.lang.NullPointerException
at storm.kafka.KafkaSpout.close(KafkaSpout.java:130)
at
org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128)
at
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
15:00:32,855 INFO org.apache.flink.streaming.runtime.tasks.StreamTask
- State backend for state checkpoints is set to jobmanager.
15:00:32,855 INFO org.apache.flink.runtime.taskmanager.Task
- event_deserializer (5/5) switched to RUNNING
15:00:32,859 INFO org.apache.flink.runtime.taskmanager.Task
- Source: ads (1/1) switched to FAILED with exception.
java.lang.NullPointerException
at java.util.HashMap.putMapEntries(HashMap.java:500)
at java.util.HashMap.<init>(HashMap.java:489)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:73)
at
org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
at java.lang.Thread.run(Thread.java:745)
Has someone seen this before? or Have a fix? I am using 0.10beta1 for all
storm packages and a 0.10-snapshot (latest compiled) for all flink
packages. Sample of the kafka code I am using:
Broker broker = new Broker("localhost", 9092);
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, broker);
StaticHosts hosts = new StaticHosts(partitionInfo);
SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff",
UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, 1);