Thanks, Austin! I'll proceed with my idea, and keep the bootstrap config :)

-0xe1a

On Wed, Apr 7, 2021 at 2:18 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Hey Alex,
>
> I'm not sure if there is a best practice here, but I can tell you that I
> worked on a job that did exactly what you're suggesting: a non-async
> operator created a [record, config] tuple, which was then passed to the
> async stage. Our config objects were also not tiny (~500 KB) and our
> pipeline was not huge (~1M records/day and 1 GB of data/day), but this
> setup worked quite well. If latency isn't your most important metric, or
> if your pipeline is a similar size, I'd say the ease of async I/O is
> worth it.
>
> One thing to look out for (if you haven't already) is bootstrapping the
> config objects when the job starts, since the broadcast from the polling
> source can arrive later than the first record. We solved this by calling
> the polling source's service in the `open()` method of the non-async
> operator and storing the initial configs in memory.
>
> Hope that helps a bit,
> Austin
>
> On Wed, Apr 7, 2021 at 4:39 PM Alex Cruise <a...@cluonflux.com> wrote:
>
>> Hi folks,
>>
>> I have a somewhat complex Flink job that has a few async stages and a
>> few stateful stages. It currently loads its configuration on startup and
>> doesn't attempt to refresh it.
>>
>> Now I'm working on dynamic reconfiguration. I've written a polling source
>> that sends a configuration snapshot whenever anything has changed, I've
>> set up a broadcast of that source, and I'm converting the operators in the
>> data (i.e. non-config) stream into BroadcastProcessFunctions. But now I've
>> reached the first async operator, and I recall that async functions aren't
>> allowed to be stateful.
>>
>> I've tried to find a best practice for this situation, without much luck.
>> My best idea so far is to insert a new stage before the async one, which
>> would tuple up each record with its corresponding config snapshot from the
>> most recent broadcast state. This would increase the amount of data that
>> needs to be serialized, and some of the configs are quite large, but it
>> would allow me to continue using async I/O.
>>
>> Any suggestions?
>>
>> Thanks!
>>
>> -0xe1a
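The pattern discussed in this thread can be sketched in plain Java. A non-async stage pairs each record with the latest config snapshot, bootstrapping that snapshot in `open()` because the first broadcast may arrive after the first record. A stateless async stage then reads its config only from the tuple it receives. This is a minimal model under stated assumptions, not Flink code. `ConfigSnapshot`, `Tagged`, `ConfigTagger`, `AsyncLookup`, the `endpoint` setting, and the `Supplier` standing in for the polling source's service are all hypothetical. The method names only mirror the `open()`, `processBroadcastElement()` and `processElement()` callbacks of Flink's `BroadcastProcessFunction`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical immutable config snapshot, as the polling source would broadcast it.
final class ConfigSnapshot {
    final Map<String, String> settings;

    ConfigSnapshot(Map<String, String> settings) {
        this.settings = Map.copyOf(settings); // defensive, immutable copy
    }
}

// The [record, config] tuple handed from the non-async stage to the async stage.
final class Tagged<T> {
    final T record;
    final ConfigSnapshot config;

    Tagged(T record, ConfigSnapshot config) {
        this.record = record;
        this.config = config;
    }
}

// Non-async stage: holds the latest snapshot and pairs every record with it.
final class ConfigTagger<T> {
    private final Supplier<ConfigSnapshot> configService; // hypothetical blocking fetch
    private ConfigSnapshot latest;

    ConfigTagger(Supplier<ConfigSnapshot> configService) {
        this.configService = configService;
    }

    // Like open(): bootstrap, because the first broadcast may arrive after the first record.
    void open() {
        latest = configService.get();
    }

    // Like processBroadcastElement(): swap in the newly broadcast snapshot.
    void onConfig(ConfigSnapshot snapshot) {
        latest = snapshot;
    }

    // Like processElement(): emit the record paired with the current snapshot.
    Tagged<T> onRecord(T record) {
        if (latest == null) {
            throw new IllegalStateException("open() must be called before records arrive");
        }
        return new Tagged<>(record, latest);
    }
}

// Async stage: stateless. Its config comes only from the incoming tuple.
final class AsyncLookup {
    static CompletableFuture<String> lookup(Tagged<String> in) {
        String endpoint = in.config.settings.getOrDefault("endpoint", "default");
        // Stand-in for a non-blocking call to an external service.
        return CompletableFuture.supplyAsync(() -> endpoint + ":" + in.record);
    }
}

class ConfigTaggingDemo {
    public static void main(String[] args) {
        ConfigTagger<String> tagger =
                new ConfigTagger<>(() -> new ConfigSnapshot(Map.of("endpoint", "v1")));
        tagger.open();
        Tagged<String> early = tagger.onRecord("a");
        tagger.onConfig(new ConfigSnapshot(Map.of("endpoint", "v2")));
        Tagged<String> late = tagger.onRecord("b");
        // "a" keeps the snapshot it was tagged with, even though v2 arrived before its lookup ran.
        System.out.println(AsyncLookup.lookup(early).join() + " " + AsyncLookup.lookup(late).join());
        // prints "v1:a v2:b"
    }
}
```

In an actual job, the `ConfigTagger` logic would live in a `BroadcastProcessFunction` (optionally also writing each snapshot to broadcast state). The lookup would be an `AsyncFunction` attached with `AsyncDataStream.unorderedWait(...)` or `orderedWait(...)`. Each tuple references an immutable snapshot, so a record keeps the config it was tagged with even if a newer broadcast lands while its async call is in flight. As Alex notes, the cost is that the snapshot has to be serialized along with the record.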