Thanks David! This saved me quite some time.

Aljoscha

On 09.09.20 19:58, David Anderson wrote:
Arti,

The problem with watermarks and the File source operator will be fixed in
1.11.2 [1]. This bug was introduced in 1.10.0 and isn't related to the new
WatermarkStrategy API.

[1] https://issues.apache.org/jira/browse/FLINK-19109

David

On Wed, Sep 9, 2020 at 2:52 PM Arti Pande <pande.a...@gmail.com> wrote:

Hi Aljoscha,

By "checkpoints do not work" what I mean is ever since Flink 1.9.2 till
1.11.1 when using File source the source operator (guessing split
enumerator or metadata reader) finishes immediately after starting (and
assigning the splits to split readers) hence when first checkpoint is
triggered, it sees the state of the first operator i.e. source as finished
and hence does not do any checkpointing. Thats' what you can see in logs
and also on the Flink UI for checkpoints. It assumes that the pipeline is
about to finish shortly and aborts the checkpoint.
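
*For illustration, the pattern in question looks roughly like this (a
minimal sketch, not our actual job; the path, format, and checkpoint
interval are placeholders):*

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L);

// With PROCESS_ONCE the monitoring subtask enumerates the directory once,
// forwards the splits to the readers, and switches to FINISHED -- after
// which triggered checkpoints are declined, as described above.
DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path("file:///data/in")),
        "file:///data/in",
        FileProcessingMode.PROCESS_ONCE,
        -1L);  // the interval argument is ignored in PROCESS_ONCE mode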

This, along with the watermark generation problems, makes it difficult to
use the file source in production.


On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

Hi Arti,

what exactly do you mean by "checkpoints do not work"? Are there
exceptions being thrown? How are you writing your file-based sources,
and which API methods are you using?

Best,
Aljoscha

On 20.08.20 16:21, Arti Pande wrote:
Hi Till,

Thank you for your quick response. Both the AssignerWithPeriodicWatermarks
and the WatermarkStrategy I am using are very simple ones.

*Code for AssignerWithPeriodicWatermarks:*

public class CustomEventTimeWatermarkGenerator implements
        AssignerWithPeriodicWatermarks<MyPojo> {

    private final long maxOutOfOrderness = 0;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
        long timestamp = myPojo.getInitiationTime().toEpochMilli();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
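
*For completeness, the assigner is attached like this (a minimal sketch;
"stream" stands for the DataStream<MyPojo> coming from the source):*

// Periodic assigners are only evaluated at the auto-watermark interval,
// so it must be non-zero (200 ms is the default under event time).
env.getConfig().setAutoWatermarkInterval(200L);

DataStream<MyPojo> withTimestamps =
        stream.assignTimestampsAndWatermarks(
                new CustomEventTimeWatermarkGenerator());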


*Code for WatermarkStrategy :*

WatermarkStrategy<MyPojo> watermarkStrategy =
        WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
                .withTimestampAssigner((event, timestamp) ->
                        event.getInitiationTime().toEpochMilli());
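
*It is attached in the same way (again a sketch; "stream" is a placeholder):*

// As with the old API, the bounded-out-of-orderness generator emits its
// watermarks periodically, driven by the same auto-watermark interval.
DataStream<MyPojo> withTimestamps =
        stream.assignTimestampsAndWatermarks(watermarkStrategy);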


Thanks & regards,
Arti


On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org>
wrote:

Hi Arti,

thanks for sharing this feedback with us. The WatermarkStrategy has been
introduced quite recently and might have some rough edges. I am pulling in
Aljoscha and Klou, who have worked on this feature and might be able to
help you. To better understand your problem, it would be great if you could
share the AssignerWithPeriodicWatermarks/WatermarkStrategy code with us.

For the file source, the Flink community has recently introduced a new
source abstraction, which will also support checkpoints for file sources
once the file source connector has been migrated to the new interfaces.
The community is currently working on it.
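
*To give an idea, usage of the new abstraction is expected to look roughly
like the following once the file connector is ported. This is only a sketch:
the class names (FileSource, TextLineFormat) are not final and may change,
and "env" is a placeholder for the StreamExecutionEnvironment:*

FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineFormat(),
                new Path("file:///data/in"))
        .monitorContinuously(Duration.ofSeconds(10))
        .build();

// Because the split enumerator keeps running on the JobManager, the
// source stays RUNNING and checkpointing should work for file input.
DataStream<String> lines = env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "file-source");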

Cheers,
Till

On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com>
wrote:

Hi,

When migrating a Stream API based Flink application from 1.9.2 to 1.11.1,
watermark generation has issues with the file source alone. It works well
with the Kafka source.

With 1.9.2, a custom watermark generator implementing
AssignerWithPeriodicWatermarks had to be used. Starting with 1.11.1 it is
deprecated and is to be replaced with WatermarkStrategy (which combines
WatermarkGenerator and TimestampAssigner).

With Flink 1.11.1 and the Kafka source, both of the above options (i.e. the
old AssignerWithPeriodicWatermarks and the new WatermarkStrategy) work
perfectly well, but with the file source neither of them works. The
watermark assigner never advances the watermarks, so stateful operators
never clear their state, leading to erroneous results and continuously
increasing memory usage.

The same code works well with the Kafka source. Is this a known issue? If
so, is a fix planned shortly?

As a side note (probably a candidate for a separate email, but I will
write it here): even checkpoints do not work with the File source since
1.9.2, and this is still a problem in 1.11.1. Just wondering whether the
file source with the Stream API is not a priority in Flink development?
If so, we can rethink our sources.

Thanks & regards,
Arti






