Hi Taher,

Thanks for the quick response but, if you tell the problem in my code then it 
would be a great help.

Thanks,

        
Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !
[email protected] <mailto:[email protected]> 
|www.sentienz.com <http://www.sentienz.com/> | Bengaluru


> On 26-Nov-2018, at 1:25 PM, Taher Koitawala <[email protected]> wrote:
> 
> Hi Abhijeet,
>                   Refer to this code
> 
> assignTimestampsAndWatermarks(
>                                       new 
> AssignerWithPeriodicWatermarks<Tuple4<String, String, String, String>>() {
>                                               long currentTimstamp = 0L;
> 
>                                               @Override
>                                               public long 
> extractTimestamp(Tuple4<String, String, String, String> tuple4, long 
> timestamp) {
>                                                       currentTimstamp = 
> Long.parseLong(tuple4.f0.substring(0, 13));
>                                                       return currentTimstamp;
>                                               }
> 
> 
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
> 
> 
> On Mon, Nov 26, 2018 at 1:05 PM Abhijeet Kumar <[email protected] 
> <mailto:[email protected]>> wrote:
> Hello Team,
> 
> I'm new to Flink and I don't know why window is not working
> 
> DataStream<Tuple7<String, String, String, String, String, String, Long>> 
> window2 = stream2.assignTimestampsAndWatermarks(
>                                               new 
> AscendingTimestampExtractor<Tuple7<String, String, String, String, String, 
> String, Long>>() {
>                                                       public long 
> extractAscendingTimestamp(
>                                                                       
> Tuple7<String, String, String, String, String, String, Long> t) {
>                                                               return t.f6;
>                                                       }
>                                               
> }).windowAll(TumblingEventTimeWindows.of(Time.seconds(2))).reduce(new 
> Reducer2());
>               
> window2.print();
> 
> So, this is the code that I've written for 2 sec window and I'm using event 
> time for processing window
> 
> My data format is like 
> 
> 13,234234,34,32445,3423fsdf,20191111111119
> 
> The last value in csv is time(YYYYMMDDHHmmss)
> 
> Definition of Reducer2:
> 
> public static final class Reducer2
>                       implements ReduceFunction<Tuple7<String, String, 
> String, String, String, String, Long>> {
>               public Tuple7<String, String, String, String, String, String, 
> Long> reduce(
>                               Tuple7<String, String, String, String, String, 
> String, Long> t_new,
>                               Tuple7<String, String, String, String, String, 
> String, Long> t_old) {
>                       return new Tuple7<String, String, String, String, 
> String, String, Long>(t_new.f0, t_new.f1, t_new.f2,
>                                       t_new.f3, t_new.f4, t_new.f5, t_new.f6);
>               }
>       }
> 
> With my understanding when the data comes similar to above sample, then first 
> window is created. When timestamp is 20191111111120 then, this will also be 
> added to the window. Finally if something comes like 20191111111122, the old 
> window should be stopped and this code should print the result on the 
> console. Problem is it's not working the same way. May be my understanding is 
> not correct. please correct me if I'm wrong.
> 
> Thanks,
> 
>       
> Abhijeet Kumar
> Software Development Engineer,
> Sentienz Solutions Pvt Ltd
> Cognitive Data Platform - Perceive the Data !
> [email protected] <mailto:[email protected]> 
> |www.sentienz.com <http://www.sentienz.com/> | Bengaluru
> 
> 

Reply via email to