Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160265977

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -145,6 +145,14 @@ class DataStreamOverAggregate(
               inputSchema.typeInfo,
               Some(constants))

    +      val constantsTypeInfo =
    +        Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
    +      val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
    +
    +      val aggregateInputType =
    +        cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    --- End diff --

    It should be possible to create a `RelDataType` without going through code generation and `TypeInformation`. We have a `RelDataType` for the input row and `RelDataType`s for the `RexNode`s. Check if we can use `FlinkTypeFactory.createStructType()` to create a `RelDataType`.
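
A minimal sketch of the suggestion above, using only Calcite's `RelDataTypeFactory` interface (which `FlinkTypeFactory` implements). The helper name `buildAggregateInputType`, its parameters, and the generated `const_N` field names are hypothetical and not part of Flink; the sketch also assumes the constants should follow the input fields, as in the diff, and it does not guard against name clashes with existing input fields.

```scala
import java.util.{ArrayList => JArrayList}

import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.rex.RexNode

import scala.collection.JavaConverters._

object AggInputTypeSketch {

  // Hypothetical helper: builds the aggregate input row type directly from
  // RelDataTypes, without code generation or TypeInformation.
  def buildAggregateInputType(
      typeFactory: RelDataTypeFactory,
      inputRowType: RelDataType,
      constants: Seq[RexNode]): RelDataType = {

    val fieldTypes = new JArrayList[RelDataType]()
    val fieldNames = new JArrayList[String]()

    // Fields of the input row come first, keeping their names and types.
    inputRowType.getFieldList.asScala.foreach { field =>
      fieldTypes.add(field.getType)
      fieldNames.add(field.getName)
    }

    // Each constant RexNode already carries its RelDataType.
    // The "const_N" names are made up for this sketch.
    constants.zipWithIndex.foreach { case (constant, i) =>
      fieldTypes.add(constant.getType)
      fieldNames.add("const_" + i)
    }

    typeFactory.createStructType(fieldTypes, fieldNames)
  }
}
```

Inside `DataStreamOverAggregate`, a call might look like `AggInputTypeSketch.buildAggregateInputType(cluster.getTypeFactory, inputNode.getRowType, constants)`, assuming `inputNode` is the input `RelNode` and `constants` is the sequence of literal `RexNode`s.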
---