[ 
https://issues.apache.org/jira/browse/KAFKA-18818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shubham Raj updated KAFKA-18818:
--------------------------------
    Description: 
We have observed a significant delay in metadata updates after performing 
asynchronous Admin Client operations (such as {{createTopics}}, 
{{deleteTopics}}, {{createAcls}}, {{deleteAcls}}, and 
{{changeTopicConfigs}}) using both the Confluent Python library and the Apache 
Kafka Java library when running Kafka in KRaft mode.

*Issue Details:*

Admin Client calls are asynchronous and return a map of {{Future}} objects. 
Once a {{Future}} completes successfully, we expect the operation to have taken 
effect and the change to be immediately visible in the metadata. This is the 
behavior we observe when running Kafka in ZooKeeper mode.

However, in KRaft mode, even after {{future.result()}} completes, the change is 
not immediately visible in the metadata. There is an additional delay before 
the metadata reflects it. In our case this delay is approximately 500 ms, but 
it could vary with cluster load.
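
For illustration, a client that has to cope with this delay can at least bound the wait instead of retrying indefinitely. The following is a minimal sketch under stated assumptions, not part of the Kafka API: the class {{MetadataWait}} and the method {{awaitVisible}} are hypothetical names, and the visibility check is passed in as a plain {{Callable}} so the helper does not depend on any Kafka class. Any exception thrown by the check is treated as "not visible yet".

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical helper: polls a check until it passes or a deadline expires. */
final class MetadataWait {
    private MetadataWait() {}

    /**
     * Returns true if the check returned true before the timeout, false otherwise.
     * Exceptions thrown by the check (other than interruption) count as a failed attempt.
     */
    static boolean awaitVisible(Callable<Boolean> check, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                if (Boolean.TRUE.equals(check.call())) {
                    return true;
                }
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                // Treated as "not visible yet", e.g. an ExecutionException
                // wrapping UnknownTopicOrPartitionException.
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Stand-in for a real metadata check: passes roughly 100 ms after start.
        // With an Admin client the check could instead be (assumption, not run here):
        //   () -> admin.describeTopics(List.of(topic)).all().get().get(topic) != null
        boolean visible = awaitVisible(
            () -> System.currentTimeMillis() - start >= 100,
            Duration.ofSeconds(2),
            Duration.ofMillis(10));
        System.out.println("visible=" + visible);
    }
}
```

The timeout and poll interval are arbitrary illustrative values. This only masks the delay on the client side and does not address the underlying behavior reported here.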

*Impact:*

This delay affects workflows in which Kafka users perform Admin Client 
operations and expect the metadata to be updated immediately. The additional 
wait adds latency and could impact time-sensitive operations.

*Steps performed:*
 * Called {{AdminClient.createTopics}} to create a test topic and waited on the 
returned KafkaFuture. The future resolves to null on successful topic creation ( 
*[CreateTopicsResult 
ref|https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/CreateTopicsResult.html]*
 ).
 * Started a loop that repeatedly describes the newly created topic (to check 
that the topic is present in Kafka's metadata).
The topic is not immediately visible in the metadata: the describe call fails 
with _*{{UnknownTopicOrPartitionException}}*_.

 * Checked and noted how many iterations it takes for the newly created topic 
to appear in the metadata.
In the test below, the topic takes *407 ms* to appear in the metadata after the 
{{createTopics}} KafkaFuture resolves (additional time that the client has to 
wait).
 
 * *Kafka version: 3.9.0*
 * *Java library version: 2.8.0*

{code:java}
package kafkautils.trycode;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;

// Site-local helper that resolves the bootstrap servers for a named cluster.
import local.kafka.KafkaConfig;

import org.apache.kafka.clients.admin.*;

public class TryAdminClient {
    // The original used a site-local LogFactory whose import is not shown;
    // the standard java.util.logging factory is used here so the class compiles without it.
    private static final Logger LOG = Logger.getLogger(TryAdminClient.class.getName());
    private static final String CLUSTER = "alpha";
    private static final String MAIN_RESOURCE_PREFIX =
        "testkafka.test_cluster_operations_delay.java_client.";
    private static final Admin adminClient;

    static {
        Properties adminProps = new Properties();
        adminProps.put(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            KafkaConfig.makeKafkaConfig(CLUSTER).getBootStrapServersWithPort()
        );
        adminProps.put("security.protocol", "SASL_PLAINTEXT");
        adminProps.put("sasl.mechanism", "GSSAPI");
        adminProps.put("sasl.kerberos.service.name", "kafka");
        adminProps.put("sasl.kerberos.init.cmd", "/usr/local/bin/skinit --quiet");
        adminProps.put("sun.security.jgss.native", "true");
        adminProps.put("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so");
        adminProps.put("javax.security.auth.useSubjectCredsOnly", "false");
        adminProps.put("java.security.auth.login.config",
            "/data/middleware/kafkautils/etc/config/kafka/kafka_client_jaas.conf");
        adminClient = Admin.create(adminProps);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String topic = MAIN_RESOURCE_PREFIX + "single_topic_operation_with_admin_client_test.a.1";
        var newTopic = new NewTopic(topic, 1, (short) 3).configs(Map.of("cleanup.policy", "delete"));

        // all() returns KafkaFuture<Void>, so get() yields null once the topic is created.
        var createTopicsResult = adminClient.createTopics(List.of(newTopic)).all().get();
        LOG.info("CreateTopics finished with result : " + createTopicsResult);
        long queryStartTimestamp = System.currentTimeMillis();

        // Describe the topic in a tight loop until it becomes visible in the metadata.
        while (true) {
            try {
                var describeTopicsResult =
                    adminClient.describeTopics(List.of(topic)).all().get().get(topic);
                LOG.info("DescribeTopics returned result : " + describeTopicsResult);
                if (describeTopicsResult != null) {
                    break;
                }
            } catch (ExecutionException err) {
                LOG.warning("DescribeTopics operation failed with error : " + err);
            }
        }
        LOG.info(String.format(
            "Describe operation took = %d ms",
            System.currentTimeMillis() - queryStartTimestamp
        ));
    }
}
{code}
 

*logs we got*
{code:java}
[20250214 07:11:43.514 EST (main) kafkautils.trycode.TryAdminClient#main INFO] CreateTopics finished with result : null

[20250214 07:11:43.538 EST (main) kafkautils.trycode.TryAdminClient#main WARNING] DescribeTopics operation failed with error : java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
... repeated 146 times

[20250214 07:11:43.938 EST (main) kafkautils.trycode.TryAdminClient#main INFO] DescribeTopics returned result : (name=testkafka.test_cluster_operations_delay.java_client.single_topic_operation_with_admin_client_test.a.1, internal=false, partitions=(partition=0, leader=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), replicas=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com), qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com), isr=qtekafka-alpha-02.tbd.local.com:9092 (id: 6 rack: tbd.local.com), qtekafka-alpha-02.dr.local.com:9092 (id: 4 rack: dr.local.com), qtekafka-alpha-02.nyc.local.com:9092 (id: 2 rack: nyc.local.com)), authorizedOperations=null)

[20250214 07:11:43.939 EST (main) kafkautils.trycode.TryAdminClient#main INFO] Describe operation took = 407 ms
{code}
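
The reported figure can be cross-checked against the log timestamps. The sketch below is only a worked calculation, assuming the timestamp pattern {{yyyyMMdd HH:mm:ss.SSS}} inferred from the log records; the class name {{LogGap}} is hypothetical. It computes the gap from the CreateTopics record (43.514) and from the first failed DescribeTopics record (43.538) to the successful DescribeTopics record (43.938).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: elapsed milliseconds between two log timestamps. */
final class LogGap {
    // Pattern inferred from the log records (assumption).
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss.SSS");

    private LogGap() {}

    static long millisBetween(String earlier, String later) {
        return Duration.between(
            LocalDateTime.parse(earlier, FORMAT),
            LocalDateTime.parse(later, FORMAT)).toMillis();
    }

    public static void main(String[] args) {
        // CreateTopics record to successful DescribeTopics record.
        System.out.println(millisBetween("20250214 07:11:43.514", "20250214 07:11:43.938")); // 424
        // First failed DescribeTopics record to successful DescribeTopics record.
        System.out.println(millisBetween("20250214 07:11:43.538", "20250214 07:11:43.938")); // 400
    }
}
```

The measured 407 ms falls between these two values, which is consistent with the timer starting after the CreateTopics record is logged and before the first DescribeTopics failure is logged.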



> Significant delay in Metadata Update After Admin Client Operations in KRaft 
> Mode
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-18818
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18818
>             Project: Kafka
>          Issue Type: Bug
>          Components: kraft
>    Affects Versions: 3.9.0
>            Reporter: Shubham Raj
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
