FlinkKafkaConsumer<Bill> kafkaConsumer = new FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC, new SchargeConsumerSchema(), props);
kafkaConsumer.setStartFromLatest();
SingleOutputStreamOperator<Bill> process = env.addSource(kafkaConsumer).setParallelism(4)
        .filter(new HiveFilterFunction(TrafficConstants.HIVEURL, TrafficConstants.HIVEUSERNAME, TrafficConstants.HIVEPASSWORD)).name("流量费过滤")
        .keyBy((KeySelector<Bill, String>) value -> value.getUser_id() + value.getSerial_number() + value.getProvince_code())
        .process(***);
SingleOutputStreamOperator<BillInfo> map = process.map();
map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME, TrafficConstants.PASSWORD))
        .setParallelism(1).name("sinkRds");

This is the main logic: read from Kafka --> a custom RichFilterFunction loads Hive dimension-table data and filters the records --> keyBy --> process --> a custom sink function.

public class HiveFilterFunction extends RichFilterFunction<Bill> {
    private static final Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class);
    private final String jdbcUrl;
    private final String username;
    private final String password;
    private transient volatile Statement sts;
    private transient volatile Connection connection;
    private final Map<String, String> map = new ConcurrentHashMap<>();

    public HiveFilterFunction(String jdbcUrl, String username, String password) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        LOG.info("hive connection --- " + connection);
        sts = connection.createStatement();
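        // load the Hive dimension table once when the task opens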
        query();
    }

    @Override
    public boolean filter(Bill value) {
        return map.containsKey(value.getIntegrate_item_code())
                && TrafficConstants.getProCode().contains(value.getProvince_code());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (sts != null) {
            sts.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    private void query() throws Exception {
        ResultSet resultSet = null;
        try {
            sts.execute(TrafficConstants.SETSQL);
            resultSet = sts.executeQuery(TrafficConstants.CODESQL);
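            // cache each charge code; filter() only checks key presence, so the value is left empty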
            while (resultSet.next()) {
                map.put(resultSet.getString("charge_code_cbss"), "");
            }
        } catch (Exception e) {
            LOG.error("hive error", e);
            throw new Exception(e);
        } finally {
            if (resultSet != null) {
                resultSet.close();
            }
        }
        LOG.info("hive 维表数据加载完成");
    }
}

public class RdsFlowSink extends RichSinkFunction<BillInfo>{
    private static final Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);
    private final String url;
    private final String name;
    private final String password;

    private transient volatile PreparedStatement insertStatement;
    private transient volatile Connection connection;
    private transient volatile Counter counter = null;
 
    public RdsFlowSink(String url, String name, String password) {
        this.url = url;
        this.name = name;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        connection = DriverManager.getConnection(url,name,password);
        LOG.info("connection --- " + connection);
        counter = getRuntimeContext().getMetricGroup().counter("counter");
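        // prepare the insert statement once; it is reused for every record in invoke()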
        insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL);
     
    }

    @Override
    public void invoke(BillInfo value, Context context) throws Exception {
        try {
            insertStatement.setString(1,value.getSerial_number());
            insertStatement.setString(2,value.getUser_id());
            insertStatement.setString(3,value.getIntegrate_item_code());
            insertStatement.setString(4,value.getFee());
            insertStatement.setString(5,value.getCity_code());
            counter.inc(1);
            insertStatement.execute();
          
        } catch (Exception e) {
            LOG.error("insert failed, connection: " + connection, e);
            throw new Exception(e);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (insertStatement != null) {
            insertStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

When the job is executed, the program hangs at the Class.forName("org.apache.hive.jdbc.HiveDriver"); call or at the Class.forName("com.mysql.jdbc.Driver"); call.


[email protected]
 
From: JasonLee
Sent: 2020-07-08 18:46
To: user-zh
Subject: Re: Initializing a database connection in a RichFunction
hi
Where exactly does it get stuck? You could add some logging to pin it down; in theory this should not happen. Also, does it work when you run just one of them?
 
 
JasonLee
Email: [email protected]
 
On 2020-07-08 18:32, [email protected] wrote:
Hello:
       I am writing a program with the Flink 1.10.1 Stream API. In two different RichFunctions I call Class.forName("*****"); to load two different database drivers. This causes the program to get stuck and stop executing. Has anyone else run into this?
 
 
[email protected]
