Copilot commented on code in PR #4294:
URL: https://github.com/apache/flink-cdc/pull/4294#discussion_r2903036475


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java:
##########
@@ -117,35 +122,109 @@ public void setup(
 
     @Override
     public void open() throws Exception {
-        
this.<AbstractStreamOperator<CommittableMessage<CommT>>>getFlinkWriterOperator().open();
+        invokeFlinkWriterOperatorMethod("open", new Class<?>[0]);
         copySinkWriter = getFieldValue("sinkWriter");
+        ensureEmitDownstream();
+    }
+
+    /**
+     * Flink 2.2 {@code SinkWriterOperator} sets {@code emitDownstream = sink 
instanceof
+     * SupportsCommitter}. Sinks like Paimon implement the older {@code 
TwoPhaseCommittingSink}
+     * rather than Flink 2.x {@code SupportsCommitter}, so committables are 
silently discarded. This
+     * method forces the flag to {@code true} and fills in the committable 
serializer so that the
+     * wrapped operator properly emits committables to the downstream 
committer.
+     *
+     * <p>This is only applied when the sink actually supports committing 
(implements either {@code
+     * TwoPhaseCommittingSink} or {@code SupportsCommitter}). For sinks that 
do not support
+     * committing, forcing {@code emitDownstream = true} would cause a {@link 
ClassCastException}
+     * when the wrapped operator tries to cast the writer to {@code 
CommittingSinkWriter}.
+     */
+    private void ensureEmitDownstream() {
+        if (!sinkSupportsCommitting()) {
+            return;
+        }
+        try {
+            Field emitField = findField(flinkWriterOperator.getClass(), 
"emitDownstream");
+            if (emitField == null) {
+                return;
+            }
+            emitField.setAccessible(true);
+            if (!emitField.getBoolean(flinkWriterOperator)) {
+                emitField.setBoolean(flinkWriterOperator, true);
+
+                Field serField = findField(flinkWriterOperator.getClass(), 
"committableSerializer");
+                if (serField != null) {
+                    serField.setAccessible(true);
+                    if (serField.get(flinkWriterOperator) == null) {
+                        try {
+                            Method getSerializer =
+                                    
sink.getClass().getMethod("getCommittableSerializer");
+                            getSerializer.setAccessible(true);
+                            serField.set(flinkWriterOperator, 
getSerializer.invoke(sink));
+                        } catch (NoSuchMethodException ignored) {
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not force emitDownstream on wrapped 
SinkWriterOperator", e);
+        }

Review Comment:
   `LOG` is referenced (e.g., in ensureEmitDownstream and handleFlushEvent) but 
no logger is declared/imported in this class, so this won't compile. Add a 
`private static final Logger LOG = 
LoggerFactory.getLogger(DataSinkWriterOperator.class);` (and imports), or 
remove the logging statements.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/TarballFetcher.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+/** Obtains and downloads corresponding Flink CDC tarball files. */
+public abstract class TarballFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TarballFetcher.class);
+
+    public static void fetchAll(GenericContainer<?> container) throws 
Exception {
+        fetch(container, CdcVersion.values());
+    }
+
+    public static void fetchLatest(GenericContainer<?> container) throws 
Exception {
+        fetch(container, CdcVersion.SNAPSHOT);
+    }
+
+    public static void fetch(GenericContainer<?> container, CdcVersion... 
versions)
+            throws Exception {
+        for (CdcVersion version : versions) {
+            TarballFetcher.fetchInternal(container, version);
+        }
+    }
+
+    private static void fetchInternal(GenericContainer<?> container, 
CdcVersion version)
+            throws Exception {
+        LOG.info("Trying to download CDC tarball @ {}...", version);
+        if (CdcVersion.SNAPSHOT.equals(version)) {
+            LOG.info("CDC {} is a snapshot version, we should fetch it 
locally...", version);
+
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(
+                            TestUtils.getResource("flink-cdc.sh", 
"flink-cdc-dist", "src"), 0755),
+                    version.workDir() + "/bin/flink-cdc.sh");
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(
+                            TestUtils.getResource("flink-cdc.yaml", 
"flink-cdc-dist", "src"), 0755),
+                    version.workDir() + "/conf/flink-cdc.yaml");
+            container.copyFileToContainer(
+                    
MountableFile.forHostPath(TestUtils.getResource("flink-cdc-dist.jar")),
+                    version.workDir() + "/lib/flink-cdc-dist.jar");
+            container.copyFileToContainer(
+                    MountableFile.forHostPath(
+                            
TestUtils.getResource("values-cdc-pipeline-connector.jar")),
+                    version.workDir() + 
"/lib/values-cdc-pipeline-connector.jar");
+
+        } else {
+            LOG.info("CDC {} is a released version, download it from the 
Internet...", version);
+
+            String containerPath = "/tmp/tarball/" + version.getVersion() + 
".tar.gz";
+            downloadAndCopyToContainer(container, version.tarballUrl(), 
containerPath);
+            container.execInContainer("mkdir", "-p", version.workDir());
+            container.execInContainer(
+                    "tar", "-xzvf", containerPath, "-C", version.workDir(), 
"--strip-components=1");
+
+            downloadAndCopyToContainer(
+                    container,
+                    version.connectorJarUrl("values"),
+                    version.workDir() + 
"/lib/values-cdc-pipeline-connector.jar");
+        }
+    }
+
+    private static void downloadAndCopyToContainer(
+            GenericContainer<?> container, String url, String containerPath) 
throws Exception {
+        Path tempFile = Files.createTempFile("download-", ".tmp");
+        FileUtils.copyURLToFile(
+                new URL(url),
+                tempFile.toFile(),
+                (int) Duration.ofMinutes(1).toMillis(),
+                (int) Duration.ofMinutes(5).toMillis());
+        container.copyFileToContainer(MountableFile.forHostPath(tempFile), 
containerPath);
+    }

Review Comment:
   `downloadAndCopyToContainer` creates a temporary file but never deletes it. 
In CI (especially with retries / multiple versions) this can accumulate and 
exhaust disk space. Delete the temp file in a finally block (after copying into 
the container).



##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java:
##########
@@ -98,20 +118,20 @@ public void translate(
             OperatorUidGenerator operatorUidGenerator) {
         // Get sink provider
         EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
+        if (eventSinkProvider == null) {
+            return;

Review Comment:
   `DataSink#getEventSinkProvider()` is a required contract and not annotated 
as nullable; silently returning when it is null will produce a pipeline with no 
sink and no error. Consider failing fast here (e.g., throw a 
ValidationException/IllegalStateException) so misconfigured sinks are caught 
early.
   ```suggestion
               throw new IllegalStateException(
                       "DataSink#getEventSinkProvider() must not return null. "
                               + "Please ensure the sink is correctly 
configured.");
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java:
##########
@@ -67,12 +68,7 @@ public ValuesDataSink(
 
     @Override
     public EventSinkProvider getEventSinkProvider() {
-        if (SinkApi.SINK_V2.equals(sinkApi)) {
-            return FlinkSinkProvider.of(new ValuesSink(materializedInMemory, 
print));
-        } else {
-            return FlinkSinkFunctionProvider.of(
-                    new ValuesDataSinkFunction(materializedInMemory, print));
-        }
+        return FlinkSinkProvider.of(new ValuesSink(materializedInMemory, 
print));
     }

Review Comment:
   `sinkApi` is still accepted/configurable (and `SINK_FUNCTION` is still part 
of the public option), but the implementation now always returns a Sink V2 
provider and never uses `sinkApi`. This makes `sink.api=SINK_FUNCTION` a silent 
no-op. Either remove the option/field or validate the config and fail fast when 
`SINK_FUNCTION` is selected (or implement the legacy path via reflection).



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java:
##########
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.cdc.common.exceptions.ValidationException;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.util.TestLogger;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.command.ExecCreateCmdResponse;
+import com.github.dockerjava.api.model.Volume;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container.ExecResult;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.FrameConsumerResultCallback;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.output.ToStringConsumer;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Test environment running pipeline job on Flink containers. */
+@Testcontainers
+public abstract class PipelineTestEnvironment extends TestLogger {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PipelineTestEnvironment.class);
+
+    protected Integer parallelism = getParallelism();
+
+    private int getParallelism() {
+        try {
+            return 
Integer.parseInt(System.getProperty("specifiedParallelism"));
+        } catch (NumberFormatException ex) {
+            LOG.warn(
+                    "Unable to parse specified parallelism configuration ({} 
provided). Use 4 by default.",
+                    System.getProperty("specifiedParallelism"));
+            return 4;
+        }
+    }
+
+    // 
------------------------------------------------------------------------------------------
+    // MySQL Variables (we always use MySQL as the data source for easier 
verifying)
+    // 
------------------------------------------------------------------------------------------
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+    protected static final Duration EVENT_WAITING_TIMEOUT = 
Duration.ofMinutes(3);
+    protected static final Duration STARTUP_WAITING_TIMEOUT = 
Duration.ofMinutes(5);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    @Container
+    protected static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    // 
------------------------------------------------------------------------------------------
+    // Flink Variables
+    // 
------------------------------------------------------------------------------------------
+    protected static final int JOB_MANAGER_REST_PORT = 8081;
+    protected static final String INTER_CONTAINER_JM_ALIAS = "jobmanager";
+    protected static final String INTER_CONTAINER_TM_ALIAS = "taskmanager";
+    protected static final List<String> EXTERNAL_PROPS =
+            Arrays.asList(
+                    String.format("jobmanager.rpc.address: %s", 
INTER_CONTAINER_JM_ALIAS),
+                    "jobmanager.bind-host: 0.0.0.0",
+                    "taskmanager.bind-host: 0.0.0.0",
+                    "rest.bind-address: 0.0.0.0",
+                    "rest.address: 0.0.0.0",
+                    "jobmanager.memory.process.size: 1GB",
+                    "query.server.port: 6125",
+                    "blob.server.port: 6124",
+                    "taskmanager.numberOfTaskSlots: 10",
+                    "parallelism.default: 4",
+                    "execution.checkpointing.interval: 300",
+                    "state.backend.type: hashmap",
+                    "env.java.default-opts.all: 
--add-exports=java.base/sun.net.util=ALL-UNNAMED 
--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED 
--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED 
--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.text=ALL-UNNAMED 
--add-opens=java.base/java.time=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-op
 ens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED 
--add-opens=java.base/jdk.internal.loader=ALL-UNNAMED 
--add-opens=java.base/java.security=ALL-UNNAMED 
--add-exports=java.base/sun.net.www=ALL-UNNAMED 
-Doracle.jdbc.timezoneAsRegion=false",
+                    "execution.checkpointing.savepoint-dir: file:///opt/flink",
+                    "restart-strategy.type: off",
+                    "pekko.ask.timeout: 60s",
+                    // Set off-heap memory explicitly to avoid 
"java.lang.OutOfMemoryError: Direct
+                    // buffer memory" error.
+                    "taskmanager.memory.task.off-heap.size: 128mb",
+                    // Fix `java.lang.OutOfMemoryError: Metaspace. The 
metaspace out-of-memory error
+                    // has occurred` error.
+                    "taskmanager.memory.jvm-metaspace.size: 512mb");
+    public static final String FLINK_PROPERTIES = String.join("\n", 
EXTERNAL_PROPS);
+
+    @Nullable protected RestClusterClient<StandaloneClusterId> 
restClusterClient;
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+    protected Volume sharedVolume = new Volume("/tmp/shared");
+
+    protected ToStringConsumer jobManagerConsumer;
+
+    protected ToStringConsumer taskManagerConsumer;
+
+    protected String flinkVersion = getFlinkVersion();
+
+    public static String getFlinkVersion() {
+        return "2.2.0";
+    }
+
+    protected List<String> copyJarToFlinkLib() {
+        return Collections.emptyList();
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        LOG.info("Starting containers...");
+        jobManagerConsumer = new ToStringConsumer();
+        jobManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("jobmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
+                        .withExposedPorts(JOB_MANAGER_REST_PORT)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .withCreateContainerCmdModifier(cmd -> 
cmd.withVolumes(sharedVolume))
+                        .withLogConsumer(jobManagerConsumer);
+
+        List<String> jarToCopy = copyJarToFlinkLib();
+        if (!jarToCopy.isEmpty()) {
+            for (String jar : jarToCopy) {
+                jobManager.withCopyFileToContainer(
+                        MountableFile.forHostPath(TestUtils.getResource(jar)), 
"/opt/flink/lib/");
+            }
+        }
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        runInContainerAsRoot(jobManager, "chmod", "0777", "-R", 
sharedVolume.toString());
+        LOG.info("JobManager is started.");
+
+        taskManagerConsumer = new ToStringConsumer();
+        taskManager =
+                new GenericContainer<>(getFlinkDockerImageTag())
+                        .withCommand("taskmanager")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
+                        .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                        .dependsOn(jobManager)
+                        .withVolumesFrom(jobManager, BindMode.READ_WRITE)
+                        .withLogConsumer(taskManagerConsumer);
+        Startables.deepStart(Stream.of(taskManager)).join();
+        runInContainerAsRoot(taskManager, "chmod", "0777", "-R", 
sharedVolume.toString());
+        LOG.info("TaskManager is started.");
+
+        TarballFetcher.fetchLatest(jobManager);
+        LOG.info("CDC executables deployed.");
+    }
+
+    @AfterEach
+    public void after() {
+        if (restClusterClient != null) {
+            restClusterClient.close();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with the latest CDC version,
+     * without restoring from previous savepoint states.
+     */
+    public JobID submitPipelineJob(String pipelineJob, Path... jars) throws 
Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT, pipelineJob, null, false, 
jars);
+    }
+
+    /**
+     * Submits a YAML job to the running cluster with a specific CDC version,
+     * without restoring from previous savepoint states.
+     */
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version, String pipelineJob, Path... 
jars) throws Exception {
+        return submitPipelineJob(version, pipelineJob, null, false, jars);
+    }
+
+    /** Submits a YAML job to the running cluster with latest CDC version. */
+    public JobID submitPipelineJob(
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+        return submitPipelineJob(
+                TarballFetcher.CdcVersion.SNAPSHOT,
+                pipelineJob,
+                savepointPath,
+                allowNonRestoredState,
+                jars);
+    }
+
+    public JobID submitPipelineJob(
+            TarballFetcher.CdcVersion version,
+            String pipelineJob,
+            @Nullable String savepointPath,
+            boolean allowNonRestoredState,
+            Path... jars)
+            throws Exception {
+
+        // Prepare external JAR dependencies
+        List<Path> paths = new ArrayList<>(Arrays.asList(jars));
+        List<String> containerPaths = new ArrayList<>();
+        paths.add(TestUtils.getResource("mysql-driver.jar"));
+
+        for (Path jar : paths) {
+            String containerPath = version.workDir() + "/lib/" + 
jar.getFileName();
+            jobManager.copyFileToContainer(MountableFile.forHostPath(jar), 
containerPath);
+            containerPaths.add(containerPath);
+        }
+
+        containerPaths.add(version.workDir() + 
"/lib/values-cdc-pipeline-connector.jar");
+
+        StringBuilder sb = new StringBuilder();
+        for (String containerPath : containerPaths) {
+            sb.append(" --jar ").append(containerPath);
+        }
+
+        jobManager.copyFileToContainer(
+                Transferable.of(pipelineJob), version.workDir() + 
"/conf/pipeline.yaml");
+
+        String commands =
+                version.workDir()
+                        + "/bin/flink-cdc.sh "
+                        + version.workDir()
+                        + "/conf/pipeline.yaml --flink-home /opt/flink"
+                        + sb;
+
+        if (savepointPath != null) {
+            commands += " --from-savepoint " + savepointPath;
+            if (allowNonRestoredState) {
+                commands += " --allow-nonRestored-state";
+            }
+        }
+        LOG.info("Execute command: {}", commands);
+        ExecResult execResult = executeAndCheck(jobManager, commands);
+        return Arrays.stream(execResult.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Job ID: "))
+                .findFirst()
+                .map(line -> line.split(": ")[1])
+                .map(JobID::fromHexString)
+                .orElse(null);
+    }
+
+    public String stopJobWithSavepoint(JobID jobID) {
+        String savepointPath = "/opt/flink/";
+        ExecResult result =
+                executeAndCheck(
+                        jobManager,
+                        "flink",
+                        "stop",
+                        jobID.toHexString(),
+                        "--savepointPath",
+                        savepointPath);
+
+        return Arrays.stream(result.getStdout().split("\n"))
+                .filter(line -> line.startsWith("Savepoint completed."))
+                .findFirst()
+                .map(line -> line.split("Path: file:")[1])
+                .orElseThrow(
+                        () -> new RuntimeException("Failed to parse savepoint 
path from stdout."));
+    }
+
+    public void cancelJob(JobID jobID) {
+        executeAndCheck(jobManager, "flink", "cancel", jobID.toHexString());
+    }
+
+    /**
+     * Get {@link RestClusterClient} connected to this FlinkContainer.
+     *
+     * <p>This method lazily initializes the REST client on-demand.
+     */
+    public RestClusterClient<StandaloneClusterId> getRestClusterClient() {
+        if (restClusterClient != null) {
+            return restClusterClient;
+        }
+        checkState(
+                jobManager.isRunning(),
+                "Cluster client should only be retrieved for a running 
cluster");
+        try {
+            final Configuration clientConfiguration = new Configuration();
+            clientConfiguration.set(RestOptions.ADDRESS, jobManager.getHost());
+            clientConfiguration.set(
+                    RestOptions.PORT, 
jobManager.getMappedPort(JOB_MANAGER_REST_PORT));
+            this.restClusterClient =
+                    new RestClusterClient<>(clientConfiguration, 
StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Failed to create client for Flink container cluster", e);
+        }
+        return restClusterClient;
+    }
+
+    public void waitUntilJobRunning(Duration timeout) {
+        waitUntilJobState(timeout, JobStatus.RUNNING);
+    }
+
+    public void waitUntilJobFinished(Duration timeout) {
+        waitUntilJobState(timeout, JobStatus.FINISHED);
+    }
+
+    public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
+        RestClusterClient<?> clusterClient = getRestClusterClient();
+        Deadline deadline = Deadline.fromNow(timeout);
+        while (deadline.hasTimeLeft()) {
+            Collection<JobStatusMessage> jobStatusMessages;
+            try {
+                jobStatusMessages = clusterClient.listJobs().get(10, 
TimeUnit.SECONDS);
+            } catch (Exception e) {
+                LOG.warn("Error when fetching job status.", e);
+                continue;
+            }
+            if (jobStatusMessages != null && !jobStatusMessages.isEmpty()) {
+                JobStatusMessage message = jobStatusMessages.iterator().next();
+                JobStatus jobStatus = message.getJobState();
+                if (!expectedStatus.isTerminalState() && 
jobStatus.isTerminalState()) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Job has been terminated! JobName: %s, 
JobID: %s, Status: %s",
+                                    message.getJobName(),
+                                    message.getJobId(),
+                                    message.getJobState()));
+                } else if (jobStatus == expectedStatus) {
+                    return;
+                }
+            }
+        }
+    }
+
+    protected String getFlinkDockerImageTag() {
+        if (System.getProperty("java.specification.version").equals("17")) {
+            return String.format("flink:%s-scala_2.12-java17", flinkVersion);
+        }
+        return String.format("flink:%s-scala_2.12-java11", flinkVersion);
+    }
+
+    private ExecResult executeAndCheck(GenericContainer<?> container, 
String... command) {
+        String joinedCommand = String.join(" ", command);
+        try {
+            LOG.info("Executing command {}", joinedCommand);
+            ExecResult execResult =
+                    container.execInContainer("bash", "-c", String.join(" ", 
command));
+            LOG.info(execResult.getStdout());
+            if (execResult.getExitCode() == 0) {
+                LOG.info("Command executed successfully.");
+                return execResult;
+            } else {
+                LOG.error(execResult.getStderr());
+                throw new AssertionError(
+                        "Failed when submitting the pipeline job.\n"
+                                + "Exit code: "
+                                + execResult.getExitCode()
+                                + "\n"
+                                + "StdOut: "
+                                + execResult.getStdout()
+                                + "\n"
+                                + "StdErr: "
+                                + execResult.getStderr());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to execute command " + joinedCommand + " in 
container " + container);
+        }

Review Comment:
   The catch-all here throws a new RuntimeException without the original 
exception as the cause, which makes failures in container execution much harder 
to diagnose. Preserve the cause (and ideally include stderr/stdout) when 
rethrowing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to