I ended up just going back to FlinkKafkaConsumer instead of the new KafkaSource.
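(For reference, a minimal sketch of what that FlinkKafkaConsumer fallback can look like on a 1.14-era Flink; imports are omitted and the topic, properties, and MyEvent/MyEventDeserializer types are placeholders, not from this thread. Attaching the WatermarkStrategy to the consumer itself keeps watermark generation per Kafka partition inside the legacy source.)

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
props.setProperty("group.id", "backfill-job");         // placeholder

// MyEvent / MyEventDeserializer stand in for the job's own record type and schema.
FlinkKafkaConsumer<MyEvent> consumer =
    new FlinkKafkaConsumer<>("my-topic", new MyEventDeserializer(), props);

// Assigning the strategy on the consumer (not on the resulting stream)
// keeps per-partition watermark generation in the source.
consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

DataStream<MyEvent> stream = env.addSource(consumer, "my-topic-source");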
On Wed, Apr 13, 2022 at 3:01 AM Qingsheng Ren <renqs...@gmail.com> wrote:

> Another solution would be setting the parallelism = #partitions, so that one parallel instance would be responsible for reading exactly one partition.
>
> Qingsheng
>
> > On Apr 13, 2022, at 17:52, Qingsheng Ren <renqs...@gmail.com> wrote:
> >
> > Hi Jin,
> >
> > Unfortunately I don't have any quick bypass in mind except increasing the tolerance of out-of-orderness.
> >
> > Best regards,
> >
> > Qingsheng
> >
> >> On Apr 8, 2022, at 18:12, Jin Yi <j...@promoted.ai> wrote:
> >>
> >> confirmed that moving back to FlinkKafkaConsumer fixes things.
> >>
> >> is there some notification channel/medium that highlights critical bugs/issues on the intended features like this pretty readily?
> >>
> >> On Fri, Apr 8, 2022 at 2:18 AM Jin Yi <j...@promoted.ai> wrote:
> >> based on symptoms/observations on the first operator (LogRequestFilter) watermark and event timestamps, it does seem like it's the bug. things track fine (timestamp > watermark) for the first batch of events, then the event timestamps go back into the past and are "late".
> >>
> >> looks like the 1.14 backport just got in 11 days ago (https://github.com/apache/flink/pull/19128). is there a way to easily test this fix locally? based on the threads, should i just move back to FlinkKafkaConsumer until 1.14.5?
> >>
> >> On Fri, Apr 8, 2022 at 1:34 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> >> Hi Jin,
> >>
> >> If you are using new FLIP-27 sources like KafkaSource, per-partition watermarking (or per-split watermarking) is a default feature integrated in SourceOperator. You might be hitting the bug described in FLINK-26018 [1], which happens during the first fetch of the source: records in the first split push the watermark far ahead, and records from the other splits are then treated as late events.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-26018
> >>
> >> Best regards,
> >>
> >> Qingsheng
> >>
> >>> On Apr 8, 2022, at 15:54, Jin Yi <j...@promoted.ai> wrote:
> >>>
> >>> how should the code look to verify we're using per-partition watermarks if we moved away from FlinkKafkaConsumer to KafkaSource in 1.14.4?
> >>>
> >>> we currently have it looking like:
> >>>
> >>> streamExecutionEnvironment.fromSource(
> >>>     KafkaSource.<T>builder().....build(),
> >>>     watermarkStrategy,
> >>>     "whatever",
> >>>     typeInfo);
> >>>
> >>> when running this job with the streamExecutionEnvironment parallelism set to 1, and the kafka source having 30 partitions, i'm seeing weird behavior where the first operator after this source consumes events out of order (and therefore violates watermarks). the operator simply checks what "type" of event something is and uses side outputs to emit the type-specific messages.
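(For context, a fleshed-out version of that fromSource setup could look roughly like the following; bootstrap servers, topic, group id, and the MyEvent types are placeholders. With KafkaSource the Kafka record timestamp is used as event time by default, and watermarks are generated per split inside SourceOperator, which is the code path FLINK-26018 affects.)

KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
    .setBootstrapServers("broker:9092")               // placeholder
    .setTopics("my-topic")                            // placeholder
    .setGroupId("my-group")                           // placeholder
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new MyEventDeserializer())
    .build();

// Bounded out-of-orderness; no TimestampAssigner is needed because the
// Kafka record timestamp is the default event time.
WatermarkStrategy<MyEvent> watermarkStrategy =
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5));

DataStream<MyEvent> stream =
    streamExecutionEnvironment.fromSource(source, watermarkStrategy, "whatever", typeInfo);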
> >>> here's a snippet of the event timestamp going back before the current watermark (the first instance of going backwards in time is the ts=1649284171037 line):
> >>>
> >>> 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284267139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284268138 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284269138 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284270139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284271139 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,315 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284171037 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172057 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,317 WARN ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
> >>>
> >>> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill <quietgol...@gmail.com> wrote:
> >>> I dove deeper. I wasn't actually using per-partition watermarks. Thank you for the help!
> >>>
> >>> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> Thanks, Thias and Dongwon.
> >>>
> >>> I'll keep debugging this with the idle watermark turned off.
> >>>
> >>> Next TODOs:
> >>> - Verify that we're using per-partition watermarks. Our code matches the example, but maybe something is disabling it.
> >>> - Enable logging of partition-consumer assignment, to see if that is the cause of the problem.
> >>> - Look at adding flags to set the source parallelism, to see if that fixes the issue.
> >>>
> >>> Yes, I've seen Flink talks on creating our own watermarks through Kafka. Sounds like a good idea.
> >>>
> >>> On Fri, Mar 18, 2022 at 1:17 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >>> I totally agree with Schwalbe that per-partition watermarking allows # source tasks < # kafka partitions.
> >>>
> >>> Otherwise, Dan, you should suspect other possibilities like what Schwalbe said.
> >>> Best,
> >>>
> >>> Dongwon
> >>>
> >>> On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
> >>> Hi Dan, Dongwon,
> >>>
> >>> I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … it would be interesting to see why it does not work for you.
> >>>
> >>> I'd like to clear up one tiny misconception here, when you write:
> >>>
> >>> > - The same issue happens even if I use an idle watermark.
> >>>
> >>> You would expect to see glitches with watermarking when you enable idleness. Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified). With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be ok in some cases).
> >>>
> >>> BTW we dominantly use a mix of fast and slow sources (some only update once a day) with hand-pimped watermarking and late event processing, and enabling idleness would break everything.
> >>>
> >>> Oversights put aside, things should work the way you implemented it.
> >>>
> >>> Things I could imagine to be a cause:
> >>> • over time the kafka partitions get reassigned to different consumer subtasks, which would probably stress correct recalculation of watermarks. Hence #partitions == #subtasks might reduce the problem
> >>> • can you enable logging of partition-consumer assignment, to see if that is the cause of the problem
> >>> • also involuntary restarts of the job can cause havoc, as this resets watermarking
> >>>
> >>> I'll be off next week, unable to take part in the active discussion …
> >>>
> >>> Sincere greetings
> >>>
> >>> Thias
> >>>
> >>> From: Dan Hill <quietgol...@gmail.com>
> >>> Sent: Friday, 18 March 2022 08:23
> >>> To: Dongwon Kim <eastcirc...@gmail.com>
> >>> Cc: user <user@flink.apache.org>
> >>> Subject: Re: Weird Flink Kafka source watermark behavior
> >>>
> >>> I'll try forcing # source tasks = # partitions tomorrow.
> >>>
> >>> Thank you, Dongwon, for all of your help!
> >>>
> >>> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >>> I believe your job with per-partition watermarking should be working okay even in a backfill scenario.
> >>>
> >>> BTW, is the problem still observed even with # source tasks = # partitions?
> >>>
> >>> For committers: is there a way to confirm in the TM log that per-partition watermarking is used?
> >>>
> >>> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> I hit this using event time processing and no idleness detection. The same issue happens if I enable idleness.
> >>>
> >>> My code matches the code example for per-partition watermarking.
> >>>
> >>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >>> Hi Dan,
> >>>
> >>> I'm quite confused as you already use per-partition watermarking. What I meant in the reply is:
> >>>
> >>> - If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
> >>> - If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
> >>> - If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on the idleness detection.
> >>>
> >>> Regarding the idleness detection, which is based on processing time, what is your setting? If you set the value to 10 seconds, for example, you'll face the same problem unless the watermark of your backfill job catches up to real time within 10 seconds. If you increase the value to 1 minute, your backfill job should catch up to real time within 1 minute.
> >>>
> >>> Best,
> >>>
> >>> Dongwon
> >>>
> >>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> Thanks Dongwon!
> >>>
> >>> Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source tasks < # kafka partitions. This should be called out in the docs or the bug should be fixed.
> >>>
> >>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >>> Hi Dan,
> >>>
> >>> Do you use the per-partition watermarking explained in [1]?
> >>>
> >>> I've also experienced a similar problem when running backfill jobs, specifically when # source tasks < # kafka partitions.
> >>> - When # source tasks = # kafka partitions, the backfill job works as expected.
> >>> - When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This case can destroy the per-partition watermarking pattern, as explained in [2].
> >>>
> >>> Hope this helps.
> >>>
> >>> p.s. If you plan to use the per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up on the progress, which can be counted as idleness.
> >>>
> >>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> >>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >>> [3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> >>>
> >>> Best,
> >>>
> >>> Dongwon
> >>>
> >>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> I'm following the example from this section:
> >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> >>>
> >>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> Other points:
> >>> - I'm using the kafka timestamp as event time.
> >>> - The same issue happens even if I use an idle watermark.
> >>>
> >>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> There are 12 Kafka partitions (to keep the structure similar to other low-traffic environments).
> >>>
> >>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
> >>> Hi.
> >>> I'm running a backfill from a kafka topic with very few records spread across a few days. I'm seeing a case where the records coming from a kafka source have a watermark that's more recent (by hours) than the event time. I haven't seen this before when running this. This violates what I'd assume the kafka source would do.
> >>>
> >>> Example problem:
> >>> 1. I have kafka records at ts=1000, 2000, ... 500000. The actual times are separated by a longer time period.
> >>> 2. My first operator after the FlinkKafkaConsumer sees:
> >>>    context.timestamp() = 1000
> >>>    context.timerService().currentWatermark() = 500000
> >>>
> >>> Details about how I'm running this:
> >>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
> >>> - I'm using FlinkKafkaConsumer.
> >>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No idleness settings.
> >>> - I'm running similar code in all the environments. The main difference is low traffic. I have not been able to reproduce this outside of this environment.
> >>>
> >>> I put the following process function right after my kafka source.
> >>>
> >>> --------
> >>>
> >>> AfterSource
> >>> ts=1647274892728
> >>> watermark=1647575140007
> >>> record=...
> >>>
> >>> public static class TextLog extends ProcessFunction<Record, Record> {
> >>>     private final String label;
> >>>     public TextLog(String label) {
> >>>         this.label = label;
> >>>     }
> >>>     @Override
> >>>     public void processElement(Record record, Context context, Collector<Record> collector) throws Exception {
> >>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
> >>>             label, context.timestamp(),
> >>>             context.timerService().currentWatermark(), record);
> >>>         collector.collect(record);
> >>>     }
> >>> }
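(To close the loop on the two workarounds suggested up-thread, a rough sketch reusing the placeholder names from the earlier snippets: pinning the source parallelism to the partition count so each subtask reads exactly one partition, and, if idleness detection is used at all, choosing a timeout generous enough that a backfill job can advance its watermark before partitions are marked idle. The partition count and timeout values below are illustrative only.)

// (1) One source subtask per Kafka partition (e.g. 12 partitions in Dan's setup).
DataStream<MyEvent> stream = streamExecutionEnvironment
    .fromSource(source, watermarkStrategy, "whatever", typeInfo)
    .setParallelism(12);

// (2) If idleness detection is needed, use a timeout the backfill can keep up with.
WatermarkStrategy<MyEvent> strategyWithIdleness =
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withIdleness(Duration.ofMinutes(1));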