Also, the pool should be transient, because it holds connections which shouldn't/cannot be serialized.
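Something like this (a rough, untested sketch; PooledSink and the element type are made-up names, and the pool is the same c3p0 ComboPooledDataSource used in Kevin's code below):

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PooledSink extends RichSinkFunction<String> {

    // transient: the pool is never shipped with the serialized sink; each
    // parallel instance builds its own pool on the task manager in open()
    private transient ComboPooledDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new ComboPooledDataSource();
        // ... set driver class, JDBC URL, pool sizes here, as in the Phoenix class below
    }

    @Override
    public void invoke(String value) throws Exception {
        // borrow a connection from the local pool per write (or per batch)
        try (java.sql.Connection conn = dataSource.getConnection()) {
            // ... execute the statement
        }
    }

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close(); // release the pooled connections with the task
        }
        super.close();
    }
}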
On Thu., Oct. 17, 2019, 9:39 a.m. John Smith, <java.dev....@gmail.com> wrote:

> If by task you mean job, then yes: global static variables initialized in
> the main of the job do not get serialized/transferred to the nodes where
> that job may get assigned.
>
> The other thing is, since it is a sink, the sink will be serialized to
> that node and then initialized, so that static variable will be local to
> that sink.
>
> Someone from Flink should chime in :p
>
> On Thu., Oct. 17, 2019, 9:22 a.m. John Smith, <java.dev....@gmail.com> wrote:
>
>> Usually the database connection pool is thread safe. When you say task,
>> do you mean a single deployed Flink job?
>>
>> I still think a sink is only initialized once. You can prove it by
>> putting logging in the open and close.
>>
>> On Thu., Oct. 17, 2019, 1:39 a.m. Xin Ma, <kevin.xin...@gmail.com> wrote:
>>
>>> Thanks, John. If I don't make my pool static, I think it will create
>>> one instance for each task. If the pool is static, each JVM can hold
>>> one instance. Depending on the deployment approach, it can create one
>>> to multiple instances. Is this correct?
>>> Konstantin's talk mentions that static variables can lead to deadlocks,
>>> etc.; I don't know if the loss of the JDBC connection is also related
>>> to this. By the way, I am using JDBC to write to HBase, maybe that also
>>> matters.
>>>
>>> On Thu, Oct 17, 2019 at 2:32 AM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Xin, the open()/close() cycle of a sink function is only called once,
>>>> so I don't think you even need to make your pool static. Can someone
>>>> confirm this?
>>>>
>>>> Miki, the JDBC connector lacks some functionality; for instance, it
>>>> only flushes batches when the batch interval is reached. So if you set
>>>> the batch interval to 5 and you get 6 records, the 6th one will not be
>>>> flushed to the DB until you get another 4. You can see in the code
>>>> above that Xin has put in a timer-based flush as well. Also, the JDBC
>>>> connector does not have checkpointing if you ever need that, which is
>>>> a surprise, because most JDBC databases have transactions, so it would
>>>> be nice to have.
>>>>
>>>> On Wed, 16 Oct 2019 at 10:58, miki haiat <miko5...@gmail.com> wrote:
>>>>
>>>>> If it's a sink that uses JDBC, why not use the Flink JdbcSink
>>>>> connector?
>>>>>
>>>>> On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:
>>>>>
>>>>>> I have watched one of the recent Flink Forward videos, "Apache Flink
>>>>>> Worst Practices" by Konstantin Knauf. The talk helps me a lot, and
>>>>>> it mentions that we should avoid using static variables to share
>>>>>> state between tasks.
>>>>>>
>>>>>> So should I also avoid a static database connection? I am facing a
>>>>>> weird issue currently: the database connection is lost at some
>>>>>> point, which brings the whole job down.
>>>>>>
>>>>>> *I have created a database tool like this,*
>>>>>>
>>>>>> public class Phoenix {
>>>>>>
>>>>>>     private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
>>>>>>
>>>>>>     static {
>>>>>>         try {
>>>>>>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>>>>>>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>>>>>>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>>>>>>             dataSource.setMaxPoolSize(200);
>>>>>>             dataSource.setMinPoolSize(10);
>>>>>>             Properties properties = new Properties();
>>>>>>             properties.setProperty("user", "---");
>>>>>>             properties.setProperty("password", "---");
>>>>>>             dataSource.setProperties(properties);
>>>>>>         } catch (PropertyVetoException e) {
>>>>>>             throw new RuntimeException("phoenix datasource conf error");
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private static Connection getConn() throws SQLException {
>>>>>>         return dataSource.getConnection();
>>>>>>     }
>>>>>>
>>>>>>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>>>>>>         // .. execution logic
>>>>>>     }
>>>>>>
>>>>>>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>>>>>>         // .. update logic
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> *Then I implemented my customized sink function like this,*
>>>>>>
>>>>>> public class CustomizedSink extends RichSinkFunction<Record> {
>>>>>>
>>>>>>     private static Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>>>>>>     private static final int batchInsertSize = 5000;
>>>>>>     private static final long flushInterval = 60 * 1000L;
>>>>>>     private long lastFlushTime;
>>>>>>     private BatchCommit batchCommit;
>>>>>>     private ConcurrentLinkedQueue<Object> cacheQueue;
>>>>>>     private ExecutorService threadPool;
>>>>>>
>>>>>>     @Override
>>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>>         cacheQueue = new ConcurrentLinkedQueue<>();
>>>>>>         threadPool = Executors.newFixedThreadPool(1);
>>>>>>         batchCommit = new BatchCommit();
>>>>>>         super.open(parameters);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void invoke(Record record) throws Exception {
>>>>>>         cacheQueue.add(record);
>>>>>>         if (cacheQueue.size() >= batchInsertSize
>>>>>>                 || System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>>>>>>             lastFlushTime = System.currentTimeMillis();
>>>>>>             threadPool.execute(batchCommit);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private class BatchCommit implements Runnable {
>>>>>>         @Override
>>>>>>         public void run() {
>>>>>>             try {
>>>>>>                 int ct;
>>>>>>                 synchronized (cacheQueue) {
>>>>>>                     List<String> sqlList = Lists.newArrayList();
>>>>>>                     for (ct = 0; ct < batchInsertSize; ct++) {
>>>>>>                         Object obj = cacheQueue.poll();
>>>>>>                         if (obj == null) {
>>>>>>                             break;
>>>>>>                         }
>>>>>>                         sqlList.add(generateBatchSql((Record) obj));
>>>>>>                     }
>>>>>>                     Phoenix.executeUpdateWithTx(sqlList);
>>>>>>                 }
>>>>>>                 LOG.info("Batch insert " + ct + " cache records");
>>>>>>             } catch (Exception e) {
>>>>>>                 LOG.error("Batch insert error: ", e);
>>>>>>             }
>>>>>>         }
>>>>>>
>>>>>>         private String generateBatchSql(Record record) {
>>>>>>             // business logic
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> *Is there any good idea to refactor the code?*
>>>>>>
>>>>>> Best,
>>>>>> Kevin
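For the refactoring question: here is a rough, untested sketch of the same sink with a non-static, per-subtask pool (transient, created in open(), closed in close()), plus logging in open()/close() so you can verify how often they are actually called. Record and generateBatchSql are Kevin's own types; PhoenixSink and everything else here is a made-up illustration, not a Flink API:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixSink extends RichSinkFunction<Record> {

    private static final Logger LOG = LoggerFactory.getLogger(PhoenixSink.class);
    private static final int BATCH_SIZE = 5000;

    private transient ComboPooledDataSource dataSource; // one pool per parallel subtask
    private transient List<String> buffer;              // pending upsert statements

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        LOG.info("open() called"); // verify open() runs once per subtask
        dataSource = new ComboPooledDataSource();
        // ... set driver class, JDBC URL, pool sizes here, as in the Phoenix class
        buffer = new ArrayList<>();
    }

    @Override
    public void invoke(Record record) throws Exception {
        buffer.add(generateBatchSql(record));
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    private void flush() throws SQLException {
        // borrow a connection, write the whole buffer in one transaction
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement()) {
            conn.setAutoCommit(false);
            for (String sql : buffer) {
                stmt.addBatch(sql);
            }
            stmt.executeBatch();
            conn.commit();
        }
        LOG.info("Flushed {} records", buffer.size());
        buffer.clear();
    }

    @Override
    public void close() throws Exception {
        LOG.info("close() called"); // verify close() runs once per subtask
        if (buffer != null && !buffer.isEmpty()) {
            flush();                // don't drop buffered records on shutdown
        }
        if (dataSource != null) {
            dataSource.close();     // release the pooled connections with the task
        }
        super.close();
    }

    private String generateBatchSql(Record record) {
        // business logic, as in the original
        return "...";
    }
}

The timer-based flush from the original could be added back with a scheduled executor if the interval matters. This sketch still has no checkpoint integration (the same gap noted above for the JDBC connector), but at least the connection lifecycle follows the task lifecycle, and nothing unserializable is held in a static.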