@Shuyi: Yes, a more advanced table example would be helpful in any case, and
combining it with Kafka/Avro end-to-end would be even better.
@Will: I totally agree that the current connector ecosystem could be
improved. This is also on the mid-term roadmap. Contributors who could help
here are very welcome. We have also taken steps towards improving the
situation with [1] and [2], among others.
[1] https://issues.apache.org/jira/browse/FLINK-8240
[2] https://issues.apache.org/jira/browse/FLINK-8630
Regards,
Timo
On 04.06.18 23:06, Will Du wrote:
Yes, I am also looking for Kafka Avro table examples in Java and on the
command line. Also, a Kafka Avro table sink is still missing. In addition,
once we have a Kafka topic, the API should read the schema directly from a
schema file or a schema registry. The current way the API supports this
lacks flexibility, just my own opinion.
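For illustration, parsing an Avro schema from a file with the plain Avro
API could look like the following untested sketch (the file name is made
up):

// Hypothetical sketch: read the decode schema from an .avsc file
// instead of hard-coding field names and types.
org.apache.avro.Schema avroSchema =
    new org.apache.avro.Schema.Parser().parse(new java.io.File("sensor.avsc"));

Such a schema (or one fetched from a schema registry) could then drive the
table source definition instead of hand-written type information.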
Sent from my iPhone
On Jun 4, 2018, at 14:29, Shuyi Chen <suez1...@gmail.com> wrote:
Given the popularity of Flink SQL and Kafka as a streaming source, I
think we could add some examples of using Kafka[XXX]TableSource to the
flink-examples/flink-examples-table module. What do you guys think?
Cheers
Shuyi
On Mon, Jun 4, 2018 at 12:57 AM, Timo Walther <twal...@apache.org> wrote:
Hi,
as you can see in the code [1], Kafka09JsonTableSource takes a
TableSchema. You can create a TableSchema from type information,
see [2].
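For example, a minimal, untested sketch of the 1.5-style construction
(assuming TableSchema.fromTypeInfo is available in your version and that
your typeInfo2 has name and type arrays of equal length):

// Derive a TableSchema from the row type information (see [2]).
TableSchema tableSchema = TableSchema.fromTypeInfo(typeInfo2);

// The 1.5 constructor takes a TableSchema plus the JSON schema (see [1]);
// here the same schema is used for both, assuming the table fields
// match the JSON fields.
KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
        params.getRequired("read-topic"),
        params.getProperties(),
        tableSchema,
        tableSchema);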
Regards,
Timo
[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
[2] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
On 02.06.18 18:31, Radhya Sahal wrote:
Thanks Rong,
I used Flink 1.3.0. In case of using Flink 1.5, how can I define the
jsonSchema? Yes, there are two names, but now I put only one name, and I
still want to define the jsonSchema.
Rong Rong wrote:
Hi Radhya,
Can you tell us which Flink version you are using? Based on the latest
Flink 1.5 release, Kafka09JsonTableSource takes:
/**
 * Creates a Kafka 0.9 JSON {@link StreamTableSource}.
 *
 * @param topic       Kafka topic to consume.
 * @param properties  Properties for the Kafka consumer.
 * @param tableSchema The schema of the table.
 * @param jsonSchema  The schema of the JSON messages to decode from Kafka.
 */
Also, in your type definition, TypeInformation<Row> typeInfo2 =
Types.ROW(...), the arguments seem to have different lengths for the
schema names and types.
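For example, with two field names the types array also needs two entries,
something like this (the concrete type of sensorID is a guess here):

// One type per field name; adjust the second type to the actual data.
TypeInformation<Row> typeInfo2 = Types.ROW(
        new String[] { "iotdevice", "sensorID" },
        new TypeInformation<?>[] { Types.STRING(), Types.STRING() });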
Thanks,
Rong
On Fri, Jun 1, 2018 at 9:09 AM, Radhya Sahal <radhya.sahal@> wrote:
Hi,
Could anyone help me solve this problem?

Exception in thread "main" java.lang.Error: Unresolved compilation problem:
    The constructor Kafka09JsonTableSource(String, Properties,
    TypeInformation<Row>) is undefined
This is the code:
public class FlinkKafkaSQL {
    public static void main(String[] args) throws Exception {
        // Read parameters from command line
        final ParameterTool params = ParameterTool.fromArgs(args);

        if (params.getNumberOfParameters() < 5) {
            System.out.println("\nUsage: FlinkReadKafka " +
                    "--read-topic <topic> " +
                    "--write-topic <topic> " +
                    "--bootstrap.servers <kafka brokers> " +
                    "zookeeper.connect " +
                    "--group.id <groupid>");
            return;
        }

        // setup streaming environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(300000); // 300 seconds
        env.getConfig().setGlobalJobParameters(params);

        StreamTableEnvironment tableEnv =
                TableEnvironment.getTableEnvironment(env);

        // specify JSON field names and types
        TypeInformation<Row> typeInfo2 = Types.ROW(
                new String[] { "iotdevice", "sensorID" },
                new TypeInformation<?>[] { Types.STRING() });

        // create a new table source of JSON from Kafka
        KafkaJsonTableSource kafkaTableSource = new Kafka09JsonTableSource(
                params.getRequired("read-topic"),
                params.getProperties(),
                typeInfo2);

        // run some SQL to filter results where a key is not null
        String sql = "SELECT sensorID FROM iotdevice";
        tableEnv.registerTableSource("iotdevice", kafkaTableSource);
        Table result = tableEnv.sql(sql);

        // create a partitioner for the data going into Kafka
        FlinkFixedPartitioner partition = new FlinkFixedPartitioner();

        // create a new table sink of JSON to Kafka
        KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
                params.getRequired("write-topic"),
                params.getProperties(),
                partition);
        result.writeToSink(kafkaTableSink);

        env.execute("FlinkReadWriteKafkaJSON");
    }
}
These are the dependencies in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>
Regards.