Hi,

As I mentioned in my original email, I had already verified that the endpoints
were accessible from the pods; that was not the problem.

It took me a while, but I've figured out what went wrong.

Setting the configuration the way I did:

final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint",   "s3.example.nl");
conf.setString("presto.s3.access-key", "myAccessKey");
conf.setString("presto.s3.secret-key", "mySecretKey");
FileSystem.initialize(conf, null);

only stores these values in static variables of the client JVM; those
statics are not serialized and shipped to the task managers.
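
To confirm this on the task manager side, a small probe like the following
(an untested sketch; GlobalConfiguration is more of an internal utility, and
the class name S3ConfigProbe is just illustrative) can be put into the job.
GlobalConfiguration.loadConfiguration() reads the flink-conf.yaml of the JVM
it runs in, so inside open() it reflects the task manager's cluster
configuration, not the Configuration object built in main():

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

// Illustrative probe: logs whether the presto.s3.* keys are visible in the
// configuration of the task manager this task happens to run on.
public class S3ConfigProbe extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) {
        Configuration clusterConf = GlobalConfiguration.loadConfiguration();
        System.out.println("presto.s3.endpoint on this TM: "
                + clusterConf.getString("presto.s3.endpoint", "<not set>"));
    }

    @Override
    public String map(String value) {
        return value; // pass-through; only the open() logging matters here
    }
}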

As a consequence, the task managers run without any credentials, so the
AWS/S3 client assumes it is running inside AWS and tries to fetch the
credentials from the link-local metadata endpoint at http://169.254.170.2
(which is not routable outside AWS). Because this cluster is not running on
AWS, that request fails and I get the error that it cannot connect.
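
The stack trace matches that fallback: with nothing configured, credential
resolution ends up at the instance-profile provider, which queries the
metadata endpoint. A minimal standalone check (a sketch assuming the AWS SDK
for Java v1 that the Presto S3 filesystem bundles is on the classpath; the
exact provider chain the Presto filesystem builds may differ, and the class
name CredentialChainProbe is just illustrative) that can be run inside a
task manager pod:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

// Illustrative check: resolves credentials via the SDK's default chain.
// Off AWS, with no credentials configured in this JVM, this fails with an
// SdkClientException because the metadata endpoint cannot be reached.
public class CredentialChainProbe {
    public static void main(String[] args) {
        AWSCredentials creds =
                DefaultAWSCredentialsProviderChain.getInstance().getCredentials();
        System.out.println("Resolved access key: " + creds.getAWSAccessKeyId());
    }
}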

For now, my workaround is to start the Flink session with this:
#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=3600000 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
  -Dpresto.s3.endpoint=s3.example.nl \
  -Dpresto.s3.access-key=MyAccessKey \
  -Dpresto.s3.secret-key=MySecretKey \
  -Dpresto.s3.path.style.access=true

I dislike this because now ALL jobs in this Flink cluster have the same
credentials.

Is there a way to set the S3 credentials on a per-job or even per-connection
basis?

Niels Basjes


On Fri, Feb 28, 2020 at 4:38 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Hi Niels,
>
> Glad to hear that you are trying the Flink native K8s integration and
> sharing your feedback.
>
> What is causing the differences in behavior between local and in k8s? It
>> works locally but not in the cluster.
>
>
> In your case, the job can be executed successfully locally. That means the
> S3 endpoint can be accessed from your local network environment. When you
> submit the job to the K8s cluster, the user `main()` is executed locally to
> build the job graph, which is then submitted to the cluster for execution.
> There, the S3 endpoint is accessed from within the K8s network. So maybe
> there is something wrong with the network between the taskmanager and the
> S3 endpoint.
>
> How do I figure out what network it is trying to reach in k8s?
>
>
> I am not an expert on S3, so I am not sure whether the SDK will fetch the
> credentials from the S3 endpoint. If it does, I think you need to find out
> which taskmanager the source operator is running on, then exec into the Pod
> and use nslookup/curl to make sure the endpoint "s3.example.nl" can be
> resolved and accessed successfully.
>
>
>
> Best,
> Yang
>
>
> On Fri, Feb 28, 2020 at 4:56 AM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> I have a problem with accessing my own S3 system from within Flink when
>> running on Kubernetes.
>>
>> *TL;DR* I have my own S3 (Ceph), Locally my application works, when
>> running in K8s it fails with
>>
>> Caused by: com.amazonaws.SdkClientException: Unable to load credentials
>> from service endpoint
>> Caused by: java.net.SocketException: Network is unreachable (connect
>> failed)
>>
>>
>> I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
>> the S3 gateway that is included with it.
>> I have put a file on this 'S3' and in my Flink 1.10.0 application I do
>> this:
>>
>> StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> final Configuration conf = new Configuration();
>> conf.setString("presto.s3.endpoint",   "s3.example.nl");
>> conf.setString("presto.s3.access-key", "myAccessKey");
>> conf.setString("presto.s3.secret-key", "mySecretKey");
>> FileSystem.initialize(conf, null);
>>
>> senv.setParallelism(2);
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> DataStream<String> rawInputStream = senv
>>     .readTextFile(path).name("Read input");
>>
>> ...
>>
>>
>> The s3.example.nl is the hostname of the ingress I have attached to the
>> S3 endpoint. In my case it is accessible via both http and https (with a
>> valid LetsEncrypt certificate).
>>
>> When I run this locally from within IntelliJ it works like a charm, reads
>> the data, does some stuff with it and then writes it to ElasticSearch.
>>
>> I have created an additional layer to enable the fs-s3-presto plugin with
>> this Dockerfile.
>>
>>
>> FROM flink:1.10.0-scala_2.12
>> RUN mkdir /opt/flink/plugins/s3-fs-presto && \
>>     cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
>>
>>
>> I run flink with this customized docker image like this
>>
>>
>> #!/bin/bash
>> ./flink-1.10.0/bin/kubernetes-session.sh \
>>   -Dkubernetes.cluster-id=flink1100 \
>>   -Dtaskmanager.memory.process.size=8192m \
>>   -Dkubernetes.taskmanager.cpu=2 \
>>   -Dtaskmanager.numberOfTaskSlots=8 \
>>   -Dresourcemanager.taskmanager-timeout=3600000 \
>>   -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto
>>
>>
>> I then submit this into Kubernetes with this command
>>
>> flink run -e kubernetes-session -Dkubernetes.cluster-id=flink1100 \
>>   target/flink-table-esloader-0.1-SNAPSHOT.jar
>>
>>
>> The job starts and after about 40 seconds the job fails with this
>> exception:
>>
>> *Caused by: com.amazonaws.SdkClientException: Unable to load credentials
>> from service endpoint*
>> at
>> com.amazonaws.auth.EC2CredentialsFetcher.handleError(EC2CredentialsFetcher.java:183)
>> at
>> com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:162)
>> at
>> com.amazonaws.auth.EC2CredentialsFetcher.getCredentials(EC2CredentialsFetcher.java:82)
>> at
>> com.amazonaws.auth.InstanceProfileCredentialsProvider.getCredentials(InstanceProfileCredentialsProvider.java:151)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1164)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:5086)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:5060)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4309)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
>> at
>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1239)
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:563)
>> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138)
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
>> at
>> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1734)
>> at
>> org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:152)
>> at
>> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.exists(PluginFileSystemFactory.java:143)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:197)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>> *Caused by: java.net.SocketException: Network is unreachable (connect
>> failed)*
>> at java.net.PlainSocketImpl.socketConnect(Native Method)
>> at
>> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
>> at
>> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
>> at
>> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
>> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>> at java.net.Socket.connect(Socket.java:607)
>> at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
>> at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
>> at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
>> at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
>> at sun.net.www.http.HttpClient.New(HttpClient.java:339)
>> at sun.net.www.http.HttpClient.New(HttpClient.java:357)
>> at
>> sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
>> at
>> sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1205)
>> at
>> sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
>> at
>> sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
>> at
>> com.amazonaws.internal.ConnectionUtils.connectToEndpoint(ConnectionUtils.java:54)
>> at
>> com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:108)
>> at
>> com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:79)
>> at
>> com.amazonaws.auth.InstanceProfileCredentialsProvider$InstanceMetadataCredentialsEndpointProvider.getCredentialsEndpoint(InstanceProfileCredentialsProvider.java:174)
>> at
>> com.amazonaws.auth.EC2CredentialsFetcher.fetchCredentials(EC2CredentialsFetcher.java:122)
>> ... 28 more
>>
>>
>> I have tried this with
>>
>> conf.setString("presto.s3.endpoint",         "s3.example.nl");
>>
>> and also by using the ClusterIP and the LoadBalancer IP and I get the
>> same error in all cases.
>>
>> I have verified, by logging into the task manager pod, that all of these
>> endpoints return a sensible result when simply doing a curl from the
>> command line.
>>
>> I have s3cmd installed locally on my laptop.
>> My ~/.s3cfg looks like this, and with it I can fully access this S3 setup.
>>
>>
>> [default]
>> access_key = myAccessKey
>> secret_key = mySecretKey
>> host_base = s3.example.nl
>>
>>
>> *I'm stuck, please help:*
>>
>>    - What is causing the differences in behaviour between local and in
>>    k8s? It works locally but not in the cluster.
>>    - How do I figure out what network it is trying to reach in k8s?
>>
>>
>> Thanks.
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
