Thank you! This was very helpful.
Sincerely,
Marco A. Villalobos

> On Aug 13, 2020, at 1:24 PM, Arvid Heise <[email protected]> wrote:
>
> Hi Marco,
>
> you don't need to use an async library; you can simply write your code in an
> async fashion.
>
> Here is a sketch of the basic idea that works with any JDBC driver (it's been a
> while since I used JDBC, so don't take it too literally):
>
> private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
>     private transient ExecutorService executorService;
>     // Note: JDBC does not guarantee that a Connection or PreparedStatement is
>     // safe to share across threads; prefer one per worker thread or a pool.
>     private transient Connection dbConn;
>     private transient PreparedStatement preparedStatement;
>
>     SampleAsyncFunction(<connection info>) {
>         this.<connection info> = <connection info>;
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         // Dedicated pool that runs the blocking JDBC calls off the task thread
>         executorService = Executors.newFixedThreadPool(30);
>         dbConn = DriverManager.getConnection(<connection info>);
>         preparedStatement = dbConn.prepareStatement("SELECT * FROM WHERE ...");
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>         executorService.shutdownNow();
>         preparedStatement.close();
>         dbConn.close();
>     }
>
>     @Override
>     public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
>         executorService.submit(() -> {
>             try {
>                 // JDBC parameter and column indices are 1-based
>                 preparedStatement.setInt(1, input);
>                 try (ResultSet resultSet = preparedStatement.executeQuery()) {
>                     // Move to the first row before reading; emit nothing if there is none
>                     resultFuture.complete(resultSet.next()
>                             ? Collections.singletonList(resultSet.getString(1))
>                             : Collections.emptyList());
>                 }
>             } catch (SQLException e) {
>                 resultFuture.completeExceptionally(e);
>             }
>         });
>     }
> }
>
> That's basically what all async libraries do behind the scenes anyway: spawn a
> thread pool and call the callbacks when a submitted task finishes.
>
> To decide on the size of the thread pool, measure, outside of Flink, how many
> queries you can execute in parallel.
> Also keep in mind that if several parallel instances of your async I/O run on
> the same task manager, your threads multiply, because each instance creates its
> own pool (you can also use a static, shared executor, but it's a bit tricky to
> shut down).
>
> On Wed, Aug 12, 2020 at 8:16 PM KristoffSC <[email protected]> wrote:
> > Hi,
> > I do believe that the example from [1], where you see DatabaseClient, is just a
> > hint that whatever library you use (database, REST-based, or anything else)
> > should be asynchronous, or at least should not block. It does not have to be
> > non-blocking as long as it runs on its own thread pool and returns a future,
> > or something similar, that lets you register resultFuture.complete(...) on it.
> >
> > I actually wrote my own small library that calls resultFuture.complete(...)
> > from each of its own threads.
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html
>
> --
> Arvid Heise | Senior Java Developer
> <https://www.ververica.com/>
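To illustrate the pattern Arvid describes (a private thread pool plus callbacks) and the future-based variant KristoffSC mentions, here is a minimal sketch that runs without Flink. It assumes a hypothetical `Callback` interface in place of Flink's `ResultFuture` and a simulated `blockingQuery` in place of a real JDBC call; `CompletableFuture.supplyAsync` runs the blocking work on the dedicated pool and `whenComplete` forwards the result or the error to the callback. The pool size is arbitrary.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for Flink's ResultFuture<OUT>: only the two callbacks used here.
interface Callback<T> {
    void complete(List<T> results);
    void completeExceptionally(Throwable error);
}

// Demo helper: exposes the callback outcome as a CompletableFuture.
final class FutureCallback<T> implements Callback<T> {
    final CompletableFuture<List<T>> result = new CompletableFuture<>();
    public void complete(List<T> results) { result.complete(results); }
    public void completeExceptionally(Throwable error) { result.completeExceptionally(error); }
}

final class BlockingLookupAsync {
    // Dedicated pool for the blocking calls, so the caller's thread never waits.
    private final ExecutorService pool;

    BlockingLookupAsync(int poolSize) {
        this.pool = Executors.newFixedThreadPool(poolSize);
    }

    // Hypothetical blocking lookup standing in for a JDBC query.
    static String blockingQuery(int key) {
        try {
            Thread.sleep(10); // simulate database latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("lookup interrupted", e);
        }
        if (key < 0) {
            throw new IllegalArgumentException("negative key: " + key);
        }
        return "row-" + key;
    }

    // Returns immediately; the callback typically runs on the pool thread that did the query.
    void asyncInvoke(int key, Callback<String> callback) {
        CompletableFuture
                .supplyAsync(() -> blockingQuery(key), pool)
                .whenComplete((row, error) -> {
                    if (error == null) {
                        callback.complete(Collections.singletonList(row));
                    } else {
                        // supplyAsync wraps thrown exceptions in CompletionException; unwrap it
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        callback.completeExceptionally(cause);
                    }
                });
    }

    void close() {
        pool.shutdownNow();
    }
}
```

Each instance owns its pool, as in Arvid's sketch, so with parallelism p on one task manager you get p pools; that is the thread multiplication he warns about.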

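Arvid notes that a static, shared executor avoids multiplying threads but is tricky to shut down. One common way to handle that, sketched here under assumptions, is reference counting: each parallel instance acquires the pool in `open()` and releases it in `close()`, and the last release shuts it down. `SharedExecutor`, `acquire`, `release`, and `users` are hypothetical names. Static fields exist once per class loader; in Flink that typically means once per job's user-code class loader on a task manager, which is worth verifying for your deployment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: one executor per class loader, shared by all parallel
// instances that load this class; the last instance to release it shuts it down.
final class SharedExecutor {
    private static ExecutorService executor;
    private static int refCount = 0;

    private SharedExecutor() {}

    // Call from open(): creates the pool on first use, otherwise reuses it.
    // poolSize only matters for the first caller.
    static synchronized ExecutorService acquire(int poolSize) {
        if (refCount == 0) {
            executor = Executors.newFixedThreadPool(poolSize);
        }
        refCount++;
        return executor;
    }

    // Call from close(): shuts the pool down once no instance uses it anymore.
    static synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            executor.shutdownNow();
            executor = null;
        }
    }

    // Number of instances currently holding the pool.
    static synchronized int users() {
        return refCount;
    }
}
```

Both methods are synchronized so that `open()` and `close()` calls from different slots cannot race; the trade-off is that every successful `acquire()` must be paired with exactly one `release()`, or the pool either leaks or shuts down under a running instance.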