Usually the database connection pool is thread-safe. When you say task, do
you mean a single deployed Flink job?

I still think a sink is only initialized once. You can verify this by adding
logging to open() and close().
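
A minimal sketch of that check, written without Flink on the classpath so it can run on its own. FakeSink and FakePool are hypothetical stand-ins, not Flink's RichSinkFunction or a real pool, and runSimulation only imitates a runtime that creates one sink instance per parallel subtask. In a real job the same log statements would go into the sink's own open() and close().

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SinkLifecycleDemo {

    // Hypothetical stand-in for a pooled data source; counts how many were built.
    static class FakePool {
        static final AtomicInteger CREATED = new AtomicInteger();

        FakePool() {
            CREATED.incrementAndGet();
        }
    }

    // Hypothetical stand-in for a rich sink; NOT Flink's RichSinkFunction.
    static class FakeSink {
        static final AtomicInteger OPEN_CALLS = new AtomicInteger();
        static final AtomicInteger CLOSE_CALLS = new AtomicInteger();

        // Instance field: each sink instance gets its own pool in open().
        private FakePool pool;

        void open() {
            pool = new FakePool();
            OPEN_CALLS.incrementAndGet();
            System.out.println("open() on " + Thread.currentThread().getName());
        }

        void close() {
            pool = null;
            CLOSE_CALLS.incrementAndGet();
            System.out.println("close() on " + Thread.currentThread().getName());
        }
    }

    // Imitates a runtime that creates `parallelism` sink instances and
    // opens and closes each of them exactly once on its own thread.
    static void runSimulation(int parallelism) throws InterruptedException {
        Thread[] workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            FakeSink sink = new FakeSink();
            workers[i] = new Thread(() -> {
                sink.open();
                sink.close();
            }, "sink-" + i);
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        runSimulation(3);
        System.out.println("open calls:    " + FakeSink.OPEN_CALLS.get());
        System.out.println("close calls:   " + FakeSink.CLOSE_CALLS.get());
        System.out.println("pools created: " + FakePool.CREATED.get());
    }
}
```

In this sketch each instance is opened and closed once, and a pool held in an instance field is created once per instance. A static pool would instead be created once per class loader, however many instances exist in that JVM.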

On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin...@gmail.com> wrote:

> Thanks, John. If I don't make my pool static, I think it will create one
> instance for each task. If the pool is static, each JVM can hold one
> instance. Depending on the deployment approach, that means anywhere from
> one to multiple instances. Is this correct?
> Konstantin's talk mentions that static variables can lead to deadlocks,
> etc. I don't know if the loss of the JDBC connection is also related to
> this. By the way, I am using JDBC to write to HBase; maybe that also
> matters.
>
>
> On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com> wrote:
>
>> Xin, the open()/close() cycle of a sink function is only called once, so I
>> don't think you even need to make your pool static. Can someone confirm
>> this?
>>
>> Miki, the JDBC connector lacks some functionality. For instance, it only
>> flushes a batch when the batch interval is reached. So if you set the batch
>> interval to 5 and you get 6 records, the 6th one will not be flushed to the
>> DB until you get another 4. You can see in Xin's code that he has added a
>> timer-based flush as well. Also, the JDBC connector does not support
>> checkpointing, if you ever need that, which is a surprise because most JDBC
>> databases have transactions, so it would be nice to have.
>>
>> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>>
>>> If it's a sink that uses JDBC, why not use the Flink JDBC sink connector?
>>>
>>>
>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>>
>>>> I have watched one of the recent Flink Forward videos, Apache Flink
>>>> Worst Practices by Konstantin Knauf. The talk helped me a lot, and it
>>>> mentions that we should avoid using static variables to share state
>>>> between tasks.
>>>>
>>>> So should I also avoid a static database connection? I am currently
>>>> facing a weird issue: the database connection gets lost at some point
>>>> and brings the whole job down.
>>>>
>>>> *I have created a database tool like this:*
>>>>
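>>>> import com.mchange.v2.c3p0.ComboPooledDataSource;
>>>> import java.beans.PropertyVetoException;
>>>> import java.sql.Connection;
>>>> import java.sql.SQLException;
>>>> import java.util.List;
>>>> import java.util.Properties;
>>>>
>>>> // Environment and Caller are the poster's own helper classes (not shown).
>>>> public class Phoenix {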
>>>>
>>>>     private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
>>>>     static {
>>>>         try {
>>>>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>>>>             dataSource.setMaxPoolSize(200);
>>>>             dataSource.setMinPoolSize(10);
>>>>             Properties properties = new Properties();
>>>>             properties.setProperty("user", "---");
>>>>             properties.setProperty("password", "---");
>>>>             dataSource.setProperties(properties);
>>>>         } catch (PropertyVetoException e) {
>>>>             // Keep the original exception as the cause so the stack trace is not lost.
>>>>             throw new RuntimeException("phoenix datasource conf error", e);
>>>>         }
>>>>     }
>>>>
>>>>     private static Connection getConn() throws SQLException {
>>>>         return dataSource.getConnection();
>>>>     }
>>>>
>>>>     public static <T> T executeQuery(String sql, Caller<T> caller)
>>>>             throws SQLException {
>>>>         // .. execution logic
>>>>     }
>>>>
>>>>     public static int executeUpdateWithTx(List<String> sqlList)
>>>>             throws SQLException {
>>>>         // .. update logic
>>>>     }
>>>>
>>>> }
>>>>
>>>> *Then I implemented my customized sink function like this:*
>>>>
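>>>> // Lists is assumed to be Guava's com.google.common.collect.Lists.
>>>> import com.google.common.collect.Lists;
>>>> import java.util.List;
>>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>> import java.util.concurrent.ExecutorService;
>>>> import java.util.concurrent.Executors;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> public class CustomizedSink extends RichSinkFunction<Record> {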
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>>>>     private static final int batchInsertSize = 5000;
>>>>     private static final long flushInterval = 60 * 1000L;
>>>>     private long lastFlushTime;
>>>>     private BatchCommit batchCommit;
>>>>     private ConcurrentLinkedQueue<Object> cacheQueue;
>>>>     private ExecutorService threadPool;
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         cacheQueue = new ConcurrentLinkedQueue<>();
>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>         batchCommit = new BatchCommit();
>>>>         super.open(parameters);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void invoke(Record record) throws Exception {
>>>>         cacheQueue.add(record);
>>>>         // Flush when the batch is full or the flush interval has elapsed.
>>>>         if (cacheQueue.size() >= batchInsertSize
>>>>                 || System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>>>             lastFlushTime = System.currentTimeMillis();
>>>>             threadPool.execute(batchCommit);
>>>>         }
>>>>     }
>>>>
>>>>     private class BatchCommit implements Runnable {
>>>>         @Override
>>>>         public void run() {
>>>>             try {
>>>>                 int ct;
>>>>                 synchronized(cacheQueue) {
>>>>                     List<String> sqlList = Lists.newArrayList();
>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>                         Object obj = cacheQueue.poll();
>>>>                         if (obj == null) {
>>>>                             break;
>>>>                         }
>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>                     }
>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>                 }
>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>             } catch (Exception e) {
>>>>                 LOG.error("Batch insert error: ", e);
>>>>             }
>>>>         }
>>>>
>>>>         private String generateBatchSql(Record record) {
>>>>             // business logic
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> *Is there a good way to refactor the code?*
>>>>
>>>> Best,
>>>> Kevin
>>>>
>>>>
