Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
你可以使用非 window 聚合来代替。

Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?

Best,
Jark

On Mon, 23 Nov 2020 at 10:28, jy l <[email protected]> wrote:

> Hi:
> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> [image: image.png]
> [image: image.png]
> 分组计算的SQL如下:
> [image: image.png]
> 在执行计算时,报了如下异常:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> GroupWindowAggregate doesn't support consuming update and delete changes
> which is produced by node TableSourceScan(table=[[default_catalog,
> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> orderInformationId, userId, categoryId, productId, price, productCount,
> priceSum, shipAddress, receiverAddress])
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.immutable.Range.foreach(Range.scala:155)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> 那面对我这样的情况,该用什么方案来解决?
> 望知道的各位告知一下,感谢!
>
> 祝好
>
>

回复