Hi Jerry, the issue occurs because Flink's storm compatibility layer does not support custom configuration parameters currently. There is this JIRA which aims to add the missing feature to Flink: https://issues.apache.org/jira/browse/FLINK-2525 Maybe (but its unlikely) passing an empty Map in the AbstractStormSpoutWrapper:
this.spout.open(null, StormWrapperSetupHelper .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true), new SpoutOutputCollector(this.collector)); would fix the issue. But I suspect that the KafkaSpout needs some configuration parameters, so we have to wait for FLINK-2525. Best, Robert On Wed, Sep 2, 2015 at 7:58 PM, Jerry Peng <jerry.boyang.p...@gmail.com> wrote: > Hello, > > When I try to run a storm topology with a Kafka Spout on top of Flink, I > get an NPE at: > > 15:00:32,853 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask > - Error closing stream operators after an exception. > > java.lang.NullPointerException > > at storm.kafka.KafkaSpout.close(KafkaSpout.java:130) > > at > org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.close(AbstractStormSpoutWrapper.java:128) > > at > org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:40) > > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:75) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:243) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:197) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) > > at java.lang.Thread.run(Thread.java:745) > > 15:00:32,855 INFO org.apache.flink.streaming.runtime.tasks.StreamTask > - State backend for state checkpoints is set to jobmanager. > > 15:00:32,855 INFO org.apache.flink.runtime.taskmanager.Task > - event_deserializer (5/5) switched to RUNNING > > 15:00:32,859 INFO org.apache.flink.runtime.taskmanager.Task > - Source: ads (1/1) switched to FAILED with exception. > > java.lang.NullPointerException > > at java.util.HashMap.putMapEntries(HashMap.java:500) > > at java.util.HashMap.<init>(HashMap.java:489) > > at storm.kafka.KafkaSpout.open(KafkaSpout.java:73) > > at > org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper.run(AbstractStormSpoutWrapper.java:102) > > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57) > > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:58) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) > > at java.lang.Thread.run(Thread.java:745) > > > Has someone seen this before? or Have a fix? I am using 0.10beta1 for all > storm packages and a 0.10-snapshot (latest compiled) for all flink > packages. Sample of the kafka code I am using: > > Broker broker = new Broker("localhost", 9092); > GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); > partitionInfo.addPartition(0, broker); > StaticHosts hosts = new StaticHosts(partitionInfo); > > SpoutConfig spoutConfig = new SpoutConfig(hosts, "stuff", > UUID.randomUUID().toString()); > spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); > KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); > > builder.setSpout("kafkaSpout", kafkaSpout, 1); > >