[GitHub] [kafka] Vaibhav-Nazare commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage
Vaibhav-Nazare commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1643386529 Hi @cmccabe @mimaison @divijvaidya Any further updates on nightly job enablement? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeqo commented on a diff in pull request #14045: MINOR: refactor(storage): topic-based RLMM consumer-manager/task related improvements
jeqo commented on code in PR #14045: URL: https://github.com/apache/kafka/pull/14045#discussion_r1269038188 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ## @@ -182,13 +182,12 @@ private CompletableFuture storeRemoteLogMetadata(TopicIdPartition topicIdP CompletableFuture produceFuture = producerManager.publishMessage(remoteLogMetadata); // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset. -return produceFuture.thenApplyAsync(recordMetadata -> { +return produceFuture.thenAcceptAsync(recordMetadata -> { try { - consumerManager.waitTillConsumptionCatchesUp(recordMetadata); + consumerManager.waitTillConsumptionCatchesUp(recordMetadata.partition(), recordMetadata.offset()); Review Comment: Of course, missed this one -- pushed fixes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools
nizhikov commented on PR #13278: URL: https://github.com/apache/kafka/pull/13278#issuecomment-1643445734 @mimaison I reworked `parseOffsetJsonStringWithoutDedup` to return `Map>`. Now, Tuple class eliminated from PR. Please, review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.
dajac commented on PR #14053: URL: https://github.com/apache/kafka/pull/14053#issuecomment-1643446391 @CalvinConfluent Thanks for the PR. There is indeed something fishy here. Could you please try to better explain the race condition in the description? My understanding is that we may pick the wrong broker epoch when we construct the AlterPartition request because we don't really respect the atomic reference (we read it multiple times vs working on a consistent snapshot). Is my understanding correct? If so, I agree that acquiring the isr lock solve the issue. However, it goes a bit against what we tried to achieve with the atomic reference. Our goal was to not acquire this lock. Therefore, I think that we should ensure that acquiring it does not impact the performances. Have we validated this? I also wonder if we could rework the shrink/expand paths to work based on a snapshot instead of acquiring the isr lock here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.
dajac commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1269075133 ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,31 +101,39 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def maybeUpdateFetchState( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long - ): Unit = { + ): Boolean = { +var updateSuccess = true replicaState.updateAndGet { currentReplicaState => - val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) { -math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs) - } else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset) { -math.max(currentReplicaState.lastCaughtUpTimeMs, currentReplicaState.lastFetchTimeMs) + // Fence the update if it provides a stale broker epoch. + if (brokerEpoch != -1 && brokerEpoch < currentReplicaState.brokerEpoch.getOrElse(-1L)) { +updateSuccess = false Review Comment: It is not recommended to have side effect in the method updating the state of an atomic value. I wonder if we could throw an exception instead to really ensure that the update process is stopped. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage
mimaison commented on PR #13817: URL: https://github.com/apache/kafka/pull/13817#issuecomment-1643452195 @Vaibhav-Nazare The KIP needs to be voted. So far you've only started a discussion. I'd recommend replying to the discussion thread to ask for any more feedback, otherwise you can start a vote in the next few days. See xhttps://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals for the process -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.
dajac commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1269078528 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -858,13 +858,22 @@ class Partition(val topicPartition: TopicPartition, // No need to calculate low watermark if there is no delayed DeleteRecordsRequest val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset -replica.updateFetchState( - followerFetchOffsetMetadata, - followerStartOffset, - followerFetchTimeMs, - leaderEndOffset, - brokerEpoch -) + +// Acquire the lock for the fetch state update. A race can happen between fetch requests from a rebooted broker. +// The requests before and after the reboot can carry different fetch metadata especially offsets and broker epoch. +// It can particularly affect the ISR expansion where we decide to expand based on stale fetch request but use the +// latest broker epoch to fill in the AlterPartition request. +inReadLock(leaderIsrUpdateLock) { + if (!replica.maybeUpdateFetchState( +followerFetchOffsetMetadata, +followerStartOffset, +followerFetchTimeMs, +leaderEndOffset, +brokerEpoch + )) { +return Review Comment: Should we return an error here? `followerReplicaOrThrow` may be a good inspiration for the errors that we could use here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480
[ https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744943#comment-17744943 ] zhangzhisheng commented on KAFKA-13303: --- its's ture that RoundRobinPartitioner lead to data imbalance in some cases > RoundRobinPartitioner broken by KIP-480 > --- > > Key: KAFKA-13303 > URL: https://issues.apache.org/jira/browse/KAFKA-13303 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.5.1 >Reporter: Jon McEwen >Priority: Minor > > Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave > correctly. An additional call to `partition()` on new batch leads to > partitions being skipped. > > I have a fix that I would like to contribute, but I need help getting started > as a contributor, e.g. for basic things like formatting the code. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-9965) Uneven distribution with RoundRobinPartitioner in AK 2.4+
[ https://issues.apache.org/jira/browse/KAFKA-9965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744944#comment-17744944 ] zhangzhisheng commented on KAFKA-9965: -- (y) > Uneven distribution with RoundRobinPartitioner in AK 2.4+ > - > > Key: KAFKA-9965 > URL: https://issues.apache.org/jira/browse/KAFKA-9965 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: Michael Bingham >Priority: Major > > {{RoundRobinPartitioner}} states that it will provide equal distribution of > records across partitions. However with the enhancements made in KIP-480, it > may not. In some cases, when a new batch is started, the partitioner may be > called a second time for the same record: > [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L909] > [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L934] > Each time the partitioner is called, it increments a counter in > {{RoundRobinPartitioner}}, so this can result in unequal distribution. > Easiest fix might be to decrement the counter in > {{RoundRobinPartitioner#onNewBatch}}. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] Owen-CH-Leung opened a new pull request, #14057: KAFKA-15194-Prepend-Offset-as-Filename
Owen-CH-Leung opened a new pull request, #14057: URL: https://github.com/apache/kafka/pull/14057 Prepend the offset information to the filename. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+
[ https://issues.apache.org/jira/browse/KAFKA-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744947#comment-17744947 ] zhangzhisheng commented on KAFKA-9964: -- it's bug, pls follow RoundRobinPartitioner broken by KIP-480 > Better description of RoundRobinPartitioner behavior for AK 2.4+ > > > Key: KAFKA-9964 > URL: https://issues.apache.org/jira/browse/KAFKA-9964 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: Michael Bingham >Priority: Minor > > The Javadocs for {{RoundRobinPartitioner}} currently state: > {quote}This partitioning strategy can be used when user wants to distribute > the writes to all partitions equally > {quote} > In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. > The enhancements to consider batching made with > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner] > affect this partitioner as well. > So it would be useful to add some additional Javadocs to explain that unless > batching is disabled, even distribution of records is not guaranteed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+
[ https://issues.apache.org/jira/browse/KAFKA-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744947#comment-17744947 ] zhangzhisheng edited comment on KAFKA-9964 at 7/20/23 8:41 AM: --- it's bug, pls follow [RoundRobinPartitioner broken by KIP-480|https://issues.apache.org/jira/browse/KAFKA-13303] was (Author: zhangzs): it's bug, pls follow RoundRobinPartitioner broken by KIP-480 > Better description of RoundRobinPartitioner behavior for AK 2.4+ > > > Key: KAFKA-9964 > URL: https://issues.apache.org/jira/browse/KAFKA-9964 > Project: Kafka > Issue Type: Improvement > Components: producer >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: Michael Bingham >Priority: Minor > > The Javadocs for {{RoundRobinPartitioner}} currently state: > {quote}This partitioning strategy can be used when user wants to distribute > the writes to all partitions equally > {quote} > In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. > The enhancements to consider batching made with > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner] > affect this partitioner as well. > So it would be useful to add some additional Javadocs to explain that unless > batching is disabled, even distribution of records is not guaranteed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
dajac commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1643522968 @jolshan I was actually thinking about the `AuthorizerIntegrationTest` failures overnight and I found an issue with the `latestVersionUnstable` flag. Let me try to explain. The `latestVersionUnstable` is used on the broker side to ensure that an unreleased/unstable version is not exposed by the broker. That's fine. However, it does not guarantee that a client having an unreleased/unstable version is not going to use it. Let's take this change as an example. The version 9 will be shipped in the next release even if we don't want to use it because the schema is there. So the client knows about it and may use it if the broker eventually supports the version. The issue is that the release version my be different from the one shipped so the client would get an error. I have updated the `AbstractRequest.Builder` to be defensive and only consider stable versions. If the user wants to construct a request for an unstable version, it has to specify it explicitly. This guarantee that even if an unstable version is shipped, it is never used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] muralibasani commented on pull request #13417: KAFKA-14585: Moving StorageTool from core to tools module
muralibasani commented on PR #13417: URL: https://github.com/apache/kafka/pull/13417#issuecomment-1643531272 > Thanks @muralibasani, let me know when you are ready for another review. @fvaleri made the necessary changes I believe. Pls take a look. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock
cadonna commented on code in PR #13931: URL: https://github.com/apache/kafka/pull/13931#discussion_r1265659668 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ## @@ -93,13 +99,19 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; - +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.eq; +@RunWith(MockitoJUnitRunner.StrictStubs.class) Review Comment: ```suggestion @RunWith(MockitoJUnitRunner.StrictStubs.class) ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ## @@ -186,57 +205,25 @@ public void cleanup() { public void shouldRecordRecordsAndBytesProduced() { final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); -final String threadId = Thread.currentThread().getName(); -final String processorNodeId = sinkNodeName; -final String topic = "topic"; -final Metric recordsProduced = streamsMetrics.metrics().get( -new MetricName("records-produced-total", - TOPIC_LEVEL_GROUP, - "The total number of records produced from this topic", - streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic)) -); -final Metric bytesProduced = streamsMetrics.metrics().get( -new MetricName("bytes-produced-total", - TOPIC_LEVEL_GROUP, - "The total number of bytes produced from this topic", - streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic)) -); - -double totalRecords = 0D; -double totalBytes = 0D; +final MockedStatic topicMetrics = mockStatic(TopicMetrics.class); Review Comment: If you use the static mock only in this method, I would suggest to use a `try-resource`-clause. ```java try (final MockedStatic topicMetrics = mockStatic(TopicMetrics.class)) { ... } ``` without the `topicMetrics.close()` at end. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ## @@ -186,57 +205,25 @@ public void cleanup() { public void shouldRecordRecordsAndBytesProduced() { final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); -final String threadId = Thread.currentThread().getName(); -final String processorNodeId = sinkNodeName; -final String topic = "topic"; -final Metric recordsProduced = streamsMetrics.metrics().get( -new MetricName("records-produced-total", - TOPIC_LEVEL_GROUP, - "The total number of records produced from this topic", - streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic)) -); -final Metric bytesProduced = streamsMetrics.metrics().get( -new MetricName("bytes-produced-total", - TOPIC_LEVEL_GROUP, - "The total number of bytes produced from this topic", - streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic)) -); - -double totalRecords = 0D; -double totalBytes = 0D; +final MockedStatic topicMetrics = mockStatic(TopicMetrics.class); -assertThat(recordsProduced.metricValue(), equalTo(totalRecords)); -assertThat(bytesProduced.metricValue(), equalTo(totalBytes)); +when(TopicMetrics.producedSensor( +Mockito.anyString(), +Mockito.anyString(), +Mockito.anyString(), +Mockito.anyString(), +Mockito.any(StreamsMetricsImpl.class) +)).thenReturn(mockSensor); Review Comment: nit: we usually use 4 spaces: ```suggestion when(TopicMetrics.producedSensor( Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(StreamsMetricsImpl.class) )).thenReturn(mockSensor); ``` ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ## @@ -186,57 +205,25 @@ public void cleanup() { public void shouldRecordRecordsAndBytesProduced() { final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); -final String threadId = Thread.currentThr
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269152813 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -386,6 +386,18 @@ public short groupMetadataValueVersion() { } } +public short offsetCommitValueVersion() { +if (isLessThan(MetadataVersion.IBP_2_1_IV0)) { +return 1; +} else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) { +return 2; +} else { +// Serialize with the highest supported non-flexible version +// until a tagged field is introduced or the version is bumped. +return 3; +} Review Comment: > > // Serialize with the highest supported non-flexible version > > // until a tagged field is introduced or the version is bumped. > > This comment confused me a bit. Do we plan to manually update this method when new versions come in? Why is there a callout for flexible versions? This is related to https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation. Basically, we have already introduced the version 4 of this record to support tagged fields that may be added in the future. However, we don't want to use version 4 yet. When a new version is released or tagged fields are added, we will change the logic here to use the correct version based on the metadata version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269155972 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; + +import java.util.Objects; +import java.util.OptionalInt; +import java.util.OptionalLong; + +/** + * Represents a committed offset with its metadata. + */ +public class OffsetAndMetadata { +public static final String NO_METADATA = ""; + +/** + * The committed offset. + */ +public final long offset; + +/** + * The leader epoch in use when the offset was committed. + */ +public final OptionalInt leaderEpoch; Review Comment: Correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269156330 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; + +import java.util.Objects; +import java.util.OptionalInt; +import java.util.OptionalLong; + +/** + * Represents a committed offset with its metadata. + */ +public class OffsetAndMetadata { +public static final String NO_METADATA = ""; Review Comment: It will be used by a following patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
cadonna commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1269179717 ## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ## @@ -133,77 +112,69 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; -private OptionSet options = null; private final List allTopics = new LinkedList<>(); - -public int run(final String[] args) { -return run(args, new Properties()); +public static void main(final String[] args) { +Exit.exit(new StreamsResetter().execute(args)); } -public int run(final String[] args, - final Properties config) { -int exitCode; +public int execute(final String[] args) { +return execute(args, new Properties()); +} -Admin adminClient = null; +public int execute(final String[] args, final Properties config) { try { -parseArguments(args); - -final boolean dryRun = options.has(dryRunOption); +StreamsResetterOptions options = new StreamsResetterOptions(args); -final String groupId = options.valueOf(applicationIdOption); -final Properties properties = new Properties(); -if (options.has(commandConfigOption)) { - properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); +String groupId = options.applicationId(); Review Comment: > streams used to have this checkstyle that all variables should be final Yeah, we still have that checkstyle rule. The checkstyle rule is not perfect, but it is the best you can get in java to make sure variable are changed inadvertently. Unfortunately, changes to a referenced object are not affected by final. That is the "not perfect" part. Regarding applying the rule to the tools package, I would be in favor. However, it should not be part of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
cadonna commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1269179717 ## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ## @@ -133,77 +112,69 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; -private OptionSet options = null; private final List allTopics = new LinkedList<>(); - -public int run(final String[] args) { -return run(args, new Properties()); +public static void main(final String[] args) { +Exit.exit(new StreamsResetter().execute(args)); } -public int run(final String[] args, - final Properties config) { -int exitCode; +public int execute(final String[] args) { +return execute(args, new Properties()); +} -Admin adminClient = null; +public int execute(final String[] args, final Properties config) { try { -parseArguments(args); - -final boolean dryRun = options.has(dryRunOption); +StreamsResetterOptions options = new StreamsResetterOptions(args); -final String groupId = options.valueOf(applicationIdOption); -final Properties properties = new Properties(); -if (options.has(commandConfigOption)) { - properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); +String groupId = options.applicationId(); Review Comment: > streams used to have this checkstyle that all variables should be final Yeah, we still have that checkstyle rule. The checkstyle rule is not perfect, but it is the best you can get in java to make sure variable are not changed inadvertently. Unfortunately, changes to a referenced object are not affected by final. That is the "not perfect" part. Regarding applying the rule to the tools package, I would be in favor. However, it should not be part of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #14045: MINOR: refactor(storage): topic-based RLMM consumer-manager/task related improvements
satishd commented on code in PR #14045: URL: https://github.com/apache/kafka/pull/14045#discussion_r1269184601 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ## @@ -353,4 +358,10 @@ public void close() { } } } + +public Set metadataPartitionsAssigned() { +return assignedMetaPartitions.stream() Review Comment: Can we return an immutable `Set` of `assignedMetaPartitions` instead of topic-partitions as the topic names are repetitive? This is currently being passed to build a string of partitions in an Exception message [here](https://github.com/apache/kafka/pull/14045/files#diff-7d10de4e1fdb683a452acb203d1ee92b64539a2b0b19533a6abc42fa97b2f04aR115). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename
divijvaidya commented on code in PR #14057: URL: https://github.com/apache/kafka/pull/14057#discussion_r1269189609 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java: ## @@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t this.topicIdPartition = requireNonNull(topicIdPartition); } -private List expectedPaths(final RemoteLogSegmentId id) { +private List expectedPaths(final RemoteLogSegmentMetadata metadata) { final String rootPath = getStorageRootDirectory(); TopicPartition tp = topicIdPartition.topicPartition(); final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), topicIdPartition.topicId()); -final String uuid = id.id().toString(); +final String uuid = metadata.remoteLogSegmentId().id().toString(); +final long startOffset = metadata.startOffset(); return Arrays.asList( -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) +Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX), Review Comment: Ideally, we want the test implementation to be as close to the actual log file implementation as possible. Considering that, could we use `LogFileUtils#logFile(File dir, long offset)` here? Same for index file names. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14040: KAFKA-15212: Delete Classgraph-MIT license
divijvaidya commented on PR #14040: URL: https://github.com/apache/kafka/pull/14040#issuecomment-1643579946 Unrelated test failures. Merging this in. ``` [Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_20_and_Scala_2_13___testBalancePartitionLeaders__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_8_and_Scala_2_12___testOffsetTranslationBehindReplicationFlow__/) [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOffsetSyncsTopicsOnTarget()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_8_and_Scala_2_12___testOffsetSyncsTopicsOnTarget__/) [Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplicateSourceDefault__/) [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_17_and_Scala_2_13___testSyncTopicConfigs__/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow__/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Isolated, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.6-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.tools/MetadataQuorumCommandTest/Build___JDK_17_and_Scala_2_136__Type_Raft_Isolated__Name_testDescribeQuorumReplicationSuccessful__MetadataVersion_3_6_IV0__Security_PLAINTEXT/) [Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Isolated, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.6-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.tools/MetadataQuorumCommandTest/Build___JDK_17_and_Scala_2_132__Type_Raft_Isolated__Name_testDescribeQuorumStatusSuccessful__MetadataVersion_3_6_IV0__Security_PLAINTEXT/) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya merged pull request #14040: KAFKA-15212: Delete Classgraph-MIT license
divijvaidya merged PR #14040: URL: https://github.com/apache/kafka/pull/14040 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15212) Remove unneeded classgraph license file
[ https://issues.apache.org/jira/browse/KAFKA-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya resolved KAFKA-15212. -- Reviewer: Divij Vaidya Resolution: Fixed > Remove unneeded classgraph license file > --- > > Key: KAFKA-15212 > URL: https://issues.apache.org/jira/browse/KAFKA-15212 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Tanay Karmarkar >Priority: Major > Labels: newbie > Fix For: 3.6.0 > > > The license file for classgraph can be completely removed from here: > [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it > is not a dependency of Kafka any more. > The associated package was removed from license at > [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
vamossagar12 commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1269194729 ## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ## @@ -133,77 +112,69 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; -private OptionSet options = null; private final List allTopics = new LinkedList<>(); - -public int run(final String[] args) { -return run(args, new Properties()); +public static void main(final String[] args) { +Exit.exit(new StreamsResetter().execute(args)); } -public int run(final String[] args, - final Properties config) { -int exitCode; +public int execute(final String[] args) { +return execute(args, new Properties()); +} -Admin adminClient = null; +public int execute(final String[] args, final Properties config) { try { -parseArguments(args); - -final boolean dryRun = options.has(dryRunOption); +StreamsResetterOptions options = new StreamsResetterOptions(args); -final String groupId = options.valueOf(applicationIdOption); -final Properties properties = new Properties(); -if (options.has(commandConfigOption)) { - properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); +String groupId = options.applicationId(); Review Comment: > However, it should not be part of this PR. Thanks Bruno, I was about to comment the same thing. We can track it separately @fvaleri. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1
divijvaidya commented on PR #14032: URL: https://github.com/apache/kafka/pull/14032#issuecomment-1643590831 Interestingly the failure rate has gone down today. I guess we will encounter it every time we upgrade gradle?! Let's wait and observer for another 24 hours. Separately, upgrading Zinc is definitely a good idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi opened a new pull request, #14058: KAFKA-15129;[10/N] Remove metrics in log when broker shutdown
hudeqi opened a new pull request, #14058: URL: https://github.com/apache/kafka/pull/14058 This pr is used to remove the metrics in log when broker shutdown. This pr has passed the corresponding unit test, and it is part of [KAFKA-15129](https://issues.apache.org/jira/browse/KAFKA-15129). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1269199183 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ## @@ -52,33 +42,22 @@ public Map members() { return members; } -/** - * @return Topic metadata keyed by topic Ids. - */ -public Map topics() { -return topics; -} - @Override public boolean equals(Object o) { if (this == o) return true; -if (o == null || getClass() != o.getClass()) return false; +if (!(o instanceof AssignmentSpec)) return false; Review Comment: nit: This line change is not necessary. Let's revert it. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { Review Comment: nit: I wonder if `SubscribedTopicDescriber` would be better based on the javadoc. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { + +/** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ +Set subscribedTopicIds(); + +/** + * Number of partitions for the given topicId. Review Comment: nit: `topicIds` -> `topic id`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific l
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently
vamossagar12 commented on code in PR #14051: URL: https://github.com/apache/kafka/pull/14051#discussion_r1269230642 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -998,7 +998,13 @@ class Partition(val topicPartition: TopicPartition, // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch //request broker epoch is -1 which bypasses the epoch verification. case kRaftMetadataCache: KRaftMetadataCache => -val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch +val mayBeReplica = getReplica(followerReplicaId) +// The topic is already deleted and we don't have any replica information. In this case, we can return false +// so as to avoid NPE +if (mayBeReplica.isEmpty) { + return false Review Comment: Makes sense. Added the suggested logline. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently
vamossagar12 commented on PR #14051: URL: https://github.com/apache/kafka/pull/14051#issuecomment-1643633746 Thanks @showuon I did consider writing a test but felt since this is looks like a race condition i.e fetch request from follower coming in around the same time the `remoteReplicasMap` gets cleared, it seemed hard to replicate in unit tests. I did try updating the test you suggested to 1) Execute a partition.delete and try to do a fetchfollower. It fails with `NotLeaderOrFollowerException`. 2) Execute partition.delete on a separate thread and executing the fetchFollower on the main Test thread. Even in that case, I got `Replica not Found`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect
[ https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-12525. --- Resolution: Fixed > Inaccurate task status due to status record interleaving in fast rebalances > in Connect > -- > > Key: KAFKA-12525 > URL: https://issues.apache.org/jira/browse/KAFKA-12525 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1 >Reporter: Konstantine Karantasis >Assignee: Sagar Rao >Priority: Major > > When a task is stopped in Connect it produces an {{UNASSIGNED}} status > record. > Equivalently, when a task is started or restarted in Connect it produces an > {{RUNNING}} status record in the Connect status topic. > At the same time rebalances are decoupled from task start and stop. These > operations happen in separate executor outside of the main worker thread that > performs the rebalance. > Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by > the worker that is sending them. This worker is using the > {{StatusBackingStore#putSafe}} method that will reject any stale status > messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker > is aware of the newer status record that declares a task as {{RUNNING}}. > In cases of fast consecutive rebalances where a task is revoked from one > worker and assigned to another one, it has been observed that there is a > small time window and thus a race condition during which a {{RUNNING}} status > record in the new generation is produced and is immediately followed by a > delayed {{UNASSIGNED}} status record belonging to the same or a previous > generation before the worker that sends this message reads the {{RUNNING}} > status record that corresponds to the latest generation. > A couple of options are available to remediate this race condition. > For example a worker that is has started a task can re-write the {{RUNNING}} > status message in the topic if it reads a stale {{UNASSIGNED}} message from a > previous generation (that should have been fenced). > Another option is to ignore stale {{UNASSIGNED}} message (messages from an > earlier generation than the one in which the task had {{RUNNING}} status). > Worth noting that when this race condition takes place, besides the > inaccurate status representation, the actual execution of the tasks remains > unaffected (e.g. the tasks are running correctly even though they appear as > {{UNASSIGNED}}). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] fvaleri opened a new pull request, #14059: KAFKA-14583: Move ReplicaVerificationTool to tools
fvaleri opened a new pull request, #14059: URL: https://github.com/apache/kafka/pull/14059 Added --bootstrap-sever to align with other tools, --broker-list is now deprecated. Added warnings for all deprecated options, that will be removed in the next major release. Updated replica_verification_tool.py to use the wrapper script rather than the class name. Ran replica_verification_testi.py system test. The system test is a bit flaky and the migration doesn't change this. I'll play more around it to see if it can be improved. http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/3.4/2023-01-31--001.system-test-kafka-3.4--1675184554--confluentinc--3.4--ef3f5bd834/report.html -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
fvaleri commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1269261271 ## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ## @@ -133,77 +112,69 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; -private OptionSet options = null; private final List allTopics = new LinkedList<>(); - -public int run(final String[] args) { -return run(args, new Properties()); +public static void main(final String[] args) { +Exit.exit(new StreamsResetter().execute(args)); } -public int run(final String[] args, - final Properties config) { -int exitCode; +public int execute(final String[] args) { +return execute(args, new Properties()); +} -Admin adminClient = null; +public int execute(final String[] args, final Properties config) { try { -parseArguments(args); - -final boolean dryRun = options.has(dryRunOption); +StreamsResetterOptions options = new StreamsResetterOptions(args); -final String groupId = options.valueOf(applicationIdOption); -final Properties properties = new Properties(); -if (options.has(commandConfigOption)) { - properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); +String groupId = options.applicationId(); Review Comment: Ok, do you have any further comments on this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
fvaleri commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1269261271 ## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ## @@ -133,77 +112,69 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; -private OptionSet options = null; private final List allTopics = new LinkedList<>(); - -public int run(final String[] args) { -return run(args, new Properties()); +public static void main(final String[] args) { +Exit.exit(new StreamsResetter().execute(args)); } -public int run(final String[] args, - final Properties config) { -int exitCode; +public int execute(final String[] args) { +return execute(args, new Properties()); +} -Admin adminClient = null; +public int execute(final String[] args, final Properties config) { try { -parseArguments(args); - -final boolean dryRun = options.has(dryRunOption); +StreamsResetterOptions options = new StreamsResetterOptions(args); -final String groupId = options.valueOf(applicationIdOption); -final Properties properties = new Properties(); -if (options.has(commandConfigOption)) { - properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); +String groupId = options.applicationId(); Review Comment: Ok, thanks for taking a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15218) NPE will be thrown while deleting topic and fetch from follower concurrently
[ https://issues.apache.org/jira/browse/KAFKA-15218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15218: - Assignee: Sagar Rao > NPE will be thrown while deleting topic and fetch from follower concurrently > > > Key: KAFKA-15218 > URL: https://issues.apache.org/jira/browse/KAFKA-15218 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Luke Chen >Assignee: Sagar Rao >Priority: Major > > When deleting topics, we'll first clear all the remoteReplicaMap when > stopPartitions > [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554]. > But this time, there might be fetch request coming from follower, and try to > check if the replica is eligible to be added into ISR > [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001]. > At this moment, NPE will be thrown. Although it's fine since this topic is > already deleted, it'd be better to avoid it happen. > > > {code:java} > java.lang.NullPointerException: Cannot invoke > "kafka.cluster.Replica.stateSnapshot()" because the return value of > "kafka.utils.Pool.get(Object)" is null at > kafka.cluster.Partition.isReplicaIsrEligible(Partition.scala:992) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.cluster.Partition.canAddReplicaToIsr(Partition.scala:974) > ~[kafka_2.13-3.5.0.jar:?]at > kafka.cluster.Partition.maybeExpandIsr(Partition.scala:947) > ~[kafka_2.13-3.5.0.jar:?]at > kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:866) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.cluster.Partition.fetchRecords(Partition.scala:1361) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.ReplicaManager.read$1(ReplicaManager.scala:1164) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.ReplicaManager.$anonfun$readFromLocalLog$7(ReplicaManager.scala:1235) > ~[kafka_2.13-3.5.0.jar:?] at > scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) > ~[scala-library-2.13.10.jar:?] at > scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) > ~[scala-library-2.13.10.jar:?] at > scala.collection.AbstractIterable.foreach(Iterable.scala:933) > ~[scala-library-2.13.10.jar:?] at > kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:1234) > ~[kafka_2.13-3.5.0.jar:?]at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1044) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:994) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.KafkaApis.handle(KafkaApis.scala:181) ~[kafka_2.13-3.5.0.jar:?] > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > ~[kafka_2.13-3.5.0.jar:?] at java.lang.Thread.run(Thread.java:1623) [?:?] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
vamossagar12 commented on code in PR #13983: URL: https://github.com/apache/kafka/pull/13983#discussion_r1269262979 ## tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java: ## @@ -133,77 +112,69 @@ public class StreamsResetter { + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that " + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n"; -private OptionSet options = null; private final List allTopics = new LinkedList<>(); - -public int run(final String[] args) { -return run(args, new Properties()); +public static void main(final String[] args) { +Exit.exit(new StreamsResetter().execute(args)); } -public int run(final String[] args, - final Properties config) { -int exitCode; +public int execute(final String[] args) { +return execute(args, new Properties()); +} -Admin adminClient = null; +public int execute(final String[] args, final Properties config) { try { -parseArguments(args); - -final boolean dryRun = options.has(dryRunOption); +StreamsResetterOptions options = new StreamsResetterOptions(args); -final String groupId = options.valueOf(applicationIdOption); -final Properties properties = new Properties(); -if (options.has(commandConfigOption)) { - properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption))); +String groupId = options.applicationId(); Review Comment: Nope. I already approved it :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to a latests stable fit version
Said BOUDJELDA created KAFKA-15222: -- Summary: Upgrade zinc scala incremental compiler plugin version to a latests stable fit version Key: KAFKA-15222 URL: https://issues.apache.org/jira/browse/KAFKA-15222 Project: Kafka Issue Type: Improvement Components: build, tools Reporter: Said BOUDJELDA Assignee: Said BOUDJELDA -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to a latests stable fit version (1.9.2)
[ https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Said BOUDJELDA updated KAFKA-15222: --- Summary: Upgrade zinc scala incremental compiler plugin version to a latests stable fit version (1.9.2) (was: Upgrade zinc scala incremental compiler plugin version to a latests stable fit version ) > Upgrade zinc scala incremental compiler plugin version to a latests stable > fit version (1.9.2) > -- > > Key: KAFKA-15222 > URL: https://issues.apache.org/jira/browse/KAFKA-15222 > Project: Kafka > Issue Type: Improvement > Components: build, tools >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] bmscomp opened a new pull request, #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)
bmscomp opened a new pull request, #14060: URL: https://github.com/apache/kafka/pull/14060 The existing version of zinc incremental scala compiler plugin is getting a bit old, upgrading to last stable version 1.9.2 ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [ ] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
showuon commented on PR #13983: URL: https://github.com/apache/kafka/pull/13983#issuecomment-1643688414 Failed tests are unrelated: ``` Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-Combined, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.6-IV0, Security=PLAINTEXT Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication() Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 20 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated() Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon merged pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter
showuon merged PR #13983: URL: https://github.com/apache/kafka/pull/13983 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
mimaison commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1269268948 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) { static Map wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) { Map wrapped = new HashMap<>(); -wrapped.put("topic", topicPartition.topic()); -wrapped.put("partition", topicPartition.partition()); -wrapped.put("cluster", sourceClusterAlias); +wrapped.put(TOPIC_KEY, topicPartition.topic()); +wrapped.put(PARTITION_KEY, topicPartition.partition()); +wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias); return wrapped; } -static Map wrapOffset(long offset) { -return Collections.singletonMap("offset", offset); +public static Map wrapOffset(long offset) { +return Collections.singletonMap(OFFSET_KEY, offset); } -static TopicPartition unwrapPartition(Map wrapped) { -String topic = (String) wrapped.get("topic"); -int partition = (Integer) wrapped.get("partition"); +public static TopicPartition unwrapPartition(Map wrapped) { +String topic = (String) wrapped.get(TOPIC_KEY); +int partition = (Integer) wrapped.get(PARTITION_KEY); return new TopicPartition(topic, partition); } static Long unwrapOffset(Map wrapped) { -if (wrapped == null || wrapped.get("offset") == null) { +if (wrapped == null || wrapped.get(OFFSET_KEY) == null) { return -1L; } -return (Long) wrapped.get("offset"); +return (Long) wrapped.get(OFFSET_KEY); +} + + +/** + * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a string. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * @param key the key to check for in the source partition; may be null Review Comment: It's not public API, all existing callers pass a non-null value and having null should cause an NPE below in get(), so maybe we can remove `may be null`? ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) { static Map wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) { Map wrapped = new HashMap<>(); -wrapped.put("topic", topicPartition.topic()); -wrapped.put("partition", topicPartition.partition()); -wrapped.put("cluster", sourceClusterAlias); +wrapped.put(TOPIC_KEY, topicPartition.topic()); +wrapped.put(PARTITION_KEY, topicPartition.partition()); +wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias); return wrapped; } -static Map wrapOffset(long offset) { -return Collections.singletonMap("offset", offset); +public static Map wrapOffset(long offset) { +return Collections.singletonMap(OFFSET_KEY, offset); } -static TopicPartition unwrapPartition(Map wrapped) { -String topic = (String) wrapped.get("topic"); -int partition = (Integer) wrapped.get("partition"); +public static TopicPartition unwrapPartition(Map wrapped) { +String topic = (String) wrapped.get(TOPIC_KEY); +int partition = (Integer) wrapped.get(PARTITION_KEY); return new TopicPartition(topic, partition); } static Long unwrapOffset(Map wrapped) { -if (wrapped == null || wrapped.get("offset") == null) { +if (wrapped == null || wrapped.get(OFFSET_KEY) == null) { return -1L; } -return (Long) wrapped.get("offset"); +return (Long) wrapped.get(OFFSET_KEY); +} + + +/** + * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a string. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * @param key the key to check for in the source partition; may be null + * + * @throws ConnectException if the offset is invalid + */ +static void validateSourcePartitionString(Map sourcePartition, String key) { +Objects.requireNonNull(sourcePartition, "Source partition may not be null
[jira] [Commented] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to a latests stable fit version (1.9.2)
[ https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745034#comment-17745034 ] Said BOUDJELDA commented on KAFKA-15222: I made a small pull request for this Jira [https://github.com/apache/kafka/pull/14060] > Upgrade zinc scala incremental compiler plugin version to a latests stable > fit version (1.9.2) > -- > > Key: KAFKA-15222 > URL: https://issues.apache.org/jira/browse/KAFKA-15222 > Project: Kafka > Issue Type: Improvement > Components: build, tools >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to a latests stable fit version (1.9.2)
[ https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Said BOUDJELDA updated KAFKA-15222: --- Docs Text: Upgrading the version of zinc incremental scala compiler plugin, is far a good idea, since we already have an issue related to zinc probably every Gradle version upgrade since much older versions, >From the current version 1.8.0 to version 1.9.2 there may be some enhancements >there is much improvements and dependency upgrades Check this link : https://github.com/sbt/zinc/compare/v1.8.0...v1.9.2 This upgrade is too minor, and consists only on changing the version if zinc, it can be tricky since it's related to the building process and needs to be sure every things is working well, and no regression will be caused on build process was: Upgrading the version of zinc incremental scala compiler plugin, is far a good idea, since we already have an issue related to zinc probably every Gradle version upgrade since much older versions, >From the current version 1.8.0 to version 1.9.2 there may be some enhancements >there is much improvements and dependency upgrades Check this link : https://github.com/sbt/zinc/compare/v1.8.0...v1.9.2 > Upgrade zinc scala incremental compiler plugin version to a latests stable > fit version (1.9.2) > -- > > Key: KAFKA-15222 > URL: https://issues.apache.org/jira/browse/KAFKA-15222 > Project: Kafka > Issue Type: Improvement > Components: build, tools >Reporter: Said BOUDJELDA >Assignee: Said BOUDJELDA >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeqo commented on a diff in pull request #14045: MINOR: refactor(storage): topic-based RLMM consumer-manager/task related improvements
jeqo commented on code in PR #14045: URL: https://github.com/apache/kafka/pull/14045#discussion_r1269305190 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java: ## @@ -353,4 +358,10 @@ public void close() { } } } + +public Set metadataPartitionsAssigned() { +return assignedMetaPartitions.stream() Review Comment: Sure, applying suggestion -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bmscomp opened a new pull request, #14061: MINOR: Add jdk 20 to list of jdks that can build scala in README.md file
bmscomp opened a new pull request, #14061: URL: https://github.com/apache/kafka/pull/14061 Adding the value 20 to the JDK version that can build Apache Kafka into README.md ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)
divijvaidya commented on PR #14060: URL: https://github.com/apache/kafka/pull/14060#issuecomment-1643732810 Please add link to release notes explaining the difference amongst version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bmscomp commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)
bmscomp commented on PR #14060: URL: https://github.com/apache/kafka/pull/14060#issuecomment-1643741474 @divijvaidya I am working on it right now, I have a good resource https://github.com/sbt/zinc/compare/v1.8.0...v1.9.2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15223) Need clarity in documentation for upgrade/downgrade across releases.
kaushik srinivas created KAFKA-15223: Summary: Need clarity in documentation for upgrade/downgrade across releases. Key: KAFKA-15223 URL: https://issues.apache.org/jira/browse/KAFKA-15223 Project: Kafka Issue Type: Improvement Reporter: kaushik srinivas Referring to the upgrade documentation for apache kafka. [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0] There is confusion with respect to below statements from the above sectioned link of apache docs. "If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. *Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1."* The above statement mentions that the downgrade would not be possible to version prior to "2.1" in case of "upgrading the inter.broker.protocol.version to the latest version". But, there is another statement made in the documentation in *point 4* as below "Restart the brokers one by one for the new protocol version to take effect. {*}Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.{*}" These two statements are repeated across a lot of prior release of kafka and is confusing. Below are the questions: # Is downgrade not at all possible to *"any"* older version of kafka once the inter.broker.protocol.version is updated to latest version *OR* downgrades are not possible only to versions *"<2.1"* ? # Suppose one takes an approach similar to upgrade even for the downgrade path. i.e. downgrade the inter.broker.protocol.version first to the previous version, next downgrade the software/code of kafka to previous release revision. Does downgrade work with this approach ? Can these two questions be documented if the results are already known ? Maybe a downgrade guide can be created too similar to the existing upgrade guide ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15223) Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.
[ https://issues.apache.org/jira/browse/KAFKA-15223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kaushik srinivas updated KAFKA-15223: - Summary: Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases. (was: Need clarity in documentation for upgrade/downgrade across releases.) > Need more clarity in documentation for upgrade/downgrade procedures and > limitations across releases. > > > Key: KAFKA-15223 > URL: https://issues.apache.org/jira/browse/KAFKA-15223 > Project: Kafka > Issue Type: Improvement >Reporter: kaushik srinivas >Priority: Critical > > Referring to the upgrade documentation for apache kafka. > [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0] > There is confusion with respect to below statements from the above sectioned > link of apache docs. > "If you are upgrading from a version prior to 2.1.x, please see the note > below about the change to the schema used to store consumer offsets. *Once > you have changed the inter.broker.protocol.version to the latest version, it > will not be possible to downgrade to a version prior to 2.1."* > The above statement mentions that the downgrade would not be possible to > version prior to "2.1" in case of "upgrading the > inter.broker.protocol.version to the latest version". > But, there is another statement made in the documentation in *point 4* as > below > "Restart the brokers one by one for the new protocol version to take effect. > {*}Once the brokers begin using the latest protocol version, it will no > longer be possible to downgrade the cluster to an older version.{*}" > > These two statements are repeated across a lot of prior release of kafka and > is confusing. > Below are the questions: > # Is downgrade not at all possible to *"any"* older version of kafka once > the inter.broker.protocol.version is updated to latest version *OR* > downgrades are not possible only to versions *"<2.1"* ? > # Suppose one takes an approach similar to upgrade even for the downgrade > path. i.e. downgrade the inter.broker.protocol.version first to the previous > version, next downgrade the software/code of kafka to previous release > revision. Does downgrade work with this approach ? > Can these two questions be documented if the results are already known ? > Maybe a downgrade guide can be created too similar to the existing upgrade > guide ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15223) Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.
[ https://issues.apache.org/jira/browse/KAFKA-15223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kaushik srinivas updated KAFKA-15223: - Description: Referring to the upgrade documentation for apache kafka. [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0] There is confusion with respect to below statements from the above sectioned link of apache docs. "If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. *Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1."* The above statement mentions that the downgrade would not be possible to version prior to "2.1" in case of "upgrading the inter.broker.protocol.version to the latest version". But, there is another statement made in the documentation in *point 4* as below "Restart the brokers one by one for the new protocol version to take effect. {*}Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.{*}" These two statements are repeated across a lot of prior releases of kafka and is confusing. Below are the questions: # Is downgrade not at all possible to *"any"* older version of kafka once the inter.broker.protocol.version is updated to latest version *OR* downgrades are not possible only to versions *"<2.1"* ? # Suppose one takes an approach similar to upgrade even for the downgrade path. i.e. downgrade the inter.broker.protocol.version first to the previous version, next downgrade the software/code of kafka to previous release revision. Does downgrade work with this approach ? Can these two questions be documented if the results are already known ? Maybe a downgrade guide can be created too similar to the existing upgrade guide ? was: Referring to the upgrade documentation for apache kafka. [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0] There is confusion with respect to below statements from the above sectioned link of apache docs. "If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. *Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1."* The above statement mentions that the downgrade would not be possible to version prior to "2.1" in case of "upgrading the inter.broker.protocol.version to the latest version". But, there is another statement made in the documentation in *point 4* as below "Restart the brokers one by one for the new protocol version to take effect. {*}Once the brokers begin using the latest protocol version, it will no longer be possible to downgrade the cluster to an older version.{*}" These two statements are repeated across a lot of prior release of kafka and is confusing. Below are the questions: # Is downgrade not at all possible to *"any"* older version of kafka once the inter.broker.protocol.version is updated to latest version *OR* downgrades are not possible only to versions *"<2.1"* ? # Suppose one takes an approach similar to upgrade even for the downgrade path. i.e. downgrade the inter.broker.protocol.version first to the previous version, next downgrade the software/code of kafka to previous release revision. Does downgrade work with this approach ? Can these two questions be documented if the results are already known ? Maybe a downgrade guide can be created too similar to the existing upgrade guide ? > Need more clarity in documentation for upgrade/downgrade procedures and > limitations across releases. > > > Key: KAFKA-15223 > URL: https://issues.apache.org/jira/browse/KAFKA-15223 > Project: Kafka > Issue Type: Improvement >Reporter: kaushik srinivas >Priority: Critical > > Referring to the upgrade documentation for apache kafka. > [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0] > There is confusion with respect to below statements from the above sectioned > link of apache docs. > "If you are upgrading from a version prior to 2.1.x, please see the note > below about the change to the schema used to store consumer offsets. *Once > you have changed the inter.broker.protocol.version to the latest version, it > will not be possible to downgrade to a version prior to 2.1."* > The above statement mentions that the downgrade would not be possible to > version prior to "2.1" in case of "upgrading the > inter.broker.protocol.version to the latest version". > But, there is another statement made in the documentation in *point 4* as > below > "Res
[jira] [Updated] (KAFKA-15223) Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.
[ https://issues.apache.org/jira/browse/KAFKA-15223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kaushik srinivas updated KAFKA-15223: - Affects Version/s: 3.4.1 3.5.0 3.3.2 3.3.1 3.4.0 > Need more clarity in documentation for upgrade/downgrade procedures and > limitations across releases. > > > Key: KAFKA-15223 > URL: https://issues.apache.org/jira/browse/KAFKA-15223 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1 >Reporter: kaushik srinivas >Priority: Critical > > Referring to the upgrade documentation for apache kafka. > [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0] > There is confusion with respect to below statements from the above sectioned > link of apache docs. > "If you are upgrading from a version prior to 2.1.x, please see the note > below about the change to the schema used to store consumer offsets. *Once > you have changed the inter.broker.protocol.version to the latest version, it > will not be possible to downgrade to a version prior to 2.1."* > The above statement mentions that the downgrade would not be possible to > version prior to "2.1" in case of "upgrading the > inter.broker.protocol.version to the latest version". > But, there is another statement made in the documentation in *point 4* as > below > "Restart the brokers one by one for the new protocol version to take effect. > {*}Once the brokers begin using the latest protocol version, it will no > longer be possible to downgrade the cluster to an older version.{*}" > > These two statements are repeated across a lot of prior releases of kafka and > is confusing. > Below are the questions: > # Is downgrade not at all possible to *"any"* older version of kafka once > the inter.broker.protocol.version is updated to latest version *OR* > downgrades are not possible only to versions *"<2.1"* ? > # Suppose one takes an approach similar to upgrade even for the downgrade > path. i.e. downgrade the inter.broker.protocol.version first to the previous > version, next downgrade the software/code of kafka to previous release > revision. Does downgrade work with this approach ? > Can these two questions be documented if the results are already known ? > Maybe a downgrade guide can be created too similar to the existing upgrade > guide ? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator
dajac commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1269398879 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2643,9 +2652,175 @@ private CoordinatorResult updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } +return maybeCompleteJoinPhase(group); +} + +public CoordinatorResult genericGroupSync( +RequestContext context, +SyncGroupRequestData request, +CompletableFuture responseFuture +) throws UnknownMemberIdException, GroupIdNotFoundException { +String groupId = request.groupId(); +String memberId = request.memberId(); +GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); +Optional errorOpt = validateSyncGroup(group, request); +if (errorOpt.isPresent()) { +responseFuture.complete(new SyncGroupResponseData() +.setErrorCode(errorOpt.get().code())); + +} else if (group.isInState(EMPTY)) { +responseFuture.complete(new SyncGroupResponseData() +.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + +} else if (group.isInState(PREPARING_REBALANCE)) { +responseFuture.complete(new SyncGroupResponseData() +.setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + +} else if (group.isInState(COMPLETING_REBALANCE)) { +group.member(memberId).setAwaitingSyncFuture(responseFuture); +removePendingSyncMember(group, request.memberId()); + +// If this is the leader, then we can attempt to persist state and transition to stable +if (group.isLeader(memberId)) { +log.info("Assignment received from leader {} for group {} for generation {}. " + +"The group has {} members, {} of which are static.", +memberId, groupId, group.generationId(), +group.size(), group.allStaticMemberIds().size()); + +// Fill all members with corresponding assignment. Reset members not specified in +// the assignment to empty assignments. +Map assignments = new HashMap<>(); +request.assignments() +.forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + +Set membersWithMissingAssignment = new HashSet<>(); +group.allMembers().forEach(member -> { +byte[] assignment = assignments.get(member.memberId()); +if (assignment != null) { +member.setAssignment(assignment); +} else { +membersWithMissingAssignment.add(member.memberId()); +member.setAssignment(new byte[0]); +} +}); Review Comment: I am not sure. I lean towards keeping the implementation as it was to avoid any unwanted side effects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator
dajac commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1269401629 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2643,9 +2652,175 @@ private CoordinatorResult updateStaticMemberAndRebalance( group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins."); } +return maybeCompleteJoinPhase(group); +} + +public CoordinatorResult genericGroupSync( +RequestContext context, +SyncGroupRequestData request, +CompletableFuture responseFuture +) throws UnknownMemberIdException, GroupIdNotFoundException { +String groupId = request.groupId(); +String memberId = request.memberId(); +GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false); +Optional errorOpt = validateSyncGroup(group, request); +if (errorOpt.isPresent()) { +responseFuture.complete(new SyncGroupResponseData() +.setErrorCode(errorOpt.get().code())); + +} else if (group.isInState(EMPTY)) { +responseFuture.complete(new SyncGroupResponseData() +.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + +} else if (group.isInState(PREPARING_REBALANCE)) { +responseFuture.complete(new SyncGroupResponseData() +.setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())); + +} else if (group.isInState(COMPLETING_REBALANCE)) { +group.member(memberId).setAwaitingSyncFuture(responseFuture); +removePendingSyncMember(group, request.memberId()); + +// If this is the leader, then we can attempt to persist state and transition to stable +if (group.isLeader(memberId)) { +log.info("Assignment received from leader {} for group {} for generation {}. " + +"The group has {} members, {} of which are static.", +memberId, groupId, group.generationId(), +group.size(), group.allStaticMemberIds().size()); + +// Fill all members with corresponding assignment. Reset members not specified in +// the assignment to empty assignments. +Map assignments = new HashMap<>(); +request.assignments() +.forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment())); + +Set membersWithMissingAssignment = new HashSet<>(); +group.allMembers().forEach(member -> { +byte[] assignment = assignments.get(member.memberId()); +if (assignment != null) { +member.setAssignment(assignment); +} else { +membersWithMissingAssignment.add(member.memberId()); +member.setAssignment(new byte[0]); +} +}); + +if (!membersWithMissingAssignment.isEmpty()) { +log.warn("Setting empty assignments for members {} of {} for generation {}.", +membersWithMissingAssignment, groupId, group.generationId()); +} + +CompletableFuture appendFuture = new CompletableFuture<>(); +appendFuture.whenComplete((__, t) -> { +// Another member may have joined the group while we were awaiting this callback, +// so we must ensure we are still in the CompletingRebalance state and the same generation +// when it gets invoked. if we have transitioned to another state, then do nothing +if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) { +if (t != null) { +Errors error = Errors.forException(t); +resetAndPropagateAssignmentWithError(group, error); +maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" + +"during SyncGroup (member: " + memberId + ")."); +} else { +// Members' assignments were already updated. Propagate and transition to Stable. +propagateAssignment(group, Errors.NONE); +group.transitionTo(STABLE); +} +} +}); + +List records = Collections.singletonList( +RecordHelpers.newGroupMetadataRecord(group, metadataImage.features().metadataVersion()) +); +return new CoordinatorResult<>(records, appendFuture); +} + +} else if (group.isInState(STABLE)) { +removePendingSyncMember(grou
[GitHub] [kafka] bmscomp commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)
bmscomp commented on PR #14060: URL: https://github.com/apache/kafka/pull/14060#issuecomment-1643853720 The current pull request CI, the errors related to zinc appeared again in current Jenkins build, rebasing the branch again will run the build again, the strange behaviour is that the related locked zinc file is pointing to an old version of it I am checking now the behaviour of the build, wait and see -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename
Owen-CH-Leung commented on code in PR #14057: URL: https://github.com/apache/kafka/pull/14057#discussion_r1269422337 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java: ## @@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t this.topicIdPartition = requireNonNull(topicIdPartition); } -private List expectedPaths(final RemoteLogSegmentId id) { +private List expectedPaths(final RemoteLogSegmentMetadata metadata) { final String rootPath = getStorageRootDirectory(); TopicPartition tp = topicIdPartition.topicPartition(); final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), topicIdPartition.topicId()); -final String uuid = id.id().toString(); +final String uuid = metadata.remoteLogSegmentId().id().toString(); +final long startOffset = metadata.startOffset(); return Arrays.asList( -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) +Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX), Review Comment: @divijvaidya Thanks for your feedback. I think the actual log file was actually named as [offset].log. Looking at the implementation of `LogFileUtils#logFile(File dir, long offset)`, I don't think it will allow us to insert a uuid in the middle as part of the filename. If we are to keep the `[offset-uuid.filetype]` pattern, instead of using `LogFileUtils#logFile(File dir, long offset)`, maybe we should make `LogFileUtils#filenamePrefixFromOffset(long offset)` as a public method so that we can construct a real offset using this method. What do you think ? FYI, the method to create these offloaded files is `RemoteLogSegmentFileset#openFileset(final File storageDir, final RemoteLogSegmentId id)` . Currently my PR has changed this method to accept `RemoteLogSegmentMetadata` instead of `RemoteLogSegmentId` , get offset from metadata, and prepend it to the filename. (So yes, it's not close to the actual log file implementation, as the offset was just "0" without formatting, instead of "000") -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename
Owen-CH-Leung commented on code in PR #14057: URL: https://github.com/apache/kafka/pull/14057#discussion_r1269422337 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java: ## @@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t this.topicIdPartition = requireNonNull(topicIdPartition); } -private List expectedPaths(final RemoteLogSegmentId id) { +private List expectedPaths(final RemoteLogSegmentMetadata metadata) { final String rootPath = getStorageRootDirectory(); TopicPartition tp = topicIdPartition.topicPartition(); final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), topicIdPartition.topicId()); -final String uuid = id.id().toString(); +final String uuid = metadata.remoteLogSegmentId().id().toString(); +final long startOffset = metadata.startOffset(); return Arrays.asList( -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) +Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX), Review Comment: @divijvaidya Thanks for your feedback. I think the actual log file was named as [offset.filetype]. Looking at the implementation of `LogFileUtils#logFile(File dir, long offset)`, I don't think it will allow us to insert a uuid in the middle as part of the filename. If we are to keep the `[offset-uuid.filetype]` pattern, instead of using `LogFileUtils#logFile(File dir, long offset)`, maybe we should make `LogFileUtils#filenamePrefixFromOffset(long offset)` as a public method so that we can construct a real offset using this method. What do you think ? FYI, the method to create these offloaded files is `RemoteLogSegmentFileset#openFileset(final File storageDir, final RemoteLogSegmentId id)` . Currently my PR has changed this method to accept `RemoteLogSegmentMetadata` instead of `RemoteLogSegmentId` , get offset from metadata, and prepend it to the filename. (So yes, it's not close to the actual log file implementation, as the offset was just "0" without formatting, instead of "000") -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15224) Automate version change to snapshot
Divij Vaidya created KAFKA-15224: Summary: Automate version change to snapshot Key: KAFKA-15224 URL: https://issues.apache.org/jira/browse/KAFKA-15224 Project: Kafka Issue Type: Sub-task Reporter: Divij Vaidya We require changing to SNAPSHOT version as part of the release process [1]. The specific manual steps are: Update version on the branch to 0.10.0.1-SNAPSHOT in the following places: * ** docs/js/templateData.js ** gradle.properties ** kafka-merge-pr.py ** streams/quickstart/java/pom.xml ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml ** streams/quickstart/pom.xml ** tests/kafkatest/__init__.py (note: this version name can't follow the -SNAPSHOT convention due to python version naming restrictions, instead update it to 0.10.0.1.dev0) ** tests/kafkatest/version.py It would be nice if we could run a script to automatically do it. Note that release.py (line 550) already does something similar where it replaces SNAPSHOT with actual version. We need to do the opposite here. We can repurpose that code in release.py and extract into a new script to perform this opertaion. [1] https://cwiki.apache.org/confluence/display/KAFKA/Release+Process -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15224) Automate version change to snapshot
[ https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15224: - Description: We require changing to SNAPSHOT version as part of the release process [1]. The specific manual steps are: Update version on the branch to 0.10.0.1-SNAPSHOT in the following places: * ** docs/js/templateData.js ** gradle.properties ** kafka-merge-pr.py ** streams/quickstart/java/pom.xml ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml ** streams/quickstart/pom.xml ** tests/kafkatest/_{_}init{_}_.py (note: this version name can't follow the -SNAPSHOT convention due to python version naming restrictions, instead update it to 0.10.0.1.dev0) ** tests/kafkatest/version.py The diff of the changes look like [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9] It would be nice if we could run a script to automatically do it. Note that release.py (line 550) already does something similar where it replaces SNAPSHOT with actual version. We need to do the opposite here. We can repurpose that code in release.py and extract into a new script to perform this opertaion. [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process] was: We require changing to SNAPSHOT version as part of the release process [1]. The specific manual steps are: Update version on the branch to 0.10.0.1-SNAPSHOT in the following places: * ** docs/js/templateData.js ** gradle.properties ** kafka-merge-pr.py ** streams/quickstart/java/pom.xml ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml ** streams/quickstart/pom.xml ** tests/kafkatest/__init__.py (note: this version name can't follow the -SNAPSHOT convention due to python version naming restrictions, instead update it to 0.10.0.1.dev0) ** tests/kafkatest/version.py It would be nice if we could run a script to automatically do it. Note that release.py (line 550) already does something similar where it replaces SNAPSHOT with actual version. We need to do the opposite here. We can repurpose that code in release.py and extract into a new script to perform this opertaion. [1] https://cwiki.apache.org/confluence/display/KAFKA/Release+Process > Automate version change to snapshot > > > Key: KAFKA-15224 > URL: https://issues.apache.org/jira/browse/KAFKA-15224 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Priority: Minor > > We require changing to SNAPSHOT version as part of the release process [1]. > The specific manual steps are: > Update version on the branch to 0.10.0.1-SNAPSHOT in the following places: > * > ** docs/js/templateData.js > ** gradle.properties > ** kafka-merge-pr.py > ** streams/quickstart/java/pom.xml > ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml > ** streams/quickstart/pom.xml > ** tests/kafkatest/_{_}init{_}_.py (note: this version name can't follow the > -SNAPSHOT convention due to python version naming restrictions, instead > update it to 0.10.0.1.dev0) > ** tests/kafkatest/version.py > The diff of the changes look like > [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9] > > > It would be nice if we could run a script to automatically do it. Note that > release.py (line 550) already does something similar where it replaces > SNAPSHOT with actual version. We need to do the opposite here. We can > repurpose that code in release.py and extract into a new script to perform > this opertaion. > [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15200) verify pre-requisite at start of release.py
[ https://issues.apache.org/jira/browse/KAFKA-15200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15200: - Description: At the start of release.py, the first thing it should do is verify that dependency pre-requisites are satisfied. The pre-requisites are: # maven should be installed. # sftp should be installed. Connection to @home.apache.org should be successful. Currently it is done manually at the step "Verify by using `{{{}sftp @home.apache.org{}}}`" in [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process] # svn should be installed was: At the start of release.py, the first thing it should do is verify that dependency pre-requisites are satisfied. The pre-requisites are: 1. maven should be installed. 2. sftp should be installed. Connection to @home.apache.org should be successful. Currently it is done manually at the step "Verify by using `{{{}sftp @home.apache.org{}}}`" in [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process] > verify pre-requisite at start of release.py > --- > > Key: KAFKA-15200 > URL: https://issues.apache.org/jira/browse/KAFKA-15200 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Priority: Major > > At the start of release.py, the first thing it should do is verify that > dependency pre-requisites are satisfied. The pre-requisites are: > # maven should be installed. > # sftp should be installed. Connection to @home.apache.org should be > successful. Currently it is done manually at the step "Verify by using > `{{{}sftp @home.apache.org{}}}`" in > [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process] > # svn should be installed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] cadonna commented on pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)
cadonna commented on PR #13942: URL: https://github.com/apache/kafka/pull/13942#issuecomment-1643982125 Build failures are unrelated: ``` Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() Build / JDK 20 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplication() Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault() Build / JDK 17 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor() Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders() Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)
cadonna merged PR #13942: URL: https://github.com/apache/kafka/pull/13942 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #14044: KAFKA-15216: InternalSinkRecord::newRecord should not ignore new headers
C0urante merged PR #14044: URL: https://github.com/apache/kafka/pull/14044 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #14041: KAFKA-14469: Add MirrorMaker 2 configs to table of contents in docs page
C0urante merged PR #14041: URL: https://github.com/apache/kafka/pull/14041 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14669) Include MirrorMaker connector configurations in docs
[ https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14669: -- Priority: Major (was: Blocker) > Include MirrorMaker connector configurations in docs > > > Key: KAFKA-14669 > URL: https://issues.apache.org/jira/browse/KAFKA-14669 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.6.0 > > > In the https://kafka.apache.org/documentation/#georeplication-flow-configure > section we list some of the MirrorMaker connectors configurations. These are > hardcoded in the docs: > https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788 > Instead we should used the generated docs (added as part of > https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78) > like we do for the file connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14669) Include MirrorMaker connector configurations in docs
[ https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14669. --- Resolution: Done > Include MirrorMaker connector configurations in docs > > > Key: KAFKA-14669 > URL: https://issues.apache.org/jira/browse/KAFKA-14669 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.6.0 > > > In the https://kafka.apache.org/documentation/#georeplication-flow-configure > section we list some of the MirrorMaker connectors configurations. These are > hardcoded in the docs: > https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788 > Instead we should used the generated docs (added as part of > https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78) > like we do for the file connectors. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument
[ https://issues.apache.org/jira/browse/KAFKA-15216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15216: -- Fix Version/s: 3.5.2 > InternalSinkRecord::newRecord method ignores the headers argument > - > > Key: KAFKA-15216 > URL: https://issues.apache.org/jira/browse/KAFKA-15216 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > Fix For: 3.6.0, 3.5.2 > > > [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56] > - the headers argument passed to the {{InternalSinkRecord}} constructor is > the instance field via the accessor {{headers()}} method instead of the > {{newRecord}} method's {{headers}} argument value. > > Originally discovered > [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument
[ https://issues.apache.org/jira/browse/KAFKA-15216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15216: -- Fix Version/s: 3.4.2 > InternalSinkRecord::newRecord method ignores the headers argument > - > > Key: KAFKA-15216 > URL: https://issues.apache.org/jira/browse/KAFKA-15216 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > Fix For: 3.6.0, 3.4.2, 3.5.2 > > > [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56] > - the headers argument passed to the {{InternalSinkRecord}} constructor is > the instance field via the accessor {{headers()}} method instead of the > {{newRecord}} method's {{headers}} argument value. > > Originally discovered > [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument
[ https://issues.apache.org/jira/browse/KAFKA-15216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-15216: -- Fix Version/s: 3.3.3 > InternalSinkRecord::newRecord method ignores the headers argument > - > > Key: KAFKA-15216 > URL: https://issues.apache.org/jira/browse/KAFKA-15216 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Yash Mayya >Assignee: Yash Mayya >Priority: Minor > Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2 > > > [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56] > - the headers argument passed to the {{InternalSinkRecord}} constructor is > the instance field via the accessor {{headers()}} method instead of the > {{newRecord}} method's {{headers}} argument value. > > Originally discovered > [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] yashmayya commented on pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records
yashmayya commented on PR #14024: URL: https://github.com/apache/kafka/pull/14024#issuecomment-1644061803 Thanks Chris, I've rebased this on the latest `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bmscomp commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)
bmscomp commented on PR #14060: URL: https://github.com/apache/kafka/pull/14060#issuecomment-1644078470 @It's ok now, things seems more stable, but there is some failure on building kafak with jdk 20 that has no relation with zinc compiler, Notice that for all build the retry_zinc step is an without issue -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1644151941 I ran a full system test run: ``` SESSION REPORT (ALL TESTS) ducktape version: 0.11.3 session_id: 2023-07-18--002 run time: 1602 minutes 29.170 seconds tests run:1096 passed: 786 flaky:0 failed: 20 ignored: 290 ``` With the following failed tests: ``` 'tests/kafkatest/tests/core/throttling_test.py::ThrottlingTest.test_throttled_reassignment@{"bounce_brokers":true}' 'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_PLAINTEXT"}' 'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_SSL"}' 'tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":false,"reassign_from_offset_zero":false,"metadata_quorum":"REMOTE_KRAFT"}' 'tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"REMOTE_KRAFT"}' 'tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"ZK"}' 'tests/kafkatest/tests/streams/streams_smoke_test.py::StreamsSmokeTest.test_streams@{"processing_guarantee":"at_least_once","crash":false,"metadata_quorum":"REMOTE_KRAFT"}' 'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"PLAINTEXT"}' 'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SSL"}' 'tests/kafkatest/tests/tools/replica_verification_test.py::ReplicaVerificationToolTest.test_replica_lags@{"metadata_quorum":"REMOTE_KRAFT"}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"(user, client-id)","override_quota":false}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"(user, client-id)","override_quota":true}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","consumer_num":2}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","old_broker_throttling_behavior":true}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","old_client_throttling_behavior":true}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","override_quota":false}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","override_quota":true}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"user","override_quota":false}' 'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"user","override_quota":true}' 'tests/kafkatest/tests/core/network_degrade_test.py::NetworkDegradeTest.test_rate@{"task_name":"rate-1000-latency-50","device_name":"eth0","latency_ms":50,"rate_limit_kbit":100}' ``` None of which make use of the 0.8.2.x artifacts version which is being affected here. In particular, the test which I was concerned about (upgrade_test.py from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.snappy FAIL) does pass on this i86_64 machine when it failed on my arm64 machine, indicating that the failure was due to native library dependencies missing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename
divijvaidya commented on code in PR #14057: URL: https://github.com/apache/kafka/pull/14057#discussion_r1269646588 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java: ## @@ -59,9 +59,9 @@ * the local tiered storage: * * - * / storage-directory / topic-partition-uuidBase64 / oAtiIQ95REujbuzNd_lkLQ.log - * . oAtiIQ95REujbuzNd_lkLQ.index - * . oAtiIQ95REujbuzNd_lkLQ.timeindex + * / storage-directory / topic-partition-uuidBase64 / startOffset-oAtiIQ95REujbuzNd_lkLQ.log Review Comment: nit Please replace "startOffset" with dummy values. ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java: ## @@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t this.topicIdPartition = requireNonNull(topicIdPartition); } -private List expectedPaths(final RemoteLogSegmentId id) { +private List expectedPaths(final RemoteLogSegmentMetadata metadata) { final String rootPath = getStorageRootDirectory(); TopicPartition tp = topicIdPartition.topicPartition(); final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), topicIdPartition.topicId()); -final String uuid = id.id().toString(); +final String uuid = metadata.remoteLogSegmentId().id().toString(); +final long startOffset = metadata.startOffset(); return Arrays.asList( -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX), -Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()), -Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX) +Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX), Review Comment: > I don't think it will allow us to insert a uuid in the middle as part of the filename. Ack. I missed that. > maybe we should make LogFileUtils#filenamePrefixFromOffset(long offset) as a public method so that we can construct a real offset using this method. What do you think ? Yes please. Let's use that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1644153313 @ijuma Could you take another look at this? This is blocking KIP-898 that I'm trying to get landed in time for 3.6.0. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1269665264 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ## @@ -52,33 +42,22 @@ public Map members() { return members; } -/** - * @return Topic metadata keyed by topic Ids. - */ -public Map topics() { -return topics; -} - @Override public boolean equals(Object o) { if (this == o) return true; -if (o == null || getClass() != o.getClass()) return false; +if (!(o instanceof AssignmentSpec)) return false; Review Comment: Sry I just auto-generated these functions, is there a reason why one is better than the other? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya merged pull request #13874: KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito
divijvaidya merged PR #13874: URL: https://github.com/apache/kafka/pull/13874 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate
junrao commented on PR #13990: URL: https://github.com/apache/kafka/pull/13990#issuecomment-1644221461 @kirktrue : It seems there were 4 test failures for jdk 11. But the tests for jdk 17 and 20 were aborted. Do you know why? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request, #14062: MINOR: Add a Builder for KRaftMigrationDriver
mumrah opened a new pull request, #14062: URL: https://github.com/apache/kafka/pull/14062 The number of arguments for KRaftMigrationDriver has grown rather large and there are already two constructors. This PR refactors the class to have a single package-private constructor and a builder that can be used by tests and ControllerServer. No other changes in this patch, just refactoring the constructor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
mumrah commented on code in PR #14046: URL: https://github.com/apache/kafka/pull/14046#discussion_r1269710067 ## clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java: ## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.annotation.InterfaceStability; + +@InterfaceStability.Evolving +public class StaleMemberEpochException extends ApiException { Review Comment: I know there isn't much precedent for this, but it might be useful to include a doc string here explaining which RPC this error is used in and at what version ## clients/src/main/resources/common/message/OffsetCommitRequest.json: ## @@ -31,13 +31,19 @@ // version 7 adds a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The + // request is the same as version 8. + // Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the + // API is not exposed by default by brokers unless explicitly enabled. + "latestVersionUnstable": true, + "validVersions": "0-9", "flexibleVersions": "8+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." }, -{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, - "about": "The generation of the group." }, +{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, + "about": "The generation of the group if the generic group protocol or the member epoch if the consumer protocol." }, Review Comment: How does the server decide to interpret this value as a GenerationId vs a MemberEpoch? Is it based on the API version used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on code in PR #14046: URL: https://github.com/apache/kafka/pull/14046#discussion_r1269711912 ## clients/src/main/resources/common/message/OffsetCommitRequest.json: ## @@ -31,13 +31,19 @@ // version 7 adds a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The + // request is the same as version 8. + // Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the + // API is not exposed by default by brokers unless explicitly enabled. + "latestVersionUnstable": true, + "validVersions": "0-9", "flexibleVersions": "8+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." }, -{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, - "about": "The generation of the group." }, +{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, + "about": "The generation of the group if the generic group protocol or the member epoch if the consumer protocol." }, Review Comment: The new group coordinator uses the member epoch and the old one uses the generation id I believe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
mumrah commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269713183 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); + +return new Record( +new ApiMessageAndVersion( +new OffsetCommitKey() +.setGroup(groupId) +.setTopic(topic) +.setPartition(partitionId), +(short) 1 Review Comment: Can we define these `(short) 1` as a constant? That might reduce the changes of us changing one without the others in the future ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); Review Comment: Would it make sense to relocate this logic and the linked logic into MetadataVersion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1644264091 > @jolshan I was actually thinking about the AuthorizerIntegrationTest failures overnight and I found an issue with the latestVersionUnstable flag. Let me try to explain. I was curious if the unstable version flag was causing issues since I recall some weirdness in tests when I had an unstable version. Makes sense to require the unstable-ness to be explicit, but I will take a second look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
dajac commented on code in PR #14046: URL: https://github.com/apache/kafka/pull/14046#discussion_r1269732280 ## clients/src/main/resources/common/message/OffsetCommitRequest.json: ## @@ -31,13 +31,19 @@ // version 7 adds a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The + // request is the same as version 8. + // Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the + // API is not exposed by default by brokers unless explicitly enabled. + "latestVersionUnstable": true, + "validVersions": "0-9", "flexibleVersions": "8+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." }, -{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, - "about": "The generation of the group." }, +{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, + "about": "The generation of the group if the generic group protocol or the member epoch if the consumer protocol." }, Review Comment: It based on the type of the group. In the new group coordinator, we have two types: generic (the old protocol) and consumer (the new protocol). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269737507 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); Review Comment: Yeah, I was debating whether the `offsetAndMetadata.expireTimestampMs.isPresent()` part of this should be in MetadataVersion or not. I could pass a boolean for this purpose. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269744467 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); + +return new Record( +new ApiMessageAndVersion( +new OffsetCommitKey() +.setGroup(groupId) +.setTopic(topic) +.setPartition(partitionId), +(short) 1 Review Comment: I actually used the value on purpose vs using something like `ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION ` in order to not change it by mistake. I wanted to rework the format of those records to include an api key and to auto-generate the constants based on it. In the mean time, we could define them manually. Do you mind if I address separably though? I will do it for all the records at once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269744467 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); + +return new Record( +new ApiMessageAndVersion( +new OffsetCommitKey() +.setGroup(groupId) +.setTopic(topic) +.setPartition(partitionId), +(short) 1 Review Comment: I actually used the value on purpose vs using something like `ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION ` in order to not change it by mistake. I wanted to rework the format of those records to include an api key and to auto-generate the constants based on it. In the mean time, we could define them manually. Do you mind if I address this separably though? I will do it for all the records at once. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error
jolshan commented on PR #14046: URL: https://github.com/apache/kafka/pull/14046#issuecomment-1644292908 Looking at the tests `[Build / JDK 20 and Scala 2.13 / kafka.server.FetchRequestTest.testCurrentEpochValidationV12()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14046/7/testReport/junit/kafka.server/FetchRequestTest/Build___JDK_20_and_Scala_2_13___testCurrentEpochValidationV12__/)` is a bit strange but it only failed on that version. Everything else seems to be familiar-ish flakes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] msn-tldr opened a new pull request, #14063: Kip951 poc
msn-tldr opened a new pull request, #14063: URL: https://github.com/apache/kafka/pull/14063 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
rreddy-22 commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1644359975 Looks good to me! Thanks @flashmouse for the changes and replies! @dajac is a committer so he'll give the final approval! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related
dajac commented on code in PR #14047: URL: https://github.com/apache/kafka/pull/14047#discussion_r1269799340 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java: ## @@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord( ); } +/** + * Creates an OffsetCommit record. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partitionId The partition id. + * @param offsetAndMetadata The offset and metadata. + * @param metadataVersion The metadata version. + * @return The record. + */ +public static Record newOffsetCommitRecord( +String groupId, +String topic, +int partitionId, +OffsetAndMetadata offsetAndMetadata, +MetadataVersion metadataVersion +) { +short version = offsetAndMetadata.expireTimestampMs.isPresent() ? +(short) 1 : metadataVersion.offsetCommitValueVersion(); + +return new Record( +new ApiMessageAndVersion( +new OffsetCommitKey() +.setGroup(groupId) +.setTopic(topic) +.setPartition(partitionId), +(short) 1 Review Comment: Filed https://issues.apache.org/jira/browse/KAFKA-15225 for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15225) Define constants for record types
David Jacot created KAFKA-15225: --- Summary: Define constants for record types Key: KAFKA-15225 URL: https://issues.apache.org/jira/browse/KAFKA-15225 Project: Kafka Issue Type: Sub-task Reporter: David Jacot Assignee: David Jacot Define constants for all the record types. Ideally, this should be defined in the record definitions and the constants should be auto-generated (e.g. like ApiKeys). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1269837065 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { Review Comment: Yeah I named it this way cause I was just wondering if it'd be more uniform with assignmentSpec but I'll change it cause I agree -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1269846580 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { + +/** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ +Set subscribedTopicIds(); + +/** + * Number of partitions for the given topicId. Review Comment: It says topicId singular already, did we want a space between topic and Id -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15226) System tests for plugin.discovery worker configuration
Greg Harris created KAFKA-15226: --- Summary: System tests for plugin.discovery worker configuration Key: KAFKA-15226 URL: https://issues.apache.org/jira/browse/KAFKA-15226 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris Add system tests as described in KIP-898, targeting the startup behavior of the connect worker, various states of plugin migration, and the migration script. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15227) Use plugin.discovery=SERVICE_LOAD in all plugin test suites
Greg Harris created KAFKA-15227: --- Summary: Use plugin.discovery=SERVICE_LOAD in all plugin test suites Key: KAFKA-15227 URL: https://issues.apache.org/jira/browse/KAFKA-15227 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Greg Harris Assignee: Greg Harris To speed up these tests where we know all plugins are migrated, use SERVICE_LOAD mode in all known test suites. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 merged pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 merged PR #13313: URL: https://github.com/apache/kafka/pull/13313 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime
gharris1727 commented on PR #13313: URL: https://github.com/apache/kafka/pull/13313#issuecomment-1644520071 Thanks for your help Ismael! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors
C0urante commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1269901272 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ## @@ -597,7 +596,9 @@ private Set listPartitions( Admin admin, Collection topics ) throws TimeoutException, InterruptedException, ExecutionException { -assertFalse("collection of topics may not be empty", topics.isEmpty()); Review Comment: Did the same in `assertConnectorAndExactlyNumTasksAreRunning`. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) { static Map wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) { Map wrapped = new HashMap<>(); -wrapped.put("topic", topicPartition.topic()); -wrapped.put("partition", topicPartition.partition()); -wrapped.put("cluster", sourceClusterAlias); +wrapped.put(TOPIC_KEY, topicPartition.topic()); +wrapped.put(PARTITION_KEY, topicPartition.partition()); +wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias); return wrapped; } -static Map wrapOffset(long offset) { -return Collections.singletonMap("offset", offset); +public static Map wrapOffset(long offset) { +return Collections.singletonMap(OFFSET_KEY, offset); } -static TopicPartition unwrapPartition(Map wrapped) { -String topic = (String) wrapped.get("topic"); -int partition = (Integer) wrapped.get("partition"); +public static TopicPartition unwrapPartition(Map wrapped) { +String topic = (String) wrapped.get(TOPIC_KEY); +int partition = (Integer) wrapped.get(PARTITION_KEY); return new TopicPartition(topic, partition); } static Long unwrapOffset(Map wrapped) { -if (wrapped == null || wrapped.get("offset") == null) { +if (wrapped == null || wrapped.get(OFFSET_KEY) == null) { return -1L; } -return (Long) wrapped.get("offset"); +return (Long) wrapped.get(OFFSET_KEY); +} + + +/** + * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a string. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * @param key the key to check for in the source partition; may be null + * + * @throws ConnectException if the offset is invalid + */ +static void validateSourcePartitionString(Map sourcePartition, String key) { +Objects.requireNonNull(sourcePartition, "Source partition may not be null"); + +if (!sourcePartition.containsKey(key)) +throw new ConnectException(String.format( +"Source partition %s is missing the '%s' key, which is required", +sourcePartition, +key +)); + +Object value = sourcePartition.get(key); +if (!(value instanceof String)) { +throw new ConnectException(String.format( +"Source partition %s has an invalid value %s for the '%s' key, which must be a string", +sourcePartition, +value, +key +)); +} +} + +/** + * Validate the {@link #PARTITION_KEY partition key} in a source partition that may be written to the offsets topic + * for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a non-negative integer. + * + * Note that the partition key most likely refers to a partition in a Kafka topic, whereas the term "source partition" refers + * to a {@link SourceRecord#sourcePartition() source partition} that is stored in a Kafka Connect worker's internal offsets + * topic (or, if running in standalone mode, offsets file). + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * + * @throws ConnectException if the offset is invalid + */ +static void validateSourcePartitionPartition(Map sourcePartition) { +Objects.requireNonNull(sourcePartition, "Source partition may not be null"); + +if (!sourcePartition.containsKey(PARTITION_KEY)) +throw new ConnectException(String.format( +"Source partition %s is missing the '%s' key
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement
rreddy-22 commented on code in PR #13998: URL: https://github.com/apache/kafka/pull/13998#discussion_r1269927386 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Set; + +/** + * The assignment topic describer is used by the {@link PartitionAssignor} + * to obtain topic and partition metadata of subscribed topics. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface AssignmentTopicDescriber { + +/** + * Returns a set of subscribed topicIds. + * + * @return Set of topicIds corresponding to the subscribed topics. + */ +Set subscribedTopicIds(); + +/** + * Number of partitions for the given topicId. + * + * @param topicId Uuid corresponding to the topic. + * @return The number of partitions corresponding to the given topicId. + * If the topicId doesn't exist return 0; + */ +int numPartitions(Uuid topicId); + +/** + * Returns all the racks associated with the replicas for the given partition. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partition Partition number within topic. Review Comment: partition number is used a lot throughout the kafka code and I thought it's easier to understand than Id even though they're interchangeable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org