Hi Sandeep,

I am not a Beam expert. The problem might be caused by the S3 filesystem implementation being used. Have you checked whether the same problem occurs with the latest version of vanilla Flink? Alternatively, you could also reach out to the Beam community or ask on Flink's user ML whether people have experience with such a problem.
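One way to check that would be to reproduce the same Parquet-to-S3 write with plain Flink's StreamingFileSink, bypassing Beam entirely. Below is a minimal sketch against the Flink 1.12 API; the output bucket, the schema string, and the buildSource helper are placeholders rather than parts of your actual job:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class VanillaFlinkParquetToS3 {

      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats such as Parquet roll part files on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(5 * 60 * 1000L);

        Schema schema = new Schema.Parser().parse(OUTPUT_SCHEMA_JSON); // placeholder for the ~320-column schema
        DataStream<GenericRecord> records = buildSource(env, schema);  // placeholder for the Kafka read + flattening

        // Writes Parquet part files through Flink's own S3 filesystem (s3a:// with the flink-s3-fs-hadoop plugin).
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
            .forBulkFormat(new Path("s3a://my-bucket/output"), ParquetAvroWriters.forGenericRecord(schema))
            .build();

        records.addSink(sink);
        env.execute("parquet-to-s3-vanilla-flink");
      }

      // Placeholders for parts of the original pipeline that are not shown in this thread.
      private static final String OUTPUT_SCHEMA_JSON = "...";

      private static DataStream<GenericRecord> buildSource(StreamExecutionEnvironment env, Schema schema) {
        throw new UnsupportedOperationException("plug in the same Kafka source and GenericRecord conversion here");
      }
    }

If a job like this writes files of the same size quickly, the problem is more likely in the Beam write path: your exception 3 below goes through org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel (Beam's own S3 filesystem), whereas this sketch would go through Flink's S3 filesystem plugin, so comparing the two isolates the filesystem implementation. A corresponding self-contained sketch of the Beam write path from your message is included after the quote at the bottom.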
Some of the exceptions look as if your network is a bit flaky. You might want to look into the infrastructure you are running on.

Cheers,
Till

On Tue, Sep 14, 2021 at 5:22 PM Kathula, Sandeep <sandeep_kath...@intuit.com.invalid> wrote:

> Hi,
> We have a simple Beam application which reads from Kafka, converts to parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We have a fixed window of 5 minutes after conversion to PCollection<GenericRecord> and then write to S3. We have around 320 columns in our data. Our intention is to write large files of 128 MB or more so that we won't have a small-file problem when reading back from Hive. But from what we observed it is taking too much memory to write to S3 (giving 8 GB of heap is not enough to write 50 MB files and it goes OOM). When I increase the heap to 32 GB it takes a lot of time to write records to S3.
> For instance it takes:
>
> 20 MB file - 30 sec
> 50 MB file - 1 min 16 sec
> 75 MB file - 2 min 15 sec
> 83 MB file - 2 min 40 sec
>
> Code block to write to S3:
>
> PCollection<GenericRecord> parquetRecord = ………………………….
>
> parquetRecord.apply(FileIO.<GenericRecord>write()
>     .via(ParquetIO.sink(getOutput_schema()))
>     .to(outputPath.isEmpty() ? outputPath() : outputPath)
>     .withNumShards(5)
>     .withNaming(new CustomFileNaming("snappy.parquet")));
>
> We are also getting different exceptions like:
>
> 1. UserCodeException:
>
> Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) > at > com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) > at >
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) > at > com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36) > at java.lang.Iterable.forEach(Iterable.java:75) > at > com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34) > at > com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) > at > com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67) > at > com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) > at > com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49) > at > com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) > at > org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73) > at > org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211) > at > org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) > at > org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73) > at > org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139) > at > org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) > at > org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) > at > com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85) > at > com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > > > > > > > 2. 
Connection timed out: > > ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for > connection string ( > internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181) > and timeout (15000) / elapsed (58732) > org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException: > KeeperErrorCode = ConnectionLoss > at > org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225) > at > org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94) > at > org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64) > at > org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > > > > > > > > > > > > > 3. 
com.amazonaws.AbortedException
>
> Caused by: java.io.IOException: com.amazonaws.AbortedException: > at > org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153) > at > org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127) > at java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > at java.nio.channels.Channels.writeFully(Channels.java:101) > at java.nio.channels.Channels.access$000(Channels.java:61) > at java.nio.channels.Channels$1.write(Channels.java:174) > at org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452) > at org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447) > at > org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46) > at > org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460) > at > org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201) > at > org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261) > at > org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173) > at > org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) > at > org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308) > at org.apache.beam.sdk.io.parquet.ParquetIO$Sink.flush(ParquetIO.java:394) > at > org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400) > at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1006) > at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)
>
> 4. Connection unexpectedly closed by remote task manager:
> WARN o.a.flink.runtime.taskmanager.Task - FileIO.Write/WriteFiles/GatherTempFileResults/Drop key/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/GatherTempFileResults/Gather bundles/ParMultiDo(GatherBundlesPerWindow) -> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (5/5)#0 (b1f59ef12b569d904c28de21a4087655) switched from RUNNING to FAILED.
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '10.35.134.92/10.35.134.92:33413'. This might indicate that the remote task manager was lost.
>
> 5. Checkpoints are failing with IOExceptions: After a few restarts checkpoints start failing with IOExceptions.
>
> Caused by: java.io.IOException: Interrupted while waiting for buffer > at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341) > at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313) > at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257) > at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149) > at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104) > at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101) > ... 176 common frames omitted
>
> Just wanted to know if anyone has experienced this kind of issue and how we can solve it.
>
> Thanks,
> Sandeep
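For anyone trying to reproduce the Beam side of this in isolation, here is a self-contained sketch of the 5-minute fixed window plus FileIO/ParquetIO write described in the quoted message. The Kafka source, the ~320-column schema, and the output bucket (buildInput, SCHEMA_JSON, s3://my-bucket/output/) are hypothetical placeholders for the parts elided above, and FileIO.Write.defaultNaming stands in for the original CustomFileNaming:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class WindowedParquetS3WriteSketch {

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON); // hypothetical wide (~320 column) schema

        // Placeholder for the original KafkaIO read + JSON flattening to GenericRecord.
        PCollection<GenericRecord> parquetRecord = buildInput(p, schema)
            // 5-minute fixed window, as described in the quoted message.
            .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))));

        parquetRecord.apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to("s3://my-bucket/output/")                                        // hypothetical output location
            .withNumShards(5)
            .withNaming(FileIO.Write.defaultNaming("part", ".snappy.parquet"))); // stand-in for CustomFileNaming

        p.run();
      }

      private static final String SCHEMA_JSON = "..."; // hypothetical schema definition

      private static PCollection<GenericRecord> buildInput(Pipeline p, Schema schema) {
        throw new UnsupportedOperationException("plug in the KafkaIO source and GenericRecord conversion here");
      }
    }

Apart from the placeholders, the write transform mirrors the quoted snippet (ParquetIO sink, 5 shards, 5-minute fixed windows), so it should show the same memory and throughput behaviour when fed comparable data.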