Thanks @Kyle Weaver<mailto:kcwea...@google.com> for filing the jira.

@Boyuan Zhang<mailto:boyu...@google.com> your understanding is correct. And as 
discussed with Kyle, adding “--experiments=use_deprecated_read” worked with 
2.24.

Here is the write part if you are interested. It’s basically saving parquet 
files. And I am using local fs for that.

data.apply(FileIO.write[GenericRecord]().withNumShards(10).via(ParquetIO.sink(schema)).to(path).withSuffix(".parquet"))

From: Boyuan Zhang <boyu...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Tuesday, November 24, 2020 at 2:54 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential issue with the flink runner in streaming mode

Hi Tao,

I want to make sure that I understand your problem correctly. So within the 
2.25.0 version, you are not able to write the sink but within 2.24.0 you can do 
so. Do I understand correctly? Besides, could you please provide your pipeline 
with the write operation as well?

On Tue, Nov 24, 2020 at 2:50 PM Kyle Weaver 
<kcwea...@google.com<mailto:kcwea...@google.com>> wrote:
Yeah, it looks like a regression. I filed a JIRA issue to track this issue. 
https://issues.apache.org/jira/browse/BEAM-11341<https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FBEAM-11341&data=04%7C01%7Ctaol%40zillow.com%7Cd82167bc6d13460a3e0e08d890cbe6db%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637418552709471072%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=xjuSIBiVj4nzvjpU0rhN54IxAb9X3ewJwUpvZ8scjP8%3D&reserved=0>

On Tue, Nov 24, 2020 at 2:07 PM Tao Li 
<t...@zillow.com<mailto:t...@zillow.com>> wrote:
Yep it works with “--experiments=use_deprecated_read”. Is this a regression?

From: Kyle Weaver <kcwea...@google.com<mailto:kcwea...@google.com>>
Reply-To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Date: Tuesday, November 24, 2020 at 11:08 AM
To: "user@beam.apache.org<mailto:user@beam.apache.org>" 
<user@beam.apache.org<mailto:user@beam.apache.org>>
Subject: Re: Potential issue with the flink runner in streaming mode

I wonder if this issue is related to the migration to Splittable DoFn [1]. Can 
you try running your pipeline again with the option 
--experiments=use_deprecated_read?

[1] 
https://beam.apache.org/blog/beam-2.25.0/<https://nam11.safelinks.protection.outlook.com/?url=https%3A%2F%2Fbeam.apache.org%2Fblog%2Fbeam-2.25.0%2F&data=04%7C01%7Ctaol%40zillow.com%7Cd82167bc6d13460a3e0e08d890cbe6db%7C033464830d1840e7a5883784ac50e16f%7C0%7C0%7C637418552709471072%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=uzqaxy5mDagDNGpNz9O35MhameVtSJzUA%2FeKXy6kn9E%3D&reserved=0>

On Tue, Nov 24, 2020 at 10:19 AM Tao Li 
<t...@zillow.com<mailto:t...@zillow.com>> wrote:
Hi Beam community,

I am running into a problem with 
“org.apache.beam:beam-runners-flink-1.11:2.25.0” and 
“org.apache.beam:beam-runners-flink-1.10:2.25.0”. I am doing some local testing 
with the flink runners in embedded mode. The problem is that I cannot save data 
into local files using those artifact versions. However when I switched to 
“org.apache.beam:beam-runners-flink-1.10:2.24.0”, it worked fine and output 
files were saved successfully.

I am basically generating unbounded data in memory using GenerateSequence 
transform and saving it into local files. Here is the code that generates 
unlimited data in memory:

Pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))
.apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))

I compared the logs and noticed that there is no write operation found in the 
logs with “beam-runners-flink-1.11:2.25.0” and 
“beam-runners-flink-1.10:2.25.0”. With the working version 
“beam-runners-flink-1.10:2.24.0”, I could find below logs that was obviously 
doing the write operation:

[FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
 -> 
FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) 
-> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair 
with random key/ParMultiDo(AssignShard) (9/12)] INFO 
org.apache.beam.sdk.io.WriteFiles - Finalizing 1 file results
[FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
 -> 
FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) 
-> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair 
with random key/ParMultiDo(AssignShard) (9/12)] INFO 
org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file 
FileResult{tempFilename=/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2,
 shard=2, window=[2020-11-24T01:33:59.000Z..2020-11-24T01:34:00.000Z), 
paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, 
onTimeIndex=0}} to final location 
/Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet
[FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
 -> 
FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) 
-> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair 
with random key/ParMultiDo(AssignShard) (9/12)] INFO 
org.apache.beam.sdk.io.FileBasedSink - Will remove known temporary file 
/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2



Is this a known issue with “beam-runners-flink-1.11:2.25.0” and 
“beam-runners-flink-1.10:2.25.0”? Can someone please take a look at this issue? 
Thanks so much!


Reply via email to