Hi KristoffSC,

AFAIK, Async I/O does not support state operations at all, because of the issues you mentioned (RichAsyncFunction fails if you access state).

I'd probably solve it by placing a map or process function before and after the async I/O operator to handle the state operations. If you enable object reuse, performance should be pretty much the same as if Async I/O supported state, but the threading model becomes much simpler. So the pipeline is: source -> keyBy -> map (retrieve state) -> async I/O (use state) -> map (update state). You might need to return a Tuple<Key, State> from the map and the async function, so that the subsequent operators have the full context information.

On Mon, Aug 10, 2020 at 4:24 PM KristoffSC <krzysiek.chmielew...@gmail.com> wrote:

> Hi guys,
> I'm using Flink 1.9.2.
>
> I have a question about a use case where I would like to use Flink's managed
> keyed state with Async I/O [1].
>
> Let's take the example below from [1] as a baseline and assume that we are
> executing it on a keyed stream.
>
> final Future<String> result = client.query(key);
>
> CompletableFuture.supplyAsync(new Supplier<String>() {
>
>     @Override
>     public String get() {
>         try {
>             return result.get();
>         } catch (InterruptedException | ExecutionException e) {
>             // Normally handled explicitly.
>             return null;
>         }
>     }
> }).thenAccept( (String dbResult) -> {
>     resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
> });
>
> Imagine that instead of passing key to client.query(..) we pass some value
> taken from Flink's managed keyed state. Later, the supplier's get method will
> return a value that should be stored in that state. In other words, we use
> previous results as inputs for the next computations.
>
> Is this achievable with Flink's Async I/O? I can have many pending requests
> on client.query, which can finish in random order.
> AsyncDataStream.orderedWait will not help here, since it only affects the way
> Flink "releases" messages from its internal queue for async operators.
>
> What is more, this scenario can result in multiple concurrent writes/reads
> to/from Flink's managed state for the same key. Is this thread safe?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>
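To make the threading argument behind the suggested source -> keyBy -> map (retrieve state) -> async I/O (use state) -> map (update state) pipeline concrete, here is a minimal, framework-free Java sketch. It does not use Flink: a plain HashMap stands in for keyed ValueState, a thread pool stands in for the async client, and all names (StateAroundAsyncSketch, KeyedValue, retrieveState, queryAsync, updateState) are hypothetical. What it shows is that only the operator thread reads or writes state, while the async stage receives and returns an immutable (key, state) snapshot, playing the role of Tuple<Key, State>. For determinism it handles one record at a time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StateAroundAsyncSketch {

    // Hypothetical stand-in for Tuple<Key, State>: a key plus an immutable
    // snapshot of its state.
    static final class KeyedValue {
        final String key;
        final long state;

        KeyedValue(String key, long state) {
            this.key = key;
            this.state = state;
        }
    }

    // Stand-in for Flink keyed state; only the operator (caller) thread touches it.
    private final Map<String, Long> keyedState = new HashMap<>();

    // Stand-in for the async client's I/O threads.
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);

    // "map (retrieve state)": runs on the operator thread.
    KeyedValue retrieveState(String key) {
        return new KeyedValue(key, keyedState.getOrDefault(key, 0L));
    }

    // "async I/O (use state)": a hypothetical query that only sees the snapshot
    // it was given (here it just adds 1), never the state map itself.
    CompletableFuture<KeyedValue> queryAsync(KeyedValue in) {
        return CompletableFuture.supplyAsync(
                () -> new KeyedValue(in.key, in.state + 1), ioPool);
    }

    // "map (update state)": back on the operator thread, store the new value.
    void updateState(KeyedValue result) {
        keyedState.put(result.key, result.state);
    }

    // Runs the three stages for each record in order and returns a copy of the
    // final state. Each result is written back before the next record is read,
    // so every query sees the previous result for its key.
    static Map<String, Long> run(List<String> keys) {
        StateAroundAsyncSketch job = new StateAroundAsyncSketch();
        try {
            for (String key : keys) {
                KeyedValue snapshot = job.retrieveState(key);
                KeyedValue result = job.queryAsync(snapshot).join();
                job.updateState(result);
            }
            return new HashMap<>(job.keyedState);
        } finally {
            job.ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "a", "a")));
    }
}
```

Two caveats when mapping this back onto a real Flink 1.9 job: the stream returned by AsyncDataStream.orderedWait is a plain DataStream rather than a KeyedStream, so it has to be keyed again before the update-state map can access keyed state; and when several requests for the same key are in flight (capacity > 1), the retrieve-state map may read a value that an earlier, still-pending request has not written back yet.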