[jira] [Commented] (KAFKA-19051) Fix implicit acknowledgement cannot be overridden when RecordDeserializationException occurs

2025-04-04 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939605#comment-17939605
 ] 

Andrew Schofield commented on KAFKA-19051:
--

Yes, I've been thinking about this. I really don't like creating a new fake 
instance of ConsumerRecord and then using it to acknowledge. I think there are 
two paths forward.

1) Just change the KIP and remove the part about "the application can override 
this if it is using explicit acknowledgement". Or

2) Provide a config such as 
`share.acknowledge.type.on.deserialization.exception` which could then let the 
application override the default RELEASE with something such as REJECT.

Personally, I prefer (1) and think that (2) just overcomplicates things. The 
whole point of automatically releasing the records is that the effect of a 
deserializable record is short-lived.

What do [~brandboat] and [~frouleau] think?

> Fix implicit acknowledgement cannot be overridden when 
> RecordDeserializationException occurs
> 
>
> Key: KAFKA-19051
> URL: https://issues.apache.org/jira/browse/KAFKA-19051
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Frédérik ROULEAU
>Assignee: Kuan Po Tseng
>Priority: Major
>
> When a record generates a RecordDeserializationException, KIP mentioned that 
> with explicit acknowledgement the default Release can be overridden.
> When tried, I have:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot 
> be acknowledged.
>     at 
> org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
>     at 
> org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
>     at 
> org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
>     at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
> It looks like the record was already released.
> Code used:
> {code:java}
> //
> } catch (RecordDeserializationException re) {
> long offset = re.offset();
> Throwable t = re.getCause();
> LOGGER.error("Failed to deserialize record at partition={} offset={}", 
> re.topicPartition().partition(), offset, t);
> ConsumerRecord record = new 
> ConsumerRecord<>(re.topicPartition().topic(), 
> re.topicPartition().partition(), offset, "", "");
> consumer.acknowledge(record, AcknowledgeType.REJECT);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17078: Add SecurityManagerCompatibility shim (#16522) [kafka]

2025-04-04 Thread via GitHub


srdo-humio commented on PR #19221:
URL: https://github.com/apache/kafka/pull/19221#issuecomment-2732689820

   I ran the tests locally, and they eventually passed. I think the failures 
are just flakiness.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add 4.0.0 to streams system tests [kafka]

2025-04-04 Thread via GitHub


dajac commented on PR #19239:
URL: https://github.com/apache/kafka/pull/19239#issuecomment-2735636755

   I have triggered a run of the streams system tests. I will post the results 
when they are done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18935: Ensure brokers do not return null records in FetchResponse [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield commented on PR #19167:
URL: https://github.com/apache/kafka/pull/19167#issuecomment-2746292714

   @frankvicky I have opened https://issues.apache.org/jira/browse/KAFKA-19031 
for ShareFetchResponse. You're welcome to take it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Specify 2.1 as the minimum broker version for clients [kafka]

2025-04-04 Thread via GitHub


jolshan commented on code in PR #19250:
URL: https://github.com/apache/kafka/pull/19250#discussion_r2006621799


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -234,9 +229,10 @@
  * successful writes are marked as aborted, hence keeping the transactional 
guarantees.
  * 
  * 
- * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
- * certain client features.  For instance, the transactional APIs need broker 
versions 0.11.0 or later. You will receive an
- * UnsupportedVersionException when invoking an API that is not 
available in the running broker version.
+ * This client can communicate with brokers that are version 2.1 or newer. 
Older brokers may not support
+ * certain client features. For instance, {@code sendOffsetsToTransaction} 
with all consumer group metadata needs broker
+ * versions 2.5 or later. You will receive an 
UnsupportedVersionException when invoking an API that is not

Review Comment:
   I was a little confused by this at first, so I did some digging.
   This message is due to the change in the signature of 
`sendOffsetsToTransaction` requiring `ConsumerGroupMetadata groupMetadata` 
rather than `String consumerGroupId` (KIP introducing the concept 
[here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics),
 and old method signature deprecated in 
[this](https://github.com/apache/kafka/commit/3805f3706f8f3ebba81b80915c9259590525fb05)
 commit)
   
   But yes, this is correct. 👍 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19085: SharePartitionManagerTest testMultipleConcurrentShareFetches throws silent exception and works incorrectly [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield merged PR #19370:
URL: https://github.com/apache/kafka/pull/19370


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19269:
URL: https://github.com/apache/kafka/pull/19269#discussion_r2025427298


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java:
##
@@ -70,6 +70,7 @@ public class StreamsProducer {
 private Producer producer;
 private boolean transactionInFlight = false;
 private boolean transactionInitialized = false;
+private boolean resetDisabled = false;

Review Comment:
   Let's hear from Sophie.
   
   To me, "disabling the reset" is not really a feature of the producer, but a 
feature of the `TaskManager` (and in transition the `ActiveTaskCreator` as a 
helper class used by `TaskManager`). -- Given that only the `TaskManager` 
decides if it wants to skip the re-initialization or not (and the `TaskManager` 
does already skip the call by checking `streamsProducer.isResetDisabled()`), 
it's unclear to my why we would push this flag inside `StreamsProducer`. It 
seems to be an unnecessary "de-tour", and boilerplate code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Use readable interface to parse response [kafka]

2025-04-04 Thread via GitHub


ditac commented on code in PR #19353:
URL: https://github.com/apache/kafka/pull/19353#discussion_r2029310683


##
clients/src/main/java/org/apache/kafka/common/protocol/ReadableBuf.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+/**
+ * A readable backed by some buffers
+ */
+public interface ReadableBuf extends Readable {

Review Comment:
   Why not make this part of Readable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-04-04 Thread via GitHub


adixitconfluent commented on PR #19261:
URL: https://github.com/apache/kafka/pull/19261#issuecomment-2768671007

   > Are all scenarios covered in SharePartition unit tests?
   
   Hi @apoorvmittal10, I have added the unit tests for different cases of 
acquire (cached state can be empty/acquire full batch/acquire subset batch). I 
think a better way to test the added code was through integration tests, hence 
there I have added plenty in `ShareConsumerTest` since I can mimic transactions 
and there the code to process aborted transactions is also being tested. Please 
let me know if this doesn't look enough and I should add more?
   cc - @AndrewJSchofield 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19071: Fix doc for remote.storage.enable [kafka]

2025-04-04 Thread via GitHub


m1a2st commented on code in PR #19345:
URL: https://github.com/apache/kafka/pull/19345#discussion_r2024038450


##
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##
@@ -81,7 +81,8 @@ public class TopicConfig {
 
 public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = 
"remote.storage.enable";
 public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable 
tiered storage for a topic, set this configuration as true. " +
-"You can not disable this config once it is enabled. It will be 
provided in future versions.";
+"Disable tiered storage by setting this configuration to false for 
a tiered storage enabled topic. Setting this configuration to false " +
+"must be accompanied by setting `remote.log.delete.on.disable` to 
true";

Review Comment:
   Maybe we can make it clearer like this:  
   
   To enable tiered storage for a topic, set this configuration to true.  To 
disable tiered storage for a topic that has it enabled, set this configuration 
to false.  When disabling, you must also set 
remote.log.delete.on.disable to true."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-19085) SharePartitionManagerTest testMultipleConcurrentShareFetches throws silent exception and works incorrectly

2025-04-04 Thread Abhinav Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhinav Dixit resolved KAFKA-19085.
---
Fix Version/s: 4.1.0
   Resolution: Fixed

> SharePartitionManagerTest testMultipleConcurrentShareFetches throws silent 
> exception and works incorrectly
> --
>
> Key: KAFKA-19085
> URL: https://issues.apache.org/jira/browse/KAFKA-19085
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Abhinav Dixit
>Assignee: Abhinav Dixit
>Priority: Major
> Fix For: 4.1.0
>
>
> The tests testMultipleConcurrentShareFetches is throwing a silent exception.
> ERROR Error processing delayed share fetch request 
> (kafka.server.share.DelayedShareFetch:225)
> This is due to incomplete mocks setup for the test and also requires changes 
> in timeout.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (#… [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #17830:
URL: https://github.com/apache/kafka/pull/17830#issuecomment-2774400201

   This PR has been closed since it has not had any activity in 120 days. If 
you feel like this
   was a mistake, or you would like to continue working on it, please feel free 
to re-open the 
   PR and ask for a review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-04-04 Thread via GitHub


TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2022583314


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -413,24 +418,19 @@ void 
testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOExcepti
 tp -> Optional.of(mockLog),
 (topicPartition, offset) -> { },
 brokerTopicStats,
-metrics) {
+metrics,
+endPoint) {
+@Override
 public RemoteStorageManager createRemoteStorageManager() {
 return remoteStorageManager;
 }
+@Override
 public RemoteLogMetadataManager createRemoteLogMetadataManager() {
 return remoteLogMetadataManager;
 }
 }) {
-
-String host = "localhost";
-int port = 1234;
-String securityProtocol = "PLAINTEXT";
-Endpoint endpoint = new Endpoint(securityProtocol, 
SecurityProtocol.PLAINTEXT, host, port);
-remoteLogManager.onEndPointCreated(endpoint);
-remoteLogManager.startup();
-
 ArgumentCaptor> capture = 
ArgumentCaptor.forClass(Map.class);
-verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
+verify(remoteLogMetadataManager, 
times(2)).configure(capture.capture());

Review Comment:
   One is call from `setup`, the other is from this function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19030: Remove metricNamePrefix from RequestChannel [kafka]

2025-04-04 Thread via GitHub


m1a2st commented on code in PR #19374:
URL: https://github.com/apache/kafka/pull/19374#discussion_r2028861881


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -349,8 +348,8 @@ class RequestChannel(val queueSize: Int,
 
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
-  private val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
-  private val responseQueueSizeMetricName = 
metricNamePrefix.concat(ResponseQueueSizeMetric)
+  private val requestQueueSizeMetricName = RequestQueueSizeMetric
+  private val responseQueueSizeMetricName = ResponseQueueSizeMetric

Review Comment:
   We can merge these properties with `RequestChannel` L47, L48 
   ```
   private val RequestQueueSizeMetric = "RequestQueueSize"
 private val ResponseQueueSizeMetric = "ResponseQueueSize"
 ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18827: Incorporate initializing topics in share group heartbeat [4/N] [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield merged PR #19339:
URL: https://github.com/apache/kafka/pull/19339


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-19056) Move EndToEndClusterIdTest to server module

2025-04-04 Thread Ming-Yen Chung (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming-Yen Chung reassigned KAFKA-19056:
--

Assignee: Ming-Yen Chung  (was: Chia-Ping Tsai)

> Move EndToEndClusterIdTest to server module
> ---
>
> Key: KAFKA-19056
> URL: https://issues.apache.org/jira/browse/KAFKA-19056
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Ming-Yen Chung
>Priority: Minor
>
> 1. rewrite by java
> 2. use ClusterTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19080) The constraint on segment.ms is not enforced at topic level

2025-04-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-19080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940498#comment-17940498
 ] 

黃竣陽 commented on KAFKA-19080:
-

Hello [~junrao], If you won't work on this, may I take it?

> The constraint on segment.ms is not enforced at topic level
> ---
>
> Key: KAFKA-19080
> URL: https://issues.apache.org/jira/browse/KAFKA-19080
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 4.0.0
>Reporter: Jun Rao
>Priority: Major
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations]
>  sets a new constraint (at least 1MB) on segment.bytes. This is implemented 
> in [https://github.com/apache/kafka/pull/18140.] However, it doesn't seem to 
> be enforced at the topic level.
> {code:java}
> bash-3.2$ bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 
> --topic test --add-config segment.bytes=1000
> Completed updating config for topic test.
> bash-3.2$ bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 
> --topic test
> Dynamic configs for topic test are:
>   segment.bytes=1000 sensitive=false 
> synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1000, 
> STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, 
> DEFAULT_CONFIG:log.segment.bytes=1073741824} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19303:
URL: https://github.com/apache/kafka/pull/19303#discussion_r2029558102


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java:
##
@@ -316,6 +367,21 @@ public void innerJoinShouldPropagateDeletionOfPrimaryKey() 
{
 );
 }
 
+@Test
+public void 
innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
+final MockInternalProcessorContext> context = new MockInternalProcessorContext<>();
+innerJoinProcessor.init(context);
+context.setRecordMetadata("topic", 0, 0);
+
+innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new 
LeftValue(null)), 0));
+
+assertThat(context.forwarded(), empty());
+
+// test dropped-records sensors
+assertEquals(1.0, getDroppedRecordsTotalMetric(context));

Review Comment:
   That's is not really the definition of "dropped records" -- "dropped 
records" is a metric for malformed records that could not be processed. -- Eg, 
a `null`-key record for a table is not valid, because we need a valid PK, and 
thus we "drop" such a record.
   
   However for the case we discuss here, the input record is totally ok, and 
it's operator semantics to decide/figure out that the join result is already 
correct, and thus we don't need to do anything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2029561158


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -17848,6 +17921,104 @@ public void testStreamsRebalanceTimeoutExpiration() {
 context.assertNoRebalanceTimeout(groupId, memberId1);
 }
 
+@Test
+public void testStreamsOnNewMetadataImage() {

Review Comment:
   Did miss the PR comment... Sorry. -- In general we should avoid such 
piggy-backing to not confuse reviewers (ie, me :) )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2029559949


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -8148,7 +8174,10 @@ private TaskAssignor streamsGroupAssignor(String 
groupId) {
  * Get the assignor of the provided streams group.
  */
 private Map streamsGroupAssignmentConfigs(String groupId) {

Review Comment:
   Thanks. Should be good for now. It's all internal stuff so we can still 
refactor in the future is we want to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19067) AsyncKafkaConsumer may return stale fetch result after seek operation

2025-04-04 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940129#comment-17940129
 ] 

Lianet Magrans commented on KAFKA-19067:


Hey [~yangpoan] , thanks for filing! I'm still getting my head around this, but 
first thoughts if that it does seem to me we could have a tricky situation 
here, still not sure about the impact though. 

There is indeed a rather risky way we briefly transition to FETCHING in the 
background before validating the position:

[https://github.com/apache/kafka/blob/1eea7f0528954ce8dcbcc4357ae2ef28c1d1e5f2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L1133-L1134]

So if the app thread hits the fetching part funnily in-between, it would 
wrongly take it as a valid position, although it may need validation, ex. here 

[https://github.com/apache/kafka/blob/cee55dbdeca79cba5cbfca2f53a37344ebe2e38a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L223]

Even though that check would be wrong, I expect it wouldn't return buffered 
data from a different position because of the app thread check for it, right?

[https://github.com/apache/kafka/blob/cee55dbdeca79cba5cbfca2f53a37344ebe2e38a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L254]
 

So all the above is the side concerning the buffered data kept in the app 
thread, and how it could be impacted by the background briefly transitioning to 
FETCHING before validating.

Then there is another side of the story, which is the fetch request generated 
(and what the logs you shared show). But the fetch requests are always 
generated in the background, so I would expect we would have no race with this 
situation right? How exactly would we get to generate a fetch for position 10 
(in the background), if the position 10 hasn't been validated yet? This log:
{code:java}
Added read_uncommitted fetch request for partition topic-0 at position 
FetchPosition{offset=10...{code}
 I would expect the partition is not fetchable because it does not have a valid 
position, so not included in the fetch (here we couldn't be FETCHING briefly 
since it's all happening in the same thread). 

Could be missing something, thinking out loud here, let me know your thoughts. 
Thanks!

> AsyncKafkaConsumer may return stale fetch result after seek operation
> -
>
> Key: KAFKA-19067
> URL: https://issues.apache.org/jira/browse/KAFKA-19067
> Project: Kafka
>  Issue Type: Bug
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>
> The KafkaConsumer sends FetchRequest after it subscribes topics. The 
> FetchResponse data stores to FetchBuffer. For KafkaConsumer#seek operation, 
> the FetchState changes to AWAIT_RESET and the consumer sends LIST_OFFSET 
> request. The state changes back to FETCHING after the consumer receives 
> LIST_OFFSET response.
> If a KafkaConsumer subscribes topics and calls seek function, there may have 
> stale FetchResponse data in FetchBuffer. For ClassicKafkaConsumer#poll, it 
> gets data from FetchBuffer first and then calls ConsumerNetworkClient#poll. 
> If there is stale data in FetchBuffer, the data is ignored because the 
> FetchState is in AWAIT_RESET. The FetchState in ClassicKafkaConsumer changes 
> back to FETCHING after ConsumerNetworkClient#poll receives LIST_OFFSET 
> response.
> However, for AsyncKafkaConsumer, it may return stale FetchResponse data to 
> users, because the ConsumerNetworkThread runs in another thread. The 
> FetchState may changes back to FETCHING before AsyncKafkaConsumer#poll does 
> valid position check.
> Following logs show the case for ClassicKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted 
> fetch request for partition topic-0 at position FetchPosition{offset=0, 
> offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack: 
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null 
> isFenced: false) 
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request 
> with header RequestHeader(apiKey=FETCH, apiVersion=17, 
> clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout 
> 3 to node 1: FetchRequestData(clusterId=null, replicaId=-1, 
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, 
> topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw, 
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, 
> lastFetchedEpoch=-1, logStartOffset=-1, par

Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19303:
URL: https://github.com/apache/kafka/pull/19303#discussion_r2029556963


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##
@@ -143,28 +143,25 @@ private void leftJoinInstructions(final Record> record) {
 
 private void defaultJoinInstructions(final Record> record) {
 if (record.value().oldValue != null) {
-final KRight oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-if (oldForeignKey == null) {
+final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);

Review Comment:
   That's not my point -- `foreignKeyExtractor` is a user-provided 
callback/lambda, and it does not expect to be called with a `null` value. So it 
seems not save to make this call, as the user-code could potentially crash with 
a NPE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Use switch expressions introduced in Java 14 to simplify code [kafka]

2025-04-04 Thread via GitHub


mjsax commented on PR #18371:
URL: https://github.com/apache/kafka/pull/18371#issuecomment-2779997645

   Seems we need to revert all KS related changed, as KS is still on Java 11.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove dead code `maybeWarnIfOversizedRecords` [kafka]

2025-04-04 Thread via GitHub


ijuma commented on code in PR #19316:
URL: https://github.com/apache/kafka/pull/19316#discussion_r2021657292


##
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##
@@ -161,15 +157,6 @@ class ReplicaFetcherThread(name: String,
 }
   }
 
-  private def maybeWarnIfOversizedRecords(records: MemoryRecords, 
topicPartition: TopicPartition): Unit = {
-// oversized messages don't cause replication to fail from fetch request 
version 3 (KIP-74)
-if (metadataVersionSupplier().fetchRequestVersion <= 2 && 
records.sizeInBytes > 0 && records.validBytes <= 0)

Review Comment:
   I think you're saying that IBP 3.3 implies fetch request 13 or higher. That 
makes sense, I pushed a commit that removes that check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10789: Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #18816:
URL: https://github.com/apache/kafka/pull/18816#discussion_r2029593389


##
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java:
##
@@ -89,8 +95,81 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 public void before() {
 context = mockContext();
 context.setTime(0);
+store = new ChangeLoggingKeyValueBytesStore(innerMock);
 store.init(context, store);
 }
+private void mockPosition() {

Review Comment:
   nit: insert empty lines between methods; above here, and also down below.



##
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java:
##
@@ -62,18 +63,23 @@
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @SuppressWarnings("rawtypes")
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class ChangeLoggingKeyValueBytesStoreTest {
+class ChangeLoggingKeyValueBytesStoreTest {

Review Comment:
   Can a test class be package private?



##
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java:
##
@@ -111,29 +190,38 @@ public void after() {
 }
 
 @Test
-public void shouldDelegateInit() {
-final InternalMockProcessorContext context = mockContext();
-final KeyValueStore innerMock = 
mock(InMemoryKeyValueStore.class);
+void shouldDelegateInit() {

Review Comment:
   Same question as above for the class: I thought test methods must be 
`public`?



##
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java:
##
@@ -89,8 +95,81 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 public void before() {
 context = mockContext();
 context.setTime(0);
+store = new ChangeLoggingKeyValueBytesStore(innerMock);
 store.init(context, store);
 }
+private void mockPosition() {
+when(innerMock.getPosition()).thenReturn(Position.emptyPosition());
+}
+private void mockGet(final Map mockMap) {

Review Comment:
   Why are we using `mockMap`? It seems unnecessarily complex? -- It seems much 
more straightforward to just "expect" calls into the `innerMock` store?



##
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java:
##
@@ -89,8 +95,81 @@ public class ChangeLoggingKeyValueBytesStoreTest {
 public void before() {
 context = mockContext();
 context.setTime(0);
+store = new ChangeLoggingKeyValueBytesStore(innerMock);
 store.init(context, store);
 }
+private void mockPosition() {
+when(innerMock.getPosition()).thenReturn(Position.emptyPosition());
+}
+private void mockGet(final Map mockMap) {
+when(innerMock.get(any(Bytes.class))).thenAnswer(invocation -> 
mockMap.get(invocation.getArgument(0)));
+}
+private void mockPut(final Map mockMap) {
+doAnswer(invocation -> {
+mockMap.put(invocation.getArgument(0), invocation.getArgument(1));
+StoreQueryUtils.updatePosition(innerMock.getPosition(), context);
+return null;
+}).when(innerMock).put(any(Bytes.class), any(byte[].class));

Review Comment:
   Why do we use `doAnswer(...).when(...)` here?
   
   In `mockGet` we use `when(...).thanAnswer(...)` what I find much easier to 
read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka-16355: Fix ConcurrentModificationException in evictWhile Method [kafka]

2025-04-04 Thread via GitHub


mjsax commented on PR #16554:
URL: https://github.com/apache/kafka/pull/16554#issuecomment-2779989694

   @abhi-ksolves -- What is the status of this PR? Are you still interested to 
finish it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #18930:
URL: https://github.com/apache/kafka/pull/18930#discussion_r2029612516


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1236,7 +1236,11 @@ public class StreamsConfig extends AbstractConfig {
 Type.LONG,
 null,
 Importance.LOW,
-WINDOW_SIZE_MS_DOC);
+WINDOW_SIZE_MS_DOC)
+.define(CONFIG_PROVIDERS_CONFIG, 

Review Comment:
   We keep configs sorted by name, within each priority group. Please insert at 
the right place alphabetically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Change testing for git command in Compatibility Checker [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] closed pull request #18006: Change testing for git command 
in Compatibility Checker 
URL: https://github.com/apache/kafka/pull/18006


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2029560159


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -8148,7 +8174,10 @@ private TaskAssignor streamsGroupAssignor(String 
groupId) {
  * Get the assignor of the provided streams group.
  */
 private Map streamsGroupAssignmentConfigs(String groupId) {
-return Map.of("group.streams.num.standby.replicas", "0");
+Optional groupConfig = 
groupConfigManager.groupConfig(groupId);
+final Integer numStandbyReplicas = 
groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
+.orElse(config.streamsGroupNumStandbyReplicas());
+return Map.of("num.standby.replicas", numStandbyReplicas.toString());

Review Comment:
   Ok, was just wondering because the existing code does use the prefix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-04-04 Thread via GitHub


bbejeck merged PR #18953:
URL: https://github.com/apache/kafka/pull/18953


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Minor: Add functionalinterface to the producer callback [kafka]

2025-04-04 Thread via GitHub


chia7712 commented on code in PR #19366:
URL: https://github.com/apache/kafka/pull/19366#discussion_r2027523376


##
clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java:
##
@@ -34,6 +34,7 @@
  * The callback may be executed in any thread calling {@link 
ShareConsumer#poll(java.time.Duration)}.
  */
 @InterfaceStability.Evolving
+@FunctionalInterface

Review Comment:
   share consumer is not production-ready, so we should add the annotation 
later. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


mjsax commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2029562373


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
 context.assertNoRebalanceTimeout(groupId, memberId);
 }
 
+@Test
+public void testStreamsGroupDynamicConfigs() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+));
+
+MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withStreamsGroupTaskAssignors(List.of(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addRacks()
+.build())
+.build();
+
+assignor.prepareGroupAssignment(
+Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2;
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult 
result =
+context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(1)
+.setTopology(topology)
+.setActiveTasks(List.of())
+.setStandbyTasks(List.of())
+.setWarmupTasks(List.of()));
+assertEquals(1, result.response().data().memberEpoch());
+assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+// Verify heartbeat interval
+assertEquals(5000, result.response().data().heartbeatIntervalMs());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(

Review Comment:
   > In theory, one could refactor this into a separate test
   
   This could make sense. IMHO, a single test, should test a single thing, not 
multiple things. Leave it up to you to keep as-is in this PR (and file a follow 
up Jira ticket for it), and maybe change right away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove dead code `maybeWarnIfOversizedRecords` [kafka]

2025-04-04 Thread via GitHub


chia7712 commented on code in PR #19316:
URL: https://github.com/apache/kafka/pull/19316#discussion_r2022845482


##
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##
@@ -203,7 +203,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
   None
 } else {
   val metadataVersion = metadataVersionSupplier()
-  val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) {
+  val version: Short = if (!fetchData.canUseTopicIds) {

Review Comment:
   You are right. file https://issues.apache.org/jira/browse/KAFKA-19065



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19006) Rack-aware Partition Assignment is broken when leader reassign

2025-04-04 Thread oshione gabriel esiemokhai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941108#comment-17941108
 ] 

oshione gabriel esiemokhai commented on KAFKA-19006:


do you have any proposed solutions? 

 

> Rack-aware Partition Assignment is broken when leader reassign
> --
>
> Key: KAFKA-19006
> URL: https://issues.apache.org/jira/browse/KAFKA-19006
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.4.1
>Reporter: HanXu
>Priority: Minor
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-881%3A+Rack-aware+Partition+Assignment+for+Kafka+Consumers]
>  
> The 'Rack-aware Partition Assignment' functions effectively when the consumer 
> rebalance takes place. However, it is unable to detect that the partition 
> leader has been reassigned to a different rack.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-04 Thread via GitHub


ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r2029656317


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##
@@ -469,8 +493,17 @@ public int joinGroupEpoch() {
  */
 @Override
 public int leaveGroupEpoch() {
-return groupInstanceId.isPresent() ?
-ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
-ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+boolean isStaticMember = groupInstanceId.isPresent();
+if (REMAIN_IN_GROUP == leaveGroupOperation && isStaticMember) {

Review Comment:
   this condition is basically redundant, right? if `isStaticMember` is true 
then we'll return `LEAVE_GROUP_STATIC_MEMBER_EPOCH` anyways via the final check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-04 Thread via GitHub


ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r2029654165


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##
@@ -211,6 +214,29 @@ public ConsumerMembershipManager membershipManager() {
 return membershipManager;
 }
 
+@Override
+protected boolean shouldSendLeaveHeartbeatNow() {
+// If the consumer has dynamic membership,
+// we should skip the leaving heartbeat when leaveGroupOperation is 
REMAIN_IN_GROUP
+if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP 
== membershipManager.leaveGroupOperation())
+return false;
+return membershipManager().state() == MemberState.LEAVING;
+}
+
+@Override
+public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+// Determine if we should send a leaving heartbeat:
+// - For static membership (when groupInstanceId is present): Always 
send the leaving heartbeat
+// - For dynamic membership: Send the leaving heartbeat only when 
leaveGroupOperation is not REMAIN_IN_GROUP
+boolean shouldHeartbeat = 
membershipManager.groupInstanceId().isPresent()

Review Comment:
   Also from a quick look at the ConsumerMembershipManager#isLeavingGroup` 
implementation, it already includes checks on both of these conditions 
(`groupInstanceId.isPresent` and `REMAIN_IN_GROUP == 
membershipManager.leaveGroupOperation()`). imo it would be cleaner and easier 
to read if we just encapsulated all the logic around these conditions inside 
the `#isLeavingGroup` method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-04 Thread via GitHub


ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r2029660460


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##
@@ -469,8 +493,17 @@ public int joinGroupEpoch() {
  */
 @Override
 public int leaveGroupEpoch() {
-return groupInstanceId.isPresent() ?
-ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
-ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+boolean isStaticMember = groupInstanceId.isPresent();
+if (REMAIN_IN_GROUP == leaveGroupOperation && isStaticMember) {
+return 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}
+
+if (LEAVE_GROUP == leaveGroupOperation) {
+return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;

Review Comment:
   I think I understand why this condition is needed but it would be good to 
leave a comment explaining this change because it's pretty obscure. IIUC 
basically the problem is that there's no way to permanently leave the group 
with a static consumer right now, and the only method of removing static 
members is via an admin API. So we essentially have to "trick" the 
GroupMetadataManager into fencing this member to kick it out of the group, 
because if we use the `LEAVE_GROUP_STATIC_MEMBER_EPOCH ` then it thinks it's 
just being "temporarily" removed and this won't actually result in the consumer 
leaving the group. So we use `LEAVE_GROUP_MEMBER_EPOCH` because anything that 
isn't the `LEAVE_GROUP_STATIC_MEMBER_EPOCH` triggers the GroupMetadataManager 
to fence the consumer in its `#consumerGroupLeave` method.
   
   First off does that summary sound right? It's also pretty hacky, and while I 
think it's fine to proceed with this for now, I'm wondering if we shouldn't 
just try to implement proper LeaveGroup mechanics for static membership. I know 
this would require some server-side changes but maybe this can be a followup 
KIP? Again, no need to block this PR on that, just putting it out there. Feel 
free to pick that up yourself or file a ticket if that makes sense to you too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18935: Ensure brokers do not return null records in FetchResponse [kafka]

2025-04-04 Thread via GitHub


chia7712 commented on PR #19167:
URL: https://github.com/apache/kafka/pull/19167#issuecomment-2780150409

   @frankvicky this PR is not urgent, so it can be your first "merged" PR by 
yourself 😃 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]

2025-04-04 Thread via GitHub


m1a2st commented on PR #18930:
URL: https://github.com/apache/kafka/pull/18930#issuecomment-2780153869

   Hi @mjsax, thanks for your review, I believe this change is simply updating 
the documentation for something we've been missing. I wrote a simple demo using 
a Kafka consumer with `config.providers`, and it still works as expected. Given 
that, I don't think this change requires a KIP, WDTY?
   demo code:
   
https://github.com/m1a2st/Kafka-practice/blob/test-config-provider/kafka-basic/src/main/java/io/demos/kafka/configprovider/ConfigProviderConsumer.java
   
https://github.com/m1a2st/Kafka-practice/blob/test-config-provider/kafka-basic/src/main/java/io/demos/kafka/configprovider/MapConfigProvider.java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [MINOR] Cleanup Core Module- Scala Modules [kafka]

2025-04-04 Thread via GitHub


sjhajharia opened a new pull request, #19380:
URL: https://github.com/apache/kafka/pull/19380

   Now that Kafka Brokers support Java 17, this PR makes some changes in core 
module. The changes in this PR are limited to only the Scala files in the Core 
module's tests. The unit tests module is still pending. It shall follow next.
   The changes mostly include:
   
   - Collections.emptyList(), Collections.singletonList() and Arrays.asList() 
are replaced with List.of()
   - Collections.emptyMap() and Collections.singletonMap() are replaced with 
Map.of()
   - Collections.singleton() is replaced with Set.of()
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19079) KIP-890 Cleanups

2025-04-04 Thread oshione gabriel esiemokhai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941095#comment-17941095
 ] 

oshione gabriel esiemokhai commented on KAFKA-19079:


okay so how do i find these items ? and you want them to belong to one Jira 
ticket correct? 

> KIP-890 Cleanups
> 
>
> Key: KAFKA-19079
> URL: https://issues.apache.org/jira/browse/KAFKA-19079
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Justine Olshan
>Priority: Minor
>
> Creating a parent Jira for some of the cleanups for KIP-890 that are not high 
> priority.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2025-04-04 Thread via GitHub


AyoubOm commented on code in PR #15607:
URL: https://github.com/apache/kafka/pull/15607#discussion_r2029522029


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -203,26 +206,26 @@ public void doJoinFromLeftThenDeleteLeftEntity(final 
boolean leftJoin,
 left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
 
 {
-final Map expected = mkMap(
-mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
-mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
-);
+final List> expected = Arrays.asList(
+KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
+KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)"));
+
 assertThat(
-outputTopic.readKeyValuesToMap(),
+outputTopic.readKeyValuesToList(),
 is(expected)
 );
 if (rejoin) {
 assertThat(
-rejoinOutputTopic.readKeyValuesToMap(),
-is(mkMap(
-mkEntry("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
-mkEntry("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
-))
+rejoinOutputTopic.readKeyValuesToList(),
+is(Arrays.asList(
+KeyValue.pair("lhs1", 
"rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
+KeyValue.pair("lhs2", 
"rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)"))
+)
 );
 }
 if (materialized) {
 assertThat(
-asMap(store),
+asList(store),

Review Comment:
   Agree, will change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-12410) KafkaAPis ought to group fetch data before generating fetch response

2025-04-04 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu reassigned KAFKA-12410:
---

Assignee: TaiJuWu  (was: Chia-Ping Tsai)

> KafkaAPis ought to group fetch data before generating fetch response
> 
>
> Key: KAFKA-12410
> URL: https://issues.apache.org/jira/browse/KAFKA-12410
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Major
>
> That helper methods introduce a couple of collections/groups so it would be 
> better to replace all usages by FetchResponseData.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-18276) Migrate ProducerRebootstrapTest to new test infra

2025-04-04 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-18276.

Resolution: Fixed

> Migrate ProducerRebootstrapTest to new test infra
> -
>
> Key: KAFKA-18276
> URL: https://issues.apache.org/jira/browse/KAFKA-18276
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Peter Lee
>Priority: Major
> Fix For: 4.1.0
>
>
> as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18892: KIP-877 Add support for ClientQuotaCallback [kafka]

2025-04-04 Thread via GitHub


m1a2st commented on code in PR #19068:
URL: https://github.com/apache/kafka/pull/19068#discussion_r2029637899


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##
@@ -69,10 +70,42 @@ public void testCustomQuotaCallbackWithControllerServer() 
throws InterruptedExce
 && 
CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> 
counter.get() > 0), 
 "The CustomQuotaCallback not triggered in all controllers. 
"
 );
-
+
+}
+}
+
+
+@ClusterTest(

Review Comment:
   I don't think we need to align these methods. These tests belong to 
different modules, and this method is only used in two places. Extracting it 
into a utility at this point doesn't add much value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19071: Fix doc for remote.storage.enable [kafka]

2025-04-04 Thread via GitHub


FrankYang0529 commented on code in PR #19345:
URL: https://github.com/apache/kafka/pull/19345#discussion_r2029643749


##
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##
@@ -441,7 +441,29 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
   AlterConfigOp.OpType.SET),
   ))
 assertThrowsException(classOf[InvalidConfigurationException],
-  () => admin.incrementalAlterConfigs(configs).all().get(), "Disabling 
remote storage feature on the topic level is not supported.")
+  () => admin.incrementalAlterConfigs(configs).all().get(), "It is invalid 
to disable remote storage without deleting remote data. " +
+"If you want to keep the remote data and turn to read only, please set 
`remote.storage.enable=true,remote.log.copy.disable=true`. " +
+"If you want to disable remote storage and delete all remote data, 
please set `remote.storage.enable=false,remote.log.delete.on.disable=true`.")
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def 
testUpdateTopicConfigWithDisablingRemoteStorageWithDeleteOnDisable(quorum: 
String): Unit = {
+val admin = createAdminClient()
+val topicConfig = new Properties
+topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"true")
+TestUtils.createTopicWithAdmin(admin, testTopicName, brokers, 
controllerServers, numPartitions, numReplicationFactor,
+  topicConfig = topicConfig)
+
+val configs = new util.HashMap[ConfigResource, 
util.Collection[AlterConfigOp]]()
+configs.put(new ConfigResource(ConfigResource.Type.TOPIC, testTopicName),
+  util.Arrays.asList(
+new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"),
+  AlterConfigOp.OpType.SET),
+new AlterConfigOp(new 
ConfigEntry(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, "true"),
+  AlterConfigOp.OpType.SET)
+  ))
+verifyRemoteLogTopicConfigs(topicConfig)

Review Comment:
   Thanks for the update. However, the config is changed, so we have to set new 
value to topic config to pass the verification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-04 Thread via GitHub


ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r2029652347


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##
@@ -211,6 +214,29 @@ public ConsumerMembershipManager membershipManager() {
 return membershipManager;
 }
 
+@Override
+protected boolean shouldSendLeaveHeartbeatNow() {
+// If the consumer has dynamic membership,
+// we should skip the leaving heartbeat when leaveGroupOperation is 
REMAIN_IN_GROUP
+if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP 
== membershipManager.leaveGroupOperation())
+return false;
+return membershipManager().state() == MemberState.LEAVING;
+}
+
+@Override
+public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
+// Determine if we should send a leaving heartbeat:
+// - For static membership (when groupInstanceId is present): Always 
send the leaving heartbeat
+// - For dynamic membership: Send the leaving heartbeat only when 
leaveGroupOperation is not REMAIN_IN_GROUP
+boolean shouldHeartbeat = 
membershipManager.groupInstanceId().isPresent()

Review Comment:
   IIUC the only difference in this #pollOnClose implementation from the 
original one in the abstract class is the addition of this `shouldHeartbeat` 
condition to the `if membershipManager().isLeavingGroup()` check. 
   
   However it seems like the extra condition is built entirely of calls to 
`membershipManager` methods so I'm wondering if it doesn't make more sense to 
just incorporate the `shouldHeartbeat` condition into the `#isLeavingGroup` 
method of the ConsumerMembershipManager class? That way we don't need to make 
`#pollOnClose` abstract and can cut down on some redundant code 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18796: Added more information to error message when assertion fails for acquisition lock timeout [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield commented on code in PR #19247:
URL: https://github.com/apache/kafka/pull/19247#discussion_r2005360228


##
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##
@@ -6648,6 +6648,15 @@ public void 
testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch() {
 });
 });
 }
+
+private String assertionFailedMessage(SharePartition sharePartition) {

Review Comment:
   If I understand the usage of this correctly, the hard-coded `5L` here should 
be a parameter to this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Cleanup Storage Module [kafka]

2025-04-04 Thread via GitHub


chia7712 merged PR #19072:
URL: https://github.com/apache/kafka/pull/19072


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19004) Move DelayedDeleteRecords to server-common module

2025-04-04 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-19004:
--

 Summary: Move DelayedDeleteRecords to server-common module
 Key: KAFKA-19004
 URL: https://issues.apache.org/jira/browse/KAFKA-19004
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18888: Add KIP-877 support to Authorizer [kafka]

2025-04-04 Thread via GitHub


mimaison commented on code in PR #19050:
URL: https://github.com/apache/kafka/pull/19050#discussion_r2028335989


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java:
##
@@ -94,8 +94,8 @@
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
-@Warmup(iterations = 5)
-@Measurement(iterations = 15)
+@Warmup(iterations = 1)
+@Measurement(iterations = 3)

Review Comment:
   No this is unwanted, I just used these smaller numbers to run the benchmark 
more quickly. I'll revert it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19027: Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults [kafka]

2025-04-04 Thread via GitHub


Rancho-7 commented on code in PR #19347:
URL: https://github.com/apache/kafka/pull/19347#discussion_r2029669057


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -38,12 +34,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import static org.apache.kafka.common.test.api.Type.CO_KRAFT;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 
 /**
  * The old test framework {@link 
kafka.api.BaseConsumerTest#getTestGroupProtocolParametersAll} test for the 
following cases:

Review Comment:
   Thanks for the suggestion! Updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19090) Move DelayedFuture and DelayedFuturePurgatory to server module

2025-04-04 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-19090:
-

 Summary: Move DelayedFuture and DelayedFuturePurgatory to server 
module
 Key: KAFKA-19090
 URL: https://issues.apache.org/jira/browse/KAFKA-19090
 Project: Kafka
  Issue Type: Sub-task
Reporter: PoAn Yang
Assignee: PoAn Yang


Move DelayedFuture and DelayedFuturePurgatory to 
org.apache.kafka.server.purgatory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18379: Enforce resigned cannot transition to any other state in same epoch [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #19236:
URL: https://github.com/apache/kafka/pull/19236#issuecomment-2780168744

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18870 Implement describeDelegationToken for controller [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #19306:
URL: https://github.com/apache/kafka/pull/19306#issuecomment-2780168709

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18679: KafkaRaftMetrics metrics are exposing doubles instead of integers [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #19220:
URL: https://github.com/apache/kafka/pull/19220#issuecomment-2780168765

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]

2025-04-04 Thread via GitHub


m1a2st commented on PR #18930:
URL: https://github.com/apache/kafka/pull/18930#issuecomment-2780157473

   > I don't know exactly how config.provider works from top of my head. Can 
you give a TL;DR? Would like to understand to what extend it applies to KS and 
if it would work as expected w/o any other changes?
   
   Config Providers is a mechanism for externalizing configuration values, 
particularly sensitive information like credentials, from configuration files. 
   
   ref: https://kafka.apache.org/documentation/#config_providers


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19027: Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults [kafka]

2025-04-04 Thread via GitHub


Rancho-7 commented on code in PR #19347:
URL: https://github.com/apache/kafka/pull/19347#discussion_r2029669057


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -38,12 +34,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import static org.apache.kafka.common.test.api.Type.CO_KRAFT;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 
 /**
  * The old test framework {@link 
kafka.api.BaseConsumerTest#getTestGroupProtocolParametersAll} test for the 
following cases:

Review Comment:
   > It seems that this utils class does not generate a Kafka cluster. Could 
you also update the JavaDoc to reflect that?
   
   Thanks for the suggestion! Updated it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18997) Move ConfigHelper to server module

2025-04-04 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-18997:
--
Parent: KAFKA-15852
Issue Type: Sub-task  (was: Improvement)

> Move ConfigHelper to server module
> --
>
> Key: KAFKA-18997
> URL: https://issues.apache.org/jira/browse/KAFKA-18997
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-18998) Move AuthHelper to server module

2025-04-04 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-18998:
--
Parent: KAFKA-15852
Issue Type: Sub-task  (was: Improvement)

> Move AuthHelper to server module
> 
>
> Key: KAFKA-18998
> URL: https://issues.apache.org/jira/browse/KAFKA-18998
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-19091) Flaky test DelayedFutureTest#testDelayedFuture

2025-04-04 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-19091:
-

 Summary: Flaky test DelayedFutureTest#testDelayedFuture
 Key: KAFKA-19091
 URL: https://issues.apache.org/jira/browse/KAFKA-19091
 Project: Kafka
  Issue Type: Test
Reporter: PoAn Yang


The test case is flaky in 3% pipeline recently.

 
{noformat}
org.opentest4j.AssertionFailedError: expected: <40> but was: <-1>

at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
at 
integration.kafka.server.DelayedFutureTest.testDelayedFuture(DelayedFutureTest.scala:91)at
 java.lang.reflect.Method.invoke(Method.java:580)
at java.util.ArrayList.forEach(ArrayList.java:1597)
at java.util.ArrayList.forEach(ArrayList.java:1597){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19091) Flaky test DelayedFutureTest#testDelayedFuture

2025-04-04 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941188#comment-17941188
 ] 

PoAn Yang commented on KAFKA-19091:
---

Failed pipeline: 
https://github.com/apache/kafka/actions/runs/14269851970/job/4712339?pr=19372

> Flaky test DelayedFutureTest#testDelayedFuture
> --
>
> Key: KAFKA-19091
> URL: https://issues.apache.org/jira/browse/KAFKA-19091
> Project: Kafka
>  Issue Type: Test
>Reporter: PoAn Yang
>Priority: Minor
>
> The test case is flaky in 3% pipeline recently.
>  
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <40> but was: <-1>
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
> at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
> at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
> at 
> integration.kafka.server.DelayedFutureTest.testDelayedFuture(DelayedFutureTest.scala:91)at
>  java.lang.reflect.Method.invoke(Method.java:580)
> at java.util.ArrayList.forEach(ArrayList.java:1597)
> at java.util.ArrayList.forEach(ArrayList.java:1597){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-19092) Flaky test QuorumControllerTest#testBalancePartitionLeaders

2025-04-04 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-19092:
-

 Summary: Flaky test 
QuorumControllerTest#testBalancePartitionLeaders
 Key: KAFKA-19092
 URL: https://issues.apache.org/jira/browse/KAFKA-19092
 Project: Kafka
  Issue Type: Test
Reporter: PoAn Yang


The test case is flaky in 3% pipeline recently.

 
{noformat}
org.opentest4j.AssertionFailedError: Condition not met within timeout 1. 
Leaders were not balanced after unfencing all of the brokers ==> expected: 
 but was: 

at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:435)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:483)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:432)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:416)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:406)
at 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders(QuorumControllerTest.java:864)
at java.lang.reflect.Method.invoke(Method.java:580)at 
java.util.ArrayList.forEach(ArrayList.java:1597)
at java.util.ArrayList.forEach(ArrayList.java:1597){noformat}
Failed pipeline: 
[https://github.com/apache/kafka/actions/runs/14269851970/job/4712339?pr=19372]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16710:Continuously `makeFollower` may cause the replica fetcher thread to encounter an offset mismatch exception [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #15929:
URL: https://github.com/apache/kafka/pull/15929#issuecomment-2780176557

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #15020:
URL: https://github.com/apache/kafka/pull/15020#issuecomment-2780176526

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16599: LegacyConsumer should always await pending async commits on commitSync and close [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #15693:
URL: https://github.com/apache/kafka/pull/15693#issuecomment-2780176545

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ConfigCommand rewritten to java [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #15417:
URL: https://github.com/apache/kafka/pull/15417#issuecomment-2780176535

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-966 unclean recovery elect leader request [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #15876:
URL: https://github.com/apache/kafka/pull/15876#issuecomment-2780176549

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16902: Consider socket timeout in blocking Sender waits [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #16220:
URL: https://github.com/apache/kafka/pull/16220#issuecomment-2780176568

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [DO NOT MERGE] KAFKA-14419: limit time spent processing during ongoing rebalance and delay followup rebalance trigger [kafka]

2025-04-04 Thread via GitHub


github-actions[bot] commented on PR #15009:
URL: https://github.com/apache/kafka/pull/15009#issuecomment-2780176518

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-04-04 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2026362292


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2505,174 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
 }
 }
 
+private ShareAcquiredRecords 
maybeFilterAbortedTransactionalAcquiredRecords(
+FetchPartitionData fetchPartitionData,
+FetchIsolation isolationLevel,
+ShareAcquiredRecords shareAcquiredRecords
+) {
+if (isolationLevel != FetchIsolation.TXN_COMMITTED || 
fetchPartitionData.abortedTransactions.isEmpty() || 
fetchPartitionData.abortedTransactions.get().isEmpty())
+return shareAcquiredRecords;
+// When FetchIsolation.TXN_COMMITTED is used as isolation level by the 
share group, we need to filter any
+// transactions that were aborted/did not commit due to timeout.
+List result = 
filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+shareAcquiredRecords.acquiredRecords(), 
fetchPartitionData.abortedTransactions.get());
+int acquiredCount = 0;
+for (AcquiredRecords records : result) {
+acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+}
+return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List filterAbortedTransactionalAcquiredRecords(
+Iterable batches,
+List acquiredRecords,
+List abortedTransactions
+) {
+lock.writeLock().lock();

Review Comment:
   agreed, we don't need it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out

2025-04-04 Thread Daniel Fonai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939589#comment-17939589
 ] 

Daniel Fonai commented on KAFKA-18874:
--

I think we might have found the root cause. There is a flag in 
ControllerRegistrationManager (pendingRpc) which is set when the controller 
registration request is queued. In case the request is not sent but timeout 
occurs, the flag is not reset and and request is not rescheduled: 
[https://github.com/apache/kafka/blob/c095faa5783b3b2f37f0be590eca7a3ab9aabc99/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L211-L214]

I created a PR to reset the flag on timeout: 
[https://github.com/apache/kafka/pull/19321].

> KRaft controller does not retry registration if the first attempt times out
> ---
>
> Key: KAFKA-18874
> URL: https://issues.apache.org/jira/browse/KAFKA-18874
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.9.0
>Reporter: Daniel Fonai
>Priority: Minor
> Attachments: controller-3.log, controller-4.log, controller-5.log
>
>
> There is a [retry 
> mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274]
>  with exponential backoff built-in in KRaft controller registration. The 
> timeout of the first attempt is 5 s for KRaft controllers 
> ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448])
>  which is not configurable.
> If for some reason the controller's first registration request times out, the 
> attempt should be retried but in practice this does not happen and the 
> controller is not able to join the quorum. We see the following in the faulty 
> controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to 
> send ControllerRegistrationRequestData(controllerId=3, 
> incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, 
> listeners=[Listener(name='CONTROLPLANE-9090', 
> host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc',
>  port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', 
> minSupportedVersion=0, maxSupportedVersion=1), 
> Feature(name='metadata.version', minSupportedVersion=1, 
> maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel 
> manager timed out before sending the request. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting 
> for the previous RPC to complete. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> {noformat}
> After this we can not see any controller retry in the log.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-19084) Port KAFKA-16224 for acknowledgements in ShareConsumers.

2025-04-04 Thread Shivsundar R (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shivsundar R reassigned KAFKA-19084:


Assignee: Shivsundar R

> Port KAFKA-16224 for acknowledgements in ShareConsumers.
> 
>
> Key: KAFKA-19084
> URL: https://issues.apache.org/jira/browse/KAFKA-19084
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Shivsundar R
>Assignee: Shivsundar R
>Priority: Major
>
> Currently for ShareConsumers, if we receive an UNKNOWN_TOPIC_OR_PARTITION 
> error code in the ShareAcknowledgeResponse, then we retry sending the 
> acknowledgements until the timer expires.
> We ideally do not want this when a topic/partition is deleted, hence like the 
> CommitRequestManager(https://github.com/apache/kafka/pull/15581), we will 
> treat this error as fatal and not retry the acknowledgements.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-19084) Port KAFKA-16224 for acknowledgements in ShareConsumers.

2025-04-04 Thread Shivsundar R (Jira)
Shivsundar R created KAFKA-19084:


 Summary: Port KAFKA-16224 for acknowledgements in ShareConsumers.
 Key: KAFKA-19084
 URL: https://issues.apache.org/jira/browse/KAFKA-19084
 Project: Kafka
  Issue Type: Sub-task
Reporter: Shivsundar R


Currently for ShareConsumers, if we receive an UNKNOWN_TOPIC_OR_PARTITION error 
code in the ShareAcknowledgeResponse, then we retry sending the 
acknowledgements until the timer expires.
We ideally do not want this when a topic/partition is deleted, hence like the 
CommitRequestManager(https://github.com/apache/kafka/pull/15581), we will treat 
this error as fatal and not retry the acknowledgements.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-19084: Port KAFKA-16224, KAFKA-16764 for ShareConsumers [kafka]

2025-04-04 Thread via GitHub


ShivsundarR opened a new pull request, #19369:
URL: https://github.com/apache/kafka/pull/19369

   *What*
   Currently for ShareConsumers, if we receive an `UNKNOWN_TOPIC_OR_PARTITION` 
error code in the `ShareAcknowledgeResponse`, then we retry sending the 
acknowledgements until the timer expires.
   We ideally do not want this when a topic/partition is deleted, hence like 
the `CommitRequestManager`(https://github.com/apache/kafka/pull/15581), we will 
treat this error as fatal and not retry the acknowledgements.
   
   PR also suppresses `InvalidTopicException` during `unsubscribe()` which was 
also added in the 
`AsyncKafkaConsumer`(https://github.com/apache/kafka/pull/16043). It was later 
removed in the regular consumer 
here(https://github.com/apache/kafka/commit/d76238a18fb6a86b4b08bc04918ea1a0ee626517),
 as they notified the background operations of metadata errors instead of 
propagating them via `ErrorEvent`. 
   `ShareConsumerImpl` however does not require that change and it still 
propagates the metadata error back to the application. So we would need to 
suppress this exception during unsubscribe().


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2028433315


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+/**
+ * A flag to indicate whether a shutdown has been requested for this group.
+ * This has no direct effect inside the group coordinator, but is 
propagated to all members of the group.
+ * This is not persisted in the log.

Review Comment:
   Yes, I also wasn't sure about these things, so we discussed it in this 
thread: https://confluent.slack.com/archives/C06UA1XN326/p1743686382861189
   
   The aim of the PR is to keep it simple, so:
- The flag is best-effort, as in the old protocol
- The flag remains set until either there is a coordinator failover, or all 
members have shutdown. The application is not intended to be restarted after 
this flag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2028429508


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2070,8 +2067,6 @@ private CoordinatorResult stream
 String processId,
 Endpoint userEndpoint,
 List clientTags,
-List taskOffsets,
-List taskEndOffsets,

Review Comment:
   Yes. I am just removing them, since we won't make use of these fields in 
4.1/4.2 according to our updated plans, so I'm just cleaning up the code a bit 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2028433315


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+/**
+ * A flag to indicate whether a shutdown has been requested for this group.
+ * This has no direct effect inside the group coordinator, but is 
propagated to all members of the group.
+ * This is not persisted in the log.

Review Comment:
   Yes, I also wasn't sure about these things, so we discussed it in this 
thread: https://confluent.slack.com/archives/C06UA1XN326/p1743686382861189
   
   The aim of the PR is to keep it simple, so:
- The flag is best-effort, as in the old protocol
- The flag remains set until either there is a coordinator failover, or all 
members have shutdown. The members are not expected to be restarted when they 
receive this flag. If a new member joins the group before the last old member 
has shutdown, it will also be asked to shut down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19085: SharePartitionManagerTest testMultipleConcurrentShareFetches throws silent exception and works incorrectly [kafka]

2025-04-04 Thread via GitHub


adixitconfluent opened a new pull request, #19370:
URL: https://github.com/apache/kafka/pull/19370

   ### What
   The tests testMultipleConcurrentShareFetches is throwing a silent exception.
   ERROR Error processing delayed share fetch request 
(kafka.server.share.DelayedShareFetch:225)
   
   This is due to incomplete mocks setup for the test and also requires changes 
in timeout.
   
   ### Testing
   I have run the test 500 times locally with the help of gradle to make sure 
there are no silent exceptions and it is not flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2028446403


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -16233,6 +16233,97 @@ public void 
testStreamsGroupMemberJoiningWithStaleTopology() {
 assertRecordsEquals(expectedRecords, result.records());
 }
 
+@Test
+public void testStreamsGroupMemberRequestingShutdownApplication() {
+String groupId = "fooup";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+));
+
+MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withStreamsGroupTaskAssignors(List.of(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.build())
+.withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+.withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+.build())
+.withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+.build())
+.withTargetAssignment(memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+.withTargetAssignment(memberId2, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+.withTargetAssignmentEpoch(10)
+.withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+.withPartitionMetadata(Map.of(
+fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
+))
+)
+.build();
+
+CoordinatorResult 
result1 = context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId1)
+.setMemberEpoch(10)
+.setShutdownApplication(true)
+);
+
+assertResponseEquals(
+new StreamsGroupHeartbeatResponseData()
+.setMemberId(memberId1)
+.setMemberEpoch(10)
+.setHeartbeatIntervalMs(5000)
+.setStatus(List.of(
+new StreamsGroupHeartbeatResponseData.Status()
+.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+.setStatusDetail("A KafkaStreams instance encountered 
a fatal error and requested a shutdown for the entire application.")
+)),
+result1.response().data()
+);
+assertRecordsEquals(List.of(), result1.records());
+
+CoordinatorResult 
result2 = context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId2)
+.setMemberEpoch(10)
+.setShutdownApplication(true)

Review Comment:
   Good catch, that was a copy/paste error. 



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -197,6 +197,13 @@ public static class DeadlineAndEpoch {
  */
 private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+/**
+ * A flag to indicate whether a shutdown has been requested for this group.
+ * This has no direct effect inside the group coordinator, but is 
propagated to all members of the group.
+ * This is not persisted in the log.

Review Comment:
   I extended the comment a bit.



##
group-coordinator/src/main/java/org/apache/kafka/coord

[PR] KAFKA-19080 The constraint on segment.ms is not enforced at topic level [kafka]

2025-04-04 Thread via GitHub


m1a2st opened a new pull request, #19371:
URL: https://github.com/apache/kafka/pull/19371

   The main reason is that we forgot setting the 
`TopicConfig.SEGMENT_BYTES_CONFIG` at least to `1024 * 1024`, thus addressed 
it, and add a test for it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield commented on PR #18976:
URL: https://github.com/apache/kafka/pull/18976#issuecomment-2778570067

   @chirag-wadhwa5 Please could you resolve the conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19081) Share session capacity and eviction

2025-04-04 Thread Andrew Schofield (Jira)
Andrew Schofield created KAFKA-19081:


 Summary: Share session capacity and eviction
 Key: KAFKA-19081
 URL: https://issues.apache.org/jira/browse/KAFKA-19081
 Project: Kafka
  Issue Type: Sub-task
Reporter: Andrew Schofield
Assignee: Andrew Schofield


The share session cache currently operates in a similar manner to the fetch 
session cache. However, the eviction behaviour is not appropriate for share 
sessions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [MINOR] Cleanup Core Module [kafka]

2025-04-04 Thread via GitHub


sjhajharia opened a new pull request, #19372:
URL: https://github.com/apache/kafka/pull/19372

   Now that Kafka Brokers support Java 17, this PR makes some changes in core 
module. The changes in this PR are limited to only the Java files in the Core 
module. Scala related changes may follow next.
   The changes mostly include:
   - Collections.emptyList(), Collections.singletonList() and Arrays.asList() 
are replaced with List.of()
   - Collections.emptyMap() and Collections.singletonMap() are replaced with 
Map.of()
   - Collections.singleton() is replaced with Set.of()
   - Some changes to use enhanced switch statement.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2028521537


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
 context.assertNoRebalanceTimeout(groupId, memberId);
 }
 
+@Test
+public void testStreamsGroupDynamicConfigs() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+));
+
+MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withStreamsGroupTaskAssignors(List.of(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addRacks()
+.build())
+.build();
+
+assignor.prepareGroupAssignment(
+Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2;
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult 
result =
+context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(1)
+.setTopology(topology)
+.setActiveTasks(List.of())
+.setStandbyTasks(List.of())
+.setWarmupTasks(List.of()));
+assertEquals(1, result.response().data().memberEpoch());
+assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+// Verify heartbeat interval
+assertEquals(5000, result.response().data().heartbeatIntervalMs());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 45000);
+
+// Advance time.
+assertEquals(
+List.of(),
+context.sleep(result.response().data().heartbeatIntervalMs())
+);
+
+// Dynamic update group config
+Properties newGroupConfig = new Properties();
+newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 5);
+newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
+newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 2);
+context.updateGroupConfig(groupId, newGroupConfig);
+
+// Session timer is rescheduled on second heartbeat, new assignment 
with new parameter is calculated.
+result = context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(result.response().data().memberEpoch())
+.setRackId("bla"));
+assertEquals(2, result.response().data().memberEpoch());
+
+// Verify heartbeat interval
+assertEquals(1, result.response().data().heartbeatIntervalMs());
+
+// Verify that there is a session time.
+context.assertSessionTimeout(groupId, memberId, 5);
+
+// Verify that the new number of standby replicas is used
+assertEquals(Map.of("num.standby.replicas", "2"), 
assignor.lastPassedAssignmentConfigs());
+
+// Advance time.
+assertEquals(
+List.of(),
+context.sleep(result.response().data().heartbeatIntervalMs())
+);
+
+// Session timer is cancelled on leave.

Review Comment:
   See above.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
 context.assertNoRebalanceTimeout(groupId, memberId);
 }
 
+@Test
+public void testStreamsGroupDynamicConfigs() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+));
+
+MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()

Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2028522022


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
 context.assertNoRebalanceTimeout(groupId, memberId);
 }
 
+@Test
+public void testStreamsGroupDynamicConfigs() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+));
+
+MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withStreamsGroupTaskAssignors(List.of(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addRacks()
+.build())
+.build();
+
+assignor.prepareGroupAssignment(
+Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2;
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult 
result =
+context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(1)
+.setTopology(topology)
+.setActiveTasks(List.of())
+.setStandbyTasks(List.of())
+.setWarmupTasks(List.of()));
+assertEquals(1, result.response().data().memberEpoch());
+assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+
+// Verify heartbeat interval
+assertEquals(5000, result.response().data().heartbeatIntervalMs());

Review Comment:
   It's from 
`GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT`. I 
referred that line instead.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -18070,6 +18241,101 @@ public void testShareGroupDynamicConfigs() {
 context.assertNoRebalanceTimeout(groupId, memberId);
 }
 
+@Test
+public void testStreamsGroupDynamicConfigs() {
+String groupId = "fooup";
+String memberId = Uuid.randomUuid().toString();
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+));
+
+MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withStreamsGroupTaskAssignors(List.of(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addRacks()
+.build())
+.build();
+
+assignor.prepareGroupAssignment(
+Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2;
+
+// Session timer is scheduled on first heartbeat.
+CoordinatorResult 
result =
+context.streamsGroupHeartbeat(
+new StreamsGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(1)
+.setTopology(topology)
+.setActiveTasks(List.of())
+.setStandbyTasks(List.of())
+.setWarmupTasks(List.of()));
+assertEquals(1, result.response().data().memberEpoch());

Review Comment:
   Sure, this just shows that a new target assignment was calculated - if the 
config would be missing from the next line, one cause could be that the 
assignor was not called at all. So there is some overlap with the next line, 
but this check gives context and rules out possible errors if the next line 
ever fails. I would keep it, but let me know if you want me to remove it.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -17848,6 +17921,10

Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2028446631


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##
@@ -1106,4 +1106,32 @@ public void testIsSubscribedToTopic() {
 assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
 assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
 }
+
+@Test
+public void testShutdownRequestedMethods() {
+String memberId1 = "test-member-id1";
+String memberId2 = "test-member-id2";
+LogContext logContext = new LogContext();
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, "test-group", metricsShard);
+
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+// Initially, shutdown should not be requested
+assertFalse(streamsGroup.isShutdownRequested());
+
+// Set shutdown requested
+streamsGroup.maybeSetShutdownRequested(memberId1, true);
+assertTrue(streamsGroup.isShutdownRequested());
+
+// As long as group not empty, remain in shutdown requested state
+streamsGroup.removeMember(memberId1);
+assertTrue(streamsGroup.isShutdownRequested());
+
+// As soon as the group is empty, clear the shutdown requested state
+streamsGroup.removeMember(memberId2);
+assertFalse(streamsGroup.isShutdownRequested());
+}

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2028522552


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -15776,6 +15779,76 @@ public void testStreamsGroupMemberEpochValidation() {
 assertEquals(100, result.response().data().memberEpoch());
 }
 
+@Test
+public void testStreamsNewMemberIsRejectedWithMaximumMembersIsReached() {
+String groupId = "fooup";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+
+String subtopology1 = "subtopology1";
+String fooTopicName = "foo";
+Uuid fooTopicId = Uuid.randomUuid();
+String subtopology2 = "subtopology2";
+String barTopicName = "bar";
+Uuid barTopicId = Uuid.randomUuid();
+Topology topology = new Topology().setSubtopologies(List.of(
+new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))

Review Comment:
   True. Simplified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-04-04 Thread via GitHub


mimaison commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2027343207


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -413,24 +418,19 @@ void 
testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOExcepti
 tp -> Optional.of(mockLog),
 (topicPartition, offset) -> { },
 brokerTopicStats,
-metrics) {
+metrics,
+endPoint) {
+@Override
 public RemoteStorageManager createRemoteStorageManager() {
 return remoteStorageManager;
 }
+@Override
 public RemoteLogMetadataManager createRemoteLogMetadataManager() {
 return remoteLogMetadataManager;
 }
 }) {
-
-String host = "localhost";
-int port = 1234;
-String securityProtocol = "PLAINTEXT";
-Endpoint endpoint = new Endpoint(securityProtocol, 
SecurityProtocol.PLAINTEXT, host, port);
-remoteLogManager.onEndPointCreated(endpoint);
-remoteLogManager.startup();
-
 ArgumentCaptor> capture = 
ArgumentCaptor.forClass(Map.class);
-verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
+verify(remoteLogMetadataManager, 
times(2)).configure(capture.capture());

Review Comment:
   This could be a comment



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -441,7 +441,6 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
 
 @Test
 void testStartup() {
-remoteLogManager.startup();

Review Comment:
   Do we need to adjust or keep this test now that the `startup()` method does 
not exist anymore?



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -3011,7 +2996,6 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
 
 task.cleanupExpiredRemoteLogSegments();
 
-verifyNoMoreInteractions(remoteStorageManager);

Review Comment:
   So should we assert that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19074: Remove the cached responseData from ShareFetchResponse [kafka]

2025-04-04 Thread via GitHub


FrankYang0529 commented on code in PR #19357:
URL: https://github.com/apache/kafka/pull/19357#discussion_r2028360352


##
server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java:
##
@@ -66,6 +79,19 @@ public void testErrorInAllPartitions() {
 assertTrue(shareFetch.errorInAllPartitions());
 }
 
+@Test
+public void testDontCacheAnyData() {
+ShareFetchResponse shareFetch = shareFetchResponse(tidp0, records, 
Errors.NONE, "", (short) 0,
+"", List.of(), 0);
+LinkedHashMap 
responseData = shareFetch.responseData(Map.of(tidp0.topicId(), tidp0.topic()));
+assertEquals(1, responseData.size());
+responseData.forEach((topicIdPartition, partitionData) -> 
assertEquals(records, partitionData.records()));
+
+LinkedHashMap 
nonResponseData = shareFetch.responseData(Map.of());
+assertEquals(0, nonResponseData.size());
+nonResponseData.forEach((topicIdPartition, partitionData) -> 
assertEquals(MemoryRecords.EMPTY, partitionData.records()));

Review Comment:
   If `nonResponseData` size is 0, the assertion for data inside it will not be 
executed. Do we still need this line? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18761: [2/N] List share group offsets with state and auth [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield commented on code in PR #19328:
URL: https://github.com/apache/kafka/pull/19328#discussion_r2028378471


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1365,6 +1368,67 @@ public 
CompletableFuture
 describeShareGroupAllOffsets(
+RequestContext context,
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData
+) {
+if (!isActive.get()) {
+return CompletableFuture.completedFuture(
+
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), 
Errors.COORDINATOR_NOT_AVAILABLE));
+}
+
+if (metadataImage == null) {
+return CompletableFuture.completedFuture(
+
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), 
Errors.COORDINATOR_NOT_AVAILABLE));
+}
+
+return runtime.scheduleReadOperation(
+"share-group-initialized-partitions",
+topicPartitionFor(requestData.groupId()),
+(coordinator, offset) -> 
coordinator.initializedShareGroupPartitions(requestData.groupId())
+).thenCompose(topicPartitionMap -> {
+Map requestTopicIdToNameMapping = new HashMap<>();
+
List
 describeShareGroupOffsetsResponseTopicList = new 
ArrayList<>(topicPartitionMap.size());
+ReadShareGroupStateSummaryRequestData readSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+.setGroupId(requestData.groupId());
+topicPartitionMap.forEach((topicId, partitionSet) -> {
+String topicName = 
metadataImage.topics().topicIdToNameView().get(topicId);
+if (topicName != null) {
+requestTopicIdToNameMapping.put(topicId, topicName);
+readSummaryRequestData.topics().add(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+.setTopicId(topicId)
+.setPartitions(
+partitionSet.stream().map(
+partitionIndex -> new 
ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
+).toList()
+));
+}
+});
+return readShareGroupStateSummary(readSummaryRequestData, 
requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+});

Review Comment:
   There is already exception handling inside `readShareGroupStateSummary` and 
there is unit testing of it (`testDescribeShareGroupAllOffsetsThrowsError` for 
example).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Added trace logs to help debug SharePartition [kafka]

2025-04-04 Thread via GitHub


AndrewJSchofield merged PR #19358:
URL: https://github.com/apache/kafka/pull/19358


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14523: Move DelayedRemoteListOffsets to the storage module [kafka]

2025-04-04 Thread via GitHub


chia7712 commented on code in PR #19285:
URL: https://github.com/apache/kafka/pull/19285#discussion_r2028502006


##
storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java:
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.log.OffsetResultHolder;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+public class DelayedRemoteListOffsets extends DelayedOperation {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DelayedRemoteListOffsets.class);
+
+// For compatibility, metrics are defined to be under 
`kafka.server.DelayedRemoteListOffsetsMetrics` class
+private static final KafkaMetricsGroup METRICS_GROUP = new 
KafkaMetricsGroup("kafka.server", "DelayedRemoteListOffsetsMetrics");
+static final Meter AGGREGATE_EXPIRATION_METER = 
METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+static final Map PARTITION_EXPIRATION_METERS = new 
ConcurrentHashMap<>();
+
+private final int version;
+private final Map 
statusByPartition;
+private final Consumer partitionOrException;
+private final 
Consumer> 
responseCallback;
+
+public DelayedRemoteListOffsets(long delayMs,
+int version,
+Map statusByPartition,
+Consumer 
partitionOrException,
+
Consumer> 
responseCallback) {
+super(delayMs);
+this.version = version;
+this.statusByPartition = statusByPartition;
+this.partitionOrException = partitionOrException;
+this.responseCallback = responseCallback;
+// Mark the status as completed, if there is no async task to track.
+// If there is a task to track, then build the response as 
REQUEST_TIMED_OUT by default.
+statusByPartition.forEach((topicPartition, status) -> {
+status.completed(status.futureHolderOpt().isEmpty());
+if (status.futureHolderOpt().isPresent()) {
+
status.responseOpt(Optional.of(buildErrorResponse(Errors.REQUEST_TIMED_OUT, 
topicPartition.partition(;
+}
+LOG.trace("Initial partition status for {} is {}", topicPartition, 
status);
+});
+}
+
+/**
+ * Call-back to execute when a delayed operation gets expired and hence 
forced to complete.
+ */
+@Override
+public void onExpiration() {
+statusByPartition.forEach((topicPartition, status) -> {
+if (!status.completed()) {
+LOG.debug("Expiring list offset request for partition {} with 
status {}", topicPartition, status);
+status.futureHolderOpt().ifPresent(futureHolder -> 
futureHolder.jobFuture().cancel(true));
+recordExpiration(topicPartition);
+}
+});
+}
+
+/**
+ * Process for completing an operation; This function needs to be defined
+ * in subclasses and will be called exactly once in forceComplete()
+ */
+@Override
+public void onComplete() {
+Map> groupedByTopic = 
new HashMap<>();
+for (Map.Entry entry : 
statusByPartition.entrySet()) {
+List 
partitions =
+groupedByTopic.computeIfAbsent(entry.getKey().topic(), k 
-> new ArrayList

Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on PR #19359:
URL: https://github.com/apache/kafka/pull/19359#issuecomment-2778072098

   @mjsax @jeffkbkim Thanks for the comments! All addressed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-04 Thread via GitHub


lucasbru commented on code in PR #19219:
URL: https://github.com/apache/kafka/pull/19219#discussion_r2028511160


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/MockTaskAssignor.java:
##
@@ -60,6 +65,7 @@ public String name() {
 @Override
 public GroupAssignment assign(final GroupSpec groupSpec, final 
TopologyDescriber topologyDescriber)
 throws TaskAssignorException {
+assignmentConfigs = groupSpec.assignmentConfigs();

Review Comment:
   We are using `lastPassedAssignmentConfigs` to test which configs are passed 
to the assignor. The `assignmentConfigs` were already there in the `GroupSpec`, 
not changes were required. The Mockassignor was just ignoring the map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19086) Extending support for Microsecond Precision for Kafka Connect

2025-04-04 Thread Pritam Kumar (Jira)
Pritam Kumar created KAFKA-19086:


 Summary: Extending support for Microsecond Precision for Kafka 
Connect
 Key: KAFKA-19086
 URL: https://issues.apache.org/jira/browse/KAFKA-19086
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Pritam Kumar
 Fix For: 4.0.1


While formats like {*}Avro{*}, {*}Parquet{*}, and others support higher 
precision timestamps (including microseconds :: 
[https://avro.apache.org/docs/1.11.0/spec.html|https://avro.apache.org/docs/1.11.0/spec.html#Timestamp+%28microsecond+precision%29]),
 Kafka Connect has been limited to handling only millisecond precision 
(timestamp). 
As a result, any timestamp data expressed in microseconds is truncated when 
communicated between the Kafka Connect source and sink, leading to potential 
loss of precision and data fidelity.
This change aims to extend the logical timestamp support in Kafka Connect to 
include timestamp-micros. By doing so, it ensures that the full precision of 
time data can be communicated accurately across the entire pipeline without any 
loss of information.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR VerifableProducer ducktape can set idempotency and retries [kafka]

2025-04-04 Thread via GitHub


FrankYang0529 commented on code in PR #19362:
URL: https://github.com/apache/kafka/pull/19362#discussion_r2028331394


##
tests/kafkatest/services/verifiable_producer.py:
##
@@ -147,9 +147,10 @@ def _worker(self, idx, node):
 if self.enable_idempotence:
 self.logger.info("Setting up an idempotent producer")
 producer_prop_file += "\nmax.in.flight.requests.per.connection=5\n"
-producer_prop_file += "\nretries=100\n"
 producer_prop_file += "\nenable.idempotence=true\n"
-elif self.retries is not None:
+if self.retries is None:
+producer_prop_file += "\nretries=100\n"
+if self.retries is not None:

Review Comment:
   In retries document 
https://kafka.apache.org/documentation/#producerconfigs_retries, it mentions
   ```
   Enabling idempotence requires this config value to be greater than 0.
   If conflicting configurations are set and idempotence is not explicitly 
enabled, idempotence is disabled.
   ```
   
   I think we should add assertion to `__int__` to make sure if 
enable_idempotence is true, then retries should not be zero.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19025: Reuse LZ4 buffers for compression [kafka]

2025-04-04 Thread via GitHub


squah-confluent opened a new pull request, #19260:
URL: https://github.com/apache/kafka/pull/19260

   Introduce a buffer reuse mechanism for LZ4 compression, similar to the
   ones Snappy and zstd compression have.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15931: Cancel RemoteLogReader gracefully [kafka]

2025-04-04 Thread via GitHub


jeqo commented on PR #19150:
URL: https://github.com/apache/kafka/pull/19150#issuecomment-2768303521

   Opened https://github.com/apache/kafka/pull/19331 for 4.0 backport
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18576: Convert ConfigType to Enum [kafka]

2025-04-04 Thread via GitHub


chia7712 commented on code in PR #18711:
URL: https://github.com/apache/kafka/pull/18711#discussion_r2008102525


##
server-common/src/main/java/org/apache/kafka/server/config/ConfigType.java:
##
@@ -16,19 +16,30 @@
  */
 package org.apache.kafka.server.config;
 
+import java.util.Arrays;
 import java.util.List;
 
 /**
  * Represents all the entities that can be configured.
  */
-public class ConfigType {
-public static final String TOPIC = "topics";
-public static final String CLIENT = "clients";
-public static final String USER = "users";
-public static final String BROKER = "brokers";
-public static final String IP = "ips";
-public static final String CLIENT_METRICS = "client-metrics";
-public static final String GROUP = "groups";
+public enum ConfigType {
+TOPIC("topics"),
+CLIENT("clients"),
+USER("users"),
+BROKER("brokers"),
+IP("ips"),
+CLIENT_METRICS("client-metrics"),
+GROUP("groups");
 
-public static final List ALL = List.of(TOPIC, CLIENT, USER, 
BROKER, IP, CLIENT_METRICS, GROUP);
+private final String value;
+
+ConfigType(String value) {
+this.value = value;
+}
+
+public String value() {
+return value;
+}
+
+public static final List ALL = 
Arrays.stream(ConfigType.values()).map(ConfigType::value).toList();

Review Comment:
   Could you please inline it to `ConfigCommand`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   >