lincoln-lil commented on code in PR #20393:
URL: https://github.com/apache/flink/pull/20393#discussion_r935658039

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala:
##########
@@ -62,14 +62,19 @@ class StreamPhysicalIncrementalGroupAggregate(
     inputRel: RelNode,
     val partialAggGrouping: Array[Int],
     val partialAggCalls: Array[AggregateCall],
-    val finalAggGrouping: Array[Int],

Review Comment:
   Do you mean the attribute name in `StreamPhysicalGroupAggregateBase`? I was thinking that partial/final is an internal concept of the two-phase aggregate, so I tend to omit the 'final' prefix in the base class. I think the unprefixed name can both represent the grouping in a one-phase agg and be distinguished from partialGroupings in a two-phase agg.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
       case _ => // ignore
     }
+    if (withUpsertKey) rel match {
+      case streamRel: StreamPhysicalRel =>
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(rel.getCluster.getMetadataQuery)
+        val upsertKeys = fmq.getUpsertKeys(streamRel)
+        if (null != upsertKeys && !upsertKeys.isEmpty) {
+          printValues.add(
+            Pair.of("upsertKeys", upsertKeys.map(bitset => bitset.toString).mkString(", ")))

Review Comment:
   The previous idea was to print indexes for simplicity, since the field names are already shown, but upsert keys are usually not too long, so I'll update it.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -585,6 +585,50 @@ object FlinkRexUtil {
     }
     false
   }
+
+  /**
+   * Returns whether a given expression is deterministic in streaming scenario, differs from
+   * calcite's [[RexUtil]], it considers both non-deterministic and dynamic functions.
+   */
+  def isDeterministicInStreaming(e: RexNode): Boolean = try {

Review Comment:
   I want to highlight it to prevent anyone from using it in batch processing, because dynamic functions have different semantics there.
   I'll add separate tests for it.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+    // by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(
+          join.remainingCondition.get))
+        || (join.calcOnTemporalTable.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   Good catch! I'll re-split the commits in the next update.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {

Review Comment:
   These output indexes are the final ones, which may go through a projection rather than referring directly to the temporal table, so I didn't add `temporalTable` to the name.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala:
##########
@@ -80,7 +81,8 @@ object FlinkRelOptUtil {
       withIdPrefix,
       withChangelogTraits,
       withRowType,
-      withTreeStyle = true)
+      withTreeStyle = true,
+      withUpsertKey)

Review Comment:
   I had the same feeling at first, but all the attributes of `RelTreeWriterImpl` have default values, so I appended the new `withUpsertKey` as the last parameter of `RelTreeWriterImpl`.

##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -142,4 +142,33 @@ public class OptimizerConfigOptions {
                     .withDescription(
                             "When it is true, the optimizer will merge the operators with pipelined shuffling "
                                     + "into a multiple input operator to reduce shuffling and improve performance. Default value is true.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<NonDeterministicUpdateHandling>
+            TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_HANDLING =
+                    key("table.optimizer.non-deterministic-update.handling")

Review Comment:
   This option aims to resolve all NDU (non-deterministic update) problems in the long run, with lookup join as the first priority. For the naming, the main consideration is the similarity to error handling: an NDU is also a kind of error, so I prefer 'handling' over 'strategy'. WDYT?
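The streaming-specific determinism check discussed in the `FlinkRexUtil.isDeterministicInStreaming` comment can be illustrated with a minimal sketch that does not use Calcite. `Expr`, `InputRef`, `Literal`, `Call` and `DeterminismSketch` are hypothetical stand-ins for `RexNode` and the operator flags. The sketch assumes, as the Javadoc in the diff states, that the streaming variant rejects both non-deterministic and dynamic functions, while a batch-style check only rejects non-deterministic ones. The reasoning is that in Flink SQL a dynamic function such as `CURRENT_TIMESTAMP` is evaluated once per query in batch mode but once per record in streaming mode.

```scala
// Hypothetical, simplified stand-in for Calcite's RexNode tree (not a Flink/Calcite API).
sealed trait Expr
final case class InputRef(index: Int) extends Expr
final case class Literal(value: Any) extends Expr
final case class Call(
    name: String,
    isDeterministic: Boolean, // false for functions like RAND()
    isDynamic: Boolean, // true for functions like CURRENT_TIMESTAMP
    operands: List[Expr])
    extends Expr

object DeterminismSketch {

  /** Batch-style check: only non-deterministic calls make the expression non-deterministic. */
  def isDeterministicInBatch(e: Expr): Boolean = e match {
    case Call(_, deterministic, _, operands) =>
      deterministic && operands.forall(isDeterministicInBatch)
    case _ => true // input refs and literals are deterministic
  }

  /** Streaming-style check: dynamic calls are rejected too, as they are re-evaluated per record. */
  def isDeterministicInStreaming(e: Expr): Boolean = e match {
    case Call(_, deterministic, dynamic, operands) =>
      deterministic && !dynamic && operands.forall(isDeterministicInStreaming)
    case _ => true
  }
}
```

For example, `Call("CURRENT_TIMESTAMP", isDeterministic = true, isDynamic = true, operands = Nil)` passes the batch-style check but fails the streaming one. This is the semantic difference that makes the streaming variant unsuitable for batch plans.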

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
       case _ => // ignore
     }
+    if (withUpsertKey) rel match {

Review Comment:
   OK.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala:
##########
@@ -80,4 +81,14 @@ abstract class CommonPhysicalJoin(
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
+  def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {

Review Comment:
   This method was moved from `StreamPhysicalJoin`, so I didn't change its original name (there are several related methods named `xxUniqueKeys`; it may be better to refactor them in another PR).

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {
+        val program = calcOnTemporalTable.get
+        val outputMapping = program.getProjectList.asScala.zipWithIndex
+          .map { case (ref, index) => (index, program.expandLocalRef(ref)) }
+          .map {
+            case (outIndex, ref) =>
+              ref match {
+                case inputRef: RexInputRef => (inputRef.getIndex, outIndex)
+                case _ => (-1, -1)
+              }
+          }
+          .toMap
+        val outputPk = temporalPkIdxs.forall(outputMapping.contains)
+        if (outputPk) {
+          // remapping pk index
+          temporalPkIdxs.map(outputMapping.get(_).get)
+        } else {
+          Array[Int]()
+        }
+      } else {
+        temporalPkIdxs
+      }
+    } else {
+      // temporal table has no pk, no uk produces
+      Array[Int]()
+    }
+
+    outputPkIdx
+  }
+
+  private def getPrimaryKeyIndexesOfTemporalTable: Array[Int] = {
+    // get primary key columns of lookup table if exists
+    val pkColumns = getPrimaryKeyColumnsOfTemporalTable
+    if (pkColumns.isDefined) {
+      val newSchema = temporalTable.getRowType.getFieldNames
+      pkColumns.get.toArray().map(col => newSchema.indexOf(col))
+    } else {
+      Array[Int]()
+    }
+  }
+
+  private def getPrimaryKeyColumnsOfTemporalTable: Option[util.List[String]] = {
+    val schema = temporalTable match {
+      case t: TableSourceTable =>
+        TableSchema.fromResolvedSchema(t.contextResolvedTable.getResolvedSchema)

Review Comment:
   This is mainly because I wanted to obtain the pk in a uniform way, but I didn't find a proper method on `LegacyTableSourceTable`.
   We can switch to `ResolvedSchema` once `LegacyTableSourceTable` has been removed.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from regular join, here we do not filterKeys because there's no shuffle on join keys
+    // by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   Makes sense to me.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     val left = join.getInput
     val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
-    val leftType = left.getRowType
-    getJoinUniqueKeys(
-      join.joinType,
-      leftType,
-      leftUniqueKeys,
-      null,
-      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
-      // TODO get uniqueKeys from TableSchema of TableSource
+
+    if (leftUniqueKeys != null) {

Review Comment:
   OK.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     val left = join.getInput
     val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
-    val leftType = left.getRowType
-    getJoinUniqueKeys(
-      join.joinType,
-      leftType,
-      leftUniqueKeys,
-      null,
-      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
-      // TODO get uniqueKeys from TableSchema of TableSource
+
+    if (leftUniqueKeys != null) {
+      val rightUniqueKeys = getUniqueKeysOfTemporalTable(join)
+
+      getJoinUniqueKeys(
+        join.joinType,
+        left.getRowType,
+        leftUniqueKeys,
+        rightUniqueKeys,
+        mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
+        rightUniqueKeys != null)
+    } else {
       null
-    )
+    }
+  }
+
+  private[flink] def getUniqueKeysOfTemporalTable(

Review Comment:
   The expected return values are different here. If this were unified into this method, using `JSet[ImmutableBitSet] != null` to represent `lookupKeyContainsPrimaryKey` of the lookup join would be a little obscure.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }
+  /** Check if lookup key contains primary key, include constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exists constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get final output pk indexes if exists, otherwise will get empty. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {

Review Comment:
   Makes sense.
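The pk remapping in `getOutputPrimaryKeyIndexes` and the coverage check in `lookupKeyContainsPrimaryKey` from the hunk above can be illustrated without Calcite types. In this minimal sketch, `ProjectedExpr`, `ColumnRef`, `Computed` and `LookupPkSketch` are hypothetical names. Each entry of the optional projection models one output expression of `calcOnTemporalTable`, which either forwards a temporal-table column unchanged (like a `RexInputRef`) or computes something else.

```scala
// Hypothetical model of one output expression of the calc on the temporal table.
sealed trait ProjectedExpr
final case class ColumnRef(inputIndex: Int) extends ProjectedExpr // forwards an input column as-is
case object Computed extends ProjectedExpr // any other expression

object LookupPkSketch {

  /**
   * Remaps the temporal table's pk indexes to output indexes. Returns an empty array when the
   * table has no pk, or when the projection does not forward every pk column unchanged.
   */
  def outputPrimaryKeyIndexes(
      temporalPkIdxs: Array[Int],
      projection: Option[Seq[ProjectedExpr]]): Array[Int] = {
    if (temporalPkIdxs.isEmpty) {
      Array.empty[Int] // no pk on the temporal table, so no unique key is produced
    } else {
      projection match {
        case None => temporalPkIdxs // no calc: output indexes equal temporal-table indexes
        case Some(projectList) =>
          // input column index -> output column index, for plain column references only
          val outputMapping: Map[Int, Int] = projectList.zipWithIndex.collect {
            case (ColumnRef(inputIdx), outputIdx) => inputIdx -> outputIdx
          }.toMap
          if (temporalPkIdxs.forall(outputMapping.contains)) temporalPkIdxs.map(outputMapping)
          else Array.empty[Int]
      }
    }
  }

  /** True if the non-empty output pk is fully covered by the lookup keys, constants included. */
  def lookupKeyContainsPrimaryKey(outputPkIdxs: Array[Int], allLookupKeys: Set[Int]): Boolean =
    outputPkIdxs.nonEmpty && outputPkIdxs.forall(allLookupKeys.contains)
}
```

For instance, with the projection `Seq(Computed, ColumnRef(2), ColumnRef(0))`, a temporal-table pk at index 0 is forwarded to output index 2, whereas a pk at index 1 is not forwarded and yields an empty result. The sketch uses `collect` so that only forwarded columns enter the mapping, instead of the `(-1, -1)` placeholder used in the diff.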