Hey folks, outside of Kubernetes everything works fine with the same generated files.
 
To share a bit more of what I'm doing (I've modified things to be more in line with the current docs):
 
keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass internal_store_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -genkeypair -alias flink.rest -keystore rest.keystore -dname "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:127.0.0.1" -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -exportcert -keystore rest.keystore -alias flink.rest -storepass rest_keystore_password -file flink.cer
keytool -importcert -keystore rest.truststore -alias flink.rest -storepass rest_truststore_password -file flink.cer -noprompt
kubectl delete secret flink-tls-secret-2
# Create the simpler secret from main docs for Flink
cat << EOF | kubectl create -n abp -f -
  apiVersion: v1
  kind: Secret
  type: Opaque
  metadata:
    name: flink-tls-secret-2
  data:
    rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')
    rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')
    internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')
    internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')
EOF
 
I run this script to create flink-tls-secret-2 containing those files; the keytool commands should be familiar, since they're from the Flink 1.11 security docs.
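The base64 step of the script can be checked on its own before anything is sent to the cluster. Below is a minimal sketch of that idea; encode_file and render_tls_secret are hypothetical helper names, not something from the Flink or Kubernetes docs, and the manifest is the same one as in the script above, just printed to stdout so it can be reviewed first.

```shell
# Hypothetical helpers; only base64, tr and cat from the original script are assumed.

# base64-encode a file onto a single line, as the Secret's data fields need.
encode_file() {
  base64 < "$1" | tr -d '\n'
}

# Print the Secret manifest for a directory holding the generated stores.
# As in the script above, internal.truststore reuses internal.keystore.
render_tls_secret() {
  dir="$1"
  cat << EOF
apiVersion: v1
kind: Secret
type: Opaque
metadata:
  name: flink-tls-secret-2
data:
  rest.keystore: $(encode_file "$dir/rest.keystore")
  rest.truststore: $(encode_file "$dir/rest.truststore")
  internal.keystore: $(encode_file "$dir/internal.keystore")
  internal.truststore: $(encode_file "$dir/internal.keystore")
EOF
}
```

Once the output looks right, it can be piped into the same command as before: render_tls_secret . | kubectl create -n abp -f -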
 
Note that I don't have a file called internal.truststore, but neither do the docs; they mention file.truststore but don't say how it's made... maybe this is the problem? Then again, things are fine with my normal Flink setup outside of Kubernetes.
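As far as I can tell, the internal SSL example in the docs points both security.ssl.internal.keystore and security.ssl.internal.truststore at the same internal.keystore file, so there may be no separate truststore to generate. A quick sanity check is to list what the keystore actually contains. This is only a sketch: list_keystore_entries is a hypothetical helper name, and it assumes keytool is on the PATH.

```shell
# Hypothetical helper: list the entries of a PKCS12 keystore with keytool.
# Prints a message and returns 1 if the file does not exist.
list_keystore_entries() {
  store="$1"
  pass="$2"
  if [ ! -f "$store" ]; then
    echo "keystore not found: $store"
    return 1
  fi
  # For the internal store, an entry under the alias flink.internal is expected.
  keytool -list -keystore "$store" -storetype PKCS12 -storepass "$pass"
}
```

For the files above that would be: list_keystore_entries internal.keystore internal_store_password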
 
The Job resource looks like this:

apiVersion: batch/v1
kind: Job
metadata:
  name: sample-job
  labels:
    app: flink-job
spec:
  template:
    spec:
      # Run as flink user
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
      containers:
      - name: wordcount
        # Replace this to be a Docker image with your built Flink app at a known location
        # Your build of Flink should be based on https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-debian
        # with a modification to the Dockerfile to add your jar in (with a COPY)
        image: adamroberts/mycoolflink:latest
        args:
        - /opt/flink/bin/flink
        - run
        - -D security.ssl.internal.enabled=true
        - -D security.ssl.rest.enabled=true
        - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
        - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
        - -D security.ssl.rest.keystore-password=rest_keystore_password
        - -D security.ssl.rest.key-password=rest_keystore_password
        - -D security.ssl.rest.truststore-password=rest_truststore_password
        - -D security.ssl.internal.keystore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.truststore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.keystore-password=internal_store_password
        - -D security.ssl.internal.key-password=internal_store_password
        - -D security.ssl.internal.truststore-password=internal_store_password
        - -m
        - tls-flink-cluster-1-11-jobmanager:8081
        - /opt/flink/examples/batch/WordCount.jar 
        - --input 
        - /opt/flink/NOTICE
        volumeMounts:
          - name: flink-secret-volume
            mountPath: /etc/flink-secrets
      volumes:
      - name: flink-secret-volume
        secret:
          secretName: flink-tls-secret-2
      restartPolicy: Never
 
If I swap that for a simple curl image, keeping the secrets mounted, I can kubectl exec in and curl the JobManager at tls-flink-cluster-1-11-jobmanager:8081. I get no response there, whereas a different port or URL gives an error.
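One thing worth ruling out here: curl defaults to plain http:// when no scheme is given, which could explain the empty response from a TLS-enabled port. Below is a sketch of the check with https and the exported certificate, following the openssl and curl pattern from the SSL docs. rest_url and check_rest are hypothetical helper names, /overview is used as the REST path, and it assumes /tmp is writable and that openssl and curl exist in the image. Since the certificate above was issued for myhost.company.org, curl's hostname check may still complain about the service name.

```shell
# Hypothetical helper: build an https:// URL for a Flink REST endpoint.
# Arguments: host, port, optional path (defaults to /overview).
rest_url() {
  printf 'https://%s:%s%s\n' "$1" "$2" "${3:-/overview}"
}

# Export the REST keystore to PEM, then query the JobManager over TLS.
# The secret mount is not written to, so the PEM goes under /tmp.
check_rest() {
  openssl pkcs12 -passin pass:rest_keystore_password \
    -in /etc/flink-secrets/rest.keystore -out /tmp/rest.pem -nodes &&
  curl --cacert /tmp/rest.pem "$(rest_url "$1" "${2:-8081}")"
}
```

From inside the pod this would be run as: check_rest tls-flink-cluster-1-11-jobmanager 8081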
 
The secrets do look OK inside the container too.
 
The Cluster spec looks like this now:
 
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: tls-flink-cluster-1-11
spec:
  jobManager:
    volumeMounts:
    - name: flink-secret-volume
      mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    resources:
      limits:
        memory: 600Mi
        cpu: "1.0"
  taskManager:
    volumeMounts:
      - name: flink-secret-volume
        mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    replicas: 1
    resources:
      limits:
        memory: 1Gi
        cpu: "1.0"
  image:
    name: adamroberts/mycoolflink:latest
  flinkProperties:
    # https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html is helpful for this part.
    web.submit.enable: "false"
    security.ssl.rest.enabled: "true"
    security.ssl.rest.keystore: "/etc/flink-secrets/rest.keystore"
    security.ssl.rest.truststore: "/etc/flink-secrets/rest.truststore"
    security.ssl.rest.keystore-password: "rest_keystore_password"
    security.ssl.rest.key-password: "rest_keystore_password"
    security.ssl.rest.truststore-password: "rest_truststore_password"
    security.ssl.internal.enabled: "true"
    security.ssl.internal.keystore: "/etc/flink-secrets/internal.keystore"
    security.ssl.internal.truststore: "/etc/flink-secrets/internal.keystore"
    security.ssl.internal.keystore-password: "internal_store_password"
    security.ssl.internal.key-password: "internal_store_password"
    security.ssl.internal.truststore-password: "internal_store_password"
    taskmanager.numberOfTaskSlots: "1"
    jobmanager.heap.size: ""                # set empty value (only for Flink version 1.11 or above)
    jobmanager.memory.process.size: 1gb   # job manager memory limit  (only for Flink version 1.11 or above)
    taskmanager.heap.size: ""               # set empty value
    taskmanager.memory.process.size: 1gb    # task manager memory limit
 
Cheers,
----- Original message -----
From: Andrey Zagrebin <azagre...@apache.org>
To: Adam Roberts <arobe...@uk.ibm.com>
Cc: nkru...@apache.org, user <user@flink.apache.org>
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 5:35 PM
 
Hi Adam,

maybe also check your SSL setup in a local cluster to exclude possibly related k8s things.

Best,
Andrey
 
On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts <arobe...@uk.ibm.com> wrote:
Hey Nico - thanks for the prompt response, good catch. I've just tried with the two security options (enabling REST and internal SSL communication) and still hit the same problem.
 
I've also tried turning off security (both in my Job definition and in my Flink cluster JobManager/TaskManager settings) and the communication does happen successfully, suggesting all is well otherwise.
 
With regards to testing with just a regular curl, I switched security back on and did the curl, using this:
 

openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081

 
That was run from the Job CR pod, which is the one that runs flink run against the JobManager I'd like to connect to.
 
That gives 
 

$ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081
139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

so I wonder if my security setup itself is flawed... I'm happy to share the scripting I use for that if folks feel it'll be of use. Thanks again.
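One possible reading of the "header too long" error is that openssl pkcs12 was handed a file that is not DER-encoded PKCS12, for instance a JKS store or a PEM file. A rough way to tell these apart is to look at the first bytes of the file. This is only a sketch: guess_store_format is a hypothetical helper name, and it only distinguishes the JKS magic number, a PEM header, and the DER SEQUENCE tag that a PKCS12 file starts with.

```shell
# Hypothetical helper: guess a key/trust store's container format
# from its first four bytes.
guess_store_format() {
  magic=$(head -c 4 "$1" | od -An -tx1 | tr -d ' \n')
  case "$magic" in
    feedfeed) echo JKS ;;      # Java KeyStore magic number 0xFEEDFEED
    2d2d2d2d) echo PEM ;;      # text starting with "----" (-----BEGIN ...)
    30*)      echo DER ;;      # DER SEQUENCE tag, as used by PKCS12
    *)        echo unknown ;;
  esac
}
```

For example, guess_store_format /etc/flink-secrets/flink-tls-keystore.key printing anything other than DER would suggest converting or regenerating the file before using openssl pkcs12 on it.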
 
----- Original message -----
From: Nico Kruber <nkru...@apache.org>
To: user@flink.apache.org
Cc: Adam Roberts <arobe...@uk.ibm.com>
Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
Date: Wed, Aug 26, 2020 11:40 AM
 
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of
its directory. If that is the same as in the cluster, you wouldn't have to
pass most of your parameters manually. However, if you prefer not having a
flink-conf.yaml in place, you could remove the security.ssl.internal.*
parameter from its call since those only affect internal communication.

If the client's connection to the JM is denied, you would actually have this
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest to try without
security enabled first and then enable it (just to rule out any other issues)

From the commands you mentioned, it looks like you're just missing
security.ssl.rest.enabled=true and because of that, the client would not use
SSL for the connection.

For more information and setup, I recommend reading through [1] which also
contains an example at the bottom of the page and how to use curl to test or
use the REST endpoint.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html 

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator  and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great).
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service.
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1).
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options?
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter.
> Running as the flink user in the container, I do
>  
>       securityContext:
>         runAsUser: 9999
>         runAsGroup: 9999
>       containers:
>       - name: wordcount
>         image: adamroberts/mycoolflink:latest
>         args:
>         - /opt/flink/bin/flink
>         - run
>         - -D
>         - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>         - -D
>         - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>         - -D
>         - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>         - -D
>         - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password
>         - -D
>         - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password
>         - -D
>         - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>         - -D
>         - security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>         - -D
>         - security.ssl.internal.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>         - -D
>         - security.ssl.internal.key-password=thepass # Replace with value of flink-tls-keystore.password
>         - -D
>         - security.ssl.internal.truststore-password=thepass # Replace with value of flink-tls-truststore.password
>         - -m
>         - tls-flink-cluster-1-11-jobmanager:8081
>         - /opt/flink/examples/batch/WordCount.jar
>         - --input
>         - /opt/flink/NOTICE
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system.
> adamroberts/mycoolflink (at the time of this email) is just based
> on https://github.com/apache/flink-docker .
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU



 
 