Hi, alternatively you can filter out the rows whose RowKind is -U or -D when writing on the sink side (i.e., with custom sink logic); a minimal sketch follows below.
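To make this concrete, here is a minimal, self-contained Java sketch. The datagen source, the field names (order_no, amount, ts) and the keep-last dedup query are placeholders made up for illustration, not your actual pipeline; the essential piece is the final toChangelogStream + RowKind filter, which keeps only +I and +U rows before handing them to an append-only sink.

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class DropRetractionsBeforeSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stand-in source for illustration; replace with your Kafka/CDC source table.
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE src (" +
            "  order_no INT," +
            "  amount   INT," +
            "  ts AS PROCTIME()" +
            ") WITH (" +
            "  'connector' = 'datagen'," +
            "  'rows-per-second' = '5'," +
            "  'fields.order_no.min' = '1'," +
            "  'fields.order_no.max' = '3'" +
            ")");

        // Keep-last deduplication: an updating result, so it emits -U/+U retractions.
        Table deduped = tEnv.sqlQuery(
            "SELECT order_no, amount FROM (" +
            "  SELECT *, ROW_NUMBER() OVER (" +
            "    PARTITION BY order_no ORDER BY ts DESC) AS rn FROM src" +
            ") WHERE rn = 1");

        // The actual suggestion: turn the changelog into a DataStream<Row>
        // and drop every row whose RowKind is UPDATE_BEFORE (-U) or DELETE (-D).
        DataStream<Row> appendOnly = tEnv
            .toChangelogStream(deduped)
            .filter(r -> r.getKind() == RowKind.INSERT
                      || r.getKind() == RowKind.UPDATE_AFTER);

        appendOnly.print(); // replace with your append-only Kafka sink
        env.execute("drop-retractions-example");
    }
}
```

One caveat: dropping -U/-D turns the changelog into an upsert-style stream, so whether this is semantically safe depends on whether the downstream consumer (e.g. the Doris aggregation model) can tolerate seeing only the +I/+U side of each update.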
--
Best!
Xuyang

On 2025-02-10 12:39:29, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>This is not supported yet; you can follow this jira:
>https://issues.apache.org/jira/browse/FLINK-37005
>
>
>On Fri, Feb 7, 2025 at 10:20 PM casel.chen <casel_c...@126.com> wrote:
>
>> But the actual business scenario needs to order by an event-time attribute
>> field and keep only the row with the largest value. Is there a way to avoid
>> producing retract messages, or to filter them out?
>>
>> The scenario is: tidb <- ticdc binlog -> kafka -> flink sql -> kafka <-
>> doris routine load, finally writing into a doris aggregation model table.
>>
>> Because ticdc may replay part of the binlog after an automatic restart,
>> the cdc messages can be duplicated, so I want to add deduplication logic
>> in flink sql. The dedup time field is the monotonically increasing
>> commitTs field in the ticdc message, but neither the downstream kafka nor
>> the doris aggregation model supports retract messages. That is why I am
>> asking whether the retract messages can be filtered out.
>>
>> On 2025-01-19 20:08:44, "Feng Jin" <jinfeng1...@gmail.com> wrote:
>> >Hi casel
>> >
>> >Ordering by a proctime attribute field and keeping the first row does
>> >not produce retract messages.
>> >
>> >See:
>> >https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sql/queries/deduplication/
>> >
>> >```
>> >CREATE TABLE Orders (
>> >  order_id STRING,
>> >  user     STRING,
>> >  product  STRING,
>> >  num      BIGINT,
>> >  proctime AS PROCTIME()
>> >) WITH (...);
>> >
>> >-- remove duplicate rows on order_id and keep the first occurrence row,
>> >-- because there shouldn't be two orders with the same order_id.
>> >SELECT order_id, user, product, num
>> >FROM (
>> >  SELECT *,
>> >    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
>> >  FROM Orders
>> >)
>> >WHERE row_num = 1
>> >```
>> >
>> >Best,
>> >Feng Jin
>> >
>> >On Sun, Jan 19, 2025 at 7:55 PM casel.chen <casel_c...@126.com> wrote:
>> >
>> >> Could anyone from the community answer this?
>> >>
>> >> On 2025-01-14 13:42:58, "casel.chen" <casel_c...@126.com> wrote:
>> >> >doris routine load cannot consume tombstone messages; it records each
>> >> >one as erroneous data, and once the accumulated count reaches a
>> >> >threshold the load task fails.
>> >> >
>> >> >On 2025-01-14 13:41:00, "casel.chen" <casel_c...@126.com> wrote:
>> >> >>A business tidb table is synchronized in real time via ticdc to a
>> >> >>kafka topic. I built a flink job that consumes that topic, processes
>> >> >>the data, and delivers it to another kafka topic; a doris routine
>> >> >>load task then consumes the sink kafka topic and writes into a doris
>> >> >>aggregation model table to compute count/sum/min/max statistics.
>> >> >>Each update on the upstream mysql table is split into two append-only
>> >> >>(+I) rows (via an extended cdc source): in the update_before row all
>> >> >>metric values except the aggregation keys are multiplied by -1 to
>> >> >>represent the deduction, while the update_after row is passed through
>> >> >>unchanged. The two parts are union all-ed, keyed by the business key,
>> >> >>e.g. order_no (the tidb table uses an auto-increment id as its
>> >> >>primary key), and then deduplicated by update time.
>> >> >>The query is: select * from ( select *, row_number() over (partition
>> >> >>by order_no order by event_time) as rownum from tbl ) where rownum = 1
>> >> >>But I found that this produces a retract stream, e.g.
>> >> >>+I update_before -1
>> >> >>-D update_before -1
>> >> >>+I update_after +1
>> >> >>Why does deduplication over an append-only source also produce
>> >> >>retract data? Is there a way to avoid producing it, or to filter the
>> >> >>retract data out before sending it to the downstream kafka?