If you don't want to partition by key, i.e., you want a single result for each
time window, you should not use keyBy but an all-window instead (e.g.,
timeWindowAll()).
However, an all-window operator is only executed with a parallelism of 1.
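To illustrate the difference, here is a minimal plain-Java model. It is not Flink code: `WindowGroupingModel` and its methods are hypothetical, and it assumes 5-second tumbling windows over non-negative epoch-millisecond timestamps. Keying by the exact timestamp yields one group per distinct timestamp inside each window. An all-window yields one group per window:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not Flink code) of keyed vs. non-keyed windowing.
// Timestamps are non-negative epoch milliseconds; windows are 5 s tumbling.
class WindowGroupingModel {
    static final long SIZE_MS = 5_000L;

    // Start of the tumbling window that contains ts.
    static long windowStart(long ts) {
        return ts - (ts % SIZE_MS);
    }

    // Like keyBy(exact timestamp) followed by a window:
    // one group per (key, window) pair.
    static Map<String, Integer> countKeyedByTimestamp(List<Long> timestamps) {
        Map<String, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(ts + "@" + windowStart(ts), 1, Integer::sum);
        }
        return counts;
    }

    // Like an all-window (e.g. timeWindowAll): one group per window, no key.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts), 1, Integer::sum);
        }
        return counts;
    }
}
```

For example, take timestamps 1000, 1000, 2500, 4900, and 6000 ms. `countKeyedByTimestamp` returns four groups, whereas `countPerWindow` returns {0=4, 5000=1}.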

2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:

> Thanks a lot, Fabian,
>
> That clarified how I should develop this. I am now using the keyBy,
> timeWindow, and apply operators in EventTimeStreamExampleJava
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>.
> I am now generating dates in order, with a bit of out-of-orderness, in
> LogSourceFunction
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>,
> and I am using only the Date as the key of the LogLine
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
> object.
>
> If I understood watermarks correctly, my program should combine all the
> lines that fall inside the same window when I set
> ".timeWindow(Time.seconds(5), Time.seconds(1))" and use
> ".apply(new LogLineCounterFunction())". But this still does not happen,
> because I did not choose a good key (".keyBy(lineLog -> lineLog.getTime())"),
> and the key type in my LogLineCounterFunction class is still Date.
>
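> public static class LogLineCounterFunction implements WindowFunction<
>         LogLine,                        // input type
>         Tuple3<LogLine, Long, Integer>, // output type
>         Date,                           // key type
>         TimeWindow> {                   // window type
>     // void apply(Date key, TimeWindow window, Iterable<LogLine> input,
>     //            Collector<Tuple3<LogLine, Long, Integer>> out) { ... }
> }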
>
> What should I use as a key in my case?
>
> My output combines only the lines with the same key (Date). I want to
> combine all the dates that fall into the same window of
> ".timeWindow(Time.seconds(5), Time.seconds(1))"...
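A sliding window of size 5 s and slide 1 s assigns each record to five overlapping windows. The following plain-Java sketch mirrors, as far as I understand it, the assignment logic of Flink's sliding event-time windows with a zero offset. `SlidingWindowModel` is a hypothetical name, and timestamps are assumed to be non-negative epoch milliseconds:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding event-time window assignment with offset 0.
// Each window is represented as {start, end}, covering [start, end) in ms.
class SlidingWindowModel {
    static List<long[]> assignWindows(long ts, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= ts (ts assumed non-negative).
        long lastStart = ts - (ts % slideMs);
        // Walk back one slide at a time while the window still contains ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }
}
```

For a record at 7300 ms, `assignWindows(7300, 5000, 1000)` yields the windows starting at 7000, 6000, 5000, 4000, and 3000 ms. With keyBy on the full Date, each of those windows still only groups records that have exactly the same timestamp.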
>
> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
> 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 |
> 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15
> 16:31:08.534},1071516670000,9)
> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
> | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15
> 16:31:04.184},1071516670000,4)
> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
> 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 |
> 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15
> 16:31:00.884},1071516670000,12)
> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15
> 16:31:03.784},1071516670000,1)
> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
> | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15
> 16:31:06.334},1071516670000,4)
>
> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> The timestamps of the stream records should be increasing (strict
>> monotonicity is not required; a bit of out-of-orderness can be handled
>> thanks to watermarks).
>> So the events should also be generated with increasing timestamps. It
>> looks like your generator produces random dates. I'd also generate data
>> with millisecond precision, not just days.
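Here is a minimal sketch of such a generator in plain Java. It assumes a hypothetical `OrderedTimestampGenerator` that advances a strictly increasing clock by 1 to 100 ms per record and then subtracts a random delay of at most `maxDelayMs`. Every emitted timestamp is therefore greater than the largest timestamp emitted so far minus `maxDelayMs`, which is the property a bounded-out-of-orderness watermark relies on:

```java
import java.util.Random;

// Hypothetical generator: mostly increasing epoch-ms timestamps with a
// bounded amount of out-of-orderness (at most maxDelayMs).
class OrderedTimestampGenerator {
    private final Random random;
    private final int maxDelayMs;
    private long clockMs; // strictly increasing "true" event clock

    OrderedTimestampGenerator(long startMs, int maxDelayMs, long seed) {
        this.clockMs = startMs;
        this.maxDelayMs = maxDelayMs;
        this.random = new Random(seed);
    }

    long next() {
        clockMs += 1 + random.nextInt(100);         // advance by 1..100 ms
        int delay = random.nextInt(maxDelayMs + 1); // 0..maxDelayMs
        return clockMs - delay;                     // slightly out of order
    }
}
```

A source function could then build each record from `new Date(generator.next())`, keeping the maximum delay below the watermark's out-of-orderness bound.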
>>
>> Also, a timestamp in Flink is the number of milliseconds since the epoch
>> (1970-01-01 00:00:00 UTC).
>> However, your timestamp extractor only returns the seconds within the
>> current minute (i.e., from 0 to 59). You should use Date.getTime()
>> instead of Date.getSeconds().
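Here is a plain-Java model of the corrected extractor and watermark logic. It has no Flink dependency, and `BoundedOutOfOrdernessModel` is hypothetical. It returns `Date.getTime()`, i.e., epoch milliseconds. It emits the highest timestamp seen so far minus the bound, and guards against underflow before the first record arrives:

```java
import java.util.Date;

// Plain-Java model of a periodic bounded-out-of-orderness assigner that
// uses epoch milliseconds (Date.getTime()) as the event timestamp.
class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    long extractTimestamp(Date time) {
        long timestamp = time.getTime(); // ms since the epoch, not getSeconds()
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    long currentWatermark() {
        // Avoid underflow before any element has been seen.
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }
}
```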
>>
>> Best, Fabian
>>
>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I am building an example with the Flink DataStream API that has a fake
>>> source generator of LogLine(Date d, String line). I want to work with
>>> watermarks, so I created a class that implements
>>> AssignerWithPeriodicWatermarks.
>>> If I don't use the ".timeWindow(Time.seconds(2))" operator on the data
>>> stream, I can group by second and concatenate the lines. When I use
>>> ".timeWindow(Time.seconds(2))", nothing is shown in the output. I guess I
>>> misunderstood something when I was reading about event time. Could anyone
>>> help me, please? My source code is as follows.
>>>
>>> Thanks for the ideas. Kind regards, Felipe
>>>
>>> package flink.example.streaming;
>>>
>>> import flink.util.LogLine;
>>> import flink.util.LogSourceFunction;
>>> import flink.util.UtilDate;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> import javax.annotation.Nullable;
>>> import java.util.Date;
>>>
>>> public class EventTimeStreamExampleJava {
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         final StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>         DataStream<LogLine> dataStream = env
>>>                 .addSource(new LogSourceFunction())
>>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>>>                 .keyBy(lineLog -> lineLog.getSec())
>>>                 // .timeWindow(Time.seconds(2))
>>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>>                         log1.getLine() + " | " + log2.getLine()))
>>>                 ;
>>>
>>>         dataStream.print();
>>>
>>>         env.execute("Window LogRead");
>>>     }
>>>
>>>     public static class BoundedOutOfOrdernessGenerator
>>>             implements AssignerWithPeriodicWatermarks<LogLine> {
>>>
>>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>>
>>>         private long currentMaxTimestamp;
>>>
>>>         @Override
>>>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>>>             long timestamp = element.getTime().getSeconds();
>>>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>>             return timestamp;
>>>         }
>>>
>>>         @Nullable
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             // return the watermark as current highest timestamp minus the out-of-orderness bound
>>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>>         }
>>>     }
>>> }
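Here is a short check of why the 2-second window never printed anything. It assumes Flink's usual event-time trigger rule: a window [start, end) fires once the watermark reaches end - 1. With `getSeconds()` the largest timestamp is below 60. With a 3500 ms bound the watermark therefore stays negative, and the first window [0, 2000) never fires. `WindowFiringCheck` is a hypothetical helper:

```java
// Plain-Java check of event-time window firing, assuming the rule that a
// window [start, end) fires once the watermark reaches end - 1.
class WindowFiringCheck {
    // Watermark of a bounded-out-of-orderness assigner.
    static long watermark(long maxTimestampMs, long maxOutOfOrdernessMs) {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    // True once the watermark has passed the window's last millisecond.
    static boolean fires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }
}
```

With `watermark(59, 3500)` (-3441), `fires(2000, ...)` is false. With an epoch-millisecond maximum such as 1071516668534, the watermark moves past the end of earlier 2-second windows and they fire.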
>>>
>>> package flink.util;
>>>
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>>
>>>     private volatile boolean isRunning = true;
>>>
>>>     @Override
>>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>>         while (isRunning) {
>>>             ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         isRunning = false;
>>>     }
>>> }
>>>
>>> package flink.util;
>>>
>>> import java.util.Date;
>>> import java.util.Objects;
>>>
>>> public class LogLine {
>>>
>>>     private Date time;
>>>     private int sec;
>>>     private String line;
>>>
>>>     public LogLine() {
>>>     }
>>>
>>>     public LogLine(Date time, String line) {
>>>         this.sec = time.getSeconds();
>>>         this.time = time;
>>>         this.line = line;
>>>     }
>>>
>>>     public LogLine(int sec, String line) {
>>>         this.sec = sec;
>>>         this.time = UtilDate.getRandomDate(sec);
>>>         this.line = line;
>>>     }
>>>
>>>     public int getSec() {
>>>         return sec;
>>>     }
>>>
>>>     public void setSec(int sec) {
>>>         this.sec = sec;
>>>     }
>>>
>>>     public Date getTime() {
>>>         return time;
>>>     }
>>>
>>>     public String getLine() {
>>>         return line;
>>>     }
>>>
>>>     public void setTime(Date time) {
>>>         this.time = time;
>>>     }
>>>
>>>     public void setLine(String line) {
>>>         this.line = line;
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (this == o) return true;
>>>         if (o == null || getClass() != o.getClass()) return false;
>>>         LogLine logLine = (LogLine) o;
>>>         return Objects.equals(time, logLine.time) &&
>>>                 Objects.equals(sec, logLine.sec) &&
>>>                 Objects.equals(line, logLine.line);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>
>>>         return Objects.hash(time, sec, line);
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return "LogLine{" +
>>>                 "time=" + time +
>>>                 ", sec=" + sec +
>>>                 ", line='" + line +
>>>                 '}';
>>>     }
>>> }
>>>
>>>
>>> --
>>>
>>> *---- Felipe Oliveira Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>
>>
>
>
>
