Hello Team,

I'm new to Flink, coming from a Spark background, and I need help completing this 
streaming job. I'm reading data from two different Kafka topics and I want to 
join the two streams.

My code:

formatStream1.join(formatStream2)
        // key the first stream (Tuple11) by its first field
        .where(new KeySelector<Tuple11<String, String, String, String, String, String,
                String, String, String, String, Long>, String>() {
            @Override
            public String getKey(Tuple11<String, String, String, String, String, String,
                    String, String, String, String, Long> t1) throws Exception {
                return t1.f0;
            }
        })
        // key the second stream (Tuple7) by its first field
        .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String,
                Long>, String>() {
            @Override
            public String getKey(Tuple7<String, String, String, String, String, String,
                    Long> t1) throws Exception {
                return t1.f0;
            }
        })
        // join over 15-second event-time tumbling windows
        .window(TumblingEventTimeWindows.of(Time.seconds(15)))
        .apply(new JoinFunction<
                Tuple11<String, String, String, String, String, String, String, String,
                        String, String, Long>,
                Tuple7<String, String, String, String, String, String, Long>,
                Tuple17<String, String, String, String, String, String, String, String,
                        String, String, String, String, String, String, String, Long, Long>>() {
            @Override
            public Tuple17<String, String, String, String, String, String, String, String,
                    String, String, String, String, String, String, String, Long, Long> join(
                    Tuple11<String, String, String, String, String, String, String, String,
                            String, String, Long> first,
                    Tuple7<String, String, String, String, String, String, Long> second) {
                // merge the fields of both sides into a single Tuple17
                return new Tuple17<>(first.f0, first.f1, first.f2, first.f3, first.f4,
                        first.f5, first.f6, first.f7, first.f8, first.f9,
                        second.f1, second.f2, second.f3, second.f4, second.f5,
                        second.f6, first.f10);
            }
        })
        .print();


Error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
        at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
        at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

Data is arriving in both formatStream1 and formatStream2 (I verified by printing 
each stream), so the issue must be in the join code I shared above. From the error 
message, I suspect both streams need assignTimestampsAndWatermarks(...) before the 
event-time window join; my rough attempt is in the sketch below, but I'm not sure 
it's correct. Thanks in advance!!!
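
This is only a sketch of what I think the error is asking for, assuming the trailing 
Long field of each tuple (first.f10 and second.f6) holds the event timestamp in epoch 
milliseconds, that events arrive at most a few seconds out of order, and that the 
StreamExecutionEnvironment variable is called env. Please correct me if this is the 
wrong approach:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Assumption: the job has to run in event time for TumblingEventTimeWindows to work.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Assumption: f10 of the first stream is the event timestamp in milliseconds.
formatStream1 = formatStream1.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String,
                String, String, String, String, String, String, Long>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple11<String, String, String, String, String,
                    String, String, String, String, String, Long> element) {
                return element.f10;
            }
        });

// Assumption: f6 of the second stream is the event timestamp in milliseconds.
formatStream2 = formatStream2.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String,
                String, String, Long>>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(Tuple7<String, String, String, String, String,
                    String, Long> element) {
                return element.f6;
            }
        });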

Thanks,

        
Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !
abhijeet.ku...@sentienz.com | www.sentienz.com | Bengaluru

