Hi,

??????????????????????????????????????????????????????????????????????????


Best,
Yichao Yang




------------------ ???????? ------------------
??????:&nbsp;"[email protected]"<[email protected]&gt;;
????????:&nbsp;2020??7??9??(??????) ????11:31
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;????: ?????????? richfunction????????????????????????




FlinkKafkaConsumer<Bill&gt; kafkaConsumer = new 
FlinkKafkaConsumer<&gt;(TrafficConstants.BILLTOPIC,new SchargeConsumerSchema(), 
props);
kafkaConsumer.setStartFromLatest();
SingleOutputStreamOperator<Bill&gt; process = 
env.addSource(kafkaConsumer).setParallelism(4)
.filter(new HiveFilterFunction(TrafficConstants.HIVEURL, 
TrafficConstants.HIVEUSERNAME, 
TrafficConstants.HIVEPASSWORD)).name("??????????")
.keyBy((KeySelector<Bill, String&gt;) value -&gt; value.getUser_id() + 
value.getSerial_number() + value.getProvince_code())
.process(***);
SingleOutputStreamOperator<BillInfo&gt; map = process.map();
map.addSink(new RdsFlowSink(TrafficConstants.URL, TrafficConstants.USERNAME, 
TrafficConstants.PASSWORD))
.setParallelism(1).name("sinkRds");

??????????????Kafka????--&gt;??????richfilter????????hive??????????????????--&gt;keyby--&gt;process--&gt;??????sink????

public class HiveFilterFunction extends RichFilterFunction<Bill&gt; {
&nbsp;&nbsp;&nbsp; Logger LOG = 
LoggerFactory.getLogger(HiveFilterFunction.class);
&nbsp;&nbsp;&nbsp; private final String jdbcUrl;
&nbsp;&nbsp;&nbsp; private final String username;
&nbsp;&nbsp;&nbsp; private final String password;
&nbsp;&nbsp;&nbsp; private transient volatile Statement sts;
&nbsp;&nbsp;&nbsp; private transient volatile Connection connection;
&nbsp;&nbsp;&nbsp; Map<String, String&gt; map = new ConcurrentHashMap();

&nbsp;&nbsp;&nbsp; public HiveFilterFunction(String jdbcUrl, String username, 
String password) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.jdbcUrl = jdbcUrl;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.username = username;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.password = password;
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp; public void open(Configuration parameters) throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; super.open(parameters);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
Class.forName("org.apache.hive.jdbc.HiveDriver");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connection = 
DriverManager.getConnection(jdbcUrl, username, password);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; LOG.info("hive connection --- " + 
connection);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sts = connection.createStatement();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; query();
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp; public boolean filter(Bill value) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; return 
map.containsKey(value.getIntegrate_item_code())
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 &amp;&amp; TrafficConstants.getProCode().contains(value.getProvince_code());
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp; public void close() throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; super.close();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; assert null != sts ;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; assert null != connection ;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; sts.close();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connection.close();
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; private void query() throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ResultSet resultSet = null;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
sts.execute(TrafficConstants.SETSQL);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; resultSet = 
sts.executeQuery(TrafficConstants.CODESQL);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; while 
(resultSet.next()) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 map.put(resultSet.getString("charge_code_cbss"), "");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } catch (Exception e) {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
LOG.error("hive error", e);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; throw new 
Exception(e);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; } finally {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; assert 
resultSet != null;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
resultSet.close();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; LOG.info("hive ????????????????");
&nbsp;&nbsp;&nbsp; }
}

public class RdsFlowSink extends RichSinkFunction<BillInfo&gt;{
&nbsp;&nbsp;&nbsp; Logger LOG = LoggerFactory.getLogger(RdsFlowSink.class);
&nbsp;&nbsp;&nbsp; private final String url;
&nbsp;&nbsp;&nbsp; private final String name;
&nbsp;&nbsp;&nbsp; private final String password;

&nbsp;&nbsp;&nbsp; private transient volatile PreparedStatement insertStatement;
&nbsp;&nbsp;&nbsp; private transient volatile Connection connection;
&nbsp;&nbsp;&nbsp; private transient volatile Counter counter = null;
&nbsp;
&nbsp;&nbsp;&nbsp; public RdsFlowSink(String url, String name, String password) 
{
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.url = url;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.name = name;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; this.password = password;
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp; public void open(Configuration parameters) throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
Class.forName("com.mysql.jdbc.Driver");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connection = 
DriverManager.getConnection(url,name,password);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; LOG.info("connection --- " + 
connection);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; counter = 
getRuntimeContext().getMetricGroup().counter("counter");
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; insertStatement = 
connection.prepareStatement(TrafficConstants.FLOWSQL);
&nbsp;&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp; public void invoke(BillInfo value, Context context) throws 
Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; try {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
insertStatement.setString(1,value.getSerial_number());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
insertStatement.setString(2,value.getUser_id());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
insertStatement.setString(3,value.getIntegrate_item_code());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
insertStatement.setString(4,value.getFee());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
insertStatement.setString(5,value.getCity_code());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
counter.inc(1);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
insertStatement.execute();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }catch (Exception 
e){&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
LOG.info("invoke&nbsp; --- " + connection);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
LOG.error(e.getMessage());
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; throw new 
Exception(e);
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp; }

&nbsp;&nbsp;&nbsp; @Override
&nbsp;&nbsp;&nbsp; public void close() throws Exception {
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; super.close();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; assert insertStatement != null;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; assert connection != null;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; insertStatement.close();
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; connection.close();
&nbsp;&nbsp;&nbsp; }
}

???????????????????? Class.forName("org.apache.hive.jdbc.HiveDriver"); ???? 
Class.forName("com.mysql.jdbc.Driver"); ????


[email protected]
&nbsp;
???????? JasonLee
?????????? 2020-07-08 18:46
???????? user-zh
?????? ?????????? richfunction????????????????????????
hi
???????????????????????????????????????????? ???????????????????????? 
????????????????????????
&nbsp;
&nbsp;
| |
JasonLee
|
|
[email protected]
|
&nbsp;
Signature is customized by Netease Mail Master
&nbsp;
??2020??07??08?? 18:[email protected] ??????
??????
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
??????flink1.10.1????streamapi????????????????????richfunction?? 
????????Class.forName("*****"); 
????????????????????????????????????????????????????????????????????????????????????
&nbsp;
&nbsp;
[email protected]

回复