I have an interesting scenario. I am working on pattern matching in Flink,
evaluating the incoming data against a set of patterns using a
KeyedBroadcastProcessFunction. When I run the program in the IDE, I get a
NullPointerException in the processElement method when trying to access the
ReadOnlyContext, but the same program runs fine from the Flink terminal.
Below is my KeyedBroadcastProcessFunction:
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class TestProcess extends KeyedBroadcastProcessFunction<String,
        Tuple2<String, sampleSignal>,
        Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {

    // Descriptor for the broadcast state: rule key -> (rule name -> rule condition).
    public static final MapStateDescriptor<String, Map<String, String>> ruleDescriptor =
            new MapStateDescriptor<>("RuleDiscriptor",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    new MapTypeInfo<>(String.class, String.class));

    @Override
    public void processElement(Tuple2<String, sampleSignal> value,
                               ReadOnlyContext ctx,
                               Collector<Tuple2<String, sampleSignal>> out) throws Exception {
        System.out.println("sampleSignal: " + value.f1.toString());
        String context = ctx.getBroadcastState(ruleDescriptor).toString();
        Map<String, String> incomingRule = new HashMap<>();
        // Key is the rule key being looked up (its declaration is not shown here).
        incomingRule = ctx.getBroadcastState(ruleDescriptor).get(Key);
        /* A NullPointerException is thrown here when printing the size of the
           HashMap. */
        System.out.println("Map Size: " + incomingRule.size());
        System.out.println("Context: " + context);
        System.out.println("Before Rule Iterator");
        /* I also tried iterating over the broadcast state as below, just to
           print its entries, but it does not print anything. */
        for (Map.Entry<String, Map<String, String>> rules :
                ctx.getBroadcastState(ruleDescriptor).immutableEntries()) {
            System.out.println("Key: " + rules.getKey());
            System.out.println("Value: " + rules.getValue());
        }
        for (Map.Entry<String, String> rules : incomingRule.entrySet()) {
            System.out.println("Key: " + rules.getKey());
            System.out.println("Value: " + rules.getValue());
        }
        out.collect(new Tuple2<>(value.f0, value.f1));
    }

    @Override
    public void processBroadcastElement(Tuple2<String, Map<String, String>> value,
                                        Context ctx,
                                        Collector<Tuple2<String, sampleSignal>> out) throws Exception {
        System.out.println("BroadCastState Key: " + value.f0);
        System.out.println("BroadCastState Value: " + value.f1);
        // Store the incoming rule map in the broadcast state under its key.
        ctx.getBroadcastState(ruleDescriptor).put(value.f0, value.f1);
    }
}
Below is the IDE terminal output, including the exception:
/* It prints the following data for the broadcast state in processBroadcastElement */
BroadCastState Key: Key
BroadCastState Value: {"RuleKey":"RuleValue"}
/* It prints the following data in processElement */
sampleSignal: {SignalData}
When it reaches the Map in which I store the rule name and rule condition,
it throws a NullPointerException. Below is the stack trace of the error:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at com.westpac.itm.eq.pattern.App.main(App.java:34)
Caused by: java.lang.NullPointerException
    at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35)
    at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
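
For reference, here is a minimal sketch of one possible explanation. It rests on an assumption the trace does not confirm: that the lookup get(Key) on the broadcast state returns null because no rule is stored under that key on the subtask at the moment the element is processed, so the later call to size() fails. A plain java.util.Map stands in for the broadcast state so the snippet runs without Flink. The names RuleLookupSketch and rulesFor are hypothetical and used only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class RuleLookupSketch {

    // Returns the rule map stored under ruleKey, or an empty map when nothing
    // is stored under that key, so callers can call size() or iterate safely.
    static Map<String, String> rulesFor(Map<String, Map<String, String>> state,
                                        String ruleKey) {
        Map<String, String> rule = state.get(ruleKey); // null when the key is absent
        return rule != null ? rule : Collections.<String, String>emptyMap();
    }

    public static void main(String[] args) {
        // Stand-in for the broadcast state (assumption: a lookup for a
        // missing key yields null, as it does for a java.util.HashMap).
        Map<String, Map<String, String>> state = new HashMap<>();

        // An element arrives before any rule was stored: no exception is thrown.
        System.out.println("Map Size: " + rulesFor(state, "Key").size());

        // A rule is stored, as processBroadcastElement does with put(...).
        Map<String, String> rule = new HashMap<>();
        rule.put("RuleKey", "RuleValue");
        state.put("Key", rule);
        System.out.println("Map Size: " + rulesFor(state, "Key").size());
    }
}
```

In processElement, the same guard would wrap ctx.getBroadcastState(ruleDescriptor).get(Key). Whether elements that arrive before their rule should be skipped, buffered, or passed through unchanged depends on the job.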
Please help me solve this issue.
Thanks,
Rahul.