Hi,

I am trying to test a Flink app with the MiniClusterWithClientResource as
per the testing document at
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource.
For the tests, I need to use a custom Kryo serializer that I am trying to
register with the Configuration object. However, the stream environment
does not show me the appropriate configuration. What am I doing wrong here?
I don't see any errors or warnings in my logs either. MVCE for the issue
below.

Thanks,
Munir

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.List;

public class ConfigIssue {
    static Configuration config() {
        Configuration config = new Configuration();
        config.set(
                PipelineOptions.SERIALIZATION_CONFIG,
                List.of(
                        "org.apache.avro.generic.GenericRecord: {type:
kryo, kryo-type: default, class: com.example.MyKryoSerializer"));
        return config;
    }

    @ClassRule
    public static MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new
MiniClusterResourceConfiguration.Builder().setConfiguration(config()).build());

    @Test
    public void testConfig() {

Assert.assertNotNull(FLINK_CLUSTER.getMiniCluster().getConfiguration().get(PipelineOptions.SERIALIZATION_CONFIG));

        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        // This fails

Assert.assertNotNull(env.getConfiguration().get(PipelineOptions.SERIALIZATION_CONFIG));
    }
}

Reply via email to