Hello Team,

I am consuming data from a Kafka topic through Spark Structured Streaming; the topic has 3 partitions. Since Spark Structured Streaming does not allow you to explicitly provide a group.id and instead assigns a random ID to the consumer, I tried to check the consumer group IDs using the Kafka command below.
./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list

Output:

spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0

Below are my questions:

1) Why does it create 3 consumer groups? Is it because of the 3 partitions?
2) Is there any way I can get these consumer group names from within the Spark application?
3) Even though my Spark application was still running, after some time these group names no longer showed up in the consumer groups list. Is this because all the data was consumed by the Spark application and there was no more data in the Kafka topic?
4) If my assumption in point 3 is right, will a new consumer group ID be created when new data arrives, or will the consumer group name remain the same?

Below is my read stream:

val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  // .option("assign", "{\"" + topic + "\":[0]}")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 60000)
  .load()

The streaming query is used only once in the code and there are no joins.
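While digging into this, I also noticed that newer Spark versions expose options to influence the generated group ID. If I read the Kafka integration guide correctly (please correct me if this is wrong for your version), Spark 2.4+ supports a groupIdPrefix option and Spark 3.0+ a kafka.group.id option. A sketch of what that would look like against my read stream above (untested on my cluster; "my-app" / "my-fixed-group" are just placeholder names):

```scala
val inputDfWithPrefix = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  // Spark 2.4+: replaces the default "spark-kafka-source" prefix, so the
  // groups would show up as my-app-<uuid>-... in the --list output
  .option("groupIdPrefix", "my-app")
  // Spark 3.0+ alternative: pin the full group ID (use with care, since two
  // concurrent queries with the same group ID would conflict)
  // .option("kafka.group.id", "my-fixed-group")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 60000)
  .load()
```

That would at least make the groups predictable, even if it doesn't explain the behaviour I'm seeing.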
Execution plan:

== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp

I've also posted the problem on Stack Overflow: https://stackoverflow.com/questions/54924666/identify-kafka-consumer-group-name

Thanks,
Sneha
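PS: For question 2, one programmatic avenue I'm considering is Kafka's AdminClient, which can list consumer groups from inside the driver. A sketch (untested; it filters on the default "spark-kafka-source" prefix visible in my --list output above, and reuses the same brokers string as the read stream):

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// List all consumer groups on the cluster and keep the ones created by
// Spark's Kafka source, which are named spark-kafka-source-<uuid>-...-driver-0.
val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
val admin = AdminClient.create(props)
try {
  val sparkGroups = admin
    .listConsumerGroups() // ListConsumerGroupsResult
    .all()                // KafkaFuture of the group listings
    .get()                // blocks; acceptable for a one-off diagnostic
    .asScala
    .map(_.groupId())
    .filter(_.startsWith("spark-kafka-source"))
  sparkGroups.foreach(println)
} finally {
  admin.close()
}
```

This would only tell me which Spark-created groups currently exist, not which one belongs to which query, so I'd still appreciate a proper answer.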