hackergin commented on code in PR #22939:
URL: https://github.com/apache/flink/pull/22939#discussion_r1268198538


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java:
##########
@@ -125,4 +162,118 @@ public void validateColumnListParams(
         // this makes it possible to ignore them in the validator and fall 
back to regular row types
         // see also SqlFunction#deriveType
     }
+
+    @Override
+    protected void registerNamespace(
+            @Nullable SqlValidatorScope usingScope,
+            @Nullable String alias,
+            SqlValidatorNamespace ns,
+            boolean forceNullable) {
+
+        // Generate a new validator namespace for time travel scenario.
+        // Time travel only supports constant expressions, so we need to 
investigate scenarios
+        // where the period of Snapshot is a SqlIdentifier.
+        Optional<SqlSnapshot> timeTravelNode = getTimeTravelNode(ns);
+        if (usingScope != null
+                && timeTravelNode.isPresent()
+                && !(timeTravelNode.get().getPeriod() instanceof 
SqlIdentifier)) {
+            SqlSnapshot sqlSnapshot = timeTravelNode.get();
+            SqlNode periodNode = sqlSnapshot.getPeriod();
+            SqlToRelConverter sqlToRelConverter = 
this.createSqlToRelConverter();
+            RexNode rexNode = sqlToRelConverter.convertExpression(periodNode);
+            RexNode simplifiedRexNode =
+                    FlinkRexUtil.simplify(
+                            sqlToRelConverter.getRexBuilder(),
+                            rexNode,
+                            relOptCluster.getPlanner().getExecutor());
+            List<RexNode> reducedNodes = new ArrayList<>();
+            relOptCluster
+                    .getPlanner()
+                    .getExecutor()
+                    .reduce(
+                            relOptCluster.getRexBuilder(),
+                            Collections.singletonList(simplifiedRexNode),
+                            reducedNodes);
+            // check whether period is the unsupported expression
+            if (!(reducedNodes.get(0) instanceof RexLiteral)) {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unsupported time travel expression: %s for 
the expression can not be reduced to a constant by Flink.",
+                                periodNode));
+            }
+
+            RexLiteral rexLiteral = (RexLiteral) (reducedNodes).get(0);
+            TimestampString timestampString = 
rexLiteral.getValueAs(TimestampString.class);
+            checkNotNull(
+                    timestampString,
+                    "The time travel expression %s can not reduce to a valid 
timestamp string. This is a bug. Please file an issue.",
+                    periodNode);
+
+            TableConfig tableConfig = 
ShortcutUtils.unwrapContext(relOptCluster).getTableConfig();
+            ZoneId zoneId = tableConfig.getLocalTimeZone();
+            long timeTravelTimestamp =
+                    
TimestampData.fromEpochMillis(timestampString.getMillisSinceEpoch())
+                            .toLocalDateTime()
+                            .atZone(zoneId)
+                            .toInstant()
+                            .toEpochMilli();
+
+            SchemaVersion schemaVersion = 
TimestampSchemaVersion.of(timeTravelTimestamp);
+            IdentifierNamespace identifierNamespace = (IdentifierNamespace) ns;
+            IdentifierNamespace snapshotNameSpace =
+                    new IdentifierSnapshotNamespace(
+                            identifierNamespace,
+                            schemaVersion,
+                            ((DelegatingScope) usingScope).getParent());
+            ns = snapshotNameSpace;
+
+            sqlSnapshot.setOperand(

Review Comment:
   Since we have reduce the period expression here, we can just build a simple 
SqlLiteral.  And we don't need to reduce again when convertting to RelNode. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to