Also, the pool should be transient because it holds connections, which
shouldn't (and can't) be serialized.
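
To illustrate, here is a minimal sketch using plain Java serialization (which
is how Flink ships user functions to the workers). FakePool and PooledSink are
hypothetical stand-ins, not Flink or c3p0 classes. The transient pool is dropped
on the way over, and the method playing the role of open() rebuilds it from the
serialized config.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a connection pool; deliberately NOT Serializable.
class FakePool {
    final String url;

    FakePool(String url) {
        this.url = url;
    }
}

// Hypothetical stand-in for a sink: the config travels with the object, the pool does not.
class PooledSink implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String jdbcUrl;     // plain config, part of the serialized form
    private transient FakePool pool;  // skipped by Java serialization

    PooledSink(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Plays the role of open(): builds the pool on the receiving side.
    void open() {
        pool = new FakePool(jdbcUrl);
    }

    FakePool pool() {
        return pool;
    }

    // Round-trips the sink through Java serialization, roughly what shipping it to a worker does.
    static PooledSink roundTrip(PooledSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PooledSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without the transient keyword, writeObject would fail with a
NotSerializableException because FakePool is not Serializable.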

On Thu., Oct. 17, 2019, 9:39 a.m. John Smith, <java.dev....@gmail.com>
wrote:

> If by task you mean job, then yes: global static variables initialized in
> the main of the job do not get serialized/transferred to the nodes where
> that job may get assigned.
>
> The other thing is that, since it is a sink, the sink will be serialized to
> that node and then initialized, so that static variable will be local to
> that sink.
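
A small sketch of the point about statics, assuming plain Java serialization:
static fields are not part of an object's serialized form, so a value assigned
in main on the client never reaches the worker. StaticConfigSink and its fields
are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical sink-like class with one static and one instance field.
class StaticConfigSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Assigned from "main" on the client; static, so it belongs to the class, not the instance.
    static String staticUrl;

    // Instance field; this one is written into the serialized form.
    private final String table;

    StaticConfigSink(String table) {
        this.table = table;
    }

    // Returns the serialized bytes as text so they can be searched for values.
    static String serializedForm(StaticConfigSink sink) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(sink);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
    }
}
```

Searching the serialized form finds the instance value but not the static one,
which is why anything a sink needs on the worker has to be an instance field or
be created in open().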
>
> Someone from Flink should chime in :p
>
> On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <java.dev....@gmail.com>
> wrote:
>
>> Usually the database connection pool is thread safe. When you say task,
>> do you mean a single deployed Flink job?
>>
>> I still think a sink is only initialized once. You can prove it by putting
>> logging in open() and close().
>>
>> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin...@gmail.com> wrote:
>>
>>> Thanks, John. If I don't make my pool static, I think it will create one
>>> instance for each task. If the pool is static, each JVM can hold one
>>> instance. Depending on the deployment approach, that can mean one or
>>> multiple instances. Is this correct?
>>> Konstantin's talk mentions that static variables can lead to deadlocks, etc.
>>> I don't know if the loss of the JDBC connection is also related to this. Btw, I
>>> am using JDBC to write to HBase; maybe that also matters.
>>>
>>>
>>> On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Xin, the open()/close() cycle of a sink function is only called once, so
>>>> I don't think you even need to make your pool static. Can someone
>>>> confirm this?
>>>>
>>>> Miki, the JDBC connector lacks some functionality. For instance, it only
>>>> flushes batches when the batch interval is reached, so if you set the batch
>>>> interval to 5 and you get 6 records, the 6th one will not be flushed to the DB
>>>> until you get another 4. You can see that Xin's code adds a timer-based
>>>> flush as well. Also, the JDBC connector does not support checkpointing if you
>>>> ever need that, which is a surprise because most JDBC databases have
>>>> transactions, so it would be nice to have.
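
The size-plus-time flush described here can be sketched roughly as follows.
SizeOrTimeBuffer is a hypothetical helper, not part of Flink or its JDBC
connector; the clock and the writer are injected only so the behaviour is easy
to check, and the writer stands in for the real batch insert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper: buffers records and flushes when the batch is full or old enough.
class SizeOrTimeBuffer {
    private final int batchSize;
    private final long maxAgeMillis;
    private final LongSupplier clock;             // injected time source (milliseconds)
    private final Consumer<List<String>> writer;  // stands in for the JDBC batch write
    private final List<String> buffer = new ArrayList<>();
    private long lastFlush;

    SizeOrTimeBuffer(int batchSize, long maxAgeMillis,
                     LongSupplier clock, Consumer<List<String>> writer) {
        this.batchSize = batchSize;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
        this.writer = writer;
        this.lastFlush = clock.getAsLong();
    }

    // Adds a record, then flushes if the size limit or the age limit has been reached.
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize || clock.getAsLong() - lastFlush >= maxAgeMillis) {
            flush();
        }
    }

    // Writes whatever is buffered; calling this on shutdown keeps a partial batch from being lost.
    void flush() {
        lastFlush = clock.getAsLong();
        if (buffer.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

With a batch size of 5, the 6th record stays buffered until either 4 more
arrive or the age limit passes. Note that the age check only runs when a record
is added, so a quiet stream still needs a flush from close() or a separate
timer.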
>>>>
>>>> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>>>>
>>>>> If it's a sink that uses JDBC, why not use the Flink JDBC sink
>>>>> connector?
>>>>>
>>>>>
>>>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>>>>
>>>>>> I have watched one of the recent Flink Forward videos, Apache Flink
>>>>>> Worst Practices by Konstantin Knauf. The talk helped me a lot and mentions
>>>>>> that we should avoid using static variables to share state between tasks.
>>>>>>
>>>>>> So should I also avoid a static database connection? I am currently
>>>>>> facing a weird issue: the database connection gets lost at some
>>>>>> point and brings the whole job down.
>>>>>>
>>>>>> *I have created a database tool like this:*
>>>>>>
>>>>>> public class Phoenix {
>>>>>>
>>>>>>     private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
>>>>>>     static {
>>>>>>         try {
>>>>>>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName", "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>>>>>>             dataSource.setMaxPoolSize(200);
>>>>>>             dataSource.setMinPoolSize(10);
>>>>>>             Properties properties = new Properties();
>>>>>>             properties.setProperty("user", "---");
>>>>>>             properties.setProperty("password", "---");
>>>>>>             dataSource.setProperties(properties);
>>>>>>         } catch (PropertyVetoException e) {
>>>>>>             throw new RuntimeException("phoenix datasource conf error", e);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private static Connection getConn() throws SQLException {
>>>>>>         return dataSource.getConnection();
>>>>>>     }
>>>>>>
>>>>>>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>>>>>>         // .. execution logic
>>>>>>     }
>>>>>>
>>>>>>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>>>>>>         // ..update logic
>>>>>>     }
>>>>>>
>>>>>> }
>>>>>>
>>>>>> *Then I implemented my customized sink function like this:*
>>>>>>
>>>>>> public class CustomizedSink extends RichSinkFunction<Record> {
>>>>>>
>>>>>>     private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>>>>>>     private static final int batchInsertSize = 5000;
>>>>>>     private static final long flushInterval = 60 * 1000L;
>>>>>>     private long lastFlushTime;
>>>>>>     private BatchCommit batchCommit;
>>>>>>     private ConcurrentLinkedQueue<Object> cacheQueue;
>>>>>>     private ExecutorService threadPool;
>>>>>>
>>>>>>     @Override
>>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>>         cacheQueue = new ConcurrentLinkedQueue<>();
>>>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>>>         batchCommit = new BatchCommit();
>>>>>>         super.open(parameters);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void invoke(Record record) throws Exception {
>>>>>>         cacheQueue.add(record);
>>>>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>>>>             System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>>>>>             lastFlushTime = System.currentTimeMillis();
>>>>>>             threadPool.execute(batchCommit);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private class BatchCommit implements Runnable {
>>>>>>         @Override
>>>>>>         public void run() {
>>>>>>             try {
>>>>>>                 int ct;
>>>>>>                 synchronized(cacheQueue) {
>>>>>>                     List<String> sqlList = Lists.newArrayList();
>>>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>>>                         Object obj = cacheQueue.poll();
>>>>>>                         if (obj == null) {
>>>>>>                             break;
>>>>>>                         }
>>>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>>>                     }
>>>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>>>                 }
>>>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>>>             } catch (Exception e) {
>>>>>>                 LOG.error("Batch insert error: ", e);
>>>>>>             }
>>>>>>         }
>>>>>>
>>>>>>         private String generateBatchSql(Record record) {
>>>>>>             // business logic
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> *Are there any good ideas for refactoring this code?*
>>>>>>
>>>>>> Best,
>>>>>> Kevin
>>>>>>
>>>>>>
