Thank you very much! It seems like you have quite a similar goal. However, could you clarify: do you maintain the stream order at the key level, or do you just limit the parallel requests per key to one without caring about the order?
I'm not 100% sure how your implementation with futures is done. If you are able to share a code snippet, that would be much appreciated! I'm also wondering what memory implications that implementation has: would the futures be queued inside the operator without any limit? Would it be a problem if the same key has too many records within the same time window? But I suppose the function can be made blocking to protect against that.

On Tue, Jun 20, 2023 at 3:34 PM Galen Warren <ga...@cvillewarrens.com.invalid> wrote:

> Hi Juho -- I'm doing something similar. In my case, I want to execute async
> requests concurrently for inputs associated with different keys but issue
> them sequentially for any given key. The way I do it is to create a keyed
> stream and use it as an input to an async function. In this arrangement,
> all the inputs for a given key are handled by a single instance of the
> async function; inside that function instance, I use a map to keep track of
> any in-flight requests for a given key. When a new input comes in for a
> key, if there is an existing in-flight request for that key, the future
> that is constructed for the new request is constructed as [existing
> request].then([new request]) so that the new one is only executed once the
> in-flight request completes. The futures are constructed in such a way that
> they maintain the map properly after completing.
>
> On Mon, Jun 19, 2023 at 10:55 AM Juho Autio <juho.au...@rovio.com.invalid>
> wrote:
>
> > I need to make some slower external requests in parallel, so Async I/O
> > helps greatly with that. However, I also need to make the requests in a
> > certain order per key. Is that possible with Async I/O?
> >
> > The documentation [1] talks about preserving the stream order of
> > results, but it doesn't discuss the order of the async requests. I tried
> > to use AsyncDataStream.orderedWait, but the order of async requests seems
> > to be random – the order of calls gets shuffled even if I use
> > AsyncDataStream.orderedWait.
> >
> > If that is by design, are there any suggestions on how to work around
> > that? I was thinking of collecting all events of the same key into a
> > List, so that the async operator gets a list instead of individual events.
> > There are of course some downsides to using a List, so I would rather
> > have something better.
> >
> > In a nutshell my code is:
> >
> > AsyncDataStream.orderedWait(stream.keyBy(key), asyncFunction)
> >
> > The asyncFunction extends RichAsyncFunction.
> >
> > Thanks!
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#order-of-results
> >
> > (Sorry if it's not appropriate to post this type of question to the dev
> > mailing list. I tried the Flink users list with no luck.)