godfreyhe commented on code in PR #20324: URL: https://github.com/apache/flink/pull/20324#discussion_r939599197
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java: ########## @@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState( isLeftOuterJoin, isObjectReuseEnabled); - // TODO then wrapper it into a keyed lookup function with state FLINK-28568 - throw new UnsupportedOperationException("to be supported"); + KeyedLookupJoinWrapper keyedLookupJoinWrapper = + new KeyedLookupJoinWrapper( + (LookupJoinRunner) processFunction, + StateConfigUtil.createTtlConfig( + config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()), + InternalSerializers.create(tableSourceRowType), + lookupKeyContainsPrimaryKey); + + KeyedProcessOperator<RowData, RowData, RowData> operator = + new KeyedProcessOperator<>(keyedLookupJoinWrapper); + + List<Integer> refKeys = + allLookupKeys.entrySet().stream() Review Comment: Streaming over `allLookupKeys.values()` instead of `entrySet()` would avoid the repeated `key.getValue()` calls. ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/collector/ListenableCollector.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.collector; + +import org.apache.flink.annotation.Internal; + +import java.util.Optional; + +/** + * A listenable collector for lookup join that can be called when an original record was collected. + */ +@Internal +public abstract class ListenableCollector<T> extends TableFunctionCollector<T> { + private CollectListener<T> collectListener; Review Comment: We should mark this field as `@Nullable`. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java: ########## @@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState( isLeftOuterJoin, isObjectReuseEnabled); - // TODO then wrapper it into a keyed lookup function with state FLINK-28568 - throw new UnsupportedOperationException("to be supported"); + KeyedLookupJoinWrapper keyedLookupJoinWrapper = + new KeyedLookupJoinWrapper( + (LookupJoinRunner) processFunction, + StateConfigUtil.createTtlConfig( + config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()), + InternalSerializers.create(tableSourceRowType), + lookupKeyContainsPrimaryKey); + + KeyedProcessOperator<RowData, RowData, RowData> operator = + new KeyedProcessOperator<>(keyedLookupJoinWrapper); + + List<Integer> refKeys = + allLookupKeys.entrySet().stream() + .filter( Review Comment: Could we directly keep only the `FieldRefLookupKey`s, e.g. `filter(key -> (key.getValue() instanceof LookupJoinUtil.FieldRefLookupKey))`, instead of excluding `ConstantLookupKey`s? ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.java: ########## @@ -238,13 +239,16 @@ public void flatMap(RowData value, Collector<RowData> out) throws Exception { * The {@link TestingFetcherCollector} is a simple implementation of {@link * TableFunctionCollector} which combines left and right into a JoinedRowData.
*/ - public static final class TestingFetcherCollector extends TableFunctionCollector { + public static final class TestingFetcherCollector extends ListenableCollector { Review Comment: Should the type be `ListenableCollector<RowData>` rather than the raw `ListenableCollector`? ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala: ########## @@ -268,6 +268,35 @@ class AsyncLookupJoinITCase( assertEquals(expected.sorted, sink.getRetractResults.sorted) } + @Test + def testAggAndAsyncLeftJoinWithTryResolveMode(): Unit = { Review Comment: Do we have an IT case that verifies this change? This PR aims to support the sync lookup join, but this new test is added to `AsyncLookupJoinITCase`. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java: ########## @@ -329,8 +340,59 @@ private Transformation<RowData> createSyncLookupJoinWithState( isLeftOuterJoin, isObjectReuseEnabled); - // TODO then wrapper it into a keyed lookup function with state FLINK-28568 - throw new UnsupportedOperationException("to be supported"); + KeyedLookupJoinWrapper keyedLookupJoinWrapper = + new KeyedLookupJoinWrapper( + (LookupJoinRunner) processFunction, + StateConfigUtil.createTtlConfig( + config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()), + InternalSerializers.create(tableSourceRowType), + lookupKeyContainsPrimaryKey); + + KeyedProcessOperator<RowData, RowData, RowData> operator = + new KeyedProcessOperator<>(keyedLookupJoinWrapper); + + List<Integer> refKeys = + allLookupKeys.entrySet().stream() + .filter( + key -> + !(key.getValue() + instanceof LookupJoinUtil.ConstantLookupKey)) + .map(key -> ((LookupJoinUtil.FieldRefLookupKey) key.getValue()).index) + .collect(Collectors.toList()); + RowDataKeySelector keySelector; + + int parallelism = inputTransformation.getParallelism(); + if (refKeys.isEmpty()) { + // all lookup keys are constants, then use an empty key selector + keySelector = EmptyRowDataKeySelector.INSTANCE;
+ // single parallelism for empty key shuffle + parallelism = 1; + } else { + // make it a deterministic asc order + Collections.sort(refKeys); + keySelector = + KeySelectorUtil.getRowDataSelector( + classLoader, + refKeys.stream().mapToInt(Integer::intValue).toArray(), + InternalTypeInfo.of(inputRowType)); + } + final KeyGroupStreamPartitioner<RowData, RowData> partitioner = + new KeyGroupStreamPartitioner<>( + keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM); + Transformation<RowData> partitionedTransform = + new PartitionTransformation<>(inputTransformation, partitioner); + partitionedTransform.setParallelism(parallelism); + + OneInputTransformation<RowData, RowData> transform = + ExecNodeUtil.createOneInputTransformation( + partitionedTransform, + createTransformationMeta(LOOKUP_JOIN_TRANSFORMATION, config), Review Comment: We should define a separate operator name for this transformation, because `LOOKUP_JOIN_TRANSFORMATION` is already used for the join transformation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org