I watched one of the recent Flink Forward videos, Apache Flink Worst Practices by Konstantin Knauf. The talk helped me a lot, and one of its points is that we should avoid using static variables to share state between tasks.
So should I also avoid static database connections? I ask because I am currently facing a weird issue: the database connection gets lost at some point and brings the whole job down.

*I have created a database tool like this:*

    public class Phoenix {

        private static final ComboPooledDataSource dataSource = new ComboPooledDataSource();

        static {
            try {
                dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
                        "org.apache.phoenix.jdbc.PhoenixDriver"));
                dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
                dataSource.setMaxPoolSize(200);
                dataSource.setMinPoolSize(10);
                Properties properties = new Properties();
                properties.setProperty("user", "---");
                properties.setProperty("password", "---");
                dataSource.setProperties(properties);
            } catch (PropertyVetoException e) {
                throw new RuntimeException("phoenix datasource conf error");
            }
        }

        private static Connection getConn() throws SQLException {
            return dataSource.getConnection();
        }

        public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
            // .. execution logic
        }

        public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
            // .. update logic
        }
    }

*Then I implemented my customized sink function like this:*

    public class CustomizedSink extends RichSinkFunction<Record> {

        private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");

        private static final int batchInsertSize = 5000;
        private static final long flushInterval = 60 * 1000L;
        private long lastFlushTime;
        private BatchCommit batchCommit;
        private ConcurrentLinkedQueue<Object> cacheQueue;
        private ExecutorService threadPool;

        @Override
        public void open(Configuration parameters) throws Exception {
            cacheQueue = new ConcurrentLinkedQueue<>();
            threadPool = Executors.newFixedThreadPool(1);
            batchCommit = new BatchCommit();
            super.open(parameters);
        }

        @Override
        public void invoke(Record record) throws Exception {
            cacheQueue.add(record);
            if (cacheQueue.size() >= batchInsertSize
                    || System.currentTimeMillis() - lastFlushTime >= flushInterval) {
                lastFlushTime = System.currentTimeMillis();
                threadPool.execute(batchCommit);
            }
        }

        private class BatchCommit implements Runnable {
            @Override
            public void run() {
                try {
                    int ct;
                    synchronized (cacheQueue) {
                        List<String> sqlList = Lists.newArrayList();
                        for (ct = 0; ct < batchInsertSize; ct++) {
                            Object obj = cacheQueue.poll();
                            if (obj == null) {
                                break;
                            }
                            sqlList.add(generateBatchSql((Record) obj));
                        }
                        Phoenix.executeUpdateWithTx(sqlList);
                    }
                    LOG.info("Batch insert " + ct + " cache records");
                } catch (Exception e) {
                    LOG.error("Batch insert error: ", e);
                }
            }

            private String generateBatchSql(Record record) {
                // business logic
            }
        }
    }

*Is there a good way to refactor this code?*

Best,
Kevin
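P.S. To make the question concrete, here is the direction I was considering: a minimal, untested sketch in which each sink subtask builds its own connection pool in open() and closes it in close(), instead of going through the static Phoenix class. PooledPhoenixSink is a made-up name, the pool sizes are placeholders, and Environment and Record are the classes from my code above:

    import java.sql.Connection;
    import java.sql.Statement;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import com.mchange.v2.c3p0.ComboPooledDataSource;

    public class PooledPhoenixSink extends RichSinkFunction<Record> {

        // Not shipped with the job graph; built per subtask in open().
        private transient ComboPooledDataSource dataSource;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            dataSource = new ComboPooledDataSource();
            dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
                    "org.apache.phoenix.jdbc.PhoenixDriver"));
            dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
            // Much smaller than the shared static pool, since every subtask owns one.
            dataSource.setMaxPoolSize(10);
            dataSource.setMinPoolSize(1);
        }

        @Override
        public void invoke(Record record) throws Exception {
            // Borrow a connection per record from the subtask-local pool.
            try (Connection conn = dataSource.getConnection();
                 Statement stmt = conn.createStatement()) {
                conn.setAutoCommit(false);
                stmt.executeUpdate(generateSql(record));
                conn.commit();
            }
        }

        @Override
        public void close() throws Exception {
            // Release the pool when the task shuts down or fails over.
            if (dataSource != null) {
                dataSource.close();
            }
            super.close();
        }

        private String generateSql(Record record) {
            // business logic, same idea as generateBatchSql above
            return "";
        }
    }

Would that be the right shape, and if so, how would I best keep the batching behavior from my current sink?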