Xin. The open() close() cycle of a Sink function is only called once so I
don't think you event need to have it static your pool. Someone can confirm
this?

Miki the JDBC Connector lacks some functionality for instance it only
flushes batches when the batch interval is reached. So if you set batch
interval to 5 and you get 6 records the 6 one will not be flushed to the DB
until you get another 4. You can see in the code above Xin has put a timer
based flush as well. Also JDBC connector does not have checkpointing if you
ever need that, which is a surprise because most JDBC databases have
transactions so it would be nice to have.

On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:

> If it's a sink that use jdbc, why not using the flink Jdbcsink connector?
>
>
> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>
>> I have watched one of the recent Flink forward videos, Apache Flink Worst
>> Practices by Konstantin Knauf. The talk helps me a lot and mentions that we
>> should avoid using static variables to share state between tasks.
>>
>> So should I also avoid static database connection? Because I am facing a
>> weird issue currently, the database connection will lose at some point and
>> bring the whole job down.
>>
>> *I have created a database tool like this, *
>>
>> public class Phoenix {
>>
>>     private static ComboPooledDataSource dataSource = new
>> ComboPooledDataSource();
>>     static {
>>         try {
>>
>> dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>> "org.apache.phoenix.jdbc.PhoenixDriver"));
>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url",
>> null));
>>             dataSource.setMaxPoolSize(200);
>>             dataSource.setMinPoolSize(10);
>>             Properties properties = new Properties();
>>             properties.setProperty("user", "---");
>>             properties.setProperty("password", "---");
>>             dataSource.setProperties(properties);
>>         } catch (PropertyVetoException e) {
>>             throw new RuntimeException("phoenix datasource conf error");
>>         }
>>     }
>>
>>     private static Connection getConn() throws SQLException {
>>         return dataSource.getConnection();
>>     }
>>
>>     public static < T > T executeQuery(String sql, Caller < T > caller)
>> throws SQLException {
>>         // .. execiton logic
>>     }
>>
>>     public static int executeUpdateWithTx(List < String > sqlList) throws
>> SQLException {
>>         // ..update logic
>>     }
>>
>> }
>>
>> *Then I implemented my customized sink function like this,*
>>
>> public class CustomizedSink extends RichSinkFunction < Record > {
>>
>>     private static Logger LOG =
>> LoggerFactory.getLogger("userFlinkLogger");
>>     private static final int batchInsertSize = 5000;
>>     private static final long flushInterval = 60 * 1000 L;
>>     private long lastFlushTime;
>>     private BatchCommit batchCommit;
>>     private ConcurrentLinkedQueue < Object > cacheQueue;
>>     private ExecutorService threadPool;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         cacheQueue = new ConcurrentLinkedQueue < > ();
>>         threadPool = Executors.newFixedThreadPool(1);
>>         batchCommit = new BatchCommit();
>>         super.open(parameters);
>>     }
>>
>>     @Override
>>     public void invoke(DriverLbs driverLbs) throws Exception {
>>         cacheQueue.add(driverLbs);
>>         if (cacheQueue.size() >= batchInsertSize ||
>>             System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>             lastFlushTime = System.currentTimeMillis();
>>             threadPool.execute(batchCommit);
>>         }
>>     }
>>
>>     private class BatchCommit implements Runnable {
>>         @Override
>>         public void run() {
>>             try {
>>                 int ct;
>>                 synchronized(cacheQueue) {
>>                     List < String > sqlList = Lists.newArrayList();
>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>                         Object obj = cacheQueue.poll();
>>                         if (obj == null) {
>>                             break;
>>                         }
>>                         sqlList.add(generateBatchSql((Record) obj));
>>                     }
>>                     Phoenix.executeUpdateWithTx(sqlList);
>>                 }
>>                 LOG.info("Batch insert " + ct + " cache records");
>>             } catch (Exception e) {
>>                 LOG.error("Batch insert error: ", e);
>>             }
>>         }
>>
>>         private String generateBatchSql(Record record) {
>>             // business logic
>>         }
>>     }
>> }
>>
>> *Is there any good idea to refactor the codes?*
>>
>> Best,
>> Kevin
>>
>>

Reply via email to