Thanks Rong, I am using Flink 1.3.0. If I switch to Flink 1.5, how can I
define the jsonSchema? Yes, there were two field names, but for now I kept
only one name, and I still want to define the jsonSchema.
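For what it's worth, here is a rough sketch of how the four-argument Flink 1.5
constructor described in Rong's quoted javadoc below might be invoked. This is
only an illustration, not a tested snippet: the topic name, broker address, and
field names are made up, and it assumes the jsonSchema parameter is a
TableSchema like the table schema itself.

    // Sketch only -- assumes Flink 1.5's four-argument Kafka09JsonTableSource
    // constructor (topic, properties, tableSchema, jsonSchema) as quoted below.
    // Topic, broker address, and field names are hypothetical.
    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource;
    import org.apache.flink.table.api.TableSchema;

    public class JsonSchemaSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");

            String[] names = new String[] { "sensorID" };
            TypeInformation<?>[] types =
                    new TypeInformation<?>[] { BasicTypeInfo.STRING_TYPE_INFO };

            // Schema of the resulting table
            TableSchema tableSchema = new TableSchema(names, types);
            // Schema of the JSON messages to decode from Kafka
            TableSchema jsonSchema = new TableSchema(names, types);

            Kafka09JsonTableSource source = new Kafka09JsonTableSource(
                    "iot-topic", props, tableSchema, jsonSchema);
        }
    }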
Rong Rong wrote
> Hi Radhya,
>
> Can you provide which Flink version you are using? Based on the latest
> Flink 1.5 release, Kafka09JsonTableSource takes:
>
>     /**
>      * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
>      *
>      * @param topic       Kafka topic to consume.
>      * @param properties  Properties for the Kafka consumer.
>      * @param tableSchema The schema of the table.
>      * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
>      */
>
> Also, your type definition:
>
>     TypeInformation<Row> typeInfo2 = Types.ROW(...
>
> arguments seem to have different lengths for schema names and types.
>
> Thanks,
> Rong
>
> On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@>

> wrote:
>
>> Hi,
>>
>> Could anyone help me to solve this problem?
>>
>> /Exception in thread "main" java.lang.Error: Unresolved compilation problem:
>>         The constructor Kafka09JsonTableSource(String, Properties,
>>         TypeInformation<Row>) is undefined/
>>
>> *--This is the code*
>>
>>     public class FlinkKafkaSQL {
>>         public static void main(String[] args) throws Exception {
>>             // Read parameters from command line
>>             final ParameterTool params = ParameterTool.fromArgs(args);
>>
>>             if (params.getNumberOfParameters() < 5) {
>>                 System.out.println("\nUsage: FlinkReadKafka " +
>>                         "--read-topic " +
>>                         "--write-topic " +
>>                         "--bootstrap.servers " +
>>                         "zookeeper.connect" +
>>                         "--group.id ");
>>                 return;
>>             }
>>
>>             // setup streaming environment
>>             StreamExecutionEnvironment env =
>>                     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>             env.getConfig().setRestartStrategy(
>>                     RestartStrategies.fixedDelayRestart(4, 10000));
>>             env.enableCheckpointing(300000); // 300 seconds
>>             env.getConfig().setGlobalJobParameters(params);
>>
>>             StreamTableEnvironment tableEnv =
>>                     TableEnvironment.getTableEnvironment(env);
>>
>>             // specify JSON field names and types
>>             TypeInformation<Row> typeInfo2 = Types.ROW(
>>                     new String[] { "iotdevice", "sensorID" },
>>                     new TypeInformation<?>[] { Types.STRING() }
>>             );
>>
>>             // create a new tablesource of JSON from kafka
>>             KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
>>                     params.getRequired("read-topic"),
>>                     params.getProperties(),
>>                     typeInfo2);
>>
>>             // run some SQL to filter results where a key is not null
>>             String sql = "SELECT sensorID " +
>>                          "FROM iotdevice ";
>>             tableEnv.registerTableSource("iotdevice", kafkaTableSource);
>>             Table result = tableEnv.sql(sql);
>>
>>             // create a partition for the data going into kafka
>>             FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>
>>             // create new tablesink of JSON to kafka
>>             KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>                     params.getRequired("write-topic"),
>>                     params.getProperties(),
>>                     partition);
>>
>>             result.writeToSink(kafkaTableSink);
>>
>>             env.execute("FlinkReadWriteKafkaJSON");
>>         }
>>     }
>>
>> *This is the dependencies in pom.xml*

>>     <dependencies>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-java</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-streaming-java_2.11</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-clients_2.11</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-connector-kafka-0.9</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-table_2.11</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-core</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-streaming-scala_2.11</artifactId>
>>             <version>1.3.0</version>
>>         </dependency>
>>     </dependencies>
>>
>> Regards.
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
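On the Types.ROW point Rong raises above: the names array and the types array
must be the same length, one TypeInformation per field name. A minimal
corrected fragment of the quoted type definition; choosing STRING for both
fields is an assumption made only for illustration:

    // Both arrays passed to Types.ROW must have equal length:
    // two field names, therefore two field types.
    // (STRING for both fields is assumed here for illustration.)
    TypeInformation<Row> typeInfo2 = Types.ROW(
            new String[] { "iotdevice", "sensorID" },
            new TypeInformation<?>[] { Types.STRING(), Types.STRING() }
    );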





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/