lsyldliu commented on code in PR #23007:
URL: https://github.com/apache/flink/pull/23007#discussion_r1270276722


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/runtimefilter/util/RuntimeFilterUtils.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.runtimefilter.util;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.operators.util.BloomFilter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+/** Utilities for runtime filter. */
+public class RuntimeFilterUtils {
+
+    public static final int OVER_MAX_ROW_COUNT = -1;
+
+    public static BloomFilter createOnHeapBloomFilter(int numExpectedEntries, double fpp) {
+        int byteSize = (int) Math.ceil(BloomFilter.optimalNumOfBits(numExpectedEntries, fpp) / 8D);
+        final BloomFilter filter = new BloomFilter(numExpectedEntries, byteSize);
+        filter.setBitsLocation(MemorySegmentFactory.allocateUnpooledSegment(byteSize), 0);
+        return filter;
+    }
+
+    public static RowData convertBloomFilterToRowData(
+            int actualRowCount, @Nullable BloomFilter bloomFilter) {

Review Comment:
   Would a `long` type be more suitable for `actualRowCount`?
   Moreover, I would advise that the BloomFilter should not be null; we should check that it is not null before calling this method. Otherwise, it is not easy to understand what the value of `actualRowCount` means when the BloomFilter is null.
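   For illustration, something along these lines (just a rough sketch of the idea; the method body below is assumed, not the actual implementation, and it assumes a static import of `Preconditions.checkNotNull`):
   ```
       public static RowData convertBloomFilterToRowData(long actualRowCount, BloomFilter bloomFilter) {
           // caller guarantees a non-null filter; the over-max-row-count case is handled before calling
           checkNotNull(bloomFilter, "bloomFilter must not be null");
           final GenericRowData rowData = new GenericRowData(2);
           rowData.setField(0, actualRowCount);
           rowData.setField(1, BloomFilter.toBytes(bloomFilter));
           return rowData;
       }
   ```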



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/runtimefilter/LocalRuntimeFilterBuilderOperatorTest.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.runtimefilter;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.operators.util.BloomFilter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.util.Queue;
+
+import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link LocalRuntimeFilterBuilderOperator}. */
+public class LocalRuntimeFilterBuilderOperatorTest implements Serializable {
+
+    @Test
+    void testNormalOutput() throws Exception {
+        // create test harness and process input elements
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements(5, 10)) {
+
+            // test the output bloom filter
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int actualCount = outputRowData.getInt(0);
+            BloomFilter bloomFilter = BloomFilter.fromBytes(outputRowData.getBinary(1));
+            assertThat(actualCount).isEqualTo(5);
+            // test elements that should exist
+            assertThat(bloomFilterTestString(bloomFilter, "var1")).isTrue();
+            assertThat(bloomFilterTestString(bloomFilter, "var2")).isTrue();
+            assertThat(bloomFilterTestString(bloomFilter, "var3")).isTrue();
+            assertThat(bloomFilterTestString(bloomFilter, "var4")).isTrue();
+            assertThat(bloomFilterTestString(bloomFilter, "var5")).isTrue();
+            // test elements that should not exist
+            assertThat(bloomFilterTestString(bloomFilter, "var6")).isFalse();
+            assertThat(bloomFilterTestString(bloomFilter, "var7")).isFalse();
+            assertThat(bloomFilterTestString(bloomFilter, "var8")).isFalse();
+            assertThat(bloomFilterTestString(bloomFilter, "var9")).isFalse();
+            assertThat(bloomFilterTestString(bloomFilter, "var10")).isFalse();
+        }
+    }
+
+    /** Test the case that the output filter is over-max-row-count. */
+    @Test
+    void testOverMaxRowCountOutput() throws Exception {
+        // create test harness and process input elements
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements(3, 4)) {
+
+            // test the output bloom filter should be null
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int actualCount = outputRowData.getInt(0);
+            assertThat(actualCount).isEqualTo(OVER_MAX_ROW_COUNT);
+            assertThat(outputRowData.isNullAt(1)).isTrue();
+        }
+    }
+
+    private static boolean bloomFilterTestString(BloomFilter bloomFilter, String string) {
+        final Projection<RowData, BinaryRowData> projection = new FirstStringFileldProjection();
+        return bloomFilter.testHash(
+                projection.apply(GenericRowData.of(StringData.fromString(string))).hashCode());
+    }
+
+    public static StreamRecord<RowData> createRowDataRecord(String string, int integer) {
+        return new StreamRecord<>(GenericRowData.of(StringData.fromString(string), integer));
+    }
+
+    public static StreamTaskMailboxTestHarness<RowData>
+            createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements(
+                    int estimatedRowCount, int maxRowCount) throws Exception {
+        final GeneratedProjection buildProjectionCode =
+                new GeneratedProjection("", "", new Object[0]) {
+                    @Override
+                    public Projection newInstance(ClassLoader classLoader) {
+                        return new FirstStringFileldProjection();
+                    }
+                };
+
+        final TypeInformation<RowData> inputType =
+                InternalTypeInfo.ofFields(new VarCharType(), new IntType());
+        final LocalRuntimeFilterBuilderOperator operator =
+                new LocalRuntimeFilterBuilderOperator(
+                        buildProjectionCode, estimatedRowCount, maxRowCount);
+        StreamTaskMailboxTestHarness<RowData> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new,
+                                InternalTypeInfo.ofFields(new IntType(), new BinaryType()))
+                        .setupOutputForSingletonOperatorChain(operator)
+                        .addInput(inputType)
+                        .build();
+
+        testHarness.processElement(createRowDataRecord("var1", 111));
+        testHarness.processElement(createRowDataRecord("var2", 222));
+        testHarness.processElement(createRowDataRecord("var3", 333));
+        testHarness.processElement(createRowDataRecord("var4", 444));
+        testHarness.processElement(createRowDataRecord("var5", 555));
+        testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+        return testHarness;
+    }
+
+    static final class FirstStringFileldProjection implements Projection<RowData, BinaryRowData> {

Review Comment:
   typo -> FirstStringFieldProjection



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperatorTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.runtimefilter;
+
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.operators.util.BloomFilter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.IntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Queue;
+
+import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link GlobalRuntimeFilterBuilderOperator}. */
+class GlobalRuntimeFilterBuilderOperatorTest {
+
+    @Test
+    void testNormalInputAndNormalOutput() throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createGlobalRuntimeFilterBuilderOperatorHarness(10)) {
+            // process elements
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1()))));
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter2()))));
+            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+            // test the output
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int globalCount = outputRowData.getInt(0);
+            BloomFilter globalBloomFilter = BloomFilter.fromBytes(outputRowData.getBinary(1));
+            assertThat(globalCount).isEqualTo(10);
+            assertThat(globalBloomFilter.testHash("var1".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var2".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var3".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var4".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var5".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var6".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var7".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var8".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var9".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var10".hashCode())).isTrue();
+        }
+    }
+
+    /**
+     * Test the case that all input local runtime filters are normal, but the merged global filter
+     * is over-max-row-count.
+     */
+    @Test
+    void testNormalInputAndOverMaxRowCountOutput() throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createGlobalRuntimeFilterBuilderOperatorHarness(9)) {
+            // process elements
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1()))));
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter2()))));
+            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+            // test the output
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int globalCount = outputRowData.getInt(0);
+            assertThat(globalCount).isEqualTo(OVER_MAX_ROW_COUNT);
+            assertThat(outputRowData.isNullAt(1)).isTrue();
+        }
+    }
+
+    /** Test the case that one of the input local runtime filters is over-max-row-count. */
+    @Test
+    void testOverMaxRowCountInput() throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createGlobalRuntimeFilterBuilderOperatorHarness(10)) {
+            // process elements
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1()))));
+            testHarness.processElement(
+                    new StreamRecord<RowData>(GenericRowData.of(OVER_MAX_ROW_COUNT, null)));
+            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+            // test the output
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int globalCount = outputRowData.getInt(0);
+            assertThat(globalCount).isEqualTo(OVER_MAX_ROW_COUNT);
+            assertThat(outputRowData.isNullAt(1)).isTrue();
+        }
+    }
+
+    private static BloomFilter createBloomFilter1() {

Review Comment:
   It would be better if we could extract a util method:
   ```
       private static BloomFilter createBloomFilter(int rowCount, int offset) {
           final BloomFilter bloomFilter = RuntimeFilterUtils.createOnHeapBloomFilter(rowCount, 0.05);
           for (int i = 0; i < rowCount; i++) {
               bloomFilter.addHash(String.format("var%s", offset + i).hashCode());
           }
           return bloomFilter;
       }
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} $probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, "probeProjRefs", null)

Review Comment:
   ```suggestion
       val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, "probeProjRefs")
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} $probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, "probeProjRefs", null)
+    ctx.addReusableOpenStatement(
+      s"$probeProjection = new ${probeGenProj.getClassName}($probeProjRefs);")
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd;")
+    ctx.addReusableOpenStatement(s"$buildEnd = false;")
+
+    val filter = newName("filter")
+    val filterClass = classOf[BloomFilter].getCanonicalName
+    ctx.addReusableMember(s"private transient $filterClass $filter;")
+
+    val collector = newName("collector")

Review Comment:
   You can use `OperatorCodeGenerator#generateCollect` directly.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} $probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, "probeProjRefs", null)
+    ctx.addReusableOpenStatement(
+      s"$probeProjection = new ${probeGenProj.getClassName}($probeProjRefs);")
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd;")
+    ctx.addReusableOpenStatement(s"$buildEnd = false;")
+
+    val filter = newName("filter")
+    val filterClass = classOf[BloomFilter].getCanonicalName
+    ctx.addReusableMember(s"private transient $filterClass $filter;")
+
+    val collector = newName("collector")
+    val collectorClass = classOf[StreamRecordCollector[_]].getCanonicalName
+    ctx.addReusableMember(s"private transient $collectorClass<$ROW_DATA> $collector;")
+    ctx.addReusableOpenStatement(s"$collector = new $collectorClass<>(output);")
+
+    val input1Item = "input1"
+    val input2Item = "input2"
+
+    val processElement1Code =
+      s"""
+         |if ($buildEnd) {
+         |    throw new IllegalStateException("Should not build ended.");
+         |}
+         |if ($filter == null && !$input1Item.isNullAt(1)) {
+         |    $filter = $filterClass.fromBytes($input1Item.getBinary(1));
+         |}
+         |""".stripMargin
+
+    val processElement2Code =
+      s"""
+         |if (!$buildEnd) {
+         |    throw new IllegalStateException("Should build ended.");
+         |}
+         |if ($filter != null) {
+         |    final int hashCode = $probeProjection.apply($input2Item).hashCode();
+         |    if ($filter.testHash(hashCode)) {
+         |        $collector.collect($input2Item);
+         |    }
+         |} else {
+         |    $collector.collect($input2Item);
+         |}
+         |""".stripMargin
+
+    val nextSelectionCode = s"return $buildEnd ? $INPUT_SELECTION.SECOND : $INPUT_SELECTION.FIRST;"
+
+    val endInputCode1 =
+      s"""
+         |if ($buildEnd) {

Review Comment:
   use `checkState(!buildEnd, "Should not build ended.");` ?
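   For example, the generated code could then start with something like this (just a sketch of the shape; I'm using the fully qualified name so the generated class needs no extra import, and `buildEnd` stands for the generated field name):
   ```
   // sketch of the generated endInput1 body with checkState applied
   org.apache.flink.util.Preconditions.checkState(!buildEnd, "Should not build ended.");
   // ... rest of the generated endInput code stays unchanged
   ```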



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGeneratorTest.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.runtimefilter;
+
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.runtime.operators.runtimefilter.LocalRuntimeFilterBuilderOperatorTest.createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements;
+import static org.apache.flink.table.runtime.operators.runtimefilter.LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RuntimeFilterCodeGenerator}. */
+class RuntimeFilterCodeGeneratorTest {
+    private StreamTaskMailboxTestHarness<RowData> testHarness;
+
+    @BeforeEach
+    void setup() throws Exception {
+        final RowType leftType = RowType.of(new IntType(), new VarBinaryType());
+        final RowType rightType = RowType.of(new VarCharType(), new IntType());
+        final CodeGeneratorContext ctx =
+                new CodeGeneratorContext(
+                        TableConfig.getDefault(), Thread.currentThread().getContextClassLoader());
+        final CodeGenOperatorFactory<RowData> operatorFactory =
+                RuntimeFilterCodeGenerator.gen(ctx, leftType, rightType, new int[] {0});
+
+        testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                TwoInputStreamTask::new, InternalTypeInfo.of(rightType))
+                        .setupOutputForSingletonOperatorChain(operatorFactory)
+                        .addInput(InternalTypeInfo.of(leftType))
+                        .addInput(InternalTypeInfo.of(rightType))
+                        .build();
+    }
+
+    @AfterEach
+    void cleanup() throws Exception {
+        if (testHarness != null) {
+            testHarness.close();
+        }
+    }
+
+    @Test
+    void testNormalFilter() throws Exception {
+        // finish build phase
+        finishBuildPhase(createNormalInput());
+
+        // finish probe phase
+        testHarness.processElement(createRowDataRecord("var1", 111), 1);
+        testHarness.processElement(createRowDataRecord("var3", 333), 1);
+        testHarness.processElement(createRowDataRecord("var5", 555), 1);
+        testHarness.processElement(createRowDataRecord("var6", 666), 1);
+        testHarness.processElement(createRowDataRecord("var8", 888), 1);
+        testHarness.processElement(createRowDataRecord("var9", 999), 1);
+        testHarness.processEvent(new EndOfData(StopMode.DRAIN), 1);
+
+        assertThat(getOutputRowDatas())
+                .containsExactly(
+                        GenericRowData.of("var1", 111),
+                        GenericRowData.of("var3", 333),
+                        GenericRowData.of("var5", 555));
+    }
+
+    @Test
+    void testOverMaxRowCountLimitFilter() throws Exception {
+        // finish build phase
+        finishBuildPhase(createOverMaxRowCountLimitInput());
+
+        // finish probe phase
+        testHarness.processElement(createRowDataRecord("var1", 111), 1);
+        testHarness.processElement(createRowDataRecord("var3", 333), 1);
+        testHarness.processElement(createRowDataRecord("var5", 555), 1);
+        testHarness.processElement(createRowDataRecord("var6", 666), 1);
+        testHarness.processElement(createRowDataRecord("var8", 888), 1);
+        testHarness.processElement(createRowDataRecord("var9", 999), 1);
+        testHarness.processEvent(new EndOfData(StopMode.DRAIN), 1);
+
+        assertThat(getOutputRowDatas())
+                .containsExactly(
+                        GenericRowData.of("var1", 111),
+                        GenericRowData.of("var3", 333),
+                        GenericRowData.of("var5", 555),
+                        GenericRowData.of("var6", 666),
+                        GenericRowData.of("var8", 888),
+                        GenericRowData.of("var9", 999));
+    }
+
+    private void finishBuildPhase(StreamRecord<RowData> leftInput) throws Exception {
+        testHarness.processElement(leftInput, 0);
+        testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+    }
+
+    private List<GenericRowData> getOutputRowDatas() {

Review Comment:
   `getOutputRowData`?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/runtimefilter/LocalRuntimeFilterBuilderOperator.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.runtimefilter;
+
+import org.apache.flink.runtime.operators.util.BloomFilter;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
+
+import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Local runtime filter builder operator. */
+public class LocalRuntimeFilterBuilderOperator extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+    private final GeneratedProjection buildProjectionCode;
+    private final int estimatedRowCount;
+
+    /**
+     * The maximum number of rows to build the bloom filter. Once the actual number of rows received
+     * is greater than this value, we will give up building the bloom filter and directly output an
+     * empty filter.
+     */
+    private final int maxRowCount;
+
+    private transient Projection<RowData, BinaryRowData> buildSideProjection;
+    private transient BloomFilter filter;
+    private transient Collector<RowData> collector;
+    private transient int actualRowCount;
+
+    public LocalRuntimeFilterBuilderOperator(
+            GeneratedProjection buildProjectionCode, int estimatedRowCount, int maxRowCount) {
+        checkArgument(estimatedRowCount > 0);
+        checkArgument(maxRowCount >= estimatedRowCount);
+        this.buildProjectionCode = checkNotNull(buildProjectionCode);
+        this.estimatedRowCount = estimatedRowCount;
+        this.maxRowCount = maxRowCount;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.buildSideProjection = buildProjectionCode.newInstance(getUserCodeClassloader());
+        this.filter = RuntimeFilterUtils.createOnHeapBloomFilter(estimatedRowCount, 0.05);
+        this.collector = new StreamRecordCollector<>(output);
+        this.actualRowCount = 0;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        if (filter != null) {
+            checkNotNull(buildSideProjection);
+            int hashCode = buildSideProjection.apply(element.getValue()).hashCode();
+            filter.addHash(hashCode);
+            actualRowCount++;
+
+            if (actualRowCount > maxRowCount) {
+                // the actual row count is over the allowed max row count, we will output a
+                // null/empty filter
+                filter = null;
+                actualRowCount = OVER_MAX_ROW_COUNT;
+            }
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        collector.collect(RuntimeFilterUtils.convertBloomFilterToRowData(actualRowCount, filter));

Review Comment:
   ```
           if (filter != null) {
               collector.collect(
                       RuntimeFilterUtils.convertBloomFilterToRowData(actualRowCount, filter));
           }
   ```



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperator.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.runtimefilter;
+
+import org.apache.flink.runtime.operators.util.BloomFilter;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
+
+import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Global runtime filter builder operator. */
+public class GlobalRuntimeFilterBuilderOperator extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
+    /**
+     * The maximum number of rows to build the bloom filter. Once the actual number of rows received
+     * is greater than this value, we will give up building the bloom filter and directly output an
+     * empty filter.
+     */
+    private final int maxRowCount;
+
+    private transient byte[] serializedGlobalFilter;
+    private transient Collector<RowData> collector;
+    private transient int globalRowCount;
+
+    public GlobalRuntimeFilterBuilderOperator(int maxRowCount) {
+        checkArgument(maxRowCount > 0);
+        this.maxRowCount = maxRowCount;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.serializedGlobalFilter = null;
+        this.collector = new StreamRecordCollector<>(output);
+        this.globalRowCount = 0;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData rowData = element.getValue();
+        int localRowCount = rowData.getInt(0);
+
+        if (globalRowCount == OVER_MAX_ROW_COUNT) {
+            // Current global filter is already over-max-row-count, do nothing.
+        } else if (localRowCount == OVER_MAX_ROW_COUNT
+                || globalRowCount + localRowCount > maxRowCount) {
+            // The received local filter is over-max-row-count, mark the global filter as
+            // over-max-row-count.
+            globalRowCount = OVER_MAX_ROW_COUNT;
+            serializedGlobalFilter = null;
+        } else {
+            // merge the local filter
+            byte[] serializedLocalFilter = rowData.getBinary(1);
+            if (serializedGlobalFilter == null) {
+                serializedGlobalFilter = serializedLocalFilter.clone();
+            } else {
+                BloomFilter.mergeSerializedBloomFilters(
+                        serializedGlobalFilter, serializedLocalFilter);
+            }
+            globalRowCount += localRowCount;
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        collector.collect(

Review Comment:
   Add an `if` check here as well, same as the suggestion above for the local builder?
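   For example (a sketch of the idea only; I'm assuming the collect call here packs `globalRowCount` and `serializedGlobalFilter` into a row, and how the over-max-row-count case should be reported is up to you):
   ```
       @Override
       public void endInput() throws Exception {
           // only emit a filter row when a usable global filter was actually built
           if (serializedGlobalFilter != null) {
               collector.collect(GenericRowData.of(globalRowCount, serializedGlobalFilter));
           }
       }
   ```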



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperatorTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.runtimefilter;
+
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.operators.util.BloomFilter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.IntType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Queue;
+
+import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link GlobalRuntimeFilterBuilderOperator}. */
+class GlobalRuntimeFilterBuilderOperatorTest {
+
+    @Test
+    void testNormalInputAndNormalOutput() throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createGlobalRuntimeFilterBuilderOperatorHarness(10)) {
+            // process elements
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1()))));
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter2()))));
+            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+            // test the output
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int globalCount = outputRowData.getInt(0);
+            BloomFilter globalBloomFilter = BloomFilter.fromBytes(outputRowData.getBinary(1));
+            assertThat(globalCount).isEqualTo(10);
+            assertThat(globalBloomFilter.testHash("var1".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var2".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var3".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var4".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var5".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var6".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var7".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var8".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var9".hashCode())).isTrue();
+            assertThat(globalBloomFilter.testHash("var10".hashCode())).isTrue();
+        }
+    }
+
+    /**
+     * Test the case that all input local runtime filters are normal, but the merged global filter
+     * is over-max-row-count.
+     */
+    @Test
+    void testNormalInputAndOverMaxRowCountOutput() throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createGlobalRuntimeFilterBuilderOperatorHarness(9)) {
+            // process elements
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1()))));
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter2()))));
+            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+            // test the output
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int globalCount = outputRowData.getInt(0);
+            assertThat(globalCount).isEqualTo(OVER_MAX_ROW_COUNT);
+            assertThat(outputRowData.isNullAt(1)).isTrue();
+        }
+    }
+
+    /** Test the case that one of the input local runtime filters is over-max-row-count. */
+    @Test
+    void testOverMaxRowCountInput() throws Exception {
+        try (StreamTaskMailboxTestHarness<RowData> testHarness =
+                createGlobalRuntimeFilterBuilderOperatorHarness(10)) {
+            // process elements
+            testHarness.processElement(
+                    new StreamRecord<RowData>(
+                            GenericRowData.of(5, 
BloomFilter.toBytes(createBloomFilter1()))));
+            testHarness.processElement(
+                    new 
StreamRecord<RowData>(GenericRowData.of(OVER_MAX_ROW_COUNT, null)));
+            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
+
+            // test the output
+            Queue<Object> outputs = testHarness.getOutput();
+            assertThat(outputs.size()).isEqualTo(1);
+
+            RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue();
+            assertThat(outputRowData.getArity()).isEqualTo(2);
+
+            int globalCount = outputRowData.getInt(0);
+            assertThat(globalCount).isEqualTo(OVER_MAX_ROW_COUNT);
+            assertThat(outputRowData.isNullAt(1)).isTrue();
+        }
+    }
+
+    private static BloomFilter createBloomFilter1() {
+        final BloomFilter bloomFilter1 = RuntimeFilterUtils.createOnHeapBloomFilter(5, 0.05);

Review Comment:
   The value 0.05 is used in multiple places; extract it to a static constant?
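   For example (the name and placement are just a suggestion, e.g. in `RuntimeFilterUtils` or in this test class):
   ```
       // shared false-positive probability constant (hypothetical name)
       public static final double DEFAULT_FPP = 0.05;

       // call sites then become:
       RuntimeFilterUtils.createOnHeapBloomFilter(5, DEFAULT_FPP);
   ```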



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} $probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, "probeProjRefs", null)
+    ctx.addReusableOpenStatement(
+      s"$probeProjection = new ${probeGenProj.getClassName}($probeProjRefs);")
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd;")
+    ctx.addReusableOpenStatement(s"$buildEnd = false;")
+
+    val filter = newName("filter")
+    val filterClass = classOf[BloomFilter].getCanonicalName
+    ctx.addReusableMember(s"private transient $filterClass $filter;")
+
+    val collector = newName("collector")
+    val collectorClass = classOf[StreamRecordCollector[_]].getCanonicalName
+    ctx.addReusableMember(s"private transient $collectorClass<$ROW_DATA> $collector;")
+    ctx.addReusableOpenStatement(s"$collector = new $collectorClass<>(output);")
+
+    val input1Item = "input1"

Review Comment:
   You can use the existing `DEFAULT_INPUT1_TERM` and `DEFAULT_INPUT2_TERM` constants.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} $probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, "probeProjRefs", null)
+    ctx.addReusableOpenStatement(
+      s"$probeProjection = new ${probeGenProj.getClassName}($probeProjRefs);")
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd;")
+    ctx.addReusableOpenStatement(s"$buildEnd = false;")
+
+    val filter = newName("filter")
+    val filterClass = classOf[BloomFilter].getCanonicalName
+    ctx.addReusableMember(s"private transient $filterClass $filter;")
+
+    val collector = newName("collector")
+    val collectorClass = classOf[StreamRecordCollector[_]].getCanonicalName
+    ctx.addReusableMember(s"private transient $collectorClass<$ROW_DATA> $collector;")
+    ctx.addReusableOpenStatement(s"$collector = new $collectorClass<>(output);")
+
+    val input1Item = "input1"
+    val input2Item = "input2"
+
+    val processElement1Code =
+      s"""
+         |if ($buildEnd) {
+         |    throw new IllegalStateException("Should not build ended.");
+         |}
+         |if ($filter == null && !$input1Item.isNullAt(1)) {
+         |    $filter = $filterClass.fromBytes($input1Item.getBinary(1));
+         |}
+         |""".stripMargin
+
+    val processElement2Code =
+      s"""
+         |if (!$buildEnd) {
+         |    throw new IllegalStateException("Should build ended.");
+         |}
+         |if ($filter != null) {
+         |    final int hashCode = 
$probeProjection.apply($input2Item).hashCode();
+         |    if ($filter.testHash(hashCode)) {
+         |        $collector.collect($input2Item);
+         |    }
+         |} else {
+         |    $collector.collect($input2Item);
+         |}
+         |""".stripMargin
+
+    val nextSelectionCode = s"return $buildEnd ? $INPUT_SELECTION.SECOND : 
$INPUT_SELECTION.FIRST;"
+
+    val endInputCode1 =
+      s"""
+         |if ($buildEnd) {
+         |    throw new IllegalStateException("Should not build ended.");
+         |}
+         |LOG.info("Finish build phase.");

Review Comment:
   Suggest a more specific log message here, e.g. `BloomFilter build completed.`
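   A sketch of `endInputCode1` with that wording (the surrounding lines follow the hunk above; the exception text is likewise only a suggested phrasing):

       val endInputCode1 =
         s"""
            |if ($buildEnd) {
            |    throw new IllegalStateException("Build phase should not have ended.");
            |}
            |LOG.info("BloomFilter build completed.");
            |$buildEnd = true;
            |""".stripMargin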



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} 
$probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, 
"probeProjRefs", null)
+    ctx.addReusableOpenStatement(
+      s"$probeProjection = new ${probeGenProj.getClassName}($probeProjRefs);")
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd;")
+    ctx.addReusableOpenStatement(s"$buildEnd = false;")
+
+    val filter = newName("filter")
+    val filterClass = classOf[BloomFilter].getCanonicalName
+    ctx.addReusableMember(s"private transient $filterClass $filter;")
+
+    val collector = newName("collector")
+    val collectorClass = classOf[StreamRecordCollector[_]].getCanonicalName
+    ctx.addReusableMember(s"private transient $collectorClass<$ROW_DATA> 
$collector;")
+    ctx.addReusableOpenStatement(s"$collector = new 
$collectorClass<>(output);")
+
+    val input1Item = "input1"
+    val input2Item = "input2"
+
+    val processElement1Code =
+      s"""
+         |if ($buildEnd) {
+         |    throw new IllegalStateException("Should not build ended.");
+         |}
+         |if ($filter == null && !$input1Item.isNullAt(1)) {
+         |    $filter = $filterClass.fromBytes($input1Item.getBinary(1));
+         |}
+         |""".stripMargin
+
+    val processElement2Code =
+      s"""
+         |if (!$buildEnd) {
+         |    throw new IllegalStateException("Should build ended.");
+         |}
+         |if ($filter != null) {
+         |    final int hashCode = 
$probeProjection.apply($input2Item).hashCode();
+         |    if ($filter.testHash(hashCode)) {
+         |        $collector.collect($input2Item);
+         |    }
+         |} else {
+         |    $collector.collect($input2Item);
+         |}
+         |""".stripMargin
+
+    val nextSelectionCode = s"return $buildEnd ? $INPUT_SELECTION.SECOND : 
$INPUT_SELECTION.FIRST;"
+
+    val endInputCode1 =
+      s"""
+         |if ($buildEnd) {
+         |    throw new IllegalStateException("Should not build ended.");
+         |}
+         |LOG.info("Finish build phase.");
+         |$buildEnd = true;
+         |""".stripMargin
+
+    val endInputCode2 =
+      s"""
+         |if (!$buildEnd) {
+         |    throw new IllegalStateException("Should build ended.");
+         |}
+         |LOG.info("Finish probe phase.");

Review Comment:
   Same for the probe side, e.g. `Finish BloomFilter probe phase.`
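   A sketch of `endInputCode2` with that wording (again following the hunk above; the exception text is only a suggested phrasing):

       val endInputCode2 =
         s"""
            |if (!$buildEnd) {
            |    throw new IllegalStateException("Build phase should have ended.");
            |}
            |LOG.info("Finish BloomFilter probe phase.");
            |""".stripMargin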



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen.runtimefilter
+
+import org.apache.flink.runtime.operators.util.BloomFilter
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
OperatorCodeGenerator, ProjectionCodeGenerator}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, ROW_DATA}
+import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.INPUT_SELECTION
+import org.apache.flink.table.planner.typeutils.RowTypeUtils
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.util.StreamRecordCollector
+import org.apache.flink.table.types.logical.RowType
+
+/** Operator code generator for runtime filter operator. */
+object RuntimeFilterCodeGenerator {
+  def gen(
+      ctx: CodeGeneratorContext,
+      buildType: RowType,
+      probeType: RowType,
+      probeIndices: Array[Int]): CodeGenOperatorFactory[RowData] = {
+    val probeGenProj = ProjectionCodeGenerator.generateProjection(
+      ctx,
+      "RuntimeFilterProjection",
+      probeType,
+      RowTypeUtils.projectRowType(probeType, probeIndices),
+      probeIndices)
+    ctx.addReusableInnerClass(probeGenProj.getClassName, probeGenProj.getCode)
+
+    val probeProjection = newName("probeToBinaryRow")
+    ctx.addReusableMember(s"private transient ${probeGenProj.getClassName} 
$probeProjection;")
+    val probeProjRefs = ctx.addReusableObject(probeGenProj.getReferences, 
"probeProjRefs", null)
+    ctx.addReusableOpenStatement(
+      s"$probeProjection = new ${probeGenProj.getClassName}($probeProjRefs);")
+
+    val buildEnd = newName("buildEnd")
+    ctx.addReusableMember(s"private transient boolean $buildEnd;")
+    ctx.addReusableOpenStatement(s"$buildEnd = false;")
+
+    val filter = newName("filter")
+    val filterClass = classOf[BloomFilter].getCanonicalName

Review Comment:
       val filterClassTerm = className[BloomFilter]
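   In context, a minimal sketch of that change (assuming `className` is the helper defined in `CodeGenUtils`):

       import org.apache.flink.table.planner.codegen.CodeGenUtils.className

       val filterClassTerm = className[BloomFilter]
       ctx.addReusableMember(s"private transient $filterClassTerm $filter;")
       // the later $filterClass.fromBytes(...) call would switch to $filterClassTerm as well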
   


