Hi Sandeep and community,

I'm a PMC member of the Beam community and also a contributor on Flink. I also used parquet in conjunction with Beam pipelines running with Flink Beam runner. So I should be able to help:

I don't think it is a Flink issue as you have a Beam pipeline so you use only Beam IOs that are wrapped by Beam runner inside a Flink operator at (Beam) translation time.

I don't think It is related to S3 either but much more to ParquetIO in Beam (read and write connector).

I'll check at the Beam side and get back to you.

Best

Etienne.

On 30/09/2021 14:42, Till Rohrmann wrote:
Hi Sandeep,

I am not a Beam expert. The problem might be caused by the used S3
filesystem implementation. Have you tried whether the same problem occurs
when using vanilla Flink's latest version? Alternatively, you could also
reach out to the Beam community or ask on Flink's user ML whether people
have experience with such a problem.

Some of the exceptions look as if your network is a bit flakey. You might
wanna look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep
<sandeep_kath...@intuit.com.invalid> wrote:

Hi,
    We have a simple Beam application which reads from Kafka, converts to
parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
have a fixed window of 5 minutes after conversion to
PCollection<GenericRecord> and then writing to S3. We have around 320
columns in our data. Our intention is to write large files of size 128MB or
more so that it won’t have a small file problem when reading back from
Hive. But from what we observed it is taking too much memory to write to S3
(giving memory of 8GB to heap is not enough to write 50 MB files and it is
going OOM). When I increase memory for heap to 32GB then it take lot of
time to write records to s3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection<GenericRecord> parquetRecord = ………………………….

parquetRecord.apply(FileIO.<GenericRecord>write()
                 .via(ParquetIO.sink(getOutput_schema()))
                 .to(outputPath.isEmpty() ? outputPath() : outputPath)
                 .withNumShards(5)
                 .withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


   1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
             at
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
             at
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
             at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
             at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
             at java.lang.Iterable.forEach(Iterable.java:75)
             at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
             at
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
             at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
             at
com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67)
             at
com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
             at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
             at
com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49)
             at
com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
             at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
             at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
             at
org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
             at
org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
             at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
             at
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
             at
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
             at
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
             at
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
             at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
             at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
             at
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
             at
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
             at
com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85)
             at
com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
             at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)






2.            Connection timed out:

ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for
connection string (
internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181)
and timeout (15000) / elapsed (58732)
org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
KeeperErrorCode = ConnectionLoss
             at
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
             at
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
             at
org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
             at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
             at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
             at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
             at
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
             at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
             at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
             at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
             at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
             at java.lang.Thread.run(Thread.java:748)













3.            com.amazonaws.AbortedException

             Caused by: java.io.IOException: com.amazonaws.AbortedException:
             at
org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153)
             at
org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127)
             at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
             at java.nio.channels.Channels.writeFully(Channels.java:101)
             at java.nio.channels.Channels.access$000(Channels.java:61)
             at java.nio.channels.Channels$1.write(Channels.java:174)
             at org.apache.beam.sdk.io
.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
             at org.apache.beam.sdk.io
.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
             at
org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
             at
org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
             at
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
             at
org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
             at
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
             at
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
             at
org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
             at org.apache.beam.sdk.io
.parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
             at
org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400)
             at org.apache.beam.sdk.io
.FileBasedSink$Writer.close(FileBasedSink.java:1006)
             at org.apache.beam.sdk.io
.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)








4.            Connection unexpectedly closed by remote task manager:
  WARN  o.a.flink.runtime.taskmanager.Task -
FileIO.Write/WriteFiles/GatherTempFileResults/Drop
key/Values/Map/ParMultiDo(Anonymous) ->
FileIO.Write/WriteFiles/GatherTempFileResults/Gather
bundles/ParMultiDo(GatherBundlesPerWindow) ->
FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair
with random key/ParMultiDo(AssignShard) (5/5)#0
(b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager '
10.35.134.92/10.35.134.92:33413'. This might indicate that the remote
task manager was lost.






5.            Checkpoints are failing with IOExceptions: After a few
restarts checkpoints start failing with IOExceptions.

Caused by: java.io.IOException: Interrupted while waiting for buffer
             at org.apache.flink.runtime.io
.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
             at org.apache.flink.runtime.io
.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
             at org.apache.flink.runtime.io
.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
             at org.apache.flink.runtime.io
.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
             at org.apache.flink.runtime.io
.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
             at org.apache.flink.runtime.io
.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
             at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
             ... 176 common frames omitted





        Just wanted to know if anyone has experienced these kind of issues
and how we can solve these.



        Thanks,
         Sandeep





Reply via email to