Andrey Polyakov created KAFKA-12759:
---------------------------------------

             Summary: Kafka consumers with static group membership won't consume from newly subscribed topics
                 Key: KAFKA-12759
                 URL: https://issues.apache.org/jira/browse/KAFKA-12759
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.8.0
            Reporter: Andrey Polyakov


We've recently started using static group membership and noticed that when we add a new topic to an existing subscription, the new topic is never consumed from, no matter how long the consumer is left to run. Our current workaround is to shut down every consumer in the group for longer than session.timeout.ms, then start them back up. Is this expected behaviour or a bug?
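
One possible alternative to waiting out session.timeout.ms would be to evict the static member explicitly with the admin client. This is an untested sketch (the group ID and group.instance.id match the sample application below); we have not verified whether it also clears this stuck subscription state:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class RemoveStaticMember {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // Evict the static member "instance1" from "myGroupID" so the coordinator
      // drops its cached state without waiting for session.timeout.ms to expire.
      admin.removeMembersFromConsumerGroup(
              "myGroupID",
              new RemoveMembersFromConsumerGroupOptions(
                  Collections.singleton(new MemberToRemove("instance1"))))
          .all()
          .get();
    }
  }
}
{code}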

Sample application:
{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Test {
  static volatile boolean shutdown = false;
  static boolean consumerClosed = false; // guarded by shutdownLock
  static final Object shutdownLock = new Object();

  public static void main(String[] args) {
    // The shutdown hook asks the poll loop to stop, then blocks JVM exit until
    // the main thread reports that consumer.close() has completed.
    Runtime.getRuntime()
        .addShutdownHook(
            new Thread(
                () -> {
                  shutdown = true;
                  synchronized (shutdownLock) {
                    // Loop guards against spurious wakeups and against the main
                    // thread calling notifyAll() before this thread starts waiting.
                    while (!consumerClosed) {
                      try {
                        shutdownLock.wait();
                      } catch (InterruptedException e) {
                        e.printStackTrace();
                      }
                    }
                  }
                }));

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getCanonicalName());

    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000"); // 5 min
    // Setting group.instance.id turns on static group membership.
    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance1");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList("topic1"));
    // Second run: swap in the two-topic subscription below.
    // consumer.subscribe(Arrays.asList("topic1", "topic2"));

    while (!shutdown) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      System.out.println("poll() returned " + records.count() + " records");
    }

    System.out.println("Closing consumer");
    consumer.close();
    synchronized (shutdownLock) {
      consumerClosed = true;
      shutdownLock.notifyAll();
      System.out.println("Done closing consumer");
    }
  }
}
{code}
Steps to reproduce:
 0. update the bootstrap server config in the example code (and create topic1 and topic2 if they don't exist yet; see the sketch after this list)
 1. run the above application, which consumes from topic1
 2. send SIGTERM to the process, cleanly closing the consumer
 3. modify the code to subscribe to topic1 AND topic2
 4. run the application again: both topics appear in the logs as part of the subscription, but topic2 is never assigned, regardless of how long you let the consumer run
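
The repro assumes both topics already exist. If they don't, they can be created up front along these lines (single partition and replication factor 1 are assumptions, chosen to match the lone topic1-0 in the logs below):
{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReproTopics {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // One partition, replication factor 1; the counts are otherwise arbitrary.
      admin.createTopics(Arrays.asList(
              new NewTopic("topic1", 1, (short) 1),
              new NewTopic("topic2", 1, (short) 1)))
          .all()
          .get();
    }
  }
}
{code}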

Logs from first run (1 topic subscription):
{code:java}
ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-myGroupID-instance1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = myGroupID
        group.instance.id = instance1
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 300000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342287841
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Finished assignment for group at generation 1: {instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03=Assignment(partitions=[topic1-0])}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-6aac9f19-ad3e-46ee-8873-746e4a892c03', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Found no committed offset for partition topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Resetting offset for partition topic1-0 to position FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}.
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
{code}
Logs from second run (2-topic subscription). Note that, unlike the first run, there is no "Finished assignment" line, and only topic1-0 is ever assigned:
{code:java}
ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-myGroupID-instance1
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = myGroupID
        group.instance.id = instance1
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 300000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Kafka version: 2.8.0
Kafka commitId: ebb1d6e21cc92130
Kafka startTimeMs: 1620342351702
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Subscribed to topic(s): topic1, topic2
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Cluster ID: AHajHZ_3QqyBJlGdr9Gwwg
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Discovered group coordinator 10.86.24.3:9092 (id: 2147483646 rack: null)
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] (Re-)joining group
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully joined group with generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Successfully synced group in generation Generation{generationId=1, memberId='instance1-9fdf45a9-04bc-4056-91ec-07d5a64a2a86', protocol='range'}
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Notifying assignor about the new Assignment(partitions=[topic1-0])
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Adding newly assigned partitions: topic1-0
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Setting offset for partition topic1-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.86.31.131:9092 (id: 2 rack: null)], epoch=0}}
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
poll() returned 0 records
Closing consumer
[Consumer instanceId=instance1, clientId=consumer-myGroupID-instance1, groupId=myGroupID] Revoke previously assigned partitions topic1-0
Metrics scheduler closed
Closing reporter org.apache.kafka.common.metrics.JmxReporter
Metrics reporters closed
App info kafka.consumer for consumer-myGroupID-instance1 unregistered
Done closing consumer

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
{code}
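
For anyone trying to confirm this, the coordinator's view of the group between the two runs can be dumped with the admin client. This is an untested helper sketch, not part of the original repro; the group ID matches the sample application:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DescribeGroup {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      ConsumerGroupDescription group =
          admin.describeConsumerGroups(Collections.singleton("myGroupID"))
              .describedGroups()
              .get("myGroupID")
              .get();
      System.out.println("state: " + group.state());
      // Print each member's group.instance.id and its assigned partitions;
      // in the second run above only topic1-0 ever shows up here.
      group.members().forEach(m ->
          System.out.println(m.groupInstanceId().orElse("<dynamic>")
              + " -> " + m.assignment().topicPartitions()));
    }
  }
}
{code}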


