Hi Rad,
At first glance your example does not look too bad. Which exceptions
do you get? Did you create your pom.xml from the provided template [1]
and then add flink-table, flink-connector-kafkaXXX, and flink-streaming-scala?
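One thing that might be worth checking in the meantime: the Types.ROW call in your example passes two field names ("iotdevice", "sensorID") but only one field type. As far as I know, a row type needs exactly one type per field name, so this could be one source of the exceptions, though I have not verified the exact error for your Flink version. The plain-Java sketch below only illustrates that length check. RowSchemaCheck, arityMatches and describe are hypothetical names and not part of Flink, and the types are plain strings standing in for TypeInformation.

```java
import java.util.Arrays;

public class RowSchemaCheck {

    /** Returns true when there is exactly one type per field name. */
    public static boolean arityMatches(String[] fieldNames, String[] fieldTypes) {
        return fieldNames.length == fieldTypes.length;
    }

    /** Describes the schema as "name: type" pairs, or throws if the array lengths differ. */
    public static String describe(String[] fieldNames, String[] fieldTypes) {
        if (!arityMatches(fieldNames, fieldTypes)) {
            throw new IllegalArgumentException(
                fieldNames.length + " field names " + Arrays.toString(fieldNames)
                    + " but " + fieldTypes.length + " field types "
                    + Arrays.toString(fieldTypes));
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fieldNames.length; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(fieldNames[i]).append(": ").append(fieldTypes[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] names = { "iotdevice", "sensorID" };

        // Same shape as the posted example: two names, one type.
        try {
            describe(names, new String[] { "STRING" });
        } catch (IllegalArgumentException e) {
            System.out.println("Mismatch: " + e.getMessage());
        }

        // One type per name. STRING for sensorID is only an assumption here.
        System.out.println(describe(names, new String[] { "STRING", "STRING" }));
    }
}
```

If that is the problem, adding a second entry to the TypeInformation array (whatever type sensorID actually has in your JSON) should resolve it.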
Regards,
Timo
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/quickstart/java_api_quickstart.html
On 02.06.18 at 19:26, Rad Rad wrote:
Hi,
Could anyone help me by providing some sample Java code in which Flink
reads data from Kafka and then runs SQL queries using the SQL API?
Also, what are the compatible versions of Java/Kafka/Flink?
I am a beginner and I get many exceptions from my code:
public class FlinkKafkaSQL {
    public static void main(String[] args) throws Exception {
        // Read parameters from command line
        final ParameterTool params = ParameterTool.fromArgs(args);
        if (params.getNumberOfParameters() < 5) {
            System.out.println("\nUsage: FlinkReadKafka " +
                "--read-topic <topic> " +
                "--write-topic <topic> " +
                "--bootstrap.servers <kafka brokers> " +
                "zookeeper.connect" +
                "--group.id <groupid>");
            return;
        }

        // setup streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // specify JSON field names and types
        TypeInformation<Row> typeInfo2 = Types.ROW(
            new String[] { "iotdevice", "sensorID" },
            new TypeInformation<?>[] { Types.STRING()}
        );

        // create a new tablesource of JSON from kafka
        KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
            params.getRequired("read-topic"),
            params.getProperties(),
            typeInfo2);

        // run some SQL to filter results where a key is not null
        String sql = "SELECT sensorID " +
            "FROM iotdevice ";
        tableEnv.registerTableSource("iotdevice", kafkaTableSource);
        Table result = tableEnv.sql(sql);

        // create a partition for the data going into kafka
        FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

        // create new tablesink of JSON to kafka
        KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
            params.getRequired("write-topic"),
            params.getProperties(),
            partition);
        result.writeToSink(kafkaTableSink);
        env.execute("FlinkReadWriteKafkaJSON");
    }
}
*These are the dependencies in pom.xml:*
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>
Regards.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Thank you.