Thanks, John. If I don't static my pool, I think it will create one
instance for each task. If the pool is static, each jvm can hold one
instance. Depending on the deployment approach, it can create one to
multiple instances. Is this correct?
Konstantin's talk mentions static variables can lead to dead locks, etc, I
don't know if the loss of jdbc connection is also related to this. Btw, I
am using JDBC to write to HBase, maybe it also matters.


On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com> wrote:

> Xin. The open() close() cycle of a Sink function is only called once so I
> don't think you event need to have it static your pool. Someone can confirm
> this?
>
> Miki the JDBC Connector lacks some functionality for instance it only
> flushes batches when the batch interval is reached. So if you set batch
> interval to 5 and you get 6 records the 6 one will not be flushed to the DB
> until you get another 4. You can see in the code above Xin has put a timer
> based flush as well. Also JDBC connector does not have checkpointing if you
> ever need that, which is a surprise because most JDBC databases have
> transactions so it would be nice to have.
>
> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>
>> If it's a sink that use jdbc, why not using the flink Jdbcsink connector?
>>
>>
>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>
>>> I have watched one of the recent Flink forward videos, Apache Flink
>>> Worst Practices by Konstantin Knauf. The talk helps me a lot and mentions
>>> that we should avoid using static variables to share state between tasks.
>>>
>>> So should I also avoid static database connection? Because I am facing a
>>> weird issue currently, the database connection will lose at some point and
>>> bring the whole job down.
>>>
>>> *I have created a database tool like this, *
>>>
>>> public class Phoenix {
>>>
>>>     private static ComboPooledDataSource dataSource = new
>>> ComboPooledDataSource();
>>>     static {
>>>         try {
>>>
>>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>>> null));
>>>             dataSource.setMaxPoolSize(200);
>>>             dataSource.setMinPoolSize(10);
>>>             Properties properties = new Properties();
>>>             properties.setProperty("user", "---");
>>>             properties.setProperty("password", "---");
>>>             dataSource.setProperties(properties);
>>>         } catch (PropertyVetoException e) {
>>>             throw new RuntimeException("phoenix datasource conf error");
>>>         }
>>>     }
>>>
>>>     private static Connection getConn() throws SQLException {
>>>         return dataSource.getConnection();
>>>     }
>>>
>>>     public static < T > T executeQuery(String sql, Caller < T > caller)
>>> throws SQLException {
>>>         // .. execiton logic
>>>     }
>>>
>>>     public static int executeUpdateWithTx(List < String > sqlList)
>>> throws SQLException {
>>>         // ..update logic
>>>     }
>>>
>>> }
>>>
>>> *Then I implemented my customized sink function like this,*
>>>
>>> public class CustomizedSink extends RichSinkFunction < Record > {
>>>
>>>     private static Logger LOG =
>>> LoggerFactory.getLogger("userFlinkLogger");
>>>     private static final int batchInsertSize = 5000;
>>>     private static final long flushInterval = 60 * 1000 L;
>>>     private long lastFlushTime;
>>>     private BatchCommit batchCommit;
>>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>>     private ExecutorService threadPool;
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>>         threadPool = Executors.newFixedThreadPool(1);
>>>         batchCommit = new BatchCommit();
>>>         super.open(parameters);
>>>     }
>>>
>>>     @Override
>>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>>         cacheQueue.add(driverLbs);
>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>             System.currentTimeMillis() - lastFlushTime >= flushInterval)
>>> {
>>>             lastFlushTime = System.currentTimeMillis();
>>>             threadPool.execute(batchCommit);
>>>         }
>>>     }
>>>
>>>     private class BatchCommit implements Runnable {
>>>         @Override
>>>         public void run() {
>>>             try {
>>>                 int ct;
>>>                 synchronized(cacheQueue) {
>>>                     List < String > sqlList = Lists.newArrayList();
>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>                         Object obj = cacheQueue.poll();
>>>                         if (obj == null) {
>>>                             break;
>>>                         }
>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>                     }
>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>                 }
>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>             } catch (Exception e) {
>>>                 LOG.error("Batch insert error: ", e);
>>>             }
>>>         }
>>>
>>>         private String generateBatchSql(Record record) {
>>>             // business logic
>>>         }
>>>     }
>>> }
>>>
>>> *Is there any good idea to refactor the codes?*
>>>
>>> Best,
>>> Kevin
>>>
>>>

Reply via email to