Hi all,

I am building an example with DataStream using Flink that has a fake source
generator of LogLine(Date d, String line). I want to work with Watermarks
on it so I created a class that implements AssignerWithPeriodicWatermarks.
If I don't use the monad ".timeWindow(Time.seconds(2))" on the data stream
I can group by second and concatenate the lines. When I use
".timeWindow(Time.seconds(2))" nothing is shown on the output. I guess I
misunderstood something when I was reading about Event Time. Could anyone
help me please? My source code is as follow.

Thanks for the ideas. Kind Regards,  Felipe

package flink.example.streaming;

import flink.util.LogLine;
import flink.util.LogSourceFunction;
import flink.util.UtilDate;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.annotation.Nullable;
import java.util.Date;

public class EventTimeStreamExampleJava {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<LogLine> dataStream = env
                .addSource(new LogSourceFunction())
                .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessGenerator())
                .keyBy(lineLog -> lineLog.getSec())
                // .timeWindow(Time.seconds(2))
                .reduce((log1, log2) -> new LogLine(log1.getTime(),
log1.getLine() + " | " + log2.getLine()))
                ;

        dataStream.print();

        env.execute("Window LogRead");
    }

    public static class BoundedOutOfOrdernessGenerator implements
AssignerWithPeriodicWatermarks<LogLine> {

        private final long maxOutOfOrderness = 3500; // 3.5 seconds

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(LogLine element, long
previousElementTimestamp) {
            long timestamp = element.getTime().getSeconds();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the
out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}

package flink.util;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LogSourceFunction implements SourceFunction<LogLine> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<LogLine> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(new LogLine(UtilDate.getRandomSec(),
UtilDate.getRandomString()));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

package flink.util;

import java.util.Date;
import java.util.Objects;

public class LogLine {

    private Date time;
    private int sec;
    private String line;

    public LogLine() {
    }

    public LogLine(Date time, String line) {
        this.sec = time.getSeconds();
        this.time = time;
        this.line = line;
    }

    public LogLine(int sec, String line) {
        this.sec = sec;
        this.time = UtilDate.getRandomDate(sec);
        this.line = line;
    }

    public int getSec() {
        return sec;
    }

    public void setSec(int sec) {
        this.sec = sec;
    }

    public Date getTime() {
        return time;
    }

    public String getLine() {
        return line;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        LogLine logLine = (LogLine) o;
        return Objects.equals(time, logLine.time) &&
                Objects.equals(sec, logLine.sec) &&
                Objects.equals(line, logLine.line);
    }

    @Override
    public int hashCode() {

        return Objects.hash(time, sec, line);
    }

    @Override
    public String toString() {
        return "LogLine{" +
                "time=" + time +
                ", sec=" + sec +
                ", line='" + line +
                '}';
    }
}


-- 

*---- Felipe Oliveira Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*

Reply via email to