gaoyunhaii commented on code in PR #20472:
URL: https://github.com/apache/flink/pull/20472#discussion_r939545370


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal(
         // the boundedness has been checked via the runtime provider already, 
so we can safely
         // declare all legacy transformations as bounded to make the stream 
graph generator happy
         ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
-        return transformation;
+
+        // no dynamic filtering applied
+        if (getInputEdges().isEmpty() || !(transformation instanceof 
SourceTransformation)) {
+            return transformation;
+        }
+
+        // handle dynamic filtering
+        Preconditions.checkArgument(getInputEdges().size() == 1);
+        BatchExecNode<?> input = (BatchExecNode<?>) 
getInputEdges().get(0).getSource();
+        if (!(input instanceof BatchExecDynamicFilteringDataCollector)) {
+            throw new TableException(
+                    "The source input must be 
BatchExecDynamicFilteringDataCollector now");

Review Comment:
   nit: now -> for now



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link Source} implementation that reads data from a list.
+ *
+ * <p>The source is useful for FLIP-27 source tests.
+ *
+ * <p>{@code FromElementsSource} requires the elements must be serializable, 
and the parallelism
+ * must be 1. RowData is not serializable and the parallelism of table source 
may not be 1, so we
+ * introduce a new source for testing in table module.
+ */
+public class ValuesSource implements Source<RowData, ValuesSourceSplit, 
NoOpEnumState> {
+    private final TypeSerializer<RowData> serializer;
+
+    private final List<byte[]> serializedElements;
+
+    public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> 
serializer) {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        this.serializedElements = serializeElements(elements, serializer);
+        this.serializer = serializer;
+    }
+
+    private List<byte[]> serializeElements(
+            Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+        List<byte[]> serializeElements = new ArrayList<>();
+
+        for (RowData element : elements) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   Wrapped with try-with-resource



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import 
org.apache.flink.connector.source.enumerator.DynamicFilteringValuesSourceEnumerator;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import 
org.apache.flink.connector.source.split.ValuesSourcePartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link Source} implementation that reads data from a partitioned list.
+ *
+ * <p>This source is useful for dynamic filtering testing.
+ */
+public class DynamicFilteringValuesSource
+        implements Source<RowData, ValuesSourcePartitionSplit, NoOpEnumState> {
+
+    private final TypeSerializer<RowData> serializer;
+    private Map<Map<String, String>, byte[]> serializedElements;
+    private Map<Map<String, String>, Integer> counts;
+    private final List<String> dynamicFilteringFields;
+
+    public DynamicFilteringValuesSource(
+            Map<Map<String, String>, Collection<RowData>> elements,
+            TypeSerializer<RowData> serializer,
+            List<String> dynamicFilteringFields) {
+        this.serializer = serializer;
+        this.dynamicFilteringFields = dynamicFilteringFields;
+        serializeElements(serializer, elements);
+    }
+
+    private void serializeElements(
+            TypeSerializer<RowData> serializer,
+            Map<Map<String, String>, Collection<RowData>> elements) {
+        Preconditions.checkState(serializer != null, "serializer not set");
+
+        serializedElements = new HashMap<>();
+        counts = new HashMap<>();
+        for (Map<String, String> partition : elements.keySet()) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   Also wrapped with try-with-resource



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import 
org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Batch {@link ExecNode} that collects inputs and builds {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringData}, and then 
sends the {@link
+ * org.apache.flink.table.connector.source.DynamicFilteringEvent} to the 
source coordinator.
+ */
+public class BatchExecDynamicFilteringDataCollector extends 
ExecNodeBase<Object>
+        implements BatchExecNode<Object> {
+
+    @Experimental
+    public static final ConfigOption<MemorySize> 
TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD =
+            key("table.exec.dynamic-filtering.threshold")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("32 mb"))

Review Comment:
   Might decrease the size to 8M since AkkaOptions#FRAMESIZE by default is 10M



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal(
         // the boundedness has been checked via the runtime provider already, 
so we can safely
         // declare all legacy transformations as bounded to make the stream 
graph generator happy
         ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
-        return transformation;
+
+        // no dynamic filtering applied
+        if (getInputEdges().isEmpty() || !(transformation instanceof 
SourceTransformation)) {
+            return transformation;
+        }
+
+        // handle dynamic filtering
+        Preconditions.checkArgument(getInputEdges().size() == 1);
+        BatchExecNode<?> input = (BatchExecNode<?>) 
getInputEdges().get(0).getSource();
+        if (!(input instanceof BatchExecDynamicFilteringDataCollector)) {
+            throw new TableException(
+                    "The source input must be 
BatchExecDynamicFilteringDataCollector now");
+        }
+        BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector =
+                (BatchExecDynamicFilteringDataCollector) input;
+
+        ((SourceTransformation<?, ?, ?>) transformation)
+                .setCoordinatorListeningID(dynamicFilteringDataListenerID);
+
+        // Must use translateToPlan to avoid duplication dynamic filters.
+        Transformation<Object> dynamicFilteringTransform =
+                dynamicFilteringDataCollector.translateToPlan(planner);
+        ((DynamicFilteringDataCollectorOperatorFactory)
+                        ((OneInputTransformation<?, ?>) 
dynamicFilteringTransform)
+                                .getOperatorFactory())
+                
.registerDynamicFilteringDataListenerID(dynamicFilteringDataListenerID);
+
+        if (!needDynamicFilteringDependency) {
+            planner.addExtraTransformation(dynamicFilteringTransform);
+            return transformation;
+        } else {
+            MultipleInputTransformation<RowData> multipleInputTransformation =
+                    new MultipleInputTransformation<>(
+                            "Placeholder-Filter",

Review Comment:
   Might rename to `Order-Enforcer` 



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal(
         // the boundedness has been checked via the runtime provider already, 
so we can safely
         // declare all legacy transformations as bounded to make the stream 
graph generator happy
         ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation);
-        return transformation;
+
+        // no dynamic filtering applied
+        if (getInputEdges().isEmpty() || !(transformation instanceof 
SourceTransformation)) {
+            return transformation;
+        }
+
+        // handle dynamic filtering
+        Preconditions.checkArgument(getInputEdges().size() == 1);

Review Comment:
   Seems better to use `checkState` since the give parameter is not an 
argument. 



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link Source} implementation that reads data from a list.
+ *
+ * <p>The source is useful for FLIP-27 source tests.
+ *
+ * <p>{@code FromElementsSource} requires the elements must be serializable, 
and the parallelism
+ * must be 1. RowData is not serializable and the parallelism of table source 
may not be 1, so we
+ * introduce a new source for testing in table module.
+ */
+public class ValuesSource implements Source<RowData, ValuesSourceSplit, 
NoOpEnumState> {
+    private final TypeSerializer<RowData> serializer;
+
+    private final List<byte[]> serializedElements;
+
+    public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> 
serializer) {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        this.serializedElements = serializeElements(elements, serializer);
+        this.serializer = serializer;
+    }
+
+    private List<byte[]> serializeElements(
+            Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+        List<byte[]> serializeElements = new ArrayList<>();
+
+        for (RowData element : elements) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);
+            try {
+                serializer.serialize(element, wrapper);
+            } catch (Exception e) {
+                throw new TableException(
+                        "Serializing the source elements failed: " + 
e.getMessage(), e);
+            }
+            serializeElements.add(baos.toByteArray());
+        }
+        return serializeElements;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<RowData, ValuesSourceSplit> 
createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return new ValuesSourceReader(serializedElements, serializer, 
readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<ValuesSourceSplit, NoOpEnumState> createEnumerator(
+            SplitEnumeratorContext<ValuesSourceSplit> enumContext) throws 
Exception {

Review Comment:
   nit: throws Exception is not used.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSourceReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class ValuesSourceReader implements SourceReader<RowData, 
ValuesSourceSplit> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ValuesSourceReader.class);
+
+    /** The context for this reader, to communicate with the enumerator. */
+    private final SourceReaderContext context;
+
+    /** The availability future. This reader is available as soon as a split 
is assigned. */
+    private CompletableFuture<Void> availability;
+
+    private final List<byte[]> serializedElements;
+    private final TypeSerializer<RowData> serializer;
+    private List<RowData> elements;
+
+    /** The remaining splits that were assigned but not yet processed. */
+    private final Queue<ValuesSourceSplit> remainingSplits;
+
+    private boolean noMoreSplits;
+
+    public ValuesSourceReader(
+            List<byte[]> serializedElements,
+            TypeSerializer<RowData> serializer,
+            SourceReaderContext context) {
+        this.serializedElements = serializedElements;
+        this.serializer = serializer;
+        this.context = context;
+        this.availability = new CompletableFuture<>();
+        this.remainingSplits = new ArrayDeque<>();
+    }
+
+    @Override
+    public void start() {
+        elements = new ArrayList<>();
+        for (byte[] bytes : serializedElements) {
+            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);

Review Comment:
   Also wrapped with try-with-resource



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class DynamicFilteringValuesSourceReader

Review Comment:
   Similar to `ValuesSourceReader`, we'd better not implement it from scratch



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.enumerator.NoOpEnumState;
+import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer;
+import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@link Source} implementation that reads data from a list.
+ *
+ * <p>The source is useful for FLIP-27 source tests.
+ *
+ * <p>{@code FromElementsSource} requires the elements must be serializable, 
and the parallelism
+ * must be 1. RowData is not serializable and the parallelism of table source 
may not be 1, so we
+ * introduce a new source for testing in table module.
+ */
+public class ValuesSource implements Source<RowData, ValuesSourceSplit, 
NoOpEnumState> {
+    private final TypeSerializer<RowData> serializer;
+
+    private final List<byte[]> serializedElements;
+
+    public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> 
serializer) {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        this.serializedElements = serializeElements(elements, serializer);
+        this.serializer = serializer;
+    }
+
+    private List<byte[]> serializeElements(
+            Collection<RowData> elements, TypeSerializer<RowData> serializer) {
+        List<byte[]> serializeElements = new ArrayList<>();
+
+        for (RowData element : elements) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(baos);

Review Comment:
   ditto



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSourceReader.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.source.split.ValuesSourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/** A {@link SourceReader} implementation that reads data from a list. */
+public class ValuesSourceReader implements SourceReader<RowData, 
ValuesSourceSplit> {

Review Comment:
   It looks to me that it might not be a good practice to directly implements a 
SourceReader from scratch since it in fact has a lot of  "default protocol" 
with how runtime interact with the new sources. If there are some changes in 
the future in the runtime side, the implementation here might also need some 
rework
   
   It looks to me to be better to extends the `SourceReaderBase` or 
`SingleThreadMultiplexSourceReaderBase`.
   
   If we reach an agreement on this point it is also acceptable to change the 
implementation in a separate PR. 
   
   (a poc implementation might looks like follows:
   
   ```java
   public class MyValuesSourceReader
           extends SingleThreadMultiplexSourceReaderBase<RowData, RowData, 
ValuesSourceSplit, byte[]> {
   
       public MyValuesSourceReader(
               List<byte[]> serializedElements,
               TypeSerializer<RowData> serializer,
               SourceReaderContext readerContext,
               Configuration config) {
           super(() -> new MyValueSourceSplitReader(serializedElements, 
serializer), new MyRecordEmitter(), config, readerContext);
       }
   
       @Override
       public void start() {
           // we request a split only if we did not get splits during the 
checkpoint restore
           if (getNumberOfCurrentlyAssignedSplits() == 0) {
               context.sendSplitRequest();
           }
       }
   
       @Override
       protected void onSplitFinished(Map<String, byte[]> finishedSplitIds) {
           context.sendSplitRequest();
       }
   
       @Override
       protected byte[] initializedState(ValuesSourceSplit split) {
           return new byte[0];
       }
   
       @Override
       protected ValuesSourceSplit toSplitType(String splitId, byte[] 
splitState) {
           return new ValuesSourceSplit(Integer.parseInt(splitId));
       }
   
       public static class MyValueSourceSplitReader
               implements SplitReader<RowData, ValuesSourceSplit> {
   
           private final List<RowData> elements = new ArrayList<>();
   
           private final Queue<ValuesSourceSplit> splits = new ArrayDeque<>();
   
           private boolean nextIsFinish;
   
           public MyValueSourceSplitReader(List<byte[]> serializedElements,
                                           TypeSerializer<RowData> serializer) {
               for (byte[] bytes : serializedElements) {
                   ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
                   final DataInputView input = new 
DataInputViewStreamWrapper(bais);
                   try {
                       RowData next = serializer.deserialize(input);
                       elements.add(next);
                   } catch (Exception e) {
                       throw new TableException(
                               "Failed to deserialize an element from the 
source. "
                                       + "If you are using user-defined 
serialization (Value and Writable types), check the "
                                       + "serialization functions.\nSerializer 
is "
                                       + serializer,
                               e);
                   }
               }
           }
   
           @Override
           public RecordsWithSplitIds<RowData> fetch() throws IOException {
               System.out.println("Fetch splits is " + splits);
               if (nextIsFinish) {
                   ValuesSourceSplit split = splits.poll();
                   nextIsFinish = false;
                   return new RecordsBySplits<>(
                           Collections.emptyMap(), 
Collections.singleton(split.splitId()));
               } else {
                   ValuesSourceSplit split = splits.peek();
                   nextIsFinish = true;
                   return new RecordsBySplits<>(
                           Collections.singletonMap(split.splitId(), 
Collections.singletonList(elements.get(split.getIndex()))),
                           Collections.emptySet());
               }
           }
   
           @Override
           public void handleSplitsChanges(SplitsChange<ValuesSourceSplit> 
splitsChanges) {
               if (!(splitsChanges instanceof SplitsAddition)) {
                   throw new UnsupportedOperationException(
                           String.format(
                                   "The SplitChange type of %s is not 
supported.",
                                   splitsChanges.getClass()));
               }
   
               splits.addAll(splitsChanges.splits());
           }
   
           @Override
           public void wakeUp() {}
   
           @Override
           public void close() throws Exception {}
       }
   
       public static class MyRecordEmitter implements RecordEmitter<RowData, 
RowData, byte[]> {
   
           @Override
           public void emitRecord(RowData element, SourceOutput<RowData> 
output, byte[] splitState) {
               output.collect(element);
           }
       }
   }
   
   ```
   )



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java:
##########
@@ -960,11 +997,12 @@ protected Collection<RowData> 
convertToRowData(DataStructureConverter converter)
                     final RowData rowData = (RowData) 
converter.toInternal(row);
                     if (rowData != null) {
                         rowData.setRowKind(row.getKind());
-                        result.add(rowData);
+                        partitionResult.add(rowData);
+                        size++;

Review Comment:
   It seems size == partitionResult.size() ? Could we directly use 
partitionResult.size() ?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.source.enumerator;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
+import org.apache.flink.table.connector.source.DynamicFilteringData;
+import org.apache.flink.table.connector.source.DynamicFilteringEvent;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A SplitEnumerator implementation for dynamic filtering source. */
+public class DynamicFilteringValuesSourceEnumerator
+        implements SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(DynamicFilteringValuesSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<ValuesSourcePartitionSplit> context;
+    private final List<ValuesSourcePartitionSplit> allSplits;
+    private final List<String> dynamicFilteringFields;
+    private transient boolean receivedDynamicFilteringEvent;
+    private transient List<ValuesSourcePartitionSplit> remainingSplits;
+
+    public DynamicFilteringValuesSourceEnumerator(
+            SplitEnumeratorContext<ValuesSourcePartitionSplit> context,
+            List<ValuesSourcePartitionSplit> allSplits,
+            List<String> dynamicFilteringFields) {
+        this.context = context;
+        this.allSplits = allSplits;
+        this.dynamicFilteringFields = dynamicFilteringFields;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        if (!receivedDynamicFilteringEvent) {
+            throw new IllegalStateException("DynamicFilteringEvent has not 
receive");
+        }
+        if (remainingSplits.isEmpty()) {
+            context.signalNoMoreSplits(subtaskId);
+            LOG.info("No more splits available for subtask {}", subtaskId);
+        } else {
+            ValuesSourcePartitionSplit split = remainingSplits.remove(0);
+            LOG.debug("Assigned split to subtask {} : {}", subtaskId, split);
+            context.assignSplit(split, subtaskId);
+        }
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof DynamicFilteringEvent) {
+            LOG.warn("Received DynamicFilteringEvent: {}", subtaskId);
+            receivedDynamicFilteringEvent = true;
+            DynamicFilteringData dynamicFilteringData =
+                    ((DynamicFilteringEvent) sourceEvent).getData();
+            assignPartitions(dynamicFilteringData);
+        } else {
+            LOG.error("Received unrecognized event: {}", sourceEvent);
+        }
+    }
+
+    private void assignPartitions(DynamicFilteringData data) {
+        if (data.isFiltering()) {
+            remainingSplits = new ArrayList<>();
+            for (ValuesSourcePartitionSplit split : allSplits) {
+                List<String> values =
+                        dynamicFilteringFields.stream()
+                                .map(k -> split.getPartition().get(k))
+                                .collect(Collectors.toList());
+                LOG.info("values: " + values);
+                if (data.contains(generateRowData(values, data.getRowType()))) 
{
+                    remainingSplits.add(split);
+                }
+            }
+        } else {
+            remainingSplits = new ArrayList<>(allSplits);
+        }
+        LOG.info("remainingSplits: " + remainingSplits);
+    }
+
+    private GenericRowData generateRowData(List<String> partitionValues, 
RowType rowType) {
+        Preconditions.checkArgument(partitionValues.size() == 
rowType.getFieldCount());
+        Object[] values = new Object[partitionValues.size()];
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            switch (rowType.getTypeAt(i).getTypeRoot()) {
+                case VARCHAR:
+                    values[i] = partitionValues.get(i);
+                    break;
+                case INTEGER:
+                    values[i] = Integer.valueOf(partitionValues.get(i));
+                    break;
+                case BIGINT:
+                    values[i] = Long.valueOf(partitionValues.get(i));
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            rowType.getTypeAt(i).getTypeRoot() + " is not 
supported.");
+            }
+        }
+        return GenericRowData.of(values);
+    }
+
+    @Override
+    public void addSplitsBack(List<ValuesSourcePartitionSplit> splits, int 
subtaskId) {
+        remainingSplits.addAll(splits);
+    }
+
+    @Override
+    public void addReader(int subtaskId) {}
+
+    @Override
+    public NoOpEnumState snapshotState(long checkpointId) throws Exception {
+        return null;

Review Comment:
   This is a typo ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to