[ https://issues.apache.org/jira/browse/FLINK-11429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek closed FLINK-11429.
------------------------------------
    Resolution: Not A Problem

> Flink fails to authenticate s3a with core-site.xml
> --------------------------------------------------
>
>                 Key: FLINK-11429
>                 URL: https://issues.apache.org/jira/browse/FLINK-11429
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.1
>            Reporter: Mario Georgiev
>            Priority: Minor
>
> Hello,
> The problem is: if I put the core-site.xml somewhere, add it to the Flink image, and put the path to it in flink-conf.yaml, it does not get picked up and I get an exception:
> {code:java}
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:139)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1337)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1277)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:373)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
> ... 31 more
> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:117)
> ... 48 more
> Caused by: java.net.SocketException: Network unreachable (connect failed)
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:589)
> at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
> at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
> at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
> at sun.net.www.http.HttpClient.New(HttpClient.java:339)
> at sun.net.www.http.HttpClient.New(HttpClient.java:357)
> at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
> at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1199)
> at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
> at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
> at org.apache.flink.fs.s3base.shaded.com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
> {code}
> However, if I put the ACCESS_KEY and the SECRET_KEY in ENV variables in the Dockerfile, they get picked up and it works. Why is the core-site.xml being disregarded? Even if I don't copy the core-site.xml at all, it still works with just the ENV variables.
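> For comparison, a minimal sketch of the flink-conf.yaml route, assuming the access-key/secret-key options documented for the shaded flink-s3-fs-hadoop filesystem in Flink 1.7; the values below are placeholders, not taken from this ticket:
> {code:java}
> # flink-conf.yaml (sketch of an assumed alternative; placeholder values)
> # The shaded S3 filesystem reads these keys from flink-conf.yaml and forwards
> # them to the underlying S3A client, independently of any core-site.xml.
> s3.access-key: <your-access-key>
> s3.secret-key: <your-secret-key>
> {code}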
>
> It looks like it is on the classpath, but it is entirely disregarded:
>
> {code:java}
> - Classpath: /opt/flink-1.7.1/lib/aws-java-sdk-core-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-kms-1.11.489.jar:/opt/flink-1.7.1/lib/aws-java-sdk-s3-1.10.6.jar:/opt/flink-1.7.1/lib/flink-python_2.12-1.7.1.jar:/opt/flink-1.7.1/lib/flink-s3-fs-hadoop-1.7.1.jar:/opt/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar:/opt/flink-1.7.1/lib/hadoop-aws-2.8.0.jar:/opt/flink-1.7.1/lib/httpclient-4.5.6.jar:/opt/flink-1.7.1/lib/httpcore-4.4.11.jar:/opt/flink-1.7.1/lib/jackson-annotations-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-core-2.9.8.jar:/opt/flink-1.7.1/lib/jackson-databind-2.9.8.jar:/opt/flink-1.7.1/lib/job.jar:/opt/flink-1.7.1/lib/joda-time-2.10.1.jar:/opt/flink-1.7.1/lib/log4j-1.2.17.jar:/opt/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar:/opt/flink-1.7.1/lib/flink-dist_2.12-1.7.1.jar::/hadoop/conf:
> {code}
>
> The core-site.xml:
> {code:java}
> <configuration>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>
>     <!-- Comma separated list of local directories used to buffer
>          large results prior to transmitting them to S3. -->
>     <property>
>         <name>fs.s3a.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>
>     <property>
>         <name>fs.s3a.access.key</name>
>         <description>AWS access key ID.
>             Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>
>     <property>
>         <name>fs.s3a.secret.key</name>
>         <description>AWS secret key.
>             Omit for IAM role-based or provider-based authentication.</description>
>         <value><hidden></value>
>     </property>
>
>     <property>
>         <name>fs.s3a.aws.credentials.provider</name>
>         <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>     </property>
> </configuration>
> {code}
>
> I am building the Kubernetes standalone image as follows.
> Dockerfile:
> {code:java}
> ################################################################################
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> FROM openjdk:8-jre-alpine
>
> # Install requirements
> # Modification to original Dockerfile to support rocksdb
> # RUN apk add --no-cache bash snappy
> # This is a fix for RocksDB compatibility
>
> # Flink environment variables
> ENV FLINK_INSTALL_PATH=/opt
> ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
> ENV FLINK_LIB_DIR $FLINK_HOME/lib
> ENV PATH $PATH:$FLINK_HOME/bin
> ENV FLINK_CONF $FLINK_HOME/conf
> ENV FLINK_OPT $FLINK_HOME/opt
> ENV FLINK_HADOOP_CONF /hadoop/conf
>
> # flink-dist can point to a directory or a tarball on the local system
> ARG flink_dist=NOT_SET
> ARG job_jar=NOT_SET
>
> # Install build dependencies and flink
> ADD $flink_dist $FLINK_INSTALL_PATH
> ADD $job_jar $FLINK_INSTALL_PATH/job.jar
> RUN set -x && \
>   ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
>   ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
>   addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
>   chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
>   chown -h flink:flink $FLINK_HOME
>
> # Modification to original Dockerfile
> RUN apk add --no-cache bash libc6-compat snappy 'su-exec>=0.2'
> COPY core-site.xml $FLINK_HADOOP_CONF/core-site.xml
> ENV HADOOP_CONF_DIR=$FLINK_HADOOP_CONF
> RUN echo "fs.hdfs.hadoopconf: $FLINK_HADOOP_CONF" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.ask.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "akka.client.timeout: 30 min" >> $FLINK_CONF/flink-conf.yaml
> RUN echo "web.timeout: 180000" >> $FLINK_CONF/flink-conf.yaml
> RUN mv $FLINK_OPT/flink-s3-fs-hadoop-1.7.1.jar $FLINK_LIB_DIR
> COPY docker-entrypoint.sh /
> RUN chmod +x docker-entrypoint.sh
> RUN wget -O $FLINK_LIB_DIR/hadoop-aws-2.8.0.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.0/hadoop-aws-2.8.0.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-s3-1.10.6.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.10.6/aws-java-sdk-s3-1.10.6.jar
> # Transitive dependency of aws-java-sdk-s3
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-core-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.489/aws-java-sdk-core-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/aws-java-sdk-kms-1.11.489.jar http://central.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.489/aws-java-sdk-kms-1.11.489.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-annotations-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.9.8/jackson-annotations-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-core-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/jackson-databind-2.9.8.jar http://central.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar
> RUN wget -O $FLINK_LIB_DIR/joda-time-2.10.1.jar http://central.maven.org/maven2/joda-time/joda-time/2.10.1/joda-time-2.10.1.jar
> RUN wget -O $FLINK_LIB_DIR/httpcore-4.4.11.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.11/httpcore-4.4.11.jar
> RUN wget -O $FLINK_LIB_DIR/httpclient-4.5.6.jar http://central.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.6/httpclient-4.5.6.jar
>
> # Modification to original Dockerfile
> USER flink
> EXPOSE 8081 6123
> ENTRYPOINT ["/docker-entrypoint.sh"]
> CMD ["--help"]
> {code}
>
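> For reference, a sketch of supplying the ENV-variable route (which already works here) from a Kubernetes Secret instead of baking keys into the image; the EnvironmentVariableCredentialsProvider named in the stack trace typically reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. Secret, key, and container names below are placeholders, not taken from this ticket:
> {code:java}
> # Fragment of a pod/container spec (sketch; placeholder names)
> containers:
>   - name: jobmanager
>     image: <your-flink-image>
>     env:
>       - name: AWS_ACCESS_KEY_ID
>         valueFrom:
>           secretKeyRef:
>             name: s3-credentials   # hypothetical Secret name
>             key: access-key        # hypothetical key within that Secret
>       - name: AWS_SECRET_ACCESS_KEY
>         valueFrom:
>           secretKeyRef:
>             name: s3-credentials
>             key: secret-key
> {code}
>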
> The job code:
> {code:java}
> import org.apache.commons.lang3.RandomStringUtils;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Random;
>
> public class WordCount {
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(5000L);
>         env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000L);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>         String words1[] = new String[]{
>                 "football",
>                 "soccer",
>                 "billiards",
>                 "snooker",
>                 "tennis",
>                 "handball",
>                 "basketball"
>         };
>
>         List<String> words = new ArrayList<>();
>         Random rnd = new Random();
>         for (int i = 0; i < 500000; i++) {
>             words.add(words1[rnd.nextInt(words1.length - 1)]);
>         }
>
>         DataStreamSource<String> src = env.fromElements(words.toArray(new String[]{}));
>         src.map(str -> str.toLowerCase())
>                 .flatMap(new Splitter())
>                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
>                 .keyBy(0)
>                 .sum(1)
>                 .print();
>
>         env.execute();
>     }
>
>     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
>         @Override
>         public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
>             for (String word : sentence.split(" ")) {
>                 out.collect(new Tuple2<String, Integer>(word, 1));
>             }
>         }
>     }
> }
> {code}
>
> Job manager Kubernetes args (it is a template, so disregard the placeholders):
> {code:java}
> "job-cluster",
> "--job-classname", "{classname}",
> "-Djobmanager.rpc.address={cluster.name}-jobmanager",
> "-Dparallelism.default=2",
> "-Dblob.server.port=6124",
> "-Dqueryable-state.server.ports=6125",
> "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
> "-Dstate.backend=rocksdb",
> "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
> "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
> "-Dstate.backend.incremental=true"
> {code}
>
> Task manager Kubernetes args (again, templated):
> {code:java}
> ["task-manager",
> "-Djobmanager.rpc.address={cluster.name}-jobmanager",
> "-Dstate.backend.rocksdb.localdir=/tmp/{cluster.name}/",
> "-Dstate.backend=rocksdb",
> "-Dstate.checkpoints.dir=s3a://<whatever>/checkpoints/{cluster.name}",
> "-Dstate.savepoints.dir=s3a://<whatever>/savepoints/{cluster.name}",
> "-Dstate.backend.incremental=true"]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)