FlinkKafkaConsumer<Bill> kafkaConsumer = new
FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC,new SchargeConsumerSchema(),
props);
kafkaConsumer.setStartFromLatest();
SingleOutputStreamOperator<Bill> process =
env.addSource(kafkaConsumer).setParallelism(4)
.filter(new HiveFilterFunction(TrafficConstants.HIVEURL,
TrafficConstants.HIVEUSERNAME, TrafficConstants.HIVEPASSWORD)).name("流量费过滤")
.keyBy((KeySelector<Bill, String>) value -> value.getUser_id() +
value.getSerial_number() + value.getProvince_code())
.process(***);
SingleOutputStreamOperator<BillInfo> map = process.map();
map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME,
TrafficConstants.PASSWORD))
.setParallelism(1).name("sinkRds");
这是主要逻辑:Kafka取数-->自定义richfilter函数加载hive维表数据来过滤数据-->keyby-->process-->自定义sink函数
public class HiveFilterFunction extends RichFilterFunction<Bill> {
Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class);
private final String jdbcUrl;
private final String username;
private final String password;
private transient volatile Statement sts;
private transient volatile Connection connection;
Map<String, String> map = new ConcurrentHashMap();
public HiveFilterFunction(String jdbcUrl, String username, String password)
{
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("org.apache.hive.jdbc.HiveDriver");
connection = DriverManager.getConnection(jdbcUrl, username, password);
LOG.info("hive connection --- " + connection);
sts = connection.createStatement();
query();
}
@Override
public boolean filter(Bill value) {
return map.containsKey(value.getIntegrate_item_code())
&&
TrafficConstants.getProCode().contains(value.getProvince_code());
}
@Override
public void close() throws Exception {
super.close();
assert null != sts ;
assert null != connection ;
sts.close();
connection.close();
}
private void query() throws Exception {
ResultSet resultSet = null;
try {
sts.execute(TrafficConstants.SETSQL);
resultSet = sts.executeQuery(TrafficConstants.CODESQL);
while (resultSet.next()) {
map.put(resultSet.getString("charge_code_cbss"), "");
}
} catch (Exception e) {
LOG.error("hive error", e);
throw new Exception(e);
} finally {
assert resultSet != null;
resultSet.close();
}
LOG.info("hive 维表数据加载完成");
}
}
public class RdsFlowSink extends RichSinkFunction<BillInfo>{
Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);
private final String url;
private final String name;
private final String password;
private transient volatile PreparedStatement insertStatement;
private transient volatile Connection connection;
private transient volatile Counter counter = null;
public RdsFlowSink(String url, String name, String password) {
this.url = url;
this.name = name;
this.password = password;
}
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
connection = DriverManager.getConnection(url,name,password);
LOG.info("connection --- " + connection);
counter = getRuntimeContext().getMetricGroup().counter("counter");
insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL);
}
@Override
public void invoke(BillInfo value, Context context) throws Exception {
try {
insertStatement.setString(1,value.getSerial_number());
insertStatement.setString(2,value.getUser_id());
insertStatement.setString(3,value.getIntegrate_item_code());
insertStatement.setString(4,value.getFee());
insertStatement.setString(5,value.getCity_code());
counter.inc(1);
insertStatement.execute();
}catch (Exception e){
LOG.info("invoke --- " + connection);
LOG.error(e.getMessage());
throw new Exception(e);
}
}
@Override
public void close() throws Exception {
super.close();
assert insertStatement != null;
assert connection != null;
insertStatement.close();
connection.close();
}
}
执行的时候程序会卡在 Class.forName("org.apache.hive.jdbc.HiveDriver"); 或者
Class.forName("com.mysql.jdbc.Driver"); 这里
[email protected]
发件人: JasonLee
发送时间: 2020-07-08 18:46
收件人: user-zh
主题: 回复:关于 richfunction中初始化数据库连接的问题
hi
具体是卡在什么地方了呢?可以打印日志定位一下 理论上是不会有这样的问题 还有单个执行的话可以吗?
| |
JasonLee
|
|
邮箱:[email protected]
|
Signature is customized by Netease Mail Master
在2020年07月08日 18:32,[email protected] 写道:
您好:
我使用flink1.10.1版本streamapi编写程序时,在不同的richfunction中
分别使用Class.forName("*****"); 来加载数据库驱动。是不同的两个数据库驱动;这样会导致程序卡住不往下执行;有人遇到吗?
[email protected]