Awesome, thanks!

On Mon, Jun 14, 2021 at 5:36 PM Evan Galpin <evan.gal...@gmail.com> wrote:
>
> I’ll try to create something as small as possible from the pipeline I 
> mentioned 👍 I should have time this week to do so.
>
> Thanks,
> Evan
>
> On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw <rober...@google.com> wrote:
>>
>> Is it possible to post the code? (Or the code of a similar, but
>> minimal, pipeline that exhibits the same issues?)
>>
>> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin <evan.gal...@gmail.com> wrote:
>> >
>> > @robert I have a pipeline which consistently shows a major slowdown (10 
>> > seconds vs. 10 minutes) between versions <=2.23.0 and >=2.25.0 that can be 
>> > boiled down to:
>> >
>> > - Read GCS file patterns from PubSub
>> > - Window into Fixed windows (repeating every 15 seconds)
>> > - Deduplicate/distinct (have tried both)
>> > - Read GCS blobs via patterns from the first step
>> > - Write file contents to sink
>> >
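>> > In code, that shape is roughly the following (an illustrative sketch only, 
>> > not the actual pipeline; the option getters and the text sink are 
>> > placeholders, and Beam SDK imports are omitted):
>> >
>> >     Pipeline p = Pipeline.create(options);
>> >     p
>> >         // Each PubSub message carries a GCS file pattern.
>> >         .apply(PubsubIO.readStrings().fromSubscription(options.getSubscription()))
>> >         // Fixed windows repeating every 15 seconds.
>> >         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(15))))
>> >         // Drop duplicate patterns (Deduplicate.values() is the other variant tried).
>> >         .apply(Distinct.<String>create())
>> >         // Expand the patterns and read the matched blobs.
>> >         .apply(FileIO.matchAll())
>> >         .apply(FileIO.readMatches())
>> >         .apply(TextIO.readFiles())
>> >         // Write the file contents to a sink (placeholder sink here).
>> >         .apply(TextIO.write().to(options.getOutput())
>> >             .withWindowedWrites()
>> >             .withNumShards(1));
>> >     p.run();
>> >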
>> > It doesn't seem to matter whether there are 0 messages in a subscription or 
>> > 50k messages at startup. The rate of new messages, however, is very low. Not 
>> > sure if those details are helpful; let me know if there's anything else 
>> > specific that would help.
>> >
>> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw <rober...@google.com> 
>> > wrote:
>> >>
>> >> +1, we'd really like to get to the bottom of this, so clear
>> >> instructions on a pipeline/conditions that can reproduce it would be
>> >> great.
>> >>
>> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > you are probably hitting a not-yet-discovered bug in the SDF implementation 
>> >> > in FlinkRunner that (under some currently unknown conditions) seems to 
>> >> > stop advancing the watermark. This has been observed in one other 
>> >> > instance (that I'm aware of). I don't think we have a tracking JIRA 
>> >> > for that yet; would you mind filing it? It would be awesome if you could 
>> >> > include an estimate of the messages-per-second throughput that causes the 
>> >> > issue in your case.
>> >> >
>> >> > +Tobias Kaymak
>> >> >
>> >> > Tobias, could you please confirm that the case you had where Flink 
>> >> > stopped progressing the watermark resembled this one?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >  Jan
>> >> >
>> >> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >> >
>> >> > Hi Jan,
>> >> >
>> >> > I've added --experiments=use_deprecated_read and it seems to work 
>> >> > flawlessly (with my current Window and the one proposed by Evan).
>> >> >
>> >> > Why is this? Do Splittable DoFns now break existing implementations? Are 
>> >> > there any posts about possible breaking changes?
>> >> >
>> >> > On 2021/06/14 13:19:39, Jan Lukavský <je...@seznam.cz> wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > answers inline.
>> >> >
>> >> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >> >
>> >> > Hi Jan,
>> >> >
>> >> > Thanks for replying so fast!
>> >> >
>> >> > Regarding your questions,
>> >> >
>> >> > - "Does your data get buffered in a state?"
>> >> > Yes, I do have state in a stage prior to the ParquetIO write, together 
>> >> > with a Timer in the PROCESSING_TIME time domain.
>> >> >
>> >> > The stage which contains the state does send bytes to the next one, 
>> >> > which is the ParquetIO write. It seems the @OnTimer method doesn't get 
>> >> > triggered, so it's not clearing the state. This does, however, work under 
>> >> > normal circumstances, when there isn't too much data queued waiting to be 
>> >> > processed.
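>> >> >
>> >> > Roughly, that stateful stage looks like this (a simplified sketch with 
>> >> > made-up names, not the real code):
>> >> >
>> >> >     class BufferAndFlushFn extends DoFn<KV<String, MyClass>, MyClass> {
>> >> >
>> >> >       @StateId("buffer")
>> >> >       private final StateSpec<BagState<MyClass>> bufferSpec = StateSpecs.bag();
>> >> >
>> >> >       @TimerId("flush")
>> >> >       private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>> >> >
>> >> >       @ProcessElement
>> >> >       public void process(
>> >> >           @Element KV<String, MyClass> element,
>> >> >           @StateId("buffer") BagState<MyClass> buffer,
>> >> >           @TimerId("flush") Timer flush) {
>> >> >         buffer.add(element.getValue());
>> >> >         // (Re)arm a processing-time timer relative to now.
>> >> >         flush.offset(Duration.standardSeconds(30)).setRelative();
>> >> >       }
>> >> >
>> >> >       @OnTimer("flush")
>> >> >       public void onFlush(
>> >> >           OutputReceiver<MyClass> out,
>> >> >           @StateId("buffer") BagState<MyClass> buffer) {
>> >> >         // Emit everything buffered so far, then clear the state.
>> >> >         for (MyClass value : buffer.read()) {
>> >> >           out.output(value);
>> >> >         }
>> >> >         buffer.clear();
>> >> >       }
>> >> >     }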
>> >> >
>> >> > OK, this suggests that the watermark is for some reason "stuck". If you
>> >> > have checkpoints enabled, you should see the size of the checkpoint grow
>> >> > over time.
>> >> >
>> >> > - "Do you see watermark being updated in your Flink WebUI?"
>> >> > The watermarks of the stages that do have one don't get updated. The same 
>> >> > watermark value has been constant since the pipeline started.
>> >> >
>> >> > If no lateness is set, any late data should be admitted, right?
>> >> >
>> >> > If no lateness is set, it means an allowed lateness of Duration.ZERO, which
>> >> > means that data that arrives after the end of the window will be dropped.
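>> >> >
>> >> > If you want to admit late data, you can set an explicit allowed lateness 
>> >> > on the window, for example (illustrative values only):
>> >> >
>> >> >     Window.<MyClass>into(FixedWindows.of(Duration.standardMinutes(10)))
>> >> >         .withAllowedLateness(Duration.standardDays(3))
>> >> >         .discardingFiredPanes()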
>> >> >
>> >> > Regarding the 'droppedDueToLateness' metric, I can't see it exposed anywhere, 
>> >> > neither in the Flink UI nor in Prometheus. I've seen it in Dataflow, but it 
>> >> > seems to be a Dataflow-specific metric, right?
>> >> >
>> >> > It should not be Dataflow-specific. But if you don't see it, it means it
>> >> > could be zero, so we can rule this out.
>> >> >
>> >> > We're using KinesisIO for reading messages.
>> >> >
>> >> > Kinesis uses UnboundedSource, which is expanded to SDF starting from
>> >> > Beam 2.25.0. The flag should change that as well. Can you try
>> >> > --experiments=use_deprecated_read and see if your Pipeline DAG changes
>> >> > (it should not contain an Impulse transform at the beginning) and if it
>> >> > solves your issues?
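>> >> >
>> >> > To double-check that the flag actually reaches the pipeline, something 
>> >> > like this (a sketch, assuming the options are built via 
>> >> > PipelineOptionsFactory) can verify it:
>> >> >
>> >> >     // The experiments list lives on ExperimentalOptions.
>> >> >     ExperimentalOptions experimental = options.as(ExperimentalOptions.class);
>> >> >     boolean useDeprecatedRead =
>> >> >         experimental.getExperiments() != null
>> >> >             && experimental.getExperiments().contains("use_deprecated_read");
>> >> >     System.out.println("use_deprecated_read enabled: " + useDeprecatedRead);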
>> >> >
>> >> > On 2021/06/14 12:48:58, Jan Lukavský <je...@seznam.cz> wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > does your data get buffered in a state - e.g. does the size of the state
>> >> > grow over time? Do you see the watermark being updated in your Flink WebUI?
>> >> > When a stateful operation (and GroupByKey is a stateful operation) does
>> >> > not output any data, the first place to look is whether the watermark
>> >> > correctly progresses. If it does not progress, then the input data must
>> >> > be buffered in state and the size of the state should grow over time. If
>> >> > it does progress, then it might be the case that the data is too late
>> >> > after the watermark (the watermark estimator might need tuning) and the
>> >> > data gets dropped (note you don't set any allowed lateness, which
>> >> > _might_ cause issues). You could see if your pipeline drops data in the
>> >> > "droppedDueToLateness" metric. The size of your state would not grow much
>> >> > in that situation.
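>> >> >
>> >> > If the runner does not surface that counter in its UI, you can also query 
>> >> > metrics from the driver program, roughly like this (just a sketch; it 
>> >> > assumes you still hold the PipelineResult, which may not be the case for a 
>> >> > detached streaming job):
>> >> >
>> >> >     PipelineResult result = pipeline.run();
>> >> >     MetricQueryResults metrics =
>> >> >         result.metrics().queryMetrics(MetricsFilter.builder().build());
>> >> >     for (MetricResult<Long> counter : metrics.getCounters()) {
>> >> >       if (counter.getName().getName().contains("droppedDueToLateness")) {
>> >> >         System.out.println(counter.getName() + " = " + counter.getAttempted());
>> >> >       }
>> >> >     }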
>> >> >
>> >> > Another hint - if you use KafkaIO, try disabling the SDF wrapper for it
>> >> > using "--experiments=use_deprecated_read" on the command line (which you
>> >> > then must pass to PipelineOptionsFactory). There is some suspicion that
>> >> > the SDF wrapper for Kafka might not work as expected in certain situations
>> >> > with Flink.
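>> >> >
>> >> > In other words, the command-line args have to flow through 
>> >> > PipelineOptionsFactory for the experiment to take effect; a minimal 
>> >> > sketch of that wiring:
>> >> >
>> >> >     // Launched with e.g. --runner=FlinkRunner --experiments=use_deprecated_read
>> >> >     PipelineOptions options =
>> >> >         PipelineOptionsFactory.fromArgs(args).withValidation().create();
>> >> >     Pipeline pipeline = Pipeline.create(options);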
>> >> >
>> >> > Please feel free to share any results,
>> >> >
>> >> >     Jan
>> >> >
>> >> > On 6/14/21 1:39 PM, Eddy G wrote:
>> >> >
>> >> > As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal 
>> >> > with late data (I intentionally stopped my consumer, so data has been 
>> >> > accumulating for several days now). I'm using Beam 2.27 and Flink 1.12. 
>> >> > Now, with the following Window...
>> >> >
>> >> >     Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>> >> >
>> >> > And, several parsing stages later, once it's time to write within the 
>> >> > ParquetIO stage...
>> >> >
>> >> >     FileIO
>> >> >         .<String, MyClass>writeDynamic()
>> >> >         .by(...)
>> >> >         .via(...)
>> >> >         .to(...)
>> >> >         .withNaming(...)
>> >> >         .withDestinationCoder(StringUtf8Coder.of())
>> >> >         .withNumShards(options.getNumShards())
>> >> >
>> >> > it won't send bytes across all stages, so no data is being written; it 
>> >> > just accumulates in the first stage seen in the image and won't go 
>> >> > further than that.
>> >> >
>> >> > Any reason why this may be happening? Wrong windowing strategy?
