像上面提到的,目前可能直接使用CDC是一个比较好的方案,自己读数据会有很多问题,比如update数据如何读取、如何读取增量数据、如何处理failover等,还是直接使用CDC最方便
Best, Shammon FY On Tue, Aug 8, 2023 at 11:30 AM Jiabao Sun <jiabao....@xtransfer.cn.invalid> wrote: > Hi, > > 可以尝试使用 flink-cdc-connectors 去实时关联。 > 使用 regular join 需要保留两张表完整的状态,表数据量较大建议使用 rocksdb backend。 > 被关联的表变化不大的话可以考虑 lookup join。 > > Best, > Jiabao > > > > 2023年8月8日 上午11:10,小昌同学 <ccc0606fight...@163.com> 写道: > > > > 谢谢老师指导呀; > > > 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据 > > 老师这一块有更好的建议嘛 > > > > > > | | > > 小昌同学 > > | > > | > > ccc0606fight...@163.com > > | > > ---- 回复的原邮件 ---- > > | 发件人 | Shammon FY<zjur...@gmail.com> | > > | 发送日期 | 2023年8月8日 10:37 | > > | 收件人 | <user-zh@flink.apache.org> | > > | 主题 | Re: Flink消费MySQL | > > Hi, > > > > 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏 > > > > 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded > > source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况 > > > > Best, > > Shammon FY > > > > On Mon, Aug 7, 2023 at 5:04 PM 小昌同学 <ccc0606fight...@163.com> wrote: > > > > 各位老师好 > > > ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊; > > 以下是我的代码: > > | > > public class MysqlSource2 extends RichSourceFunction<ActionType> { > > PreparedStatement ps; > > private Connection connection; > > > > @Override > > public void open(Configuration parameters) throws Exception { > > super.open(parameters); > > connection = getConnection(); > > String sql="select * from actiontype;"; > > ps = connection.prepareStatement(sql); > > } > > > > private static Connection getConnection(){ > > Connection con=null; > > String driverClass= FlinkConfig.config.getProperty("driverClass"); > > String url=FlinkConfig.config.getProperty("jdbcUrl"); > > String user=FlinkConfig.config.getProperty("jdbcUser"); > > String passWord=FlinkConfig.config.getProperty("passWord"); > > > > try { > > Class.forName(driverClass); > > con= DriverManager.getConnection(url,user,passWord); > > } catch (Exception e) { > > throw new RuntimeException(e); > > } > > return con; > > } > > > > @Override > > public void run(SourceContext<ActionType> ctx) throws Exception { > > ResultSet resultSet = ps.executeQuery(); > > while (resultSet.next()){ > > ActionType actionType = new ActionType( > > resultSet.getString("action"), > > resultSet.getString("action_name") > > ); > > ctx.collect(actionType); > > } > > } > > > > @Override > > public void close() throws Exception { > > super.close(); > > if (null!=connection){ > > connection.close(); > > } > > if (null!=ps){ > > ps.close(); > > } > > } > > > > @Override > > public void cancel() { > > } > > }; > > > > > > | > > > > > > | | > > 小昌同学 > > | > > | > > ccc0606fight...@163.com > > | > >