Hi Jim, This is a known issue[1], could you verify that if this issue meets your requirements?
[1] https://issues.apache.org/jira/browse/FLINK-18002 Jim Chen <chenshuai19950...@gmail.com> 于2020年7月6日周一 下午1:28写道: > Hi, everyone! > > When i use flink1.10 to define table, and i want to define the json array > as the string type. But the query resutl is null when i execute the program. > The detail code as follow: > > package com.flink; > > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.EnvironmentSettings; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > > /** > * kafka topic: test_action > * > * kafka message: > * {"action": [{"actionID": "id001", "actionName": "aaa"}, {"actionID": > "id002", "actionName": "bbb"} ] } > */ > public class Problem2 { > > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings envSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment bsEnv = StreamTableEnvironment.create(env, > envSettings); > // bsEnv.registerFunction("explode3", new ExplodeFunction()); > > String ddlSource = "CREATE TABLE actionTable3 (\n" + > " action STRING\n" + > ") WITH (\n" + > " 'connector.type' = 'kafka',\n" + > " 'connector.version' = '0.11',\n" + > " 'connector.topic' = 'test_action',\n" + > " 'connector.startup-mode' = 'earliest-offset',\n" + > " 'connector.properties.zookeeper.connect' = > 'localhost:2181',\n" + > " 'connector.properties.bootstrap.servers' = > 'localhost:9092',\n" + > " 'update-mode' = 'append',\n" + > " 'format.type' = 'json',\n" + > // " 'format.derive-schema' = 'true',\n" + > " 'format.json-schema' = '{\"type\": \"object\", > \"properties\": {\"action\": {\"type\": \"string\"} } }'" + > ")"; > System.out.println(ddlSource); > bsEnv.sqlUpdate(ddlSource); > > Table table = bsEnv.sqlQuery("select * from actionTable3"); > // Table table = bsEnv.sqlQuery("select * from actionTable2, > LATERAL TABLE(explode3(`action`)) as T(`word`)"); > table.printSchema(); > bsEnv.toAppendStream(table, Row.class) > .print();// the result is null > > bsEnv.execute("ARRAY tableFunction Problem"); > } > } > -- Best, Benchao Li