DynamicTableSink有一个方法是getChangelogMode,可以通过这个方法来指定这个sink接收什么种类的数据
[email protected] <[email protected]> 于2020年7月17日周五 下午1:02写道: > > INSERT INTO kafka_dws_artemis_out_order select warehouse_id, count(*) > from kafka_ods_artemis_out_order group by warehouse_id; > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Table sink > 'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming > update changes which is produced by node > GroupAggregate(groupBy=[warehouse_id], select=[warehouse_id, COUNT(*) AS > EXPR$1]) > > 在 Flink-1.10 中可以更改 KafkaTableSinkBase 让它 implements RetractStream 实现。 > > 我看现在 Flink-1.11 中是用了 KafkaDynamicSource, KafkaDynamicSink,这样怎样改动才能让 > GroupBy 的结果也发送到 Kafka 呢? > > 谢谢, > 王磊 > > > [email protected] > > -- Best, Benchao Li
