Hi,
?????????????????????????????????????????????????????????????????????????? Best, Yichao Yang ------------------ ???????? ------------------ ??????: "[email protected]"<[email protected]>; ????????: 2020??7??9??(??????) ????11:31 ??????: "user-zh"<[email protected]>; ????: ????: ?????????? richfunction???????????????????????? FlinkKafkaConsumer<Bill> kafkaConsumer = new FlinkKafkaConsumer<>(TrafficConstants.BILLTOPIC,new SchargeConsumerSchema(), props); kafkaConsumer.setStartFromLatest(); SingleOutputStreamOperator<Bill> process = env.addSource(kafkaConsumer).setParallelism(4) .filter(new HiveFilterFunction(TrafficConstants.HIVEURL, TrafficConstants.HIVEUSERNAME, TrafficConstants.HIVEPASSWORD)).name("??????????") .keyBy((KeySelector<Bill, String>) value -> value.getUser_id() + value.getSerial_number() + value.getProvince_code()) .process(***); SingleOutputStreamOperator<BillInfo> map = process.map(); map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME, TrafficConstants.PASSWORD)) .setParallelism(1).name("sinkRds"); ??????????????Kafka????-->??????richfilter????????hive??????????????????-->keyby-->process-->??????sink???? public class HiveFilterFunction extends RichFilterFunction<Bill> { Logger LOG = LoggerFactory.getLogger(HiveFilterFunction.class); private final String jdbcUrl; private final String username; private final String password; private transient volatile Statement sts; private transient volatile Connection connection; Map<String, String> map = new ConcurrentHashMap(); public HiveFilterFunction(String jdbcUrl, String username, String password) { this.jdbcUrl = jdbcUrl; this.username = username; this.password = password; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName("org.apache.hive.jdbc.HiveDriver"); connection = DriverManager.getConnection(jdbcUrl, username, password); LOG.info("hive connection --- " + connection); sts = connection.createStatement(); query(); } @Override public boolean filter(Bill value) { return map.containsKey(value.getIntegrate_item_code()) && TrafficConstants.getProCode().contains(value.getProvince_code()); } @Override public void close() throws Exception { super.close(); assert null != sts ; assert null != connection ; sts.close(); connection.close(); } private void query() throws Exception { ResultSet resultSet = null; try { sts.execute(TrafficConstants.SETSQL); resultSet = sts.executeQuery(TrafficConstants.CODESQL); while (resultSet.next()) { map.put(resultSet.getString("charge_code_cbss"), ""); } } catch (Exception e) { LOG.error("hive error", e); throw new Exception(e); } finally { assert resultSet != null; resultSet.close(); } LOG.info("hive ????????????????"); } } public class RdsFlowSink extends RichSinkFunction<BillInfo>{ Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class); private final String url; private final String name; private final String password; private transient volatile PreparedStatement insertStatement; private transient volatile Connection connection; private transient volatile Counter counter = null; public RdsFlowSink(String url, String name, String password) { this.url = url; this.name = name; this.password = password; } @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection(url,name,password); LOG.info("connection --- " + connection); counter = getRuntimeContext().getMetricGroup().counter("counter"); insertStatement = connection.prepareStatement(TrafficConstants.FLOWSQL); } @Override public void invoke(BillInfo value, Context context) throws Exception { try { insertStatement.setString(1,value.getSerial_number()); insertStatement.setString(2,value.getUser_id()); insertStatement.setString(3,value.getIntegrate_item_code()); insertStatement.setString(4,value.getFee()); insertStatement.setString(5,value.getCity_code()); counter.inc(1); insertStatement.execute(); }catch (Exception e){ LOG.info("invoke --- " + connection); LOG.error(e.getMessage()); throw new Exception(e); } } @Override public void close() throws Exception { super.close(); assert insertStatement != null; assert connection != null; insertStatement.close(); connection.close(); } } ???????????????????? Class.forName("org.apache.hive.jdbc.HiveDriver"); ???? Class.forName("com.mysql.jdbc.Driver"); ???? [email protected] ???????? JasonLee ?????????? 2020-07-08 18:46 ???????? user-zh ?????? ?????????? richfunction???????????????????????? hi ???????????????????????????????????????????? ???????????????????????? ???????????????????????? | | JasonLee | | [email protected] | Signature is customized by Netease Mail Master ??2020??07??08?? 18:[email protected] ?????? ?????? ??????flink1.10.1????streamapi????????????????????richfunction?? ????????Class.forName("*****"); ???????????????????????????????????????????????????????????????????????????????????? [email protected]
