Hi Radhya,

Which Flink version are you using? In the latest Flink 1.5 release, the
Kafka09JsonTableSource constructor takes:

/**
 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
 *
 * @param topic       Kafka topic to consume.
 * @param properties  Properties for the Kafka consumer.
 * @param tableSchema The schema of the table.
 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 */

Also, in your type definition, TypeInformation<Row> typeInfo2 = Types.ROW(...),
the field-name array has two entries but the field-type array has only one.
The two arrays must have the same length.
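
To illustrate the length check, here is a minimal plain-Java sketch. It does
not use any Flink classes: SchemaArity and pairFields are hypothetical names,
the types are plain strings, and giving the second field the type STRING is
an assumption, since the quoted code supplies only one type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for illustration; no Flink classes are used here.
final class SchemaArity {

    // Hypothetical helper: pairs each field name with its type name and
    // rejects arrays of different lengths.
    static Map<String, String> pairFields(String[] names, String[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException(
                "Got " + names.length + " field names but "
                    + types.length + " field types");
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], types[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // Two names, two types: accepted.
        // (STRING for the second field is an assumption.)
        System.out.println(pairFields(
            new String[] { "iotdevice", "sensorID" },
            new String[] { "STRING", "STRING" }));

        // Two names, one type (the shape of the quoted code): rejected.
        try {
            pairFields(
                new String[] { "iotdevice", "sensorID" },
                new String[] { "STRING" });
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

In the real job, the same rule would mean passing one TypeInformation per
field name to Types.ROW.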

Thanks,
Rong

On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sa...@gmail.com> wrote:

> Hi,
>
> Could anyone help me solve this problem?
>
> Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>         The constructor Kafka09JsonTableSource(String, Properties,
>         TypeInformation<Row>) is undefined
>
> This is the code:
> public class FlinkKafkaSQL {
>         public static void main(String[] args) throws Exception {
>             // Read parameters from command line
>             final ParameterTool params = ParameterTool.fromArgs(args);
>
>             if(params.getNumberOfParameters() < 5) {
>                 System.out.println("\nUsage: FlinkReadKafka " +
>                                    "--read-topic <topic> " +
>                                    "--write-topic <topic> " +
>                                    "--bootstrap.servers <kafka brokers> " +
>                                    "--zookeeper.connect <zookeeper> " +
>                                    "--group.id <groupid>");
>                 return;
>             }
>
>             // setup streaming environment
>             StreamExecutionEnvironment env =
>                     StreamExecutionEnvironment.getExecutionEnvironment();
>
>             env.getConfig().setRestartStrategy(
>                     RestartStrategies.fixedDelayRestart(4, 10000));
>             env.enableCheckpointing(300000); // 300 seconds
>             env.getConfig().setGlobalJobParameters(params);
>
>             StreamTableEnvironment tableEnv =
>                     TableEnvironment.getTableEnvironment(env);
>
>             // specify JSON field names and types
>
>
>             TypeInformation<Row> typeInfo2 = Types.ROW(
>                     new String[] { "iotdevice", "sensorID" },
>                     new TypeInformation<?>[] { Types.STRING()}
>             );
>
>             // create a new tablesource of JSON from kafka
>             KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>                     params.getRequired("read-topic"),
>                     params.getProperties(),
>                     typeInfo2);
>
>             // run some SQL to filter results where a key is not null
>             String sql = "SELECT sensorID " +
>                          "FROM iotdevice ";
>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>             Table result = tableEnv.sql(sql);
>
>             // create a partition for the data going into kafka
>             FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>
>             // create new tablesink of JSON to kafka
>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>                     params.getRequired("write-topic"),
>                     params.getProperties(),
>                     partition);
>
>             result.writeToSink(kafkaTableSink);
>
>             env.execute("FlinkReadWriteKafkaJSON");
>         }
> }
>
>
> These are the dependencies in pom.xml:
>
>         <dependencies>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-java</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-streaming-java_2.11</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-clients_2.11</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-connector-kafka-0.9</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-table_2.11</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-core</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>             <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-streaming-scala_2.11</artifactId>
>                 <version>1.3.0</version>
>             </dependency>
>         </dependencies>
>
>
> Regards.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
