gaoyunhaii commented on code in PR #20472: URL: https://github.com/apache/flink/pull/20472#discussion_r939545370
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java: ########## @@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal( // the boundedness has been checked via the runtime provider already, so we can safely // declare all legacy transformations as bounded to make the stream graph generator happy ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation); - return transformation; + + // no dynamic filtering applied + if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) { + return transformation; + } + + // handle dynamic filtering + Preconditions.checkArgument(getInputEdges().size() == 1); + BatchExecNode<?> input = (BatchExecNode<?>) getInputEdges().get(0).getSource(); + if (!(input instanceof BatchExecDynamicFilteringDataCollector)) { + throw new TableException( + "The source input must be BatchExecDynamicFilteringDataCollector now"); Review Comment: nit: now -> for now ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.source.enumerator.NoOpEnumState; +import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer; +import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator; +import org.apache.flink.connector.source.split.ValuesSourceSplit; +import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A {@link Source} implementation that reads data from a list. + * + * <p>The source is useful for FLIP-27 source tests. + * + * <p>{@code FromElementsSource} requires the elements must be serializable, and the parallelism + * must be 1. 
RowData is not serializable and the parallelism of table source may not be 1, so we + * introduce a new source for testing in table module. + */ +public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> { + private final TypeSerializer<RowData> serializer; + + private final List<byte[]> serializedElements; + + public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> serializer) { + Preconditions.checkState(serializer != null, "serializer not set"); + this.serializedElements = serializeElements(elements, serializer); + this.serializer = serializer; + } + + private List<byte[]> serializeElements( + Collection<RowData> elements, TypeSerializer<RowData> serializer) { + List<byte[]> serializeElements = new ArrayList<>(); + + for (RowData element : elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review Comment: Wrapped with try-with-resource ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSource.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.source.enumerator.DynamicFilteringValuesSourceEnumerator; +import org.apache.flink.connector.source.enumerator.NoOpEnumState; +import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer; +import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit; +import org.apache.flink.connector.source.split.ValuesSourcePartitionSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A {@link Source} implementation that reads data from a partitioned list. + * + * <p>This source is useful for dynamic filtering testing. 
+ */ +public class DynamicFilteringValuesSource + implements Source<RowData, ValuesSourcePartitionSplit, NoOpEnumState> { + + private final TypeSerializer<RowData> serializer; + private Map<Map<String, String>, byte[]> serializedElements; + private Map<Map<String, String>, Integer> counts; + private final List<String> dynamicFilteringFields; + + public DynamicFilteringValuesSource( + Map<Map<String, String>, Collection<RowData>> elements, + TypeSerializer<RowData> serializer, + List<String> dynamicFilteringFields) { + this.serializer = serializer; + this.dynamicFilteringFields = dynamicFilteringFields; + serializeElements(serializer, elements); + } + + private void serializeElements( + TypeSerializer<RowData> serializer, + Map<Map<String, String>, Collection<RowData>> elements) { + Preconditions.checkState(serializer != null, "serializer not set"); + + serializedElements = new HashMap<>(); + counts = new HashMap<>(); + for (Map<String, String> partition : elements.keySet()) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review Comment: Also wrapped with try-with-resource ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecDynamicFilteringDataCollector.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperatorFactory; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Batch {@link ExecNode} that collects inputs and builds {@link + * org.apache.flink.table.connector.source.DynamicFilteringData}, and then sends the {@link + * org.apache.flink.table.connector.source.DynamicFilteringEvent} to the source coordinator. + */ +public class BatchExecDynamicFilteringDataCollector extends ExecNodeBase<Object> + implements BatchExecNode<Object> { + + @Experimental + public static final ConfigOption<MemorySize> TABLE_EXEC_DYNAMIC_FILTERING_THRESHOLD = + key("table.exec.dynamic-filtering.threshold") + .memoryType() + .defaultValue(MemorySize.parse("32 mb")) Review Comment: Might decrease the size to 8M since AkkaOptions#FRAMESIZE by default is 10M ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java: ########## @@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal( // the boundedness has been checked via the runtime provider already, so we can safely // declare all legacy transformations as bounded to make the stream graph generator happy ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation); - return transformation; + + // no dynamic filtering applied + if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) { + return transformation; + } + + // handle dynamic filtering + Preconditions.checkArgument(getInputEdges().size() == 1); + BatchExecNode<?> input = (BatchExecNode<?>) getInputEdges().get(0).getSource(); + if (!(input instanceof BatchExecDynamicFilteringDataCollector)) { + throw new TableException( + "The source input must be BatchExecDynamicFilteringDataCollector now"); + } + BatchExecDynamicFilteringDataCollector dynamicFilteringDataCollector = + (BatchExecDynamicFilteringDataCollector) input; + + ((SourceTransformation<?, ?, ?>) transformation) + .setCoordinatorListeningID(dynamicFilteringDataListenerID); + + // Must use translateToPlan to avoid duplication dynamic filters. 
+ Transformation<Object> dynamicFilteringTransform = + dynamicFilteringDataCollector.translateToPlan(planner); + ((DynamicFilteringDataCollectorOperatorFactory) + ((OneInputTransformation<?, ?>) dynamicFilteringTransform) + .getOperatorFactory()) + .registerDynamicFilteringDataListenerID(dynamicFilteringDataListenerID); + + if (!needDynamicFilteringDependency) { + planner.addExtraTransformation(dynamicFilteringTransform); + return transformation; + } else { + MultipleInputTransformation<RowData> multipleInputTransformation = + new MultipleInputTransformation<>( + "Placeholder-Filter", Review Comment: Might rename to `Order-Enforcer` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java: ########## @@ -64,7 +103,48 @@ protected Transformation<RowData> translateToPlanInternal( // the boundedness has been checked via the runtime provider already, so we can safely // declare all legacy transformations as bounded to make the stream graph generator happy ExecNodeUtil.makeLegacySourceTransformationsBounded(transformation); - return transformation; + + // no dynamic filtering applied + if (getInputEdges().isEmpty() || !(transformation instanceof SourceTransformation)) { + return transformation; + } + + // handle dynamic filtering + Preconditions.checkArgument(getInputEdges().size() == 1); Review Comment: Seems better to use `checkState`, since the value being checked is not a method argument. ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.source.enumerator.NoOpEnumState; +import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer; +import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator; +import org.apache.flink.connector.source.split.ValuesSourceSplit; +import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A {@link Source} implementation that reads data from a list. + * + * <p>The source is useful for FLIP-27 source tests. + * + * <p>{@code FromElementsSource} requires the elements must be serializable, and the parallelism + * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we + * introduce a new source for testing in table module. + */ +public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> { + private final TypeSerializer<RowData> serializer; + + private final List<byte[]> serializedElements; + + public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> serializer) { + Preconditions.checkState(serializer != null, "serializer not set"); + this.serializedElements = serializeElements(elements, serializer); + this.serializer = serializer; + } + + private List<byte[]> serializeElements( + Collection<RowData> elements, TypeSerializer<RowData> serializer) { + List<byte[]> serializeElements = new ArrayList<>(); + + for (RowData element : elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + try { + serializer.serialize(element, wrapper); + } catch (Exception e) { + throw new TableException( + "Serializing the source elements failed: " + e.getMessage(), e); + } + serializeElements.add(baos.toByteArray()); + } + return serializeElements; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SourceReader<RowData, ValuesSourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + return new ValuesSourceReader(serializedElements, serializer, readerContext); + } + + @Override + public SplitEnumerator<ValuesSourceSplit, NoOpEnumState> createEnumerator( + SplitEnumeratorContext<ValuesSourceSplit> enumContext) throws Exception { Review Comment: nit: throws Exception is not used. ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSourceReader.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.source.split.ValuesSourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +/** A {@link SourceReader} implementation that reads data from a list. */ +public class ValuesSourceReader implements SourceReader<RowData, ValuesSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(ValuesSourceReader.class); + + /** The context for this reader, to communicate with the enumerator. */ + private final SourceReaderContext context; + + /** The availability future. This reader is available as soon as a split is assigned. */ + private CompletableFuture<Void> availability; + + private final List<byte[]> serializedElements; + private final TypeSerializer<RowData> serializer; + private List<RowData> elements; + + /** The remaining splits that were assigned but not yet processed. */ + private final Queue<ValuesSourceSplit> remainingSplits; + + private boolean noMoreSplits; + + public ValuesSourceReader( + List<byte[]> serializedElements, + TypeSerializer<RowData> serializer, + SourceReaderContext context) { + this.serializedElements = serializedElements; + this.serializer = serializer; + this.context = context; + this.availability = new CompletableFuture<>(); + this.remainingSplits = new ArrayDeque<>(); + } + + @Override + public void start() { + elements = new ArrayList<>(); + for (byte[] bytes : serializedElements) { + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); Review Comment: Also wrapped with try-with-resource ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/DynamicFilteringValuesSourceReader.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A {@link SourceReader} implementation that reads data from a list. */ +public class DynamicFilteringValuesSourceReader Review Comment: Similar to `ValuesSourceReader`, we'd better not implement it from scratch ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSource.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.source.enumerator.NoOpEnumState; +import org.apache.flink.connector.source.enumerator.NoOpEnumStateSerializer; +import org.apache.flink.connector.source.enumerator.ValuesSourceEnumerator; +import org.apache.flink.connector.source.split.ValuesSourceSplit; +import org.apache.flink.connector.source.split.ValuesSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A {@link Source} implementation that reads data from a list. + * + * <p>The source is useful for FLIP-27 source tests. + * + * <p>{@code FromElementsSource} requires the elements must be serializable, and the parallelism + * must be 1. RowData is not serializable and the parallelism of table source may not be 1, so we + * introduce a new source for testing in table module. + */ +public class ValuesSource implements Source<RowData, ValuesSourceSplit, NoOpEnumState> { + private final TypeSerializer<RowData> serializer; + + private final List<byte[]> serializedElements; + + public ValuesSource(Collection<RowData> elements, TypeSerializer<RowData> serializer) { + Preconditions.checkState(serializer != null, "serializer not set"); + this.serializedElements = serializeElements(elements, serializer); + this.serializer = serializer; + } + + private List<byte[]> serializeElements( + Collection<RowData> elements, TypeSerializer<RowData> serializer) { + List<byte[]> serializeElements = new ArrayList<>(); + + for (RowData element : elements) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); Review Comment: ditto ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/ValuesSourceReader.java: ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.source; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.source.split.ValuesSourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +/** A {@link SourceReader} implementation that reads data from a list. */ +public class ValuesSourceReader implements SourceReader<RowData, ValuesSourceSplit> { Review Comment: It looks to me that it might not be a good practice to directly implement a SourceReader from scratch, since it in fact has a lot of "default protocol" around how the runtime interacts with the new sources. If there are changes on the runtime side in the future, the implementation here might also need some rework. It looks to me to be better to extend the `SourceReaderBase` or `SingleThreadMultiplexSourceReaderBase`. If we reach an agreement on this point, it is also acceptable to change the implementation in a separate PR. (A PoC implementation might look like the following: ```java public class MyValuesSourceReader extends SingleThreadMultiplexSourceReaderBase<RowData, RowData, ValuesSourceSplit, byte[]> { public MyValuesSourceReader( List<byte[]> serializedElements, TypeSerializer<RowData> serializer, SourceReaderContext readerContext, Configuration config) { super(() -> new MyValueSourceSplitReader(serializedElements, serializer), new MyRecordEmitter(), config, readerContext); } @Override public void start() { // we request a split only if we did not get splits during the checkpoint restore if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } } @Override protected void onSplitFinished(Map<String, byte[]> finishedSplitIds) { context.sendSplitRequest(); } @Override protected byte[] initializedState(ValuesSourceSplit split) { return new byte[0]; } @Override protected ValuesSourceSplit toSplitType(String splitId, byte[] splitState) { return new ValuesSourceSplit(Integer.parseInt(splitId)); } public static class MyValueSourceSplitReader implements SplitReader<RowData, ValuesSourceSplit> { private final List<RowData> elements = new ArrayList<>(); private final Queue<ValuesSourceSplit> splits = new ArrayDeque<>(); private boolean nextIsFinish; public MyValueSourceSplitReader(List<byte[]> serializedElements, TypeSerializer<RowData> serializer) { for (byte[] bytes : serializedElements) { ByteArrayInputStream bais = new ByteArrayInputStream(bytes); final DataInputView input = new DataInputViewStreamWrapper(bais); try { RowData next = serializer.deserialize(input); elements.add(next); } catch (Exception e) { throw new TableException( "Failed to deserialize an element from the source. 
" + "If you are using user-defined serialization (Value and Writable types), check the " + "serialization functions.\nSerializer is " + serializer, e); } } } @Override public RecordsWithSplitIds<RowData> fetch() throws IOException { System.out.println("Fetch splits is " + splits); if (nextIsFinish) { ValuesSourceSplit split = splits.poll(); nextIsFinish = false; return new RecordsBySplits<>( Collections.emptyMap(), Collections.singleton(split.splitId())); } else { ValuesSourceSplit split = splits.peek(); nextIsFinish = true; return new RecordsBySplits<>( Collections.singletonMap(split.splitId(), Collections.singletonList(elements.get(split.getIndex()))), Collections.emptySet()); } } @Override public void handleSplitsChanges(SplitsChange<ValuesSourceSplit> splitsChanges) { if (!(splitsChanges instanceof SplitsAddition)) { throw new UnsupportedOperationException( String.format( "The SplitChange type of %s is not supported.", splitsChanges.getClass())); } splits.addAll(splitsChanges.splits()); } @Override public void wakeUp() {} @Override public void close() throws Exception {} } public static class MyRecordEmitter implements RecordEmitter<RowData, RowData, byte[]> { @Override public void emitRecord(RowData element, SourceOutput<RowData> output, byte[] splitState) { output.collect(element); } } } ``` ) ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java: ########## @@ -960,11 +997,12 @@ protected Collection<RowData> convertToRowData(DataStructureConverter converter) final RowData rowData = (RowData) converter.toInternal(row); if (rowData != null) { rowData.setRowKind(row.getKind()); - result.add(rowData); + partitionResult.add(rowData); + size++; Review Comment: It seems size == partitionResult.size() ? Could we directly use partitionResult.size() ? ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/connector/source/enumerator/DynamicFilteringValuesSourceEnumerator.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.source.enumerator; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** A SplitEnumerator implementation for dynamic filtering source. */ +public class DynamicFilteringValuesSourceEnumerator + implements SplitEnumerator<ValuesSourcePartitionSplit, NoOpEnumState> { + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringValuesSourceEnumerator.class); + + private final SplitEnumeratorContext<ValuesSourcePartitionSplit> context; + private final List<ValuesSourcePartitionSplit> allSplits; + private final List<String> dynamicFilteringFields; + private transient boolean receivedDynamicFilteringEvent; + private transient List<ValuesSourcePartitionSplit> remainingSplits; + + public DynamicFilteringValuesSourceEnumerator( + SplitEnumeratorContext<ValuesSourcePartitionSplit> context, + List<ValuesSourcePartitionSplit> allSplits, + List<String> dynamicFilteringFields) { + this.context = context; + this.allSplits = allSplits; + this.dynamicFilteringFields = dynamicFilteringFields; + } + + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + if (!receivedDynamicFilteringEvent) { + throw new IllegalStateException("DynamicFilteringEvent has not receive"); + } + if (remainingSplits.isEmpty()) { + context.signalNoMoreSplits(subtaskId); + LOG.info("No more splits available for subtask {}", subtaskId); + } else { + ValuesSourcePartitionSplit split = remainingSplits.remove(0); + LOG.debug("Assigned split to subtask {} : {}", subtaskId, split); + context.assignSplit(split, subtaskId); + } + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof DynamicFilteringEvent) { + LOG.warn("Received DynamicFilteringEvent: {}", subtaskId); + receivedDynamicFilteringEvent = true; + DynamicFilteringData dynamicFilteringData = + ((DynamicFilteringEvent) sourceEvent).getData(); + assignPartitions(dynamicFilteringData); + } else { + LOG.error("Received unrecognized event: {}", sourceEvent); + } + } + + private void assignPartitions(DynamicFilteringData data) { + if (data.isFiltering()) { + remainingSplits = new ArrayList<>(); + for (ValuesSourcePartitionSplit split : allSplits) { + List<String> values = + dynamicFilteringFields.stream() + .map(k -> split.getPartition().get(k)) + .collect(Collectors.toList()); + LOG.info("values: " + values); + if (data.contains(generateRowData(values, data.getRowType()))) { + remainingSplits.add(split); + } + } + } else { + remainingSplits = new ArrayList<>(allSplits); + } + LOG.info("remainingSplits: " + remainingSplits); + } + + private GenericRowData generateRowData(List<String> partitionValues, RowType rowType) { + 
Preconditions.checkArgument(partitionValues.size() == rowType.getFieldCount()); + Object[] values = new Object[partitionValues.size()]; + for (int i = 0; i < rowType.getFieldCount(); ++i) { + switch (rowType.getTypeAt(i).getTypeRoot()) { + case VARCHAR: + values[i] = partitionValues.get(i); + break; + case INTEGER: + values[i] = Integer.valueOf(partitionValues.get(i)); + break; + case BIGINT: + values[i] = Long.valueOf(partitionValues.get(i)); + break; + default: + throw new UnsupportedOperationException( + rowType.getTypeAt(i).getTypeRoot() + " is not supported."); + } + } + return GenericRowData.of(values); + } + + @Override + public void addSplitsBack(List<ValuesSourcePartitionSplit> splits, int subtaskId) { + remainingSplits.addAll(splits); + } + + @Override + public void addReader(int subtaskId) {} + + @Override + public NoOpEnumState snapshotState(long checkpointId) throws Exception { + return null; Review Comment: This is a typo ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
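For reference, the try-with-resources form asked for in the `ValuesSource` / `DynamicFilteringValuesSource` serialization loops could look roughly like the sketch below. It reuses the names from the quoted diff (`serializeElements`, `serializer`, `TableException`) and only illustrates the reviewer's suggestion; it is not the PR's actual change:

```java
// Sketch: same logic as the quoted ValuesSource#serializeElements, but the
// ByteArrayOutputStream and the DataOutputViewStreamWrapper are closed
// automatically via try-with-resources instead of being left unclosed.
private List<byte[]> serializeElements(
        Collection<RowData> elements, TypeSerializer<RowData> serializer) {
    List<byte[]> serializedElements = new ArrayList<>();
    for (RowData element : elements) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos)) {
            serializer.serialize(element, wrapper);
            serializedElements.add(baos.toByteArray());
        } catch (Exception e) {
            throw new TableException(
                    "Serializing the source elements failed: " + e.getMessage(), e);
        }
    }
    return serializedElements;
}
```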
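Similarly, the `checkState` suggestion for `BatchExecTableSourceScan` would amount to something like the following; the error message is made up here for illustration:

```java
// Sketch: the edge count is internal planner state, not a caller-supplied
// argument, so Preconditions.checkState (IllegalStateException) fits better
// than Preconditions.checkArgument (IllegalArgumentException).
Preconditions.checkState(
        getInputEdges().size() == 1,
        "A dynamic filtering table source scan should have exactly one input edge.");
```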