Hello Flink users,
I could use help with three related questions:
1) How can I observe retries in the flink-s3-fs-hadoop connector?
2) How can I tell whether flink-s3-fs-hadoop is actually picking up
the Hadoop configuration I have provided, as opposed to some separate
default configuration? My job fails quickly when I read larger or more
numerous objects from S3. I suspect the failure is related to
insufficient retries when S3 throttles requests.
3) What S3 fault-recovery approach would you recommend?
Background:
I am having trouble operating the flink-s3-fs-hadoop connector
reliably. My application sources all its DataStream data from S3, and
appears to be throttled frequently by S3:
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [0] s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@ 1586911084000 : 0 + 33554432
. . .
Caused by: java.io.InterruptedIOException: Failed to open s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
The S3 throttling does not seem to trigger retries, and so it causes
the job to fail. As a troubleshooting data point, the job stays up
much longer if I reduce its S3 input by disabling functionality.
I see in the documentation for hadoop-aws that there are properties
such as fs.s3.maxRetries and fs.s3.sleepTimeSeconds for handling
retries within Hadoop.
After wrestling with some classpath troubles, I managed to get
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of Hadoop
configuration files, {core/hdfs/mapred/yarn}-site.xml. I can confirm
that the cluster parses the configuration, because passing invalid XML
makes the cluster crash.
The puzzle I now face is that the configuration for retries and
timeouts in core-site.xml seems to have no effect on the
application.
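For reference, a minimal sketch of an alternative route, assuming the behaviour described in the Flink documentation for the S3 filesystems: flink-s3-fs-hadoop is said to take Hadoop s3a settings from flink-conf.yaml rather than from Hadoop XML files, translating keys of the form s3.<key> into fs.s3a.<key>. The values below are illustrative assumptions, not recommendations, and the retry keys are assumed to be supported by the Hadoop version bundled with the connector.

```yaml
# flink-conf.yaml (sketch; all values are illustrative assumptions)
# Assumed mapping: s3.<key> is forwarded to the bundled S3A client as fs.s3a.<key>.
s3.connection.maximum: 200   # fs.s3a.connection.maximum: size of the HTTP connection pool
s3.attempts.maximum: 20      # fs.s3a.attempts.maximum: retries inside the AWS SDK
s3.retry.limit: 10           # fs.s3a.retry.limit: retries in the S3A layer
s3.retry.interval: 500ms     # fs.s3a.retry.interval: delay between S3A retries
```

If the mapping works as assumed, the connection pool size would be the setting most directly tied to the "Timeout waiting for connection from pool" message.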
I deploy on Kubernetes with a custom Docker image. For now, I have
not enabled the ZooKeeper-based HA.
See below for a stack trace that I see frequently and interpret as
likely to be caused by S3 throttling.
Thanks in advance for any help.
Regards,
Jeff Henrikson
2020-04-30 19:35:24
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [0] s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@ 1586911084000 : 0 + 33554432
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: java.io.InterruptedIOException: Failed to open s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:181)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:327)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:320)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:428)
    at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:695)
    at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:483)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
    at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:182)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 16 more
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
    at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazonaws.http.conn.$Proxy21.get(Unknown Source)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    ... 27 more
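The first line of the trace, "Recovery is suppressed by NoRestartBackoffTimeStrategy", suggests that the job runs without a restart strategy, so a single failed split read fails the whole job. A minimal sketch of a fixed-delay restart strategy in flink-conf.yaml follows. The attempt count and delay are illustrative assumptions, and whether restarting the job is an adequate fault-recovery approach for S3 reads is part of question 3 above.

```yaml
# flink-conf.yaml (sketch; values are illustrative assumptions)
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3   # give up after three restarts
restart-strategy.fixed-delay.delay: 10 s   # wait between restart attempts
```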