Re: [PR] KAFKA-17327: Add support of group in kafka-configs.sh [kafka]

2024-08-25 Thread via GitHub


DL1231 commented on code in PR #16887:
URL: https://github.com/apache/kafka/pull/16887#discussion_r1730271755


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -256,6 +278,32 @@ public void testDynamicBrokerConfigUpdateUsingKraft() 
throws Exception {
 }
 }
 
+@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer"),

Review Comment:
   I tried to do it, but failed.
   
![image](https://github.com/user-attachments/assets/cfab879d-7473-4e10-9a6a-408fe530b23b)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-08-25 Thread Balaji Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876480#comment-17876480
 ] 

Balaji Rao commented on KAFKA-16567:


[~cadonna] it seems the remaining metrics
 * restore-total
 * restore-rate
 * update-total
 * update-rate
 * restore-remaining-records-total

are implemented in this 
[PR.|https://github.com/apache/kafka/pull/13300/files#diff-4df67afac7b840ed6080d7fae931148c2f661af25eaafd83e0f8535ac34a0b71]
 

I was planning to try implementing these metrics. I first checked out 
DefaultStateUpdater because one of the metrics (restore-call-rate) in the KIP 
was defined there. While figuring out how to add the task ID to the new 
metrics, I found this PR.

Initially, I thought the metrics from the PR wouldn't work for the state updater 
since they are recorded from StoreChangelogReader (which I had presumed was the 
predecessor to the state updater). But it seems this shouldn't be an issue 
because the state updater uses StoreChangelogReader for the actual restoration? 
Just want to make sure!

cc: [~lucasbru], who has kindly offered to help me with this as my first 
contribution to kafka-streams.

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Major
>  Labels: kip
>
> Add the following metrics to the state updater:
> * restore-total
> * restore-rate
> * update-total
> * update-rate
> * restore-remaining-records-total
> Please see the KIP for more information about the metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17053: [Minor] Restructure build.gradle to configure publishing last [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16950:
URL: https://github.com/apache/kafka/pull/16950#issuecomment-2308746393

   > By restructuring the build.gradle, I meant that the shadowJar block should 
be moved above the publishing block. That is not happening in this PR, only 
changing the fork is.
   
   @KTKTK-HZ WDYT? could you fix it in this PR?





Re: [PR] KAFKA-17373: Add print.epoch to kafka-console-share-consumer.sh/kafka-console-consumer.sh [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16987:
URL: https://github.com/apache/kafka/pull/16987#discussion_r1730294085


##
tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java:
##
@@ -130,25 +134,31 @@ public void writeTo(ConsumerRecord 
consumerRecord, PrintStream o
 } else {
 output.print("NO_TIMESTAMP");
 }
-writeSeparator(output, printPartition || printOffset || 
printDelivery || printHeaders || printKey || printValue);
+writeSeparator(output, anyTrue(printPartition, printOffset, 
printDelivery, printEpoch, printHeaders, printKey, printValue));

Review Comment:
   How about adding `BooleanExpressionComplexity` to suppression? 
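   For reference, the `anyTrue` helper shown in the diff above is presumably a 
varargs OR over the print flags; a minimal sketch (the body is an assumption, 
only the name appears in the diff):

   ```java
   // Hypothetical reconstruction of the `anyTrue` helper used in the diff:
   // a varargs OR over boolean flags, replacing the long `a || b || c`
   // chain that trips checkstyle's BooleanExpressionComplexity rule.
   public class AnyTrueSketch {

       static boolean anyTrue(boolean... values) {
           for (boolean value : values) {
               if (value) {
                   return true;
               }
           }
           return false;
       }

       public static void main(String[] args) {
           // Mirrors writeSeparator(output, anyTrue(printPartition, ...)).
           System.out.println(anyTrue(false, false, true));  // true
           System.out.println(anyTrue(false, false));        // false
       }
   }
   ```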






Re: [PR] KAFKA-17331: Throw UnsupportedVersionException if the data in ListOffsetRequest does NOT fit EarliestLocalSpec and LatestTieredSpec. [kafka]

2024-08-25 Thread via GitHub


chia7712 merged PR #16876:
URL: https://github.com/apache/kafka/pull/16876





Re: [PR] MINOR: add ReconfigurableQuorumIntegrationTest [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16991:
URL: https://github.com/apache/kafka/pull/16991#issuecomment-2308754564

   @cmccabe could you please fix the build error?





Re: [PR] KAFKA-17053: [Minor] Restructure build.gradle to configure publishing last [kafka]

2024-08-25 Thread via GitHub


KTKTK-HZ commented on PR #16950:
URL: https://github.com/apache/kafka/pull/16950#issuecomment-2308758996

   Hey @gharris1727 @chia7712 , I will modify this PR and refactor build.gradle 
to move the shadowJar block before the publishing block.





Re: [PR] KAFKA-17399: Apply LambdaValidator to code base [kafka]

2024-08-25 Thread via GitHub


xijiu commented on PR #16980:
URL: https://github.com/apache/kafka/pull/16980#issuecomment-2308792034

   > @xijiu could you please fix the build error?
   
   @chia7712   It's my fault, I should have added the `@SuppressWarnings("unchecked")` 
annotation when performing the forced type conversion, like this:
   ```
   @SuppressWarnings("unchecked")
   final List aliases = (List) value;
   ```
   I have fixed it, PTAL
   





[jira] [Updated] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly

2024-08-25 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-17360:
---
Fix Version/s: 3.7.2
   3.8.1

> local log retention ms/bytes "-2" is not treated correctly
> --
>
> Key: KAFKA-17360
> URL: https://issues.apache.org/jira/browse/KAFKA-17360
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Critical
> Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1
>
>
> # When the local.retention.ms/bytes is set to -2, we didn't replace it with 
> the server-side retention.ms/bytes config, so the -2 local retention won't 
> take effect.
>  # When setting retention.ms/bytes to -2, we can notice this log message:
> {code:java}
> Deleting segment LogSegment(baseOffset=10045, size=1037087, 
> lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to 
> local log retention size -2 breach. Local log size after deletion will be 
> 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code}
> This is not helpful for users. We should replace -2 with real retention value 
> when logging.
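
The fix amounts to a fallback: the -2 sentinel for the local retention settings
should resolve to the server-side retention value. A minimal sketch of that
semantics (constant and helper names are illustrative, not the actual
UnifiedLog code):

{code:java}
// Sketch of the intended -2 fallback semantics (illustrative names).
public class LocalRetentionSketch {

    // Sentinel meaning "inherit the server/topic-level retention value".
    static final long RETENTION_NOT_SET = -2L;

    static long effectiveLocalRetentionMs(long localRetentionMs, long retentionMs) {
        return localRetentionMs == RETENTION_NOT_SET ? retentionMs : localRetentionMs;
    }

    static long effectiveLocalRetentionBytes(long localRetentionBytes, long retentionBytes) {
        return localRetentionBytes == RETENTION_NOT_SET ? retentionBytes : localRetentionBytes;
    }

    public static void main(String[] args) {
        // -2 falls back to the topic-level value, so log messages can report
        // the real retention instead of "-2".
        System.out.println(effectiveLocalRetentionMs(-2L, 604800000L));   // 604800000
        System.out.println(effectiveLocalRetentionBytes(1048576L, -1L));  // 1048576
    }
}
{code}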





Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


chia7712 merged PR #16932:
URL: https://github.com/apache/kafka/pull/16932





Re: [PR] KAFKA-17373: Add print.epoch to kafka-console-share-consumer.sh/kafka-console-consumer.sh [kafka]

2024-08-25 Thread via GitHub


xijiu commented on code in PR #16987:
URL: https://github.com/apache/kafka/pull/16987#discussion_r1730319638


##
tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java:
##
@@ -130,25 +134,31 @@ public void writeTo(ConsumerRecord 
consumerRecord, PrintStream o
 } else {
 output.print("NO_TIMESTAMP");
 }
-writeSeparator(output, printPartition || printOffset || 
printDelivery || printHeaders || printKey || printValue);
+writeSeparator(output, anyTrue(printPartition, printOffset, 
printDelivery, printEpoch, printHeaders, printKey, printValue));

Review Comment:
   > How about adding `BooleanExpressionComplexity` to suppression?
   
   Wonderful idea, I have fixed it, PTAL   @chia7712 






[jira] [Commented] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly

2024-08-25 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876497#comment-17876497
 ] 

Chia-Ping Tsai commented on KAFKA-17360:


the patch is merged to trunk 
(https://github.com/apache/kafka/commit/11966a209a8bb5bbf867b5bb5ca1d60b80e26650)
 and 3.9 
(https://github.com/apache/kafka/commit/6d2b81e07f22086e3e752024168772657294c4d9)

will open another PR to backport the fix_2 to 3.8 and 3.7

> local log retention ms/bytes "-2" is not treated correctly
> --
>
> Key: KAFKA-17360
> URL: https://issues.apache.org/jira/browse/KAFKA-17360
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Critical
> Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1
>
>
> # When the local.retention.ms/bytes is set to -2, we didn't replace it with 
> the server-side retention.ms/bytes config, so the -2 local retention won't 
> take effect.
>  # When setting retention.ms/bytes to -2, we can notice this log message:
> {code:java}
> Deleting segment LogSegment(baseOffset=10045, size=1037087, 
> lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to 
> local log retention size -2 breach. Local log size after deletion will be 
> 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code}
> This is not helpful for users. We should replace -2 with real retention value 
> when logging.





[jira] [Created] (KAFKA-17418) Fix the incorrect markdown of junit.py caused by newline character

2024-08-25 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17418:
--

 Summary: Fix the incorrect markdown of junit.py caused by newline 
character
 Key: KAFKA-17418
 URL: https://issues.apache.org/jira/browse/KAFKA-17418
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see 
https://github.com/apache/kafka/actions/runs/10534117374/jobs/21477070226/summary_raw





[jira] [Commented] (KAFKA-17418) Fix the incorrect markdown of junit.py caused by newline character

2024-08-25 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876500#comment-17876500
 ] 

kangning.li commented on KAFKA-17418:
-

Hi [~chia7712], I am interested in this issue, could you assign it to me? Thanks

> Fix the incorrect markdown of junit.py caused by newline character
> --
>
> Key: KAFKA-17418
> URL: https://issues.apache.org/jira/browse/KAFKA-17418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> see 
> https://github.com/apache/kafka/actions/runs/10534117374/jobs/21477070226/summary_raw





[jira] [Assigned] (KAFKA-17418) Fix the incorrect markdown of junit.py caused by newline character

2024-08-25 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17418:
--

Assignee: kangning.li  (was: Chia-Ping Tsai)

> Fix the incorrect markdown of junit.py caused by newline character
> --
>
> Key: KAFKA-17418
> URL: https://issues.apache.org/jira/browse/KAFKA-17418
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: kangning.li
>Priority: Minor
>
> see 
> https://github.com/apache/kafka/actions/runs/10534117374/jobs/21477070226/summary_raw





[jira] [Commented] (KAFKA-17384) Remove deprecated options in tools

2024-08-25 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876501#comment-17876501
 ] 

Chia-Ping Tsai commented on KAFKA-17384:


[~yangpoan] Could you please add a link to the Jira (and KIP) which deprecates 
the config to each sub-task? I have updated KAFKA-17390

> Remove deprecated options in tools
> --
>
> Key: KAFKA-17384
> URL: https://issues.apache.org/jira/browse/KAFKA-17384
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> There are deprecated options in the following tools. We can consider removing 
> them in 4.0.





Re: [PR] KAFKA-17390:Remove broker-list in GetOffsetShell [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16992:
URL: https://github.com/apache/kafka/pull/16992#discussion_r1730323750


##
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##
@@ -107,12 +107,7 @@ private static class GetOffsetShellOptions extends 
CommandDefaultOptions {
 public GetOffsetShellOptions(String[] args) throws TerseException {
 super(args);
 
-OptionSpec brokerListOpt = parser.accepts("broker-list", 
"DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is 
specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
-.withRequiredArg()
-.describedAs("HOST1:PORT1,...,HOST3:PORT3")
-.ofType(String.class);
 OptionSpec bootstrapServerOpt = 
parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in 
the form HOST1:PORT1,HOST2:PORT2.")

Review Comment:
   Please rename `effectiveBrokerListOpt` to `bootstrapServerOpt`. Also, please 
initialize it in line #110






[PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


brandboat opened a new pull request, #16995:
URL: https://github.com/apache/kafka/pull/16995

   related to https://issues.apache.org/jira/browse/KAFKA-17360 (3.8 branch)
   
   When the local.retention.ms/bytes is set to -2,
   we didn't replace it with the server-side retention.ms/bytes config, so the 
-2 local retention won't take effect.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-17388: Remove broker-list in VerifiableProducer [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16958:
URL: https://github.com/apache/kafka/pull/16958#discussion_r1730325275


##
tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java:
##
@@ -131,14 +131,6 @@ private static ArgumentParser argParser() {
 .dest("bootstrapServer")
 .help("REQUIRED: The server(s) to connect to. Comma-separated 
list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
 
-connectionGroup.addArgument("--broker-list")

Review Comment:
   @LoganZhuZzz could you please update e2e also? see 
https://github.com/apache/kafka/blob/11966a209a8bb5bbf867b5bb5ca1d60b80e26650/tests/kafkatest/services/verifiable_producer.py#L227
   
   note that we should use `--bootstrap-server` only if the version >= 2.5; 
otherwise, we should keep using `--broker-list`
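   
   The version gate described above can be sketched as follows (illustrative 
Java; the real e2e code lives in Python under tests/kafkatest):
   
   ```java
   // Pick the connection flag based on the tool version, per the note above:
   // --bootstrap-server is only understood from 2.5 onward.
   public class BrokerFlagSketch {

       static String connectionFlag(int major, int minor) {
           boolean atLeast25 = major > 2 || (major == 2 && minor >= 5);
           return atLeast25 ? "--bootstrap-server" : "--broker-list";
       }

       public static void main(String[] args) {
           System.out.println(connectionFlag(2, 4));  // --broker-list
           System.out.println(connectionFlag(3, 8));  // --bootstrap-server
       }
   }
   ```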






Re: [PR] KAFKA-17335: Lack of default for URL encoding configuration for OAuth causes NPE [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16990:
URL: https://github.com/apache/kafka/pull/16990#discussion_r1730326689


##
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java:
##
@@ -209,6 +209,19 @@ public String validateString(String name, boolean 
isRequired) throws ValidateExc
 return value;
 }
 
+/**
+ * Validates that a value, if supplied, is a {@link Boolean}. If no value 
is present in the configuration, a
+ * default value of {@link Boolean#FALSE} is returned.
+ */
+public Boolean validateBoolean(String name) {
+Boolean value = get(name);
+
+if (value != null)
+return value;
+
+return Boolean.FALSE;

Review Comment:
   The other methods take a `boolean isRequired` argument. Maybe this method 
should follow that pattern?
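   
   A standalone sketch of that suggestion (the map lookup stands in for 
`ConfigurationUtils#get`; the names and error wording are illustrative, not 
the actual OAuth code):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Mirrors the sibling validate* pattern: a required-but-missing value is
   // an error; an optional missing value defaults to FALSE instead of null,
   // avoiding an NPE when the caller unboxes the result.
   public class ValidateBooleanSketch {

       private final Map<String, Object> configs = new HashMap<>();

       public ValidateBooleanSketch(Map<String, Object> configs) {
           this.configs.putAll(configs);
       }

       public Boolean validateBoolean(String name, boolean isRequired) {
           Object value = configs.get(name);

           if (value == null) {
               if (isRequired)
                   throw new IllegalArgumentException(name + " is required but was not supplied");
               return Boolean.FALSE;
           }
           if (!(value instanceof Boolean))
               throw new IllegalArgumentException(name + " must be a Boolean");
           return (Boolean) value;
       }
   }
   ```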






[PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


brandboat opened a new pull request, #16996:
URL: https://github.com/apache/kafka/pull/16996

   based on 3.7 branch, related to 
https://issues.apache.org/jira/browse/KAFKA-17360
   
   When the local.retention.ms/bytes is set to -2,
   we didn't replace it with the server-side retention.ms/bytes config, so the 
-2 local retention won't take effect.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] MINOR: Fix an incorrect message in kafka-consumer-groups.sh when missing necessary options [kafka]

2024-08-25 Thread via GitHub


chia7712 merged PR #16961:
URL: https://github.com/apache/kafka/pull/16961





Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16954:
URL: https://github.com/apache/kafka/pull/16954#discussion_r1730329818


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -206,29 +206,29 @@ public synchronized Collection 
getAllMetadataForTopology(final
  * if streams is (re-)initializing or {@code null} if the corresponding 
topic cannot be found,
  * or null if no matching metadata could be found.
  */
-public synchronized  KeyQueryMetadata getKeyQueryMetadataForKey(final 
String storeName,
-   final K 
key,
-   final 
Serializer keySerializer) {
+public synchronized  KeyQueryMetadata keyQueryMetadataForKey(final 
String storeName,
+final K 
key,
+final 
Serializer keySerializer) {
 Objects.requireNonNull(keySerializer, "keySerializer can't be null");
 if (topologyMetadata.hasNamedTopologies()) {
 throw new IllegalArgumentException("Cannot invoke the 
getKeyQueryMetadataForKey(storeName, key, keySerializer)"
+ "method when using named 
topologies, please use the overload that"
+ "accepts a topologyName 
parameter to identify the correct store");
 }
-return getKeyQueryMetadataForKey(storeName,
+return keyQueryMetadataForKey(storeName,
  key,
  new 
DefaultStreamPartitioner<>(keySerializer));
 }
 
 /**
- * See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, 
Object, Serializer)}
+ * See {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, 
Serializer)}
  */
-public synchronized  KeyQueryMetadata getKeyQueryMetadataForKey(final 
String storeName,
-   final K 
key,
-   final 
Serializer keySerializer,
-   final 
String topologyName) {
+public synchronized  KeyQueryMetadata keyQueryMetadataForKey(final 
String storeName,
+final K 
key,
+final 
Serializer keySerializer,
+final 
String topologyName) {
 Objects.requireNonNull(keySerializer, "keySerializer can't be null");
-return getKeyQueryMetadataForKey(storeName,
+return keyQueryMetadataForKey(storeName,
  key,
  new 
DefaultStreamPartitioner<>(keySerializer),
  topologyName);

Review Comment:
   please update the error message (line #257) also



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -206,29 +206,29 @@ public synchronized Collection 
getAllMetadataForTopology(final
  * if streams is (re-)initializing or {@code null} if the corresponding 
topic cannot be found,
  * or null if no matching metadata could be found.
  */
-public synchronized  KeyQueryMetadata getKeyQueryMetadataForKey(final 
String storeName,
-   final K 
key,
-   final 
Serializer keySerializer) {
+public synchronized  KeyQueryMetadata keyQueryMetadataForKey(final 
String storeName,
+final K 
key,
+final 
Serializer keySerializer) {
 Objects.requireNonNull(keySerializer, "keySerializer can't be null");
 if (topologyMetadata.hasNamedTopologies()) {
 throw new IllegalArgumentException("Cannot invoke the 
getKeyQueryMetadataForKey(storeName, key, keySerializer)"

Review Comment:
   please update the error message also



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -112,7 +112,7 @@ public Collection getAllMetadata() {
  * @param storeName the storeName to find metadata for
  * @return A collection of {@link StreamsMetadata} that have the provided 
storeName
  */
-public synchronized Collection 
getAllMetadataForStore(final String storeName) {
+public synchronized Collection allMetadataForStore(final 
String storeName) {
 Objects.requ

[PR] KAFKA-17390: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


xijiu opened a new pull request, #16997:
URL: https://github.com/apache/kafka/pull/16997

   1. remove `src/generated/java` from core module
   2. add `` to the `suppressions.xml` to suppress all generated 
code
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


xijiu closed pull request #16997: KAFKA-17416: Add a checkstyle rule to 
suppress all generated code
URL: https://github.com/apache/kafka/pull/16997





[PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


xijiu opened a new pull request, #16998:
URL: https://github.com/apache/kafka/pull/16998

   1. remove `src/generated/java` from core module
   2. add `` to the `suppressions.xml` to suppress all generated 
code
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16998:
URL: https://github.com/apache/kafka/pull/16998#discussion_r1730338891


##
checkstyle/suppressions.xml:
##
@@ -237,6 +237,7 @@
 
 
 
+

Review Comment:
   Please remove the other generated-code suppressions; they are redundant now






Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


xijiu commented on code in PR #16998:
URL: https://github.com/apache/kafka/pull/16998#discussion_r1730342466


##
checkstyle/suppressions.xml:
##
@@ -237,6 +237,7 @@
 
 
 
+

Review Comment:
   > Please remove the other generated-code suppressions; they are redundant now
   
   Agree, I have already removed them.






Re: [PR] KAFKA-17306; Soften the validation when replaying tombstones [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1730343775


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3225,7 +3228,14 @@ public void replay(
 .updateWith(value)
 .build());
 } else {
-ConsumerGroupMember oldMember = 
consumerGroup.getOrMaybeCreateMember(memberId, false);
+ConsumerGroupMember oldMember;
+try {
+oldMember = consumerGroup.getOrMaybeCreateMember(memberId, 
false);

Review Comment:
   Could you please add `UnknownMemberIdException` to the `getOrMaybeCreateMember` 
signature?



##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -156,17 +164,42 @@ class CoordinatorLoaderImpl[T](
   } else {
 batch.asScala.foreach { record =>
   numRecords = numRecords + 1
-  try {
-coordinator.replay(
-  record.offset(),
-  batch.producerId,
-  batch.producerEpoch,
-  deserializer.deserialize(record.key, record.value)
-)
-  } catch {
-case ex: UnknownRecordTypeException =>
-  warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
-s"from $tp. Ignoring it. It could be a left over from 
an aborted upgrade.")
+
+  val coordinatorRecordOpt = {
+try {
+  Some(deserializer.deserialize(record.key, record.value))
+} catch {
+  case ex: UnknownRecordTypeException =>
+warn(s"Unknown record type ${ex.unknownType} while 
loading offsets and group metadata " +
+  s"from $tp. Ignoring it. It could be a left over 
from an aborted upgrade.")
+None
+  case ex: RuntimeException =>
+val msg = s"Deserializing record $record from $tp 
failed due to: ${ex.getMessage}"
+error(s"$msg.")

Review Comment:
   How about `error(msg)`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3215,7 +3211,14 @@ public void replay(
 String groupId = key.groupId();
 String memberId = key.memberId();
 
-ConsumerGroup consumerGroup = 
getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
+ConsumerGroup consumerGroup;
+try {
+consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, 
value != null);
+} catch (IllegalStateException ex) {
+// If the group does not exist and a tombstone is replayed, we can 
ignore it.

Review Comment:
   > Is there any other reason we could throw this IllegalStateException
   
   I have the same question. What if the group type is not `CONSUMER`?
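   
   The softening under discussion can be sketched as follows (a simplified 
stand-in for the replay path; names are illustrative, not the actual 
GroupMetadataManager code):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // A tombstone (value == null) replayed for a group that no longer exists
   // is ignored instead of failing the whole coordinator load.
   public class TombstoneReplaySketch {

       private final Map<String, String> groups = new HashMap<>();

       // Simplified stand-in for getOrMaybeCreatePersistedConsumerGroup:
       // throws if the group is absent and creation is not allowed.
       private String getOrMaybeCreateGroup(String groupId, boolean createIfNotExists) {
           String group = groups.get(groupId);
           if (group == null) {
               if (!createIfNotExists) {
                   throw new IllegalStateException("Group " + groupId + " not found.");
               }
               group = groupId;
               groups.put(groupId, group);
           }
           return group;
       }

       // value == null denotes a tombstone record.
       public void replay(String groupId, String value) {
           try {
               getOrMaybeCreateGroup(groupId, value != null);
           } catch (IllegalStateException ex) {
               if (value == null) {
                   // Tombstone for a non-existent group: nothing to delete.
                   return;
               }
               throw ex;
           }
           if (value == null) {
               groups.remove(groupId);
           } else {
               groups.put(groupId, value);
           }
       }

       public boolean contains(String groupId) {
           return groups.containsKey(groupId);
       }
   }
   ```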



##
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##
@@ -12,53 +12,332 @@
  */
 package kafka.api
 
-import kafka.integration.KafkaServerTestHarness
 import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, 
OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, 
TopicPartition}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
 
 import scala.jdk.CollectionConverters._
-import java.util.Properties
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.server.config.ServerConfigs
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
 
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
-  val offsetsTopicCompressionCodec = CompressionType.GZIP
-  val overridingProps = new Properties()
-  overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 
"1")
-  
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG,
 offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
 
-  override def generateConfigs = TestUt

Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


pegasas commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1730390328


##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -820,110 +778,6 @@ public synchronized  Topology 
addReadOnlyStateStore(final StoreBuilder
 );
 }
 
-/**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
- * 
- * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
- * of the input topic.
- * 
- * The provided {@link 
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an 
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * The supplier should always generate a new instance each time
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets 
called. Creating a single
- * {@link org.apache.kafka.streams.processor.Processor} object and 
returning the same object reference in
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} 
would be a violation of the supplier pattern
- * and leads to runtime exceptions.
- * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
- * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
- *
- * @param storeBuilder  user defined state store builder
- * @param sourceNamename of the {@link SourceNode} that will 
be automatically added
- * @param keyDeserializer   the {@link Deserializer} to deserialize 
keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize 
values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @param stateUpdateSupplier   the instance of {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already 
registered
- * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, 
String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
- */
-@Deprecated
-public synchronized  Topology addGlobalStore(final StoreBuilder 
storeBuilder,
-   final String sourceName,
-   final Deserializer 
keyDeserializer,
-   final Deserializer 
valueDeserializer,
-   final String topic,
-   final String 
processorName,
-   final 
org.apache.kafka.streams.processor.ProcessorSupplier stateUpdateSupplier) 
{
-internalTopologyBuilder.addGlobalStore(
-new StoreBuilderWrapper(storeBuilder),
-sourceName,
-null,
-keyDeserializer,
-valueDeserializer,
-topic,
-processorName,
-() -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
-true
-);
-return this;
-}
-
-/**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
- * 
- * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
- * of the input topic.
- * 
- * The provided {@link 
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an 
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * The supplier should always generate a new instance each time
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets 
called. Creating a single
- * {@link org.apache.kafka.streams.processor.Processor} object and 
returning the same object reference in
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} 
would be a violation of the supplier pattern
- * and leads to runtime exceptions.
- * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
- *
- * @param storeBuilder  user defined key value store builder
- * @param sourceNamename of the {@link SourceNode} that will 
be automatically added
- * @param timestampExtractort
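The supplier rule spelled out in the javadoc above (a new `Processor` instance on every `get()` call) can be illustrated with a dependency-free sketch; the names below are illustrative only, not from the Kafka codebase:

```java
import java.util.function.Supplier;

public class SupplierContract {
    interface Processor { void process(String record); }

    public static void main(String[] args) {
        // Correct: every get() call constructs a fresh Processor, so each
        // task gets its own instance (and its own per-instance state).
        Supplier<Processor> fresh = () -> new Processor() {
            @Override public void process(final String record) { }
        };
        assert fresh.get() != fresh.get();

        // Violation of the supplier pattern: the same instance is handed out
        // on every get() call; in Streams this leads to runtime exceptions.
        Processor shared = new Processor() {
            @Override public void process(final String record) { }
        };
        Supplier<Processor> broken = () -> shared;
        assert broken.get() == broken.get();
        System.out.println("supplier contract checks passed");
    }
}
```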

Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


pegasas commented on PR #16791:
URL: https://github.com/apache/kafka/pull/16791#issuecomment-2308913465

   > @pegasas -- Any updates on this PR?
   
   org.apache.kafka.streams.Topology#addProcessor(java.lang.String, 
org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...)
   
   It looks like this method is still used by examples. Shall we remove it in this pass?
   
![image](https://github.com/user-attachments/assets/1901371b-c700-4ec9-9b21-1bb6c976e282)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


pegasas commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1730392424


##
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:
##
@@ -292,7 +292,7 @@ Cancellable schedule(final Duration interval,
  * (including the currently processed record), i.e., it can be considered 
a high-watermark.
  * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
  *
- *  Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16331) Remove Deprecated EOSv1

2024-08-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876530#comment-17876530
 ] 

Matthias J. Sax commented on KAFKA-16331:
-

This is a very complicated ticket... I would not recommend picking it up. I think it's best if a seasoned contributor (or, even better, a committer) works on this one.

> Remove Deprecated EOSv1
> ---
>
> Key: KAFKA-16331
> URL: https://issues.apache.org/jira/browse/KAFKA-16331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> EOSv1 was deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  * remove config
>  * remove Producer#sendOffsetsToTransaction
>  * cleanup code



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16954:
URL: https://github.com/apache/kafka/pull/16954#discussion_r1730404618


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##
@@ -206,29 +206,29 @@ public synchronized Collection 
getAllMetadataForTopology(final
  * if streams is (re-)initializing or {@code null} if the corresponding 
topic cannot be found,
  * or null if no matching metadata could be found.
  */
-public synchronized  KeyQueryMetadata getKeyQueryMetadataForKey(final 
String storeName,
-   final K 
key,
-   final 
Serializer keySerializer) {
+public synchronized  KeyQueryMetadata keyQueryMetadataForKey(final 
String storeName,
+final K 
key,
+final 
Serializer keySerializer) {
 Objects.requireNonNull(keySerializer, "keySerializer can't be null");
 if (topologyMetadata.hasNamedTopologies()) {
 throw new IllegalArgumentException("Cannot invoke the 
getKeyQueryMetadataForKey(storeName, key, keySerializer)"

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16954:
URL: https://github.com/apache/kafka/pull/16954#discussion_r1730404855


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java:
##
@@ -246,7 +246,7 @@ default boolean commitRequested() {
 
 // IQ related methods
 
-StateStore getStore(final String name);
+StateStore store(final String name);

Review Comment:
   Yes, but I will do this in a follow up PR to keep the scope of individual 
PRs small.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17354) StreamThread::setState race condition causes java.lang.RuntimeException: State mismatch PENDING_SHUTDOWN different from STARTING

2024-08-25 Thread Anton Liauchuk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876537#comment-17876537
 ] 

Anton Liauchuk commented on KAFKA-17354:


[~aoli-al] 
I couldn't reproduce the issue with the commit mentioned in the description. Are there any additional changes required to reproduce it? Could you please check?

> StreamThread::setState race condition causes java.lang.RuntimeException: 
> State mismatch PENDING_SHUTDOWN different from STARTING
> 
>
> Key: KAFKA-17354
> URL: https://issues.apache.org/jira/browse/KAFKA-17354
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ao Li
>Assignee: Anton Liauchuk
>Priority: Major
>
> I saw a test failure in `StreamThreadTest::shouldChangeStateAtStartClose`. A 
> race condition in `setState` causes an uncaught exception thrown in 
> `StateListenerStub`. 
> Basically, the function `setState` allows two threads to call 
> `stateListener.onChange` concurrently. 
> This patch will help you to reproduce the failure deterministically. 
> https://github.com/aoli-al/kafka/commit/033a9a33766740e6843effb9beabfdcb3804846b



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16331) Remove Deprecated EOSv1

2024-08-25 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876539#comment-17876539
 ] 

Ismael Juma commented on KAFKA-16331:
-

Seems reasonable. The main thing to call out is the AK 2.5 or newer 
requirement, right?

> Remove Deprecated EOSv1
> ---
>
> Key: KAFKA-16331
> URL: https://issues.apache.org/jira/browse/KAFKA-16331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> EOSv1 was deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  * remove config
>  * remove Producer#sendOffsetsToTransaction
>  * cleanup code



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16331) Remove Deprecated EOSv1

2024-08-25 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876539#comment-17876539
 ] 

Ismael Juma edited comment on KAFKA-16331 at 8/25/24 7:28 PM:
--

Seems reasonable. The main thing to call out is the Kafka broker 2.5 or newer 
requirement, right?


was (Author: ijuma):
Seems reasonable. The main thing to call out is the AK 2.5 or newer 
requirement, right?

> Remove Deprecated EOSv1
> ---
>
> Key: KAFKA-16331
> URL: https://issues.apache.org/jira/browse/KAFKA-16331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> EOSv1 was deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  * remove config
>  * remove Producer#sendOffsetsToTransaction
>  * cleanup code



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17354) StreamThread::setState race condition causes java.lang.RuntimeException: State mismatch PENDING_SHUTDOWN different from STARTING

2024-08-25 Thread Ao Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876540#comment-17876540
 ] 

Ao Li commented on KAFKA-17354:
---

[~anton.liauchuk] Yes, I'm able to reproduce the failure. Note that you will 
only see the exception, but the test will still pass, because JUnit only 
marks a test as failed when the exception is thrown from the main thread.

I've pushed another commit to the fork 
https://github.com/aoli-al/kafka/tree/KAFKA-63 to propagate the error to the 
main thread. If you run `./gradlew streams:test --rerun --tests 
StreamThreadTest.shouldChangeStateAtStartClose` you will see the test failure.
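The propagation trick described here can be sketched outside of JUnit as well: capture the background thread's uncaught exception and re-throw it on the main thread. This is a minimal illustration, not the actual commit:

```java
import java.util.concurrent.atomic.AtomicReference;

public class PropagateThreadFailure {
    public static void main(String[] args) throws Exception {
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            // Stand-in for the listener thread that throws "State mismatch ..."
            throw new IllegalStateException(
                "State mismatch PENDING_SHUTDOWN different from STARTING");
        });
        // Without a handler, the exception only kills the worker thread and
        // the test framework never sees it; capture it here instead.
        worker.setUncaughtExceptionHandler((thread, throwable) -> failure.set(throwable));
        worker.start();
        worker.join();

        // Re-throw on the main thread so the caller (e.g. JUnit) fails the test.
        if (failure.get() != null) {
            throw new AssertionError("background thread failed", failure.get());
        }
    }
}
```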

> StreamThread::setState race condition causes java.lang.RuntimeException: 
> State mismatch PENDING_SHUTDOWN different from STARTING
> 
>
> Key: KAFKA-17354
> URL: https://issues.apache.org/jira/browse/KAFKA-17354
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ao Li
>Assignee: Anton Liauchuk
>Priority: Major
>
> I saw a test failure in `StreamThreadTest::shouldChangeStateAtStartClose`. A 
> race condition in `setState` causes an uncaught exception thrown in 
> `StateListenerStub`. 
> Basically, the function `setState` allows two threads to call 
> `stateListener.onChange` concurrently. 
> This patch will help you to reproduce the failure deterministically. 
> https://github.com/aoli-al/kafka/commit/033a9a33766740e6843effb9beabfdcb3804846b



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16791:
URL: https://github.com/apache/kafka/pull/16791#issuecomment-2308982763

   There are two overloads. The old one, using 
`org.apache.kafka.streams.processor.ProcessorSupplier`
   ```
   org.apache.kafka.streams.Topology#addProcessor(java.lang.String, 
org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...)
   ```
   
   And the new one using 
`org.apache.kafka.streams.processor.api.ProcessorSupplier`
   ```
   org.apache.kafka.streams.Topology#addProcessor(java.lang.String, 
org.apache.kafka.streams.processor.ProcessorSupplier, 
java.lang.String...)
   ```
   
   Note the different package names. Only the old one is deprecated and should 
be removed, and as far as I can tell, it's only used in some tests (which can 
also be removed).
   
   The screenshot you posted seems to refer to the new one, though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1730426306


##
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:
##
@@ -292,8 +292,7 @@ Cancellable schedule(final Duration interval,
  * (including the currently processed record), i.e., it can be considered 
a high-watermark.
  * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
  *
- *  Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)
- * and {@link StreamsBuilder#addGlobalStore} (...),
+ *  Note: this method is not supported for global processors {@link 
StreamsBuilder#addGlobalStore} (...)

Review Comment:
   ```suggestion
 *  Note: this method is not supported for global processors {@link 
Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, 
String, String, ProcessorSupplier)}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1730426384


##
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:
##
@@ -292,8 +292,7 @@ Cancellable schedule(final Duration interval,
  * (including the currently processed record), i.e., it can be considered 
a high-watermark.
  * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
  *
- *  Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)
- * and {@link StreamsBuilder#addGlobalStore} (...),
+ *  Note: this method is not supported for global processors {@link 
StreamsBuilder#addGlobalStore} (...)

Review Comment:
   This is not correct. Should be something like this: ^



##
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:
##
@@ -292,8 +292,7 @@ Cancellable schedule(final Duration interval,
  * (including the currently processed record), i.e., it can be considered 
a high-watermark.
  * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
  *
- *  Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)
- * and {@link StreamsBuilder#addGlobalStore} (...),
+ *  Note: this method is not supported for global processors {@link 
StreamsBuilder#addGlobalStore} (...)

Review Comment:
   ```suggestion
*  Note: this method is not supported for global processors {@link 
Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, 
String, String, ProcessorSupplier)}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1730426457


##
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:
##
@@ -201,8 +201,7 @@ Cancellable schedule(final Duration interval,
  * (including the currently processed record), i.e., it can be considered 
a high-watermark.
  * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
  *
- *  Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)
- * and {@link StreamsBuilder#addGlobalStore} (...),
+ *  Note: this method is not supported for global processors  {@link 
StreamsBuilder#addGlobalStore} (...)

Review Comment:
   ```suggestion
*  Note: this method is not supported for global processors {@link 
Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, 
String, String, ProcessorSupplier)}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16791:
URL: https://github.com/apache/kafka/pull/16791#issuecomment-2308985631

   The build has checkstyle errors:
   
   ```
   > Task :streams:checkstyleMain
   --
     | 2993 | 09:39:12 AM | [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:23:8:
 Unused import - org.apache.kafka.streams.Topology. [UnusedImports]
     | 2994 | 09:39:12 AM | [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:22:8:
 Unused import - org.apache.kafka.streams.Topology. [UnusedImports]
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #15482:
URL: https://github.com/apache/kafka/pull/15482#discussion_r1730427154


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -570,7 +570,7 @@ public Optional> partitions(final String 
topic, final String key, f
 public void shouldUseDefaultPartitionerAsPartitionReturnsNull() {
 
 final StreamPartitioner streamPartitioner =
-(topic, key, value, numPartitions) -> null;
+(topic, key, value, numPartitions) -> Optional.empty();

Review Comment:
   The test name should be updated... we used `null` for the old method, but 
`empty` for the new one, which we test now.
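The null-vs-empty distinction behind that rename can be sketched with a simplified interface, modeled loosely on `StreamPartitioner` rather than copied from it:

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public class PartitionerSketch {
    // Simplified shape of the newer partitions() method: Optional.empty()
    // replaces the old convention of returning null from partition() to
    // mean "fall back to the default partitioning behavior".
    interface StreamPartitioner<K, V> {
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);
    }

    public static void main(String[] args) {
        StreamPartitioner<String, String> useDefault =
            (topic, key, value, numPartitions) -> Optional.empty();
        StreamPartitioner<String, String> pinToZero =
            (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(0));

        assert !useDefault.partitions("t", "k", "v", 4).isPresent();
        assert pinToZero.partitions("t", "k", "v", 4).get().contains(0);
        System.out.println("partitioner sketch checks passed");
    }
}
```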



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #15482:
URL: https://github.com/apache/kafka/pull/15482#discussion_r1730427271


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1230,7 +1230,7 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 
 final StreamPartitioner> 
foreignResponseSinkPartitioner =
 tableJoinedInternal.partitioner() == null
-? (topic, key, subscriptionResponseWrapper, 
numPartitions) -> 
Optional.of(Collections.singleton(subscriptionResponseWrapper.getPrimaryPartition()))
+? (topic, key, val, numPartitions) -> 
val.getPrimaryPartition() == null ? Optional.empty() : 
Optional.of(Collections.singleton(val.getPrimaryPartition()))

Review Comment:
   Why did you rename `subscriptionResponseWrapper` to `val`? The old name seems 
much more descriptive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #15482:
URL: https://github.com/apache/kafka/pull/15482#discussion_r1730427271


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1230,7 +1230,7 @@ private  KTable 
doJoinOnForeignKey(final KTable forei
 
 final StreamPartitioner> 
foreignResponseSinkPartitioner =
 tableJoinedInternal.partitioner() == null
-? (topic, key, subscriptionResponseWrapper, 
numPartitions) -> 
Optional.of(Collections.singleton(subscriptionResponseWrapper.getPrimaryPartition()))
+? (topic, key, val, numPartitions) -> 
val.getPrimaryPartition() == null ? Optional.empty() : 
Optional.of(Collections.singleton(val.getPrimaryPartition()))

Review Comment:
   Why did you rename `subscriptionResponseWrapper` to `val`? The old name seems 
much more descriptive.
   
   Nit: should we call `val.getPrimaryPartition()` twice as the code does now, 
or better introduce a variable which stores the result and only call it once?
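That nit could look something like the standalone sketch below, with a stand-in wrapper class rather than the actual `SubscriptionResponseWrapper`:

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public class SingleCallSketch {
    // Stand-in for SubscriptionResponseWrapper; only the accessor matters here.
    static class Wrapper {
        private final Integer primaryPartition;
        Wrapper(final Integer primaryPartition) { this.primaryPartition = primaryPartition; }
        Integer getPrimaryPartition() { return primaryPartition; }
    }

    // getPrimaryPartition() is evaluated once into a local, and the local is
    // then used both for the null check and for building the singleton set.
    static Optional<Set<Integer>> partitions(final Wrapper wrapper) {
        final Integer primaryPartition = wrapper.getPrimaryPartition();
        return primaryPartition == null
            ? Optional.empty()
            : Optional.of(Collections.singleton(primaryPartition));
    }

    public static void main(String[] args) {
        assert !partitions(new Wrapper(null)).isPresent();
        assert partitions(new Wrapper(3)).equals(Optional.of(Collections.singleton(3)));
        System.out.println("single-call sketch checks passed");
    }
}
```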



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #15341:
URL: https://github.com/apache/kafka/pull/15341#issuecomment-2308987847

   > I have some capacity to pickup this one.
   
   Nice! This is highly appreciated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12829: Remove the deprecated method `init(ProcessorContext, StateStore)` from the `StateStore` interface [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16906:
URL: https://github.com/apache/kafka/pull/16906#discussion_r1730428926


##
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java:
##
@@ -72,14 +71,6 @@ class CachingWindowStore
 this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
 }
 
-@Deprecated
-@Override
-public void init(final ProcessorContext context, final StateStore root) {
-final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE);
-initInternal(asInternalProcessorContext(context), changelogTopic);
-super.init(context, root);
-}
-
 @Override
 public void init(final StateStoreContext context, final StateStore root) {
 final String changelogTopic = 
ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE);

Review Comment:
   Can we inline `initInternal` here, too?



##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##
@@ -95,16 +93,17 @@ public String name() {
 return name;
 }
 
-@Deprecated
 @Override
-public void init(final ProcessorContext context, final StateStore root) {
+public void init(final StateStoreContext stateStoreContext,
+ final StateStore root) {
+this.stateStoreContext = stateStoreContext;
 final String threadId = Thread.currentThread().getName();
-final String taskName = context.taskId().toString();
+final String taskName = stateStoreContext.taskId().toString();
 
 // The provided context is not required to implement 
InternalProcessorContext,
 // If it doesn't, we can't record this metric.
-if (context instanceof InternalProcessorContext) {
-this.context = (InternalProcessorContext) context;
+if (stateStoreContext instanceof InternalProcessorContext) {

Review Comment:
   Should we refactor this method and use `asInternalContext` similar to other 
methods instead?
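The refactor being suggested — replacing scattered `instanceof` checks with a single casting helper — might look roughly like this, using stand-in types rather than the Kafka classes:

```java
public class ContextCastSketch {
    interface StateStoreContext { String taskId(); }
    interface InternalProcessorContext extends StateStoreContext { void recordMetric(String name); }

    // Helper in the spirit of asInternalProcessorContext: one place that
    // performs the instanceof check and cast, returning null when the
    // context does not support metrics recording.
    static InternalProcessorContext asInternalContext(final StateStoreContext context) {
        return context instanceof InternalProcessorContext
            ? (InternalProcessorContext) context
            : null;
    }

    public static void main(String[] args) {
        StateStoreContext plain = () -> "0_0";
        assert asInternalContext(plain) == null;

        InternalProcessorContext internal = new InternalProcessorContext() {
            @Override public String taskId() { return "0_1"; }
            @Override public void recordMetric(final String name) { }
        };
        assert asInternalContext(internal) == internal;
        System.out.println("context cast sketch checks passed");
    }
}
```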



##
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java:
##
@@ -196,15 +195,6 @@ public void setSerdesIfNull(final SerdeGetter getter) {
 keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde;
 valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) 
getter.valueSerde()) : valueSerde;
 }
-
-@Deprecated
-@Override
-public void init(final ProcessorContext context, final StateStore root) {
-this.context = 
ProcessorContextUtils.asInternalProcessorContext(context);
-changelogTopic = ProcessorContextUtils.changelogFor(context, name(), 
Boolean.TRUE);
-init(root);

Review Comment:
   Seems this `init` is the same as `initInternal` in other classes -- can we 
inline it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12824: Delete unused doBranch method [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16981:
URL: https://github.com/apache/kafka/pull/16981#issuecomment-2308994715

   Thank you both!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16985:
URL: https://github.com/apache/kafka/pull/16985#issuecomment-2308995761

   One build failed with a compilation error:
   ```
   > Task :streams:compileJava
   --
     | 2701 | 06:13:33 AM | 
/home/jenkins/workspace/Kafka_kafka-pr_PR-16985/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:481:
 error: incompatible types: inference variable K#1 has incompatible equality 
constraints V#2,K#2,K#3
     | 2702 | 06:13:33 AM | addSink(name, new 
StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, 
predecessorNames);
     | 2703 | 06:13:33 AM | ^
     | 2704 | 06:13:33 AM | where K#1,V#1,V#2,K#2,K#3 are type-variables:
     | 2705 | 06:13:33 AM | K#1 extends Object declared in method 
addSink(String,TopicNameExtractor,Serializer,Serializer,StreamPartitioner,String...)
     | 2706 | 06:13:33 AM | V#1 extends Object declared in method 
addSink(String,TopicNameExtractor,Serializer,Serializer,StreamPartitioner,String...)
     | 2707 | 06:13:33 AM | V#2 extends Object declared in method 
addSink(String,String,Serializer,Serializer,StreamPartitioner,String...)
     | 2708 | 06:13:33 AM | K#2 extends Object declared in method 
addSink(String,String,Serializer,Serializer,StreamPartitioner,String...)
     | 2709 | 06:13:33 AM | K#3 extends Object declared in class 
StaticTopicNameExtractor
     | 2720 | 06:13:33 AM | 1 error
     | 2721 | 06:13:33 AM |  
   ```
   
   Not sure how the others could pass? -- There is also a bunch of test failures; are they related?
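   For anyone puzzling over the inference error quoted above, here is a hedged, heavily simplified illustration of how that kind of generic-overload inference conflict arises, and how explicit type arguments avoid it. All names here are hypothetical stand-ins; this is not Kafka's actual `InternalTopologyBuilder` code.

```java
import java.util.function.Function;

public class InferenceDemo {
    // Stand-in for StaticTopicNameExtractor<K, V>: ignores the record, returns a fixed topic.
    static class StaticExtractor<K, V> implements Function<K, String> {
        private final String topic;
        StaticExtractor(final String topic) { this.topic = topic; }
        @Override public String apply(final K key) { return topic; }
    }

    // Overload taking an extractor (mirrors addSink(name, extractor, ...)).
    static <K, V> String addSink(final String name, final Function<K, String> extractor) {
        return name + "->" + extractor.apply(null);
    }

    // Overload taking a topic name that delegates to the one above. Writing
    // `new StaticExtractor<>(topic)` with the diamond can force javac to solve
    // this method's K/V, the delegate's, and the constructor's all at once,
    // which is where conflicting constraints appear; spelling the type
    // arguments explicitly pins them down.
    static <K, V> String addSink(final String name, final String topic) {
        return InferenceDemo.<K, V>addSink(name, new StaticExtractor<K, V>(topic));
    }

    public static void main(final String[] args) {
        System.out.println(InferenceDemo.<String, String>addSink("sink-1", "topic-A"));
    }
}
```

   Prints `sink-1->topic-A`; the point is only that the explicit `<K, V>` form compiles unambiguously.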





Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16985:
URL: https://github.com/apache/kafka/pull/16985#discussion_r1730431915


##
streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java:
##
@@ -41,10 +34,10 @@ public class TaskId implements Comparable<TaskId> {
 
 /** The ID of the subtopology, aka topicGroupId. */
 @Deprecated

Review Comment:
   We can remove this annotation



##
streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java:
##
@@ -41,10 +34,10 @@ public class TaskId implements Comparable<TaskId> {
 
 /** The ID of the subtopology, aka topicGroupId. */
 @Deprecated
-public final int topicGroupId;
+private final int topicGroupId;
 /** The ID of the partition. */
 @Deprecated

Review Comment:
   As above






Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16985:
URL: https://github.com/apache/kafka/pull/16985#issuecomment-2308996189

   This PR should also remove `TaskMetadata#taskId()`, right?





Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16985:
URL: https://github.com/apache/kafka/pull/16985#issuecomment-2308998527

   Ah. Never mind. `TaskMetadata` was removed by the other PR you did already... (now I remember why I left the comment on the Jira ticket about it; both should be worked on together...) -- it's easily confusing. :) 





Re: [PR] KAFKA-17371: Flaky test in DefaultTaskExecutorTest.shouldUnassignTaskWhenRequired [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16941:
URL: https://github.com/apache/kafka/pull/16941#issuecomment-2309001082

   > I'm not entirely sure how this method will be used. From the docs, it 
seems that client code can invoke it since it is a public method declared in 
the interface.
   
   It's all internal. Note the package name `...internal...`. So if there is any incorrect usage, it's a bug in Kafka Streams itself. A Kafka Streams user would never call it.
   
   Having said this, it could still make sense to refactor to avoid introducing bugs accidentally. However, I am not super familiar with this part of the code and thus cannot judge... Maybe @cadonna or @lucasbru can comment on this question?





[jira] [Resolved] (KAFKA-17216) StreamsConfig STATE_DIR_CONFIG

2024-08-25 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-17216.
-
Resolution: Invalid

Cf the reply on GitHub – caused by a version mismatch.

> StreamsConfig STATE_DIR_CONFIG
> --
>
> Key: KAFKA-17216
> URL: https://issues.apache.org/jira/browse/KAFKA-17216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: raphaelauv
>Priority: Major
>
> I can't use the class StreamsConfig 
> it fails with Caused by: java.lang.ExceptionInInitializerError at
> StreamsConfig.java:866
> problem is not present in 3.7.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17100: GlobalStreamThread#start should not busy-wait [kafka]

2024-08-25 Thread via GitHub


mjsax merged PR #16914:
URL: https://github.com/apache/kafka/pull/16914





Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


mumrah commented on code in PR #16998:
URL: https://github.com/apache/kafka/pull/16998#discussion_r1730436780


##
checkstyle/suppressions.xml:
##
@@ -221,22 +221,7 @@
   
files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/>
 
 
-
-
-
-
-
-
-
-
-
-

Review Comment:
   I don't think you meant to delete these






Re: [PR] KAFKA-17100: GlobalStreamThread#start should not busy-wait [kafka]

2024-08-25 Thread via GitHub


mjsax commented on PR #16914:
URL: https://github.com/apache/kafka/pull/16914#issuecomment-2309007974

   Thanks for the PR @raminqaf -- merged to `trunk`.
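   For readers skimming the digest: the PR title refers to the caller busy-waiting (spinning in a loop) until the global thread reports readiness. A minimal sketch of the latch-based alternative follows; all names are hypothetical and this is not `GlobalStreamThread`'s actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchStartDemo {
    private final CountDownLatch initialized = new CountDownLatch(1);

    private final Thread worker = new Thread(() -> {
        // ... expensive initialization (e.g. state restoration) would run here ...
        initialized.countDown(); // signal readiness exactly once
    });

    // Blocks until the worker signals readiness, without burning CPU the way
    // a `while (!running) { }` spin loop would.
    public boolean start(final long timeoutMs) throws InterruptedException {
        worker.start();
        return initialized.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println(new LatchStartDemo().start(5_000L)); // true once the worker has signaled
    }
}
```

   The `await` variant also gives the caller a natural place to time out instead of spinning forever.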





Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16998:
URL: https://github.com/apache/kafka/pull/16998#issuecomment-2309008426

   ```
   
   ```
   ```
   
   ```
   ```
   
   ```
   @xijiu please remove the above rules as well, thanks!





[jira] [Commented] (KAFKA-16331) Remove Deprecated EOSv1

2024-08-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876545#comment-17876545
 ] 

Matthias J. Sax commented on KAFKA-16331:
-

Yes, that's also important. Thanks for the reminder.

My main point was really about internal refactoring to simplify the code base 
by removing EOSv1-related code. Strictly speaking, it would be enough to just 
remove the config from `StreamsConfig` so users cannot enable EOSv1 any longer 
to resolve this ticket; internal refactoring could be done afterwards. However, 
I would like to use this ticket for internal refactoring, too (at least the 
lion's share – we might do more refactoring down the line...).

> Remove Deprecated EOSv1
> ---
>
> Key: KAFKA-16331
> URL: https://issues.apache.org/jira/browse/KAFKA-16331
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> EOSv1 was deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2]
>  * remove config
>  * remove Producer#sendOffsetsToTransaction
>  * cleanup code





[jira] [Created] (KAFKA-17419) Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7

2024-08-25 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17419:
--

 Summary: Disable SslAdminIntegrationTest#testExpireDelegationToken 
for 3.8 and 3.7
 Key: KAFKA-17419
 URL: https://issues.apache.org/jira/browse/KAFKA-17419
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


When backporting KAFKA-17315, I opened KAFKA-17417 as a follow-up, but I forgot 
to disable `SslAdminIntegrationTest#testExpireDelegationToken` ...

The test currently fails, so it should be disabled and re-enabled in the future 
by KAFKA-17417





Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16995:
URL: https://github.com/apache/kafka/pull/16995#issuecomment-2309011897

   The failed test is caused by my previous backport -- I forgot to disable it. 
Opened https://issues.apache.org/jira/browse/KAFKA-17419 to disable it





Re: [PR] KAFKA-14957: Update-Description-String [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #13909:
URL: https://github.com/apache/kafka/pull/13909#discussion_r1730439203


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -839,7 +839,8 @@ public class StreamsConfig extends AbstractConfig {
 Type.STRING,
 System.getProperty("java.io.tmpdir") + File.separator + 
"kafka-streams",
 Importance.HIGH,
-STATE_DIR_DOC)
+STATE_DIR_DOC,
+"${java.io.tmpdir}")

Review Comment:
   Thanks for confirming. No problem at all :) 






Re: [PR] KAFKA-17053: [Minor] Restructure build.gradle to configure publishing last [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16950:
URL: https://github.com/apache/kafka/pull/16950#issuecomment-2309013403

   > I will modify this PR and refactor build.gradle to move the shadowJar 
block before the publishing block.
   
   @KTKTK-HZ Thanks a bunch! 
   
   BTW, could you please remove the "[Minor]"? The value of the patch you will 
contribute is NOT MINOR, I feel :smile: 





Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16995:
URL: https://github.com/apache/kafka/pull/16995#issuecomment-2309013599

   The other failed test passes on my local; will merge it





Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16996:
URL: https://github.com/apache/kafka/pull/16996#issuecomment-2309016578

   The failed test is tracked by 
https://issues.apache.org/jira/browse/KAFKA-17419; the other failed tests pass 
on my local





Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


chia7712 merged PR #16996:
URL: https://github.com/apache/kafka/pull/16996





Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]

2024-08-25 Thread via GitHub


chia7712 merged PR #16995:
URL: https://github.com/apache/kafka/pull/16995


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly

2024-08-25 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876547#comment-17876547
 ] 

Chia-Ping Tsai commented on KAFKA-17360:


3.7: 
https://github.com/apache/kafka/commit/57b6c2ef98d8177ab0e43f7653c8079d4daa8789

3.8: 
https://github.com/apache/kafka/commit/d9a26a95a70ac5632fb36d7e1da4728a86e789b5

> local log retention ms/bytes "-2" is not treated correctly
> --
>
> Key: KAFKA-17360
> URL: https://issues.apache.org/jira/browse/KAFKA-17360
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Critical
> Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1
>
>
> # When the local.retention.ms/bytes is set to -2, we didn't replace it with 
> the server-side retention.ms/bytes config, so the -2 local retention won't 
> take effect.
>  # When setting retention.ms/bytes to -2, we can notice this log message:
> {code:java}
> Deleting segment LogSegment(baseOffset=10045, size=1037087, 
> lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to 
> local log retention size -2 breach. Local log size after deletion will be 
> 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code}
> This is not helpful for users. We should replace -2 with real retention value 
> when logging.
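For reference, a hedged stdlib-only sketch of the -2 sentinel semantics described above (illustrative only, not Kafka's actual code):

```java
public class LocalRetentionDemo {
    // Sentinel meaning "fall back to the topic-wide retention setting".
    static final long USE_TOPIC_RETENTION = -2L;

    // Resolve the effective local retention; the bug was that logging and
    // enforcement used the raw -2 instead of resolving it first.
    static long effectiveLocalRetentionMs(final long localRetentionMs, final long retentionMs) {
        return localRetentionMs == USE_TOPIC_RETENTION ? retentionMs : localRetentionMs;
    }

    public static void main(final String[] args) {
        System.out.println(effectiveLocalRetentionMs(-2L, 604_800_000L));      // falls back to retention.ms
        System.out.println(effectiveLocalRetentionMs(3_600_000L, 604_800_000L)); // explicit value wins
    }
}
```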





[jira] [Resolved] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly

2024-08-25 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17360.

Resolution: Fixed

> local log retention ms/bytes "-2" is not treated correctly
> --
>
> Key: KAFKA-17360
> URL: https://issues.apache.org/jira/browse/KAFKA-17360
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Critical
> Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1
>
>
> # When the local.retention.ms/bytes is set to -2, we didn't replace it with 
> the server-side retention.ms/bytes config, so the -2 local retention won't 
> take effect.
>  # When setting retention.ms/bytes to -2, we can notice this log message:
> {code:java}
> Deleting segment LogSegment(baseOffset=10045, size=1037087, 
> lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to 
> local log retention size -2 breach. Local log size after deletion will be 
> 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code}
> This is not helpful for users. We should replace -2 with real retention value 
> when logging.





Re: [PR] KAFKA-17327: Add support of group in kafka-configs.sh [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16887:
URL: https://github.com/apache/kafka/pull/16887#discussion_r1730446002


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -256,6 +278,32 @@ public void testDynamicBrokerConfigUpdateUsingKraft() 
throws Exception {
 }
 }
 
+@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer"),

Review Comment:
   @DL1231 I feel that is a false limit. Could you adjust the checkstyle rule?






[jira] [Created] (KAFKA-17420) Fix flaky StreamThreadTest.tearDown

2024-08-25 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17420:
--

 Summary: Fix flaky StreamThreadTest.tearDown
 Key: KAFKA-17420
 URL: https://issues.apache.org/jira/browse/KAFKA-17420
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see https://github.com/apache/kafka/actions/runs/10549075483?pr=16954


{code:java}
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:183)
at 
app//org.apache.kafka.streams.processor.internals.StreamThreadTest.tearDown(StreamThreadTest.java:240)
at java.base@17.0.12/java.lang.reflect.Method.invoke(Method.java:569)
at 
java.base@17.0.12/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
{code}






[jira] [Commented] (KAFKA-17419) Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7

2024-08-25 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876549#comment-17876549
 ] 

kangning.li commented on KAFKA-17419:
-

Hi [~chia7712], I am interested in this issue; could you assign it to me? Thanks

> Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7
> -
>
> Key: KAFKA-17419
> URL: https://issues.apache.org/jira/browse/KAFKA-17419
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> When backporting KAFKA-17315, I opened KAFKA-17417 as a follow-up, but I forgot 
> to disable `SslAdminIntegrationTest#testExpireDelegationToken` ...
> The test currently fails, so it should be disabled and re-enabled in the future 
> by KAFKA-17417





[jira] [Assigned] (KAFKA-17419) Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7

2024-08-25 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17419:
--

Assignee: kangning.li  (was: Chia-Ping Tsai)

> Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7
> -
>
> Key: KAFKA-17419
> URL: https://issues.apache.org/jira/browse/KAFKA-17419
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: kangning.li
>Priority: Major
>
> When backporting KAFKA-17315, I opened KAFKA-17417 as a follow-up, but I forgot 
> to disable `SslAdminIntegrationTest#testExpireDelegationToken` ...
> The test currently fails, so it should be disabled and re-enabled in the future 
> by KAFKA-17417





Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16954:
URL: https://github.com/apache/kafka/pull/16954#issuecomment-2309029491

   I opened https://issues.apache.org/jira/browse/KAFKA-17420 to track 
`StreamThreadTest.tearDown`; the others are known flaky tests.





Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]

2024-08-25 Thread via GitHub


chia7712 merged PR #16954:
URL: https://github.com/apache/kafka/pull/16954





Re: [PR] KAFKA-17411: Create local state Standbys on start [kafka]

2024-08-25 Thread via GitHub


mjsax commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1730446985


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
 return stateDirLock != null;
 }
 
+public void initializeTasksForLocalState(final TopologyMetadata 
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+final LogContext logContext = new LogContext("main-thread ");
+final ThreadCache dummyCache = new ThreadCache(logContext, 0, 
streamsMetrics);
+final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+final boolean stateUpdaterEnabled = 
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+// discover all non-empty task directories in StateDirectory
+for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+final String dirName = taskDirectory.file().getName();
+final TaskId id = parseTaskDirectoryName(dirName, 
taskDirectory.namedTopology());
+final ProcessorTopology topology = 
topologyMetadata.buildSubtopology(id);
+final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+// create a StandbyTask for each one
+if (topology.hasStateWithChangelogs()) {

Review Comment:
   Why do we need this additional check? -- Given that we have a non-empty task 
directory, it seems redundant? 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##
@@ -182,6 +199,120 @@ private boolean lockStateDirectory() {
 return stateDirLock != null;
 }
 
+public void initializeTasksForLocalState(final TopologyMetadata 
topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+final LogContext logContext = new LogContext("main-thread ");
+final ThreadCache dummyCache = new ThreadCache(logContext, 0, 
streamsMetrics);
+final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+final boolean stateUpdaterEnabled = 
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+// discover all non-empty task directories in StateDirectory
+for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+final String dirName = taskDirectory.file().getName();
+final TaskId id = parseTaskDirectoryName(dirName, 
taskDirectory.namedTopology());
+final ProcessorTopology topology = 
topologyMetadata.buildSubtopology(id);
+final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+// create a StandbyTask for each one
+if (topology.hasStateWithChangelogs()) {
+final ProcessorStateManager stateManager = new 
ProcessorStateManager(
+id,
+Task.TaskType.STANDBY,
+eosEnabled,
+logContext,
+this,
+null,
+topology.storeToChangelogTopic(),
+inputPartitions,
+stateUpdaterEnabled
+);
+
+final InternalProcessorContext context = 
new ProcessorContextImpl(
+id,
+config,
+stateManager,
+streamsMetrics,
+dummyCache
+);
+
+final Task task = new StandbyTask(
+id,
+inputPartitions,
+topology,
+topologyMetadata.taskConfig(id),
+streamsMetrics,
+stateManager,
+this,
+dummyCache,
+context
+);
+
+// initialize and suspend new Tasks
+try {
+task.initializeIfNeeded();
+task.suspend();
+
+// add new Tasks to tasksForLocalState
+tasksForLocalState.put(id, task);
+} catch (final TaskCorruptedException e) {
+// Task is corrupt - wipe it 

Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


xijiu commented on code in PR #16998:
URL: https://github.com/apache/kafka/pull/16998#discussion_r1730499267


##
checkstyle/suppressions.xml:
##
@@ -221,22 +221,7 @@
   
files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/>
 
 
-
-
-
-
-
-
-
-
-
-

Review Comment:
   @mumrah  Hi David, thanks very much for the code review; my fault, I will fix it.






Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]

2024-08-25 Thread via GitHub


xijiu commented on PR #16998:
URL: https://github.com/apache/kafka/pull/16998#issuecomment-2309123331

   @chia7712 @mumrah I have fixed the above issues, PTAL






Re: [PR] KAFKA-17399: Apply LambdaValidator to code base [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16980:
URL: https://github.com/apache/kafka/pull/16980#issuecomment-2309141760

   @xijiu Could you please rebase code?





Re: [PR] KAFKA-15909: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16933:
URL: https://github.com/apache/kafka/pull/16933#issuecomment-2309143513

   @FrankYang0529 could you please rebase code to trigger QA again?





Re: [PR] KAFKA-15909: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer [kafka]

2024-08-25 Thread via GitHub


FrankYang0529 commented on PR #16933:
URL: https://github.com/apache/kafka/pull/16933#issuecomment-2309151303

   > @FrankYang0529 could you please rebase code to trigger QA again?
   
   Rebased it. Thank you.





Re: [PR] KAFKA-17137[part-5]: Ensure Admin APIs are properly tested [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16905:
URL: https://github.com/apache/kafka/pull/16905#discussion_r1730549155


##
core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala:
##
@@ -331,6 +332,54 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup
 assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), 
classOf[InvalidRequestException])
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = {
+client = createAdminClient
+val timeout = Long.MaxValue
+
+val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+val token = client.createDelegationToken(options).delegationToken().get()
+
+
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT,
 token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = {
+client = createAdminClient
+val timeout = -1
+
+val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+val token = client.createDelegationToken(options).delegationToken().get()
+
+
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT,
 token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = {
+client = createAdminClient
+val timeout = -1
+
+val createOptions = new 
CreateDelegationTokenOptions().maxlifeTimeMs(timeout)
+val token = 
client.createDelegationToken(createOptions).delegationToken().get()
+
+
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT,
 token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp)
+assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp)
+
+TestUtils.waitUntilTrue(() => brokers.forall(server => 
server.tokenCache.tokens().size() == 1),
+  "Timed out waiting for token to propagate to all servers")
+
+val expiredOptions = new 
ExpireDelegationTokenOptions().expiryTimePeriodMs(token.tokenInfo.maxTimestamp)

Review Comment:
   All we want to verify is that the expiry timestamp is never larger than the 
max timestamp, so we should set `maxTimestamp + 1`, right?
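The three tests above all assert the same clamping behavior: both a negative max lifetime and a `Long.MaxValue` one end up capped at the broker default. A minimal sketch of that rule, inferred from the assertions rather than taken from the server code (`TokenLifetimeSketch` and `effectiveMaxLifetimeMs` are hypothetical names; the 7-day constant matches the documented default of `delegation.token.max.lifetime.ms`):

```java
public class TokenLifetimeSketch {
    // Documented default of delegation.token.max.lifetime.ms: 7 days.
    static final long DEFAULT_MAX_LIFETIME_MS = 7L * 24 * 60 * 60 * 1000;

    // Non-positive requests fall back to the default; any other request
    // is capped at the configured maximum. This mirrors what the three
    // assertEquals/assertTrue checks in the tests above expect.
    static long effectiveMaxLifetimeMs(long requestedMs) {
        if (requestedMs <= 0) return DEFAULT_MAX_LIFETIME_MS;
        return Math.min(requestedMs, DEFAULT_MAX_LIFETIME_MS);
    }

    public static void main(String[] args) {
        // Mirrors testCreateDelegationTokenWithNegativeTimeout
        if (effectiveMaxLifetimeMs(-1L) != DEFAULT_MAX_LIFETIME_MS)
            throw new AssertionError();
        // Mirrors testCreateDelegationTokenWithLargeTimeout
        if (effectiveMaxLifetimeMs(Long.MAX_VALUE) != DEFAULT_MAX_LIFETIME_MS)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```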



[PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]

2024-08-25 Thread via GitHub


gongxuanzhang opened a new pull request, #16999:
URL: https://github.com/apache/kafka/pull/16999

   (no comment)





Re: [PR] KAFKA-17137: Feat admin client it acl configs [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16648:
URL: https://github.com/apache/kafka/pull/16648#discussion_r1730562364


##
clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java:
##
@@ -25,10 +25,19 @@
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
-public class ExpireDelegationTokenOptions extends 
AbstractOptions {
+public class ExpireDelegationTokenOptions

Review Comment:
   Could you please revert this unrelated change?



##
clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java:
##
@@ -25,10 +25,19 @@
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
-public class ExpireDelegationTokenOptions extends 
AbstractOptions {
+public class ExpireDelegationTokenOptions
+extends AbstractOptions {
+
 private long expiryTimePeriodMs = -1L;
 
-public ExpireDelegationTokenOptions expiryTimePeriodMs(long 
expiryTimePeriodMs) {
+/**
+ * @param expiryTimePeriodMs the time period until we should expire this 
token.
+ * {@code expiryTimePeriodMs} >= 0: the token will update the `expiration 
timestamp` if the current expiration timestamp is smaller than (now + 
expiryTimePeriodMs).

Review Comment:
   Sorry, could you please revise it to "{@code expiryTimePeriodMs} >= 0: the 
token will update the `expiration timestamp` to `min(now + expiryTimePeriodMs, 
maxTimestamp)`"?



##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -106,19 +150,22 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 config.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId)
 client = Admin.create(config)
 
-val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> 
clientId).asJava)
-val configEntries = 
Map(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> 1.0, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 3.0)
-client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, 
configEntries.map {case (k, v) =>
-  new 
ClientQuotaAlteration.Op(k,v)}.asJavaCollection)).asJavaCollection).all.get
+try {
+  val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> 
clientId).asJava)
+  val configEntries = 
Map(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> 1.0, 
QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 3.0)
+  client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, 
configEntries.map { case (k, v) =>
+new ClientQuotaAlteration.Op(k, v)
+  }.asJavaCollection)).asJavaCollection).all.get
 
-TestUtils.waitUntilTrue(() => {
-  // wait for our ClientQuotaEntity to be set
-  
client.describeClientQuotas(ClientQuotaFilter.all()).entities().get().size == 1
-}, "Timed out waiting for quota config to be propagated to all servers")
+  TestUtils.waitUntilTrue(() => {
+// wait for our ClientQuotaEntity to be set
+
client.describeClientQuotas(ClientQuotaFilter.all()).entities().get().size == 1
+  }, "Timed out waiting for quota config to be propagated to all servers")
 
-val quotaEntities = 
client.describeClientQuotas(ClientQuotaFilter.all()).entities().get()
+  val quotaEntities = 
client.describeClientQuotas(ClientQuotaFilter.all()).entities().get()
 
-assertEquals(configEntries,quotaEntities.get(entity).asScala)
+  assertEquals(configEntries, quotaEntities.get(entity).asScala)
+} finally client.close(time.Duration.ZERO)

Review Comment:
   This client is created normally (with the correct port), so it can be closed 
as usual. That means we don't need to add try-finally, as it will be closed by 
`@AfterEach`.






Re: [PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16999:
URL: https://github.com/apache/kafka/pull/16999#issuecomment-2309176757

   @gongxuanzhang thanks for this patch. The changes go beyond the Jira's 
scope. Could you please focus on the `config_property` file?





Re: [PR] KAFKA-17327: Add support of group in kafka-configs.sh [kafka]

2024-08-25 Thread via GitHub


DL1231 commented on code in PR #16887:
URL: https://github.com/apache/kafka/pull/16887#discussion_r1730575833


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -256,6 +278,32 @@ public void testDynamicBrokerConfigUpdateUsingKraft() 
throws Exception {
 }
 }
 
+@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer"),

Review Comment:
   Done.






Re: [PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]

2024-08-25 Thread via GitHub


gongxuanzhang closed pull request #16999: KAFKA-17382: cleanup out-of-date 
configs of config_property
URL: https://github.com/apache/kafka/pull/16999





Re: [PR] KAFKA-12829: Remove deprecated Topology#addGlobalStore of old Processor API [kafka]

2024-08-25 Thread via GitHub


pegasas commented on PR #16791:
URL: https://github.com/apache/kafka/pull/16791#issuecomment-2309191228

   > The build has checkstyle errors:
   > 
   > ```
   > > Task :streams:checkstyleMain
   > --
   >   | 2993 | 09:39:12 AM | [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:23:8:
 Unused import - org.apache.kafka.streams.Topology. [UnusedImports]
   >   | 2994 | 09:39:12 AM | [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:22:8:
 Unused import - org.apache.kafka.streams.Topology. [UnusedImports]
   > ```
   
   Fixed these checkstyle errors.
   Are there any Gradle commands I can run locally, like `mvn spotless:apply` 
or something similar?
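For reference, Kafka's Gradle build exposes per-module Checkstyle tasks, and recent trunk branches also wire in the spotless plugin; assuming a standard trunk checkout, something like the following should work locally (verify the exact task names on your branch with `./gradlew tasks`):

```shell
# Run Checkstyle for just the streams module (the module the errors came from)
./gradlew :streams:checkstyleMain :streams:checkstyleTest

# Or across the whole project
./gradlew checkstyleMain checkstyleTest

# If the spotless plugin is enabled on your branch, this auto-fixes
# import order and similar formatting issues (analogous to mvn spotless:apply)
./gradlew spotlessApply
```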





Re: [PR] KAFKA-17367: Share coordinator impl. Introduce infra classes [1/N] [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on code in PR #16921:
URL: https://github.com/apache/kafka/pull/16921#discussion_r1730578171


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntSupplier;
+
+public interface ShareCoordinator {
+short SHARE_SNAPSHOT_RECORD_KEY_VERSION = 0;
+short SHARE_SNAPSHOT_RECORD_VALUE_VERSION = 0;
+short SHARE_UPDATE_RECORD_KEY_VERSION = 1;
+short SHARE_UPDATE_RECORD_VALUE_VERSION = 1;

Review Comment:
   Should we change the version defined in json 
(https://github.com/apache/kafka/blob/trunk/share-coordinator/src/main/resources/common/message/ShareUpdateValue.json#L21)
 to `1` for consistency?



##
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {@link CoordinatorRecord}. The format is 
defined below:
+ * 
+ * record_key   = [record_type key_message]
+ * record_value = [value_version value_message]
+ *
+ * record_type : The record type is currently defined as the version of 
the key
+ *   {@link ApiMessageAndVersion} object.
+ * key_message : The serialized message of the key {@link 
ApiMessageAndVersion} object.
+ * value_version   : The value version is currently defined as the version 
of the value
+ *   {@link ApiMessageAndVersion} object.
+ * value_message   : The serialized message of the value {@link 
ApiMessageAndVersion} object.
+ * 
+ */
+public abstract class CoordinatorRecordSerde implements 
Serializer<CoordinatorRecord>, Deserializer<CoordinatorRecord> {
+@Override
+public byte[] serializeKey(CoordinatorRecord record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(CoordinatorRecord record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public CoordinatorRecord deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
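The javadoc quoted above describes the on-disk layout: a version prefix followed by the serialized message, for both the record key and the record value. A self-contained sketch of that layout (illustrative only; in Kafka the real work is done by `MessageUtil.toVersionPrefixedBytes` and the generated `ApiMessage` classes):

```java
import java.nio.ByteBuffer;

public class VersionPrefixSketch {
    // record_key / record_value layout: a big-endian int16 version
    // followed by the serialized message bytes.
    static byte[] toVersionPrefixedBytes(short version, byte[] message) {
        ByteBuffer buf = ByteBuffer.allocate(2 + message.length);
        buf.putShort(version);   // ByteBuffer is big-endian by default
        buf.put(message);
        return buf.array();
    }

    static short readVersion(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getShort();
    }

    public static void main(String[] args) {
        byte[] key = toVersionPrefixedBytes((short) 0, new byte[]{1, 2, 3});
        if (key.length != 5) throw new AssertionError();
        if (readVersion(key) != 0) throw new AssertionError();
        System.out.println("ok");
    }
}
```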

Re: [PR] KAFKA-14262: Deletion of MirrorMaker v1 deprecated classes & tests [kafka]

2024-08-25 Thread via GitHub


chia7712 commented on PR #16879:
URL: https://github.com/apache/kafka/pull/16879#issuecomment-2309201764

   @abhi-ksolves could you please remove the related scripts and e2e tests?
   
   
https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/mirror_maker.py
   
https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/core/mirror_maker_test.py
   https://github.com/apache/kafka/blob/trunk/bin/kafka-mirror-maker.sh
   https://github.com/apache/kafka/blob/trunk/bin/windows/kafka-mirror-maker.bat
   





Re: [PR] KAFKA-17399: Apply LambdaValidator to code base [kafka]

2024-08-25 Thread via GitHub


xijiu commented on PR #16980:
URL: https://github.com/apache/kafka/pull/16980#issuecomment-2309230891

   > @xijiu Could you please rebase code?
   
   @chia7712 
   Yeap, I have already rebased the branch.





Re: [PR] KAFKA-17062: handle dangling "copy_segment_start" state when deleting remote logs [kafka]

2024-08-25 Thread via GitHub


showuon commented on code in PR #16959:
URL: https://github.com/apache/kafka/pull/16959#discussion_r1730617565


##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -2530,6 +2635,21 @@ public void 
testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount,
 verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, 
currentLeaderEpoch);
 }
 
+private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long 
remoteDeleteLagSegments) {
+assertEquals(remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+String.format("Expected to find %d for RemoteDeleteLagBytes 
metric value, but found %d",
+remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+assertEquals(remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes"),
+String.format("Expected to find %d for RemoteDeleteLagBytes 
metric value, but found %d",
+remoteDeleteLagBytes, 
safeLongYammerMetricValue("RemoteDeleteLagBytes")));
+assertEquals(remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
+String.format("Expected to find %d for RemoteDeleteLagSegments 
for 'Leader' topic metric value, but found %d",
+remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+assertEquals(remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic),
+String.format("Expected to find %d for RemoteDeleteLagSegments 
for 'Leader' topic metric value, but found %d",
+remoteDeleteLagSegments, 
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic)));
+}

Review Comment:
   Oh, nice catch! Thanks!






Re: [PR] KAFKA-14588 Tests for ConfigCommand of DynamicBrokerReconfigurationTest rewritten in java [kafka]

2024-08-25 Thread via GitHub


github-actions[bot] commented on PR #15848:
URL: https://github.com/apache/kafka/pull/15848#issuecomment-2309238540

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-14588 ConfigCommand rewritten to java [kafka]

2024-08-25 Thread via GitHub


github-actions[bot] commented on PR #15417:
URL: https://github.com/apache/kafka/pull/15417#issuecomment-2309238666

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]

2024-08-25 Thread via GitHub


gongxuanzhang opened a new pull request, #17000:
URL: https://github.com/apache/kafka/pull/17000

   fix https://issues.apache.org/jira/browse/KAFKA-17382





[jira] [Commented] (KAFKA-17397) Ensure ClassicKafkaConsumer sends leave request on close even if interrupted

2024-08-25 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876576#comment-17876576
 ] 

Arpit Goyal commented on KAFKA-17397:
-

[~kirktrue] Can I pick this up?

> Ensure ClassicKafkaConsumer sends leave request on close even if interrupted
> 
>
> Key: KAFKA-17397
> URL: https://issues.apache.org/jira/browse/KAFKA-17397
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0, 3.9.0
>Reporter: Kirk True
>Priority: Major
>  Labels: integration-tests
>
> During testing for KAFKA-16985, a new, parameterized integration test was 
> added to {{PlaintextConsumerTest}} named 
> {{{}testCloseLeavesGroupOnInterrupt(){}}}. When the test is executed locally, 
> it passes using both the {{AsyncKafkaConsumer}} and the 
> {{{}ClassicKafkaConsumer{}}}. However, when the test is run in the Apache CI 
> environment, it passes for the {{AsyncKafkaConsumer}} but fails for the 
> {{{}ClassicKafkaConsumer{}}}. Rather than hold up KAFKA-16985, this Jira was 
> filed to investigate and fix the {{{}ClassicKafkaConsumer{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17062: handle dangling "copy_segment_start" state when deleting remote logs [kafka]

2024-08-25 Thread via GitHub


showuon commented on code in PR #16959:
URL: https://github.com/apache/kafka/pull/16959#discussion_r1730655484


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1254,6 +1255,20 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
 canProcess = false;
 continue;
 }
+
+if 
(RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state())) {
+// get the current segment state here to avoid the 
race condition that before the loop, it's under copying process,
+// but then completed. In this case, 
segmentIdsBeingCopied will not contain this id, so we might
+// delete this segment unexpectedly.
+Optional<RemoteLogSegmentMetadata> curMetadata = 
remoteLogMetadataManager.remoteLogSegmentMetadata(

Review Comment:
   > Instead of deleting the dangling segments in the same iteration, Can we 
note down the dangling segments in the current iteration and delete them in the 
next iteration of cleanupExpiredRemoteLogSegments?
   
   This is a good suggestion! Let me update the PR.
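The two-pass idea being adopted here can be sketched in isolation: segments seen in COPY_SEGMENT_STARTED are only remembered on the first pass, and deleted on a later pass only if they are still in that state. All identifiers below are hypothetical, not the RemoteLogManager API:

```java
import java.util.HashSet;
import java.util.Set;

public class DanglingSegmentSketch {
    // Segment ids that looked dangling (COPY_SEGMENT_STARTED) on the
    // previous cleanup pass; a segment mid-copy will have left this
    // state by the next pass, so only truly dangling ones remain.
    private final Set<String> danglingFromPreviousPass = new HashSet<>();

    // Returns the segment ids that are safe to delete on this pass.
    Set<String> cleanupPass(Set<String> copyStartedSegmentIds) {
        Set<String> toDelete = new HashSet<>(danglingFromPreviousPass);
        toDelete.retainAll(copyStartedSegmentIds); // still dangling after a full pass
        danglingFromPreviousPass.clear();
        danglingFromPreviousPass.addAll(copyStartedSegmentIds);
        return toDelete;
    }

    public static void main(String[] args) {
        DanglingSegmentSketch s = new DanglingSegmentSketch();
        // First pass: "a" looks dangling but could be mid-copy, so skip it.
        if (!s.cleanupPass(Set.of("a")).isEmpty()) throw new AssertionError();
        // Second pass: "a" is still dangling, so it is now eligible for deletion.
        if (!s.cleanupPass(Set.of("a")).equals(Set.of("a"))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This avoids the race the quoted code comments describe, at the cost of delaying deletion of genuinely dangling segments by one cleanup interval.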





