No worries :-) Thanks for the notice.

2017-10-18 15:07 GMT+02:00 Kenny Gorman <ke...@eventador.io>:
> Yep we hung out and got it working. I should have replied sooner! Thx for
> the reply.
>
> -kg
>
> On Oct 18, 2017, at 7:06 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Kenny,
>
> this looks almost correct.
> The Table class has a method writeToSink(TableSink) that should address
> your use case (so the same as yours but without the TableEnvironment
> argument).
>
> Does that work for you?
> If not, what kind of error and error message do you get?
>
> Best, Fabian
>
> 2017-10-18 1:28 GMT+02:00 Kenny Gorman <ke...@eventador.io>:
>
>> I am hoping you guys can help me. I am stumped how to actually write to
>> Kafka using Kafka09JsonTableSink using the Table API. Here is my code
>> below, I am hoping you guys can shed some light on how this should be done.
>> I don’t see any methods for the actual write to Kafka. I am probably doing
>> something stupid. TIA.
>>
>> Thanks!
>> Kenny
>>
>> // run some SQL to filter results where a key is not null
>> String sql = "SELECT icao FROM flights WHERE icao is not null";
>> tableEnv.registerTableSource("flights", kafkaTableSource);
>> Table result = tableEnv.sql(sql);
>>
>> // create a partition for the data going into kafka
>> FlinkFixedPartitioner partition = new FlinkFixedPartitioner();
>>
>> // create new tablesink of JSON to kafka
>> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>     params.getRequired("write-topic"),
>>     params.getProperties(),
>>     partition);
>>
>> // Logically, I want to do this, but no such method..
>> result.writeToSink(tableEnv, kafkaTableSink);