Hi KristoffSC,

AFAIK, async I/O does not support state operations at all, because of the
issues you mentioned (accessing state from a RichAsyncFunction fails).

I'd probably solve it by doing the state operations in a map or process
function placed before and after the async I/O operator. If you enable object
reuse, performance should be pretty much the same as if async I/O supported
state, but the threading model becomes much simpler.

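So, the pipeline is source -> keyBy -> map (retrieve state) -> async I/O
(use state) -> keyBy -> map (update state). The output of the async I/O
operator is no longer a keyed stream, so you need to key it again before the
map that updates the state. You might need to return a Tuple2<Key, State>
from the first map and from the async function, so that the subsequent
operators have the full context information.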

On Mon, Aug 10, 2020 at 4:24 PM KristoffSC <krzysiek.chmielew...@gmail.com> wrote:

> Hi guys,
> I'm using Flink 1.9.2.
>
> I have a question about a use case where I would like to use Flink's managed
> keyed state with async I/O [1].
>
>
> Let's take the example below, taken from [1], as a baseline, and let's assume
> that we are executing it on a keyed stream.
>
> // Body of asyncInvoke(String key, ResultFuture<Tuple2<String, String>> resultFuture)
> // in the example from [1]:
> final Future<String> result = client.query(key);
>
> CompletableFuture.supplyAsync(new Supplier<String>() {
>     @Override
>     public String get() {
>         try {
>             return result.get();
>         } catch (InterruptedException | ExecutionException e) {
>             // Normally handled explicitly.
>             return null;
>         }
>     }
> }).thenAccept((String dbResult) -> {
>     resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
> });
>
>
> Imagine that instead of passing key to client.query(..), we pass some value
> taken from Flink's managed keyed state. Later, the supplier's get method
> returns a value that should be stored in that state. In other words, we use
> previous results as inputs for the next computations.
>
> Is this achievable with Flink's async I/O? I can have many pending requests
> on client.query, which can finish in random order. AsyncDataStream.orderedWait
> will not help here, since it only affects the order in which Flink "releases"
> the messages from its internal queue for async operators.
>
>
> What is more, this scenario can result in multiple concurrent writes/reads
> to/from Flink's managed state for the same key. Is this thread safe?
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany