Hi Team,

Please find the query below.

Use Case: Using the parallelism.default property in the flink-conf.yaml
file to set the system-level default parallelism for the cluster.
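
For reference, this is the entry we are relying on (assuming the
flink-conf.yaml under $FLINK_HOME/conf is the one the cluster actually
reads):

    parallelism.default: 3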

Issue: Even after setting parallelism.default to 3, the job runs with an
effective parallelism of 1.

On debugging the Flink code, we found that the value of parallelism.default
in the Configuration object instantiated inside the handleRequest() method
of the JarRunHandler class (line 90) is initially 3, but it is changed to 1
in the applyToConfiguration() method of the JarHandlerUtils.JarHandlerContext
class (line 132), which is called from handleRequest() in JarRunHandler
(line 95).
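
As a stopgap we are considering pinning the parallelism inside the job
itself; a minimal sketch, assuming a hard-coded per-job value is acceptable
(it sidesteps parallelism.default rather than fixing it):

    // Inside main(), right after obtaining the environment: an explicit
    // environment-level parallelism takes precedence over the default
    // coming from the Configuration handed down by the REST handler.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);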

Flink Version - 1.12.1
Job Code -

public class FlinkJob
{
    public static void main(String[] args) throws Exception
    {
        String TOPIC_IN = args[0];
        String TOPIC_OUT = args[1];
        String BOOTSTRAP_SERVER = args[2];

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVER);
        props.put("group.id", "kc1");

        KafkaDeserializationSchema<Object> deserializationSchema =
                new MMDeserializer<>("org.gamezone.GameZoneInput");

        FlinkKafkaConsumer<Object> kafkaConsumer =
                new FlinkKafkaConsumer<>(TOPIC_IN, deserializationSchema, props);
        kafkaConsumer.setStartFromLatest();

        // Kafka producer properties
        Properties prodProps = new Properties();
        prodProps.put("bootstrap.servers", BOOTSTRAP_SERVER);

        FlinkKafkaProducer<Object> kafkaProducer =
                new FlinkKafkaProducer<>(TOPIC_OUT,
                        ((value, timestamp) -> new ProducerRecord<>(TOPIC_OUT,
                                "myKey".getBytes(), value.toString().getBytes())),
                        prodProps,
                        Semantic.AT_LEAST_ONCE);

        // Key the stream by playerId, apply the keyed aggregation and sink to Kafka
        DataStream<Object> stream = env.addSource(kafkaConsumer);
        KeySelector<Object, Serializable> keys = create();
        KeyedStream<Object, Serializable> playerId = stream.keyBy(keys);

        playerId.process(new KeyedAggregateFunction(),
                TypeInformation.of(new TypeHint<Object>() {}))
                .addSink(kafkaProducer);

        env.execute();
    }

    public static KeySelector<Object, Serializable> create()
    {
        // Key each record by its playerId
        return record -> ((GameZoneInput) record).getPlayerId();
    }
}
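
For completeness, we can also pass the parallelism explicitly at submission
time; a value supplied there should win over parallelism.default. A sketch
with the standard CLI (the jar name is just a placeholder for our artifact):

    ./bin/flink run -p 3 flink-job.jar <topicIn> <topicOut> <bootstrapServer>

The REST jar run request body similarly accepts a parallelism field.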

Any leads would be appreciated.

Thanks
Mahima
