Hi,

I assume that

                withTimestampsAndWatermarks1.print();
                withTimestampsAndWatermarks2.print();

actually print what you expect?

If so, the problem might be that:
a) Time/watermarks are not progressing. Watermarks are what trigger the output 
of your `TumblingEventTimeWindows.of(Time.seconds(15))`, so a window emits 
nothing until the watermark passes its end.
b) Data are not being joined, because:
  - there are no matching elements (based on your KeySelectors) between the 
two streams, or
  - elements are out of sync with respect to the window length (no 15 second 
tumbling window contains elements from both streams for the same key).
c) The streams are producing different event times/watermarks (for example, 
one is far ahead of the other). A windowed join produces results only once 
the watermarks of both streams have caught up with each other.
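
To check points a) to c) by hand, it may help to redo the window arithmetic 
on a few of the timestamps you see printed. Below is a minimal plain-Java 
sketch with no Flink dependency. The class name, method names and sample 
timestamps are made up for illustration; it assumes non-negative millisecond 
timestamps, a window offset of zero, and that the join operator follows the 
smaller of its two input watermarks.

```java
/** Plain-Java sketch of the window arithmetic behind points a) to c); not Flink code. */
final class WindowJoinCheck {

    /** Start of the tumbling window containing the timestamp (zero offset, timestamp >= 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Assumption: a two-input operator advances to the minimum of its input watermarks. */
    static long combinedWatermark(long watermark1Ms, long watermark2Ms) {
        return Math.min(watermark1Ms, watermark2Ms);
    }

    /** A window [start, start + size) can fire once the watermark reaches start + size - 1. */
    static boolean windowCanFire(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = 15_000L; // same length as TumblingEventTimeWindows.of(Time.seconds(15))

        // Hypothetical event timestamps (ms) for the same key, one from each stream.
        long ts1 = 17_000L;
        long ts2 = 31_000L;
        long start1 = windowStart(ts1, size);
        long start2 = windowStart(ts2, size);
        System.out.println("stream 1 element falls into window starting at " + start1);
        System.out.println("stream 2 element falls into window starting at " + start2);
        System.out.println("same window (joinable): " + (start1 == start2));

        // Hypothetical watermarks: stream 1 is far ahead of stream 2.
        long watermark = combinedWatermark(60_000L, 10_000L);
        System.out.println("combined watermark: " + watermark);
        System.out.println("window of stream 1 element can fire: "
                + windowCanFire(start1, size, watermark));
    }
}
```

If two elements with the same key land in different windows, or the combined 
watermark never reaches the end of the window, the join prints nothing even 
though both input streams print fine.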
  
Piotrek 

> On 23 Nov 2018, at 08:50, Abhijeet Kumar <abhijeet.ku...@sentienz.com> wrote:
> 
> DataStream<Tuple11<String, String, String, String, String,
>         String, String, String, String, String,
>         Long>> withTimestampsAndWatermarks1 = formatStream1
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<
>                 Tuple11<String, String, String, String, String,
>                         String, String, String, String, String, Long>>(
>                 Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {
> 
>             private static final long serialVersionUID = 1L;
> 
>             @Override
>             public long extractTimestamp(
>                     Tuple11<String, String, String, String, String,
>                             String, String, String, String, String, Long> element) {
>                 return element.f10;
>             }
>         });
> 
> DataStream<Tuple7<String, String, String, String, String, String,
>         Long>> withTimestampsAndWatermarks2 = formatStream2
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<
>                 Tuple7<String, String, String, String, String, String, Long>>(
>                 Time.seconds(Integer.parseInt(parameters.get("watermarkTime")))) {
> 
>             private static final long serialVersionUID = 1L;
> 
>             @Override
>             public long extractTimestamp(
>                     Tuple7<String, String, String, String, String, String, Long> element) {
>                 return element.f6;
>             }
>         });
> 
> withTimestampsAndWatermarks1.print();
> withTimestampsAndWatermarks2.print();
> 
> DataStream<Tuple17<String, String, String, String, String,
>         String, String, String, String, String,
>         String, String, String, String, String,
>         Long, Long>> joined = withTimestampsAndWatermarks1
>     .join(withTimestampsAndWatermarks2)
>     .where(new KeySelector<
>             Tuple11<String, String, String, String, String,
>                     String, String, String, String, String, Long>,
>             String>() {
> 
>         private static final long serialVersionUID = 1L;
> 
>         public String getKey(
>                 Tuple11<String, String, String, String, String,
>                         String, String, String, String, String, Long> t1)
>                 throws Exception {
>             return t1.f0;
>         }
>     })
>     .equalTo(new KeySelector<
>             Tuple7<String, String, String, String, String, String, Long>,
>             String>() {
> 
>         private static final long serialVersionUID = 1L;
> 
>         public String getKey(
>                 Tuple7<String, String, String, String, String, String, Long> t1)
>                 throws Exception {
>             return t1.f0;
>         }
>     })
>     .window(TumblingEventTimeWindows.of(Time.seconds(15)))
>     .apply(new JoinFunction<
>             Tuple11<String, String, String, String, String,
>                     String, String, String, String, String, Long>,
>             Tuple7<String, String, String, String, String, String, Long>,
>             Tuple17<String, String, String, String, String,
>                     String, String, String, String, String,
>                     String, String, String, String, String,
>                     Long, Long>>() {
> 
>         private static final long serialVersionUID = 1L;
> 
>         public Tuple17<String, String, String, String, String,
>                 String, String, String, String, String,
>                 String, String, String, String, String,
>                 Long, Long> join(
>                 Tuple11<String, String, String, String, String,
>                         String, String, String, String, String, Long> first,
>                 Tuple7<String, String, String, String, String, String, Long> second) {
> 
>             return new Tuple17<String, String, String, String, String,
>                     String, String, String, String, String,
>                     String, String, String, String, String,
>                     Long, Long>(
>                     first.f0, first.f1, first.f2, first.f3, first.f4,
>                     first.f5, first.f6, first.f7, first.f8, first.f9,
>                     second.f1, second.f2, second.f3, second.f4, second.f5,
>                     second.f6, first.f10);
>         }
>     });
> 
> joined.print();
> 
> OK, so now I did it like this. The errors are resolved, but now I don't see 
> any output when I print the joined DataStream.
> 
>> On 23-Nov-2018, at 12:42 PM, Nagarjun Guraja <nagar...@gmail.com> wrote:
>> 
>> Looks like you need to assign timestamps and emit watermarks on both 
>> streams, i.e. formatStream1 and formatStream2, as described at 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
>> 
>> On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.ku...@sentienz.com> wrote:
>> Hello Team,
>> 
>> I'm new to Flink and come from a Spark background. I need help completing 
>> this streaming job. I'm reading data from two different Kafka topics and I 
>> want to join them.
>> 
>> My code:
>> 
>> formatStream1.join(formatStream2)
>>     .where(new KeySelector<
>>             Tuple11<String, String, String, String, String,
>>                     String, String, String, String, String, Long>,
>>             String>() {
>>         public String getKey(
>>                 Tuple11<String, String, String, String, String,
>>                         String, String, String, String, String, Long> t1)
>>                 throws Exception {
>>             return t1.f0;
>>         }
>>     })
>>     .equalTo(new KeySelector<
>>             Tuple7<String, String, String, String, String, String, Long>,
>>             String>() {
>>         public String getKey(
>>                 Tuple7<String, String, String, String, String, String, Long> t1)
>>                 throws Exception {
>>             return t1.f0;
>>         }
>>     })
>>     .window(TumblingEventTimeWindows.of(Time.seconds(15)))
>>     .apply(new JoinFunction<
>>             Tuple11<String, String, String, String, String,
>>                     String, String, String, String, String, Long>,
>>             Tuple7<String, String, String, String, String, String, Long>,
>>             Tuple17<String, String, String, String, String,
>>                     String, String, String, String, String,
>>                     String, String, String, String, String,
>>                     Long, Long>>() {
>> 
>>         public Tuple17<String, String, String, String, String,
>>                 String, String, String, String, String,
>>                 String, String, String, String, String,
>>                 Long, Long> join(
>>                 Tuple11<String, String, String, String, String,
>>                         String, String, String, String, String, Long> first,
>>                 Tuple7<String, String, String, String, String, String, Long> second) {
>>             return new Tuple17<String, String, String, String, String,
>>                     String, String, String, String, String,
>>                     String, String, String, String, String,
>>                     Long, Long>(
>>                     first.f0, first.f1, first.f2, first.f3, first.f4,
>>                     first.f5, first.f6, first.f7, first.f8, first.f9,
>>                     second.f1, second.f2, second.f3, second.f4, second.f5,
>>                     second.f6, first.f10);
>>         }
>>     })
>>     .print();
>> 
>> 
>> Error:
>> 
>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>      at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>      at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
>>      at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>>      at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
>> Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>>      at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>      at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>>      at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>>      at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>      at java.lang.Thread.run(Thread.java:748)
>> 
>> Data is arriving in both formatStream1 and formatStream2; I checked by 
>> printing them. So the issue must be in the code I shared. Thanks in 
>> advance!
>> 
>> Thanks,
>> 
>>      
>> Abhijeet Kumar
>> Software Development Engineer,
>> Sentienz Solutions Pvt Ltd
>> Cognitive Data Platform - Perceive the Data !
>> abhijeet.ku...@sentienz.com | www.sentienz.com | Bengaluru
>> 
>> -- 
>> Regards,
>> Nagarjun
>> 
>> Success is not final, failure is not fatal: it is the courage to continue 
>> that counts. 
>> - Winston Churchill - 
> 
