Hi,

I'm trying to write a very basic Kafka Streams consumer in Java.
Once I add a KTable, I see a message in the server log saying that the consumer 
unsubscribed from all topics.
Doing the same with a KStream instead of a KTable works fine for me.

I'm using Kafka version 3.2.0 (kafka_2.13-3.2.0), running on Raspbian.

I tried modifying group.initial.rebalance.delay.ms in server.properties, 
but this did not help.

The message I get in the server log is:

[2022-05-28 00:29:43,989] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streams-wiki-created-table in Empty state. Created a new member id streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,055] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state PreparingRebalance with old generation 2 (__consumer_offsets-16) (reason: Adding new member streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,089] INFO [GroupCoordinator 0]: Stabilized group streams-wiki-created-table generation 3 (__consumer_offsets-16) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,458] INFO [GroupCoordinator 0]: Assignment received from leader streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e for group streams-wiki-created-table for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,955] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki-created-table in state PreparingRebalance with old generation 3 (__consumer_offsets-16) (reason: Removing member streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e on LeaveGroup; client reason: the consumer unsubscribed from all topics) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,960] INFO [GroupCoordinator 0]: Group streams-wiki-created-table with generation 4 is now empty (__consumer_offsets-16) (kafka.coordinator.group.GroupCoordinator)
[2022-05-28 00:29:44,998] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer-fa252bd8-62ea-4fc7-b012-b5db5f061e6e, groupInstanceId=None, clientId=streams-wiki-created-table-298e4b7e-351e-43d5-b3a3-77e00d07953e-StreamThread-1-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(stream)) has left group streams-wiki-created-table through explicit `LeaveGroup`; client reason: the consumer unsubscribed from all topics (kafka.coordinator.group.GroupCoordinator)


My code is as follows:

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wiki-created-table");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        TimeWindows monthWindow = TimeWindows.of(Duration.ofDays(30));
        TimeWindows weekWindow = TimeWindows.of(Duration.ofDays(7));
        TimeWindows dayWindow = TimeWindows.of(Duration.ofDays(1));
        TimeWindows hourWindow = TimeWindows.of(Duration.ofHours(1));

        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, Long> createdPagesUserTypeTable = builder
            .stream("temp-create-stream", Consumed.with(Serdes.String(), WikiEventSerdes.WikiEvent()))
            .selectKey((ignored, value) -> value.getUserType())
            .groupByKey()
            .count();


        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        CountDownLatch latch = new CountDownLatch(1);


        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }



Can someone please help me figure out what's wrong here?

Thanks,

Meir
