Hi, I am trying to test a Flink app with the MiniClusterWithClientResource as per the testing document at https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource. For the tests, I need to use a custom Kryo serializer that I am trying to register with the Configuration object. However, the stream environment does not show me the appropriate configuration. What am I doing wrong here? I don't see any errors or warnings in my logs either. MVCE for the issue below.
Thanks, Munir import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import java.util.List; public class ConfigIssue { static Configuration config() { Configuration config = new Configuration(); config.set( PipelineOptions.SERIALIZATION_CONFIG, List.of( "org.apache.avro.generic.GenericRecord: {type: kryo, kryo-type: default, class: com.example.MyKryoSerializer")); return config; } @ClassRule public static MiniClusterWithClientResource FLINK_CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder().setConfiguration(config()).build()); @Test public void testConfig() { Assert.assertNotNull(FLINK_CLUSTER.getMiniCluster().getConfiguration().get(PipelineOptions.SERIALIZATION_CONFIG)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // This fails Assert.assertNotNull(env.getConfiguration().get(PipelineOptions.SERIALIZATION_CONFIG)); } }