Hi Alex,

your approach is completely valid. What you want is for your state-managing operator and the async operations that consume its output to be chained together. That way, records pass between them without any serialization overhead.
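Here is a framework-free sketch in plain Java of what such a state-managing operator does. It only mirrors the callbacks of Flink's BroadcastProcessFunction (open(), processElement, processBroadcastElement). It is not Flink API: a List stands in for the Collector, and the names ConfigTaggingStage, Tagged and the bootstrap Supplier are hypothetical. The stage keeps the latest config snapshot and bootstraps it in open(), as Austin suggests further down. It then emits each record paired with a reference to that snapshot.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical (record, config) pair handed to the async stage.
final class Tagged<R, C> {
    final R record;
    final C config;

    Tagged(R record, C config) {
        this.record = record;
        this.config = config;
    }
}

// Plain-Java stand-in for a BroadcastProcessFunction-style operator.
// Not Flink API: the method names only mirror the callbacks discussed here.
final class ConfigTaggingStage<R, C> {
    private final Supplier<C> bootstrapLoader; // hypothetical: queries the config service directly
    private C latestConfig;                    // in Flink this would live in broadcast state

    ConfigTaggingStage(Supplier<C> bootstrapLoader) {
        this.bootstrapLoader = Objects.requireNonNull(bootstrapLoader);
    }

    // Load an initial snapshot so records arriving before the first
    // broadcast still get a config.
    void open() {
        latestConfig = bootstrapLoader.get();
    }

    // Broadcast side: replace (not mutate) the snapshot on every update.
    void processBroadcastElement(C snapshot) {
        latestConfig = snapshot;
    }

    // Data side: pair the record with a reference to the current snapshot.
    void processElement(R record, List<Tagged<R, C>> out) {
        if (latestConfig == null) {
            throw new IllegalStateException("open() must run before processElement()");
        }
        out.add(new Tagged<>(record, latestConfig));
    }
}
```

In a real job the snapshot would be kept in broadcast state rather than in a field, so that it is checkpointed. When the operators are chained and object reuse is enabled, the pair reaches the async operator by reference, so a large config is not copied or serialized per record.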
To achieve that chain, you want to:

- use Flink 1.11+ [1];
- if you have a legacy source, break the chain right before your state-managing operator (call disableChaining() on the operator that feeds it), as async I/O cannot be (transitively) chained to legacy sources. The pipeline should then look like

      source -> ... -> (forward channel) -> (state-managing operator -> async1 -> async2 -> ...) -> ... -> sink

- enable object reuse (env.getConfig().enableObjectReuse()) [2] to avoid copying objects between the chained operators.

[1] https://issues.apache.org/jira/browse/FLINK-16219
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/execution_configuration.html

On Thu, Apr 8, 2021 at 3:26 AM Alex Cruise <a...@cluonflux.com> wrote:

> Thanks Austin! I'll proceed with my idea, and keep the bootstrap config :)
>
> -0xe1a
>
> On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Alex,
>>
>> I'm not sure if there is a best practice here, but what I can tell you is
>> that I worked on a job that did exactly what you're suggesting with a
>> non-async operator to create a [record, config] tuple, which was then
>> passed to the async stage. Our config objects were also not tiny (~500 kB)
>> and our pipeline not huge (~1M records/day and 1 GB data/day), but this
>> setup worked quite well. I'd say if latency isn't your most important
>> metric, or if your pipeline is a similar size, the ease of async IO is
>> worth it.
>>
>> One thing you'll have to look out for (if you haven't already) is
>> bootstrapping the config objects when the job starts, since the broadcast
>> from the polling source can happen later than receiving the first record.
>> We solved this by calling the polling source's service in the `open()`
>> method of the non-async operator and storing the initial configs in memory.
>>
>> Hope that helps a bit,
>> Austin
>>
>> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <a...@cluonflux.com> wrote:
>>
>>> Hi folks,
>>>
>>> I have a somewhat complex Flink job that has a few async stages, and a
>>> few stateful stages. It currently loads its configuration on startup, and
>>> doesn't attempt to refresh it.
>>>
>>> Now I'm working on dynamic reconfiguration. I've written a polling
>>> source which sends a configuration snapshot whenever anything has changed,
>>> I've set up a broadcast of that source, and I'm updating the operators in
>>> the data (i.e. not config) stream to be BroadcastProcessFunctions. But now
>>> I've reached the first async operator, and I recall that async functions
>>> aren't allowed to be stateful.
>>>
>>> I've tried to find a best practice for this situation, without much
>>> luck. My best idea so far is to insert a new stage before the async one,
>>> which would tuple up each record with its corresponding config snapshot
>>> from the most recent broadcast state. This would increase the amount of
>>> data that needs to be serialized, and some of the configs are quite large,
>>> but would allow me to continue using async IO.
>>>
>>> Any suggestions?
>>>
>>> Thanks!
>>>
>>> -0xe1a
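With the [record, config] pairing, the async function itself can stay stateless, because everything it needs to know about configuration arrives with its input. Below is a plain-Java sketch under that assumption. A CompletableFuture stands in for Flink's ResultFuture, and an ExecutorService stands in for a non-blocking client. Config, RecordWithConfig and ConfiguredLookup are hypothetical names, not Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical immutable config snapshot carried alongside each record.
final class Config {
    final String endpoint;

    Config(String endpoint) {
        this.endpoint = endpoint;
    }
}

// Hypothetical (record, config) pair produced by the stage before the async one.
final class RecordWithConfig {
    final String record;
    final Config config;

    RecordWithConfig(String record, Config config) {
        this.record = record;
        this.config = config;
    }
}

// Stateless async stage: it reads configuration only from its input,
// so it needs neither keyed nor broadcast state.
final class ConfiguredLookup {
    private final ExecutorService pool; // stands in for a non-blocking client

    ConfiguredLookup(ExecutorService pool) {
        this.pool = pool;
    }

    // Simulated async request built from the config that travelled with the record.
    CompletableFuture<String> asyncInvoke(RecordWithConfig in) {
        return CompletableFuture.supplyAsync(() -> in.config.endpoint + "/" + in.record, pool);
    }
}
```

Config is immutable, so requests already in flight keep using the snapshot they were paired with. This assumes each broadcast replaces the snapshot object rather than mutating it.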