[GitHub] [kafka] Vaibhav-Nazare commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage

2023-07-20 Thread via GitHub


Vaibhav-Nazare commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1643386529

   Hi @cmccabe @mimaison @divijvaidya, any further updates on the nightly job enablement?





[GitHub] [kafka] jeqo commented on a diff in pull request #14045: MINOR: refactor(storage): topic-based RLMM consumer-manager/task related improvements

2023-07-20 Thread via GitHub


jeqo commented on code in PR #14045:
URL: https://github.com/apache/kafka/pull/14045#discussion_r1269038188


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -182,13 +182,12 @@ private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdP
         CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);

         // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset.
-        return produceFuture.thenApplyAsync(recordMetadata -> {
+        return produceFuture.thenAcceptAsync(recordMetadata -> {
             try {
-                consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+                consumerManager.waitTillConsumptionCatchesUp(recordMetadata.partition(), recordMetadata.offset());

Review Comment:
   Of course, missed this one -- pushed fixes.






[GitHub] [kafka] nizhikov commented on pull request #13278: KAFKA-14591 DeleteRecordsCommand moved to tools

2023-07-20 Thread via GitHub


nizhikov commented on PR #13278:
URL: https://github.com/apache/kafka/pull/13278#issuecomment-1643445734

   @mimaison I reworked `parseOffsetJsonStringWithoutDedup` to return `Map<TopicPartition, List<Long>>`.
   The Tuple class is now eliminated from the PR. Please review.





[GitHub] [kafka] dajac commented on pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.

2023-07-20 Thread via GitHub


dajac commented on PR #14053:
URL: https://github.com/apache/kafka/pull/14053#issuecomment-1643446391

   @CalvinConfluent Thanks for the PR. There is indeed something fishy here. 
Could you please try to better explain the race condition in the description? 
My understanding is that we may pick the wrong broker epoch when we construct 
the AlterPartition request because we don't really respect the atomic reference 
(we read it multiple times vs working on a consistent snapshot). Is my 
understanding correct?
   
   If so, I agree that acquiring the isr lock solves the issue. However, it goes a bit against what we tried to achieve with the atomic reference. Our goal was to not acquire this lock. Therefore, I think that we should ensure that acquiring it does not impact performance. Have we validated this? I also wonder if we could rework the shrink/expand paths to work based on a snapshot instead of acquiring the isr lock here.
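
To make the snapshot point concrete, here is a minimal sketch with hypothetical types (not the actual `Replica` code): reading the `AtomicReference` once and deriving every field from that single immutable value keeps offset and epoch consistent, while two separate reads can straddle a concurrent update.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SnapshotReadDemo {
    // Hypothetical stand-in for the follower state kept in an AtomicReference.
    record ReplicaState(long logEndOffset, long brokerEpoch) { }

    record OffsetAndEpoch(long offset, long epoch) { }

    private final AtomicReference<ReplicaState> replicaState =
        new AtomicReference<>(new ReplicaState(0L, 0L));

    // Racy: two reads of the reference can straddle a concurrent update, e.g.
    // pairing an offset from before a broker reboot with the epoch from after it.
    OffsetAndEpoch readRacy() {
        long offset = replicaState.get().logEndOffset();
        // <-- a concurrent replicaState.set(...) can land here
        long epoch = replicaState.get().brokerEpoch();
        return new OffsetAndEpoch(offset, epoch);
    }

    // Snapshot-based: read the reference once; both fields come from the same
    // immutable value, so they are always mutually consistent.
    OffsetAndEpoch readFromSnapshot() {
        ReplicaState snapshot = replicaState.get();
        return new OffsetAndEpoch(snapshot.logEndOffset(), snapshot.brokerEpoch());
    }
}
```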





[GitHub] [kafka] dajac commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.

2023-07-20 Thread via GitHub


dajac commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1269075133


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,31 +101,39 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
 * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
 * high frequency.
 */
-  def updateFetchState(
+  def maybeUpdateFetchState(
     followerFetchOffsetMetadata: LogOffsetMetadata,
     followerStartOffset: Long,
     followerFetchTimeMs: Long,
     leaderEndOffset: Long,
     brokerEpoch: Long
-  ): Unit = {
+  ): Boolean = {
+    var updateSuccess = true
     replicaState.updateAndGet { currentReplicaState =>
-      val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
-        math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs)
-      } else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset) {
-        math.max(currentReplicaState.lastCaughtUpTimeMs, currentReplicaState.lastFetchTimeMs)
+      // Fence the update if it provides a stale broker epoch.
+      if (brokerEpoch != -1 && brokerEpoch < currentReplicaState.brokerEpoch.getOrElse(-1L)) {
+        updateSuccess = false

Review Comment:
   It is not recommended to have side effects in the function that updates the state of an atomic value. I wonder if we could throw an exception instead to really ensure that the update process is stopped.
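
Background for this review note: `AtomicReference#updateAndGet` applies the update function in a compare-and-swap loop and may invoke it more than once under contention, which is why a side effect such as setting `updateSuccess` can fire for attempts that never take effect. A standalone demo (hypothetical class, not Kafka code) that makes the retries visible:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class UpdateFunctionSideEffectDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Integer> state = new AtomicReference<>(0);
        AtomicInteger invocations = new AtomicInteger();

        Runnable incrementer = () -> {
            for (int i = 0; i < 100_000; i++) {
                state.updateAndGet(v -> {
                    invocations.incrementAndGet(); // side effect inside the update function
                    return v + 1;
                });
            }
        };

        Thread t1 = new Thread(incrementer);
        Thread t2 = new Thread(incrementer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // The state was updated exactly 200,000 times, but each retried CAS
        // attempt also ran the function, so its side effect fired extra times.
        System.out.println("updates     = " + state.get());        // always 200000
        System.out.println("invocations = " + invocations.get());  // usually > 200000 under contention
    }
}
```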






[GitHub] [kafka] mimaison commented on pull request #13817: KAFKA-15062: Adding ppc64le build stage

2023-07-20 Thread via GitHub


mimaison commented on PR #13817:
URL: https://github.com/apache/kafka/pull/13817#issuecomment-1643452195

   @Vaibhav-Nazare The KIP needs to be voted on. So far you've only started a discussion. I'd recommend replying to the discussion thread to ask for any more feedback; otherwise, you can start a vote in the next few days. See https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals for the process.





[GitHub] [kafka] dajac commented on a diff in pull request #14053: [KAFKA-15221] Fix the race between fetch requests from a rebooted follower.

2023-07-20 Thread via GitHub


dajac commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1269078528


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -858,13 +858,22 @@ class Partition(val topicPartition: TopicPartition,
     // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
     val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
     val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-    replica.updateFetchState(
-      followerFetchOffsetMetadata,
-      followerStartOffset,
-      followerFetchTimeMs,
-      leaderEndOffset,
-      brokerEpoch
-    )
+
+    // Acquire the lock for the fetch state update. A race can happen between fetch requests from a rebooted broker.
+    // The requests before and after the reboot can carry different fetch metadata especially offsets and broker epoch.
+    // It can particularly affect the ISR expansion where we decide to expand based on stale fetch request but use the
+    // latest broker epoch to fill in the AlterPartition request.
+    inReadLock(leaderIsrUpdateLock) {
+      if (!replica.maybeUpdateFetchState(
+        followerFetchOffsetMetadata,
+        followerStartOffset,
+        followerFetchTimeMs,
+        leaderEndOffset,
+        brokerEpoch
+      )) {
+        return

Review Comment:
   Should we return an error here? `followerReplicaOrThrow` may be a good 
inspiration for the errors that we could use here.






[jira] [Commented] (KAFKA-13303) RoundRobinPartitioner broken by KIP-480

2023-07-20 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744943#comment-17744943
 ] 

zhangzhisheng commented on KAFKA-13303:
---

It's true that RoundRobinPartitioner leads to data imbalance in some cases.

> RoundRobinPartitioner broken by KIP-480
> ---
>
> Key: KAFKA-13303
> URL: https://issues.apache.org/jira/browse/KAFKA-13303
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.5.1
>Reporter: Jon McEwen
>Priority: Minor
>
> Since KIP-480 Sticky Partitioning, the RoundRobinPartitioner doesn't behave 
> correctly. An additional call to `partition()` on a new batch leads to 
> partitions being skipped.
>  
> I have a fix that I would like to contribute, but I need help getting started 
> as a contributor, e.g. for basic things like formatting the code.





[jira] [Commented] (KAFKA-9965) Uneven distribution with RoundRobinPartitioner in AK 2.4+

2023-07-20 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744944#comment-17744944
 ] 

zhangzhisheng commented on KAFKA-9965:
--

(y)

> Uneven distribution with RoundRobinPartitioner in AK 2.4+
> -
>
> Key: KAFKA-9965
> URL: https://issues.apache.org/jira/browse/KAFKA-9965
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Michael Bingham
>Priority: Major
>
> {{RoundRobinPartitioner}} states that it will provide equal distribution of 
> records across partitions. However with the enhancements made in KIP-480, it 
> may not. In some cases, when a new batch is started, the partitioner may be 
> called a second time for the same record:
> [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L909]
> [https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L934]
> Each time the partitioner is called, it increments a counter in 
> {{RoundRobinPartitioner}}, so this can result in unequal distribution.
> Easiest fix might be to decrement the counter in 
> {{RoundRobinPartitioner#onNewBatch}}.
>  
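
The description above suggests decrementing the counter in {{RoundRobinPartitioner#onNewBatch}}. A minimal sketch of that idea, assuming a per-topic AtomicInteger counter like the one RoundRobinPartitioner keeps (simplified: available-partition handling and the full Partitioner interface plumbing are omitted):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.Cluster;

public class RoundRobinCounterSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    private int nextValue(String topic) {
        return topicCounterMap
            .computeIfAbsent(topic, t -> new AtomicInteger(0))
            .getAndIncrement();
    }

    public int partition(String topic, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return nextValue(topic) % numPartitions;
    }

    // Called when the producer abandons the first partition choice to start a
    // new batch and calls partition() again for the same record; giving back
    // the increment means no partition gets skipped.
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (counter != null) {
            counter.decrementAndGet();
        }
    }
}
```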





[GitHub] [kafka] Owen-CH-Leung opened a new pull request, #14057: KAFKA-15194-Prepend-Offset-as-Filename

2023-07-20 Thread via GitHub


Owen-CH-Leung opened a new pull request, #14057:
URL: https://github.com/apache/kafka/pull/14057

   Prepend the offset information to the filename.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+

2023-07-20 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744947#comment-17744947
 ] 

zhangzhisheng commented on KAFKA-9964:
--

It's a bug; please follow "RoundRobinPartitioner broken by KIP-480".

> Better description of RoundRobinPartitioner behavior for AK 2.4+
> 
>
> Key: KAFKA-9964
> URL: https://issues.apache.org/jira/browse/KAFKA-9964
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Michael Bingham
>Priority: Minor
>
> The Javadocs for {{RoundRobinPartitioner}} currently state:
> {quote}This partitioning strategy can be used when user wants to distribute 
> the writes to all partitions equally
> {quote}
> In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. 
> The enhancements to consider batching made with 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner]
>  affect this partitioner as well.
> So it would be useful to add some additional Javadocs to explain that unless 
> batching is disabled, even distribution of records is not guaranteed.





[jira] [Comment Edited] (KAFKA-9964) Better description of RoundRobinPartitioner behavior for AK 2.4+

2023-07-20 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744947#comment-17744947
 ] 

zhangzhisheng edited comment on KAFKA-9964 at 7/20/23 8:41 AM:
---

It's a bug; please follow [RoundRobinPartitioner broken by KIP-480|https://issues.apache.org/jira/browse/KAFKA-13303].


was (Author: zhangzs):
it's bug, pls follow RoundRobinPartitioner broken by KIP-480

> Better description of RoundRobinPartitioner behavior for AK 2.4+
> 
>
> Key: KAFKA-9964
> URL: https://issues.apache.org/jira/browse/KAFKA-9964
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: Michael Bingham
>Priority: Minor
>
> The Javadocs for {{RoundRobinPartitioner}} currently state:
> {quote}This partitioning strategy can be used when user wants to distribute 
> the writes to all partitions equally
> {quote}
> In AK 2.4+, equal distribution is not guaranteed, even with this partitioner. 
> The enhancements to consider batching made with 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner]
>  affect this partitioner as well.
> So it would be useful to add some additional Javadocs to explain that unless 
> batching is disabled, even distribution of records is not guaranteed.





[GitHub] [kafka] dajac commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-20 Thread via GitHub


dajac commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1643522968

   @jolshan I was actually thinking about the `AuthorizerIntegrationTest` 
failures overnight and I found an issue with the `latestVersionUnstable` flag. 
Let me try to explain.
   
   The `latestVersionUnstable` is used on the broker side to ensure that an 
unreleased/unstable version is not exposed by the broker. That's fine. However, 
it does not guarantee that a client having an unreleased/unstable version is 
not going to use it.
   
   Let's take this change as an example. Version 9 will be shipped in the next release even if we don't want to use it, because the schema is there. So the client knows about it and may use it if the broker eventually supports the version. The issue is that the released version may be different from the one shipped, so the client would get an error.
   
   I have updated the `AbstractRequest.Builder` to be defensive and only consider stable versions. If the user wants to construct a request for an unstable version, they have to specify it explicitly. This guarantees that even if an unstable version is shipped, it is never used.
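
A rough illustration of the defensive behaviour described above; the class and method names are hypothetical, not the actual `AbstractRequest.Builder` API:

```java
// Sketch: the client picks the request version from the broker's advertised
// range, but never goes above its latest *stable* version unless the caller
// opts in explicitly.
public class StableVersionPick {

    static short chooseVersion(short brokerMaxVersion,
                               short latestStableVersion,
                               short latestKnownVersion,
                               boolean allowUnstable) {
        short cap = allowUnstable ? latestKnownVersion : latestStableVersion;
        return (short) Math.min(brokerMaxVersion, cap);
    }

    public static void main(String[] args) {
        // Broker already understands the unstable v9, but the client still
        // sticks to the stable v8 by default...
        System.out.println(chooseVersion((short) 9, (short) 8, (short) 9, false)); // 8
        // ...and only uses v9 when explicitly asked to.
        System.out.println(chooseVersion((short) 9, (short) 8, (short) 9, true));  // 9
    }
}
```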





[GitHub] [kafka] muralibasani commented on pull request #13417: KAFKA-14585: Moving StorageTool from core to tools module

2023-07-20 Thread via GitHub


muralibasani commented on PR #13417:
URL: https://github.com/apache/kafka/pull/13417#issuecomment-1643531272

   > Thanks @muralibasani, let me know when you are ready for another review.
   
   @fvaleri I believe I've made the necessary changes. Please take a look. Thanks.





[GitHub] [kafka] cadonna commented on a diff in pull request #13931: KAFKA-8977: Remove MockStreamsMetrics since it is not a mock

2023-07-20 Thread via GitHub


cadonna commented on code in PR #13931:
URL: https://github.com/apache/kafka/pull/13931#discussion_r1265659668


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -93,13 +99,19 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+@RunWith(MockitoJUnitRunner.StrictStubs.class)

Review Comment:
   ```suggestion
   
   @RunWith(MockitoJUnitRunner.StrictStubs.class)
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -186,57 +205,25 @@ public void cleanup() {
 public void shouldRecordRecordsAndBytesProduced() {
     final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

-    final String threadId = Thread.currentThread().getName();
-    final String processorNodeId = sinkNodeName;
-    final String topic = "topic";
-    final Metric recordsProduced = streamsMetrics.metrics().get(
-        new MetricName("records-produced-total",
-                       TOPIC_LEVEL_GROUP,
-                       "The total number of records produced from this topic",
-                       streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-    );
-    final Metric bytesProduced = streamsMetrics.metrics().get(
-        new MetricName("bytes-produced-total",
-                       TOPIC_LEVEL_GROUP,
-                       "The total number of bytes produced from this topic",
-                       streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-    );
-
-    double totalRecords = 0D;
-    double totalBytes = 0D;
+    final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);

Review Comment:
   If you use the static mock only in this method, I would suggest using a try-with-resources statement:
   
   ```java
   try (final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class)) {
       ...
   }
   ```
   without the `topicMetrics.close()` at the end. 



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -186,57 +205,25 @@ public void cleanup() {
 public void shouldRecordRecordsAndBytesProduced() {
     final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

-    final String threadId = Thread.currentThread().getName();
-    final String processorNodeId = sinkNodeName;
-    final String topic = "topic";
-    final Metric recordsProduced = streamsMetrics.metrics().get(
-        new MetricName("records-produced-total",
-                       TOPIC_LEVEL_GROUP,
-                       "The total number of records produced from this topic",
-                       streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-    );
-    final Metric bytesProduced = streamsMetrics.metrics().get(
-        new MetricName("bytes-produced-total",
-                       TOPIC_LEVEL_GROUP,
-                       "The total number of bytes produced from this topic",
-                       streamsMetrics.topicLevelTagMap(threadId, taskId.toString(), processorNodeId, topic))
-    );
-
-    double totalRecords = 0D;
-    double totalBytes = 0D;
+    final MockedStatic<TopicMetrics> topicMetrics = mockStatic(TopicMetrics.class);

-    assertThat(recordsProduced.metricValue(), equalTo(totalRecords));
-    assertThat(bytesProduced.metricValue(), equalTo(totalBytes));
+    when(TopicMetrics.producedSensor(
+        Mockito.anyString(),
+        Mockito.anyString(),
+        Mockito.anyString(),
+        Mockito.anyString(),
+        Mockito.any(StreamsMetricsImpl.class)
+    )).thenReturn(mockSensor);

Review Comment:
   nit:
   we usually use 4 spaces:
   ```suggestion
           when(TopicMetrics.producedSensor(
               Mockito.anyString(),
               Mockito.anyString(),
               Mockito.anyString(),
               Mockito.anyString(),
               Mockito.any(StreamsMetricsImpl.class)
           )).thenReturn(mockSensor);
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##
@@ -186,57 +205,25 @@ public void cleanup() {
 public void shouldRecordRecordsAndBytesProduced() {
     final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});

-    final String threadId = Thread.currentThr

[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269152813


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -386,6 +386,18 @@ public short groupMetadataValueVersion() {
         }
     }

+    public short offsetCommitValueVersion() {
+        if (isLessThan(MetadataVersion.IBP_2_1_IV0)) {
+            return 1;
+        } else if (isLessThan(MetadataVersion.IBP_2_1_IV1)) {
+            return 2;
+        } else {
+            // Serialize with the highest supported non-flexible version
+            // until a tagged field is introduced or the version is bumped.
+            return 3;
+        }

Review Comment:
   > > // Serialize with the highest supported non-flexible version
   > > // until a tagged field is introduced or the version is bumped.
   > 
   > This comment confused me a bit. Do we plan to manually update this method 
when new versions come in? Why is there a callout for flexible versions?
   
   This is related to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-915%3A+Txn+and+Group+Coordinator+Downgrade+Foundation.
 Basically, we have already introduced the version 4 of this record to support 
tagged fields that may be added in the future. However, we don't want to use 
version 4 yet.
   
   When a new version is released or tagged fields are added, we will change 
the logic here to use the correct version based on the metadata version.






[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269155972


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Represents a committed offset with its metadata.
+ */
+public class OffsetAndMetadata {
+public static final String NO_METADATA = "";
+
+/**
+ * The committed offset.
+ */
+public final long offset;
+
+/**
+ * The leader epoch in use when the offset was committed.
+ */
+public final OptionalInt leaderEpoch;

Review Comment:
   Correct.






[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269156330


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+
+/**
+ * Represents a committed offset with its metadata.
+ */
+public class OffsetAndMetadata {
+public static final String NO_METADATA = "";

Review Comment:
   It will be used by a following patch.






[GitHub] [kafka] cadonna commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


cadonna commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1269179717


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -133,77 +112,69 @@ public class StreamsResetter {
         + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that "
         + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n";

-    private OptionSet options = null;
     private final List<String> allTopics = new LinkedList<>();

-    public int run(final String[] args) {
-        return run(args, new Properties());
+    public static void main(final String[] args) {
+        Exit.exit(new StreamsResetter().execute(args));
     }

-    public int run(final String[] args,
-                   final Properties config) {
-        int exitCode;
+    public int execute(final String[] args) {
+        return execute(args, new Properties());
+    }

-        Admin adminClient = null;
+    public int execute(final String[] args, final Properties config) {
         try {
-            parseArguments(args);
-
-            final boolean dryRun = options.has(dryRunOption);
+            StreamsResetterOptions options = new StreamsResetterOptions(args);

-            final String groupId = options.valueOf(applicationIdOption);
-            final Properties properties = new Properties();
-            if (options.has(commandConfigOption)) {
-                properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+            String groupId = options.applicationId();

Review Comment:
   > streams used to have this checkstyle that all variables should be final
   
   Yeah, we still have that checkstyle rule. The checkstyle rule is not perfect, but it is the best you can get in Java to make sure variables are not changed inadvertently. Unfortunately, changes to a referenced object are not affected by final. That is the "not perfect" part.
   
   Regarding applying the rule to the tools package, I would be in favor. However, it should not be part of this PR. 
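
A two-line illustration of the "not perfect" part described above:

```java
import java.util.ArrayList;
import java.util.List;

public class FinalCaveatDemo {
    public static void main(String[] args) {
        final List<String> topics = new ArrayList<>();
        topics.add("payments");          // allowed: final protects the reference only,
                                         // mutating the referenced object still compiles
        // topics = new ArrayList<>();   // does not compile: reassignment is blocked
        System.out.println(topics);
    }
}
```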









[GitHub] [kafka] satishd commented on a diff in pull request #14045: MINOR: refactor(storage): topic-based RLMM consumer-manager/task related improvements

2023-07-20 Thread via GitHub


satishd commented on code in PR #14045:
URL: https://github.com/apache/kafka/pull/14045#discussion_r1269184601


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -353,4 +358,10 @@ public void close() {
             }
         }
     }
+
+    public Set metadataPartitionsAssigned() {
+        return assignedMetaPartitions.stream()

Review Comment:
   Can we return an immutable `Set` of `assignedMetaPartitions` instead of 
topic-partitions as the topic names are repetitive? This is currently being 
passed to build a string of partitions in an Exception message 
[here](https://github.com/apache/kafka/pull/14045/files#diff-7d10de4e1fdb683a452acb203d1ee92b64539a2b0b19533a6abc42fa97b2f04aR115).
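
For reference, a minimal sketch of the immutable-view idea, with a stand-in field for `assignedMetaPartitions`:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ImmutableViewSketch {
    // Stand-in for ConsumerTask's assignedMetaPartitions (metadata partition ids).
    private final Set<Integer> assignedMetaPartitions = new HashSet<>();

    // Callers get a read-only view: no per-call copy is made, and any mutation
    // attempt throws UnsupportedOperationException.
    public Set<Integer> metadataPartitionsAssigned() {
        return Collections.unmodifiableSet(assignedMetaPartitions);
    }
}
```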
 






[GitHub] [kafka] divijvaidya commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename

2023-07-20 Thread via GitHub


divijvaidya commented on code in PR #14057:
URL: https://github.com/apache/kafka/pull/14057#discussion_r1269189609


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java:
##
@@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, final TopicIdPartition t
         this.topicIdPartition = requireNonNull(topicIdPartition);
     }

-    private List<Path> expectedPaths(final RemoteLogSegmentId id) {
+    private List<Path> expectedPaths(final RemoteLogSegmentMetadata metadata) {
         final String rootPath = getStorageRootDirectory();
         TopicPartition tp = topicIdPartition.topicPartition();
         final String topicPartitionSubpath = format("%s-%d-%s", tp.topic(), tp.partition(), topicIdPartition.topicId());
-        final String uuid = id.id().toString();
+        final String uuid = metadata.remoteLogSegmentId().id().toString();
+        final long startOffset = metadata.startOffset();

         return Arrays.asList(
-            Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.LOG_FILE_SUFFIX),
-            Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.INDEX_FILE_SUFFIX),
-            Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TIME_INDEX_FILE_SUFFIX),
-            Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.TXN_INDEX_FILE_SUFFIX),
-            Paths.get(rootPath, topicPartitionSubpath, uuid + LEADER_EPOCH_CHECKPOINT.getSuffix()),
-            Paths.get(rootPath, topicPartitionSubpath, uuid + LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
+            Paths.get(rootPath, topicPartitionSubpath, startOffset + "-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),

Review Comment:
   Ideally, we want the test implementation to be as close to the actual log 
file implementation as possible. Considering that, could we use 
`LogFileUtils#logFile(File dir, long offset)` here? Same for index file names.
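
For reference, the suggested call would look roughly like the sketch below. The signature is the one quoted in the review; the package path is an assumption, and the zero-padded output shown in the comment matches how Kafka names local segment files.

```java
import java.io.File;

import org.apache.kafka.storage.internals.log.LogFileUtils; // assumed package

public class LogFileNameDemo {
    public static void main(String[] args) {
        // Builds the standard segment file name for a given base offset,
        // e.g. 00000000000000000042.log (offset zero-padded to 20 digits).
        File segment = LogFileUtils.logFile(new File("/tmp/partition-dir"), 42L);
        System.out.println(segment.getName());
    }
}
```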






[GitHub] [kafka] divijvaidya commented on pull request #14040: KAFKA-15212: Delete Classgraph-MIT license

2023-07-20 Thread via GitHub


divijvaidya commented on PR #14040:
URL: https://github.com/apache/kafka/pull/14040#issuecomment-1643579946

   Unrelated test failures. Merging this in.
   
   ```
   [Build / JDK 20 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_20_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_8_and_Scala_2_12___testOffsetTranslationBehindReplicationFlow__/)
   [Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testOffsetSyncsTopicsOnTarget()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_8_and_Scala_2_12___testOffsetSyncsTopicsOnTarget__/)
   [Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testReplicateSourceDefault()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationTransactionsTest/Build___JDK_11_and_Scala_2_13___testReplicateSourceDefault__/)
   [Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testBalancePartitionLeaders__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_17_and_Scala_2_13___testSyncTopicConfigs__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOffsetTranslationBehindReplicationFlow__/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Isolated, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.6-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.tools/MetadataQuorumCommandTest/Build___JDK_17_and_Scala_2_136__Type_Raft_Isolated__Name_testDescribeQuorumReplicationSuccessful__MetadataVersion_3_6_IV0__Security_PLAINTEXT/)
   [Build / JDK 17 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Isolated, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.6-IV0, Security=PLAINTEXT](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14040/2/testReport/junit/org.apache.kafka.tools/MetadataQuorumCommandTest/Build___JDK_17_and_Scala_2_132__Type_Raft_Isolated__Name_testDescribeQuorumStatusSuccessful__MetadataVersion_3_6_IV0__Security_PLAINTEXT/)
   ```
   





[GitHub] [kafka] divijvaidya merged pull request #14040: KAFKA-15212: Delete Classgraph-MIT license

2023-07-20 Thread via GitHub


divijvaidya merged PR #14040:
URL: https://github.com/apache/kafka/pull/14040





[jira] [Resolved] (KAFKA-15212) Remove unneeded classgraph license file

2023-07-20 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15212.
--
  Reviewer: Divij Vaidya
Resolution: Fixed

> Remove unneeded classgraph license file
> ---
>
> Key: KAFKA-15212
> URL: https://issues.apache.org/jira/browse/KAFKA-15212
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Tanay Karmarkar
>Priority: Major
>  Labels: newbie
> Fix For: 3.6.0
>
>
> The license file for classgraph can be completely removed from here: 
> [https://github.com/apache/kafka/blob/trunk/licenses/classgraph-MIT] since it 
> is not a dependency of Kafka any more.
> The associated package was removed from license at 
> [https://github.com/apache/kafka/commit/6cf4a2eaa7a436f0233aece49ed81bafe64262c4]
>  





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


vamossagar12 commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1269194729


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -133,77 +112,69 @@ public class StreamsResetter {
         + "*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that "
         + "you run this once with \"--dry-run\" to preview your changes before making them.\n\n";

-    private OptionSet options = null;
     private final List<String> allTopics = new LinkedList<>();

-    public int run(final String[] args) {
-        return run(args, new Properties());
+    public static void main(final String[] args) {
+        Exit.exit(new StreamsResetter().execute(args));
     }

-    public int run(final String[] args,
-                   final Properties config) {
-        int exitCode;
+    public int execute(final String[] args) {
+        return execute(args, new Properties());
+    }

-        Admin adminClient = null;
+    public int execute(final String[] args, final Properties config) {
         try {
-            parseArguments(args);
-
-            final boolean dryRun = options.has(dryRunOption);
+            StreamsResetterOptions options = new StreamsResetterOptions(args);

-            final String groupId = options.valueOf(applicationIdOption);
-            final Properties properties = new Properties();
-            if (options.has(commandConfigOption)) {
-                properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+            String groupId = options.applicationId();

Review Comment:
   >  However, it should not be part of this PR.
   
   Thanks Bruno, I was about to comment the same thing. We can track it 
separately @fvaleri.






[GitHub] [kafka] divijvaidya commented on pull request #14032: MINOR: Upgrade Gradle wrapper version to 8.2.1

2023-07-20 Thread via GitHub


divijvaidya commented on PR #14032:
URL: https://github.com/apache/kafka/pull/14032#issuecomment-1643590831

   Interestingly, the failure rate has gone down today. I guess we will encounter it every time we upgrade Gradle?! Let's wait and observe for another 24 hours.
   
   Separately, upgrading Zinc is definitely a good idea.





[GitHub] [kafka] hudeqi opened a new pull request, #14058: KAFKA-15129;[10/N] Remove metrics in log when broker shutdown

2023-07-20 Thread via GitHub


hudeqi opened a new pull request, #14058:
URL: https://github.com/apache/kafka/pull/14058

   This PR removes the metrics in the log layer when the broker shuts down.
   It has passed the corresponding unit tests and is part of [KAFKA-15129](https://issues.apache.org/jira/browse/KAFKA-15129).





[GitHub] [kafka] dajac commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-20 Thread via GitHub


dajac commented on code in PR #13998:
URL: https://github.com/apache/kafka/pull/13998#discussion_r1269199183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java:
##
@@ -52,33 +42,22 @@ public Map members() {
         return members;
     }

-    /**
-     * @return Topic metadata keyed by topic Ids.
-     */
-    public Map topics() {
-        return topics;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (!(o instanceof AssignmentSpec)) return false;

Review Comment:
   nit: This line change is not necessary. Let's revert it.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * The assignment topic describer is used by the {@link PartitionAssignor}
+ * to obtain topic and partition metadata of subscribed topics.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface AssignmentTopicDescriber {

Review Comment:
   nit: I wonder if `SubscribedTopicDescriber` would be better based on the 
javadoc.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * The assignment topic describer is used by the {@link PartitionAssignor}
+ * to obtain topic and partition metadata of subscribed topics.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface AssignmentTopicDescriber {
+
+    /**
+     * Returns a set of subscribed topicIds.
+     *
+     * @return Set of topicIds corresponding to the subscribed topics.
+     */
+    Set<Uuid> subscribedTopicIds();
+
+    /**
+     * Number of partitions for the given topicId.

Review Comment:
   nit: `topicIds` -> `topic id`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific l

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently

2023-07-20 Thread via GitHub


vamossagar12 commented on code in PR #14051:
URL: https://github.com/apache/kafka/pull/14051#discussion_r1269230642


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -998,7 +998,13 @@ class Partition(val topicPartition: TopicPartition,
       // 3. Its metadata cached broker epoch matches its Fetch request broker epoch. Or the Fetch
       //    request broker epoch is -1 which bypasses the epoch verification.
       case kRaftMetadataCache: KRaftMetadataCache =>
-        val storedBrokerEpoch = remoteReplicasMap.get(followerReplicaId).stateSnapshot.brokerEpoch
+        val mayBeReplica = getReplica(followerReplicaId)
+        // The topic is already deleted and we don't have any replica information. In this case, we can return false
+        // so as to avoid NPE
+        if (mayBeReplica.isEmpty) {
+          return false

Review Comment:
   Makes sense. Added the suggested logline.






[GitHub] [kafka] vamossagar12 commented on pull request #14051: KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently

2023-07-20 Thread via GitHub


vamossagar12 commented on PR #14051:
URL: https://github.com/apache/kafka/pull/14051#issuecomment-1643633746

   Thanks @showuon. I did consider writing a test, but since this looks like a race condition, i.e. a fetch request from a follower coming in around the same time the `remoteReplicasMap` gets cleared, it seemed hard to replicate in unit tests. I did try updating the test you suggested to:
   
   1) Execute a partition.delete and try to do a fetchFollower. It fails with `NotLeaderOrFollowerException`.
   2) Execute partition.delete on a separate thread and execute the fetchFollower on the main test thread. Even in that case, I got `Replica not Found`.
   
   





[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect

2023-07-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12525.
---
Resolution: Fixed

> Inaccurate task status due to status record interleaving in fast rebalances 
> in Connect
> --
>
> Key: KAFKA-12525
> URL: https://issues.apache.org/jira/browse/KAFKA-12525
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1
>Reporter: Konstantine Karantasis
>Assignee: Sagar Rao
>Priority: Major
>
> When a task is stopped in Connect it produces an {{UNASSIGNED}} status 
> record. 
> Equivalently, when a task is started or restarted in Connect it produces an 
> {{RUNNING}} status record in the Connect status topic.
> At the same time, rebalances are decoupled from task start and stop. These 
> operations happen in a separate executor outside of the main worker thread that 
> performs the rebalance.
> Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by 
> the worker that is sending them. This worker is using the 
> {{StatusBackingStore#putSafe}} method that will reject any stale status 
> messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker 
> is aware of the newer status record that declares a task as {{RUNNING}}.
> In cases of fast consecutive rebalances where a task is revoked from one 
> worker and assigned to another one, it has been observed that there is a 
> small time window and thus a race condition during which a {{RUNNING}} status 
> record in the new generation is produced and is immediately followed by a 
> delayed {{UNASSIGNED}} status record belonging to the same or a previous 
> generation before the worker that sends this message reads the {{RUNNING}} 
> status record that corresponds to the latest generation.
> A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} 
> status message in the topic if it reads a stale {{UNASSIGNED}} message from a 
> previous generation (that should have been fenced). 
> Another option is to ignore stale {{UNASSIGNED}} message (messages from an 
> earlier generation than the one in which the task had {{RUNNING}} status).
> Worth noting that when this race condition takes place, besides the 
> inaccurate status representation, the actual execution of the tasks remains 
> unaffected (e.g. the tasks are running correctly even though they appear as 
> {{UNASSIGNED}}). 
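
A small sketch of the second remediation option under stated assumptions (each status record carries its rebalance generation; names and types are illustrative, not Connect's actual StatusBackingStore API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Drop UNASSIGNED status records whose generation is older than the
// generation of the status we already know about; everything else applies.
class TaskStatusStore {
    record TaskStatus(String state, int generation) { }

    private final Map<String, TaskStatus> statuses = new ConcurrentHashMap<>();

    void onStatusRecord(String taskId, TaskStatus incoming) {
        statuses.merge(taskId, incoming, (current, candidate) -> {
            boolean staleUnassigned = candidate.state().equals("UNASSIGNED")
                && candidate.generation() < current.generation();
            return staleUnassigned ? current : candidate; // fence the stale record
        });
    }
}
```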





[GitHub] [kafka] fvaleri opened a new pull request, #14059: KAFKA-14583: Move ReplicaVerificationTool to tools

2023-07-20 Thread via GitHub


fvaleri opened a new pull request, #14059:
URL: https://github.com/apache/kafka/pull/14059

   Added --bootstrap-server to align with other tools; --broker-list is now deprecated. Added warnings for all deprecated options, which will be removed in the next major release. Updated replica_verification_tool.py to use the wrapper script rather than the class name. Ran the replica_verification_test.py system test.
   
   The system test is a bit flaky and the migration doesn't change this. I'll play around with it more to see if it can be improved.
   
   
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/3.4/2023-01-31--001.system-test-kafka-3.4--1675184554--confluentinc--3.4--ef3f5bd834/report.html
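As an aside, a minimal sketch of the deprecation pattern described above, using joptsimple (the parser the tools module relies on); the option descriptions and warning text here are illustrative, not the tool's actual output:

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public class BootstrapServerOptionSketch {
    public static void main(String[] args) {
        OptionParser parser = new OptionParser(false);
        OptionSpec<String> bootstrapServer = parser
                .accepts("bootstrap-server", "The broker(s) to connect to.")
                .withRequiredArg().ofType(String.class);
        OptionSpec<String> brokerList = parser
                .accepts("broker-list", "DEPRECATED, use --bootstrap-server instead.")
                .withRequiredArg().ofType(String.class);

        OptionSet options = parser.parse(args);
        final String servers;
        if (options.has(bootstrapServer)) {
            servers = options.valueOf(bootstrapServer);
        } else if (options.has(brokerList)) {
            // keep the old flag working, but warn that it is going away
            System.err.println("Warning: --broker-list is deprecated and will be "
                    + "removed in the next major release. Use --bootstrap-server instead.");
            servers = options.valueOf(brokerList);
        } else {
            throw new IllegalArgumentException("Missing required option --bootstrap-server");
        }
        System.out.println("Connecting to " + servers);
    }
}
```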


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


fvaleri commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1269261271


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -133,77 +112,69 @@ public class StreamsResetter {
 + "*** Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that "
 + "you run this once with \"--dry-run\" to preview your changes 
before making them.\n\n";
 
-private OptionSet options = null;
 private final List allTopics = new LinkedList<>();
 
-
-public int run(final String[] args) {
-return run(args, new Properties());
+public static void main(final String[] args) {
+Exit.exit(new StreamsResetter().execute(args));
 }
 
-public int run(final String[] args,
-   final Properties config) {
-int exitCode;
+public int execute(final String[] args) {
+return execute(args, new Properties());
+}
 
-Admin adminClient = null;
+public int execute(final String[] args, final Properties config) {
 try {
-parseArguments(args);
-
-final boolean dryRun = options.has(dryRunOption);
+StreamsResetterOptions options = new StreamsResetterOptions(args);
 
-final String groupId = options.valueOf(applicationIdOption);
-final Properties properties = new Properties();
-if (options.has(commandConfigOption)) {
-
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+String groupId = options.applicationId();

Review Comment:
   Ok, do you have any further comments on this PR?
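For context, the diff above moves to an options-wrapper pattern: parse the arguments once in a constructor and expose typed accessors instead of passing an OptionSet around. A minimal illustrative sketch (not the actual StreamsResetterOptions class):

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

// Illustrative only: callers never touch the OptionSet directly.
class ResetterOptionsSketch {
    private final OptionSet options;
    private final OptionSpec<String> applicationIdOption;
    private final OptionSpec<Void> dryRunOption;

    ResetterOptionsSketch(String[] args) {
        OptionParser parser = new OptionParser(false);
        applicationIdOption = parser.accepts("application-id", "The Kafka Streams application ID.")
                .withRequiredArg().ofType(String.class);
        dryRunOption = parser.accepts("dry-run", "Preview the changes without applying them.");
        options = parser.parse(args);
    }

    String applicationId() {
        return options.valueOf(applicationIdOption);
    }

    boolean dryRun() {
        return options.has(dryRunOption);
    }
}
```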



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


fvaleri commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1269261271


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -133,77 +112,69 @@ public class StreamsResetter {
 + "*** Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that "
 + "you run this once with \"--dry-run\" to preview your changes 
before making them.\n\n";
 
-private OptionSet options = null;
 private final List allTopics = new LinkedList<>();
 
-
-public int run(final String[] args) {
-return run(args, new Properties());
+public static void main(final String[] args) {
+Exit.exit(new StreamsResetter().execute(args));
 }
 
-public int run(final String[] args,
-   final Properties config) {
-int exitCode;
+public int execute(final String[] args) {
+return execute(args, new Properties());
+}
 
-Admin adminClient = null;
+public int execute(final String[] args, final Properties config) {
 try {
-parseArguments(args);
-
-final boolean dryRun = options.has(dryRunOption);
+StreamsResetterOptions options = new StreamsResetterOptions(args);
 
-final String groupId = options.valueOf(applicationIdOption);
-final Properties properties = new Properties();
-if (options.has(commandConfigOption)) {
-
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+String groupId = options.applicationId();

Review Comment:
   Ok, thanks for taking a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15218) NPE will be thrown while deleting topic and fetch from follower concurrently

2023-07-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15218:
-

Assignee: Sagar Rao

> NPE will be thrown while deleting topic and fetch from follower concurrently
> 
>
> Key: KAFKA-15218
> URL: https://issues.apache.org/jira/browse/KAFKA-15218
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
>
> When deleting topics, we'll first clear all the remoteReplicaMap when 
> stopPartitions 
> [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554].
>  But this time, there might be fetch request coming from follower, and try to 
> check if the replica is eligible to be added into ISR 
> [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001].
>  At this moment, NPE will be thrown. Although it's fine since this topic is 
> already deleted, it'd be better to avoid it happen.
>  
>  
> {code:java}
> java.lang.NullPointerException: Cannot invoke 
> "kafka.cluster.Replica.stateSnapshot()" because the return value of 
> "kafka.utils.Pool.get(Object)" is null  at 
> kafka.cluster.Partition.isReplicaIsrEligible(Partition.scala:992) 
> ~[kafka_2.13-3.5.0.jar:?]  at 
> kafka.cluster.Partition.canAddReplicaToIsr(Partition.scala:974) 
> ~[kafka_2.13-3.5.0.jar:?]at 
> kafka.cluster.Partition.maybeExpandIsr(Partition.scala:947) 
> ~[kafka_2.13-3.5.0.jar:?]at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:866) 
> ~[kafka_2.13-3.5.0.jar:?]  at 
> kafka.cluster.Partition.fetchRecords(Partition.scala:1361) 
> ~[kafka_2.13-3.5.0.jar:?] at 
> kafka.server.ReplicaManager.read$1(ReplicaManager.scala:1164) 
> ~[kafka_2.13-3.5.0.jar:?]  at 
> kafka.server.ReplicaManager.$anonfun$readFromLocalLog$7(ReplicaManager.scala:1235)
>  ~[kafka_2.13-3.5.0.jar:?] at 
> scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) 
> ~[scala-library-2.13.10.jar:?]  at 
> scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) 
> ~[scala-library-2.13.10.jar:?] at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:933) 
> ~[scala-library-2.13.10.jar:?] at 
> kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:1234) 
> ~[kafka_2.13-3.5.0.jar:?]at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1044) 
> ~[kafka_2.13-3.5.0.jar:?]   at 
> kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:994) 
> ~[kafka_2.13-3.5.0.jar:?] at 
> kafka.server.KafkaApis.handle(KafkaApis.scala:181) ~[kafka_2.13-3.5.0.jar:?] 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) 
> ~[kafka_2.13-3.5.0.jar:?] at java.lang.Thread.run(Thread.java:1623) [?:?] 
> {code}
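A hypothetical null-safe variant of the eligibility check, sketched with simplified stand-in types (the real code is Scala in kafka.cluster.Partition; the names mirror the stack trace above):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only; Replica is a stand-in for the real class.
class PartitionSketch {
    static class Replica {
        private final boolean caughtUp; // stand-in for the real state snapshot
        Replica(boolean caughtUp) { this.caughtUp = caughtUp; }
        boolean stateSnapshotIsCaughtUp() { return caughtUp; }
    }

    private final Map<Integer, Replica> remoteReplicasMap = new ConcurrentHashMap<>();

    // Treat a replica missing from the map (e.g. the topic was deleted
    // concurrently) as not eligible instead of dereferencing null, which is
    // the NPE in the trace above.
    boolean isReplicaIsrEligible(int followerReplicaId) {
        Replica replica = remoteReplicasMap.get(followerReplicaId);
        return replica != null && replica.stateSnapshotIsCaughtUp();
    }
}
{code}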



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


vamossagar12 commented on code in PR #13983:
URL: https://github.com/apache/kafka/pull/13983#discussion_r1269262979


##
tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java:
##
@@ -133,77 +112,69 @@ public class StreamsResetter {
 + "*** Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that "
 + "you run this once with \"--dry-run\" to preview your changes 
before making them.\n\n";
 
-private OptionSet options = null;
 private final List allTopics = new LinkedList<>();
 
-
-public int run(final String[] args) {
-return run(args, new Properties());
+public static void main(final String[] args) {
+Exit.exit(new StreamsResetter().execute(args));
 }
 
-public int run(final String[] args,
-   final Properties config) {
-int exitCode;
+public int execute(final String[] args) {
+return execute(args, new Properties());
+}
 
-Admin adminClient = null;
+public int execute(final String[] args, final Properties config) {
 try {
-parseArguments(args);
-
-final boolean dryRun = options.has(dryRunOption);
+StreamsResetterOptions options = new StreamsResetterOptions(args);
 
-final String groupId = options.valueOf(applicationIdOption);
-final Properties properties = new Properties();
-if (options.has(commandConfigOption)) {
-
properties.putAll(Utils.loadProps(options.valueOf(commandConfigOption)));
+String groupId = options.applicationId();

Review Comment:
   Nope. I already approved it :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to the latest stable version

2023-07-20 Thread Said BOUDJELDA (Jira)
Said BOUDJELDA created KAFKA-15222:
--

 Summary: Upgrade zinc scala incremental compiler plugin version to 
the latest stable version 
 Key: KAFKA-15222
 URL: https://issues.apache.org/jira/browse/KAFKA-15222
 Project: Kafka
  Issue Type: Improvement
  Components: build, tools
Reporter: Said BOUDJELDA
Assignee: Said BOUDJELDA






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to the latest stable version (1.9.2)

2023-07-20 Thread Said BOUDJELDA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Said BOUDJELDA updated KAFKA-15222:
---
Summary: Upgrade zinc scala incremental compiler plugin version to the 
latest stable version (1.9.2)  (was: Upgrade zinc scala incremental 
compiler plugin version to the latest stable version )

> Upgrade zinc scala incremental compiler plugin version to the latest stable 
> version (1.9.2)
> --
>
> Key: KAFKA-15222
> URL: https://issues.apache.org/jira/browse/KAFKA-15222
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, tools
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp opened a new pull request, #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to the latest stable version (1.9.2)

2023-07-20 Thread via GitHub


bmscomp opened a new pull request, #14060:
URL: https://github.com/apache/kafka/pull/14060

   The existing version of the zinc incremental Scala compiler plugin is getting a 
bit old; this upgrades it to the latest stable version, 1.9.2.
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


showuon commented on PR #13983:
URL: https://github.com/apache/kafka/pull/13983#issuecomment-1643688414

   Failed tests are unrelated:
   ```
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-Combined, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.6-IV0, 
Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testReplication()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon merged pull request #13983: KAFKA-14734: Use CommandDefaultOptions in StreamsResetter

2023-07-20 Thread via GitHub


showuon merged PR #13983:
URL: https://github.com/apache/kafka/pull/13983


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-20 Thread via GitHub


mimaison commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269268948


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition 
topicPartition) {
 
 static Map wrapPartition(TopicPartition topicPartition, 
String sourceClusterAlias) {
 Map wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
 return wrapped;
 }
 
-static Map wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
 }
 
-static TopicPartition unwrapPartition(Map wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
 return new TopicPartition(topic, partition);
 }
 
 static Long unwrapOffset(Map wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
 return -1L;
 }
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to 
the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map 
and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be 
null
+ * @param key the key to check for in the source partition; may be null

Review Comment:
   It's not a public API, all existing callers pass a non-null value, and passing 
null should cause an NPE below in get(), so maybe we can remove `may be null`?



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition 
topicPartition) {
 
 static Map wrapPartition(TopicPartition topicPartition, 
String sourceClusterAlias) {
 Map wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
 return wrapped;
 }
 
-static Map wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
 }
 
-static TopicPartition unwrapPartition(Map wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
 return new TopicPartition(topic, partition);
 }
 
 static Long unwrapOffset(Map wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
 return -1L;
 }
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to 
the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map 
and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be 
null
+ * @param key the key to check for in the source partition; may be null
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+static void validateSourcePartitionString(Map sourcePartition, 
String key) {
+Objects.requireNonNull(sourcePartition, "Source partition may not be 
null
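The snippet above is cut off in this digest. For readability, a minimal self-contained sketch of what such a validator can look like, following the javadoc quoted above (an illustration, not the PR's exact code):

```java
import java.util.Map;
import java.util.Objects;

class SourcePartitionValidatorSketch {
    // Minimal stand-in for Connect's ConnectException.
    static class ConnectException extends RuntimeException {
        ConnectException(String message) { super(message); }
    }

    // Ensure the key is present in the source partition map and its value is a string.
    static void validateSourcePartitionString(Map<String, ?> sourcePartition, String key) {
        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
        if (!sourcePartition.containsKey(key))
            throw new ConnectException("Source partition is missing the '" + key + "' key");
        Object value = sourcePartition.get(key);
        if (!(value instanceof String))
            throw new ConnectException("Source partition's '" + key + "' value must be a string");
    }
}
```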

[jira] [Commented] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to the latest stable version (1.9.2)

2023-07-20 Thread Said BOUDJELDA (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745034#comment-17745034
 ] 

Said BOUDJELDA commented on KAFKA-15222:


I made a small pull request for this Jira 
[https://github.com/apache/kafka/pull/14060] 

> Upgrade zinc scala incremental compiler plugin version to the latest stable 
> version (1.9.2)
> --
>
> Key: KAFKA-15222
> URL: https://issues.apache.org/jira/browse/KAFKA-15222
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, tools
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15222) Upgrade zinc scala incremental compiler plugin version to the latest stable version (1.9.2)

2023-07-20 Thread Said BOUDJELDA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Said BOUDJELDA updated KAFKA-15222:
---
Docs Text: 
Upgrading the version of the zinc incremental Scala compiler plugin is a good 
idea, since we have hit a zinc-related issue with probably every Gradle 
version upgrade going back several releases. 

From the current version 1.8.0 to version 1.9.2 there are many improvements 
and dependency upgrades. 

Check this link: https://github.com/sbt/zinc/compare/v1.8.0...v1.9.2


Although this upgrade is minor and consists only of changing the zinc version, 
it can be tricky since it touches the build process; we need to be sure 
everything still works and that no regression is introduced in the build.






  was:
Upgrading the version of the zinc incremental Scala compiler plugin is a good 
idea, since we have hit a zinc-related issue with probably every Gradle 
version upgrade going back several releases. 

From the current version 1.8.0 to version 1.9.2 there are many improvements 
and dependency upgrades. 

Check this link: https://github.com/sbt/zinc/compare/v1.8.0...v1.9.2






> Upgrade zinc scala incremental compiler plugin version to the latest stable 
> version (1.9.2)
> --
>
> Key: KAFKA-15222
> URL: https://issues.apache.org/jira/browse/KAFKA-15222
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, tools
>Reporter: Said BOUDJELDA
>Assignee: Said BOUDJELDA
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jeqo commented on a diff in pull request #14045: MINOR: refactor(storage): topic-based RLMM consumer-manager/task related improvements

2023-07-20 Thread via GitHub


jeqo commented on code in PR #14045:
URL: https://github.com/apache/kafka/pull/14045#discussion_r1269305190


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java:
##
@@ -353,4 +358,10 @@ public void close() {
 }
 }
 }
+
+public Set metadataPartitionsAssigned() {
+return assignedMetaPartitions.stream()

Review Comment:
   Sure, applying suggestion
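The applied suggestion itself is not quoted in this digest; one plausible shape for such a getter is to hand out an immutable snapshot rather than a live view (illustrative only):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class ConsumerTaskSketch {
    private final Set<Integer> assignedMetaPartitions = new HashSet<>();

    // Return an immutable snapshot so callers cannot mutate, or observe
    // concurrent changes to, the task's internal assignment state.
    public Set<Integer> metadataPartitionsAssigned() {
        return Collections.unmodifiableSet(new HashSet<>(assignedMetaPartitions));
    }
}
```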



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp opened a new pull request, #14061: MINOR: Add jdk 20 to list of jdks that can build scala in README.md file

2023-07-20 Thread via GitHub


bmscomp opened a new pull request, #14061:
URL: https://github.com/apache/kafka/pull/14061

   Adding the value 20 to the JDK version that can build Apache Kafka into 
README.md 
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to the latest stable version (1.9.2)

2023-07-20 Thread via GitHub


divijvaidya commented on PR #14060:
URL: https://github.com/apache/kafka/pull/14060#issuecomment-1643732810

   Please add a link to the release notes explaining the differences between versions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to the latest stable version (1.9.2)

2023-07-20 Thread via GitHub


bmscomp commented on PR #14060:
URL: https://github.com/apache/kafka/pull/14060#issuecomment-1643741474

   @divijvaidya I am working on it right now; I have a good resource: 
https://github.com/sbt/zinc/compare/v1.8.0...v1.9.2 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15223) Need clarity in documentation for upgrade/downgrade across releases.

2023-07-20 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-15223:


 Summary: Need clarity in documentation for upgrade/downgrade 
across releases.
 Key: KAFKA-15223
 URL: https://issues.apache.org/jira/browse/KAFKA-15223
 Project: Kafka
  Issue Type: Improvement
Reporter: kaushik srinivas


Referring to the upgrade documentation for Apache Kafka.

[https://kafka.apache.org/34/documentation.html#upgrade_3_4_0]

There is confusion with respect to the statements below, taken from the 
above-linked section of the Apache docs.

"If you are upgrading from a version prior to 2.1.x, please see the note below 
about the change to the schema used to store consumer offsets. *Once you have 
changed the inter.broker.protocol.version to the latest version, it will not be 
possible to downgrade to a version prior to 2.1."*

The above statement says that a downgrade to versions prior to "2.1" is not 
possible once the inter.broker.protocol.version has been upgraded to the 
latest version.

But there is another statement in the documentation, in *point 4*, as below:

"Restart the brokers one by one for the new protocol version to take effect. 
{*}Once the brokers begin using the latest protocol version, it will no longer 
be possible to downgrade the cluster to an older version.{*}"

 

These two statements are repeated across many prior releases of Kafka and are 
confusing.

Below are the questions:
 # Is a downgrade not possible to *"any"* older version of Kafka once the 
inter.broker.protocol.version is updated to the latest version, *OR* are 
downgrades only impossible to versions *"<2.1"*?
 # Suppose one takes an approach similar to the upgrade even for the downgrade 
path, i.e. first downgrade the inter.broker.protocol.version to the previous 
version, then downgrade the Kafka software/code to the previous release 
revision. Does a downgrade work with this approach?

Can these two questions be documented if the answers are already known?

Maybe a downgrade guide could be created too, similar to the existing upgrade 
guide?
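For question 2, note that the documented rolling-upgrade procedure keys off a single broker setting; a hedged illustration of the two steps (the version numbers are placeholders):

{code}
# Illustrative broker config for a rolling upgrade; versions are placeholders.
# Step 1: run the new binaries but keep speaking the old protocol:
inter.broker.protocol.version=3.4
# Step 2: once all brokers run the new code, bump the protocol and perform one
# more rolling restart. Per the docs quoted above, this second step is the
# point of no return for a downgrade:
inter.broker.protocol.version=3.5
{code}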



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15223) Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.

2023-07-20 Thread kaushik srinivas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kaushik srinivas updated KAFKA-15223:
-
Summary: Need more clarity in documentation for upgrade/downgrade 
procedures and limitations across releases.  (was: Need clarity in 
documentation for upgrade/downgrade across releases.)

> Need more clarity in documentation for upgrade/downgrade procedures and 
> limitations across releases.
> 
>
> Key: KAFKA-15223
> URL: https://issues.apache.org/jira/browse/KAFKA-15223
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Critical
>
> Referring to the upgrade documentation for apache kafka.
> [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0]
> There is confusion with respect to the statements below, taken from the 
> above-linked section of the Apache docs.
> "If you are upgrading from a version prior to 2.1.x, please see the note 
> below about the change to the schema used to store consumer offsets. *Once 
> you have changed the inter.broker.protocol.version to the latest version, it 
> will not be possible to downgrade to a version prior to 2.1."*
> The above statement mentions that the downgrade would not be possible to 
> version prior to "2.1" in case of "upgrading the 
> inter.broker.protocol.version to the latest version".
> But, there is another statement made in the documentation in *point 4* as 
> below
> "Restart the brokers one by one for the new protocol version to take effect. 
> {*}Once the brokers begin using the latest protocol version, it will no 
> longer be possible to downgrade the cluster to an older version.{*}"
>  
> These two statements are repeated across many prior releases of Kafka and 
> are confusing.
> Below are the questions:
>  # Is downgrade not at all possible to *"any"* older version of kafka once 
> the inter.broker.protocol.version is updated to latest version *OR* 
> downgrades are not possible only to versions *"<2.1"* ?
>  # Suppose one takes an approach similar to upgrade even for the downgrade 
> path. i.e. downgrade the inter.broker.protocol.version first to the previous 
> version, next downgrade the software/code of kafka to previous release 
> revision. Does downgrade work with this approach ?
> Can these two questions be documented if the results are already known ?
> Maybe a downgrade guide can be created too similar to the existing upgrade 
> guide ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15223) Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.

2023-07-20 Thread kaushik srinivas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kaushik srinivas updated KAFKA-15223:
-
Description: 
Referring to the upgrade documentation for apache kafka.

[https://kafka.apache.org/34/documentation.html#upgrade_3_4_0]

There is confusion with respect to the statements below, taken from the 
above-linked section of the Apache docs.

"If you are upgrading from a version prior to 2.1.x, please see the note below 
about the change to the schema used to store consumer offsets. *Once you have 
changed the inter.broker.protocol.version to the latest version, it will not be 
possible to downgrade to a version prior to 2.1."*

The above statement mentions that the downgrade would not be possible to 
version prior to "2.1" in case of "upgrading the inter.broker.protocol.version 
to the latest version".

But, there is another statement made in the documentation in *point 4* as below

"Restart the brokers one by one for the new protocol version to take effect. 
{*}Once the brokers begin using the latest protocol version, it will no longer 
be possible to downgrade the cluster to an older version.{*}"

 

These two statements are repeated across many prior releases of Kafka and 
are confusing.

Below are the questions:
 # Is downgrade not at all possible to *"any"* older version of kafka once the 
inter.broker.protocol.version is updated to latest version *OR* downgrades are 
not possible only to versions *"<2.1"* ?
 # Suppose one takes an approach similar to upgrade even for the downgrade 
path. i.e. downgrade the inter.broker.protocol.version first to the previous 
version, next downgrade the software/code of kafka to previous release 
revision. Does downgrade work with this approach ?

Can these two questions be documented if the results are already known ?

Maybe a downgrade guide can be created too similar to the existing upgrade 
guide ?

  was:
Referring to the upgrade documentation for apache kafka.

[https://kafka.apache.org/34/documentation.html#upgrade_3_4_0]

There is confusion with respect to below statements from the above sectioned 
link of apache docs.

"If you are upgrading from a version prior to 2.1.x, please see the note below 
about the change to the schema used to store consumer offsets. *Once you have 
changed the inter.broker.protocol.version to the latest version, it will not be 
possible to downgrade to a version prior to 2.1."*

The above statement mentions that the downgrade would not be possible to 
version prior to "2.1" in case of "upgrading the inter.broker.protocol.version 
to the latest version".

But, there is another statement made in the documentation in *point 4* as below

"Restart the brokers one by one for the new protocol version to take effect. 
{*}Once the brokers begin using the latest protocol version, it will no longer 
be possible to downgrade the cluster to an older version.{*}"

 

These two statements are repeated across a lot of prior release of kafka and is 
confusing.

Below are the questions:
 # Is downgrade not at all possible to *"any"* older version of kafka once the 
inter.broker.protocol.version is updated to latest version *OR* downgrades are 
not possible only to versions *"<2.1"* ?
 # Suppose one takes an approach similar to upgrade even for the downgrade 
path. i.e. downgrade the inter.broker.protocol.version first to the previous 
version, next downgrade the software/code of kafka to previous release 
revision. Does downgrade work with this approach ?

Can these two questions be documented if the results are already known ?

Maybe a downgrade guide can be created too similar to the existing upgrade 
guide ?


> Need more clarity in documentation for upgrade/downgrade procedures and 
> limitations across releases.
> 
>
> Key: KAFKA-15223
> URL: https://issues.apache.org/jira/browse/KAFKA-15223
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Critical
>
> Referring to the upgrade documentation for apache kafka.
> [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0]
> There is confusion with respect to the statements below, taken from the 
> above-linked section of the Apache docs.
> "If you are upgrading from a version prior to 2.1.x, please see the note 
> below about the change to the schema used to store consumer offsets. *Once 
> you have changed the inter.broker.protocol.version to the latest version, it 
> will not be possible to downgrade to a version prior to 2.1."*
> The above statement mentions that the downgrade would not be possible to 
> version prior to "2.1" in case of "upgrading the 
> inter.broker.protocol.version to the latest version".
> But, there is another statement made in the documentation in *point 4* as 
> below
> "Res

[jira] [Updated] (KAFKA-15223) Need more clarity in documentation for upgrade/downgrade procedures and limitations across releases.

2023-07-20 Thread kaushik srinivas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kaushik srinivas updated KAFKA-15223:
-
Affects Version/s: 3.4.1
   3.5.0
   3.3.2
   3.3.1
   3.4.0

> Need more clarity in documentation for upgrade/downgrade procedures and 
> limitations across releases.
> 
>
> Key: KAFKA-15223
> URL: https://issues.apache.org/jira/browse/KAFKA-15223
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: kaushik srinivas
>Priority: Critical
>
> Referring to the upgrade documentation for apache kafka.
> [https://kafka.apache.org/34/documentation.html#upgrade_3_4_0]
> There is confusion with respect to the statements below, taken from the 
> above-linked section of the Apache docs.
> "If you are upgrading from a version prior to 2.1.x, please see the note 
> below about the change to the schema used to store consumer offsets. *Once 
> you have changed the inter.broker.protocol.version to the latest version, it 
> will not be possible to downgrade to a version prior to 2.1."*
> The above statement mentions that the downgrade would not be possible to 
> version prior to "2.1" in case of "upgrading the 
> inter.broker.protocol.version to the latest version".
> But, there is another statement made in the documentation in *point 4* as 
> below
> "Restart the brokers one by one for the new protocol version to take effect. 
> {*}Once the brokers begin using the latest protocol version, it will no 
> longer be possible to downgrade the cluster to an older version.{*}"
>  
> These two statements are repeated across many prior releases of Kafka and 
> are confusing.
> Below are the questions:
>  # Is downgrade not at all possible to *"any"* older version of kafka once 
> the inter.broker.protocol.version is updated to latest version *OR* 
> downgrades are not possible only to versions *"<2.1"* ?
>  # Suppose one takes an approach similar to upgrade even for the downgrade 
> path. i.e. downgrade the inter.broker.protocol.version first to the previous 
> version, next downgrade the software/code of kafka to previous release 
> revision. Does downgrade work with this approach ?
> Can these two questions be documented if the results are already known ?
> Maybe a downgrade guide can be created too similar to the existing upgrade 
> guide ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator

2023-07-20 Thread via GitHub


dajac commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1269398879


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2643,9 +2652,175 @@ private CoordinatorResult 
updateStaticMemberAndRebalance(
 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
 }
+return maybeCompleteJoinPhase(group);
+}
+
+public CoordinatorResult genericGroupSync(
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+Optional errorOpt = validateSyncGroup(group, request);
+if (errorOpt.isPresent()) {
+responseFuture.complete(new SyncGroupResponseData()
+.setErrorCode(errorOpt.get().code()));
+
+} else if (group.isInState(EMPTY)) {
+responseFuture.complete(new SyncGroupResponseData()
+.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+} else if (group.isInState(PREPARING_REBALANCE)) {
+responseFuture.complete(new SyncGroupResponseData()
+.setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+} else if (group.isInState(COMPLETING_REBALANCE)) {
+group.member(memberId).setAwaitingSyncFuture(responseFuture);
+removePendingSyncMember(group, request.memberId());
+
+// If this is the leader, then we can attempt to persist state and 
transition to stable
+if (group.isLeader(memberId)) {
+log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+"The group has {} members, {} of which are static.",
+memberId, groupId, group.generationId(),
+group.size(), group.allStaticMemberIds().size());
+
+// Fill all members with corresponding assignment. Reset 
members not specified in
+// the assignment to empty assignments.
+Map assignments = new HashMap<>();
+request.assignments()
+.forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+Set membersWithMissingAssignment = new HashSet<>();
+group.allMembers().forEach(member -> {
+byte[] assignment = assignments.get(member.memberId());
+if (assignment != null) {
+member.setAssignment(assignment);
+} else {
+membersWithMissingAssignment.add(member.memberId());
+member.setAssignment(new byte[0]);
+}
+});

Review Comment:
   I am not sure. I lean towards keeping the implementation as it was to avoid 
any unwanted side effects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator

2023-07-20 Thread via GitHub


dajac commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1269401629


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2643,9 +2652,175 @@ private CoordinatorResult 
updateStaticMemberAndRebalance(
 group.stateAsString() + " when the unknown static member " + 
request.groupInstanceId() + " rejoins.");
 
 }
+return maybeCompleteJoinPhase(group);
+}
+
+public CoordinatorResult genericGroupSync(
+RequestContext context,
+SyncGroupRequestData request,
+CompletableFuture responseFuture
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+String groupId = request.groupId();
+String memberId = request.memberId();
+GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+Optional errorOpt = validateSyncGroup(group, request);
+if (errorOpt.isPresent()) {
+responseFuture.complete(new SyncGroupResponseData()
+.setErrorCode(errorOpt.get().code()));
+
+} else if (group.isInState(EMPTY)) {
+responseFuture.complete(new SyncGroupResponseData()
+.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+} else if (group.isInState(PREPARING_REBALANCE)) {
+responseFuture.complete(new SyncGroupResponseData()
+.setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+} else if (group.isInState(COMPLETING_REBALANCE)) {
+group.member(memberId).setAwaitingSyncFuture(responseFuture);
+removePendingSyncMember(group, request.memberId());
+
+// If this is the leader, then we can attempt to persist state and 
transition to stable
+if (group.isLeader(memberId)) {
+log.info("Assignment received from leader {} for group {} for 
generation {}. " +
+"The group has {} members, {} of which are static.",
+memberId, groupId, group.generationId(),
+group.size(), group.allStaticMemberIds().size());
+
+// Fill all members with corresponding assignment. Reset 
members not specified in
+// the assignment to empty assignments.
+Map assignments = new HashMap<>();
+request.assignments()
+.forEach(assignment -> 
assignments.put(assignment.memberId(), assignment.assignment()));
+
+Set membersWithMissingAssignment = new HashSet<>();
+group.allMembers().forEach(member -> {
+byte[] assignment = assignments.get(member.memberId());
+if (assignment != null) {
+member.setAssignment(assignment);
+} else {
+membersWithMissingAssignment.add(member.memberId());
+member.setAssignment(new byte[0]);
+}
+});
+
+if (!membersWithMissingAssignment.isEmpty()) {
+log.warn("Setting empty assignments for members {} of {} 
for generation {}.",
+membersWithMissingAssignment, groupId, 
group.generationId());
+}
+
+CompletableFuture appendFuture = new 
CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+// Another member may have joined the group while we were 
awaiting this callback,
+// so we must ensure we are still in the 
CompletingRebalance state and the same generation
+// when it gets invoked. if we have transitioned to 
another state, then do nothing
+if (group.isInState(COMPLETING_REBALANCE) && 
request.generationId() == group.generationId()) {
+if (t != null) {
+Errors error = Errors.forException(t);
+resetAndPropagateAssignmentWithError(group, error);
+maybePrepareRebalanceOrCompleteJoin(group, "Error 
" + error + " when storing group assignment" +
+"during SyncGroup (member: " + memberId + 
").");
+} else {
+// Members' assignments were already updated. 
Propagate and transition to Stable.
+propagateAssignment(group, Errors.NONE);
+group.transitionTo(STABLE);
+}
+}
+});
+
+List records = Collections.singletonList(
+RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+);
+return new CoordinatorResult<>(records, appendFuture);
+}
+
+} else if (group.isInState(STABLE)) {
+removePendingSyncMember(grou

[GitHub] [kafka] bmscomp commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)

2023-07-20 Thread via GitHub


bmscomp commented on PR #14060:
URL: https://github.com/apache/kafka/pull/14060#issuecomment-1643853720

   In the current pull request CI, the zinc-related errors appeared again in the 
Jenkins build. Rebasing the branch will trigger the build again; the strange 
behaviour is that the zinc lock file is still pointing to an old version of it. 
   
   I am checking the behaviour of the build now; let's wait and see. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename

2023-07-20 Thread via GitHub


Owen-CH-Leung commented on code in PR #14057:
URL: https://github.com/apache/kafka/pull/14057#discussion_r1269422337


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java:
##
@@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, 
final TopicIdPartition t
 this.topicIdPartition = requireNonNull(topicIdPartition);
 }
 
-private List expectedPaths(final RemoteLogSegmentId id) {
+private List expectedPaths(final RemoteLogSegmentMetadata 
metadata) {
 final String rootPath = getStorageRootDirectory();
 TopicPartition tp = topicIdPartition.topicPartition();
 final String topicPartitionSubpath = format("%s-%d-%s", 
tp.topic(), tp.partition(),
 topicIdPartition.topicId());
-final String uuid = id.id().toString();
+final String uuid = metadata.remoteLogSegmentId().id().toString();
+final long startOffset = metadata.startOffset();
 
 return Arrays.asList(
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.LOG_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TIME_INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TXN_INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LEADER_EPOCH_CHECKPOINT.getSuffix()),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
+Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),

Review Comment:
   @divijvaidya Thanks for your feedback. I think the actual log file is 
named `[offset].log`. Looking at the implementation of 
`LogFileUtils#logFile(File dir, long offset)`, I don't think it will allow us 
to insert a UUID in the middle as part of the filename. 
   
   If we are to keep the `[offset-uuid.filetype]` pattern, instead of using 
`LogFileUtils#logFile(File dir, long offset)`, maybe we should make 
`LogFileUtils#filenamePrefixFromOffset(long offset)` a public method so that 
we can construct a real offset prefix with it. What do you think? 
   
   FYI, the method that creates these offloaded files is 
`RemoteLogSegmentFileset#openFileset(final File storageDir, final 
RemoteLogSegmentId id)`. Currently my PR has changed this method to accept 
`RemoteLogSegmentMetadata` instead of `RemoteLogSegmentId`, get the offset from 
the metadata, and prepend it to the filename. (So yes, it's not close to the 
actual log file naming, as the offset is just "0" without zero-padding, instead 
of "000")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Owen-CH-Leung commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename

2023-07-20 Thread via GitHub


Owen-CH-Leung commented on code in PR #14057:
URL: https://github.com/apache/kafka/pull/14057#discussion_r1269422337


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java:
##
@@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, 
final TopicIdPartition t
 this.topicIdPartition = requireNonNull(topicIdPartition);
 }
 
-private List expectedPaths(final RemoteLogSegmentId id) {
+private List expectedPaths(final RemoteLogSegmentMetadata 
metadata) {
 final String rootPath = getStorageRootDirectory();
 TopicPartition tp = topicIdPartition.topicPartition();
 final String topicPartitionSubpath = format("%s-%d-%s", 
tp.topic(), tp.partition(),
 topicIdPartition.topicId());
-final String uuid = id.id().toString();
+final String uuid = metadata.remoteLogSegmentId().id().toString();
+final long startOffset = metadata.startOffset();
 
 return Arrays.asList(
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.LOG_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TIME_INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TXN_INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LEADER_EPOCH_CHECKPOINT.getSuffix()),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
+Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),

Review Comment:
   @divijvaidya Thanks for your feedback. I think the actual log file is named 
`[offset].[filetype]`. Looking at the implementation of 
`LogFileUtils#logFile(File dir, long offset)`, I don't think it will allow us 
to insert a UUID in the middle as part of the filename. 
   
   If we are to keep the `[offset-uuid.filetype]` pattern, instead of using 
`LogFileUtils#logFile(File dir, long offset)`, maybe we should make 
`LogFileUtils#filenamePrefixFromOffset(long offset)` a public method so that 
we can construct a real offset prefix with it. What do you think? 
   
   FYI, the method that creates these offloaded files is 
`RemoteLogSegmentFileset#openFileset(final File storageDir, final 
RemoteLogSegmentId id)`. Currently my PR has changed this method to accept 
`RemoteLogSegmentMetadata` instead of `RemoteLogSegmentId`, get the offset from 
the metadata, and prepend it to the filename. (So yes, it's not close to the 
actual log file naming, as the offset is just "0" without zero-padding, instead 
of "000")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15224) Automate version change to snapshot

2023-07-20 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15224:


 Summary: Automate version change to snapshot 
 Key: KAFKA-15224
 URL: https://issues.apache.org/jira/browse/KAFKA-15224
 Project: Kafka
  Issue Type: Sub-task
Reporter: Divij Vaidya


We require changing to SNAPSHOT version as part of the release process [1]. The 
specific manual steps are:

Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
 * 
 ** docs/js/templateData.js
 ** gradle.properties
 ** kafka-merge-pr.py
 ** streams/quickstart/java/pom.xml
 ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
 ** streams/quickstart/pom.xml
 ** tests/kafkatest/__init__.py (note: this version name can't follow the 
-SNAPSHOT convention due to python version naming restrictions, instead
update it to 0.10.0.1.dev0)
 ** tests/kafkatest/version.py

It would be nice if we could run a script to do it automatically. Note that 
release.py (line 550) already does something similar where it replaces SNAPSHOT 
with the actual version. We need to do the opposite here. We can repurpose that 
code in release.py and extract it into a new script to perform this operation.

[1] https://cwiki.apache.org/confluence/display/KAFKA/Release+Process
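A minimal sketch of the idea, written in Java for consistency with the other examples in this digest (release.py itself is Python); the file list and version strings are placeholders taken from the description above:

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class BumpToSnapshot {
    public static void main(String[] args) throws IOException {
        String released = "0.10.0.1";           // placeholder versions
        String snapshot = "0.10.0.1-SNAPSHOT";
        // a subset of the files listed above; the kafkatest files would need
        // the ".dev0" suffix instead of "-SNAPSHOT"
        String[] files = {"gradle.properties", "docs/js/templateData.js"};
        for (String file : files) {
            Path path = Paths.get(file);
            String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            Files.write(path, content.replace(released, snapshot).getBytes(StandardCharsets.UTF_8));
        }
    }
}
{code}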



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15224) Automate version change to snapshot

2023-07-20 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15224:
-
Description: 
We require changing to SNAPSHOT version as part of the release process [1]. The 
specific manual steps are:

Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
 * 
 ** docs/js/templateData.js
 ** gradle.properties
 ** kafka-merge-pr.py
 ** streams/quickstart/java/pom.xml
 ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
 ** streams/quickstart/pom.xml
 ** tests/kafkatest/__init__.py (note: this version name can't follow the 
-SNAPSHOT convention due to python version naming restrictions, instead
update it to 0.10.0.1.dev0)
 ** tests/kafkatest/version.py

The diff of the changes look like 
[https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
 

 

It would be nice if we could run a script to do it automatically. Note that 
release.py (line 550) already does something similar where it replaces SNAPSHOT 
with the actual version. We need to do the opposite here. We can repurpose that 
code in release.py and extract it into a new script to perform this operation.

[1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]

  was:
We require changing to SNAPSHOT version as part of the release process [1]. The 
specific manual steps are:

Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
 * 
 ** docs/js/templateData.js
 ** gradle.properties
 ** kafka-merge-pr.py
 ** streams/quickstart/java/pom.xml
 ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
 ** streams/quickstart/pom.xml
 ** tests/kafkatest/__init__.py (note: this version name can't follow the 
-SNAPSHOT convention due to python version naming restrictions, instead
update it to 0.10.0.1.dev0)
 ** tests/kafkatest/version.py

It would be nice if we could run a script to do it automatically. Note that 
release.py (line 550) already does something similar where it replaces SNAPSHOT 
with the actual version. We need to do the opposite here. We can repurpose that 
code in release.py and extract it into a new script to perform this operation.

[1] https://cwiki.apache.org/confluence/display/KAFKA/Release+Process


> Automate version change to snapshot 
> 
>
> Key: KAFKA-15224
> URL: https://issues.apache.org/jira/browse/KAFKA-15224
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Priority: Minor
>
> We require changing to SNAPSHOT version as part of the release process [1]. 
> The specific manual steps are:
> Update version on the branch to 0.10.0.1-SNAPSHOT in the following places:
>  * 
>  ** docs/js/templateData.js
>  ** gradle.properties
>  ** kafka-merge-pr.py
>  ** streams/quickstart/java/pom.xml
>  ** streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
>  ** streams/quickstart/pom.xml
>  ** tests/kafkatest/__init__.py (note: this version name can't follow the 
> -SNAPSHOT convention due to python version naming restrictions, instead
> update it to 0.10.0.1.dev0)
>  ** tests/kafkatest/version.py
> The diff of the changes look like 
> [https://github.com/apache/kafka/commit/484a86feb562f645bdbec74b18f8a28395a686f7#diff-21a0ab11b8bbdab9930ad18d4bca2d943bbdf40d29d68ab8a96f765bd1f9]
>  
>  
> It would be nice if we could run a script to do it automatically. Note that 
> release.py (line 550) already does something similar where it replaces 
> SNAPSHOT with the actual version. We need to do the opposite here. We can 
> repurpose that code in release.py and extract it into a new script to perform 
> this operation.
> [1] [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15200) verify pre-requisite at start of release.py

2023-07-20 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15200:
-
Description: 
At the start of release.py, the first thing it should do is verify that 
dependency pre-requisites are satisfied. The pre-requisites are:
 # maven should be installed.
 # sftp should be installed. Connection to @home.apache.org should be 
successful. Currently it is done manually at the step "Verify by using 
`{{{}sftp @home.apache.org{}}}`" in 
[https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]
 # svn should be installed

  was:
At the start of release.py, the first thing it should do is verify that 
dependency pre-requisites are satisfied. The pre-requisites are:

1. maven should be installed.
2. sftp should be installed. Connection to @home.apache.org should be 
successful. Currently it is done manually at the step "Verify by using 
`{{{}sftp @home.apache.org{}}}`" in 
[https://cwiki.apache.org/confluence/display/KAFKA/Release+Process] 


> verify pre-requisite at start of release.py
> ---
>
> Key: KAFKA-15200
> URL: https://issues.apache.org/jira/browse/KAFKA-15200
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Priority: Major
>
> At the start of release.py, the first thing it should do is verify that 
> dependency pre-requisites are satisfied. The pre-requisites are:
>  # maven should be installed.
>  # sftp should be installed. Connection to @home.apache.org should be 
> successful. Currently it is done manually at the step "Verify by using 
> `{{{}sftp @home.apache.org{}}}`" in 
> [https://cwiki.apache.org/confluence/display/KAFKA/Release+Process]
>  # svn should be installed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)

2023-07-20 Thread via GitHub


cadonna commented on PR #13942:
URL: https://github.com/apache/kafka/pull/13942#issuecomment-1643982125

   Build failures are unrelated:
   ```
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplication()
   Build / JDK 20 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
   Build / JDK 17 and Scala 2.13 / 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #13942: KAFKA-14936: Check the versioned table's history retention and compare to grace period (4/N)

2023-07-20 Thread via GitHub


cadonna merged PR #13942:
URL: https://github.com/apache/kafka/pull/13942


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #14044: KAFKA-15216: InternalSinkRecord::newRecord should not ignore new headers

2023-07-20 Thread via GitHub


C0urante merged PR #14044:
URL: https://github.com/apache/kafka/pull/14044


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #14041: KAFKA-14469: Add MirrorMaker 2 configs to table of contents in docs page

2023-07-20 Thread via GitHub


C0urante merged PR #14041:
URL: https://github.com/apache/kafka/pull/14041


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14669:
--
Priority: Major  (was: Blocker)

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14669) Include MirrorMaker connector configurations in docs

2023-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14669.
---
Resolution: Done

> Include MirrorMaker connector configurations in docs
> 
>
> Key: KAFKA-14669
> URL: https://issues.apache.org/jira/browse/KAFKA-14669
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.6.0
>
>
> In the https://kafka.apache.org/documentation/#georeplication-flow-configure 
> section we list some of the MirrorMaker connectors configurations. These are 
> hardcoded in the docs: 
> https://github.com/apache/kafka/blob/trunk/docs/ops.html#L768-L788
> Instead we should use the generated docs (added as part of 
> https://github.com/apache/kafka/commit/40af3a74507cce9155f4fb4fca317d3c68235d78)
>  like we do for the file connectors.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument

2023-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15216:
--
Fix Version/s: 3.5.2

> InternalSinkRecord::newRecord method ignores the headers argument
> -
>
> Key: KAFKA-15216
> URL: https://issues.apache.org/jira/browse/KAFKA-15216
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
> Fix For: 3.6.0, 3.5.2
>
>
> [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56]
>  - the headers argument passed to the {{InternalSinkRecord}} constructor is 
> the instance field obtained via the {{headers()}} accessor method instead of 
> the {{newRecord}} method's {{headers}} argument value.
>  
> Originally discovered 
> [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument

2023-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15216:
--
Fix Version/s: 3.4.2

> InternalSinkRecord::newRecord method ignores the headers argument
> -
>
> Key: KAFKA-15216
> URL: https://issues.apache.org/jira/browse/KAFKA-15216
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
> Fix For: 3.6.0, 3.4.2, 3.5.2
>
>
> [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56]
>  - the headers argument passed to the {{InternalSinkRecord}} constructor is 
> the instance field obtained via the {{headers()}} accessor method instead of 
> the {{newRecord}} method's {{headers}} argument value.
>  
> Originally discovered 
> [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15216) InternalSinkRecord::newRecord method ignores the headers argument

2023-07-20 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15216:
--
Fix Version/s: 3.3.3

> InternalSinkRecord::newRecord method ignores the headers argument
> -
>
> Key: KAFKA-15216
> URL: https://issues.apache.org/jira/browse/KAFKA-15216
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Minor
> Fix For: 3.3.3, 3.6.0, 3.4.2, 3.5.2
>
>
> [https://github.com/apache/kafka/blob/a1f6ab69387deb10988461152a0087f0cd2827c4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/InternalSinkRecord.java#L50-L56]
>  - the headers argument passed to the {{InternalSinkRecord}} constructor is 
> the instance field obtained via the {{headers()}} accessor method instead of 
> the {{newRecord}} method's {{headers}} argument value.
>  
> Originally discovered 
> [here.|https://github.com/apache/kafka/pull/14024#discussion_r1266917499]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] yashmayya commented on pull request #14024: KAFKA-13431: Expose the original pre-transform topic partition and offset in sink records

2023-07-20 Thread via GitHub


yashmayya commented on PR #14024:
URL: https://github.com/apache/kafka/pull/14024#issuecomment-1644061803

   Thanks Chris, I've rebased this on the latest `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp commented on pull request #14060: KAFKA-15222: Upgrade zinc Scala incremental compiler plugin version to a latests stable fit version (1.9.2)

2023-07-20 Thread via GitHub


bmscomp commented on PR #14060:
URL: https://github.com/apache/kafka/pull/14060#issuecomment-1644078470

   It's ok now, things seem more stable, but there are some failures when 
building Kafka with JDK 20 that have no relation to the zinc compiler.
   
   Notice that for all builds the retry_zinc step completes without issue.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-07-20 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1644151941

   I ran a full system test run:
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.3
   session_id:   2023-07-18--002
   run time: 1602 minutes 29.170 seconds
   tests run:1096
   passed:   786
   flaky:0
   failed:   20
   ignored:  290
   

   ```
   
   With the following failed tests:
   ```
   
'tests/kafkatest/tests/core/throttling_test.py::ThrottlingTest.test_throttled_reassignment@{"bounce_brokers":true}'
 
   
'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_PLAINTEXT"}'
 
   
'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SASL_SSL"}'
 
   
'tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":false,"reassign_from_offset_zero":false,"metadata_quorum":"REMOTE_KRAFT"}'
 
   
'tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"REMOTE_KRAFT"}'
 
   
'tests/kafkatest/tests/core/reassign_partitions_test.py::ReassignPartitionsTest.test_reassign_partitions@{"bounce_brokers":true,"reassign_from_offset_zero":false,"metadata_quorum":"ZK"}'
 
   
'tests/kafkatest/tests/streams/streams_smoke_test.py::StreamsSmokeTest.test_streams@{"processing_guarantee":"at_least_once","crash":false,"metadata_quorum":"REMOTE_KRAFT"}'
 
   
'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"PLAINTEXT"}'
 
   
'tests/kafkatest/tests/core/mirror_maker_test.py::TestMirrorMakerService.test_bounce@{"clean_shutdown":false,"security_protocol":"SSL"}'
 
   
'tests/kafkatest/tests/tools/replica_verification_test.py::ReplicaVerificationToolTest.test_replica_lags@{"metadata_quorum":"REMOTE_KRAFT"}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"(user,
 client-id)","override_quota":false}' 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"(user,
 client-id)","override_quota":true}' 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","consumer_num":2}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","old_broker_throttling_behavior":true}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","old_client_throttling_behavior":true}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","override_quota":false}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"client-id","override_quota":true}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"user","override_quota":false}'
 
   
'tests/kafkatest/tests/client/quota_test.py::QuotaTest.test_quota@{"quota_type":"user","override_quota":true}'
 
   
'tests/kafkatest/tests/core/network_degrade_test.py::NetworkDegradeTest.test_rate@{"task_name":"rate-1000-latency-50","device_name":"eth0","latency_ms":50,"rate_limit_kbit":100}'
   ```
   
   None of which make use of the 0.8.2.x artifact versions being affected 
here. In particular, the test I was concerned about (upgrade_test.py 
from_kafka_version=0.8.2.2.to_message_format_version=None.compression_types=.snappy
 FAIL) does pass on this x86_64 machine when it failed on my arm64 machine, 
indicating that the failure was due to missing native library dependencies.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #14057: KAFKA-15194-Prepend-Offset-as-Filename

2023-07-20 Thread via GitHub


divijvaidya commented on code in PR #14057:
URL: https://github.com/apache/kafka/pull/14057#discussion_r1269646588


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentFileset.java:
##
@@ -59,9 +59,9 @@
  * the local tiered storage:
  *
  * 
- * / storage-directory / topic-partition-uuidBase64 / 
oAtiIQ95REujbuzNd_lkLQ.log
- *  . 
oAtiIQ95REujbuzNd_lkLQ.index
- *  . 
oAtiIQ95REujbuzNd_lkLQ.timeindex
+ * / storage-directory / topic-partition-uuidBase64 / 
startOffset-oAtiIQ95REujbuzNd_lkLQ.log

Review Comment:
   nit
   
   Please replace "startOffset" with dummy values.



##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorageTest.java:
##
@@ -399,20 +403,21 @@ public Verifier(final LocalTieredStorage remoteStorage, 
final TopicIdPartition t
 this.topicIdPartition = requireNonNull(topicIdPartition);
 }
 
-private List expectedPaths(final RemoteLogSegmentId id) {
+private List expectedPaths(final RemoteLogSegmentMetadata 
metadata) {
 final String rootPath = getStorageRootDirectory();
 TopicPartition tp = topicIdPartition.topicPartition();
 final String topicPartitionSubpath = format("%s-%d-%s", 
tp.topic(), tp.partition(),
 topicIdPartition.topicId());
-final String uuid = id.id().toString();
+final String uuid = metadata.remoteLogSegmentId().id().toString();
+final long startOffset = metadata.startOffset();
 
 return Arrays.asList(
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.LOG_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TIME_INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.TXN_INDEX_FILE_SUFFIX),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LEADER_EPOCH_CHECKPOINT.getSuffix()),
-Paths.get(rootPath, topicPartitionSubpath, uuid + 
LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)
+Paths.get(rootPath, topicPartitionSubpath, startOffset + 
"-" + uuid + LogFileUtils.LOG_FILE_SUFFIX),

Review Comment:
   > I don't think it will allow us to insert a uuid in the middle as part of 
the filename.
   
   Ack. I missed that.
   
   > maybe we should make LogFileUtils#filenamePrefixFromOffset(long offset) as 
a public method so that we can construct a real offset using this method. What 
do you think ?
   
   Yes please. Let's use that.
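   
   For reference, a rough sketch of what the test-side path construction could 
look like once that method is public (a sketch only; the 20-digit zero-padded 
prefix mirrors local log segment naming and is an assumption here):
   
   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;
   
   // Sketch: stands in for LogFileUtils#filenamePrefixFromOffset(long)
   // becoming public; the zero-padded format is an assumption based on
   // how local log segment files are named.
   public class SegmentPathSketch {
       static String filenamePrefixFromOffset(long offset) {
           return String.format("%020d", offset);
       }
   
       static Path expectedPath(String rootPath, String topicPartitionSubpath,
                                long startOffset, String uuid, String suffix) {
           // e.g. 00000000000000000042-oAtiIQ95REujbuzNd_lkLQ.log
           return Paths.get(rootPath, topicPartitionSubpath,
                   filenamePrefixFromOffset(startOffset) + "-" + uuid + suffix);
       }
   }
   ```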
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-07-20 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1644153313

   @ijuma Could you take another look at this? This is blocking KIP-898 that 
I'm trying to get landed in time for 3.6.0. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-20 Thread via GitHub


rreddy-22 commented on code in PR #13998:
URL: https://github.com/apache/kafka/pull/13998#discussion_r1269665264


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java:
##
@@ -52,33 +42,22 @@ public Map members() {
 return members;
 }
 
-/**
- * @return Topic metadata keyed by topic Ids.
- */
-public Map topics() {
-return topics;
-}
-
 @Override
 public boolean equals(Object o) {
 if (this == o) return true;
-if (o == null || getClass() != o.getClass()) return false;
+if (!(o instanceof AssignmentSpec)) return false;

Review Comment:
   Sorry, I just auto-generated these functions; is there a reason why one is 
better than the other?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13874: KAFKA-14133: Migrate various mocks in TaskManagerTest to Mockito

2023-07-20 Thread via GitHub


divijvaidya merged PR #13874:
URL: https://github.com/apache/kafka/pull/13874


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-20 Thread via GitHub


junrao commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1644221461

   @kirktrue : It seems there were 4 test failures for jdk 11. But the tests 
for jdk 17 and 20 were aborted. Do you know why?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #14062: MINOR: Add a Builder for KRaftMigrationDriver

2023-07-20 Thread via GitHub


mumrah opened a new pull request, #14062:
URL: https://github.com/apache/kafka/pull/14062

   The number of arguments for KRaftMigrationDriver has grown rather large and 
there are already two constructors. This PR refactors the class to have a 
single package-private constructor and a builder that can be used by tests and 
ControllerServer. 
   
   No other changes in this patch, just refactoring the constructor.
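   
   For illustration, a rough sketch of the shape the builder could take (the 
fields shown are hypothetical stand-ins, not the actual KRaftMigrationDriver 
arguments):
   
   ```java
   // Hypothetical sketch of the builder shape; nodeId and zkConnect are
   // illustrative stand-ins, not the real constructor arguments.
   public final class MigrationDriverSketch {
       private final int nodeId;
       private final String zkConnect;
   
       private MigrationDriverSketch(int nodeId, String zkConnect) {
           this.nodeId = nodeId;
           this.zkConnect = zkConnect;
       }
   
       public static final class Builder {
           private int nodeId = -1;
           private String zkConnect;
   
           public Builder setNodeId(int nodeId) {
               this.nodeId = nodeId;
               return this;
           }
   
           public Builder setZkConnect(String zkConnect) {
               this.zkConnect = zkConnect;
               return this;
           }
   
           public MigrationDriverSketch build() {
               // Validate required fields once here instead of in N constructors.
               if (zkConnect == null) {
                   throw new IllegalStateException("zkConnect must be set");
               }
               return new MigrationDriverSketch(nodeId, zkConnect);
           }
       }
   }
   ```
   
   Tests can then set only the fields they care about, e.g. `new 
MigrationDriverSketch.Builder().setNodeId(1).setZkConnect("localhost:2181").build()`.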


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-20 Thread via GitHub


mumrah commented on code in PR #14046:
URL: https://github.com/apache/kafka/pull/14046#discussion_r1269710067


##
clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java:
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class StaleMemberEpochException extends ApiException {

Review Comment:
   I know there isn't much precedent for this, but it might be useful to 
include a doc string here explaining which RPC this error is used in and at 
what version
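   
   For example, something along these lines (suggested wording only):
   
   ```java
   package org.apache.kafka.common.errors;
   
   import org.apache.kafka.common.annotation.InterfaceStability;
   
   /**
    * Thrown when the member epoch in a request does not match the member
    * epoch known by the group coordinator. Used by the OffsetCommit API
    * (version 9+) introduced as part of KIP-848.
    */
   @InterfaceStability.Evolving
   public class StaleMemberEpochException extends ApiException {
       public StaleMemberEpochException(String message) {
           super(message);
       }
   }
   ```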



##
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##
@@ -31,13 +31,19 @@
   // version 7 adds a new field called groupInstanceId to indicate member 
identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The
+  // request is the same as version 8.
+  // Version 9 is added as part of KIP-848 and is still under development. 
Hence, the last version of the
+  // API is not exposed by default by brokers unless explicitly enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0-9",
   "flexibleVersions": "8+",
   "fields": [
 { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
   "about": "The unique group identifier." },
-{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": 
"-1", "ignorable": true,
-  "about": "The generation of the group." },
+{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", 
"default": "-1", "ignorable": true,
+  "about": "The generation of the group if the generic group protocol or 
the member epoch if the consumer protocol." },

Review Comment:
   How does the server decide to interpret this value as a GenerationId vs a 
MemberEpoch? Is it based on the API version used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-20 Thread via GitHub


jolshan commented on code in PR #14046:
URL: https://github.com/apache/kafka/pull/14046#discussion_r1269711912


##
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##
@@ -31,13 +31,19 @@
   // version 7 adds a new field called groupInstanceId to indicate member 
identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The
+  // request is the same as version 8.
+  // Version 9 is added as part of KIP-848 and is still under development. 
Hence, the last version of the
+  // API is not exposed by default by brokers unless explicitly enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0-9",
   "flexibleVersions": "8+",
   "fields": [
 { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
   "about": "The unique group identifier." },
-{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": 
"-1", "ignorable": true,
-  "about": "The generation of the group." },
+{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", 
"default": "-1", "ignorable": true,
+  "about": "The generation of the group if the generic group protocol or 
the member epoch if the consumer protocol." },

Review Comment:
   The new group coordinator uses the member epoch and the old one uses the 
generation id I believe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


mumrah commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269713183


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();
+
+return new Record(
+new ApiMessageAndVersion(
+new OffsetCommitKey()
+.setGroup(groupId)
+.setTopic(topic)
+.setPartition(partitionId),
+(short) 1

Review Comment:
   Can we define these `(short) 1` as a constant? That might reduce the chances 
of us changing one without the others in the future.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();

Review Comment:
   Would it make sense to relocate this logic and the linked logic into 
MetadataVersion?
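   
   For instance, something like the following (a sketch only; the boolean 
overload and its name are hypothetical):
   
   ```java
   // Sketch of relocating the version choice into MetadataVersion. Only
   // value schema version 1 carries an explicit expire timestamp, hence
   // the special case.
   public class MetadataVersionSketch {
       short offsetCommitValueVersion() {
           return 3; // stand-in for the real per-version lookup
       }
   
       short offsetCommitValueVersion(boolean hasExpireTimestampMs) {
           return hasExpireTimestampMs ? (short) 1 : offsetCommitValueVersion();
       }
   }
   ```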



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-20 Thread via GitHub


jolshan commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1644264091

   > @jolshan I was actually thinking about the AuthorizerIntegrationTest 
failures overnight and I found an issue with the latestVersionUnstable flag. 
Let me try to explain.
   
   I was curious if the unstable version flag was causing issues since I recall 
some weirdness in tests when I had an unstable version. Makes sense to require 
the unstable-ness to be explicit, but I will take a second look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-20 Thread via GitHub


dajac commented on code in PR #14046:
URL: https://github.com/apache/kafka/pull/14046#discussion_r1269732280


##
clients/src/main/resources/common/message/OffsetCommitRequest.json:
##
@@ -31,13 +31,19 @@
   // version 7 adds a new field called groupInstanceId to indicate member 
identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The
+  // request is the same as version 8.
+  // Version 9 is added as part of KIP-848 and is still under development. 
Hence, the last version of the
+  // API is not exposed by default by brokers unless explicitly enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0-9",
   "flexibleVersions": "8+",
   "fields": [
 { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
   "about": "The unique group identifier." },
-{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": 
"-1", "ignorable": true,
-  "about": "The generation of the group." },
+{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", 
"default": "-1", "ignorable": true,
+  "about": "The generation of the group if the generic group protocol or 
the member epoch if the consumer protocol." },

Review Comment:
   It is based on the type of the group. In the new group coordinator, we have two 
types: generic (the old protocol) and consumer (the new protocol). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269737507


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();

Review Comment:
   Yeah, I was debating whether the 
`offsetAndMetadata.expireTimestampMs.isPresent()` part of this should be in 
MetadataVersion or not. I could pass a boolean for this purpose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269744467


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();
+
+return new Record(
+new ApiMessageAndVersion(
+new OffsetCommitKey()
+.setGroup(groupId)
+.setTopic(topic)
+.setPartition(partitionId),
+(short) 1

Review Comment:
   I actually used the value on purpose vs using something like 
`ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION ` in order to 
not change it by mistake.
   
   I wanted to rework the format of those records to include an api key and to 
auto-generate the constants based on it. In the meantime, we could define them 
manually. Do you mind if I address this separately though? I will do it for all the 
records at once.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269744467


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();
+
+return new Record(
+new ApiMessageAndVersion(
+new OffsetCommitKey()
+.setGroup(groupId)
+.setTopic(topic)
+.setPartition(partitionId),
+(short) 1

Review Comment:
   I actually used the value on purpose vs using something like 
`ConsumerGroupTargetAssignmentMemberKey.HIGHEST_SUPPORTED_VERSION ` in order to 
not change it by mistake.
   
   I wanted to rework the format of those records to include an api key and to 
auto-generate the constants based on it. In the meantime, we could define them 
manually. Do you mind if I address this separately though? I will do it for all 
the records at once.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #14046: KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error

2023-07-20 Thread via GitHub


jolshan commented on PR #14046:
URL: https://github.com/apache/kafka/pull/14046#issuecomment-1644292908

   Looking at the tests, `[Build / JDK 20 and Scala 2.13 / 
kafka.server.FetchRequestTest.testCurrentEpochValidationV12()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14046/7/testReport/junit/kafka.server/FetchRequestTest/Build___JDK_20_and_Scala_2_13___testCurrentEpochValidationV12__/)`
 is a bit strange, but it only failed on that version. Everything else seems to 
be familiar-ish flakes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] msn-tldr opened a new pull request, #14063: Kip951 poc

2023-07-20 Thread via GitHub


msn-tldr opened a new pull request, #14063:
URL: https://github.com/apache/kafka/pull/14063

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-20 Thread via GitHub


rreddy-22 commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1644359975

   Looks good to me! Thanks @flashmouse for the changes and replies! @dajac is 
a committer so he'll give the final approval!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #14047: KAFKA-14499: [2/N] Add OffsetCommit record & related

2023-07-20 Thread via GitHub


dajac commented on code in PR #14047:
URL: https://github.com/apache/kafka/pull/14047#discussion_r1269799340


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##
@@ -467,6 +471,72 @@ public static Record newEmptyGroupMetadataRecord(
 );
 }
 
+/**
+ * Creates an OffsetCommit record.
+ *
+ * @param groupId   The group id.
+ * @param topic The topic name.
+ * @param partitionId   The partition id.
+ * @param offsetAndMetadata The offset and metadata.
+ * @param metadataVersion   The metadata version.
+ * @return The record.
+ */
+public static Record newOffsetCommitRecord(
+String groupId,
+String topic,
+int partitionId,
+OffsetAndMetadata offsetAndMetadata,
+MetadataVersion metadataVersion
+) {
+short version = offsetAndMetadata.expireTimestampMs.isPresent() ?
+(short) 1 : metadataVersion.offsetCommitValueVersion();
+
+return new Record(
+new ApiMessageAndVersion(
+new OffsetCommitKey()
+.setGroup(groupId)
+.setTopic(topic)
+.setPartition(partitionId),
+(short) 1

Review Comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-15225 for this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15225) Define constants for record types

2023-07-20 Thread David Jacot (Jira)
David Jacot created KAFKA-15225:
---

 Summary: Define constants for record types
 Key: KAFKA-15225
 URL: https://issues.apache.org/jira/browse/KAFKA-15225
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot


Define constants for all the record types. Ideally, this should be defined in 
the record definitions and the constants should be auto-generated (e.g. like 
ApiKeys).
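
Until the auto-generation lands, a hand-maintained sketch of what such constants 
could look like (names and values below are illustrative, not final):

```java
// Illustrative sketch only: hand-maintained key version constants,
// pending auto-generation from the record definitions (like ApiKeys).
public final class CoordinatorRecordTypeSketch {
    public static final short OFFSET_COMMIT_KEY = 1;
    public static final short GROUP_METADATA_KEY = 2;

    private CoordinatorRecordTypeSketch() {}
}
```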



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-20 Thread via GitHub


rreddy-22 commented on code in PR #13998:
URL: https://github.com/apache/kafka/pull/13998#discussion_r1269837065


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * The assignment topic describer is used by the {@link PartitionAssignor}
+ * to obtain topic and partition metadata of subscribed topics.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface AssignmentTopicDescriber {

Review Comment:
   Yeah, I named it this way because I was just wondering if it'd be more uniform 
with assignmentSpec, but I'll change it because I agree.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-20 Thread via GitHub


rreddy-22 commented on code in PR #13998:
URL: https://github.com/apache/kafka/pull/13998#discussion_r1269846580


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * The assignment topic describer is used by the {@link PartitionAssignor}
+ * to obtain topic and partition metadata of subscribed topics.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface AssignmentTopicDescriber {
+
+/**
+ * Returns a set of subscribed topicIds.
+ *
+ * @return Set of topicIds corresponding to the subscribed topics.
+ */
+Set subscribedTopicIds();
+
+/**
+ * Number of partitions for the given topicId.

Review Comment:
   It says topicId singular already; did we want a space between topic and Id?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15226) System tests for plugin.discovery worker configuration

2023-07-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15226:
---

 Summary: System tests for plugin.discovery worker configuration
 Key: KAFKA-15226
 URL: https://issues.apache.org/jira/browse/KAFKA-15226
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


Add system tests as described in KIP-898, targeting the startup behavior of the 
connect worker, various states of plugin migration, and the migration script.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15227) Use plugin.discovery=SERVICE_LOAD in all plugin test suites

2023-07-20 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15227:
---

 Summary: Use plugin.discovery=SERVICE_LOAD in all plugin test 
suites
 Key: KAFKA-15227
 URL: https://issues.apache.org/jira/browse/KAFKA-15227
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


To speed up these tests where we know all plugins are migrated, use 
SERVICE_LOAD mode in all known test suites.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 merged pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-07-20 Thread via GitHub


gharris1727 merged PR #13313:
URL: https://github.com/apache/kafka/pull/13313


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13313: KAFKA-14760: Move ThroughputThrottler from tools to clients, remove tools dependency from connect-runtime

2023-07-20 Thread via GitHub


gharris1727 commented on PR #13313:
URL: https://github.com/apache/kafka/pull/13313#issuecomment-1644520071

   Thanks for your help Ismael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-20 Thread via GitHub


C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269901272


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##
@@ -597,7 +596,9 @@ private Set listPartitions(
 Admin admin,
 Collection topics
 ) throws TimeoutException, InterruptedException, ExecutionException {
-assertFalse("collection of topics may not be empty", topics.isEmpty());

Review Comment:
   Did the same in `assertConnectorAndExactlyNumTasksAreRunning`.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition 
topicPartition) {
 
 static Map wrapPartition(TopicPartition topicPartition, 
String sourceClusterAlias) {
 Map wrapped = new HashMap<>();
-wrapped.put("topic", topicPartition.topic());
-wrapped.put("partition", topicPartition.partition());
-wrapped.put("cluster", sourceClusterAlias);
+wrapped.put(TOPIC_KEY, topicPartition.topic());
+wrapped.put(PARTITION_KEY, topicPartition.partition());
+wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
 return wrapped;
 }
 
-static Map wrapOffset(long offset) {
-return Collections.singletonMap("offset", offset);
+public static Map wrapOffset(long offset) {
+return Collections.singletonMap(OFFSET_KEY, offset);
 }
 
-static TopicPartition unwrapPartition(Map wrapped) {
-String topic = (String) wrapped.get("topic");
-int partition = (Integer) wrapped.get("partition");
+public static TopicPartition unwrapPartition(Map wrapped) {
+String topic = (String) wrapped.get(TOPIC_KEY);
+int partition = (Integer) wrapped.get(PARTITION_KEY);
 return new TopicPartition(topic, partition);
 }
 
 static Long unwrapOffset(Map wrapped) {
-if (wrapped == null || wrapped.get("offset") == null) {
+if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
 return -1L;
 }
-return (Long) wrapped.get("offset");
+return (Long) wrapped.get(OFFSET_KEY);
+}
+
+
+/**
+ * Validate a specific key in a source partition that may be written to 
the offsets topic for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map 
and that its value is a string.
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be 
null
+ * @param key the key to check for in the source partition; may be null
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+static void validateSourcePartitionString(Map sourcePartition, 
String key) {
+Objects.requireNonNull(sourcePartition, "Source partition may not be 
null");
+
+if (!sourcePartition.containsKey(key))
+throw new ConnectException(String.format(
+"Source partition %s is missing the '%s' key, which is 
required",
+sourcePartition,
+key
+));
+
+Object value = sourcePartition.get(key);
+if (!(value instanceof String)) {
+throw new ConnectException(String.format(
+"Source partition %s has an invalid value %s for the '%s' 
key, which must be a string",
+sourcePartition,
+value,
+key
+));
+}
+}
+
+/**
+ * Validate the {@link #PARTITION_KEY partition key} in a source partition 
that may be written to the offsets topic
+ * for one of the MM2 connectors.
+ * This method ensures that the key is present in the source partition map 
and that its value is a non-negative integer.
+ * 
+ * Note that the partition key most likely refers to a partition in a 
Kafka topic, whereas the term "source partition" refers
+ * to a {@link SourceRecord#sourcePartition() source partition} that is 
stored in a Kafka Connect worker's internal offsets
+ * topic (or, if running in standalone mode, offsets file).
+ *
+ * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, 
Map)
+ * @see SourceRecord#sourcePartition()
+ *
+ * @param sourcePartition the to-be-validated source partition; may not be 
null
+ *
+ * @throws ConnectException if the offset is invalid
+ */
+static void validateSourcePartitionPartition(Map 
sourcePartition) {
+Objects.requireNonNull(sourcePartition, "Source partition may not be 
null");
+
+if (!sourcePartition.containsKey(PARTITION_KEY))
+throw new ConnectException(String.format(
+"Source partition %s is missing the '%s' key

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13998: KAFKA-14702: Extend server side assignor to support rack aware replica placement

2023-07-20 Thread via GitHub


rreddy-22 commented on code in PR #13998:
URL: https://github.com/apache/kafka/pull/13998#discussion_r1269927386


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicDescriber.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * The assignment topic describer is used by the {@link PartitionAssignor}
+ * to obtain topic and partition metadata of subscribed topics.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface AssignmentTopicDescriber {
+
+/**
+ * Returns a set of subscribed topicIds.
+ *
+ * @return Set of topicIds corresponding to the subscribed topics.
+ */
+Set subscribedTopicIds();
+
+/**
+ * Number of partitions for the given topicId.
+ *
+ * @param topicId   Uuid corresponding to the topic.
+ * @return The number of partitions corresponding to the given topicId.
+ * If the topicId doesn't exist return 0;
+ */
+int numPartitions(Uuid topicId);
+
+/**
+ * Returns all the racks associated with the replicas for the given 
partition.
+ *
+ * @param topicId   Uuid corresponding to the partition's topic.
+ * @param partition Partition number within topic.

Review Comment:
   Partition number is used a lot throughout the Kafka code, and I thought it's 
easier to understand than Id even though they're interchangeable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


