Hi Kenny,

This looks almost correct.
The Table class has a method writeToSink(TableSink) that should address
your use case (so the same as yours but without the TableEnvironment
argument).
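
To illustrate the call shape, here is a small dependency-free sketch. The
classes in it (TableEnvironment, Table, TableSink, CollectingSink) are
stand-ins written only for this example, not the Flink classes, and the
sketch assumes that a Table keeps a reference to the environment that
created it, which would explain why the sink is the only argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WriteToSinkSketch {

    // Stand-in for a table sink; NOT Flink's TableSink interface.
    interface TableSink {
        void emit(String row);
    }

    // Hypothetical sink that collects rows in memory instead of writing to Kafka.
    static class CollectingSink implements TableSink {
        final List<String> rows = new ArrayList<>();

        @Override
        public void emit(String row) {
            rows.add(row);
        }
    }

    // Stand-in for the table environment that creates tables.
    static class TableEnvironment {
        Table fromRows(List<String> rows) {
            return new Table(this, rows);
        }

        // Environment-level write: needs both the table and the sink.
        void writeToSink(Table table, TableSink sink) {
            for (String row : table.rows) {
                sink.emit(row);
            }
        }
    }

    // Stand-in for Table: it remembers its environment (an assumption of
    // this sketch), so its own writeToSink takes only the sink.
    static class Table {
        private final TableEnvironment tableEnv;
        final List<String> rows;

        Table(TableEnvironment tableEnv, List<String> rows) {
            this.tableEnv = tableEnv;
            this.rows = rows;
        }

        void writeToSink(TableSink sink) {
            tableEnv.writeToSink(this, sink);
        }
    }

    // Builds a two-row table and writes it to an in-memory sink.
    static List<String> run() {
        TableEnvironment tableEnv = new TableEnvironment();
        Table result = tableEnv.fromRows(Arrays.asList("A1B2C3", "D4E5F6"));
        CollectingSink sink = new CollectingSink();
        result.writeToSink(sink); // one argument: the sink only
        return sink.rows;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In your snippet the corresponding change would be
result.writeToSink(kafkaTableSink); with everything else left as it is.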

Does that work for you?
If not, what kind of error and error message do you get?

Best, Fabian

2017-10-18 1:28 GMT+02:00 Kenny Gorman <ke...@eventador.io>:

> I am hoping you guys can help me. I am stumped how to actually write to
> Kafka using Kafka09JsonTableSink using the Table API. Here is my code
> below, I am hoping you guys can shed some light on how this should be done.
> I don’t see any methods for the actual write to Kafka. I am probably doing
> something stupid. TIA.
>
> Thanks!
> Kenny
>
> // run some SQL to filter results where a key is not null
> String sql = "SELECT icao FROM flights WHERE icao is not null";
> tableEnv.registerTableSource("flights", kafkaTableSource);
> Table result = tableEnv.sql(sql);
>
> // create a partition for the data going into kafka
> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>
> // create new tablesink of JSON to kafka
> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>         params.getRequired("write-topic"),
>         params.getProperties(),
>         partition);
>
> // Logically, I want to do this, but no such method..
> result.writeToSink(tableEnv, kafkaTableSink);
