Hi Lei,

this really depends on your user code. Flink will give you the guarantee
that `asyncInvoke` is called in order (first `update table table1 set a =
1` and then `update table table1 set a = 2`). However, what
`CompletableFuture.supplyAsync` does is not under control of Flink.
Concretely, it will use the ForkJoinPool#commonPool() to execute the update
method on your table. I believe that this executor won't give you any order
guarantees wrt the submitted tasks. Hence, it should be able that you see 1
in your database instead of 2.

Cheers,
Till

On Wed, Aug 19, 2020 at 8:25 AM wangl...@geekplus.com <wangl...@geekplus.com>
wrote:

>
> Read kafka message and  write the message to MySQL
> Writing to database is the bottleneck when too much message is sent to
> kafka with high throughput.
>
> So i want to change the operator to asynchronously.
>
>  public void asyncInvoke(ObjectNode node, ResultFuture<Integer>
> resultFuture) throws Exception {
>
>     SqlProvider sqlProvider = new SqlProvider();
>
>     String tableName = node.get("metadata").get("topic").asText();
>     String sql = sqlProvider.getSql(node);
>     CompletableFuture.supplyAsync(new Supplier<Integer>() {
>         @Override
>         public Integer get() {
>             return Integer.valueOf(namedTemplate.update(sql, 
> sqlProvider.getSqlParaMap()));
>         }
>     }).thenAccept((Integer res) -> {
>         resultFuture.complete(Collections.singletonList(res));
>     });
>
> }
>
>
> I want to know if there will be order issues.
> For example the kafaka message order is:
> First:          update table table1 set a = 1
> Second:     update table table1 set a = 2
>
> It is possible that the final value in database is 1 instead of 2.
>
> Thanks
> Lei
> ------------------------------
> wangl...@geekplus.com <wangl...@geekplus.com.cn>
>
>

Reply via email to