Hi Venn,

I think `AsyncFunction#asyncInvoke` should only submit an asynchronous task
for each input record and then return, rather than execute the task itself.
In your code fragment, however, the query (`ps.executeQuery()`) is executed
directly inside the asyncInvoke method, so the call blocks until the database
responds.

You may also find more information on the documentation page [1]. One point
worth noting is that in the example on that page, the call to `client#query`
returns a Future, so it is an asynchronous action rather than a direct
execution of the query.
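
To illustrate the difference, here is a minimal sketch in plain Java, without
any Flink dependency. All names in it are hypothetical: `blockingLookup` stands
in for the blocking JDBC query, and the `Consumer` argument stands in for
`resultFuture.complete`. The point is only that the blocking call moves inside
`supplyAsync`, so the method itself returns immediately.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class AsyncLookupSketch {

    // Hypothetical stand-in for the blocking database query.
    static String blockingLookup(String id) {
        try {
            Thread.sleep(50); // simulate database latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "phone-" + id;
    }

    // Same shape as asyncInvoke: the blocking call runs on a pool thread,
    // and the callback (standing in for resultFuture.complete) fires later.
    static CompletableFuture<Void> asyncInvoke(String id,
                                               ExecutorService pool,
                                               Consumer<List<String>> resultFuture) {
        return CompletableFuture
                .supplyAsync(() -> blockingLookup(id), pool)
                .thenAccept(phone ->
                        resultFuture.accept(Collections.singletonList(phone)));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<Void> done =
                    asyncInvoke("42", pool, result -> System.out.println(result));
            System.out.println("asyncInvoke returned");
            done.join(); // wait only so that the demo does not exit early
        } finally {
            pool.shutdown();
        }
    }
}
```

With JDBC you would probably also need a separate connection or statement per
task (for example from a connection pool), since as far as I know a shared
PreparedStatement should not be used from several threads at once.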

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

Best,
Yun


------------------------------------------------------------------
From:venn <wxchunj...@163.com>
Send Time:2019 Jul. 9 (Tue.) 19:54
To:user <user@flink.apache.org>
Subject:Flink Async io problem


Hi Flink experts,
I'm working on a Flink async I/O program that joins a stream with an external
database (MySQL), but I found that it runs synchronously. Please give me some
advice or point me to an async demo. Thanks.

The asyncInvoke method is as follows:


@Override
public void asyncInvoke(AsyncUser asyncUser, ResultFuture<AsyncUser> resultFuture)
        throws Exception {
    // Query by asyncUser id
    ps.setString(1, asyncUser.getId());
    ResultSet rs = ps.executeQuery();

    CompletableFuture.supplyAsync(new Supplier<AsyncUser>() {
        @Override
        public AsyncUser get() {
            try {
                if (!rs.isClosed() && rs.next()) {
                    asyncUser.setPhone(rs.getString(1));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return asyncUser;
        }
    }).thenAccept((AsyncUser tmp) -> {
        List<AsyncUser> list = new ArrayList<>();
        list.add(tmp);
        resultFuture.complete(list);
    });
}



Best, Venn
