Hi Rahul. Could you verify that the provided code is the one that fails? Something does not seem right for me in the stacktrace. The stacktrace shows that you call processElement recursively, but I can not see that in the code:
com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35) at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15) at Where do you exactly get the exception? When accessing the local variable Map<String,String> incomingRule? Is it only a typo or are you really using Hashmap instead of java.util.HashMap? Not sure what is the Hashmap class... If you still find some problem there, could you provide is with an example with which we could reproduce the problem? Best, Dawid On 15/06/2020 16:21, bujjirahul45 . wrote: > I have some interesting scenario i am working on pattern matching in flink > evaluating the incoming data against a set of patterns using > keyedbroadcastprocessfunction, when i am running the program in IDE i am > getting null pointer exception in processElements method when trying to > access ReadOnlyContext but the same program is running fine in flink > terminal, below is my keyedbroadcastprocessfunction > > public class TestProcess extends KeyedBroadcastProcessFunction<String, > Tuple2<String, sampleSignal>, > Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> { > > public static final MapStateDescriptor <String,Map<String,String>> > ruleDescriptor = > new MapStateDescriptor <>("RuleDiscriptor", > ,BasicTypeInfo.STRING_TYPE_INFO > ,new MapTypeInfo<>(String.class,String.class)); > > @Override > public void processElement(Tuple2<String, sampleSignal> value, > ReadOnlyContext ctx, Collector<Tuple2<String, > sampleSignal>> out) throws Exception { > > System.out.println("sampleSignal: " +value.f1.toString()); > > String Context = ctx.getBroadcastState(ruleDescriptor).toString(); > > Map<String,String> incomingRule = new Hashmap<>(); > > incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key); > > /*It's hitting nullpointer exception when printing the size of > hashmpa*/ > System.out.println("Map Size: " +incomingRule.size()); > > System.out.println("Context: " +Context); > > System.out.println("Before Rule Iterator"); > > /*I tried below way to print the values in broadcaststream just to > print the values > in broadcast state it don't print anything*/ > for(Map.Entry<String, Map<String, String>> rules: > ctx.getBroadcastState(ruleDescriptor).immutableEntries()){ > System.out.println("Key: " +rules.getKey()); > System.out.println("Value: "+rules.getValue()); > } > > > for(Map.Entry<String,String> rules: incomingRule.entrySet()){ > > System.out.println("Key: " +rules.getKey()); > System.out.println("Value: "+rules.getValue()); > } > > out.collect(new Tuple2<>(value.f0,value.f1)); > > } > > @Override > public void processBroadcastElement(Tuple2<String, Map<String, String>> > value, Context ctx, > Collector<Tuple2<String, > sampleSignal>> out) throws Exception { > > System.out.println("BroadCastState Key: " +value.f0); > System.out.println("BroadCastState Value: " +value.f1); > ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1); > > } > } > Below is the IDE Terminal output with error exception > > /*Its prints below data in BroadCastState in processBroadcastElement*/ > BroadCastState Key: Key > BroadCastState Value: {"RuleKey":"RuleValue"} > > > /*Its printing below data in processElement*/ > > sampleSignal: {SignalData} > > When it hits the Map in which i am storing the Rule Name and Rule Condition > its throwing nullpointer exception and below is the stack trace of error > > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at com.westpac.itm.eq.pattern.App.main(App.java:34) > Caused by: java.lang.NullPointerException > at > com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35) > at > com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15) > at > org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238) > Caused by: java.lang.NullPointerException > > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > > > Please help me in solving the issue > > Thanks, > Rahul. >
signature.asc
Description: OpenPGP digital signature