Hi everyone,

I noticed that the Flink state contains KafkaIO's consumer config properties.

When a Beam pipeline (Java SDK) is restarted from state, the Flink
Runner translation layer deserializes the KafkaUnboundedReader (via
UnboundedSourceWrapper) from Flink's state. This happens *before* the
user-written KafkaIO builder code is executed. In effect, even if the
user's code feeds KafkaIO the correct file paths (probably fetched from
configs), Beam still tries to use the paths that were saved in the Flink
state. Those may be outdated, which prevents the pipeline from starting
up properly.
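
To illustrate the mechanism, here is a minimal sketch in plain Java. It is
not Beam or Flink code: FakeSource is a hypothetical stand-in for the
serialized source, and I am assuming the config is captured with ordinary
Java serialization. The key "ssl.truststore.location" and both paths are
only examples of a file path that might live in the consumer config.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class StaleConfigDemo {

    // Hypothetical stand-in for a source object that carries its consumer config.
    static class FakeSource implements Serializable {
        private static final long serialVersionUID = 1L;
        final Map<String, String> consumerConfig;

        FakeSource(Map<String, String> consumerConfig) {
            this.consumerConfig = new HashMap<>(consumerConfig);
        }
    }

    // Simulates writing the source into a checkpoint/savepoint.
    static byte[] snapshot(FakeSource source) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        return bytes.toByteArray();
    }

    // Simulates restoring the source from state on restart.
    static FakeSource restore(byte[] state) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(state))) {
            return (FakeSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // First run: the config points at the old (example) path and is snapshotted.
        Map<String, String> oldConfig = new HashMap<>();
        oldConfig.put("ssl.truststore.location", "/old/path/truststore.jks");
        byte[] state = snapshot(new FakeSource(oldConfig));

        // Restart: the user code now builds a config with the new (example) path...
        Map<String, String> newConfig = new HashMap<>();
        newConfig.put("ssl.truststore.location", "/new/path/truststore.jks");
        FakeSource fromUserCode = new FakeSource(newConfig);

        // ...but the object restored from state still carries the old one.
        FakeSource fromState = restore(state);
        System.out.println("user code: " + fromUserCode.consumerConfig);
        System.out.println("restored:  " + fromState.consumerConfig);
    }
}
```

The restored object keeps the old path regardless of what the freshly
built config says, which matches what I am seeing on restart.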

This is problematic if files get moved around on disk, or if we move
the Flink state to another machine that may have different file
configurations.

Has anyone seen this problem before?

Also, could anyone give me a quick overview of why Beam saves so many
things in the Flink state (I'm aware of coders, transforms and
transform configs) when those things could be materialized from the
user code, just as they are when the pipeline is started without state?
It would help me find a workaround for this issue.

Thanks,
Cristian
