I am hoping you guys can help me. I am stumped how to actually write to Kafka 
using Kafka09JsonTableSink using the Table API. Here is my code below, I am 
hoping you guys can shed some light on how this should be done. I don’t see any 
methods for the actual write to Kafka. I am probably doing something stupid. 
TIA.

Thanks!
Kenny

// run some SQL to filter results where a key is not null
String sql = "SELECT icao FROM flights WHERE icao is not null";
tableEnv.registerTableSource("flights", kafkaTableSource);
Table result = tableEnv.sql(sql);

// create a partition for the data going into kafka
FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();

// create new tablesink of JSON to kafka
KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
        params.getRequired("write-topic"),
        params.getProperties(),
        partition);

result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do this, 
but no such method..

Reply via email to