Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r197640444 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala --- @@ -328,6 +330,71 @@ case class TemporalOverlaps( } } + /** + * Standard conversion of the TIMESTAMPADD operator. + * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#TimestampAddConvertlet]] + */ +case class TimestampAdd( + unit: Expression, + count: Expression, + timestamp: Expression) + extends Expression { + + private[flink] final val sqlTsiArray = Array("SQL_TSI_YEAR", "SQL_TSI_QUARTER", "SQL_TSI_MONTH", + "SQL_TSI_WEEK", "SQL_TSI_DAY", "SQL_TSI_HOUR", "SQL_TSI_MINUTE", "SQL_TSI_SECOND") + + override private[flink] def children: Seq[Expression] = unit :: count :: timestamp :: Nil + + override private[flink] def validateInput(): ValidationResult = { + if (!TypeCheckUtils.isString(unit.resultType)) { + return ValidationFailure(s"TimestampAdd operator requires unit to be of type " + + s"String Literal, but get ${unit.resultType}.") + } + if (!TypeCheckUtils.isTimePoint(timestamp.resultType)) { + return ValidationFailure(s"TimestampAdd operator requires timestamp to be of type " + + s"SqlTimeTypeInfo, but get ${timestamp.resultType}.") + } + ValidationSuccess + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + val timeUnit = unit match { + case Literal(value: String, STRING_TYPE_INFO) => + if (sqlTsiArray.contains(value)) { + Some(TimeUnit.valueOf(value.split("_").last)) + } else { + Some(TimeUnit.valueOf(value)) + } + case _ => None + } + + val interval = count match { + case Literal(value: Int, INT_TYPE_INFO) => + makeInterval(value.toLong, timeUnit) + case Literal(value: Long, LONG_TYPE_INFO) => + makeInterval(value, timeUnit) + case _ => + relBuilder.call(SqlStdOperatorTable.MULTIPLY, + relBuilder.getRexBuilder.makeIntervalLiteral(timeUnit.get.multiplier, + new SqlIntervalQualifier(timeUnit.get, null, SqlParserPos.ZERO)), + count.toRexNode) + } + + relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, timestamp.toRexNode, interval) + } + + override def toString: String = s"timestampAdd(${children.mkString(", ")})" + + override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO + + private[flink] def makeInterval(value: Long, timeUnit: Option[TimeUnit]) + (implicit relBuilder: RelBuilder) = { + val countWithUnit = timeUnit.get.multiplier.multiply(java.math.BigDecimal.valueOf(value)) + relBuilder.getRexBuilder.makeIntervalLiteral(countWithUnit, --- End diff -- indent with two spaces
---