thanks,

I did it using ".timeWindowAll(Time.seconds(5), Time.seconds(1)).apply(new
LogLineAllWindowFunction());". My output is now filtering only the values
inside the window.
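
For anyone reading this later, here is a minimal plain-Java sketch (no Flink
dependency) of why one record shows up in several results when the window is
5 seconds long and slides every second. The class and method names are made
up for illustration, and the arithmetic assumes windows aligned to the epoch
with no offset, so treat it as a sketch rather than Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the sliding windows a timestamp falls into.
class SlidingWindowSketch {

    /** Returns the start (ms) of every window [start, start + sizeMs) that contains timestampMs. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestamp, aligned to a multiple of the slide.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record 8.534 s into the stream, window size 5 s, slide 1 s.
        System.out.println(windowStarts(8_534L, 5_000L, 1_000L));
    }
}
```

With a size of 5 s and a slide of 1 s every record belongs to five
overlapping windows, which is why the same line is emitted more than once.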

thanks, Felipe



On Mon, Mar 19, 2018 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> If you don't want to partition by key, i.e., have a single result for each
> time window, you should not use keyBy; use an allWindow (e.g.,
> timeWindowAll) instead.
> However, this will only be executed with a parallelism of 1.
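
To make the difference concrete, here is a small plain-Java sketch (again
without Flink, and with made-up names) that takes the records of one window
and counts them two ways: once per key, as a keyed window would, and once
for the whole window, as an all-window would. Using the exact timestamp as
the key is an assumption that mirrors my keyBy on the Date.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: contrasts keyed and non-keyed aggregation of one window.
class KeyedVsAllWindowSketch {

    /** Keyed window: one count per distinct key (here, the exact timestamp in ms). */
    static Map<Long, Integer> countPerKey(List<Long> timestampsInWindow) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Long ts : timestampsInWindow) {
            counts.merge(ts, 1, Integer::sum);
        }
        return counts;
    }

    /** Non-keyed ("all") window: a single count for everything in the window. */
    static int countAll(List<Long> timestampsInWindow) {
        return timestampsInWindow.size();
    }

    public static void main(String[] args) {
        List<Long> window = Arrays.asList(534L, 534L, 184L, 884L, 884L, 884L);
        System.out.println("per key: " + countPerKey(window)); // one entry per distinct timestamp
        System.out.println("all:     " + countAll(window));    // one result for the window
    }
}
```

The keyed variant yields one result per distinct timestamp, which matches
the output I was seeing; the non-keyed variant yields one result per window.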
>
> 2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> :
>
>> thanks a lot Fabian,
>>
>> That clarified how I should proceed. I am now using keyBy, timeWindow, and
>> the apply operator in EventTimeStreamExampleJava
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>.
>> I am now generating dates in order, with a bit of out-of-orderness, in
>> LogSourceFunction
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>,
>> and I am using only the Date as my key in the LogLine
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
>> object.
>>
>> If I understood watermarks well, my program should combine all the lines
>> that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
>> Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
>> is still not happening because I didn't use a good key ".keyBy(lineLog ->
>> lineLog.getTime())" and my key at the LogLineCounterFunction class is still
>> the Date.
>>
>> public static class LogLineCounterFunction implements WindowFunction<
>>         LogLine, // input
>>         Tuple3<LogLine, Long, Integer>, // output
>>         Date, // key
>>         TimeWindow> { // window type
>>
>> What should I use as a key in my case?
>>
>> My output is combining only the lines with the same key (Date). I want to
>> combine the dates between the watermarks ".timeWindow(Time.seconds(5),
>> Time.seconds(1))"...
>>
>> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15
>> 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
>> 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534
>> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
>> 16:31:08.534},1071516670000,9)
>> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15
>> 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 |
>> 2003-12-15 16:31:04.184},1071516670000,4)
>> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15
>> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
>> 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
>> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
>> 2003-12-15 16:31:00.884},1071516670000,12)
>> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
>> 16:31:03.784},1071516670000,1)
>> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15
>> 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 |
>> 2003-12-15 16:31:06.334},1071516670000,4)
>>
>>
>>
>>
>> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The timestamps of the stream records should be increasing (strict
>>> monotonicity is not required; a bit of out-of-orderness can be handled
>>> thanks to watermarks).
>>> So, the events should also be generated with increasing timestamps. It
>>> looks like your generator generates random dates. I'd also generate data
>>> with millisecond precision, not just days.
>>>
>>> Also, a timestamp in Flink is the number of milliseconds since
>>> 1970-01-01-00:00:00.
>>> However, your timestamp extractor only returns the number of seconds
>>> since the last minute (i.e., from 0 to 59). You should use Date.getTime()
>>> instead of Date.getSeconds().
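
A small plain-Java sketch of this point, under stated assumptions: the
3.5-second bound is the one from my BoundedOutOfOrdernessGenerator, the rule
"a window can fire once the watermark reaches its end minus one millisecond"
is how I understand event-time windows to behave, and all names here are
hypothetical.

```java
import java.util.Date;

// Hypothetical illustration: why seconds-of-minute timestamps never let a window fire.
class TimestampSketch {

    static final long MAX_OUT_OF_ORDERNESS_MS = 3_500L; // same bound as in the thread

    /** What the original extractor returned: seconds within the current minute. */
    @SuppressWarnings("deprecation")
    static long secondsOfMinute(Date d) {
        return d.getSeconds();
    }

    /** What the extractor should return: milliseconds since 1970-01-01T00:00:00Z. */
    static long epochMillis(Date d) {
        return d.getTime();
    }

    /** Watermark = highest timestamp seen so far minus the out-of-orderness bound. */
    static long watermark(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** Assumed firing rule: a window [start, end) can fire once watermark >= end - 1. */
    static boolean canFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        Date d = new Date(1_071_516_668_534L);
        System.out.println("getSeconds(): " + secondsOfMinute(d));
        System.out.println("getTime():    " + epochMillis(d));
        // With seconds-of-minute the largest "timestamp" is 59, so the watermark stays negative
        // and the first 2-second window [0, 2000) is never complete.
        System.out.println(canFire(2_000L, watermark(59L)));
    }
}
```

With getSeconds() every record lands in the window [0, 2000) and the
watermark can never reach 1999, which would explain why nothing was printed
once timeWindow was enabled.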
>>>
>>> Best, Fabian
>>>
>>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am building an example with DataStream using Flink that has a fake
>>>> source generator of LogLine(Date d, String line). I want to work with
>>>> Watermarks on it so I created a class that implements 
>>>> AssignerWithPeriodicWatermarks.
>>>> If I don't use the ".timeWindow(Time.seconds(2))" operator on the data
>>>> stream, I can group by second and concatenate the lines. When I use
>>>> ".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I
>>>> misunderstood something when I was reading about Event Time. Could anyone
>>>> help me please? My source code is as follows.
>>>>
>>>> Thanks for the ideas. Kind Regards,  Felipe
>>>>
>>>> package flink.example.streaming;
>>>>
>>>> import flink.util.LogLine;
>>>> import flink.util.LogSourceFunction;
>>>> import flink.util.UtilDate;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> import javax.annotation.Nullable;
>>>> import java.util.Date;
>>>>
>>>> public class EventTimeStreamExampleJava {
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         final StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>>         DataStream<LogLine> dataStream = env
>>>>                 .addSource(new LogSourceFunction())
>>>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>>>>                 .keyBy(lineLog -> lineLog.getSec())
>>>>                 // .timeWindow(Time.seconds(2))
>>>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>>>                         log1.getLine() + " | " + log2.getLine()))
>>>>                 ;
>>>>
>>>>         dataStream.print();
>>>>
>>>>         env.execute("Window LogRead");
>>>>     }
>>>>
>>>>     public static class BoundedOutOfOrdernessGenerator
>>>>             implements AssignerWithPeriodicWatermarks<LogLine> {
>>>>
>>>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>>>
>>>>         private long currentMaxTimestamp;
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>>>>             long timestamp = element.getTime().getSeconds();
>>>>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>>>             return timestamp;
>>>>         }
>>>>
>>>>         @Nullable
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             // return the watermark as current highest timestamp minus
>>>>             // the out-of-orderness bound
>>>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> package flink.util;
>>>>
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>
>>>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>>>
>>>>     private volatile boolean isRunning = true;
>>>>
>>>>     @Override
>>>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>>>         while (isRunning) {
>>>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>>>>                     UtilDate.getRandomString()));
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cancel() {
>>>>         isRunning = false;
>>>>     }
>>>> }
>>>>
>>>> package flink.util;
>>>>
>>>> import java.util.Date;
>>>> import java.util.Objects;
>>>>
>>>> public class LogLine {
>>>>
>>>>     private Date time;
>>>>     private int sec;
>>>>     private String line;
>>>>
>>>>     public LogLine() {
>>>>     }
>>>>
>>>>     public LogLine(Date time, String line) {
>>>>         this.sec = time.getSeconds();
>>>>         this.time = time;
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     public LogLine(int sec, String line) {
>>>>         this.sec = sec;
>>>>         this.time = UtilDate.getRandomDate(sec);
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     public int getSec() {
>>>>         return sec;
>>>>     }
>>>>
>>>>     public void setSec(int sec) {
>>>>         this.sec = sec;
>>>>     }
>>>>
>>>>     public Date getTime() {
>>>>         return time;
>>>>     }
>>>>
>>>>     public String getLine() {
>>>>         return line;
>>>>     }
>>>>
>>>>     public void setTime(Date time) {
>>>>         this.time = time;
>>>>     }
>>>>
>>>>     public void setLine(String line) {
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean equals(Object o) {
>>>>         if (this == o) return true;
>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>         LogLine logLine = (LogLine) o;
>>>>         return Objects.equals(time, logLine.time) &&
>>>>                 Objects.equals(sec, logLine.sec) &&
>>>>                 Objects.equals(line, logLine.line);
>>>>     }
>>>>
>>>>     @Override
>>>>     public int hashCode() {
>>>>
>>>>         return Objects.hash(time, sec, line);
>>>>     }
>>>>
>>>>     @Override
>>>>     public String toString() {
>>>>         return "LogLine{" +
>>>>                 "time=" + time +
>>>>                 ", sec=" + sec +
>>>>                 ", line='" + line +
>>>>                 '}';
>>>>     }
>>>> }
>>>>
>>>>
>>>> --
>>>>
>>>> *---- Felipe Oliveira Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>
>>>
>>
>>
>>
>
>


