Hi,

As I mentioned in my original email, I had already verified that the endpoints were accessible from the pods; that was not the problem.
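For reference, the checks I did were along these lines (the pod name is a placeholder for the real task manager pod):

# Verify DNS resolution and HTTP(S) access from inside a task manager pod:
kubectl exec -it <taskmanager-pod> -- nslookup s3.example.nl
kubectl exec -it <taskmanager-pod> -- curl -v https://s3.example.nl

Both gave sensible results.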
It took me a while, but I've figured out what went wrong. Setting the configuration like I did:

final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint", "s3.example.nl");
conf.setString("presto.s3.access-key", "myAccessKey");
conf.setString("presto.s3.secret-key", "mySecretKey");
FileSystem.initialize(conf, null);

stores the settings in static variables that are not serialized and shipped to the task managers. As a consequence, in the absence of credentials the AWS/S3 client assumes it is running inside AWS and that it can retrieve the credentials from http://169.254.170.2 (which is non-routable). Because this is not AWS it cannot do this, and I get the error that it cannot connect.
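You can see this fallback by hand: from inside one of the pods, an attempt like the following cannot connect (the pod name is again a placeholder), which matches the "Network is unreachable (connect failed)" at the bottom of the stack trace below:

# 169.254.170.2 is the credentials endpoint the SDK falls back to;
# outside AWS this link-local address is not routable:
kubectl exec -it <taskmanager-pod> -- curl -m 5 http://169.254.170.2/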
For now my solution is to start the Flink session with this:

#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
  -Dpresto.s3.endpoint=s3.example.nl \
  -Dpresto.s3.access-key=MyAccessKey \
  -Dpresto.s3.secret-key=MySecretKey \
  -Dpresto.s3.path.style.access=true
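I assume (untested) that baking the same keys into conf/flink-conf.yaml of the image would behave identically to these -D flags, since both end up in the cluster-wide Flink configuration:

# conf/flink-conf.yaml -- untested sketch, same cluster-wide scope as the -D flags above
presto.s3.endpoint: s3.example.nl
presto.s3.access-key: MyAccessKey
presto.s3.secret-key: MySecretKey
presto.s3.path.style.access: true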
I dislike this because now ALL jobs in this Flink cluster have the same credentials.

Is there a way to set the S3 credentials on a per-job or even per-connection basis?

Niels Basjes

On Fri, Feb 28, 2020 at 4:38 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Hi Niels,
>
> Glad to hear that you are trying the Flink native K8s integration and sharing your feedback.
>
>> What is causing the differences in behaviour between local and in k8s? It works locally but not in the cluster.
>
> In your case, the job runs successfully locally. That means the S3 endpoint can be accessed from your local network environment. When you submit the job to the K8s cluster, the user's `main()` is executed locally to build the job graph, which is then submitted to the cluster for execution. The S3 endpoint is then accessed from within the K8s network. So maybe there is something wrong with the network between the task managers and the S3 endpoint.
>
>> How do I figure out what network it is trying to reach in k8s?
>
> I am not an expert on S3, so I am not sure whether the SDK fetches the credentials from the S3 endpoint. If it does, I think you need to find out which task manager the source operator is running on. Then exec into the Pod and use nslookup/curl to make sure the endpoint "s3.example.nl" can be resolved and accessed successfully.
>
> Best,
> Yang
>
> On Fri, Feb 28, 2020 at 4:56 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> I have a problem with accessing my own S3 system from within Flink when running on Kubernetes.
>>
>> *TL;DR* I have my own S3 (Ceph). Locally my application works; when running in K8s it fails with:
>>
>> Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>> Caused by: java.net.SocketException: Network is unreachable (connect failed)
>>
>> I have my own Kubernetes cluster (1.17) on which I have installed Ceph and the S3 gateway that is included with it. I have put a file on this 'S3', and in my Flink 1.10.0 application I do this:
>>
>> StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>> final Configuration conf = new Configuration();
>> conf.setString("presto.s3.endpoint", "s3.example.nl");
>> conf.setString("presto.s3.access-key", "myAccessKey");
>> conf.setString("presto.s3.secret-key", "mySecretKey");
>> FileSystem.initialize(conf, null);
>> senv.setParallelism(2);
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> DataStream<String> rawInputStream = senv
>>     .readTextFile(path).name("Read input");
>> ...
>>
>> The s3.example.nl is the hostname of the ingress I have attached to the S3 endpoint. In my case it is accessible via both http and https (with a valid LetsEncrypt certificate).
>>
>> When I run this locally from within IntelliJ it works like a charm: it reads the data, does some stuff with it and then writes it to ElasticSearch.
>>
>> I have created an additional layer to enable the fs-s3-presto plugin with this Dockerfile:
>>
>> FROM flink:1.10.0-scala_2.12
>> RUN mkdir /opt/flink/plugins/s3-fs-presto && cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
>>
>> I run Flink with this customized docker image like this:
>>
>> #!/bin/bash
>> ./flink-1.10.0/bin/kubernetes-session.sh \
>>   -Dkubernetes.cluster-id=flink1100 \
>>   -Dtaskmanager.memory.process.size=8192m \
>>   -Dkubernetes.taskmanager.cpu=2 \
>>   -Dtaskmanager.numberOfTaskSlots=8 \
>>   -Dresourcemanager.taskmanager-timeout=3600000 \
>>   -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto
>>
>> I then submit the job into Kubernetes with this command:
>>
>> flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 target/flink-table-esloader-0.1-SNAPSHOT.jar
>>
>> The job starts, and after about 40 seconds it fails with this exception:
>>
>> *Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
>> at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>> at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>> at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>> at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>> at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>> at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>> at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
>> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>> at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
>> at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
>> at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
>> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>> *Caused by: java.net.SocketException: Network is unreachable (connect failed)*
>> at java.net.PlainSocketImpl.socketConnect(Native Method)
>> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>> at java.net.Socket.connect(Socket.java:607)
>> at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>> at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>> at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>> at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>> at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>> at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>> at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
>> at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
>> at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
>> at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
>> at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>> at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>> at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>> at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>> at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
>> ... 28 more
>>
>> I have tried this with
>>
>> conf.setString("presto.s3.endpoint", "s3.example.nl");
>>
>> and also by using the ClusterIP and the LoadBalancer IP, and I get the same error in all cases.
>>
>> I have verified, by logging in to the task manager pod, that all of these endpoints show a sensible result when simply doing a curl from the command line.
>>
>> I have s3cmd installed locally on my laptop. My ~/.s3cfg looks like this, and I can fully access this S3 setup:
>>
>> [default]
>> access_key = myAccessKey
>> secret_key = mySecretKey
>> host_base = s3.example.nl
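>>
>> With this config, for example, a plain listing works fine (the bucket name is just an illustration):
>>
>> s3cmd ls s3://mybucket/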
>>
>> *I'm stuck, please help:*
>>
>> - What is causing the differences in behaviour between local and in k8s? It works locally but not in the cluster.
>> - How do I figure out what network it is trying to reach in k8s?
>>
>> Thanks.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes