DataStream<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1 .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>>( Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {
/** * */ private static final long serialVersionUID = 1L; @Override public long extractTimestamp( Tuple11<String, String, String, String, String, String, String, String, String, String, Long> element) { return element.f10; } }); DataStream<Tuple7<String, String, String, String, String, String, Long>> withTimestampsAndWatermarks2 = formatStream2 .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, String, String, String, Long>>( Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) { /** * */ private static final long serialVersionUID = 1L; @Override public long extractTimestamp( Tuple7<String, String, String, String, String, String, Long> element) { return element.f6; } }); withTimestampsAndWatermarks1.print(); withTimestampsAndWatermarks2.print(); DataStream< Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>> joined = withTimestampsAndWatermarks1 .join(withTimestampsAndWatermarks2) .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() { /** * */ private static final long serialVersionUID = 1L; public String getKey( Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception { return t1.f0; } }).equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() { /** * */ private static final long serialVersionUID = 1L; public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception { return t1.f0; } }).window(TumblingEventTimeWindows.of(Time.seconds(15))) .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() { /** * */ private static final long serialVersionUID = 1L; public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join( Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first, Tuple7<String, String, String, String, String, String, Long> second) { return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10); } }); joined.print(); Ok, so now I did it like this. Errors resolved! but, now I'm not able to see any output when I'm printing joined datastream. > On 23-Nov-2018, at 12:42 PM, Nagarjun Guraja <nagar...@gmail.com> wrote: > > Looks like you need to assign time stamps and emit watermarks to both the > streams viz. formatStream1 and formatStream2 as described at > https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html > > <https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html> > > On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com > <mailto:abhijeet.ku...@sentienz.com>> wrote: > Hello Team, > > I'm new to Flink and coming from Spark background. I need help in completing > this stream job. I'm reading data from two different Kafka topics and I want > to join them. > > My code: > > formatStream1.join(formatStream2) > .where(new KeySelector<Tuple11<String, String, String, String, > String, String, String, String, String, String, Long>, String>() { > public String getKey(Tuple11<String, String, String, > String, String, String, String, String, String, String, Long> t1) throws > Exception { > return t1.f0; > } > }) > .equalTo(new KeySelector<Tuple7<String, String, String, String, > String, String, Long>, String>() { > public String getKey(Tuple7<String, String, String, > String, String, String, Long> t1) throws Exception { > return t1.f0; > } > }).window(TumblingEventTimeWindows.of(Time.seconds(15))) > .apply(new JoinFunction<Tuple11<String, String, String, String, > String, String, String, String, String, String, Long>, Tuple7<String, String, > String, String, String, String, Long>, Tuple17<String, String, String, > String, String, String, String, String, String, String, String, String, > String, String, String, Long, Long>>() { > > public Tuple17<String, String, String, > String, String, String, String, String, String, String, String, String, > String, String, String, Long, Long> join( > Tuple11<String, String, > String, String, String, String, String, String, String, String, Long> first, > Tuple7<String, String, > String, String, String, String, Long> second) { > return new Tuple17<String, > String, String, String, String, String, String, String, String, String, > String, String, String, String, String, Long, Long>(first.f0, first.f1, > first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, > first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, > first.f10); > } > }).print(); > > > Error: > > Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) > at > org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511) > at > com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155) > Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= > no timestamp marker). Is the time characteristic set to 'ProcessingTime', or > did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'? > at > org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > > In formatStream1 and formatStream2 variable data is coming I checked by > printing them. So, the issue is in the code which I shared. Thanks in > advance!!! > > Thanks, > > > Abhijeet Kumar > Software Development Engineer, > Sentienz Solutions Pvt Ltd > Cognitive Data Platform - Perceive the Data ! > abhijeet.ku...@sentienz.com <mailto:abhijeet.ku...@sentienz.com> > |www.sentienz.com <http://www.sentienz.com/> | Bengaluru > > -- > Regards, > Nagarjun > > Success is not final, failure is not fatal: it is the courage to continue > that counts. > - Winston Churchill -