各位老师好,
我这边在本地通过继承RichSourceFunction类,实现了从MySQL中读取数据,但是为什么程序将MySQL中的全部数据读取出来之后,就自动停止了呢?
以下是我的代码:
|
/**
 * Flink source that runs {@code select * from actiontype} against MySQL once and emits
 * every returned row as an {@link ActionType}.
 *
 * <p>This is a one-shot (bounded) read: {@link #run} returns as soon as the result set is
 * exhausted. Under Flink's {@code SourceFunction} contract a source is finished when
 * {@code run} returns, which is why a job built only on this source stops after all rows
 * have been emitted. Continuous reading would require {@code run} to keep looping
 * (e.g. re-querying periodically) until {@link #cancel} is called.
 */
public class MysqlSource2 extends RichSourceFunction<ActionType> {
    // JDBC handles are created in open(), so they are marked transient and never serialized
    // together with the function object.
    transient PreparedStatement ps;
    private transient Connection connection;

    // Cleared by cancel() (called from a different thread) to stop the emit loop early.
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the query that {@link #run} executes.
     *
     * @param parameters configuration forwarded unchanged to the superclass
     * @throws Exception if the superclass, the connection, or the statement preparation fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from actiontype;";
        ps = connection.prepareStatement(sql);
    }

    /**
     * Creates a JDBC connection from the {@code driverClass}, {@code jdbcUrl},
     * {@code jdbcUser} and {@code passWord} entries of {@code FlinkConfig.config}.
     *
     * @return a newly opened connection
     * @throws RuntimeException wrapping any failure to load the driver or to connect
     */
    private static Connection getConnection() {
        String driverClass = FlinkConfig.config.getProperty("driverClass");
        String url = FlinkConfig.config.getProperty("jdbcUrl");
        String user = FlinkConfig.config.getProperty("jdbcUser");
        String passWord = FlinkConfig.config.getProperty("passWord");
        try {
            Class.forName(driverClass);
            return DriverManager.getConnection(url, user, passWord);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Executes the prepared query and emits one {@link ActionType} per row, built from the
     * {@code action} and {@code action_name} columns. Returns when all rows have been
     * emitted or the source has been cancelled.
     *
     * @param ctx context used to emit the records
     * @throws Exception if executing the query or reading a row fails
     */
    @Override
    public void run(SourceContext<ActionType> ctx) throws Exception {
        // try-with-resources guarantees the ResultSet is released even if collect() throws.
        try (ResultSet resultSet = ps.executeQuery()) {
            while (running && resultSet.next()) {
                ActionType actionType = new ActionType(
                        resultSet.getString("action"),
                        resultSet.getString("action_name"));
                ctx.collect(actionType);
            }
        }
    }

    /**
     * Releases the JDBC resources. The statement is closed before the connection that owns
     * it, and each step runs even if an earlier one throws.
     *
     * @throws Exception if the superclass or any JDBC resource fails to close
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /** Signals {@link #run} to stop emitting rows and return. */
    @Override
    public void cancel() {
        running = false;
    }
}
|
| |
小昌同学
|
|
[email protected]
|