Hi,
I'm trying to write a very basic Kafka Streams consumer in Java. As soon as I add a KTable, the server log shows that my consumer has unsubscribed from all topics. The same application with a KStream instead of a KTable works fine for me. I'm using Kafka 3.2.0 (kafka_2.13-3.2.0) and am running on Raspbian OS. I tried modifying group.initial.rebalance.delay.ms in the server properties, but this did not help.

The messages I get in the server log are:

[2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streams-wiki-created-table in Empty state. Created a new member id streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state PreparingRebalance with old generation 2 (__consumer_offsets-16) (reason: Adding new member streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received from leader streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e for group streams-wiki-created-table for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state PreparingRebalance with old generation 3 (__consumer_offsets-16) (reason: Removing member streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e on LeaveGroup; client reason: the consumer unsubscribed from all topics) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group streams-wiki-created-table with generation 4 is now empty (__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e, groupInstanceId=None, clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(stream)) has left group streams-wiki-created-table through explicit `LeaveGroup`; client reason: the consumer unsubscribed from all topics (kafka.coordinator.group.GroupCoordinator)

My code is as follows:

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wiki-created-table");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
        TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
        TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
        TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));

        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> createdPagesUserTypeTable = builder
                .stream("temp-create-stream", Consumed.with(Serdes.String(), WikiEventSerdes.WikiEvent()))
                .selectKey((ignored, value) -> value.getUserType())
                .groupByKey()
                .count();

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

Can someone please help me figure out what's wrong here?

Thanks,
Meir