Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]

2024-12-08 Thread via GitHub


apoorvmittal10 commented on code in PR #18093:
URL: https://github.com/apache/kafka/pull/18093#discussion_r1874735278


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -371,52 +370,35 @@ public static RecordState forId(byte id) {
  */
 public CompletableFuture maybeInitialize() {
 log.debug("Maybe initialize share partition: {}-{}", groupId, 
topicIdPartition);
-CompletableFuture future = new CompletableFuture<>();
-AtomicReference> futureException = new 
AtomicReference<>(Optional.empty());
 // Check if the share partition is already initialized.
-InitializationResult initializationResult = 
checkInitializationCompletion();
-if (initializationResult.isComplete()) {
-if (initializationResult.throwable() != null) {
-future.completeExceptionally(initializationResult.throwable());
-} else {
-future.complete(null);
+try {
+if (initializedOrThrowException()) {
+return CompletableFuture.completedFuture(null);
 }
-return future;
+} catch (Exception e) {
+return CompletableFuture.failedFuture(e);
 }
 
+// If code reaches here then the share partition is not initialized. 
Initialize the share partition.
 // All the pending requests should wait to get completed before the 
share partition is initialized.
 // Attain lock to avoid any concurrent requests to be processed.
 lock.writeLock().lock();
-boolean shouldFutureBeCompleted = false;
 try {
 // Re-check the state to verify if previous requests has already 
initialized the share partition.
-initializationResult = checkInitializationCompletion();
-if (initializationResult.isComplete()) {
-if (initializationResult.throwable() != null) {
-
futureException.set(Optional.of(initializationResult.throwable()));
-}
-shouldFutureBeCompleted = true;
-return future;
+if (initializedOrThrowException()) {
+return CompletableFuture.completedFuture(null);

Review Comment:
   Your suggestion is better, so I have incorporated it.
   
   However, I tested the earlier code as well. As the Java specification says, 
a method does not return the result of a `return` in `try` until the `finally` 
block has completed. That differs from the earlier, problematic code, where we 
were completing the `future` inside `try`; this time there was only a `return` 
statement there.
   
   Below is a test and its output.
   ```
   public CompletableFuture<Void> testFuture() {
       lock.writeLock().lock();
       try {
           System.out.println("testFuture");
           return CompletableFuture.completedFuture(null);
       } finally {
           System.out.println("testFuture finally");
           lock.writeLock().unlock();
           System.out.println("testFuture finally unlocked");
       }
   }
   ```
   
   ```
   @RepeatedTest(100)
   public void testFuture() {
       SharePartition sharePartition = SharePartitionBuilder.builder().build();
       sharePartition.testFuture().whenComplete((result, exception) -> {
           System.out.println("Completed");
       });
   }
   ```
   
   ```
   testFuture
   testFuture finally
   testFuture finally unlocked
   Completed
   ```
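The ordering difference discussed in this comment can be reproduced outside Kafka. The following is a minimal sketch, not the `SharePartition` code: `FinallyOrdering`, `returnInTry`, `completeInTry`, and `writeLockHeld` are hypothetical names. It assumes that, in the second case, the callback was registered on the future before the future is completed, so `complete` runs it synchronously on the completing thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical demo class; not part of Kafka.
class FinallyOrdering {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    final List<String> events = new ArrayList<>();

    // Returning from try: the caller receives the future only after the
    // finally block has released the lock.
    CompletableFuture<Void> returnInTry() {
        lock.writeLock().lock();
        try {
            return CompletableFuture.completedFuture(null);
        } finally {
            lock.writeLock().unlock();
            events.add("unlocked");
        }
    }

    // Completing an already-shared future inside try: callbacks registered
    // earlier run on this thread before finally, with the lock still held.
    void completeInTry(CompletableFuture<Void> future) {
        lock.writeLock().lock();
        try {
            future.complete(null);
        } finally {
            lock.writeLock().unlock();
            events.add("unlocked");
        }
    }

    boolean writeLockHeld() {
        return lock.isWriteLockedByCurrentThread();
    }

    public static void main(String[] args) {
        FinallyOrdering a = new FinallyOrdering();
        a.returnInTry().whenComplete((r, e) -> a.events.add("callback lockHeld=" + a.writeLockHeld()));
        System.out.println(a.events);

        FinallyOrdering b = new FinallyOrdering();
        CompletableFuture<Void> shared = new CompletableFuture<>();
        shared.whenComplete((r, e) -> b.events.add("callback lockHeld=" + b.writeLockHeld()));
        b.completeInTry(shared);
        System.out.println(b.events);
    }
}
```

In the first case the callback is recorded after "unlocked" and sees the lock released; in the second it is recorded before "unlocked" and sees the lock held, which is the hazard the earlier code had.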



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]

2024-12-08 Thread via GitHub


apoorvmittal10 commented on code in PR #18093:
URL: https://github.com/apache/kafka/pull/18093#discussion_r1874735359


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -426,55 +408,50 @@ public CompletableFuture maybeInitialize() {
 .build())
 .build()
 ).whenComplete((result, exception) -> {
+Throwable throwable = null;
 lock.writeLock().lock();
 try {
 if (exception != null) {
 log.error("Failed to initialize the share partition: 
{}-{}", groupId, topicIdPartition, exception);
-completeInitializationWithException();
-futureException.set(Optional.of(exception));
+throwable = exception;
 return;
 }
 
 if (result == null || result.topicsData() == null || 
result.topicsData().size() != 1) {
 log.error("Failed to initialize the share partition: 
{}-{}. Invalid state found: {}.",
 groupId, topicIdPartition, result);
-completeInitializationWithException();
-futureException.set(Optional.of(new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition))));
+throwable = new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition));
 return;
 }
 
 TopicData state = result.topicsData().get(0);
 if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1) {
 log.error("Failed to initialize the share partition: 
{}-{}. Invalid topic partition response: {}.",
 groupId, topicIdPartition, result);
-completeInitializationWithException();
-futureException.set(Optional.of(new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition))));
+throwable = new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition));
 return;
 }
 
 PartitionAllData partitionData = state.partitions().get(0);
 if (partitionData.partition() != topicIdPartition.partition()) 
{
 log.error("Failed to initialize the share partition: 
{}-{}. Invalid partition response: {}.",
 groupId, topicIdPartition, partitionData);
-completeInitializationWithException();
-futureException.set(Optional.of(new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition))));
+throwable = new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition));
 return;
 }
 
 if (partitionData.errorCode() != Errors.NONE.code()) {
 KafkaException ex = 
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
 log.error("Failed to initialize the share partition: 
{}-{}. Exception occurred: {}.",
 groupId, topicIdPartition, partitionData);
-completeInitializationWithException();
-futureException.set(Optional.of(ex));
+throwable = ex;
 return;
 }
 
 try {
 startOffset = 
startOffsetDuringInitialization(partitionData.startOffset());
 } catch (Exception e) {

Review Comment:
   Agreed, done.




[PR] KAFKA-18021: Disabled MirrorCheckpointConnector throws RetriableException on task config generation [kafka]

2024-12-08 Thread via GitHub


frankvicky opened a new pull request, #18098:
URL: https://github.com/apache/kafka/pull/18098

   JIRA: KAFKA-18021
   
   When a `MirrorCheckpointConnector` is disabled, the `start` method exits 
early without initiating `loadInitialConsumerGroups`. 
   If the connector is restarted in a disabled state, `taskConfigs` 
continuously throws `RetriableException` as `loadInitialConsumerGroups` never 
completes. 
   This prevents emitting an empty set of task configs to stop running tasks, 
leaving previously active `MirrorCheckpointTasks` enabled and making it 
impossible to fully disable them.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-18021: Disabled MirrorCheckpointConnector throws RetriableException on task config generation [kafka]

2024-12-08 Thread via GitHub


frankvicky commented on PR #18098:
URL: https://github.com/apache/kafka/pull/18098#issuecomment-2525655979

   Hi @gharris1727 
   Could you please take a look when you have some time?
   Many thanks 🙇🏼 





Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]

2024-12-08 Thread via GitHub


brandboat commented on code in PR #18036:
URL: https://github.com/apache/kafka/pull/18036#discussion_r1874795473


##
tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java:
##
@@ -649,8 +648,12 @@ public static VerifiableConsumer 
createFromArgs(ArgumentParser parser, String[]
 if (groupRemoteAssignor != null)
 consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, 
groupRemoteAssignor);
 } else {
-// This means we're using the old consumer group protocol.
+// This means we're using the CLASSIC consumer group protocol.
 
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
res.getString("assignmentStrategy"));
+Integer sessionTimeout = res.getInt("sessionTimeout");

Review Comment:
   > it is fine to throw exception if user does set the --session-timeout
   
   Pardon me, did you mean we should throw an exception if the user sets the 
session timeout when using the CONSUMER group.protocol? I'm not sure I 
understood correctly 🙏 






[jira] [Commented] (KAFKA-18183) ClusterInstance's helper should use byte array instead of Bytes in creating producer/consumer

2024-12-08 Thread Logan Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903860#comment-17903860
 ] 

Logan Zhu commented on KAFKA-18183:
---

Sorry, [~suresh7]. I’m already working on this.

 

> ClusterInstance's helper should use byte array instead of Bytes in creating 
> producer/consumer
> -
>
> Key: KAFKA-18183
> URL: https://issues.apache.org/jira/browse/KAFKA-18183
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Logan Zhu
>Priority: Trivial
>
> https://github.com/apache/kafka/blob/trunk/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java#L162
> https://github.com/apache/kafka/blob/trunk/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java#L174
> byte array is more common than Bytes



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]

2024-12-08 Thread via GitHub


peterxcli opened a new pull request, #18101:
URL: https://github.com/apache/kafka/pull/18101

   The UnsupportedVersion error is handled in the parent 
AbstractHeartbeatRequestManager, so the handling is shared by the consumer and 
the share consumer. If a share consumer gets the error, it ends up with a 
message that is currently specific to the consumer:
   
https://github.com/apache/kafka/blob/6fd951a9c0aa773060cd6bbf8a8b8c47ee9d2965/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L384-L386
   
   This PR handles UnsupportedVersion separately in the existing 
handleSpecificError. Note that the unsupported-version error for the consumer 
may also carry a message specific to SubscriptionPattern not being supported in 
HB v0, if a regex is used without the required v1.
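The idea of letting each subclass produce its own message can be sketched as a template method. This is only an illustration under stated assumptions: `HeartbeatManagerSketch`, `Failure`, `describe`, and both subclasses are hypothetical stand-ins, the message strings are placeholders, and the signature of `handleSpecificError` here (returning a message or `null`) is assumed rather than taken from the Kafka source.

```java
// Hypothetical stand-ins for illustration; these are not the Kafka classes.
abstract class HeartbeatManagerSketch {
    enum Failure { UNSUPPORTED_VERSION, UNKNOWN }

    // Shared entry point: the subclass gets the first chance to handle the
    // failure; the generic message is used only as a fallback.
    final String describe(Failure failure) {
        String specific = handleSpecificError(failure);
        return specific != null ? specific : "Heartbeat failed: " + failure;
    }

    // Returns a message if the subclass handles the failure, otherwise null.
    abstract String handleSpecificError(Failure failure);
}

class ConsumerHeartbeatSketch extends HeartbeatManagerSketch {
    @Override
    String handleSpecificError(Failure failure) {
        return failure == Failure.UNSUPPORTED_VERSION
            ? "Consumer-specific message for an unsupported heartbeat version"
            : null;
    }
}

class ShareHeartbeatSketch extends HeartbeatManagerSketch {
    @Override
    String handleSpecificError(Failure failure) {
        return failure == Failure.UNSUPPORTED_VERSION
            ? "Share-consumer-specific message for an unsupported heartbeat version"
            : null;
    }
}

class HeartbeatDemo {
    public static void main(String[] args) {
        HeartbeatManagerSketch.Failure failure = HeartbeatManagerSketch.Failure.UNSUPPORTED_VERSION;
        System.out.println(new ConsumerHeartbeatSketch().describe(failure));
        System.out.println(new ShareHeartbeatSketch().describe(failure));
    }
}
```

With this shape, the shared parent no longer needs to know which consumer type it is reporting for.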





Re: [PR] KAFKA-17811: Separate modules to use different JDKs [kafka]

2024-12-08 Thread via GitHub


chia7712 commented on PR #17522:
URL: https://github.com/apache/kafka/pull/17522#issuecomment-2526275233

   Opened [KAFKA-18186](https://issues.apache.org/jira/browse/KAFKA-18186) to 
address this.
   
   @ableegoldman please feel free to share your thoughts on this approach.





Re: [PR] [WIP] KIP-891: Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) [kafka]

2024-12-08 Thread via GitHub


gharris1727 commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1874983220


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import 
org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
+import org.apache.maven.artifact.versioning.VersionRange;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CachedConnectors {
+
+private final static String LATEST_VERSION = "latest";
+
+private final Map> connectors;
+private final Map invalidConnectors;
+private final Map> invalidVersions;
+private final Plugins plugins;
+
+public CachedConnectors(Plugins plugins) {
+this.plugins = plugins;
+this.connectors = new ConcurrentHashMap<>();
+this.invalidConnectors = new ConcurrentHashMap<>();
+this.invalidVersions = new ConcurrentHashMap<>();
+}
+
+private void validate(String connectorName, VersionRange range) throws 
Exception {
+if (invalidConnectors.containsKey(connectorName)) {
+throw invalidConnectors.get(connectorName);

Review Comment:
   Rethrowing the same exception can cause some strange results, because the 
exception's stacktrace gets computed only once.
   
   You can cache the exception from Plugins, and then create and throw a new 
exception on each method call.
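A minimal sketch of that suggestion, assuming a simple name-to-failure cache: `FailureCache`, `recordFailure`, and `validate` are hypothetical names, not code from the PR. The cached exception is kept only as the cause, and a fresh exception is created on every call so that its stack trace reflects the current caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache; names are illustrative and not taken from the PR.
class FailureCache {
    private final Map<String, Exception> failures = new ConcurrentHashMap<>();

    // Remember the original failure once, when loading first fails.
    void recordFailure(String connectorName, Exception original) {
        failures.put(connectorName, original);
    }

    // Throw a new exception per call; the cached one is attached as the cause.
    void validate(String connectorName) throws Exception {
        Exception cached = failures.get(connectorName);
        if (cached != null) {
            throw new Exception("Connector " + connectorName + " failed to load earlier", cached);
        }
    }

    public static void main(String[] args) {
        FailureCache cache = new FailureCache();
        cache.recordFailure("my-connector", new ClassNotFoundException("my-connector"));
        try {
            cache.validate("my-connector");
        } catch (Exception e) {
            // The top frames point at this call site; the cause keeps the original trace.
            e.printStackTrace();
        }
    }
}
```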



##
.gitignore:
##
@@ -61,3 +61,6 @@ storage/kafka-tiered-storage/
 docker/test/report_*.html
 kafka.Kafka
 __pycache__
+/connect/runtime/src/main/java/org/apache/kafka/connect/testing/
+/connect/file/
+/connect/json/

Review Comment:
   I think these ignores include real code



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##
@@ -367,6 +371,13 @@ public Object newPlugin(String classOrAlias, VersionRange 
range) throws Versione
 return newPlugin(klass);
 }
 
+public  Object newPlugin(String classOrAlias, Class baseClass, 
VersionRange range) throws ClassNotFoundException {

Review Comment:
   This is a bad method addition:
   
   * The point of passing in a `Class<T>` is to return a `T`, which avoids 
a `(T)` cast in the caller.
   * This method returns `Object` so it still requires a cast in the caller
   * It does a null check on version, which is already very reasonably handled 
within the rest of Plugins.
   * The method calls Utils.newInstance which instantiates the plugin with the 
wrong TCCL
   
   Either change the caller to use `newPlugin(String, VersionRange)` and 
perform the blind cast, or change this method to actually handle the 
casting/type safety and not call Utils.newInstance.
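The second option can be sketched with plain JDK reflection. This is an illustration only: `TypedPluginFactory` and its `newPlugin` are hypothetical and are not the Kafka `Plugins` API. The sketch also assumes that "the right TCCL" means the plugin class's own class loader, swapped in for the duration of construction and restored afterwards.

```java
// Hypothetical helper; not the Kafka Plugins API.
class TypedPluginFactory {
    static <T> T newPlugin(Class<?> pluginClass, Class<T> baseClass) throws ReflectiveOperationException {
        // asSubclass fails fast with ClassCastException if the types do not match,
        // so the caller gets a T without an unchecked cast.
        Class<? extends T> typed = pluginClass.asSubclass(baseClass);
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        // Assumption: construct the plugin with its own loader as the context class loader.
        thread.setContextClassLoader(pluginClass.getClassLoader());
        try {
            return typed.getDeclaredConstructor().newInstance();
        } finally {
            // Always restore the caller's context class loader.
            thread.setContextClassLoader(saved);
        }
    }

    public static void main(String[] args) throws Exception {
        // No cast is needed at the call site.
        CharSequence text = newPlugin(StringBuilder.class, CharSequence.class);
        System.out.println(text.getClass().getName());
    }
}
```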



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##
@@ -70,20 +70,30 @@ public class WorkerConfig extends AbstractConfig {
 public static final String CLIENT_DNS_LOOKUP_CONFIG = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
 public static final String CLIENT_DNS_LOOKUP_DOC = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
 
+public static final String PLUGIN_VERSION_SUFFIX = "version";

Review Comment:
   I checked the KIP, and the suffix should be `plugin.version`.
   
   But the connector is still written to use "version", is that a mistake or 
intentional? I would prefer it being consistent so we can use the same constant 
everywhere.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##
@@ -98,6 +118,12 @@ public class ConnectorConfig extends AbstractConfig {
 new InstantiableClassValidator()
 );
 
+public static final String VALUE_CONVERTER_VERSION_CONFIG = 
WorkerConfig.VALUE_CONVERTER_VERSION;
+private static final String VALUE_CONVERTER_VERSION_DEFAULT = null;
+private static final String VALUE_CONVERTER_VERSION_DOC = "Version of the 
value converter.";
+private static final String VALUE_CON

Re: [PR] KAFKA-16143: New JMX metrics for AsyncKafkaConsumer [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1872002514


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection 
topics, Optional firstError = new AtomicReference<>();
 
-LinkedList events = new LinkedList<>();
-backgroundEventQueue.drainTo(events);
-
-for (BackgroundEvent event : events) {
-try {
-if (event instanceof CompletableEvent)
-backgroundEventReaper.add((CompletableEvent) event);
-
-backgroundEventProcessor.process(event);
-} catch (Throwable t) {
-KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-if (!firstError.compareAndSet(null, e))
-log.warn("An error occurred when processing the background 
event: {}", e.getMessage(), e);
+List events = backgroundEventHandler.drainEvents();
+if (!events.isEmpty()) {
+long startMs = time.milliseconds();
+for (BackgroundEvent event : events) {
+
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
+try {
+if (event instanceof CompletableEvent)
+backgroundEventReaper.add((CompletableEvent) event);
+
+backgroundEventProcessor.process(event);
+} catch (Throwable t) {
+KafkaException e = 
ConsumerUtils.maybeWrapAsKafkaException(t);
+
+if (!firstError.compareAndSet(null, e))
+log.warn("An error occurred when processing the 
background event: {}", e.getMessage(), e);
+}
 }
+
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
 - startMs);
 }
 
 backgroundEventReaper.reap(time.milliseconds());

Review Comment:
   Interesting. If we agree on what we want, we could just send an update in 
the KIP email thread to add it to the KIP and here.
   
   To align internally first: I guess we would be interested in the num/avg of 
expired events, but we need to consider how that metric would go crazy and 
raise false alarms in cases like poll(0), right? Should we consider the 
expiration relevant only if there was a non-zero timeout? Thoughts?






Re: [PR] KAFKA-16143: New JMX metrics for AsyncKafkaConsumer [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #17199:
URL: https://github.com/apache/kafka/pull/17199#discussion_r1875064838


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection 
topics, Optional firstError = new AtomicReference<>();
 
-LinkedList events = new LinkedList<>();
-backgroundEventQueue.drainTo(events);
-
-for (BackgroundEvent event : events) {
-try {
-if (event instanceof CompletableEvent)
-backgroundEventReaper.add((CompletableEvent) event);
-
-backgroundEventProcessor.process(event);
-} catch (Throwable t) {
-KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-if (!firstError.compareAndSet(null, e))
-log.warn("An error occurred when processing the background 
event: {}", e.getMessage(), e);
+List events = backgroundEventHandler.drainEvents();
+if (!events.isEmpty()) {
+long startMs = time.milliseconds();
+for (BackgroundEvent event : events) {
+
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
+try {
+if (event instanceof CompletableEvent)
+backgroundEventReaper.add((CompletableEvent) event);
+
+backgroundEventProcessor.process(event);
+} catch (Throwable t) {
+KafkaException e = 
ConsumerUtils.maybeWrapAsKafkaException(t);
+
+if (!firstError.compareAndSet(null, e))
+log.warn("An error occurred when processing the 
background event: {}", e.getMessage(), e);
+}
 }
+
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
 - startMs);
 }
 
 backgroundEventReaper.reap(time.milliseconds());

Review Comment:
   Agreed that it's not applicable to background events: the callback-needed 
event is the only `CompletableBackgroundEvent`, and it intentionally does not 
expire (I don't think we need to change that).
   
   I would say that what might be interesting to know is the expiration of 
application events:
   
https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L184
   
   There we do have lots of events with a deadline. I guess that's what 
@AndrewJSchofield had in mind, maybe? (I notice now that the initial comment 
was made here, on the reap of background events.)






Re: [PR] KAFKA-18164: Clear existing acknowledgements on share session epoch reset. [kafka]

2024-12-08 Thread via GitHub


AndrewJSchofield commented on code in PR #18063:
URL: https://github.com/apache/kafka/pull/18063#discussion_r1875029001


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -990,16 +990,18 @@ UnsentRequest buildRequest() {
 }
 
 ShareAcknowledgeRequest.Builder requestBuilder = 
sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);

Review Comment:
   It seems to me that `ShareSessionHandler.addPartitionToFetch` saves 
partition and acknowledgement state into the handler to be built by the request 
builder. In the event that the request builder cannot be built, such as because 
a new share session needs to be created, the partition and acknowledgement 
state is still left in the share session. I worry that it is possible this 
could lead to an erroneous request in some situations when the session handler 
is used again when a new connection has been created. I think that it would be 
prudent to clear out `nextAcknowledgements` and `nextPartitions` in 
`ShareSessionHandler.newShareAcknowledgeBuilder` in the case where it returns 
null.
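A sketch of the proposed behaviour, using a stand-in class rather than the real `ShareSessionHandler`: everything here except the field names `nextPartitions` and `nextAcknowledgements` is hypothetical, including the `newSessionRequired` flag and the use of plain strings for partitions and acknowledgements. It also assumes that the pending state is cleared on the success path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a session handler; not the Kafka ShareSessionHandler.
class SessionHandlerSketch {
    private final List<String> nextPartitions = new ArrayList<>();
    private final List<String> nextAcknowledgements = new ArrayList<>();
    private boolean newSessionRequired;

    void addPartition(String partition, String acknowledgement) {
        nextPartitions.add(partition);
        nextAcknowledgements.add(acknowledgement);
    }

    void requireNewSession() {
        newSessionRequired = true;
    }

    int pendingCount() {
        return nextPartitions.size() + nextAcknowledgements.size();
    }

    private void clearPending() {
        nextPartitions.clear();
        nextAcknowledgements.clear();
    }

    // Returns the acknowledgements to send, or null when no request can be built.
    List<String> newAcknowledgeRequest() {
        if (newSessionRequired) {
            // Drop the saved state so it cannot leak into a later request.
            clearPending();
            return null;
        }
        List<String> request = new ArrayList<>(nextAcknowledgements);
        clearPending();
        return request;
    }
}
```

The point of the null branch is that a later use of the handler, for example after a new connection has been created, starts from empty state.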






Re: [PR] MINOR: Extract some codeblocks as methods to simplify readability [kafka]

2024-12-08 Thread via GitHub


AndrewJSchofield commented on code in PR #18017:
URL: https://github.com/apache/kafka/pull/18017#discussion_r1875032057


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -229,12 +229,8 @@ private Long buildRemoteLogAuxState(TopicPartition 
topicPartition,
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, 
previousOffsetToLeaderLocalLogStartOffset)
-.orElseThrow(() -> new RemoteStorageException("Couldn't build 
the state from remote store for partition: " + topicPartition +
-", currentLeaderEpoch: " + currentLeaderEpoch +
-", leaderLocalLogStartOffset: " + 
leaderLocalLogStartOffset +
-", leaderLogStartOffset: " + leaderLogStartOffset +
-", epoch: " + targetEpoch +
-"as the previous remote log segment metadata was not 
found"));
+.orElseThrow(() -> buildRemoteStorageException(topicPartition, 
targetEpoch, currentLeaderEpoch,
+leaderLocalLogStartOffset, 
previousOffsetToLeaderLocalLogStartOffset));

Review Comment:
   It seems to me that the final argument here should be `leaderLogStartOffset` 
to match the previous behaviour.






Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]

2024-12-08 Thread via GitHub


AndrewJSchofield commented on PR #18101:
URL: https://github.com/apache/kafka/pull/18101#issuecomment-2526360864

   We should get @lianetm to make sure she's happy too.





Re: [PR] KAFKA-18058: Share group state record pruning impl. [kafka]

2024-12-08 Thread via GitHub


AndrewJSchofield commented on PR #18014:
URL: https://github.com/apache/kafka/pull/18014#issuecomment-2526362182

   @smjn Please resolve the conflicts.





Re: [PR] MINOR: Extract some codeblocks as methods to simplify readability [kafka]

2024-12-08 Thread via GitHub


overpathz commented on code in PR #18017:
URL: https://github.com/apache/kafka/pull/18017#discussion_r1875034438


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -229,12 +229,8 @@ private Long buildRemoteLogAuxState(TopicPartition 
topicPartition,
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, 
previousOffsetToLeaderLocalLogStartOffset)
-.orElseThrow(() -> new RemoteStorageException("Couldn't build 
the state from remote store for partition: " + topicPartition +
-", currentLeaderEpoch: " + currentLeaderEpoch +
-", leaderLocalLogStartOffset: " + 
leaderLocalLogStartOffset +
-", leaderLogStartOffset: " + leaderLogStartOffset +
-", epoch: " + targetEpoch +
-"as the previous remote log segment metadata was not 
found"));
+.orElseThrow(() -> buildRemoteStorageException(topicPartition, 
targetEpoch, currentLeaderEpoch,
+leaderLocalLogStartOffset, 
previousOffsetToLeaderLocalLogStartOffset));

Review Comment:
   Thx. Addressed in the recent commit.






Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]

2024-12-08 Thread via GitHub


AndrewJSchofield commented on code in PR #18101:
URL: https://github.com/apache/kafka/pull/18101#discussion_r1875034076


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##
@@ -100,6 +100,9 @@ public abstract class AbstractHeartbeatRequestManager

Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]

2024-12-08 Thread via GitHub


AndrewJSchofield commented on code in PR #18096:
URL: https://github.com/apache/kafka/pull/18096#discussion_r1875037969


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+
+public class ShareGroupAutoOffsetResetStrategy {
+public enum StrategyType {
+LATEST, EARLIEST, BY_DURATION;
+
+@Override
+public String toString() {
+return super.toString().toLowerCase(Locale.ROOT);
+}
+}
+
+public static final ShareGroupAutoOffsetResetStrategy EARLIEST = new 
ShareGroupAutoOffsetResetStrategy(StrategyType.EARLIEST);
+public static final ShareGroupAutoOffsetResetStrategy LATEST = new 
ShareGroupAutoOffsetResetStrategy(StrategyType.LATEST);
+
+private final StrategyType type;
+private final Optional<Duration> duration;
+
+private ShareGroupAutoOffsetResetStrategy(StrategyType type) {
+this.type = type;
+this.duration = Optional.empty();
+}
+
+private ShareGroupAutoOffsetResetStrategy(Duration duration) {
+this.type = StrategyType.BY_DURATION;
+this.duration = Optional.of(duration);
+}
+
+/**
+ *  Returns the AutoOffsetResetStrategy from the given string.
+ */
+public static ShareGroupAutoOffsetResetStrategy fromString(String 
offsetStrategy) {
+if (offsetStrategy == null) {
+throw new IllegalArgumentException("Auto offset reset strategy is 
null");
+}
+
+if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
+throw new IllegalArgumentException("<:duration> part is missing in 
by_duration auto offset reset strategy.");
+}
+
+if 
(Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) 
{
+StrategyType type = 
StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
+switch (type) {
+case EARLIEST:
+return EARLIEST;
+case LATEST:
+return LATEST;
+default:
+throw new IllegalArgumentException("Unknown auto offset 
reset strategy: " + offsetStrategy);
+}
+}
+
+if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+String isoDuration = 
offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+try {
+Duration duration = Duration.parse(isoDuration);
+if (duration.isNegative()) {
+throw new IllegalArgumentException("Negative duration is 
not supported in by_duration offset reset strategy.");
+}
+return new ShareGroupAutoOffsetResetStrategy(duration);
+} catch (Exception e) {
+throw new IllegalArgumentException("Unable to parse duration 
string in by_duration offset reset strategy.", e);
+}
+}
+
+throw new IllegalArgumentException("Unknown auto offset reset 
strategy: " + offsetStrategy);
+}
+
+/**
+ * Returns the offset reset strategy type.
+ */
+public StrategyType type() {
+return type;
+}
+
+/**
+ * Returns the name of the offset reset strategy.
+ */
+public String name() {
+return type.toString();
+}
+
+/**
+ * Return the timestamp to be used for the ListOffsetsRequest.
+ * @return the timestamp for the OffsetResetStrategy,
+ * if the strategy is EARLIEST or LATEST or duration is provided
+ * else return Optional.empty()
+ */
+public Optional<Long> timestamp() {
+if (type == StrategyType.EARLIEST)
+return Optional.of(ListOffsetsRequest.EARLI
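
The `by_duration:<ISO-8601 duration>` format handled by `fromString` in the diff above can be tried in isolation. The following is a minimal sketch, not the PR's code: `ByDurationSketch`, `parse`, and `lookupTimestampMs` are hypothetical names, and the "now minus duration" timestamp is an assumption, since the quoted `timestamp()` method is cut off.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper, not part of the PR: parses "by_duration:<ISO-8601 duration>".
final class ByDurationSketch {
    static final String PREFIX = "by_duration:";

    static Duration parse(String offsetStrategy) {
        if (offsetStrategy == null || !offsetStrategy.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a by_duration strategy: " + offsetStrategy);
        }
        Duration duration;
        try {
            // Duration.parse accepts ISO-8601 strings such as "PT1H" or "P1D".
            duration = Duration.parse(offsetStrategy.substring(PREFIX.length()));
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Unable to parse duration string.", e);
        }
        // Checked outside the try block so this message is not re-wrapped.
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Negative duration is not supported.");
        }
        return duration;
    }

    // Assumption: the lookup timestamp is "now minus duration" in epoch milliseconds.
    static long lookupTimestampMs(Duration duration, Instant now) {
        return now.minus(duration).toEpochMilli();
    }
}
```

One difference from the diff is deliberate: there, the negative-duration exception is thrown inside the `try` and is therefore caught by `catch (Exception e)` and re-wrapped with the generic "Unable to parse" message. The sketch keeps the two failure messages distinct.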

Re: [PR] [WIP] KIP-891: Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) [kafka]

2024-12-08 Thread via GitHub


snehashisp commented on PR #17741:
URL: https://github.com/apache/kafka/pull/17741#issuecomment-2526364647

   Thanks for the review @gharris1727. I will get to work on this soon, during my 
daytime. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Extract some codeblocks as methods to simplify readability [kafka]

2024-12-08 Thread via GitHub


overpathz commented on code in PR #18017:
URL: https://github.com/apache/kafka/pull/18017#discussion_r1875039076


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -229,12 +229,8 @@ private Long buildRemoteLogAuxState(TopicPartition 
topicPartition,
 }
 
 RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, 
previousOffsetToLeaderLocalLogStartOffset)
-.orElseThrow(() -> new RemoteStorageException("Couldn't build 
the state from remote store for partition: " + topicPartition +
-", currentLeaderEpoch: " + currentLeaderEpoch +
-", leaderLocalLogStartOffset: " + 
leaderLocalLogStartOffset +
-", leaderLogStartOffset: " + leaderLogStartOffset +
-", epoch: " + targetEpoch +
-"as the previous remote log segment metadata was not 
found"));
+.orElseThrow(() -> buildRemoteStorageException(topicPartition, 
targetEpoch, currentLeaderEpoch,
+leaderLocalLogStartOffset, 
previousOffsetToLeaderLocalLogStartOffset));

Review Comment:
   @AndrewJSchofield Pinging again in case the earlier notification was missed.






[jira] [Assigned] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically

2024-12-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-18186:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> add sourceCompatibility back to build.gradle to allow idea to configure 
> suitable language level automatically
> -
>
> Key: KAFKA-18186
> URL: https://issues.apache.org/jira/browse/KAFKA-18186
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> see my comment https://github.com/apache/kafka/pull/17522



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically

2024-12-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18186:
---
Parent: KAFKA-16096
Issue Type: Sub-task  (was: Improvement)

> add sourceCompatibility back to build.gradle to allow idea to configure 
> suitable language level automatically
> -
>
> Key: KAFKA-18186
> URL: https://issues.apache.org/jira/browse/KAFKA-18186
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> see my comment https://github.com/apache/kafka/pull/17522





Re: [PR] KAFKA-18058: Share group state record pruning impl. [kafka]

2024-12-08 Thread via GitHub


smjn commented on PR #18014:
URL: https://github.com/apache/kafka/pull/18014#issuecomment-2526373284

   > @smjn Please resolve the conflicts.
   
   @AndrewJSchofield done





[jira] [Commented] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically

2024-12-08 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-18186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903938#comment-17903938
 ] 

黃竣陽 commented on KAFKA-18186:
-

Hello, [~chia7712], if you won't work on this, may I take the issue? Thank you.

> add sourceCompatibility back to build.gradle to allow idea to configure 
> suitable language level automatically
> -
>
> Key: KAFKA-18186
> URL: https://issues.apache.org/jira/browse/KAFKA-18186
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> see my comment https://github.com/apache/kafka/pull/17522





[jira] [Created] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically

2024-12-08 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-18186:
--

 Summary: add sourceCompatibility back to build.gradle to allow 
idea to configure suitable language level automatically
 Key: KAFKA-18186
 URL: https://issues.apache.org/jira/browse/KAFKA-18186
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see my comment https://github.com/apache/kafka/pull/17522





Re: [PR] KAFKA-17811: Separate modules to use different JDKs [kafka]

2024-12-08 Thread via GitHub


ijuma commented on PR #17522:
URL: https://github.com/apache/kafka/pull/17522#issuecomment-2526306797

   I see some confusion here and I'll try to clarify it.
   
   > Are you using JDK 11 to run the streams tests? If so, that could be an 
issue since the generator module requires JDK 17. I assume all Kafka developers 
should use JDK 17, as we typically build the entire project during development.
   
   We should make sure it's possible to run tests with Java 11 for the modules 
that support it since it's possible to have issues at runtime that only affect 
a particular version.
   
   > We do indeed need to compile the project with 17, but we should recommend 
that anyone working on the clients (or any other modules that still support 11) 
at least set their IDE's language version to JDK11 to avoid accidentally using 
APIs that don't exist back in 11 (looking at you Optional#isPresent)
   
   The expected behavior is that the IDE & gradle plugin support the --release 
flag which has the correct behavior (it sets the appropriate source and binary 
versions and ensures the standard library signatures also match it).
   
   > I plan to add sourceCompatibility back to build.gradle since IntelliJ IDEA 
sets the language level based on sourceCompatibility by default ([Gradle 
Documentation](https://docs.gradle.org/current/dsl/org.gradle.plugins.ide.idea.model.IdeaModule.html#org.gradle.plugins.ide.idea.model.IdeaModule:languageLevel)).
   
   Are we sure the Gradle plugin doesn't handle this properly? If so, we should 
add the workaround, but also file a ticket with them. See the following for a 
similar situation for Scala:
   
   https://github.com/apache/kafka/pull/13205#issuecomment-1445120790





Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]

2024-12-08 Thread via GitHub


chia7712 commented on code in PR #18093:
URL: https://github.com/apache/kafka/pull/18093#discussion_r1875006046


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -371,52 +370,35 @@ public static RecordState forId(byte id) {
  */
 public CompletableFuture maybeInitialize() {
 log.debug("Maybe initialize share partition: {}-{}", groupId, 
topicIdPartition);
-CompletableFuture future = new CompletableFuture<>();
-AtomicReference> futureException = new 
AtomicReference<>(Optional.empty());
 // Check if the share partition is already initialized.
-InitializationResult initializationResult = 
checkInitializationCompletion();
-if (initializationResult.isComplete()) {
-if (initializationResult.throwable() != null) {
-future.completeExceptionally(initializationResult.throwable());
-} else {
-future.complete(null);
+try {
+if (initializedOrThrowException()) {
+return CompletableFuture.completedFuture(null);
 }
-return future;
+} catch (Exception e) {
+return CompletableFuture.failedFuture(e);
 }
 
+// If code reaches here then the share partition is not initialized. 
Initialize the share partition.
 // All the pending requests should wait to get completed before the 
share partition is initialized.
 // Attain lock to avoid any concurrent requests to be processed.
 lock.writeLock().lock();
-boolean shouldFutureBeCompleted = false;
 try {
 // Re-check the state to verify if previous requests has already 
initialized the share partition.
-initializationResult = checkInitializationCompletion();
-if (initializationResult.isComplete()) {
-if (initializationResult.throwable() != null) {
-
futureException.set(Optional.of(initializationResult.throwable()));
-}
-shouldFutureBeCompleted = true;
-return future;
+if (initializedOrThrowException()) {
+return CompletableFuture.completedFuture(null);

Review Comment:
   > now it was a return statement.
   
   Yes, you are right. I did not take into account the "completed" 
CompletableFuture because I was focused on the relationship between 
CompletableFuture and the write lock :(
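
One reading of the point conceded here: returning an already-completed future from inside the locked region is harmless, because nothing is completed while the lock is held and callers attach their callbacks only after the method has returned. The sketch below illustrates that double-checked pattern under loud assumptions: `LazyInitSketch`, its `State` enum, and the synchronous "initialization" are hypothetical simplifications, not the `SharePartition` implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the share partition; all names are illustrative only.
final class LazyInitSketch {
    enum State { EMPTY, ACTIVE, FAILED }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile State state = State.EMPTY;
    int initCalls = 0; // counts how often the (simplified) initialization ran

    // Returns true if initialized, false if not yet, and throws if initialization failed.
    private boolean initializedOrThrow() {
        switch (state) {
            case ACTIVE: return true;
            case FAILED: throw new IllegalStateException("initialization failed");
            default: return false;
        }
    }

    CompletableFuture<Void> maybeInitialize() {
        // Fast path: check without taking the lock.
        try {
            if (initializedOrThrow()) {
                return CompletableFuture.completedFuture(null);
            }
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }

        lock.writeLock().lock();
        try {
            // Re-check under the lock: a concurrent caller may have initialized already.
            if (initializedOrThrow()) {
                return CompletableFuture.completedFuture(null);
            }
            initCalls++;
            state = State.ACTIVE; // assumption: synchronous init, unlike the real code
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Calling `maybeInitialize()` twice on the same instance runs the initialization once; the second call returns from the lock-free fast path.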






Re: [PR] KAFKA-17811: Separate modules to use different JDKs [kafka]

2024-12-08 Thread via GitHub


chia7712 commented on PR #17522:
URL: https://github.com/apache/kafka/pull/17522#issuecomment-2526274179

   @ableegoldman Thanks for your feedback. I agree that we should prioritize 
making our developers' experience better. I plan to add sourceCompatibility 
back to build.gradle since IntelliJ IDEA sets the language level based on 
sourceCompatibility by default ([Gradle 
Documentation](https://docs.gradle.org/current/dsl/org.gradle.plugins.ide.idea.model.IdeaModule.html#org.gradle.plugins.ide.idea.model.IdeaModule:languageLevel)).
   
   The screenshot below demonstrates that IntelliJ IDEA can automatically 
highlight unsupported usages. With this change, developers won't need to 
manually configure the language level.
   ![Screenshot From 2024-12-09 
01-23-49](https://github.com/user-attachments/assets/3affc9cb-410b-4b5c-b312-7d9e5b729686)
   
   
   
   





[jira] [Resolved] (KAFKA-18101) Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals

2024-12-08 Thread Peter Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Lee resolved KAFKA-18101.
---
Resolution: Done

> Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals
> --
>
> Key: KAFKA-18101
> URL: https://issues.apache.org/jira/browse/KAFKA-18101
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Deng Ziming
>Assignee: Peter Lee
>Priority: Minor
>  Labels: new-bie
>
> We have two similar methods for validating test exceptions: 
> org.apache.kafka.test.TestUtils.assertFutureThrows
> kafka.utils.TestUtils.assertFutureExceptionTypeEquals
> Let's merge them.





[jira] [Assigned] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.

2024-12-08 Thread Peter Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Lee reassigned KAFKA-13560:
-

Assignee: Peter Lee

> Load indexes and data in async manner in the critical path of replica fetcher 
> threads. 
> ---
>
> Key: KAFKA-13560
> URL: https://issues.apache.org/jira/browse/KAFKA-13560
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Peter Lee
>Priority: Major
> Fix For: 4.0.0
>
>
> https://github.com/apache/kafka/pull/11390#discussion_r762366976
> https://github.com/apache/kafka/pull/11390#discussion_r1033141283
> https://github.com/apache/kafka/pull/15690 removed the below method from the 
> `TierStateMachine` interface. This can be added back when we implement the 
> functionality required to address this issue. 
> {code:java}
> public Optional maybeAdvanceState(TopicPartition 
> topicPartition, PartitionFetchState currentFetchState)
> {code}





Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]

2024-12-08 Thread via GitHub


chia7712 merged PR #18093:
URL: https://github.com/apache/kafka/pull/18093





Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #18101:
URL: https://github.com/apache/kafka/pull/18101#discussion_r1875074269


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
##
@@ -97,6 +99,18 @@ public boolean handleSpecificError(final 
ConsumerGroupHeartbeatResponse response
 boolean errorHandled;
 
 switch (error) {
+case UNSUPPORTED_VERSION:
+// Handle consumer-specific unsupported version error
+String message = CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
+if (errorMessage.contains(REGEX_RESOLUTION_NOT_SUPPORTED_MSG)) 
{
+// If the error is about regex subscription, use the 
original error message
+message = errorMessage;
+}

Review Comment:
   I would expect we don't need this here. This regex error is generated on the 
client when building the request (it generates a response with an exception), so 
the handling lands on the `onFailure` path (not on the `onResponse` path that 
uses this `handleSpecificError`). 
   
   I would expect we just need to have here the same handling we had for the 
UnsupportedVersion in the parent class before this PR:
   
https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L411-L412
   
   That being said, we do need to ensure that we handle responses for each 
consumer, to cover this bit that is now on the abstract mgr (so it could 
wrongly apply a consumer msg to the share consumer):
   
   
https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L322-L325
   
   Both consumers need to ensure they propagate their specific msgs when that 
exception is in a response.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##
@@ -408,8 +411,10 @@ private void onErrorResponse(final R response, final long 
currentTimeMs) {
 // Broker responded with HB not supported, meaning the new 
protocol is not enabled, so propagate
 // custom message for it. Note that the case where the 
protocol is not supported at all should fail
 // on the client side when building the request and checking 
supporting APIs (handled on onFailure).
-logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
-
handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
+if (!handleSpecificError(response, currentTimeMs)) {
+logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
+handleFatalFailure(error.exception(errorMessage));
+}

Review Comment:
   I wonder if it would be simpler to remove the UnsupportedVersion handling 
from here completely? In the end we're duplicating the logic we already have in 
the `default` case further down right? We would only need the logic in the 
Share/Consumer managers, simply because even if the error is not specific the 
handling is. Thoughts? 
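
The shape suggested in these comments (the abstract manager falls back to a generic path, while each concrete manager supplies its own message for `UNSUPPORTED_VERSION`) could look roughly like the sketch below. Every class name and message string here is a hypothetical placeholder, not the Kafka client code.

```java
// Hypothetical, heavily simplified stand-ins for the heartbeat request managers.
final class HeartbeatSketch {
    enum Error { UNSUPPORTED_VERSION, OTHER }

    // Placeholder texts; the real constants live in the Kafka client.
    static final String CONSUMER_MSG = "consumer protocol not supported (placeholder)";
    static final String SHARE_MSG = "share protocol not supported (placeholder)";

    abstract static class Manager {
        String lastFatalMessage;

        // Subclasses return true when they fully handled the error.
        abstract boolean handleSpecificError(Error error, String errorMessage);

        void onErrorResponse(Error error, String errorMessage) {
            if (!handleSpecificError(error, errorMessage)) {
                // Generic fallback: propagate the broker-provided message as-is.
                handleFatalFailure(errorMessage);
            }
        }

        void handleFatalFailure(String message) {
            lastFatalMessage = message;
        }
    }

    static final class ConsumerManager extends Manager {
        @Override
        boolean handleSpecificError(Error error, String errorMessage) {
            if (error == Error.UNSUPPORTED_VERSION) {
                handleFatalFailure(CONSUMER_MSG);
                return true;
            }
            return false;
        }
    }

    static final class ShareManager extends Manager {
        @Override
        boolean handleSpecificError(Error error, String errorMessage) {
            if (error == Error.UNSUPPORTED_VERSION) {
                handleFatalFailure(SHARE_MSG);
                return true;
            }
            return false;
        }
    }
}
```

With this split, the abstract class needs no `UNSUPPORTED_VERSION` branch of its own: an unhandled error simply takes the generic fallback, which is the simplification the last comment asks about.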



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##
@@ -100,6 +100,9 @@ public abstract class AbstractHeartbeatRequestManager

Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1875198457


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -325,4 +333,21 @@ void cleanup() {
 log.debug("Closed the consumer network thread");
 }
 }
+
+/**
+ * If there is a metadata error, complete all uncompleted events that 
require subscription metadata.
+ */
+private void maybeFailOnMetadataError(List> events) {
+List> 
subscriptionMetadataEvent = events.stream()
+.filter(e -> e instanceof CompletableApplicationEvent)
+.map(e -> (CompletableApplicationEvent) e)
+
.filter(CompletableApplicationEvent::requireSubscriptionMetadata)
+.collect(Collectors.toUnmodifiableList());

Review Comment:
   nit: `.toList();`? (I believe it's unmodifiable too)
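
For reference, a small sketch comparing the two spellings. Both return an unmodifiable list, but `Stream.toList()` only exists since Java 16, so it is an assumption that the module in question can use it (this digest elsewhere mentions modules that still support Java 11). The two also differ on nulls: `Collectors.toUnmodifiableList()` rejects null elements, while `Stream.toList()` allows them. `ToListSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class comparing the two ways of collecting to an unmodifiable list.
final class ToListSketch {
    static List<String> viaCollector() {
        // Available since Java 10; throws NullPointerException on null elements.
        return Stream.of("a", "b").collect(Collectors.toUnmodifiableList());
    }

    static List<String> viaToList() {
        // Available since Java 16; permits null elements.
        return Stream.of("a", "b").toList();
    }

    // Returns true if the list refuses modification.
    static boolean rejectsAdd(List<String> list) {
        try {
            list.add("c");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```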



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -150,7 +155,11 @@ private void maybePropagateMetadataError() {
 try {
 metadata.maybeThrowAnyException();

Review Comment:
   ok, we're saying it gets cleared as soon as it's propagated (to ensure that 
it's indeed propagated), but we could still get an exception from a previously 
sent request (this was my concern). But that's what we have with the classic 
consumer actually, so it's consistent (there are actually tests specifically 
covering the behaviour considering that)
   
https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L553-L554
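
The "cleared as soon as it's propagated" behaviour described in this comment can be pictured with a small get-and-clear holder. This is only a sketch: `MetadataErrorSketch` and its methods are hypothetical, and the real `NetworkClientDelegate` may store the error differently.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder showing get-and-clear semantics for a recorded error.
final class MetadataErrorSketch {
    private final AtomicReference<Exception> metadataError = new AtomicReference<>();

    // Remember the most recent error until someone consumes it.
    void record(Exception e) {
        metadataError.set(e);
    }

    // Returns the recorded error, if any, and clears it in one atomic step.
    Optional<Exception> getAndClear() {
        return Optional.ofNullable(metadataError.getAndSet(null));
    }
}
```

The first `getAndClear()` after `record(...)` returns the error and later calls return an empty `Optional`, so each error is propagated at most once. An error raised afterwards by an earlier in-flight request would simply be recorded again, which is the residual case the comment mentions.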
  



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -212,9 +213,26 @@ public void testPropagateMetadataError() {
 AuthenticationException authException = new 
AuthenticationException("Test Auth Exception");
 doThrow(authException).when(metadata).maybeThrowAnyException();
 
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(false);
+assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty());
+networkClientDelegate.poll(0, time.milliseconds());
+
+networkClientDelegate.getAndClearMetadataError().ifPresent(

Review Comment:
   If the metadata error is not present, this won't fail, right? So should we 
check that it's present (and then check the value)?
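
The concern can be reproduced outside the test in question: a check placed inside `Optional.ifPresent` never runs when the `Optional` is empty, so it cannot fail. The sketch below uses hypothetical helper names and plain booleans instead of JUnit assertions.

```java
import java.util.Optional;

// Hypothetical helpers contrasting a check inside ifPresent with an explicit presence check.
final class OptionalAssertSketch {
    // Mirrors the pattern in the diff: passes vacuously when the Optional is empty.
    static boolean lenientCheck(Optional<Exception> error, Exception expected) {
        final boolean[] failed = {false};
        error.ifPresent(e -> {
            if (e != expected) {
                failed[0] = true;
            }
        });
        return !failed[0];
    }

    // Requires the value to be present before comparing it.
    static boolean strictCheck(Optional<Exception> error, Exception expected) {
        return error.isPresent() && error.get() == expected;
    }
}
```

In a JUnit test the strict variant would typically be an `assertTrue(error.isPresent())` followed by an assertion on `error.get()`.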






Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1875210056


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -150,7 +155,11 @@ private void maybePropagateMetadataError() {
 try {
 metadata.maybeThrowAnyException();

Review Comment:
   ok, we're saying it gets cleared as soon as it's propagated (to ensure that 
it's indeed propagated), but we could still get an exception from a previously 
sent request (this was my concern). Anyways that's actually what we have with 
the classic consumer, so it's consistent (there are actually tests specifically 
covering the behaviour considering that). 
   
https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L553-L554
  






Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #17973:
URL: https://github.com/apache/kafka/pull/17973#discussion_r1875208289


##
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source 
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {

Review Comment:
   The newly added class for the consumer is internal, and not user facing, and 
thus, it should not be relevant that it exists (the current public enum gets 
deprecated with this KIP, and the PR you linked to).
   
   I would assume that ShareConsumer won't have a public class either, similar 
to the consumer? \cc @AndrewJSchofield 
   
   Given that the reset strategy is exactly the same between a plain consumer 
and Kafka Streams, it would seem odd to me to name it `StreamsXxx`... Also, 
that the class is part of KS is clear from the package name.
   
   In general, discussions like this should be part of the KIP, and we should 
not change accepted KIPs w/o a strong reason. If you want, feel free to go back 
to the dev mailing list thread about the KIP and re-start the discussion there. 
It should not be part of the PR review to re-discuss a KIP.






Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1875210056


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -150,7 +155,11 @@ private void maybePropagateMetadataError() {
 try {
 metadata.maybeThrowAnyException();

Review Comment:
   ok, we're saying it gets cleared as soon as it's propagated (to ensure that 
it's indeed propagated), and we could still get an exception from a previously 
sent request (this was my concern). Anyways that's actually what we have with 
the classic consumer, so it's consistent (there are actually tests specifically 
covering the behaviour considering that). 
   
https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L553-L554
  
   
   Ok with me. I wonder if there is something we could improve there for both, but 
there is no need to block this as it's the current behaviour. I'll file a jira if I come 
up with clear thoughts about it. Thanks!






[PR] KAFKA-18184:Remove the unnecessary project path check from build.gradle [kafka]

2024-12-08 Thread via GitHub


Rancho-7 opened a new pull request, #18102:
URL: https://github.com/apache/kafka/pull/18102

   jira:https://issues.apache.org/jira/browse/KAFKA-18184
   
   In the build.gradle file, the project path is not initialized, so the 
project.path should always refer to the root path ':'
   
   The condition:
   `if (!project.path.startsWith(":connect") && 
!project.path.startsWith(":storage"))`
   will always evaluate to true, so this check is unnecessary and can be 
removed.
   
   If we print the parameter `project.path`
   ```
   println "Project path is '${project.path}'"
   if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage"))
       options.compilerArgs << "-Xlint:-rawtypes"
   ```
   
   
   we can see outputs like the following:
   
   > Starting Gradle Daemon...
   > Connected to the target VM, address: '127.0.0.1:55024', transport: 'socket'
   > Gradle Daemon started in 1 s 333 ms
   > 
   >  Configure project :
   > Starting build with version 4.0.0-SNAPSHOT (commit id 104fa579) using 
Gradle 8.10.2, Java 23 and Scala 2.13.15
   > Build properties: ignoreFailures=false, maxParallelForks=10, 
maxScalacThreads=8, maxTestRetries=0
   > Project path is ':'
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]

2024-12-08 Thread via GitHub


lianetm commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1875232483


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -212,9 +213,27 @@ public void testPropagateMetadataError() {
 AuthenticationException authException = new 
AuthenticationException("Test Auth Exception");
 doThrow(authException).when(metadata).maybeThrowAnyException();
 
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(false);
+assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty());
+networkClientDelegate.poll(0, time.milliseconds());
+
+
assertTrue(networkClientDelegate.getAndClearMetadataError().isPresent());

Review Comment:
   This will clear the error, so the assertions within the `ifPresent` will 
never run, right? (Maybe we take the Optional result of 
`getAndClearMetadataError()`, check that the Optional is present, and then run 
the 2 asserts on the value?)
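A minimal sketch of the pitfall raised above. `ErrorHolder` is a hypothetical stand-in, not the real `NetworkClientDelegate`; the only behaviour assumed is the get-and-clear semantics described in this thread. Reading the value twice loses it, so the result is captured once and all checks run against that single `Optional`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a component that keeps an error until it is read.
class ErrorHolder {
    private final AtomicReference<Exception> error = new AtomicReference<>();

    void record(final Exception e) {
        error.set(e);
    }

    // Returns the stored error, if any, and clears it in the same atomic step.
    Optional<Exception> getAndClear() {
        return Optional.ofNullable(error.getAndSet(null));
    }
}

class GetAndClearDemo {
    public static void main(final String[] args) {
        final ErrorHolder holder = new ErrorHolder();
        holder.record(new IllegalStateException("Test Auth Exception"));

        // Capture the result once; the holder is empty after this call.
        final Optional<Exception> first = holder.getAndClear();
        // A second read finds nothing, so an ifPresent body on it never runs.
        final Optional<Exception> second = holder.getAndClear();

        System.out.println("first present:  " + first.isPresent());
        System.out.println("second present: " + second.isPresent());
        first.ifPresent(e -> System.out.println("message: " + e.getMessage()));
    }
}
```

In a test, the same idea would mean asserting on the captured `Optional` rather than calling the clearing accessor once per assertion.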






Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]

2024-12-08 Thread via GitHub


m1a2st commented on code in PR #17440:
URL: https://github.com/apache/kafka/pull/17440#discussion_r1875248156


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
##
@@ -212,9 +213,27 @@ public void testPropagateMetadataError() {
 AuthenticationException authException = new 
AuthenticationException("Test Auth Exception");
 doThrow(authException).when(metadata).maybeThrowAnyException();
 
+NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(false);
+assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty());
+networkClientDelegate.poll(0, time.milliseconds());
+
+
assertTrue(networkClientDelegate.getAndClearMetadataError().isPresent());

Review Comment:
   You are right, I missed that we will clear the metadata error.






Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #17973:
URL: https://github.com/apache/kafka/pull/17973#discussion_r1875208289


##
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source 
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {

Review Comment:
   The newly added class for the consumer is internal and not user facing, and 
thus it should not be relevant that it exists (the current public enum gets 
deprecated with this KIP, and the PR you linked to).
   
   I would assume that ShareConsumer won't have a public class either, similar 
to the consumer? \cc @AndrewJSchofield 
   
   Given that the reset strategy is exactly the same between a plain consumer 
and Kafka Streams, it would seem odd to me to name it `StreamsXxx`... Also, 
that the class is part of KS is clear from the package name.
   
   In general, discussions like this should be part of the KIP, and we should 
not change accepted KIPs w/o a strong reason. If you want, feel free to go back 
to the dev mailing list thread about the KIP and re-start the discussion there.






Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #17973:
URL: https://github.com/apache/kafka/pull/17973#discussion_r1875212818


##
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source 
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+
+private enum OffsetResetType {

Review Comment:
   Maybe better `OffsetResetStrategy` ?



##
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source 
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+
+private enum OffsetResetType {
+LATEST,
+EARLIEST,
+BY_DURATION
+}
+
+private final OffsetResetType type;
+private final Optional duration;
+
+private AutoOffsetReset(OffsetResetType type, Optional duration) {

Review Comment:
   ```suggestion
   private AutoOffsetReset(final OffsetResetType type, final Optional 
duration) {
   ```



##
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source 
processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {
+
+private enum OffsetResetType {
+LATEST,
+EARLIEST,
+BY_DURATION
+}
+
+private final OffsetResetType type;
+private final Optional duration;
+
+private AutoOffsetReset(OffsetResetType type, Optional duration) {
+this.type = type;
+this.duration = duration;
+}
+
+/**
+ * Creates an AutoOffsetReset instance representing "latest".

Review Comment:
   ```suggestion
* Creates an {@code AutoOffsetReset} instance representing "latest".
   ```
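For context, the shape visible in the diff (a private enum, two final fields, and a private constructor) is the usual static-factory pattern. The sketch below is illustrative only: the class name `AutoOffsetResetSketch`, the factory names `latest()`, `earliest()`, and `byDuration(...)`, the accessors, and the `Optional<Duration>` type parameter are assumptions, since the diff quoted here does not show them.

```java
import java.time.Duration;
import java.util.Optional;

// Illustrative sketch only; not the class under review.
class AutoOffsetResetSketch {

    private enum OffsetResetType {
        LATEST,
        EARLIEST,
        BY_DURATION
    }

    private final OffsetResetType type;
    private final Optional<Duration> duration;

    // Private constructor: instances are only created through the factories below.
    private AutoOffsetResetSketch(final OffsetResetType type, final Optional<Duration> duration) {
        this.type = type;
        this.duration = duration;
    }

    /** Creates an instance representing "latest". */
    static AutoOffsetResetSketch latest() {
        return new AutoOffsetResetSketch(OffsetResetType.LATEST, Optional.empty());
    }

    /** Creates an instance representing "earliest". */
    static AutoOffsetResetSketch earliest() {
        return new AutoOffsetResetSketch(OffsetResetType.EARLIEST, Optional.empty());
    }

    /** Creates an instance that resets to a point the given duration back in time. */
    static AutoOffsetResetSketch byDuration(final Duration duration) {
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration cannot be negative");
        }
        return new AutoOffsetResetSketch(OffsetResetType.BY_DURATION, Optional.of(duration));
    }

    // Hypothetical accessors, added so the sketch can be inspected.
    String typeName() {
        return type.name();
    }

    Optional<Duration> duration() {
        return duration;
    }
}
```

Keeping the constructor private means callers cannot build a `BY_DURATION` instance without a duration, or a `LATEST` instance with one.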



##

[jira] [Updated] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2024-12-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12843:

Parent: KAFKA-12822
Issue Type: Sub-task  (was: Task)

> KIP-740 follow up: clean up TaskMetadata
> 
>
> Key: KAFKA-12843
> URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: João Pedro Fonseca
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskMetadata class, we need to:
>  # Deprecate the TaskMetadata#getTaskId method
>  # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
> API that returns a TaskId instead of a String
>  # Remove the deprecated constructor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16339: [4/4 KStream#flatTransformValues] Remove Deprecated "transformer" methods and classes [kafka]

2024-12-08 Thread via GitHub


mjsax commented on PR #17882:
URL: https://github.com/apache/kafka/pull/17882#issuecomment-2526845764

   Thanks for the PR. Merged to `trunk`.





Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]

2024-12-08 Thread via GitHub


peterxcli commented on code in PR #18096:
URL: https://github.com/apache/kafka/pull/18096#discussion_r1875313096


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -53,8 +50,14 @@ public final class GroupConfig extends AbstractConfig {
 public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = 
"share.record.lock.duration.ms";
 
 public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = 
"share.auto.offset.reset";
-public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = 
ShareGroupAutoOffsetReset.LATEST.toString();
-public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to 
initialize the share-partition start offset.";
+public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = 
ShareGroupAutoOffsetResetStrategy.LATEST.name();
+public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to 
initialize the share-partition start offset. " +
+"earliest: automatically reset the offset to the earliest 
offset" +
+"latest: automatically reset the offset to the latest offset" 
+
+"by_duration:<duration>: automatically reset the offset to a 
configured <duration> from the current timestamp. " +
+"<duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
+"Negative duration is not allowed." +
+"anything else: throw exception to the share consumer.";

Review Comment:
   I think we should align with the documentation for consistency between this 
and `ConsumerConfig`:  
   
https://github.com/apache/kafka/blob/b9745b160cf7b2bc2a02b11c75fa86d9e9eaf5b4/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L172-L181
   
   However, I’m uncertain about whether we should include the following note at 
the end:  
   
   ```
   Note that altering partition numbers while setting this config to 
'latest' may cause message delivery loss since 
   producers could start sending messages to newly added partitions (i.e., no 
initial offsets exist yet) before consumers reset their offsets."
   ```
   
   What are your thoughts on this? 
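To make the documented `by_duration` rule concrete, here is a minimal sketch of how such a value could be validated with `java.time.Duration`. The class name `ByDurationParser` and its error messages are hypothetical and are not taken from the PR; only the `by_duration:` prefix, the ISO-8601 format, and the ban on negative durations come from the doc string above.

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

// Hypothetical helper; the real validation in the PR may differ.
class ByDurationParser {
    private static final String PREFIX = "by_duration:";

    // Parses "by_duration:<duration>" and rejects malformed or negative values.
    static Duration parse(final String value) {
        if (value == null || !value.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected by_duration:<duration>, got: " + value);
        }
        final Duration duration;
        try {
            // Duration.parse accepts ISO-8601 strings such as PT1H or P1DT2H30M.
            duration = Duration.parse(value.substring(PREFIX.length()));
        } catch (final DateTimeParseException e) {
            throw new IllegalArgumentException("Not an ISO-8601 duration: " + value, e);
        }
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Negative duration is not allowed: " + value);
        }
        return duration;
    }
}
```

For example, `ByDurationParser.parse("by_duration:PT1H")` yields a one-hour duration, while `by_duration:-PT1H` and `by_duration:` are rejected.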
   






[jira] [Resolved] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2024-12-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-12843.
-
Resolution: Duplicate  (was: Fixed)

> KIP-740 follow up: clean up TaskMetadata
> 
>
> Key: KAFKA-12843
> URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: João Pedro Fonseca
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskMetadata class, we need to:
>  # Deprecate the TaskMetadata#getTaskId method
>  # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
> API that returns a TaskId instead of a String
>  # Remove the deprecated constructor





Re: [PR] KAFKA-16339: [4/4 KStream#flatTransformValues] Remove Deprecated "transformer" methods and classes [kafka]

2024-12-08 Thread via GitHub


mjsax merged PR #17882:
URL: https://github.com/apache/kafka/pull/17882





[jira] [Commented] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2024-12-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904014#comment-17904014
 ] 

Matthias J. Sax commented on KAFKA-12843:
-

KIP-740 overlaps with KIP-744, which deprecates the old `processor.TaskMetadata` 
class entirely (and it was already removed via KAFKA-16329)

> KIP-740 follow up: clean up TaskMetadata
> 
>
> Key: KAFKA-12843
> URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: João Pedro Fonseca
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskMetadata class, we need to:
>  # Deprecate the TaskMetadata#getTaskId method
>  # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
> API that returns a TaskId instead of a String
>  # Remove the deprecated constructor





Re: [PR] KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API [kafka]

2024-12-08 Thread via GitHub


mjsax commented on PR #17190:
URL: https://github.com/apache/kafka/pull/17190#issuecomment-2526857196

   Thanks for your understanding. And thanks for contributing!





[PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]

2024-12-08 Thread via GitHub


mjsax opened a new pull request, #18103:
URL: https://github.com/apache/kafka/pull/18103

   (no comment)





Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #18103:
URL: https://github.com/apache/kafka/pull/18103#discussion_r1875348859


##
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java:
##
@@ -79,17 +80,22 @@ static void withStore(final RocksDBStore store, final 
StateStoreContext context,
 @ParameterizedTest
 @MethodSource("stores")
 public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore 
store, final StateStoreContext ctx) {
-withStore(store, ctx, () ->

Review Comment:
   side cleanup



##
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java:
##
@@ -79,17 +80,22 @@ static void withStore(final RocksDBStore store, final 
StateStoreContext context,
 @ParameterizedTest
 @MethodSource("stores")
 public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore 
store, final StateStoreContext ctx) {
-withStore(store, ctx, () ->
-assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L)));
+withStore(
+store,
+ctx,
+() -> assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L))
+);
 }
 
 @ParameterizedTest
 @MethodSource("stores")
 public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store, 
final StateStoreContext ctx) {
 withStore(store, ctx, () -> {
 final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = 
(BlockBasedTableConfigWithAccessibleCache) 
store.getOptions().tableFormatConfig();
-final long usage = tableFormatConfig.blockCache().getUsage();
-assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+try (final Cache blockCache = tableFormatConfig.blockCache()) {
+final long usage = blockCache.getUsage();
+assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+}

Review Comment:
   side cleanup






Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #18103:
URL: https://github.com/apache/kafka/pull/18103#discussion_r1875350982


##
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java:
##
@@ -108,12 +112,12 @@ public void setHeaders(final Headers headers) {
 }
 
 @Override
-public void setCurrentNode(final ProcessorNode currentNode) {
+public void setCurrentNode(final ProcessorNode currentNode) {

Review Comment:
   side cleanup






Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #18103:
URL: https://github.com/apache/kafka/pull/18103#discussion_r1875348688


##
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java:
##
@@ -1049,11 +1048,12 @@ private static void putRecord(final 
TimeOrderedKeyValueBuffer(key, new Change<>(value, null), 
0L), recordContext);
 }
 
+@SuppressWarnings("resource")

Review Comment:
   Side cleanup






Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #18103:
URL: https://github.com/apache/kafka/pull/18103#discussion_r1875350717


##
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java:
##
@@ -41,12 +41,16 @@
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 
 public class MockInternalNewProcessorContext extends 
MockProcessorContext implements InternalProcessorContext {
 
-private ProcessorNode currentNode;
+private ProcessorNode currentNode;
+private RecordCollector recordCollector;

Review Comment:
   Add missing `recordCollector` feature -- the old 
`MockInternalProcessorContext` that we remove with this PR had this, and the 
tests we rewrite use this feature



##
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java:
##
@@ -41,12 +41,16 @@
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 
 public class MockInternalNewProcessorContext extends 
MockProcessorContext implements InternalProcessorContext {
 
-private ProcessorNode currentNode;
+private ProcessorNode currentNode;
+private RecordCollector recordCollector;
+private final Map restoreCallbacks = new 
LinkedHashMap<>();

Review Comment:
   Same






Re: [PR] KAFKA-18164: Clear existing acknowledgements on share session epoch reset. [kafka]

2024-12-08 Thread via GitHub


ShivsundarR commented on code in PR #18063:
URL: https://github.com/apache/kafka/pull/18063#discussion_r1875353133


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -990,16 +990,18 @@ UnsentRequest buildRequest() {
 }
 
 ShareAcknowledgeRequest.Builder requestBuilder = 
sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);

Review Comment:
   Yeah, right. The handler does not clear the acknowledgements which can be a 
problem when future requests are built. I have added code to clear the 
`nextPartitions` and `nextAcknowledgements` now and added a test for the same.
   Thanks for catching this.
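The fix described above can be pictured with a small sketch. Everything here is hypothetical and heavily simplified: `ShareSessionSketch`, its methods, and the use of plain strings and offsets are assumptions for illustration, and the real handler tracks richer types. The point is only that pending state queued under the old epoch is dropped when the epoch is reset, so it cannot leak into the next request.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-session state; not the real handler.
class ShareSessionSketch {
    private static final int INITIAL_EPOCH = 0;

    private int epoch = INITIAL_EPOCH;
    // Partition name -> offsets queued to be acknowledged in the next request.
    private final Map<String, List<Long>> nextAcknowledgements = new LinkedHashMap<>();

    void addAcknowledgement(final String partition, final long offset) {
        nextAcknowledgements.computeIfAbsent(partition, p -> new ArrayList<>()).add(offset);
    }

    void advanceEpoch() {
        epoch++;
    }

    // On reset, drop anything queued under the old session as well as the epoch.
    void resetEpoch() {
        epoch = INITIAL_EPOCH;
        nextAcknowledgements.clear();
    }

    int epoch() {
        return epoch;
    }

    int pendingPartitions() {
        return nextAcknowledgements.size();
    }
}
```

Without the `clear()` call in `resetEpoch()`, a request built after the reset would still carry acknowledgements that belong to the previous session.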






Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]

2024-12-08 Thread via GitHub


mjsax commented on code in PR #18103:
URL: https://github.com/apache/kafka/pull/18103#discussion_r1875349000


##
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java:
##
@@ -98,11 +104,14 @@ public void shouldRecordCorrectBlockCacheUsage(final 
RocksDBStore store, final S
 public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore 
store, final StateStoreContext ctx) {
 withStore(store, ctx, () -> {
 final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = 
(BlockBasedTableConfigWithAccessibleCache) 
store.getOptions().tableFormatConfig();
-final long usage = tableFormatConfig.blockCache().getPinnedUsage();
-assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+try (final Cache blockCache = tableFormatConfig.blockCache()) {
+final long usage = blockCache.getPinnedUsage();
+assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+}

Review Comment:
   side cleanup



##
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java:
##
@@ -98,11 +104,14 @@ public void shouldRecordCorrectBlockCacheUsage(final 
RocksDBStore store, final S
 public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore 
store, final StateStoreContext ctx) {
 withStore(store, ctx, () -> {
 final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = 
(BlockBasedTableConfigWithAccessibleCache) 
store.getOptions().tableFormatConfig();
-final long usage = tableFormatConfig.blockCache().getPinnedUsage();
-assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+try (final Cache blockCache = tableFormatConfig.blockCache()) {
+final long usage = blockCache.getPinnedUsage();
+assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+}
 });
 }
 
+@SuppressWarnings("resource")

Review Comment:
   side cleanup






[jira] [Commented] (KAFKA-18187) Replicas that receive EndQuorum should grant preVotes in that epoch

2024-12-08 Thread Peter Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904027#comment-17904027
 ] 

Peter Lee commented on KAFKA-18187:
---

Hi [~alyssahuang], 
does this depend on [https://github.com/apache/kafka/pull/17807] or 
[https://github.com/apache/kafka/pull/18041] ?

 

> Replicas that receive EndQuorum should grant preVotes in that epoch
> ---
>
> Key: KAFKA-18187
> URL: https://issues.apache.org/jira/browse/KAFKA-18187
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 4.0.0
>Reporter: Alyssa Huang
>Assignee: Peter Lee
>Priority: Minor
>






[jira] [Created] (KAFKA-18187) Replicas that receive EndQuorum should grant preVotes in that epoch

2024-12-08 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-18187:


 Summary: Replicas that receive EndQuorum should grant preVotes in 
that epoch
 Key: KAFKA-18187
 URL: https://issues.apache.org/jira/browse/KAFKA-18187
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 4.0.0
Reporter: Alyssa Huang








Re: [PR] KAFKA-18134: Disallow group upgrades when custom assignors are used [kafka]

2024-12-08 Thread via GitHub


squah-confluent commented on code in PR #18046:
URL: https://github.com/apache/kafka/pull/18046#discussion_r1875402835


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10077,6 +10082,116 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
         assertEquals(group, context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false));
     }
 
+    /**
+     * Supplies the {@link Arguments} to {@link #testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer, boolean)}.
+     */
+    private static Stream<Arguments> testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource() {
+        return Stream.of(
+            Arguments.of(null, true),
+            Arguments.of(ByteBuffer.allocate(0), true),
+            Arguments.of(ByteBuffer.allocate(1), false)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource")
+    public void testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer userData, boolean expectUpgrade) {
+        String groupId = "group-id";
+        String memberId1 = "member-id-1";
+        String memberId2 = "member-id-2";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(barTopicId, 0)
+            ))
+        )));
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 1)
+            .addTopic(barTopicId, barTopicName, 1)
+            .addRacks()
+            .build();
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.UPGRADE.toString())
+            .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                List.of(fooTopicName, barTopicName),
+                null,
+                List.of(
+                    new TopicPartition(fooTopicName, 0),
+                    new TopicPartition(barTopicName, 0)
+                )
+            ))))
+        );
+
+        Map<String, byte[]> assignments = Map.of(
+            memberId1,
+            Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of(
+                new TopicPartition(fooTopicName, 0),
+                new TopicPartition(barTopicName, 0)
+            ), userData)))
+        );
+
+        // Create a stable classic group with member 1.
+        ClassicGroup group = context.createClassicGroup(groupId);
+        group.setProtocolName(Optional.of("range"));
+        group.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.empty(),
+                "client-id",
+                "client-host",
+                1,
+                5000,
+                "consumer",
+                protocols,
+                assignments.get(memberId1)
+            )
+        );
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(COMPLETING_REBALANCE);
+        group.transitionTo(STABLE);
+
+        context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion()));
+        context.commit();
+        group = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+
+        // A new member 2 with new protocol joins the classic group, triggering the upgrade.
+        ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData =
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+                .setTopicPartitions(Collections.emptyList());
+
+        if (expectUpgrade) {
+            context.consumerGroupHeartbeat(co

[jira] [Assigned] (KAFKA-18187) Replicas that receive EndQuorum should grant preVotes in that epoch

2024-12-08 Thread Peter Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Lee reassigned KAFKA-18187:
-

Assignee: Peter Lee

> Replicas that receive EndQuorum should grant preVotes in that epoch
> ---
>
> Key: KAFKA-18187
> URL: https://issues.apache.org/jira/browse/KAFKA-18187
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 4.0.0
>Reporter: Alyssa Huang
>Assignee: Peter Lee
>Priority: Minor
>






Re: [PR] KAFKA-18134: Disallow group upgrades when custom assignors are used [kafka]

2024-12-08 Thread via GitHub


squah-confluent commented on code in PR #18046:
URL: https://github.com/apache/kafka/pull/18046#discussion_r1875401373


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1053,6 +1055,12 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup 
classicGroup, List

Re: [PR] KAFKA-18134: Disallow group upgrades when custom assignors are used [kafka]

2024-12-08 Thread via GitHub


squah-confluent commented on code in PR #18046:
URL: https://github.com/apache/kafka/pull/18046#discussion_r1875401870


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1053,6 +1055,13 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup 
classicGroup, List

Re: [PR] [WIP] KIP-891: Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) [kafka]

2024-12-08 Thread via GitHub


snehashisp commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1875404360


##
.gitignore:
##
@@ -61,3 +61,6 @@ storage/kafka-tiered-storage/
 docker/test/report_*.html
 kafka.Kafka
 __pycache__
+/connect/runtime/src/main/java/org/apache/kafka/connect/testing/
+/connect/file/
+/connect/json/

Review Comment:
   Yes, I was using and altering them for testing, hence I added them here. We 
will need to remove them before merging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]

2024-12-08 Thread via GitHub


brandboat commented on code in PR #18036:
URL: https://github.com/apache/kafka/pull/18036#discussion_r1874816095


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -417,10 +417,13 @@ def start_cmd(self, node):
         else:
             cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)
 
-        cmd += " --reset-policy %s --group-id %s --topic %s --session-timeout %s" % \
-               (self.reset_policy, self.group_id, self.topic,
-                self.session_timeout_sec*1000)
-
+        cmd += " --reset-policy %s --group-id %s --topic %s" % \
+            (self.reset_policy, self.group_id, self.topic)
+
+        # session timeout is not supported when using CONSUMER group protocol
+        if self.session_timeout_sec > 0 and self.is_consumer_group_protocol_enabled():

Review Comment:
   Only `VerifiableConsumerTest` specifically sets the session timeout, but I 
think that is redundant, so I refactored it out.






Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]

2024-12-08 Thread via GitHub


brandboat commented on code in PR #18036:
URL: https://github.com/apache/kafka/pull/18036#discussion_r1874823531


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -251,8 +251,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id,
         self.session_timeout_sec = session_timeout_sec
         self.enable_autocommit = enable_autocommit
         self.assignment_strategy = assignment_strategy
-        self.group_protocol = group_protocol
-        self.group_remote_assignor = group_remote_assignor

Review Comment:
   These two lines are duplicate code; see L245 and L246.






[jira] [Commented] (KAFKA-18014) Add duration based offset reset option for ShareConsumer

2024-12-08 Thread Peter Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903916#comment-17903916
 ] 

Peter Lee commented on KAFKA-18014:
---

Hi [~omkreddy], PR is ready, PTAL. Thanks!

> Add duration based offset reset option for ShareConsumer
> 
>
> Key: KAFKA-18014
> URL: https://issues.apache.org/jira/browse/KAFKA-18014
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Manikumar
>Assignee: Peter Lee
>Priority: Major
> Fix For: 4.1.0
>
>
> Kafka consumer supports the auto.offset.reset config option, which is used when 
> there is no initial offset in Kafka or when the current offset no longer exists 
> on the server. This config currently supports the earliest/latest/none 
> options. Currently, consumer resets might force applications to reprocess 
> large amounts of data from earlier offsets. With infinite storage, it's 
> beneficial to have a duration-based offset reset strategy. This will allow 
> applications to consume/initialise from a fixed duration when there is no 
> initial offset in Kafka.
> As part of 
> [KIP-932|https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka],
>  we are adding support for share consumer groups. Share consumer groups 
> support the dynamic group configuration property share.auto.offset.reset, which 
> is used to set the initial Share-Partition Start Offset (SPSO). Currently, 
> share.auto.offset.reset supports the earliest and latest options to 
> automatically reset the offset.
> Similar to the Kafka Consumer, we will add support for the by_duration: config 
> value for {{share.auto.offset.reset}}.
> {quote}from 
> [KIP-1106|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients]
> {quote}
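
To make the `by_duration:` value described above concrete, here is a minimal sketch of how such a setting might be parsed. It is not the code from the PR: the class `DurationResetSketch` and its methods are hypothetical, and it assumes the part after the prefix is an ISO-8601 duration such as `PT1H` and that the reset point is "now minus the duration".

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Optional;

/** Hypothetical helper for illustration only; not the class added in the PR. */
public class DurationResetSketch {
    private static final String PREFIX = "by_duration:";

    /**
     * Returns the duration carried by a "by_duration:<ISO-8601 duration>" value,
     * or Optional.empty() for the plain "earliest" and "latest" strategies.
     */
    public static Optional<Duration> parseDuration(String value) {
        String v = value.trim();
        if (v.equals("earliest") || v.equals("latest")) {
            return Optional.empty();
        }
        if (!v.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Unknown reset strategy: " + value);
        }
        Duration duration;
        try {
            duration = Duration.parse(v.substring(PREFIX.length()));
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid duration in: " + value, e);
        }
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration must not be negative: " + value);
        }
        return Optional.of(duration);
    }

    /** Assumed semantics: the timestamp to look up is the current time minus the duration. */
    public static long resetTimestampMs(Duration duration, long nowMs) {
        return nowMs - duration.toMillis();
    }

    public static void main(String[] args) {
        Duration d = parseDuration("by_duration:PT1H").orElseThrow();
        System.out.println(resetTimestampMs(d, 10_000_000L)); // prints 6400000
    }
}
```

Returning an empty `Optional` for `earliest` and `latest` keeps the duration case separate from the two existing strategies, so callers can branch on whether a timestamp lookup is needed at all.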





[jira] [Closed] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2024-12-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

João Pedro Fonseca closed KAFKA-12843.
--

> KIP-740 follow up: clean up TaskMetadata
> 
>
> Key: KAFKA-12843
> URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: João Pedro Fonseca
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskMetadata class, we need to:
>  # Deprecate the TaskMetadata#getTaskId method
>  # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
> API that returns a TaskId instead of a String
>  # Remove the deprecated constructor
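
The accessor change listed above can be pictured with a simplified sketch. The types below are hypothetical stand-ins, not the real Kafka Streams `TaskMetadata` or `TaskId`; they only show the shape of the change: `taskId()` returning a `TaskId` object, with `getTaskId()` kept as a deprecated alias.

```java
import java.util.Objects;

/** Illustrative stand-ins only; these are not the real Kafka Streams types. */
public class TaskMetadataSketch {

    /** Hypothetical, simplified TaskId with a "subtopology_partition" string form. */
    public static final class TaskId {
        private final int subtopology;
        private final int partition;

        public TaskId(int subtopology, int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return subtopology + "_" + partition;
        }
    }

    public static final class TaskMetadata {
        private final TaskId taskId;

        public TaskMetadata(TaskId taskId) {
            this.taskId = Objects.requireNonNull(taskId, "taskId");
        }

        /** Re-added accessor that returns a TaskId rather than a String. */
        public TaskId taskId() {
            return taskId;
        }

        /** @deprecated use {@link #taskId()} instead. */
        @Deprecated
        public TaskId getTaskId() {
            return taskId();
        }
    }

    public static void main(String[] args) {
        TaskMetadata metadata = new TaskMetadata(new TaskId(0, 1));
        System.out.println(metadata.taskId()); // prints 0_1
    }
}
```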





[PR] [WIP] KAFKA-10409: Refactor Kafka Streams RocksDb iterators [kafka]

2024-12-08 Thread via GitHub


fonsdant opened a new pull request, #18099:
URL: https://github.com/apache/kafka/pull/18099

   (no comment)





Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]

2024-12-08 Thread via GitHub


peterxcli commented on code in PR #17973:
URL: https://github.com/apache/kafka/pull/17973#discussion_r1874837232


##
streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * Sets the {@code auto.offset.reset} configuration when
+ * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream}
+ * or {@link KTable} via {@link StreamsBuilder}.
+ */
+public class AutoOffsetReset {

Review Comment:
   How about renaming this to `StreamAutoOffsetResetStrategy`, as we now have 
`AutoOffsetReset` for consumer, share group, and stream?
   
   - https://github.com/apache/kafka/pull/17858
   - https://github.com/apache/kafka/pull/18096
   
   Additionally, I believe we should rename the `AutoOffsetReset` class added 
in https://github.com/apache/kafka/pull/17858 for better clarity.  
   @omkreddy, your thoughts?






Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]

2024-12-08 Thread via GitHub


peterxcli commented on PR #18096:
URL: https://github.com/apache/kafka/pull/18096#issuecomment-2526019079

   Hi @omkreddy, PR is ready, PTAL. Thanks!
   Sorry for mentioning you in both places; I just want to make sure you receive it 😁





Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]

2024-12-08 Thread via GitHub


peterxcli commented on code in PR #18096:
URL: https://github.com/apache/kafka/pull/18096#discussion_r1874838353


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+
+public class ShareGroupAutoOffsetResetStrategy {

Review Comment:
   As mentioned in 
https://github.com/apache/kafka/pull/17573#discussion_r1814871532, let each 
`AutoOffsetResetStrategy` evolve on its own.






[jira] [Resolved] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata

2024-12-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

João Pedro Fonseca resolved KAFKA-12843.

Resolution: Fixed

> KIP-740 follow up: clean up TaskMetadata
> 
>
> Key: KAFKA-12843
> URL: https://issues.apache.org/jira/browse/KAFKA-12843
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: João Pedro Fonseca
>Priority: Blocker
> Fix For: 4.0.0
>
>
> See 
> [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  – for the TaskMetadata class, we need to:
>  # Deprecate the TaskMetadata#getTaskId method
>  # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() 
> API that returns a TaskId instead of a String
>  # Remove the deprecated constructor





Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]

2024-12-08 Thread via GitHub


chia7712 commented on code in PR #18036:
URL: https://github.com/apache/kafka/pull/18036#discussion_r1874846202


##
tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java:
##
@@ -649,8 +648,12 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
             if (groupRemoteAssignor != null)
                 consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor);
         } else {
-            // This means we're using the old consumer group protocol.
+            // This means we're using the CLASSIC consumer group protocol.
             consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
+            Integer sessionTimeout = res.getInt("sessionTimeout");

Review Comment:
   Yes, we should honor the user's configs and let them encounter the error.
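
   One possible reading of this comment, as a sketch rather than the code in the PR: forward the session timeout whenever the user supplied one, regardless of the group protocol, and leave any rejection to the consumer's own config validation. The class and method names below are hypothetical; the property keys are written as plain strings so the example has no Kafka dependency.

```java
import java.util.Properties;

/** Hypothetical sketch; not the actual VerifiableConsumer code. */
public class SessionTimeoutSketch {

    /**
     * Builds consumer properties. sessionTimeoutMs is null when the user did not
     * pass a session timeout on the command line.
     */
    public static Properties consumerProps(String groupProtocol, Integer sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("group.protocol", groupProtocol);
        // Only forward the value when it was explicitly supplied. No protocol check
        // is done here, so an unsupported combination surfaces as an error from the
        // client rather than being silently dropped by the tool.
        if (sessionTimeoutMs != null) {
            props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("classic", 30000));
        System.out.println(consumerProps("consumer", null));
    }
}
```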






Re: [PR] KAFKA-18164: Clear existing acknowledgements on share session epoch reset. [kafka]

2024-12-08 Thread via GitHub


ShivsundarR commented on code in PR #18063:
URL: https://github.com/apache/kafka/pull/18063#discussion_r1874846243


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##
@@ -990,16 +990,17 @@ UnsentRequest buildRequest() {
             }
 
             ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
-            Node nodeToSend = metadata.fetch().nodeById(nodeId);
 
             log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);

Review Comment:
   Yeah makes sense, thanks. I have moved it now.






Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]

2024-12-08 Thread via GitHub


chia7712 commented on code in PR #18036:
URL: https://github.com/apache/kafka/pull/18036#discussion_r1874952912


##
tests/kafkatest/tests/verifiable_consumer_test.py:
##
@@ -56,7 +55,7 @@ def min_cluster_size(self):
     def setup_consumer(self, topic, static_membership=False, enable_autocommit=False,
                        assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
-                                  topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec,

Review Comment:
   `session_timeout_sec` is used by other end-to-end tests, so we need to 
perform some refactoring for those test cases.
   
   **Replace Dynamic Timeouts with Constants**
   For example, change `timeout_sec=self.session_timeout_sec + 5` to 
`timeout_sec=60`.
   
   **Remove Use Cases of Increasing session_timeout_sec**
   These cases typically increase session_timeout_sec from 45 seconds to 60 
seconds. This adjustment may be unnecessary if the tests can run stably without 
modifying the timeout.






[PR] KAFKA-18180: Move OffsetResultHolder to storage module [kafka]

2024-12-08 Thread via GitHub


m1a2st opened a new pull request, #18100:
URL: https://github.com/apache/kafka/pull/18100

   Jira: https://issues.apache.org/jira/browse/KAFKA-18180
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-18180: Move OffsetResultHolder to storage module [kafka]

2024-12-08 Thread via GitHub


m1a2st commented on code in PR #18100:
URL: https://github.com/apache/kafka/pull/18100#discussion_r1874955519


##
storage/src/main/java/org/apache/kafka/storage/log/OffsetResultHolder.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.log;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class OffsetResultHolder {
+
+private Optional timestampAndOffsetOpt;
+private Optional> 
futureHolderOpt;
+private Optional maybeOffsetsError = Optional.empty();
+private Optional lastFetchableOffset = Optional.empty();
+
+public OffsetResultHolder() {
+this(Optional.empty(), Optional.empty());
+}
+
+public OffsetResultHolder(
+Optional timestampAndOffsetOpt,
+Optional> 
futureHolderOpt
+) {
+this.timestampAndOffsetOpt = timestampAndOffsetOpt;
+this.futureHolderOpt = futureHolderOpt;
+}
+
+public OffsetResultHolder(Optional 
timestampAndOffsetOpt) {
+this(timestampAndOffsetOpt, Optional.empty());
+}
+
+public Optional timestampAndOffsetOpt() {
+return timestampAndOffsetOpt;
+}
+
+public Optional> 
futureHolderOpt() {
+return futureHolderOpt;
+}
+
+public Optional maybeOffsetsError() {
+return maybeOffsetsError;
+}
+
+public Optional lastFetchableOffset() {
+return lastFetchableOffset;
+}
+
+public void timestampAndOffsetOpt(Optional 
timestampAndOffsetOpt) {
+this.timestampAndOffsetOpt = timestampAndOffsetOpt;
+}
+
+public void maybeOffsetsError(Optional maybeOffsetsError) {
+this.maybeOffsetsError = maybeOffsetsError;
+}
+
+public void lastFetchableOffset(Optional lastFetchableOffset) {
+this.lastFetchableOffset = lastFetchableOffset;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+
+OffsetResultHolder that = (OffsetResultHolder) o;
+return Objects.equals(timestampAndOffsetOpt, 
that.timestampAndOffsetOpt) && Objects.equals(futureHolderOpt, 
that.futureHolderOpt) && Objects.equals(maybeOffsetsError, 
that.maybeOffsetsError) && Objects.equals(lastFetchableOffset, 
that.lastFetchableOffset);
+}
+
+@Override
+public int hashCode() {
+int result = Objects.hashCode(timestampAndOffsetOpt);
+result = 31 * result + Objects.hashCode(futureHolderOpt);
+result = 31 * result + Objects.hashCode(maybeOffsetsError);
+result = 31 * result + Objects.hashCode(lastFetchableOffset);
+return result;
+}
+
+public static class FileRecordsOrError {

Review Comment:
   I don't think this name is precise enough, but I can't think of a better 
idea right now.






Re: [PR] KAFKA-10731: add support for SSL hot reload [kafka]

2024-12-08 Thread via GitHub


CefBoud commented on PR #17987:
URL: https://github.com/apache/kafka/pull/17987#issuecomment-2526238613

   @TaiJuWu I've drafted a [KIP](https://cwiki.apache.org/confluence/x/eIrREw). 
 If you have any feedback, I’d really appreciate it!

