I am trying to use the task number as a keyBy value to help fan out the
workload of reading from Kafka.

Given:
        DataStream<SchemaRecord> stream = env
                .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties))
                .setParallelism(240)
                .flatMap(new SchemaRecordSplit()).setParallelism(240)
                .name("TopicA splitter")
                .keyBy("partition", "keyByHelper", "schemaId");

        stream.addSink(new CustomMaprFsSink())
                .name("TopicA Sink")
                .setParallelism(240);

In the DeserialClass I am trying to get at
getRuntimeContext().getIndexOfThisSubtask(), which is only available in the
RichSinkFunction.
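For reference, this is roughly how I can reach the index today, but only once
the data has already arrived at the sink (just a sketch, assuming
CustomMaprFsSink extends RichSinkFunction):

        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

        public class CustomMaprFsSink extends RichSinkFunction<SchemaRecord> {

            private int subtaskIndex;

            @Override
            public void open(Configuration parameters) throws Exception {
                // getRuntimeContext() is available here because this is a rich function
                subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            }

            @Override
            public void invoke(SchemaRecord record) throws Exception {
                // write the record out to MapR-FS, e.g. into a per-subtask path
            }
        }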


The keyBy above uses partition (by hour) and schemaId (the Avro schema id), and
I would like to add the task number so that all 240 readers / writers have
something to do. Any ideas?
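Roughly what I have in mind is something like the sketch below, where the
splitter stamps its own subtask index onto each record before the keyBy
(assuming SchemaRecordSplit can become a RichFlatMapFunction and SchemaRecord
has a setter for the keyByHelper field; setKeyByHelper is just a placeholder
name):

        import org.apache.flink.api.common.functions.RichFlatMapFunction;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.util.Collector;

        public class SchemaRecordSplit extends RichFlatMapFunction<SchemaRecord, SchemaRecord> {

            private int taskNumber;

            @Override
            public void open(Configuration parameters) throws Exception {
                // index of this splitter subtask, 0..239 at parallelism 240
                taskNumber = getRuntimeContext().getIndexOfThisSubtask();
            }

            @Override
            public void flatMap(SchemaRecord in, Collector<SchemaRecord> out) throws Exception {
                // ... existing splitting logic ...
                // stamp the task number so keyBy("partition", "keyByHelper", "schemaId")
                // spreads the records across the parallel sink subtasks
                in.setKeyByHelper(taskNumber);   // setKeyByHelper is a hypothetical setter
                out.collect(in);
            }
        }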

