If by task you mean job, then yes: global static variables initialized in the
main of the job do not get serialized/transferred to the nodes where that
job may get assigned.

The other thing is that, since it is a sink, the sink will be serialized,
sent to that node and then initialized there, so that static variable will
be local to that sink on that node.
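
To illustrate what I mean, here is a minimal sketch in plain Java (no Flink
involved). DemoSink, staticPool, localPool and shipToFreshJvm are made-up
names for this example, and resetting the static field to null is only my
way of simulating a fresh JVM on another node. It relies on the fact that
Java serialization does not write static or transient fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a sink; this is not a Flink class.
class DemoSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Static: belongs to the class in the current JVM, never written to the stream.
    static String staticPool;

    // Regular instance field: travels with the serialized object.
    final String tableName;

    // Transient: skipped by serialization, so it has to be created in open().
    transient String localPool;

    DemoSink(String tableName) {
        this.tableName = tableName;
    }

    // Plays the role of open(): builds per-instance resources after shipping.
    void open() {
        localPool = "pool-for-" + tableName;
    }

    // Serializes the sink, clears the static field to mimic a JVM where main()
    // never ran (an assumption of this sketch), then deserializes the bytes.
    static DemoSink shipToFreshJvm(DemoSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            staticPool = null;
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        staticPool = "set-in-main";
        DemoSink sink = new DemoSink("events");
        sink.open();

        DemoSink onWorker = shipToFreshJvm(sink);
        System.out.println("static after shipping:    " + staticPool);
        System.out.println("instance field:           " + onWorker.tableName);
        System.out.println("transient before open():  " + onWorker.localPool);
        onWorker.open();
        System.out.println("transient after open():   " + onWorker.localPool);
    }
}
```

The point is only that whatever the sink needs on the worker has to be
either a serializable instance field or something built in open(); a value
set in main() on the client is not carried along.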

Someone from Flink should chime in :p

On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <java.dev....@gmail.com>
wrote:

> Usually the database connection pool is thread safe. By task, do you
> mean a single deployed Flink job?
>
> I still think a sink is only initialized once. You can prove it by putting
> logging in open() and close().
>
> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin...@gmail.com> wrote:
>
>> Thanks, John. If I don't make my pool static, I think it will create one
>> instance for each task. If the pool is static, each JVM can hold one
>> instance. Depending on the deployment approach, it can create one to
>> multiple instances. Is this correct?
>> Konstantin's talk mentions that static variables can lead to deadlocks, etc.
>> I don't know if the loss of the JDBC connection is also related to this. Btw, I
>> am using JDBC to write to HBase; maybe that also matters.
>>
>>
>> On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com>
>> wrote:
>>
>>> Xin, the open()/close() cycle of a sink function is only called once, so
>>> I don't think you even need to make your pool static. Can someone
>>> confirm this?
>>>
>>> Miki, the JDBC connector lacks some functionality. For instance, it only
>>> flushes batches when the batch interval is reached. So if you set the batch
>>> interval to 5 and you get 6 records, the 6th one will not be flushed to the DB
>>> until you get another 4. You can see in Xin's code that it includes a
>>> time-based flush as well. Also, the JDBC connector does not support
>>> checkpointing if you ever need that, which is a surprise because most JDBC
>>> databases have transactions, so it would be nice to have.
>>>
>>> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>>>
>>>> If it's a sink that uses JDBC, why not use the Flink JDBC sink
>>>> connector?
>>>>
>>>>
>>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>>>
>>>>> I have watched one of the recent Flink Forward videos, Apache Flink
>>>>> Worst Practices by Konstantin Knauf. The talk helped me a lot and mentions
>>>>> that we should avoid using static variables to share state between tasks.
>>>>>
>>>>> So should I also avoid a static database connection? I am currently facing
>>>>> a weird issue: the database connection gets lost at some point
>>>>> and brings the whole job down.
>>>>>
>>>>> *I have created a database tool like this:*
>>>>>
>>>>> public class Phoenix {
>>>>>
>>>>>     // One pool per JVM (per classloader), shared by every caller in it.
>>>>>     private static final ComboPooledDataSource dataSource = new ComboPooledDataSource();
>>>>>
>>>>>     static {
>>>>>         try {
>>>>>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>>>>>             dataSource.setMaxPoolSize(200);
>>>>>             dataSource.setMinPoolSize(10);
>>>>>             Properties properties = new Properties();
>>>>>             properties.setProperty("user", "---");
>>>>>             properties.setProperty("password", "---");
>>>>>             dataSource.setProperties(properties);
>>>>>         } catch (PropertyVetoException e) {
>>>>>             // Keep the cause so the original failure is not lost.
>>>>>             throw new RuntimeException("phoenix datasource conf error", e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static Connection getConn() throws SQLException {
>>>>>         return dataSource.getConnection();
>>>>>     }
>>>>>
>>>>>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>>>>>         // .. execution logic
>>>>>     }
>>>>>
>>>>>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>>>>>         // .. update logic
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>> *Then I implemented my customized sink function like this:*
>>>>>
>>>>> public class CustomizedSink extends RichSinkFunction<Record> {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>>>>>     private static final int batchInsertSize = 5000;
>>>>>     private static final long flushInterval = 60 * 1000L;
>>>>>     private long lastFlushTime;
>>>>>     private BatchCommit batchCommit;
>>>>>     private ConcurrentLinkedQueue<Object> cacheQueue;
>>>>>     private ExecutorService threadPool;
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         cacheQueue = new ConcurrentLinkedQueue<>();
>>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>>         batchCommit = new BatchCommit();
>>>>>         // Start the flush timer now so the first record does not trigger a flush.
>>>>>         lastFlushTime = System.currentTimeMillis();
>>>>>         super.open(parameters);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void invoke(Record record) throws Exception {
>>>>>         cacheQueue.add(record);
>>>>>         // Flush when the batch is full or the flush interval has elapsed.
>>>>>         if (cacheQueue.size() >= batchInsertSize ||
>>>>>             System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>>>>             lastFlushTime = System.currentTimeMillis();
>>>>>             threadPool.execute(batchCommit);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private class BatchCommit implements Runnable {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             try {
>>>>>                 int ct;
>>>>>                 synchronized (cacheQueue) {
>>>>>                     List<String> sqlList = Lists.newArrayList();
>>>>>                     // Drain at most one batch from the queue.
>>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>>                         Object obj = cacheQueue.poll();
>>>>>                         if (obj == null) {
>>>>>                             break;
>>>>>                         }
>>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>>                     }
>>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>>                 }
>>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>>             } catch (Exception e) {
>>>>>                 LOG.error("Batch insert error: ", e);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private String generateBatchSql(Record record) {
>>>>>             // business logic
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> *Are there any good ideas for refactoring this code?*
>>>>>
>>>>> Best,
>>>>> Kevin
>>>>>
>>>>>
