I am hoping you guys can help me. I am stumped how to actually write to Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, I am hoping you guys can shed some light on how this should be done. I don’t see any methods for the actual write to Kafka. I am probably doing something stupid. TIA.
Thanks! Kenny // run some SQL to filter results where a key is not null String sql = "SELECT icao FROM flights WHERE icao is not null"; tableEnv.registerTableSource("flights", kafkaTableSource); Table result = tableEnv.sql(sql); // create a partition for the data going into kafka FlinkFixedPartitioner partition = new FlinkFixedPartitioner(); // create new tablesink of JSON to kafka KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink( params.getRequired("write-topic"), params.getProperties(), partition); result.writeToSink(tableEnv, kafkaTableSink); // Logically, I want to do this, but no such method..