Re: [PR] KAFKA-17421: Add IT for ConsumerRecord#leaderEpoch [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on code in PR #18254:
URL: https://github.com/apache/kafka/pull/18254#discussion_r1898879221


##
core/src/test/java/kafka/clients/consumer/ConsumerIntegrationTest.java:
##
@@ -185,4 +194,72 @@ public void onPartitionsAssigned(Collection partitions) {
 }
 }
 }
+
+    @ClusterTest(types = {Type.KRAFT}, brokers = 3)
+    public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
+        String topic = "test-topic";
+        clusterInstance.createTopic(topic, 1, (short) 2);
+        var msgNum = 10;
+        sendMsg(clusterInstance, topic, msgNum);
+
+        try (var consumer = clusterInstance.consumer()) {
+            List topicPartitions = Collections.singletonList(new TopicPartition(topic, 0));

Review Comment:
   `List.of`
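
The suggestion above can be illustrated with a small, Kafka-free sketch. It assumes plain `String` elements in place of `TopicPartition`, and the class and method names (`SingletonListDemo`, `viaCollections`, `viaListOf`) are hypothetical. Both factories return an unmodifiable one-element list; the main behavioral difference is that `List.of` rejects `null` elements.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; String stands in for TopicPartition.
public class SingletonListDemo {

    // Older idiom: unmodifiable one-element list, null element permitted.
    public static List<String> viaCollections(String value) {
        return Collections.singletonList(value);
    }

    // Java 9+ idiom: unmodifiable list, throws NullPointerException on null.
    public static List<String> viaListOf(String value) {
        return List.of(value);
    }

    public static void main(String[] args) {
        List<String> a = viaCollections("test-topic-0");
        List<String> b = viaListOf("test-topic-0");

        // List equality is element-wise, so the two lists compare equal.
        System.out.println("equal: " + a.equals(b));

        // Neither list can be modified after creation.
        try {
            b.add("another");
        } catch (UnsupportedOperationException e) {
            System.out.println("List.of result is unmodifiable");
        }
    }
}
```

For a test that only needs a fixed, non-null list of partitions, the two are interchangeable, and `List.of` is shorter.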



##
core/src/test/java/kafka/clients/consumer/ConsumerIntegrationTest.java:
##
@@ -185,4 +194,72 @@ public void onPartitionsAssigned(Collection partitions) {
 }
 }
 }
+
+    @ClusterTest(types = {Type.KRAFT}, brokers = 3)
+    public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
+        String topic = "test-topic";
+        clusterInstance.createTopic(topic, 1, (short) 2);
+        var msgNum = 10;
+        sendMsg(clusterInstance, topic, msgNum);
+
+        try (var consumer = clusterInstance.consumer()) {
+            List topicPartitions = Collections.singletonList(new TopicPartition(topic, 0));
+            consumer.assign(topicPartitions);
+            consumer.seekToBeginning(Collections.singletonList(new TopicPartition(topic, 0)));
+
+            int consumed = 0;
+            while (consumed < msgNum) {
+                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+                for (ConsumerRecord record : records) {
+                    assertTrue(record.leaderEpoch().isPresent());
+                    assertEquals(0, record.leaderEpoch().get());
+                }
+                consumed += records.count();
+            }
+
+            // make the leader epoch increment by shutdown the leader broker
+            shutdownFirstPartitionLeader(clusterInstance, topic);
+
+            sendMsg(clusterInstance, topic, msgNum);
+
+            consumed = 0;
+            while (consumed < msgNum) {
+                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
+                for (ConsumerRecord record : records) {
+                    assertTrue(record.leaderEpoch().isPresent());
+                    assertEquals(1, record.leaderEpoch().get());
+                }
+                consumed += records.count();
+            }
+        }
+    }
+
+    private void shutdownFirstPartitionLeader(ClusterInstance clusterInstance,
+                                              String topic) throws Exception {
+        var leaderBrokerId = -1;
+        try (var admin = clusterInstance.admin()) {
+            DescribeTopicsResult result = admin.describeTopics(List.of(topic));

Review Comment:
   Finding the leader is a common operation, so could you please add a helper 
to `ClusterInstance` that returns the leader of a partition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-18354 Use log4j2 APIs to refactor LogCaptureAppender [kafka]

2024-12-28 Thread via GitHub


m1a2st opened a new pull request, #18338:
URL: https://github.com/apache/kafka/pull/18338

   as title 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-18355) Stream thread blocks indefinitely for acquiring state directory lock

2024-12-28 Thread Ravi Gupta (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Gupta updated KAFKA-18355:
---
Description: 
We are running a Kafka Streams-based application in production and have noticed 
a couple of times that our lag on a source topic partition starts increasing.

Based on our investigation, we found the following happening:
 * The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication raises a transient exception) while producing 
a record in the sink:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxx.yy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.:) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
 * In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) throws the exception from the last message 
while processing.
 * However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. (_There is no additional log statement 
explaining why the thread stopped polling; however, it seems the heartbeat 
thread got the same exception as the producer._)

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=x--lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
x--lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
 * In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using an exception handler; however, in these failure cases our 
exception handler is not called for either the producer or the consumer 
exception. For some other authentication exceptions during consume/produce, 
we do see the handler being called.
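
One possible reading of the sequence above, offered only as an assumption and not as a confirmed diagnosis, is that the stalled thread still owns the task's state directory lock, so the newly assigned thread can never acquire it. The sketch below is not Kafka Streams code: `TaskLockSketch` and its methods are hypothetical, and it models the lock as a simple in-memory map from task id to owning thread name. It shows that retries by the new owner keep failing until the previous owner explicitly releases the lock.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a per-task lock keyed by owner; not the Kafka Streams implementation.
public class TaskLockSketch {

    // task id -> name of the thread that currently holds the lock
    private final Map<String, String> owners = new HashMap<>();

    // Returns true if the caller now holds (or already held) the lock.
    public synchronized boolean lock(String taskId, String threadName) {
        String owner = owners.get(taskId);
        if (owner == null) {
            owners.put(taskId, threadName);
            return true;
        }
        return owner.equals(threadName);
    }

    // Releases the lock only if the caller is the current owner.
    public synchronized void unlock(String taskId, String threadName) {
        owners.remove(taskId, threadName);
    }

    public static void main(String[] args) {
        TaskLockSketch registry = new TaskLockSketch();

        // The original thread acquires the lock, then stalls without releasing it.
        registry.lock("8_0", "StreamThread-3");

        // The newly assigned thread retries on every loop iteration and keeps failing.
        for (int attempt = 1; attempt <= 3; attempt++) {
            boolean acquired = registry.lock("8_0", "StreamThread-5");
            System.out.println("attempt " + attempt + " acquired=" + acquired);
        }

        // Only once the old owner releases the lock can the new thread proceed.
        registry.unlock("8_0", "StreamThread-3");
        System.out.println("after release acquired=" + registry.lock("8_0", "StreamThread-5"));
    }
}
```

Under this model, a retry loop alone cannot recover; something has to release the lock on behalf of the stalled owner or detect that the owner is no longer making progress.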

 

  was:
We are running Kafka streams 

[jira] [Created] (KAFKA-18355) Stream thread blocks indefinitely for acquiring state directory lock

2024-12-28 Thread Ravi Gupta (Jira)
Ravi Gupta created KAFKA-18355:
--

         Summary: Stream thread blocks indefinitely for acquiring state directory lock
             Key: KAFKA-18355
             URL: https://issues.apache.org/jira/browse/KAFKA-18355
         Project: Kafka
      Issue Type: Bug
      Components: streams
Affects Versions: 3.8.1
        Reporter: Ravi Gupta


We are running a Kafka Streams-based application in production and have noticed 
a couple of times that our lag on a source topic partition starts increasing.

Based on our investigation, we found the following happening:
 # The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication raises a transient exception) while producing 
a record in the sink:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxx.yy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.:) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}

 # In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) throws the exception from the last message 
while processing.
 # However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. (_There is no additional log statement 
explaining why the thread stopped polling; however, it seems the heartbeat 
thread got the same exception as the producer._)

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=x--lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
lioprodapsevc-papse-lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}

 # In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}

 # We are using exception handler, however, in these failure cases our 

[jira] [Updated] (KAFKA-18355) Stream thread blocks indefinitely for acquiring state directory lock

2024-12-28 Thread Ravi Gupta (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Gupta updated KAFKA-18355:
---
Description: 
We are running a Kafka Streams-based application in production and have noticed 
a couple of times that our lag on a source topic partition starts increasing.

Based on our investigation, we found the following happening:
 * The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication raises a transient exception) while producing 
a record in the sink:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxx.yy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.:) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
 * In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) throws the exception from the last message 
while processing.
 * However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. (_There is no additional log statement 
explaining why the thread stopped polling; however, it seems the heartbeat 
thread got the same exception as the producer._)

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=x--lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
x--lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
 * In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using an exception handler; however, in these failure cases our 
exception handler is not called for either the producer or the consumer 
exception. For some other authentication exceptions during consume/produce, 
we do see the handler being called.

It seems that old thread didn't clean u

[jira] [Updated] (KAFKA-18355) Stream thread blocks indefinitely for acquiring state directory lock

2024-12-28 Thread Ravi Gupta (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Gupta updated KAFKA-18355:
---
Description: 
We are running a Kafka Streams-based application in production and have noticed 
a couple of times that the *lag on a source topic partition starts increasing*.

Based on our investigation, we found the following happening:
 * The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication raises a transient exception) while producing 
a record in the sink:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxx.yy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.:) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
 * In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) throws the exception from the last message 
while processing.
 * However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. (_There is no additional log statement 
explaining why the thread stopped polling; however, it seems the heartbeat 
thread got the same exception as the producer._)

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=x--lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
x--lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
 * In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using an exception handler; however, in these failure cases our 
exception handler is not called for either the producer or the consumer 
exception. For some other authentication exceptions during consume/produce, 
we do see the handler being called.

It seems that old thread didn't clea

Re: [PR] KAFKA-18277: fix e2e network_degrade_test [kafka]

2024-12-28 Thread via GitHub


TaiJuWu commented on PR #18247:
URL: https://github.com/apache/kafka/pull/18247#issuecomment-2564298635

   After checking, this failure is related to the Linux version, but there are 
also some issues with Vagrant, which I have worked around for now; please refer to 
https://github.com/apache/kafka/pull/18247/commits/ef5716b0b1adce95df6d91196fe55a0ea21bbe7f.
   
   
![image_720](https://github.com/user-attachments/assets/0b28f7c4-fdf2-41b3-a861-851134d0cce9)
   





[jira] [Updated] (KAFKA-18355) Stream thread blocks indefinitely for acquiring state directory lock

2024-12-28 Thread Ravi Gupta (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Gupta updated KAFKA-18355:
---
Description: 
We are running a Kafka Streams-based application in production and have noticed 
a couple of times that *our lag on a source topic partition starts increasing*.

Based on our investigation, we found the following happening:
 * The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication raises a transient exception) while producing 
a record in the sink:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxx.yy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.:) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
 * In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) throws the exception from the last message 
while processing.
 * However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. (_There is no additional log statement 
explaining why the thread stopped polling; however, it seems the heartbeat 
thread got the same exception as the producer._)

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=x--lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
x--lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
 * In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using an exception handler; however, in these failure cases our 
exception handler is not called for either the producer or the consumer 
exception. For some other authentication exceptions during consume/produce, 
we do see the handler being called.

It seems that old thread didn't c

[jira] [Updated] (KAFKA-18355) Stream thread blocks indefinitely for acquiring state directory lock

2024-12-28 Thread Ravi Gupta (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Gupta updated KAFKA-18355:
---
Description: 
We are running a Kafka Streams-based application in production and have noticed 
a couple of times that our lag on a source topic partition starts increasing.

Based on our investigation, we found the following happening:
 * The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication raises a transient exception) while producing 
a record in the sink:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxx.yy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.:) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
 * In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) throws the exception from the last message 
while processing.
 * However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. (_There is no additional log statement 
explaining why the thread stopped polling; however, it seems the heartbeat 
thread got the same exception as the producer._)

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=x--lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
lioprodapsevc-papse-lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
 * In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"x--lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using an exception handler; however, in these failure cases our 
exception handler is not called for either the producer or the consumer 
exception. For some other authentication exceptions during consume/produce, 
we do see the handler being called.

 

  was:
We are running Kafka

Re: [PR] KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1898872993


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {
   throw new InvalidRequestException(s"Received request api key 
${header.apiKey} with version ${header.apiVersion} which is not enabled")
+} else {
+  throw new UnsupportedVersionException(s"Received request api key 
${header.apiKey} with version ${header.apiVersion} which is not supported")

Review Comment:
   Pardon me, this changes the returned exception from `INVALID_REQUEST` to 
`UNSUPPORTED_VERSION`, since `isApiVersionDeprecated` always returns `false` 
for now. Is that expected? 
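
The concern can be sketched in plain Java (the diff itself is Scala). This is a minimal model under stated assumptions, not the broker code: `classify` and its two boolean parameters are hypothetical stand-ins for `apiVersionManager.isApiEnabled(...)` and `header.isApiVersionDeprecated()`, and the returned strings only name the error codes discussed here.

```java
class VersionCheckSketch {
    // Mirrors the branch order in the diff:
    // enabled -> accepted, deprecated -> INVALID_REQUEST, otherwise -> UNSUPPORTED_VERSION.
    static String classify(boolean apiEnabled, boolean versionDeprecated) {
        if (apiEnabled) {
            return "OK";
        } else if (versionDeprecated) {
            return "INVALID_REQUEST";
        } else {
            return "UNSUPPORTED_VERSION";
        }
    }

    public static void main(String[] args) {
        // If the deprecation check always returns false, the middle branch is never taken,
        // so every request that is not enabled ends up in the last branch.
        System.out.println(classify(false, false));
    }
}
```

With the deprecation check fixed at `false`, a request that used to produce `INVALID_REQUEST` would now always produce `UNSUPPORTED_VERSION`, which is the behavior change being questioned.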



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {

Review Comment:
   Should we add the deprecated versions back to the JSON files? This would 
enable us to provide more informative error messages to clients, helping them 
understand why certain APIs are marked as "unsupported" (deprecated, removed, 
or disabled).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove unused previousPartition from RoundRobinPartitioner.java [kafka]

2024-12-28 Thread via GitHub


chia7712 merged PR #18331:
URL: https://github.com/apache/kafka/pull/18331





Re: [PR] MINOR: Update Consumer and Producer JavaDocs for committing offsets [kafka]

2024-12-28 Thread via GitHub


AndrewJSchofield commented on code in PR #18336:
URL: https://github.com/apache/kafka/pull/18336#discussion_r1898872562


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -70,8 +70,12 @@
  * Offsets and Consumer Position
  * Kafka maintains a numerical offset for each record in a partition. This 
offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the 
consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (eg., for compacted 
topic, or—independent of "read_committed"

Review Comment:
   I suggest the following in the parentheses: "(such as compacted topic or 
when records have been produced using transactions)".



##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -677,7 +679,9 @@ public void beginTransaction() throws 
ProducerFencedException {
  * Sends a list of specified offsets to the consumer group coordinator, 
and also marks
  * those offsets as part of the current transaction. These offsets will be 
considered
  * committed only if the transaction is committed successfully. The 
committed offset should
- * be the next message your application will consume, i.e. 
lastProcessedMessageOffset + 1.
+ * be the next message your application will consume, i.e. {@code 
nextRecordToBeProcessed.offset()}
+ * (or {@link ConsumerRecords#nextOffsets()}). You should also add the 
{@link ConsumerRecord#leaderEpoch()}
+ * (or {@code nextOffsets().get(...).leaderEpoch()}) as commit metadata.

Review Comment:
   Ditto



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1117,7 +1127,9 @@ public void commitAsync(OffsetCommitCallback callback) {
  * This commits offsets to Kafka. The offsets committed using this API 
will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in 
anything other than Kafka, this API
  * should not be used. The committed offset should be the next message 
your application will consume,
- * i.e. lastProcessedMessageOffset + 1. If automatic group management with 
{@link #subscribe(Collection)} is used,
+ * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link 
ConsumerRecords#nextOffsets()}).
+ * You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code 
nextOffsets().get(...).leaderEpoch()})

Review Comment:
   Ditto.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1033,7 +1041,9 @@ public void commitSync(final Map offsets) {
  * This commits offsets to Kafka. The offsets committed using this API 
will be used on the first fetch after every
  * rebalance and also on startup. As such, if you need to store offsets in 
anything other than Kafka, this API
  * should not be used. The committed offset should be the next message 
your application will consume,
- * i.e. lastProcessedMessageOffset + 1. If automatic group management with 
{@link #subscribe(Collection)} is used,
+ * i.e. {@code nextRecordToBeProcessed.offset()} (or {@link 
ConsumerRecords#nextOffsets()}).
+ * You should also add the {@link ConsumerRecord#leaderEpoch()} (or {@code 
nextOffsets().get(...).leaderEpoch()})

Review Comment:
   Same point as above about `nextOffsets().get(...).leaderEpoch()`.



##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -70,8 +70,12 @@
  * Offsets and Consumer Position
  * Kafka maintains a numerical offset for each record in a partition. This 
offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the 
consumer in the partition. For example, a consumer
- * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer:
+ * which is at position 5 has consumed records with offsets 0 through 4 and 
will next receive the record with offset 5.
+ * Note that offsets are not guaranteed to be consecutive (eg., for compacted 
topic, or—independent of "read_committed"
+ * mode— transactional topics). For example, if the consumer did read a 
record with offset 4, but 5 is not an offset
+ * with a record, it's position might advance to 6 (or higher) directly. 
Similarly, if the consumer's position is 5,

Review Comment:
   nit: `its`, not `it's`
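
The Javadoc point about non-consecutive offsets can be illustrated with a toy model. This is only a sketch: `nextPosition` and the `TreeSet` of existing offsets are hypothetical and do not reflect how the consumer tracks its position internally. It shows why `lastProcessedMessageOffset + 1` need not be the offset of the next record.

```java
import java.util.List;
import java.util.TreeSet;

class OffsetGapSketch {
    // Returns the offset of the first existing record at or after lastConsumed + 1,
    // or lastConsumed + 1 if no later record exists yet.
    static long nextPosition(TreeSet<Long> existingOffsets, long lastConsumed) {
        Long next = existingOffsets.ceiling(lastConsumed + 1);
        return next != null ? next : lastConsumed + 1;
    }

    public static void main(String[] args) {
        // Offset 5 holds no record, e.g. it was compacted away or is a transaction marker.
        TreeSet<Long> offsets = new TreeSet<>(List.of(0L, 1L, 2L, 3L, 4L, 6L, 7L));
        System.out.println(nextPosition(offsets, 4L)); // prints 6
    }
}
```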



##
clients/src/main/java/org/apache

[jira] [Resolved] (KAFKA-18317) Remove zookeeper.connect from RemoteLogManagerTest

2024-12-28 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-18317.

Fix Version/s: 4.0.0
   Resolution: Fixed

trunk: 
https://github.com/apache/kafka/commit/bc7a1a8969b234f080dfed7553860389ad901433

4.0: 
https://github.com/apache/kafka/commit/27815fdb1a7ebcd32bde16603c6c6e69ecf5c9b6

> Remove zookeeper.connect from RemoteLogManagerTest
> --
>
> Key: KAFKA-18317
> URL: https://issues.apache.org/jira/browse/KAFKA-18317
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: 黃竣陽
>Priority: Major
> Fix For: 4.0.0
>
>
> This test still uses zookeeper.connect in its configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]

2024-12-28 Thread via GitHub


AndrewJSchofield merged PR #18101:
URL: https://github.com/apache/kafka/pull/18101





[jira] [Resolved] (KAFKA-18135) ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB

2024-12-28 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-18135.
--
Fix Version/s: 4.1.0
   Resolution: Fixed

> ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB
> --
>
> Key: KAFKA-18135
> URL: https://issues.apache.org/jira/browse/KAFKA-18135
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Lianet Magrans
>Assignee: Peter Lee
>Priority: Major
> Fix For: 4.1.0
>
>
> UnsupportedVersion error is handled in the parent 
> AbstractHeartbeatRequestManager, so used by consumer and share consumer. If a 
> share consumer gets the errors, it will end up with a msg that is currently 
> specific to consumer
> [https://github.com/apache/kafka/blob/6fd951a9c0aa773060cd6bbf8a8b8c47ee9d2965/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L384-L386]
> One option could be to handle the UnsupportedVersion separately in the 
> existing 
> handleSpecificError (note that the unsupported version for consumer may also 
> end up containing a msg specific to SubscriptionPattern not supported in HB 
> v0, if regex is used without the required v1)





[jira] [Created] (KAFKA-18357) Support to dynamically configure the log level for JUL and logback

2024-12-28 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-18357:
--

 Summary: Support to dynamically configure the log level for JUL 
and logback
 Key: KAFKA-18357
 URL: https://issues.apache.org/jira/browse/KAFKA-18357
 Project: Kafka
  Issue Type: Task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We currently support dynamic log level configuration only when the server is 
run with Log4j, which is our official logging implementation by default. 
However, it would be beneficial to support other popular SLF4J providers to 
further consolidate Kafka's logging functionality.
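
As a rough illustration of what dynamic level changes look like with JUL, the sketch below uses only `java.util.logging` from the JDK. It is not Kafka's implementation: the class name, the logger name `sketch.example`, and the `setLevel` helper are hypothetical, and mapping Log4j level names (such as DEBUG) onto JUL names (such as FINE) is left open here.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class JulLevelSketch {
    // Keep a strong reference: JUL holds loggers weakly, so an unreferenced logger
    // and the level set on it can be garbage-collected.
    static final Logger LOGGER = Logger.getLogger("sketch.example");

    // Parses a JUL level name and applies it to the logger at runtime.
    static Level setLevel(String levelName) {
        LOGGER.setLevel(Level.parse(levelName));
        return LOGGER.getLevel();
    }

    public static void main(String[] args) {
        System.out.println(setLevel("FINE")); // prints FINE
    }
}
```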





Re: [PR] KAFKA-18243 Loggers breaks the use of root logger after migrating to log4j2 [kafka]

2024-12-28 Thread via GitHub


frankvicky commented on code in PR #18185:
URL: https://github.com/apache/kafka/pull/18185#discussion_r1898917932


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -103,18 +105,16 @@ public synchronized LoggerLevel level(String logger) {
  * @return the levels of all known loggers; may be empty, but never null
  */
 public synchronized Map allLevels() {
-Map result = new TreeMap<>();
-
-currentLoggers().stream()
-.filter(logger -> !logger.getLevel().equals(Level.OFF))
-.forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
-
-org.apache.logging.log4j.Logger root = rootLogger();
-if (!root.getLevel().equals(Level.OFF)) {
-result.put(ROOT_LOGGER_NAME, loggerLevel(root));
-}
-
-return result;
+return currentLoggers()
+.values()
+.stream()
+.filter(logger -> !logger.getLevel().equals(Level.OFF))
+.collect(Collectors.toMap(
+this::getLoggerName,
+this::loggerLevel,
+(existing, replacing) -> replacing,
+TreeMap::new)

Review Comment:
   It aims to return the loggers in natural order.
   I think we should keep it.
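
For reference, a small stand-alone sketch of the four-argument `Collectors.toMap` with `TreeMap::new` as the map supplier. The `String[]` pairs and the `sortedLevels` helper are hypothetical stand-ins for the loggers in the diff; only the JDK collector call is the real API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class SortedLevelsSketch {
    // Collects {name, level} pairs into a TreeMap so that keys iterate in natural order.
    static Map<String, String> sortedLevels(List<String[]> loggers) {
        return loggers.stream()
            .filter(l -> !l[1].equals("OFF"))
            .collect(Collectors.toMap(
                l -> l[0],
                l -> l[1],
                (existing, replacing) -> replacing, // on duplicate names, the later entry wins
                TreeMap::new));
    }

    public static void main(String[] args) {
        List<String[]> loggers = List.of(
            new String[] {"org.b", "INFO"},
            new String[] {"org.a", "DEBUG"},
            new String[] {"org.c", "OFF"});
        System.out.println(sortedLevels(loggers)); // prints {org.a=DEBUG, org.b=INFO}
    }
}
```

Without the map supplier, `Collectors.toMap` makes no ordering guarantee, which is why dropping `TreeMap::new` would change the order seen by callers.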






[jira] [Commented] (KAFKA-18135) ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB

2024-12-28 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17908618#comment-17908618
 ] 

Andrew Schofield commented on KAFKA-18135:
--

I have decided not to cherry-pick to the 4.0 branch because only the share 
consumer shows this error specifically for a cluster which does not have share 
groups enabled.

> ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB
> --
>
> Key: KAFKA-18135
> URL: https://issues.apache.org/jira/browse/KAFKA-18135
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Lianet Magrans
>Assignee: Peter Lee
>Priority: Major
> Fix For: 4.1.0
>
>
> UnsupportedVersion error is handled in the parent 
> AbstractHeartbeatRequestManager, so used by consumer and share consumer. If a 
> share consumer gets the errors, it will end up with a msg that is currently 
> specific to consumer
> [https://github.com/apache/kafka/blob/6fd951a9c0aa773060cd6bbf8a8b8c47ee9d2965/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L384-L386]
> One option could be to handle the UnsupportedVersion separately in the 
> existing 
> handleSpecificError (note that the unsupported version for consumer may also 
> end up containing a msg specific to SubscriptionPattern not supported in HB 
> v0, if regex is used without the required v1)





Re: [PR] KAFKA-18317: Remove zookeeper.connect from RemoteLogManagerTest [kafka]

2024-12-28 Thread via GitHub


chia7712 merged PR #18283:
URL: https://github.com/apache/kafka/pull/18283





Re: [PR] KAFKA-18243 Loggers breaks the use of root logger after migrating to log4j2 [kafka]

2024-12-28 Thread via GitHub


frankvicky commented on code in PR #18185:
URL: https://github.com/apache/kafka/pull/18185#discussion_r1898915938


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java:
##
@@ -20,248 +20,168 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
 
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.Configuration;
-import org.apache.logging.log4j.core.config.Configurator;
-import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import static org.apache.logging.log4j.Level.DEBUG;
+import static org.apache.logging.log4j.Level.ERROR;
+import static org.apache.logging.log4j.Level.INFO;
+import static org.apache.logging.log4j.Level.WARN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
 public class LoggersTest {
-
 private static final long INITIAL_TIME = 1696951712135L;
+private final LoggerContext context = (LoggerContext) 
LogManager.getContext(false);
+private Loggers loggers;
 private Time time;
 
 @BeforeEach
 public void setup() {
+context.stop();

Review Comment:
   You're absolutely right - this appears to be the root cause of the 
`OffsetTestUtils` failures... 
   I originally chose to reset the context to ensure test isolation and 
maintain a clean testing environment. 
   I ended up shooting myself in the foot with this one.  😢 






Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898947668


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   Can we not do this once for all projects?






Re: [PR] KAFKA-18353: Remove zk config `control.plane.listener.name` [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18329:
URL: https://github.com/apache/kafka/pull/18329#discussion_r1898947965


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -232,16 +199,14 @@ class SocketServer(
 }
 
 info("Enabling request processing.")
-controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
 dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
 
FutureUtils.chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray:
 _*),
 allAuthorizerFuturesComplete)
 
 // Construct a future that will be completed when all Acceptors have been 
successfully started.
 // Alternately, if any of them fail to start, this future will be 
completed exceptionally.
-val allAcceptors = dataPlaneAcceptors.values().asScala.toSeq ++ 
controlPlaneAcceptorOpt
 val enableFuture = new CompletableFuture[Void]
-
FutureUtils.chainFuture(CompletableFuture.allOf(allAcceptors.map(_.startedFuture).toArray:
 _*), enableFuture)
+
FutureUtils.chainFuture(CompletableFuture.allOf(dataPlaneAcceptors.values().asScala.toSeq.map(_.startedFuture).toArray:
 _*), enableFuture)

Review Comment:
   Can't you avoid one of the `toSeq` by doing `toArray` immediately?
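
A Java analogue of the suggestion, as a sketch only (the diff is Scala, and the `Acceptor` class here is a hypothetical stand-in with just a `startedFuture` field): map each element to its future and go straight to an array, with no intermediate collection before `CompletableFuture.allOf`.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfSketch {
    // Hypothetical stand-in for an acceptor that exposes a "started" future.
    static class Acceptor {
        final CompletableFuture<Void> startedFuture = new CompletableFuture<>();
    }

    // Completes once every acceptor's startedFuture has completed.
    static CompletableFuture<Void> allStarted(Collection<Acceptor> acceptors) {
        CompletableFuture<?>[] futures = acceptors.stream()
            .map(a -> a.startedFuture)
            .toArray(CompletableFuture[]::new); // straight to an array, no intermediate list
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        Acceptor first = new Acceptor();
        Acceptor second = new Acceptor();
        CompletableFuture<Void> all = allStarted(List.of(first, second));
        first.startedFuture.complete(null);
        second.startedFuture.complete(null);
        System.out.println(all.isDone());
    }
}
```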






Re: [PR] KAFKA-13093: Log compaction should write new segments with record version v2 (KIP-724) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18321:
URL: https://github.com/apache/kafka/pull/18321#discussion_r1898947254


##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -2580,6 +2580,23 @@ class UnifiedLogTest {
 assertEquals(None, log.leaderEpochCache.flatMap(_.latestEpoch.toScala))
   }
 
+  @Test
+  def testLeaderEpochCacheCreatedAfterMessageFormatUpgrade(): Unit = {

Review Comment:
   This test was originally removed via #18267. I added it back and modified 
it to take into account that the leader epoch cache always exists now (since 
the configured record version is always V2).






Re: [PR] KAFKA-18353: Remove zk config `control.plane.listener.name` [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18329:
URL: https://github.com/apache/kafka/pull/18329#discussion_r1898948164


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -665,16 +663,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
   def saslMechanismControllerProtocol: String = 
getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG)
 
-  def controlPlaneListener: Option[EndPoint] = {
-controlPlaneListenerName.map { listenerName =>
-  listeners.filter(endpoint => endpoint.listenerName.value() == 
listenerName.value()).head
-}
-  }
-
   def dataPlaneListeners: Seq[EndPoint] = {

Review Comment:
   Do we want to rename all the `dataPlane*` methods to remove the prefix? 
Perhaps that can be a separate JIRA ticket since it's not required for 4.0.






Re: [PR] KAFKA-18353: Remove zk config `control.plane.listener.name` [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18329:
URL: https://github.com/apache/kafka/pull/18329#discussion_r1898948313


##
docs/upgrade.html:
##
@@ -19,6 +19,13 @@
 
 

Re: [PR] KAFKA-18353: Remove zk config `control.plane.listener.name` [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18329:
URL: https://github.com/apache/kafka/pull/18329#discussion_r1898948528


##
docs/upgrade.html:
##
@@ -19,6 +19,13 @@
 
 

Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


m1a2st commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898948897


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   But not all projects need `mockitoAgent`, e.g. 
`':test-common:test-common-runtime'` and 
`':group-coordinator:group-coordinator-api'`, so I think it's better to 
configure it separately for each project that needs it.






Re: [PR] KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1898948973


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {

Review Comment:
   No, I added a commit at the end that changes this incorrectly:
   
   
https://github.com/apache/kafka/pull/18295/commits/b381c07a595ad616aca2579fb34660f48ea60333
   
   Thanks for catching that.






Re: [PR] KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1898949056


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {
   throw new InvalidRequestException(s"Received request api key 
${header.apiKey} with version ${header.apiVersion} which is not enabled")
+} else {
+  throw new UnsupportedVersionException(s"Received request api key 
${header.apiKey} with version ${header.apiVersion} which is not supported")

Review Comment:
   No, I added a commit at the end that changed this incorrectly:
   
   
https://github.com/apache/kafka/commit/b381c07a595ad616aca2579fb34660f48ea60333
   
   Thanks for catching that.



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {

Review Comment:
   No, I added a commit at the end that changed this incorrectly:
   
   
https://github.com/apache/kafka/pull/18295/commits/b381c07a595ad616aca2579fb34660f48ea60333
   
   Thanks for catching that.






Re: [PR] KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1898949237


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {

Review Comment:
   I don't think we need that, see the response to your other comment. Does 
that make sense?






Re: [PR] KAFKA-16339: [Docs] Add migrating from transform to process [kafka]

2024-12-28 Thread via GitHub


fonsdant commented on PR #18314:
URL: https://github.com/apache/kafka/pull/18314#issuecomment-2564397291

   @mjsax, can I format the code? Some blocks are not indented correctly. I 
understand it might pollute the diff in code changes, so if I may, I would do 
it as a final commit.





Re: [PR] KAFKA-18272: Deprecated protocol api usage should be logged at info level (3.9) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on PR #18333:
URL: https://github.com/apache/kafka/pull/18333#issuecomment-2564382983

   Test failures are unrelated.





Re: [PR] KAFKA-18272: Deprecated protocol api usage should be logged at info level (3.9) [kafka]

2024-12-28 Thread via GitHub


ijuma merged PR #18333:
URL: https://github.com/apache/kafka/pull/18333





Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


m1a2st commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898951286


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   The downside is that these projects would need to declare a `libs.mockitoCore` 
dependency that they don't otherwise need. The following projects don't need it: 
`':test-common:test-common-runtime'`, 
`':group-coordinator:group-coordinator-api'`, `':examples'`, `':generator'`, 
`':tools:tools-api'`, `':shell'`, `':streams:streams-scala'`, 
`':streams:examples'`, `':connect:transforms'`, `':connect:json'`, 
`':connect:mirror-client'`, `':connect:test-plugins'`






Re: [PR] KAFKA-9366: Upgrade log4j to log4j2 [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #17373:
URL: https://github.com/apache/kafka/pull/17373#discussion_r1898950768


##
build.gradle:
##
@@ -1099,15 +1103,17 @@ project(':core') {
   implementation libs.dropwizardMetrics
   exclude module: 'slf4j-log4j12'
   exclude module: 'log4j'
-  // Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to 
logback in v3.8.0, but Kafka relies on reload4j.
+  // Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to 
logback in v3.8.0.
   // We are removing Zookeeper's dependency on logback so we have a 
singular logging backend.
   exclude module: 'logback-classic'
   exclude module: 'logback-core'
 }
 // ZooKeeperMain depends on commons-cli but declares the dependency as 
`provided`
 implementation libs.commonsCli
-
-compileOnly libs.reload4j
+implementation libs.log4j2Core
+implementation libs.log4j2Api
+implementation libs.log4j1Bridge2Api
+implementation libs.jacksonDatabindYaml

Review Comment:
   @ppkarwasz It is true that we previously tried hard to avoid making the 
logging choice for the Maven artifact while making it for the distributed 
binaries. However, this is brittle and only worked partially. When I tried to 
fix it in #12148, it caused problems and it was partially reverted (#16260, 
#16559).
   
   Also, it's actually bad to silently not support the dynamic logging 
functionality for the broker (this is _incredibly_ useful in production).
   
   So, I think the simplest thing is to make the logging choice explicit for 
the server modules (the rare user who doesn't want that can still override it 
with exclusions via their build file) and leave it up to the applications for 
the client modules. In the future, if there is a way to address these issues, 
we can change it again. There are two promising and complementary paths:
   
   1. Your logging admin library.
   2. slf4j2 makes it possible to choose the logging library dynamically 
instead of via classpath tricks.






Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898949548


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   What's the downside of enabling it for those projects and how many such 
projects exist?






Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898951583


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   Could we enable it automatically if the `mockitoCore` dependency is added to a 
project?






Re: [PR] KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1898951531


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {

Review Comment:
   > Does that make sense?
   
   yes, that makes sense :)






[PR] KAFKA-18358: Replace Deprecated $buildDir variable in build.gradle [kafka]

2024-12-28 Thread via GitHub


m1a2st opened a new pull request, #18343:
URL: https://github.com/apache/kafka/pull/18343

   Jira: https://issues.apache.org/jira/browse/KAFKA-18358
   
   We should replace `$buildDir` with `${layout.buildDirectory.get().asFile}` 
because `$buildDir` is deprecated. See the 
[Gradle documentation](https://docs.gradle.org/current/userguide/upgrading_version_8.html#project_builddir).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-18359) ix to Kraft or remove tests associate with Zk Broker config in LocalLeaderEndPointTest

2024-12-28 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-18359:
-

 Summary: ix to Kraft or remove tests associate with Zk Broker 
config in LocalLeaderEndPointTest
 Key: KAFKA-18359
 URL: https://issues.apache.org/jira/browse/KAFKA-18359
 Project: Kafka
  Issue Type: Sub-task
Reporter: PoAn Yang
Assignee: PoAn Yang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9366: Upgrade log4j to log4j2 [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on code in PR #17373:
URL: https://github.com/apache/kafka/pull/17373#discussion_r1899049780


##
build.gradle:
##
@@ -1099,15 +1103,17 @@ project(':core') {
   implementation libs.dropwizardMetrics
   exclude module: 'slf4j-log4j12'
   exclude module: 'log4j'
-  // Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to 
logback in v3.8.0, but Kafka relies on reload4j.
+  // Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to 
logback in v3.8.0.
   // We are removing Zookeeper's dependency on logback so we have a 
singular logging backend.
   exclude module: 'logback-classic'
   exclude module: 'logback-core'
 }
 // ZooKeeperMain depends on commons-cli but declares the dependency as 
`provided`
 implementation libs.commonsCli
-
-compileOnly libs.reload4j
+implementation libs.log4j2Core
+implementation libs.log4j2Api
+implementation libs.log4j1Bridge2Api
+implementation libs.jacksonDatabindYaml

Review Comment:
   > In the meantime I can make a PR for Kafka, so that Log4jController fails 
softly if Log4j Core is not present.
   
   Yes, it would be great to display accurate error messages!
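
A minimal sketch of the "fail softly" idea quoted above, assuming the check is a plain classpath probe. `BackendProbe`, `isClassPresent`, and `unsupportedReason` are hypothetical names used only for illustration and are not Kafka's `Log4jController` API; the only real identifier is the Log4j Core class name `org.apache.logging.log4j.core.LoggerContext`.

```java
import java.util.Optional;

// Hypothetical helper, not part of Kafka: probes the classpath before touching Log4j Core types.
final class BackendProbe {
    // A class shipped in log4j-core; its absence suggests only the API or a bridge is present.
    static final String LOG4J_CORE_MARKER = "org.apache.logging.log4j.core.LoggerContext";

    // Returns true if the class can be located (without initializing it).
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, BackendProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Empty when the marker class is available; otherwise a message for an error response or log line.
    static Optional<String> unsupportedReason(String markerClass) {
        if (isClassPresent(markerClass)) {
            return Optional.empty();
        }
        return Optional.of("Dynamic log level changes are unavailable: "
            + markerClass + " is not on the classpath");
    }

    public static void main(String[] args) {
        System.out.println(unsupportedReason(LOG4J_CORE_MARKER).orElse("Log4j Core detected"));
    }
}
```

Returning a reason instead of throwing lets the caller decide whether to log, reject a request, or continue without dynamic logging.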






[jira] [Updated] (KAFKA-18359) Fix to Kraft or remove tests associate with Zk Broker config in LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, Offse

2024-12-28 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-18359:
--
Summary: Fix to Kraft or remove tests associate with Zk Broker config in 
LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, 
ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest  (was: ix to Kraft or 
remove tests associate with Zk Broker config in LocalLeaderEndPointTest, 
HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, 
OffsetsForLeaderEpochTest)

> Fix to Kraft or remove tests associate with Zk Broker config in 
> LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, 
> ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest
> -
>
> Key: KAFKA-18359
> URL: https://issues.apache.org/jira/browse/KAFKA-18359
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>






[jira] [Updated] (KAFKA-18359) ix to Kraft or remove tests associate with Zk Broker config in LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, Offset

2024-12-28 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-18359:
--
Summary: ix to Kraft or remove tests associate with Zk Broker config in 
LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, 
ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest  (was: ix to Kraft or 
remove tests associate with Zk Broker config in LocalLeaderEndPointTest)

> ix to Kraft or remove tests associate with Zk Broker config in 
> LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, 
> ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest
> 
>
> Key: KAFKA-18359
> URL: https://issues.apache.org/jira/browse/KAFKA-18359
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>






[jira] [Created] (KAFKA-18360) Remove config in ZkConfigs

2024-12-28 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-18360:
-

 Summary: Remove config in ZkConfigs
 Key: KAFKA-18360
 URL: https://issues.apache.org/jira/browse/KAFKA-18360
 Project: Kafka
  Issue Type: Sub-task
Reporter: PoAn Yang
Assignee: PoAn Yang


Remove all config in 
[https://github.com/apache/kafka/blob/trunk/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java]

 





Re: [PR] MINOR: Enhance performance of ConsumerRecords by refactoring iterator initialization and iteration logic [kafka]

2024-12-28 Thread via GitHub


frankvicky closed pull request #16494: MINOR: Enhance performance of 
ConsumerRecords by refactoring iterator initialization and iteration logic
URL: https://github.com/apache/kafka/pull/16494





Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899072269


##
raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java:
##
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.internals.EpochElection;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static 
org.apache.kafka.raft.QuorumState.unattachedOrProspectiveCanGrantVote;
+
+public class ProspectiveState implements NomineeState {
+private final int localId;
+private final int epoch;
+private final OptionalInt leaderId;
+private final Optional leaderEndpoints;
+private final Optional votedKey;
+private final VoterSet voters;
+private final EpochElection epochElection;
+private final Optional highWatermark;
+private final int retries;
+private final long electionTimeoutMs;
+private final Timer electionTimer;
+private final Logger log;
+
+/**
+ * The lifetime of a prospective state is the following.
+ *
+ * 1. Once started, it will send prevote requests and keep record of the 
received vote responses
+ * 2. If it receives a message denoting a leader with a higher epoch, it 
will transition to follower state.
+ * 3. If majority votes granted, it will transition to candidate state.
+ * 4. If majority votes rejected or election times out, it will transition 
to unattached or follower state
+ *depending on if it knows the leader id and endpoints or not
+ */
+public ProspectiveState(
+Time time,
+int localId,
+int epoch,
+OptionalInt leaderId,
+Optional leaderEndpoints,
+Optional votedKey,
+VoterSet voters,
+Optional highWatermark,
+int retries,
+int electionTimeoutMs,
+LogContext logContext
+) {
+this.localId = localId;
+this.epoch = epoch;
+this.leaderId = leaderId;
+this.leaderEndpoints = leaderEndpoints;
+this.votedKey = votedKey;
+this.voters = voters;
+this.highWatermark = highWatermark;
+this.retries = retries;
+this.electionTimeoutMs = electionTimeoutMs;
+this.electionTimer = time.timer(electionTimeoutMs);
+this.log = logContext.logger(ProspectiveState.class);
+
+this.epochElection = new EpochElection(voters.voterKeys());
+epochElection.recordVote(localId, true);
+}
+
+public int localId() {
+return localId;
+}
+
+public Optional votedKey() {
+return votedKey;
+}
+
+@Override
+public EpochElection epochElection() {
+return epochElection;
+}
+
+public int retries() {
+return retries;
+}

Review Comment:
   we can use this Jira to track - 
https://issues.apache.org/jira/browse/KAFKA-18345






Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899071006


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -926,29 +978,18 @@ private boolean handleVoteResponse(
 if (quorum.isLeader()) {
 logger.debug("Ignoring vote response {} since we already 
became leader for epoch {}",
 partitionResponse, quorum.epoch());
-} else if (quorum.isCandidate()) {
-CandidateState state = quorum.candidateStateOrThrow();
+} else if (quorum.isNomineeState()) {
+NomineeState state = quorum.nomineeStateOrThrow();
 if (partitionResponse.voteGranted()) {
 state.recordGrantedVote(remoteNodeId);
-maybeTransitionToLeader(state, currentTimeMs);
+maybeTransitionForward(state, currentTimeMs);
 } else {
 state.recordRejectedVote(remoteNodeId);
-
-// If our vote is rejected, we go immediately to the 
random backoff. This
-// ensures that we are not stuck waiting for the election 
timeout when the
-// vote has become gridlocked.
-if (state.isVoteRejected() && !state.isBackingOff()) {
-logger.info("Insufficient remaining votes to become 
leader (rejected by {}). " +
-"We will backoff before retrying election again", 
state.rejectingVoters());
-
-state.startBackingOff(
-currentTimeMs,
-binaryExponentialElectionBackoffMs(state.retries())
-);
-}
+maybeCandidateStartBackingOff(currentTimeMs);

Review Comment:
   Currently, `pollProspective` handles that logic: it checks whether the vote has 
been rejected and, if so, transitions to unattached or follower. It should be 
okay to move that logic into the vote response handling (it just adds to the 
method's complexity).
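
The rejection handling described in this comment can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in `KafkaRaftClient`: `ProspectiveLossSketch`, `isVoteRejected`, and `afterRejection` are hypothetical names, and the "rejected" test assumes an election is lost once the voters that have not rejected can no longer form a majority.

```java
import java.util.OptionalInt;

// Hypothetical sketch of how a prospective replica might react to a rejected pre-vote.
final class ProspectiveLossSketch {
    enum Next { STAY_PROSPECTIVE, FOLLOWER, UNATTACHED }

    // Assumption: the election is lost when the non-rejecting voters cannot reach a majority.
    static boolean isVoteRejected(int voterCount, int rejectedCount) {
        int majority = voterCount / 2 + 1;
        return voterCount - rejectedCount < majority;
    }

    // On a lost election, become follower if the leader id and endpoints are known, else unattached.
    static Next afterRejection(
        int voterCount,
        int rejectedCount,
        OptionalInt leaderId,
        boolean hasLeaderEndpoints
    ) {
        if (!isVoteRejected(voterCount, rejectedCount)) {
            return Next.STAY_PROSPECTIVE;
        }
        return leaderId.isPresent() && hasLeaderEndpoints ? Next.FOLLOWER : Next.UNATTACHED;
    }

    public static void main(String[] args) {
        System.out.println(afterRejection(3, 2, OptionalInt.of(1), true));
    }
}
```

Keeping the decision in one small pure function is one way to limit the extra branching in the response handler that the comment is concerned about.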






Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899074255


##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) {
 }
 
 /**
- * Grant a vote to a candidate. We will transition/remain in Unattached
- * state until either the election timeout expires or a leader is elected. 
In particular,
- * we do not begin fetching until the election has concluded and
- * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
+ * Grant a vote to a candidate as Unattached. We will transition to 
Unattached with votedKey
+ * state and remain there until either the election timeout expires or we 
discover the leader.
  */
-public void transitionToUnattachedVotedState(
+public void unattachedAddVotedState(
 int epoch,
 ReplicaKey candidateKey
 ) {
 int currentEpoch = state.epoch();
 if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
 throw new IllegalStateException(
 String.format(
-"Cannot transition to Voted for %s and epoch %d since it 
matches the local " +
+"Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
 "broker.id",
 candidateKey,
+state,
 epoch
 )
 );
 } else if (localId.isEmpty()) {
-throw new IllegalStateException("Cannot transition to voted 
without a replica id");
-} else if (epoch < currentEpoch) {
+throw new IllegalStateException("Cannot add voted state without a 
replica id");
+} else if (epoch != currentEpoch || !isUnattachedNotVoted()) {

Review Comment:
   I could also remove `epoch` as one of the params (method can use 
state.currentEpoch instead of parameter value). But I would prefer to keep 
`epoch` as a param so we can validate the method is being used correctly 
(without any unintentional epoch change).
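
To illustrate the trade-off, here is a small sketch of a method that keeps `epoch` as a parameter purely so it can be validated against the current epoch. `VotedKeyGuardSketch` and its members are hypothetical stand-ins, not the `QuorumState` implementation, and the candidate key is simplified to a `String`.

```java
// Hypothetical sketch: the caller passes the epoch it believes is current so misuse fails fast.
final class VotedKeyGuardSketch {
    private final int currentEpoch;
    private String votedKey; // null until a vote is recorded

    VotedKeyGuardSketch(int currentEpoch) {
        this.currentEpoch = currentEpoch;
    }

    // Records a vote only when the epoch is unchanged and no vote has been cast yet.
    void addVotedKey(int epoch, String candidateKey) {
        if (epoch != currentEpoch || votedKey != null) {
            throw new IllegalStateException(String.format(
                "Cannot add voted key (%s) in epoch %d (current epoch %d, already voted: %b)",
                candidateKey, epoch, currentEpoch, votedKey != null));
        }
        votedKey = candidateKey;
    }

    boolean hasVoted() {
        return votedKey != null;
    }

    public static void main(String[] args) {
        VotedKeyGuardSketch state = new VotedKeyGuardSketch(5);
        state.addVotedKey(5, "replica-2");
        System.out.println("voted: " + state.hasVoted());
    }
}
```

Without the parameter, a caller holding a stale epoch would succeed silently; with it, the mismatch surfaces as an `IllegalStateException`.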






Re: [PR] MINOR: log4j2 upgrade follow-up [kafka]

2024-12-28 Thread via GitHub


frankvicky commented on PR #18290:
URL: https://github.com/apache/kafka/pull/18290#issuecomment-2564633215

   Hi @chia7712 
   I have tested log4j compatibility on my local machine.
   
   Explicitly setting `KAFKA_LOG4J_OPTS`:
   ![Screenshot from 2024-12-29 14-51-13](https://github.com/user-attachments/assets/af8539c3-b0b5-4bfe-9e75-60590043f4d9)
   
   Leaving `KAFKA_LOG4J_OPTS` unset:
   ![image](https://github.com/user-attachments/assets/e08608bd-dc5f-4462-a09d-231c29f52ec9)
   





Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899074463


##
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##
@@ -402,58 +417,100 @@ public void transitionToUnattached(int epoch) {
 }
 
 /**
- * Grant a vote to a candidate. We will transition/remain in Unattached
- * state until either the election timeout expires or a leader is elected. 
In particular,
- * we do not begin fetching until the election has concluded and
- * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
+ * Grant a vote to a candidate as Unattached. We will transition to 
Unattached with votedKey
+ * state and remain there until either the election timeout expires or we 
discover the leader.
  */
-public void transitionToUnattachedVotedState(
+public void unattachedAddVotedState(
 int epoch,
 ReplicaKey candidateKey
 ) {
 int currentEpoch = state.epoch();
 if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
 throw new IllegalStateException(
 String.format(
-"Cannot transition to Voted for %s and epoch %d since it 
matches the local " +
+"Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
 "broker.id",
 candidateKey,
+state,
 epoch
 )
 );
 } else if (localId.isEmpty()) {
-throw new IllegalStateException("Cannot transition to voted 
without a replica id");
-} else if (epoch < currentEpoch) {
+throw new IllegalStateException("Cannot add voted state without a 
replica id");
+} else if (epoch != currentEpoch || !isUnattachedNotVoted()) {
 throw new IllegalStateException(
 String.format(
-"Cannot transition to Voted for %s and epoch %d since the 
current epoch " +
-"(%d) is larger",
+"Cannot add voted key (%s) to current state (%s) in epoch 
%d",
 candidateKey,
-epoch,
-currentEpoch
+state,
+epoch
 )
 );
-} else if (epoch == currentEpoch && !isUnattachedNotVoted()) {
+}
+
+// Note that we reset the election timeout after voting for a 
candidate because we
+// know that the candidate has at least as good of a chance of getting 
elected as us
+durableTransitionTo(
+new UnattachedState(
+time,
+epoch,
+state.election().optionalLeaderId(),
+Optional.of(candidateKey),
+partitionState.lastVoterSet().voterIds(),
+state.highWatermark(),
+randomElectionTimeoutMs(),
+logContext
+)
+);
+log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch);
+}
+
+/**
+ * Grant a vote to a candidate as Prospective. We will transition to 
Prospective with votedKey
+ * state and remain there until either the election timeout expires or we 
discover the leader.
+ */
+public void prospectiveAddVotedState(
+int epoch,
+ReplicaKey candidateKey
+) {
+int currentEpoch = state.epoch();
+if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
 throw new IllegalStateException(
 String.format(
-"Cannot transition to Voted for %s and epoch %d from the 
current state (%s)",
+"Cannot add voted key (%s) to current state (%s) in epoch 
%d since it matches the local " +
+"broker.id",
 candidateKey,
-epoch,
-state
+state,
+epoch
+)
+);
+} else if (localId.isEmpty()) {
+throw new IllegalStateException("Cannot add voted state without a 
replica id");
+} else if (epoch != currentEpoch || !isProspectiveNotVoted()) {

Review Comment:
   same as above, this is meant to be called only to transition from 
prospectiveNotVoted in epoch X to prospectiveVoted in epoch X






Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899076157


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -957,13 +998,33 @@ private boolean handleVoteResponse(
 }
 }
 
+private void maybeCandidateStartBackingOff(long currentTimeMs) {

Review Comment:
   this is called within the third level of a conditional statement, adding 
this back violates checkstyle's cyclomatic complexity check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899077542


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -957,13 +998,33 @@ private boolean handleVoteResponse(
 }
 }
 
+private void maybeCandidateStartBackingOff(long currentTimeMs) {

Review Comment:
   for now, I've kept the helper but increased its scope to handle the case 
when prospective loses the election. I've renamed it as 
`maybeHandleElectionLoss` 






[PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


m1a2st opened a new pull request, #18339:
URL: https://github.com/apache/kafka/pull/18339

   Jira: https://issues.apache.org/jira/browse/KAFKA-18356
   
   Starting from Java 21, the [JDK restricts the ability of libraries to attach 
a Java agent to their own JVM](https://openjdk.org/jeps/451). As a result, the 
inline-mock-maker might not be able to function without an explicit setup to 
enable instrumentation, and the JVM will always display a warning.
   
   See the Mockito 
[documentation](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#mockito-instrumentation).
   
   Tested locally.
   trunk: 
   ![CleanShot 2024-12-28 at 22 01 
17@2x](https://github.com/user-attachments/assets/3a15dd62-f79e-4fec-9810-53f37667da1b)
   
   this branch:
   ![CleanShot 2024-12-28 at 22 00 
15@2x](https://github.com/user-attachments/assets/0d352435-f907-4f79-9be8-1c54db6b9d08)
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: log4j2 upgrade follow-up [kafka]

2024-12-28 Thread via GitHub


FrankYang0529 commented on code in PR #18290:
URL: https://github.com/apache/kafka/pull/18290#discussion_r1898913342


##
bin/kafka-run-class.sh:
##
@@ -225,6 +225,18 @@ if [ -z "$KAFKA_LOG4J_OPTS" ]; then
   (( WINDOWS_OS_FORMAT )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
   KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
 else
+  if echo "$KAFKA_LOG4J_OPTS" | grep -qE "log4j\.[^[:space:]]+$"; then
+  # Extract Log4j 1.x configuration file path from KAFKA_LOG4J_OPTS
+  LOG4J1_CONFIG=$(echo "$KAFKA_LOG4J_OPTS" | grep -o 
'log4j\.configuration=\S*' | cut -d'=' -f2)
+
+  # Enable Log4j 1.x configuration compatibility mode for Log4j 2
+  export LOG4J_COMPATIBILITY=true

Review Comment:
   According to the documentation, one of `log4j1.compatibility` and 
`log4j.configuration` has to be set to install the Log4j 1 to Log4j 2 bridge. It 
looks like we should set `LOG4J_COMPATIBILITY=true` only if `LOG4J1_CONFIG` 
points to a properties file, and set `LOG4J_CONFIGURATION_FILE="$LOG4J1_CONFIG"` 
only if `LOG4J1_CONFIG` does not point to a properties file.
   
   
https://logging.apache.org/log4j/2.x/migrate-from-log4j1.html#ConfigurationCompatibility
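
The rule proposed in this comment can be sketched as a small decision function. This is only an illustration written in Java rather than in the shell of `kafka-run-class.sh`; the class name `Log4j1OptsSketch`, the method `envFor`, and the `.properties` suffix test are assumptions made for the example, not code from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Log4j1OptsSketch {

    // Hypothetical helper: decides which environment variable to export for a
    // Log4j 1.x configuration path taken from KAFKA_LOG4J_OPTS.
    // Assumption: "is a properties file" is approximated by the file suffix.
    static Map<String, String> envFor(String log4j1Config) {
        Map<String, String> env = new LinkedHashMap<>();
        if (log4j1Config.endsWith(".properties")) {
            // Properties file: only enable the compatibility mode.
            env.put("LOG4J_COMPATIBILITY", "true");
        } else {
            // Any other file: only point Log4j 2 at the configuration file.
            env.put("LOG4J_CONFIGURATION_FILE", log4j1Config);
        }
        return env;
    }

    public static void main(String[] args) {
        // Example paths are made up for the demonstration.
        System.out.println(envFor("file:/opt/kafka/config/log4j.properties"));
        System.out.println(envFor("file:/opt/kafka/config/log4j2.yaml"));
    }
}
```

Exactly one of the two variables is set for any given path, which is the mutual exclusion the comment asks for.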






Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]

2024-12-28 Thread via GitHub


AndrewJSchofield commented on PR #18101:
URL: https://github.com/apache/kafka/pull/18101#issuecomment-2564364250

   The failing test is known to be flaky and is tracked by 
https://issues.apache.org/jira/browse/KAFKA-18298.





Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898951583


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   Could we enable it automatically if the `mockitoCore` dependency is added to 
a project?






Re: [PR] KAFKA-18356 Explicitly setting up instrumentation for inline mocking (Java 21+) [kafka]

2024-12-28 Thread via GitHub


m1a2st commented on code in PR #18339:
URL: https://github.com/apache/kafka/pull/18339#discussion_r1898951947


##
build.gradle:
##
@@ -1026,6 +1037,12 @@ project(':share') {
 archivesName = "kafka-share"
   }
 
+  configurations {
+mockitoAgent {
+  transitive = false
+}
+  }

Review Comment:
   Good idea, I'll give it a try :)






[jira] [Commented] (KAFKA-18357) Support to dynamically configure the log level for JUL and logback

2024-12-28 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17908624#comment-17908624
 ] 

Ismael Juma commented on KAFKA-18357:
-

I don't think we should implement this ourselves. If there is a project that 
tackles that, we can use it. But it's not something that we should have to 
maintain, in my opinion. As far as I know, it hasn't been requested by users at 
all so far.

> Support to dynamically configure the log level for JUL and logback
> --
>
> Key: KAFKA-18357
> URL: https://issues.apache.org/jira/browse/KAFKA-18357
> Project: Kafka
>  Issue Type: Task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We currently support dynamic log level configuration only when the server is 
> run with Log4j, which is our official logging implementation by default. 
> However, it would be beneficial to support other popular SLF4J providers to 
> further consolidate Kafka's logging functionality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-9366: Upgrade log4j to log4j2 [kafka]

2024-12-28 Thread via GitHub


ppkarwasz commented on code in PR #17373:
URL: https://github.com/apache/kafka/pull/17373#discussion_r1898983229


##
build.gradle:
##
@@ -1099,15 +1103,17 @@ project(':core') {
   implementation libs.dropwizardMetrics
   exclude module: 'slf4j-log4j12'
   exclude module: 'log4j'
-  // Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to 
logback in v3.8.0, but Kafka relies on reload4j.
+  // Both Kafka and Zookeeper use slf4j. ZooKeeper moved from log4j to 
logback in v3.8.0.
   // We are removing Zookeeper's dependency on logback so we have a 
singular logging backend.
   exclude module: 'logback-classic'
   exclude module: 'logback-core'
 }
 // ZooKeeperMain depends on commons-cli but declares the dependency as 
`provided`
 implementation libs.commonsCli
-
-compileOnly libs.reload4j
+implementation libs.log4j2Core
+implementation libs.log4j2Api
+implementation libs.log4j1Bridge2Api
+implementation libs.jacksonDatabindYaml

Review Comment:
   > 1. Your logging admin library.
   
   I pushed the draft to Apache Logging 
([`apache/logging-admin`](https://github.com/apache/logging-admin)) and I'll 
start actively working on it. You can probably expect a release by the end of 
January/February.
   
   In the meantime I can make a PR for Kafka, so that `Log4jController` fails 
softly if Log4j Core is not present.
   
   > 2. slf4j2 makes it possible to choose the logging library dynamically 
instead of via classpath tricks.
   
   Note that choosing the SLF4J implementation does not really tell you which 
logging implementation is being used: except for Logback, all the other SLF4J 
implementations are bridges between logging APIs. If you use `slf4j-jdk14` you 
don't know which JUL implementation is being used and if you use 
`log4j-slf4j2-impl` you don't know which Log4j API implementation is being used.
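
As a rough illustration of what "failing softly" when Log4j Core is absent could look like, here is a minimal sketch. It is not the planned `Log4jController` change: the class `SoftLog4jCheck` and the helper `isClassPresent` are hypothetical names, and probing for `org.apache.logging.log4j.core.LoggerContext` is just one assumed way to detect Log4j Core on the classpath.

```java
public class SoftLog4jCheck {

    // A class that is only on the classpath when Log4j Core is present.
    static final String LOG4J_CORE_CLASS = "org.apache.logging.log4j.core.LoggerContext";

    // Hypothetical helper: returns true if the named class can be loaded.
    // The class is not initialized, so the probe has no side effects.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, SoftLog4jCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing or unloadable backend: report absence instead of failing.
            return false;
        }
    }

    public static void main(String[] args) {
        if (isClassPresent(LOG4J_CORE_CLASS)) {
            System.out.println("Log4j Core found: dynamic log level changes can be offered");
        } else {
            System.out.println("Log4j Core not found: dynamic log level changes are disabled");
        }
    }
}
```

A caller would guard every Log4j Core specific code path behind such a check, so that running with another backend degrades to a no-op rather than a `NoClassDefFoundError`.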






[jira] [Created] (KAFKA-18358) Replace Deprecated $buildDir variable in build.gradle

2024-12-28 Thread Jira
黃竣陽 created KAFKA-18358:
---

 Summary: Replace Deprecated $buildDir variable in build.gradle
 Key: KAFKA-18358
 URL: https://issues.apache.org/jira/browse/KAFKA-18358
 Project: Kafka
  Issue Type: Improvement
Reporter: 黃竣陽
Assignee: 黃竣陽


The `$buildDir` variable used in build.gradle is deprecated. We can replace it with 
`layout.buildDirectory.get().asFile`; see the Gradle upgrade guide: 
https://docs.gradle.org/current/userguide/upgrading_version_8.html#project_builddir





Re: [PR] [WIP] KAFKA-18082: Add 0.11, 1.0, 1.1 and 2.0 to consumer upgrade e2e [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on PR #18193:
URL: https://github.com/apache/kafka/pull/18193#issuecomment-2564598755

   @mjsax Apologies for bringing up this issue again. I believe that having 
clear and consistent rolling-upgrade rules can encourage users to adopt version 
4.0, which is why I am persistent about this issue.





Re: [PR] KAFKA-9941;WorkerSinkTask:When a record triggers a RetriableException and the retry is processed successfully, its offset does not commit. [kafka]

2024-12-28 Thread via GitHub


github-actions[bot] commented on PR #9167:
URL: https://github.com/apache/kafka/pull/9167#issuecomment-2564600947

   This PR has been closed since it has not had any activity in 120 days. If 
you feel like this
   was a mistake, or you would like to continue working on it, please feel free 
to re-open the 
   PR and ask for a review.





Re: [PR] KAFKA-18277: fix e2e network_degrade_test [kafka]

2024-12-28 Thread via GitHub


github-actions[bot] commented on PR #18247:
URL: https://github.com/apache/kafka/pull/18247#issuecomment-2564600944

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] KAFKA-9941;WorkerSinkTask:When a record triggers a RetriableException and the retry is processed successfully, its offset does not commit. [kafka]

2024-12-28 Thread via GitHub


github-actions[bot] closed pull request #9167: KAFKA-9941;WorkerSinkTask:When a 
record triggers a RetriableException and the retry is processed successfully, 
its offset does not commit.
URL: https://github.com/apache/kafka/pull/9167





Re: [PR] KAFKA-10385 - Remove no print stat when on detailed stat mode on Consumer Perf [kafka]

2024-12-28 Thread via GitHub


github-actions[bot] closed pull request #9161: KAFKA-10385 - Remove no print 
stat when on detailed stat mode on Consumer Perf
URL: https://github.com/apache/kafka/pull/9161





Re: [PR] KAFKA-18336: Improve jmh tests on ACL in AuthorizerBenchmark and StandardAuthorizerUpdateBenchmark [kafka]

2024-12-28 Thread via GitHub


github-actions[bot] commented on PR #18293:
URL: https://github.com/apache/kafka/pull/18293#issuecomment-2564600935

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] KAFKA-10385 - Remove no print stat when on detailed stat mode on Consumer Perf [kafka]

2024-12-28 Thread via GitHub


github-actions[bot] commented on PR #9161:
URL: https://github.com/apache/kafka/pull/9161#issuecomment-2564600936

   This PR has been closed since it has not had any activity in 120 days. If 
you feel like this
   was a mistake, or you would like to continue working on it, please feel free 
to re-open the 
   PR and ask for a review.





[PR] KAFKA-18359: Set zkConnect to null in LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest [kafka]

2024-12-28 Thread via GitHub


FrankYang0529 opened a new pull request, #18344:
URL: https://github.com/apache/kafka/pull/18344

   Set zkConnect to `null` in `TestUtils.createBrokerConfigs`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-18359) Set zkConnect to null in LocalLeaderEndPointTest, HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest

2024-12-28 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-18359:
--
Summary: Set zkConnect to null in LocalLeaderEndPointTest, 
HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, 
OffsetsForLeaderEpochTest  (was: Fix to Kraft or remove tests associate with Zk 
Broker config in LocalLeaderEndPointTest, HighwatermarkPersistenceTest, 
IsrExpirationTest, ReplicaManagerQuotasTest, OffsetsForLeaderEpochTest)

> Set zkConnect to null in LocalLeaderEndPointTest, 
> HighwatermarkPersistenceTest, IsrExpirationTest, ReplicaManagerQuotasTest, 
> OffsetsForLeaderEpochTest
> --
>
> Key: KAFKA-18359
> URL: https://issues.apache.org/jira/browse/KAFKA-18359
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>






Re: [PR] KAFKA-18315: Fix to Kraft or remove tests associate with Zk Broker config in DynamicBrokerConfigTest, ReplicaManagerTest, DescribeTopicPartitionsRequestHandlerTest, KafkaConfigTest [kafka]

2024-12-28 Thread via GitHub


FrankYang0529 commented on code in PR #18269:
URL: https://github.com/apache/kafka/pull/18269#discussion_r1899068553


##
core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala:
##
@@ -450,54 +450,52 @@ class DynamicBrokerConfigTest {
   }
 
   @Test
-  def testPasswordConfigEncryption(): Unit = {
-val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+  def testPasswordConfigNotEncryption(): Unit = {
+val props = TestUtils.createBrokerConfig(0, null, port = 8181)

Review Comment:
   It looks like the following test cases still use `MockZkConnect`. Could you 
also update them? Thanks.
   
   * testDynamicConfigInitializationWithoutConfigsInZK
   * testUpdateRemoteLogManagerDynamicThreadPool
   * testRemoteLogDynamicThreadPoolWithInvalidValues






Re: [PR] KAFKA-17921: Support SASL_PLAINTEXT protocol with java.security.auth.login.config [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on code in PR #17671:
URL: https://github.com/apache/kafka/pull/17671#discussion_r1899068385


##
test-common/src/main/java/org/apache/kafka/common/test/JaasModule.java:
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JaasModule {

Review Comment:
   Is this a duplicate of the core JaasModule?



##
test-common/src/main/java/org/apache/kafka/common/test/JaasTestUtils.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.common.security.JaasUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import javax.security.auth.login.Configuration;
+
+public class JaasTestUtils {

Review Comment:
   It is already in the xxx.test package; maybe we can call it JaasUtils?



##
test-common/src/main/java/org/apache/kafka/common/test/JaasTestUtils.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.common.security.JaasUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import javax.security.auth.login.Configuration;
+
+public class JaasTestUtils {
+public static class JaasSection {
+private final String contextName;
+private final List modules;
+
+public JaasSection(String contextName, List modules) {
+this.contextName = contextName;
+this.modules = modules;
+}
+
+public List getModules() {
+return modules;
+}
+
+public String getContextName() {
+return contextName;
+}
+
+@Override
+public String toString() {
+return String.format("%s {%n  %s%n};%n",
+contextName,
+
modules.stream().map(Object::toString).collect(Collectors.joining("\n  ")));
+}
+}
+
+public static final String KAFKA_SERVER_CONTEXT_NAME = "KafkaServer";
+
+public static final String KAFKA_PLAIN_USER1 = "plain-user1";
+public static final String KAFKA_PLAIN_USER1_PASSWORD = 
"plain-user1-secret";
+public static final String KAFKA_PLAIN_ADMIN

Re: [PR] KAFKA-18339: Remove raw unversioned direct SASL protocol (KIP-896) [kafka]

2024-12-28 Thread via GitHub


ijuma commented on code in PR #18295:
URL: https://github.com/apache/kafka/pull/18295#discussion_r1899001868


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -1107,8 +1107,10 @@ private[kafka] class Processor(
 val header = RequestHeader.parse(buffer)
 if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
   header
-} else {
+} else if (header.isApiVersionDeprecated()) {
   throw new InvalidRequestException(s"Received request api key 
${header.apiKey} with version ${header.apiVersion} which is not enabled")
+} else {
+  throw new UnsupportedVersionException(s"Received request api key 
${header.apiKey} with version ${header.apiVersion} which is not supported")

Review Comment:
   Submitted the fix: https://github.com/apache/kafka/pull/18340






[PR] KAFKA-18339: Fix parseRequestHeader error handling [kafka]

2024-12-28 Thread via GitHub


ijuma opened a new pull request, #18340:
URL: https://github.com/apache/kafka/pull/18340

   A minor refactoring just before merging #18295 introduced a regression, and 
no test failed. Throw the correct exception and add a test to verify it. Also 
refactor the code slightly to make that possible.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-17178) Update KTable.transformValues to new Processor API

2024-12-28 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17908661#comment-17908661
 ] 

A. Sophie Blee-Goldman commented on KAFKA-17178:


This is essentially just an extension of these KIPs, which laid the groundwork 
but apparently didn't cover the corresponding KTable APIs:

1. [KIP-478 - Strongly typed Processor 
API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API]

2. [KIP-820: Extend KStream process with new Processor 
API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API]

> Update KTable.transformValues to new Processor API
> --
>
> Key: KAFKA-17178
> URL: https://issues.apache.org/jira/browse/KAFKA-17178
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: need-kip
> Fix For: 4.1.0
>
>
> The original Processor API was replaced by a type-safe version using `Record` 
> instead of K/V pairs. The old PAPI is already mostly deprecated and will be 
> largely removed in the 4.0 release.
> However, `KTable.transformValues` is still using the old PAPI, and should be 
> updated to use the new `api.FixedKeyProcessor` instead of the old 
> `ValueTransformerWithKey`.
> The method `transformValues` should be deprecated and replaced with 
> `KTable.processValues`.
> At the same time, the old interfaces `ValueTransformerWithKey` and 
> `ValueTransformerWithKeySupplier` should be deprecated.





Re: [PR] Removed erroneous quotes from CLASSPATH variable, fixes running on Windows with spaces in path (KAFKA-9710). [kafka]

2024-12-28 Thread via GitHub


rubin55 commented on PR #13057:
URL: https://github.com/apache/kafka/pull/13057#issuecomment-2564513862

   747 pull-requests.. jeez.





Re: [PR] KAFKA-17642: PreVote response handling and ProspectiveState [kafka]

2024-12-28 Thread via GitHub


ahuang98 commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899044657


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -2442,10 +2503,10 @@ private void maybeTransition(
 " and epoch " + epoch + " which is inconsistent with current 
leader " +
 quorum.leaderId() + " and epoch " + quorum.epoch());
 } else if (epoch > quorum.epoch()) {
-if (leaderId.isPresent()) {
+if (leaderId.isPresent() && !leaderEndpoints.isEmpty()) {

Review Comment:
   I added this because I thought it was an oversight not to check for empty 
endpoints given that we can initialize in UnattachedState when the 
leaderendpoints are not known. But I see now that `maybeTransition` is only 
called in places where the leaderendpoints are expected to be populated






[PR] KAFKA-18026: KIP-1112, clean up graph node grace period resolution [kafka]

2024-12-28 Thread via GitHub


ableegoldman opened a new pull request, #18342:
URL: https://github.com/apache/kafka/pull/18342

   Minor followup to https://github.com/apache/kafka/pull/18195 that I split 
out into a separate PR since that one was getting a bit long. Should be rebased 
& reviewed after that one is merged.
   
   Introduces a new class for windowed graph nodes with a grace period defined 
to improve (slightly) the type safety 





[PR] MINOR: add broadcast/multicast to Kafka Streams docs [kafka]

2024-12-28 Thread via GitHub


mjsax opened a new pull request, #18341:
URL: https://github.com/apache/kafka/pull/18341

   (no comment)





Re: [PR] MINOR: add broadcast/multicast to Kafka Streams docs [kafka]

2024-12-28 Thread via GitHub


mjsax commented on code in PR #18341:
URL: https://github.com/apache/kafka/pull/18341#discussion_r1899011500


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -398,12 +421,11 @@ KStream stream = ...;
 
 // A filter that selects (keeps) only positive numbers
-// Java 8+ example, using lambda expressions

Review Comment:
   Side cleanup -- this statement seems to be rather outdated...






Re: [PR] [WIP] KAFKA-18082: Add 0.11, 1.0, 1.1 and 2.0 to consumer upgrade e2e [kafka]

2024-12-28 Thread via GitHub


ijuma commented on PR #18193:
URL: https://github.com/apache/kafka/pull/18193#issuecomment-2564488560

   Yes, that would be simplest. That said, we should be clear that Connect is 
more of a server component (i.e. it doesn't run alongside customer 
applications) even though it communicates with the brokers as regular clients 
do.
   
   Perhaps we should start that discussion with the Streams folks again. I want 
to check the specifics of the Streams upgrade test as it seems pretty 
complicated (https://github.com/apache/kafka/pull/17876/files).





Re: [PR] MINOR: remove zkBroker from StreamsGroupHeartbeatRequest and StreamsGroupDescribeRequest [kafka]

2024-12-28 Thread via GitHub


chia7712 merged PR #18319:
URL: https://github.com/apache/kafka/pull/18319





Re: [PR] MINOR: log4j2 upgrade follow-up [kafka]

2024-12-28 Thread via GitHub


chia7712 commented on code in PR #18290:
URL: https://github.com/apache/kafka/pull/18290#discussion_r1898896277


##
bin/kafka-run-class.sh:
##
@@ -225,6 +225,18 @@ if [ -z "$KAFKA_LOG4J_OPTS" ]; then
   (( WINDOWS_OS_FORMAT )) && LOG4J_DIR=$(cygpath --path --mixed "${LOG4J_DIR}")
   KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_DIR}"
 else
+  if echo "$KAFKA_LOG4J_OPTS" | grep -qE "log4j\.[^[:space:]]+$"; then
+  # Extract Log4j 1.x configuration file path from KAFKA_LOG4J_OPTS
+  LOG4J1_CONFIG=$(echo "$KAFKA_LOG4J_OPTS" | grep -o 'log4j\.configuration=\S*' | cut -d'=' -f2)
+
+  # Enable Log4j 1.x configuration compatibility mode for Log4j 2
+  export LOG4J_COMPATIBILITY=true

Review Comment:
   @frankvicky could you please test the compatibility locally?
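
   For a quick local check of the extraction step quoted in the diff, a minimal sketch follows. It is not the PR's code: `extract_log4j1_config` is a hypothetical helper name, the sample option strings are made up, and the pattern uses the POSIX class `[^[:space:]]*` in place of `\S*` on the assumption that the script may run with a non-GNU grep.

```shell
#!/usr/bin/env bash
# Sketch only: mirrors the "extract Log4j 1.x config path" step from the diff.
# extract_log4j1_config is a hypothetical name, not part of kafka-run-class.sh.
extract_log4j1_config() {
  local opts="$1"
  # [^[:space:]]* is the POSIX spelling of \S*.
  # cut -f2- (rather than -f2) keeps any later '=' inside the value.
  printf '%s\n' "$opts" \
    | grep -o 'log4j\.configuration=[^[:space:]]*' \
    | cut -d'=' -f2-
}

# Example with a made-up Log4j 1.x style option string.
extract_log4j1_config "-Dlog4j.configuration=file:/tmp/log4j.properties -Dfoo=bar"
# prints: file:/tmp/log4j.properties
```

   A Log4j 2 style option such as `-Dlog4j2.configurationFile=...` does not match this pattern, so the helper prints nothing for it.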






[jira] [Updated] (KAFKA-18356) Explicitly setting up instrumentation for inline mocking (Java 21+)

2024-12-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

黃竣陽 updated KAFKA-18356:

Description: 
When I run tests on Java 21, I see some warning logs. We should 
improve the Mockito setup to resolve these warning tips.
{code:java}
Mockito is currently self-attaching to enable the inline-mock-maker. This will 
no longer work in future releases of the JDK. Please add Mockito as an agent to 
your build what is described in Mockito's documentation: 
https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
WARNING: A Java agent has been loaded dynamically 
(/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
WARNING: If a serviceability tool is in use, please run with 
-XX:+EnableDynamicAgentLoading to hide this warning
WARNING: If a serviceability tool is not in use, please run with 
-Djdk.instrument.traceUsage for more information
WARNING: Dynamic loading of agents will be disallowed by default in a future 
release {code}

  was:
When I run tests on Java 21, I see some warning logs. We should 
improve the Mockito setup to disable these warning tips.
{code:java}
Mockito is currently self-attaching to enable the inline-mock-maker. This will 
no longer work in future releases of the JDK. Please add Mockito as an agent to 
your build what is described in Mockito's documentation: 
https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
WARNING: A Java agent has been loaded dynamically 
(/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
WARNING: If a serviceability tool is in use, please run with 
-XX:+EnableDynamicAgentLoading to hide this warning
WARNING: If a serviceability tool is not in use, please run with 
-Djdk.instrument.traceUsage for more information
WARNING: Dynamic loading of agents will be disallowed by default in a future 
release {code}


> Explicitly setting up instrumentation for inline mocking (Java 21+)
> ---
>
> Key: KAFKA-18356
> URL: https://issues.apache.org/jira/browse/KAFKA-18356
> Project: Kafka
> Issue Type: Improvement
> Reporter: 黃竣陽
> Assignee: 黃竣陽
> Priority: Major
>
> When I run tests on Java 21, I see some warning logs. We should 
> improve the Mockito setup to resolve these warning tips.
> {code:java}
> Mockito is currently self-attaching to enable the inline-mock-maker. This 
> will no longer work in future releases of the JDK. Please add Mockito as an 
> agent to your build what is described in Mockito's documentation: 
> https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
> WARNING: A Java agent has been loaded dynamically 
> (/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
> WARNING: If a serviceability tool is in use, please run with 
> -XX:+EnableDynamicAgentLoading to hide this warning
> WARNING: If a serviceability tool is not in use, please run with 
> -Djdk.instrument.traceUsage for more information
> WARNING: Dynamic loading of agents will be disallowed by default in a future 
> release {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-18356) Explicitly setting up instrumentation for inline mocking (Java 21+)

2024-12-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-18356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

黃竣陽 updated KAFKA-18356:

Description: 
When I run tests on Java 21, I see some warning logs. We should 
improve the Mockito setup to resolve these warning logs.
{code:java}
Mockito is currently self-attaching to enable the inline-mock-maker. This will 
no longer work in future releases of the JDK. Please add Mockito as an agent to 
your build what is described in Mockito's documentation: 
https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
WARNING: A Java agent has been loaded dynamically 
(/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
WARNING: If a serviceability tool is in use, please run with 
-XX:+EnableDynamicAgentLoading to hide this warning
WARNING: If a serviceability tool is not in use, please run with 
-Djdk.instrument.traceUsage for more information
WARNING: Dynamic loading of agents will be disallowed by default in a future 
release {code}

  was:
When I run tests on Java 21, I see some warning logs. We should 
improve the Mockito setup to resolve these warning tips.
{code:java}
Mockito is currently self-attaching to enable the inline-mock-maker. This will 
no longer work in future releases of the JDK. Please add Mockito as an agent to 
your build what is described in Mockito's documentation: 
https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
WARNING: A Java agent has been loaded dynamically 
(/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
WARNING: If a serviceability tool is in use, please run with 
-XX:+EnableDynamicAgentLoading to hide this warning
WARNING: If a serviceability tool is not in use, please run with 
-Djdk.instrument.traceUsage for more information
WARNING: Dynamic loading of agents will be disallowed by default in a future 
release {code}


> Explicitly setting up instrumentation for inline mocking (Java 21+)
> ---
>
> Key: KAFKA-18356
> URL: https://issues.apache.org/jira/browse/KAFKA-18356
> Project: Kafka
> Issue Type: Improvement
> Reporter: 黃竣陽
> Assignee: 黃竣陽
> Priority: Major
>
> When I run tests on Java 21, I see some warning logs. We should 
> improve the Mockito setup to resolve these warning logs.
> {code:java}
> Mockito is currently self-attaching to enable the inline-mock-maker. This 
> will no longer work in future releases of the JDK. Please add Mockito as an 
> agent to your build what is described in Mockito's documentation: 
> https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
> WARNING: A Java agent has been loaded dynamically 
> (/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
> WARNING: If a serviceability tool is in use, please run with 
> -XX:+EnableDynamicAgentLoading to hide this warning
> WARNING: If a serviceability tool is not in use, please run with 
> -Djdk.instrument.traceUsage for more information
> WARNING: Dynamic loading of agents will be disallowed by default in a future 
> release {code}





[jira] [Created] (KAFKA-18356) Explicitly setting up instrumentation for inline mocking (Java 21+)

2024-12-28 Thread Jira
黃竣陽 created KAFKA-18356:
---

Summary: Explicitly setting up instrumentation for inline mocking (Java 21+)
Key: KAFKA-18356
URL: https://issues.apache.org/jira/browse/KAFKA-18356
Project: Kafka
Issue Type: Improvement
Reporter: 黃竣陽
Assignee: 黃竣陽


When I run tests on Java 21, I see some warning logs. We should 
improve the Mockito setup to disable these warning tips.
{code:java}
Mockito is currently self-attaching to enable the inline-mock-maker. This will 
no longer work in future releases of the JDK. Please add Mockito as an agent to 
your build what is described in Mockito's documentation: 
https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#0.3
WARNING: A Java agent has been loaded dynamically 
(/Users/ken/.gradle/caches/modules-2/files-2.1/net.bytebuddy/byte-buddy-agent/1.15.4/58e850dde88f3cf20f41f659440bef33f6c4fe02/byte-buddy-agent-1.15.4.jar)
WARNING: If a serviceability tool is in use, please run with 
-XX:+EnableDynamicAgentLoading to hide this warning
WARNING: If a serviceability tool is not in use, please run with 
-Djdk.instrument.traceUsage for more information
WARNING: Dynamic loading of agents will be disallowed by default in a future 
release {code}


