Hi Nathan,

Jobs for FlinkSessionJob resources are always submitted by first uploading the JAR file from the Operator pod via the JobManager's REST API, then starting a new job from the uploaded JAR. This means that downloading the JAR to the JobManager with an initContainer will not help in your case.
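For illustration, what the Operator does is roughly equivalent to the following two calls against the standard Flink REST endpoints (a sketch using curl; the JobManager host and the jar id are placeholders):

    # 1) Upload the JAR to the JobManager. The response contains the
    #    filename (jar id) under which the JAR was stored.
    curl -X POST \
      -F "jarfile=@/path/to/the.jar" \
      "http://<jobmanager-host>:8081/jars/upload"

    # 2) Start a job from the uploaded JAR, using the jar id returned above.
    curl -X POST \
      "http://<jobmanager-host>:8081/jars/<jar-id>/run?entry-class=myentryclass&parallelism=1"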
You could look into the Operator config option 'kubernetes.operator.user.artifacts.http.header' to set the HTTP headers used to download the artifacts (a sketch of one way to set it is appended after the quoted message below). Please check FLINK-27483 [1] for more information.

[1] https://issues.apache.org/jira/browse/FLINK-27483

Regards,
Mate Czagany

Nathan T. A. Lewis <nat...@sourcespectrum.com> wrote (on Thu, 9 May 2024 at 19:00):

> Hello,
>
> I am trying to run a Flink Session Job with a jar that is hosted on a
> Maven repository in Google's Artifact Registry.
>
> The first thing I tried was to just specify the `jarURI` directly:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: myJobName
> spec:
>   deploymentName: flink-session
>   job:
>     jarURI: "https://mylocation-maven.pkg.dev/myGCPproject/myreponame/path/to/the.jar"
>     entryClass: myentryclass
>     parallelism: 1
>     upgradeMode: savepoint
>
> But, since it is a private repository, it unsurprisingly resulted in:
>
> java.io.IOException: Server returned HTTP response code: 401 for URL:
> https://mylocation-maven.pkg.dev/myGCPproject/myreponame/path/to/the.jar
>
> I didn't see anywhere in the FlinkSessionJob definition to put a bearer
> token, and I doubt it would be a good idea security-wise to store one
> there anyway, so I instead looked into using `initContainers` on the
> FlinkDeployment like in this example:
> https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-session
> spec:
>   flinkVersion: v1_18
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "2"
>     state.checkpoints.dir: mycheckpointsdir
>     state.savepoints.dir: mysavepointsdir
>     state.backend: rocksdb
>     state.backend.rocksdb.timer-service.factory: ROCKSDB
>     state.backend.incremental: "true"
>     execution.checkpointing.interval: "1m"
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 0.5
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   podTemplate:
>     spec:
>       initContainers:
>         - name: gcloud
>           image: google/cloud-sdk:latest
>           volumeMounts:
>             - mountPath: /opt/flink/downloads
>               name: downloads
>           command: ["sh", "-c", "gcloud artifacts files download --project=myGCPproject --repository=myreponame --location=mylocation --destination=/opt/flink/downloads path/to/the.jar"]
>       containers:
>         - name: flink-main-container
>           volumeMounts:
>             - mountPath: /opt/flink/downloads
>               name: downloads
>       volumes:
>         - name: downloads
>           emptyDir: {}
>
> This worked well for getting the jar onto the JobManager pod, but it
> looks like the FlinkSessionJob actually looks for the jar on the pod of
> the Flink Kubernetes Operator itself. So in the end, the job still isn't
> being run.
>
> As a workaround for now, I'm planning to move my jar from Maven to a
> Google Cloud Storage bucket and then add the GCS filesystem plugin to the
> operator image. What I'd love to know is whether I've overlooked some
> already implemented way to connect to a private Maven repository for a
> FlinkSessionJob. I suppose in the worst case, we could write a filesystem
> plugin that handles the `artifactrepository://` scheme and uses Google's
> Java libraries to handle authentication and download of the artifact.
> Again, I'm hoping something already exists, rather than having to build
> something new.
>
>
> Best regards,
> Nathan T.A. Lewis
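P.S. In case it helps, here is a sketch of setting that header option through the operator's Helm chart (this assumes the chart's defaultConfiguration section; the bearer token value is a placeholder, and you would need to verify that a static token is practical for Artifact Registry, since Google's OAuth access tokens are short-lived):

    defaultConfiguration:
      create: true
      append: true
      flink-conf.yaml: |+
        # Expected format: headerKey1:headerValue1,headerKey2:headerValue2
        kubernetes.operator.user.artifacts.http.header: Authorization:Bearer <your-token>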