[ https://issues.apache.org/jira/browse/FLINK-26945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17700971#comment-17700971 ]

jackylau edited comment on FLINK-26945 at 3/16/23 5:34 AM:
-----------------------------------------------------------

Hi [~jark] [~twalthr] [~snuyanzin], while implementing the date_sub built-in 
function I found that the current built-in function implementations lack samples 
and are not developer friendly. I have three candidate implementations; which 
one is the best?

The type definition looks like this:
{code:java}
public static final BuiltInFunctionDefinition DATE_SUB =
        BuiltInFunctionDefinition.newBuilder()
                .name("DATE_SUB")
                .kind(SCALAR)
                .inputTypeStrategy(
                        sequence(
                                or(
                                        logical(LogicalTypeRoot.DATE),
                                        logical(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
                                        logical(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE),
                                        logical(LogicalTypeFamily.CHARACTER_STRING)),
                                or(
                                        InputTypeStrategies.explicit(TINYINT()),
                                        InputTypeStrategies.explicit(SMALLINT()),
                                        InputTypeStrategies.explicit(INT()))))
                .outputTypeStrategy(nullableIfArgs(explicit(DATE())))
                .runtimeClass("org.apache.flink.table.runtime.functions.scalar.DateSubFunction")
                .build();
{code}
1) Hive style. I don't think this is a good approach.
{code:java}
    public @Nullable Object eval(Object startDate, Object days) {
        if (startDate == null || days == null) {
            return null;
        }

        // Normalize every supported input type to the internal DATE value (an int).
        int start;
        if (startDateLogicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
            start = BinaryStringDataUtil.toDate((BinaryStringData) startDate);
        } else if (startDateLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
            start =
                    (int)
                            (((TimestampData) startDate).getMillisecond()
                                    / DateTimeUtils.MILLIS_PER_DAY);
        } else if (startDateLogicalType.is(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
            // The result must be assigned; otherwise start silently stays 0.
            start =
                    DateTimeUtils.timestampWithLocalZoneToDate(
                            (TimestampData) startDate, LOCAL_TZ);
        } else if (startDateLogicalType.is(LogicalTypeRoot.DATE)) {
            start = (int) startDate;
        } else {
            throw new FlinkRuntimeException(
                    "DATE_SUB() does not support startDate argument type "
                            + startDateLogicalType);
        }

        return start - ((Number) days).intValue();
    }
{code}
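
To make the arithmetic in option 1 concrete, here is a minimal standalone sketch with no Flink dependency. It assumes, as the comment states for option 3, that a DATE is held internally as an int counting days since 1970-01-01. The class and method names (DateSubSketch, dateSub, millisToEpochDay, dateSubIso) are hypothetical and exist only for illustration. It also shows one thing to watch in the timestamp branch: plain integer division truncates toward zero, so a timestamp before 1970 would land on the wrong day unless floor division is used.

```java
import java.time.LocalDate;

/** Standalone sketch, not Flink code; every name in this class is hypothetical. */
final class DateSubSketch {
    static final long MILLIS_PER_DAY = 86_400_000L;

    /** Subtracts days from an epoch-day value; throws ArithmeticException on int overflow. */
    static int dateSub(int epochDay, int days) {
        return Math.subtractExact(epochDay, days);
    }

    /** Uses floor division so that pre-1970 timestamps map to the correct day. */
    static int millisToEpochDay(long epochMillis) {
        return Math.toIntExact(Math.floorDiv(epochMillis, MILLIS_PER_DAY));
    }

    /** Convenience wrapper for ISO yyyy-MM-dd strings. */
    static String dateSubIso(String isoDate, int days) {
        int start = Math.toIntExact(LocalDate.parse(isoDate).toEpochDay());
        return LocalDate.ofEpochDay(dateSub(start, days)).toString();
    }

    public static void main(String[] args) {
        System.out.println(dateSubIso("2016-07-30", 1));   // 2016-07-29
        System.out.println(dateSubIso("2016-07-30", -1));  // 2016-07-31
        System.out.println(millisToEpochDay(-1L));         // -1, i.e. 1969-12-31
        System.out.println((int) (-1L / MILLIS_PER_DAY));  // 0, truncation picks 1970-01-01
    }
}
```

Math.subtractExact only guards against int overflow. Whether DATE_SUB should additionally reject results outside the supported SQL date range is left open here.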
2) Spark style.

Spark casts the String/Timestamp argument to Date in advance, so the DateSub 
function can simply treat the start date as an int, like this:

{code:scala}
// DateSub assumes the start date is already an int, hence start.asInstanceOf[Int]

case class DateSub(startDate: Expression, days: Expression)
  extends BinaryExpression with ExpectsInputTypes with NullIntolerant {
  override def left: Expression = startDate
  override def right: Expression = days

  override def inputTypes: Seq[AbstractDataType] =
    Seq(DateType, TypeCollection(IntegerType, ShortType, ByteType))

  override def dataType: DataType = DateType

  override def nullSafeEval(start: Any, d: Any): Any = {
    start.asInstanceOf[Int] - d.asInstanceOf[Number].intValue()
  }

  override def prettyName: String = "date_sub"
}


// Casts the String/Timestamp argument to Date in advance
object DateTimeOperations extends TypeCoercionRule {
  override val transform: PartialFunction[Expression, Expression] = {
    // Skip nodes whose children have not been resolved yet.
    case e if !e.childrenResolved => e
    case d @ DateAdd(AnyTimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateAdd(StringType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateSub(AnyTimestampType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
    case d @ DateSub(StringType(), _) => d.copy(startDate = Cast(d.startDate, DateType))
  }
}
{code}
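
For comparison with option 1, the same split can be sketched in plain Java: the type dispatch happens once, when the cast is chosen, and the function body is left with int arithmetic only. This is a standalone illustration, not Flink or Spark code. ArgType, castToEpochDay, dateSub and eval are hypothetical names, and java.time values stand in for the engine's internal data structures.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.function.ToIntFunction;

/** Standalone sketch of "coerce first, evaluate on int"; every name is hypothetical. */
final class CoerceThenEvalSketch {
    /** Stand-in for the declared argument type that is known at planning time. */
    enum ArgType { DATE, TIMESTAMP, STRING }

    /** Chosen once per call site, mirroring a coercion rule that inserts a cast to DATE. */
    static ToIntFunction<Object> castToEpochDay(ArgType type) {
        switch (type) {
            case DATE:
                return v -> Math.toIntExact(((LocalDate) v).toEpochDay());
            case TIMESTAMP:
                return v -> Math.toIntExact(((LocalDateTime) v).toLocalDate().toEpochDay());
            case STRING:
                return v -> Math.toIntExact(LocalDate.parse((String) v).toEpochDay());
            default:
                throw new IllegalArgumentException("Unsupported startDate type: " + type);
        }
    }

    /** The function body itself: no type dispatch left, only int arithmetic. */
    static int dateSub(int startEpochDay, int days) {
        return startEpochDay - days;
    }

    /** Glue for the demo: cast, subtract, render the result as an ISO date. */
    static String eval(ArgType type, Object startDate, int days) {
        int start = castToEpochDay(type).applyAsInt(startDate);
        return LocalDate.ofEpochDay(dateSub(start, days)).toString();
    }
}
```

For example, eval(ArgType.STRING, "2016-07-30", 1) yields "2016-07-29". The trade-off is that the cast has to be wired in somewhere outside the function, which is what the converter discussion that follows is about.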
 

If I follow Spark's approach, it has to be implemented through 
{color:#ff0000}*org.apache.flink.table.planner.expressions.converter.converters.CustomizedConverter*{color}. 
However, the CustomizedConverter is only called from 
*{color:#ff0000}ExpressionEvaluatorFactory.createEvaluator{color}*. 
The converter would look like this:
{code:java}
class DateSubConverter extends CustomizedConverter {
    @Override
    public RexNode convert(
            CallExpression call, CallExpressionConvertRule.ConvertContext context) {
        checkArgumentNumber(call, 2);

        final RexNode child = context.toRexNode(call.getChildren().get(0));

        final RelDataType targetRelDataType =
                context.getTypeFactory()
                        .createFieldTypeFromLogicalType(
                                call.getOutputDataType().getLogicalType());

        // Rebuild the DateSub call. This requires defining FlinkSqlOperatorTable.DateSub,
        // because the converter only maps expressions to Calcite operator functions.
        RexNode parent = ...
        return context.getRelBuilder()
                .getRexBuilder()
                .makeAbstractCast(targetRelDataType, parent);
    }
}
{code}
3) Cast to DATE via 
*{color:#ff0000}ExpressionEvaluatorFactory.createEvaluator{color}* and the 
built-in cast rules.

TimestampToDateCastRule/StringToDateCastRule cast the argument to DATE, and 
DataStructureConverters.getConverter(startDate) then converts it to the internal 
DATE representation, which is an int.
This approach still needs some if/else branches to handle the different target 
types through the cast, plus some otherwise useless conversion logic between 
LocalDateTime and int.
 
 
So which approach is the best, and what do you suggest?


> Add DATE_SUB supported in SQL & Table API
> -----------------------------------------
>
>                 Key: FLINK-26945
>                 URL: https://issues.apache.org/jira/browse/FLINK-26945
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: dalongliu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> Returns the date {{numDays}} before {{startDate}}.
> Syntax:
> {code:java}
> date_sub(startDate, numDays) {code}
> Arguments:
>  * {{startDate}}: A DATE expression.
>  * {{numDays}}: An INTEGER expression.
> Returns:
> A DATE.
> If {{numDays}} is negative, abs(numDays) days are added to {{startDate}}.
> If the result date overflows the date range, the function raises an error.
> Examples:
> {code:java}
> > SELECT date_sub('2016-07-30', 1);
>  2016-07-29 {code}
> See more:
>  * [Spark|https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#date-and-timestamp-functions]
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
