[jira] [Commented] (KAFKA-6898) org.apache.kafka.common.errors.TimeoutException

2021-06-01 Thread Luca Borzani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354918#comment-17354918
 ] 

Luca Borzani commented on KAFKA-6898:
-

Is anyone working on this? Has it perhaps been fixed in a newer version? It still 
occurs in 0.11.

> org.apache.kafka.common.errors.TimeoutException
> ---
>
> Key: KAFKA-6898
> URL: https://issues.apache.org/jira/browse/KAFKA-6898
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
> Environment: Production
>Reporter: Rishi
>Priority: Major
>
> Getting the error 
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 59927 ms.{code}
> while publishing events to Kafka. We are using Kafka Java client 0.10.2.0 
> with Kafka 0.10.1.0 broker.
> This issue does not always happen, but after the applications have been 
> running in service for some time it starts occurring, and the applications 
> never recover from this state until the producer instance is restarted.
> The producer and broker configurations are the defaults and have not been 
> changed. What should be the course of action for this issue?
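
For readers hitting the same error, a minimal sketch (assuming an otherwise default 
client; the class name and values below are illustrative, not a fix for this ticket) 
of the two producer settings that govern it: the exception is thrown when the 
producer's {{buffer.memory}} pool is exhausted and a send blocks for longer than 
{{max.block.ms}}.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative defaults only -- not a recommended fix for this ticket.
public class ProducerMemoryConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The error is thrown when this pool is exhausted ...
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);  // 32 MB, the default
        // ... and a send() has blocked on allocation for longer than this.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);      // 60 s, the default
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce as usual; a full buffer plus an elapsed max.block.ms yields
            // the TimeoutException quoted above
        }
    }
}
{code}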



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-06-01 Thread GitBox


cadonna commented on pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#issuecomment-851926961


   As far as I can see from skimming the PR, you could have a PR for the 
standby task assignors with corresponding unit tests without changing any 
existing public functionality and without adding any partial public 
functionality. So I guess it would be fine to have a PR for the standby task 
assignors. To give context to the reviewers, you should motivate the change in 
the PR description.
   
   Maybe you can do something similar with the subscription changes and with 
some other aspects of this PR. It would probably be good for the config PR that 
makes the functionality available to the public to come last, so that the 
functionality is never only partly available publicly.
   
   I do not know the details of #7170, but most people with whom I have talked 
about PR sizes agree that if it is possible to split them into smaller 
pieces, we should do it.
   
   Note that the title of each PR should start with the issue ID of the JIRA 
ticket, which in this case would be KAFKA-6718. That way, a link to the PR 
will show up on the ticket. You can also open multiple PRs whose titles start 
with KAFKA-6718; a link for each PR will then show up on the ticket.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-06-01 Thread GitBox


lkokhreidze commented on pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#issuecomment-851937769


   Sounds good, will try to split the PR into smaller chunks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-06-01 Thread GitBox


cadonna commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r642904792



##
File path: docs/security.html
##
@@ -384,56 +384,56 @@ SSL key and certificates in PEM format
 ssl.key.password=test1234
 
 Other configuration settings that may also be needed depending on 
our requirements and the broker configuration:
-
-ssl.provider (Optional). The name of the security 
provider used for SSL connections. Default value is the default security 
provider of the JVM.
-ssl.cipher.suites (Optional). A cipher suite is a 
named combination of authentication, encryption, MAC and key exchange algorithm 
used to negotiate the security settings for a network connection using TLS or 
SSL network protocol.
-ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should 
list at least one of the protocols configured on the broker side
-ssl.truststore.type=JKS
-ssl.keystore.type=JKS
-
-
+
+ssl.provider (Optional). The name of the security provider 
used for SSL connections. Default value is the default security provider of the 
JVM.
+ssl.cipher.suites (Optional). A cipher suite is a named 
combination of authentication, encryption, MAC and key exchange algorithm used 
to negotiate the security settings for a network connection using TLS or SSL 
network protocol.
+ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should 
list at least one of the protocols configured on the broker side
+ssl.truststore.type=JKS
+ssl.keystore.type=JKS
+
+
 Examples using console-producer and console-consumer:
-kafka-console-producer.sh --bootstrap-server 
localhost:9093 --topic test --producer.config client-ssl.properties
-kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--consumer.config client-ssl.properties
+> 
kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test 
--producer.config client-ssl.properties
+> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--consumer.config client-ssl.properties
 
 
 7.3 Authentication using 
SASL
 
 
-JAAS 
configuration
-Kafka uses the Java Authentication and Authorization Service
-(JAAS, https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html)
-for SASL configuration.
-
-JAAS configuration for Kafka 
brokers
-
-KafkaServer is the section name in the JAAS file used 
by each
-KafkaServer/Broker. This section provides SASL configuration 
options
-for the broker including any SASL client connections made by the 
broker
-for inter-broker communication. If multiple listeners are 
configured to use
-SASL, the section name may be prefixed with the listener name in 
lower-case
-followed by a period, e.g. sasl_ssl.KafkaServer.
-
-Client section is used to authenticate a SASL 
connection with
-zookeeper. It also allows the brokers to set SASL ACL on zookeeper
-nodes which locks these nodes down so that only the brokers can
-modify it. It is necessary to have the same principal name across 
all
-brokers. If you want to use a section name other than Client, set 
the
-system property zookeeper.sasl.clientconfig to the 
appropriate
-name (e.g., 
-Dzookeeper.sasl.clientconfig=ZkClient).
-
-ZooKeeper uses "zookeeper" as the service name by default. If 
you
-want to change this, set the system property
-zookeeper.sasl.client.username to the appropriate name
-(e.g., -Dzookeeper.sasl.client.username=zk).
-
-Brokers may also configure JAAS using the broker configuration 
property sasl.jaas.config.
-The property name must be prefixed with the listener prefix 
including the SASL mechanism,
-i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
-login module may be specified in the config value. If multiple 
mechanisms are configured on a
-listener, configs must be provided for each mechanism using the 
listener and mechanism prefix.
-For example,
+JAAS 
configuration
+Kafka uses the Java Authentication and Authorization Service
+(JAAS, https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html)
+for SASL configuration.
+
+JAAS configuration for Kafka 
brokers
+
+KafkaServer is the section name in the JAAS 
file used by each
+KafkaServer/Broker. This section provides SASL 
configuration opt
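
   As a side note on the quoted JAAS hunk, a minimal sketch of the per-listener
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config property it describes;
the listener name, mechanism, and credentials are hypothetical, and only one login
module may appear in the value:

```java
import java.util.Properties;

// Hypothetical broker properties illustrating the per-listener JAAS override
// described in the quoted docs (listener "sasl_ssl", mechanism PLAIN).
public class PerListenerJaasSketch {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        brokerProps.put("listener.name.sasl_ssl.plain.sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"admin-secret\";");
        System.out.println(brokerProps);
    }
}
```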

[GitHub] [kafka] jlprat commented on pull request #10770: MINOR: fix code listings security.html

2021-06-01 Thread GitBox


jlprat commented on pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#issuecomment-851963549


   Thanks @cadonna! All comments addressed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-06-01 Thread GitBox


jlprat commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r642923211



##
File path: docs/security.html
##
@@ -384,56 +384,56 @@ SSL key and certificates in PEM format
 ssl.key.password=test1234
 
 Other configuration settings that may also be needed depending on 
our requirements and the broker configuration:
-
-ssl.provider (Optional). The name of the security 
provider used for SSL connections. Default value is the default security 
provider of the JVM.
-ssl.cipher.suites (Optional). A cipher suite is a 
named combination of authentication, encryption, MAC and key exchange algorithm 
used to negotiate the security settings for a network connection using TLS or 
SSL network protocol.
-ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should 
list at least one of the protocols configured on the broker side
-ssl.truststore.type=JKS
-ssl.keystore.type=JKS
-
-
+
+ssl.provider (Optional). The name of the security provider 
used for SSL connections. Default value is the default security provider of the 
JVM.
+ssl.cipher.suites (Optional). A cipher suite is a named 
combination of authentication, encryption, MAC and key exchange algorithm used 
to negotiate the security settings for a network connection using TLS or SSL 
network protocol.
+ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should 
list at least one of the protocols configured on the broker side
+ssl.truststore.type=JKS
+ssl.keystore.type=JKS
+
+
 Examples using console-producer and console-consumer:
-kafka-console-producer.sh --bootstrap-server 
localhost:9093 --topic test --producer.config client-ssl.properties
-kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--consumer.config client-ssl.properties
+> 
kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test 
--producer.config client-ssl.properties
+> kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--consumer.config client-ssl.properties
 
 
 7.3 Authentication using 
SASL
 
 
-JAAS 
configuration
-Kafka uses the Java Authentication and Authorization Service
-(JAAS, https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html)
-for SASL configuration.
-
-JAAS configuration for Kafka 
brokers
-
-KafkaServer is the section name in the JAAS file used 
by each
-KafkaServer/Broker. This section provides SASL configuration 
options
-for the broker including any SASL client connections made by the 
broker
-for inter-broker communication. If multiple listeners are 
configured to use
-SASL, the section name may be prefixed with the listener name in 
lower-case
-followed by a period, e.g. sasl_ssl.KafkaServer.
-
-Client section is used to authenticate a SASL 
connection with
-zookeeper. It also allows the brokers to set SASL ACL on zookeeper
-nodes which locks these nodes down so that only the brokers can
-modify it. It is necessary to have the same principal name across 
all
-brokers. If you want to use a section name other than Client, set 
the
-system property zookeeper.sasl.clientconfig to the 
appropriate
-name (e.g., 
-Dzookeeper.sasl.clientconfig=ZkClient).
-
-ZooKeeper uses "zookeeper" as the service name by default. If 
you
-want to change this, set the system property
-zookeeper.sasl.client.username to the appropriate name
-(e.g., -Dzookeeper.sasl.client.username=zk).
-
-Brokers may also configure JAAS using the broker configuration 
property sasl.jaas.config.
-The property name must be prefixed with the listener prefix 
including the SASL mechanism,
-i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
-login module may be specified in the config value. If multiple 
mechanisms are configured on a
-listener, configs must be provided for each mechanism using the 
listener and mechanism prefix.
-For example,
+JAAS 
configuration
+Kafka uses the Java Authentication and Authorization Service
+(JAAS, https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.html)
+for SASL configuration.
+
+JAAS configuration for Kafka 
brokers
+
+KafkaServer is the section name in the JAAS 
file used by each
+KafkaServer/Broker. This section provides SASL 
configuration opti

[jira] [Created] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-01 Thread Niclas Lockner (Jira)
Niclas Lockner created KAFKA-12870:
--

 Summary: RecordAccumulator stuck in a flushing state
 Key: KAFKA-12870
 URL: https://issues.apache.org/jira/browse/KAFKA-12870
 Project: Kafka
  Issue Type: Bug
  Components: producer , streams
Affects Versions: 2.8.0, 2.6.1
Reporter: Niclas Lockner


After a Kafka Stream with exactly once enabled has performed its first commit, 
the RecordAccumulator within the stream's internal producer gets stuck in a 
state where all subsequent ProducerBatches that get allocated are immediately 
flushed instead of being held in memory until they expire, regardless of the 
stream's linger or batch size config.

This is reproduced in the example code found at , 
which can be run with ./gradlew run --args=

The example has a producer that sends 1 record/sec to one topic, and a Kafka 
stream with EOS enabled that forwards the records from that topic to another 
topic with the configuration linger = 5 sec, commit interval = 10 sec.
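
(For illustration only: one way the setup described above could be expressed. The 
application id and bootstrap servers are placeholders and are not taken from the 
reporter's example project.)
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Placeholder Streams configuration matching the description above: EOS enabled,
// commit interval 10 s, and a 5 s linger on the stream's internal producer.
public class ReproConfigSketch {
    static Properties streamsProperties() {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-12870-repro");  // hypothetical id
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical broker
        p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
        p.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 5000);
        return p;
    }
}
{code}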

 

The expected behavior when running the example is that the stream's 
ProducerBatches will expire (or get flushed because of the commit) every 5th 
second, and that the stream's producer will send a ProduceRequest every 5th 
second with an expired ProducerBatch that contains 5 records.

The actual behavior is that the ProducerBatch is made immediately available for 
the Sender, and the Sender sends one ProduceRequest for each record.

 

The example code contains a copy of the RecordAccumulator class (copied from 
kafka-clients 2.8.0) with some additional logging added to
 * RecordAccumulator#ready(Cluster, long)
 * RecordAccumulator#beginFlush()
 * RecordAccumulator#awaitFlushCompletion()

These log entries show
 * that the batches are considered sendable because a flush is in progress
 * that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's 
beginFlush() without also calling awaitFlushCompletion(), and that this makes 
RecordAccumulator's flushesInProgress jump between 1-2 instead of the expected 
0-1.

 

This issue is not reproducible in version 2.3.1.
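
(A simplified sketch of the {{flushesInProgress}} bookkeeping the description points 
at; this is not the real RecordAccumulator code, only an illustration of why an 
unmatched {{beginFlush()}} keeps {{flushInProgress()}} true and batches permanently 
sendable.)
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of the counter described above -- field and method names
// mirror the description, not the actual kafka-clients implementation.
class FlushCounterSketch {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.getAndIncrement();   // 0 -> 1, or 1 -> 2 when unbalanced
    }

    void awaitFlushCompletion() {
        // ... wait for in-flight batches to complete ...
        flushesInProgress.decrementAndGet();   // the decrement that balances beginFlush()
    }

    boolean flushInProgress() {
        // While this is true, batches are treated as immediately sendable,
        // regardless of linger.ms or batch.size.
        return flushesInProgress.get() > 0;
    }
}
{code}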

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-01 Thread Niclas Lockner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niclas Lockner updated KAFKA-12870:
---
Attachment: RecordAccumulator.log
full.log

> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.6.1, 2.8.0
>Reporter: Niclas Lockner
>Priority: Major
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at , 
> which can be run with ./gradlew run --args=
> The example has a producer that sends 1 record/sec to one topic, and a Kafka 
> stream with EOS enabled that forwards the records from that topic to another 
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
>  
> The expected behavior when running the example is that the stream's 
> ProducerBatches will expire (or get flushed because of the commit) every 5th 
> second, and that the stream's producer will send a ProduceRequest every 5th 
> second with an expired ProducerBatch that contains 5 records.
> The actual behavior is that the ProducerBatch is made immediately available 
> for the Sender, and the Sender sends one ProduceRequest for each record.
>  
> The example code contains a copy of the RecordAccumulator class (copied from 
> kafka-clients 2.8.0) with some additional logging added to
>  * RecordAccumulator#ready(Cluster, long)
>  * RecordAccumulator#beginFlush()
>  * RecordAccumulator#awaitFlushCompletion()
> These log entries show
>  * that the batches are considered sendable because a flush is in progress
>  * that Sender.maybeSendAndPollTransactionalRequest() calls 
> RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), 
> and that this makes RecordAccumulator's flushesInProgress jump between 1-2 
> instead of the expected 0-1.
>  
> This issue is not reproducible in version 2.3.1.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-06-01 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny updated KAFKA-12793:
--
Description: 
When Kafka is used to build data pipeline in mission critical business 
scenarios, availability and throughput are the most important operational goals 
that need to be maintained in presence of transient or permanent local failure. 
One typical situation that requires Ops intervention is disk failure, some 
partitions have long write latency caused by extremely high disk utilization; 
since all partitions share the same buffer under the current producer thread 
model, the buffer will be filled up quickly and eventually the good partitions 
are impacted as well. The cluster level success rate and timeout ratio will 
degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit 
problematic partitions during transient failure. Similar approach is applied in 
other distributed systems and RPC frameworks.



  was:
When Kafka is used to build data pipeline in mission critical business 
scenarios, availability and throughput are the most important operational goals 
that need to be maintained in presence of transient or permanent local failure. 
One typical situation that requires Ops intervention is disk failure, some 
partitions have long write latency caused by extremely high disk utilization; 
since all partitions share the same buffer under the current producer thread 
model, the buffer will be filled up quickly and eventually the good partitions 
are impacted as well. The cluster level success rate and timeout ratio will 
degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit 
problematic partitions during transient failure. Similar approach is applied in 
other distributed systems and RPC frameworks.

 

[link title|http://example.com]


> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Priority: Major
>
> When Kafka is used to build data pipeline in mission critical business 
> scenarios, availability and throughput are the most important operational 
> goals that need to be maintained in presence of transient or permanent local 
> failure. One typical situation that requires Ops intervention is disk 
> failure, some partitions have long write latency caused by extremely high 
> disk utilization; since all partitions share the same buffer under the 
> current producer thread model, the buffer will be filled up quickly and 
> eventually the good partitions are impacted as well. The cluster level 
> success rate and timeout ratio will degrade until the local infrastructure 
> issue is resolved.
> One way to mitigate this issue is to add client side mechanism to short 
> circuit problematic partitions during transient failure. Similar approach is 
> applied in other distributed systems and RPC frameworks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-06-01 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny updated KAFKA-12793:
--
Description: 
When Kafka is used to build data pipeline in mission critical business 
scenarios, availability and throughput are the most important operational goals 
that need to be maintained in presence of transient or permanent local failure. 
One typical situation that requires Ops intervention is disk failure, some 
partitions have long write latency caused by extremely high disk utilization; 
since all partitions share the same buffer under the current producer thread 
model, the buffer will be filled up quickly and eventually the good partitions 
are impacted as well. The cluster level success rate and timeout ratio will 
degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit 
problematic partitions during transient failure. Similar approach is applied in 
other distributed systems and RPC frameworks.

KIP-693: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]

  was:
When Kafka is used to build data pipeline in mission critical business 
scenarios, availability and throughput are the most important operational goals 
that need to be maintained in presence of transient or permanent local failure. 
One typical situation that requires Ops intervention is disk failure, some 
partitions have long write latency caused by extremely high disk utilization; 
since all partitions share the same buffer under the current producer thread 
model, the buffer will be filled up quickly and eventually the good partitions 
are impacted as well. The cluster level success rate and timeout ratio will 
degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit 
problematic partitions during transient failure. Similar approach is applied in 
other distributed systems and RPC frameworks.




> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Priority: Major
>
> When Kafka is used to build data pipeline in mission critical business 
> scenarios, availability and throughput are the most important operational 
> goals that need to be maintained in presence of transient or permanent local 
> failure. One typical situation that requires Ops intervention is disk 
> failure, some partitions have long write latency caused by extremely high 
> disk utilization; since all partitions share the same buffer under the 
> current producer thread model, the buffer will be filled up quickly and 
> eventually the good partitions are impacted as well. The cluster level 
> success rate and timeout ratio will degrade until the local infrastructure 
> issue is resolved.
> One way to mitigate this issue is to add client side mechanism to short 
> circuit problematic partitions during transient failure. Similar approach is 
> applied in other distributed systems and RPC frameworks.
> KIP-693: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-01 Thread Niclas Lockner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niclas Lockner updated KAFKA-12870:
---
Description: 
After a Kafka Stream with exactly once enabled has performed its first commit, 
the RecordAccumulator within the stream's internal producer gets stuck in a 
state where all subsequent ProducerBatches that get allocated are immediately 
flushed instead of being held in memory until they expire, regardless of the 
stream's linger or batch size config.

This is reproduced in the example code found at 
[https://github.com/niclaslockner/kafka-12870] which can be run with ./gradlew 
run --args=

The example has a producer that sends 1 record/sec to one topic, and a Kafka 
stream with EOS enabled that forwards the records from that topic to another 
topic with the configuration linger = 5 sec, commit interval = 10 sec.

 

The expected behavior when running the example is that the stream's 
ProducerBatches will expire (or get flushed because of the commit) every 5th 
second, and that the stream's producer will send a ProduceRequest every 5th 
second with an expired ProducerBatch that contains 5 records.

The actual behavior is that the ProducerBatch is made immediately available for 
the Sender, and the Sender sends one ProduceRequest for each record.

 

The example code contains a copy of the RecordAccumulator class (copied from 
kafka-clients 2.8.0) with some additional logging added to
 * RecordAccumulator#ready(Cluster, long)
 * RecordAccumulator#beginFlush()
 * RecordAccumulator#awaitFlushCompletion()

These log entries show (see the attached RecordsAccumulator.log)
 * that the batches are considered sendable because a flush is in progress
 * that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's 
beginFlush() without also calling awaitFlushCompletion(), and that this makes 
RecordAccumulator's flushesInProgress jump between 1-2 instead of the expected 
0-1.

 

This issue is not reproducible in version 2.3.1.

 

 

  was:
After a Kafka Stream with exactly once enabled has performed its first commit, 
the RecordAccumulator within the stream's internal producer gets stuck in a 
state where all subsequent ProducerBatches that get allocated are immediately 
flushed instead of being held in memory until they expire, regardless of the 
stream's linger or batch size config.

This is reproduced in the example code found at , 
which can be run with ./gradlew run --args=

The example has a producer that sends 1 record/sec to one topic, and a Kafka 
stream with EOS enabled that forwards the records from that topic to another 
topic with the configuration linger = 5 sec, commit interval = 10 sec.

 

The expected behavior when running the example is that the stream's 
ProducerBatches will expire (or get flushed because of the commit) every 5th 
second, and that the stream's producer will send a ProduceRequest every 5th 
second with an expired ProducerBatch that contains 5 records.

The actual behavior is that the ProducerBatch is made immediately available for 
the Sender, and the Sender sends one ProduceRequest for each record.

 

The example code contains a copy of the RecordAccumulator class (copied from 
kafka-clients 2.8.0) with some additional logging added to
 * RecordAccumulator#ready(Cluster, long)
 * RecordAccumulator#beginFlush()
 * RecordAccumulator#awaitFlushCompletion()

These log entries show
 * that the batches are considered sendable because a flush is in progress
 * that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's 
beginFlush() without also calling awaitFlushCompletion(), and that this makes 
RecordAccumulator's flushesInProgress jump between 1-2 instead of the expected 
0-1.

 

This issue is not reproducible in version 2.3.1.

 

 


> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.6.1, 2.8.0
>Reporter: Niclas Lockner
>Priority: Major
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at 
> [https://github.com/niclaslockner/kafka-12870] which can be run with 
> ./gradlew run --args=
> The example has a producer that sends 1 record/sec to one topic, and a Kafka 
> stream with EOS enabled that forwards the records from that topic 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #10156: KAFKA-10345: File watch store reloading

2021-06-01 Thread GitBox


rajinisivaram commented on a change in pull request #10156:
URL: https://github.com/apache/kafka/pull/10156#discussion_r642939618



##
File path: clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
##
@@ -98,7 +99,7 @@ private ClientUtils() {
  *
  * @return configured ChannelBuilder based on the configs.
  */
-public static ChannelBuilder createChannelBuilder(AbstractConfig config, 
Time time, LogContext logContext) {
+public static ChannelBuilder createChannelBuilder(AbstractConfig config, 
Time time, LogContext logContext) throws IOException {

Review comment:
   Rather than propagating IOException everywhere, couldn't we use one of 
the Kafka exceptions that is not a checked exception?
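
   A minimal sketch of what that could look like; the KafkaException wrapper and the 
watch-service call are assumptions for illustration, not code from this PR:

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.WatchService;
import org.apache.kafka.common.KafkaException;

// Sketch only: wrap the checked IOException in Kafka's unchecked KafkaException
// instead of adding "throws IOException" to createChannelBuilder and its callers.
public class UncheckedWrapSketch {
    static WatchService newWatchService() {
        try {
            return FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            throw new KafkaException("Failed to create watch service for store reloading", e);
        }
    }
}
```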

##
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##
@@ -70,7 +73,7 @@ public static ChannelBuilder clientChannelBuilder(
 String clientSaslMechanism,
 Time time,
 boolean saslHandshakeRequestEnable,
-LogContext logContext) {
+LogContext logContext) throws IOException {

Review comment:
   As before, IOException propagation seems too noisy

##
File path: 
clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java
##
@@ -127,6 +158,131 @@ public KeyStore truststore() {
 return this.truststore != null ? this.truststore.get() : null;
 }
 
+class SecurityFileChangeListener implements Runnable {
+private final Timer keyStoreRefreshTimer;
+private final Timer trustStoreRefreshTimer;
+private final Map<WatchKey, Path> watchKeyPathMap = new HashMap<>();
+private final Map<Path, SecurityStore> fileToStoreMap = new HashMap<>();
+private final AtomicReference<Exception> lastLoadFailure = new 
AtomicReference<>(null);
+
+SecurityFileChangeListener(final Timer keyStoreRefreshTimer,
+   final Timer trustStoreRefreshTimer) {
+this.keyStoreRefreshTimer = keyStoreRefreshTimer;
+this.trustStoreRefreshTimer = trustStoreRefreshTimer;
+}
+
+void updateStoreKey(SecurityStore store, final String watchFile) {
+try {
+Path filePath = Paths.get(watchFile);
+fileToStoreMap.put(filePath, store);
+
+Path dirPath = filePath.getParent();
+if (dirPath == null) {
+throw new IOException("Unexpected null path with no 
parent");
+}
+
+if (!Files.exists(dirPath)) {
+Files.createDirectories(dirPath);
+}
+WatchKey watchkey = dirPath.register(watchService,
+StandardWatchEventKinds.ENTRY_CREATE,
+StandardWatchEventKinds.ENTRY_MODIFY,
+StandardWatchEventKinds.ENTRY_DELETE,
+StandardWatchEventKinds.OVERFLOW);
+watchKeyPathMap.put(watchkey, dirPath);
+log.info("Watch service registered for store path = {}", 
dirPath);
+} catch (IOException e) {
+// If the update failed, we will try to use existing store 
path instead.
+log.error("Could not register store path for file {}", 
watchFile, e);
+}
+}
+
+// For testing purpose now.
+Exception lastLoadFailure() {
+return lastLoadFailure.get();
+}
+
+public void run() {
+for (Map.Entry<WatchKey, Path> key : watchKeyPathMap.entrySet()) {
+log.debug("Starting listening for change key {} for path {}", 
key.getKey(), key.getValue());
+}
+resetKeyStoreTimer();
+resetTrustStoreTimer();
+
+try {
+runLoop();
+} catch (InterruptedException ie) {
+log.debug("Security file listener {} was interrupted to 
shutdown", watchKeyPathMap);
+} catch (Exception e) {
+log.warn("Hit a fatal exception in security file listener", e);
+}
+}
+
+private void runLoop() throws InterruptedException {
+while (!watchKeyPathMap.isEmpty()) {
+keyStoreRefreshTimer.update();
+trustStoreRefreshTimer.update();
+final long maxPollIntervalMs = 
Math.min(keyStoreRefreshTimer.remainingMs(),
+trustStoreRefreshTimer.remainingMs());
+log.debug("Max poll interval is {} with trust store remaining 
time {} and trust store time {}",
+maxPollIntervalMs, trustStoreRefreshTimer.remainingMs(), 
trustStoreRefreshIntervalMs);
+WatchKey watchKey = watchService.poll(maxPollIntervalMs, 
TimeUnit.MILLISECONDS);
+
+// Handle file update triggered events.
+if (watchKey != null && watchKeyPathMap.containsKey(watchKey)) 
{
+for (WatchEvent event: watc

[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-06-01 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r642967602



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult 
listOffsets(Map topicPartit
 }
 }
 
-for (final Map.Entry> entry : 
leaders.entrySet()) {
-final int brokerId = entry.getKey().id();
+for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+final int brokerId = versionedEntry.getKey().id();
 
-calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
+final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
 
-@Override
-ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-return ListOffsetsRequest.Builder
+@Override
+ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+ListOffsetRequestVersion requestVersion = 
entry.getKey();
+if (requestVersion == 
ListOffsetRequestVersion.V7AndAbove) {
+return ListOffsetsRequest.Builder
+
.forMaxTimestamp(context.options().isolationLevel())
+.setTargetTimes(partitionsToQuery);
+}

Review comment:
   Ah, I missed that pattern. I'll fix it up to be consistent with the 
other examples.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-01 Thread Niclas Lockner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niclas Lockner updated KAFKA-12870:
---
Affects Version/s: (was: 2.6.1)
   2.5.1
   2.7.1
   2.6.2

> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Niclas Lockner
>Priority: Major
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at 
> [https://github.com/niclaslockner/kafka-12870] which can be run with 
> ./gradlew run --args=
> The example has a producer that sends 1 record/sec to one topic, and a Kafka 
> stream with EOS enabled that forwards the records from that topic to another 
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
>  
> The expected behavior when running the example is that the stream's 
> ProducerBatches will expire (or get flushed because of the commit) every 5th 
> second, and that the stream's producer will send a ProduceRequest every 5th 
> second with an expired ProducerBatch that contains 5 records.
> The actual behavior is that the ProducerBatch is made immediately available 
> for the Sender, and the Sender sends one ProduceRequest for each record.
>  
> The example code contains a copy of the RecordAccumulator class (copied from 
> kafka-clients 2.8.0) with some additional logging added to
>  * RecordAccumulator#ready(Cluster, long)
>  * RecordAccumulator#beginFlush()
>  * RecordAccumulator#awaitFlushCompletion()
> These log entries show (see the attached RecordsAccumulator.log)
>  * that the batches are considered sendable because a flush is in progress
>  * that Sender.maybeSendAndPollTransactionalRequest() calls 
> RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), 
> and that this makes RecordAccumulator's flushesInProgress jump between 1-2 
> instead of the expected 0-1.
>  
> This issue is not reproducible in version 2.3.1.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12870) RecordAccumulator stuck in a flushing state

2021-06-01 Thread Niclas Lockner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niclas Lockner updated KAFKA-12870:
---
Description: 
After a Kafka Stream with exactly once enabled has performed its first commit, 
the RecordAccumulator within the stream's internal producer gets stuck in a 
state where all subsequent ProducerBatches that get allocated are immediately 
flushed instead of being held in memory until they expire, regardless of the 
stream's linger or batch size config.

This is reproduced in the example code found at 
[https://github.com/niclaslockner/kafka-12870] which can be run with ./gradlew 
run --args=

The example has a producer that sends 1 record/sec to one topic, and a Kafka 
stream with EOS enabled that forwards the records from that topic to another 
topic with the configuration linger = 5 sec, commit interval = 10 sec.

 

The expected behavior when running the example is that the stream's 
ProducerBatches will expire (or get flushed because of the commit) every 5th 
second, and that the stream's producer will send a ProduceRequest every 5th 
second with an expired ProducerBatch that contains 5 records.

The actual behavior is that the ProducerBatch is made immediately available for 
the Sender, and the Sender sends one ProduceRequest for each record.

 

The example code contains a copy of the RecordAccumulator class (copied from 
kafka-clients 2.8.0) with some additional logging added to
 * RecordAccumulator#ready(Cluster, long)
 * RecordAccumulator#beginFlush()
 * RecordAccumulator#awaitFlushCompletion()

These log entries show (see the attached RecordsAccumulator.log)
 * that the batches are considered sendable because a flush is in progress
 * that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's 
beginFlush() without also calling awaitFlushCompletion(), and that this makes 
RecordAccumulator's flushesInProgress jump between 1-2 instead of the expected 
0-1.

 

This issue is not reproducible in version 2.3.1 or 2.4.1.

 

 

  was:
After a Kafka Stream with exactly once enabled has performed its first commit, 
the RecordAccumulator within the stream's internal producer gets stuck in a 
state where all subsequent ProducerBatches that get allocated are immediately 
flushed instead of being held in memory until they expire, regardless of the 
stream's linger or batch size config.

This is reproduced in the example code found at 
[https://github.com/niclaslockner/kafka-12870] which can be run with ./gradlew 
run --args=

The example has a producer that sends 1 record/sec to one topic, and a Kafka 
stream with EOS enabled that forwards the records from that topic to another 
topic with the configuration linger = 5 sec, commit interval = 10 sec.

 

The expected behavior when running the example is that the stream's 
ProducerBatches will expire (or get flushed because of the commit) every 5th 
second, and that the stream's producer will send a ProduceRequest every 5th 
second with an expired ProducerBatch that contains 5 records.

The actual behavior is that the ProducerBatch is made immediately available for 
the Sender, and the Sender sends one ProduceRequest for each record.

 

The example code contains a copy of the RecordAccumulator class (copied from 
kafka-clients 2.8.0) with some additional logging added to
 * RecordAccumulator#ready(Cluster, long)
 * RecordAccumulator#beginFlush()
 * RecordAccumulator#awaitFlushCompletion()

These log entries show (see the attached RecordsAccumulator.log)
 * that the batches are considered sendable because a flush is in progress
 * that Sender.maybeSendAndPollTransactionalRequest() calls RecordAccumulator's 
beginFlush() without also calling awaitFlushCompletion(), and that this makes 
RecordAccumulator's flushesInProgress jump between 1-2 instead of the expected 
0-1.

 

This issue is not reproducible in version 2.3.1.

 

 


> RecordAccumulator stuck in a flushing state
> ---
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
>  Issue Type: Bug
>  Components: producer , streams
>Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Niclas Lockner
>Priority: Major
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly once enabled has performed its first 
> commit, the RecordAccumulator within the stream's internal producer gets 
> stuck in a state where all subsequent ProducerBatches that get allocated are 
> immediately flushed instead of being held in memory until they expire, 
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at 
> [https://github.com/niclaslockner/kafka-12870] which can be run with 
> ./gradlew run --args=
> The example has a producer that sen

[jira] [Updated] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-06-01 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny updated KAFKA-12793:
--
Description: 
When Kafka is used to build data pipeline in mission critical business 
scenarios, availability and throughput are the most important operational goals 
that need to be maintained in presence of transient or permanent local failure. 
One typical situation that requires Ops intervention is disk failure, some 
partitions have long write latency caused by extremely high disk utilization; 
since all partitions share the same buffer under the current producer thread 
model, the buffer will be filled up quickly and eventually the good partitions 
are impacted as well. The cluster level success rate and timeout ratio will 
degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit 
problematic partitions during transient failure. Similar approach is applied in 
other distributed systems and RPC frameworks.

We propose to add a configuration driven circuit breaking mechanism that allows 
Kafka client to ‘mute’ partitions when certain condition is met. The mechanism 
adds callbacks in Sender class workflow that allows to filtering partitions 
based on certain policy.

The client can choose proper implementation that fits a special failure 
scenario, Client-side custom implementation of Partitioner and 
ProducerInterceptor

* Customize the implementation of ProducerInterceptor, and choose the strategy 
to mute partitions.

* Customize the implementation of Partitioner, and choose the strategy to 
filtering partitions.

Muting partitions have impact when the topic contains keyed message as messages 
will be written to more than one partitions during period of recovery. We 
believe this can be an explicit trade-off the application makes between 
availability and message ordering.


KIP-693: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]

  was:
When Kafka is used to build data pipeline in mission critical business 
scenarios, availability and throughput are the most important operational goals 
that need to be maintained in presence of transient or permanent local failure. 
One typical situation that requires Ops intervention is disk failure, some 
partitions have long write latency caused by extremely high disk utilization; 
since all partitions share the same buffer under the current producer thread 
model, the buffer will be filled up quickly and eventually the good partitions 
are impacted as well. The cluster level success rate and timeout ratio will 
degrade until the local infrastructure issue is resolved.

One way to mitigate this issue is to add client side mechanism to short circuit 
problematic partitions during transient failure. Similar approach is applied in 
other distributed systems and RPC frameworks.

KIP-693: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]


> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Priority: Major
>
> When Kafka is used to build data pipeline in mission critical business 
> scenarios, availability and throughput are the most important operational 
> goals that need to be maintained in presence of transient or permanent local 
> failure. One typical situation that requires Ops intervention is disk 
> failure, some partitions have long write latency caused by extremely high 
> disk utilization; since all partitions share the same buffer under the 
> current producer thread model, the buffer will be filled up quickly and 
> eventually the good partitions are impacted as well. The cluster level 
> success rate and timeout ratio will degrade until the local infrastructure 
> issue is resolved.
> One way to mitigate this issue is to add client side mechanism to short 
> circuit problematic partitions during transient failure. Similar approach is 
> applied in other distributed systems and RPC frameworks.
> We propose to add a configuration driven circuit breaking mechanism that 
> allows Kafka client to ‘mute’ partitions when certain condition is met. The 
> mechanism adds callbacks in Sender class workflow that allows to filtering 
> partitions based on certain policy.
> The client can choose proper implementation tha

[GitHub] [kafka] cadonna commented on pull request #10770: MINOR: fix code listings security.html

2021-06-01 Thread GitBox


cadonna commented on pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#issuecomment-852043836


   The test failures are unrelated since this is a pure doc change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #10770: MINOR: fix code listings security.html

2021-06-01 Thread GitBox


cadonna merged pull request #10770:
URL: https://github.com/apache/kafka/pull/10770


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-06-01 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny updated KAFKA-12793:
--
Attachment: (was: 
KAFKA-12793__KIP-693,_Client-side_supports_partitioned_circuit_breaker.patch)

> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Priority: Major
>
> When Kafka is used to build data pipeline in mission critical business 
> scenarios, availability and throughput are the most important operational 
> goals that need to be maintained in presence of transient or permanent local 
> failure. One typical situation that requires Ops intervention is disk 
> failure, some partitions have long write latency caused by extremely high 
> disk utilization; since all partitions share the same buffer under the 
> current producer thread model, the buffer will be filled up quickly and 
> eventually the good partitions are impacted as well. The cluster level 
> success rate and timeout ratio will degrade until the local infrastructure 
> issue is resolved.
> One way to mitigate this issue is to add client side mechanism to short 
> circuit problematic partitions during transient failure. Similar approach is 
> applied in other distributed systems and RPC frameworks.
> We propose to add a configuration driven circuit breaking mechanism that 
> allows Kafka client to ‘mute’ partitions when certain condition is met. The 
> mechanism adds callbacks in Sender class workflow that allows to filtering 
> partitions based on certain policy.
> The client can choose proper implementation that fits a special failure 
> scenario, Client-side custom implementation of Partitioner and 
> ProducerInterceptor
> * Customize the implementation of ProducerInterceptor, and choose the 
> strategy to mute partitions.
> * Customize the implementation of Partitioner, and choose the strategy to 
> filtering partitions.
> Muting partitions have impact when the topic contains keyed message as 
> messages will be written to more than one partitions during period of 
> recovery. We believe this can be an explicit trade-off the application makes 
> between availability and message ordering.
> KIP-693: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]
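
For illustration, a minimal sketch (not the attached patch or the KIP-693 design; the 
mute list and how it gets populated are hypothetical) of the Partitioner-based variant 
described above:
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Sketch of a custom Partitioner that steers records away from partitions a
// (hypothetical, externally maintained) circuit-breaker policy has muted.
public class MutingPartitionerSketch implements Partitioner {
    // Hypothetical shared mute list; a real policy would add partitions here on write errors.
    private final Set<Integer> mutedPartitions = ConcurrentHashMap.newKeySet();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        if (partitions.isEmpty()) {
            throw new IllegalStateException("No partitions available for topic " + topic);
        }
        // Prefer any partition that is not currently muted.
        for (PartitionInfo p : partitions) {
            if (!mutedPartitions.contains(p.partition())) {
                return p.partition();
            }
        }
        // Everything is muted: fall back to a random partition.
        return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())).partition();
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
{code}
As the description notes, steering keyed messages away from their usual partition 
trades message ordering for availability during the recovery period.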



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-06-01 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny updated KAFKA-12793:
--
Attachment: 
KAFKA-12793__KIP-693,_Client-side_supports_partitioned_circuit_breaker.patch

> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Priority: Major
>
> When Kafka is used to build data pipeline in mission critical business 
> scenarios, availability and throughput are the most important operational 
> goals that need to be maintained in presence of transient or permanent local 
> failure. One typical situation that requires Ops intervention is disk 
> failure, some partitions have long write latency caused by extremely high 
> disk utilization; since all partitions share the same buffer under the 
> current producer thread model, the buffer will be filled up quickly and 
> eventually the good partitions are impacted as well. The cluster level 
> success rate and timeout ratio will degrade until the local infrastructure 
> issue is resolved.
> One way to mitigate this issue is to add client side mechanism to short 
> circuit problematic partitions during transient failure. Similar approach is 
> applied in other distributed systems and RPC frameworks.
> We propose to add a configuration driven circuit breaking mechanism that 
> allows Kafka client to ‘mute’ partitions when certain condition is met. The 
> mechanism adds callbacks in Sender class workflow that allows to filtering 
> partitions based on certain policy.
> The client can choose proper implementation that fits a special failure 
> scenario, Client-side custom implementation of Partitioner and 
> ProducerInterceptor
> * Customize the implementation of ProducerInterceptor, and choose the 
> strategy to mute partitions.
> * Customize the implementation of Partitioner, and choose the strategy to 
> filtering partitions.
> Muting partitions have impact when the topic contains keyed message as 
> messages will be written to more than one partitions during period of 
> recovery. We believe this can be an explicit trade-off the application makes 
> between availability and message ordering.
> KIP-693: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12793) Client-side Circuit Breaker for Partition Write Errors

2021-06-01 Thread KahnCheny (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

KahnCheny updated KAFKA-12793:
--
Attachment: KAFKA-12793.patch

> Client-side Circuit Breaker for Partition Write Errors
> --
>
> Key: KAFKA-12793
> URL: https://issues.apache.org/jira/browse/KAFKA-12793
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: KahnCheny
>Priority: Major
> Attachments: KAFKA-12793.patch
>
>
> When Kafka is used to build data pipelines in mission-critical business 
> scenarios, availability and throughput are the most important operational 
> goals that need to be maintained in the presence of transient or permanent 
> local failures. One typical situation that requires Ops intervention is disk 
> failure: some partitions have long write latency caused by extremely high 
> disk utilization, and since all partitions share the same buffer under the 
> current producer thread model, the buffer fills up quickly and eventually 
> the good partitions are impacted as well. The cluster-level success rate and 
> timeout ratio will degrade until the local infrastructure issue is resolved.
> One way to mitigate this issue is to add a client-side mechanism that short 
> circuits problematic partitions during transient failures. A similar approach 
> is applied in other distributed systems and RPC frameworks.
> We propose to add a configuration-driven circuit-breaking mechanism that 
> allows the Kafka client to ‘mute’ partitions when a certain condition is met. 
> The mechanism adds callbacks to the Sender class workflow that allow 
> filtering partitions based on a configurable policy.
> The client can choose the implementation that fits its failure scenario via 
> client-side custom implementations of Partitioner and ProducerInterceptor:
> * Customize the implementation of ProducerInterceptor and choose the 
> strategy for muting partitions.
> * Customize the implementation of Partitioner and choose the strategy for 
> filtering partitions.
> Muting partitions has an impact when the topic contains keyed messages, as 
> messages will be written to more than one partition during the recovery 
> period. We believe this is an explicit trade-off the application makes 
> between availability and message ordering.
> KIP-693: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors|https://cwiki.apache.org/confluence/display/KAFKA/KIP-693%3A+Client-side+Circuit+Breaker+for+Partition+Write+Errors]
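
The KIP leaves the concrete muting policy to the application. As an illustration of 
the Partitioner-based approach, here is a minimal sketch; the MutedPartitionTracker 
and its deadline-based policy are hypothetical placeholders and not part of the KIP. 
A matching ProducerInterceptor would call mute(...) from onAcknowledgement(...) when 
sends to a partition keep failing, and the partitioner would be registered through 
the standard partitioner.class producer configuration.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CircuitBreakingPartitioner implements Partitioner {

    // Hypothetical tracker: a partition stays "muted" until a deadline set on failure.
    static final class MutedPartitionTracker {
        private final Map<String, Long> muteDeadlines = new ConcurrentHashMap<>();

        void mute(String topic, int partition, long muteMs) {
            muteDeadlines.put(topic + "-" + partition, System.currentTimeMillis() + muteMs);
        }

        boolean isMuted(String topic, int partition) {
            Long deadline = muteDeadlines.get(topic + "-" + partition);
            return deadline != null && deadline > System.currentTimeMillis();
        }
    }

    private final MutedPartitionTracker tracker = new MutedPartitionTracker();

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> all = cluster.partitionsForTopic(topic);
        // Drop partitions the tracker currently considers unhealthy ("muted").
        List<PartitionInfo> healthy = all.stream()
            .filter(p -> !tracker.isMuted(topic, p.partition()))
            .collect(Collectors.toList());
        List<PartitionInfo> candidates = healthy.isEmpty() ? all : healthy;
        // Keyed records lose strict ordering while partitions are muted; the KIP treats
        // this as an explicit availability-vs-ordering trade-off.
        int index = keyBytes == null
            ? ThreadLocalRandom.current().nextInt(candidates.size())
            : Math.floorMod(Arrays.hashCode(keyBytes), candidates.size());
        return candidates.get(index).partition();
    }

    @Override
    public void close() { }
}
{code}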



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-06-01 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355048#comment-17355048
 ] 

Bruno Cadonna commented on KAFKA-5676:
--

[~marcolotz] Thank you for your analysis!
 I agree that some aspects of the code can be improved. For example, it is 
questionable why methods like {{StreamsMetricsImpl#taskLevelSensor(...)}} are 
final. I do not share your concern about the static methods like 
{{StreamsMetricsImpl#addValueMetricToSensor(...)}}. They are utility methods 
that do not need the state of a {{StreamsMetricsImpl}} object. Some 
refactorings would maybe improve the situation. 

IMO, we should close this ticket as Won't Do and remove {{MockStreamsMetrics}} 
via KAFKA-8977. What do you think [~guozhang]?

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior than extended a 
> real implementaion of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10746: MINOR: remove unneccessary public keyword from ProducerInterceptor/ConsumerInterceptor interface

2021-06-01 Thread GitBox


showuon commented on pull request #10746:
URL: https://github.com/apache/kafka/pull/10746#issuecomment-852066023


   @KahnCheny , it looks like you made some change after I approved this PR. 
Now, there are 162 files changed. Could you check again? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10770: MINOR: fix code listings security.html

2021-06-01 Thread GitBox


jlprat commented on pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#issuecomment-852066684


   Thanks @cadonna !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-01 Thread GitBox


showuon commented on pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#issuecomment-852067814


   @jsancio @mumrah , could you have another review for this PR since you are 
more familiar with this ticket. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4

2021-06-01 Thread GitBox


cadonna commented on pull request #10765:
URL: https://github.com/apache/kafka/pull/10765#issuecomment-852068844


   @showuon Thanks for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-06-01 Thread GitBox


showuon commented on pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#issuecomment-852069445


   @ijuma , could you help check this PR again? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10457: KAFKA-12596: remove --zookeeper option from topic command

2021-06-01 Thread GitBox


showuon commented on pull request #10457:
URL: https://github.com/apache/kafka/pull/10457#issuecomment-852069681


   @ijuma  , could you help check this PR again? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4

2021-06-01 Thread GitBox


cadonna merged pull request #10765:
URL: https://github.com/apache/kafka/pull/10765


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-06-01 Thread GitBox


showuon commented on pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#issuecomment-852070237


   Failed tests are flaky `RaftClusterTest` tests. Thanks.
   ```
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] KahnCheny closed pull request #10746: MINOR: remove unneccessary public keyword from ProducerInterceptor/ConsumerInterceptor interface

2021-06-01 Thread GitBox


KahnCheny closed pull request #10746:
URL: https://github.com/apache/kafka/pull/10746


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] KahnCheny opened a new pull request #10801: MINOR: remove unneccessary public keyword from ProducerInterceptor/ConsumerInterceptor interface

2021-06-01 Thread GitBox


KahnCheny opened a new pull request #10801:
URL: https://github.com/apache/kafka/pull/10801


   remove unneccessary public keyword from 
ProducerInterceptor/ConsumerInterceptor interface
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] KahnCheny commented on pull request #10746: MINOR: remove unneccessary public keyword from ProducerInterceptor/ConsumerInterceptor interface

2021-06-01 Thread GitBox


KahnCheny commented on pull request #10746:
URL: https://github.com/apache/kafka/pull/10746#issuecomment-852083759


   Merging trunk updated this PR automatically, so I submitted a new PR. Thanks,
   could you re-review?
   https://github.com/apache/kafka/pull/10801
   
   Luke Chen ***@***.***> wrote on Tue, Jun 1, 2021 at 7:59 PM:
   
   > @KahnCheny  , it looks like you made some
   > change after I approved this PR. Now, there are 162 files changed. Could
   > you check again? Thanks.
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12871) Kafka-connect : rest api connect config, fields are not ordered

2021-06-01 Thread raphael auv (Jira)
raphael auv created KAFKA-12871:
---

 Summary: Kafka-connect : rest api connect config, fields are not 
ordered
 Key: KAFKA-12871
 URL: https://issues.apache.org/jira/browse/KAFKA-12871
 Project: Kafka
  Issue Type: Improvement
Reporter: raphael auv


When you query the Kafka Connect REST API to get the config of a connector, the 
fields are not returned in alphabetical order.

[http://my_kafka_connect_cluster:8083/connectors/my_connector_1/]

answer:


{code:java}
{
   "name":"my_connector_1",
   "config":{
  "connector.class":"Something",
  "errors.log.include.messages":"true",
  "tasks.max":"2",
  "buffer.flush.time":"300",
  "topics.regex":"^(?:.*",
  "errors.deadletterqueue.context.headers.enable":"true",
  "buffer.count.records":"100",
  "name":"my_connector_1",
  "errors.log.enable":"true",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "buffer.size.bytes":"2000"
   },
   "tasks":[
  {
 "connector":"my_connector_1",
 "task":0
  },
  {
 "connector":"my_connector_1",
 "task":1
  }
   ],
   "type":"sink"
}{code}
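
Until the API sorts the keys itself, a caller can impose alphabetical order on the 
returned config map client side. A minimal sketch using Jackson (the library choice 
and the response-parsing shape are assumptions for illustration only):

{code:java}
import java.util.Map;
import java.util.TreeMap;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SortedConnectorConfig {
    public static void main(String[] args) throws Exception {
        // `json` would hold the body returned by GET /connectors/my_connector_1/
        String json = args[0];
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree(json);
        Map<String, String> config = mapper.convertValue(
            root.get("config"), new TypeReference<Map<String, String>>() { });
        // TreeMap iterates its entries in natural (alphabetical) key order.
        Map<String, String> sorted = new TreeMap<>(config);
        sorted.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
{code}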



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on pull request #10795: KAFKA-12866: Avoid root access to Zookeeper

2021-06-01 Thread GitBox


rondagostino commented on pull request #10795:
URL: https://github.com/apache/kafka/pull/10795#issuecomment-852123180


   Good catch @soarez, and thanks for the PR!  Test failed without the fix and 
passed with it.  Thanks again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy merged pull request #10795: KAFKA-12866: Avoid root access to Zookeeper

2021-06-01 Thread GitBox


omkreddy merged pull request #10795:
URL: https://github.com/apache/kafka/pull/10795


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12866) Kafka requires ZK root access even when using a chroot

2021-06-01 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-12866:
--
Fix Version/s: 3.0.0

> Kafka requires ZK root access even when using a chroot
> --
>
> Key: KAFKA-12866
> URL: https://issues.apache.org/jira/browse/KAFKA-12866
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 2.6.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.0.0
>
>
> When a Zookeeper chroot is configured, users do not expect Kafka to need 
> Zookeeper access outside of that chroot.
> h1. Why is this important?
> A zookeeper cluster may be shared with other Kafka clusters or even other 
> applications. It is an expected security practice to restrict each 
> cluster/application's access to its own Zookeeper chroot.
> h1. Steps to reproduce
> h2. Zookeeper setup
> Using the zkCli, create a chroot for Kafka, make it available to Kafka but 
> lock the root znode.
>  
> {code:java}
> [zk: localhost:2181(CONNECTED) 1] create /somechroot
> Created /some
> [zk: localhost:2181(CONNECTED) 2] setAcl /somechroot world:anyone:cdrwa
> [zk: localhost:2181(CONNECTED) 3] addauth digest test:12345
> [zk: localhost:2181(CONNECTED) 4] setAcl / 
> digest:test:Mx1uO9GLtm1qaVAQ20Vh9ODgACg=:cdrwa{code}
>  
> h2. Kafka setup
> Configure the chroot in broker.properties:
>  
> {code:java}
> zookeeper.connect=localhost:2181/somechroot{code}
>  
>  
> h2. Expected behavior
> The expected behavior here is that Kafka will use the chroot without issues.
> h2. Actual result
> Kafka fails to start with a fatal exception:
> {code:java}
> org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = 
> NoAuth for /chroot
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:120)
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
> at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
> at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
> at 
> kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
> at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1957)
> at 
> kafka.zk.ZkClientAclTest.testChrootExistsAndRootIsLocked(ZkClientAclTest.scala:60)
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12866) Kafka requires ZK root access even when using a chroot

2021-06-01 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-12866.
---
Resolution: Fixed

> Kafka requires ZK root access even when using a chroot
> --
>
> Key: KAFKA-12866
> URL: https://issues.apache.org/jira/browse/KAFKA-12866
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 2.6.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.0.0
>
>
> When a Zookeeper chroot is configured, users do not expect Kafka to need 
> Zookeeper access outside of that chroot.
> h1. Why is this important?
> A zookeeper cluster may be shared with other Kafka clusters or even other 
> applications. It is an expected security practice to restrict each 
> cluster/application's access to its own Zookeeper chroot.
> h1. Steps to reproduce
> h2. Zookeeper setup
> Using the zkCli, create a chroot for Kafka, make it available to Kafka but 
> lock the root znode.
>  
> {code:java}
> [zk: localhost:2181(CONNECTED) 1] create /somechroot
> Created /some
> [zk: localhost:2181(CONNECTED) 2] setAcl /somechroot world:anyone:cdrwa
> [zk: localhost:2181(CONNECTED) 3] addauth digest test:12345
> [zk: localhost:2181(CONNECTED) 4] setAcl / 
> digest:test:Mx1uO9GLtm1qaVAQ20Vh9ODgACg=:cdrwa{code}
>  
> h2. Kafka setup
> Configure the chroot in broker.properties:
>  
> {code:java}
> zookeeper.connect=localhost:2181/somechroot{code}
>  
>  
> h2. Expected behavior
> The expected behavior here is that Kafka will use the chroot without issues.
> h2. Actual result
> Kafka fails to start with a fatal exception:
> {code:java}
> org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = 
> NoAuth for /chroot
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:120)
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
> at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
> at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
> at 
> kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
> at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1957)
> at 
> kafka.zk.ZkClientAclTest.testChrootExistsAndRootIsLocked(ZkClientAclTest.scala:60)
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12519) Consider Removing Streams Old Built-in Metrics Version

2021-06-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12519.
---
Resolution: Fixed

> Consider Removing Streams Old Built-in Metrics Version 
> ---
>
> Key: KAFKA-12519
> URL: https://issues.apache.org/jira/browse/KAFKA-12519
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> We refactored the Streams' built-in metrics in KIP-444 and the new structure 
> was released in 2.5. We should consider removing the old structure in the 
> upcoming 3.0 release. This would give us the opportunity to simplify the code 
> around the built in metrics since we would not need to consider different 
> versions anymore.   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10457: KAFKA-12596: remove --zookeeper option from topic command

2021-06-01 Thread GitBox


ijuma commented on pull request #10457:
URL: https://github.com/apache/kafka/pull/10457#issuecomment-852146723


   Unrelated failures:
   
   > Build / JDK 8 and Scala 2.12 / testReplicationWithEmptyPartition() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
   > 4s
   > Build / JDK 11 and Scala 2.13 / 
testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest
   > 15s
   > Build / JDK 15 and Scala 2.13 / testReplication() – 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10457: KAFKA-12596: remove --zookeeper option from topic command

2021-06-01 Thread GitBox


[GitHub] [kafka] ijuma commented on pull request #10457: KAFKA-12596: remove --zookeeper option from topic command

2021-06-01 Thread GitBox


ijuma commented on pull request #10457:
URL: https://github.com/apache/kafka/pull/10457#issuecomment-852149539


   After merging, I noticed we are missing a note in upgrade.html. We can fix 
that as part of #10471,


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #10457: KAFKA-12596: remove --zookeeper option from topic command

2021-06-01 Thread GitBox


ijuma edited a comment on pull request #10457:
URL: https://github.com/apache/kafka/pull/10457#issuecomment-852149539


   After merging, I noticed we are missing a note in upgrade.html. We can fix 
that as part of #10471.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10471: KAFKA-12597: remove deprecated zookeeper option in ReassignPartitionsCommand

2021-06-01 Thread GitBox


ijuma commented on a change in pull request #10471:
URL: https://github.com/apache/kafka/pull/10471#discussion_r643139325



##
File path: 
core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
##
@@ -151,7 +148,7 @@ class ReassignPartitionsIntegrationTest extends 
ZooKeeperTestHarness {
   
"""{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}"""
 +
   """]}"""
 // Execute the assignment
-runExecuteAssignment(zkClient, assignment, -1L)
+runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L)

Review comment:
   Looks like the test name doesn't match what we're testing now.

##
File path: 
core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
##
@@ -128,11 +126,10 @@ class ReassignPartitionsIntegrationTest extends 
ZooKeeperTestHarness {
 PartitionReassignmentState(Seq(3, 2, 0), Seq(3, 2, 0), true)
 )
 
-// When using --zookeeper, we aren't able to see the new-style assignment

Review comment:
   Since we are removing this comment, should we be using new style 
assignment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-06-01 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12598:

Fix Version/s: 3.0.0

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-06-01 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355133#comment-17355133
 ] 

Ismael Juma commented on KAFKA-12598:
-

[~showuon] We should do this for everything except SCRAM settings that can be 
set before a cluster restart.

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-06-01 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12598:

Priority: Critical  (was: Major)

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Critical
> Fix For: 3.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10457: KAFKA-12596: remove --zookeeper option from topic command

2021-06-01 Thread GitBox


ijuma commented on pull request #10457:
URL: https://github.com/apache/kafka/pull/10457#issuecomment-852167114


   One more thing, did we check that there are no system tests using the flag 
we just removed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-12598) Remove deprecated --zookeeper in ConfigCommand

2021-06-01 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-12598:
-

> Remove deprecated --zookeeper in ConfigCommand
> --
>
> Key: KAFKA-12598
> URL: https://issues.apache.org/jira/browse/KAFKA-12598
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10695: KAFKA-12783: Remove the deprecated ZK-based partition reassignment API

2021-06-01 Thread GitBox


ijuma commented on pull request #10695:
URL: https://github.com/apache/kafka/pull/10695#issuecomment-852169319


   I looked into it a bit more: this PR also removes the Controller 
functionality, while #10471 only updates the reassign-partitions command. We can 
probably get #10471 merged today and then move on to this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze opened a new pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

2021-06-01 Thread GitBox


lkokhreidze opened a new pull request #10802:
URL: https://github.com/apache/kafka/pull/10802


   Part 1 of 
[KIP-708](https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awareness+for+Kafka+Streams)
 implementation. Full implementation can be checked here 
https://github.com/apache/kafka/pull/10785. Splitting PRs into three smaller 
PRs to make the review process easier to follow. Overall plan is the following:
   
   👉  Part-1: Protocol change, add `clientTags` to `SubscriptionInfoData`
   ⏭️  Part-2: Rack aware standby task assignment logic.
   ⏭️  Part-3: Add required configurations to `StreamsConfig` (public API 
change, at this point we should have full functionality)
   
   This PR adds `clientTags` to `SubscriptionInfoData` and implements Part-1 of 
above mentioned plan.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Part1: Update SubscriptionInfoData with clientTags

2021-06-01 Thread GitBox


lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-852188025


   Call for review @cadonna @vvcephei @ableegoldman 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12872) KIP-724: Drop support for message formats v0 and v1

2021-06-01 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-12872:
---

 Summary: KIP-724: Drop support for message formats v0 and v1
 Key: KAFKA-12872
 URL: https://issues.apache.org/jira/browse/KAFKA-12872
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma
Assignee: Ismael Juma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12872) KIP-724: Drop support for message formats v0 and v1

2021-06-01 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12872:

Description: 
Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 2017) 
via 
[KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
 and has been the default since. It includes a number of enhancements 
(partition leader epoch, sequence ids, producer ids, record headers) required 
for correctness 
([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation],
 
[KIP-279|https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over],
 
[KIP-320|https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation]),
 stronger semantics (idempotent producers, transactional clients) and other 
features ([KIP-82 - Add Record 
Headers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers],
 [KIP-392: Allow consumers to fetch from closest 
replica|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]).

Four years later, it's time to sunset message formats v0 and v1 to establish a 
new baseline in terms of supported client/broker behavior and to improve 
maintainability & supportability of Kafka. This also aligns with 
[KIP-679|https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default],
 which will enable the idempotent producer by default in Apache Kafka 3.0 (and 
requires message format v2). We propose the deprecation of message formats v0 
and v1 in Apache Kafka 3.0 and their removal in Apache Kafka 4.0.

 

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1

> KIP-724: Drop support for message formats v0 and v1
> ---
>
> Key: KAFKA-12872
> URL: https://issues.apache.org/jira/browse/KAFKA-12872
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 
> 2017) via 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
>  and has been the default since. It includes a number of enhancements 
> (partition leader epoch, sequence ids, producer ids, record headers) required 
> for correctness 
> ([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation],
>  
> [KIP-279|https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over],
>  
> [KIP-320|https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation]),
>  stronger semantics (idempotent producers, transactional clients) and other 
> features ([KIP-82 - Add Record 
> Headers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers],
>  [KIP-392: Allow consumers to fetch from closest 
> replica|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]).
> Four years later, it's time to sunset message formats v0 and v1 to establish 
> a new baseline in terms of supported client/broker behavior and to improve 
> maintainability & supportability of Kafka. This also aligns with 
> [KIP-679|https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default],
>  which will enable the idempotent producer by default in Apache Kafka 3.0 
> (and requires message format v2). We propose the deprecation of message 
> formats v0 and v1 in Apache Kafka 3.0 and their removal in Apache Kafka 4.0.
>  
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12872) KIP-724: Drop support for message formats v0 and v1

2021-06-01 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12872:

Description: 
Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 2017) 
via 
[KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
 and has been the default since. It includes a number of enhancements 
(partition leader epoch, sequence ids, producer ids, record headers) required 
for correctness 
([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation],
 
[KIP-279|https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over],
 
[KIP-320|https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation]),
 stronger semantics (idempotent producers, transactional clients) and other 
features ([KIP-82 - Add Record 
Headers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers],
 [KIP-392: Allow consumers to fetch from closest 
replica|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]).

Four years later, it's time to sunset message formats v0 and v1 to establish a 
new baseline in terms of supported client/broker behavior and to improve 
maintainability & supportability of Kafka. This also aligns with 
[KIP-679|https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default],
 which will enable the idempotent producer by default in Apache Kafka 3.0 (and 
requires message format v2). We propose the deprecation of message formats v0 
and v1 in Apache Kafka 3.0 and their removal in Apache Kafka 4.0.

KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1]

  was:
Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 2017) 
via 
[KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
 and has been the default since. It includes a number of enhancements 
(partition leader epoch, sequence ids, producer ids, record headers) required 
for correctness 
([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation],
 
[KIP-279|https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over],
 
[KIP-320|https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation]),
 stronger semantics (idempotent producers, transactional clients) and other 
features ([KIP-82 - Add Record 
Headers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers],
 [KIP-392: Allow consumers to fetch from closest 
replica|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]).

Four years later, it's time to sunset message formats v0 and v1 to establish a 
new baseline in terms of supported client/broker behavior and to improve 
maintainability & supportability of Kafka. This also aligns with 
[KIP-679|https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default],
 which will enable the idempotent producer by default in Apache Kafka 3.0 (and 
requires message format v2). We propose the deprecation of message formats v0 
and v1 in Apache Kafka 3.0 and their removal in Apache Kafka 4.0.

 

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1


> KIP-724: Drop support for message formats v0 and v1
> ---
>
> Key: KAFKA-12872
> URL: https://issues.apache.org/jira/browse/KAFKA-12872
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 
> 2017) via 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
>  and has been the default since. It includes a number of enhancements 
> (partition leader epoch, sequence ids, producer ids, record headers) required 
> for correctness 
> ([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replicatio

[jira] [Updated] (KAFKA-12872) KIP-724: Drop support for message formats v0 and v1

2021-06-01 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-12872:

Labels: kip  (was: )

> KIP-724: Drop support for message formats v0 and v1
> ---
>
> Key: KAFKA-12872
> URL: https://issues.apache.org/jira/browse/KAFKA-12872
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>  Labels: kip
>
> Message format v2 was introduced in Apache Kafka 0.11.0 (released in June 
> 2017) via 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat]
>  and has been the default since. It includes a number of enhancements 
> (partition leader epoch, sequence ids, producer ids, record headers) required 
> for correctness 
> ([KIP-101|https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation],
>  
> [KIP-279|https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over],
>  
> [KIP-320|https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation]),
>  stronger semantics (idempotent producers, transactional clients) and other 
> features ([KIP-82 - Add Record 
> Headers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers],
>  [KIP-392: Allow consumers to fetch from closest 
> replica|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica]).
> Four years later, it's time to sunset message formats v0 and v1 to establish 
> a new baseline in terms of supported client/broker behavior and to improve 
> maintainability & supportability of Kafka. This also aligns with 
> [KIP-679|https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default],
>  which will enable the idempotent producer by default in Apache Kafka 3.0 
> (and requires message format v2). We propose the deprecation of message 
> formats v0 and v1 in Apache Kafka 3.0 and their removal in Apache Kafka 4.0.
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on pull request #10590: KAFKA-5761: support ByteBuffer as value in ProducerRecord and avoid redundant serialization when it's used

2021-06-01 Thread GitBox


jolshan commented on pull request #10590:
URL: https://github.com/apache/kafka/pull/10590#issuecomment-852237161


   Seems reasonable to me. One small thing I would say is to add comments to 
`tryAppend` and `append` like the versions that use the byte[] parameter. And 
of course, we will still need a committer to review this code before it can be 
merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-01 Thread GitBox


jsancio commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r643303043



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -372,27 +371,23 @@ private void maybeFireLeaderChange() {
 
 @Override
 public void initialize() {
-try {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a 
Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a 
Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
-&& quorum.remoteVoters().isEmpty()
-&& !quorum.isCandidate()) {
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
+&& quorum.remoteVoters().isEmpty()
+&& !quorum.isCandidate()) {
 
-transitionToCandidate(currentTimeMs);
-}
-} catch (IOException e) {
-throw new RuntimeException(e);
+transitionToCandidate(currentTimeMs);

Review comment:
   This is semantically different from the previous code but I think it is 
okay. If `initialize` throws an io exception the JVM process for the broker 
will terminate as this is called in the `RaftManager` constructor which is 
indirectly called from `kafka.Kafka`'s `main`.

##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -463,9 +464,14 @@ public void transitionToCandidate() throws IOException {
 return state;
 }
 
-private void transitionTo(EpochState state) throws IOException {
+private void transitionTo(EpochState state) {
 if (this.state != null) {
-this.state.close();
+try {
+this.state.close();
+} catch (IOException e) {
+throw new UncheckedIOException(
+"Failed to close Quorum status during transition to 
candidate, state is " + this.state, e);

Review comment:
   How about "Failed to transition from ${this.state.name()} to 
${state.name()}"? Note that this string is  not valid Java.
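
   For reference, a valid-Java version of that suggestion could look like the 
following (a sketch, assuming `EpochState` exposes a `name()` accessor as the 
template above implies):
   ```
   throw new UncheckedIOException(
       String.format("Failed to transition from %s to %s", this.state.name(), state.name()),
       e);
   ```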




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-06-01 Thread GitBox


mumrah commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r643376055



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2276,6 +2277,24 @@ public void resign(int epoch) {
 );
 }
 
+private void validateSnapshotId(OffsetAndEpoch snapshotId) {

Review comment:
   Can we make the comparisons with `snapshotId` consistently on the 
left-hand side?
   
   E.g., 
   ```
   snapshotId.offset > highWatermarkOpt.get().offset
   ```
   
   Makes it a bit easier to parse this logic.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -2276,6 +2277,24 @@ public void resign(int epoch) {
 );
 }
 
+private void validateSnapshotId(OffsetAndEpoch snapshotId) {
+Optional<LogOffsetMetadata> highWatermarkOpt = quorum().highWatermark();
+if (!highWatermarkOpt.isPresent() || highWatermarkOpt.get().offset < 
snapshotId.offset) {
+throw new IllegalArgumentException("Trying to creating snapshot 
with invalid snapshotId: " + snapshotId + " whose offset is larger than the 
high-watermark: " +

Review comment:
   This might be better to split into two checks with different error 
messages. This would also let us avoid logging an `Optional`
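
   Something along these lines (a sketch; the exact messages are illustrative), 
which also keeps `snapshotId` on the left-hand side:
   ```
   if (!highWatermarkOpt.isPresent()) {
       throw new IllegalArgumentException(
           "Cannot create snapshot " + snapshotId + " before the high-watermark is known");
   }
   if (snapshotId.offset > highWatermarkOpt.get().offset) {
       throw new IllegalArgumentException(
           "Cannot create snapshot " + snapshotId + " because its offset is larger than the high-watermark " +
           highWatermarkOpt.get().offset);
   }
   ```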




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-4793) Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed tasks

2021-06-01 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-4793:


Assignee: Randall Hauch

> Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed 
> tasks
> -
>
> Key: KAFKA-4793
> URL: https://issues.apache.org/jira/browse/KAFKA-4793
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> Sometimes tasks stop due to repeated failures. Users will want to restart the 
> connector and have it retry after fixing an issue. 
> We expected "POST /connectors/(string: name)/restart" to cause retry of 
> failed tasks, but this doesn't appear to be the case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12873) Log truncation due to divergence should also remove snapshots

2021-06-01 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12873:
--

 Summary: Log truncation due to divergence should also remove 
snapshots
 Key: KAFKA-12873
 URL: https://issues.apache.org/jira/browse/KAFKA-12873
 Project: Kafka
  Issue Type: Sub-task
  Components: log
Reporter: Jose Armando Garcia Sancio


It should not be possible for log truncation to truncate past the 
high-watermark and we know that snapshots are less than the high-watermark.

Having said that I think we should add code that removes any snapshot that is 
greater than the log end offset after a log truncation. Currently the code that 
does log truncation is in `KafkaMetadataLog::truncateTo`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-06-01 Thread GitBox


jsancio commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-852353685


   > Are there any concerns with multi-threading here? I think the answer is 
"no". Any thoughts @feyman2016 or @jsancio?
   
   Good point. There may be some concurrency issue with `createSnapshot`. How 
about moving the validation to `onSnapshotFrozen`?
   
   This conversation also reminded me to create this issue: 
https://issues.apache.org/jira/browse/KAFKA-12873. It is slightly related to 
this point.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-7749) confluent does not provide option to set consumer properties at connector level

2021-06-01 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-7749.
--
Fix Version/s: 2.3.0
   Resolution: Fixed

[KIP-458|https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
 introduced in AK 2.3.0 added support for connector-specific client overrides 
like the one described here.

Marking as resolved.
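
For example, with KIP-458 a sink connector can override the consumer settings 
mentioned in this ticket directly in its connector configuration, provided the 
worker allows it via connector.client.config.override.policy. A minimal sketch 
(connector name and class below are illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ConnectorOverrideExample {
    public static void main(String[] args) {
        Map<String, String> connectorConfig = new HashMap<>();
        connectorConfig.put("name", "es-sink");   // hypothetical connector name
        connectorConfig.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
        connectorConfig.put("tasks.max", "2");
        connectorConfig.put("topics", "metrics");
        // KIP-458: per-connector consumer overrides use the consumer.override. prefix.
        connectorConfig.put("consumer.override.max.poll.records", "500");
        connectorConfig.put("consumer.override.max.poll.interval.ms", "300000");
        // This map would be posted to the Connect REST API as the connector's config.
        connectorConfig.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}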

> confluent does not provide option to set consumer properties at connector 
> level
> ---
>
> Key: KAFKA-7749
> URL: https://issues.apache.org/jira/browse/KAFKA-7749
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Manjeet Duhan
>Priority: Major
> Fix For: 2.3.0
>
>
> _We want to increase consumer.max.poll.records to improve performance, but 
> this value can only be set in the worker properties, which apply to all 
> connectors in a given cluster._
> _Operative situation: we have one project communicating with Elasticsearch, 
> and after multiple performance tests we set consumer.max.poll.records=500, 
> which worked fine for a year._
> _Then another project was onboarded in the same cluster and required 
> consumer.max.poll.records=5000 based on its performance tests. This 
> configuration was moved to production._
> _Admetric then started failing: it took more than 5 minutes to process 5000 
> polled records and threw CommitFailedException, a vicious cycle because the 
> same data is processed over and over again._
> _We can control this when starting a consumer from plain Java, but the same 
> control is not available per consumer in a Confluent connector._
> _I have overridden Kafka code to accept connector-level properties that are 
> applied to a single connector while the others keep using the defaults. These 
> changes have been running in production for more than 5 months._
> _Some of the properties that were useful for us:_
> max.poll.records
> max.poll.interval.ms
> request.timeout.ms
> key.deserializer
> value.deserializer
> heartbeat.interval.ms
> session.timeout.ms
> auto.offset.reset
> connections.max.idle.ms
> enable.auto.commit
> auto.commit.interval.ms



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on pull request #3749: [KAFKA-4793] Restart tasks for connector endpoint

2021-06-01 Thread GitBox


rhauch commented on pull request #3749:
URL: https://github.com/apache/kafka/pull/3749#issuecomment-852355451


   This is an API change, which requires a KIP. Closing this without merging. 
Also, I think there are better ways to address this and improve the API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch closed pull request #3749: [KAFKA-4793] Restart tasks for connector endpoint

2021-06-01 Thread GitBox


rhauch closed pull request #3749:
URL: https://github.com/apache/kafka/pull/3749


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4793) Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed tasks

2021-06-01 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355287#comment-17355287
 ] 

Randall Hauch commented on KAFKA-4793:
--

I'm picking this up, and will add a KIP that proposes adding query parameters 
to the existing `/connectors/{name}/restart` API. One query parameter will 
specify whether to also include tasks when determining the instances to 
restart, and another query parameter that will specify whether to restart only 
failed instances.

I will post the link to the KIP here as soon as I've created it.
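
For illustration, a restart request with such query parameters could be issued 
as below; the parameter names (includeTasks, onlyFailed) are placeholders until 
the KIP is published.

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartConnectorExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical query parameter names; the existing endpoint is /connectors/{name}/restart.
        URI uri = URI.create(
            "http://localhost:8083/connectors/my_connector_1/restart?includeTasks=true&onlyFailed=true");
        HttpRequest request = HttpRequest.newBuilder(uri)
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
{code}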

> Kafka Connect: POST /connectors/(string: name)/restart doesn't start failed 
> tasks
> -
>
> Key: KAFKA-4793
> URL: https://issues.apache.org/jira/browse/KAFKA-4793
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> Sometimes tasks stop due to repeated failures. Users will want to restart the 
> connector and have it retry after fixing an issue. 
> We expected "POST /connectors/(string: name)/restart" to cause retry of 
> failed tasks, but this doesn't appear to be the case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12874) Increase default consumer session timeout to 40s (KIP-735)

2021-06-01 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12874:
---

 Summary: Increase default consumer session timeout to 40s (KIP-735)
 Key: KAFKA-12874
 URL: https://issues.apache.org/jira/browse/KAFKA-12874
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


As documented in KIP-735, we will increase the default session timeout to 40s: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.
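
The new default only applies when an application does not set the property 
itself; a consumer that prefers the previous behaviour can keep pinning 
session.timeout.ms explicitly. A minimal sketch:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SessionTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Explicitly keep the previous 10s session timeout instead of the new default.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // ... subscribe and poll as usual
        }
    }
}
{code}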



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mattwong949 commented on pull request #9915: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-06-01 Thread GitBox


mattwong949 commented on pull request #9915:
URL: https://github.com/apache/kafka/pull/9915#issuecomment-852379058


   @ConcurrencyPractitioner Could you merge trunk back into this PR? I can help 
provide a review on top of what @junrao already covered on the previous PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12874) Increase default consumer session timeout to 45s (KIP-735)

2021-06-01 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12874:

Summary: Increase default consumer session timeout to 45s (KIP-735)  (was: 
Increase default consumer session timeout to 40s (KIP-735))

> Increase default consumer session timeout to 45s (KIP-735)
> --
>
> Key: KAFKA-12874
> URL: https://issues.apache.org/jira/browse/KAFKA-12874
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> As documented in KIP-735, we will increase the default session timeout to 
> 40s: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12874) Increase default consumer session timeout to 45s (KIP-735)

2021-06-01 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12874:

Description: As documented in KIP-735, we will increase the default session 
timeout to 45s: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.
  (was: As documented in KIP-735, we will increase the default session timeout 
to 40s: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.)

> Increase default consumer session timeout to 45s (KIP-735)
> --
>
> Key: KAFKA-12874
> URL: https://issues.apache.org/jira/browse/KAFKA-12874
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> As documented in KIP-735, we will increase the default session timeout to 
> 45s: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
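
Applications that prefer the old failure-detection latency can pin the timeout explicitly instead of relying on the default. A minimal sketch, assuming a local broker and a placeholder group id:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Pin session.timeout.ms explicitly instead of relying on the default,
// which KIP-735 raises from 10s to 45s.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}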


[GitHub] [kafka] hachikuji opened a new pull request #10803: KAFKA-12874; Increase default consumer session timeout to 45s

2021-06-01 Thread GitBox


hachikuji opened a new pull request #10803:
URL: https://github.com/apache/kafka/pull/10803


   This patch increases the default consumer session timeout to 45s as 
documented in KIP-735: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10803: KAFKA-12874; Increase default consumer session timeout to 45s

2021-06-01 Thread GitBox


hachikuji commented on pull request #10803:
URL: https://github.com/apache/kafka/pull/10803#issuecomment-852392818


   @mumrah We'll have to see. It wouldn't surprise me if we need a little 
additional tuning.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643433770



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(
+preRollAction = (newSegment: LogSegment) => {
+  // Take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
+  // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
+  // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
+  // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
+  // we manually override the state offset here prior to taking the 
snapshot.
+  producerStateManager.updateMapEndOffset(newSegment.baseOffset)
+  producerStateManager.takeSnapshot()
+},
+postRollAction = (newSegment: LogSegment, deletedSegment: 
Option[LogSegment]) => {
+  deletedSegment.foreach(segment => 
deleteProducerSnapshotAsync(Seq(segment)))

Review comment:
   This is a great catch. I agree with you. While I can address it in this 
PR, should we create a separate JIRA for it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643436284



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment:
   Great point. I agree with you that `RollAction` is avoidable and it 
complicates the code a bit. The main part I wasn't sure about was why 
previously in the code we chose to take the producer state snapshot before the 
segment was rolled, instead of doing it after. Am I right that taking the 
snapshot prior to the roll is not a strict requirement for crash safety? (I'm 
not able to think of a case myself, but wanted to double check)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Moovlin closed pull request #10718: KAFKA-12811: kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not warn abo

2021-06-01 Thread GitBox


Moovlin closed pull request #10718:
URL: https://github.com/apache/kafka/pull/10718


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643443877



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(
+preRollAction = (newSegment: LogSegment) => {
+  // Take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
+  // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
+  // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
+  // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
+  // we manually override the state offset here prior to taking the 
snapshot.
+  producerStateManager.updateMapEndOffset(newSegment.baseOffset)
+  producerStateManager.takeSnapshot()
+},
+postRollAction = (newSegment: LogSegment, deletedSegment: 
Option[LogSegment]) => {
+  deletedSegment.foreach(segment => 
deleteProducerSnapshotAsync(Seq(segment)))

Review comment:
   We could fix this in a separate jira too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643445208



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment:
   Right, I don't think there is a particular reason that we have to take a 
snapshot before adding a new empty segment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643446123



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1572,144 +1414,69 @@ class Log(@volatile private var _dir: File,
 .map(_.messageOffset)
 .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)
 
-  roll(Some(rollOffset))
+  localLog.roll(Some(rollOffset), Some(rollAction))
 } else {
   segment
 }
   }
 
   /**
-   * Roll the log over to a new active segment starting with the current 
logEndOffset.
+   * Roll the local log over to a new active segment starting with the current 
logEndOffset.
* This will trim the index to the exact size of the number of entries it 
currently contains.
*
* @return The newly rolled segment
*/
   def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
-maybeHandleIOException(s"Error while rolling log segment for 
$topicPartition in dir ${dir.getParent}") {
-  val start = time.hiResClockMs()
-  lock synchronized {
-checkIfMemoryMappedBufferClosed()
-val newOffset = math.max(expectedNextOffset.getOrElse(0L), 
logEndOffset)
-val logFile = Log.logFile(dir, newOffset)
-
-if (segments.contains(newOffset)) {
-  // segment with the same base offset already exists and loaded
-  if (activeSegment.baseOffset == newOffset && activeSegment.size == 
0) {
-// We have seen this happen (see KAFKA-6388) after shouldRoll() 
returns true for an
-// active segment of size zero because of one of the indexes is 
"full" (due to _maxEntries == 0).
-warn(s"Trying to roll a new log segment with start offset 
$newOffset " +
- s"=max(provided offset = $expectedNextOffset, LEO = 
$logEndOffset) while it already " +
- s"exists and is active with size 0. Size of time index: 
${activeSegment.timeIndex.entries}," +
- s" size of offset index: 
${activeSegment.offsetIndex.entries}.")
-removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, 
LogRoll)
-  } else {
-throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with start offset $newOffset" +
- s" =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
- s"segment is ${segments.get(newOffset)}.")
-  }
-} else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
-  throw new KafkaException(
-s"Trying to roll a new log segment for topic partition 
$topicPartition with " +
-s"start offset $newOffset =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active 
segment $activeSegment")
-} else {
-  val offsetIdxFile = offsetIndexFile(dir, newOffset)
-  val timeIdxFile = timeIndexFile(dir, newOffset)
-  val txnIdxFile = transactionIndexFile(dir, newOffset)
-
-  for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) 
if file.exists) {
-warn(s"Newly rolled segment file ${file.getAbsolutePath} already 
exists; deleting it first")
-Files.delete(file.toPath)
-  }
-
-  segments.lastSegment.foreach(_.onBecomeInactiveSegment())
-}
-
-// take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
-// offset align with the new segment offset since this ensures we can 
recover the segment by beginning
-// with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
-// may actually be ahead of the current producer state end offset 
(which corresponds to the log end offset),
-// we manually override the state offset here prior to taking the 
snapshot.
-producerStateManager.updateMapEndOffset(newOffset)
-producerStateManager.takeSnapshot()
-
-val segment = LogSegment.open(dir,
-  baseOffset = newOffset,
-  config,
-  time = time,
-  initFileSize = config.initFileSize,
-  preallocate = config.preallocate)
-addSegment(segment)
-
-// We need to update the segment base offset and append position data 
of the metadata when log rolls.
-// The next offset should not change.
-updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
-// schedule an asynchronous flush of the old segment
-scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
-info(s"Rolled new log segment at offset $newOffset in 
${time.hiResClockMs() - start} ms.")
-
-segment
-  }
+lock synchronized {
+  localLog.roll(expectedNextOffset, Some(rollAction))
 }
   }
 
   /**
* The number of message

[GitHub] [kafka] Moovlin commented on pull request #10718: KAFKA-12811: kafka-topics.sh should let the user know they cannot adjust the replication factor for a topic using the --alter flag and not wa

2021-06-01 Thread GitBox


Moovlin commented on pull request #10718:
URL: https://github.com/apache/kafka/pull/10718#issuecomment-852414008


   Seems I've done some bad things here in an effort to resolve issues. Let me 
figure out what I actually need to do to resolve the issue with the PR. I 
apologize for the messiness here. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643449279



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1852,65 +1612,24 @@ class Log(@volatile private var _dir: File,
 logString.toString
   }
 
-  /**
-   * This method deletes the given log segments by doing the following for 
each of them:
-   * 
-   *   It removes the segment from the segment map so that it will no 
longer be used for reads.
-   *   It renames the index and log files by appending .deleted to the 
respective file name
-   *   It can either schedule an asynchronous delete operation to occur in 
the future or perform the deletion synchronously
-   * 
-   * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
-   * or the immediate caller will catch and handle IOException
-   *
-   * @param segments The log segments to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted 
asynchronously
-   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
-  asyncDelete: Boolean,
-  reason: SegmentDeletionReason): Unit = {
-if (segments.nonEmpty) {
-  lock synchronized {
-// As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
-// removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
-// iteration remain valid and deterministic.
-val toDelete = segments.toList
-reason.logReason(this, toDelete)
-toDelete.foreach { segment =>
-  this.segments.remove(segment.baseOffset)
-}
-deleteSegmentFiles(toDelete, asyncDelete)
-  }
-}
-  }
-
-  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: 
Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
-Log.deleteSegmentFiles(segments, asyncDelete, 
deleteProducerStateSnapshots, dir, topicPartition,
-  config, scheduler, logDirFailureChannel, producerStateManager, 
this.logIdent)
-  }
-
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: 
Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {

Review comment:
   Great catch, I'll fix this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643450237



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -246,17 +262,17 @@ object LogLoader extends Logging {
 return fn
   } catch {
 case e: LogSegmentOffsetOverflowException =>
-  info(s"${params.logIdentifier}Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
-  Log.splitOverflowedSegment(
+  info(s"${params.logIdentifier} Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
+  val result = Log.splitOverflowedSegment(
 e.segment,
 params.segments,
 params.dir,
 params.topicPartition,
 params.config,
 params.scheduler,
 params.logDirFailureChannel,
-params.producerStateManager,
 params.logIdentifier)
+  deleteProducerSnapshotsAsync(result.deletedSegments, params)

Review comment:
   Sounds good. It appears straightforward to just skip deleting the 
snapshot here; I can leave a comment explaining why.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643450237



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -246,17 +262,17 @@ object LogLoader extends Logging {
 return fn
   } catch {
 case e: LogSegmentOffsetOverflowException =>
-  info(s"${params.logIdentifier}Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
-  Log.splitOverflowedSegment(
+  info(s"${params.logIdentifier} Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
+  val result = Log.splitOverflowedSegment(
 e.segment,
 params.segments,
 params.dir,
 params.topicPartition,
 params.config,
 params.scheduler,
 params.logDirFailureChannel,
-params.producerStateManager,
 params.logIdentifier)
+  deleteProducerSnapshotsAsync(result.deletedSegments, params)

Review comment:
   Sounds good. Great catch. It appears straightforward to just skip 
deleting the snapshot here; I can leave a comment explaining why.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643456815



##
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, 
SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = 
createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+if (!log.isMemoryMappedBufferClosed) {

Review comment:
   Is the concern here that we are accessing the private attribute?
   In certain tests, `log.checkIfMemoryMappedBufferClosed()` will raise an 
exception if the log has already been closed. If the private attribute access 
is a concern, I can change this to just call 
`log.checkIfMemoryMappedBufferClosed()` but ignore the `KafkaStorageException` 
that's raised.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643464231



##
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, 
SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = 
createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+if (!log.isMemoryMappedBufferClosed) {
+  log.close()
+}
+Utils.delete(tmpDir)
+  }
+
+  case class KeyValue(key: String, value: String) {
+def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
+  new SimpleRecord(timestamp, key.getBytes, value.getBytes)
+}
+  }
+
+  object KeyValue {
+def fromRecord(record: Record): KeyValue = {
+  val key =
+if (record.hasKey)
+  StandardCharsets.UTF_8.decode(record.key()).toString
+else
+  ""
+  val value =
+if (record.hasValue)
+  StandardCharsets.UTF_8.decode(record.value()).toString
+else
+  ""
+  KeyValue(key, value)
+}
+  }
+
+  private def kvsToRecords(keyValues: Iterable[KeyValue]): 
Iterable[SimpleRecord] = {
+keyValues.map(kv => kv.toRecord())
+  }
+
+  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
+records.map(r => KeyValue.fromRecord(r))
+  }
+
+  private def appendRecords(records: Iterable[SimpleRecord],
+log: LocalLog = log,
+initialOffset: Long = 0L): Unit = {
+log.append(lastOffset = initialOffset + records.size - 1,
+  largestTimestamp = records.head.timestamp,
+  shallowOffsetOfMaxTimestamp = initialOffset,
+  records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 
0, records.toList : _*))
+  }
+
+  private def readRecords(log: LocalLog = log,
+  startOffset: Long = 0L,
+  maxLength: => Int = log.segments.activeSegment.size,
+  minOneMessage: Boolean = false,
+  maxOffsetMetadata: => LogOffsetMetadata = 
log.logEndOffsetMetadata,
+  includeAbortedTxns: Boolean = false): FetchDataInfo 
= {
+log.read(startOffset,
+ maxLength,
+ minOneMessage = minOneMessage,
+ maxOffsetMetadata,
+ includeAbortedTxns = includeAbortedTxns)
+  }
+
+  @Test
+  def testLogDeleteSuccess(): Unit = {
+val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+appendRecords(List(record))
+log.roll()
+assertEquals(2, log.segments.numberOfSegments)
+assertFalse(logDir.listFiles.isEmpty)
+val segmentsBeforeDelete = List[LogSegment]() ++ log.segments.values
+val deletedSegments = log.delete()
+assertTrue(log.segments.isEmpty)
+assertEquals(segmentsBeforeDelete, deletedSegme

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643465477



##
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, 
SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = 
createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+if (!log.isMemoryMappedBufferClosed) {
+  log.close()
+}
+Utils.delete(tmpDir)
+  }
+
+  case class KeyValue(key: String, value: String) {
+def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
+  new SimpleRecord(timestamp, key.getBytes, value.getBytes)
+}
+  }
+
+  object KeyValue {
+def fromRecord(record: Record): KeyValue = {
+  val key =
+if (record.hasKey)
+  StandardCharsets.UTF_8.decode(record.key()).toString
+else
+  ""
+  val value =
+if (record.hasValue)
+  StandardCharsets.UTF_8.decode(record.value()).toString
+else
+  ""
+  KeyValue(key, value)
+}
+  }
+
+  private def kvsToRecords(keyValues: Iterable[KeyValue]): 
Iterable[SimpleRecord] = {
+keyValues.map(kv => kv.toRecord())
+  }
+
+  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
+records.map(r => KeyValue.fromRecord(r))
+  }
+
+  private def appendRecords(records: Iterable[SimpleRecord],
+log: LocalLog = log,
+initialOffset: Long = 0L): Unit = {
+log.append(lastOffset = initialOffset + records.size - 1,
+  largestTimestamp = records.head.timestamp,
+  shallowOffsetOfMaxTimestamp = initialOffset,
+  records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 
0, records.toList : _*))
+  }
+
+  private def readRecords(log: LocalLog = log,
+  startOffset: Long = 0L,
+  maxLength: => Int = log.segments.activeSegment.size,
+  minOneMessage: Boolean = false,
+  maxOffsetMetadata: => LogOffsetMetadata = 
log.logEndOffsetMetadata,
+  includeAbortedTxns: Boolean = false): FetchDataInfo 
= {
+log.read(startOffset,
+ maxLength,
+ minOneMessage = minOneMessage,
+ maxOffsetMetadata,
+ includeAbortedTxns = includeAbortedTxns)
+  }
+
+  @Test
+  def testLogDeleteSuccess(): Unit = {
+val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+appendRecords(List(record))
+log.roll()
+assertEquals(2, log.segments.numberOfSegments)
+assertFalse(logDir.listFiles.isEmpty)
+val segmentsBeforeDelete = List[LogSegment]() ++ log.segments.values
+val deletedSegments = log.delete()
+assertTrue(log.segments.isEmpty)
+assertEquals(segmentsBeforeDelete, deletedSegme

[GitHub] [kafka] bruto1 commented on pull request #10590: KAFKA-5761: support ByteBuffer as value in ProducerRecord and avoid redundant serialization when it's used

2021-06-01 Thread GitBox


bruto1 commented on pull request #10590:
URL: https://github.com/apache/kafka/pull/10590#issuecomment-852435745


   done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12875) Change Log layer segment map mutations to avoid absence of active segment

2021-06-01 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-12875:


 Summary: Change Log layer segment map mutations to avoid absence 
of active segment
 Key: KAFKA-12875
 URL: https://issues.apache.org/jira/browse/KAFKA-12875
 Project: Kafka
  Issue Type: Improvement
Reporter: Kowshik Prakasam


[https://github.com/apache/kafka/pull/10650] showed a case where the active 
segment was absent while the Log layer segments were being iterated. We should 
investigate the Log layer code to see if we can change the segment map mutations 
so that an active segment is always present. For example, if we are clearing all 
segments and creating a new one, maybe we can reverse the order: create the new 
segment first and then clear the old ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10616: KAFKA-12709; Add Admin API for `ListTransactions`

2021-06-01 Thread GitBox


hachikuji commented on a change in pull request #10616:
URL: https://github.com/apache/kafka/pull/10616#discussion_r643496029



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#listTransactions()} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListTransactionsResult {
+private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
+
+ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+this.future = future;
+}
+
+public KafkaFuture<Collection<TransactionListing>> all() {
+return allByBrokerId().thenApply(map -> {
+List<TransactionListing> allListings = new ArrayList<>();
+for (Collection<TransactionListing> listings : map.values()) {
+allListings.addAll(listings);
+}
+return allListings;
+});
+}
+
+public KafkaFuture<Set<Integer>> brokerIds() {
+return future.thenApply(map -> new HashSet<>(map.keySet()));
+}

Review comment:
   I also don't feel too strongly about it, so I decided to take the 
suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
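
A hedged usage sketch of the admin API under review above, with the bootstrap address as a placeholder; since the PR is still evolving, names may change before release:

{code:java}
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.TransactionListing;

public class ListTransactionsExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ListTransactionsResult result = admin.listTransactions();
            // all() flattens the per-broker listings, as in the snippet above.
            Collection<TransactionListing> listings = result.all().get();
            listings.forEach(System.out::println);
        }
    }
}
{code}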




[jira] [Created] (KAFKA-12876) Log.roll() could forever delete producer state snapshot of empty active segment

2021-06-01 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-12876:


 Summary: Log.roll() could forever delete producer state snapshot 
of empty active segment
 Key: KAFKA-12876
 URL: https://issues.apache.org/jira/browse/KAFKA-12876
 Project: Kafka
  Issue Type: Bug
Reporter: Kowshik Prakasam


In Log.scala, during roll, if there is an existing segment of 0 size with the 
newOffsetToRoll then we end up 
[deleting|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1610]
 the active segment asynchronously. This will also delete the producer state 
snapshot. However, we also [take a producer 
snapshot|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1639]
 on newOffsetToRoll before we add the new segment. This addition could race 
with snapshot deletion and we can end up losing the snapshot forever. So, in 
this case the fix is to not delete the snapshot because we end up recreating it 
anyway.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643498093



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(
+preRollAction = (newSegment: LogSegment) => {
+  // Take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
+  // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
+  // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
+  // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
+  // we manually override the state offset here prior to taking the 
snapshot.
+  producerStateManager.updateMapEndOffset(newSegment.baseOffset)
+  producerStateManager.takeSnapshot()
+},
+postRollAction = (newSegment: LogSegment, deletedSegment: 
Option[LogSegment]) => {
+  deletedSegment.foreach(segment => 
deleteProducerSnapshotAsync(Seq(segment)))

Review comment:
   I've created a JIRA to track this: 
https://issues.apache.org/jira/browse/KAFKA-12876.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-06-01 Thread GitBox


hachikuji commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r643513632



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
 new 
StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
 } else {
   val partitionStates = stopReplicaRequest.partitionStates().asScala
-  val (result, error) = replicaManager.stopReplicas(
-request.context.correlationId,
-stopReplicaRequest.controllerId,
-stopReplicaRequest.controllerEpoch,
-stopReplicaRequest.brokerEpoch,
-partitionStates)
-  // Clear the coordinator caches in case we were the leader. In the case 
of a reassignment, we
-  // cannot rely on the LeaderAndIsr API for this since it is only sent to 
active replicas.
-  result.forKeyValue { (topicPartition, error) =>
-if (error == Errors.NONE) {
-  if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-  && partitionStates(topicPartition).deletePartition) {
-groupCoordinator.onResignation(topicPartition.partition)
-  } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
- && partitionStates(topicPartition).deletePartition) {
+  def onStopReplicas(error: Errors, partitions: Map[TopicPartition, 
Errors]): Unit = {
+// Clear the coordinator caches in case we were the leader. In the 
case of a reassignment, we
+// cannot rely on the LeaderAndIsr API for this since it is only sent 
to active replicas.
+partitions.forKeyValue { (topicPartition, partitionError) =>
+  if (partitionError == Errors.NONE) {
 val partitionState = partitionStates(topicPartition)
 val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-Some(partitionState.leaderEpoch)
+  Some(partitionState.leaderEpoch)
 else
   None
-txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
+if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+  && partitionState.deletePartition) {
+  groupCoordinator.onResignation(topicPartition.partition, 
leaderEpoch)
+} else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+  && partitionState.deletePartition) {
+  txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
+}
   }
 }
   }
+  val (result, error) = replicaManager.stopReplicas(
+request.context.correlationId,
+stopReplicaRequest.controllerId,
+stopReplicaRequest.controllerEpoch,
+stopReplicaRequest.brokerEpoch,
+partitionStates,
+onStopReplicas)

Review comment:
   If I understand correctly, the original issue concerned the potential 
reordering of loading/unloading events. This was possible because of 
inconsistent locking and the fact that we relied 100% on the order that the 
task was submitted to the scheduler. With this patch, we are now using the 
leader epoch in order to ensure that loading/unloading events are handled in 
the correct order. This means it does not actually matter if the events get 
submitted to the scheduler in the wrong order. Does that make sense or am I 
still missing something?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
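
The ordering argument above follows the usual epoch-fencing pattern: instead of trusting the order in which tasks reach the scheduler, each load/unload carries the leader epoch and stale events are dropped. A minimal illustrative sketch of that idea (not the actual group coordinator code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: remembers the highest epoch seen per partition and
// rejects events that arrive late with an older epoch.
class EpochFencing {
    private final Map<Integer, Integer> latestEpoch = new ConcurrentHashMap<>();

    boolean shouldApply(int partition, int eventEpoch) {
        // merge() keeps the maximum epoch; the event is applied only if it is
        // at least as new as anything previously observed for this partition.
        int current = latestEpoch.merge(partition, eventEpoch, Math::max);
        return current == eventEpoch;
    }
}
{code}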




[GitHub] [kafka] hachikuji commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-06-01 Thread GitBox


hachikuji commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-852487435


   @tombentley Apologies for the delay. I was out on leave for the past month. 
I responded to Guozhang's comment. Let me know if it makes sense or not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-06-01 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r643514883



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1806,37 +1566,37 @@ class Log(@volatile private var _dir: File,
 endOffset: Long
   ): Unit = {
 logStartOffset = startOffset
-nextOffsetMetadata = LogOffsetMetadata(endOffset, 
activeSegment.baseOffset, activeSegment.size)
-recoveryPoint = math.min(recoveryPoint, endOffset)
-rebuildProducerState(endOffset, producerStateManager)
+lock synchronized {
+  rebuildProducerState(endOffset, producerStateManager)
+}

Review comment:
   Sounds good. I'll fix this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji edited a comment on pull request #10803: KAFKA-12874; Increase default consumer session timeout to 45s

2021-06-01 Thread GitBox


hachikuji edited a comment on pull request #10803:
URL: https://github.com/apache/kafka/pull/10803#issuecomment-852392818


   @mumrah We'll have to see. It wouldn't surprise me if the integration tests 
need a little additional tuning.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12877) Fix KRPC files with missing flexibleVersions annotation

2021-06-01 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-12877:


 Summary: Fix KRPC files with missing flexibleVersions annotation
 Key: KAFKA-12877
 URL: https://issues.apache.org/jira/browse/KAFKA-12877
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe


Some KRPC files do not specify their flexibleVersions. Unfortunately, in this 
case, we default to not supporting any flexible versions. This is a poor 
default, since the flexible format is usually more efficient as well as more 
extensible.

Make flexibleVersions explicit and disallow setting anything except "0+" on new 
RPC and metadata records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
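
For reference, this is roughly what an explicit annotation looks like in a KRPC schema file; the apiKey, name, and field below are placeholders rather than a real protocol message:

{code}
{
  "apiKey": 9999,
  "type": "request",
  "name": "ExampleRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Name", "type": "string", "versions": "0+",
      "about": "A placeholder field." }
  ]
}
{code}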


[GitHub] [kafka] wcarlson5 commented on pull request #10664: KAFKA-12749: Changelog topic config on suppressed KTable lost

2021-06-01 Thread GitBox


wcarlson5 commented on pull request #10664:
URL: https://github.com/apache/kafka/pull/10664#issuecomment-852546841


   Sorry, this got lost in my inbox. @vichu if you can rebase I think it's good 
to go. Do you agree, @ableegoldman?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10681) MM2 translateOffsets returns wrong offsets

2021-06-01 Thread Sandeep mehta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17355371#comment-17355371
 ] 

Sandeep mehta commented on KAFKA-10681:
---

I am not sure why it's happening with *RemoteClusterUtils.translateOffsets().* 

But it's not required anymore with the latest versions of MM2. You can use 
MirrorMaker 2 from Kafka 2.7.0 onward: it comes with automated consumer offset 
sync, which translates consumer group offsets automatically. 

https://issues.apache.org/jira/browse/KAFKA-9076

I hope it helps

> MM2 translateOffsets returns wrong offsets
> --
>
> Key: KAFKA-10681
> URL: https://issues.apache.org/jira/browse/KAFKA-10681
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
> Environment: GKE, strimzi release
>Reporter: Carlo Bongiovanni
>Priority: Major
>
> Hi all,
> we'd like to make use of the ability of MM2 to mirror checkpoints of consumer 
> offsets, in order to have a graceful failover from an active cluster to a 
> standby one.
> For this reason we have created the following setup (FYI all done with 
> strimzi on k8s):
>  * an active kafka cluster 2.5.0 used by a few producers/consumers
>  * a standby kafka cluster 2.5.0
>  * MM2 is setup in one direction only to mirror from active to standby
> We have let MM2 run for some time and we could verify that messages are 
> effectively mirrored.
> At this point we have started developing the tooling to create consumer 
> groups in the consumer-offsets topic of the passive cluster, by reading the 
> internal checkpoints topic.
> The following is an extract of our code to read the translated offsets:
> {code:java}
> Map<String, Object> mm2Props = new HashMap<>();
>  mm2Props.put(BOOTSTRAP_SERVERS_CONFIG, "bootstrap_servers");
>  mm2Props.put("source.cluster.alias", "euwe");
>  mm2Props.put(SASL_MECHANISM, "SCRAM-SHA-512");
>  mm2Props.put(SASL_JAAS_CONFIG, 
> "org.apache.kafka.common.security.scram.ScramLoginModule required 
> username=\"user\" password=\"password\";");
>  mm2Props.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
>  mm2Props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, 
> "/usr/local/lib/jdk/lib/security/cacerts");
>  mm2Props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, "some-password");
> Map<TopicPartition, OffsetAndMetadata> translatedOffsets = RemoteClusterUtils
>  .translateOffsets(mm2Props, (String) mm2Props.get("source.cluster.alias"), 
> cgi,
>  Duration.ofSeconds(60L));
> {code}
>  
> Before persisting the translated offsets with 
> {code:java}
> AlterConsumerGroupOffsetsResult alterConsumerGroupOffsetsResult = kafkaClient
>  .alterConsumerGroupOffsets(cgi, offsets);{code}
> we filter them because we don't want to create consumer groups for all 
> retrieved offsets.
> During the filtering, we compare the values of the translated offset for each 
> topic partition (as coming from the checkpoint topic), 
>  with the respective current offset value for each topic partition (as 
> mirrored from MM2).
> While running this check we have verified that for some topics we get a big 
> difference between those values, while for other topics the update seems 
> realistic.
> For example, looking at a given target partition we see it has an offset of 
> 100 (after mirroring by mm2). 
>  From the checkpoint topic for the same consumer group id, we receive offset 
> 200, and later 150.
> The issues are that:
>  * both consumer group id offsets exceed the real offset of the partition
>  * the consumer group id offsets from the checkpoint go down over time, not up
> We haven't been able to explain it; the wrong numbers are coming from 
> *RemoteClusterUtils.translateOffsets()* and we're wondering if this could be 
> a misconfiguration on our side or a bug in MM2.
> Thanks, best
>  C.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
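
For anyone hitting the same problem, the automated sync mentioned in the comment above is controlled by the MirrorCheckpointConnector settings added in KIP-545 (Kafka 2.7). A sketch of the relevant mm2.properties entries; the source and target aliases below are placeholders:

{code}
# Periodically translate committed consumer group offsets from the source
# cluster and write them to the target cluster's __consumer_offsets topic.
source->target.sync.group.offsets.enabled = true
# How often to sync (the default is 60 seconds).
source->target.sync.group.offsets.interval.seconds = 60
{code}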


[GitHub] [kafka] cmccabe opened a new pull request #10804: KAFKA-12877: fix KRPC files with missing flexibleVersions annotations

2021-06-01 Thread GitBox


cmccabe opened a new pull request #10804:
URL: https://github.com/apache/kafka/pull/10804


   Some KRPC files do not specify their flexibleVersions. Unfortunately,
   in this case, we default to not supporting any flexible versions. This
   is a poor default, since the flexible format is usually more efficient
   as well as more extensible.
   
   Make flexibleVersions explicit and disallow setting anything except
   "0+" on new RPC, metadata, and storage records.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12878) Support --bootstrap-server kafka-streams-application-reset

2021-06-01 Thread Neil Buesing (Jira)
Neil Buesing created KAFKA-12878:


 Summary: Support --bootstrap-server kafka-streams-application-reset
 Key: KAFKA-12878
 URL: https://issues.apache.org/jira/browse/KAFKA-12878
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Neil Buesing
Assignee: Mitchell
 Fix For: 2.5.0


This is an unambitious initial move toward standardizing the command line tools. 
We have favored the name {{\-\-bootstrap-server}} in all new tools since it 
matches the config {{bootstrap.servers}}, which is used by all clients. Some 
older commands use {{\-\-broker-list}} or {{\-\-bootstrap-servers}} and maybe 
other exotic variations. We should support {{\-\-bootstrap-server}} in all 
commands and deprecate the other options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
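
As a concrete example, the reset tool currently only accepts the plural spelling; once this change lands, the singular option should behave identically. The application id and topic below are placeholders:

{code}
# today (to be deprecated)
bin/kafka-streams-application-reset.sh --application-id my-streams-app \
  --bootstrap-servers localhost:9092 --input-topics my-input-topic

# proposed
bin/kafka-streams-application-reset.sh --application-id my-streams-app \
  --bootstrap-server localhost:9092 --input-topics my-input-topic
{code}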


[jira] [Updated] (KAFKA-12878) Support --bootstrap-server kafka-streams-application-reset

2021-06-01 Thread Neil Buesing (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neil Buesing updated KAFKA-12878:
-
Fix Version/s: (was: 2.5.0)

> Support --bootstrap-server kafka-streams-application-reset
> --
>
> Key: KAFKA-12878
> URL: https://issues.apache.org/jira/browse/KAFKA-12878
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Neil Buesing
>Assignee: Mitchell
>Priority: Major
>  Labels: pull-request-available
>
> This is an unambitious initial move toward standardizing the command line 
> tools. We have favored the name {{\-\-bootstrap-server}} in all new tools 
> since it matches the config {{bootstrap.servers}}, which is used by all 
> clients. Some older commands use {{\-\-broker-list}} or 
> {{\-\-bootstrap-servers}} and maybe other exotic variations. We should 
> support {{\-\-bootstrap-server}} in all commands and deprecate the other 
> options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

