Yep we hung out and got it working. I should have replied sooner! Thx for the 
reply.

-kg

> On Oct 18, 2017, at 7:06 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Kenny,
> 
> This looks almost correct.
> The Table class has a method writeToSink(TableSink) that should address your 
> use case (so the same as yours but without the TableEnvironment argument).
> 
> Does that work for you?
> If not, what kind of error and error message do you get?
> 
> Best, Fabian
> 
> 2017-10-18 1:28 GMT+02:00 Kenny Gorman <ke...@eventador.io>:
>> I am hoping you guys can help me. I am stumped on how to actually write to 
>> Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, 
>> I am hoping you guys can shed some light on how this should be done. I don’t 
>> see any methods for the actual write to Kafka. I am probably doing something 
>> stupid. TIA.
>> 
>> Thanks!
>> Kenny
>> 
>> // run some SQL to filter results where a key is not null
>> String sql = "SELECT icao FROM flights WHERE icao is not null";
>> tableEnv.registerTableSource("flights", kafkaTableSource);
>> Table result = tableEnv.sql(sql);
>> 
>> // create a partition for the data going into kafka
>> FlinkFixedPartitioner<Row> partition = new FlinkFixedPartitioner<>();
>> 
>> // create new tablesink of JSON to kafka
>> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>         params.getRequired("write-topic"),
>>         params.getProperties(),
>>         partition);
>> 
>> // Logically, I want to do this, but no such method..
>> result.writeToSink(tableEnv, kafkaTableSink);
> 
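For anyone finding this thread in the archive, here is a minimal sketch of the change Fabian describes: the sink is passed to the Table itself, with no TableEnvironment argument. It assumes the Flink version used in this thread and the Kafka 0.9 connector on the classpath. The class name KafkaJsonWriter and the helper writeAsJson are hypothetical, introduced only for illustration; the topic and properties would come from params as in the original code.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Hypothetical wrapper class, not part of Flink.
public final class KafkaJsonWriter {

    // Hypothetical helper: attaches a JSON Kafka sink to an existing Table.
    static void writeAsJson(Table result, String topic, Properties kafkaProps) {
        // Same partitioner as in the question, typed for the rows the sink receives.
        FlinkFixedPartitioner<Row> partitioner = new FlinkFixedPartitioner<>();

        // Same sink construction as in the question.
        KafkaJsonTableSink sink =
                new Kafka09JsonTableSink(topic, kafkaProps, partitioner);

        // The Table already belongs to a TableEnvironment,
        // so the sink is the only argument.
        result.writeToSink(sink);
    }
}
```

Called as writeAsJson(result, params.getRequired("write-topic"), params.getProperties()), this only declares the sink. Nothing is written to Kafka until execute() is called on the StreamExecutionEnvironment.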
