Thanks for reporting this, Tao. That's all I need for debugging, and
hopefully we can get a fix in soon.

On Wed, Nov 25, 2020 at 10:39 AM Tao Li <t...@zillow.com> wrote:

> It’s a streaming pipeline. I am using the below code to generate an
> unbounded data source, and Beam is running in streaming mode:
>
> *Pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))*
> *    .apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))*
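To spell out what that rate means: withRate(1, new Duration(10)) emits one element every 10 ms, so each 1-second fixed window should see on the order of 100 elements. A quick sanity check of that arithmetic (plain Java, no Beam dependency; `elementsPerWindow` is an illustrative helper, not a Beam API):

```java
public class RateSketch {
    // Expected element count per window for GenerateSequence.withRate(n, periodMillis)
    // landing in a fixed window of windowMillis (assuming the window size is a
    // multiple of the period).
    static long elementsPerWindow(long n, long periodMillis, long windowMillis) {
        return n * (windowMillis / periodMillis);
    }

    public static void main(String[] args) {
        // withRate(1, new Duration(10)) with FixedWindows of 1 second
        System.out.println(elementsPerWindow(1, 10, 1000)); // 100
    }
}
```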
>
>
>
>
>
> *From: *Boyuan Zhang <boyu...@google.com>
> *Date: *Tuesday, November 24, 2020 at 3:27 PM
> *To: *Tao Li <t...@zillow.com>
> *Cc: *"user@beam.apache.org" <user@beam.apache.org>, Kyle Weaver <
> kcwea...@google.com>
> *Subject: *Re: Potential issue with the flink runner in streaming mode
>
>
>
> And is it a batch pipeline or a streaming pipeline?
>
>
>
> On Tue, Nov 24, 2020 at 3:25 PM Tao Li <t...@zillow.com> wrote:
>
> Correction: as discussed with Kyle, adding “--experiments=use_deprecated_read”
> worked with 2.25.
>
>
>
>
>
> *From: *Tao Li <t...@zillow.com>
> *Date: *Tuesday, November 24, 2020 at 3:19 PM
> *To: *"user@beam.apache.org" <user@beam.apache.org>, Kyle Weaver <
> kcwea...@google.com>, Boyuan Zhang <boyu...@google.com>
> *Subject: *Re: Potential issue with the flink runner in streaming mode
>
>
>
> Thanks @Kyle Weaver <kcwea...@google.com> for filing the jira.
>
>
>
> @Boyuan Zhang <boyu...@google.com> your understanding is correct. And as
> discussed with Kyle, adding “--experiments=use_deprecated_read” worked
> with 2.24.
>
>
>
> Here is the write part if you are interested. It’s basically saving
> parquet files. And I am using local fs for that.
>
>
>
>
> *data.apply(FileIO.write[GenericRecord]()*
> *    .withNumShards(10)*
> *    .via(ParquetIO.sink(schema))*
> *    .to(path)*
> *    .withSuffix(".parquet"))*
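The write logs quoted further down in the thread show how those 10 shards end up being named. A rough sketch of the default windowed-filename pattern (plain Java, illustrative only; `shardName` is a hypothetical helper, and Beam's actual FileBasedSink naming policy is configurable and more involved):

```java
public class ShardNameSketch {
    // Approximate default windowed file name:
    // prefix-windowStart-windowEnd-SSSSS-of-NNNNN.suffix
    static String shardName(String prefix, String windowStart, String windowEnd,
                            int shard, int numShards, String suffix) {
        return String.format("%s-%s-%s-%05d-of-%05d%s",
                prefix, windowStart, windowEnd, shard, numShards, suffix);
    }

    public static void main(String[] args) {
        // Reproduces the final file name from the 2.24.0 logs in this thread.
        System.out.println(shardName("output",
                "2020-11-24T01:33:59.000Z", "2020-11-24T01:34:00.000Z",
                2, 10, ".parquet"));
        // → output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet
    }
}
```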
>
>
>
> *From: *Boyuan Zhang <boyu...@google.com>
> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
> *Date: *Tuesday, November 24, 2020 at 2:54 PM
> *To: *"user@beam.apache.org" <user@beam.apache.org>
> *Subject: *Re: Potential issue with the flink runner in streaming mode
>
>
>
> Hi Tao,
>
>
>
> I want to make sure that I understand your problem correctly: with
> 2.25.0 you are not able to write to the sink, but with 2.24.0 you can. Is
> that right? Also, could you please share the write portion of your
> pipeline as well?
>
>
>
> On Tue, Nov 24, 2020 at 2:50 PM Kyle Weaver <kcwea...@google.com> wrote:
>
> Yeah, it looks like a regression. I filed a JIRA issue to track it:
> https://issues.apache.org/jira/browse/BEAM-11341
>
>
>
> On Tue, Nov 24, 2020 at 2:07 PM Tao Li <t...@zillow.com> wrote:
>
> Yep it works with “--experiments=use_deprecated_read”. Is this a
> regression?
>
>
>
> *From: *Kyle Weaver <kcwea...@google.com>
> *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
> *Date: *Tuesday, November 24, 2020 at 11:08 AM
> *To: *"user@beam.apache.org" <user@beam.apache.org>
> *Subject: *Re: Potential issue with the flink runner in streaming mode
>
>
>
> I wonder if this issue is related to the migration to Splittable DoFn [1].
> Can you try running your pipeline again with the option
> --experiments=use_deprecated_read?
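For anyone following along: experiments are passed as a regular pipeline option. Assuming a typical Beam Java main class that parses its args with PipelineOptionsFactory, the invocation would look something like this (the jar and main-class names are placeholders):

```shell
java -cp my-pipeline.jar com.example.MyPipeline \
  --runner=FlinkRunner \
  --streaming=true \
  --experiments=use_deprecated_read
```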
>
>
>
> [1] https://beam.apache.org/blog/beam-2.25.0/
>
>
>
> On Tue, Nov 24, 2020 at 10:19 AM Tao Li <t...@zillow.com> wrote:
>
> Hi Beam community,
>
>
>
> I am running into a problem with 
> “org.apache.beam:beam-runners-flink-1.11:2.25.0”
> and “org.apache.beam:beam-runners-flink-1.10:2.25.0”. I am doing some local
> testing with the flink runners in embedded mode. The problem is that I
> cannot save data into local files using those artifact versions. However
> when I switched to “org.apache.beam:beam-runners-flink-1.10:2.24.0”, it
> worked fine and output files were saved successfully.
>
>
>
> I am basically generating unbounded data in memory using GenerateSequence
> transform and saving it into local files. Here is the code that generates
> unlimited data in memory:
>
>
>
> *Pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))*
> *    .apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))*
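For reference, FixedWindows.of(Duration.standardSeconds(1)) assigns each element to a window purely from its timestamp, by flooring to the nearest window boundary. A minimal sketch of that assignment arithmetic (plain Java, no Beam dependency; `windowStartMillis` is an illustrative helper, not a Beam API):

```java
public class FixedWindowSketch {
    // Start of the fixed window containing timestampMillis, for windows of
    // sizeMillis with no offset (floor the timestamp to a window boundary).
    static long windowStartMillis(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second windows, as in the pipeline above
        // Elements at 1250 ms and 1999 ms fall in the window [1000, 2000) ...
        System.out.println(windowStartMillis(1250L, size)); // 1000
        System.out.println(windowStartMillis(1999L, size)); // 1000
        // ... while 2000 ms begins the next window [2000, 3000).
        System.out.println(windowStartMillis(2000L, size)); // 2000
    }
}
```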
>
>
>
> I compared the logs and noticed that there is no write operation found in
> the logs with “beam-runners-flink-1.11:2.25.0” or
> “beam-runners-flink-1.10:2.25.0”. With the working version
> “beam-runners-flink-1.10:2.24.0”, I could find the logs below, which were
> clearly from the write operation:
>
>
>
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
> ->
> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize)
> ->
> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair
> with random key/ParMultiDo(AssignShard) (9/12)] INFO
> org.apache.beam.sdk.io.WriteFiles - Finalizing 1 file results
>
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
> ->
> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize)
> ->
> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair
> with random key/ParMultiDo(AssignShard) (9/12)] INFO
> org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file
> FileResult{tempFilename=/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2,
> shard=2, window=[2020-11-24T01:33:59.000Z..2020-11-24T01:34:00.000Z),
> paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
> onTimeIndex=0}} to final location
> /Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet
>
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
> ->
> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize)
> ->
> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair
> with random key/ParMultiDo(AssignShard) (9/12)] INFO
> org.apache.beam.sdk.io.FileBasedSink - Will remove known temporary file
> /Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2
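For context, those log lines reflect the usual temp-then-finalize pattern of a file sink: write to a temporary path, copy it to the final name, then delete the temp file so readers never observe a partial output. A generic sketch of that pattern using plain Java NIO (not Beam's actual implementation; it uses a move, which achieves the same effect as copy-plus-delete on a local filesystem):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FinalizeSketch {
    // Write bytes to a temp file in the target directory, then atomically
    // move it to its final name. Until the move, the final path doesn't exist.
    static Path writeFinalized(Path dir, String finalName, byte[] contents)
            throws IOException {
        Path temp = Files.createTempFile(dir, ".temp-beam-", null);
        Files.write(temp, contents);
        // Same directory, so the move is atomic on typical local filesystems.
        return Files.move(temp, dir.resolve(finalName),
                StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("out");
        Path result = writeFinalized(dir,
                "output-00002-of-00010.parquet", "data".getBytes());
        System.out.println(result.getFileName()); // output-00002-of-00010.parquet
    }
}
```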
>
>
>
>
>
>
>
> Is this a known issue with “beam-runners-flink-1.11:2.25.0” and
> “beam-runners-flink-1.10:2.25.0”? Can someone please take a look at this
> issue? Thanks so much!
>
>
>
>
>
>
