[ https://issues.apache.org/jira/browse/FLINK-26945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17700971#comment-17700971 ]
jackylau edited comment on FLINK-26945 at 3/16/23 4:34 AM:
-----------------------------------------------------------

hi [~jark] [~twalthr] [~snuyanzin], while implementing the DATE_SUB built-in function I found that the current built-in function implementation lacks examples and is not developer friendly. So I have three different implementations; which one is best? The type definition looks like this:

{code:java}
public static final BuiltInFunctionDefinition DATE_SUB =
        BuiltInFunctionDefinition.newBuilder()
                .name("DATE_SUB")
                .kind(SCALAR)
                .inputTypeStrategy(
                        sequence(
                                or(
                                        logical(LogicalTypeRoot.DATE),
                                        logical(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
                                        logical(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE),
                                        logical(LogicalTypeFamily.CHARACTER_STRING)),
                                or(
                                        InputTypeStrategies.explicit(TINYINT()),
                                        InputTypeStrategies.explicit(SMALLINT()),
                                        InputTypeStrategies.explicit(INT()))))
                .outputTypeStrategy(nullableIfArgs(explicit(DATE())))
                .runtimeClass(
                        "org.apache.flink.table.runtime.functions.scalar.DateSubFunction")
                .build();
{code}

1) Hive style, which I don't think is good — the runtime function dispatches on the argument type itself:

{code:java}
public @Nullable Object eval(Object startDate, Object days) {
    if (startDate == null || days == null) {
        return null;
    }
    int start = 0;
    if (startDateLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
        start = BinaryStringDataUtil.toDate((BinaryStringData) startDate);
    } else if (startDateLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
        start =
                (int)
                        (((TimestampData) startDate).getMillisecond()
                                / DateTimeUtils.MILLIS_PER_DAY);
    } else if (startDateLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
        // the result must be assigned, otherwise this branch is a no-op
        start = DateTimeUtils.timestampWithLocalZoneToDate((TimestampData) startDate, LOCAL_TZ);
    } else if (startDateLogicalType.is(LogicalTypeRoot.DATE)) {
        start = (int) startDate;
    } else {
        throw new FlinkRuntimeException(
                "DATE_SUB() doesn't support argument startDate type " + startDateLogicalType);
    }
    return start - ((Number) days).intValue();
}
{code}

2) Spark style. Spark converts a String/Timestamp argument to DATE in advance, so the DateSub expression itself can simply treat it as an int:

{code:scala}
// DateSub only sees the int representation, hence start.asInstanceOf[Int]
case class DateSub(startDate: Expression, days: Expression)
  extends BinaryExpression with ExpectsInputTypes with NullIntolerant {

  override def left: Expression = startDate
  override def right: Expression = days

  override def inputTypes: Seq[AbstractDataType] =
    Seq(DateType, TypeCollection(IntegerType, ShortType, ByteType))

  override def dataType: DataType = DateType

  override def nullSafeEval(start: Any, d: Any): Any = {
    start.asInstanceOf[Int] - d.asInstanceOf[Number].intValue()
  }

  override def prettyName: String = "date_sub"
}

// converts the String/Timestamp type to Date in advance
object DateTimeOperations extends TypeCoercionRule {
  override val transform: PartialFunction[Expression, Expression] = {
    // Skip nodes whose children have not been resolved yet.
    case e if !e.childrenResolved => e
    case d @ DateAdd(AnyTimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateAdd(StringType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateSub(AnyTimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateSub(StringType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
  }
}
{code}
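If Flink coerced the argument to DATE up front in the same way, the runtime eval would reduce to plain int arithmetic. A minimal sketch (my assumption of what this would look like, given the planner guarantees the first argument arrives as DATE, i.e. an internal Integer):

{code:java}
// Minimal sketch, not actual Flink code: with Spark-style coercion the
// function body needs no per-type branching, because DATE is internally
// an int counting days since epoch.
public @Nullable Object eval(@Nullable Object startDate, @Nullable Object days) {
    if (startDate == null || days == null) {
        return null;
    }
    return ((Number) startDate).intValue() - ((Number) days).intValue();
}
{code}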
If I follow Spark's approach, it needs to be implemented via *{color:#ff0000}org.apache.flink.table.planner.expressions.converter.converters.CustomizedConverter{color}* like the converter below, but the CustomizedConverter will only be called in *{color:#ff0000}ExpressionEvaluatorFactory.createEvaluator{color}*:

{code:java}
class DateSubConverter extends CustomizedConverter {

    @Override
    public RexNode convert(
            CallExpression call, CallExpressionConvertRule.ConvertContext context) {
        checkArgumentNumber(call, 2);
        final RexNode child = context.toRexNode(call.getChildren().get(0));
        final RelDataType targetRelDataType =
                context.getTypeFactory()
                        .createFieldTypeFromLogicalType(
                                call.getOutputDataType().getLogicalType());
        return context.getRelBuilder()
                .getRexBuilder()
                .makeAbstractCast(targetRelDataType, child);
    }
}
{code}

3) Cast to DATE via *{color:#ff0000}ExpressionEvaluatorFactory.createEvaluator{color}* and the built-in cast rules: TimestampToDateCastRule / StringToDateCastRule cast the argument to DATE, and then DataStructureConverters.getConverter(startDate) converts it to the internal DATE type, which is int. However, this way still needs some if/else branches to handle the different source types of the cast, and it goes through some useless conversion logic between LocalDateTime and int.

So which way is best? What is your suggestion?
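To make option 3 concrete, here is a rough sketch of what the specialized function could look like. This is hypothetical code, not an existing implementation: the class name, the use of BuiltInFunctionDefinitions.DATE_SUB, and the wiring are all assumptions following the createEvaluator pattern of other built-in scalar functions:

{code:java}
// Hypothetical sketch of option 3: the cast to DATE is delegated to an
// ExpressionEvaluator, so the built-in cast rules do the per-type work.
public class DateSubFunction extends BuiltInScalarFunction {

    private final SpecializedFunction.ExpressionEvaluator castEvaluator;
    private transient MethodHandle castHandle;

    public DateSubFunction(SpecializedFunction.SpecializedContext context) {
        super(BuiltInFunctionDefinitions.DATE_SUB, context); // assumes DATE_SUB is registered
        final DataType startDateType =
                context.getCallContext().getArgumentDataTypes().get(0);
        // StringToDateCastRule / TimestampToDateCastRule handle the coercion here.
        castEvaluator =
                context.createEvaluator(
                        $("startDate").cast(DataTypes.DATE()),
                        DataTypes.DATE(),
                        DataTypes.FIELD("startDate", startDateType));
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        castHandle = castEvaluator.open(context);
    }

    public @Nullable Object eval(@Nullable Object startDate, @Nullable Object days) {
        if (startDate == null || days == null) {
            return null;
        }
        try {
            // DATE's internal representation is an int (days since epoch).
            final int start = (Integer) castHandle.invoke(startDate);
            return start - ((Number) days).intValue();
        } catch (Throwable t) {
            throw new FlinkRuntimeException(t);
        }
    }
}
{code}

The point of this variant is that the type dispatch lives in the cast rules instead of being repeated inside every date function.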
> Add DATE_SUB supported in SQL & Table API
> -----------------------------------------
>
> Key: FLINK-26945
> URL: https://issues.apache.org/jira/browse/FLINK-26945
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: dalongliu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
>
> Returns the date {{numDays}} before {{startDate}}.
> Syntax:
> {code:java}
> date_sub(startDate, numDays) {code}
> Arguments:
> * {{startDate}}: A DATE expression.
> * {{numDays}}: An INTEGER expression.
> Returns:
> A DATE.
> If {{numDays}} is negative, abs(numDays) days are added to {{startDate}}.
> If the result date overflows the date range, the function raises an error.
> Examples:
> {code:java}
> > SELECT date_sub('2016-07-30', 1);
>  2016-07-29 {code}
> See more:
> * [Spark|https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions]
> * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]
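For the record, the documented semantics (including the negative numDays case) come down to day arithmetic; illustrated here with plain java.time rather than Flink code:

{code:java}
import java.time.LocalDate;

public class DateSubSemantics {
    public static void main(String[] args) {
        LocalDate start = LocalDate.parse("2016-07-30");
        // date_sub('2016-07-30', 1) -> 2016-07-29 (the example above)
        System.out.println(start.minusDays(1));
        // negative numDays adds abs(numDays) days:
        // date_sub('2016-07-30', -1) -> 2016-07-31
        System.out.println(start.minusDays(-1));
    }
}
{code}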