Thanks Rong,

I am using Flink 1.3.0. If I move to Flink 1.5, how do I define the jsonSchema?

Yes, there were two field names, but I now pass only one name, and I still want to define the jsonSchema.

Rong Rong wrote
> Hi Radhya,
>
> Can you provide which Flink version you are using? Based on the latest
> Flink 1.5 release, Kafka09JsonTableSource takes:
>
>     /**
>      * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>      *
>      * @param topic       Kafka topic to consume.
>      * @param properties  Properties for the Kafka consumer.
>      * @param tableSchema The schema of the table.
>      * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>      */
>
> Also, in your type definition
>
>     TypeInformation<Row> typeInfo2 = Types.ROW(...
>
> the arguments seem to have different lengths for schema names and types.
>
> Thanks,
> Rong
>
> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
>
>> Hi,
>>
>> Could anyone help me to solve this problem?
>>
>> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>>     The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined
>>
>> This is the code:
>>
>>     public class FlinkKafkaSQL {
>>
>>         public static void main(String[] args) throws Exception {
>>             // Read parameters from command line
>>             final ParameterTool params = ParameterTool.fromArgs(args);
>>
>>             if(params.getNumberOfParameters() < 5) {
>>                 System.out.println("\nUsage: FlinkReadKafka " +
>>                     "--read-topic " +
>>                     "--write-topic " +
>>                     "--bootstrap.servers " +
>>                     "zookeeper.connect" +
>>                     "--group.id ");
>>                 return;
>>             }
>>
>>             // setup streaming environment
>>             StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>             env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
>>             env.enableCheckpointing(300000); // 300 seconds
>>             env.getConfig().setGlobalJobParameters(params);
>>
>>             StreamTableEnvironment tableEnv =
>>                 TableEnvironment.getTableEnvironment(env);
>>
>>             // specify JSON field names and types
>>             TypeInformation<Row> typeInfo2 = Types.ROW(
>>                 new String[] { "iotdevice", "sensorID" },
>>                 new TypeInformation<?>[] { Types.STRING()}
>>             );
>>
>>             // create a new tablesource of JSON from kafka
>>             KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>                 params.getRequired("read-topic"),
>>                 params.getProperties(),
>>                 typeInfo2);
>>
>>             // run some SQL to filter results where a key is not null
>>             String sql = "SELECT sensorID " +
>>                          "FROM iotdevice ";
>>
>>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>             Table result = tableEnv.sql(sql);
>>
>>             // create a partition for the data going into kafka
>>             FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>
>>             // create new tablesink of JSON to kafka
>>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>                 params.getRequired("write-topic"),
>>                 params.getProperties(),
>>                 partition);
>>
>>             result.writeToSink(kafkaTableSink);
>>
>>             env.execute("FlinkReadWriteKafkaJSON");
>>         }
>>     }
>>
>> These are the dependencies in pom.xml (groupId, artifactId, version):
>>
>>     org.apache.flink   flink-java                     1.3.0
>>     org.apache.flink   flink-streaming-java_2.11      1.3.0
>>     org.apache.flink   flink-clients_2.11             1.3.0
>>     org.apache.flink   flink-connector-kafka-0.9      1.3.0
>>     org.apache.flink   flink-table_2.11               1.3.0
>>     org.apache.flink   flink-core                     1.3.0
>>     org.apache.flink   flink-streaming-scala_2.11     1.3.0
>>
>> Regards.
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
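
Rong's remark about the Types.ROW arguments (two field names, one field type) can be illustrated with a small check that runs before any table source is built. This is only a sketch in plain Java with no Flink dependency: `RowSchemaCheck` and `validate` are hypothetical names invented for illustration, and field types are represented as plain strings rather than Flink `TypeInformation` objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: pairs field names with field types
// and rejects arrays of different lengths.
class RowSchemaCheck {

    // Returns name -> type in declaration order, or throws if the lengths differ.
    static Map<String, String> validate(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                names.length + " field names but " + types.length + " field types");
        }
        Map<String, String> schema = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            schema.put(names[i], types[i]);
        }
        return schema;
    }

    public static void main(String[] args) {
        // Same shape as the definition in the quoted code: two names, one type.
        try {
            validate(new String[] { "iotdevice", "sensorID" }, new String[] { "STRING" });
        } catch (IllegalArgumentException e) {
            System.out.println("Schema rejected: " + e.getMessage());
        }

        // One type per name passes the check.
        System.out.println(validate(
            new String[] { "iotdevice", "sensorID" },
            new String[] { "STRING", "STRING" }));
    }
}
```

In the quoted program, the equivalent adjustment would be to supply one type per field name in the Types.ROW call, or to drop the unused name.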