Thanks Rong. I am using Flink 1.3.0. If I switch to Flink 1.5, how can I define the jsonSchema?
Yes, there are two names, but now I put one name only and I want to define jsonschema.

Rong Rong wrote
> Hi Radhya,
>
> Can you provide which Flink version you are using? Based on the latest
> FLINK 1.5 release, Kafka09JsonTableSource takes:
>
> /**
>  * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>  *
>  * @param topic       Kafka topic to consume.
>  * @param properties  Properties for the Kafka consumer.
>  * @param tableSchema The schema of the table.
>  * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>  */
>
> Also, your type definition: TypeInformation<Row> typeInfo2 = Types.ROW(...
> arguments seem to have different length for schema names and types.
>
> Thanks,
> Rong
>
> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
>
>> Hi,
>>
>> Could anyone help me to solve this problem
>>
>> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>> The constructor Kafka09JsonTableSource(String, Properties, TypeInformation<Row>) is undefined
>>
>> *--This is the code*
>> public class FlinkKafkaSQL {
>>     public static void main(String[] args) throws Exception {
>>         // Read parameters from command line
>>         final ParameterTool params = ParameterTool.fromArgs(args);
>>
>>         if(params.getNumberOfParameters() < 5) {
>>             System.out.println("\nUsage: FlinkReadKafka " +
>>                 "--read-topic <topic> " +
>>                 "--write-topic <topic> " +
>>                 "--bootstrap.servers <kafka brokers> " +
>>                 "zookeeper.connect" +
>>                 "--group.id <groupid> ");
>>             return;
>>         }
>>
>>         // setup streaming environment
>>         StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
>>         env.enableCheckpointing(300000); // 300 seconds
>>         env.getConfig().setGlobalJobParameters(params);
>>
>>         StreamTableEnvironment tableEnv =
>>             TableEnvironment.getTableEnvironment(env);
>>
>>         // specify JSON field names and types
>>         TypeInformation<Row> typeInfo2 = Types.ROW(
>>             new String[] { "iotdevice", "sensorID" },
>>             new TypeInformation<?>[] { Types.STRING()}
>>         );
>>
>>         // create a new tablesource of JSON from kafka
>>         KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>             params.getRequired("read-topic"),
>>             params.getProperties(),
>>             typeInfo2);
>>
>>         // run some SQL to filter results where a key is not null
>>         String sql = "SELECT sensorID " +
>>             "FROM iotdevice ";
>>         tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>         Table result = tableEnv.sql(sql);
>>
>>         // create a partition for the data going into kafka
>>         FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>
>>         // create new tablesink of JSON to kafka
>>         KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>             params.getRequired("write-topic"),
>>             params.getProperties(),
>>             partition);
>>
>>         result.writeToSink(kafkaTableSink);
>>
>>         env.execute("FlinkReadWriteKafkaJSON");
>>     }
>> }
>>
>> *This is the dependencies in pom.xml*
>>
>> <dependencies>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-java</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-java_2.11</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-clients_2.11</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-connector-kafka-0.9</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-table_2.11</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-core</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-scala_2.11</artifactId>
>>     <version>1.3.0</version>
>>   </dependency>
>> </dependencies>
>>
>> Regards.
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
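
Regarding the point in the quoted reply that the schema names and types have different lengths: a mismatch like that can be caught before any table source is constructed. Below is a minimal sketch in plain Java with no Flink dependency. RowSchemaCheck and pair are hypothetical names made up for illustration, and the type names are plain strings standing in for TypeInformation objects, so this is not Flink's own validation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: pairs field names with type names
// and rejects arrays of different length before any table source is built.
final class RowSchemaCheck {

    static Map<String, String> pair(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                "Got " + names.length + " field names but " + types.length + " types");
        }
        // LinkedHashMap keeps the declared field order.
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], types[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // One name, one type: accepted.
        System.out.println(pair(new String[] {"sensorID"}, new String[] {"STRING"}));
        // Two names, one type, as in the quoted code: rejected.
        try {
            pair(new String[] {"iotdevice", "sensorID"}, new String[] {"STRING"});
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

With the arrays from the quoted code (two names, one type) the check throws, while a single name paired with a single type passes.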