Hi Niels,

You are right. The S3-related configuration you set in your `main()` only takes effect on the client side, because the filesystem is initialized only once, in the entrypoint of the JobManager/TaskManager. AFAIK, we cannot provide different credentials for each job in the same session cluster.
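A minimal sketch of what this means in practice (the class name and comments are illustrative, not part of the original mails): the `FileSystem.initialize()` call below only configures the JVM that runs `main()` and builds the job graph. The JobManager/TaskManager pods initialize their filesystems from the cluster configuration (flink-conf.yaml or `-D` dynamic properties) when they start, and never see these values.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class ClientSideOnlyS3Config {
    public static void main(String[] args) throws Exception {
        // These settings end up in static state of the *client* JVM only.
        Configuration conf = new Configuration();
        conf.setString("presto.s3.endpoint", "s3.example.nl");
        conf.setString("presto.s3.access-key", "myAccessKey");
        conf.setString("presto.s3.secret-key", "mySecretKey");
        FileSystem.initialize(conf, null);

        // Nothing here is serialized into the job graph, so on the
        // TaskManagers the presto S3 filesystem falls back to whatever
        // credentials its own configuration (or the AWS default chain)
        // provides.
    }
}

This is also why the workaround Niels describes below works: the `-Dpresto.s3.*` properties passed to kubernetes-session.sh become part of the cluster configuration that every JobManager/TaskManager pod reads at startup.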
Best,
Yang

On Fri, Feb 28, 2020 at 11:09 PM Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> As I mentioned in my original email, I had already verified that the
> endpoints were accessible from the pods; that was not the problem.
>
> It took me a while, but I've figured out what went wrong.
>
> Setting the configuration like I did:
>
> final Configuration conf = new Configuration();
> conf.setString("presto.s3.endpoint", "s3.example.nl");
> conf.setString("presto.s3.access-key", "myAccessKey");
> conf.setString("presto.s3.secret-key", "mySecretKey");
> FileSystem.initialize(conf, null);
>
> sets it in static variables that do not get serialized and shipped
> to the task managers.
>
> As a consequence, in the absence of credentials the AWS/S3 client
> assumes it is running inside AWS and that it can retrieve the credentials
> from http://169.254.170.2 (which is non-routable).
> Because this is not AWS it cannot do this, and I get the error that it
> cannot connect.
>
> For now my solution is to start the Flink session with this:
>
> #!/bin/bash
> ./flink-1.10.0/bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=flink1100 \
> -Dtaskmanager.memory.process.size=8192m \
> -Dkubernetes.taskmanager.cpu=2 \
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dresourcemanager.taskmanager-timeout=3600000 \
> -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
> -Dpresto.s3.endpoint=s3.example.nl \
> -Dpresto.s3.access-key=MyAccessKey \
> -Dpresto.s3.secret-key=MySecretKey \
> -Dpresto.s3.path.style.access=true
>
> I dislike this because now ALL jobs in this Flink cluster have the same
> credentials.
>
> Is there a way to set the S3 credentials on a per-job or even
> per-connection basis?
>
> Niels Basjes
>
>
> On Fri, Feb 28, 2020 at 4:38 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Hi Niels,
>>
>> Glad to hear that you are trying the Flink native K8s integration and
>> sharing your feedback.
>>
>>> What is causing the differences in behaviour between local and in k8s?
>>> It works locally but not in the cluster.
>>
>> In your case, the job could be executed successfully locally. That means
>> the S3 endpoint can be accessed from your local network environment.
>> When you submit the job to the K8s cluster, the user `main()` is executed
>> locally to build the job graph, which is then submitted to the cluster
>> for execution. The S3 endpoint is then accessed from within the K8s
>> network, so maybe there is something wrong with the network between the
>> taskmanager and the S3 endpoint.
>>
>>> How do I figure out what network it is trying to reach in k8s?
>>
>> I am not an expert on S3, so I am not sure whether the SDK will fetch
>> the credentials from the S3 endpoint. If it does, I think you need to
>> find out which taskmanager the source operator is running on, then exec
>> into the Pod and use nslookup/curl to make sure the endpoint
>> "s3.example.nl" can be resolved and accessed successfully.
>>
>> Best,
>> Yang
>>
>> On Fri, Feb 28, 2020 at 4:56 AM Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> I have a problem with accessing my own S3 system from within Flink when
>>> running on Kubernetes.
>>>
>>> *TL;DR* I have my own S3 (Ceph). Locally my application works; when
>>> running in K8s it fails with:
>>>
>>> Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>>> Caused by: java.net.SocketException: Network is unreachable (connect failed)
>>>
>>> I have my own Kubernetes cluster (1.17) on which I have installed Ceph
>>> and the S3 gateway that is included in there.
>>> I have put a file on this 'S3' and in my Flink 1.10.0 application I do
>>> this:
>>>
>>> StreamExecutionEnvironment senv =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> final Configuration conf = new Configuration();
>>> conf.setString("presto.s3.endpoint", "s3.example.nl");
>>> conf.setString("presto.s3.access-key", "myAccessKey");
>>> conf.setString("presto.s3.secret-key", "mySecretKey");
>>> FileSystem.initialize(conf, null);
>>>
>>> senv.setParallelism(2);
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> DataStream<String> rawInputStream = senv
>>>     .readTextFile(path).name("Read input");
>>>
>>> ...
>>>
>>> s3.example.nl is the hostname of the ingress I have attached to the S3
>>> endpoint. In my case it is accessible via both http and https (with a
>>> valid LetsEncrypt certificate).
>>>
>>> When I run this locally from within IntelliJ it works like a charm: it
>>> reads the data, does some stuff with it and then writes it to
>>> ElasticSearch.
>>>
>>> I have created an additional layer to enable the fs-s3-presto plugin
>>> with this Dockerfile:
>>>
>>> FROM flink:1.10.0-scala_2.12
>>> RUN mkdir /opt/flink/plugins/s3-fs-presto && cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
>>>
>>> I run Flink with this customized docker image like this:
>>>
>>> #!/bin/bash
>>> ./flink-1.10.0/bin/kubernetes-session.sh \
>>> -Dkubernetes.cluster-id=flink1100 \
>>> -Dtaskmanager.memory.process.size=8192m \
>>> -Dkubernetes.taskmanager.cpu=2 \
>>> -Dtaskmanager.numberOfTaskSlots=8 \
>>> -Dresourcemanager.taskmanager-timeout=3600000 \
>>> -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto
>>>
>>> I then submit the job to Kubernetes with this command:
>>>
>>> flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 target/flink-table-esloader-0.1-SNAPSHOT.jar
>>>
>>> The job starts, and after about 40 seconds it fails with this exception:
>>>
>>> *Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>>>     at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>>>     at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>>>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
>>>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
>>>     at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>>>     at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>>>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
>>>     at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
>>>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>> *Caused by: java.net.SocketException: Network is unreachable (connect failed)*
>>>     at java.net.PlainSocketImpl.socketConnect(Native Method)
>>>     at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>>>     at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>>>     at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>>>     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>>>     at java.net.Socket.connect(Socket.java:607)
>>>     at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>>>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>>>     at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>>>     at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>>>     at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>>>     at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>>>     at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
>>>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
>>>     at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
>>>     at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
>>>     at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>>>     at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>>>     at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>>>     at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>>>     at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
>>>     ... 28 more
>>>
>>> I have tried this with
>>>
>>> conf.setString("presto.s3.endpoint", "s3.example.nl");
>>>
>>> and also by using the ClusterIP and the LoadBalancer IP, and I get the
>>> same error in all cases.
>>>
>>> I have verified, by logging in to the task manager pod, that all of
>>> these endpoints give a sensible result when simply doing a curl from
>>> the command line.
>>>
>>> I have s3cmd installed locally on my laptop.
>>> My ~/.s3cfg looks like this, and with it I can fully access this S3
>>> setup:
>>>
>>> [default]
>>> access_key = myAccessKey
>>> secret_key = mySecretKey
>>> host_base = s3.example.nl
>>>
>>> *I'm stuck, please help:*
>>>
>>> - What is causing the differences in behaviour between local and in
>>>   k8s? It works locally but not in the cluster.
>>> - How do I figure out what network it is trying to reach in k8s?
>>>
>>> Thanks.
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
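For reference, a minimal sketch of what the job ends up looking like with the workaround described at the top of this thread. The class name, input path and print sink are illustrative placeholders, and it assumes the session cluster was started with the `-Dpresto.s3.*` properties, so the TaskManagers pick up the endpoint and credentials from the cluster configuration and no `FileSystem.initialize()` call is needed in `main()`:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadFromS3Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv =
            StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(2);
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // The presto S3 filesystem running on the TaskManagers resolves the
        // endpoint and credentials from the cluster configuration, not from
        // anything set in this program.
        DataStream<String> rawInputStream = senv
            .readTextFile("s3://my-bucket/my-input-file")  // hypothetical path
            .name("Read input");

        rawInputStream.print();
        senv.execute("Read from S3");
    }
}

Per-job or per-connection credentials remain an open question in this thread; per the answer at the top, a session cluster shares one set of S3 credentials across all jobs.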