Hi everyone,

I noticed that the Flink state contains KafkaIO's consumer config properties.
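For context, this is roughly how we pass those properties to KafkaIO at pipeline construction time (a minimal sketch; the broker address, topic name, class name, and file paths below are hypothetical placeholders):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaIngest {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Consumer properties resolved at startup from our deployment config.
        // The SSL store locations are local file paths, so they are only valid
        // on the machine/deployment the pipeline currently runs on.
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("ssl.truststore.location", "/etc/secrets/kafka.truststore.jks");
        consumerConfig.put("ssl.keystore.location", "/etc/secrets/kafka.keystore.jks");

        pipeline.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker-1:9093")
                .withTopic("events")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(consumerConfig));

        pipeline.run();
      }
    }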
When restarting the Beam pipeline (Java SDK) from state, the Flink Runner's translation layer deserializes the KafkaUnboundedReader (via UnboundedSourceWrapper) from Flink's state. This happens *before* the user-written KafkaIO builder code is executed. Effectively, this means that even if the user's code feeds KafkaIO the correct file paths (probably fetched from configs), Beam still tries to use the ones that were saved in the Flink state, and those may be outdated, which prevents the pipeline from starting up properly. This is problematic if files get moved around on disk, or if we move the Flink state to another machine that may have a different file layout.

Has anyone seen this problem before?

Also, could anyone give me a quick overview of why Beam saves so many things in the Flink state (I'm aware of coders, transforms, and transform configs) when those things could be materialized from the user code, just as they are when the pipeline is started without state? It would help me find a workaround for this issue.

Thanks,
Cristian