I wonder if this issue is related to the migration to Splittable DoFn [1]. Can you try running your pipeline again with the option --experiments=use_deprecated_read?
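For reference, the flag can be passed on the command line (e.g. --runner=FlinkRunner --experiments=use_deprecated_read) or set programmatically through Beam's ExperimentalOptions. Below is a minimal Java sketch of the programmatic route; the class name and the surrounding pipeline setup are just illustrative, only the experiment name comes from the suggestion above:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UseDeprecatedReadExample {
      public static void main(String[] args) {
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();

        // Equivalent to passing --experiments=use_deprecated_read on the command line.
        ExperimentalOptions.addExperiment(
            options.as(ExperimentalOptions.class), "use_deprecated_read");

        Pipeline pipeline = Pipeline.create(options);
        // ... build the rest of the pipeline as before ...
        pipeline.run();
      }
    }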
[1] https://beam.apache.org/blog/beam-2.25.0/

On Tue, Nov 24, 2020 at 10:19 AM Tao Li <t...@zillow.com> wrote:

> Hi Beam community,
>
> I am running into a problem with “org.apache.beam:beam-runners-flink-1.11:2.25.0”
> and “org.apache.beam:beam-runners-flink-1.10:2.25.0”. I am doing some local
> testing with the Flink runners in embedded mode. The problem is that I cannot
> save data into local files with those artifact versions. However, when I
> switched to “org.apache.beam:beam-runners-flink-1.10:2.24.0”, it worked fine
> and the output files were saved successfully.
>
> I am basically generating unbounded data in memory with the GenerateSequence
> transform and saving it into local files. Here is the code that generates the
> unbounded data in memory:
>
> Pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))
>   .apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))
>
> I compared the logs and noticed that there is no write operation in the logs
> with “beam-runners-flink-1.11:2.25.0” and “beam-runners-flink-1.10:2.25.0”.
> With the working version “beam-runners-flink-1.10:2.24.0”, I can find the logs
> below, which clearly show the write operation:
>
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.WriteFiles - Finalizing 1 file results
>
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2, shard=2, window=[2020-11-24T01:33:59.000Z..2020-11-24T01:34:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet
>
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.FileBasedSink - Will remove known temporary file /Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2
>
> Is this a known issue with “beam-runners-flink-1.11:2.25.0” and
> “beam-runners-flink-1.10:2.25.0”? Can someone please take a look at this
> issue? Thanks so much!
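For anyone trying to reproduce the report, here is a rough, self-contained Java sketch of a pipeline in the shape Tao describes (GenerateSequence at a fixed rate, one-second fixed windows, then a windowed file write). Note this is not the original code: the original job writes Parquet via FileIO, while this sketch uses TextIO and a placeholder output path just to keep it short.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    public class WindowedWriteRepro {
      public static void main(String[] args) {
        // Run with e.g. --runner=FlinkRunner, optionally adding
        // --experiments=use_deprecated_read to test the suggestion above.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);

        p.apply(GenerateSequence.from(0).withRate(1, Duration.millis(10)))
            // One-second fixed windows, as in the snippet quoted above.
            .apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(1))))
            .apply(MapElements.into(TypeDescriptors.strings())
                .via((Long x) -> Long.toString(x)))
            // Windowed write with a fixed shard count; the path is a placeholder.
            .apply(TextIO.write()
                .to("/tmp/output/output")
                .withWindowedWrites()
                .withNumShards(10));

        p.run().waitUntilFinish();
      }
    }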