Hi Venn,

I think `AsyncFunction#asyncInvoke` should only submit an asynchronous task for each input record, rather than execute the task itself. In your code fragment, however, the query is executed directly inside `asyncInvoke`: `ps.executeQuery()` runs on the calling thread before `CompletableFuture.supplyAsync` is ever reached, so the call blocks until the database answers.
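To illustrate the difference, here is a minimal sketch in plain Java, without any Flink dependency. It is not your code and not the Flink API: `AsyncLookupSketch`, `blockingLookup`, and the `Consumer` callback are hypothetical stand-ins for your function class, the JDBC query, and `ResultFuture#complete`. The only point is that the blocking call happens inside the task handed to the executor, so the method itself returns right away.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

class AsyncLookupSketch {

    // Hypothetical stand-in for the blocking JDBC lookup
    // (ps.setString(...), ps.executeQuery(), rs.next() in the original code).
    static String blockingLookup(String userId) {
        try {
            Thread.sleep(50); // simulate database latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "phone-of-" + userId;
    }

    // Same shape as asyncInvoke: submit the work and return immediately.
    // The resultCallback parameter plays the role of ResultFuture#complete (assumption).
    static CompletableFuture<Void> asyncInvoke(
            String userId,
            Function<String, String> lookup,
            Executor executor,
            Consumer<Collection<String>> resultCallback) {
        return CompletableFuture
                // The blocking lookup runs on the executor, not on the caller's thread.
                .supplyAsync(() -> lookup.apply(userId), executor)
                // Hand the result over once the lookup has finished.
                .thenAccept(phone -> resultCallback.accept(Collections.singleton(phone)));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletableFuture<Void> done = asyncInvoke(
                "42",
                AsyncLookupSketch::blockingLookup,
                pool,
                result -> System.out.println("completed with " + result));
        System.out.println("asyncInvoke returned without waiting for the lookup");
        done.join(); // only the demo waits here; the caller of asyncInvoke would not
        pool.shutdown();
    }
}
```

In a real `asyncInvoke` the same shape would apply: move `ps.executeQuery()` into the supplier and call `resultFuture.complete(...)` from the callback. A single shared `PreparedStatement` is generally not safe to use from several threads at once, so each task would probably need its own connection, for example from a pool; that part is an assumption about your setup.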
You can also find more information on the documentation page [1]. One point worth noting is that in the example on that page, the call to `client#query` returns a Future, so it is an asynchronous action rather than a direct execution of the query.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

Best,
Yun

------------------------------------------------------------------
From: venn <wxchunj...@163.com>
Send Time: 2019 Jul. 9 (Tue.) 19:54
To: user <user@flink.apache.org>
Subject: Flink Async io problem

Hi Flink experts,

I’m working on a Flink async I/O program that joins a stream with an external database (MySQL), but I found that it runs synchronously. Please give some advice or provide an async demo. Thanks.

The asyncInvoke method is as follows:

    @Override
    public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture) throws Exception {
        // query by asyncUser id
        ps.setString(1, asyncUser.getId());
        ResultSet rs = ps.executeQuery();
        CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
            @Override
            public AsyncUser get() {
                try {
                    if (!rs.isClosed() && rs.next()) {
                        asyncUser.setPhone(rs.getString(1));
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                return asyncUser;
            }
        }).thenAccept((AsyncUser tmp) -> {
            List<AsyncUser> list = new ArrayList();
            list.add(tmp);
            resultFuture.complete(list);
        });
    }

Best,
Venn