yuxiqian commented on code in PR #3093: URL: https://github.com/apache/flink-cdc/pull/3093#discussion_r1677720046
########## Dockerfile: ########## @@ -0,0 +1,33 @@ +#/* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +FROM flink + +ARG FLINK_CDC_VERSION=3.0-SNAPSHOT Review Comment: This could be bumped to `3.2` ########## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/ComposeDeploymentFactory.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; + +import org.apache.commons.cli.CommandLine; + +/** Create deployment methods corresponding to different goals. */ +public class ComposeDeploymentFactory { + + public PipelineDeploymentExecutor getFlinkComposeExecutor(CommandLine commandLine) + throws Exception { + String target = commandLine.getOptionValue("target"); + if (target.equalsIgnoreCase("kubernetes-application")) { + return new K8SApplicationDeploymentExecutor(); + } + throw new Exception(String.format("target %s is not support", target)); Review Comment: ```suggestion throw new Exception(String.format("Deployment target %s is not supported", target)); ``` Throwing a more specific Exception type might be better ########## mysql-doris.yaml: ########## Review Comment: Same concern like `Dockerfile`, maybe it's not ideal to put it in root folder. ########## Dockerfile: ########## @@ -0,0 +1,33 @@ +#/* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ + +FROM flink + +ARG FLINK_CDC_VERSION=3.0-SNAPSHOT + +RUN mkdir -p /opt/flink-cdc +RUN mkdir -p /opt/flink/usrlib +ENV FLINK_CDC_HOME /opt/flink-cdc +COPY flink-cdc-dist/target/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz /tmp/ +RUN tar -xzvf /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz -C /tmp/ && \ + mv /tmp/flink-cdc-${FLINK_CDC_VERSION}/* /opt/flink-cdc/ && \ + rm -rf /tmp/flink-cdc-${FLINK_CDC_VERSION} /tmp/flink-cdc-${FLINK_CDC_VERSION}-bin.tar.gz +# copy jars to cdc libs +COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/target/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-values-${FLINK_CDC_VERSION}.jar +COPY flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar /opt/flink/usrlib/flink-cdc-pipeline-connector-mysql-${FLINK_CDC_VERSION}.jar Review Comment: Should other pipeline connectors (like Doris, used in `mysql-doris.yaml`) be copied, too? ########## Dockerfile: ########## Review Comment: Are `Dockerfile` & `mysql-doris` generic deployment configurations or just an example? Maybe somewhere like `/example` or `/conf` is better than putting them at root? ########## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentExecutor.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.flink.deployment; + +import org.apache.flink.cdc.composer.PipelineDeploymentExecutor; +import org.apache.flink.cdc.composer.PipelineExecution; +import org.apache.flink.client.cli.CliFrontend; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.kubernetes.KubernetesClusterClientFactory; +import org.apache.flink.kubernetes.KubernetesClusterDescriptor; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; + +import org.apache.commons.cli.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** deploy flink cdc job by native k8s application mode. */ +public class K8SApplicationDeploymentExecutor implements PipelineDeploymentExecutor { + + private static final Logger LOG = + LoggerFactory.getLogger(K8SApplicationDeploymentExecutor.class); + + @Override + public PipelineExecution.ExecutionInfo deploy( + CommandLine commandLine, Configuration flinkConfig, List<Path> additionalJars) { + LOG.info("Submitting application in 'Flink K8S Application Mode'."); + flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); + List<String> jars = new ArrayList<>(); + if (flinkConfig.get(PipelineOptions.JARS) == null){ + // must be added cdc dist jar by default docker container path + jars.add("local:///opt/flink-cdc/lib/flink-cdc-dist-3.1.jar"); Review Comment: Is it ok to hard-encode version tag here? ########## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontendOptions.java: ########## @@ -46,6 +46,16 @@ public class CliFrontendOptions { .desc("JARs to be submitted together with the pipeline") .build(); + public static final Option TARGET = + Option.builder("t") + .longOpt("target") + .hasArg() + .desc( + "The deployment target for the execution. This can take one of the following values " Review Comment: +1 for adding related docs in this PR since FLINK-34677 has added a new section `deployment/kubernetes` for Kubernetes deployment. ########## flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java: ########## @@ -176,6 +189,10 @@ private String globalPipelineConfig() throws Exception { + " was triggered.\n" + " -s,--from-savepoint <arg> Path to a savepoint to restore the job from\n" + " (for example hdfs:///flink/savepoint-1537\n" + + " -t,--target <arg> The deployment target for the execution. This\n" + + " can take one of the following values\n" + + " local/remote/yarn-session/yarn-application/ku\n" + + " bernetes-session/kubernetes-application\n" Review Comment: It would be easier to read if we don't break line between enum values. ########## flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/deployment/K8SApplicationDeploymentITCase.java: ########## Review Comment: Is there any test cases for K8s deployment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org