[ https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-11429.
------------------------------------
    Resolution: Not A Problem

> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
>                 Key: FLINK-11429
>                 URL: https://issues.apache.org/jira/browse/FLINK-11429
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter:  Mario Georgiev
>            Priority: Minor
>
> Hello,
> The problem is: if I put core-site.xml somewhere, add it to the Flink image, and point to it from flink-conf.yaml, it does not get picked up and I get an exception:
> {code:java}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     ... 31 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>     at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
>     ... 48 more
> Caused by: java.net.SocketException: Network unreachable (connect failed)
>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>     at java.net.Socket.connect(Socket.java:589)
>     at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>     at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>     at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>     at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>     at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
>     at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>     at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
> {code}
> However, if I put the ACCESS_KEY and SECRET_KEY as environment variables in the Dockerfile, they are picked up and everything works. Why is core-site.xml disregarded? Even if I don't copy core-site.xml at all, it still works with only the environment variables.
> The configuration directory appears to be on the classpath, yet it is entirely disregarded:
>  
> {code:java}
>     -  Classpath: /opt/flink-1.7.1/lib/aws-java-sdk-core-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-kms-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink-1.7.1/lib/flink-python_2.12-1.7.1.jar:/opt/flink-1.7.1/lib/flink-s3-fs-hadoop-1.7.1.jar:/opt/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar:/opt/flink-1.7.1/lib/hadoop-aws-2.8.0.jar:/opt/flink-1.7.1/lib/httpclient-4.5.6.jar:/opt/flink-1.7.1/lib/httpcore-4.4.11.jar:/opt/flink-1.7.1/lib/jackson-annotations-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-core-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-databind-2.9.8.jar:/opt/flink-1.7.1/lib/job.jar:/opt/flink-1.7.1/lib/joda-time-2.10.1.jar:/opt/flink-1.7.1/lib/log4j-1.2.17.jar:/opt/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.7.1/lib/flink-dist_2.12-1.7.1.jar::/hadoop/conf:
> {code}
>  
> {code:java}
> <configuration>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>     <!-- Comma separated list of local directories used to buffer
>          large results prior to transmitting them to S3. -->
>     <property>
>         <name>fs.s3a.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>     <property>
>         <name>fs.s3a.access.key</name>
>         <description>AWS access key ID.
>             Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>     <property>
>         <name>fs.s3a.secret.key</name>
>         <description>AWS secret key.
>             Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>     <property>
>         <name>fs.s3a.aws.credentials.provider</name>
>         <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>     </property>
> </configuration>
> {code}
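> For what it's worth, the shaded flink-s3-fs-hadoop filesystem also accepts the credentials directly in flink-conf.yaml (the s3.* keys are forwarded to the Hadoop fs.s3a.* configuration), which sidesteps core-site.xml entirely. A minimal sketch, assuming the key names from the Flink S3 docs and a placeholder bucket:
> {code:java}
> # flink-conf.yaml — mirrored into the shaded Hadoop S3A config as fs.s3a.access.key / fs.s3a.secret.key
> s3.access-key: <hidden>
> s3.secret-key: <hidden>
> state.checkpoints.dir: s3a://<whatever>/checkpoints
> {code}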
> I am building the Kubernetes standalone image as follows:
> Dockerfile:
> {code:java}
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> FROM openjdk:8-jre-alpine
> # Install requirements
> # Modification to original Dockerfile to support rocksdb
> # RUN apk add --no-cache bash snappy
> # This is a fix for RocksDB compatibility
> # Flink environment variables
> ENV FLINK_INSTALL_PATH=/opt
> ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
> ENV FLINK_LIB_DIR $FLINK_HOME/lib
> ENV PATH $PATH:$FLINK_HOME/bin
> ENV FLINK_CONF $FLINK_HOME/conf
> ENV FLINK_OPT $FLINK_HOME/opt
> ENV FLINK_HADOOP_CONF /hadoop/conf
> # flink-dist can point to a directory or a tarball on the local system
> ARG flink_dist=NOT_SET
> ARG job_jar=NOT_SET
> # Install build dependencies and flink
> ADD $flink_dist $FLINK_INSTALL_PATH
> ADD $job_jar $FLINK_INSTALL_PATH/job.jar
> RUN set -x && \
>   ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
>   ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
>   addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
>   chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
>   chown -h flink:flink $FLINK_HOME
> # Modification to original Dockerfile
> RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
> COPY core-site.xml $FLINK_HADOOP_CONF/core-site.xml
> ENV HADOOP_CONF_DIR=$FLINK_HADOOP_CONF
> RUN echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.ask.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.client.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "web.timeout: 180000" >> $FLINK_CONF/flink-conf.yaml
> RUN mv $FLINK_OPT/flink-s3-fs-hadoop-1.7.1.jar $FLINK_LIB_DIR
> COPY docker-entrypoint.sh /
> RUN chmod +x docker-entrypoint.sh
> RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.8.0.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.0/hadoop-aws-2.8.0.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> # Transitive dependencies of aws-java-sdk-s3
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.489/aws-java-sdk-core-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.489/aws-java-sdk-kms-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.8/jackson-annotations-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-core-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/joda-time-2.10.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.10.1/joda-time-2.10.1.jar
> RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.11.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.11/httpcore-4.4.11.jar
> RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar
> #Modification to original Dockerfile
> USER flink
> EXPOSE 8081 6123
> ENTRYPOINT ["/docker-entrypoint.sh"]
> CMD ["--help"]
> {code}
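> As an aside, instead of baking core-site.xml into the image, the same file could be mounted from a Kubernetes ConfigMap. A hedged sketch (the ConfigMap/container names are made up for illustration):
> {code:java}
> # kubectl create configmap hadoop-conf --from-file=core-site.xml
> # Pod spec fragment: mount the ConfigMap at /hadoop/conf and point HADOOP_CONF_DIR at it
> containers:
>   - name: jobmanager
>     env:
>       - name: HADOOP_CONF_DIR
>         value: /hadoop/conf
>     volumeMounts:
>       - name: hadoop-conf
>         mountPath: /hadoop/conf
> volumes:
>   - name: hadoop-conf
>     configMap:
>       name: hadoop-conf
> {code}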
>  
>  
>  
> {code:java}
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
>
> public class WordCount {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000L);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>         String[] words1 = new String[]{
>                 "football",
>                 "soccer",
>                 "billiards",
>                 "snooker",
>                 "tennis",
>                 "handball",
>                 "basketball"
>         };
>         List<String> words = new ArrayList<>();
>         Random rnd = new Random();
>         for (int i = 0; i < 500000; i++) {
>             // nextInt(bound) already excludes the bound, so use words1.length to include every word
>             words.add(words1[rnd.nextInt(words1.length)]);
>         }
>         DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
>         src.map(str -> str.toLowerCase())
>                 .flatMap(new Splitter())
>                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
>                 .keyBy(0)
>                 .sum(1)
>                 .print();
>         env.execute();
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<>(word, 1));
>             }
>         }
>     }
> }
> {code}
> Job manager Kubernetes args:
>  It is a template, so disregard the placeholders.
> {code:java}
>        "job-cluster",
>        "--job-classname", "{classname}",
>        "-Djobmanager.rpc.address={cluster.name}-jobmanager",
>        "-Dparallelism.default=2",
>        "-Dblob.server.port=6124",
>        "-Dqueryable-state.server.ports=6125",
>        "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
>        "-Dstate.backend=rocksdb",
>        "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
>        "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
>        "-Dstate.backend.incremental=true"
> {code}
> Task manager Kubernetes args:
>  Again, templated.
> {code:java}
>        ["task-manager",
>        "-Djobmanager.rpc.address={cluster.name}-jobmanager",
>        "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
>        "-Dstate.backend=rocksdb",
>        "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
>        "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
>        "-Dstate.backend.incremental=true"]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
