Airblader commented on a change in pull request #16984:
URL: https://github.com/apache/flink/pull/16984#discussion_r696586700



##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -243,4 +270,153 @@ public void testProjectFieldAccessWithITEM() {
                                 + "`outer_map`['item'] "
                                 + "FROM ItemTable");
     }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupported() 
{
+        createMetadataTableWithoutProjectionPushDown("T1", true);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T1");
+        assertThat(
+                
MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2"));
+    }
+
+    @Test
+    public void 
testMetadataProjectionWithoutProjectionPushDownWhenNotSupported() {
+        createMetadataTableWithoutProjectionPushDown("T2", false);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T2");
+        assertThat(
+                
MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    @Test
+    public void 
testMetadataProjectionWithoutProjectionPushDownWhenSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T3", true);
+
+        util().verifyRelPlan("SELECT 1 FROM T3");
+        
assertThat(MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(), 
hasSize(0));
+    }
+
+    @Test
+    public void 
testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected()
 {
+        createMetadataTableWithoutProjectionPushDown("T4", false);
+
+        util().verifyRelPlan("SELECT 1 FROM T4");
+        assertThat(
+                
MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+
+    private void createMetadataTableWithoutProjectionPushDown(
+            String name, boolean supportsMetadataProjection) {
+        util().tableEnv()
+                .createTable(
+                        name,
+                        TableDescriptor.forConnector(
+                                        
MetadataNoProjectionPushDownTableFactory.IDENTIFIER)
+                                .schema(
+                                        Schema.newBuilder()
+                                                .columnByMetadata("m1", 
STRING())
+                                                .columnByMetadata("metadata", 
STRING(), "m2")
+                                                .columnByMetadata("m3", 
STRING())
+                                                .build())
+                                .option(SUPPORTS_METADATA_PROJECTION, 
supportsMetadataProjection)
+                                .build());
+    }
+
+    // 
---------------------------------------------------------------------------------------------
+
+    /** Factory for {@link Source}. */
+    public static class MetadataNoProjectionPushDownTableFactory
+            implements DynamicTableSourceFactory {
+        public static final String IDENTIFIER = "metadataNoProjectionPushDown";
+
+        public static final ConfigOption<Boolean> SUPPORTS_METADATA_PROJECTION 
=
+                
ConfigOptions.key("supports-metadata-projection").booleanType().defaultValue(true);
+
+        public static ThreadLocal<List<String>> appliedMetadataKeys = new 
ThreadLocal<>();
+
+        @Override
+        public String factoryIdentifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public Set<ConfigOption<?>> requiredOptions() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Set<ConfigOption<?>> optionalOptions() {
+            return Collections.singleton(SUPPORTS_METADATA_PROJECTION);
+        }
+
+        @Override
+        public DynamicTableSource createDynamicTableSource(Context context) {
+            FactoryUtil.TableFactoryHelper helper =
+                    FactoryUtil.createTableFactoryHelper(this, context);
+            return new Source(helper.getOptions());
+        }
+    }
+
+    private static class Source implements ScanTableSource, 
SupportsReadingMetadata {
+
+        private final ReadableConfig options;
+
+        public Source(ReadableConfig options) {
+            this.options = options;
+            
MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.remove();
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
+            return SourceFunctionProvider.of(

Review comment:
       It isn't, because `DynamicSourceUtils#validateScanSource` is called 
which creates an instance. And since this test runs in batch mode, the result 
is even used, otherwise `return null` would've worked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to