Hi,
You could try TO_TIMESTAMP(FROM_UNIXTIME(transaction_time)) to convert the long to a timestamp.
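
For illustration, here is a rough sketch of how the INSERT could look once the conversion is written into the SELECT list. The table and view names (transactions, tran) are taken from your message; the class name TimestampInsertSql, the method buildInsert, and the epochMillis flag are hypothetical names of mine. I am assuming the long holds either epoch seconds or epoch milliseconds, since the thread does not say which. FROM_UNIXTIME interprets its argument as seconds, so for milliseconds the sketch switches to TO_TIMESTAMP_LTZ(transaction_time, 3) plus a cast, which I believe needs Flink 1.13 or later.

```java
public class TimestampInsertSql {

    // Builds the INSERT for the sink table `transactions` from the view `tran`
    // (both names come from the quoted message). The flag is an assumption:
    // the thread does not say whether the long holds seconds or milliseconds.
    public static String buildInsert(boolean epochMillis) {
        String ts = epochMillis
                // Epoch milliseconds: TO_TIMESTAMP_LTZ(value, 3) yields TIMESTAMP_LTZ(3),
                // so cast it to match the sink column type TIMESTAMP(3).
                ? "CAST(TO_TIMESTAMP_LTZ(transaction_time, 3) AS TIMESTAMP(3))"
                // Epoch seconds: FROM_UNIXTIME formats a string, TO_TIMESTAMP parses it.
                : "TO_TIMESTAMP(FROM_UNIXTIME(transaction_time))";
        // Columns are listed explicitly instead of SELECT * so the converted
        // column lines up with the sink schema.
        return "INSERT INTO transactions "
                + "SELECT account_id, amount, " + ts + " AS transaction_time "
                + "FROM tran";
    }

    public static void main(String[] args) {
        // In the job, the chosen string would be passed to tEnv.executeSql(...).
        System.out.println(buildInsert(false));
        System.out.println(buildInsert(true));
    }
}
```

Both variants go through the session time zone when producing the TIMESTAMP(3) value, so it is worth checking table.local-time-zone if the results look shifted by a few hours.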
--
Best!
Xuyang
On 2023-12-13 15:36:50, "[email protected]" <[email protected]> wrote:
>In the documentation, the data comes from Kafka:
>tEnv.executeSql("CREATE TABLE transactions (\n" +
> " account_id BIGINT,\n" +
> " amount BIGINT,\n" +
> " transaction_time TIMESTAMP(3),\n" +
> " WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'transactions',\n" +
> " 'properties.bootstrap.servers' = 'kafka:9092',\n" +
> " 'format' = 'csv'\n" +
> ")");
>
>How do I produce data to Kafka that matches the TIMESTAMP(3) type of transaction_time?
>
>My entity class has the following fields:
>private long account_id;
>private int amount;
>private long transaction_time;
>
>and I write it to Kafka as follows:
>
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> DataStream<Transactions> streamSource = env.addSource(new
> TransactionSource());
> tEnv.createTemporaryView("tran",streamSource,
> $("account_id"),$("amount"),$("transaction_time"));
> tEnv.executeSql("CREATE TABLE transactions (\n" +
> " account_id BIGINT,\n" +
> " amount INT,\n" +
> " transaction_time TIMESTAMP(3)\n" +
> ") WITH (\n" +
> " 'connector' = 'kafka',\n" +
> " 'topic' = 'kafka_mysql_transactions',\n" +
> " 'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
> " 'format' = 'json'\n" +
> ")");
> tEnv.executeSql("insert into transactions select * from tran ");
>This fails with a type mismatch error like the following:
>Query schema: [account_id: BIGINT NOT NULL, amount: INT NOT NULL,
>transaction_time: BIGINT NOT NULL]
>Sink schema: [account_id: BIGINT, amount: INT, transaction_time: TIMESTAMP(3)]