It seems the issue also appears when using Flink version 1.6.2.

‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Tuesday, October 30, 2018 10:26 PM, Flink Developer <[email protected]> wrote:

> Hi, thanks for the info Rafi, that seems to be related. I hope Flink version
> 1.6.2 fixes this. Has anyone encountered this before?
>
> I would also like to note that my jar includes a core-site.xml file that uses
> *s3a*. Is this the recommended configuration to use with BucketingSink?
> Should the sink be specified using s3a://<bucket>/<prefix> or
> s3://<bucket>/<prefix>?
>
> <configuration>
>   <property>
>     <name>fs.s3.impl</name>
>     <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
>   <property>
>     <name>fs.s3a.access.key</name>
>     <value>xxxxx</value>
>   </property>
>   <property>
>     <name>fs.s3a.secret.key</name>
>     <value>xxxxx</value>
>   </property>
>   <property>
>     <name>fs.s3a.buffer.dir</name>
>     <value>/tmp</value>
>   </property>
> </configuration>
>
> And my pom.xml uses:
>
>   <artifactId>flink-s3-fs-hadoop</artifactId>
>   ...
>   <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
>   ...
>   <artifactId>hadoop-hdfs</artifactId>
>   ...
>   <artifactId>hadoop-common</artifactId>
>   ...
>   <artifactId>hadoop-core</artifactId>
>   ...
>   <artifactId>hadoop-aws</artifactId>
>   ...
>
> ‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
> On Sunday, October 28, 2018 8:08 AM, Rafi Aroch <[email protected]> wrote:
>
>> Hi,
>>
>> I'm also experiencing this with Flink 1.5.2. This is probably related to
>> BucketingSink not working properly with S3 as filesystem because of the
>> eventual-consistency of S3.
>>
>> I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of
>> 1.6.2 release. It might help, if you use the flink-s3-fs-hadoop (and not
>> presto).
>>
>> Does anyone know if this fix would solve this issue?
>>
>> Thanks,
>> Rafi
>>
>> On Sun, Oct 28, 2018 at 12:08 AM Flink Developer
>> <[email protected]> wrote:
>>
>>> Hi, I'm running a Scala Flink app in an AWS EMR cluster (emr 5.17, hadoop
>>> 2.8.4) with Flink parallelism set to 400. The source is a Kafka topic
>>> and it sinks to S3 in the format of:
>>> s3://<day>/<hour>/<worker_number>/<files>. There are potentially 400 files
>>> being written simultaneously.
>>>
>>> Configuration:
>>> - Flink v1.5.2
>>> - Checkpointing enabled w/ RocksDB (flink-statebackend-rocksdb_2.11,
>>>   v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause
>>>   between checkpoints is 2 mins. Timeout is set to 2 mins.
>>> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
>>> - Batch file size is set to 5 MB.
>>> - Batch rollover interval is set to 30 min.
>>> - Writer uses GZip compression.
>>> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1,
>>>   hadoop-core v1.2.1, hadoop-aws v3.1.1)
>>>
>>> The app is able to run for hours straight, but occasionally (once or twice
>>> a day), it displays the following exception. When this happens, the app is
>>> able to recover from the previous checkpoint, but I am concerned about the
>>> exception:
>>>
>>> Caused by: java.io.IOException:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>>>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>>>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>>>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>>>
>>> Caused by:
>>> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
>>> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
>>> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
>>>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
>>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
>>>     at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
>>>
>>> And sometimes, it will show this:
>>>
>>> java.lang.RuntimeException: Error while restoring BucketingSink state.
>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
>>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>>>
>>> What causes this and how can it be resolved? Thank you.
>>>
>>> There seems to be a related Flink ticket and PR here, but I'm not sure if
>>> this is the exact same issue and if it has been resolved:
>>> https://issues.apache.org/jira/browse/FLINK-6306
>>> https://github.com/apache/flink/pull/3752
>>> https://github.com/apache/flink/pull/4607
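
For anyone reading the first trace above: it runs from BucketingSink.handlePendingInProgressFile through EmrFileSystem.rename into Jets3tNativeFileSystemStore.copy, and the nested cause is a getObjectMetadata call that returns 404. A rename on an object store is typically a metadata lookup plus a copy and a delete, so it can fail if the object being renamed is not yet visible, which fits the eventual-consistency explanation suggested earlier in the thread. The sketch below is not Flink or EMRFS code. It is a small, self-contained Python simulation of that failure mode with a retry and backoff around the rename. The names EventuallyConsistentStore, NotFound, rename_with_retry, and the delay parameter are all hypothetical and exist only for this illustration.

```python
import time


class NotFound(Exception):
    """Stands in for the 404 AmazonS3Exception seen in the trace."""


class EventuallyConsistentStore:
    """Toy object store (hypothetical): a newly written key stays invisible
    for the first `delay` metadata lookups, then becomes visible."""

    def __init__(self, delay):
        self.delay = delay
        self._objects = {}  # key -> bytes
        self._hidden = {}   # key -> number of lookups that will still miss

    def put(self, key, data):
        self._objects[key] = data
        self._hidden[key] = self.delay

    def head(self, key):
        """Return the object size, or raise NotFound if missing or not yet visible."""
        if key not in self._objects:
            raise NotFound(key)
        if self._hidden.get(key, 0) > 0:
            self._hidden[key] -= 1
            raise NotFound(key)
        return len(self._objects[key])

    def rename(self, src, dst):
        """Metadata lookup on the source, then move the data to the new key.
        For simplicity the destination is visible immediately."""
        self.head(src)
        self._objects[dst] = self._objects.pop(src)
        self._hidden.pop(src, None)
        self._hidden[dst] = 0


def rename_with_retry(store, src, dst, attempts=5, base_sleep=0.0, sleep=time.sleep):
    """Retry the rename with exponential backoff; return the number of tries used.
    Re-raises NotFound if the source never becomes visible."""
    for attempt in range(attempts):
        try:
            store.rename(src, dst)
            return attempt + 1
        except NotFound:
            if attempt == attempts - 1:
                raise
            sleep(base_sleep * (2 ** attempt))


# The source key is invisible for the first two lookups.
store = EventuallyConsistentStore(delay=2)
store.put("bucket/part-0.in-progress", b"records")
tries = rename_with_retry(store, "bucket/part-0.in-progress", "bucket/part-0.pending")
print(tries)
```

With delay=2 the first two lookups miss and the rename goes through on the third attempt. Nothing in this thread establishes that FLINK-9752 or the other linked PRs take this approach. The sketch only shows why a rename issued right after a write can see a 404 and then succeed when repeated.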
