Hi Lei,

this really depends on your user code. Flink guarantees that `asyncInvoke` is called in order (first `update table table1 set a = 1` and then `update table table1 set a = 2`). However, what `CompletableFuture.supplyAsync` does is not under Flink's control. Concretely, it uses `ForkJoinPool#commonPool()` to execute the update on your table. I believe that this executor won't give you any ordering guarantees with respect to the submitted tasks. Hence, it is possible that you end up with 1 in your database instead of 2.
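One way to keep the submission order would be to pass your own executor to `CompletableFuture.supplyAsync`. The following is only a minimal sketch under some assumptions: the class `OrderedAsyncUpdates`, the method `applyInOrder` and the in-memory list standing in for the database are made up for illustration and are not part of Flink or of your job. It relies on `Executors.newSingleThreadExecutor()`, which runs submitted tasks one at a time in the order they were submitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OrderedAsyncUpdates {

    // Hypothetical helper: submits the "updates" 1..n asynchronously and
    // returns them in the order in which they were actually applied.
    static List<Integer> applyInOrder(int n) {
        // Stand-in for the database table (assumption, no real JDBC here).
        List<Integer> applied = Collections.synchronizedList(new ArrayList<>());

        // A single worker thread with a FIFO queue: tasks are executed
        // sequentially in submission order.
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                pending.add(
                    CompletableFuture
                        // The second argument replaces the common pool.
                        .supplyAsync(() -> {
                            applied.add(value); // "update table1 set a = value"
                            return value;
                        }, singleThread)
                        // In asyncInvoke this is where resultFuture.complete(...) would go.
                        .thenAccept(res -> { }));
            }
            // Wait until every submitted update has finished.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            singleThread.shutdown();
        }
        return new ArrayList<>(applied);
    }

    public static void main(String[] args) {
        List<Integer> applied = applyInOrder(2);
        // The last applied update wins, so this prints "final value = 2".
        System.out.println("final value = " + applied.get(applied.size() - 1));
    }
}
```

Note that a single thread serializes all writes per operator instance, so it trades throughput for ordering. If only updates to the same table or key need to stay ordered, routing each key to one of several single-threaded executors might be a reasonable middle ground.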
Cheers,
Till

On Wed, Aug 19, 2020 at 8:25 AM wangl...@geekplus.com <wangl...@geekplus.com> wrote:

>
> Read kafka message and write the message to MySQL
> Writing to database is the bottleneck when too much message is sent to kafka with high throughput.
>
> So i want to change the operator to asynchronously.
>
> public void asyncInvoke(ObjectNode node, ResultFuture<Integer> resultFuture) throws Exception {
>
>     SqlProvider sqlProvider = new SqlProvider();
>
>     String tableName = node.get("metadata").get("topic").asText();
>     String sql = sqlProvider.getSql(node);
>     CompletableFuture.supplyAsync(new Supplier<Integer>() {
>         @Override
>         public Integer get() {
>             return Integer.valueOf(namedTemplate.update(sql, sqlProvider.getSqlParaMap()));
>         }
>     }).thenAccept((Integer res) -> {
>         resultFuture.complete(Collections.singletonList(res));
>     });
>
> }
>
>
> I want to know if there will be order issues.
> For example the kafaka message order is:
> First: update table table1 set a = 1
> Second: update table table1 set a = 2
>
> It is possible that the final value in database is 1 instead of 2.
>
> Thanks
> Lei
> ------------------------------
> wangl...@geekplus.com <wangl...@geekplus.com.cn>