Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r141112340
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -1414,8 +1414,29 @@ object AggregateUtil {
               aggregates(index) = udagg.getFunction
               accTypes(index) = udagg.accType
     
    -        case unSupported: SqlAggFunction =>
    -          throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
    +        case other: SqlAggFunction =>
    +          if (other.getKind == SqlKind.COLLECT) {
    +            aggregates(index) = sqlTypeName match {
    +              case TINYINT =>
    +                new ByteCollectAggFunction
    +              case SMALLINT =>
    +                new ShortCollectAggFunction
    +              case INTEGER =>
    +                new IntCollectAggFunction
    +              case BIGINT =>
    +                new LongCollectAggFunction
    +              case VARCHAR | CHAR =>
    +                new StringCollectAggFunction
    +              case FLOAT =>
    +                new FloatCollectAggFunction
    +              case DOUBLE =>
    +                new DoubleCollectAggFunction
    +              case _ =>
    +                new ObjectCollectAggFunction
    +            }
    +          } else {
    --- End diff --
    
    The `else` case can be removed because we keep the catch-all case.


---

Reply via email to