Hi,

I'm also experiencing this with Flink 1.5.2. It is probably related to
BucketingSink not working properly with S3 as the filesystem, because of
S3's eventual consistency.
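
To make the failure mode concrete, here is a toy sketch in plain Java. It uses
no Flink or AWS classes; LaggyStore and renameWithRetry are names I made up for
illustration, and this is not how BucketingSink or the FLINK-9752 fix works. It
only shows how a rename can hit a "404 Not Found" when a freshly written object
is not yet visible, and how a bounded retry would mask that.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RenameRetrySketch {

    /** Hypothetical stand-in for an eventually consistent object store. */
    static class LaggyStore {
        private final Map<String, String> objects = new HashMap<>();
        private final Map<String, Integer> missesLeft = new HashMap<>();

        /** Stores an object that stays invisible for the next `lag` lookups. */
        void put(String key, String data, int lag) {
            objects.put(key, data);
            missesLeft.put(key, lag);
        }

        /** Metadata lookup; reports "missing" while the simulated lag lasts. */
        boolean exists(String key) {
            int left = missesLeft.getOrDefault(key, 0);
            if (left > 0) {
                missesLeft.put(key, left - 1);
                return false;
            }
            return objects.containsKey(key);
        }

        /** Rename modeled as lookup + move; fails if the source is not visible. */
        void rename(String src, String dst) throws IOException {
            if (!exists(src)) {
                throw new FileNotFoundException("404 Not Found: " + src);
            }
            objects.put(dst, objects.remove(src));
            missesLeft.remove(src);
        }
    }

    /** Retries the rename on "not found"; returns the number of attempts used. */
    static int renameWithRetry(LaggyStore store, String src, String dst,
                               int maxAttempts, long backoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                store.rename(src, dst);
                return attempt;
            } catch (FileNotFoundException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt); // linear backoff
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        LaggyStore store = new LaggyStore();
        // The pending file is written but "invisible" for two lookups.
        store.put("bucket/part-0.pending", "data", 2);
        int attempts = renameWithRetry(
                store, "bucket/part-0.pending", "bucket/part-0", 5, 10L);
        System.out.println("renamed after " + attempts + " attempts");
    }
}
```

A retry like this only hides the symptom. It would not give exactly-once
guarantees on its own, which is why I'm hoping for a fix in the sink itself.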

I see that https://issues.apache.org/jira/browse/FLINK-9752 will be part of
the 1.6.2 release. It might help if you use flink-s3-fs-hadoop (and not
flink-s3-fs-presto).

Does anyone know if this fix would solve this issue?

Thanks,
Rafi


On Sun, Oct 28, 2018 at 12:08 AM Flink Developer <
developer...@protonmail.com> wrote:

> Hi, I'm running a Scala Flink app in an AWS EMR cluster (emr 5.17, hadoop
> 2.8.4) with Flink parallelism set to 400. The source is a Kafka topic
> and the job sinks to S3 in the format:
> s3://<day>/<hour>/<worker_number>/<files>. Potentially 400 files are
> being written simultaneously.
>
> *Configuration:*
> - Flink v1.5.2
> - Checkpointing enabled w/ RocksDB (flink-statebackend-rocksdb_2.11,
> v1.6.1). Interval is every 2 mins with max concurrent set to 1. Min pause
> between checkpoints is 2 mins. Timeout is set to 2 mins.
> - BucketingSink (flink-connector-filesystem_2.11, v1.6.1).
> - Batch file size is set to 5mb.
> - Batch rollover interval is set to 30min
> - Writer uses GZip compression
> - Hadoop Maven Dependencies (hadoop-hdfs v3.1.1, hadoop-common v3.1.1,
> hadoop-core v1.2.1, hadoop-aws v3.1.1)
>
> The app is able to run for hours straight, but occasionally (once or twice
> a day) it throws the following exception. When this happens, the app is
> able to recover from the previous checkpoint, but I am concerned about the
> exception:
>
> Caused by: java.io.IOException:
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.handleAmazonServiceException(Jets3tNativeFileSystemStore.java:446)
>     at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:427)
>     at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1297)
>     at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:312)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:815)
>
> Caused by:
> com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Not Found (Service: Amazon S3; Status Code 404; Error ... Request ID:
> xxxxxxxx, S3 Extended Request ID: xxxxxxxxxxxxxxxxxxxxxxxxxx
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4365)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4312)
>     at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1270)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
>     at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:91)
>
>
> *And sometimes, it will show this:*
>
> java.lang.RuntimeException: Error while restoring BucketingSink state.
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:888)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:767)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:394)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>
>
> What causes this and how can it be resolved? Thank you.
>
> There seems to be a related Flink ticket and PR here, but I'm not sure if
> this is the exact same issue and if it has been resolved:
> https://issues.apache.org/jira/browse/FLINK-6306
> https://github.com/apache/flink/pull/3752
> https://github.com/apache/flink/pull/4607
>
