My Flink job loads several configuration files that contain job, operator, and business configuration. One of the operators is an AsyncOperator whose function looks like this:

    class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
      @transient private lazy val client = f(config, metricGroup, etc.)
      @transient private lazy val metricGroup = ...
      def asyncInvoke(....)
    }

The variables are declared lazily as an alternative to implementing the open method. This is unavoidable because we rely on Flink's monitoring libraries.

The application resumes from a checkpoint after an unexpected termination. However, sometimes I want to change the config parameter that is passed as a constructor argument, and this doesn't work because Flink restores from the submittedJobGraph. This makes sense: Flink by itself doesn't know whether it is recovering from an abrupt termination, and must therefore rely on the old config to build the client, or whether it should start afresh.

I want to know what options we have to allow for configuration changes (i.e. re-initializing the operators):

1. Is there any way to restore from a checkpoint and also recreate the client using the newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the configuration changes take effect?
3. Will we have to move away from Flink monitoring so that the client can be initialized inside the constructor?
4. One option is to remove the constructor argument entirely and load the config inside the open method. I want to know how this can be done without exposing the entire application configuration. I could store the configuration in the job parameters (by somehow converting this object to a map, which I would rather not do), but how would I load it back, given that this operator function is used by multiple operators?
5. Any other option? For functions that aren't AsyncFunction, is leveraging BroadcastState the only way to dynamically update configuration?
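
To make option 4 concrete, here is a rough sketch of the map conversion I have in mind. It is plain Scala with no Flink dependency. ClientConfig, its fields, and the "enricher"/"scorer" prefixes are made-up names for illustration, not our real configuration. The idea is that each operator gets its own key prefix, so the same function class can be used by several operators without seeing the whole application configuration.

```scala
// Hypothetical narrow config for one operator; all names here are made up.
final case class ClientConfig(endpoint: String, timeoutMs: Long, maxRetries: Int)

object ClientConfig {
  // Flatten to string pairs under a per-operator prefix, e.g. "enricher.endpoint".
  def toParams(prefix: String, c: ClientConfig): Map[String, String] =
    Map(
      s"$prefix.endpoint"   -> c.endpoint,
      s"$prefix.timeoutMs"  -> c.timeoutMs.toString,
      s"$prefix.maxRetries" -> c.maxRetries.toString
    )

  // Rebuild from the flat map; Left names the first missing or malformed key.
  def fromParams(prefix: String, params: Map[String, String]): Either[String, ClientConfig] = {
    def get(name: String): Either[String, String] =
      params.get(s"$prefix.$name").toRight(s"missing key: $prefix.$name")

    def num[A](name: String)(parse: String => A): Either[String, A] =
      get(name).flatMap { v =>
        try Right(parse(v))
        catch { case _: NumberFormatException => Left(s"bad number for $prefix.$name: $v") }
      }

    for {
      endpoint <- get("endpoint")
      timeout  <- num("timeoutMs")(_.toLong)
      retries  <- num("maxRetries")(_.toInt)
    } yield ClientConfig(endpoint, timeout, retries)
  }
}
```

The function would then take only its prefix as a constructor argument and call fromParams inside open. On the Flink side I assume the merged map could be wrapped with ParameterTool.fromMap (after converting it to a Java map), registered through setGlobalJobParameters on the ExecutionConfig, and read back from the runtime context's ExecutionConfig. What I don't know is whether those job parameters are themselves restored from the submittedJobGraph, in which case this would not help.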