No worries :-) Thanks for the notice.

2017-10-18 15:07 GMT+02:00 Kenny Gorman <ke...@eventador.io>:

> Yep we hung out and got it working. I should have replied sooner! Thx for
> the reply.
>
> -kg
>
> On Oct 18, 2017, at 7:06 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Kenny,
>
> this look almost correct.
> The Table class has a method writeToSink(TableSink) that should address
> your use case (so the same as yours but without the TableEnvironment
> argument).
>
> Does that work for you?
> If not what kind of error and error message do you get?
>
> Best, Fabian
>
> 2017-10-18 1:28 GMT+02:00 Kenny Gorman <ke...@eventador.io>:
>
>> I am hoping you guys can help me. I am stumped how to actually write to
>> Kafka using Kafka09JsonTableSink using the Table API. Here is my code
>> below, I am hoping you guys can shed some light on how this should be done.
>> I don’t see any methods for the actual write to Kafka. I am probably doing
>> something stupid. TIA.
>>
>> Thanks!
>> Kenny
>>
>> // run some SQL to filter results where a key is not null
>> String sql = "SELECT icao FROM flights WHERE icao is not null";
>> tableEnv.registerTableSource("flights", kafkaTableSource);
>> Table result = tableEnv.sql(sql);
>>
>> // create a partition for the data going into kafka
>> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>>
>> // create new tablesink of JSON to kafka
>> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>>         params.getRequired("write-topic"),
>>         params.getProperties(),
>>         partition);
>>
>> result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do
>> this, but no such method..
>
>
>

Reply via email to