twalthr commented on code in PR #25064:
URL: https://github.com/apache/flink/pull/25064#discussion_r1673942771


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/BatchCompiledPlanTestBase.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlanReference;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.SqlTestStep;
+import org.apache.flink.table.test.program.StatementSetTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.test.program.TableTestProgramRunner;
+import org.apache.flink.table.test.program.TestStep.TestKind;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for implementing compiled plan tests for {@link BatchExecNode}. You can generate json
+ * compiled plan for the latest node version by running {@link
+ * BatchCompiledPlanTestBase#generateCompiledPlans(TableTestProgram)}. This method does not recreate
+ * the compiled plan if it already exists for the given version of the operator.
+ *
+ * <p><b>Note:</b> The test base uses {@link TableConfigOptions.CatalogPlanCompilation#SCHEMA}
+ * because it needs to adjust source and sink properties. Therefore, the test base can not be used
+ * for testing storing table options in the compiled plan.
+ */
+@ExtendWith(MiniClusterExtension.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(OrderAnnotation.class)
+public abstract class BatchCompiledPlanTestBase implements TableTestProgramRunner {
+
+    private final Class<? extends ExecNode<?>> execNodeUnderTest;
+    private final List<Class<? extends ExecNode<?>>> childExecNodesUnderTest;
+
+    protected BatchCompiledPlanTestBase(Class<? extends ExecNode<?>> execNodeUnderTest) {
+        this(execNodeUnderTest, new ArrayList<>());
+    }
+
+    protected BatchCompiledPlanTestBase(
+            Class<? extends ExecNode<?>> execNodeUnderTest,
+            List<Class<? extends ExecNode<?>>> childExecNodesUnderTest) {
+        this.execNodeUnderTest = execNodeUnderTest;
+        this.childExecNodesUnderTest = childExecNodesUnderTest;
+    }
+
+    // Used for testing Restore Test Completeness
+    public Class<? extends ExecNode<?>> getExecNode() {
+        return execNodeUnderTest;
+    }
+
+    // Used for testing Restore Test Completeness
+    public List<Class<? extends ExecNode<?>>> getChildExecNodes() {
+        return childExecNodesUnderTest;
+    }
+
+    @Override
+    public EnumSet<TestKind> supportedSetupSteps() {
+        return EnumSet.of(
+                TestKind.CONFIG,
+                TestKind.FUNCTION,
+                TestKind.TEMPORAL_FUNCTION,
+                TestKind.SOURCE_WITH_RESTORE_DATA,
+                TestKind.SOURCE_WITH_DATA,
+                TestKind.SINK_WITH_RESTORE_DATA,
+                TestKind.SINK_WITH_DATA);
+    }
+
+    @Override
+    public EnumSet<TestKind> supportedRunSteps() {
+        return EnumSet.of(TestKind.SQL, TestKind.STATEMENT_SET);
+    }
+
+    @AfterEach
+    public void clearData() {
+        TestValuesTableFactory.clearAllData();
+    }
+
+    private List<ExecNodeMetadata> getAllMetadata() {
+        return ExecNodeMetadataUtil.extractMetadataFromAnnotation(execNodeUnderTest);
+    }
+
+    private ExecNodeMetadata getLatestMetadata() {
+        return ExecNodeMetadataUtil.latestAnnotation(execNodeUnderTest);
+    }
+
+    private Stream<Arguments> createSpecs() {
+        return getAllMetadata().stream()
+                .flatMap(
+                        metadata ->
+                                supportedPrograms().stream().map(p -> Arguments.of(p, metadata)));
+    }
+
+    /** Generates compiled plans for a given TableTestProgram. */
+    @ParameterizedTest
+    @MethodSource("supportedPrograms")
+    @Order(0)
+    public void generateCompiledPlans(TableTestProgram program) {
+        Path path = getPlanPath(program, getLatestMetadata());
+        if (path.toFile().exists()) {
+            return;
+        }
+
+        final EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
+        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        final TableEnvironment tEnv = TableEnvironment.create(settings);
+        program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
+        tEnv.getConfig()
+                .set(
+                        TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS,
+                        TableConfigOptions.CatalogPlanCompilation.SCHEMA);
+
+        for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) {
+            final String id = TestValuesTableFactory.registerData(sourceTestStep.dataBeforeRestore);
+            final Map<String, String> options = new HashMap<>();
+            options.put("connector", "values");
+            options.put("data-id", id);
+            options.put("bounded", "true");
+            options.put("terminating", "true");
+            options.put("runtime-source", "NewSource");
+            sourceTestStep.apply(tEnv, options);
+        }
+
+        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+            final Map<String, String> options = new HashMap<>();
+            options.put("connector", "values");
+            options.put("sink-insert-only", "false");
+            sinkTestStep.apply(tEnv, options);
+        }
+
+        program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+        program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));
+
+        final CompiledPlan compiledPlan;
+        if (program.runSteps.get(0).getKind() == TestKind.STATEMENT_SET) {
+            final StatementSetTestStep statementSetTestStep = program.getRunStatementSetTestStep();
+            compiledPlan = statementSetTestStep.compiledPlan(tEnv);
+        } else {
+            final SqlTestStep sqlTestStep = program.getRunSqlTestStep();
+            compiledPlan = tEnv.compilePlanSql(sqlTestStep.sql);
+        }
+
+        compiledPlan.writeToFile(path);
+    }
+
+    @ParameterizedTest
+    @MethodSource("createSpecs")
+    @Order(1)
+    void loadAndRunCompiledPlan(TableTestProgram program, ExecNodeMetadata metadata)
+            throws Exception {
+        final EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
+        settings.getConfiguration().set(StateBackendOptions.STATE_BACKEND, "rocksdb");
+        final TableEnvironment tEnv = TableEnvironment.create(settings);
+        tEnv.getConfig()
+                .set(
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
+                        TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
+
+        program.getSetupConfigOptionTestSteps().forEach(s -> s.apply(tEnv));
+
+        for (SourceTestStep sourceTestStep : program.getSetupSourceTestSteps()) {
+            final Collection<Row> data = sourceTestStep.dataBeforeRestore;
+            final String id = TestValuesTableFactory.registerData(data);
+            final Map<String, String> options = new HashMap<>();
+            options.put("connector", "values");
+            options.put("data-id", id);
+            options.put("runtime-source", "NewSource");
+            options.put("terminating", "true");
+            options.put("bounded", "true");
+            sourceTestStep.apply(tEnv, options);
+        }
+
+        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+            final Map<String, String> options = new HashMap<>();
+            options.put("connector", "values");
+            options.put("disable-lookup", "true");
+            options.put("sink-insert-only", "false");
+            sinkTestStep.apply(tEnv, options);
+        }
+
+        program.getSetupFunctionTestSteps().forEach(s -> s.apply(tEnv));
+        program.getSetupTemporalFunctionTestSteps().forEach(s -> s.apply(tEnv));
+
+        final CompiledPlan compiledPlan =
+                tEnv.loadPlan(PlanReference.fromFile(getPlanPath(program, metadata)));
+
+        compiledPlan.execute().await();
+        for (SinkTestStep sinkTestStep : program.getSetupSinkTestSteps()) {
+            List<String> expectedResults = getExpectedResults(sinkTestStep, sinkTestStep.name);
+            assertThat(expectedResults)
+                    .containsExactlyInAnyOrder(
+                            sinkTestStep
+                                    .getExpectedBeforeRestoreAsStrings()

Review Comment:
   There is already a `getExpectedAsStrings` method that can be used here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to