I am trying to use the task number as a keyBy value to help fan out the workload when reading from Kafka.
Given:

```java
DataStream<SchemaRecord> stream = env
        .addSource(new FlinkKafkaConsumer010<SchemaRecord>("topicA", schema, properties))
        .setParallelism(240)
        .flatMap(new SchemaRecordSplit()).setParallelism(240)
        .name("TopicA splitter")
        .keyBy("partition", "keyByHelper", "schemaId");

stream.addSink(new CustomMaprFsSink())
        .name("TopicA Sink")
        .setParallelism(240);
```

In the DeserialClass I am trying to get at getRuntimeContext().getIndexOfThisSubtask(), which is only available in the RichSinkFunction.

The keyBy above uses partition (by hour) and schemaId (the Avro schema ID), and I would like to add the task number as the keyByHelper so that all 240 readers/writers have something to do.

Any ideas?
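For context, here is a minimal sketch of the direction I am thinking of, assuming SchemaRecordSplit can be made a RichFlatMapFunction (which also exposes getRuntimeContext()) and that SchemaRecord has a setter for keyByHelper (the setter name below is hypothetical, and the actual splitting logic is elided):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

public class SchemaRecordSplit extends RichFlatMapFunction<SchemaRecord, SchemaRecord> {

    @Override
    public void flatMap(SchemaRecord value, Collector<SchemaRecord> out) throws Exception {
        // getRuntimeContext() is available here because this is a Rich* function,
        // so the subtask index (0..parallelism-1) could be stamped onto the record
        // before the keyBy("partition", "keyByHelper", "schemaId") downstream.
        value.setKeyByHelper(getRuntimeContext().getIndexOfThisSubtask()); // hypothetical setter
        out.collect(value);
    }
}
```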