Hi all,

Just as an addition to what Dawid asked, I would also like to ask:

1) Which Flink version are you using? The stack trace line numbers do not match the current master.
2) As a clarification (although maybe not relevant here): there is no guarantee on the order of the elements between the two streams, so the non-broadcast side may be faster, and when you do

incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key);

the "Key" may not be there yet.

Cheers,
Kostas

On Fri, Jul 3, 2020 at 9:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi Rahul.
>
> Could you verify that the provided code is the one that fails? Something
> does not seem right to me in the stacktrace. The stacktrace shows that
> you call processElement recursively, but I cannot see that in the code:
>
> com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35)
> at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15)
> at
>
> Where exactly do you get the exception? When accessing the local
> variable Map<String,String> incomingRule? Is it only a typo, or are you
> really using Hashmap instead of java.util.HashMap? I am not sure what the
> Hashmap class is...
>
> If you still find some problem there, could you provide us with an
> example with which we could reproduce the problem?
>
> Best,
>
> Dawid
>
> On 15/06/2020 16:21, bujjirahul45 .
> wrote:
> > I have an interesting scenario. I am working on pattern matching in Flink,
> > evaluating the incoming data against a set of patterns using
> > KeyedBroadcastProcessFunction. When I run the program in the IDE, I get a
> > NullPointerException in the processElement method when trying to access
> > ReadOnlyContext, but the same program runs fine in the Flink terminal.
> > Below is my KeyedBroadcastProcessFunction:
> >
> > public class TestProcess extends KeyedBroadcastProcessFunction<String,
> >         Tuple2<String, sampleSignal>,
> >         Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {
> >
> >     public static final MapStateDescriptor <String,Map<String,String>> ruleDescriptor =
> >             new MapStateDescriptor <>("RuleDiscriptor",
> >                     ,BasicTypeInfo.STRING_TYPE_INFO
> >                     ,new MapTypeInfo<>(String.class,String.class));
> >
> >     @Override
> >     public void processElement(Tuple2<String, sampleSignal> value,
> >             ReadOnlyContext ctx,
> >             Collector<Tuple2<String, sampleSignal>> out) throws Exception {
> >
> >         System.out.println("sampleSignal: " +value.f1.toString());
> >
> >         String Context = ctx.getBroadcastState(ruleDescriptor).toString();
> >
> >         Map<String,String> incomingRule = new Hashmap<>();
> >
> >         incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key);
> >
> >         /* It hits the NullPointerException when printing the size of the hashmap */
> >         System.out.println("Map Size: " +incomingRule.size());
> >
> >         System.out.println("Context: " +Context);
> >
> >         System.out.println("Before Rule Iterator");
> >
> >         /* I tried the loop below just to print the values in the broadcast
> >            state, but it does not print anything */
> >         for(Map.Entry<String, Map<String, String>> rules:
> >                 ctx.getBroadcastState(ruleDescriptor).immutableEntries()){
> >             System.out.println("Key: " +rules.getKey());
> >             System.out.println("Value: "+rules.getValue());
> >         }
> >
> >         for(Map.Entry<String,String> rules: incomingRule.entrySet()){
> >             System.out.println("Key: " +rules.getKey());
> >             System.out.println("Value: "+rules.getValue());
> >         }
> >
> >         out.collect(new Tuple2<>(value.f0,value.f1));
> >     }
> >
> >     @Override
> >     public void processBroadcastElement(Tuple2<String, Map<String, String>> value,
> >             Context ctx,
> >             Collector<Tuple2<String, sampleSignal>> out) throws Exception {
> >
> >         System.out.println("BroadCastState Key: " +value.f0);
> >         System.out.println("BroadCastState Value: " +value.f1);
> >         ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1);
> >     }
> > }
> >
> > Below is the IDE terminal output with the exception:
> >
> > /* It prints the data below in processBroadcastElement */
> > BroadCastState Key: Key
> > BroadCastState Value: {"RuleKey":"RuleValue"}
> >
> > /* It prints the data below in processElement */
> > sampleSignal: {SignalData}
> >
> > When it hits the Map in which I am storing the rule name and rule
> > condition, it throws a NullPointerException. Below is the stack trace of
> > the error:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> >     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> >     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> >     at com.westpac.itm.eq.pattern.App.main(App.java:34)
> > Caused by: java.lang.NullPointerException
> >     at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35)
> >     at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15)
> >     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
> >     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
> > Caused by: java.lang.NullPointerException
> >
> >     at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> > Please help me in solving the issue
> >
> > Thanks,
> > Rahul.
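
To illustrate the ordering point raised at the top of this thread (the non-broadcast side may see an element before the rule for "Key" has been broadcast), here is a minimal plain-Java sketch. It deliberately does not use the Flink API: a HashMap stands in for the broadcast state, and the class name RuleLookupSketch, the pending buffer, and the method signatures are all hypothetical. It only shows the idea of checking the lookup result for null and holding the element back until the rule arrives. In a real job such a buffer would have to live in Flink-managed state rather than in a plain field.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RuleLookupSketch {
    // Stand-in for the broadcast state (assumption: NOT the Flink API).
    private final Map<String, Map<String, String>> broadcastState = new HashMap<>();
    // Hypothetical buffer for elements that arrived before their rule.
    private final Map<String, List<String>> pending = new HashMap<>();

    // Analogue of processBroadcastElement: store the rule, then return
    // any elements that were waiting for it.
    public List<String> processBroadcastElement(String ruleKey, Map<String, String> rule) {
        broadcastState.put(ruleKey, rule);
        List<String> released = pending.remove(ruleKey);
        return released == null ? new ArrayList<>() : released;
    }

    // Analogue of processElement: returns the elements emitted for this call.
    public List<String> processElement(String ruleKey, String signal) {
        Map<String, String> rule = broadcastState.get(ruleKey);
        List<String> out = new ArrayList<>();
        if (rule == null) {
            // The rule has not been broadcast yet: do not dereference it,
            // keep the element until the rule shows up.
            pending.computeIfAbsent(ruleKey, k -> new ArrayList<>()).add(signal);
            return out;
        }
        out.add(signal);
        return out;
    }

    public static void main(String[] args) {
        RuleLookupSketch sketch = new RuleLookupSketch();
        // The element arrives first, so nothing is emitted yet.
        System.out.println(sketch.processElement("Key", "signal-1"));
        Map<String, String> rule = new HashMap<>();
        rule.put("RuleKey", "RuleValue");
        // The rule arrives and the buffered element is released.
        System.out.println(sketch.processBroadcastElement("Key", rule));
    }
}
```

Buffering is only one possible choice here. Dropping the element or emitting it unmatched are equally valid, depending on what the job should do with data that arrives before its rule.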