Hi,

建议你翻译成英文然后到jira里建个issue。

Best,
Kurt


On Thu, Dec 12, 2019 at 11:39 PM 李佟 <[email protected]> wrote:

> 近期进行Flink升级,将原来的程序从老的集群(1.8.0运行正常)迁移到新的集群(1.9.1)中。在部署程序的时候发现在1.9.1的集群中,原来运行正常的Flink
> SQL的程序无法执行,异常如下:
>
>
> org.apache.flink.table.api.ValidationException: *Window can only be
> defined over a time attribute column.*
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
>
>
>
> 跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。
>
>
> 功能很简单,就是在某个时间窗对数值求和。测试用例如下:
>
>
> package org.flowmatrix.isp.traffic.accounting.test;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.scala.typeutils.Types;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedRowtimeAttributes;
> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.table.sources.tsextractors.ExistingField;
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> import javax.annotation.Nullable;
> import java.sql.Timestamp;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class TestSql {
>     @Test
>     public void testAccountingSql() {
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setParallelism(1);
>
>         try {
>             StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
>             SimpleTableSource source = new SimpleTableSource();
>             Table t = tableEnv.fromTableSource(source);
>
>             String interval = "5"; //5 second
>             System.out.println("source schema is " +
> source.getTableSchema());
>
>             Table sqlResult = tableEnv.sqlQuery("SELECT " +
>                     " TUMBLE_START(UserActionTime, INTERVAL '" + interval
> + "' SECOND) as rowTime, " +
>                     " Username," +
>                     " SUM(Data) as Data " +
>                     " FROM  " + t +
>                     " GROUP BY TUMBLE(UserActionTime, INTERVAL '" +
> interval + "' SECOND),Username");
>
>
>             String[] fieldNames = {
>                     "rowTime",
>                     "Username", "Data"};
>             TypeInformation[] fieldTypes = {
>                     TypeInformation.of(Timestamp.class),
>                     TypeInformation.of(String.class),
>                     TypeInformation.of(Long.class)};
>
>             TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
>             sink1 = sink1.configure(fieldNames, fieldTypes);
>             tableEnv.registerTableSink("EsSinkTable", sink1);
>             System.out.println("sql result schema is " +
> sqlResult.getSchema());
>
>             tableEnv.sqlUpdate("insert into EsSinkTable select  " +
>                     "rowTime,Username,Data from " + sqlResult + "");
>
>             env.execute("test");
>         } catch (Exception e) {
>             e.printStackTrace();
>             System.err.println("start program error. FlowMatrix
> --zookeeper <zookeeperAdress> --config <configpath>" +
>                     " --name <jobName> --interval <intervalInMinute>
> --indexName <indexName>");
>             System.err.println(e.toString());
>             return;
>         }
>     }
>
>     public static class SimpleTableSource implements
> StreamTableSource<Row>, DefinedRowtimeAttributes {
>         @Override
>         public DataStream<Row> getDataStream(StreamExecutionEnvironment
> env) {
>             return
> env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new
> AssignerWithPunctuatedWatermarks<Row>() {
>                 private long lastWaterMarkMillSecond = -1;
>                 private long waterMarkPeriodMillSecond = 1000;
>                 @Nullable
>                 @Override
>                 public Watermark checkAndGetNextWatermark(Row lastElement,
> long extractedTimestamp) {
>                     if(extractedTimestamp - lastWaterMarkMillSecond >=
> waterMarkPeriodMillSecond){
>                         lastWaterMarkMillSecond = extractedTimestamp;
>                         return new Watermark(extractedTimestamp);
>                     }
>                     return null;
>                 }
>
>                 @Override
>                 public long extractTimestamp(Row element, long
> previousElementTimestamp) {
>                     return ((Long)element.getField(0))*1000;
>                 }
>             });
>         }
>
>         @Override
>         public TableSchema getTableSchema() {
>             TableSchema schema = TableSchema.builder()
>                     .field("Username", Types.STRING())
>                     .field("Data", Types.LONG())
>                     .field("UserActionTime", Types.SQL_TIMESTAMP())
>                     .build();
>             return schema;
>         }
>
>         @Override
>         public TypeInformation<Row> getReturnType() {
>             String[] names = new String[]{"Username", "Data",
> "UserActionTime"};
>             TypeInformation[] types =
>                     new TypeInformation[]{Types.STRING(), Types.LONG(),
> Types.SQL_TIMESTAMP()};
>             return Types.ROW(names, types);
>         }
>
>
>         @Override
>         public List<RowtimeAttributeDescriptor>
> getRowtimeAttributeDescriptors() {
>             RowtimeAttributeDescriptor rowtimeAttrDescr = new
> RowtimeAttributeDescriptor(
>                     "UserActionTime",
>                     new ExistingField("UserActionTime"),
>                     new AscendingTimestamps());
>             List<RowtimeAttributeDescriptor> listRowtimeAttrDescr =
> Collections.singletonList(rowtimeAttrDescr);
>             return listRowtimeAttrDescr;
>         }
>
>
>         private static List<Row> genertateData() {
>             List<Row> rows = new ArrayList<>();
>             long startTime = System.currentTimeMillis() / 1000 - 10000;
>             for (int i = 0; i < 10000; i++) {
>                 rows.add(buildRecord(startTime, i));
>             }
>             return rows;
>         }
>
>         private static Row buildRecord(long startTime, int i) {
>             Row row = new Row(3);
>             row.setField(0, "fox"); //Username
>             row.setField(1, Math.random()); //Data
>             row.setField(2, startTime + i); //UserActionTime
>             return row;
>         }
>     }
> }
>
>
>

回复