Thanks Fabian! This is very helpful!

Best,
Chengzhi
On Wed, Apr 4, 2018 at 9:02 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Chengzhi,
>
> You can access the current watermark from the Context object of a
> ProcessFunction [1] and store it in operator state [2].
> In case of a restart, the state will be restored with the watermark that
> was active when the checkpoint (or savepoint) was taken. Note that this
> won't be the last watermark emitted before the failure happened.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#operator-state
>
> 2018-04-03 19:39 GMT+02:00 Chengzhi Zhao <w.zhaocheng...@gmail.com>:
>
>> Thanks Timo for your response and the references.
>>
>> I will try BoundedOutOfOrdernessTimestampExtractor, and if it doesn't
>> work as expected, I will handle it as a separate pipeline.
>> Also, is there a way to retrieve the last watermark before/after a
>> failure? Then maybe I could persist the watermark to external storage
>> and resume it as a separate pipeline?
>>
>> Best,
>> Chengzhi
>>
>>
>> On Tue, Apr 3, 2018 at 7:58 AM, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Chengzhi,
>>>
>>> If you emit a watermark even though there is still data with a lower
>>> timestamp, you generate "late data" that either needs to be processed in
>>> a separate branch of your pipeline (see sideOutputLateData() [1]) or
>>> should force your existing operators to update their previously emitted
>>> results. The latter means holding state or the contents of your windows
>>> longer (see allowedLateness() [1]). I think in general a processing-time
>>> watermark strategy might not be suitable for reprocessing. Either you
>>> parameterize your watermark generator such that you can pass information
>>> through job parameters, or you use another strategy such as
>>> BoundedOutOfOrdernessTimestampExtractor [2] and sinks that allow
>>> idempotent updates.
>>>
>>> I hope this helps.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>>
>>> On 02.04.18 at 23:51, Chengzhi Zhao wrote:
>>>
>>> Hello, Flink community,
>>>
>>> I am using periodic watermarks and extracting the event time from each
>>> record read from files in S3. I am using the `TimeLagWatermarkGenerator`
>>> as mentioned in the Flink documentation.
>>>
>>> Currently, a new watermark is generated from processing time minus a
>>> fixed lag:
>>>
>>> override def getCurrentWatermark: Watermark = {
>>>   new Watermark(System.currentTimeMillis() - maxTimeLag)
>>> }
>>>
>>> This works fine as long as the process is running. However, in case of a
>>> failure, say some bad data or an out-of-memory error, I need to stop the
>>> process, and it takes me time to get it back up. For example,
>>> maxTimeLag = 3 hours, but it took me 12 hours to realize it and fix the
>>> problem.
>>>
>>> My question is: since I am using processing time as part of the
>>> watermark, when Flink resumes from the failure, might some records be
>>> ignored because of the watermark? And what is the best practice to catch
>>> up and continue without losing data? Thanks!
>>>
>>> Best,
>>> Chengzhi
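
For illustration, here is a minimal sketch of what Fabian describes: a
ProcessFunction that reads the current watermark from its Context and keeps
it in operator state, assuming the Flink 1.4 ProcessFunction and
CheckpointedFunction APIs. The event type MyEvent and all other names are
hypothetical, not from the thread.

    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Hypothetical event type used only for this sketch.
    case class MyEvent(id: String, timestamp: Long)

    class WatermarkTrackingFunction
        extends ProcessFunction[MyEvent, MyEvent]
        with CheckpointedFunction {

      // Last watermark observed by this subtask, kept on the heap between checkpoints.
      @transient private var lastSeenWatermark: Long = Long.MinValue
      @transient private var watermarkState: ListState[java.lang.Long] = _

      override def processElement(
          event: MyEvent,
          ctx: ProcessFunction[MyEvent, MyEvent]#Context,
          out: Collector[MyEvent]): Unit = {
        // The Context exposes the current watermark through the TimerService.
        lastSeenWatermark = ctx.timerService().currentWatermark()
        out.collect(event)
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        // Store the watermark that was active when the checkpoint was taken.
        watermarkState.clear()
        watermarkState.add(lastSeenWatermark)
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        watermarkState = context.getOperatorStateStore.getListState(
          new ListStateDescriptor[java.lang.Long]("last-watermark", classOf[java.lang.Long]))
        // After a restore this yields the watermark from the last successful
        // checkpoint, not the very last watermark emitted before the failure.
        val it = watermarkState.get().iterator()
        while (it.hasNext) {
          lastSeenWatermark = math.max(lastSeenWatermark, it.next())
        }
      }
    }

On restore, initializeState hands back the value from the last successful
checkpoint, which is why Fabian points out that it will not be the very last
watermark seen before the failure.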
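
Timo's suggested alternative, BoundedOutOfOrdernessTimestampExtractor, derives
watermarks from event time rather than the wall clock, so after a long outage
the watermark resumes from where the data left off instead of jumping ahead by
processing time and marking the whole backlog as late. A minimal sketch, again
with a hypothetical MyEvent type:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    // Hypothetical event type; `timestamp` is assumed to hold epoch milliseconds.
    case class MyEvent(id: String, timestamp: Long)

    // Watermarks trail the highest event time seen so far by a fixed bound,
    // so they follow the data rather than the system clock.
    class MyTimestampExtractor(maxOutOfOrderness: Time)
        extends BoundedOutOfOrdernessTimestampExtractor[MyEvent](maxOutOfOrderness) {

      override def extractTimestamp(event: MyEvent): Long = event.timestamp
    }

It would be attached with something like
events.assignTimestampsAndWatermarks(new MyTimestampExtractor(Time.hours(3)))
and, as Timo suggests, combined with sinks that allow idempotent updates.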
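
For the late-data handling Timo mentions, a rough sketch of how
sideOutputLateData() and allowedLateness() fit together on a windowed stream;
the window size, lateness bound, and event type below are made-up values for
illustration only:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Hypothetical event type; names are illustrative, not from the thread.
    case class MyEvent(id: String, timestamp: Long, value: Double)

    object LateDataExample {

      // Elements arriving after the watermark (plus allowed lateness) are
      // routed to this side output instead of being silently dropped.
      val lateTag: OutputTag[MyEvent] = OutputTag[MyEvent]("late-events")

      def sumWithLateData(events: DataStream[MyEvent]): DataStream[MyEvent] = {
        val result = events
          .keyBy(_.id)
          .window(TumblingEventTimeWindows.of(Time.minutes(10)))
          .allowedLateness(Time.hours(1))   // keep window state so late updates can refine results
          .sideOutputLateData(lateTag)      // anything later than that goes to the side output
          .reduce((a, b) => a.copy(value = a.value + b.value))

        // The late records can be handled in a separate branch, e.g. written
        // to an idempotent sink that updates previously emitted results.
        val lateStream: DataStream[MyEvent] = result.getSideOutput(lateTag)
        lateStream.print()

        result
      }
    }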