Hi all,

Just as an addition to what Dawid asked, I would also like to ask:
1) Which Flink version are you using? The stack trace line numbers do
not match the current master.
2) As a clarification (although maybe not relevant here): there is no
guarantee on the order in which the two inputs are processed, so the
non-broadcast side may be faster, and when you do

incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key);

the "Key" may not be in the broadcast state yet.

Cheers,
Kostas

On Fri, Jul 3, 2020 at 9:56 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi Rahul.
>
> Could you verify that the provided code is the one that fails? Something
> does not seem right to me in the stack trace. The stack trace shows that
> you call processElement recursively, but I cannot see that in the code:
>
>     at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35)
>     at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15)
>
> Where exactly do you get the exception? When accessing the local
> variable Map<String, String> incomingRule? Is it only a typo, or are you
> really using Hashmap instead of java.util.HashMap? I am not sure what
> the Hashmap class is...
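>
> If it was meant to be the standard java.util.HashMap, the declaration
> would simply be (just for clarity):
>
>     import java.util.HashMap;
>     import java.util.Map;
>
>     Map<String, String> incomingRule = new HashMap<>();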
>
> If you still find some problem there, could you provide us with an
> example with which we can reproduce the problem?
>
> Best,
>
> Dawid
>
> On 15/06/2020 16:21, bujjirahul45 . wrote:
> > I have an interesting scenario: I am working on pattern matching in Flink,
> > evaluating the incoming data against a set of patterns using a
> > KeyedBroadcastProcessFunction. When I run the program in the IDE I get a
> > NullPointerException in the processElement method when trying to access the
> > ReadOnlyContext, but the same program runs fine in the Flink terminal.
> > Below is my KeyedBroadcastProcessFunction:
> >
> > public class TestProcess extends KeyedBroadcastProcessFunction<String,
> >         Tuple2<String, sampleSignal>,
> >         Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {
> >
> >     public static final MapStateDescriptor<String, Map<String, String>> ruleDescriptor =
> >             new MapStateDescriptor<>("RuleDiscriptor",
> >                     BasicTypeInfo.STRING_TYPE_INFO,
> >                     new MapTypeInfo<>(String.class, String.class));
> >
> >     @Override
> >     public void processElement(Tuple2<String, sampleSignal> value, ReadOnlyContext ctx,
> >             Collector<Tuple2<String, sampleSignal>> out) throws Exception {
> >
> >         System.out.println("sampleSignal: " + value.f1.toString());
> >
> >         String Context = ctx.getBroadcastState(ruleDescriptor).toString();
> >
> >         Map<String, String> incomingRule = new Hashmap<>();
> >
> >         incomingPattern = ctx.getBroadcastState(ruleDescriptor).get(Key);
> >
> >         /* It hits the NullPointerException when printing the size of the hashmap */
> >         System.out.println("Map Size: " + incomingRule.size());
> >
> >         System.out.println("Context: " + Context);
> >
> >         System.out.println("Before Rule Iterator");
> >
> >         /* I tried the below way to print the values in the broadcast state,
> >            but it does not print anything */
> >         for (Map.Entry<String, Map<String, String>> rules :
> >                 ctx.getBroadcastState(ruleDescriptor).immutableEntries()) {
> >             System.out.println("Key: " + rules.getKey());
> >             System.out.println("Value: " + rules.getValue());
> >         }
> >
> >         for (Map.Entry<String, String> rules : incomingRule.entrySet()) {
> >             System.out.println("Key: " + rules.getKey());
> >             System.out.println("Value: " + rules.getValue());
> >         }
> >
> >         out.collect(new Tuple2<>(value.f0, value.f1));
> >     }
> >
> >     @Override
> >     public void processBroadcastElement(Tuple2<String, Map<String, String>> value, Context ctx,
> >             Collector<Tuple2<String, sampleSignal>> out) throws Exception {
> >
> >         System.out.println("BroadCastState Key: " + value.f0);
> >         System.out.println("BroadCastState Value: " + value.f1);
> >         ctx.getBroadcastState(ruleDescriptor).put(value.f0, value.f1);
> >     }
> > }
> > Below is the IDE terminal output with the error:
> >
> > /* It prints the below data for the broadcast state in processBroadcastElement */
> > BroadCastState Key: Key
> > BroadCastState Value: {"RuleKey":"RuleValue"}
> >
> >
> > /* It prints the below data in processElement */
> >
> > sampleSignal: {SignalData}
> >
> > When it hits the Map in which I am storing the rule name and rule condition,
> > it throws a NullPointerException; below is the stack trace of the error:
> >
> > Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> >     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> >     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> >     at com.westpac.itm.eq.pattern.App.main(App.java:34)
> > Caused by: java.lang.NullPointerException
> >     at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:35)
> >     at com.westpac.itm.eq.pattern.TestProcess.processElement(TestProcess.java:15)
> >     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
> >     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:238)
> > Caused by: java.lang.NullPointerException
> >     at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> >     at java.lang.Thread.run(Thread.java:748)
> >
> >
> > Please help me solve this issue.
> >
> > Thanks,
> > Rahul.
> >
>
