Hi, 
> Statement:
> CREATE TABLE A (
> w_data  STRING,
> w_table  STRING,
> w_ts TIMESTAMP(3)

If the w_data field is a JSON array, you need to declare the corresponding structured data type when declaring the table [1].

That is, your table A here needs to be declared as:
CREATE TABLE json_table (
  w_es BIGINT,
  w_type STRING,
  w_isDdl BOOLEAN,
  w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>,
  w_ts TIMESTAMP(3),
  w_table STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'json-test',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'test-jdbc',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)
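
With w_data declared as ARRAY<ROW<...>>, the array elements become addressable directly in SQL, so fields can be read out of w_data without a UDF such as ArrayIndexOf. A minimal sketch, assuming Flink SQL's 1-based array indexing and dot access on ROW-typed elements; the mapping of cate3_id to the sign field is only an illustration, not taken from the original job:

```sql
-- Hypothetical rewrite of the INSERT from the quoted message below.
-- w_data[1] is the first array element (Flink SQL arrays are 1-based);
-- fields of the ROW element are reached with dot notation.
INSERT INTO B
SELECT
  w_ts,
  'test' AS city1_id,
  w_data[1].sign AS cate3_id,         -- illustrative choice of ROW field
  w_data[1].pay_info AS pay_order_id
FROM json_table;
```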


Best,
Leonard Xu

> On May 20, 2020, at 18:06, [email protected] wrote:
> 
> Statement:
> CREATE TABLE A (
> w_data  STRING,
> w_table  STRING,
> w_ts TIMESTAMP(3)
> 
> 
> CREATE TABLE B (
> w_ts TIMESTAMP(3),
> city1_id  STRING,
> cate3_id  STRING,
> pay_order_id  STRING
> )
> 
> insert into B
> select w_ts,
> 
> 'test' as city1_id,
> 
> ArrayIndexOf(w_data, 0) AS cate3_id,
> w_data as pay_order_id
> from A
> 
> Sample data
> A
> {"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}
> 
> B
> {"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}
> 
> 
> 
> [email protected]
> 
> From: Leonard Xu
> Sent: 2020-05-20 16:03
> To: user-zh
> Subject: Re: Parsing complex JSON with Flink 1.10 SQL
> Hi, guaishushu
> Please paste the query (or an image-hosting link); Flink SQL's JSON parsing support is fairly complete [1]. Could you paste the JSON schema and the problematic data?
> A unit test should be enough to reproduce the issue.
> 
> Best,
> Leonard
> 
> [1] 
> https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
> 
>> On May 20, 2020, at 15:51, [email protected] wrote:
>> 
>> Kafka data is written back to Kafka; Flink 1.10 SQL parses a field inside the complex JSON as STRING, which causes data loss.
>> 
>> [email protected]
