Hello Meir,

From the code snippet I cannot find where you added a KTable. It seems you created a KStream from the source topic and aggregated the stream into a KTable. Could you show me the code difference between "adding a KTable" and "adding a KStream"?

Anyway, that log line should only appear when `unsubscribe` is explicitly called on the consumer, which happens in only two cases: 1) the instance is shutting down (potentially due to an exception), or 2) the instance is handling a task-migrated exception. In either case you should see other log lines at INFO/WARN level indicating which case it is. I suspect something in your code throws an exception right at startup, which causes the instance to shut down (i.e. case 1), but that should be easy to confirm from the other log lines.

Guozhang

On Fri, May 27, 2022 at 2:31 PM Meir Goldenberg <meirg...@hotmail.com> wrote:

> Hi,
>
> I'm trying to write a very basic Kafka Streams consumer in Java.
> Once I add a KTable, I see a message in the server log that I have been
> unsubscribed from all topics.
> Doing the same with a KStream instead of KTable works fine for me.
>
> I'm using Kafka version 3.2.0 (kafka_2.13-3.2.0) and am running on
> Raspbian OS.
>
> I tried modifying the group.initial.rebalance.delay.ms in the server
> properties but this did not help.
>
> The message I get in the server log is:
>
> [2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with
> unknown member id joins group streams-wiki-created-table in Empty state.
> Created a new member id
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> and request the member to rejoin with this id.
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to
> rebalance group streams-wiki-created-table in state PreparingRebalance with
> old generation 2 (__consumer_offsets-16) (reason: Adding new member
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> with group instance id None; client reason: rebalance failed due to 'The
> group member needs to have a valid member id before actually entering a
> consumer group.' (MemberIdRequiredException))
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group
> streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1
> members (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received
> from leader
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> for group streams-wiki-created-table for generation 3. The group has 1
> members, 0 of which are static.
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to
> rebalance group streams-wiki-created-table in state PreparingRebalance with
> old generation 3 (__consumer_offsets-16) (reason: Removing member
> streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e
> on LeaveGroup; client reason: the consumer unsubscribed from all topics)
> (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group
> streams-wiki-created-table with generation 4 is now empty
> (__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
> [2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member
> MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e,
> groupInstanceId=None,
> clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer,
> clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000,
> supportedProtocols=List(stream)) has left group streams-wiki-created-table
> through explicit `LeaveGroup`; client reason: the consumer unsubscribed
> from all topics (kafka.coordinator.group.GroupCoordinator)
>
>
> My code is as follows:
>
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>     "streams-wiki-created-table");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>     "localhost:9092");
> props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>
> TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
> TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
> TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
> TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));
>
> StreamsBuilder builder = new StreamsBuilder();
>
> KTable<String, Long> createdPagesUserTypeTable =
>     builder.stream("temp-create-stream", Consumed.with(Serdes.String(),
>             WikiEventSerdes.WikiEvent()))
>         .selectKey((ignored, value) -> value.getUserType())
>         .groupByKey()
>         .count();
>
>
> Topology topology = builder.build();
> KafkaStreams streams = new KafkaStreams(topology, props);
> CountDownLatch latch = new CountDownLatch(1);
>
>
> // attach shutdown handler to catch control-c
> Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
>     @Override
>     public void run() {
>         streams.close();
>         latch.countDown();
>     }
> });
>
> try {
>     streams.start();
>     latch.await();
> } catch (Throwable e) {
>     System.exit(1);
> }
> System.exit(0);
> }
>
>
>
> Can someone please help me figure out what's wrong here?
>
> Thanks,
>
> Meir
>

--
-- Guozhang
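
When scanning a broker log like the one quoted above, the `client reason` field is the quickest pointer to why a member joined or left the group. The following is a minimal sketch in plain Java that pulls that field out of each GroupCoordinator entry. It assumes every entry has been unwrapped onto a single line and ends with the `(kafka.coordinator...` logger suffix seen in this thread; the class and method names are made up for illustration and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Kafka API): lists the "client reason" of each broker log entry. */
public class ClientReasonExtractor {

    // Captures the text after "client reason: " up to the trailing logger name.
    // The optional "\\)" drops the parenthesis that closes "(reason: ...; client reason: ...)".
    // Assumption: one log entry per line, each ending in " (kafka.coordinator...".
    private static final Pattern CLIENT_REASON =
            Pattern.compile("client reason: (.*?)\\)? \\(kafka\\.coordinator");

    /** Returns the client reasons in the order they appear; entries without one are skipped. */
    public static List<String> clientReasons(String brokerLog) {
        List<String> reasons = new ArrayList<>();
        Matcher matcher = CLIENT_REASON.matcher(brokerLog);
        while (matcher.find()) {
            reasons.add(matcher.group(1));
        }
        return reasons;
    }

    public static void main(String[] args) {
        // Two entries shortened from the thread: the first has no client reason, the second does.
        String log = String.join("\n",
                "[2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group "
                        + "streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1 "
                        + "members (kafka.coordinator.group.GroupCoordinator)",
                "[2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to rebalance group "
                        + "streams-wiki-created-table in state PreparingRebalance with old generation 3 "
                        + "(__consumer_offsets-16) (reason: Removing member m on LeaveGroup; "
                        + "client reason: the consumer unsubscribed from all topics) "
                        + "(kafka.coordinator.group.GroupCoordinator)");
        clientReasons(log).forEach(System.out::println);
    }
}
```

The broker log only records that the consumer unsubscribed. The exception behind it, if there is one, still has to be found in the INFO/WARN lines of the Streams application's own log.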