DynamicTableSink has a method called getChangelogMode; you can use it to declare which kinds of changes the sink is able to consume.
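For example, a custom sink can declare in getChangelogMode that it accepts inserts, updates, and deletes, so that a node producing update changes (like the GroupAggregate in the error above) is allowed to write to it. A minimal sketch against the Flink 1.11 table connector API; the class name MyKafkaDynamicSink is hypothetical, and the other required DynamicTableSink methods are omitted:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.types.RowKind;

// Hypothetical sink class for illustration; only getChangelogMode is shown.
// A real implementation must also provide getSinkRuntimeProvider, copy,
// and asSummaryString.
public abstract class MyKafkaDynamicSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Declare that this sink can consume all changelog kinds, not just
        // inserts, so the planner accepts an updating upstream query.
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }
}
```

ChangelogMode.all() is a shorthand for the same full set of row kinds; returning only ChangelogMode.insertOnly() is what triggers the "doesn't support consuming update changes" error for an aggregating query.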

[email protected] <[email protected]> wrote on Fri, Jul 17, 2020 at 1:02 PM:

>
>  INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*)
> from kafka_ods_artemis_out_order group by warehouse_id;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Table sink
> 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming
> update changes which is produced by node
> GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS
> EXPR$1])
>
> In Flink 1.10 we could modify KafkaTableSinkBase to make it implement RetractStream.
>
> I see that Flink 1.11 now uses KafkaDynamicSource and KafkaDynamicSink. What
> changes are needed so that the GroupBy results can also be sent to Kafka?
>
> Thanks,
> Wang Lei
>
>
> [email protected]
>
>

-- 

Best,
Benchao Li