Hi Team,

Please find the query below.

Use Case: We use the parallelism.default property in flink-conf.yaml to set the system-level default parallelism in the Flink configuration.

Issue: Even after setting parallelism.default to 3, the configuration still starts with a parallelism of 1. While debugging, we found that the value of parallelism.default in the Configuration object instantiated inside the handleRequest() method of the JarRunHandler class (line 90) is initially 3, but it is changed to 1 in the applyToConfiguration method of the JarHandlerUtils.JarHandlerContext class (line 132), which is called from the handleRequest() method of JarRunHandler (line 95).

Flink Version: 1.12.1

Job Code:

    public class FlinkJob {

        public static void main(String[] args) throws Exception {
            String TOPIC_IN = args[0];
            String TOPIC_OUT = args[1];
            String BOOTSTRAP_SERVER = args[2];

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Kafka consumer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVER);
            props.put("group.id", "kc1");

            KafkaDeserializationSchema<Object> deserializationSchema =
                    new MMDeserializer<>("org.gamezone.GameZoneInput");
            FlinkKafkaConsumer<Object> kafkaConsumer =
                    new FlinkKafkaConsumer<>(TOPIC_IN, deserializationSchema, props);
            kafkaConsumer.setStartFromLatest();

            // Kafka producer configuration
            Properties prodProps = new Properties();
            prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);

            FlinkKafkaProducer<Object> kafkaProducer = new FlinkKafkaProducer<>(
                    TOPIC_OUT,
                    ((value, timestamp) -> new ProducerRecord<>(
                            TOPIC_OUT, "myKey".getBytes(), value.toString().getBytes())),
                    prodProps,
                    Semantic.AT_LEAST_ONCE);

            // Source -> keyBy(playerId) -> process -> sink
            DataStream<Object> stream = env.addSource(kafkaConsumer);
            KeySelector<Object, Serializable> keys = create();
            KeyedStream<Object, Serializable> playerId = stream.keyBy(keys);
            playerId.process(new KeyedAggregateFunction(),
                            TypeInformation.of(new TypeHint<Object>() {}))
                    .addSink(kafkaProducer);

            env.execute();
        }

        // Keys each record by the player ID of the GameZoneInput event
        public static KeySelector<Object, Serializable> create() {
            return record -> {
                final Serializable key = ((GameZoneInput) record).getPlayerId();
                return key;
            };
        }
    }

Any leads would be appreciated.

Thanks,
Mahima
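
For reference, the flink-conf.yaml entry described in the use case would look like the excerpt below. This is a minimal sketch showing only the one key and the value mentioned in the query; the rest of the file is omitted and is assumed to contain no other parallelism-related settings.

```yaml
# flink-conf.yaml (excerpt; only the setting discussed in this query)
parallelism.default: 3
```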