Ahmet Gürbüz created FLINK-27348:
------------------------------------

             Summary: Flink KafkaSource doesn't set groupId
                 Key: FLINK-27348
                 URL: https://issues.apache.org/jira/browse/FLINK-27348
             Project: Flink
          Issue Type: Bug
          Components: API / Scala
    Affects Versions: 1.14.4
         Environment: OS: windows 8.1.
Java version: java version "11.0.13" 2021-10-19 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
            Reporter: Ahmet Gürbüz
         Attachments: image-2022-04-22-05-43-06-475.png, image-2022-04-22-05-44-56-494.png, image-2022-04-22-05-46-45-592.png

I have a very simple Flink application. I have installed Kafka locally and I am reading data from Kafka with Flink, using the KafkaSource class. Although I have assigned a group ID with setGroupId, this group ID does not appear in Kafka.

{code:java}
// Imports were not part of the original report; play-json is assumed
// from the use of Json.reads / Reads below.
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord
import play.api.libs.json.{Json, Reads}

import scala.util.{Failure, Success, Try}

object FlinkKafkaSource extends App {

  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

  case class Event(partitionNo: Long, eventTime: String, eventTimestamp: Long, userId: String, firstName: String)

  implicit val readsEvent: Reads[Event] = Json.reads[Event]

  env
    .fromSource(
      KafkaSource.builder[Event]
        .setBootstrapServers("localhost:9092")
        .setTopics("flink-connection")
        .setGroupId("test-group") // I can't see this groupId in kafka-consumer-groups
        .setStartingOffsets(OffsetsInitializer.latest())
        .setDeserializer(new KafkaRecordDeserializationSchema[Event] {
          // Decode the record value as a string and emit an Event if it parses as JSON.
          override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[Event]): Unit = {
            val rec = record.value.map(_.toChar).mkString
            Try(Json.fromJson[Event](Json.parse(rec)).get) match {
              case Success(event)     => out.collect(event)
              case Failure(exception) => println(s"Couldn't parse string: $rec, error: ${exception.toString}")
            }
          }

          override def getProducedType: TypeInformation[Event] = createTypeInformation[Event]
        })
        .build(),
      WatermarkStrategy.noWatermarks[Event],
      "kafka-source"
    )
    .keyBy(l => l.userId)
    .print()

  env.execute("flink-kafka-source")
}
{code}

I have created a topic in Kafka named "flink-connection". I am using a simple kafka-python producer to produce data to the flink-connection topic.

!image-2022-04-22-05-43-06-475.png|width=762,height=161!

I am able to consume data from Kafka.
!image-2022-04-22-05-44-56-494.png!

But I can't see the groupId in kafka-consumer-groups:

!image-2022-04-22-05-46-45-592.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
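
A possible explanation, stated here as an assumption rather than a confirmed diagnosis of this issue: according to the Flink Kafka connector documentation, KafkaSource commits offsets back to Kafka only when a checkpoint completes or, if checkpointing is not enabled, when the Kafka consumer's own periodic auto-commit is switched on through enable.auto.commit. The job above does neither, so no offsets would be committed for "test-group" and kafka-consumer-groups would have nothing to list. The Scala sketch below only builds consumer properties that enable auto-commit. The names ConsumerGroupVisibility and autoCommitProps and the 5000 ms default interval are hypothetical, chosen for illustration.

```scala
import java.util.Properties

// Hypothetical helper, not part of Flink or Kafka.
object ConsumerGroupVisibility {

  // Builds Kafka consumer properties that turn on the client's periodic
  // offset auto-commit. The keys are standard Kafka consumer settings;
  // the default interval is an arbitrary illustrative value.
  def autoCommitProps(intervalMs: Long = 5000L): Properties = {
    val props = new Properties()
    props.setProperty("enable.auto.commit", "true")
    props.setProperty("auto.commit.interval.ms", intervalMs.toString)
    props
  }
}
```

The result could be passed to the builder with .setProperties(ConsumerGroupVisibility.autoCommitProps()) next to .setGroupId("test-group"). Alternatively, calling env.enableCheckpointing(5000) before defining the job should make the source commit offsets each time a checkpoint completes. In either case the committed offsets are used for monitoring progress, not for Flink's fault tolerance.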