Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-05 Thread via GitHub


ableegoldman commented on PR #17614:
URL: https://github.com/apache/kafka/pull/17614#issuecomment-2781223772

   Merged to trunk (🥳 ) -- do you want to start working on the follow-up PR now 
to utilize this new API for `KafkaStreams#close`? This will also let us clean 
up the `internal.leave.group.on.close` config that Streams has been using as a 
backdoor. Now that we have the `REMAIN_IN_GROUP` option, we can use it as the 
default for Streams and get rid of the internal config entirely.
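   For readers following along, the intended Streams-side usage could look roughly 
like the sketch below. This is a hypothetical stand-in: the enum and method names 
are modeled on the close option introduced by this PR (KIP-1092), not the exact 
kafka-clients API, and the helper is illustrative only.

```java
// Hypothetical sketch: how Streams could pick the close behavior explicitly
// instead of relying on the internal.leave.group.on.close backdoor config.
// GroupMembershipOperation and streamsCloseOperation are illustrative names.
class CloseOptionsSketch {
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    static GroupMembershipOperation streamsCloseOperation(boolean leaveGroupRequested) {
        // Default for Streams: stay in the group, unless the caller asks to leave.
        return leaveGroupRequested
                ? GroupMembershipOperation.LEAVE_GROUP
                : GroupMembershipOperation.REMAIN_IN_GROUP;
    }

    public static void main(String[] args) {
        System.out.println(streamsCloseOperation(false)); // REMAIN_IN_GROUP
        System.out.println(streamsCloseOperation(true));  // LEAVE_GROUP
    }
}
```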


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-05 Thread via GitHub


frankvicky commented on PR #17614:
URL: https://github.com/apache/kafka/pull/17614#issuecomment-2781237676

   Hi @ableegoldman 
   A huge thanks for your patience and review! 🙇🏼 
   >  do you want to start working on the followup PR now to utilize this new 
API for KafkaStreams#close?
   Yes, I will start to tackle it soon 🚀 
   I will ping you once the patch is ready.
   
   Thanks again 😸 
   





Re: [PR] MINOR:modify scala $var to java string [kafka]

2025-04-05 Thread via GitHub


showuon commented on code in PR #18393:
URL: https://github.com/apache/kafka/pull/18393#discussion_r2030056754


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -470,8 +470,8 @@ public FetchDataInfo read(long startOffset,
 return maybeHandleIOException(
 () -> "Exception while reading from " + topicPartition + " in dir " + dir.getParent(),
 () -> {
-logger.trace("Reading maximum $maxLength bytes at offset {} from log with total length {} bytes",
-startOffset, segments.sizeInBytes());
+logger.trace("Reading maximum {} bytes at offset {} from log with total length {} bytes",
+maxLength, startOffset, segments.sizeInBytes());

Review Comment:
   +1! Nice catch @chia7712 !
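   The bug fixed above is a leftover Scala `s"$var"` interpolation that survived 
the Scala-to-Java conversion: in Java, `$maxLength` is just literal text, and 
SLF4J substitutes only `{}` placeholders. A dependency-free demo of the effect; 
the `format` helper here is a hand-rolled stand-in for SLF4J's placeholder 
substitution, not the real implementation:

```java
class PlaceholderDemo {
    // Minimal stand-in for SLF4J's "{}" substitution so the demo runs without
    // the logging dependency; it replaces each "{}" with the next argument.
    static String format(String template, Object... args) {
        StringBuilder out = new StringBuilder();
        int argIdx = 0, i = 0;
        while (i < template.length()) {
            if (argIdx < args.length && template.startsWith("{}", i)) {
                out.append(args[argIdx++]);
                i += 2;
            } else {
                out.append(template.charAt(i++));
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Buggy message: "$maxLength" is NOT interpolated in Java, it stays literal.
        System.out.println(format("Reading maximum $maxLength bytes at offset {}", 470L));
        // Fixed message: every value goes through a "{}" placeholder.
        System.out.println(format("Reading maximum {} bytes at offset {}", 1024, 470L));
    }
}
```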






[PR] MINOR: Cleanup 0.10.x legacy references in ClusterResourceListener and TopicConfig (clients module) [kafka]

2025-04-05 Thread via GitHub


LoganZhuZzz opened a new pull request, #19388:
URL: https://github.com/apache/kafka/pull/19388

   This PR is a minor follow-up to [PR 
#19320](https://github.com/apache/kafka/pull/19320), which cleaned up 0.10.x 
legacy information from the clients module.
   
   It addresses remaining reviewer suggestions that were not included in the 
original PR:
   
   - `ClusterResourceListener`: Removed "Note the minimum supported broker 
version is 2.1." per review suggestion to avoid repeating version-specific 
details across multiple classes.
   - `TopicConfig`: Simplified `MAX_MESSAGE_BYTES_DOC` by removing obsolete 
notes about behavior in versions prior to 0.10.2.
   
   These changes help reduce outdated version information in client 
documentation and improve clarity.





Re: [PR] [MINOR] Cleanup Core Module- Scala Modules [kafka]

2025-04-05 Thread via GitHub


sjhajharia commented on PR #19380:
URL: https://github.com/apache/kafka/pull/19380#issuecomment-2781209728

   OffloadAndTxnConsumeFromLeaderTest -> Seems to be a flaky test





Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2025-04-05 Thread via GitHub


mjsax commented on PR #19303:
URL: https://github.com/apache/kafka/pull/19303#issuecomment-2781214593

   Thanks for the patch! Merged to `trunk`, and cherry-picked to `4.0` and 
`3.9` branches.





Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-04-05 Thread via GitHub


ableegoldman merged PR #17614:
URL: https://github.com/apache/kafka/pull/17614





Re: [PR] KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. [kafka]

2025-04-05 Thread via GitHub


apoorvmittal10 commented on code in PR #19334:
URL: https://github.com/apache/kafka/pull/19334#discussion_r2022389354


##
clients/src/main/resources/common/message/ShareFetchResponse.json:
##
@@ -39,6 +39,8 @@
   "about": "The top-level response error code." },
 { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
   "about": "The top-level error message, or null if there was no error." },
+{ "name": "AcquisitionLockTimeout", "type": "int64", "versions": "0+",

Review Comment:
   Done.






[jira] [Resolved] (KAFKA-18667) Add ducktape tests for simultaneous broker + controller failure

2025-04-05 Thread Kevin Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Wu resolved KAFKA-18667.
--
Resolution: Fixed

> Add ducktape tests for simultaneous broker + controller failure
> ---
>
> Key: KAFKA-18667
> URL: https://issues.apache.org/jira/browse/KAFKA-18667
> Project: Kafka
>  Issue Type: Task
>Reporter: Kevin Wu
>Assignee: Kevin Wu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-04-05 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2022174511


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##
@@ -1234,26 +1087,20 @@ public void testRetryAcknowledgementsWithLeaderChange() {
 subscriptions.assignFromSubscribed(partitions);
 
 client.updateMetadata(
-RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1),
-tp -> validLeaderEpoch, topicIds, false));
+RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 1),

Review Comment:
   Thanks, yeah, it was some IDE setting that changed the indentation 
automatically and I missed it. I have changed this back now.






Re: [PR] KAFKA-18736: Add pollOnClose() and maximumTimeToWait() [kafka]

2025-04-05 Thread via GitHub


cadonna merged PR #19233:
URL: https://github.com/apache/kafka/pull/19233





[jira] [Updated] (KAFKA-15655) Consider making transactional apis more compatible with topic IDs

2025-04-05 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-15655:
---
Parent Issue: KAFKA-19079  (was: KAFKA-14402)

> Consider making transactional apis more compatible with topic IDs
> -
>
> Key: KAFKA-15655
> URL: https://issues.apache.org/jira/browse/KAFKA-15655
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Some ideas include adding topic ID to AddPartitions and other topic partition 
> specific APIs.
> Adding topic ID as a tagged field in the transactional state logs.





Re: [PR] KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… [kafka]

2025-04-05 Thread via GitHub


cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2024268591


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -616,7 +598,7 @@ public void flushCache() {
 public void close() throws ProcessorStateException {
 log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
 
-if (!stateUpdaterEnabled && changelogReader != null) {
+if (changelogReader != null) {
 changelogReader.unregister(getAllChangelogTopicPartitions());
 }

Review Comment:
   Since `stateUpdaterEnabled` is basically always `true` once we remove the 
flag, the `if`-condition should always be `false` and the code guarded by it 
should never be executed.
   
   IMO, the changelog reader can be removed from the `ProcessorStateManager`, 
since registering changelog topics is done in the state updater. Only the old 
code path that did not use the state updater needed the changelog reader here.
   
   If the `ProcessorStateManager` does not need the changelog reader, then the 
active task creator and the standby task creator also do not need it.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
   final Runnable shutdownErrorHook,
   final BiConsumer streamsUncaughtExceptionHandler) {
 
-final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals());
-
 final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
 final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING);
-final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId;
+final String restorationThreadId = stateUpdaterId;

Review Comment:
   You could directly use `stateUpdaterId` instead of `restorationThreadId`, 
since the distinction between the state updater ID and the thread ID for 
restoration no longer holds.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -407,30 +406,6 @@ public long position(final TopicPartition partition) {
 }
 }
 
-@ParameterizedTest
-@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
-public void shouldPollWithRightTimeoutWithStateUpdater(final Task.TaskType type) {
-setupStateManagerMock(type);
-setupStoreMetadata();
-setupStore();
-shouldPollWithRightTimeout(true, type);
-}
-
-@ParameterizedTest
-@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
-public void shouldPollWithRightTimeoutWithoutStateUpdater(final Task.TaskType type) {
-setupStateManagerMock(type);
-setupStoreMetadata();
-setupStore();
-shouldPollWithRightTimeout(false, type);
-}
-
-private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) {
-final Properties properties = new Properties();
-properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
-shouldPollWithRightTimeout(properties, type);
-}
-
 @ParameterizedTest
 @EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"})
 public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) {

Review Comment:
   Could you please include `shouldPollWithRightTimeout()` into this test and 
rename this test to `shouldPollWithRightTimeout()`? 



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -1716,15 +1699,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
 runOnce(processingThreadsEnabled);
 
 // the third actually polls, processes the record, and throws the corruption exception
-if (stateUpdaterEnabled) {
-TestUtils.waitForCondition(
+TestUtils.waitForCondition(
 () -> thread.taskManager().checkStateUpdater(
 mockTime.milliseconds(),
 topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1))
 ),
 10 * 1000,
 "State updater never returned tasks.");

Review Comment:
   Please fix the indentation here. 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -664,7 +646,7 @@ else if (exception instanceof StreamsException)
 void recycle() {
 log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-if (!stateUpdaterEnabled && changelogReader != null) {

[jira] [Commented] (KAFKA-19007) Issues running the apache/kafka docker container on M4 Macs with OS >= 15.2

2025-04-05 Thread Christofer Dutz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936504#comment-17936504
 ] 

Christofer Dutz commented on KAFKA-19007:
-

Some additional links related to the issue:
[https://bugs.openjdk.org/browse/JDK-8345296]

[https://github.com/corretto/corretto-21/issues/85]

[https://forums.docker.com/t/image-builds-fail-on-new-macbook-despite-working-fine-on-prior-apple-silicon/145772/5?u=bluepuma77]

 

> Issues running the apache/kafka docker container on M4 Macs with OS >= 15.2
> ---
>
> Key: KAFKA-19007
> URL: https://issues.apache.org/jira/browse/KAFKA-19007
> Project: Kafka
>  Issue Type: Bug
>  Components: docker
>Affects Versions: 3.9.0
>Reporter: Christofer Dutz
>Priority: Major
>
> It seems there's an odd issue running java inside docker containers running 
> on MacOS systems with an M4 chip and using an OS version that's 15.2 or 
> greater. In this case the JVM simply dies with a core-dump.
> This can be avoided by passing "-XX:UseSVE=0" to the JVM, which can be done 
> by setting KAFKA_OPTS="-XX:UseSVE=0". However, this doesn't seem to work 
> completely.
> I was using TestContainers to start Kafka in docker, and inside the container 
> the file: /opt/kafka/bin/kafka-run-class.sh contains this block of code:
> {code:java}
> if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
>   GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
>   # The first segment of the version number, which is '1' for releases before 
> Java 9
>   # it then becomes '9', '10', ...
>   # Some examples of the first line of `java --version`:
>   # 8 -> java version "1.8.0_152"
>   # 9.0.4 -> java version "9.0.4"
>   # 10 -> java version "10" 2018-03-20
>   # 10.0.1 -> java version "10.0.1" 2018-04-17
>   # We need to match to the end of the line to prevent sed from printing the 
> characters that do not match
>   JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version 
> "([0-9]*).*$/\1/p')
>   if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
> 
> KAFKA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=100M"
>   else
> KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc 
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
>   fi
> fi {code}
> On these systems, the execution of:
> {code:java}
>  JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -E -n 's/.* version 
> "([0-9]*).*$/\1/p') {code}
> Results in the JAVA_MAJOR_VERSION being set to the value "Aborted", which 
> makes the if statement go into the else branch. This then sets 
> PrintGCDateStamps, which makes the JVM die with an error message. 
> Currently I have noticed that on these systems using the apache/kafka-native 
> image seems to work around the issue.
>  
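
A minimal reproduction of the parsing failure described above, with simulated
`java -version` output. The "Aborted" banner is an assumed stand-in for the
core-dumping JVM, and the numeric guard is a sketch of a possible fix, not the
actual kafka-run-class.sh patch:

```shell
#!/usr/bin/env bash
# Same sed extraction as kafka-run-class.sh, fed with simulated banners.
parse_major() { sed -E -n 's/.* version "([0-9]*).*$/\1/p'; }

ok=$(echo 'openjdk version "21.0.2" 2024-01-16' | parse_major)  # healthy JVM -> 21
bad=$(echo 'Aborted' | parse_major)                             # crashed JVM -> no match, empty

# The original script feeds the result straight into [[ ... -ge 9 ]]; when the
# value is not a number, that comparison silently selects the legacy GC flags
# (PrintGCDateStamps etc.), which a modern JVM rejects. A defensive numeric
# check falls back to unified GC logging instead:
if [[ "$bad" =~ ^[0-9]+$ ]] && (( bad < 9 )); then
  choice="legacy-gc-flags"
else
  choice="unified-gc-logging"
fi
echo "$ok / $choice"   # -> 21 / unified-gc-logging
```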





[PR] KAFKA-17806: remove this-escape suppress warnings in AclCommand [kafka]

2025-04-05 Thread via GitHub


FrankYang0529 opened a new pull request, #19256:
URL: https://github.com/apache/kafka/pull/19256

   Change `AclCommandOptions` to a final class so that we can remove 
`@SuppressWarnings("this-escape")` from it.
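
   For context, a minimal illustration of why making the class final removes the 
warning: `this-escape` fires when a constructor invokes a method that a subclass 
could override, letting the subclass observe a partially constructed object; in 
a final class no such subclass can exist. This is an illustrative stand-in, not 
the actual `AclCommandOptions`:

```java
// Illustrative sketch only; OptionsSketch and parse() are made-up names.
final class OptionsSketch {
    private final String[] args;

    OptionsSketch(String[] args) {
        this.args = args;
        parse(); // in a non-final class, this call escapes `this` to a method a
                 // subclass could override before construction finishes; marking
                 // the class final rules that out, so the suppression can go
    }

    void parse() { /* option parsing would go here */ }

    int argCount() { return args.length; }
}
```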





[jira] [Created] (KAFKA-19068) Eliminate the duplicate type check in creating ControlRecord

2025-04-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-19068:
--

 Summary: Eliminate the duplicate type check in creating 
ControlRecord
 Key: KAFKA-19068
 URL: https://issues.apache.org/jira/browse/KAFKA-19068
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


`RecordsIterator#decodeControlRecord` [0] does the type check and then the 
`ControlRecord` constructor [1] does it again ...

We should add a static method to `ControlRecord` that creates a `ControlRecord` 
with the type check, and then change the `ControlRecord` constructor to private 
to ensure all instances are created via the static method.


[0] 
https://github.com/apache/kafka/blob/ccf2510fdda755b7a8e0900d3316fade46eb533a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java#L361
[1] 
https://github.com/apache/kafka/blob/ccf2510fdda755b7a8e0900d3316fade46eb533a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java#L45
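
The suggested refactor could look roughly like this simplified, hypothetical
sketch (the real `ControlRecord` validates record-type/message consistency and
carries raft-specific types; the names and checks here are illustrative):

```java
// Hypothetical, simplified sketch of the static-factory pattern: validation
// lives in one place and the constructor becomes private, so every instance
// is guaranteed to have passed the check exactly once.
final class ControlRecordSketch {
    enum ControlType { LEADER_CHANGE, SNAPSHOT_HEADER, SNAPSHOT_FOOTER }

    private final ControlType type;
    private final Object message;

    private ControlRecordSketch(ControlType type, Object message) {
        this.type = type;       // no re-validation here
        this.message = message;
    }

    static ControlRecordSketch of(ControlType type, Object message) {
        // The single type check; the private constructor cannot be reached
        // without going through it.
        if (type == null || message == null) {
            throw new IllegalArgumentException("type and message are required");
        }
        return new ControlRecordSketch(type, message);
    }

    ControlType type() { return type; }
}
```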





Re: [PR] MINOR: Mark streams RPCs as unstable [kafka]

2025-04-05 Thread via GitHub


lucasbru commented on PR #19292:
URL: https://github.com/apache/kafka/pull/19292#issuecomment-2754724705

   @cadonna 





Re: [PR] KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… [kafka]

2025-04-05 Thread via GitHub


janchilling commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2021686307


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() {
 }
 } else {
 mainConsumerInstanceIdFuture.completeExceptionally(
-new TimeoutException("Could not retrieve main consumer client instance id.")
+new TimeoutException("Could not retrieve main consumer client instance id.")
 );
 }
 }
 
 
-if (!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()) {
+if (!restoreConsumerInstanceIdFuture.isDone()) {

Review Comment:
   Ah yeah, makes sense; removed the entire condition, and it is updated in the 
latest commit. There was another condition later in the same method that also 
checked `!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()`; 
removed that one as well since it will always be false.






Re: [PR] KAFKA-16580: Enable dynamic quorum reconfiguration for raft simulation tests [kafka]

2025-04-05 Thread via GitHub


kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2006278691


##
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
 @Override
 public void verify() {
-cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-.filter(entry -> cluster.voters.containsKey(entry.getKey()))
-.filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-.count();
-assertTrue(
-numReachedHighWatermark >= cluster.majoritySize(),
-"Insufficient nodes have reached current high watermark");
+if (cluster.withKip853) {
+/*
+* For clusters running in KIP-853 mode, we check that a majority of at least one of:
+* 1. the leader's voter set at the HWM
+* 2. the leader's lastVoterSet()
+* has reached the HWM. We need to perform a more elaborate check here because in clusters where
+* an Add/RemoveVoter request increases/decreases the majority of voters value by 1, the leader
+* could have used either majority value to update its HWM value. This is because depending on
+* whether the leader read the most recent VotersRecord prior to updating its HWM value, the number
+* of nodes (the majority) used to calculate that HWM value is different. This matters for invariant
+* checking because we perform this verification on every message delivery.
+* */
+cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {

Review Comment:
   ~~I agree that this minimal voter set check is valid, but I think it is too 
relaxed for a cluster with dynamic voters. Specifically, I have concerns about 
this statement, but maybe I'm misunderstanding what is going on:~~
   > KRaft is allowed to commit up to the HWM between voterSet1 or voterSet2, 
with either voter set.
   
   ~~From my understanding of the code, once KRaft becomes aware of voterSet2, 
which happens at the following point in execution because 
`partitionState.lastVoterSet()` is set to voterSet2: `appendAsLeader -> 
partitionState.updateState()`, it should be using voterSet2 to calculate the 
HWM thereafter when it handles `FetchRequests` in subsequent events.~~
   
   ~~For example, in the case of adding a voter, voterSet1 is the minimal voter 
set with majority size X, and voterSet2 is the uncommitted voter set with 
majority size X + 1. This means when verifying the HWM value, we would be 
checking that X nodes have replicated up to the HWM offset, not X + 1 nodes, 
which should only happen if the HWM value was actually calculated with 
voterSet1.~~
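
   Stripped of the simulation plumbing, the invariant under discussion reduces 
to a majority count over the voters' log end offsets. A self-contained sketch 
with illustrative names (not the test's actual API):

```java
import java.util.List;

// Minimal sketch of the invariant: count voters whose log end offset has
// reached the leader's high watermark, and require a simple majority of the
// voter-set size. Names are illustrative stand-ins for the simulation's types.
class HwmInvariantSketch {
    static long numReachedHighWatermark(List<Long> voterEndOffsets, long highWatermark) {
        return voterEndOffsets.stream().filter(end -> end >= highWatermark).count();
    }

    static int majoritySize(int voterSetSize) {
        // e.g. 3 voters -> 2, 4 voters -> 3
        return voterSetSize / 2 + 1;
    }

    static boolean invariantHolds(List<Long> voterEndOffsets, long highWatermark) {
        return numReachedHighWatermark(voterEndOffsets, highWatermark)
                >= majoritySize(voterEndOffsets.size());
    }
}
```

The subtlety debated above is which voter set supplies `voterEndOffsets.size()`
when a membership change is in flight, since the two candidate sets differ by
one and therefore yield different majority thresholds.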



##
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
 @Override
 public void verify() {
-cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-.filter(entry -> cluster.voters.containsKey(entry.getKey()))
-.filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-.count();
-assertTrue(
-numReachedHighWatermark >= cluster.majoritySize(),
-"Insufficient nodes have reached current high watermark");
+if (cluster.withKip853) {
+/*
+* For clusters running in KIP-853 mode, we check that a majority of at least one of:
+* 1. the leader's voter set at the HWM
+* 2. the leader's lastVoterSet()
+* has reached the HWM. We need to perform a more elaborate check here because in clusters where
+* an Add/RemoveVoter request increases/decreases the majority of voters value by 1, the leader
+* could have used either majority value to update its HWM value. This is because depending on
+* whether the leader read the most recent VotersRecord prior to updating its HWM value, the number
+* of nodes (the majority) used to calculate that HWM value is different. This matters for invariant
+* checking because we perform this verification on every message delivery.
+* */

[PR] [DO NOT MERGE] Find Producer test thread leak [kafka]

2025-04-05 Thread via GitHub


m1a2st opened a new pull request, #19381:
URL: https://github.com/apache/kafka/pull/19381

   FYI: 
https://github.com/apache/kafka/actions/runs/14277595321/job/40023202087?pr=18930
 
   
https://github.com/apache/kafka/actions/runs/14145654402/job/39632853567?pr=18930
   





[jira] [Assigned] (KAFKA-18563) move RaftClientTestContext xyzRpcVersion methods into RaftProtocol

2025-04-05 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-18563:
-

Assignee: PoAn Yang

> move RaftClientTestContext xyzRpcVersion methods into RaftProtocol 
> ---
>
> Key: KAFKA-18563
> URL: https://issues.apache.org/jira/browse/KAFKA-18563
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Reporter: Alyssa Huang
>Assignee: PoAn Yang
>Priority: Minor
>
> relies on https://github.com/apache/kafka/pull/18240





[jira] [Updated] (KAFKA-19026) AlterConfigPolicy incompatibility between ZK mode and KRaft mode when using AlterConfigOp.OpType.SUBTRACT

2025-04-05 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-19026:
--
Attachment: (was: KAFKA19026Test-1.java)

> AlterConfigPolicy incompatibility between ZK mode and KRaft mode when using 
> AlterConfigOp.OpType.SUBTRACT
> -
>
> Key: KAFKA-19026
> URL: https://issues.apache.org/jira/browse/KAFKA-19026
> Project: Kafka
>  Issue Type: Bug
>  Components: core, migration
>Affects Versions: 3.9.0, 3.8.1
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: KAFKA19026Policy.java, KAFKA19026Test.java
>
>
> When processing an Incremental Alter Config on a Config entry of type List 
> with OpType.SUBTRACT
> the metadata passed to  {color:#00}AlterConfigPolicy.validate contains 
> {color}
>  * {color:#00}in KRaft mode : {color}{color:#00}the config that would 
> result AFTER the subtraction{color}
>  * {color:#00}in ZK mode : the config as if the opType was OpType.SET, 
> with no indication that actually the value would be removed{color}





[jira] [Created] (KAFKA-19058) Running the streams/streams-scala module tests produces a streams-scala.log

2025-04-05 Thread Jira
黃竣陽 created KAFKA-19058:
---

 Summary: Running the streams/streams-scala module tests produces a 
streams-scala.log
 Key: KAFKA-19058
 URL: https://issues.apache.org/jira/browse/KAFKA-19058
 Project: Kafka
  Issue Type: Task
Reporter: 黃竣陽
Assignee: 黃竣陽


Running tests should not produce untracked files.





Re: [PR] MINOR: leverage preProcessParsedConfig within AbstractConfig [kafka]

2025-04-05 Thread via GitHub


chia7712 commented on PR #19259:
URL: https://github.com/apache/kafka/pull/19259#issuecomment-2742476082

   This is a kind of bug fix, so please add a test





Re: [PR] KAFKA-19042: [2/N] Move TransactionsExpirationTest to client-integration-tests module [kafka]

2025-04-05 Thread via GitHub


chia7712 commented on code in PR #19288:
URL: https://github.com/apache/kafka/pull/19288#discussion_r2021404113


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/TransactionsExpirationTest.java:
##
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterFeature;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
+import org.apache.kafka.server.common.Feature;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+types = {Type.CO_KRAFT},
+brokers = 3,
+serverProperties = {
+@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
+// Set a smaller value for the number of partitions for the __consumer_offsets topic
+// so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long.
+@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"),
+@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"),
+@ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
+@ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
+@ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
+@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
+@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"),
+@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "1"),
+@Clust

[jira] [Updated] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error

2025-04-05 Thread Ranganath Samudrala (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ranganath Samudrala updated KAFKA-19022:

Description: 
While migrating Kafka from ZooKeeper to KRaft, we see errors in logs like

INCONSISTENT_CLUSTER_ID in FETCH response 

or

INCONSISTENT_CLUSTER_ID in VOTER response 

But the cluster IDs being compared are not displayed in the logs, so there is not
enough information to see where the issue is. Is the class field *clusterId* empty
(which could potentially be a bug?) or is the incoming *clusterId* empty or incorrect?

[KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
{quote} 

    private boolean hasValidClusterId(String requestClusterId) {
        // We don't enforce the cluster id if it is not provided.
        if (requestClusterId == null) {
            return true;
        }
        return clusterId.equals(requestClusterId);
    }

.

.

 

    private CompletableFuture<FetchResponseData> handleFetchRequest(
        RaftRequest.Inbound requestMetadata,
        long currentTimeMs
    ) {
        FetchRequestData request = (FetchRequestData) requestMetadata.data();

        if (!hasValidClusterId(request.clusterId())) {
            return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
        }

.

.

 

```
{quote}

  was:
While migrating Kafka from zookeeper to kraft, we see errors in logs like

INCONSISTENT_CLUSTER_ID in FETCH response 

or

INCONSISTENT_CLUSTER_ID in VOTER response 

But cluster IDs compared is not displayed in logs so there is not enough 
information to see where the issue is. Is the class data *clusterId* empty 
(which could potentially be a bug?)

[KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
{quote} 

    private boolean hasValidClusterId(String requestClusterId) {
        // We don't enforce the cluster id if it is not provided.
        if (requestClusterId == null) {
            return true;
        }
        return clusterId.equals(requestClusterId);
    }

.

.

 

    private CompletableFuture handleFetchRequest(
        RaftRequest.Inbound requestMetadata,
        long currentTimeMs
    ) {
        FetchRequestData request = (FetchRequestData) requestMetadata.data();

        if (!hasValidClusterId(request.clusterId())) {
            return completedFuture(new 
FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
        }

.

.

 

```
{quote}


> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID 
> error
> --
>
> Key: KAFKA-19022
> URL: https://issues.apache.org/jira/browse/KAFKA-19022
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 3.9.0
>Reporter: Ranganath Samudrala
>Priority: Major
>
> While migrating Kafka from zookeeper to kraft, we see errors in logs like
> INCONSISTENT_CLUSTER_ID in FETCH response 
> or
> INCONSISTENT_CLUSTER_ID in VOTER response 
> But the cluster IDs being compared are not displayed in the logs, so there is
> not enough information to see where the issue is. Is the class field
> *clusterId* empty (which could potentially be a bug?) or is the incoming
> *clusterId* empty or incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {quote} 
>     private boolean hasValidClusterId(String requestClusterId) {
>         // We don't enforce the cluster id if it is not provided.
>         if (requestClusterId == null) {
>             return true;
>         }
>         return clusterId.equals(requestClusterId);
>     }
> .
> .
>  
>     private CompletableFuture<FetchResponseData> handleFetchRequest(
>         RaftRequest.Inbound requestMetadata,
>         long currentTimeMs
>     ) {
>         FetchRequestData request = (FetchRequestData) requestMetadata.data();
>         if (!hasValidClusterId(request.clusterId())) {
>             return completedFuture(new
> FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>         }
> .
> .
>  
> ```
> {quote}
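The improvement the ticket asks for can be sketched as follows. This is a hedged, self-contained illustration, not the actual KafkaRaftClient code: the method shape and log-message format are hypothetical, and the point is simply to surface both the local and the incoming cluster ID on the rejection path.

```java
// Sketch: include both sides of the comparison in the rejection message,
// so logs show whether the local id or the incoming id is empty/incorrect.
public class ClusterIdCheckSketch {

    // Hypothetical helper that builds the diagnostic message the ticket asks for.
    static String describeMismatch(String localClusterId, String requestClusterId) {
        return "INCONSISTENT_CLUSTER_ID: local clusterId=" + localClusterId
                + ", request clusterId=" + requestClusterId;
    }

    static boolean hasValidClusterId(String localClusterId, String requestClusterId) {
        // The cluster id is not enforced when the request omits it.
        if (requestClusterId == null) {
            return true;
        }
        boolean valid = localClusterId.equals(requestClusterId);
        if (!valid) {
            // Logging both IDs answers the question raised above:
            // is the local id empty, or is the incoming one wrong?
            System.err.println(describeMismatch(localClusterId, requestClusterId));
        }
        return valid;
    }

    public static void main(String[] args) {
        System.out.println(hasValidClusterId("abc", null));  // prints true
        System.out.println(hasValidClusterId("abc", "xyz")); // prints false, logs both IDs
    }
}
```

With both IDs in the log line, an operator can immediately tell which side of the comparison is empty or incorrect.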



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19002) Rewrite ListOffsetsIntegrationTest and move it to clients-integration-test

2025-04-05 Thread Dmitry Werner (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936260#comment-17936260
 ] 

Dmitry Werner commented on KAFKA-19002:
---

[~chia7712] Can I take it on?

> Rewrite ListOffsetsIntegrationTest and move it to clients-integration-test
> --
>
> Key: KAFKA-19002
> URL: https://issues.apache.org/jira/browse/KAFKA-19002
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> the following tasks should be addressed in this ticket
>  # rewrite it by new test infra
>  # use java
>  # move it to clients-integration-test





Re: [PR] KAFKA-18913[WIP]: Removal of code guarded by the negation of _state.u… [kafka]

2025-04-05 Thread via GitHub


janchilling closed pull request #19208: KAFKA-18913[WIP]: Removal of code 
guarded by the negation of _state.u…
URL: https://github.com/apache/kafka/pull/19208


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14542) Deprecate OffsetFetch/Commit version 0 and remove them in 4.0

2025-04-05 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14542.
-
Resolution: Duplicate

Addressed by https://issues.apache.org/jira/browse/KAFKA-14560.

> Deprecate OffsetFetch/Commit version 0 and remove them in 4.0
> -
>
> Key: KAFKA-14542
> URL: https://issues.apache.org/jira/browse/KAFKA-14542
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> We should deprecate OffsetFetch/Commit APIs and remove them in AK 4.0. Those 
> two APIs are used by old clients to write offsets to and read offsets from ZK.
> We need a small KIP for this.





Re: [PR] KAFKA-18981: Fix flaky QuorumControllerTest.testMinIsrUpdateWithElr [kafka]

2025-04-05 Thread via GitHub


FrankYang0529 commented on PR #19262:
URL: https://github.com/apache/kafka/pull/19262#issuecomment-2743335132

   @mumrah The QuorumControllerTest#testMinIsrUpdateWithElr is flaky but
doesn't have the `@Flaky` tag. In that case, do we want to add the tag to it?
Thanks.





[PR] MINOR: Modify KafkaEventQueue VoidEvent to as singleton [kafka]

2025-04-05 Thread via GitHub


gongxuanzhang opened a new pull request, #19356:
URL: https://github.com/apache/kafka/pull/19356

   The `VoidEvent` class provides a singleton object, but nobody uses it.
   I think we should make the `VoidEvent` constructor private and use only the singleton.





Re: [PR] KAFKA-12744: dependency upgrade: `argparse4j` 0.7.0 -->> 0.9.0 [kafka]

2025-04-05 Thread via GitHub


mimaison commented on PR #19265:
URL: https://github.com/apache/kafka/pull/19265#issuecomment-2761152756

   Both options are valid, but ideally we'd want to refactor the current logic 
to not use the deprecated APIs.
   
   At a glance this does not seem too complicated. As mentioned in 
https://github.com/apache/kafka/pull/10626 you'll also have to ensure this does 
not change the behavior of the impacted tools.





[jira] [Created] (KAFKA-19082) Client side changes to enable 2PC

2025-04-05 Thread Ritika Reddy (Jira)
Ritika Reddy created KAFKA-19082:


 Summary: Client side changes to enable 2PC
 Key: KAFKA-19082
 URL: https://issues.apache.org/jira/browse/KAFKA-19082
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ritika Reddy
Assignee: Ritika Reddy








Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]

2025-04-05 Thread via GitHub


TaiJuWu commented on code in PR #19277:
URL: https://github.com/apache/kafka/pull/19277#discussion_r2011385275


##
storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+private final File tmpDir = TestUtils.tempDirectory();
+private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+private final MockTime time = new MockTime(0, 0);
+
+@AfterEach
+public void tearDown() throws IOException {
+Utils.delete(tmpDir);
+}
+
+/**
+ * Test broker-side compression configuration
+ */
+@ParameterizedTest
+@MethodSource("parameters")
+public void testBrokerSideCompression(CompressionType 
messageCompressionType, BrokerCompressionType brokerCompressionType) throws 
IOException {
+Compression messageCompression = 
Compression.of(messageCompressionType).build();
+Properties logProps = new Properties();
+logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, 
brokerCompressionType.name);
+
+/* Configure broker-side compression */
+UnifiedLog log = UnifiedLog.create(
+logDir,
+new LogConfig(logProps),
+0L,
+0L,
+time.scheduler,
+new BrokerTopicStats(),
+time,
+5 * 60 * 1000,
+new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+new LogDirFailureChannel(10),
+true,
+Optional.empty()
+);
+
+/* Append two messages */
+log.appendAsLeader(
+MemoryRecords.withRecords(messageCompression, 0,
+new SimpleRecord("hello".getBytes()),
+new SimpleRecord("there".getBytes())
+), 0
+);
+
+if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
+RecordBatch batch = readBatch(log, 0);
+Compression targetCompression = 
BrokerCompressionType.targetCompression(log.config().compression, null);
+assertEquals(targetCompression.type(), batch.compressionType(), 
"Compression at offset 0 should produce " + brokerCompressionType);
+} else {
+assertEquals(messageCompressionType, readBatch(log, 
0).compressionType(), "Compression at offset 0 should produce " + 
messageCompressionType);
+}
+}
+
+private static RecordBatch readBatch(UnifiedLog log, int offset) throws 
IOException {
+FetchDataInfo fetchInfo = log.read(offset, 4096, 
FetchIsolation.LOG_END, true);
+return fetchInfo.records.batches().iterator().next();
+}
+
+private static Stream parameters() {

Review Comment:
   I changed the name to `allCompressionParameter`.
   Thanks for the suggestion.




Re: [PR] MINOR: Modify KafkaEventQueue VoidEvent to as singleton and use more proper function interface [kafka]

2025-04-05 Thread via GitHub


gongxuanzhang commented on PR #19356:
URL: https://github.com/apache/kafka/pull/19356#issuecomment-2775186778

   @chia7712  PTAL





[PR] Mar 18 test 2 [kafka-merge-queue-sandbox]

2025-04-05 Thread via GitHub


mumrah opened a new pull request, #64:
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/64

   Another test





Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-04-05 Thread via GitHub


wernerdv commented on PR #19216:
URL: https://github.com/apache/kafka/pull/19216#issuecomment-2740992633

   I fixed the tests, ready for review.





Re: [PR] KAFKA-18923: resource leak in RSM fetchIndex inputStream [kafka]

2025-04-05 Thread via GitHub


FrankYang0529 commented on PR #19111:
URL: https://github.com/apache/kafka/pull/19111#issuecomment-2769816571

   Hi @showuon, thanks for the suggestion. The following are all references to
`fetchLogSegment` and `fetchIndex`.
   
   * InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, int startPosition)
 * RemoteLogManager#lookupTimestamp: 
https://github.com/apache/kafka/blob/1eea7f0528954ce8dcbcc4357ae2ef28c1d1e5f2/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L653-L655
 * RemoteLogManager#read: 
https://github.com/apache/kafka/blob/1eea7f0528954ce8dcbcc4357ae2ef28c1d1e5f2/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1744-L1748
 * _LocalTieredStorageTest#verifyFetchedLogSegment: Fixed by this PR._
   * InputStream fetchLogSegment(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, int startPosition, int endPosition)
 * LocalTieredStorageTest: All usage throws exception before creating input 
stream.
   * InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, 
IndexType indexType)
 * _TierStateMachine#readLeaderEpochCheckpoint and 
TierStateMachine#buildProducerSnapshotFile: Fixed by this PR._
 * RemoteIndexCache#createCacheEntry: The input stream is supplied by a 
function and is closed in RemoteIndexCache#loadIndexFile. 
https://github.com/apache/kafka/blob/1eea7f0528954ce8dcbcc4357ae2ef28c1d1e5f2/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java#L355-L357
 * _LocalTieredStorageTest#verifyFetchedOffsetIndex, 
LocalTieredStorageTest#verifyFetchedTimeIndex, 
LocalTieredStorageTest#verifyFetchedTransactionIndex, 
LocalTieredStorageTest#verifyLeaderEpochCheckpoint, and 
LocalTieredStorageTest#verifyProducerSnapshot: Fixed by this PR. The input 
stream is closed in LocalTieredStorageTest#verifyFileContents._
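The fix pattern referenced above — closing each fetched stream at the caller — can be sketched with try-with-resources. This is a self-contained illustration with a hypothetical stand-in stream, not the actual `RemoteStorageManager` API:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class StreamCloseSketch {
    static boolean closed = false;

    // Stand-in for a fetchIndex/fetchLogSegment call that hands back a stream
    // the caller is responsible for closing.
    static InputStream fetchIndexStream() {
        return new ByteArrayInputStream(new byte[] {1, 2, 3}) {
            @Override
            public void close() throws IOException {
                closed = true; // record that the resource was released
                super.close();
            }
        };
    }

    static int readFirstByte() throws IOException {
        // try-with-resources guarantees close() runs on both the success
        // and the exception path, so the underlying handle never leaks.
        try (InputStream in = fetchIndexStream()) {
            return in.read();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readFirstByte()); // prints 1
        System.out.println(closed);          // prints true
    }
}
```

The same shape applies wherever the PR wraps a fetched stream: the stream is scoped to the block that consumes it instead of being left for the garbage collector.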





[jira] [Assigned] (KAFKA-19019) Verify Share Fetch with Tiered Storage

2025-04-05 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal reassigned KAFKA-19019:
-

Assignee: Abhinav Dixit  (was: Apoorv Mittal)

> Verify Share Fetch with Tiered Storage
> --
>
> Key: KAFKA-19019
> URL: https://issues.apache.org/jira/browse/KAFKA-19019
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Abhinav Dixit
>Priority: Major
>






[jira] [Assigned] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error

2025-04-05 Thread Lorcan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorcan reassigned KAFKA-19022:
--

Assignee: (was: Lorcan)

> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID 
> error
> --
>
> Key: KAFKA-19022
> URL: https://issues.apache.org/jira/browse/KAFKA-19022
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, logging
>Affects Versions: 3.9.0
>Reporter: Ranganath Samudrala
>Priority: Major
>
> While migrating Kafka from zookeeper to kraft, we see errors in logs like
> {{INCONSISTENT_CLUSTER_ID in FETCH response }}
> or
> {{INCONSISTENT_CLUSTER_ID in VOTER response }}
> But the cluster IDs being compared are not displayed in the logs, so there is
> not enough information to see where the issue is. Is the class field
> *clusterId* empty (which could potentially be a bug?) or is the incoming
> *clusterId* empty or incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {code:java}
>     private boolean hasValidClusterId(String requestClusterId) {
>         // We don't enforce the cluster id if it is not provided.
>         if (requestClusterId == null) {
>             return true;
>         }
>         return clusterId.equals(requestClusterId);
>     }
> .
> .
>     private CompletableFuture<FetchResponseData> handleFetchRequest(
>         RaftRequest.Inbound requestMetadata,
>         long currentTimeMs
>     ) {
>         FetchRequestData request = (FetchRequestData) requestMetadata.data();
>         if (!hasValidClusterId(request.clusterId())) {
>             return completedFuture(new
> FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>         }
> .
> .
> {code}





Re: [PR] MINOR:modify scala $var to java string [kafka]

2025-04-05 Thread via GitHub


chia7712 commented on code in PR #18393:
URL: https://github.com/apache/kafka/pull/18393#discussion_r2029925512


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -470,8 +470,8 @@ public FetchDataInfo read(long startOffset,
 return maybeHandleIOException(
 () -> "Exception while reading from " + topicPartition + " in 
dir " + dir.getParent(),
 () -> {
-logger.trace("Reading maximum $maxLength bytes at offset 
{} from log with total length {} bytes",
-startOffset, segments.sizeInBytes());
+logger.trace("Reading maximum {} bytes at offset {} from 
log with total length {} bytes",
+maxLength, startOffset, segments.sizeInBytes());

Review Comment:
   nit: Since `segments.sizeInBytes()` iterates through all segments to 
calculate the size, consider adding an `isTraceEnabled` flag to avoid 
unnecessary loops when trace logging is disabled by default.
   
   @gongxuanzhang WDYT? 
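The suggested guard can be sketched as follows. The actual code would use SLF4J's `logger.isTraceEnabled()`; `java.util.logging` is used here only to keep the sketch self-contained, and `sizeInBytes()` is a stand-in for the segment iteration:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TraceGuardSketch {
    static final Logger LOGGER = Logger.getLogger(TraceGuardSketch.class.getName());
    static final AtomicInteger SIZE_CALLS = new AtomicInteger();

    // Stand-in for LogSegments#sizeInBytes, which iterates all segments.
    static long sizeInBytes() {
        SIZE_CALLS.incrementAndGet();
        return 4096L;
    }

    static void read(long startOffset, int maxLength) {
        // Guard: only pay for sizeInBytes() when the message will actually
        // be emitted; with trace disabled the loop is skipped entirely.
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.finest("Reading maximum " + maxLength + " bytes at offset "
                    + startOffset + " from log with total length "
                    + sizeInBytes() + " bytes");
        }
    }

    public static void main(String[] args) {
        LOGGER.setLevel(Level.INFO); // trace-level (FINEST) logging disabled
        read(0L, 1024);
        System.out.println(SIZE_CALLS.get()); // prints 0: no wasted iteration
    }
}
```

Without the guard, the argument expression runs on every call even though parameterized logging defers only the string formatting, not the computation of the arguments themselves.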






[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2025-04-05 Thread Kevin Apolinario (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940334#comment-17940334
 ] 

Kevin Apolinario commented on KAFKA-10357:
--

Picking this one up to finish what's left

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip, new-streams-runtime-should-fix
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much quicker than producer 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to 
> it, it will re-join the group since metadata changed and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> data lost silently. 
> One idea, is to only create all repartition topics *once* in the first 
> rebalance and not auto-create them any more in future rebalances, instead it 
> would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenge part would be, how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would 
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the first 
> time ever rebalance.





Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2025-04-05 Thread via GitHub


mjsax commented on code in PR #15607:
URL: https://github.com/apache/kafka/pull/15607#discussion_r2026210742


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -644,16 +645,16 @@ public void 
shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(final boolean left
 
 left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 2);
 {
-final Map expected = mkMap(
-mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")
+final List> expected = 
Collections.singletonList(
+KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
 );
 assertThat(
-outputTopic.readKeyValuesToMap(),
+outputTopic.readKeyValuesToList(),
 is(expected)
 );
 if (materialized) {
 assertThat(
-asMap(store),
+asList(store),

Review Comment:
   as above



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -712,12 +713,12 @@ public void 
shouldEmitRecordOnNullForeignKeyForLeftJoins(final String optimizati
 
 left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
 {
-final Map expected = mkMap(
-mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+final List> expected = 
Collections.singletonList(
+KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
 );
-assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+assertThat(outputTopic.readKeyValuesToList(), is(expected));
 if (materialized) {
-assertThat(asMap(store), is(expected));
+assertThat(asList(store), is(expected));

Review Comment:
   as above



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -423,15 +428,15 @@ public void 
shouldEmitTombstoneWhenDeletingNonJoiningRecords(final boolean leftJ
 left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
 
 {
-final Map expected =
-leftJoin ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)")) 
: emptyMap();
+final List> expected =
+leftJoin ? Collections.singletonList(KeyValue.pair("lhs1", 
"(lhsValue1|rhs1,null)")) : emptyList();
 assertThat(
-outputTopic.readKeyValuesToMap(),
+outputTopic.readKeyValuesToList(),
 is(expected)
 );
 if (materialized) {
 assertThat(
-asMap(store),
+asList(store),

Review Comment:
   as above



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -753,23 +754,23 @@ public void shouldEmitRecordWhenOldAndNewFkDiffer(final 
String optimization,
 final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
 left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
 {
-final Map expected = mkMap(
-mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+final List> expected = 
Collections.singletonList(
+KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)")
 );
-assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+assertThat(outputTopic.readKeyValuesToList(), is(expected));
 if (materialized) {
-assertThat(asMap(store), is(expected));
+assertThat(asList(store), is(expected));

Review Comment:
   as above



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -307,12 +310,13 @@ public void doJoinFromRightThenDeleteRightEntity(final 
boolean leftJoin,
 left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
 
 assertThat(
-outputTopic.readKeyValuesToMap(),
+outputTopic.readKeyValuesToList(),
 is(leftJoin
-? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-: emptyMap()
+? Arrays.asList(
+KeyValue.pair("lhs1", "(lhsValue1|rhs1,null)"),

Review Comment:
   nit: indentation



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/i

[jira] [Created] (KAFKA-19078) Implement automatic controller addition to cluster metadata partition

2025-04-05 Thread Jira
José Armando García Sancio created KAFKA-19078:
--

 Summary: Implement automatic controller addition to cluster 
metadata partition
 Key: KAFKA-19078
 URL: https://issues.apache.org/jira/browse/KAFKA-19078
 Project: Kafka
  Issue Type: Sub-task
  Components: controller, kraft
Reporter: José Armando García Sancio
Assignee: Kevin Wu


Feature design: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217391519#KIP853:KRaftControllerMembershipChanges-Controllerautojoining





Re: [PR] KAFKA-16260: Deprecate window.size.ms and window.inner.class.serde in StreamsConfig [kafka]

2025-04-05 Thread via GitHub


lucasbru commented on code in PR #18297:
URL: https://github.com/apache/kafka/pull/18297#discussion_r2010879588


##
streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedDeserializer.java:
##
@@ -23,10 +23,20 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.internals.SessionKeySchema;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
 
 public class SessionWindowedDeserializer implements 
Deserializer> {
 
+/**
+ * Configuration key for the windowed inner deserializer class.

Review Comment:
   Can we put any more information into this (and the other) java doc comments 
about the constants? Since this is a public string constant now, the use of the 
constant should ideally be clear from the javadoc comments.



##
docs/upgrade.html:
##
@@ -430,6 +430,9 @@ Nota
  See https://cwiki.apache.org/confluence/x/B40ODg";>KIP-890 and
 https://cwiki.apache.org/confluence/x/8ItyEg";>KIP-1050 for more 
details 
 
+
+The window.size.ms and 
window.inner.serde.class in stream config are deprecated.

Review Comment:
   This should not be in the 4.0 section.
   



##
docs/upgrade.html:
##
@@ -430,6 +430,9 @@ Nota
  See https://cwiki.apache.org/confluence/x/B40ODg";>KIP-890 and
 https://cwiki.apache.org/confluence/x/8ItyEg";>KIP-1050 for more 
details 
 
+
+The window.size.ms and 
window.inner.serde.class in stream config are deprecated.

Review Comment:
   ```suggestion
   The window.size.ms and 
window.inner.serde.class in `StreamsConfig` are deprecated. Use 
the corresponding string constants defined in `TimeWindowedSerializer`, 
`TimeWindowedDeserializer`, `SessionWindowedSerializer` and 
`SessionWindowedDeserializer` instead.
   ```
   
   Something like that? Could also be the actual string constants. But it's 
good to make these notes actionable.
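For context, a minimal sketch of how a plain consumer configuration might use such a constant (assumption: the constant name follows the PR diff above; the string value used here is a stand-in for illustration, not confirmed by the source):

```java
import java.util.Properties;

public class WindowedDeserializerConfigSketch {
    // Stand-in for TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS;
    // the actual config string is an assumption for illustration.
    static final String WINDOWED_INNER_DESERIALIZER_CLASS = "windowed.inner.deserializer.class";

    public static void main(String[] args) {
        Properties props = new Properties();
        // A plain consumer reading windowed keys points the windowed
        // deserializer at the inner (key) deserializer class.
        props.put(WINDOWED_INNER_DESERIALIZER_CLASS,
                "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println(props.getProperty(WINDOWED_INNER_DESERIALIZER_CLASS));
    }
}
```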



##
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java:
##
@@ -38,32 +48,42 @@ public TimeWindowedSerializer(final Serializer inner) {
 this.inner = inner;
 }
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"deprecation", "unchecked"})
 @Override
 public void configure(final Map configs, final boolean isKey) {
-final String windowedInnerClassSerdeConfig = (String) 
configs.get(StreamsConfig.WINDOWED_INNER_CLASS_SERDE);
-Serde windowInnerClassSerde = null;
-if (windowedInnerClassSerdeConfig != null) {
+String serializerConfigFrom = WINDOWED_INNER_SERIALIZER_CLASS;

Review Comment:
   variable name suggestion: serializerConfigKey
   
   Also in other places



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -823,13 +827,28 @@ public class StreamsConfig extends AbstractConfig {
 + CONFIG_ERROR_MSG
 + "\"NO_OPTIMIZATION\" by default.";
 
-/** {@code windowed.inner.class.serde} */
+/**
+ * {@code windowed.inner.class.serde}
+ *
+ * @deprecated since 4.0.0.

Review Comment:
   4.1.0



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -823,13 +827,28 @@ public class StreamsConfig extends AbstractConfig {
 + CONFIG_ERROR_MSG
 + "\"NO_OPTIMIZATION\" by default.";
 
-/** {@code windowed.inner.class.serde} */
+/**
+ * {@code windowed.inner.class.serde}
+ *
+ * @deprecated since 4.0.0.
+ * Use {@link TimeWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} for 
{@link TimeWindowedSerializer}.
+ * Use {@link TimeWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} 
for {@link TimeWindowedDeserializer}.
+ * Use {@link SessionWindowedSerializer#WINDOWED_INNER_SERIALIZER_CLASS} 
for {@link SessionWindowedSerializer}.
+ * Use {@link 
SessionWindowedDeserializer#WINDOWED_INNER_DESERIALIZER_CLASS} for {@link 
SessionWindowedDeserializer}.
+ */
+@Deprecated
 public static final String WINDOWED_INNER_CLASS_SERDE = 
"windowed.inner.class.serde";
 private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default 
serializer / deserializer for the inner class of a windowed record. Must 
implement the " +
 "org.apache.kafka.common.serialization.Serde interface. 
Note that setting this config in KafkaStreams application would result " +
 "in an error as it is meant to be used only from Plain consumer 
client.";
 
-/** {@code window.size.ms} */
+/**
+ * {@code window.size.ms}
+ *
+ * @deprecated since 4.0.0.

Review Comment:
   4.1.0



##
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java:
##
@@ -38,32 +48,42 @@ public TimeWindowedSerializer(final Serializer i

[PR] MINOR: remove unused function BrokerRegistration#isMigratingZkBroker [kafka]

2025-04-05 Thread via GitHub


FrankYang0529 opened a new pull request, #19330:
URL: https://github.com/apache/kafka/pull/19330

   The `BrokerRegistration#isMigratingZkBroker` is not used by any production 
function. Remove it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-04-05 Thread via GitHub


bbejeck commented on PR #18953:
URL: https://github.com/apache/kafka/pull/18953#issuecomment-2772541530

   Merged #18953 into trunk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out

2025-04-05 Thread Daniel Fonai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Fonai updated KAFKA-18874:
-
Attachment: controller-3.log
controller-4.log
controller-5.log

> KRaft controller does not retry registration if the first attempt times out
> ---
>
> Key: KAFKA-18874
> URL: https://issues.apache.org/jira/browse/KAFKA-18874
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.9.0
>Reporter: Daniel Fonai
>Priority: Minor
> Attachments: controller-3.log, controller-4.log, controller-5.log
>
>
> There is a [retry 
> mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274]
>  with exponential backoff built-in in KRaft controller registration. The 
> timeout of the first attempt is 5 s for KRaft controllers 
> ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448])
>  which is not configurable.
> If for some reason the controller's first registration request times out, the 
> attempt should be retried but in practice this does not happen and the 
> controller is not able to join the quorum. We see the following in the faulty 
> controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to 
> send ControllerRegistrationRequestData(controllerId=3, 
> incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, 
> listeners=[Listener(name='CONTROLPLANE-9090', 
> host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc',
>  port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', 
> minSupportedVersion=0, maxSupportedVersion=1), 
> Feature(name='metadata.version', minSupportedVersion=1, 
> maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel 
> manager timed out before sending the request. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting 
> for the previous RPC to complete. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> {noformat}
> After this we can not see any controller retry in the log.





Re: [PR] KAFKA-17554: Flaky testFutureCompletionOutsidePoll in ConsumerNetworkClientTest [kafka]

2025-04-05 Thread via GitHub


github-actions[bot] commented on PR #18298:
URL: https://github.com/apache/kafka/pull/18298#issuecomment-2746781466

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





[jira] [Updated] (KAFKA-19026) AlterConfigPolicy incompatibility between ZK mode and KRaft mode when using AlterConfigOp.OpType.SUBTRACT

2025-04-05 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-19026:
--
Attachment: (was: KAFKA19026Test.java)

> AlterConfigPolicy incompatibility between ZK mode and KRaft mode when using 
> AlterConfigOp.OpType.SUBTRACT
> -
>
> Key: KAFKA-19026
> URL: https://issues.apache.org/jira/browse/KAFKA-19026
> Project: Kafka
>  Issue Type: Bug
>  Components: core, migration
>Affects Versions: 3.9.0, 3.8.1
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: KAFKA19026Policy.java, KAFKA19026Test.java
>
>
> When processing an Incremental Alter Config on a Config entry of type List 
> with OpType.SUBTRACT
> the metadata passed to  {color:#00}AlterConfigPolicy.validate contains 
> {color}
>  * {color:#00}in KRaft mode : {color}{color:#00}the config that would 
> result AFTER the subtraction{color}
>  * {color:#00}in ZK mode : the config as if the opType was OpType.SET, 
> with no indication that actually the value would be removed{color}





[PR] MINOR: Deduplicate topics of a topology for authorization check [kafka]

2025-04-05 Thread via GitHub


cadonna opened a new pull request, #19352:
URL: https://github.com/apache/kafka/pull/19352

   With the new Streams rebalance protocol, the Streams client sends the 
topology with the used topics to the broker for initialization. For the 
initialization the broker needs to describe the topics in the topology and 
consequently the Streams application needs to be authorized to describe the 
topics.
   
   The broker checks the authorization by filtering the topics in the topology 
by authorization. This filtering implicitly deduplicates the topics of the 
topology if they appear multiple times in the topology sent to the brokers. 
After that, the broker compares the number of authorized topics with the 
number of topics in the topology. If there are fewer authorized topics than 
topics in the topology, a TOPIC_AUTHORIZATION_FAILED error is returned.
   
   In Streams a topology that is sent to the brokers likely has duplicate 
topics because a repartition topic appears as a sink for one subtopology and as 
a source for another subtopology.
   
   This commit deduplicates the topics of a topology before the verification of 
the authorization.
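   The dedup-then-compare check described above can be sketched as follows (a 
minimal, Kafka-independent sketch; the names are hypothetical, and the real 
broker code operates on topology metadata rather than plain strings):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Minimal sketch of the authorization check described above (hypothetical names).
public class TopologyTopicAuthCheck {
    // Returns true if every distinct topic in the topology is authorized.
    static boolean allTopicsAuthorized(List<String> topologyTopics, Set<String> authorizedTopics) {
        // Deduplicate first: a repartition topic appears both as a sink and
        // as a source, so the raw topic list can contain duplicates.
        Set<String> distinctTopics = new HashSet<>(topologyTopics);
        // Filtering by authorization implicitly deduplicates, so the size
        // comparison must be against the deduplicated topics, not the raw list.
        Set<String> authorized = new HashSet<>(distinctTopics);
        authorized.retainAll(authorizedTopics);
        return authorized.size() == distinctTopics.size();
    }

    public static void main(String[] args) {
        List<String> topics = List.of("input", "repartition", "repartition", "output");
        Set<String> acl = Set.of("input", "repartition", "output");
        System.out.println(allTopicsAuthorized(topics, acl)); // prints true
    }
}
```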
   
   





Re: [PR] KAFKA-16616: refactor mergeWith and updatePartitionLeadership [kafka]

2025-04-05 Thread via GitHub


lorcanj commented on code in PR #19199:
URL: https://github.com/apache/kafka/pull/19199#discussion_r2004919701


##
clients/src/main/java/org/apache/kafka/clients/MetadataSnapshot.java:
##
@@ -173,12 +173,15 @@ MetadataSnapshot mergeWith(String newClusterId,
 
 Map newMetadataByPartition = new 
HashMap<>(addPartitions.size());
 
-// We want the most recent topic ID. We start with the previous ID 
stored for retained topics and then
-// update with newest information from the MetadataResponse. We always 
take the latest state, removing existing
-// topic IDs if the latest state contains the topic name but not a 
topic ID.
-Map newTopicIds = this.topicIds.entrySet().stream()
-.filter(entry -> shouldRetainTopic.test(entry.getKey()))
-.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+Map newTopicIds = new HashMap<>(this.topicIds.size());
+
+for (Map.Entry entry : 
metadataByPartition.entrySet()) {
+if (shouldRetainTopic.test(entry.getKey().topic())) {
+newMetadataByPartition.put(entry.getKey(), entry.getValue());

Review Comment:
   I think I changed it because previously it primarily used 
`putIfAbsent()` to ensure that the newest information from the MetadataResponse 
wasn't overwritten; that is different now because of the code re-order, 
where this step is done first.
   
   I guess if there are duplicate entries there isn't any need to put if there 
is an entry for the topic partition, but I don't have a full grasp of what this 
map would look like populated with real-world data. I think I tested with both 
and it didn't seem to affect the unit tests.
   
   Happy to go with either approach.
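   The put()/putIfAbsent() distinction discussed above, shown in isolation 
(illustrative names only):

```java
import java.util.HashMap;
import java.util.Map;

// put() overwrites an existing entry; putIfAbsent() keeps the first value written.
public class PutIfAbsentDemo {
    public static void main(String[] args) {
        Map<String, String> byPut = new HashMap<>();
        byPut.put("topic-0", "newMetadata");
        byPut.put("topic-0", "retainedMetadata");                 // overwrites

        Map<String, String> byPutIfAbsent = new HashMap<>();
        byPutIfAbsent.putIfAbsent("topic-0", "newMetadata");
        byPutIfAbsent.putIfAbsent("topic-0", "retainedMetadata"); // ignored

        System.out.println(byPut.get("topic-0"));                 // retainedMetadata
        System.out.println(byPutIfAbsent.get("topic-0"));         // newMetadata
    }
}
```

Which behavior is correct depends on whether the newest MetadataResponse entries are written first (then putIfAbsent protects them) or last (then put is needed to overwrite).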






[jira] [Assigned] (KAFKA-14579) Move DumpLogSegments to tools

2025-04-05 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu reassigned KAFKA-14579:
---

Assignee: TaiJuWu  (was: Alexandre Dupriez)

> Move DumpLogSegments to tools
> -
>
> Key: KAFKA-14579
> URL: https://issues.apache.org/jira/browse/KAFKA-14579
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: TaiJuWu
>Priority: Major
>






Re: [PR] MINOR: Add Functional Interface annotation to interfaces used by Lambdas [kafka]

2025-04-05 Thread via GitHub


mjsax commented on code in PR #19234:
URL: https://github.com/apache/kafka/pull/19234#discussion_r2025461183


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java:
##
@@ -22,6 +22,7 @@
 
 import java.util.Collection;
 
+@FunctionalInterface

Review Comment:
   As above.






[PR] KAFKA-19018,KAFKA-19063: Implement maxRecords and acquisition lock timeout in share fetch request and response resp. [kafka]

2025-04-05 Thread via GitHub


apoorvmittal10 opened a new pull request, #19334:
URL: https://github.com/apache/kafka/pull/19334

   This PR adds `MaxRecords` to the share fetch request and 
`AcquisitionLockTimeout` to the share fetch response. It also removes the internal 
broker config `max.fetch.records`.





[jira] [Updated] (KAFKA-19017) Change consumer-config to command-config in verifiable_share_consumer.py

2025-04-05 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-19017:
-
Fix Version/s: 4.1.0

> Change consumer-config to command-config in verifiable_share_consumer.py
> 
>
> Key: KAFKA-19017
> URL: https://issues.apache.org/jira/browse/KAFKA-19017
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chirag Wadhwa
>Assignee: Chirag Wadhwa
>Priority: Major
> Fix For: 4.1.0
>
>






Re: [PR] KAFKA-18713: Fix left join bug by using DELETE_KEY_NO_PROPAGATE [kafka]

2025-04-05 Thread via GitHub


mjsax commented on PR #18887:
URL: https://github.com/apache/kafka/pull/18887#issuecomment-2774410196

   @nilmadhab -- I think we can close this PR in favor of the new PR you opened?





Re: [PR] KAFKA-19074: Remove the cached responseData from ShareFetchResponse [kafka]

2025-04-05 Thread via GitHub


TaiJuWu commented on code in PR #19357:
URL: https://github.com/apache/kafka/pull/19357#discussion_r2028466184


##
server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTest.java:
##
@@ -66,6 +79,19 @@ public void testErrorInAllPartitions() {
 assertTrue(shareFetch.errorInAllPartitions());
 }
 
+@Test
+public void testDontCacheAnyData() {
+ShareFetchResponse shareFetch = shareFetchResponse(tidp0, records, 
Errors.NONE, "", (short) 0,
+"", List.of(), 0);
+LinkedHashMap 
responseData = shareFetch.responseData(Map.of(tidp0.topicId(), tidp0.topic()));
+assertEquals(1, responseData.size());
+responseData.forEach((topicIdPartition, partitionData) -> 
assertEquals(records, partitionData.records()));
+
+LinkedHashMap 
nonResponseData = shareFetch.responseData(Map.of());
+assertEquals(0, nonResponseData.size());
+nonResponseData.forEach((topicIdPartition, partitionData) -> 
assertEquals(MemoryRecords.EMPTY, partitionData.records()));

Review Comment:
   Removed it, thanks!






[jira] [Commented] (KAFKA-19076) UnifiedLog#maybeHandleIOException should replace `String` by `Supplier`

2025-04-05 Thread Nick Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940362#comment-17940362
 ] 

Nick Guo commented on KAFKA-19076:
--

Hi [~chia7712], I'd like to try this out. Thanks!

> UnifiedLog#maybeHandleIOException should replace `String` by 
> `Supplier`
> ---
>
> Key: KAFKA-19076
> URL: https://issues.apache.org/jira/browse/KAFKA-19076
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> the message is used when the function encounters an error, so the error message 
> should be created lazily.





Re: [PR] KAFKA-18761: [2/N] List share group offsets with state and auth [kafka]

2025-04-05 Thread via GitHub


AndrewJSchofield commented on code in PR #19328:
URL: https://github.com/apache/kafka/pull/19328#discussion_r2027266957


##
clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java:
##
@@ -47,9 +46,10 @@ public ListShareGroupOffsetsSpec 
topicPartitions(Collection topi
 
 /**
  * Returns the topic partitions whose offsets are to be listed for a share 
group.
+ * {@code null} indicates that offsets of all partitions of the group are 
to be listed.
  */
 public Collection topicPartitions() {

Review Comment:
   Not sure, but this is precisely the same behaviour as 
`ListConsumerGroupOffsetsSpec`. Essentially, if you do `new 
ListShareGroupOffsetsSpec()` you get all of them. If you do `new 
ListShareGroupOffsetsSpec().topicPartitions()` then you get a smaller list. 
There's no way to ask for none.






Re: [PR] MINOR: Add Functional Interface annotation to interfaces used by Lambdas [kafka]

2025-04-05 Thread via GitHub


Tombert commented on code in PR #19234:
URL: https://github.com/apache/kafka/pull/19234#discussion_r2025675202


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java:
##
@@ -22,6 +22,7 @@
 
 import java.util.Collection;
 
+@FunctionalInterface

Review Comment:
   Updated






Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-04-05 Thread via GitHub


wernerdv commented on PR #19216:
URL: https://github.com/apache/kafka/pull/19216#issuecomment-2742924375

   @junrao @frankvicky Thanks for the comments, I've addressed them.





[jira] [Commented] (KAFKA-19068) Eliminate the duplicate type check in creating ControlRecord

2025-04-05 Thread Nick Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17940093#comment-17940093
 ] 

Nick Guo commented on KAFKA-19068:
--

Hi [~chia7712], I would like to handle this. Thanks!

> Eliminate the duplicate type check in creating ControlRecord
> 
>
> Key: KAFKA-19068
> URL: https://issues.apache.org/jira/browse/KAFKA-19068
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> `RecordsIterator#decodeControlRecord` [0] do the type check and then 
> `ControlRecord` constructor [1] does that again ...
> we should add a static method to ControlRecord to create `ControlRecord` with 
> a type check, and then the `ControlRecord` constructor should be changed to private 
> to ensure all instances are created by the static method 
> [0] 
> https://github.com/apache/kafka/blob/ccf2510fdda755b7a8e0900d3316fade46eb533a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java#L361
> [1] 
> https://github.com/apache/kafka/blob/ccf2510fdda755b7a8e0900d3316fade46eb533a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java#L45
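A minimal sketch of the proposed static-factory pattern (hypothetical names and
checks; the real ControlRecord validates the control record type against the
ApiMessage from the raft module):

```java
// Sketch: private constructor plus validating static factory, so the
// type check runs exactly once per instance (hypothetical types).
public class ControlRecordSketch {
    enum RecordType { LEADER_CHANGE, SNAPSHOT_HEADER }

    static final class ControlRecord {
        private final RecordType type;
        private final Object message;

        // Private: all instances must go through the static factory.
        private ControlRecord(RecordType type, Object message) {
            this.type = type;
            this.message = message;
        }

        static ControlRecord of(RecordType type, Object message) {
            // The single place where validation happens.
            if (type == null || message == null) {
                throw new IllegalArgumentException("type and message are required");
            }
            return new ControlRecord(type, message);
        }

        RecordType type() { return type; }
    }

    public static void main(String[] args) {
        ControlRecord r = ControlRecord.of(RecordType.LEADER_CHANGE, "payload");
        System.out.println(r.type());
    }
}
```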





[jira] [Resolved] (KAFKA-18827) Initialize share group state impl

2025-04-05 Thread Sushant Mahajan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sushant Mahajan resolved KAFKA-18827.
-
Resolution: Fixed

> Initialize share group state impl
> -
>
> Key: KAFKA-18827
> URL: https://issues.apache.org/jira/browse/KAFKA-18827
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sushant Mahajan
>Assignee: Sushant Mahajan
>Priority: Major
>






Re: [PR] KAFKA-18761: [2/N] List share group offsets with state and auth [kafka]

2025-04-05 Thread via GitHub


smjn commented on code in PR #19328:
URL: https://github.com/apache/kafka/pull/19328#discussion_r2024607201


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1365,6 +1368,67 @@ public 
CompletableFuture
 describeShareGroupAllOffsets(
+RequestContext context,
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData
+) {
+if (!isActive.get()) {
+return CompletableFuture.completedFuture(
+
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), 
Errors.COORDINATOR_NOT_AVAILABLE));
+}
+
+if (metadataImage == null) {
+return CompletableFuture.completedFuture(
+
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), 
Errors.COORDINATOR_NOT_AVAILABLE));
+}
+
+return runtime.scheduleReadOperation(
+"share-group-initialized-partitions",
+topicPartitionFor(requestData.groupId()),
+(coordinator, offset) -> 
coordinator.initializedShareGroupPartitions(requestData.groupId())
+).thenCompose(topicPartitionMap -> {
+Map requestTopicIdToNameMapping = new HashMap<>();
+
List
 describeShareGroupOffsetsResponseTopicList = new 
ArrayList<>(topicPartitionMap.size());
+ReadShareGroupStateSummaryRequestData readSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+.setGroupId(requestData.groupId());
+topicPartitionMap.forEach((topicId, partitionSet) -> {
+String topicName = 
metadataImage.topics().topicIdToNameView().get(topicId);
+if (topicName != null) {
+requestTopicIdToNameMapping.put(topicId, topicName);
+readSummaryRequestData.topics().add(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+.setTopicId(topicId)
+.setPartitions(
+partitionSet.stream().map(
+partitionIndex -> new 
ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
+).toList()
+));
+}
+});
+return readShareGroupStateSummary(readSummaryRequestData, 
requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+});

Review Comment:
   let's add an `exceptionally` handler as well




Re: [PR] KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer [kafka]

2025-04-05 Thread via GitHub


chia7712 commented on code in PR #18795:
URL: https://github.com/apache/kafka/pull/18795#discussion_r2004284732


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##
@@ -432,7 +439,14 @@ protected Map 
prepareFetchRequests()
 // going to be failed anyway before being sent, so skip 
sending the request for now
 log.trace("Skipping fetch for partition {} because node {} is 
awaiting reconnect backoff", partition, node);
 } else if (nodesWithPendingFetchRequests.contains(node.id())) {
+// If there's already an inflight request for this node, don't 
issue another request.
 log.trace("Skipping fetch for partition {} because previous 
request to {} has not been processed", partition, node);
+} else if (bufferedNodes.contains(node.id())) {

Review Comment:
   Sorry that I don't follow whole discussion of this PR, but I have a small 
question for this condition.
   
   Is it impossible for server A to have a pending fetch request when the 
consumer is still processing records from server A? If so, does this constraint 
reduce concurrency?






Re: [PR] [MINOR] Cleanup Server Common Module [kafka]

2025-04-05 Thread via GitHub


chia7712 commented on PR #19085:
URL: https://github.com/apache/kafka/pull/19085#issuecomment-2742992871

   @sjhajharia could you please rebase the code? There is a build error if I apply 
this PR onto trunk.





Re: [PR] Trying again [kafka-merge-queue-sandbox]

2025-04-05 Thread via GitHub


mumrah merged PR #68:
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/68





[jira] [Resolved] (KAFKA-17431) KRaft servers require valid static socketserver configuration to start

2025-04-05 Thread Kevin Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Wu resolved KAFKA-17431.
--
Fix Version/s: 4.1.0
   Resolution: Fixed

> KRaft servers require valid static socketserver configuration to start
> --
>
> Key: KAFKA-17431
> URL: https://issues.apache.org/jira/browse/KAFKA-17431
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Kevin Wu
>Priority: Major
> Fix For: 4.1.0
>
>
> KRaft servers require a valid static socketserver configuration to start. 
> However, it would be better if we could support invalid static 
> configurations, as long as there were dynamically set changes that made them 
> valid. This will require reworking startup somewhat so that we start the 
> socket server later.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add documentation about KIP-405 remote reads serving just one partition per FetchRequest [kafka]

2025-04-05 Thread via GitHub


showuon commented on code in PR #19336:
URL: https://github.com/apache/kafka/pull/19336#discussion_r2026363709


##
docs/ops.html:
##
@@ -4009,7 +4009,7 @@ In order to migrate from ZooKeeper to KRaft you need to use a bridge 
release. The last bridge release is Kafka 3.9.
 See the ZooKeeper to KRaft 
Migration steps in the 3.9 documentation.
 
-6.9 Tiered Storage
+6.9 Tiered Storage

Review Comment:
   Nice find!






Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-04-05 Thread via GitHub


mimaison commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2013965436


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   This is annoying.
   Is there a good reason RSM and RLMM are not configured when they are created?
   
   Currently the logic in `BrokerServer` first creates the `RemoteLogManager` 
instance, then optionally calls `onEndPointCreated()`, before finally calling 
`startup()`.
   
   It looks like we could compute the `Endpoint` object before creating the 
`RemoteLogManager` instance and pass it directly in the constructor as 
`Optional` instead of using `onEndPointCreated()`. That way in the 
`RemoteLogManager` constructor, we could instantiate and immediately configure 
the RSM and RLMM instances.
   
   This would allow us to delete `onEndPointCreated()` (it's only once used in 
`BrokerServer`) and streamline the creation of `RemoteLogManager`.
   
   @showuon @satishd WDYT?






Re: [PR] MINOR: leverage preProcessParsedConfig within AbstractConfig [kafka]

2025-04-05 Thread via GitHub


m1a2st commented on code in PR #19259:
URL: https://github.com/apache/kafka/pull/19259#discussion_r2007871072


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -114,6 +114,15 @@ public void testOriginalsWithPrefix() {
 assertEquals(expected, originalsWithPrefix);
 }
 
+@Test
+public void testPreprocessConfig() {
+Properties props = new Properties();
+props.put("foo.bar", "abc");
+props.put("setting", "def");
+TestConfig config = new TestConfig(props);
+assertEquals("success", config.get("preprocess"));

Review Comment:
   Could you also check `foo.bar` and `setting` are not in the map?
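   A hedged sketch of what the suggested assertions might look like. The 
`preProcess` helper below is a hypothetical stand-in for the config 
preprocessing under review, not Kafka's actual `AbstractConfig` code; it only 
illustrates asserting both the new key and the removal of the raw keys:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class Main {
       // Hypothetical stand-in for a preProcessParsedConfig override:
       // consumes the raw keys and emits a marker entry instead.
       static Map<String, Object> preProcess(Map<String, Object> parsed) {
           Map<String, Object> out = new HashMap<>();
           out.put("preprocess", "success");
           return out;
       }

       public static void main(String[] args) {
           Map<String, Object> props = new HashMap<>();
           props.put("foo.bar", "abc");
           props.put("setting", "def");
           Map<String, Object> config = preProcess(props);

           // The reviewer's suggestion: besides checking the new key,
           // also assert the original keys were dropped by preprocessing.
           System.out.println(config.get("preprocess"));
           System.out.println(config.containsKey("foo.bar"));
           System.out.println(config.containsKey("setting"));
       }
   }
   ```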






Re: [PR] KAFKA-18713: Fix left join bug by using DELETE_KEY_NO_PROPAGATE [kafka]

2025-04-05 Thread via GitHub


nilmadhab commented on PR #18887:
URL: https://github.com/apache/kafka/pull/18887#issuecomment-2774464449

   Yes, sure
   
   On Thu, 3 Apr 2025, Matthias J. Sax wrote:
   > @nilmadhab -- I think we can close this PR in favor of the new PR you 
opened?
   





Re: [PR] Minor: Add functionalinterface to the producer callback [kafka]

2025-04-05 Thread via GitHub


Tombert commented on code in PR #19366:
URL: https://github.com/apache/kafka/pull/19366#discussion_r2027542000


##
clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java:
##
@@ -34,6 +34,7 @@
  * The callback may be executed in any thread calling {@link 
ShareConsumer#poll(java.time.Duration)}.
  */
 @InterfaceStability.Evolving
+@FunctionalInterface

Review Comment:
   Forgive me a bit here, I'm still kind of learning the Kafka codebase, but I 
just think that the word "callback" itself implies some form of "response 
handler" function.  I will stay within Kafka's nomenclature, I just thought 
that that was more or less the terminology.
   
   `ConsumerRebalanceListener` can't be made into a `@FunctionalInterface` in 
its current state because it has more than one method. 
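   The distinction above can be sketched with plain Java (hypothetical 
interface names, not Kafka's actual ones): `@FunctionalInterface` compiles only 
on an interface with exactly one abstract method, which is also what makes a 
lambda usable in its place.

   ```java
   // Hypothetical single-method callback: eligible for @FunctionalInterface.
   @FunctionalInterface
   interface CommitCallback {
       void onComplete(long offset, Exception error);
   }

   // A two-method listener (like ConsumerRebalanceListener) cannot carry the
   // annotation: javac rejects @FunctionalInterface on interfaces with more
   // than one abstract method.
   interface RebalanceListener {
       void onPartitionsRevoked(String partition);
       void onPartitionsAssigned(String partition);
   }

   public class Main {
       public static void main(String[] args) {
           // The single abstract method lets callers pass a lambda directly.
           CommitCallback cb = (offset, error) ->
               System.out.println("committed " + offset);
           cb.onComplete(42L, null);
       }
   }
   ```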






Re: [PR] Another change but this time with a long title. What happens if the title is over 72 characters? I guess we'll find out. [kafka-merge-queue-sandbox]

2025-04-05 Thread via GitHub


mumrah merged PR #66:
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/66





[jira] [Updated] (KAFKA-19012) Messages ending up on the wrong topic

2025-04-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-19012:
--
Component/s: clients
 producer 

> Messages ending up on the wrong topic
> -
>
> Key: KAFKA-19012
> URL: https://issues.apache.org/jira/browse/KAFKA-19012
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3, 3.8.1
>Reporter: Donny Nadolny
>Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic 
> than what they were published to. That is, we publish a message to topicA and 
> consumers of topicB see it and fail to parse it because the message contents 
> are meant for topicA. This has happened for various topics. 
> We've begun adding a header with the intended topic (which we get just by 
> reading the topic from the record that we're about to pass to the OSS client) 
> right before we call producer.send, this header shows the correct topic 
> (which also matches up with the message contents itself). Similarly we're 
> able to use this header and compare it to the actual topic to prevent 
> consuming these misrouted messages, but this is still concerning.
> Some details:
>  - This happens rarely: it happened approximately once per 10 trillion 
> messages for a few months, though there was a period of a week or so where it 
> happened more frequently (once per 1 trillion messages or so)
>  - It often happens in a small burst, eg 2 or 3 messages very close in time 
> (but from different hosts) will be misrouted
>  - It often but not always coincides with some sort of event in the cluster 
> (a broker restarting or being replaced, network issues causing errors, etc). 
> Also these cluster events happen quite often with no misrouted messages
>  - We run many clusters, it has happened for several of them
>  - There is no pattern between intended and actual topic, other than the 
> intended topic tends to be higher volume ones (but I'd attribute that to 
> there being more messages published -> more occurrences affecting it rather 
> than it being more likely per-message)
>  - It only occurs with clients that are using a non-zero linger
>  - Once it happened with two sequential messages, both were intended for 
> topicA but both ended up on topicB, published by the same host (presumably 
> within the same linger batch)
>  - Most of our clients are 3.2.3 and it has only affected those, most of our 
> brokers are 3.2.3 but it has also happened with a cluster that's running 
> 3.8.1 (but I suspect a client rather than broker problem because of it never 
> happening with clients that use 0 linger)





[PR] KAFKA-15931: Cancel RemoteLogReader gracefully [kafka]

2025-04-05 Thread via GitHub


jeqo opened a new pull request, #19331:
URL: https://github.com/apache/kafka/pull/19331

   Backports f24945b519005c0bc7a28db2db7aae6cec158927 to 4.0
   
   Instead of reopening the transaction index, it cancels the RemoteFetchTask 
without interrupting it, avoiding closing the TransactionIndex channel.
   
   This lets the remote fetch run to completion while ignoring the results. 
Given that this is considered a rare case, we can live with this. If it 
becomes a performance issue, it could be optimized.
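   The cancel-without-interrupt behavior described above can be sketched with 
a plain `ExecutorService` (no Kafka classes; illustrative only): 
`Future.cancel(false)` marks the task cancelled but lets in-flight work finish, 
so a shared resource like a file channel is never closed mid-read.

   ```java
   import java.util.concurrent.*;

   public class Main {
       public static void main(String[] args) throws Exception {
           ExecutorService pool = Executors.newSingleThreadExecutor();
           CountDownLatch started = new CountDownLatch(1);
           CountDownLatch finished = new CountDownLatch(1);

           Future<?> task = pool.submit(() -> {
               started.countDown();
               try {
                   Thread.sleep(200);      // stands in for the remote read
                   finished.countDown();   // the read runs to completion...
               } catch (InterruptedException e) {
                   // ...because cancel(false) never interrupts the thread
               }
           });

           started.await();
           // mayInterruptIfRunning=false: mark the task cancelled but let the
           // in-flight work complete; its result is simply discarded.
           task.cancel(false);
           System.out.println("interrupted? " + !finished.await(1, TimeUnit.SECONDS));
           System.out.println("cancelled? " + task.isCancelled());
           pool.shutdown();
       }
   }
   ```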





Re: [PR] KAFKA-19084: Port KAFKA-16224, KAFKA-16764 for ShareConsumers [kafka]

2025-04-05 Thread via GitHub


smjn commented on code in PR #19369:
URL: https://github.com/apache/kafka/pull/19369#discussion_r2028490277


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -971,9 +971,11 @@ private void 
handlePartitionError(ShareAcknowledgeResponseData.PartitionData par
   AtomicBoolean shouldRetry) {
 if (partitionError.exception() != null) {
 boolean retry = false;
-if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH) {
+if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == 
Errors.UNKNOWN_TOPIC_OR_PARTITION) {

Review Comment:
   could we add unit tests to verify this?
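   A hedged sketch of the retry decision being reviewed (the error names 
mirror the diff above; the helper is illustrative, not the client's actual 
method): transient metadata errors trigger a retry, while anything else is 
surfaced to the caller.

   ```java
   public class Main {
       enum Errors {
           NOT_LEADER_OR_FOLLOWER,
           FENCED_LEADER_EPOCH,
           UNKNOWN_TOPIC_OR_PARTITION,
           TOPIC_AUTHORIZATION_FAILED
       }

       // Retry only errors that a metadata refresh can resolve.
       static boolean shouldRetry(Errors e) {
           return e == Errors.NOT_LEADER_OR_FOLLOWER
               || e == Errors.FENCED_LEADER_EPOCH
               || e == Errors.UNKNOWN_TOPIC_OR_PARTITION;
       }

       public static void main(String[] args) {
           System.out.println(shouldRetry(Errors.UNKNOWN_TOPIC_OR_PARTITION));
           System.out.println(shouldRetry(Errors.TOPIC_AUTHORIZATION_FAILED));
       }
   }
   ```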



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -937,7 +938,7 @@ private void sendAcknowledgementsAndLeaveGroup(final Timer 
timer, final AtomicRe
 // If users have fatal error, they will get some exceptions in the 
background queue.
 // When running unsubscribe, these exceptions should be ignored, 
or users can't unsubscribe successfully.
 processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e 
instanceof GroupAuthorizationException
-|| e instanceof TopicAuthorizationException));
+|| e instanceof TopicAuthorizationException || e instanceof 
InvalidTopicException));

Review Comment:
   same as above






[jira] [Comment Edited] (KAFKA-17354) StreamThread::setState race condition causes java.lang.RuntimeException: State mismatch PENDING_SHUTDOWN different from STARTING

2025-04-05 Thread Ao Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939350#comment-17939350
 ] 

Ao Li edited comment on KAFKA-17354 at 3/28/25 11:41 PM:
-

We recently released the Fray Gradle plugin 
(https://github.com/cmu-pasta/fray?tab=readme-ov-file#gradle), which allows 
developers to use Fray easily. I added the Fray debugger to PR/18675 and ran 
the test using Fray, but I still saw the failure. 

You may find how I did this here: 
https://github.com/aoli-al/kafka/tree/KAFKA-17379 and this commit 
https://github.com/aoli-al/kafka/commit/bdf4033b67cc7e65bc9b44f846e8cd38d2256c60

Our extension still has some limitations, such as missing support of 
@ParameterizedTest or timed operations. So it requires a few code changes to 
the existing tests. 



was (Author: JIRAUSER306156):
We recently released the Fray Gradle plugin, which allows developers to use 
Fray easily. I added the Fray debugger to PR/18675 and ran the test using Fray, 
but I still saw the failure. 

You may find how I did this here: 
https://github.com/aoli-al/kafka/tree/KAFKA-17379 and this commit 
https://github.com/aoli-al/kafka/commit/bdf4033b67cc7e65bc9b44f846e8cd38d2256c60

Our extension still has some limitations, such as missing support of 
@ParameterizedTest or timed operations. So it requires a few code changes to 
the existing tests. 


> StreamThread::setState race condition causes java.lang.RuntimeException: 
> State mismatch PENDING_SHUTDOWN different from STARTING
> 
>
> Key: KAFKA-17354
> URL: https://issues.apache.org/jira/browse/KAFKA-17354
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ao Li
>Assignee: Anton Liauchuk
>Priority: Major
>
> I saw a test failure in `StreamThreadTest::shouldChangeStateAtStartClose`. A 
> race condition in `setState` causes an uncaught exception thrown in 
> `StateListenerStub`. 
> Basically, the function `setState` allows two threads to call 
> `stateListener.onChange` concurrently. 
> This patch will help you to reproduce the failure deterministically. 
> https://github.com/aoli-al/kafka/commit/033a9a33766740e6843effb9beabfdcb3804846b





Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-04-05 Thread via GitHub


wernerdv commented on code in PR #19216:
URL: https://github.com/apache/kafka/pull/19216#discussion_r2007200264


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogToClean.java:
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+
+/**
+ * Helper class for a log, its topic/partition, the first cleanable position, 
the first uncleanable dirty position,
+ * and whether it needs compaction immediately.
+ */
+public final class LogToClean implements Comparable {
+private final TopicPartition topicPartition;
+private final UnifiedLog log;
+private final long firstDirtyOffset;
+private final long uncleanableOffset;
+private final boolean needCompactionNow;
+private final long cleanBytes;
+private final long firstUncleanableOffset;
+private final long cleanableBytes;
+private final long totalBytes;
+private final double cleanableRatio;
+
+public LogToClean(
+TopicPartition topicPartition,
+UnifiedLog log,
+long firstDirtyOffset,
+long uncleanableOffset,
+boolean needCompactionNow
+) {
+this.topicPartition = Objects.requireNonNull(topicPartition, 
"topicPartition must not be null");
+this.log = Objects.requireNonNull(log, "log must not be null");
+this.firstDirtyOffset = firstDirtyOffset;
+this.uncleanableOffset = uncleanableOffset;
+this.needCompactionNow = needCompactionNow;
+
+this.cleanBytes = log.logSegments(-1, firstDirtyOffset).stream()
+.mapToLong(LogSegment::size)
+.sum();
+
+var cleanableBytesResult = 
LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, 
uncleanableOffset);
+this.firstUncleanableOffset = cleanableBytesResult.getKey();
+this.cleanableBytes = cleanableBytesResult.getValue();
+
+this.totalBytes = cleanBytes + cleanableBytes;
+this.cleanableRatio = (double) cleanableBytes / totalBytes;
+}
+
+public TopicPartition topicPartition() {
+return topicPartition;
+}
+
+public UnifiedLog log() {
+return log;
+}
+
+public long firstDirtyOffset() {
+return firstDirtyOffset;
+}
+
+public long uncleanableOffset() {

Review Comment:
   Yes, I will remove it.






Re: [PR] MINOR: Specify 2.1 as the minimum broker version for clients [kafka]

2025-04-05 Thread via GitHub


ijuma commented on code in PR #19250:
URL: https://github.com/apache/kafka/pull/19250#discussion_r200354


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -234,9 +229,10 @@
  * successful writes are marked as aborted, hence keeping the transactional 
guarantees.
  * 
  * 
- * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
- * certain client features.  For instance, the transactional APIs need broker 
versions 0.11.0 or later. You will receive an
- * UnsupportedVersionException when invoking an API that is not 
available in the running broker version.
+ * This client can communicate with brokers that are version 2.1 or newer. 
Older brokers may not support
+ * certain client features. For instance, {@code sendOffsetsToTransaction} 
with all consumer group metadata needs broker
+ * versions 2.5 or later. You will receive an 
UnsupportedVersionException when invoking an API that is not

Review Comment:
   Interesting, not sure if this implication was fully understood. I'll check 
what other updates are required due to this.






[jira] [Resolved] (KAFKA-19003) Add force-terminate feature to kafka-transactions CLI and Admin Client

2025-04-05 Thread Ritika Reddy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Reddy resolved KAFKA-19003.
--
Resolution: Fixed

> Add force-terminate feature to  kafka-transactions CLI and Admin Client
> ---
>
> Key: KAFKA-19003
> URL: https://issues.apache.org/jira/browse/KAFKA-19003
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Ritika Reddy
>Priority: Major
>
> The {{kafka-transactions.sh}} tool is going to support a new command 
> forceTerminateTransaction.  It has one required argument {{-transactionalId}} 
> that would take the transactional id for the transaction to be terminated.
> The {{Admin}}  interface will support a new method:
> {{public TerminateTransactionResult forceTerminateTransaction(String 
> transactionalId)}} 
> {{TerminateTransactionResult}} just contains {{KafkaFuture result}} 
> method.
> NOTE that there is an existing {{abortTransaction}}  method that is used to 
> abort “hanging” transactions (artifact of some gaps in the transaction 
> protocol implementation that will be addressed in 
> [KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense],
>  i.e. once part 1 of 
> [KIP-890|https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense]
>  is implemented we won’t have “hanging” transactions).  “Hanging” 
> transactions are not known to the Kafka transaction coordinator, they are 
> just dangling messages in data partitions that cannot be aborted via the 
> normal transaction protocol.  So {{abortTransaction}} actually needs 
> information about data partitions so that it could go and insert markers 
> directly there.
> On the other hand, the {{forceTerminateTransaction}} method would operate on 
> a well-formed, but long running transaction for a given transactional id.  
> Under the covers it would just use {{InitProducerId}} call with 
> {{{}keepPreparedTxn=false{}}}.





Re: [PR] MINOR VerifableProducer ducktape can set idempotency and retries [kafka]

2025-04-05 Thread via GitHub


josefk31 commented on PR #19362:
URL: https://github.com/apache/kafka/pull/19362#issuecomment-2776483452

   Testing:
   




[jira] [Created] (KAFKA-19030) Remove metricNamePrefix from RequestChannel

2025-04-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-19030:
--

 Summary: Remove metricNamePrefix from RequestChannel
 Key: KAFKA-19030
 URL: https://issues.apache.org/jira/browse/KAFKA-19030
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


The `controlPlaneRequestChannelOpt` was removed in KRaft mode, so we no longer 
need the metrics prefix.





Re: [PR] [WIP] KAFKA-16717 [2/N]: Add AdminClient.alterShareGroupOffsets [kafka]

2025-04-05 Thread via GitHub


JimmyWang6 commented on PR #18929:
URL: https://github.com/apache/kafka/pull/18929#issuecomment-2765640456

   Hi @AndrewJSchofield,
   Sorry for the delay. I've been occupied with other tasks recently, but I 
will start to work on this issue as soon as possible. Much thanks for your help.





Re: [PR] MINOR: Fix upgrade finalization flag [kafka]

2025-04-05 Thread via GitHub


fvaleri commented on code in PR #19198:
URL: https://github.com/apache/kafka/pull/19198#discussion_r1999330309


##
docs/upgrade.html:
##
@@ -72,10 +72,8 @@ Upgrading 
Servers to 4.0.0 from
 brokers will be running the latest version and you can verify that the 
cluster's behavior and performance meet expectations.
 
 Once the cluster's behavior and performance have been verified, 
finalize the upgrade by running
-
-bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade 
--release-version 4.0
-
-
+bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade 
--release-version 4.0 
+(use --bootstrap-controller when running against a 
controller node).

Review Comment:
   @jsancio thanks for your feedback. Effectively I was laser focused on this 
use case, and forgot to look at the extended documentation. I noticed that all 
the other documented commands don't have `--bootstrap-broker` or 
`--bootstrap-controller`, so I was wondering if that was intentional and we 
should follow the same approach here. Wdyt?
   
   For example, this is what the user gets when running one of the other 
kafka-features documented commands. Then the user can easily choose the 
bootstrap option that fits the specific use case.
   
   ```sh
   $ bin/kafka-features.sh upgrade --metadata 3.9
   usage: kafka-features [-h] [--command-config COMMAND_CONFIG] 
(--bootstrap-server BOOTSTRAP_SERVER | --bootstrap-controller 
BOOTSTRAP_CONTROLLER) 
{describe,upgrade,downgrade,disable,version-mapping,feature-dependencies} ...
   kafka-features: error: one of the arguments --bootstrap-server 
--bootstrap-controller is required
   ```






Re: [PR] MINOR: Add Functional Interface annotation to interfaces used by Lambdas [kafka]

2025-04-05 Thread via GitHub


chia7712 commented on code in PR #19234:
URL: https://github.com/apache/kafka/pull/19234#discussion_r2027507368


##
clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java:
##
@@ -26,6 +26,7 @@
  * A callback interface that the user can implement to trigger custom actions 
when a commit request completes. The callback
  * may be executed in any thread calling {@link 
Consumer#poll(java.time.Duration) poll()}.
  */
+@FunctionalInterface
 public interface OffsetCommitCallback {

Review Comment:
   Maybe we can add the annotation to 
`org.apache.kafka.clients.producer.Callback` too






Re: [PR] KAFKA-18761: [2/N] List share group offsets with state and auth [kafka]

2025-04-05 Thread via GitHub


smjn commented on code in PR #19328:
URL: https://github.com/apache/kafka/pull/19328#discussion_r2024645145


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3533,19 +3539,74 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
-  private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
+  private def describeShareGroupAllOffsetsForGroup(requestContext: 
RequestContext,
 groupDescribeOffsetsRequest: 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
   ): 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
 = {
-groupCoordinator.describeShareGroupOffsets(
+groupCoordinator.describeShareGroupAllOffsets(
   requestContext,
   groupDescribeOffsetsRequest
 
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
 { (groupDescribeOffsetsResponse, exception) =>
   if (exception != null) {
 new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
   .setGroupId(groupDescribeOffsetsRequest.groupId)
   .setErrorCode(Errors.forException(exception).code)

Review Comment:
   nit: could we declare a new val instead of Errors lookup twice?






Re: [PR] KAFKA-18946: Move BrokerReconfigurable and DynamicProducerStateManagerConfig to server module [kafka]

2025-04-05 Thread via GitHub


frankvicky commented on code in PR #19174:
URL: https://github.com/apache/kafka/pull/19174#discussion_r2005054523


##
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##
@@ -605,6 +616,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   }
 }
 
+/**
+ * Implement [[org.apache.kafka.server.config.BrokerReconfigurable]] instead.
+ */
 trait BrokerReconfigurable {

Review Comment:
   I don't plan to replace `kafka.server.BrokerReconfigurable` with 
`org.apache.kafka.server.config.BrokerReconfigurable` in this patch.
   IMHO, we should do the replacement when we migrate the subclasses of 
`kafka.server.BrokerReconfigurable`. In this way, we keep each patch minimal 
and can eventually remove `kafka.server.BrokerReconfigurable`.






[jira] [Commented] (KAFKA-19026) AlterConfigPolicy incompatibility between ZK mode and KRaft mode when using AlterConfigOp.OpType.SUBTRACT

2025-04-05 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937379#comment-17937379
 ] 

Edoardo Comar commented on KAFKA-19026:
---

Running the attached ClusterTest when in ZK mode the output is (wrongly) :

{{>>>AlterConfigPolicy:AlterConfigPolicy.RequestMetadata(resource=ConfigResource(type=BROKER,
 name='0'), configs=\{ssl.cipher.suites=foo})}}

in the two KRAFT modes , the output is correct :

{{>>>AlterConfigPolicy:AlterConfigPolicy.RequestMetadata(resource=ConfigResource(type=BROKER,
 name='0'), configs=\{ssl.cipher.suites=})}}
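
A minimal sketch of the list-subtract semantics the KRaft path applies before 
invoking the policy (illustrative only, not the broker's actual code path): 
for a list-typed config, OpType.SUBTRACT removes the given entries from the 
current value, and the policy should then see the resulting list, which may be 
empty.

```java
import java.util.*;
import java.util.stream.*;

public class Main {
    // Remove the entries of `toRemove` from the comma-separated list config
    // `current`, mirroring what a SUBTRACT op should produce.
    static String subtract(String current, String toRemove) {
        Set<String> remove = new HashSet<>(Arrays.asList(toRemove.split(",")));
        return Arrays.stream(current.split(","))
                .filter(v -> !remove.contains(v))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Subtracting one entry leaves the rest...
        System.out.println("result=[" + subtract("foo,bar", "foo") + "]");
        // ...and subtracting the only entry yields an empty value, which is
        // what the policy sees in KRaft mode (ssl.cipher.suites=).
        System.out.println("result=[" + subtract("foo", "foo") + "]");
    }
}
```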

 

> AlterConfigPolicy incompatibility between ZK mode and KRaft mode when using 
> AlterConfigOp.OpType.SUBTRACT
> -
>
> Key: KAFKA-19026
> URL: https://issues.apache.org/jira/browse/KAFKA-19026
> Project: Kafka
>  Issue Type: Bug
>  Components: core, migration
>Affects Versions: 3.9.0, 3.8.1
>Reporter: Edoardo Comar
>Priority: Major
> Attachments: KAFKA19026Policy.java, KAFKA19026Test.java
>
>
> When processing an Incremental Alter Config on a config entry of type List
> with OpType.SUBTRACT, the metadata passed to AlterConfigPolicy.validate
> contains:
>  * in KRaft mode: the config that would result AFTER the subtraction
>  * in ZK mode: the config as if the opType was OpType.SET, with no
> indication that the value would actually be removed
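The divergence the attached test shows can be simulated with a small, self-contained sketch. This is not Kafka code; the two helper names are made up purely to contrast what the policy sees in each mode:

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of the divergence described in this issue.
// For a list-typed config, OpType.SUBTRACT should remove the given
// entries from the current value before the policy sees it.
class SubtractSemanticsDemo {
    // KRaft behaviour: the policy sees the value that would result AFTER subtraction.
    static List<String> kraftPolicyView(List<String> current, List<String> toSubtract) {
        List<String> result = new ArrayList<>(current);
        result.removeAll(toSubtract);
        return result;
    }

    // ZK behaviour (the bug): the policy sees the operand as if the op were a SET,
    // so the current value is ignored and nothing signals a removal.
    static List<String> zkPolicyView(List<String> current, List<String> toSubtract) {
        return new ArrayList<>(toSubtract);
    }
}
```

With `current = [foo]` and `SUBTRACT [foo]`, the KRaft view is the empty list (matching `ssl.cipher.suites=` above) while the ZK view is `[foo]` (matching `ssl.cipher.suites=foo`).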



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18959: increase the num_workers from 9 to 12 [kafka]

2025-04-05 Thread via GitHub


chia7712 merged PR #19274:
URL: https://github.com/apache/kafka/pull/19274





Re: [PR] KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… [kafka]

2025-04-05 Thread via GitHub


janchilling commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2021686307


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() {
 }
 } else {
 mainConsumerInstanceIdFuture.completeExceptionally(
-new TimeoutException("Could not retrieve main consumer 
client instance id.")
+new TimeoutException("Could not retrieve main 
consumer client instance id.")
 );
 }
 }
 
 
-if (!stateUpdaterEnabled && 
!restoreConsumerInstanceIdFuture.isDone()) {
+if (!restoreConsumerInstanceIdFuture.isDone()) {

Review Comment:
   Ah yeah, makes sense. I removed the entire condition; this is updated in the 
latest commit. Later in the same method there was another condition that also 
checked `!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()`; 
I removed that part of the check too, since it will always be false.






Re: [PR] MINOR:modify scala $var to java string [kafka]

2025-04-05 Thread via GitHub


ijuma commented on code in PR #18393:
URL: https://github.com/apache/kafka/pull/18393#discussion_r2029928276


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java:
##
@@ -470,8 +470,8 @@ public FetchDataInfo read(long startOffset,
 return maybeHandleIOException(
 () -> "Exception while reading from " + topicPartition + " in 
dir " + dir.getParent(),
 () -> {
-logger.trace("Reading maximum $maxLength bytes at offset 
{} from log with total length {} bytes",
-startOffset, segments.sizeInBytes());
+logger.trace("Reading maximum {} bytes at offset {} from 
log with total length {} bytes",
+maxLength, startOffset, segments.sizeInBytes());

Review Comment:
   This is a good catch - looks like we regressed here. The Scala logging 
library handled this automatically.
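A tiny stand-in formatter makes the regression easy to see: an SLF4J-style `{}` placeholder gets substituted, while the leftover Scala interpolation literal `$maxLength` is copied through untouched. (SLF4J's real `MessageFormatter` also handles escaping; this sketch deliberately does not.)

```java
// Minimal stand-in for an SLF4J-style "{}" formatter, to show why the
// leftover "$maxLength" literal from the Scala s-interpolator never got
// substituted after the Java conversion.
class PlaceholderDemo {
    static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIdx = 0;
        int i = 0;
        while (i < template.length()) {
            // Substitute the next argument at each "{}" pair; copy everything
            // else (including "$maxLength") through verbatim.
            if (argIdx < args.length && i + 1 < template.length()
                    && template.charAt(i) == '{' && template.charAt(i + 1) == '}') {
                sb.append(args[argIdx++]);
                i += 2;
            } else {
                sb.append(template.charAt(i++));
            }
        }
        return sb.toString();
    }
}
```

With the buggy template, `"Reading maximum $maxLength bytes at offset {} ..."` keeps the literal `$maxLength` in the log line; the fixed template with three `{}` placeholders and three arguments substitutes all of them.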






[PR] MINOR: Use readable interface to parse response [kafka]

2025-04-05 Thread via GitHub


soondenana opened a new pull request, #19353:
URL: https://github.com/apache/kafka/pull/19353

   This is the Response side of the Request refactoring PR in commit: 
56d1dc1b6e12a8a325c91749485aa03bce17c7f1
   
   The generated RequestData classes take Readable as input to parse the 
response. However, the individual Response objects take ByteBuffer as input and 
thus convert it to Readable via a `new ByteBufferAccessor` call. This call is 
repeated in all the Response classes, creating boilerplate across high tens of 
classes.
   
   This PR changes the parse method of all the Response classes to take Readable 
instead, so that no such conversion is needed. Callers are responsible for 
invoking the parse method with an appropriate object, which in the majority of 
cases is already a Readable.
   
   There were a couple of places in the code that used the `position` method, 
which is not present in the Readable interface, so a new `ReadableBuf` interface 
was created to add it. This lets us avoid retrieving the ByteBuffer from a 
Readable just to call `position`.
   
   The rest of the changes are all refactoring with no logical change.
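The shape of the refactor can be sketched in a few self-contained lines. The names suffixed with `Sketch` below are illustrative stand-ins, not the real `org.apache.kafka.common.protocol` types:

```java
import java.nio.ByteBuffer;

// Illustrative stand-in for the Readable interface used by the generated
// message classes (the real one has many more read methods).
interface ReadableSketch {
    int readInt();
}

// Wraps a ByteBuffer as a Readable; the point of the PR is that only the
// caller constructs this, instead of every Response class doing it.
final class ByteBufferAccessorSketch implements ReadableSketch {
    private final ByteBuffer buf;
    ByteBufferAccessorSketch(ByteBuffer buf) { this.buf = buf; }
    public int readInt() { return buf.getInt(); }
}

class DemoResponse {
    final int errorCode;
    DemoResponse(int errorCode) { this.errorCode = errorCode; }

    // Before: parse(ByteBuffer) and a `new ByteBufferAccessor(...)` inside
    // every Response class. After: parse takes Readable directly and the
    // per-class boilerplate disappears.
    static DemoResponse parse(ReadableSketch readable) {
        return new DemoResponse(readable.readInt());
    }
}
```

Callers that already hold a Readable pass it straight through; only the few ByteBuffer call sites wrap once at the boundary.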
   





[PR] Another small update [kafka-merge-queue-sandbox]

2025-04-05 Thread via GitHub


mumrah opened a new pull request, #65:
URL: https://github.com/apache/kafka-merge-queue-sandbox/pull/65

   Here is a PR with a long description. The following is one long line
   
   Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod 
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, 
quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo 
consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse 
cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non 
proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
   
   Here is another version with lines broken manually
   
   Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod 
tempor incididunt ut labore 
   et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation 
ullamco laboris nisi ut 
   aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in 
voluptate velit esse cillum 
   dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non 
proident, sunt in culpa qui 
   officia deserunt mollit anim id est laborum.
   
   





[jira] [Updated] (KAFKA-19003) Add force-terminate command to kafka-transactions CLI

2025-04-05 Thread Ritika Reddy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Reddy updated KAFKA-19003:
-
Description: 
The {{kafka-transactions.sh}} tool is going to support a new command 
{{--forceTerminateTransaction}}. It has one required argument 
{{--transactionalId}} that takes the transactional id of the transaction 
to be terminated.

The {{kafka-acls.sh}} tool is going to support a new {{--operation 
TwoPhaseCommit}}.

> Add force-terminate command to  kafka-transactions CLI
> --
>
> Key: KAFKA-19003
> URL: https://issues.apache.org/jira/browse/KAFKA-19003
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: Ritika Reddy
>Priority: Major
>
> The {{kafka-transactions.sh}} tool is going to support a new command 
> {{--forceTerminateTransaction}}. It has one required argument 
> {{--transactionalId}} that takes the transactional id of the transaction 
> to be terminated.
> The {{kafka-acls.sh}} tool is going to support a new {{--operation 
> TwoPhaseCommit}}.





Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]

2025-04-05 Thread via GitHub


github-actions[bot] commented on PR #18930:
URL: https://github.com/apache/kafka/pull/18930#issuecomment-2739024832

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] KAFKA-18613: Add StreamsGroupHeartbeat handler in the group coordinator [kafka]

2025-04-05 Thread via GitHub


lucasbru commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r2012357466


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1452,6 +1533,49 @@ private void throwIfShareGroupHeartbeatRequestIsInvalid(
 }
 }
 
+/**
+ * Validates the request.
+ *
+ * @param request The request to validate.
+ * @throws InvalidRequestException if the request is not valid.
+ * @throws UnsupportedAssignorException if the assignor is not supported.
+ */
+private static void throwIfStreamsGroupHeartbeatRequestIsInvalid(

Review Comment:
   Maybe, but we have to think this through. Even if we let clients crash only 
when they use the same topology epoch, that would mean scaling down the number 
of input partitions for a stateless application would become impossible. I am 
wondering if a better course of action is to just ignore tasks that we do not 
expect. After all, the client is just reporting what it thinks it owns. If we 
ignored those tasks, the next target assignment would not include them, so the 
client would automatically be instructed to revoke whatever tasks it thinks it 
currently has.
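The "ignore what we don't expect" idea could look roughly like this. It is a conceptual sketch with made-up names, not coordinator code; the key point is that filtering (rather than throwing) lets the next target assignment drive revocation:

```java
import java.util.Set;
import java.util.TreeSet;

// Conceptual sketch: the coordinator drops reported tasks that are no
// longer in the topology instead of failing the heartbeat. Because the
// next target assignment also omits them, the client revokes them on its
// own in the normal reconciliation path.
class TaskReconciliationDemo {
    static Set<String> acceptedOwnedTasks(Set<String> reportedOwned, Set<String> topologyTasks) {
        Set<String> accepted = new TreeSet<>(reportedOwned);
        accepted.retainAll(topologyTasks); // unknown tasks are ignored, not fatal
        return accepted;
    }
}
```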






Re: [PR] MINOR: Specify 2.1 as the minimum broker version for clients [kafka]

2025-04-05 Thread via GitHub


jolshan commented on code in PR #19250:
URL: https://github.com/apache/kafka/pull/19250#discussion_r2007965891


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -234,9 +229,10 @@
  * successful writes are marked as aborted, hence keeping the transactional 
guarantees.
  * 
  * 
- * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
- * certain client features.  For instance, the transactional APIs need broker 
versions 0.11.0 or later. You will receive an
- * UnsupportedVersionException when invoking an API that is not 
available in the running broker version.
+ * This client can communicate with brokers that are version 2.1 or newer. 
Older brokers may not support
+ * certain client features. For instance, {@code sendOffsetsToTransaction} 
with all consumer group metadata needs broker
+ * versions 2.5 or later. You will receive an 
UnsupportedVersionException when invoking an API that is not

Review Comment:
   Yeah, I don't think it was when it was deprecated.






[jira] [Assigned] (KAFKA-19080) The constraint on segment.ms is not enforced at topic level

2025-04-05 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-19080:
--

Assignee: 黃竣陽

> The constraint on segment.ms is not enforced at topic level
> ---
>
> Key: KAFKA-19080
> URL: https://issues.apache.org/jira/browse/KAFKA-19080
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 4.0.0
>Reporter: Jun Rao
>Assignee: 黃竣陽
>Priority: Major
>
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations]
>  sets a new constraint (at least 1MB) on segment.bytes. This is implemented 
> in [https://github.com/apache/kafka/pull/18140.] However, it doesn't seem to 
> be enforced at the topic level.
> {code:java}
> bash-3.2$ bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 
> --topic test --add-config segment.bytes=1000
> Completed updating config for topic test.
> bash-3.2$ bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 
> --topic test
> Dynamic configs for topic test are:
>   segment.bytes=1000 sensitive=false 
> synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1000, 
> STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, 
> DEFAULT_CONFIG:log.segment.bytes=1073741824} {code}
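The missing check amounts to a floor validation like the following. The helper is illustrative; only the 1 MiB floor from KIP-1030 and the values come from the report above:

```java
// Sketch of the KIP-1030-style floor check that is enforced for the
// broker-level log.segment.bytes but, per this issue, is skipped when
// segment.bytes is set as a dynamic topic config.
class SegmentBytesCheckDemo {
    static final int MIN_SEGMENT_BYTES = 1024 * 1024; // 1 MiB floor from KIP-1030

    static void validate(int segmentBytes) {
        if (segmentBytes < MIN_SEGMENT_BYTES) {
            throw new IllegalArgumentException(
                "segment.bytes must be at least " + MIN_SEGMENT_BYTES
                    + ", got " + segmentBytes);
        }
    }
}
```

With this applied at the topic level, the `--add-config segment.bytes=1000` call above would be rejected instead of completing, while the broker default of 1073741824 still passes.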




