Hi Chengzhi,
if you emit a watermark even though there is still data with a lower
timestamp, you generate "late data" that either needs to be processed in
a separate branch of your pipeline (see sideOutputLateData() [1]) or
should force your existing operators to update their previously emitted
results. The latter means holding state, i.e. the contents of your
windows, for longer (see allowedLateness() [1]).
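A rough sketch of both options (MyEvent, its fields, and the window
sizes are placeholders, not from your pipeline):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class MyEvent(key: String, timestamp: Long, value: Long)

// tag for routing records that arrive after the allowed lateness
val lateTag = OutputTag[MyEvent]("late-data")

// assumes `stream` already has timestamps and watermarks assigned
def buildWindows(stream: DataStream[MyEvent])
    : (DataStream[MyEvent], DataStream[MyEvent]) = {
  val summed = stream
    .keyBy(_.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
    // keep window contents around so late records can update
    // previously emitted results
    .allowedLateness(Time.hours(1))
    // records later than even that go to a side output
    .sideOutputLateData(lateTag)
    .reduce((a, b) => a.copy(value = a.value + b.value))
  // second element is the late branch, to be handled separately
  (summed, summed.getSideOutput(lateTag))
}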
I think that, in general, a processing-time watermark strategy might
not be suitable for reprocessing. Either parameterize your watermark
generator so that you can pass information (such as a fixed start
timestamp) through job parameters, or use another strategy such as
BoundedOutOfOrdernessTimestampExtractor [2] together with sinks that
allow idempotent updates.
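A sketch of the second option, reusing MyEvent from above. The
extractor emits watermarks that trail the highest event timestamp seen
so far by a fixed bound, independent of processing time, so downtime
does not cause records to be dropped as late:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class MyExtractor
    extends BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.hours(3)) {
  // event time comes from the record itself
  override def extractTimestamp(element: MyEvent): Long = element.timestamp
}

def withEventTime(stream: DataStream[MyEvent]): DataStream[MyEvent] =
  stream.assignTimestampsAndWatermarks(new MyExtractor)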
I hope this helps.
Regards,
Timo
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
On 02.04.18 at 23:51, Chengzhi Zhao wrote:
Hello, Flink community,
I am using a periodic watermark and extracting the event time from
each record in files from S3. I am using the `TimeLagWatermarkGenerator`
as mentioned in the Flink documentation.
Currently, a new watermark is generated from processing time minus a
fixed lag:

override def getCurrentWatermark: Watermark = {
  // watermark = current processing time minus a fixed maximum lag
  new Watermark(System.currentTimeMillis() - maxTimeLag)
}
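For reference, the surrounding assigner follows the
`TimeLagWatermarkGenerator` example from the docs and looks roughly
like this (MyEvent stands in for my actual record type):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

case class MyEvent(key: String, timestamp: Long)

class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
  // fixed lag behind processing time
  val maxTimeLag = 3 * 60 * 60 * 1000L // 3 hours in milliseconds

  // event time is taken from a field of the record
  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long =
    element.timestamp

  override def getCurrentWatermark: Watermark =
    new Watermark(System.currentTimeMillis() - maxTimeLag)
}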
This works fine as long as the process is running. However, in case of
failures, say some bad data arrives or an out-of-memory error occurs,
I need to stop the process, and it can take me a while to get it back
up. For example, maxTimeLag = 3 hours, but it took me 12 hours to
notice and fix the problem.
My question is: since I am using processing time as part of the
watermark, when Flink resumes from a failure, might some records be
ignored (treated as late) by the watermark? And what is the best
practice to catch up and continue without losing data? Thanks!
Best,
Chengzhi