Hi,
I am trying to test a Flink app with the MiniClusterWithClientResource as
per the testing document at
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource.
For the tests, I need to use a custom Kryo serializer that I am trying to
register with the Configuration object. However, the stream environment
does not show me the appropriate configuration. What am I doing wrong here?
I don't see any errors or warnings in my logs either. MVCE for the issue
below.
Thanks,
Munir
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.List;
public class ConfigIssue {
static Configuration config() {
Configuration config = new Configuration();
config.set(
PipelineOptions.SERIALIZATION_CONFIG,
List.of(
"org.apache.avro.generic.GenericRecord: {type:
kryo, kryo-type: default, class: com.example.MyKryoSerializer"));
return config;
}
@ClassRule
public static MiniClusterWithClientResource FLINK_CLUSTER =
new MiniClusterWithClientResource(
new
MiniClusterResourceConfiguration.Builder().setConfiguration(config()).build());
@Test
public void testConfig() {
Assert.assertNotNull(FLINK_CLUSTER.getMiniCluster().getConfiguration().get(PipelineOptions.SERIALIZATION_CONFIG));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// This fails
Assert.assertNotNull(env.getConfiguration().get(PipelineOptions.SERIALIZATION_CONFIG));
}
}