Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]
apoorvmittal10 commented on code in PR #18093: URL: https://github.com/apache/kafka/pull/18093#discussion_r1874735278

## core/src/main/java/kafka/server/share/SharePartition.java:
```
@@ -371,52 +370,35 @@ public static RecordState forId(byte id) {
  */
 public CompletableFuture maybeInitialize() {
     log.debug("Maybe initialize share partition: {}-{}", groupId, topicIdPartition);
-    CompletableFuture future = new CompletableFuture<>();
-    AtomicReference> futureException = new AtomicReference<>(Optional.empty());
     // Check if the share partition is already initialized.
-    InitializationResult initializationResult = checkInitializationCompletion();
-    if (initializationResult.isComplete()) {
-        if (initializationResult.throwable() != null) {
-            future.completeExceptionally(initializationResult.throwable());
-        } else {
-            future.complete(null);
+    try {
+        if (initializedOrThrowException()) {
+            return CompletableFuture.completedFuture(null);
         }
-        return future;
+    } catch (Exception e) {
+        return CompletableFuture.failedFuture(e);
     }
+    // If code reaches here then the share partition is not initialized. Initialize the share partition.
     // All the pending requests should wait to get completed before the share partition is initialized.
     // Attain lock to avoid any concurrent requests to be processed.
     lock.writeLock().lock();
-    boolean shouldFutureBeCompleted = false;
     try {
         // Re-check the state to verify if previous requests has already initialized the share partition.
-        initializationResult = checkInitializationCompletion();
-        if (initializationResult.isComplete()) {
-            if (initializationResult.throwable() != null) {
-                futureException.set(Optional.of(initializationResult.throwable()));
-            }
-            shouldFutureBeCompleted = true;
-            return future;
+        if (initializedOrThrowException()) {
+            return CompletableFuture.completedFuture(null);
```

Review Comment:
Your suggestion is better, so I incorporated it. However, I also tested the earlier code: the Java specification guarantees that a method does not return the result of the `try` block until the `finally` block has completed. The difference from the earlier problematic code is that there we were completing the `future` inside `try`, whereas now it is a plain `return` statement. Below is a test and its output.

```
public CompletableFuture testFuture() {
    lock.writeLock().lock();
    try {
        System.out.println("testFuture");
        return CompletableFuture.completedFuture(null);
    } finally {
        System.out.println("testFuture finally");
        lock.writeLock().unlock();
        System.out.println("testFuture finally unlocked");
    }
}
```

```
@RepeatedTest(100)
public void testFuture() {
    SharePartition sharePartition = SharePartitionBuilder.builder().build();
    sharePartition.testFuture().whenComplete((result, exception) -> {
        System.out.println("Completed");
    });
}
```

```
testFuture
testFuture finally
testFuture finally unlocked
Completed
```
Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]
apoorvmittal10 commented on code in PR #18093: URL: https://github.com/apache/kafka/pull/18093#discussion_r1874735359

## core/src/main/java/kafka/server/share/SharePartition.java:
```
@@ -426,55 +408,50 @@ public CompletableFuture maybeInitialize() {
             .build())
         .build()
 ).whenComplete((result, exception) -> {
+    Throwable throwable = null;
     lock.writeLock().lock();
     try {
         if (exception != null) {
             log.error("Failed to initialize the share partition: {}-{}", groupId, topicIdPartition, exception);
-            completeInitializationWithException();
-            futureException.set(Optional.of(exception));
+            throwable = exception;
             return;
         }
         if (result == null || result.topicsData() == null || result.topicsData().size() != 1) {
             log.error("Failed to initialize the share partition: {}-{}. Invalid state found: {}.", groupId, topicIdPartition, result);
-            completeInitializationWithException();
-            futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+            throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
             return;
         }
         TopicData state = result.topicsData().get(0);
         if (state.topicId() != topicIdPartition.topicId() || state.partitions().size() != 1) {
             log.error("Failed to initialize the share partition: {}-{}. Invalid topic partition response: {}.", groupId, topicIdPartition, result);
-            completeInitializationWithException();
-            futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+            throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
             return;
         }
         PartitionAllData partitionData = state.partitions().get(0);
         if (partitionData.partition() != topicIdPartition.partition()) {
             log.error("Failed to initialize the share partition: {}-{}. Invalid partition response: {}.", groupId, topicIdPartition, partitionData);
-            completeInitializationWithException();
-            futureException.set(Optional.of(new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition))));
+            throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition));
            return;
         }
         if (partitionData.errorCode() != Errors.NONE.code()) {
             KafkaException ex = fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
             log.error("Failed to initialize the share partition: {}-{}. Exception occurred: {}.", groupId, topicIdPartition, partitionData);
-            completeInitializationWithException();
-            futureException.set(Optional.of(ex));
+            throwable = ex;
             return;
         }
         try {
             startOffset = startOffsetDuringInitialization(partitionData.startOffset());
         } catch (Exception e) {
```

Review Comment:
Agreed, done.
[PR] KAFKA-18021: Disabled MirrorCheckpointConnector throws RetriableException on task config generation [kafka]
frankvicky opened a new pull request, #18098: URL: https://github.com/apache/kafka/pull/18098

JIRA: KAFKA-18021

When a `MirrorCheckpointConnector` is disabled, the `start` method exits early without initiating `loadInitialConsumerGroups`. If the connector is restarted in a disabled state, `taskConfigs` continuously throws `RetriableException` as `loadInitialConsumerGroups` never completes. This prevents emitting an empty set of task configs to stop running tasks, leaving previously active `MirrorCheckpointTasks` enabled and making it impossible to fully disable them.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
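[Editor's sketch] To make the failure mode concrete, here is a minimal, hypothetical simplification (names and config key chosen for illustration; this is not the actual MirrorCheckpointConnector code) of why a disabled restart wedges task-config generation:

```java
import org.apache.kafka.connect.errors.RetriableException;

import java.util.List;
import java.util.Map;

// Hypothetical simplification of the connector lifecycle described above.
public class CheckpointConnectorSketch {
    private volatile boolean enabled;
    private volatile boolean consumerGroupsLoaded; // set once loadInitialConsumerGroups() finishes

    public void start(Map<String, String> props) {
        enabled = Boolean.parseBoolean(props.getOrDefault("emit.checkpoints.enabled", "true"));
        if (!enabled) {
            return; // early exit: loadInitialConsumerGroups() is never started
        }
        loadInitialConsumerGroups();
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        if (!consumerGroupsLoaded && enabled) {
            throw new RetriableException("consumer groups not loaded yet");
        }
        // On a disabled restart, the old code never reached this point because the
        // load had not started, so the framework never received the empty list below
        // that would stop the previously running tasks.
        return enabled ? buildTaskConfigs(maxTasks) : List.of();
    }

    private void loadInitialConsumerGroups() {
        consumerGroupsLoaded = true; // stands in for the real asynchronous load
    }

    private List<Map<String, String>> buildTaskConfigs(int maxTasks) {
        return List.of(); // placeholder
    }
}
```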
Re: [PR] KAFKA-18021: Disabled MirrorCheckpointConnector throws RetriableException on task config generation [kafka]
frankvicky commented on PR #18098: URL: https://github.com/apache/kafka/pull/18098#issuecomment-2525655979

Hi @gharris1727
Could you please take a look when you have some time? Many thanks 🙇🏼
Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]
brandboat commented on code in PR #18036: URL: https://github.com/apache/kafka/pull/18036#discussion_r1874795473

## tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java:
```
@@ -649,8 +648,12 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
         if (groupRemoteAssignor != null)
             consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor);
     } else {
-        // This means we're using the old consumer group protocol.
+        // This means we're using the CLASSIC consumer group protocol.
         consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy"));
+        Integer sessionTimeout = res.getInt("sessionTimeout");
```

Review Comment:
> it is fine to throw exception if user does set the --session-timeout

Pardon me, did you mean we should throw an exception if the user sets the session timeout when using the CONSUMER `group.protocol`? Not sure if I understand correctly 🙏
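[Editor's sketch] For illustration, a hedged sketch of the behavior being debated (a hypothetical helper, not the code in the PR): apply `--session-timeout` only under the CLASSIC protocol, and fail fast otherwise, as one of the two options raised in the thread:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class SessionTimeoutSketch {
    // Hypothetical helper; not part of VerifiableConsumer.
    static void applySessionTimeout(Properties consumerProps, Integer sessionTimeout, boolean useClassicProtocol) {
        if (sessionTimeout == null) {
            return; // flag not supplied
        }
        if (useClassicProtocol) {
            consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(sessionTimeout));
        } else {
            // The alternative raised in the review: reject rather than silently ignore,
            // since the CONSUMER protocol manages the session timeout broker-side.
            throw new IllegalArgumentException("--session-timeout is only supported with group.protocol=classic");
        }
    }
}
```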
[jira] [Commented] (KAFKA-18183) ClusterInstance's helper should use byte array instead of Bytes in creating producer/consumer
[ https://issues.apache.org/jira/browse/KAFKA-18183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903860#comment-17903860 ]

Logan Zhu commented on KAFKA-18183:
-----------------------------------

Sorry, [~suresh7]. I’m already working on this.

> ClusterInstance's helper should use byte array instead of Bytes in creating producer/consumer
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18183
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18183
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Logan Zhu
>            Priority: Trivial
>
> https://github.com/apache/kafka/blob/trunk/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java#L162
> https://github.com/apache/kafka/blob/trunk/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java#L174
> byte array is more common than Bytes
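[Editor's sketch] A sketch of the helper shape the issue asks for (the signature is an assumption; the actual ClusterInstance method may differ), using `byte[]` serializers directly instead of `Bytes`:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Map;

public class ClusterInstanceHelperSketch {
    // Assumed helper shape: returns Producer<byte[], byte[]> rather than Producer<Bytes, Bytes>.
    static Producer<byte[], byte[]> producer(Map<String, Object> configs) {
        return new KafkaProducer<>(configs, new ByteArraySerializer(), new ByteArraySerializer());
    }
}
```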
[PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]
peterxcli opened a new pull request, #18101: URL: https://github.com/apache/kafka/pull/18101

The UnsupportedVersion error is handled in the parent AbstractHeartbeatRequestManager, which is used by both the consumer and the share consumer. If a share consumer gets the error, it ends up with a message that is currently specific to the consumer: https://github.com/apache/kafka/blob/6fd951a9c0aa773060cd6bbf8a8b8c47ee9d2965/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L384-L386

This PR handles the UnsupportedVersion separately in the existing handleSpecificError (note that the unsupported version for the consumer may also end up containing a message specific to SubscriptionPattern not being supported in HB v0, if a regex is used without the required v1).
Re: [PR] KAFKA-17811: Separate modules to use different JDKs [kafka]
chia7712 commented on PR #17522: URL: https://github.com/apache/kafka/pull/17522#issuecomment-2526275233

Opened [KAFKA-18186](https://issues.apache.org/jira/browse/KAFKA-18186) to address this. @ableegoldman please feel free to share your thoughts on this approach.
Re: [PR] [WIP] KIP-891: Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) [kafka]
gharris1727 commented on code in PR #17741: URL: https://github.com/apache/kafka/pull/17741#discussion_r1874983220

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java:
```
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
+import org.apache.maven.artifact.versioning.VersionRange;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CachedConnectors {
+
+    private final static String LATEST_VERSION = "latest";
+
+    private final Map> connectors;
+    private final Map invalidConnectors;
+    private final Map> invalidVersions;
+    private final Plugins plugins;
+
+    public CachedConnectors(Plugins plugins) {
+        this.plugins = plugins;
+        this.connectors = new ConcurrentHashMap<>();
+        this.invalidConnectors = new ConcurrentHashMap<>();
+        this.invalidVersions = new ConcurrentHashMap<>();
+    }
+
+    private void validate(String connectorName, VersionRange range) throws Exception {
+        if (invalidConnectors.containsKey(connectorName)) {
+            throw invalidConnectors.get(connectorName);
```

Review Comment:
Rethrowing the same exception can cause some strange results, because the exception's stacktrace gets computed only once. You can cache the exception from Plugins, and then create and throw a new exception on each method call.

## .gitignore:
```
@@ -61,3 +61,6 @@ storage/kafka-tiered-storage/
 docker/test/report_*.html
 kafka.Kafka
 __pycache__
+/connect/runtime/src/main/java/org/apache/kafka/connect/testing/
+/connect/file/
+/connect/json/
```

Review Comment:
I think these ignores include real code.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
```
@@ -367,6 +371,13 @@ public Object newPlugin(String classOrAlias, VersionRange range) throws Versione
         return newPlugin(klass);
     }

+    public Object newPlugin(String classOrAlias, Class baseClass, VersionRange range) throws ClassNotFoundException {
```

Review Comment:
This is a bad method addition:
* The point of passing in a `Class` is in order to return a `T`, to avoid a `(T)` cast in the caller.
* This method returns `Object`, so it still requires a cast in the caller.
* It does a null check on version, which is already very reasonably handled within the rest of Plugins.
* The method calls Utils.newInstance, which instantiates the plugin with the wrong TCCL.

Either change the caller to use `newPlugin(String, VersionRange)` and perform the blind cast, or change this method to actually handle the casting/type safety and not call Utils.newInstance.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
```
@@ -70,20 +70,30 @@ public class WorkerConfig extends AbstractConfig {
     public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
     public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;

+    public static final String PLUGIN_VERSION_SUFFIX = "version";
```

Review Comment:
I checked the KIP, and the suffix should be `plugin.version`. But the connector is still written to use "version"; is that a mistake or intentional? I would prefer it being consistent so we can use the same constant everywhere.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
```
@@ -98,6 +118,12 @@ public class ConnectorConfig extends AbstractConfig {
         new InstantiableClassValidator()
     );

+    public static final String VALUE_CONVERTER_VERSION_CONFIG = WorkerConfig.VALUE_CONVERTER_VERSION;
+    private static final String VALUE_CONVERTER_VERSION_DEFAULT = null;
+    private static final String VALUE_CONVERTER_VERSION_DOC = "Version of the value converter.";
+    private static final String VALUE_CON
```
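[Editor's sketch] A sketch of the reviewer's suggestion about the cached exception (hedged; field and type names follow the diff above, the wrapper exception is an assumption): keep the cached failure as the cause, but construct a fresh exception per call so each caller gets an accurate stacktrace:

```java
import org.apache.kafka.connect.errors.ConnectException;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachedFailureSketch {
    private final Map<String, Exception> invalidConnectors = new ConcurrentHashMap<>();

    void validate(String connectorName) {
        Exception cached = invalidConnectors.get(connectorName);
        if (cached != null) {
            // A new exception per call: its stacktrace reflects this caller,
            // while the original failure travels along as the cause.
            throw new ConnectException("Connector " + connectorName + " previously failed to load", cached);
        }
    }
}
```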
Re: [PR] KAFKA-16143: New JMX metrics for AsyncKafkaConsumer [kafka]
lianetm commented on code in PR #17199: URL: https://github.com/apache/kafka/pull/17199#discussion_r1872002514

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
```
@@ -1893,25 +1901,30 @@ private void subscribeInternal(Collection topics, Optional
         AtomicReference firstError = new AtomicReference<>();
-        LinkedList events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+        List events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the background event: {}", e.getMessage(), e);
+                }
             }
+            kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
         }
         backgroundEventReaper.reap(time.milliseconds());
```

Review Comment:
Interesting, and if we agree on what we want we could just send an update in the KIP email thread to add it to the KIP and here. To align internally first, I guess we would be interested in the num/avg of expired events, but we need to consider how that metric could go crazy and be a false alarm in cases like poll(0), right? Should we consider the expiration relevant only if there was a non-zero timeout? Thoughts?
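[Editor's sketch] A hedged sketch of the kind of expired-event metric discussed above (an assumption, not part of the KIP or the PR): only record an expiration when the caller supplied a non-zero timeout, so `poll(0)`-style calls do not produce false alarms. It uses the standard Kafka Metrics API:

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeCount;

public class ExpiredEventMetricSketch {
    private final Sensor expiredEvents;

    public ExpiredEventMetricSketch(Metrics metrics) {
        expiredEvents = metrics.sensor("expired-application-events");
        expiredEvents.add(metrics.metricName("expired-events-total", "consumer-metrics"), new CumulativeCount());
    }

    // Called when the reaper expires an event; timeoutMs is the caller-supplied timeout.
    public void maybeRecordExpiration(long timeoutMs) {
        if (timeoutMs > 0) { // ignore poll(0)-style calls, per the question in the thread
            expiredEvents.record();
        }
    }
}
```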
Re: [PR] KAFKA-16143: New JMX metrics for AsyncKafkaConsumer [kafka]
lianetm commented on code in PR #17199: URL: https://github.com/apache/kafka/pull/17199#discussion_r1875064838

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java (same hunk as above):
```
         backgroundEventReaper.reap(time.milliseconds());
```

Review Comment:
Agree that it's not applicable for background events, because the callback-needed event is the only `CompletableBackgroundEvent`, and it's intentionally not expired (I don't think we need to change that).

I would say that what might be interesting to know is the expiration of application events: https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L184

There we do have lots of events with a deadline; I guess that's what @AndrewJSchofield had in mind maybe? (I notice now that the initial comment was here on the reap of background events.)
Re: [PR] KAFKA-18164: Clear existing acknowledgements on share session epoch reset. [kafka]
AndrewJSchofield commented on code in PR #18063: URL: https://github.com/apache/kafka/pull/18063#discussion_r1875029001

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
```
@@ -990,16 +990,18 @@ UnsentRequest buildRequest() {
         }
         ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
```

Review Comment:
It seems to me that `ShareSessionHandler.addPartitionToFetch` saves partition and acknowledgement state into the handler to be built by the request builder. In the event that the request builder cannot be built, such as because a new share session needs to be created, the partition and acknowledgement state is still left in the share session. I worry that it is possible this could lead to an erroneous request in some situations when the session handler is used again after a new connection has been created. I think that it would be prudent to clear out `nextAcknowledgements` and `nextPartitions` in `ShareSessionHandler.newShareAcknowledgeBuilder` in the case where it returns null.
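[Editor's sketch] A standalone sketch of the hardening suggested above (all names are hypothetical stand-ins; only `nextAcknowledgements` and `nextPartitions` come from the review comment): when no acknowledge request can be built, drop the staged state so it cannot leak into a request built later on a fresh connection:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StagedAckStateSketch {
    private final Map<Integer, List<Long>> nextAcknowledgements = new HashMap<>();
    private final List<Integer> nextPartitions = new ArrayList<>();
    private boolean newSessionRequired; // e.g. the share session epoch was reset

    /** Returns null when no acknowledge request can be built on this session. */
    Object newShareAcknowledgeBuilder() {
        if (newSessionRequired) {
            // Clear the staged state so a later request on a new connection
            // does not pick up stale partitions or acknowledgements.
            nextAcknowledgements.clear();
            nextPartitions.clear();
            return null;
        }
        return new Object(); // stands in for the real request builder
    }
}
```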
Re: [PR] MINOR: Extract some codeblocks as methods to simplify readability [kafka]
AndrewJSchofield commented on code in PR #18017: URL: https://github.com/apache/kafka/pull/18017#discussion_r1875032057

## core/src/main/java/kafka/server/TierStateMachine.java:
```
@@ -229,12 +229,8 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
         }
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
-            .orElseThrow(() -> new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
-                ", currentLeaderEpoch: " + currentLeaderEpoch +
-                ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
-                ", leaderLogStartOffset: " + leaderLogStartOffset +
-                ", epoch: " + targetEpoch +
-                "as the previous remote log segment metadata was not found"));
+            .orElseThrow(() -> buildRemoteStorageException(topicPartition, targetEpoch, currentLeaderEpoch,
+                leaderLocalLogStartOffset, previousOffsetToLeaderLocalLogStartOffset));
```

Review Comment:
It seems to me that the final argument here should be `leaderLogStartOffset` to match the previous behaviour.
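[Editor's sketch] For reference, a sketch of the extracted helper with `leaderLogStartOffset` passed through, matching the original message text (parameter names taken from the diff; the exact method shape is an assumption). It also adds the space before "as" that the original concatenation was missing:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

public class TierStateSketch {
    private static RemoteStorageException buildRemoteStorageException(TopicPartition topicPartition,
                                                                      int targetEpoch,
                                                                      int currentLeaderEpoch,
                                                                      long leaderLocalLogStartOffset,
                                                                      long leaderLogStartOffset) {
        return new RemoteStorageException("Couldn't build the state from remote store for partition: " + topicPartition +
            ", currentLeaderEpoch: " + currentLeaderEpoch +
            ", leaderLocalLogStartOffset: " + leaderLocalLogStartOffset +
            ", leaderLogStartOffset: " + leaderLogStartOffset +
            ", epoch: " + targetEpoch +
            " as the previous remote log segment metadata was not found");
    }
}
```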
Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]
AndrewJSchofield commented on PR #18101: URL: https://github.com/apache/kafka/pull/18101#issuecomment-2526360864

We should get @lianetm to make sure she's happy too.
Re: [PR] KAFKA-18058: Share group state record pruning impl. [kafka]
AndrewJSchofield commented on PR #18014: URL: https://github.com/apache/kafka/pull/18014#issuecomment-2526362182

@smjn Please resolve the conflicts.
Re: [PR] MINOR: Extract some codeblocks as methods to simplify readability [kafka]
overpathz commented on code in PR #18017: URL: https://github.com/apache/kafka/pull/18017#discussion_r1875034438

## core/src/main/java/kafka/server/TierStateMachine.java (same hunk as above):

Review Comment:
Thx. Addressed in the recent commit.
Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]
AndrewJSchofield commented on code in PR #18101: URL: https://github.com/apache/kafka/pull/18101#discussion_r1875034076

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
```
@@ -100,6 +100,9 @@ public abstract class AbstractHeartbeatRequestManager
```
Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]
AndrewJSchofield commented on code in PR #18096: URL: https://github.com/apache/kafka/pull/18096#discussion_r1875037969

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java:
```
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.utils.Utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
+
+public class ShareGroupAutoOffsetResetStrategy {
+    public enum StrategyType {
+        LATEST, EARLIEST, BY_DURATION;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final ShareGroupAutoOffsetResetStrategy EARLIEST = new ShareGroupAutoOffsetResetStrategy(StrategyType.EARLIEST);
+    public static final ShareGroupAutoOffsetResetStrategy LATEST = new ShareGroupAutoOffsetResetStrategy(StrategyType.LATEST);
+
+    private final StrategyType type;
+    private final Optional duration;
+
+    private ShareGroupAutoOffsetResetStrategy(StrategyType type) {
+        this.type = type;
+        this.duration = Optional.empty();
+    }
+
+    private ShareGroupAutoOffsetResetStrategy(Duration duration) {
+        this.type = StrategyType.BY_DURATION;
+        this.duration = Optional.of(duration);
+    }
+
+    /**
+     * Returns the AutoOffsetResetStrategy from the given string.
+     */
+    public static ShareGroupAutoOffsetResetStrategy fromString(String offsetStrategy) {
+        if (offsetStrategy == null) {
+            throw new IllegalArgumentException("Auto offset reset strategy is null");
+        }
+
+        if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
+            throw new IllegalArgumentException("<:duration> part is missing in by_duration auto offset reset strategy.");
+        }
+
+        if (Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) {
+            StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
+            switch (type) {
+                case EARLIEST:
+                    return EARLIEST;
+                case LATEST:
+                    return LATEST;
+                default:
+                    throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
+            }
+        }
+
+        if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
+            String isoDuration = offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
+            try {
+                Duration duration = Duration.parse(isoDuration);
+                if (duration.isNegative()) {
+                    throw new IllegalArgumentException("Negative duration is not supported in by_duration offset reset strategy.");
+                }
+                return new ShareGroupAutoOffsetResetStrategy(duration);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Unable to parse duration string in by_duration offset reset strategy.", e);
+            }
+        }
+
+        throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
+    }
+
+    /**
+     * Returns the offset reset strategy type.
+     */
+    public StrategyType type() {
+        return type;
+    }
+
+    /**
+     * Returns the name of the offset reset strategy.
+     */
+    public String name() {
+        return type.toString();
+    }
+
+    /**
+     * Return the timestamp to be used for the ListOffsetsRequest.
+     * @return the timestamp for the OffsetResetStrategy,
+     *         if the strategy is EARLIEST or LATEST or duration is provided,
+     *         else return Optional.empty()
+     */
+    public Optional timestamp() {
+        if (type == StrategyType.EARLIEST)
+            return Optional.of(ListOffsetsRequest.EARLI
```
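[Editor's sketch] A small usage sketch of the class quoted above (the generic type of `timestamp()` is assumed to be `Optional<Long>`, and the by_duration behavior of resolving "now minus the duration" is inferred; the truncated diff does not show either):

```java
import java.util.Optional;

public class ResetStrategySketch {
    public static void main(String[] args) {
        // Parses a by_duration spec of seven days, ISO-8601 style.
        ShareGroupAutoOffsetResetStrategy strategy =
            ShareGroupAutoOffsetResetStrategy.fromString("by_duration:P7D");
        // Assumed: resolves to now minus 7 days, in epoch millis, which the
        // coordinator then uses for the ListOffsetsRequest.
        Optional<Long> timestamp = strategy.timestamp();
        System.out.println(timestamp);
    }
}
```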
Re: [PR] [WIP] KIP-891: Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) [kafka]
snehashisp commented on PR #17741: URL: https://github.com/apache/kafka/pull/17741#issuecomment-2526364647

Thanks for the review @gharris1727. Will get to work on this soon, during my daytime.
Re: [PR] MINOR: Extract some codeblocks as methods to simplify readability [kafka]
overpathz commented on code in PR #18017: URL: https://github.com/apache/kafka/pull/18017#discussion_r1875039076

## core/src/main/java/kafka/server/TierStateMachine.java (same hunk as above):

Review Comment:
@AndrewJSchofield Additional ping in case the notification was missed.
[jira] [Assigned] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
[ https://issues.apache.org/jira/browse/KAFKA-18186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-18186:
--------------------------------------

    Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18186
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: 黃竣陽
>            Priority: Major
>
> see my comment https://github.com/apache/kafka/pull/17522
[jira] [Updated] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
[ https://issues.apache.org/jira/browse/KAFKA-18186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai updated KAFKA-18186:
-----------------------------------
        Parent: KAFKA-16096
    Issue Type: Sub-task  (was: Improvement)

> add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18186
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Chia-Ping Tsai
>            Assignee: 黃竣陽
>            Priority: Major
>
> see my comment https://github.com/apache/kafka/pull/17522
Re: [PR] KAFKA-18058: Share group state record pruning impl. [kafka]
smjn commented on PR #18014: URL: https://github.com/apache/kafka/pull/18014#issuecomment-2526373284

> @smjn Please resolve the conflicts.

@AndrewJSchofield done
[jira] [Commented] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
[ https://issues.apache.org/jira/browse/KAFKA-18186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903938#comment-17903938 ]

黃竣陽 commented on KAFKA-18186:
-----------------------------

Hello [~chia7712], if you won't work on this, may I take the issue? Thank you.

> add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-18186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18186
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> see my comment https://github.com/apache/kafka/pull/17522
[jira] [Created] (KAFKA-18186) add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
Chia-Ping Tsai created KAFKA-18186:
--------------------------------------

             Summary: add sourceCompatibility back to build.gradle to allow idea to configure suitable language level automatically
                 Key: KAFKA-18186
                 URL: https://issues.apache.org/jira/browse/KAFKA-18186
             Project: Kafka
          Issue Type: Improvement
            Reporter: Chia-Ping Tsai
            Assignee: Chia-Ping Tsai

see my comment https://github.com/apache/kafka/pull/17522
Re: [PR] KAFKA-17811: Separate modules to use different JDKs [kafka]
ijuma commented on PR #17522: URL: https://github.com/apache/kafka/pull/17522#issuecomment-2526306797

I see some confusion here and I'll try to clarify it.

> Are you using JDK 11 to run the streams tests? If so, that could be an issue since the generator module requires JDK 17. I assume all Kafka developers should use JDK 17, as we typically build the entire project during development.

We should make sure it's possible to run tests with Java 11 for the modules that support it, since it's possible to have issues at runtime that only affect a particular version.

> We do indeed need to compile the project with 17, but we should recommend that anyone working on the clients (or any other modules that still support 11) at least set their IDE's language version to JDK11 to avoid accidentally using APIs that don't exist back in 11 (looking at you Optional#isPresent)

The expected behavior is that the IDE & Gradle plugin support the `--release` flag, which has the correct behavior (it sets the appropriate source and binary versions and ensures the standard library signatures also match it).

> I plan to add sourceCompatibility back to build.gradle since IntelliJ IDEA sets the language level based on sourceCompatibility by default ([Gradle Documentation](https://docs.gradle.org/current/dsl/org.gradle.plugins.ide.idea.model.IdeaModule.html#org.gradle.plugins.ide.idea.model.IdeaModule:languageLevel)).

Are we sure the Gradle plugin doesn't handle this properly? If so, we should add the workaround, but also file a ticket with them. See the following for a similar situation for Scala: https://github.com/apache/kafka/pull/13205#issuecomment-1445120790
Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]
chia7712 commented on code in PR #18093: URL: https://github.com/apache/kafka/pull/18093#discussion_r1875006046

## core/src/main/java/kafka/server/share/SharePartition.java (same hunk as quoted earlier in this digest):

Review Comment:
> now it was a `return` statement.

Yes, you are right. I did not take into account the "completed" CompletableFuture because I was focused on the relationship between CompletableFuture and the write lock :(
Re: [PR] KAFKA-17811: Separate modules to use different JDKs [kafka]
chia7712 commented on PR #17522: URL: https://github.com/apache/kafka/pull/17522#issuecomment-2526274179

@ableegoldman Thanks for your feedback. I agree that we should prioritize making our developers' experience better. I plan to add sourceCompatibility back to build.gradle since IntelliJ IDEA sets the language level based on sourceCompatibility by default ([Gradle Documentation](https://docs.gradle.org/current/dsl/org.gradle.plugins.ide.idea.model.IdeaModule.html#org.gradle.plugins.ide.idea.model.IdeaModule:languageLevel)). The screenshot below demonstrates that IntelliJ IDEA can automatically highlight unsupported usages. With this change, developers won't need to manually configure the language level.
[jira] [Resolved] (KAFKA-18101) Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals
[ https://issues.apache.org/jira/browse/KAFKA-18101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Lee resolved KAFKA-18101.
-------------------------------
    Resolution: Done

> Merge duplicate assertFutureThrows and assertFutureExceptionTypeEquals
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-18101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18101
>             Project: Kafka
>          Issue Type: Improvement
>          Components: unit tests
>            Reporter: Deng Ziming
>            Assignee: Peter Lee
>            Priority: Minor
>              Labels: new-bie
>
> We have 2 similar methods for validating test exceptions:
> org.apache.kafka.test.TestUtils.assertFutureThrows
> kafka.utils.TestUtils.assertFutureExceptionTypeEquals
> Let's merge them.
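[Editor's sketch] For reference, one way the merged helper could look (an assumed signature; not necessarily the code that landed), built on JUnit 5's `assertThrows` and `assertInstanceOf`:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class AssertFutureSketch {
    // Asserts that the future completes exceptionally with the expected cause
    // type and returns the cause for further assertions.
    public static <T extends Throwable> T assertFutureThrows(Future<?> future, Class<T> expectedCause) {
        ExecutionException e = assertThrows(ExecutionException.class, future::get);
        return assertInstanceOf(expectedCause, e.getCause());
    }
}
```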
[jira] [Assigned] (KAFKA-13560) Load indexes and data in async manner in the critical path of replica fetcher threads.
[ https://issues.apache.org/jira/browse/KAFKA-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Lee reassigned KAFKA-13560:
---------------------------------
    Assignee: Peter Lee

> Load indexes and data in async manner in the critical path of replica fetcher threads.
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13560
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13560
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Satish Duggana
>            Assignee: Peter Lee
>            Priority: Major
>             Fix For: 4.0.0
>
> https://github.com/apache/kafka/pull/11390#discussion_r762366976
> https://github.com/apache/kafka/pull/11390#discussion_r1033141283
> https://github.com/apache/kafka/pull/15690 removed the below method from the
> `TierStateMachine` interface. This can be added back when we implement the
> functionality required to address this issue.
> {code:java}
> public Optional maybeAdvanceState(TopicPartition topicPartition, PartitionFetchState currentFetchState)
> {code}
Re: [PR] KAFKA-18129: Simplifying share partition maybeInitialize code [kafka]
chia7712 merged PR #18093: URL: https://github.com/apache/kafka/pull/18093
Re: [PR] KAFKA-18135: ShareConsumer HB UnsupportedVersion msg mixed with Consumer HB [kafka]
lianetm commented on code in PR #18101: URL: https://github.com/apache/kafka/pull/18101#discussion_r1875074269

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java:
```
@@ -97,6 +99,18 @@ public boolean handleSpecificError(final ConsumerGroupHeartbeatResponse response
         boolean errorHandled;
         switch (error) {
+            case UNSUPPORTED_VERSION:
+                // Handle consumer-specific unsupported version error
+                String message = CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
+                if (errorMessage.contains(REGEX_RESOLUTION_NOT_SUPPORTED_MSG)) {
+                    // If the error is about regex subscription, use the original error message
+                    message = errorMessage;
+                }
```

Review Comment:
I would expect we don't need this here. This regex error is generated on the client when building the request (it generates a response with an exception), so the handling lands on the `onFailure` path (not on the onResponse path that uses this `handleSpecificError`). I would expect we just need to have here the same handling we had for the UnsupportedVersion in the parent class before this PR: https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L411-L412

That being said, we do need to ensure that we handle responses for each consumer, to cover this bit that is now on the abstract manager (so it could wrongly apply a consumer message to the share consumer): https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java#L322-L325

Both consumers need to ensure they propagate their specific messages when that exception is in a response.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
```
@@ -408,8 +411,10 @@ private void onErrorResponse(final R response, final long currentTimeMs) {
             // Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
             // custom message for it. Note that the case where the protocol is not supported at all should fail
             // on the client side when building the request and checking supporting APIs (handled on onFailure).
-            logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
-            handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
+            if (!handleSpecificError(response, currentTimeMs)) {
+                logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
+                handleFatalFailure(error.exception(errorMessage));
+            }
```

Review Comment:
I wonder if it would be simpler to remove the UnsupportedVersion handling from here completely? In the end we're duplicating the logic we already have in the `default` case further down, right? We would only need the logic in the Share/Consumer managers, simply because even if the error is not specific, the handling is. Thoughts?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
```
@@ -100,6 +100,9 @@ public abstract class AbstractHeartbeatRequestManager
```
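[Editor's sketch] A sketch of the direction discussed above (assumed shape, loosely following the quoted diffs; the share-consumer constant is hypothetical): each concrete manager maps an UNSUPPORTED_VERSION response to its own protocol message, and the abstract manager keeps no consumer-specific text. The members referenced (`logger`, `heartbeatRequestName()`, `handleFatalFailure`) appear in the quoted code above:

```java
// Hypothetical override in ConsumerHeartbeatRequestManager; the share-consumer
// manager would do the same with its own SHARE_PROTOCOL_NOT_SUPPORTED_MSG.
@Override
public boolean handleSpecificError(final ConsumerGroupHeartbeatResponse response, final long currentTimeMs) {
    Errors error = Errors.forCode(response.data().errorCode());
    if (error == Errors.UNSUPPORTED_VERSION) {
        logger.error("{} failed due to {}", heartbeatRequestName(), error);
        handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
        return true; // handled; the abstract manager's generic path is skipped
    }
    return false;
}
```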
Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]
lianetm commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1875198457

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
```
@@ -325,4 +333,21 @@ void cleanup() {
         log.debug("Closed the consumer network thread");
     }
 }
+
+/**
+ * If there is a metadata error, complete all uncompleted events that require subscription metadata.
+ */
+private void maybeFailOnMetadataError(List> events) {
+    List> subscriptionMetadataEvent = events.stream()
+        .filter(e -> e instanceof CompletableApplicationEvent)
+        .map(e -> (CompletableApplicationEvent) e)
+        .filter(CompletableApplicationEvent::requireSubscriptionMetadata)
+        .collect(Collectors.toUnmodifiableList());
```

Review Comment:
nit: `.toList()`? (I believe it's unmodifiable too)

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
```
@@ -150,7 +155,11 @@ private void maybePropagateMetadataError() {
     try {
         metadata.maybeThrowAnyException();
```

Review Comment:
ok, we're saying it gets cleared as soon as it's propagated (to ensure that it's indeed propagated), but we could still get an exception from a previously sent request (this was my concern). But that's what we have with the classic consumer actually, so it's consistent (there are actually tests specifically covering the behaviour considering that): https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L553-L554

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java:
```
@@ -212,9 +213,26 @@ public void testPropagateMetadataError() {
         AuthenticationException authException = new AuthenticationException("Test Auth Exception");
         doThrow(authException).when(metadata).maybeThrowAnyException();
+        NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false);
+        assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty());
+        networkClientDelegate.poll(0, time.milliseconds());
+
+        networkClientDelegate.getAndClearMetadataError().ifPresent(
```

Review Comment:
if the metadata error is not present this won't fail, right? So should we check that it's present (and then check the value)?
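[Editor's sketch] A small sketch of the clear-on-read semantics discussed here (an assumed implementation; the PR's actual field and method may differ):

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

public class MetadataErrorSketch {
    private final AtomicReference<Exception> metadataError = new AtomicReference<>();

    // Recorded during poll when metadata.maybeThrowAnyException() throws.
    void onMetadataError(Exception e) {
        metadataError.set(e);
    }

    // Returns the error at most once: the reference is cleared atomically on read,
    // which is why a later caller can no longer observe an already-propagated error.
    Optional<Exception> getAndClearMetadataError() {
        return Optional.ofNullable(metadataError.getAndSet(null));
    }
}
```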
Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]
mjsax commented on code in PR #17973: URL: https://github.com/apache/kafka/pull/17973#discussion_r1875208289 ## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { Review Comment: The newly added class for the consumer is internal, and not user facing, and thus it should not be relevant that it exists (the current public enum gets deprecated with this KIP, and the PR you linked to). I would assume that ShareConsumer won't have a public class either, similar to the consumer? \cc @AndrewJSchofield Given that the reset strategy is exactly the same between a plain consumer and Kafka Streams, it would seem odd to me to name it `StreamsXxx`... Also, that the class is part of KS is clear from the package name. In general, discussions like this should be part of the KIP, and we should not change accepted KIPs w/o a strong reason. If you want, feel free to go back to the dev mailing list thread about the KIP and re-start the discussion there. It should not be part of the PR review to re-discuss a KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]
lianetm commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1875210056 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -150,7 +155,11 @@ private void maybePropagateMetadataError() { try { metadata.maybeThrowAnyException(); Review Comment: ok, we're saying it gets cleared as soon as it's propagated (to ensure that it's indeed propagated), but we could still get an exception from a previously sent request (this was my concern). Anyways, that's actually what we have with the classic consumer, so it's consistent (there are actually tests specifically covering the behaviour considering that). https://github.com/apache/kafka/blob/ee4264439ddda7bdebcaa845752b824abba14161/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala#L553-L554 Ok with me. I wonder if there's something we could improve there for both, but no need to block this as it's the current behaviour. I'll file a jira if I come up with clear thoughts about it. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
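For readers following the clearing semantics being discussed: a minimal sketch (not the Kafka source) of a get-and-clear holder, assuming the delegate keeps the pending metadata error in an `AtomicReference`. `getAndSet(null)` hands the error to exactly one caller and clears it atomically, which is why a second read sees an empty `Optional`.
```
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder illustrating getAndClearMetadataError() semantics.
class MetadataErrorHolder {
    private final AtomicReference<Exception> metadataError = new AtomicReference<>();

    // Called from poll() when metadata.maybeThrowAnyException() throws.
    void record(Exception e) {
        metadataError.set(e);
    }

    // Returns the error at most once; later calls see empty until a new error arrives.
    Optional<Exception> getAndClearMetadataError() {
        return Optional.ofNullable(metadataError.getAndSet(null));
    }
}
```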
[PR] KAFKA-18184:Remove the unnecessary project path check from build.gradle [kafka]
Rancho-7 opened a new pull request, #18102: URL: https://github.com/apache/kafka/pull/18102 JIRA: https://issues.apache.org/jira/browse/KAFKA-18184 In the build.gradle file, the project path is not initialized, so `project.path` always refers to the root path `':'`. The condition `if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage"))` will therefore always evaluate to true, so this check is unnecessary and can be removed. If we print the parameter `project.path` ``` println "Project path is '${project.path}'" if (!project.path.startsWith(":connect") && !project.path.startsWith(":storage")) options.compilerArgs << "-Xlint:-rawtypes" ``` we can see output like the following: > Starting Gradle Daemon... > Connected to the target VM, address: '127.0.0.1:55024', transport: 'socket' > Gradle Daemon started in 1 s 333 ms > > Configure project : > Starting build with version 4.0.0-SNAPSHOT (commit id 104fa579) using Gradle 8.10.2, Java 23 and Scala 2.13.15 > Build properties: ignoreFailures=false, maxParallelForks=10, maxScalacThreads=8, maxTestRetries=0 > Project path is ':' ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]
lianetm commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1875232483 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java: ## @@ -212,9 +213,27 @@ public void testPropagateMetadataError() { AuthenticationException authException = new AuthenticationException("Test Auth Exception"); doThrow(authException).when(metadata).maybeThrowAnyException(); +NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false); +assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty()); +networkClientDelegate.poll(0, time.milliseconds()); + + assertTrue(networkClientDelegate.getAndClearMetadataError().isPresent()); Review Comment: this will clear the error so the assertions within the ifPresent will never run right? (maybe we take the Optional result of .getAndClear, check that optional is present, and then the 2 asserts on the value?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17696 New consumer background operations unaware of metadata errors [kafka]
m1a2st commented on code in PR #17440: URL: https://github.com/apache/kafka/pull/17440#discussion_r1875248156 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java: ## @@ -212,9 +213,27 @@ public void testPropagateMetadataError() { AuthenticationException authException = new AuthenticationException("Test Auth Exception"); doThrow(authException).when(metadata).maybeThrowAnyException(); +NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false); +assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty()); +networkClientDelegate.poll(0, time.milliseconds()); + + assertTrue(networkClientDelegate.getAndClearMetadataError().isPresent()); Review Comment: You are right, I missed that we will clear the metadata error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]
mjsax commented on code in PR #17973: URL: https://github.com/apache/kafka/pull/17973#discussion_r1875212818 ## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + +private enum OffsetResetType { Review Comment: Maybe better `OffsetResetStrategy` ? ## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + +private enum OffsetResetType { +LATEST, +EARLIEST, +BY_DURATION +} + +private final OffsetResetType type; +private final Optional duration; + +private AutoOffsetReset(OffsetResetType type, Optional duration) { Review Comment: ```suggestion private AutoOffsetReset(final OffsetResetType type, final Optional duration) { ``` ## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { + +private enum OffsetResetType { +LATEST, +EARLIEST, +BY_DURATION +} + +private final OffsetResetType type; +private final Optional duration; + +private AutoOffsetReset(OffsetResetType type, Optional duration) { +this.type = type; +this.duration = duration; +} + +/** + * Creates an AutoOffsetReset instance representing "latest". Review Comment: ```suggestion * Creates an {@code AutoOffsetReset} instance representing "latest". ``` ##
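Editor's note: pulling the diff hunks above together, this is roughly the factory-style API under review. The sketch already applies the `OffsetResetStrategy` rename and the `final` parameters suggested in the comments; validation details are assumptions, not the merged code.
```
import java.time.Duration;
import java.util.Optional;

public class AutoOffsetReset {

    private enum OffsetResetStrategy {
        LATEST,
        EARLIEST,
        BY_DURATION
    }

    private final OffsetResetStrategy type;
    private final Optional<Duration> duration;

    private AutoOffsetReset(final OffsetResetStrategy type, final Optional<Duration> duration) {
        this.type = type;
        this.duration = duration;
    }

    /** Creates an {@code AutoOffsetReset} instance representing "latest". */
    public static AutoOffsetReset latest() {
        return new AutoOffsetReset(OffsetResetStrategy.LATEST, Optional.empty());
    }

    /** Creates an {@code AutoOffsetReset} instance representing "earliest". */
    public static AutoOffsetReset earliest() {
        return new AutoOffsetReset(OffsetResetStrategy.EARLIEST, Optional.empty());
    }

    /** Creates an {@code AutoOffsetReset} instance for resetting by a (non-negative) duration. */
    public static AutoOffsetReset byDuration(final Duration duration) {
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Duration cannot be negative");
        }
        return new AutoOffsetReset(OffsetResetStrategy.BY_DURATION, Optional.of(duration));
    }
}
```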
[jira] [Updated] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12843: Parent: KAFKA-12822 Issue Type: Sub-task (was: Task) > KIP-740 follow up: clean up TaskMetadata > > > Key: KAFKA-12843 > URL: https://issues.apache.org/jira/browse/KAFKA-12843 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: João Pedro Fonseca >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskMetadata class, we need to: > # Deprecate the TaskMetadata#getTaskId method > # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() > API that returns a TaskId instead of a String > # Remove the deprecated constructor -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16339: [4/4 KStream#flatTransformValues] Remove Deprecated "transformer" methods and classes [kafka]
mjsax commented on PR #17882: URL: https://github.com/apache/kafka/pull/17882#issuecomment-2526845764 Thanks for the PR. Merged to `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]
peterxcli commented on code in PR #18096: URL: https://github.com/apache/kafka/pull/18096#discussion_r1875313096 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java: ## @@ -53,8 +50,14 @@ public final class GroupConfig extends AbstractConfig { public static final String SHARE_RECORD_LOCK_DURATION_MS_CONFIG = "share.record.lock.duration.ms"; public static final String SHARE_AUTO_OFFSET_RESET_CONFIG = "share.auto.offset.reset"; -public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetReset.LATEST.toString(); -public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset."; +public static final String SHARE_AUTO_OFFSET_RESET_DEFAULT = ShareGroupAutoOffsetResetStrategy.LATEST.name(); +public static final String SHARE_AUTO_OFFSET_RESET_DOC = "The strategy to initialize the share-partition start offset. " + +"earliest: automatically reset the offset to the earliest offset" + +"latest: automatically reset the offset to the latest offset" + +"by_duration:: automatically reset the offset to a configured from the current timestamp. " + +" must be specified in ISO8601 format (PnDTnHnMn.nS). " + +"Negative duration is not allowed." + +"anything else: throw exception to the share consumer."; Review Comment: I think we should align with the documentation for consistency between this and `ConsumerConfig`: https://github.com/apache/kafka/blob/b9745b160cf7b2bc2a02b11c75fa86d9e9eaf5b4/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L172-L181 However, I’m uncertain about whether we should include the following note at the end: ``` Note that altering partition numbers while setting this config to 'latest' may cause message delivery loss since producers could start sending messages to newly added partitions (i.e., no initial offsets exist yet) before consumers reset their offsets." ``` What are your thoughts on this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
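Since the doc string above mandates ISO-8601 durations (PnDTnHnMn.nS) and forbids negative values, `java.time.Duration.parse` covers both parsing and validation. A small illustrative validator follows (assumed shape, not the actual `GroupConfig` code; the `by_duration:` prefix handling is out of scope here):
```
import java.time.Duration;
import java.time.format.DateTimeParseException;

public class ShareAutoOffsetResetDurationCheck {

    // Validates the duration part of a "by_duration:<ISO-8601 duration>" value.
    static Duration parseResetDuration(final String isoDuration) {
        final Duration duration;
        try {
            duration = Duration.parse(isoDuration); // e.g. "P1DT2H" = 1 day and 2 hours
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid ISO-8601 duration: " + isoDuration, e);
        }
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Negative duration is not allowed: " + isoDuration);
        }
        return duration;
    }

    public static void main(String[] args) {
        System.out.println(parseResetDuration("P1DT2H").toHours()); // prints 26
    }
}
```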
[jira] [Resolved] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-12843. - Resolution: Duplicate (was: Fixed) > KIP-740 follow up: clean up TaskMetadata > > > Key: KAFKA-12843 > URL: https://issues.apache.org/jira/browse/KAFKA-12843 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: João Pedro Fonseca >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskMetadata class, we need to: > # Deprecate the TaskMetadata#getTaskId method > # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() > API that returns a TaskId instead of a String > # Remove the deprecated constructor -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16339: [4/4 KStream#flatTransformValues] Remove Deprecated "transformer" methods and classes [kafka]
mjsax merged PR #17882: URL: https://github.com/apache/kafka/pull/17882 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904014#comment-17904014 ] Matthias J. Sax commented on KAFKA-12843: - KIP-740 overlaps with KIP-744, which deprecates the old `processor.TaskMetadata` class entirely (and it was already removed via KAFKA-16329) > KIP-740 follow up: clean up TaskMetadata > > > Key: KAFKA-12843 > URL: https://issues.apache.org/jira/browse/KAFKA-12843 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: João Pedro Fonseca >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskMetadata class, we need to: > # Deprecate the TaskMetadata#getTaskId method > # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() > API that returns a TaskId instead of a String > # Remove the deprecated constructor -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-12829: Remove deprecated Topology#addProcessor of old Processor API [kafka]
mjsax commented on PR #17190: URL: https://github.com/apache/kafka/pull/17190#issuecomment-2526857196 Thanks for your understanding. And thanks for contributing! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]
mjsax opened a new pull request, #18103: URL: https://github.com/apache/kafka/pull/18103 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]
mjsax commented on code in PR #18103: URL: https://github.com/apache/kafka/pull/18103#discussion_r1875348859 ## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java: ## @@ -79,17 +80,22 @@ static void withStore(final RocksDBStore store, final StateStoreContext context, @ParameterizedTest @MethodSource("stores") public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore store, final StateStoreContext ctx) { -withStore(store, ctx, () -> Review Comment: side cleanup ## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java: ## @@ -79,17 +80,22 @@ static void withStore(final RocksDBStore store, final StateStoreContext context, @ParameterizedTest @MethodSource("stores") public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore store, final StateStoreContext ctx) { -withStore(store, ctx, () -> -assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L))); +withStore( +store, +ctx, +() -> assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L)) +); } @ParameterizedTest @MethodSource("stores") public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store, final StateStoreContext ctx) { withStore(store, ctx, () -> { final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = (BlockBasedTableConfigWithAccessibleCache) store.getOptions().tableFormatConfig(); -final long usage = tableFormatConfig.blockCache().getUsage(); -assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage)); +try (final Cache blockCache = tableFormatConfig.blockCache()) { +final long usage = blockCache.getUsage(); +assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage)); +} Review Comment: side cleanup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]
mjsax commented on code in PR #18103: URL: https://github.com/apache/kafka/pull/18103#discussion_r1875350982 ## streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java: ## @@ -108,12 +112,12 @@ public void setHeaders(final Headers headers) { } @Override -public void setCurrentNode(final ProcessorNode currentNode) { +public void setCurrentNode(final ProcessorNode currentNode) { Review Comment: side cleanup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]
mjsax commented on code in PR #18103: URL: https://github.com/apache/kafka/pull/18103#discussion_r1875348688 ## streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java: ## @@ -1049,11 +1048,12 @@ private static void putRecord(final TimeOrderedKeyValueBuffer(key, new Change<>(value, null), 0L), recordContext); } +@SuppressWarnings("resource") Review Comment: Side cleanup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]
mjsax commented on code in PR #18103: URL: https://github.com/apache/kafka/pull/18103#discussion_r1875350717 ## streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java: ## @@ -41,12 +41,16 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Objects; import java.util.Properties; public class MockInternalNewProcessorContext extends MockProcessorContext implements InternalProcessorContext { -private ProcessorNode currentNode; +private ProcessorNode currentNode; +private RecordCollector recordCollector; Review Comment: Add the missing `recordCollector` "feature" -- the old `MockInternalProcessorContext` that we remove with this PR had this, and the tests we rewrite use this feature ## streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java: ## @@ -41,12 +41,16 @@ import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener; import java.io.File; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Objects; import java.util.Properties; public class MockInternalNewProcessorContext extends MockProcessorContext implements InternalProcessorContext { -private ProcessorNode currentNode; +private ProcessorNode currentNode; +private RecordCollector recordCollector; +private final Map restoreCallbacks = new LinkedHashMap<>(); Review Comment: Same -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18164: Clear existing acknowledgements on share session epoch reset. [kafka]
ShivsundarR commented on code in PR #18063: URL: https://github.com/apache/kafka/pull/18063#discussion_r1875353133 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -990,16 +990,18 @@ UnsentRequest buildRequest() { } ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig); Review Comment: Yeah, right. The handler does not clear the acknowledgements, which can be a problem when future requests are built. I have added code to clear the `nextPartitions` and `nextAcknowledgements` now and added a test for the same. Thanks for catching this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
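A heavily hedged sketch of the clear-after-build fix being described: the field names `nextPartitions` and `nextAcknowledgements` come from the comment above, while all surrounding types are stand-ins for illustration only.
```
import java.util.HashMap;
import java.util.Map;

// Illustrative session-handler fragment: state accumulated for the next request
// must be cleared once the request is built, otherwise a later request (for
// example after a share session epoch reset) could resend stale acknowledgements.
class ShareSessionHandlerSketch {
    private final Map<String, Integer> nextPartitions = new HashMap<>();
    private final Map<String, Integer> nextAcknowledgements = new HashMap<>();

    RequestBuilderStub newShareAcknowledgeBuilder() {
        RequestBuilderStub builder = new RequestBuilderStub(
            new HashMap<>(nextPartitions), new HashMap<>(nextAcknowledgements));
        // Clear accumulated state so it cannot leak into future requests.
        nextPartitions.clear();
        nextAcknowledgements.clear();
        return builder;
    }

    record RequestBuilderStub(Map<String, Integer> partitions, Map<String, Integer> acknowledgements) { }
}
```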
Re: [PR] MINOR: remove old procesor API MockInternalProcessorContext [kafka]
mjsax commented on code in PR #18103: URL: https://github.com/apache/kafka/pull/18103#discussion_r1875349000 ## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java: ## @@ -98,11 +104,14 @@ public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store, final S public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore store, final StateStoreContext ctx) { withStore(store, ctx, () -> { final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = (BlockBasedTableConfigWithAccessibleCache) store.getOptions().tableFormatConfig(); -final long usage = tableFormatConfig.blockCache().getPinnedUsage(); -assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage)); +try (final Cache blockCache = tableFormatConfig.blockCache()) { +final long usage = blockCache.getPinnedUsage(); +assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage)); +} Review Comment: side cleanup ## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java: ## @@ -98,11 +104,14 @@ public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store, final S public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore store, final StateStoreContext ctx) { withStore(store, ctx, () -> { final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = (BlockBasedTableConfigWithAccessibleCache) store.getOptions().tableFormatConfig(); -final long usage = tableFormatConfig.blockCache().getPinnedUsage(); -assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage)); +try (final Cache blockCache = tableFormatConfig.blockCache()) { +final long usage = blockCache.getPinnedUsage(); +assertMetric(ctx, STATE_STORE_LEVEL_GROUP, RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage)); +} }); } +@SuppressWarnings("resource") Review Comment: side cleanup -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18187) Replicas that receive EndQuorum should grant preVotes in that epoch
[ https://issues.apache.org/jira/browse/KAFKA-18187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904027#comment-17904027 ] Peter Lee commented on KAFKA-18187: --- Hi [~alyssahuang], does this depend on [https://github.com/apache/kafka/pull/17807] or [https://github.com/apache/kafka/pull/18041] ? > Replicas that receive EndQuorum should grant preVotes in that epoch > --- > > Key: KAFKA-18187 > URL: https://issues.apache.org/jira/browse/KAFKA-18187 > Project: Kafka > Issue Type: Improvement >Affects Versions: 4.0.0 >Reporter: Alyssa Huang >Assignee: Peter Lee >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-18187) Replicas that receive EndQuorum should grant preVotes in that epoch
Alyssa Huang created KAFKA-18187: Summary: Replicas that receive EndQuorum should grant preVotes in that epoch Key: KAFKA-18187 URL: https://issues.apache.org/jira/browse/KAFKA-18187 Project: Kafka Issue Type: Improvement Affects Versions: 4.0.0 Reporter: Alyssa Huang -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18134: Disallow group upgrades when custom assignors are used [kafka]
squah-confluent commented on code in PR #18046: URL: https://github.com/apache/kafka/pull/18046#discussion_r1875402835 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -10077,6 +10082,116 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) assertEquals(group, context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false)); } +/** + * Supplies the {@link Arguments} to {@link #testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer, boolean)}. + */ +private static Stream testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource() { +return Stream.of( +Arguments.of(null, true), +Arguments.of(ByteBuffer.allocate(0), true), +Arguments.of(ByteBuffer.allocate(1), false) +); +} + +@ParameterizedTest + @MethodSource("testConsumerGroupHeartbeatWithCustomAssignorClassicGroupSource") +public void testConsumerGroupHeartbeatWithCustomAssignorClassicGroup(ByteBuffer userData, boolean expectUpgrade) { +String groupId = "group-id"; +String memberId1 = "member-id-1"; +String memberId2 = "member-id-2"; +Uuid fooTopicId = Uuid.randomUuid(); +String fooTopicName = "foo"; +Uuid barTopicId = Uuid.randomUuid(); +String barTopicName = "bar"; + +MockPartitionAssignor assignor = new MockPartitionAssignor("range"); +assignor.prepareGroupAssignment(new GroupAssignment(Map.of( +memberId1, new MemberAssignmentImpl(mkAssignment( +mkTopicAssignment(fooTopicId, 0) +)), +memberId2, new MemberAssignmentImpl(mkAssignment( +mkTopicAssignment(barTopicId, 0) +)) +))); + +MetadataImage metadataImage = new MetadataImageBuilder() +.addTopic(fooTopicId, fooTopicName, 1) +.addTopic(barTopicId, barTopicName, 1) +.addRacks() +.build(); + +GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.UPGRADE.toString()) + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) +.withMetadataImage(metadataImage) +.build(); + +JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1); +protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol() +.setName("range") + .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( +List.of(fooTopicName, barTopicName), +null, +List.of( +new TopicPartition(fooTopicName, 0), +new TopicPartition(barTopicName, 0) +) + +); + +Map assignments = Map.of( +memberId1, +Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(List.of( +new TopicPartition(fooTopicName, 0), +new TopicPartition(barTopicName, 0) +), userData))) +); + +// Create a stable classic group with member 1. +ClassicGroup group = context.createClassicGroup(groupId); +group.setProtocolName(Optional.of("range")); +group.add( +new ClassicGroupMember( +memberId1, +Optional.empty(), +"client-id", +"client-host", +1, +5000, +"consumer", +protocols, +assignments.get(memberId1) +) +); + +group.transitionTo(PREPARING_REBALANCE); +group.transitionTo(COMPLETING_REBALANCE); +group.transitionTo(STABLE); + + context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion())); +context.commit(); +group = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false); + +// A new member 2 with new protocol joins the classic group, triggering the upgrade. 
+ConsumerGroupHeartbeatRequestData consumerGroupHeartbeatRequestData = +new ConsumerGroupHeartbeatRequestData() +.setGroupId(groupId) +.setMemberId(memberId2) +.setRebalanceTimeoutMs(5000) +.setServerAssignor("range") +.setSubscribedTopicNames(List.of(fooTopicName, barTopicName)) +.setTopicPartitions(Collections.emptyList()); + +if (expectUpgrade) { +context.consumerGroupHeartbeat(co
[jira] [Assigned] (KAFKA-18187) Replicas that receive EndQuorum should grant preVotes in that epoch
[ https://issues.apache.org/jira/browse/KAFKA-18187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Lee reassigned KAFKA-18187: - Assignee: Peter Lee > Replicas that receive EndQuorum should grant preVotes in that epoch > --- > > Key: KAFKA-18187 > URL: https://issues.apache.org/jira/browse/KAFKA-18187 > Project: Kafka > Issue Type: Improvement >Affects Versions: 4.0.0 >Reporter: Alyssa Huang >Assignee: Peter Lee >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18134: Disallow group upgrades when custom assignors are used [kafka]
squah-confluent commented on code in PR #18046: URL: https://github.com/apache/kafka/pull/18046#discussion_r1875401373 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1053,6 +1055,12 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List
Re: [PR] KAFKA-18134: Disallow group upgrades when custom assignors are used [kafka]
squah-confluent commented on code in PR #18046: URL: https://github.com/apache/kafka/pull/18046#discussion_r1875401870 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1053,6 +1055,13 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List
Re: [PR] [WIP] KIP-891: Connect Multiversioning Support (Configs and Validation changes for Connectors and Converters) [kafka]
snehashisp commented on code in PR #17741: URL: https://github.com/apache/kafka/pull/17741#discussion_r1875404360 ## .gitignore: ## @@ -61,3 +61,6 @@ storage/kafka-tiered-storage/ docker/test/report_*.html kafka.Kafka __pycache__ +/connect/runtime/src/main/java/org/apache/kafka/connect/testing/ +/connect/file/ +/connect/json/ Review Comment: Yes, I was using/altering them for testing, hence I added them here. We will need to remove them before merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]
brandboat commented on code in PR #18036: URL: https://github.com/apache/kafka/pull/18036#discussion_r1874816095 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -417,10 +417,13 @@ def start_cmd(self, node): else: cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol) -cmd += " --reset-policy %s --group-id %s --topic %s --session-timeout %s" % \ - (self.reset_policy, self.group_id, self.topic, -self.session_timeout_sec*1000) - +cmd += " --reset-policy %s --group-id %s --topic %s" % \ +(self.reset_policy, self.group_id, self.topic) + +# session timeout is not supported when using CONSUMER group protocol +if self.session_timeout_sec > 0 and self.is_consumer_group_protocol_enabled(): Review Comment: Only `VerifiableConsumerTest` specifically sets the session timeout, but I think it's a bit redundant, so I refactored it out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]
brandboat commented on code in PR #18036: URL: https://github.com/apache/kafka/pull/18036#discussion_r1874823531 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -251,8 +251,6 @@ def __init__(self, context, num_nodes, kafka, topic, group_id, self.session_timeout_sec = session_timeout_sec self.enable_autocommit = enable_autocommit self.assignment_strategy = assignment_strategy -self.group_protocol = group_protocol -self.group_remote_assignor = group_remote_assignor Review Comment: these 2 lines are duplicate code, see L245, L246 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18014) Add duration based offset reset option for ShareConsumer
[ https://issues.apache.org/jira/browse/KAFKA-18014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17903916#comment-17903916 ] Peter Lee commented on KAFKA-18014: --- Hi [~omkreddy], PR is ready, PTAL. Thanks! > Add duration based offset reset option for ShareConsumer > > > Key: KAFKA-18014 > URL: https://issues.apache.org/jira/browse/KAFKA-18014 > Project: Kafka > Issue Type: Sub-task >Reporter: Manikumar >Assignee: Peter Lee >Priority: Major > Fix For: 4.1.0 > > > Kafka consumer supports auto.offset.reset config option, which is used when > there is no initial offset in Kafka (or) if the current offset does not exist > any more on the server. This config currently supports earliest/latest/none > options. Currently consumer resets might force applications to reprocess > large amounts of data from earlier offsets. With infinite storage, it's > beneficial to have a duration based offset reset strategy. This will allow > applications to consume/initialise from a fixed duration when there is no > initial offset in Kafka. > As part of > [KIP-932|https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka], > we are adding support for share consumer groups. Share consumer groups > support the dynamic group configuration property share.auto.offset.reset. This > is used to set the initial Share-Partition Start Offset (SPSO) based on the > share.auto.offset.reset configuration. Currently share.auto.offset.reset > supports earliest and latest options to automatically reset the offset. > Similar to the Kafka Consumer, we will add support for by_duration: config > value for {{{}share.auto.offset.reset{}}}. > {quote}from > [KIP-1106|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1106%3A+Add+duration+based+offset+reset+option+for+consumer+clients] > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] João Pedro Fonseca closed KAFKA-12843. -- > KIP-740 follow up: clean up TaskMetadata > > > Key: KAFKA-12843 > URL: https://issues.apache.org/jira/browse/KAFKA-12843 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: João Pedro Fonseca >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskMetadata class, we need to: > # Deprecate the TaskMetadata#getTaskId method > # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() > API that returns a TaskId instead of a String > # Remove the deprecated constructor -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [WIP] KAFKA-10409: Refactor Kafka Streams RocksDb iterators [kafka]
fonsdant opened a new pull request, #18099: URL: https://github.com/apache/kafka/pull/18099 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18015: Add support for duration based offset reset strategy to Kafka Streams [kafka]
peterxcli commented on code in PR #17973: URL: https://github.com/apache/kafka/pull/17973#discussion_r1874837232 ## streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams; + +import java.time.Duration; +import java.util.Optional; + +/** + * Sets the {@code auto.offset.reset} configuration when + * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream} + * or {@link KTable} via {@link StreamsBuilder}. + */ +public class AutoOffsetReset { Review Comment: How about renaming this to `StreamAutoOffsetResetStrategy`, as we now have `AutoOffsetReset` for consumer, share group, and stream? - https://github.com/apache/kafka/pull/17858 - https://github.com/apache/kafka/pull/18096 Additionally, I believe we should rename the `AutoOffsetReset` class added in https://github.com/apache/kafka/pull/17858 for better clarity. @omkreddy, your thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]
peterxcli commented on PR #18096: URL: https://github.com/apache/kafka/pull/18096#issuecomment-2526019079 Hi @omkreddy, PR is ready, PTAL. Thanks! Sorry for mentioning you in both places, just to make sure you receive it 😁~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18014: Add duration based offset reset option for ShareConsumer [kafka]
peterxcli commented on code in PR #18096: URL: https://github.com/apache/kafka/pull/18096#discussion_r1874838353 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ShareGroupAutoOffsetResetStrategy.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.utils.Utils; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Locale; +import java.util.Objects; +import java.util.Optional; + +public class ShareGroupAutoOffsetResetStrategy { Review Comment: As mentioned in https://github.com/apache/kafka/pull/17573#discussion_r1814871532, let each `AutoOffsetResetStrategy` evolve independently -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12843) KIP-740 follow up: clean up TaskMetadata
[ https://issues.apache.org/jira/browse/KAFKA-12843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] João Pedro Fonseca resolved KAFKA-12843. Resolution: Fixed > KIP-740 follow up: clean up TaskMetadata > > > Key: KAFKA-12843 > URL: https://issues.apache.org/jira/browse/KAFKA-12843 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: João Pedro Fonseca >Priority: Blocker > Fix For: 4.0.0 > > > See > [KIP-740|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > – for the TaskMetadata class, we need to: > # Deprecate the TaskMetadata#getTaskId method > # "Remove" the deprecated TaskMetadata#taskId method, then re-add a taskId() > API that returns a TaskId instead of a String > # Remove the deprecated constructor -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]
chia7712 commented on code in PR #18036: URL: https://github.com/apache/kafka/pull/18036#discussion_r1874846202 ## tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java: ## @@ -649,8 +648,12 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[] if (groupRemoteAssignor != null) consumerProps.put(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG, groupRemoteAssignor); } else { -// This means we're using the old consumer group protocol. +// This means we're using the CLASSIC consumer group protocol. consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, res.getString("assignmentStrategy")); +Integer sessionTimeout = res.getInt("sessionTimeout"); Review Comment: yes, we should honor users' configs to let them encounter the error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
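To make the "honor the user's config" point concrete, here is a simplified sketch of the `createFromArgs` branching (not the exact tool code): an explicit `--session-timeout` value is forwarded even when the chosen protocol does not support it, so the user sees the resulting error instead of having the flag silently dropped.
```
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class VerifiableConsumerArgsSketch {
    // Simplified: groupProtocol is "classic" or "consumer"; sessionTimeoutMs is
    // null when --session-timeout was not passed on the command line.
    static Properties buildProps(String groupProtocol, String assignmentStrategy,
                                 Integer sessionTimeoutMs) {
        Properties props = new Properties();
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
        if ("classic".equalsIgnoreCase(groupProtocol)) {
            props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy);
        }
        if (sessionTimeoutMs != null) {
            // Honor the explicit user setting for either protocol; an unsupported
            // combination then fails visibly rather than being ignored.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        }
        return props;
    }
}
```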
Re: [PR] KAFKA-18164: Clear existing acknowledgements on share session epoch reset. [kafka]
ShivsundarR commented on code in PR #18063: URL: https://github.com/apache/kafka/pull/18063#discussion_r1874846243

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
## @@ -990,16 +990,17 @@ UnsentRequest buildRequest() {
         }
         ShareAcknowledgeRequest.Builder requestBuilder = sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
-        Node nodeToSend = metadata.fetch().nodeById(nodeId);
         log.trace("Building acknowledgements to send : {}", finalAcknowledgementsToSend);

Review Comment:
   Yeah, makes sense, thanks. I have moved it now.
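The change being acknowledged moves the `metadata.fetch().nodeById(nodeId)` lookup out of `buildRequest()` to the point where the node is actually consumed. A generic illustration of that pattern, with hypothetical names rather than the real ShareConsumeRequestManager internals:

```
import java.util.Optional;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;

// Hypothetical sketch: resolve the destination node only when the request is
// about to be dispatched, so the metadata lookup never runs for requests that
// are dropped or rebuilt before sending.
final class NodeResolver {
    private final Cluster cluster;

    NodeResolver(Cluster cluster) {
        this.cluster = cluster;
    }

    Optional<Node> resolveAtSendTime(int nodeId) {
        return Optional.ofNullable(cluster.nodeById(nodeId));
    }
}
```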
Re: [PR] KAFKA-18156: VerifiableConsumer should ignore "--session-timeout" when using CONSUMER protocol [kafka]
chia7712 commented on code in PR #18036: URL: https://github.com/apache/kafka/pull/18036#discussion_r1874952912

## tests/kafkatest/tests/verifiable_consumer_test.py:
## @@ -56,7 +55,7 @@ def min_cluster_size(self):
     def setup_consumer(self, topic, static_membership=False, enable_autocommit=False,
                        assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs):
         return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka,
-                                  topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec,

Review Comment:
   `session_timeout_sec` is used by other end-to-end tests, so we need to perform some refactoring for those test cases.

   **Replace Dynamic Timeouts with Constants**

   For example, change `timeout_sec=self.session_timeout_sec + 5` to `timeout_sec=60`.

   **Remove Use Cases of Increasing `session_timeout_sec`**

   These cases typically increase `session_timeout_sec` from 45 seconds to 60 seconds. This adjustment may be unnecessary if the tests can run stably without modifying the timeout.
[PR] KAFKA-18180: Move OffsetResultHolder to storage module [kafka]
m1a2st opened a new pull request, #18100: URL: https://github.com/apache/kafka/pull/18100

Jira: https://issues.apache.org/jira/browse/KAFKA-18180

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-18180: Move OffsetResultHolder to storage module [kafka]
m1a2st commented on code in PR #18100: URL: https://github.com/apache/kafka/pull/18100#discussion_r1874955519

## storage/src/main/java/org/apache/kafka/storage/log/OffsetResultHolder.java:
## @@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.log;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class OffsetResultHolder {
+
+    private Optional<TimestampAndOffset> timestampAndOffsetOpt;
+    private Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt;
+    private Optional<ApiException> maybeOffsetsError = Optional.empty();
+    private Optional<Long> lastFetchableOffset = Optional.empty();
+
+    public OffsetResultHolder() {
+        this(Optional.empty(), Optional.empty());
+    }
+
+    public OffsetResultHolder(
+        Optional<TimestampAndOffset> timestampAndOffsetOpt,
+        Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt
+    ) {
+        this.timestampAndOffsetOpt = timestampAndOffsetOpt;
+        this.futureHolderOpt = futureHolderOpt;
+    }
+
+    public OffsetResultHolder(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
+        this(timestampAndOffsetOpt, Optional.empty());
+    }
+
+    public Optional<TimestampAndOffset> timestampAndOffsetOpt() {
+        return timestampAndOffsetOpt;
+    }
+
+    public Optional<AsyncOffsetReadFutureHolder<FileRecordsOrError>> futureHolderOpt() {
+        return futureHolderOpt;
+    }
+
+    public Optional<ApiException> maybeOffsetsError() {
+        return maybeOffsetsError;
+    }
+
+    public Optional<Long> lastFetchableOffset() {
+        return lastFetchableOffset;
+    }
+
+    public void timestampAndOffsetOpt(Optional<TimestampAndOffset> timestampAndOffsetOpt) {
+        this.timestampAndOffsetOpt = timestampAndOffsetOpt;
+    }
+
+    public void maybeOffsetsError(Optional<ApiException> maybeOffsetsError) {
+        this.maybeOffsetsError = maybeOffsetsError;
+    }
+
+    public void lastFetchableOffset(Optional<Long> lastFetchableOffset) {
+        this.lastFetchableOffset = lastFetchableOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        OffsetResultHolder that = (OffsetResultHolder) o;
+        return Objects.equals(timestampAndOffsetOpt, that.timestampAndOffsetOpt)
+            && Objects.equals(futureHolderOpt, that.futureHolderOpt)
+            && Objects.equals(maybeOffsetsError, that.maybeOffsetsError)
+            && Objects.equals(lastFetchableOffset, that.lastFetchableOffset);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hashCode(timestampAndOffsetOpt);
+        result = 31 * result + Objects.hashCode(futureHolderOpt);
+        result = 31 * result + Objects.hashCode(maybeOffsetsError);
+        result = 31 * result + Objects.hashCode(lastFetchableOffset);
+        return result;
+    }
+
+    public static class FileRecordsOrError {

Review Comment:
   I don't think this name is precise enough, but I can't think of a better idea right now.
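The naming difficulty is that the class is an either-type: it carries exactly one of a successful result or an `ApiException`. A generic sketch of that shape (hypothetical class, not the PR's code), which is what any replacement name would need to convey:

```
import java.util.function.Function;

// Hypothetical either-style holder: exactly one side is present, and fold()
// forces callers to handle both the success and the error case.
final class Result<T, E extends Exception> {
    private final T value;
    private final E error;

    private Result(T value, E error) {
        this.value = value;
        this.error = error;
    }

    static <T, E extends Exception> Result<T, E> ok(T value) {
        return new Result<>(value, null);
    }

    static <T, E extends Exception> Result<T, E> err(E error) {
        return new Result<>(null, error);
    }

    <R> R fold(Function<T, R> onOk, Function<E, R> onErr) {
        return value != null ? onOk.apply(value) : onErr.apply(error);
    }
}
```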
Re: [PR] KAFKA-10731: add support for SSL hot reload [kafka]
CefBoud commented on PR #17987: URL: https://github.com/apache/kafka/pull/17987#issuecomment-2526238613 @TaiJuWu I've drafted a [KIP](https://cwiki.apache.org/confluence/x/eIrREw). If you have any feedback, I’d really appreciate it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org