Hey Alex,

I'm not sure if there is a best practice here, but I can tell you that I worked on a job that did exactly what you're suggesting: a non-async operator created a [record, config] tuple, which was then passed to the async stage. Our config objects were also not tiny (~500 KB) and our pipeline was not huge (~1M records/day and 1 GB of data/day), but this setup worked quite well. I'd say that if latency isn't your most important metric, or if your pipeline is a similar size, the ease of async IO is worth it.
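
For what it's worth, here is a minimal sketch of that tupling stage. It is plain Java with no Flink dependency so it runs on its own, and it is not the actual code from that job: `ConfigTupler`, `onConfig`, `onRecord`, and the `Supplier` standing in for the config service are all hypothetical names chosen for illustration. In a real job the three methods would roughly correspond to `open()`, `processBroadcastElement()`, and `processElement()` of a `BroadcastProcessFunction`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of the "tuple up" stage; every name here is hypothetical.
final class ConfigTupler<R, C> {
    private final Supplier<C> configService; // stands in for the polling source's service
    private C latest;                        // most recent config snapshot seen so far

    ConfigTupler(Supplier<C> configService) {
        this.configService = configService;
    }

    // Fetch an initial config so records that arrive before the first
    // broadcast still have a snapshot to be paired with.
    void open() {
        latest = configService.get();
    }

    // Broadcast side: a newer snapshot replaces the one held in memory.
    void onConfig(C snapshot) {
        latest = snapshot;
    }

    // Data side: pair the record with the current snapshot for the async stage.
    Map.Entry<R, C> onRecord(R record) {
        if (latest == null) {
            throw new IllegalStateException("open() must be called before records arrive");
        }
        return new SimpleImmutableEntry<>(record, latest);
    }

    public static void main(String[] args) {
        ConfigTupler<String, String> tupler = new ConfigTupler<>(() -> "config-v1");
        tupler.open();
        List<Map.Entry<String, String>> out = new ArrayList<>();
        out.add(tupler.onRecord("a"));   // arrives before any broadcast
        tupler.onConfig("config-v2");    // broadcast update from the polling source
        out.add(tupler.onRecord("b"));   // paired with the newer snapshot
        System.out.println(out);
    }
}
```

The async stage then only needs the tuple it receives, so it can stay stateless; the cost is that every record carries a reference to (and, across the network, a serialized copy of) its config.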

One thing you'll have to look out for (if you haven't already) is bootstrapping the config objects when the job starts, since the broadcast from the polling source can arrive later than the first record. We solved this by calling the polling source's service in the `open()` method of the non-async operator and storing the initial configs in memory.

Hope that helps a bit,
Austin

On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <a...@cluonflux.com> wrote:

> Hi folks,
>
> I have a somewhat complex Flink job that has a few async stages, and a few
> stateful stages. It currently loads its configuration on startup, and
> doesn't attempt to refresh it.
>
> Now I'm working on dynamic reconfiguration. I've written a polling source
> which sends a configuration snapshot whenever anything has changed, I've
> set up a broadcast of that source, and I'm updating the operators in the
> data (i.e. not config) stream to be BroadcastProcessFunctions. But now I've
> reached the first async operator, and I recall that async functions aren't
> allowed to be stateful.
>
> I've tried to find a best practice for this situation, without much luck.
> My best idea so far is to insert a new stage before the async one, which
> would tuple up each record with its corresponding config snapshot from the
> most recent broadcast state. This would increase the amount of data that
> needs to be serialized, and some of the configs are quite large, but would
> allow me to continue using async IO.
>
> Any suggestions?
>
> Thanks!
>
> -0xe1a
>