Hello Team,

I am consuming data from a Kafka topic through Spark Structured Streaming;
the topic has 3 partitions. Since Spark Structured Streaming does not let
you explicitly provide a group.id and instead assigns a random id to the
consumer, I tried to check the consumer group ids using the Kafka command below.

./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,
kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list

Output:
 spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
 spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
 spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0
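For reference, while a group is still listed, its members and offsets can be
inspected with --describe (the group name below is one from the output above).
Note that, as far as I understand, Structured Streaming tracks offsets in its
own checkpoints rather than committing them to Kafka, so the committed-offset
columns may be empty:

```shell
./kafka-consumer-groups.sh \
  --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 \
  --describe \
  --group spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
```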

Below are my questions:

1) Why does it create 3 consumer groups? Is it because of the 3 partitions?
2) Is there any way I can get these consumer group names in the Spark
application?
3) Even though my Spark application was still running, after some time
these group names no longer showed up in the consumer groups list. Is this
because all the data had been consumed by the Spark application and there
was no more data in the Kafka topic?
4) If my assumption in point 3 is right, will a new consumer group id be
created when new data arrives, or will the consumer group name remain the same?

Below is my read stream:

  val inputDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", topic)
    // .option("assign", "{\"" + topic + "\":[0]}")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 60000)
    .load()

The streaming query is used only once in the code, and there are no joins.
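As a side note on question 2: I believe newer Spark versions (3.0+) document
the source options kafka.group.id and groupIdPrefix for controlling the group
id directly. This is only a sketch under that assumption; on older versions
Spark always generates the group id itself:

```scala
val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  // Fixed group id (documented for Spark 3.0+); the cluster's ACLs must allow it:
  .option("kafka.group.id", "my-streaming-app")
  // Alternatively, keep generated ids but with a recognizable prefix:
  // .option("groupIdPrefix", "my-streaming-app")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 60000)
  .load()
```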

Execution plan

== Parsed Logical Plan ==
 StreamingRelationV2
org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka,
Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe
-> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,
kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9,
partition#10, offset#11L, timestamp#12, timestampType#13],
StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger
-> 60000, startingOffsets -> earliest, subscribe -> downloading,
kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,
kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3,
offset#4L, timestamp#5, timestampType#6]

== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint,
timestamp: timestamp, timestampType: int
StreamingRelationV2
org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka,
Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe
-> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,
kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9,
partition#10, offset#11L, timestamp#12, timestampType#13],
StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger
-> 60000, startingOffsets -> earliest, subscribe -> downloading,
kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,
kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3,
offset#4L, timestamp#5, timestampType#6]

== Optimized Logical Plan ==
StreamingRelationV2
org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka,
Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe
-> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,
kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9,
partition#10, offset#11L, timestamp#12, timestampType#13],
StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger
-> 60000, startingOffsets -> earliest, subscribe -> downloading,
kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,
kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3,
offset#4L, timestamp#5, timestampType#6]

== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10,
offset#11L, timestamp#12, timestampType#13]

I've also posted this question on Stack Overflow:
https://stackoverflow.com/questions/54924666/identify-kafka-consumer-group-name

Thanks,
Sneha
