Hi, I assume that
withTimestampsAndWatermarks1.print(); withTimestampsAndWatermarks2.print(); Actually prints what you have expected? If so, the problem might be that: a) time/watermarks are not progressing (watermarks are triggering the output of your `TumblingEventTimeWindows.of(Time.seconds(15))`) b) data are not being joined, because: - there are no matching elements (based on your KeySelectors) to join with between those two streams - elements are out of sync with respect to window length (within your 15 second tumbling window, there are no elements to join) c) streams are producing different event times/watermarks (for example one is far ahead of the other). Windowed join will produce result only once their’s both watermarks catch up/sync up. Piotrek > On 23 Nov 2018, at 08:50, Abhijeet Kumar <abhijeet.ku...@sentienz.com> wrote: > > DataStream<Tuple11<String, String, String, String, String, String, String, > String, String, String, Long>> withTimestampsAndWatermarks1 = formatStream1 > .assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor<Tuple11<String, String, String, > String, String, String, String, String, String, String, Long>>( > > Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) { > > /** > * > */ > private > static final long serialVersionUID = 1L; > > @Override > public long > extractTimestamp( > > Tuple11<String, String, String, String, String, String, String, String, > String, String, Long> element) { > return > element.f10; > } > }); > > DataStream<Tuple7<String, String, String, String, String, > String, Long>> withTimestampsAndWatermarks2 = formatStream2 > .assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor<Tuple7<String, String, String, > String, String, String, Long>>( > > Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) { > > /** > * > */ > private > static final long serialVersionUID = 1L; > > @Override > public long > extractTimestamp( > > Tuple7<String, String, String, String, String, String, Long> element) { > return > element.f6; > } > }); > > withTimestampsAndWatermarks1.print(); > withTimestampsAndWatermarks2.print(); > > DataStream< Tuple17<String, String, String, String, String, > String, String, String, String, String, String, String, String, String, > String, Long, Long>> joined = withTimestampsAndWatermarks1 > .join(withTimestampsAndWatermarks2) > .where(new KeySelector<Tuple11<String, String, > String, String, String, String, String, String, String, String, Long>, > String>() { > /** > * > */ > private static final long > serialVersionUID = 1L; > > public String getKey( > Tuple11<String, String, > String, String, String, String, String, String, String, String, Long> t1) > throws Exception { > return t1.f0; > } > }).equalTo(new KeySelector<Tuple7<String, > String, String, String, String, String, Long>, String>() { > /** > * > */ > private static final long > serialVersionUID = 1L; > > public String getKey(Tuple7<String, > String, String, String, String, String, Long> t1) > throws Exception { > return t1.f0; > } > > }).window(TumblingEventTimeWindows.of(Time.seconds(15))) > .apply(new JoinFunction<Tuple11<String, String, > String, String, String, String, String, String, String, String, Long>, > Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, > String, String, String, String, String, String, String, String, String, > String, String, String, String, String, Long, Long>>() { > > /** > * > */ > private static final long > serialVersionUID = 1L; > > public Tuple17<String, String, String, > String, String, String, String, String, String, String, String, String, > String, String, String, Long, Long> join( > Tuple11<String, String, > String, String, String, String, String, String, String, String, Long> first, > Tuple7<String, String, > String, String, String, String, Long> second) { > > return new Tuple17<String, > String, String, String, String, String, String, String, String, String, > String, String, String, String, String, Long, Long>(first.f0, first.f1, > first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, > first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, > first.f10); > } > }); > > joined.print(); > > Ok, so now I did it like this. Errors resolved! but, now I'm not able to see > any output when I'm printing joined datastream. > >> On 23-Nov-2018, at 12:42 PM, Nagarjun Guraja <nagar...@gmail.com >> <mailto:nagar...@gmail.com>> wrote: >> >> Looks like you need to assign time stamps and emit watermarks to both the >> streams viz. formatStream1 and formatStream2 as described at >> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html >> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html> >> >> On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com >> <mailto:abhijeet.ku...@sentienz.com>> wrote: >> Hello Team, >> >> I'm new to Flink and coming from Spark background. I need help in completing >> this stream job. I'm reading data from two different Kafka topics and I want >> to join them. >> >> My code: >> >> formatStream1.join(formatStream2) >> .where(new KeySelector<Tuple11<String, String, String, String, >> String, String, String, String, String, String, Long>, String>() { >> public String getKey(Tuple11<String, String, String, >> String, String, String, String, String, String, String, Long> t1) throws >> Exception { >> return t1.f0; >> } >> }) >> .equalTo(new KeySelector<Tuple7<String, String, String, String, >> String, String, Long>, String>() { >> public String getKey(Tuple7<String, String, String, >> String, String, String, Long> t1) throws Exception { >> return t1.f0; >> } >> }).window(TumblingEventTimeWindows.of(Time.seconds(15))) >> .apply(new JoinFunction<Tuple11<String, String, String, String, >> String, String, String, String, String, String, Long>, Tuple7<String, >> String, String, String, String, String, Long>, Tuple17<String, String, >> String, String, String, String, String, String, String, String, String, >> String, String, String, String, Long, Long>>() { >> >> public Tuple17<String, String, String, >> String, String, String, String, String, String, String, String, String, >> String, String, String, Long, Long> join( >> Tuple11<String, String, >> String, String, String, String, String, String, String, String, Long> first, >> Tuple7<String, String, >> String, String, String, String, Long> second) { >> return new Tuple17<String, >> String, String, String, String, String, String, String, String, String, >> String, String, String, String, String, Long, Long>(first.f0, first.f1, >> first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, >> first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, >> first.f10); >> } >> }).print(); >> >> >> Error: >> >> Exception in thread "main" >> org.apache.flink.runtime.client.JobExecutionException: Job execution failed. >> at >> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) >> at >> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630) >> at >> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511) >> at >> com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155) >> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp >> (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', >> or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'? >> at >> org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69) >> at >> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295) >> at >> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >> at java.lang.Thread.run(Thread.java:748) >> >> In formatStream1 and formatStream2 variable data is coming I checked by >> printing them. So, the issue is in the code which I shared. Thanks in >> advance!!! >> >> Thanks, >> >> >> Abhijeet Kumar >> Software Development Engineer, >> Sentienz Solutions Pvt Ltd >> Cognitive Data Platform - Perceive the Data ! >> abhijeet.ku...@sentienz.com <mailto:abhijeet.ku...@sentienz.com> >> |www.sentienz.com <http://www.sentienz.com/> | Bengaluru >> >> -- >> Regards, >> Nagarjun >> >> Success is not final, failure is not fatal: it is the courage to continue >> that counts. >> - Winston Churchill - >