Thanks, I did it using ".timeWindowAll(Time.seconds(5), Time.seconds(1)).apply(new LogLineAllWindowFunction());". My output now contains only the values that fall inside the window.

Thanks,
Felipe

On Mon, Mar 19, 2018 at 10:54 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> If you don't want to partition by key, i.e., you want a single result for
> each time window, you should not use keyBy and should use an allWindow
> instead. However, this will only be executed with a parallelism of 1.
>
> 2018-03-19 13:54 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:
>
>> Thanks a lot, Fabian,
>>
>> That clarified how I should proceed. I am now using keyBy, timeWindow, and
>> the apply operator in EventTimeStreamExampleJava
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/example/streaming/EventTimeStreamExampleJava.java>.
>> I am now generating dates in order, with a bit of out-of-orderness, in
>> LogSourceFunction
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogSourceFunction.java>,
>> and I use only the Date as my key in the LogLine
>> <https://github.com/felipegutierrez/flink-first/blob/master/src/main/java/flink/util/LogLine.java>
>> object.
>>
>> If I understood watermarks well, my program should combine all the lines
>> that are inside the same watermark when I set ".timeWindow(Time.seconds(5),
>> Time.seconds(1))" and use ".apply(new LogLineCounterFunction())". But that
>> is still not happening, because I didn't use a good key: I have
>> ".keyBy(lineLog -> lineLog.getTime())", and the key in my
>> LogLineCounterFunction class is still the Date.
>>
>> public static class LogLineCounterFunction implements WindowFunction<
>>         LogLine,                        // input
>>         Tuple3<LogLine, Long, Integer>, // output
>>         Date,                           // key
>>         TimeWindow> {                   // window type
>>
>> What should I use as a key in my case?
>>
>> My output combines only the lines with the same key (Date). I want to
>> combine the dates between the watermarks ".timeWindow(Time.seconds(5),
>> Time.seconds(1))"...
>>
>> 3> (LogLine{time=2003-12-15 16:31:08.534, line=' | 2003-12-15 16:31:08.534
>> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534
>> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534
>> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534
>> | 2003-12-15 16:31:08.534 | 2003-12-15 16:31:08.534},1071516670000,9)
>> 3> (LogLine{time=2003-12-15 16:31:04.184, line=' | 2003-12-15 16:31:04.184
>> | 2003-12-15 16:31:04.184 | 2003-12-15 16:31:04.184
>> | 2003-12-15 16:31:04.184},1071516670000,4)
>> 3> (LogLine{time=2003-12-15 16:31:00.884, line=' | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884 | 2003-12-15 16:31:00.884
>> | 2003-12-15 16:31:00.884},1071516670000,12)
>> 3> (LogLine{time=2003-12-15 16:31:03.784, line=' | 2003-12-15 16:31:03.784},1071516670000,1)
>> 3> (LogLine{time=2003-12-15 16:31:06.334, line=' | 2003-12-15 16:31:06.334
>> | 2003-12-15 16:31:06.334 | 2003-12-15 16:31:06.334
>> | 2003-12-15 16:31:06.334},1071516670000,4)
>>
>> On Mon, Mar 19, 2018 at 6:44 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> The timestamps of the stream records should be increasing (strict
>>> monotonicity is not required; a bit of out-of-orderness can be handled
>>> thanks to watermarks).
>>> So, the events should also be generated with increasing timestamps. It
>>> looks like your generator generates random dates. I'd also generate data
>>> with millisecond precision, not just days.
>>>
>>> Also, a timestamp in Flink is the number of milliseconds since
>>> 1970-01-01 00:00:00 UTC.
>>> However, your timestamp extractor only returns the number of seconds
>>> since the last minute (i.e., from 0 to 60). You should use Date.getTime()
>>> instead of Date.getSeconds().
>>>
>>> Best, Fabian
>>>
>>> 2018-03-16 18:08 GMT+01:00 Felipe Gutierrez <felipe.o.gutier...@gmail.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am building an example with DataStream using Flink that has a fake
>>>> source generator of LogLine(Date d, String line). I want to work with
>>>> watermarks on it, so I created a class that implements
>>>> AssignerWithPeriodicWatermarks.
>>>> If I don't use the operator ".timeWindow(Time.seconds(2))" on the data
>>>> stream, I can group by second and concatenate the lines. When I use
>>>> ".timeWindow(Time.seconds(2))", nothing is shown on the output. I guess I
>>>> misunderstood something when I was reading about Event Time. Could anyone
>>>> help me, please? My source code is as follows.
>>>>
>>>> Thanks for the ideas. Kind Regards, Felipe
>>>>
>>>> package flink.example.streaming;
>>>>
>>>> import flink.util.LogLine;
>>>> import flink.util.LogSourceFunction;
>>>> import flink.util.UtilDate;
>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> import javax.annotation.Nullable;
>>>> import java.util.Date;
>>>>
>>>> public class EventTimeStreamExampleJava {
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         final StreamExecutionEnvironment env =
>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>
>>>>         DataStream<LogLine> dataStream = env
>>>>                 .addSource(new LogSourceFunction())
>>>>                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
>>>>                 .keyBy(lineLog -> lineLog.getSec())
>>>>                 // .timeWindow(Time.seconds(2))
>>>>                 .reduce((log1, log2) -> new LogLine(log1.getTime(),
>>>>                         log1.getLine() + " | " + log2.getLine()))
>>>>                 ;
>>>>
>>>>         dataStream.print();
>>>>
>>>>         env.execute("Window LogRead");
>>>>     }
>>>>
>>>>     public static class BoundedOutOfOrdernessGenerator implements
>>>>             AssignerWithPeriodicWatermarks<LogLine> {
>>>>
>>>>         private final long maxOutOfOrderness = 3500; // 3.5 seconds
>>>>
>>>>         private long currentMaxTimestamp;
>>>>
>>>>         @Override
>>>>         public long extractTimestamp(LogLine element, long previousElementTimestamp) {
>>>>             long timestamp = element.getTime().getSeconds();
>>>>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>>>>             return timestamp;
>>>>         }
>>>>
>>>>         @Nullable
>>>>         @Override
>>>>         public Watermark getCurrentWatermark() {
>>>>             // return the watermark as current highest timestamp minus
>>>>             // the out-of-orderness bound
>>>>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> package flink.util;
>>>>
>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>
>>>> public class LogSourceFunction implements SourceFunction<LogLine> {
>>>>
>>>>     private volatile boolean isRunning = true;
>>>>
>>>>     @Override
>>>>     public void run(SourceContext<LogLine> ctx) throws Exception {
>>>>         while (isRunning) {
>>>>             ctx.collect(new LogLine(UtilDate.getRandomSec(),
>>>>                     UtilDate.getRandomString()));
>>>>         }
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cancel() {
>>>>         isRunning = false;
>>>>     }
>>>> }
>>>>
>>>> package flink.util;
>>>>
>>>> import java.util.Date;
>>>> import java.util.Objects;
>>>>
>>>> public class LogLine {
>>>>
>>>>     private Date time;
>>>>     private int sec;
>>>>     private String line;
>>>>
>>>>     public LogLine() {
>>>>     }
>>>>
>>>>     public LogLine(Date time, String line) {
>>>>         this.sec = time.getSeconds();
>>>>         this.time = time;
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     public LogLine(int sec, String line) {
>>>>         this.sec = sec;
>>>>         this.time = UtilDate.getRandomDate(sec);
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     public int getSec() {
>>>>         return sec;
>>>>     }
>>>>
>>>>     public void setSec(int sec) {
>>>>         this.sec = sec;
>>>>     }
>>>>
>>>>     public Date getTime() {
>>>>         return time;
>>>>     }
>>>>
>>>>     public String getLine() {
>>>>         return line;
>>>>     }
>>>>
>>>>     public void setTime(Date time) {
>>>>         this.time = time;
>>>>     }
>>>>
>>>>     public void setLine(String line) {
>>>>         this.line = line;
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean equals(Object o) {
>>>>         if (this == o) return true;
>>>>         if (o == null || getClass() != o.getClass()) return false;
>>>>         LogLine logLine = (LogLine) o;
>>>>         return Objects.equals(time, logLine.time) &&
>>>>                 Objects.equals(sec, logLine.sec) &&
>>>>                 Objects.equals(line, logLine.line);
>>>>     }
>>>>
>>>>     @Override
>>>>     public int hashCode() {
>>>>         return Objects.hash(time, sec, line);
>>>>     }
>>>>
>>>>     @Override
>>>>     public String toString() {
>>>>         return "LogLine{" +
>>>>                 "time=" + time +
>>>>                 ", sec=" + sec +
>>>>                 ", line='" + line +
>>>>                 '}';
>>>>     }
>>>> }
>>>>
>>>> --
>>>> *---- Felipe Oliveira Gutierrez*
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
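
For readers following the thread, the points discussed above (timestamps as epoch milliseconds rather than Date.getSeconds(), a watermark that trails the highest timestamp by a fixed bound, and how a 5-second window sliding every 1 second covers one event) can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions: the class EventTimeSketch and its helper methods are hypothetical names that appear neither in Flink nor in the project linked above, and the window arithmetic assumes a window offset of zero.

```java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Hypothetical sketch class; not part of Flink or of the thread's project.
final class EventTimeSketch {

    // Event-time timestamp as epoch milliseconds, which is what Fabian
    // recommends returning from extractTimestamp.
    static long eventTimestamp(Date d) {
        return d.getTime();
    }

    // What the original extractor returned: only the seconds-within-minute field.
    @SuppressWarnings("deprecation")
    static long secondsField(Date d) {
        return d.getSeconds();
    }

    // Bounded out-of-orderness: the watermark trails the highest timestamp seen.
    static long watermark(long currentMaxTimestamp, long maxOutOfOrdernessMs) {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    // Start times of all sliding windows (size/slide in ms, zero offset assumed)
    // that contain the given timestamp, newest window first.
    static List<Long> slidingWindowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slideMs);
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Epoch milliseconds for one of the events in the output quoted above.
        Date event = new Date(1071516668534L);

        System.out.println("epoch millis:  " + eventTimestamp(event));
        System.out.println("seconds field: " + secondsField(event));
        System.out.println("watermark:     " + watermark(eventTimestamp(event), 3500L));

        // A 5 s window sliding every 1 s places each event in five windows.
        for (long start : slidingWindowStarts(eventTimestamp(event), 5000L, 1000L)) {
            System.out.println("window [" + start + ", " + (start + 5000L) + ")");
        }
    }
}
```

With the seconds field as the timestamp, every event appears to lie within the first minute after the epoch, and subtracting the 3500 ms bound pushes the watermark below zero, so it can never pass the end of a 2-second window. That would be consistent with the empty output described in the original question, although the thread itself does not confirm the exact cause.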