It looks like you need to assign timestamps and emit watermarks on both
streams, i.e. formatStream1 and formatStream2, before joining them, as
described at
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
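
The exception in the quoted trace is raised when the tumbling event-time
window assigner receives a record that still carries the Long.MIN_VALUE
"no timestamp" marker. As a rough illustration, here is a plain-Java model
of that check, and of a watermark that trails the highest timestamp seen by
a fixed bound. It is not Flink code: EventTimeSketch, windowStart and
BoundedLatenessWatermark are hypothetical names, the 2-second bound is
arbitrary, and it assumes the trailing Long field of each tuple holds an
event time in epoch milliseconds, which the original post does not confirm.

```java
import java.util.function.ToLongFunction;

/** Plain-Java model (not Flink code) of event-time tumbling-window assignment. */
public class EventTimeSketch {

    /** Marker for "no timestamp assigned", mirroring the value named in the error. */
    public static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Start of the tumbling window (no offset) that contains the given timestamp. */
    public static long windowStart(long timestampMillis, long windowSizeMillis) {
        if (timestampMillis == NO_TIMESTAMP) {
            // This is the situation reported in the quoted stack trace.
            throw new IllegalStateException(
                    "Record has no timestamp; assign timestamps before windowing");
        }
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Tracks the highest timestamp seen; the watermark lags it by a fixed bound. */
    public static final class BoundedLatenessWatermark<T> {
        private final ToLongFunction<T> timestampOf;
        private final long maxOutOfOrdernessMillis;
        private long maxSeen = NO_TIMESTAMP;

        public BoundedLatenessWatermark(ToLongFunction<T> timestampOf,
                                        long maxOutOfOrdernessMillis) {
            this.timestampOf = timestampOf;
            this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        }

        /** Reads the record's event time and remembers the maximum seen so far. */
        public long extract(T record) {
            long ts = timestampOf.applyAsLong(record);
            maxSeen = Math.max(maxSeen, ts);
            return ts;
        }

        /** Watermark = highest timestamp seen minus the allowed out-of-orderness. */
        public long currentWatermark() {
            return maxSeen == NO_TIMESTAMP ? NO_TIMESTAMP : maxSeen - maxOutOfOrdernessMillis;
        }
    }

    public static void main(String[] args) {
        long windowSize = 15_000L; // 15 seconds, as in the quoted job
        // Each hypothetical record is a one-element array holding its event time.
        BoundedLatenessWatermark<long[]> wm =
                new BoundedLatenessWatermark<>(r -> r[0], 2_000L);
        long[][] records = { {1_000L}, {16_500L}, {14_000L} };
        for (long[] record : records) {
            long ts = wm.extract(record);
            System.out.println("ts=" + ts
                    + " windowStart=" + windowStart(ts, windowSize)
                    + " watermark=" + wm.currentWatermark());
        }
    }
}
```

In the actual job, the equivalent step would be to call
assignTimestampsAndWatermarks(...) on formatStream1 and formatStream2 before
the join, with the environment's time characteristic set to event time. The
page linked above covers the predefined extractors, such as
BoundedOutOfOrdernessTimestampExtractor.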

On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com>
wrote:

> Hello Team,
>
> I'm new to Flink and come from a Spark background. I need help
> completing this streaming job. I'm reading data from two different Kafka
> topics and I want to join them.
>
> My code:
>
> formatStream1.join(formatStream2)
>     .where(new KeySelector<Tuple11<String, String, String, String, String,
>             String, String, String, String, String, Long>, String>() {
>         public String getKey(Tuple11<String, String, String, String, String,
>                 String, String, String, String, String, Long> t1)
>                 throws Exception {
>             return t1.f0;
>         }
>     })
>     .equalTo(new KeySelector<Tuple7<String, String, String, String, String,
>             String, Long>, String>() {
>         public String getKey(Tuple7<String, String, String, String, String,
>                 String, Long> t1) throws Exception {
>             return t1.f0;
>         }
>     })
>     .window(TumblingEventTimeWindows.of(Time.seconds(15)))
>     .apply(new JoinFunction<
>             Tuple11<String, String, String, String, String, String, String,
>                     String, String, String, Long>,
>             Tuple7<String, String, String, String, String, String, Long>,
>             Tuple17<String, String, String, String, String, String, String,
>                     String, String, String, String, String, String, String,
>                     String, Long, Long>>() {
>
>         public Tuple17<String, String, String, String, String, String,
>                 String, String, String, String, String, String, String,
>                 String, String, Long, Long> join(
>                 Tuple11<String, String, String, String, String, String,
>                         String, String, String, String, Long> first,
>                 Tuple7<String, String, String, String, String, String,
>                         Long> second) {
>             return new Tuple17<String, String, String, String, String,
>                     String, String, String, String, String, String, String,
>                     String, String, String, Long, Long>(
>                     first.f0, first.f1, first.f2, first.f3, first.f4,
>                     first.f5, first.f6, first.f7, first.f8, first.f9,
>                     second.f1, second.f2, second.f3, second.f4, second.f5,
>                     second.f6, first.f10);
>         }
>     })
>     .print();
>
>
> Error:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>     at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>     at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
>
> Data is arriving in both formatStream1 and formatStream2; I checked by
> printing them. So the issue must be in the code I shared. Thanks in
> advance!
>
> Thanks,
>
>
> *Abhijeet Kumar*
> Software Development Engineer,
> Sentienz Solutions Pvt Ltd
> Cognitive Data Platform - Perceive the Data !
> abhijeet.ku...@sentienz.com |www.sentienz.com | Bengaluru
>
> --
Regards,
Nagarjun

*Success is not final, failure is not fatal: it is the courage to continue
that counts. *
*- Winston Churchill - *
