Usually the database connection pool is thread safe. When you say "task", do you mean a single deployed Flink job?

I still think a sink is only initialized once. You can prove it by putting logging in open() and close().
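Something like this (a bare-bones sketch; the class name, logger, and record type are just placeholders) should print one open()/close() pair per parallel sink instance in the TaskManager logs:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Minimal sketch: log the open()/close() lifecycle to see how many times
// a sink instance is actually initialized.
public class LoggingSink extends RichSinkFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingSink.class);

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        LOG.info("Sink opened, subtask {}", getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void invoke(String value, Context context) {
        // no-op; only the lifecycle is of interest here
    }

    @Override
    public void close() throws Exception {
        LOG.info("Sink closed, subtask {}", getRuntimeContext().getIndexOfThisSubtask());
        super.close();
    }
}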
On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin...@gmail.com> wrote:

> Thanks, John. If I don't make my pool static, I think it will create one
> instance for each task. If the pool is static, each JVM can hold one
> instance. Depending on the deployment approach, it can create one to
> multiple instances. Is this correct?
> Konstantin's talk mentions that static variables can lead to deadlocks,
> etc. I don't know if the loss of the JDBC connection is also related to
> this. Btw, I am using JDBC to write to HBase, maybe that also matters.
>
>
> On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com> wrote:
>
>> Xin. The open()/close() cycle of a Sink function is only called once, so
>> I don't think you even need to make your pool static. Can someone confirm
>> this?
>>
>> Miki, the JDBC connector lacks some functionality. For instance, it only
>> flushes batches when the batch interval is reached. So if you set the
>> batch interval to 5 and you get 6 records, the 6th one will not be flushed
>> to the DB until you get another 4. You can see in the code above that Xin
>> has added a timer-based flush as well. Also, the JDBC connector does not
>> support checkpointing if you ever need that, which is a surprise because
>> most JDBC databases have transactions, so it would be nice to have.
>>
>> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>>
>>> If it's a sink that uses JDBC, why not use the Flink JDBC sink connector?
>>>
>>>
>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>>
>>>> I have watched one of the recent Flink Forward videos, "Apache Flink
>>>> Worst Practices" by Konstantin Knauf. The talk helps me a lot and
>>>> mentions that we should avoid using static variables to share state
>>>> between tasks.
>>>>
>>>> So should I also avoid a static database connection? I am currently
>>>> facing a weird issue where the database connection is lost at some point
>>>> and brings the whole job down.
>>>>
>>>> *I have created a database tool like this:*
>>>>
>>>> public class Phoenix {
>>>>
>>>>     private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
>>>>
>>>>     static {
>>>>         try {
>>>>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>>>>             dataSource.setMaxPoolSize(200);
>>>>             dataSource.setMinPoolSize(10);
>>>>             Properties properties = new Properties();
>>>>             properties.setProperty("user", "---");
>>>>             properties.setProperty("password", "---");
>>>>             dataSource.setProperties(properties);
>>>>         } catch (PropertyVetoException e) {
>>>>             throw new RuntimeException("phoenix datasource conf error");
>>>>         }
>>>>     }
>>>>
>>>>     private static Connection getConn() throws SQLException {
>>>>         return dataSource.getConnection();
>>>>     }
>>>>
>>>>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>>>>         // .. execution logic
>>>>     }
>>>>
>>>>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>>>>         // .. update logic
>>>>     }
>>>>
>>>> }
>>>>
>>>> *Then I implemented my customized sink function like this:*
>>>>
>>>> public class CustomizedSink extends RichSinkFunction<Record> {
>>>>
>>>>     private static Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>>>>     private static final int batchInsertSize = 5000;
>>>>     private static final long flushInterval = 60 * 1000L;
>>>>     private long lastFlushTime;
>>>>     private BatchCommit batchCommit;
>>>>     private ConcurrentLinkedQueue<Object> cacheQueue;
>>>>     private ExecutorService threadPool;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         cacheQueue = new ConcurrentLinkedQueue<>();
>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>         batchCommit = new BatchCommit();
>>>>         super.open(parameters);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void invoke(Record record) throws Exception {
>>>>         cacheQueue.add(record);
>>>>         if (cacheQueue.size() >= batchInsertSize
>>>>                 || System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>>>             lastFlushTime = System.currentTimeMillis();
>>>>             threadPool.execute(batchCommit);
>>>>         }
>>>>     }
>>>>
>>>>     private class BatchCommit implements Runnable {
>>>>         @Override
>>>>         public void run() {
>>>>             try {
>>>>                 int ct;
>>>>                 synchronized (cacheQueue) {
>>>>                     List<String> sqlList = Lists.newArrayList();
>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>                         Object obj = cacheQueue.poll();
>>>>                         if (obj == null) {
>>>>                             break;
>>>>                         }
>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>                     }
>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>                 }
>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>             } catch (Exception e) {
>>>>                 LOG.error("Batch insert error: ", e);
>>>>             }
>>>>         }
>>>>
>>>>         private String generateBatchSql(Record record) {
>>>>             // business logic
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> *Is there any good idea to refactor the code?*
>>>>
>>>> Best,
>>>> Kevin
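For the refactoring question at the bottom of the thread, one option is to drop the static Phoenix helper and let each parallel sink instance own its pool, created in open() and closed in close(). A minimal sketch only, not a drop-in replacement for the batching logic above; the class name, JDBC URL, pool sizes, and the plain-String record type are placeholders:

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;

// Sketch: each parallel sink instance owns its pool, created in open()
// and released in close(), so nothing is shared through static state.
public class PhoenixSink extends RichSinkFunction<String> {

    private transient ComboPooledDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new ComboPooledDataSource();
        dataSource.setDriverClass("org.apache.phoenix.jdbc.PhoenixDriver");
        dataSource.setJdbcUrl("jdbc:phoenix:zk-host:2181"); // placeholder URL
        dataSource.setMinPoolSize(1);
        dataSource.setMaxPoolSize(10);
    }

    @Override
    public void invoke(String upsertSql, Context context) throws Exception {
        // Borrow a connection per invocation; batching and timer-based
        // flushing could be layered on top, as in the original CustomizedSink.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(upsertSql)) {
            conn.setAutoCommit(false);
            stmt.executeUpdate();
            conn.commit();
        }
    }

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close(); // shut down the pool together with the sink
        }
        super.close();
    }
}

This ties the pool's lifetime to the sink's open()/close() cycle, so a restarted job gets fresh connections instead of reusing a possibly broken pool kept alive by a static field in the same JVM.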