Hi Radhya,

Which Flink version are you using? As of the latest Flink 1.5 release, the Kafka09JsonTableSource constructor takes:

  /**
   * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
   *
   * @param topic       Kafka topic to consume.
   * @param properties  Properties for the Kafka consumer.
   * @param tableSchema The schema of the table.
   * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
   */

Also, in your type definition

  TypeInformation<Row> typeInfo2 = Types.ROW(...

the arrays of schema names and types have different lengths: two field names but only one type.

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sa...@gmail.com> wrote:

> Hi,
>
> Could anyone help me to solve this problem
>
>
> /Exception in thread "main" java.lang.Error: Unresolved compilation
> problem:
>         The constructor Kafka09JsonTableSource(String, Properties,
> TypeInformation<Row>) is undefined
> /
> *--This is the code *
> public class FlinkKafkaSQL {
>     public static void main(String[] args) throws Exception {
>         // Read parameters from command line
>         final ParameterTool params = ParameterTool.fromArgs(args);
>
>         if(params.getNumberOfParameters() < 5) {
>             System.out.println("\nUsage: FlinkReadKafka " +
>                                "--read-topic <topic> " +
>                                "--write-topic <topic> " +
>                                "--bootstrap.servers <kafka brokers> " +
>                                "zookeeper.connect" +
>                                "--group.id <groupid>");
>             return;
>         }
>
>         // setup streaming environment
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
> 10000));
>         env.enableCheckpointing(300000); // 300 seconds
>         env.getConfig().setGlobalJobParameters(params);
>
>         StreamTableEnvironment tableEnv =
> TableEnvironment.getTableEnvironment(env);
>
>         // specify JSON field names and types
>
>
>         TypeInformation<Row> typeInfo2 = Types.ROW(
>                 new String[] { "iotdevice", "sensorID" },
>                 new TypeInformation<?>[] { Types.STRING()}
>         );
>
>         // create a new tablesource of JSON from kafka
>         KafkaJsonTableSource kafkaTableSource = new
> Kafka09JsonTableSource(
>                 params.getRequired("read-topic"),
>                 params.getProperties(),
>                 typeInfo2);
>
>         // run some SQL to filter results where a key is not null
>         String sql = "SELECT sensorID " +
>                      "FROM iotdevice ";
>         tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>         Table result = tableEnv.sql(sql);
>
>         // create a partition for the data going into kafka
>         FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>
>         // create new tablesink of JSON to kafka
>         KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>                 params.getRequired("write-topic"),
>                 params.getProperties(),
>                 partition);
>
>         result.writeToSink(kafkaTableSink);
>
>         env.execute("FlinkReadWriteKafkaJSON");
>     }
> }
>
>
> *This is the dependencies in pom.xml*
>
>         <dependencies>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-java</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-streaming-java_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-clients_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-connector-kafka-0.9</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-table_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-core</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-streaming-scala_2.11</artifactId>
>                         <version>1.3.0</version>
>                 </dependency>
>         </dependencies>
>
>
> Regards.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
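
To make the point about the row type concrete, here is a minimal plain-Java sketch of the rule that every field name needs a matching field type. It deliberately does not use the Flink API: `RowSchemaCheck` and `pairFields` are hypothetical names made up for illustration, and the type names are plain strings. Treating both fields as strings is an assumption, since the posted code lists only one type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowSchemaCheck {

    // Hypothetical helper (not part of Flink): pairs each field name with a
    // type name and rejects arrays whose lengths differ.
    public static Map<String, String> pairFields(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                    "Got " + names.length + " field names but "
                            + types.length + " field types");
        }
        Map<String, String> schema = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            schema.put(names[i], types[i]);
        }
        return schema;
    }

    public static void main(String[] args) {
        String[] names = { "iotdevice", "sensorID" };

        // Assumption: both fields are strings; the posted code gives only one type.
        System.out.println(pairFields(names, new String[] { "STRING", "STRING" }));

        try {
            // Same shape as the posted code: two names, one type.
            pairFields(names, new String[] { "STRING" });
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the posted job, the equivalent fix would be to pass one type per field name to Types.ROW, using whatever types the JSON messages actually carry.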