I have to make my Flink job dynamically configurable, and I'm thinking about using broadcast state. My current static job configuration file contains the configuration for the entire set of operators. I load it into a case class and then explicitly pass the relevant part to each operator as constructor parameters. Will I have to create an individual broadcast stream for each operator? That is, something like:
    val o1conf: BroadcastStream[O1Conf] = ...
    someStream.connect(o1conf).map(...)
    someOtherStream.connect(o2conf).flatMap(...)

and so on?

1. Is there a way to load the configuration as a whole and pick only the right subset in the connect method, like so:

    someStream.connect(jobConfig.o1Conf).map(...)

My job has several operators, and it seems rather clumsy to have to instantiate one broadcast stream for each dynamically configurable operator.

2. Is there a way to guarantee that processElement isn't called before the first call to processBroadcastElement? How else can we ensure that each operator always starts with a valid configuration? Passing the same configuration as constructor parameters is one way to deal with it, but it's clumsy because it just repeats code. Loading the configuration in the open method is even worse, because each operator would then have access to the entire job configuration.

3. What can we do to make source and sink connectors dynamically configurable?
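
To make questions 1 and 2 more concrete, here is a rough sketch of what I have in mind: a single broadcast stream carrying the whole job configuration, with each operator reading only its own part and falling back to its constructor parameter until the first broadcast element arrives. The names O1Conf, O2Conf, JobConfig, ConfigState, O1Function and the fromElements sources are hypothetical placeholders, not my real job, and I am assuming (not certain) that one BroadcastStream can be connected to several operators. Note that connecting to a BroadcastStream seems to require process(...) with a BroadcastProcessFunction rather than map or flatMap.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical config classes standing in for the real ones.
case class O1Conf(prefix: String)
case class O2Conf(threshold: Int)
case class JobConfig(o1Conf: O1Conf, o2Conf: O2Conf)

object ConfigState {
  // One descriptor shared by every operator that reads the broadcast config.
  val descriptor = new MapStateDescriptor[String, JobConfig](
    "job-config", classOf[String], classOf[JobConfig])
  val key = "current"
}

// Operator 1: keeps the whole JobConfig in broadcast state but reads only o1Conf.
// `initial` comes from the static config file, so processElement has a usable
// configuration even if no broadcast element has been received yet.
class O1Function(initial: O1Conf)
    extends BroadcastProcessFunction[String, JobConfig, String] {

  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, JobConfig, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    val latest = ctx.getBroadcastState(ConfigState.descriptor).get(ConfigState.key)
    val conf = if (latest != null) latest.o1Conf else initial
    out.collect(conf.prefix + value)
  }

  override def processBroadcastElement(
      value: JobConfig,
      ctx: BroadcastProcessFunction[String, JobConfig, String]#Context,
      out: Collector[String]): Unit =
    ctx.getBroadcastState(ConfigState.descriptor).put(ConfigState.key, value)
}

object DynamicConfigSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val staticConfig = JobConfig(O1Conf("v1-"), O2Conf(10))

    // Placeholder sources; a real job would read config updates from Kafka, a file, etc.
    val configUpdates: DataStream[JobConfig] =
      env.fromElements(JobConfig(O1Conf("v2-"), O2Conf(20)))
    val someStream: DataStream[String] = env.fromElements("a", "b", "c")

    // A single broadcast stream, intended to be reused by every configurable operator.
    val broadcastConfig = configUpdates.broadcast(ConfigState.descriptor)

    someStream
      .connect(broadcastConfig)
      .process(new O1Function(staticConfig.o1Conf))
      .print()

    env.execute("dynamic-config-sketch")
  }
}
```

This still passes the initial configuration through the constructor, which is the repetition I would like to avoid, and the order in which data and broadcast elements arrive is not controlled here. It also gives every operator access to the full JobConfig inside the broadcast state, so it only partly addresses my concern about exposing the entire job configuration.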