// Attach event-time timestamps and watermarks to formatStream1.
// The timestamp is read from the last tuple field (f10); watermarks allow for a bounded
// out-of-orderness of "watermarkTime" seconds, parsed from the job parameters.
// NOTE(review): Flink treats the extracted value as epoch milliseconds - confirm f10 is in ms.
DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>
                withTimestampsAndWatermarks1 = formatStream1.assignTimestampsAndWatermarks(
                                new BoundedOutOfOrdernessTimestampExtractor<
                                                Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>(
                                                Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                                        private static final long serialVersionUID = 1L;

                                        /** Returns the event timestamp stored in field f10. */
                                        @Override
                                        public long extractTimestamp(
                                                        Tuple11<String, String, String, String, String, String, String, String, String, String, Long> event) {
                                                return event.f10;
                                        }
                                });

                // Attach event-time timestamps and watermarks to formatStream2.
                // The timestamp is read from the last tuple field (f6); watermarks allow for a bounded
                // out-of-orderness of "watermarkTime" seconds, parsed from the job parameters.
                // NOTE(review): Flink treats the extracted value as epoch milliseconds - confirm f6 is in ms.
                DataStream<Tuple7<String, String, String, String, String, String, Long>>
                                withTimestampsAndWatermarks2 = formatStream2.assignTimestampsAndWatermarks(
                                                new BoundedOutOfOrdernessTimestampExtractor<
                                                                Tuple7<String, String, String, String, String, String, Long>>(
                                                                Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {

                                                        private static final long serialVersionUID = 1L;

                                                        /** Returns the event timestamp stored in field f6. */
                                                        @Override
                                                        public long extractTimestamp(
                                                                        Tuple7<String, String, String, String, String, String, Long> event) {
                                                                return event.f6;
                                                        }
                                                });
                
                // Print both timestamped input streams (the join itself is built from these two streams).
                withTimestampsAndWatermarks1.print();
                withTimestampsAndWatermarks2.print();
                
                // Join the two timestamped streams on their first field (f0) inside 15-second
                // tumbling event-time windows.
                // Output layout: first.f0..f9 (10 Strings), second.f1..f5 (5 Strings),
                // second.f6 (Long), first.f10 (Long).
                // NOTE(review): an event-time window only emits once the watermark passes the window
                // end. The environment setup is not visible here - confirm the job uses
                // TimeCharacteristic.EventTime and that watermarks advance on BOTH inputs; otherwise
                // this join produces no output.
                DataStream<Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined =
                                withTimestampsAndWatermarks1
                                .join(withTimestampsAndWatermarks2)
                                .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {

                                        private static final long serialVersionUID = 1L;

                                        /** Join key of the first stream: field f0. */
                                        @Override
                                        public String getKey(
                                                        Tuple11<String, String, String, String, String, String, String, String, String, String, Long> left)
                                                        throws Exception {
                                                return left.f0;
                                        }
                                })
                                .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {

                                        private static final long serialVersionUID = 1L;

                                        /** Join key of the second stream: field f0. */
                                        @Override
                                        public String getKey(Tuple7<String, String, String, String, String, String, Long> right)
                                                        throws Exception {
                                                return right.f0;
                                        }
                                })
                                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                                .apply(new JoinFunction<
                                                Tuple11<String, String, String, String, String, String, String, String, String, String, Long>,
                                                Tuple7<String, String, String, String, String, String, Long>,
                                                Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

                                        private static final long serialVersionUID = 1L;

                                        /**
                                         * Merges one matching pair into a single 17-field tuple.
                                         * second.f0 is omitted; first.f0 is kept and the two streams
                                         * are joined on f0.
                                         */
                                        @Override
                                        public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                                                        Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                                                        Tuple7<String, String, String, String, String, String, Long> second) {
                                                return new Tuple17<>(
                                                                first.f0, first.f1, first.f2, first.f3, first.f4,
                                                                first.f5, first.f6, first.f7, first.f8, first.f9,
                                                                second.f1, second.f2, second.f3, second.f4, second.f5,
                                                                second.f6, first.f10);
                                        }
                                });

                joined.print();

OK, so now I did it like this. The errors are resolved, but now I don't see 
any output when I print the joined DataStream.

> On 23-Nov-2018, at 12:42 PM, Nagarjun Guraja <nagar...@gmail.com> wrote:
> 
> It looks like you need to assign timestamps and emit watermarks on both 
> streams, i.e. formatStream1 and formatStream2, as described at 
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
> 
> On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com 
> <mailto:abhijeet.ku...@sentienz.com>> wrote:
> Hello Team,
> 
> I'm new to Flink and come from a Spark background. I need help completing 
> this streaming job. I'm reading data from two different Kafka topics and I 
> want to join them.
> 
> My code:
> 
> formatStream1.join(formatStream2)
>               .where(new KeySelector<Tuple11<String, String, String, String, 
> String, String, String, String, String, String, Long>, String>() {
>                       public String getKey(Tuple11<String, String, String, 
> String, String, String, String, String, String, String, Long> t1) throws 
> Exception {
>                               return t1.f0;
>                       }
>               })
>               .equalTo(new KeySelector<Tuple7<String, String, String, String, 
> String, String, Long>, String>() {
>                       public String getKey(Tuple7<String, String, String, 
> String, String, String, Long> t1) throws Exception {
>                               return t1.f0;
>                       }
>               }).window(TumblingEventTimeWindows.of(Time.seconds(15)))
>               .apply(new JoinFunction<Tuple11<String, String, String, String, 
> String, String, String, String, String, String, Long>, Tuple7<String, String, 
> String, String, String, String, Long>, Tuple17<String, String, String, 
> String, String, String, String, String, String, String, String, String, 
> String, String, String, Long, Long>>() {
> 
>                                       public Tuple17<String, String, String, 
> String, String, String, String, String, String, String, String, String, 
> String, String, String, Long, Long> join(
>                                                       Tuple11<String, String, 
> String, String, String, String, String, String, String, String, Long> first,
>                                                       Tuple7<String, String, 
> String, String, String, String, Long> second) {
>                                               return new Tuple17<String, 
> String, String, String, String, String, String, String, String, String, 
> String, String, String, String, String, Long, Long>(first.f0, first.f1, 
> first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, 
> first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, 
> first.f10);
>                                       }
>                               }).print();
> 
> 
> Error:
> 
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>       at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>       at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>       at 
> com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= 
> no timestamp marker). Is the time characteristic set to 'ProcessingTime', or 
> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>       at 
> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>       at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>       at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> 
> Data is arriving in both formatStream1 and formatStream2; I verified this by 
> printing them. So the issue must be in the code I shared. Thanks in 
> advance!
> 
> Thanks,
> 
>       
> Abhijeet Kumar
> Software Development Engineer,
> Sentienz Solutions Pvt Ltd
> Cognitive Data Platform - Perceive the Data !
> abhijeet.ku...@sentienz.com <mailto:abhijeet.ku...@sentienz.com> 
> |www.sentienz.com <http://www.sentienz.com/> | Bengaluru
> 
> -- 
> Regards,
> Nagarjun
> 
> Success is not final, failure is not fatal: it is the courage to continue 
> that counts. 
> - Winston Churchill - 

Reply via email to