Hey!
Second time posting to a mailing list, let's hope I'm doing this
correctly :)
My use case is to take data from the MediaWiki dumps and stream it into
Flink via the `readTextFile` method. The dumps are TSV files with one
event per line; each event has a timestamp and a type. I want to use
event time processing and simply print out how many events of each type
there are per hour. The data can be out of order, so I allow a 1 hour
tolerance.
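In case it helps, here is roughly what the Event class looks like (a
simplified sketch; the real parsing, column indices and timestamp format
are in the repo at [1], so treat those details here as placeholders):

package org.example.prow.wikimedia;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Simplified sketch of the Event class; column indices and the timestamp
// format below are placeholders, the real ones are in the repo.
public class Event implements Serializable {
    public String eventEntity;        // in the real class this might be an enum; String keeps the sketch simple
    public String eventType;
    public ZonedDateTime eventTimestamp;
    public long something = 1L;       // numeric field that .sum("something") adds up per window

    public Event() {} // no-arg constructor so Flink can treat this as a POJO

    public Event(String tsvLine) {
        final String[] cols = tsvLine.split("\t", -1);
        this.eventEntity = cols[1];   // placeholder column index
        this.eventType = cols[2];     // placeholder column index
        // The timestamp is in column 4 of the TSV (that's what the sort -k4
        // further down sorts on); the pattern here is a guess.
        this.eventTimestamp = LocalDateTime
                .parse(cols[3], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                .atZone(ZoneOffset.UTC);
    }
}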
What I expect to happen here is that, as it goes through a month of data,
it will print each hourly window as the watermark passes the end of that
hour (plus the 1 hour tolerance), so I'll get output continuously until
the end.
What really happens is that the program outputs nothing until it is done,
and then it outputs everything at once. The timestamp shown in the web UI
is also stuck at 9223372036854776000. If I switch to using count windows
instead of time windows, it outputs continuously like I would expect it
to, so it seems to be watermark related.
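Concretely, the count window test is just the window line from the code
below swapped out, roughly like this (the window size of 1000 is an
arbitrary value I picked for testing, not something meaningful):

// Event time version, which only prints when the job finishes:
// praps.window(TumblingEventTimeWindows.of(Time.hours(1))).sum("something").print("JAJ!");

// Count window version, which prints continuously while the job runs.
// The size of 1000 elements is just an arbitrary example value.
praps.countWindow(1000).sum("something").print("JAJ!");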
I'm running Flink version 1.11.1 on JVM version:
OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02
The parallelism is set to 1 and it's running on my laptop.
I don't know how much code I'm allowed to attach here, so I've created a
GitHub repo with a complete, self-contained example [1]. To get the data
used, run the following commands:
$ wget https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
$ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat | pv -cN bzcat | sort -k4 > 2020-07.enwiki.2016-04.sorted.tsv
If you don't have pv installed, just remove that part; I just like to
have an overview of the progress.
The main code part is this:
package org.example.prow;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.example.prow.wikimedia.Event;

import java.time.Duration;

public class App {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final String fileNameInput = "file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
        final DataStream<String> linesIn = env.readTextFile(fileNameInput);

        // Parse each TSV line into an Event.
        final SingleOutputStreamOperator<Event> jj = linesIn.map(value -> new Event(value));

        // Event time timestamps taken from the event itself, with 1 hour of allowed out-of-orderness.
        final WatermarkStrategy<Event> mew = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofHours(1))
                .withTimestampAssigner((element, recordTimestamp) -> element.eventTimestamp.toEpochSecond() * 1000);
        final DataStream<Event> props = jj.assignTimestampsAndWatermarks(mew);

        // Key by entity, sum per 1 hour tumbling event time window, and print.
        final KeyedStream<Event, String> praps = props.keyBy(e -> e.eventEntity.toString());
        praps.window(TumblingEventTimeWindows.of(Time.hours(1))).sum("something").print("JAJ!");

        env.execute("FlinkWikipediaHistoryTopEditors");
    }
}
If you see any errors here, please tell me; this is sort of driving me
mad >_<.
Best regards,
Teodor Spæren
[1] https://github.com/rHermes/flink-question-001