gaoyunhaii commented on code in PR #20374: URL: https://github.com/apache/flink/pull/20374#discussion_r935239040
########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java: ########## @@ -82,12 +83,18 @@ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) { context, splitSerializer, context.isConcurrentExecutionAttemptsSupported()); - return new SourceCoordinator<>( + SourceCoordinator<SplitT, ?> coordinator = new SourceCoordinator<>( operatorName, source, sourceCoordinatorContext, context.getCoordinatorStore(), alignmentParams); + coordinator.setCoordinatorListeningID(coordinatorListeningID); + return coordinator; + } + + public void setCoordinatorListeningID(String coordinatorListeningID) { Review Comment: Could `coordinatorListeningID` be made a constructor parameter instead? ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ########## @@ -423,6 +433,10 @@ private void runInEventLoop( }); } + public void setCoordinatorListeningID(String coordinatorListeningID) { Review Comment: Could `coordinatorListeningID` be made a constructor parameter instead? ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java: ########## @@ -43,6 +43,7 @@ Licensed to the Apache Software Foundation (ASF) under one private final Source<?, SplitT, ?> source; private final int numWorkerThreads; private final WatermarkAlignmentParams alignmentParams; + private String coordinatorListeningID; Review Comment: `@Nullable` ########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ########## @@ -111,6 +111,12 @@ Licensed to the Apache Software Foundation (ASF) under one /** A flag marking whether the coordinator has started. */ private boolean started; + /** + * An ID that the coordinator will register self in the coordinator store with. Other + * coordinators may send events to this coordinator by the ID. 
+ */ + private String coordinatorListeningID; Review Comment: `@Nullable`? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; + +/** Data for dynamic filtering. 
*/ +public class DynamicFilteringData implements Serializable { + private final TypeInformation<RowData> typeInfo; + private final RowType rowType; + private final List<byte[]> serializedData; + + private final boolean isFiltering; + + private transient volatile boolean prepared = false; + private transient Map<Integer, RowData> dataMap; + private transient RowData.FieldGetter[] fieldGetters; + + public DynamicFilteringData( + TypeInformation<RowData> typeInfo, + RowType rowType, + List<byte[]> serializedData, + boolean isFiltering) { + this.typeInfo = typeInfo; + this.rowType = rowType; + this.isFiltering = isFiltering; + this.serializedData = isFiltering ? serializedData : Collections.emptyList(); Review Comment: Is it necessary to make this distinction? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; + +/** Data for dynamic filtering. */ +public class DynamicFilteringData implements Serializable { + private final TypeInformation<RowData> typeInfo; + private final RowType rowType; + private final List<byte[]> serializedData; + + private final boolean isFiltering; + + private transient volatile boolean prepared = false; + private transient Map<Integer, RowData> dataMap; + private transient RowData.FieldGetter[] fieldGetters; + + public DynamicFilteringData( + TypeInformation<RowData> typeInfo, + RowType rowType, + List<byte[]> serializedData, + boolean isFiltering) { + this.typeInfo = typeInfo; + this.rowType = rowType; + this.isFiltering = isFiltering; + this.serializedData = isFiltering ? serializedData : Collections.emptyList(); + } + + public boolean isFiltering() { + return isFiltering; + } + + public RowType getRowType() { + return rowType; + } + + public Collection<RowData> getData() { Review Comment: `@VisibleForTesting`? Also, these two methods could be moved to the end of the file. 
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; + +/** Data for dynamic filtering. 
*/ +public class DynamicFilteringData implements Serializable { + private final TypeInformation<RowData> typeInfo; + private final RowType rowType; + private final List<byte[]> serializedData; + + private final boolean isFiltering; + + private transient volatile boolean prepared = false; + private transient Map<Integer, RowData> dataMap; + private transient RowData.FieldGetter[] fieldGetters; + + public DynamicFilteringData( + TypeInformation<RowData> typeInfo, + RowType rowType, + List<byte[]> serializedData, + boolean isFiltering) { + this.typeInfo = typeInfo; + this.rowType = rowType; + this.isFiltering = isFiltering; + this.serializedData = isFiltering ? serializedData : Collections.emptyList(); + } + + public boolean isFiltering() { + return isFiltering; + } + + public RowType getRowType() { Review Comment: This method is not used now; will it be necessary in the future? ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; + +/** Data for dynamic filtering. */ +public class DynamicFilteringData implements Serializable { + private final TypeInformation<RowData> typeInfo; + private final RowType rowType; + private final List<byte[]> serializedData; + + private final boolean isFiltering; + + private transient volatile boolean prepared = false; + private transient Map<Integer, RowData> dataMap; + private transient RowData.FieldGetter[] fieldGetters; + + public DynamicFilteringData( + TypeInformation<RowData> typeInfo, + RowType rowType, + List<byte[]> serializedData, + boolean isFiltering) { + this.typeInfo = typeInfo; + this.rowType = rowType; + this.isFiltering = isFiltering; + this.serializedData = isFiltering ? 
serializedData : Collections.emptyList(); + } + + public boolean isFiltering() { + return isFiltering; + } + + public RowType getRowType() { + return rowType; + } + + public Collection<RowData> getData() { + prepare(); + return dataMap.values(); + } + + public boolean contains(RowData row) { + if (!isFiltering) { + return true; + } else if (row.getArity() != rowType.getFieldCount()) { + throw new TableException("The arity of RowData is different"); + } else { + prepare(); + RowData rowData = dataMap.get(hash(row)); Review Comment: Could hashes conflict in some cases? If so, we might lose some partitions. ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicFilteringData.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.source; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; + +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.IntStream; + +/** Data for dynamic filtering. */ +public class DynamicFilteringData implements Serializable { + private final TypeInformation<RowData> typeInfo; + private final RowType rowType; + private final List<byte[]> serializedData; + + private final boolean isFiltering; + + private transient volatile boolean prepared = false; + private transient Map<Integer, RowData> dataMap; + private transient RowData.FieldGetter[] fieldGetters; + + public DynamicFilteringData( + TypeInformation<RowData> typeInfo, + RowType rowType, + List<byte[]> serializedData, + boolean isFiltering) { + this.typeInfo = typeInfo; + this.rowType = rowType; + this.isFiltering = isFiltering; + this.serializedData = isFiltering ? serializedData : Collections.emptyList(); + } + + public boolean isFiltering() { Review Comment: `@VisibleForTesting` ? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to 
collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. + */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; + this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices; + this.threshold = threshold; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType); + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare); + this.currentSize = 0L; + this.eventSent = false; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + if (exceedThreshold()) { + return; + } + + RowData value = element.getValue(); + GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size()); + for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) { + LogicalType type = dynamicFilteringFieldType.getTypeAt(i); + int index = dynamicFilteringFieldIndices.get(i); + switch 
(type.getTypeRoot()) { + case INTEGER: + rowData.setField(i, value.getInt(index)); + break; + case BIGINT: + rowData.setField(i, value.getLong(index)); + break; + case VARCHAR: + rowData.setField(i, value.getString(index)); + break; + default: + throw new UnsupportedOperationException(); Review Comment: nit: Add some message ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. 
+ */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; + this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices; + this.threshold = threshold; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType); + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare); + this.currentSize = 0L; + this.eventSent = false; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + if (exceedThreshold()) { + return; + } + + RowData value = element.getValue(); + GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size()); + for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) { + LogicalType type = dynamicFilteringFieldType.getTypeAt(i); + int index = dynamicFilteringFieldIndices.get(i); + switch (type.getTypeRoot()) { + case INTEGER: + rowData.setField(i, value.getInt(index)); + break; + case 
BIGINT: + rowData.setField(i, value.getLong(index)); + break; + case VARCHAR: + rowData.setField(i, value.getString(index)); + break; + default: + throw new UnsupportedOperationException(); + } + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); Review Comment: wrapped with try-with-resource ? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. 
+ */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; Review Comment: nit: add checkNotNull when possible ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. 
+ */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; + this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices; + this.threshold = threshold; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType); + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare); + this.currentSize = 0L; + this.eventSent = false; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + if (exceedThreshold()) { + return; + } + + RowData value = element.getValue(); + GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size()); + for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) { + LogicalType type = dynamicFilteringFieldType.getTypeAt(i); + int index = dynamicFilteringFieldIndices.get(i); + switch (type.getTypeRoot()) { + case INTEGER: + rowData.setField(i, value.getInt(index)); + break; + case 
BIGINT: + rowData.setField(i, value.getLong(index)); + break; + case VARCHAR: + rowData.setField(i, value.getString(index)); + break; + default: + throw new UnsupportedOperationException(); + } + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + serializer.serialize(rowData, wrapper); + boolean duplicated = !buffer.add(baos.toByteArray()); Review Comment: Why need this line? The byte array seems always newly-created, and byte[] should not override the hashcode / equals method. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.CoordinatorStore; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator + * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators. + */ +public class DynamicFilteringDataCollectorOperatorCoordinator + implements OperatorCoordinator, CoordinationRequestHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class); + + private final CoordinatorStore coordinatorStore; + private final List<String> dynamicFilteringDataListenerIDs; + + private boolean hasReceivedFilteringData; + + public DynamicFilteringDataCollectorOperatorCoordinator( Review Comment: nit: checkNotNull when possible ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to 
collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. + */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; + this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices; + this.threshold = threshold; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType); + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare); + this.currentSize = 0L; + this.eventSent = false; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + if (exceedThreshold()) { + return; + } + + RowData value = element.getValue(); + GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size()); + for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) { + LogicalType type = dynamicFilteringFieldType.getTypeAt(i); + int index = dynamicFilteringFieldIndices.get(i); + switch 
(type.getTypeRoot()) { + case INTEGER: + rowData.setField(i, value.getInt(index)); + break; + case BIGINT: + rowData.setField(i, value.getLong(index)); + break; + case VARCHAR: + rowData.setField(i, value.getString(index)); + break; + default: + throw new UnsupportedOperationException(); + } + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + serializer.serialize(rowData, wrapper); + boolean duplicated = !buffer.add(baos.toByteArray()); + if (duplicated) { + return; + } + currentSize += baos.size(); + + if (exceedThreshold()) { + // Send an empty filtering data and disable self by leaving the currentSize unchanged + sendEvent(); + buffer.clear(); + LOG.info( Review Comment: nit: change to warn? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. 
+ */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; + this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices; + this.threshold = threshold; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType); + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare); + this.currentSize = 0L; + this.eventSent = false; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + if (exceedThreshold()) { + return; + } + + RowData value = element.getValue(); + GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size()); + for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) { + LogicalType type = dynamicFilteringFieldType.getTypeAt(i); + int index = dynamicFilteringFieldIndices.get(i); + switch (type.getTypeRoot()) { + case INTEGER: + rowData.setField(i, value.getInt(index)); + break; + case 
BIGINT: + rowData.setField(i, value.getLong(index)); + break; + case VARCHAR: + rowData.setField(i, value.getString(index)); + break; + default: + throw new UnsupportedOperationException(); + } + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + serializer.serialize(rowData, wrapper); + boolean duplicated = !buffer.add(baos.toByteArray()); + if (duplicated) { + return; + } + currentSize += baos.size(); + + if (exceedThreshold()) { + // Send an empty filtering data and disable self by leaving the currentSize unchanged + sendEvent(); Review Comment: This seems not necessary? Could we directly do it on finishing and remove the `eventSent` flag ? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.java: ########## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.connector.source.DynamicFilteringData; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * Operator to collect and build the {@link DynamicFilteringData} for sources that supports dynamic + * filtering. 
+ */ +public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> + implements OneInputStreamOperator<RowData, Object> { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class); + + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + private final OperatorEventGateway operatorEventGateway; + + private transient TypeInformation<RowData> typeInfo; + private transient TypeSerializer<RowData> serializer; + + private transient Set<byte[]> buffer; + private transient long currentSize; + private transient boolean eventSent; + + public DynamicFilteringDataCollectorOperator( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold, + OperatorEventGateway operatorEventGateway) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; + this.dynamicFilteringFieldIndices = dynamicFilteringFieldIndices; + this.threshold = threshold; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType); + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare); + this.currentSize = 0L; + this.eventSent = false; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + if (exceedThreshold()) { + return; + } + + RowData value = element.getValue(); + GenericRowData rowData = new GenericRowData(dynamicFilteringFieldIndices.size()); + for (int i = 0; i < dynamicFilteringFieldIndices.size(); ++i) { + LogicalType type = dynamicFilteringFieldType.getTypeAt(i); + int index = dynamicFilteringFieldIndices.get(i); + switch (type.getTypeRoot()) { + case INTEGER: + rowData.setField(i, value.getInt(index)); + break; + case 
BIGINT: + rowData.setField(i, value.getLong(index)); + break; + case VARCHAR: + rowData.setField(i, value.getString(index)); + break; + default: + throw new UnsupportedOperationException(); + } + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos); + serializer.serialize(rowData, wrapper); + boolean duplicated = !buffer.add(baos.toByteArray()); + if (duplicated) { + return; + } + currentSize += baos.size(); + + if (exceedThreshold()) { + // Send an empty filtering data and disable self by leaving the currentSize unchanged + sendEvent(); + buffer.clear(); + LOG.info( + "Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.", + currentSize, + threshold); + } + } + + private boolean exceedThreshold() { + return threshold > 0 && currentSize > threshold; + } + + @Override + public void finish() throws Exception { + if (!eventSent) { + LOG.info( + "Finish collecting. {} bytes in {} rows are collected. sending the data.", + currentSize, + buffer.size()); + sendEvent(); + } + } + + private void sendEvent() { + if (eventSent) { + return; + } + + final DynamicFilteringData dynamicFilteringData; + if (exceedThreshold()) { + dynamicFilteringData = + new DynamicFilteringData( + typeInfo, dynamicFilteringFieldType, new ArrayList<>(), false); Review Comment: nit: Collections.emptyList() ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorCoordinator.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.CoordinatorStore; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.table.connector.source.DynamicFilteringEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * The operator coordinator for {@link DynamicFilteringDataCollectorOperator}. The coordinator + * collects {@link DynamicFilteringEvent} then redistributes to listening source coordinators. 
+ */ +public class DynamicFilteringDataCollectorOperatorCoordinator + implements OperatorCoordinator, CoordinationRequestHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicFilteringDataCollectorOperatorCoordinator.class); + + private final CoordinatorStore coordinatorStore; + private final List<String> dynamicFilteringDataListenerIDs; + + private boolean hasReceivedFilteringData; + + public DynamicFilteringDataCollectorOperatorCoordinator( + Context context, List<String> dynamicFilteringDataListenerIDs) { + this.coordinatorStore = context.getCoordinatorStore(); + this.dynamicFilteringDataListenerIDs = dynamicFilteringDataListenerIDs; + } + + @Override + public void start() throws Exception {} + + @Override + public void close() throws Exception {} + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) + throws Exception { + // Since there might be speculative execution, once the dynamic filter collectors operator + // has been executed for multiple attempts, we only keep the first notification. + if (hasReceivedFilteringData) { + return; + } + + for (String listenerID : dynamicFilteringDataListenerIDs) { + // Push event to listening source coordinators. + OperatorCoordinator listener = (OperatorCoordinator) coordinatorStore.get(listenerID); + if (listener == null) { + LOG.warn("Dynamic filtering data listener is missing: {}", listenerID); Review Comment: Would this be marked as an error? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperatorFactory.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.table.types.logical.RowType; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** The factory class for {@link DynamicFilteringDataCollectorOperator}. 
*/ +public class DynamicFilteringDataCollectorOperatorFactory + extends AbstractStreamOperatorFactory<Object> + implements CoordinatedOperatorFactory<Object> { + + private final Set<String> dynamicFilteringDataListenerIDs = new HashSet<>(); + private final RowType dynamicFilteringFieldType; + private final List<Integer> dynamicFilteringFieldIndices; + private final long threshold; + + public DynamicFilteringDataCollectorOperatorFactory( + RowType dynamicFilteringFieldType, + List<Integer> dynamicFilteringFieldIndices, + long threshold) { + this.dynamicFilteringFieldType = dynamicFilteringFieldType; Review Comment: nit: checkNotNull when possible ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/dynamicfiltering/ExecutionOrderEnforcerOperator.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.dynamicfiltering; + +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; + +import java.util.Arrays; +import java.util.List; + +/** + * ExecutionOrderEnforcerOperator has two inputs, one of which is a source, and the other is the + * dependent upstream. It enforces that the input source is executed after the dependent input is + * finished. Everything passed from the inputs is forwarded to the output, though typically the + * dependent input should not send anything. + * + * <p>The operator must be chained with the source, which is ensured by the {@link Review Comment: This might not be very accurate, since users have a way to disable chaining. We might change it to say that if the operator is not chained, the enforcer would not work, or modify the last paragraph to say that it enforces... when chained -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org