If you don't want to partition by key, i.e., you want a single result for each time window, you should not use keyBy but an all-window (e.g., timeWindowAll() or windowAll()) instead. However, an all-window operator is always executed with a parallelism of 1.
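
For illustration only, here is a plain-Java sketch with no Flink dependency. The class and method names are hypothetical. It mimics what a non-keyed sliding window of 5 seconds with a 1-second slide computes, i.e., roughly what .timeWindowAll(Time.seconds(5), Time.seconds(1)) would do in place of .keyBy(...).timeWindow(...). Each timestamp is assigned to every [start, start + 5000) window that contains it, using the latest-window-start formula of Flink's sliding assigner with a zero offset. Records are then counted per window regardless of any key. It assumes millisecond timestamps such as those returned by Date.getTime().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a non-keyed sliding event-time window (offset 0); not Flink code. */
final class SlidingAllWindowSketch {

    /** All [start, end) windows of the given size and slide that contain the timestamp, newest first. */
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing the timestamp (aligned to the slide).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    /** "All-window" aggregation: one count per window start, with no key involved. */
    static Map<Long, Integer> countPerWindow(long[] timestamps, long size, long slide) {
        Map<Long, Integer> counts = new TreeMap<>(); // window start -> number of records
        for (long ts : timestamps) {
            for (long[] window : assignWindows(ts, size, slide)) {
                counts.merge(window[0], 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

For example, a record at t = 7300 ms lands in the five windows starting at 7000, 6000, 5000, 4000 and 3000 ms. That is why a 5 s window sliding by 1 s reports every record in five different results.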
2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:

> thanks a lot Fabian,
>
> It clarified my way to developing. I am using keyBy, timeWindow, and apply
> monad operator at the EventTimeStreamExampleJava
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>
> now. I am generating dates in order and with a bit out of orderness now at
> LogSourceFunction
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>.
> And only using Date as my key at LogLine
> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
> object.
>
> If I understood watermarks well, my program should combine all the lines
> that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
> Time.seconds(1))" and used ".apply(new LogLineCounterFunction())". But it
> is still not happening because I didn't use a good key ".keyBy(lineLog ->
> lineLog.getTime())" and my key at the LogLineCounterFunction class is still
> the Date.
>
> public static class LogLineCounterFunction implements WindowFunction<
>         LogLine,                        // input
>         Tuple3<LogLine, Long, Integer>, // output
>         Date,                           // key
>         TimeWindow> {                   // window type
>
> What should I use as a key in my case?
>
> My output is combining only the lines with the same key (Date). I want to
> combine the dates between the watermarks ".timeWindow(Time.seconds(5),
> Time.seconds(1))"...
>
> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184},1071516670000,4)
> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884},1071516670000,12)
> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334},1071516670000,4)
>
>
> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> The timestamps of the stream records should be increasing (strict
>> monotonicity is not required, a bit out of orderness can be handled due to
>> watermarks).
>> So, the events should also be generated with increasing timestamps. It
>> looks like your generator generates random dates. I'd also generate data
>> with millisecond precision, not just days.
>>
>> Also, a timestamp in Flink is the number of milliseconds since
>> 1970-01-01-00:00:00.
>> However, your timestamp extractor only returns the number of seconds
>> since last minute (i.e., from 0 to 60). You should use Date.getTime()
>> instead of Date.getSeconds().
>>
>> Best, Fabian
>>
>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:
>>
>>> Hi all,
>>>
>>> I am building an example with DataStream using Flink that has a fake
>>> source generator of LogLine(Date d, String line). I want to work with
>>> Watermarks on it so I created a class that implements
>>> AssignerWithPeriodicWatermarks.
>>> If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
>>> I can group by second and concatenate the lines. When I use
>>> ".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
>>> misunderstood something when I was reading about Event Time. Could anyone
>>> help me please? My source code is as follow.
>>>
>>> Thanks for the ideas. Kind Regards, Felipe
>>>
>>> package flink.example.streaming;
>>>
>>> import flink.util.LogLine;
>>> import flink.util.LogSourceFunction;
>>> import flink.util.UtilDate;
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> import javax.annotation.Nullable;
>>> import java.util.Date;
>>>
>>> public class EventTimeStreamExampleJava {
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         final StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>>         DataStream<LogLine> dataStream = env
>>>                 .addSource(new LogSourceFunction())
>>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>>>                 .keyBy(lineLog -> lineLog.getSec())
>>>                 // .timeWindow(Time.seconds(2))
>>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>>                         log1.getLine() + " | " + log2.getLine()))
>>>                 ;
>>>
>>>         dataStream.print();
>>>
>>>         env.execute("Window LogRead");
>>>     }
>>>
>>>     public static class BoundedOutOfOrdernessGenerator implements
>>>             AssignerWithPeriodicWatermarks<LogLine> {
>>>
>>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>>
>>>         private long currentMaxTimestamp;
>>>
>>>         @Override
>>>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>>>             long timestamp = element.getTime().getSeconds();
>>>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>>             return timestamp;
>>>         }
>>>
>>>         @Nullable
>>>         @Override
>>>         public Watermark getCurrentWatermark() {
>>>             // return the watermark as current highest timestamp minus the out-of-orderness bound
>>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>>         }
>>>     }
>>> }
>>>
>>> package flink.util;
>>>
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>>
>>>     private volatile boolean isRunning = true;
>>>
>>>     @Override
>>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>>         while (isRunning) {
>>>             ctx.collect(new LogLine(UtilDate.getRandomSec(), UtilDate.getRandomString()));
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         isRunning = false;
>>>     }
>>> }
>>>
>>> package flink.util;
>>>
>>> import java.util.Date;
>>> import java.util.Objects;
>>>
>>> public class LogLine {
>>>
>>>     private Date time;
>>>     private int sec;
>>>     private String line;
>>>
>>>     public LogLine() {
>>>     }
>>>
>>>     public LogLine(Date time, String line) {
>>>         this.sec = time.getSeconds();
>>>         this.time = time;
>>>         this.line = line;
>>>     }
>>>
>>>     public LogLine(int sec, String line) {
>>>         this.sec = sec;
>>>         this.time = UtilDate.getRandomDate(sec);
>>>         this.line = line;
>>>     }
>>>
>>>     public int getSec() {
>>>         return sec;
>>>     }
>>>
>>>     public void setSec(int sec) {
>>>         this.sec = sec;
>>>     }
>>>
>>>     public Date getTime() {
>>>         return time;
>>>     }
>>>
>>>     public String getLine() {
>>>         return line;
>>>     }
>>>
>>>     public void setTime(Date time) {
>>>         this.time = time;
>>>     }
>>>
>>>     public void setLine(String line) {
>>>         this.line = line;
>>>     }
>>>
>>>     @Override
>>>     public boolean equals(Object o) {
>>>         if (this == o) return true;
>>>         if (o == null || getClass() != o.getClass()) return false;
>>>         LogLine logLine = (LogLine) o;
>>>         return Objects.equals(time, logLine.time) &&
>>>                 Objects.equals(sec, logLine.sec) &&
>>>                 Objects.equals(line, logLine.line);
>>>     }
>>>
>>>     @Override
>>>     public int hashCode() {
>>>
>>>         return Objects.hash(time, sec, line);
>>>     }
>>>
>>>     @Override
>>>     public String toString() {
>>>         return "LogLine{" +
>>>                 "time=" + time +
>>>                 ", sec=" + sec +
>>>                 ", line='" + line +
>>>                 '}';
>>>     }
>>> }
>>>
>>>
>>> --
>>>
>>> *---- Felipe Oliveira Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
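
Fabian's remark about Date.getSeconds() also explains why the original two-second window never produced output. The plain-Java sketch below uses hypothetical names and no Flink types. It models the BoundedOutOfOrdernessGenerator with the suggested fix: it extracts Date.getTime(), tracks the maximum timestamp, and reports that maximum minus 3.5 s as the watermark. The firing check assumes the behavior of Flink's default event-time trigger, which fires a window once the watermark reaches the window's last millisecond (end - 1).

```java
import java.util.Date;

/** Hypothetical stand-in for BoundedOutOfOrdernessGenerator; models the logic, not the Flink interface. */
final class WatermarkSketch {
    private final long maxOutOfOrderness; // allowed lateness bound in milliseconds
    // Start at Long.MIN_VALUE (instead of 0) so no watermark is reported before the first element.
    private long currentMaxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Corrected extractor: milliseconds since 1970-01-01T00:00:00Z via Date.getTime(). */
    long extractTimestamp(Date time) {
        long timestamp = time.getTime();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    /** Highest timestamp seen so far minus the out-of-orderness bound. */
    long currentWatermark() {
        return currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentMaxTimestamp - maxOutOfOrderness;
    }

    /** Assumed trigger rule: a window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }
}
```

With the original getSeconds() extractor, the "timestamps" stay within the seconds-of-minute range (at most 61 per the Date Javadoc), and Flink reads them as milliseconds after the epoch. The watermark therefore never exceeds 61 - 3500, so windowFires(61 - 3500, 2000) is false even for the first two-second window. With getTime(), the watermark advances in real epoch milliseconds and windows close as data arrives.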