i ended up just going back to FlinkKafkaConsumer instead of the new
FlinkSource

On Wed, Apr 13, 2022 at 3:01 AM Qingsheng Ren <renqs...@gmail.com> wrote:

> Another solution would be setting the parallelism = #partitions, so that
> one parallelism would be responsible for reading exactly one partition.
>
> Qingsheng
>
> > On Apr 13, 2022, at 17:52, Qingsheng Ren <renqs...@gmail.com> wrote:
> >
> > Hi Jin,
> >
> > Unfortunately I don’t have any quick bypass in mind except increasing
> the tolerance of out of orderness.
> >
> > Best regards,
> >
> > Qingsheng
> >
> >> On Apr 8, 2022, at 18:12, Jin Yi <j...@promoted.ai> wrote:
> >>
> >> confirmed that moving back to FlinkKafkaConsumer fixes things.
> >>
> >> is there some notification channel/medium that highlights critical
> bugs/issues on the intended features like this pretty readily?
> >>
> >> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <j...@promoted.ai> wrote:
> >> based on symptoms/observations on the first operator (LogRequestFilter)
> watermark and event timestamps, it does seem like it's the bug.  things
> track fine (timestamp > watermark) for the first batch of events, then the
> event timestamps go back into the past and are "late".
> >>
> >> looks like the 1.14 backport just got in 11 days ago (
> https://github.com/apache/flink/pull/19128).  is there a way to easily
> test this fix locally?  based on the threads, should i just move back to
> FlinkKafkaConsumer until 1.14.5?
> >>
> >> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <renqs...@gmail.com>
> wrote:
> >> Hi Jin,
> >>
> >> If you are using new FLIP-27 sources like KafkaSource, per-partition
> watermark (or per-split watermark) is a default feature integrated in
> SourceOperator. You might hit the bug described in FLINK-26018 [1], which
> happens during the first fetch of the source that records in the first
> split pushes the watermark far away, then records from other splits will be
> treated as late events.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26018
> >>
> >> Best regards,
> >>
> >> Qingsheng
> >>
> >>
> >>> On Apr 8, 2022, at 15:54, Jin Yi <j...@promoted.ai> wrote:
> >>>
> >>> how should the code look like to verify we're using per-partition
> watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in
> 1.14.4?
> >>>
> >>> we currently have it looking like:
> >>>
> >>> streamExecutionEnvironment.fromSource(
> >>>   KafkaSource.<T>builder().....build(),
> >>>   watermarkStrategy,
> >>>   "whatever",
> >>>   typeInfo);
> >>>
> >>> when running this job with the streamExecutionEnviornment parallelism
> set to 1, and the kafka source having 30 partitions, i'm seeing weird
> behaviors where the first operator after this source consumes events out of
> order (and therefore, violates watermarks).  the operator simply checks to
> see what "type" of event something is and uses side outputs to output the
> type-specific messages.  here's a snippet of the event timestamp going back
> before the current watermark (first instance of going backwards in time in
> bold):
> >>>
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,317 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> >>>
> >>>
> >>>
> >>> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>> I dove deeper.  I wasn't actually using per-partition watermarks.
> Thank you for the help!
> >>>
> >>> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>> Thanks, Thias and Dongwon.
> >>>
> >>> I'll keep debugging this with the idle watermark turned off.
> >>>
> >>> Next TODOs:
> >>> - Verify that we’re using per-partition watermarks.  Our code matches
> the example but maybe something is disabling it.
> >>> - Enable logging of partition-consumer assignment, to see if that is
> the cause of the problem.
> >>> - Look at adding flags to set the source parallelism to see if that
> fixes the issue.
> >>>
> >>> Yes, I've seen Flink talks on creating our own watermarks through
> Kafka.  Sounds like a good idea.
> >>>
> >>> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
> >>> I totally agree with Schwalbe that per-partition watermarking allows #
> source tasks < # kafka partitions.
> >>>
> >>> Otherwise, Dan, you should suspect other possibilities like what
> Schwalbe said.
> >>>
> >>> Best,
> >>>
> >>> Dongwon
> >>>
> >>> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
> >>> Hi San, Dongwon,
> >>>
> >>>
> >>>
> >>> I share the opinion that when per-partition watermarking is enabled,
> you should observe correct behavior … would be interesting to see why it
> does not work for you.
> >>>
> >>>
> >>>
> >>> I’d like to clear one tiny misconception here when you write:
> >>>
> >>>
> >>>
> >>>>> - The same issue happens even if I use an idle watermark.
> >>>
> >>>
> >>>
> >>> You would expect to see glitches with watermarking when you enable
> idleness.
> >>>
> >>> Idleness sort of trades watermark correctness for reduces latency when
> processing timers (much simplified).
> >>>
> >>> With idleness enabled you have no guaranties whatsoever as to the
> quality of watermarks (which might be ok in some cases).
> >>>
> >>> BTW we dominantly use a mix of fast and slow sources (that only update
> once a day) which hand-pimped watermarking and late event processing, and
> enabling idleness would break everything.
> >>>
> >>>
> >>>
> >>> Oversight put aside things should work the way you implemented it.
> >>>
> >>>
> >>>
> >>> One thing I could imagine to be a cause is
> >>>
> >>>      • that over time the kafka partitions get reassigned  to
> different consumer subtasks which would probably stress correct
> recalculation of watermarks. Hence #partition == number subtask might
> reduce the problem
> >>>      • can you enable logging of partition-consumer assignment, to see
> if that is the cause of the problem
> >>>      • also involuntary restarts of the job can cause havoc as this
> resets watermarking
> >>>
> >>>
> >>> I’ll be off next week, unable to take part in the active discussion …
> >>>
> >>>
> >>>
> >>> Sincere greetings
> >>>
> >>>
> >>>
> >>> Thias
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> From: Dan Hill <quietgol...@gmail.com>
> >>> Sent: Freitag, 18. März 2022 08:23
> >>> To: Dongwon Kim <eastcirc...@gmail.com>
> >>> Cc: user <user@flink.apache.org>
> >>> Subject: Re: Weird Flink Kafka source watermark behavior
> >>>
> >>>
> >>>
> >>> ⚠EXTERNAL MESSAGE – CAUTION: Think Before You Click ⚠
> >>>
> >>>
> >>>
> >>> I'll try forcing # source tasks = # partitions tomorrow.
> >>>
> >>>
> >>>
> >>> Thank you, Dongwon, for all of your help!
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
> >>>
> >>> I believe your job with per-partition watermarking should be working
> okay even in a backfill scenario.
> >>>
> >>>
> >>>
> >>> BTW, is the problem still observed even with # sour tasks = #
> partitions?
> >>>
> >>>
> >>>
> >>> For committers:
> >>>
> >>> Is there a way to confirm that per-partition watermarking is used in
> TM log?
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>>
> >>> I hit this using event processing and no idleness detection.  The same
> issue happens if I enable idleness.
> >>>
> >>>
> >>>
> >>> My code matches the code example for per-partition watermarking.
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
> >>>
> >>> Hi Dan,
> >>>
> >>>
> >>>
> >>> I'm quite confused as you already use per-partition watermarking.
> >>>
> >>>
> >>>
> >>> What I meant in the reply is
> >>>
> >>> - If you don't use per-partition watermarking, # tasks < # partitions
> can cause the problem for backfill jobs.
> >>>
> >>> - If you don't use per-partition watermarking, # tasks = # partitions
> is going to be okay even for backfill jobs.
> >>>
> >>> - If you use per-partition watermarking, # tasks < # partitions
> shouldn't cause any problems unless you turn on the idleness detection.
> >>>
> >>>
> >>>
> >>> Regarding the idleness detection which is based on processing time,
> what is your setting? If you set the value to 10 seconds for example,
> you'll face the same problem unless the watermark of your backfill job
> catches up real-time within 10 seconds. If you increase the value to 1
> minute, your backfill job should catch up real-time within 1 minute.
> >>>
> >>>
> >>>
> >>> Best,
> >>>
> >>>
> >>>
> >>> Dongwon
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>>
> >>> Thanks Dongwon!
> >>>
> >>>
> >>>
> >>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
> >>>
> >>> Hi Dan,
> >>>
> >>>
> >>>
> >>> Do you use the per-partition watermarking explained in [1]?
> >>>
> >>> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
> >>>
> >>> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> >>>
> >>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroying the per-partition patterns as
> explained in [2].
> >>>
> >>>
> >>>
> >>> Hope this helps.
> >>>
> >>>
> >>>
> >>> p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up the progress,
> which can be counted as idleness.
> >>>
> >>>
> >>>
> >>> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> >>>
> >>> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >>>
> >>> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> >>>
> >>>
> >>>
> >>> Best,
> >>>
> >>>
> >>>
> >>> Dongwon
> >>>
> >>>
> >>>
> >>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>>
> >>> I'm following the example from this section:
> >>>
> >>>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>>
> >>> Other points
> >>>
> >>> - I'm using the kafka timestamp as event time.
> >>>
> >>> - The same issue happens even if I use an idle watermark.
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>>
> >>> There are 12 Kafka partitions (to keep the structure similar to other
> low traffic environments).
> >>>
> >>>
> >>>
> >>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
> wrote:
> >>>
> >>> Hi.
> >>>
> >>>
> >>>
> >>> I'm running a backfill from a kafka topic with very few records spread
> across a few days.  I'm seeing a case where the records coming from a kafka
> source have a watermark that's more recent (by hours) than the event time.
> I haven't seen this before when running this.  This violates what I'd
> assume the kafka source would do.
> >>>
> >>>
> >>>
> >>> Example problem:
> >>>
> >>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
> times are separated by a longer time period.
> >>>
> >>> 2.  My first operator after the FlinkKafkaConsumer sees:
> >>>
> >>>   context.timestamp() = 1000
> >>>
> >>>   context.timerService().currentWatermark() = 500000
> >>>
> >>>
> >>>
> >>> Details about how I'm running this:
> >>>
> >>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
> source.
> >>>
> >>> - I'm using FlinkKafkaConsumer
> >>>
> >>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
> idleness settings.
> >>>
> >>> - I'm running similar code in all the environments.  The main
> difference is low traffic.  I have not been able to reproduce this out of
> the environment.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> I put the following process function right after my kafka source.
> >>>
> >>>
> >>>
> >>> --------
> >>>
> >>>
> >>> AfterSource
> >>>
> >>> ts=1647274892728
> >>> watermark=1647575140007
> >>> record=...
> >>>
> >>>
> >>>
> >>>
> >>> public static class TextLog extends ProcessFunction<Record, Record> {
> >>>    private final String label;
> >>>    public TextLogDeliveryLog(String label) {
> >>>        this.label = label;
> >>>    }
> >>>    @Override
> >>>    public void processElement(Record record, Context context,
> Collector<Record> collector) throws Exception {
> >>>        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
> >>>                label, context.timestamp(),
> context.timerService().currentWatermark(), record);
> >>>        collector.collect(deliveryLog);
> >>>    }
> >>> }
> >>>
> >>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
> >>>
> >>> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
> >>
> >
>
>

Reply via email to