Hi Jan,

          Thanks for the reply. To answer your questions:


  1.  We are using RocksDB as the state backend.
  2.  We are using a 10-minute checkpointing interval.
  3.  We are receiving at most 5,000 records per second from Kafka, each around 
5 KB in size (about 25 MB/sec), which we are trying to write to S3. Since we 
write to S3 in Parquet format as 5 files once every 5 minutes, the data is 
compressed, and we estimate each file to end up around 100-150 MB in size (a 
rough back-of-the-envelope check follows below).
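
           A rough back-of-the-envelope check of those numbers (the compression 
ratio below is an assumption on my side, not something we have measured exactly):

// Rough sketch of the data volume per 5-minute window; the compression ratio is assumed.
long recordsPerSecond = 5_000;                 // peak rate from Kafka
long bytesPerRecord   = 5L * 1024;             // ~5 KB per record
long windowSeconds    = 5L * 60;               // 5-minute fixed window
int  numShards        = 5;                     // matches withNumShards(5) in our write

long rawBytesPerWindow = recordsPerSecond * bytesPerRecord * windowSeconds; // ~7.7 GB raw per window
long rawBytesPerShard  = rawBytesPerWindow / numShards;                     // ~1.5 GB raw per file
double assumedCompression = 12.0;              // assumption: ~10-15x for snappy Parquet on our data
long compressedPerShard = (long) (rawBytesPerShard / assumedCompression);   // ~128 MB, in line with the 100-150 MB estimate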


           We even tried 6 pods, each with 4 CPUs and 64 GB of memory (32 GB 
going to off-heap for RocksDB), but we are still not able to write bigger files.
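
           For reference, this is roughly how the settings from answers 1 and 2 
are passed to the Flink runner (a simplified sketch, not our exact code):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch of the runner wiring; 'args' are the usual command-line arguments.
FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(600_000L);  // 10-minute checkpointing interval, in milliseconds
// The RocksDB state backend itself is enabled on the Flink side
// (state.backend: rocksdb in flink-conf.yaml) rather than in this snippet.
Pipeline pipeline = Pipeline.create(options);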


Thanks,
Sandeep

From: Jan Lukavský <je...@seznam.cz>
Date: Tuesday, September 14, 2021 at 10:47 AM
To: "u...@beam.apache.org" <u...@beam.apache.org>
Cc: user <user@flink.apache.org>
Subject: Re: Beam with Flink runner - Issues when writing to S3 in Parquet 
Format

Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, so this 
might create high pressure on the state backend and/or the heap, which could 
result in suboptimal performance. Based on the "connection loss" and timeout 
exceptions you describe, I'd suppose there might be a lot of GC pressure.

 Jan
On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
Hi,
   We have a simple Beam application which reads from Kafka, converts to 
Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We 
apply a fixed window of 5 minutes after conversion to PCollection<GenericRecord> 
and then write to S3. We have around 320 columns in our data. Our intention is to 
write large files of 128 MB or more so that we don't run into the small-files 
problem when reading them back from Hive. But from what we observed, writing to 
S3 takes too much memory (8 GB of heap is not enough to write 50 MB files; the 
job goes OOM). When I increase the heap to 32 GB, it takes a lot of time to 
write records to S3.
For instance, it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection<GenericRecord> parquetRecord = ………………………….

parquetRecord.apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath.isEmpty() ? outputPath() : outputPath)
                .withNumShards(5)
                .withNaming(new CustomFileNaming("snappy.parquet")));
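
For completeness, the 5-minute fixed window mentioned above is applied upstream 
of this write, roughly like this (a simplified sketch; the variable names are 
illustrative, not our exact code):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// genericRecords stands in for the PCollection<GenericRecord> produced by our
// JSON-to-GenericRecord conversion; the windowed collection feeds the
// FileIO.write() shown above.
PCollection<GenericRecord> windowed =
        genericRecords.apply("ApplyFixedWindow5Min",
                Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))));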


We are also getting different exceptions like:


  1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
            at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
            at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
            at java.lang.Iterable.forEach(Iterable.java:75)
            at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
            at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at 
com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson.processElement(ExtractRequiredJson.java:67)
            at 
com.intuit.data.platform.process.thrivev2.filter.ExtractRequiredJson$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at 
com.intuit.data.platform.process.thrivev2.filter.Filter.filterElement(Filter.java:49)
            at 
com.intuit.data.platform.process.thrivev2.filter.Filter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
            at 
org.apache.beam.sdk.transforms.Filter$1.processElement(Filter.java:211)
            at 
org.apache.beam.sdk.transforms.Filter$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:73)
            at 
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
            at 
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
            at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
            at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
            at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
            at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
            at 
com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn.extract(EbExtractorDoFn.java:85)
            at 
com.intuit.strmprocess.sdk.core.ebio.EbExtractorDoFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
            at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)

  2.  Connection timed out:

ERROR o.a.f.s.c.o.a.c.ConnectionState - Connection timed out for connection 
string 
(internal-a49124072c9ca4429b037070c497dc28-234959464.us-west-2.elb.amazonaws.com:12181)
 and timeout (15000) / elapsed (58732)
org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException:
 KeeperErrorCode = ConnectionLoss
            at 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
            at 
org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
            at 
org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
            at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
            at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
            at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
            at 
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)

  3.  com.amazonaws.AbortedException:

            Caused by: java.io.IOException: com.amazonaws.AbortedException:
            at 
org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.flush(S3WritableByteChannel.java:153)
            at 
org.apache.beam.sdk.io.aws.s3.S3WritableByteChannel.write(S3WritableByteChannel.java:127)
            at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
            at java.nio.channels.Channels.writeFully(Channels.java:101)
            at java.nio.channels.Channels.access$000(Channels.java:61)
            at java.nio.channels.Channels$1.write(Channels.java:174)
            at 
org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:452)
            at 
org.apache.beam.sdk.io.parquet.ParquetIO$Sink$BeamOutputStream.write(ParquetIO.java:447)
            at 
org.apache.parquet.bytes.ConcatenatingByteArrayCollector.writeAllTo(ConcatenatingByteArrayCollector.java:46)
            at 
org.apache.parquet.hadoop.ParquetFileWriter.writeDataPages(ParquetFileWriter.java:460)
            at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:201)
            at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:261)
            at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:173)
            at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
            at 
org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
            at 
org.apache.beam.sdk.io.parquet.ParquetIO$Sink.flush(ParquetIO.java:394)
            at 
org.apache.beam.sdk.io.FileIO$Write$ViaFileBasedSink$1$1.finishWrite(FileIO.java:1400)
            at 
org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1006)
            at 
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:780)

  4.  Connection unexpectedly closed by remote task manager:
 WARN  o.a.flink.runtime.taskmanager.Task - 
FileIO.Write/WriteFiles/GatherTempFileResults/Drop 
key/Values/Map/ParMultiDo(Anonymous) -> 
FileIO.Write/WriteFiles/GatherTempFileResults/Gather 
bundles/ParMultiDo(GatherBundlesPerWindow) -> 
FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Pair with 
random key/ParMultiDo(AssignShard) (5/5)#0 (b1f59ef12b569d904c28de21a4087655) 
switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'10.35.134.92/10.35.134.92:33413'. This might indicate that the remote task 
manager was lost.

  5.  Checkpoints failing with IOExceptions: after a few restarts, checkpoints 
start failing with IOExceptions.

Caused by: java.io.IOException: Interrupted while waiting for buffer
            at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:341)
            at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:313)
            at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:257)
            at 
org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:149)
            at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
            at 
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
            at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
            ... 176 common frames omitted

       Just wanted to know if anyone has experienced these kinds of issues and 
how we can solve them.

       Thanks,
        Sandeep
