Hi Niels,

Glad to hear that you are trying the Flink native K8s integration and sharing your feedback.

> What is causing the differences in behaviour between local and in k8s? It
> works locally but not in the cluster.

In your case the job can be executed successfully locally, which means the S3 endpoint can be reached from your local network environment. When you submit the job to the K8s cluster, the user main() is executed locally and builds the job graph, which is then submitted to the cluster for execution. From that point on, the S3 endpoint is accessed from within the K8s network. So maybe there is something wrong with the network between the taskmanager and the S3 endpoint.

> How do I figure out what network it is trying to reach in k8s?

I am not an expert on S3, so I am not sure whether the SDK fetches the credentials from the S3 endpoint. If it does, I think you need to find out which taskmanager the source operator is running on. Then exec into that Pod and use nslookup/curl to make sure the endpoint "s3.example.nl" can be resolved and accessed successfully.
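For example, something like the following (the pod name is just a placeholder; pick the actual taskmanager pod from the kubectl get pods output):

    # List the pods of the session cluster
    kubectl get pods | grep flink1100

    # Exec into the taskmanager pod that runs the source operator
    # (install dnsutils/curl first if they are missing in the image)
    kubectl exec -it flink1100-taskmanager-1-1 -- /bin/bash

    # Inside the pod: verify the endpoint resolves and is reachable
    nslookup s3.example.nl
    curl -v https://s3.example.nl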
Best,
Yang

On Fri, Feb 28, 2020 at 4:56 AM, Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> I have a problem with accessing my own S3 system from within Flink when
> running on Kubernetes.
>
> *TL;DR* I have my own S3 (Ceph). Locally my application works; when
> running in K8s it fails with:
>
>     Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
>     Caused by: java.net.SocketException: Network is unreachable (connect failed)
>
> I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
> the S3 gateway that is included in there.
> I have put a file on this 'S3', and in my Flink 1.10.0 application I do this:
>
>     StreamExecutionEnvironment senv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     // Point the flink-s3-fs-presto plugin at my own S3 endpoint and credentials.
>     final Configuration conf = new Configuration();
>     conf.setString("presto.s3.endpoint", "s3.example.nl");
>     conf.setString("presto.s3.access-key", "myAccessKey");
>     conf.setString("presto.s3.secret-key", "mySecretKey");
>     FileSystem.initialize(conf, null);
>
>     senv.setParallelism(2);
>     senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>     DataStream<String> rawInputStream = senv
>         .readTextFile(path).name("Read input");
>
>     ...
>
> The s3.example.nl is the hostname of the ingress I have attached to the
> S3 endpoint. In my case it is accessible via both http and https (with a
> valid LetsEncrypt certificate).
>
> When I run this locally from within IntelliJ it works like a charm: it
> reads the data, does some stuff with it and then writes it to
> ElasticSearch.
> I have created an additional layer to enable the fs-s3-presto plugin with
> this Dockerfile:
>
>     FROM flink:1.10.0-scala_2.12
>     RUN mkdir /opt/flink/plugins/s3-fs-presto && \
>         cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
>
> I run flink with this customized docker image like this:
>
>     #!/bin/bash
>     ./flink-1.10.0/bin/kubernetes-session.sh \
>         -Dkubernetes.cluster-id=flink1100 \
>         -Dtaskmanager.memory.process.size=8192m \
>         -Dkubernetes.taskmanager.cpu=2 \
>         -Dtaskmanager.numberOfTaskSlots=8 \
>         -Dresourcemanager.taskmanager-timeout=3600000 \
>         -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto
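> Side note: since FileSystem.initialize() runs in my main(), i.e. in the
> client JVM, I am not sure whether those presto.s3.* settings ever reach
> the taskmanagers. One thing I have not tried yet is to also pass the
> endpoint and credentials as cluster-side options when starting the
> session, roughly like this (the s3.* keys below are the ones documented
> for Flink's S3 filesystems):
>
>     ./flink-1.10.0/bin/kubernetes-session.sh \
>         -Ds3.endpoint=https://s3.example.nl \
>         -Ds3.access-key=myAccessKey \
>         -Ds3.secret-key=mySecretKey \
>         ...(plus the same options as above)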
> I then submit the job into this Kubernetes session with this command:
>
>     flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 \
>         target/flink-table-esloader-0.1-SNAPSHOT.jar
>
> The job starts and after about 40 seconds the job fails with this
> exception:
>
>     *Caused by: com.amazonaws.SdkClientException: Unable to load credentials from service endpoint*
>         at com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>         at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>         at com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>         at com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>         at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>         at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>         at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>         at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>         at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>         at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
>         at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
>         at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
>         at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>         at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>         at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>         at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
>         at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
>         at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>     *Caused by: java.net.SocketException: Network is unreachable (connect failed)*
>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>         at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>         at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>         at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>         at java.net.Socket.connect(Socket.java:607)
>         at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>         at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>         at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>         at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>         at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>         at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>         at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
>         at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
>         at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
>         at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
>         at com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>         at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>         at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>         at com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>         at com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
>         ... 28 more
>
> I have tried this with
>
>     conf.setString("presto.s3.endpoint", "s3.example.nl");
>
> and also by using the ClusterIP and the LoadBalancer IP, and I get the
> same error in all cases.
>
> I have verified, by logging in to the task manager pod, that all of these
> endpoints show a sensible result when simply doing a curl from the
> command line.
>
> I have s3cmd installed locally on my laptop.
> My ~/.s3cfg looks like this and I can fully access this S3 setup:
>
>     [default]
>     access_key = myAccessKey
>     secret_key = mySecretKey
>     host_base = s3.example.nl
>
> *I'm stuck, please help:*
>
>   - What is causing the differences in behaviour between local and in
>     k8s? It works locally but not in the cluster.
>   - How do I figure out what network it is trying to reach in k8s?
>
> Thanks.
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes