If by task you mean job, then yes: global static variables initialized in the main() of the job do not get serialized/transferred to the nodes where that job may get assigned.
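For example, here is a minimal sketch of what I mean: build the pool per task inside open() and tear it down in close(), with nothing static. This assumes the c3p0 ComboPooledDataSource and the Record type from Kevin's code below, and the JDBC URL is a placeholder. The log lines double as the proof John suggests of how often open()/close() actually run:

import java.sql.Connection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mchange.v2.c3p0.ComboPooledDataSource;

public class PooledPhoenixSink extends RichSinkFunction<Record> {

    private static final Logger LOG = LoggerFactory.getLogger(PooledPhoenixSink.class);

    // Non-static and transient: the field is not shipped with the job graph;
    // each parallel sink instance builds its own pool on its task manager.
    private transient ComboPooledDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new ComboPooledDataSource();
        dataSource.setDriverClass("org.apache.phoenix.jdbc.PhoenixDriver");
        dataSource.setJdbcUrl("jdbc:phoenix:..."); // placeholder URL
        LOG.info("open() called in subtask {}", getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void invoke(Record record) throws Exception {
        // Borrow a connection from this task's pool and return it when done.
        try (Connection conn = dataSource.getConnection()) {
            // upsert the record here
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("close() called in subtask {}", getRuntimeContext().getIndexOfThisSubtask());
        if (dataSource != null) {
            dataSource.close();
        }
        super.close();
    }
}

If two such sinks end up in the same task manager JVM, they each get their own pool; with a static pool they would share one, which is exactly the kind of hidden state-sharing the talk warns about.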
The other thing is, since it is a sink, the sink will be serialized to that node and then initialized, so that static variable will be local to that sink. Someone from Flink should chime in :p

(For the stock JDBC connector Miki and John discuss below, there is a rough configuration sketch at the very bottom of this mail, under the quotes.)

On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <java.dev....@gmail.com> wrote:

> Usually the database connection pool is thread safe. When you say task, do
> you mean a single deployed Flink job?
>
> I still think a sink is only initialized once. You can prove it by putting
> logging in the open() and close().
>
> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin...@gmail.com> wrote:
>
>> Thanks, John. If I don't make my pool static, I think it will create one
>> instance for each task. If the pool is static, each JVM can hold one
>> instance. Depending on the deployment approach, it can create one to
>> multiple instances. Is this correct?
>>
>> Konstantin's talk mentions that static variables can lead to deadlocks,
>> etc. I don't know if the loss of the JDBC connection is also related to
>> this. By the way, I am using JDBC to write to HBase, so maybe that also
>> matters.
>>
>> On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Xin, the open()/close() cycle of a sink function is only called once, so
>>> I don't think you even need to make your pool static. Can someone
>>> confirm this?
>>>
>>> Miki, the JDBC connector lacks some functionality. For instance, it only
>>> flushes batches when the batch interval is reached: if you set the batch
>>> interval to 5 and you get 6 records, the 6th one will not be flushed to
>>> the DB until you get another 4. You can see in the code quoted below
>>> that Xin has put in a timer-based flush as well. Also, the JDBC
>>> connector does not have checkpointing, if you ever need that, which is a
>>> surprise, because most JDBC databases have transactions, so it would be
>>> nice to have.
>>>
>>> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>>>
>>>> If it's a sink that uses JDBC, why not use the Flink JdbcSink
>>>> connector?
>>>>
>>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>>>
>>>>> I have watched one of the recent Flink Forward videos, "Apache Flink
>>>>> Worst Practices" by Konstantin Knauf. The talk helps me a lot and
>>>>> mentions that we should avoid using static variables to share state
>>>>> between tasks.
>>>>>
>>>>> So should I also avoid a static database connection? I am asking
>>>>> because I am facing a weird issue currently: the database connection
>>>>> gets lost at some point and brings the whole job down.
>>>>>
>>>>> *I have created a database tool like this:*
>>>>>
>>>>> import java.beans.PropertyVetoException;
>>>>> import java.sql.Connection;
>>>>> import java.sql.SQLException;
>>>>> import java.util.List;
>>>>> import java.util.Properties;
>>>>> import com.mchange.v2.c3p0.ComboPooledDataSource;
>>>>>
>>>>> public class Phoenix {
>>>>>
>>>>>     private static final ComboPooledDataSource dataSource = new ComboPooledDataSource();
>>>>>
>>>>>     static {
>>>>>         try {
>>>>>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>>>>>             dataSource.setMaxPoolSize(200);
>>>>>             dataSource.setMinPoolSize(10);
>>>>>             Properties properties = new Properties();
>>>>>             properties.setProperty("user", "---");
>>>>>             properties.setProperty("password", "---");
>>>>>             dataSource.setProperties(properties);
>>>>>         } catch (PropertyVetoException e) {
>>>>>             throw new RuntimeException("phoenix datasource conf error");
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private static Connection getConn() throws SQLException {
>>>>>         return dataSource.getConnection();
>>>>>     }
>>>>>
>>>>>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>>>>>         // .. execution logic
>>>>>     }
>>>>>
>>>>>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>>>>>         // .. update logic
>>>>>     }
>>>>> }
>>>>>
>>>>> *Then I implemented my customized sink function like this:*
>>>>>
>>>>> import java.util.List;
>>>>> import java.util.concurrent.ConcurrentLinkedQueue;
>>>>> import java.util.concurrent.ExecutorService;
>>>>> import java.util.concurrent.Executors;
>>>>> import org.apache.flink.configuration.Configuration;
>>>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>> import com.google.common.collect.Lists;
>>>>>
>>>>> public class CustomizedSink extends RichSinkFunction<Record> {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>>>>>     private static final int batchInsertSize = 5000;
>>>>>     private static final long flushInterval = 60 * 1000L;
>>>>>     private long lastFlushTime;
>>>>>     private BatchCommit batchCommit;
>>>>>     private ConcurrentLinkedQueue<Record> cacheQueue;
>>>>>     private ExecutorService threadPool;
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         cacheQueue = new ConcurrentLinkedQueue<>();
>>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>>         batchCommit = new BatchCommit();
>>>>>         lastFlushTime = System.currentTimeMillis();
>>>>>         super.open(parameters);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void invoke(Record record) throws Exception {
>>>>>         cacheQueue.add(record);
>>>>>         if (cacheQueue.size() >= batchInsertSize
>>>>>                 || System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>>>>             lastFlushTime = System.currentTimeMillis();
>>>>>             threadPool.execute(batchCommit);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void close() throws Exception {
>>>>>         threadPool.shutdown();
>>>>>         super.close();
>>>>>     }
>>>>>
>>>>>     private class BatchCommit implements Runnable {
>>>>>         @Override
>>>>>         public void run() {
>>>>>             try {
>>>>>                 int ct;
>>>>>                 synchronized (cacheQueue) {
>>>>>                     List<String> sqlList = Lists.newArrayList();
>>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>>                         Record record = cacheQueue.poll();
>>>>>                         if (record == null) {
>>>>>                             break;
>>>>>                         }
>>>>>                         sqlList.add(generateBatchSql(record));
>>>>>                     }
>>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>>                 }
>>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>>             } catch (Exception e) {
>>>>>                 LOG.error("Batch insert error: ", e);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         private String generateBatchSql(Record record) {
>>>>>             // business logic
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> *Is there any good idea to refactor the codes?*
>>>>>
>>>>> Best,
>>>>> Kevin
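PS: the stock connector Miki suggests lives in flink-jdbc and, as of Flink 1.9, is built around JDBCOutputFormat. A rough, untested sketch from memory; the driver, URL, UPSERT statement, and the rows stream are placeholders you would adapt:

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class JdbcSinkExample {

    // rows: a DataStream<Row> whose fields match the UPSERT parameters.
    static void writeToPhoenix(DataStream<Row> rows) {
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.apache.phoenix.jdbc.PhoenixDriver")
                .setDBUrl("jdbc:phoenix:...")                             // placeholder
                .setQuery("UPSERT INTO my_table (id, val) VALUES (?, ?)") // placeholder
                // Counts records, not time: with 5 here, a 6th record waits
                // until 4 more arrive (the partial-batch issue John describes).
                .setBatchInterval(5000)
                .finish();
        // Plain output format: it does not participate in checkpointing.
        rows.writeUsingOutputFormat(jdbcOutput);
    }
}

Note that setBatchInterval() is a record count rather than a timer, which is why Xin's code adds a time-based flush on top.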