Hi Tao,

I want to make sure that I understand your problem correctly: with version 2.25.0 you are not able to write to the sink, but with 2.24.0 you can. Is that right? Also, could you please share your pipeline, including the write operation?

On Tue, Nov 24, 2020 at 2:50 PM Kyle Weaver <kcwea...@google.com> wrote:

> Yeah, it looks like a regression. I filed a JIRA issue to track this
> issue. https://issues.apache.org/jira/browse/BEAM-11341
>
> On Tue, Nov 24, 2020 at 2:07 PM Tao Li <t...@zillow.com> wrote:
>
>> Yep it works with “--experiments=use_deprecated_read”. Is this a
>> regression?
>>
>> From: Kyle Weaver <kcwea...@google.com>
>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>> Date: Tuesday, November 24, 2020 at 11:08 AM
>> To: "user@beam.apache.org" <user@beam.apache.org>
>> Subject: Re: Potential issue with the flink runner in streaming mode
>>
>> I wonder if this issue is related to the migration to Splittable DoFn
>> [1]. Can you try running your pipeline again with the option
>> --experiments=use_deprecated_read?
>>
>> [1] https://beam.apache.org/blog/beam-2.25.0/
>>
>> On Tue, Nov 24, 2020 at 10:19 AM Tao Li <t...@zillow.com> wrote:
>>
>> Hi Beam community,
>>
>> I am running into a problem with
>> “org.apache.beam:beam-runners-flink-1.11:2.25.0”
>> and “org.apache.beam:beam-runners-flink-1.10:2.25.0”. I am doing some local
>> testing with the flink runners in embedded mode. The problem is that I
>> cannot save data into local files using those artifact versions. However
>> when I switched to “org.apache.beam:beam-runners-flink-1.10:2.24.0”, it
>> worked fine and output files were saved successfully.
>>
>> I am basically generating unbounded data in memory using GenerateSequence
>> transform and saving it into local files. Here is the code that
>> generates unlimited data in memory:
>>
>> Pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))
>>   .apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))
>>
>> I compared the logs and noticed that there is no write operation found in
>> the logs with “beam-runners-flink-1.11:2.25.0” and
>> “beam-runners-flink-1.10:2.25.0”. With the working version
>> “beam-runners-flink-1.10:2.24.0”, I could find below logs that was
>> obviously doing the write operation:
>>
>> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
>> ->
>> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize)
>> ->
>> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair
>> with random key/ParMultiDo(AssignShard) (9/12)] INFO
>> org.apache.beam.sdk.io.WriteFiles - Finalizing 1 file results
>>
>> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
>> ->
>> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize)
>> ->
>> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair
>> with random key/ParMultiDo(AssignShard) (9/12)] INFO
>> org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file
>> FileResult{tempFilename=/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2,
>> shard=2, window=[2020-11-24T01:33:59.000Z..2020-11-24T01:34:00.000Z),
>> paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
>> onTimeIndex=0}} to final location
>> /Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet
>>
>> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous)
>> ->
>> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize)
>> ->
>> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair
>> with random key/ParMultiDo(AssignShard) (9/12)] INFO
>> org.apache.beam.sdk.io.FileBasedSink - Will remove known temporary file
>> /Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2
>>
>> Is this a known issue with “beam-runners-flink-1.11:2.25.0” and
>> “beam-runners-flink-1.10:2.25.0”? Can someone please take a look at this
>> issue? Thanks so much!