Aitozi commented on code in PR #20671:
URL: https://github.com/apache/flink/pull/20671#discussion_r953258798
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/artifact/HttpArtifactFetcher.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.artifact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/** Download the jar from the http resource. */
+public class HttpArtifactFetcher implements ArtifactFetcher {
+
+    public static final Logger LOG = LoggerFactory.getLogger(HttpArtifactFetcher.class);
+    public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
+
+    @Override
+    public File fetch(String uri, Configuration flinkConfiguration, File targetDir)
+            throws Exception {
+        long start = System.currentTimeMillis();
+        URL url = new URL(uri);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+        // merged session job level header and cluster level header, session job level header take

Review Comment:
Remove this comment.

##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java:
##########
@@ -65,9 +71,13 @@ public static void main(final String[] args) {
         final Configuration configuration =
                 KubernetesEntrypointUtils.loadConfiguration(dynamicParameters);
+        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

Review Comment:
I think it's better to also put these in the try-catch block.

##########
docs/layouts/shortcodes/generated/kubernetes_config_configuration.html:
##########
@@ -260,5 +260,17 @@
             <td>Integer</td>
             <td>Defines the number of Kubernetes transactional operation retries before the client gives up. For example, <code class="highlighter-rouge">FlinkKubeClient#checkAndUpdateConfigMap</code>.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.user.artifacts.base.dir</h5></td>
+            <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td>
+            <td>String</td>
+            <td>The base dir to put the application job artifacts.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>

Review Comment:
The word `operator` does not belong in this option name (the key defined in `KubernetesConfigOptions` is `kubernetes.user.artifacts.http.header`).
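For the `kubernetes.user.artifacts.http.header` option discussed in this thread, the documented format is `headerKey1:headerValue1,headerKey2:headerValue2`. Below is a minimal, hypothetical sketch of how such headers could be parsed and then applied to the `HttpURLConnection` that `HttpArtifactFetcher#fetch` opens. It applies a single cluster-level header map, with no session-job-level merging. `HeaderSketch`, `parseHeaders` and `applyHeaders` are illustrative names, not the PR's code. In Flink itself, the map-typed `ConfigOption` would presumably already yield a `Map<String, String>` via `Configuration#get`, so only the apply step would be needed there.

```java
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the PR's code: parse headers in the documented
// "headerKey1:headerValue1,headerKey2:headerValue2" format and apply them to a connection.
final class HeaderSketch {

    private HeaderSketch() {}

    /** Parses "k1:v1,k2:v2" into an insertion-ordered map; blank entries are skipped. */
    static Map<String, String> parseHeaders(String raw) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return headers;
        }
        for (String entry : raw.split(",")) {
            if (entry.isBlank()) {
                continue;
            }
            // Split on the first ':' only, so values such as "Bearer a:b" stay intact.
            int idx = entry.indexOf(':');
            String key = idx < 0 ? "" : entry.substring(0, idx).trim();
            if (key.isEmpty()) {
                throw new IllegalArgumentException("Malformed header entry: " + entry);
            }
            headers.put(key, entry.substring(idx + 1).trim());
        }
        return headers;
    }

    /** Sets every header on a connection that has not been connected yet. */
    static void applyHeaders(HttpURLConnection conn, Map<String, String> headers) {
        headers.forEach(conn::setRequestProperty);
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> headers = parseHeaders("Authorization:Bearer abc,X-Trace:42");
        // openConnection() only creates the connection object; no request is sent here.
        HttpURLConnection conn =
                (HttpURLConnection)
                        URI.create("http://example.com/my-flink-job.jar").toURL().openConnection();
        applyHeaders(conn, headers);
        System.out.println(headers.keySet() + " -> X-Trace=" + conn.getRequestProperty("X-Trace"));
    }
}
```

Because entries are split on `,`, this sketch cannot express a header value that itself contains a comma.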
##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java:
##########
@@ -498,6 +498,20 @@ public class KubernetesConfigOptions {
                             "Whether to enable HostNetwork mode. " + "The HostNetwork allows the pod could use the node network namespace instead of the individual pod network namespace. Please note that the JobManager service account should have the permission to update Kubernetes service.");
+    public static final ConfigOption<Map<String, String>> KUBERNETES_USER_JAR_ARTIFACT_HTTP_HEADER =
+            ConfigOptions.key("kubernetes.user.artifacts.http.header")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the application job artifacts. " + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");
+
+    public static final ConfigOption<String> KUBERNETES_USER_ARTIFACTS_BASE_DIR =
+            ConfigOptions.key("kubernetes.user.artifacts.base.dir")
+                    .stringType()
+                    .defaultValue("/opt/flink/artifacts")

Review Comment:
I think we could also mount an `emptyDir` volume at this path. That way, if the container restarts, the downloaded jar will be kept. This can be optimized later.
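The PR's documentation change states where the jar is downloaded: `kubernetes.user.artifacts.base.dir`/`kubernetes.namespace`/`kubernetes.cluster-id`. That directory is what an `emptyDir` mount at the base dir would preserve across container restarts. Below is a minimal sketch of how the target location could be derived. It assumes the local file name is simply the last segment of the URI path; the PR's actual naming logic is not shown in these hunks. `ArtifactPathSketch`, `targetDir` and `targetFile` are hypothetical names.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch, not the PR's code, of the documented download layout:
// <kubernetes.user.artifacts.base.dir>/<kubernetes.namespace>/<kubernetes.cluster-id>/<jar name>
final class ArtifactPathSketch {

    private ArtifactPathSketch() {}

    /** Directory that would hold the downloaded artifacts of one application cluster. */
    static Path targetDir(String baseDir, String namespace, String clusterId) {
        return Paths.get(baseDir, namespace, clusterId);
    }

    /** Resolves the jar's file name (assumed: last segment of the URI path) inside the dir. */
    static Path targetFile(Path dir, String uri) {
        String path = URI.create(uri).getPath(); // null for opaque URIs such as "mailto:x"
        String name = path == null ? "" : path.substring(path.lastIndexOf('/') + 1);
        if (name.isEmpty()) {
            throw new IllegalArgumentException("URI has no file name: " + uri);
        }
        return dir.resolve(name);
    }

    public static void main(String[] args) {
        Path dir = targetDir("/opt/flink/artifacts", "default", "my-first-application-cluster");
        System.out.println(targetFile(dir, "s3://my-bucket/my-flink-job.jar"));
        System.out.println(targetFile(dir, "http://example.com:8080/my-flink-job.jar"));
    }
}
```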
##########
docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md:
##########
@@ -97,14 +97,34 @@ COPY /path/of/my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar
 After creating and publishing the Docker image under `custom-image-name`, you can start an Application cluster with the following command:
 ```bash
+# Local Schema
 $ ./bin/flink run-application \
     --target kubernetes-application \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image=custom-image-name \
     local:///opt/flink/usrlib/my-flink-job.jar
+
+# FileSystem
+$ ./bin/flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image=custom-image-name \
+    s3://my-bucket/my-flink-job.jar
+
+# Http/Https Schema
+$ ./bin/flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.cluster-id=my-first-application-cluster \
+    -Dkubernetes.container.image=custom-image-name \
+    http://ip:port/my-flink-job.jar
 ```
+{{< hint info >}}
+Now, The jar package supports reading from the [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}#docker-hub-flink-images) or Http/Https in Application Mode.
+The jar package will be downloaded from filesystem to
+[kubernetes.user.artifacts.base.dir]({{< ref "docs/deployment/config" >}}#kubernetes-user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
+{{< /hint >}}
+<span class="label label-info">Note</span> `local` schema is also supported .
Review Comment:
If a non-local URI is used, there is no need to set up the init container in the pod template (as shown in [link](https://github.com/apache/flink/blob/dca656b922b6c6d350301d8b8f3e60f87b03c381/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md#example-of-pod-template)). Alternatively, we can add a note here clarifying that if a local URI is provided, the jar must either be included in the image or be downloaded by an init container.

Review Comment:
Besides, I think the `file` scheme is not supported, because the file is on the client side.

--
This is an automated message from the Apache Git Service.