lincoln-lil commented on code in PR #20393:
URL: https://github.com/apache/flink/pull/20393#discussion_r935658039


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalIncrementalGroupAggregate.scala:
##########
@@ -62,14 +62,19 @@ class StreamPhysicalIncrementalGroupAggregate(
     inputRel: RelNode,
     val partialAggGrouping: Array[Int],
     val partialAggCalls: Array[AggregateCall],
-    val finalAggGrouping: Array[Int],

Review Comment:
   Do you mean the attribute name in `StreamPhysicalGroupAggregateBase`?
   I was thinking that partial-final is an internal concept of the two-phase aggregate, so I tend to omit the 'final' prefix in the base class. I think the plain name can both represent the grouping in a one-phase agg and be distinguished from `partialGroupings` in a two-phase agg.
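   A minimal sketch of this naming idea (illustrative signatures assuming Calcite's `AggregateCall`, not the actual class bodies):
   ```scala
   import org.apache.calcite.rel.core.AggregateCall

   // the base class uses a plain `grouping`: the grouping of a one-phase agg,
   // or the final grouping of a two-phase agg
   abstract class StreamPhysicalGroupAggregateBase(
       val grouping: Array[Int],
       val aggCalls: Seq[AggregateCall])

   // the incremental subclass keeps the explicit partial-stage attributes and
   // passes its final grouping to the base class
   class StreamPhysicalIncrementalGroupAggregate(
       val partialAggGrouping: Array[Int],
       val partialAggCalls: Array[AggregateCall],
       grouping: Array[Int],
       aggCalls: Seq[AggregateCall])
     extends StreamPhysicalGroupAggregateBase(grouping, aggCalls)
   ```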



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
       case _ => // ignore
     }
 
+    if (withUpsertKey) rel match {
+      case streamRel: StreamPhysicalRel =>
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(rel.getCluster.getMetadataQuery)
+        val upsertKeys = fmq.getUpsertKeys(streamRel)
+        if (null != upsertKeys && !upsertKeys.isEmpty) {
+          printValues.add(
+            Pair.of("upsertKeys", upsertKeys.map(bitset => bitset.toString).mkString(", ")))

Review Comment:
   The previous idea was to use indexes for simplicity, since the field names are already printed, but upsert keys are usually not too long, so I'll update it to use field names.
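   A minimal sketch of the field-name rendering, assuming Calcite's `ImmutableBitSet` and the `rel`/`upsertKeys` values from the snippet above:
   ```scala
   import java.util
   import scala.collection.JavaConverters._
   import org.apache.calcite.rel.RelNode
   import org.apache.calcite.util.ImmutableBitSet

   def renderUpsertKeys(rel: RelNode, upsertKeys: util.Set[ImmutableBitSet]): String = {
     val fieldNames = rel.getRowType.getFieldNames
     upsertKeys.asScala
       // each key is a bit set of field indexes; map every index to its field name
       .map(bitset => bitset.toArray.map(fieldNames.get).mkString("[", ", ", "]"))
       .mkString(", ")
   }
   ```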



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -585,6 +585,50 @@ object FlinkRexUtil {
     }
     false
   }
+
+  /**
+   * Returns whether a given expression is deterministic in a streaming scenario. This differs
+   * from Calcite's [[RexUtil]] in that it considers both non-deterministic and dynamic functions.
+   */
+  def isDeterministicInStreaming(e: RexNode): Boolean = try {

Review Comment:
   I want to highlight it to avoid someone using it in batch processing, because of the different semantics of dynamic functions.
   I'll add separate tests for it.
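   A minimal sketch of the check, assuming Calcite's `RexVisitorImpl` pattern (similar to `RexUtil.isDeterministic`, but also rejecting dynamic functions such as `CURRENT_TIMESTAMP`); the actual implementation may differ:
   ```scala
   import org.apache.calcite.rex.{RexCall, RexNode, RexVisitorImpl}
   import org.apache.calcite.util.Util

   def isDeterministicInStreaming(e: RexNode): Boolean = try {
     val visitor = new RexVisitorImpl[Void](true) {
       override def visitCall(call: RexCall): Void = {
         // reject non-deterministic functions (as Calcite does) and additionally
         // dynamic functions, whose semantics differ between batch and streaming
         if (!call.getOperator.isDeterministic || call.getOperator.isDynamicFunction) {
           throw Util.FoundOne.NULL
         }
         super.visitCall(call)
       }
     }
     e.accept(visitor)
     true
   } catch {
     case _: Util.FoundOne => false
   }
   ```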



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from a regular join: we do not filter keys here because there is no shuffle on
+    // the join keys by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(
+          join.remainingCondition.get))
+        || (join.calcOnTemporalTable.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   Good catch! I'll re-split the commits in the next update.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }

+  /** Check if the lookup keys contain the primary key, including constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exist constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get the final output pk indexes if they exist, otherwise an empty array. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {

Review Comment:
   These output indexes are the final ones, which may go through a projection rather than come directly from the temporal table, so I didn't add 'temporalTable' to the name.
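   A standalone sketch of the remapping with hypothetical inputs, mirroring the `getOutputPrimaryKeyIndexes` logic:
   ```scala
   // output position -> referenced input index, for projections that are plain
   // field references (RexInputRef); computed expressions would yield no mapping
   val projectList = Array(1, 0, 2)
   val outputMapping = projectList.zipWithIndex.map { case (in, out) => in -> out }.toMap

   val temporalPkIdxs = Array(0) // the temporal table's pk is column 0
   val outputPkIdx =
     if (temporalPkIdxs.forall(outputMapping.contains)) temporalPkIdxs.map(outputMapping)
     else Array.empty[Int] // pk projected away: no pk on the output

   // outputPkIdx == Array(1): the pk column now sits at output position 1
   ```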



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala:
##########
@@ -80,7 +81,8 @@ object FlinkRelOptUtil {
       withIdPrefix,
       withChangelogTraits,
       withRowType,
-      withTreeStyle = true)
+      withTreeStyle = true,
+      withUpsertKey)

Review Comment:
   I had the same feeling at first, but all attributes in `RelTreeWriterImpl` have default values, so I appended the new `withUpsertKey` at the last position of `RelTreeWriterImpl`.
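   A minimal sketch of why appending works (illustrative parameter list, not the full constructor): existing positional call sites keep compiling because the new flag has a default value and sits last.
   ```scala
   class RelTreeWriterImpl(
       val withIdPrefix: Boolean = false,
       val withChangelogTraits: Boolean = false,
       val withRowType: Boolean = false,
       val withTreeStyle: Boolean = true,
       val withUpsertKey: Boolean = false) // new flag, appended last

   // a pre-existing positional call site, unaffected by the appended parameter
   val writer = new RelTreeWriterImpl(false, true, false, true)
   ```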



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -142,4 +142,33 @@ public class OptimizerConfigOptions {
                     .withDescription(
                             "When it is true, the optimizer will merge the operators with pipelined shuffling "
                                     + "into a multiple input operator to reduce shuffling and improve performance. Default value is true.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<NonDeterministicUpdateHandling>
+            TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_HANDLING =
+                    key("table.optimizer.non-deterministic-update.handling")

Review Comment:
   This option is aimed at resolving all NDU (non-deterministic update) problems in the long run; lookup join has the first priority within that goal.
   For the naming, the main consideration is that it is similar to error handling: NDU is also one kind of error, so I prefer 'handling' over 'strategy', WDYT?
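   A hedged usage sketch; the option key follows the diff above, while the `TRY_RESOLVE` constant is an assumption about the enum's values:
   ```scala
   import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

   val env = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
   // enum constant name is assumed for illustration
   env.getConfig.getConfiguration
     .setString("table.optimizer.non-deterministic-update.handling", "TRY_RESOLVE")
   ```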



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala:
##########
@@ -85,6 +87,19 @@ class RelTreeWriterImpl(
       case _ => // ignore
     }
 
+    if (withUpsertKey) rel match {

Review Comment:
   Ok



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala:
##########
@@ -80,4 +81,14 @@ abstract class CommonPhysicalJoin(
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
 
+  def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {

Review Comment:
   This method was moved from `StreamPhysicalJoin`, so I didn't change the original name (there are several related methods named `xxUniqueKeys`; it may be better to refactor this in another PR).



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }

+  /** Check if the lookup keys contain the primary key, including constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exist constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get the final output pk indexes if they exist, otherwise an empty array. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {
+        val program = calcOnTemporalTable.get
+        val outputMapping = program.getProjectList.asScala.zipWithIndex
+          .map { case (ref, index) => (index, program.expandLocalRef(ref)) }
+          .map {
+            case (outIndex, ref) =>
+              ref match {
+                case inputRef: RexInputRef => (inputRef.getIndex, outIndex)
+                case _ => (-1, -1)
+              }
+          }
+          .toMap
+        val outputPk = temporalPkIdxs.forall(outputMapping.contains)
+        if (outputPk) {
+          // remapping pk index
+          temporalPkIdxs.map(outputMapping.get(_).get)
+        } else {
+          Array[Int]()
+        }
+      } else {
+        temporalPkIdxs
+      }
+    } else {
+      // the temporal table has no pk, so no uk is produced
+      Array[Int]()
+    }
+
+    outputPkIdx
+  }
+
+  private def getPrimaryKeyIndexesOfTemporalTable: Array[Int] = {
+    // get primary key columns of lookup table if exists
+    val pkColumns = getPrimaryKeyColumnsOfTemporalTable
+    if (pkColumns.isDefined) {
+      val newSchema = temporalTable.getRowType.getFieldNames
+      pkColumns.get.toArray().map(col => newSchema.indexOf(col))
+    } else {
+      Array[Int]()
+    }
+  }
+
+  private def getPrimaryKeyColumnsOfTemporalTable: Option[util.List[String]] = {
+    val schema = temporalTable match {
+      case t: TableSourceTable =>
+        TableSchema.fromResolvedSchema(t.contextResolvedTable.getResolvedSchema)

Review Comment:
   This is mainly due to the desire to obtain the pk in a uniform way; I did not find a proper method on `LegacyTableSourceTable` for it. We can switch to using `ResolvedSchema` after `LegacyTableSourceTable` is removed.
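   A minimal sketch of the unified path once `LegacyTableSourceTable` is gone, reading the pk from `ResolvedSchema#getPrimaryKey` directly:
   ```scala
   import scala.collection.JavaConverters._
   import org.apache.flink.table.catalog.ResolvedSchema

   def primaryKeyColumns(resolvedSchema: ResolvedSchema): Option[List[String]] = {
     // ResolvedSchema exposes the primary key as Optional[UniqueConstraint]
     val pk = resolvedSchema.getPrimaryKey
     if (pk.isPresent) Some(pk.get.getColumns.asScala.toList) else None
   }
   ```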



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala:
##########
@@ -228,17 +229,29 @@ class FlinkRelMdUpsertKeys private extends MetadataHandler[UpsertKeys] {
       join: CommonPhysicalLookupJoin,
       mq: RelMetadataQuery): util.Set[ImmutableBitSet] = {
     val left = join.getInput
-    val leftKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
     val leftType = left.getRowType
     val leftJoinKeys = join.joinInfo.leftSet
+    // differs from a regular join: we do not filter keys here because there is no shuffle on
+    // the join keys by default.
+    val leftUpsertKeys = FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(left)
+    val rightUniqueKeys = FlinkRelMdUniqueKeys.INSTANCE.getUniqueKeysOfTemporalTable(join)
+
+    val rightUpsertKeys =
+      if (
+        (join.remainingCondition.isDefined && !FlinkRexUtil.isDeterministicInStreaming(

Review Comment:
   make sense to me



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     val left = join.getInput
     val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
-    val leftType = left.getRowType
-    getJoinUniqueKeys(
-      join.joinType,
-      leftType,
-      leftUniqueKeys,
-      null,
-      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
-      // TODO get uniqueKeys from TableSchema of TableSource
+
+    if (leftUniqueKeys != null) {

Review Comment:
   ok



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala:
##########
@@ -488,16 +488,36 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
     val left = join.getInput
     val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
-    val leftType = left.getRowType
-    getJoinUniqueKeys(
-      join.joinType,
-      leftType,
-      leftUniqueKeys,
-      null,
-      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
-      // TODO get uniqueKeys from TableSchema of TableSource
+
+    if (leftUniqueKeys != null) {
+      val rightUniqueKeys = getUniqueKeysOfTemporalTable(join)
+
+      getJoinUniqueKeys(
+        join.joinType,
+        left.getRowType,
+        leftUniqueKeys,
+        rightUniqueKeys,
+        mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
+        rightUniqueKeys != null)
+    } else {
       null
-    )
+    }
+  }
+
+  private[flink] def getUniqueKeysOfTemporalTable(

Review Comment:
   The expected return values are different here; it would be a little obscure to use `JSet[ImmutableBitSet] != null` to represent `lookupKeyContainsPrimaryKey` of the lookup join if it were unified into this method.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -279,6 +280,83 @@ abstract class CommonPhysicalLookupJoin(
     constantLookupKeys.toMap[Int, LookupKey] ++ fieldRefLookupKeys.toMap[Int, LookupKey]
   }

+  /** Check if the lookup keys contain the primary key, including constant lookup keys. */
+  def lookupKeyContainsPrimaryKey(): Boolean = {
+    val outputPkIdx = getOutputPrimaryKeyIndexes
+    // use allLookupKeys instead of joinInfo.rightSet because there may exist constant
+    // lookup key(s) which are not included in joinInfo.rightKeys.
+    outputPkIdx.nonEmpty && outputPkIdx.forall(index => allLookupKeys.contains(index))
+  }
+
+  /** Get the final output pk indexes if they exist, otherwise an empty array. */
+  def getOutputPrimaryKeyIndexes: Array[Int] = {
+    val temporalPkIdxs = getPrimaryKeyIndexesOfTemporalTable
+
+    val outputPkIdx = if (!temporalPkIdxs.isEmpty) {
+      if (calcOnTemporalTable.isDefined) {

Review Comment:
   make sense


