Hi Radhya,
Can you provide which Flink version you are using? Based on the latest
FLINK 1.5 release, Kafka09JsonTableSource takes:
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from
Kafka.
*/
Also, your type definition: TypeInformation
<Row>
typeInfo2 = Types.ROW(...
arguments seem to have different length for schema names and types.
Thanks,
Rong
On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <
radhya.sahal@
> wrote:
Hi,
Could anyone help me to solve this problem
/Exception in thread "main" java.lang.Error: Unresolved compilation
problem:
The constructor Kafka09JsonTableSource(String, Properties,
TypeInformation
<Row>
) is undefined
/
*--This is the code *
public class FlinkKafkaSQL {
public static void main(String[] args) throws Exception {
// Read parameters from command line
final ParameterTool params = ParameterTool.fromArgs(args);
if(params.getNumberOfParameters() < 5) {
System.out.println("\nUsage: FlinkReadKafka " +
"--read-topic
<topic>
" +
"--write-topic
<topic>
" +
"--bootstrap.servers
<kafka brokers>
" +
"zookeeper.connect" +
"--group.id
<groupid>
");
return;
}
// setup streaming environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
10000));
env.enableCheckpointing(300000); // 300 seconds
env.getConfig().setGlobalJobParameters(params);
StreamTableEnvironment tableEnv =
TableEnvironment.getTableEnvironment(env);
// specify JSON field names and types
TypeInformation
<Row>
typeInfo2 = Types.ROW(
new String[] { "iotdevice", "sensorID" },
new TypeInformation<?>[] { Types.STRING()}
);
// create a new tablesource of JSON from kafka
KafkaJsonTableSource kafkaTableSource = new
Kafka09JsonTableSource(
params.getRequired("read-topic"),
params.getProperties(),
typeInfo2);
// run some SQL to filter results where a key is not null
String sql = "SELECT sensorID " +
"FROM iotdevice ";
tableEnv.registerTableSource("iotdevice", kafkaTableSource);
Table result = tableEnv.sql(sql);
// create a partition for the data going into kafka
FlinkFixedPartitioner partition = new
FlinkFixedPartitioner();
// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
params.getRequired("write-topic"),
params.getProperties(),
partition);
result.writeToSink(kafkaTableSink);
env.execute("FlinkReadWriteKafkaJSON");
}
}
*This is the dependencies in pom.xml*
<dependencies>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-java
</artifactId>
<version>
1.3.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-streaming-java_2.11
</artifactId>
<version>
1.3.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-clients_2.11
</artifactId>
<version>
1.3.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-connector-kafka-0.9
</artifactId>
<version>
1.3.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-table_2.11
</artifactId>
<version>
1.3.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-core
</artifactId>
<version>
1.3.0
</version>
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-streaming-
scala_2.11
</artifactId>
<version>
1.3.0
</version>
</dependency>
</dependencies>
Regards.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.
n4.nabble.com/