Robert,

Thanks for the tip!

Before you replied, I had already worked out, using btrace, that the keys belong in flink-conf.yaml. I instrumented org.apache.hadoop.conf.Configuration.get to see which keys are requested, and org.apache.hadoop.conf.Configuration.substituteVars to see the effective values. (A btrace bug prevents observing the return value of .get directly.)

I did not see any way in the code to observe the effective configuration through logging.

Regards,


Jeff



On 5/8/20 7:29 AM, Robert Metzger wrote:
I validated my assumption. Putting

s3.connection.maximum: 123456

into the flink-conf.yaml file results in the following DEBUG log output:

2020-05-08 16:20:47,461 DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader       [] - Adding Flink config entry for s3.connection.maximum as fs.s3a.connection.maximum to Hadoop config

I guess that is the recommended way of passing configuration into the S3 connectors of Flink.

You also asked how to detect retries: DEBUG-log level is helpful again. I just tried connecting against an invalid port, and got these messages:

2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-7: Shutdown connection
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.execchain.MainClientExec                [] - Connection discarded
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
2020-05-08 16:26:37,671 DEBUG com.amazonaws.request                    [] - Retrying Request: HEAD http://127.0.0.1:9000 /test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 scala/2.11.12, amz-sdk-invocation-id: 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: application/octet-stream, )
2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient                    [] - Retriable error detected, will retry in 4226ms, attempt number: 7


It may make sense to raise the log level to DEBUG only for "com.amazonaws.http.AmazonHttpClient".

How to configure the log level depends on the deployment method. Usually, it's done by replacing the first INFO with DEBUG in conf/log4j.properties ("rootLogger.level = DEBUG").
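
As a rough sketch of the narrower setting: assuming conf/log4j.properties uses the Log4j 2 properties syntax (which the "rootLogger.level" form implies), a per-logger override might look like the fragment below. The ids "aws_http" and "aws_request" are arbitrary labels chosen for this example, not names Flink defines; the logger names are taken from the DEBUG output quoted in this message.

```properties
# Leave rootLogger.level at INFO and raise only the AWS SDK loggers to DEBUG.
# "aws_http" and "aws_request" are made-up ids for this sketch.
logger.aws_http.name = com.amazonaws.http.AmazonHttpClient
logger.aws_http.level = DEBUG
logger.aws_request.name = com.amazonaws.request
logger.aws_request.level = DEBUG
```

On setups that still use the older Log4j 1 syntax (as in "log4j.rootLogger=DEBUG"), the corresponding line would presumably be "log4j.logger.com.amazonaws.http.AmazonHttpClient=DEBUG".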


Best,
Robert

On Fri, May 8, 2020 at 3:51 PM Robert Metzger <rmetz...@apache.org> wrote:

    Hey Jeff,

    Which Flink version are you using?
    Have you tried configuring the S3 filesystem via Flink's config
    yaml? Afaik all config parameters prefixed with "s3." are mirrored
    into the Hadoop file system connector.


    On Mon, May 4, 2020 at 8:45 PM Jeff Henrikson <jehenri...@gmail.com> wrote:

          > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
          > the hadoop configuration I have provided, as opposed to some separate
          > default configuration?

        I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that
        core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have
        around 20 different DataStreams being instantiated from S3, so if they
        each require one connection to be healthy, then 15 is definitely not a
        good value.

        However, I seem to be unable to override fs.s3a.connection.maximum using
        my core-site.xml.  I am also unable to see the DEBUG level messages for
        the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.

        So now I'm wondering:

              1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?

              2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
              override the config?


        Thanks in advance,


        Jeff Henrikson



        
https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded

        
https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml

            <property>
              <name>fs.s3a.connection.maximum</name>
              <value>15</value>
              <description>Controls the maximum number of simultaneous
        connections to S3.</description>
            </property>




        On 5/1/20 7:30 PM, Jeff Henrikson wrote:
         > Hello Flink users,
         >
         > I could use help with three related questions:
         >
         > 1) How can I observe retries in the flink-s3-fs-hadoop connector?
         >
         > 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
         > the hadoop configuration I have provided, as opposed to some separate
         > default configuration?  My job fails quickly when I read larger or more
         > numerous objects from S3.  I conjecture the failure may be related to
         > insufficient retries when S3 throttles.
         >
         > 3) What s3 fault recovery approach would you recommend?
         >
         > Background:
         >
         > I am having trouble with reliable operation of the flink-s3-fs-hadoop
         > connector.   My application sources all its DataStream data from S3, and
         > appears to get frequently throttled by s3:
         >
         >      Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
         >      Caught exception when processing split: [0]
         >      s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
         >      1586911084000 : 0 + 33554432
         >      . . .
         >      Caused by: java.io.InterruptedIOException: Failed to open
         >      s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
         >      s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
         >      com.amazonaws.SdkClientException: Unable to execute HTTP request:
         >      Timeout waiting for connection from pool
         >
         > The s3 throttling does not seem to trigger retries and so
         > causes the job to fail.  For troubleshooting purposes, the job stays up
         > for much longer if I reduce s3 inputs to my job by disabling functionality.
         >
         > I see in the documentation for hadoop-aws that there are properties
         > such as fs.s3.maxRetries and fs.s3.sleepTimeSeconds for handling retries
         > within hadoop.
         >
         > After wrangling with some classpath troubles, I managed to get
         > flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of hadoop
         > configuration files {core/hdfs/mapred/yarn}-site.xml.  I can confirm
         > that the cluster parses the configuration by passing invalid xml and
         > seeing the cluster crash.
         >
         > The puzzle with which I am now faced is that the configuration for
         > retries and timeouts in core-site.xml seems to have no effect on the
         > application.
         >
         > I deploy in kubernetes with a custom docker image.  For now, I have
         > not enabled the zookeeper-based HA.
         >
         > See below for a frequent stacktrace that I interpret as likely to be
         > caused by s3 throttling.
         >
         > Thanks in advance for any help.
         >
         > Regards,
         >
         >
         > Jeff Henrikson
         >
         >
         >
         >      2020-04-30 19:35:24
         >      org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
         >          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
         >          at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
         >          at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
         >          at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
         >          at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
         >          at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
         >          at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
         >          at jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
         >          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         >          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         >          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
         >          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
         >          at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
         >          at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
         >          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
         >          at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
         >          at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
         >          at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
         >          at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
         >          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
         >          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
         >          at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
         >          at akka.actor.Actor.aroundReceive(Actor.scala:517)
         >          at akka.actor.Actor.aroundReceive$(Actor.scala:515)
         >          at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
         >          at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
         >          at akka.actor.ActorCell.invoke(ActorCell.scala:561)
         >          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
         >          at akka.dispatch.Mailbox.run(Mailbox.scala:225)
         >          at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
         >          at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
         >          at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
         >          at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
         >          at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
         >      Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: [0] s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@ 1586911084000 : 0 + 33554432
         >          at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
         >          at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
         >          at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
         >      Caused by: java.io.InterruptedIOException: Failed to open s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
         >          at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
         >          at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
         >          at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
         >          at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:181)
         >          at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:327)
         >          at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:190)
         >          at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
         >          at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
         >          at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
         >          at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
         >          at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:188)
         >          at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:210)
         >          at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:320)
         >          at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:428)
         >          at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
         >          at org.apache.flink.fs.s3hadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
         >          at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:695)
         >          at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:483)
         >          at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:315)
         >      Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1114)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1064)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
         >          at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
         >          at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
         >          at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
         >          at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1409)
         >          at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$reopen$0(S3AInputStream.java:182)
         >          at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
         >          ... 16 more
         >      Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
         >          at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:292)
         >          at org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:269)
         >          at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
         >          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         >          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         >          at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
         >          at com.amazonaws.http.conn.$Proxy21.get(Unknown Source)
         >          at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:191)
         >          at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
         >          at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
         >          at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
         >          at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
         >          at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1236)
         >          at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
         >          ... 27 more
         >
