Hi Aljoscha,

By "checkpoints do not work" what I mean is ever since Flink 1.9.2 till
1.11.1 when using File source the source operator (guessing split
enumerator or metadata reader) finishes immediately after starting (and
assigning the splits to split readers) hence when first checkpoint is
triggered, it sees the state of the first operator i.e. source as finished
and hence does not do any checkpointing. Thats' what you can see in logs
and also on the Flink UI for checkpoints. It assumes that the pipeline is
about to finish shortly and aborts the checkpoint.
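
For context, this is roughly how the file source is created (a minimal
sketch, not our production code; the path is a placeholder):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);

// readTextFile() uses FileProcessingMode.PROCESS_ONCE internally, so the
// enumerating part of the source goes to FINISHED as soon as all splits
// are handed to the readers; the first checkpoint then finds a FINISHED
// source task and is declined.
DataStream<String> lines = env.readTextFile("hdfs:///path/to/input/");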

This, along with the watermark generation problems, makes it difficult to
use the file source in production.


On Mon, Aug 24, 2020 at 4:01 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Arti,
>
> what exactly do you mean by "checkpoints do not work"? Are there
> exceptions being thrown? How are you writing your file-based sources,
> what API methods are you using?
>
> Best,
> Aljoscha
>
> On 20.08.20 16:21, Arti Pande wrote:
> > Hi Till,
> >
> > Thank you for your quick response. Both the
> > AssignerWithPeriodicWatermarks and the WatermarkStrategy I am using are
> > very simple ones.
> >
> > *Code for AssignerWithPeriodicWatermarks:*
> >
> > import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
> > import org.apache.flink.streaming.api.watermark.Watermark;
> >
> > public class CustomEventTimeWatermarkGenerator
> >         implements AssignerWithPeriodicWatermarks<MyPojo> {
> >
> >      private final long maxOutOfOrderness = 0;
> >      private long currentMaxTimestamp;
> >
> >      @Override
> >      public long extractTimestamp(MyPojo myPojo, long previousTimestamp) {
> >          long timestamp = myPojo.getInitiationTime().toEpochMilli();
> >          currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
> >          return timestamp;
> >      }
> >
> >      @Override
> >      public Watermark getCurrentWatermark() {
> >          return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
> >      }
> > }
> >
> >
> > *Code for WatermarkStrategy:*
> >
> > import java.time.Duration;
> > import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> >
> > WatermarkStrategy<MyPojo> watermarkStrategy =
> >     WatermarkStrategy.<MyPojo>forBoundedOutOfOrderness(Duration.ofMillis(0))
> >         .withTimestampAssigner((event, timestamp) ->
> >             event.getInitiationTime().toEpochMilli());
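> >
> > Both are attached in the usual way (a sketch; "stream" is a placeholder
> > for our file-source DataStream<MyPojo>):
> >
> > // 1.9-style periodic assigner (deprecated in 1.11):
> > stream.assignTimestampsAndWatermarks(new CustomEventTimeWatermarkGenerator());
> >
> > // 1.11-style strategy:
> > stream.assignTimestampsAndWatermarks(watermarkStrategy);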
> >
> >
> > Thanks & regards,
> > Arti
> >
> >
> > On Thu, Aug 20, 2020 at 11:43 AM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> >> Hi Arti,
> >>
> >> thanks for sharing this feedback with us. The WatermarkStrategy has been
> >> introduced quite recently and might have some rough edges. I am pulling
> >> in Aljoscha and Klou, who have worked on this feature and might be able
> >> to help you. To better understand your problem, it would be great if you
> >> could share the AssignerWithPeriodicWatermarks/WatermarkStrategy code
> >> with us.
> >>
> >> For the file source, the Flink community has recently introduced a new
> >> source abstraction which will also support checkpoints for file sources
> >> once the file source connector has been migrated to the new interfaces.
> >> The community is currently working on it.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Wed, Aug 19, 2020 at 5:38 PM Arti Pande <pande.a...@gmail.com> wrote:
> >>
> >>> Hi,
> >>>
> >>> When migrating a Stream API based Flink application from 1.9.2 to
> >>> 1.11.1, watermark generation has issues with the file source alone. It
> >>> works well with the Kafka source.
> >>>
> >>> With 1.9.2, a custom watermark generator implementing
> >>> AssignerWithPeriodicWatermarks was used. As of 1.11.1 that interface is
> >>> deprecated and is to be replaced with WatermarkStrategy (which combines
> >>> a WatermarkGenerator and a TimestampAssigner).
> >>>
> >>> With Flink 1.11.1 and the Kafka source, both of the above options (the
> >>> old AssignerWithPeriodicWatermarks and the new WatermarkStrategy) work
> >>> perfectly well, but with the file source neither of them works. The
> >>> watermark assigner never advances the watermark, so stateful operators
> >>> never clear their state, leading to erroneous results and continuously
> >>> increasing memory usage.
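> >>>
> >>> One way to see the watermark standing still (a sketch, not our actual
> >>> job code; "stream" is a placeholder for the file-source
> >>> DataStream<MyPojo>):
> >>>
> >>> import org.apache.flink.streaming.api.functions.ProcessFunction;
> >>> import org.apache.flink.util.Collector;
> >>>
> >>> stream.process(new ProcessFunction<MyPojo, MyPojo>() {
> >>>     @Override
> >>>     public void processElement(MyPojo value, Context ctx, Collector<MyPojo> out) {
> >>>         // With the file source this prints Long.MIN_VALUE forever;
> >>>         // with the Kafka source it advances as events arrive.
> >>>         System.out.println("watermark = " + ctx.timerService().currentWatermark());
> >>>         out.collect(value);
> >>>     }
> >>> });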
> >>>
> >>> The same code works well with the Kafka source. Is this a known issue?
> >>> If so, is any fix planned shortly?
> >>>
> >>> A side note (probably a candidate for a separate email, but I will
> >>> write it here): even checkpoints do not work with the file source since
> >>> 1.9.2, and this is still a problem with 1.11.1. Just wondering whether
> >>> the file source with the Stream API is not a priority in Flink
> >>> development? If so, we can rethink our sources.
> >>>
> >>> Thanks & regards,
> >>> Arti
> >>>
> >>
> >
>
>
