能具体说下如何实现吗? 我用cdc 能实现什么,我现在想让两个Insert Sql 保持到一个事务里, 要么全成功,要么全失败,目前查看Flink 文档 并没有发现相关的解释
wukon...@foxmail.com 发件人: jie han 发送时间: 2021-08-26 21:36 收件人: user-zh 主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中 HI: 可以尝试下使用flink cdc 的方式写入到第二个kafka里面呀 悟空 <wukon...@foxmail.com> 于2021年8月26日周四 下午1:54写道: > 我目前用的是flink-connector-kafka_2.11 和 flink-connector-jdbc_2.11, > 测试时,我把任务启动好之后,把mysql 中的目标表删除 或 删除必要字段, > 之后发送一条kafka数据,会报java.sql.BatchUpdateException 异常,然后重试3次。 > 但是接着sink Kafka 是成功的,Kafka端 我开启了 'sink.semantic' = 'exactly-once', > 同时下游consumer 使用 --isolation-level read_committed 读取,依旧能成功读取到数据,说明sink > db 失败,但是sink kafka成功,同时flink 本身任务不会挂掉。 > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > < > tsreape...@gmail.com>; > 发送时间: 2021年8月26日(星期四) 中午1:25 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中 > > > > Hi! > > 如果 statement set 已经包含了多个 insert 语句,那么写入 kafka 表和写入 db 应该是在同一个作业里进行的。如果写入 db > 失败,那么产生的 exception 应该会让作业失败才对。这里 db 写入失败但 kafka 依旧写入是什么样的现象? > > 另外其实可以考虑分成两个作业,第一个作业将数据写入 db,第二个作业从 db 读出数据写入 kafka。关于捕获 db 数据的变化,可以看一下 > Flink CDC connector[1] > > [1] https://github.com/ververica/flink-cdc-connectors > > 悟空 <wukon...@foxmail.com> 于2021年8月26日周四 下午12:52写道: > > > 能否详细说下呢,statement set[1] 是什么意思, 我是通过StatementSet.addInsertSql 将多个sql > > 加入的,然后执行execute()方法 > > > > > > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > > 发件人: > > > "user-zh" > > > < > > fskm...@gmail.com&gt;; > > 发送时间:&nbsp;2021年8月26日(星期四) 中午12:36 > > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;; > > > > 主题:&nbsp;Re: 基于flink 1.12 如何通过Flink SQL 实现 多sink端 在同一个事务中 > > > > > > > > 说的是 statement set [1] 吗 ? > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements > > > <https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#execute-a-set-of-sql-statements>> > ; > > 悟空 <wukon...@foxmail.com&gt; 于2021年8月26日周四 上午11:33写道: > > > > &gt; hi all:&amp;nbsp; > > &gt; &amp;nbsp; &amp;nbsp; 我目前基于flink 1.12 sql 来开发功能, > 目前遇到一个问题, 我现在想实现 > > 在一个事务里 先将kafka > > &gt; 源的数据写入到一张msyql 表中, 写入成功后再写入一张kafka的表里。如果写入db失败,则不写入kafka 。 > > &gt; &amp;nbsp; &amp;nbsp;语句类似这种: > > &gt; &amp;nbsp; &amp;nbsp;insert into > db_table_sink&amp;nbsp;select * > > from&amp;nbsp; > > &gt; kafka_source_table; > > &gt; &amp;nbsp; &amp;nbsp;insert into kafka_table_sink > select * from > > kafka_source_table; > > &gt; > > &gt; > > &gt; &amp;nbsp; 请问flink SQL 有实现方式吗? > 目前经过测试,发现如果db写入失败,但是kafka依旧写入,同时flink > > 程序没有挂掉。