[GitHub] [kafka] dengziming commented on a change in pull request #11173: MINOR: Support max timestamp in GetOffsetShell
dengziming commented on a change in pull request #11173: URL: https://github.com/apache/kafka/pull/11173#discussion_r691845925 ## File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala ## @@ -224,9 +227,14 @@ object GetOffsetShell { /** * Return the partition infos. Filter them with topicPartitionFilter. */ - private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = { -consumer.listTopics.asScala.values.flatMap { partitions => - partitions.asScala.filter(topicPartitionFilter) + private def listPartitionInfos(client: Admin, topicPartitionFilter: PartitionInfo => Boolean, listInternal: Boolean): Seq[PartitionInfo] = { +val topics = client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names().get().asScala Review comment: I changed the method from creating a single topic-partition filter to creating both a topic filter and a topic-partition filter. The topic filter is applied to topic names before their metadata is fetched, and the topic-partition filter is applied to topic-partitions after the topic metadata is received. PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
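The two-stage filtering described in this comment can be sketched in Java as a small, self-contained model. It is not the GetOffsetShell code. `describe` is a hypothetical stand-in for a per-topic metadata request such as `Admin#describeTopics`, and `PartitionRef` is an illustrative class, not Kafka's `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative model only; class and method names are hypothetical.
final class TwoStageFilterSketch {

    static final class PartitionRef {
        final String topic;
        final int partition;

        PartitionRef(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    static List<String> listPartitions(Set<String> topicNames,
                                       Predicate<String> topicFilter,
                                       Function<String, Integer> describe,
                                       Predicate<PartitionRef> partitionFilter) {
        List<String> result = new ArrayList<>();
        // Sorted iteration keeps the output deterministic.
        for (String topic : new TreeSet<>(topicNames)) {
            // Stage 1: drop topics by name before asking for their metadata.
            if (!topicFilter.test(topic)) {
                continue;
            }
            int partitionCount = describe.apply(topic);
            // Stage 2: filter individual partitions once metadata is known.
            for (int p = 0; p < partitionCount; p++) {
                PartitionRef ref = new PartitionRef(topic, p);
                if (partitionFilter.test(ref)) {
                    result.add(ref.toString());
                }
            }
        }
        return result;
    }
}
```

The benefit of the first stage is that topics rejected by name never trigger a metadata lookup.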
[GitHub] [kafka] dajac commented on pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
dajac commented on pull request #11230: URL: https://github.com/apache/kafka/pull/11230#issuecomment-901717039 @lbradstreet I have updated the PR based on your feedback.
[GitHub] [kafka] dajac commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
dajac commented on a change in pull request #11231: URL: https://github.com/apache/kafka/pull/11231#discussion_r692154377 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) { else if (!future.isRetriable()) throw exception; -resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); Review comment: Your reasoning makes sense to me. From a first read, the PR looks pretty good. I will make a second pass on Monday to ensure that I cover all the cases.
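The behavior under review is that retriable JoinGroup failures no longer reset the member state. A toy Java loop can illustrate it. This is a hypothetical model, not `AbstractCoordinator`:

- Each `Outcome` stands for one JoinGroup round trip.
- The `memberId` and `generation` fields stand in for the state that `resetStateAndRejoin` would have cleared.
- Running out of outcomes stands in for the timer expiring.

```java
import java.util.Iterator;

// Hypothetical model of the join loop; not the real consumer coordinator.
final class JoinLoopSketch {
    enum Outcome { SUCCESS, RETRIABLE_ERROR, FATAL_ERROR }

    String memberId = "member-1";
    int generation = 5;
    int attempts = 0;

    boolean joinGroupIfNeeded(Iterator<Outcome> results) {
        while (results.hasNext()) {
            attempts++;
            Outcome outcome = results.next();
            if (outcome == Outcome.SUCCESS) {
                return true;
            }
            if (outcome == Outcome.FATAL_ERROR) {
                throw new IllegalStateException("non-retriable join failure");
            }
            // RETRIABLE_ERROR: loop again with memberId and generation untouched
            // (backoff omitted for brevity).
        }
        return false; // stands in for the timer expiring
    }
}
```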
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692162163 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, Review comment: Right. Reusing `makeFollowers` would not work out of the box. I am not sure I follow your point regarding the possibility of overriding a pending ISR state. I thought that only the leader updates the ISR state and that followers only update it via `updateAssignmentAndIsr`, based on the controller state. I might be missing something. I am not against having `updateTopicIdForFollowers`, but I wanted to ensure that we have thought about all the options.
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692164057 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1420,6 +1423,10 @@ class ReplicaManager(val config: KafkaConfig, } val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) } + val partitionsToUpdateIdForFollower = topicIdUpdatePartitions.filter { case (_, partitionState) => +partitionState.leader != localBrokerId + } Review comment: Could we avoid having to re-iterate over `topicIdUpdatePartitions` by applying the condition before adding the partition to `topicIdUpdatePartitions`? I have opened a PR to do the same for the two collections above: https://github.com/apache/kafka/pull/11225
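The suggestion is to apply the leader check when a partition is first added, instead of filtering `topicIdUpdatePartitions` again afterwards. That can be sketched in Java as a single classification pass. The `PartitionUpdate` class and its `newerEpoch` and `topicIdMissing` flags are hypothetical simplifications, not the fields `ReplicaManager` actually uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of classifying partitions in one pass.
final class SinglePassSketch {

    static final class PartitionUpdate {
        final String name;
        final int leader;
        final boolean newerEpoch;     // request carries a newer leader epoch
        final boolean topicIdMissing; // local state still lacks the topic ID

        PartitionUpdate(String name, int leader, boolean newerEpoch, boolean topicIdMissing) {
            this.name = name;
            this.leader = leader;
            this.newerEpoch = newerEpoch;
            this.topicIdMissing = topicIdMissing;
        }
    }

    final List<String> toBeLeader = new ArrayList<>();
    final List<String> toBeFollower = new ArrayList<>();
    final List<String> topicIdUpdateForFollowers = new ArrayList<>();

    SinglePassSketch(List<PartitionUpdate> updates, int localBrokerId) {
        for (PartitionUpdate u : updates) {
            if (u.newerEpoch) {
                if (u.leader == localBrokerId) {
                    toBeLeader.add(u.name);
                } else {
                    toBeFollower.add(u.name);
                }
            } else if (u.topicIdMissing && u.leader != localBrokerId) {
                // The follower condition is checked at insertion time,
                // so no second filtering pass is needed.
                topicIdUpdateForFollowers.add(u.name);
            }
        }
    }
}
```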
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692164654 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1544,7 +1554,7 @@ class ReplicaManager(val config: KafkaConfig, // replica from source dir to destination dir logManager.abortAndPauseCleaning(topicPartition) - futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(leader, + futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic()), leader, Review comment: nit: the parentheses after `topic` are not required ;)
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692165645 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { Review comment: Is `topicIds` necessary? I suppose that we could get the `topicId` from the `Partition`, couldn't we?
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692166774 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? Review comment: I don't think so. Those logs, which are exactly the same as the ones in `makeFollowers`, would be really confusing.
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692168365 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() Review comment: nit: We could omit specifying the type here and use `mutable.Set.empty[Partition]`.
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692169643 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig, partitionsToMakeFollower } + private def updateTopicIdForFollowers(controllerId: Int, +controllerEpoch: Int, +partitionStates: Map[Partition, LeaderAndIsrPartitionState], +correlationId: Int, +responseMap: mutable.Map[TopicPartition, Errors], +topicIds: String => Option[Uuid]) : Set[Partition] = { +val traceLoggingEnabled = stateChangeLogger.isTraceEnabled +// Do we need this? +partitionStates.forKeyValue { (partition, partitionState) => + if (traceLoggingEnabled) +stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " + + s"epoch $controllerEpoch starting the become-follower transition for partition ${partition.topicPartition} with leader " + + s"${partitionState.leader}") + responseMap.put(partition.topicPartition, Errors.NONE) +} + +val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set() +try { + partitionStates.forKeyValue { (partition, partitionState) => +val newLeaderBrokerId = partitionState.leader + if (metadataCache.hasAliveBroker(newLeaderBrokerId)) { Review comment: Intuitively, I would have thought that we don't need this. The broker is already a follower in this case, so the leader should be alive. What's your take on this?
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -82,7 +82,7 @@ class AbstractFetcherThreadTest { // add one partition to create the consumer lag metric fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) -fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) +fetcher.addPartitions(Map(partition -> initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0))) Review comment: nit: If we keep `topicIds` as a Scala map, we would get an Option directly with `topicIds.get(...)`. Also, a space is missing after the `,`.
[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
dajac commented on a change in pull request #11170: URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152 ## File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala ## @@ -82,7 +82,7 @@ class AbstractFetcherThreadTest { // add one partition to create the consumer lag metric fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0)) -fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 0))) +fetcher.addPartitions(Map(partition -> initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0))) Review comment: nit: If we keep `topicIds` as a Scala map, we would get an Option directly with `topicIds.get(...)`. Also, a space is missing after the `,` and the `()` could be omitted ;).
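The `Option` point has a direct Java analogue:

- In Scala, wrapping a Java map lookup in `Some(...)` yields `Some(null)` for a missing key, whereas a Scala `Map#get` returns `None`.
- In Java, `Optional.of(map.get(k))` throws `NullPointerException` for a missing key, while `Optional.ofNullable` returns an empty `Optional`.

The helper below is hypothetical. It uses `java.util.UUID` in place of Kafka's `Uuid` so that it stays self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Illustrative Java analogue of the Scala Option point; names are hypothetical.
final class LookupSketch {
    // Returns an empty Optional for a missing topic instead of wrapping null.
    static Optional<UUID> topicIdFor(Map<String, UUID> topicIds, String topic) {
        return Optional.ofNullable(topicIds.get(topic));
    }
}
```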
[GitHub] [kafka] vvcephei commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
vvcephei commented on pull request #11231: URL: https://github.com/apache/kafka/pull/11231#issuecomment-901975557 Thanks @hachikuji ! FYI: I ran three full batches of system tests on your PR branch, and did not see a failure in that test or any other consistent test failure: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-18--001.system-test-kafka-branch-builder--1629346306--hachikuji--KAFKA-13214--e5782597f3/report.html http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-18--001.system-test-kafka-branch-builder--1629346035--hachikuji--KAFKA-13214--e5782597f3/report.html http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-18--001.system-test-kafka-branch-builder--1629346129--hachikuji--KAFKA-13214--e5782597f3/report.html
[GitHub] [kafka] lbradstreet commented on pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
lbradstreet commented on pull request #11230: URL: https://github.com/apache/kafka/pull/11230#issuecomment-901975481 @dajac the updated changes look good to me overall. It seems like we're hitting some gradle executor errors across multiple runs, so it may not be a coincidence.
[GitHub] [kafka] dajac commented on pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
dajac commented on pull request #11230: URL: https://github.com/apache/kafka/pull/11230#issuecomment-901980004 Let me check those.
[GitHub] [kafka] andricDu commented on pull request #8103: KAFKA-7061: KIP-280 Enhanced log compaction
andricDu commented on pull request #8103: URL: https://github.com/apache/kafka/pull/8103#issuecomment-902003678 Bump. Status update on this feature?
[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
akatona84 commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-902006686 > Thanks for the PR, it looks good. Is it possible to add a test for it? @mimaison, thanks for the review. Unfortunately, the code path where the RestClient is used is not unit tested at all. It would take more effort to fully understand how the distributed herder could be tested better.
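The PR title refers to building the Connect reconfiguration URL safely. One common way to do this in plain Java is to percent-encode the connector name as a single path segment, as in the hypothetical sketch below. The `/connectors/{name}/tasks` path shape is an assumption for illustration, and the PR itself may use a different mechanism (for example, a URI builder).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; not the Connect RestClient or herder code.
final class UrlPathSketch {
    // URLEncoder performs form encoding (space -> '+'), so '+' is rewritten
    // to "%20" to make the result valid as a path segment. A literal '+' in
    // the input has already become "%2B", so the rewrite is unambiguous.
    static String encodePathSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static String reconfigurationUrl(String leaderUrl, String connectorName) {
        String base = leaderUrl.endsWith("/")
                ? leaderUrl.substring(0, leaderUrl.length() - 1)
                : leaderUrl;
        return base + "/connectors/" + encodePathSegment(connectorName) + "/tasks";
    }
}
```

Encoding `/` as `%2F` keeps a connector name such as `a/b` from being read as two path segments.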
[GitHub] [kafka] dajac commented on a change in pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`
dajac commented on a change in pull request #11229: URL: https://github.com/apache/kafka/pull/11229#discussion_r692245978 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1391,78 +1392,92 @@ class GroupCoordinator(val brokerId: Int, } } - def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): Boolean = { + def tryCompleteJoin( +group: GroupMetadata, +generationId: Int, +forceComplete: () => Boolean + ): Boolean = { group.inLock { - if (group.hasAllMembersJoined) + if (generationId != group.generationId) { +forceComplete() + } else if (group.hasAllMembersJoined) { forceComplete() - else false + } else false } } - def onCompleteJoin(group: GroupMetadata): Unit = { + def onCompleteJoin( +group: GroupMetadata, +generationId: Int + ): Unit = { group.inLock { - val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember) - if (notYetRejoinedDynamicMembers.nonEmpty) { -info(s"Group ${group.groupId} removed dynamic members " + - s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}") - -notYetRejoinedDynamicMembers.values.foreach { failedMember => - group.remove(failedMember.memberId) - removeHeartbeatForLeavingMember(group, failedMember.memberId) -} - } - - if (group.is(Dead)) { -info(s"Group ${group.groupId} is dead, skipping rebalance stage") - } else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) { -// If all members are not rejoining, we will postpone the completion -// of rebalance preparing stage, and send out another delayed operation -// until session timeout removes all the non-responsive members. 
-error(s"Group ${group.groupId} could not complete rebalance because no members rejoined") -rebalancePurgatory.tryCompleteElseWatch( - new DelayedJoin(this, group, group.rebalanceTimeoutMs), - Seq(GroupJoinKey(group.groupId))) + if (generationId != group.generationId) { +error(s"Received unexpected notification of join complete for ${group.groupId} " + + s"with an old generation $generationId while the group has ${group.generationId}.") } else { -group.initNextGeneration() -if (group.is(Empty)) { - info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " + - s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})") +val notYetRejoinedDynamicMembers = group.notYetRejoinedMembers.filterNot(_._2.isStaticMember) Review comment: That's correct.
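The generation check added to `tryCompleteJoin` and `onCompleteJoin` can be modeled in a few lines of Java. In this hypothetical sketch:

- A `DelayedJoin` records the group generation it was created for.
- If the group has since moved to another generation, the operation is force-completed and its completion is only logged.
- A current operation completes only once all members have joined.

The classes are illustrative stand-ins, and `generationId++` merely stands in for `initNextGeneration()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the generation check; not the GroupCoordinator code.
final class GenerationCheckSketch {

    static final class Group {
        int generationId;
        boolean allMembersJoined;
        final List<String> log = new ArrayList<>();
    }

    static final class DelayedJoin {
        final Group group;
        final int generationId; // generation captured at creation time
        private boolean completed = false;

        DelayedJoin(Group group) {
            this.group = group;
            this.generationId = group.generationId;
        }

        // Like tryCompleteJoin: a stale operation is force-completed right away.
        boolean tryComplete() {
            synchronized (group) {
                if (generationId != group.generationId || group.allMembersJoined) {
                    return forceComplete();
                }
                return false;
            }
        }

        private boolean forceComplete() {
            if (completed) {
                return false;
            }
            completed = true;
            onComplete();
            return true;
        }

        // Like onCompleteJoin: a completion for an old generation is only logged.
        private void onComplete() {
            synchronized (group) {
                if (generationId != group.generationId) {
                    group.log.add("ignored stale join for generation " + generationId);
                } else {
                    group.generationId++; // stands in for initNextGeneration()
                    group.log.add("completed join, new generation " + group.generationId);
                }
            }
        }
    }
}
```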
[GitHub] [kafka] dajac commented on pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`
dajac commented on pull request #11229: URL: https://github.com/apache/kafka/pull/11229#issuecomment-902020042 > Thanks for the PR. It makes sense to me. Would it be possible to add a test for the case when a late `DelayedJoin` completes? Yeah, let me see if I can add something.
[GitHub] [kafka] spena commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog
spena commented on a change in pull request #11235: URL: https://github.com/apache/kafka/pull/11235#discussion_r692180473 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ## @@ -214,6 +223,128 @@ public void testLeftJoinDuplicates() { } } +@Test +public void shouldSendTombstoneForLeftJoinCandidatesRocksDb() { Review comment: Could you add a similar test to the KStreamKStreamOuterJoinTest? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java ## @@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) { } /** - * {@inheritdoc} + * {@inheritDoc} * * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time * range queries are not supported. */ @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { -if (binaryKeyFrom != null || binaryKeyTo != null) { -throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported."); +if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == Long.MAX_VALUE) { +return Iterator::hasNext; } -if (from != 0 && to != Long.MAX_VALUE) { -throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. 
Key and time range queries are not supported."); +if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from == to) { + +return iterator -> { +while (iterator.hasNext()) { +final Bytes bytes = iterator.peekNextKey(); +final Bytes keyBytes = Bytes + .wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get())); +final long time = TimeOrderedKeySchema.extractStoreTimestamp(bytes.get()); +if (keyBytes.compareTo(binaryKeyFrom) >= 0 +&& keyBytes.compareTo(binaryKeyTo) <= 0 +&& time >= from +&& time <= to) { +return true; +} +iterator.next(); +} +return false; +}; } -return iterator -> iterator.hasNext(); +throw new IllegalArgumentException("Key and time range queries are not supported."); } @Override public List segmentsToSearch(final Segments segments, final long from, final long to, final boolean forward) { Review comment: Add some tests in TimeOrderedKeySchemaTest. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java ## @@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) { } /** - * {@inheritdoc} + * {@inheritDoc} * * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time * range queries are not supported. */ @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { Review comment: TimeOrderedKeySchemaTest ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java ## @@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) { } /** - * {@inheritdoc} + * {@inheritDoc} * * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time * range queries are not supported. 
*/ @Override public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) { -if (binaryKeyFrom != null || binaryKeyTo != null) { -throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported."); +if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == Long.MAX_VALUE) { +return Iterator::hasNext; } -if (from != 0 && to != Long.MAX_VALUE) { -throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. Key and time range queries are not supported."); +if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from == to) { + +return iterator -> { +while (iterator.hasNext()) { +final Bytes bytes = iterator.peekNextKey(); +final Bytes keyBytes = Bytes + .wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get())); +final long time = Tim
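The new `hasNextCondition` logic in the `TimeOrderedKeySchema` diff can be modeled without the byte-level key layout. This hypothetical sketch represents store keys as `(time, key)` objects and uses a minimal peeking iterator in place of the Kafka Streams iterator's `peekNextKey`. It keeps the three cases from the diff:

- A full scan returns plain `hasNext`.
- A single-key, single-timestamp query skips non-matching entries.
- Any other query is rejected.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified, hypothetical model of the diff; not the Kafka Streams code.
final class TimeOrderedConditionSketch {

    // The real schema serializes timestamp and key into bytes instead.
    static final class StoreKey {
        final long time;
        final String key;

        StoreKey(long time, String key) {
            this.time = time;
            this.key = key;
        }
    }

    // Minimal peeking iterator standing in for peekNextKey().
    static final class PeekingIterator implements Iterator<StoreKey> {
        private final List<StoreKey> items;
        private int position = 0;

        PeekingIterator(List<StoreKey> items) {
            this.items = items;
        }

        @Override
        public boolean hasNext() {
            return position < items.size();
        }

        @Override
        public StoreKey next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return items.get(position++);
        }

        StoreKey peekNextKey() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return items.get(position);
        }
    }

    interface HasNextCondition {
        boolean hasNext(PeekingIterator iterator);
    }

    static HasNextCondition hasNextCondition(String keyFrom, String keyTo, long from, long to) {
        // Full scan: every remaining entry qualifies.
        if (keyFrom == null && keyTo == null && from == 0 && to == Long.MAX_VALUE) {
            return PeekingIterator::hasNext;
        }
        // Single key at a single timestamp: skip entries until one matches.
        if (keyFrom != null && keyFrom.equals(keyTo) && from == to) {
            return iterator -> {
                while (iterator.hasNext()) {
                    StoreKey next = iterator.peekNextKey();
                    if (next.key.compareTo(keyFrom) >= 0 && next.key.compareTo(keyTo) <= 0
                            && next.time >= from && next.time <= to) {
                        return true;
                    }
                    iterator.next(); // drop the non-matching entry
                }
                return false;
            };
        }
        throw new IllegalArgumentException("Key and time range queries are not supported.");
    }
}
```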
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401736#comment-17401736 ] BurningIce commented on KAFKA-4669: --- I'm using kafka-clients 2.5.0 and am also encountering this issue on the consumer side. I turned on TRACE-level Kafka logging after the error occurred (not before). It seems that the correlation ID in the earlier log line "Using older server API v7 to send OFFSET_COMMIT ... with correlation id" does not match the one that appears in the exception message. Will these logs help? [~rsivaram]
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Sending asynchronous auto-commit of offsets {q-app-jvm-1=OffsetAndMetadata{offset=13500194, leaderEpoch=5, metadata=''}}
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Sending OffsetCommit request with {q-app-jvm-1=OffsetAndMetadata{offset=13500194, leaderEpoch=5, metadata=''}} to coordinator apmkafka10:9092 (id: 2147483643 rack: null)
08-19 23:29:13 [pool-21-thread-1] DEBUG o.a.k.c.NetworkClient - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Using older server API v7 to send OFFSET_COMMIT {group_id=cg-apm-dc-wrap-jvm,generation_id=222,member_id=consumer-cg-apm-dc-wrap-jvm-1-c9eec325-434f-4495-b040-7522d5c2ec87,group_instance_id=null,topics=[{name=q-app-jvm,partitions=[{partition_index=1,committed_offset=13500194,committed_leader_epoch=5,committed_metadata=}]}]} with correlation id 13993458 to node 2147483643
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 (id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] ERROR c.n.m.k.KafkaTopicConsumer - unexpected error to get message from kafka: Correlation id for response (13993458) does not match request (13993456), request header: RequestHeader(apiKey=OFFSET_COMMIT,
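The last log line reports a response whose correlation ID does not match the request the client was waiting for. A simplified, hypothetical sketch of such a check might look like the code below:

- In-flight requests on one connection are kept in send order.
- Each response is compared against the oldest outstanding request.

The class and method names are illustrative, and the exception message only mirrors the format seen in the log. This is not the `NetworkClient` implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of per-connection correlation ID matching.
final class CorrelationCheckSketch {
    private final Deque<Integer> inFlight = new ArrayDeque<>(); // oldest first
    private int nextCorrelationId = 0;

    int send() {
        int id = nextCorrelationId++;
        inFlight.addLast(id);
        return id;
    }

    // Responses on one connection are expected in request order, so each
    // response must match the oldest outstanding request.
    void receive(int responseCorrelationId) {
        Integer expected = inFlight.pollFirst();
        if (expected == null) {
            throw new IllegalStateException("Response received with no request in flight");
        }
        if (expected != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + expected + ")");
        }
    }
}
```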
[GitHub] [kafka] mjsax commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog
mjsax commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692305287

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
## @@ -43,12 +45,12 @@
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
-        throw new UnsupportedOperationException();
+        return null;

Review comment:
Works for me. @guozhangwang WDYT?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog
mjsax commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692305627

## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
## @@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
     }
 
     /**
-     * {@inheritdoc}
+     * {@inheritDoc}
      *
      * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time
      * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        if (binaryKeyFrom != null || binaryKeyTo != null) {
-            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported.");
+        if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == Long.MAX_VALUE) {
+            return Iterator::hasNext;
         }
-        if (from != 0 && to != Long.MAX_VALUE) {
-            throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from == to) {
+
+            return iterator -> {
+                while (iterator.hasNext()) {
+                    final Bytes bytes = iterator.peekNextKey();
+                    final Bytes keyBytes = Bytes
+                        .wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+                    final long time = TimeOrderedKeySchema.extractStoreTimestamp(bytes.get());
+                    if (keyBytes.compareTo(binaryKeyFrom) >= 0
+                        && keyBytes.compareTo(binaryKeyTo) <= 0
+                        && time >= from
+                        && time <= to) {
+                        return true;
+                    }
+                    iterator.next();
+                }
+                return false;
+            };
         }
-        return iterator -> iterator.hasNext();
+        throw new IllegalArgumentException("Key and time range queries are not supported.");
     }
 
     @Override
     public List segmentsToSearch(final Segments segments, final long from, final long to, final boolean forward) {
-        throw new UnsupportedOperationException();
+        if (from != to) {
+            throw new IllegalArgumentException("");

Review comment:
Oops. This one slipped...

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
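The new hasNextCondition accepts exactly two query shapes: an unbounded scan, and a point lookup where both key bounds are equal and from == to. A rough Python model of that predicate, assuming a hypothetical PeekingIterator over ((key, timestamp), value) entries in place of the real store iterator, and plain comparisons in place of Bytes.compareTo:

```python
LONG_MAX_VALUE = 2**63 - 1  # Java's Long.MAX_VALUE


class PeekingIterator:
    """Hypothetical stand-in for a store iterator over ((key, timestamp), value) entries."""

    def __init__(self, entries):
        self._entries = list(entries)
        self._pos = 0

    def has_next(self):
        return self._pos < len(self._entries)

    def peek_next_key(self):
        return self._entries[self._pos][0]

    def next(self):
        entry = self._entries[self._pos]
        self._pos += 1
        return entry


def has_next_condition(key_from, key_to, time_from, time_to):
    """Build a predicate for the two query shapes the diff accepts."""
    if key_from is None and key_to is None and time_from == 0 and time_to == LONG_MAX_VALUE:
        # Unbounded scan: any remaining entry qualifies.
        return lambda it: it.has_next()
    if key_from is not None and key_from == key_to and time_from == time_to:
        def condition(it):
            # Advance past entries until one lies inside the key and time bounds.
            while it.has_next():
                key, ts = it.peek_next_key()
                if key_from <= key <= key_to and time_from <= ts <= time_to:
                    return True
                it.next()
            return False
        return condition
    raise ValueError("Key and time range queries are not supported.")
```

In the point-lookup case the predicate consumes non-matching entries while it scans, which is why it calls next() and does not only peek.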
[GitHub] [kafka] vvcephei opened a new pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei opened a new pull request #11236: URL: https://github.com/apache/kafka/pull/11236 We increased the default session timeout to 30s in KIP-735: https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout Since then, we have been observing sporadic system test failures due to rebalances taking longer than the test timeout. Rather than increase the test wait times, we can just override the session timeout to a value more appropriate in the testing domain. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
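Overriding the timeout only for tests amounts to layering one key on top of the service's base properties. A small sketch, assuming a hypothetical base dict (the application.id key and its value are illustrative only) and rendering the result as Java-style key=value lines:

```python
def with_test_overrides(base_props):
    """Return a copy of base_props with the system-test session timeout applied."""
    props = dict(base_props)  # leave the caller's dict untouched
    # 10 s for tests, instead of the longer client default introduced by KIP-735.
    props["session.timeout.ms"] = "10000"
    return props


def to_properties_text(props):
    """Render a dict as key=value lines, sorted for stable output."""
    return "".join(f"{key}={value}\n" for key, value in sorted(props.items()))
```

Copying before overriding keeps the shared defaults intact, so other services built from the same base are unaffected.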
[GitHub] [kafka] officialpatterson commented on pull request #11215: kafka-12994: Migrate TimeWindowsTest to new API
officialpatterson commented on pull request #11215: URL: https://github.com/apache/kafka/pull/11215#issuecomment-902088249 It doesn't look like I have permission to assign reviewers to this PR, @ableegoldman. Also, I'm fairly new to this, but it looks like these test failures are due to a flaky integration test; is that correct? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401763#comment-17401763 ] Sagar Rao commented on KAFKA-13152: --- Thanks [~mjsax]/[~guozhang]. I will start writing a KIP and send out an email soon. > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls the maximum number > of records to bookkeep, and once it is exceeded we pause fetching from > this partition. However, this config has two issues: > * It's a per-partition config, so the total memory consumed depends on > the dynamic number of partitions assigned. > * Record size can vary from case to case. > Hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.3.4#803005)
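The proposal replaces a per-partition record count with a single byte budget. A minimal sketch of that bookkeeping, assuming a hypothetical InputBuffer class (not a Kafka Streams API) that tracks raw record bytes across all partitions and signals when fetching should pause:

```python
class InputBuffer:
    """Hypothetical global, byte-bounded buffer shared by all assigned partitions."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.buffered_bytes = 0
        self.records = {}  # partition -> list of raw record bytes, oldest first

    def add(self, partition, record):
        self.records.setdefault(partition, []).append(record)
        self.buffered_bytes += len(record)

    def poll(self, partition):
        record = self.records[partition].pop(0)
        self.buffered_bytes -= len(record)
        return record

    def should_pause_fetching(self):
        # The bound is on total bytes, independent of partition count or record count.
        return self.buffered_bytes > self.max_bytes
```

Because the limit is expressed in bytes, adding partitions or receiving larger records does not change how much memory the buffer may hold; which partitions to pause once the limit is hit is left open here.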
[GitHub] [kafka] vvcephei commented on pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei commented on pull request #11236: URL: https://github.com/apache/kafka/pull/11236#issuecomment-902094127 system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4657/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei commented on a change in pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#discussion_r692334355

## File path: tests/kafkatest/services/streams.py
## @@ -569,16 +572,17 @@ def __init__(self, test_context, kafka):
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
-
-        properties['topology.optimization'] = self.OPTIMIZED_CONFIG
-        properties['input.topic'] = self.INPUT_TOPIC
-        properties['aggregation.topic'] = self.AGGREGATION_TOPIC
-        properties['reduce.topic'] = self.REDUCE_TOPIC
-        properties['join.topic'] = self.JOIN_TOPIC
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      'topology.optimization': self.OPTIMIZED_CONFIG,
+                      'input.topic': self.INPUT_TOPIC,
+                      'aggregation.topic': self.AGGREGATION_TOPIC,
+                      'reduce.topic': self.REDUCE_TOPIC,
+                      'join.topic': self.JOIN_TOPIC,
+                      "acceptable.recovery.lag": "9223372036854775807",  # enable a one-shot assignment
+                      "session.timeout.ms": "10000"  # set back to 10s for tests. See KIP-735
+                      }
         # Long.MAX_VALUE lets us do the assignment without a warmup

Review comment:
```suggestion
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei commented on a change in pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#discussion_r692334950

## File path: tests/kafkatest/services/streams.py
## @@ -659,14 +663,15 @@ def __init__(self, test_context, kafka):
     def prop_file(self):
         properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
-
-        properties['input.topic'] = self.INPUT_TOPIC
-        properties['aggregation.topic'] = self.AGGREGATION_TOPIC
-        properties['add.operations'] = self.ADD_ADDITIONAL_OPS
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      'input.topic': self.INPUT_TOPIC,
+                      'aggregation.topic': self.AGGREGATION_TOPIC,
+                      'add.operations': self.ADD_ADDITIONAL_OPS,
+                      "acceptable.recovery.lag": "9223372036854775807",  # enable a one-shot assignment
+                      "session.timeout.ms": "10000"  # set back to 10s for tests. See KIP-735
+                      }
         # Long.MAX_VALUE lets us do the assignment without a warmup

Review comment:
```suggestion
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei commented on a change in pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#discussion_r692335138

## File path: tests/kafkatest/services/streams.py
## @@ -686,12 +691,13 @@ def prop_file(self):
                       streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                       streams_property.NUM_THREADS: self.NUM_THREADS,
                       consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID,
-                      consumer_property.SESSION_TIMEOUT_MS: 6}
-
-        properties['input.topic'] = self.INPUT_TOPIC
+                      consumer_property.SESSION_TIMEOUT_MS: 6,
+                      'input.topic': self.INPUT_TOPIC,
+                      "acceptable.recovery.lag": "9223372036854775807",  # enable a one-shot assignment
+                      "session.timeout.ms": "10000"  # set back to 10s for tests. See KIP-735
+                      }
         # Long.MAX_VALUE lets us do the assignment without a warmup

Review comment:
```suggestion
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
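In this hunk the dict literal contains both consumer_property.SESSION_TIMEOUT_MS and the literal "session.timeout.ms". Assuming the constant resolves to that same string (not verified here), Python keeps a single entry and the value written later in the literal wins, so the 10s override takes effect with no error or warning. A minimal demonstration with placeholder values:

```python
# Assumption: the kafkatest constant resolves to the plain config name.
SESSION_TIMEOUT_MS = "session.timeout.ms"

properties = {
    SESSION_TIMEOUT_MS: 60000,       # placeholder for the earlier entry
    "input.topic": "input",          # hypothetical topic name
    "session.timeout.ms": "10000",   # same key again, written later
}

# A dict keeps one entry per key; the last occurrence in the literal wins.
effective_timeout = properties["session.timeout.ms"]
```

If the earlier entry is meant to be dropped, removing it explicitly avoids relying on literal ordering.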
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692338540

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala
## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,

Review comment:
There is an issue with AlterIsr, which is partially explained in this comment:
```
   * With the addition of AlterIsr, we also consider newly added replicas as part of the ISR when advancing
   * the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
   * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefore safe. We call
   * this set the "maximal" ISR. See KIP-497 for more details
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
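The quoted comment's safety argument rests on the high watermark being the minimum log end offset across the replicas it considers. A simplified sketch, assuming hypothetical replica ids and offsets (and ignoring how the leader learns follower offsets), shows why adding pending replicas to that set can only keep the HW the same or lower it:

```python
def high_watermark(log_end_offsets, committed_isr, pending_additions=()):
    """Minimum log end offset over the 'maximal' ISR (committed ISR plus pending adds)."""
    maximal_isr = set(committed_isr) | set(pending_additions)
    return min(log_end_offsets[replica] for replica in maximal_isr)
```

Taking a minimum over a larger set can never produce a larger value, which is the sense in which the maximal ISR is "more restrictive and therefore safe".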
[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path
jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692343425

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala
## @@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+                                        controllerEpoch: Int,
+                                        partitionStates: Map[Partition, LeaderAndIsrPartitionState],
+                                        correlationId: Int,
+                                        responseMap: mutable.Map[TopicPartition, Errors],
+                                        topicIds: String => Option[Uuid]) : Set[Partition] = {

Review comment:
I think you are right. makeFollowers needs it to assign the topic ID to the log for the first time, but for this PR, we've assigned it above.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
[ https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12984: --- Fix Version/s: 2.8.1 > Cooperative sticky assignor can get stuck with invalid SubscriptionState > input metadata > --- > > Key: KAFKA-12984 > URL: https://issues.apache.org/jira/browse/KAFKA-12984 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > Some users have reported seeing their consumer group become stuck in the > CompletingRebalance phase when using the cooperative-sticky assignor. Based > on the request metadata, we were able to deduce that multiple consumers were > reporting the same partition(s) in their "ownedPartitions" field of the > consumer protocol. Since this is an invalid state, the input causes the > cooperative-sticky assignor to detect that something is wrong and throw an > IllegalStateException. If the consumer application is set up to simply retry, > this will cause the group to appear to hang in the rebalance state. > The "ownedPartitions" field is encoded based on the ConsumerCoordinator's > SubscriptionState, which was assumed to always be up to date. However, there > may be cases where the consumer has dropped out of the group but fails to > clear the SubscriptionState, allowing it to report some partitions as owned > that have since been reassigned to another member. > We should (a) fix the sticky assignment algorithm to resolve cases of > improper input conditions by invalidating the "ownedPartitions" in cases of > double ownership, and (b) shore up the ConsumerCoordinator logic to better > handle rejoining the group and keeping its internal state consistent. See > KAFKA-12983 for more details on (b) -- This message was sent by Atlassian Jira (v8.3.4#803005)
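Fix (a) amounts to refusing to trust ownership claims that conflict. A minimal sketch of that idea, assuming a hypothetical sanitize_owned_partitions helper (not the assignor's actual code) that treats any partition claimed by more than one member as unowned before assignment begins:

```python
from collections import Counter


def sanitize_owned_partitions(owned_by_member):
    """Drop any partition that more than one member claims to own.

    Returns (cleaned ownership map, set of conflicted partitions).
    """
    # Count each partition once per member, even if a member lists it twice.
    claims = Counter(p for parts in owned_by_member.values() for p in set(parts))
    conflicted = {p for p, n in claims.items() if n > 1}
    cleaned = {
        member: [p for p in parts if p not in conflicted]
        for member, parts in owned_by_member.items()
    }
    return cleaned, conflicted
```

Dropping both claims is the conservative choice: the assignor then hands the partition out as if it were new, instead of throwing an IllegalStateException on every retry.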
[jira] [Commented] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols
[ https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401818#comment-17401818 ] Yiming Zang commented on KAFKA-9320: We have seen a regression after enabling and upgrading to TLS 1.3 on Kafka 2.7.0: very frequent EOFExceptions and disconnections:
[2021-08-13 06:07:26,069] WARN [ReplicaFetcher replicaId=18, leaderId=20, fetcherId=0] Unexpected error from atla-alo-26-sr1.prod.twttr.net/10.41.44.125; closing connection (org.apache.kafka.common.network.Selector)
java.io.EOFException: EOF during read
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:627)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:466)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:416)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:729)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:620)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:520)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:96)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:211)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:310)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:143)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:142)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:122)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
We had to roll back to TLS 1.2, which resolved the EOFException issue.
> Enable TLSv1.3 by default and disable some of the older protocols > - > > Key: KAFKA-9320 > URL: https://issues.apache.org/jira/browse/KAFKA-9320 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Nikolay Izhikov >Priority: Major > Labels: needs-kip > Fix For: 2.6.0 > > Attachments: report.txt > > > KAFKA-7251 added support for TLSv1.3. We should include this in the list of > protocols that are enabled by default. We should also disable some of the > older protocols that are not secure. This change requires a KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
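The rollback described above is done through configuration. One way to express it, assuming the standard Kafka ssl.enabled.protocols and ssl.protocol settings are the knobs in use (the report does not say exactly which settings were changed), is a small override map plus a helper that parses the comma-separated protocol list:

```python
# Hypothetical rollback overrides; the key names are standard Kafka SSL configs.
TLS_ROLLBACK_OVERRIDES = {
    # Only allow TLS 1.2 on these connections.
    "ssl.enabled.protocols": "TLSv1.2",
    # Protocol used to create the SSLContext; kept consistent with the list above.
    "ssl.protocol": "TLSv1.2",
}


def enabled_protocols(config):
    """Parse the comma-separated ssl.enabled.protocols value into a list."""
    raw = config.get("ssl.enabled.protocols", "")
    return [p.strip() for p in raw.split(",") if p.strip()]


def allows_tls13(config):
    return "TLSv1.3" in enabled_protocols(config)
```

A check like allows_tls13 can be run over rendered broker and client configs to confirm that no component still offers TLS 1.3 after the rollback.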
[jira] [Comment Edited] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols
[ https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401818#comment-17401818 ] Yiming Zang edited comment on KAFKA-9320 at 8/19/21, 6:45 PM: -- We have seen a regression after enabling and upgrading to TLS 1.3 on Kafka 2.7.0: very frequent EOFExceptions and disconnections:
{code:java}
[2021-08-13 06:07:26,069] WARN [ReplicaFetcher replicaId=18, leaderId=20, fetcherId=0] Unexpected error from atla-alo-26-sr1.prod.twttr.net/10.41.44.125; closing connection (org.apache.kafka.common.network.Selector)
java.io.EOFException: EOF during read
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:627)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:466)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:416)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:729)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:620)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:520)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:96)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:211)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:310)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:143)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:142)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:122)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}
We had to roll back to TLS 1.2, which resolved the EOFException issue.
> Enable TLSv1.3 by default and disable some of the older protocols > - > > Key: KAFKA-9320 > URL: https://issues.apache.org/jira/browse/KAFKA-9320 >
Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Rajini Sivaram >Assignee: Nikolay Izhikov >Priority: Major > Labels: needs-kip > Fix For: 2.6.0 > > Attachments: report.txt > > > KAFKA-7251 added support for TLSv1.3. We should include this in the list of > protocols that are enabled by default. We should also disable some of the > older protocols that are not secure. This change requires a KIP. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio opened a new pull request #11237: MINOR: Fix how the last broker id is computed
jsancio opened a new pull request #11237: URL: https://github.com/apache/kafka/pull/11237 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests
rondagostino opened a new pull request #11238: URL: https://github.com/apache/kafka/pull/11238 I noticed that a system test using a KRaft cluster with 3 brokers but only 1 co-located controller did not force-kill the second and third brokers after shutting down the first broker (the one with the controller). The issue was a floating-point rounding error. This patch adjusts for the rounding error and also makes the logic work for an even number of controllers. A local run of `tests/kafkatest/sanity_checks/test_bounce.py` succeeded (and I manually increased the cluster size for the 1 co-located controller case and observed the correct kill behavior: the second and third brokers were force-killed as expected). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
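The PR does not show the original expression, but the usual way to avoid floating-point surprises in a majority calculation is integer division. A hypothetical sketch of the decision the test harness has to make, assuming force-killing should start once the remaining co-located controllers can no longer form a majority (the function names are illustrative, not kafkatest's):

```python
def quorum_size(num_controllers):
    """Smallest majority of a Raft quorum, computed with integer division only."""
    return num_controllers // 2 + 1


def must_force_kill(num_controllers, controllers_already_stopped):
    """True once the remaining controllers can no longer form a majority."""
    remaining = num_controllers - controllers_already_stopped
    return remaining < quorum_size(num_controllers)
```

With a single co-located controller, stopping the first broker already loses the quorum, so the second and third brokers fall into the force-kill branch; integer arithmetic also handles an even number of controllers (a majority of 4 is 3) without any rounding.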
[jira] [Updated] (KAFKA-8613) Make Grace Period Mandatory in Windowed Operations
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-8613: --- Summary: Make Grace Period Mandatory in Windowed Operations (was: Make grace period mandatory) > Make Grace Period Mandatory in Windowed Operations > -- > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Israel Ekpo >Priority: Blocker > Labels: kip > Fix For: 3.0.0 > > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period added late-arriving records to a > window as long as the window existed, i.e., as long as its retention time had > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should consider setting the default grace > period to {{Duration.ZERO}} or making it a mandatory parameter. > > KIP-633 > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
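The difference between a long legacy default and a zero grace period is easiest to see with numbers. A simplified model, assuming a window stops accepting records once stream time reaches window end plus grace (exact boundary handling in Kafka Streams may differ), with illustrative timestamps in milliseconds:

```python
def window_closed(window_end_ms, grace_ms, stream_time_ms):
    """A window stops accepting records once stream time reaches end + grace."""
    return stream_time_ms >= window_end_ms + grace_ms


def accepts(record_ts_ms, window_start_ms, window_end_ms, grace_ms, stream_time_ms):
    """Simplified model: the record must fall in the window and the window must be open."""
    in_window = window_start_ms <= record_ts_ms < window_end_ms
    return in_window and not window_closed(window_end_ms, grace_ms, stream_time_ms)


DAY_MS = 24 * 60 * 60 * 1000

# A one-minute window [0, 60000) and a late record at 30000 seen when stream time is 65000.
late_with_zero_grace = accepts(30_000, 0, 60_000, 0, 65_000)
late_with_10s_grace = accepts(30_000, 0, 60_000, 10_000, 65_000)
late_with_one_day_grace = accepts(30_000, 0, 60_000, DAY_MS, 65_000)
```

With zero grace the late record is dropped, while a long implicit grace keeps the window open for a day, which is the behavior the ticket calls unintuitive; making the parameter mandatory forces that choice to be explicit.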
[jira] [Updated] (KAFKA-8613) Make Grace Period Mandatory for Windowed Operations in Streams
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-8613: --- Summary: Make Grace Period Mandatory for Windowed Operations in Streams (was: Make Grace Period Mandatory in Windowed Operations) > Make Grace Period Mandatory for Windowed Operations in Streams > -- > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Israel Ekpo >Priority: Blocker > Labels: kip > Fix For: 3.0.0 > > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period added late-arriving records to a > window as long as the window existed, i.e., as long as its retention time had > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should consider setting the default grace > period to {{Duration.ZERO}} or making it a mandatory parameter. > > KIP-633 > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13219) BrokerState metric not working for KRaft clusters
Ron Dagostino created KAFKA-13219: - Summary: BrokerState metric not working for KRaft clusters Key: KAFKA-13219 URL: https://issues.apache.org/jira/browse/KAFKA-13219 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.0.0 Reporter: Ron Dagostino Assignee: Ron Dagostino The BrokerState metric always has a value of 0, for NOT_RUNNING, in KRaft clusters -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-8613) Make Grace Period Mandatory for Windowed Operations in Streams
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Ekpo updated KAFKA-8613: --- Description: Currently, the grace period is set to retention time if the grace period is not specified explicitly. The reason for setting the default grace period to retention time was backward compatibility. Topologies that were implemented before the introduction of the grace period added late-arriving records to a window as long as the window existed, i.e., as long as its retention time had not elapsed. This unintuitive default grace period has already caused confusion among users. For the next major release, we should consider setting the default grace period to {{Duration.ZERO}} or making it a mandatory parameter. KIP-633 [https://cwiki.apache.org/confluence/x/Ho2NCg] was: Currently, the grace period is set to retention time if the grace period is not specified explicitly. The reason for setting the default grace period to retention time was backward compatibility. Topologies that were implemented before the introduction of the grace period added late-arriving records to a window as long as the window existed, i.e., as long as its retention time had not elapsed. This unintuitive default grace period has already caused confusion among users. For the next major release, we should consider setting the default grace period to {{Duration.ZERO}} or making it a mandatory parameter. KIP-633 [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams] > Make Grace Period Mandatory for Windowed Operations in Streams > -- > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bruno Cadonna >Assignee: Israel Ekpo >Priority: Blocker > Labels: kip > Fix For: 3.0.0 > > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. 
The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period added late-arriving records to a > window as long as the window existed, i.e., as long as its retention time had > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should consider setting the default grace > period to {{Duration.ZERO}} or making it a mandatory parameter. > > KIP-633 > [https://cwiki.apache.org/confluence/x/Ho2NCg] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #11237: MINOR: Fix how the last broker id is computed
hachikuji merged pull request #11237: URL: https://github.com/apache/kafka/pull/11237 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters
rondagostino opened a new pull request #11239: URL: https://github.com/apache/kafka/pull/11239 The BrokerState metric always has a value of 0 (NOT_RUNNING) in KRaft clusters. This patch fixes that and adds a test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] izzyacademy commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog
izzyacademy commented on a change in pull request #11235: URL: https://github.com/apache/kafka/pull/11235#discussion_r692514791 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RawKeyAccessor.java ## @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals.metrics; + +import java.util.Collection; +import org.apache.kafka.common.utils.Bytes; + +public interface RawKeyAccessor { Review comment: @mjsax could we add a brief doc to explain what the interface is used for?
[GitHub] [kafka] ijuma commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters
ijuma commented on a change in pull request #11239: URL: https://github.com/apache/kafka/pull/11239#discussion_r692518398 ## File path: core/src/main/scala/kafka/server/KafkaBroker.scala ## @@ -85,7 +85,8 @@ trait KafkaBroker extends KafkaMetricsGroup { explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName, name, metricTags) } - newGauge("BrokerState", () => brokerState.value) + // visible for testing + private[server] val brokerStateGauge = newGauge("BrokerState", () => brokerState.value) Review comment: Why do we need this versus the `brokerState` method?
[GitHub] [kafka] ijuma commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters
ijuma commented on a change in pull request #11239: URL: https://github.com/apache/kafka/pull/11239#discussion_r692520138 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -84,6 +84,8 @@ class BrokerServer( val supportedFeatures: util.Map[String, VersionRange] ) extends KafkaBroker { + override def brokerState: BrokerState = currentState() Review comment: This is a bit broken. If KRaft doesn't use the `_brokerState` field, we should not have it.
[GitHub] [kafka] ijuma commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters
ijuma commented on a change in pull request #11239: URL: https://github.com/apache/kafka/pull/11239#discussion_r692520138 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -84,6 +84,8 @@ class BrokerServer( val supportedFeatures: util.Map[String, VersionRange] ) extends KafkaBroker { + override def brokerState: BrokerState = currentState() Review comment: This is a bit broken. If KRaft doesn't use the `_brokerState` field, we should not have it in the superclass.
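The design discussed in this thread can be illustrated with a small, hypothetical Java model (not Kafka's Scala code): the base type only declares an overridable `brokerState()` accessor and holds no `_brokerState` field of its own, and the gauge is a supplier that calls that accessor every time it is read, so it reports whatever state the concrete broker actually tracks. The enum and all class names are assumptions for illustration; only `NOT_RUNNING = 0` comes from the PR description, and the other numeric codes are assumed.

```java
import java.util.function.Supplier;

/** Hypothetical broker states; only NOT_RUNNING = 0 is taken from the PR description. */
enum ModelBrokerState {
    NOT_RUNNING(0),
    STARTING(1),
    RUNNING(3);

    final int value;

    ModelBrokerState(int value) {
        this.value = value;
    }
}

/** Base type: declares the state accessor but owns no state field a subclass could leave stale. */
abstract class ModelBroker {

    abstract ModelBrokerState brokerState();

    /** The gauge reads through the accessor on every poll instead of caching a value. */
    Supplier<Integer> newBrokerStateGauge() {
        return () -> brokerState().value;
    }
}

/** Stand-in for a KRaft-style broker that tracks its own lifecycle state. */
final class ModelKRaftBroker extends ModelBroker {

    private volatile ModelBrokerState current = ModelBrokerState.NOT_RUNNING;

    void transitionTo(ModelBrokerState next) {
        current = next;
    }

    @Override
    ModelBrokerState brokerState() {
        return current;
    }
}
```

Because the gauge goes through `brokerState()`, a test can assert on that method directly and does not need the gauge object exposed for testing.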
[GitHub] [kafka] ableegoldman commented on pull request #11215: KAFKA-12994: Migrate TimeWindowsTest to new API
ableegoldman commented on pull request #11215: URL: https://github.com/apache/kafka/pull/11215#issuecomment-902302847 Yep, don't worry about it; that test is known to be flaky and unrelated. And yeah, the ability to set reviewers is restricted to committers. You can just ping people in a comment on your PR for reviews.
[GitHub] [kafka] ableegoldman commented on a change in pull request #11215: KAFKA-12994: Migrate TimeWindowsTest to new API
ableegoldman commented on a change in pull request #11215: URL: https://github.com/apache/kafka/pull/11215#discussion_r692546162 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java ## @@ -95,27 +91,19 @@ public void advanceIntervalMustNotBeLargerThanWindowSize() { @Test public void gracePeriodShouldEnforceBoundaries() { -TimeWindows.of(ofMillis(3L)).grace(ofMillis(0L)); +TimeWindows.ofSizeAndGrace(ofMillis(3L), ofMillis(0L)); try { -TimeWindows.of(ofMillis(3L)).grace(ofMillis(-1L)); +TimeWindows.ofSizeAndGrace(ofMillis(3L), ofMillis(-1L)); fail("should not accept negatives"); } catch (final IllegalArgumentException e) { //expected } } -@Test -public void oldAPIShouldSetDefaultGracePeriod() { -assertEquals(Duration.ofDays(1).toMillis(), DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD); -assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, TimeWindows.of(ofMillis(3L)).gracePeriodMs()); -assertEquals(0L, TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs()); -assertEquals(0L, TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 1L)).gracePeriodMs()); -} - Review comment: Same here: we should leave this test here until we remove the deprecated API. 
(and just suppress the warnings for only this test method). ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java ## @@ -19,50 +19,46 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Test; -import java.time.Duration; import java.util.Map; import static java.time.Duration.ofMillis; import static org.apache.kafka.streams.EqualityCheck.verifyEquality; import static org.apache.kafka.streams.EqualityCheck.verifyInEquality; -import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; -@SuppressWarnings("deprecation") public class TimeWindowsTest { private static final long ANY_SIZE = 123L; private static final long ANY_GRACE = 1024L; @Test public void shouldSetWindowSize() { -assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs); Review comment: Let's leave this one in here: as long as we still support a deprecated API, we should continue to test that it works. We just shouldn't use deprecated APIs to test other functionality unrelated to the API itself (e.g., that an exception is thrown for a window size of 0). Then we can restrict the scope of the deprecation warning to just this one test, rather than the entire class.
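The assertions in the removed `oldAPIShouldSetDefaultGracePeriod` test imply that the deprecated `TimeWindows.of(size)` derives its grace period as 24 hours minus the window size, floored at zero, while `gracePeriodShouldEnforceBoundaries` expects a negative explicit grace to be rejected with `IllegalArgumentException`. The sketch below reproduces only that arithmetic and validation in plain Java; `LegacyGraceModel` and its methods are hypothetical names, not Kafka APIs.

```java
import java.time.Duration;

/** Hypothetical model of the arithmetic implied by TimeWindowsTest; not Kafka code. */
final class LegacyGraceModel {

    /** Mirrors DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD as asserted in the test (one day in ms). */
    static final long DEFAULT_24_HR_GRACE_MS = Duration.ofDays(1).toMillis();

    private LegacyGraceModel() { }

    /** Grace implied by the deprecated size-only factory: 24h minus the size, never below 0. */
    static long legacyDefaultGraceMs(long sizeMs) {
        return Math.max(DEFAULT_24_HR_GRACE_MS - sizeMs, 0L);
    }

    /** Explicit grace, as with a size-and-grace factory: negative values are rejected. */
    static long explicitGraceMs(Duration grace) {
        if (grace.isNegative()) {
            throw new IllegalArgumentException("Grace period must not be negative");
        }
        return grace.toMillis();
    }
}
```

For a 3 ms window this yields 86 399 997 ms of grace, and any window of 24 hours or longer gets 0, matching the three removed assertions.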
[GitHub] [kafka] rondagostino commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters
rondagostino commented on a change in pull request #11239: URL: https://github.com/apache/kafka/pull/11239#discussion_r692561312 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -84,6 +84,8 @@ class BrokerServer( val supportedFeatures: util.Map[String, VersionRange] ) extends KafkaBroker { + override def brokerState: BrokerState = currentState() Review comment: Yeah, the same thought occurred to me, but I wanted to get a PR out there ASAP. Will fix.
[GitHub] [kafka] rondagostino commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters
rondagostino commented on a change in pull request #11239: URL: https://github.com/apache/kafka/pull/11239#discussion_r692561636 ## File path: core/src/main/scala/kafka/server/KafkaBroker.scala ## @@ -85,7 +85,8 @@ trait KafkaBroker extends KafkaMetricsGroup { explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName, name, metricTags) } - newGauge("BrokerState", () => brokerState.value) + // visible for testing + private[server] val brokerStateGauge = newGauge("BrokerState", () => brokerState.value) Review comment: Agreed, we don't need it; `ServerStartupTest.testBrokerStateRunningAfterZK()` goes directly against the brokerState method. Will fix.
[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13216: Affects Version/s: (was: 3.0.0) > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sergio Peña >Priority: Blocker > Fix For: 3.0.0 > > > This bug is caused by the improvements made in > https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with > stream-stream left/outer joins. The issue only occurs when a stream-stream > left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` > API that specifies the window time + grace period. This new API was added in > AK 3.0. No previous users are affected. > The issue causes the internal changelog topic used by the new > OUTERSHARED window store to keep growing unbounded as new records arrive. The > topic is never cleaned up or compacted, even if tombstones are written to > delete the joined and/or expired records from the window store. The problem > is caused by a parameter required in the window store to retain duplicates. > This config causes tombstone records to get a new sequence ID as part of > the key in the changelog, making those keys unique and thus preventing the > cleanup policy from working. > In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of > {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old > semantics and is thus not affected, while the new API enables the new > semantics; the problem is that we deprecated the old API and thus tell users > that they should switch to the new broken API. > We have two ways forward: > * Fix the bug (non-trivial) > * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to > use the new but broken API)
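The compaction failure described in KAFKA-13216 can be reproduced with a toy model in plain Java: a compacted changelog keeps one value per key, and a tombstone (null value) removes its key. If every write, including the tombstone, appends a fresh sequence number to the changelog key, the tombstone never shares a key with the record it is meant to delete. The string key layout (key, timestamp, sequence number), the immediate compaction, and all names here are simplifying assumptions, not the actual window-store serialization or log-cleaner behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy compacted changelog; a hypothetical model, not Kafka's window store. */
final class CompactionModel {

    private final Map<String, String> compacted = new HashMap<>();
    private final boolean retainDuplicates;
    private int seqnum = 0;

    CompactionModel(boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    /** Simplified composite changelog key: record key, window timestamp, sequence number. */
    private String changelogKey(String key, long timestamp) {
        // When duplicates are retained, every write (tombstones included) gets a fresh seqnum.
        int seq = retainDuplicates ? ++seqnum : 0;
        return key + "@" + timestamp + "#" + seq;
    }

    /** Writes a value, or a tombstone when value == null, and compacts immediately. */
    void write(String key, long timestamp, String value) {
        String fullKey = changelogKey(key, timestamp);
        if (value == null) {
            compacted.remove(fullKey);  // a tombstone only deletes an identical key
        } else {
            compacted.put(fullKey, value);
        }
    }

    /** Number of records that compaction would keep. */
    int retainedRecords() {
        return compacted.size();
    }
}
```

With `retainDuplicates = true`, writing a record and then its tombstone leaves one record behind forever; with a stable key, the same two writes leave nothing, which is the behavior the cleanup policy relies on.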
[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13216: Priority: Critical (was: Blocker) > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sergio Peña >Priority: Critical > Fix For: 3.1.0
[jira] [Commented] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401940#comment-17401940 ] Matthias J. Sax commented on KAFKA-13216: - We disabled the feature in 3.0.0, and will fix forward in 3.1.0 > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sergio Peña >Priority: Critical > Fix For: 3.1.0
[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13216: Fix Version/s: (was: 3.0.0) 3.1.0 > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sergio Peña >Priority: Blocker > Fix For: 3.1.0
[jira] [Comment Edited] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded
[ https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401940#comment-17401940 ] Matthias J. Sax edited comment on KAFKA-13216 at 8/20/21, 1:04 AM: --- We disabled the feature in 3.0.0, and will fix forward in 3.1.0: [https://github.com/apache/kafka/pull/11233] was (Author: mjsax): We disabled the feature in 3.0.0, and will fix forward in 3.1.0 > Streams left/outer joins cause new internal changelog topic to grow unbounded > - > > Key: KAFKA-13216 > URL: https://issues.apache.org/jira/browse/KAFKA-13216 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sergio Peña >Priority: Critical > Fix For: 3.1.0
[jira] [Commented] (KAFKA-13218) kafka deleted unexpired message unexpectedly
[ https://issues.apache.org/jira/browse/KAFKA-13218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401943#comment-17401943 ] Haruki Okada commented on KAFKA-13218: -- One possible cause I can imagine is that logs are rolled based on the records' timestamps. [https://kafka.apache.org/documentation/#upgrade_10_1_breaking] If your producer supplies incorrect timestamps, this could happen. > kafka deleted unexpired message unexpectedly > > > Key: KAFKA-13218 > URL: https://issues.apache.org/jira/browse/KAFKA-13218 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.7.0 > Environment: docker file : > from openjdk:11-jre-slim-buster > RUN apt-get update > RUN apt-get -y install net-tools iputils-ping curl procps > RUN curl -OL > https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz && tar > -xzf kafka_2.13-2.7.0.tgz && rm -f kafka_2.13-2.7.0.tgz > ENV PATH "$PATH:/kafka_2.13-2.7.0/bin" > RUN mkdir /etc/kafka > COPY server.properties /etc/kafka/server.properties > CMD ["kafka-server-start.sh", "/etc/kafka/server.properties"] > configure file: > broker.id=2 > log.dirs=/var/lib/kafka > log.segment.bytes=10485760 > zookeeper.connect=zk-cs.default.svc.cluster.local:2181 > sasl.enabled.mechanisms=PLAIN > sasl.mechanism.inter.broker.protocol=PLAIN > inter.broker.listener.name=INTERNAL > listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT > listeners=INTERNAL://:9092,EXTERNAL://:30101 > advertised.listeners=INTERNAL://kafka-2.kafka.default.svc.cluster.local:9092,EXTERNAL://192.168.0.13:30101 >Reporter: leiminghany >Priority: Blocker > > I created a topic like this: > > {code:java} > kafka-topics.sh --create --zookeeper zk-cs.default.svc.cluster.local:2181 > --partitions 64 --replication-factor 2 --topic signal --config > retention.ms=6048000{code} > and then I sent several messages into partition 2 of this topic.
> > After that, I tried to consume the messages from this partition, but I couldn't get > any messages. > I checked the Kafka data directory and found that the log file had been rolled; here are > the files: > > {code:java} > root@kafka-2:/var/lib/kafka/signal-2# ls > 0005.index 0005.log > 0005.snapshot 0005.timeindex > leader-epoch-checkpoint > {code} > and the dump output is: > > > {code:java} > root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh > kafka.tools.DumpLogSegments --deep-iteration --files 0005.log > Dumping 0005.log > Starting offset: 5 > root@kafka-2:/var/lib/kafka/signal-2# > root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh > kafka.tools.DumpLogSegments --deep-iteration --files > 0005.index > Dumping 0005.index > root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh > kafka.tools.DumpLogSegments --deep-iteration --files > 0005.snapshot > Dumping 0005.snapshot > root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh > kafka.tools.DumpLogSegments --deep-iteration --files > 0005.timeindex > Dumping 0005.timeindex > timestamp: 0 offset: 5 > The following indexed offsets are not found in the log. > Indexed offset: 5, found log offset: -1 > root@kafka-2:/var/lib/kafka/signal-2# cat leader-epoch-checkpoint > 0 > 1 > 0 5 > {code} > > here is the Kafka console log for this partition: > > {code:java} > [2021-08-18 12:04:57,652] INFO [ProducerStateManager partition=signal-2] > Writing producer snapshot at offset 5 (kafka.log.ProducerStateManager) > [2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] > Rolled new log segment at offset 5 in 7 ms.
(kafka.log.Log) > [2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] > Deleting segment LogSegment(baseOffset=0, size=318, > lastModifiedTime=1629288220552, largestRecordTimestamp=Some(0)) due to > retention time 6048000ms breach based on the largest record timestamp in > the segment (kafka.log.Log) > [2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] > Incremented log start offset to 5 due to segment deletion (kafka.log.Log) > [2021-08-18 12:05:57,671] INFO [Log partition=signal-2, dir=/var/lib/kafka] > Deleting segment files LogSegment(baseOffset=0, size=318, > lastModifiedTime=1629288220552, largestRecordTimestamp=Some(0)) > (kafka.log.Log) > [2021-08-18 12:05:57,672] INFO Deleted log > /var/lib/kafka/signal-2/.log.deleted. > (kafka.log.LogSegment) > [2021-08-18 12:05:57,672] INFO Deleted offset index > /var/lib/kafka/signal-2/.index.deleted. > (kafka.log.LogSegment) > [2021-08-18 12:05:57,673] INFO
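The broker log above says the segment was deleted "due to retention time 6048000ms breach based on the largest record timestamp in the segment", and that timestamp was `Some(0)`. A minimal sketch of such a check, assuming a segment becomes eligible for deletion once the current time minus its largest record timestamp exceeds `retention.ms` (the class and method names are hypothetical, and the exact comparison Kafka uses is an assumption):

```java
/** Hypothetical model of the time-based retention check described in the broker log; not Kafka code. */
final class RetentionModel {

    private RetentionModel() { }

    /**
     * A segment breaches time-based retention once the time elapsed since its
     * largest record timestamp exceeds retention.ms.
     */
    static boolean breachesRetention(long nowMs, long largestRecordTimestampMs, long retentionMs) {
        return nowMs - largestRecordTimestampMs > retentionMs;
    }
}
```

With `largestRecordTimestamp = 0`, any real wall-clock time (for example the segment's `lastModifiedTime=1629288220552` from the log) is decades past a 6048000 ms retention, so the segment qualifies for deletion as soon as it is rolled, no matter how recently it was written. This is consistent with the suggestion above to check the timestamps the producer supplies.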
[GitHub] [kafka] vvcephei commented on pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei commented on pull request #11236: URL: https://github.com/apache/kafka/pull/11236#issuecomment-902390477 System test results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-19--001.system-test-kafka-branch-builder--1629415396--vvcephei--MINOR-decrease-session-timeout-streams-systest--4947e6f130/report.html None of the streams tests failed.
[jira] [Assigned] (KAFKA-13211) fetch queries with open endpoints for WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-13211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-13211: - Assignee: Luke Chen > fetch queries with open endpoints for WindowStore > - > > Key: KAFKA-13211 > URL: https://issues.apache.org/jira/browse/KAFKA-13211 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major >
[jira] [Updated] (KAFKA-13212) fetch/findSessions queries with open endpoints for SessionStore
[ https://issues.apache.org/jira/browse/KAFKA-13212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13212: -- Summary: fetch/findSessions queries with open endpoints for SessionStore (was: findSessions queries with open endpoints for SessionStore) > fetch/findSessions queries with open endpoints for SessionStore > --- > > Key: KAFKA-13212 > URL: https://issues.apache.org/jira/browse/KAFKA-13212 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major >
[jira] [Assigned] (KAFKA-13212) findSessions queries with open endpoints for SessionStore
[ https://issues.apache.org/jira/browse/KAFKA-13212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-13212: - Assignee: Luke Chen > findSessions queries with open endpoints for SessionStore > - > > Key: KAFKA-13212 > URL: https://issues.apache.org/jira/browse/KAFKA-13212 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major >
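For context on KAFKA-13211 and KAFKA-13212, an "open endpoint" range query can be modeled with a sorted map by treating a null bound as unbounded on that side. This is a hypothetical in-memory sketch: the null-as-open-bound convention and the inclusive bounds are assumptions here, and it does not reproduce the `WindowStore` or `SessionStore` fetch signatures.

```java
import java.util.NavigableMap;

/** Hypothetical model of key-range queries with optional (open) endpoints; not Kafka code. */
final class OpenRangeModel {

    private OpenRangeModel() { }

    /**
     * Returns a live view of entries with keyFrom <= key <= keyTo.
     * A null keyFrom or keyTo leaves that side of the range open.
     */
    static <K, V> NavigableMap<K, V> range(NavigableMap<K, V> store, K keyFrom, K keyTo) {
        if (keyFrom == null && keyTo == null) {
            return store;                                   // fully open: every key
        }
        if (keyFrom == null) {
            return store.headMap(keyTo, true);              // open lower bound
        }
        if (keyTo == null) {
            return store.tailMap(keyFrom, true);            // open upper bound
        }
        return store.subMap(keyFrom, true, keyTo, true);    // closed on both sides
    }
}
```

Mapping each combination of bounds onto `headMap`, `tailMap`, or `subMap` keeps the query a single ordered scan, which is why open endpoints are cheap to support on sorted stores.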
[GitHub] [kafka] vvcephei commented on pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests
vvcephei commented on pull request #11236: URL: https://github.com/apache/kafka/pull/11236#issuecomment-902407705 Thanks, @ableegoldman! I agree it might be overkill, and I wouldn't fault the conservative approach. The reason I went for a blanket approach is that 30 seconds seems like an awfully long time to wait for a session timeout in these scenarios, when we expect the full test run to complete within a couple of minutes. Network glitches seem to be pretty common when we run these, so I'm afraid we'll just be playing whack-a-mole by fixing individual tests as they time out due to this setting change. Thanks for the reference to your other PR! I just verified that we are now passing the decreased session interval to that class via the properties file, so it should no longer be necessary to set it in the Java source. I've reverted that commit in this PR and am re-running the build just to be on the safe side. Once it passes, I'll go ahead and merge it.
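As a hedged sketch of the pattern described in this comment (the session timeout supplied through the properties file handed to the test class rather than hard-coded in the Java source), the hypothetical helper below reads `session.timeout.ms` from loaded properties and only falls back to a code default when the file does not set it. The class and method names are illustrative; only the config key is a standard Kafka consumer setting, and the 10-second value comes from the PR title.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: values from the test's properties file take precedence over code defaults. */
final class SessionTimeoutConfig {

    /** Standard Kafka consumer config key. */
    static final String SESSION_TIMEOUT_MS = "session.timeout.ms";

    private SessionTimeoutConfig() { }

    /** Loads key=value pairs, e.g. from a properties file written by the test harness. */
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns the configured session timeout, or the fallback if the file does not set one. */
    static int sessionTimeoutMs(Properties props, int fallbackMs) {
        String configured = props.getProperty(SESSION_TIMEOUT_MS);
        return configured == null ? fallbackMs : Integer.parseInt(configured.trim());
    }
}
```

Keeping the value in the properties file means a single harness-level change (such as `session.timeout.ms=10000`) applies to every test, instead of patching individual test classes one at a time.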
[GitHub] [kafka] yangdaixai opened a new pull request #11240: KAFKA-13175; test
yangdaixai opened a new pull request #11240: URL: https://github.com/apache/kafka/pull/11240 test Signed-off-by: yangdaixai ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jasonyanwenl opened a new pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl opened a new pull request #11241: URL: https://github.com/apache/kafka/pull/11241 Currently, both `KStreamMap` and `KStreamFlatMap` throw an NPE if the call to `KeyValueMapper#apply` returns null. We should check whether the result of that call is null and, if so, throw an exception with a more meaningful error message to ease debugging. Two unit tests are also added to check that the null result is caught. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
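A minimal, self-contained sketch of the kind of check described in #11241, assuming plain Java with no Kafka dependency. `Mapper`, `Pair`, `MapNullCheck`, and `mapRecord` are hypothetical stand-ins for `KeyValueMapper`, `KeyValue`, and the `KStreamMap` processing step. This is not the code in the PR, and the error message wording is illustrative only.

```java
import java.util.Objects;

// Hypothetical stand-in for Kafka Streams' KeyValueMapper (not the real interface).
@FunctionalInterface
interface Mapper<K, V, R> {
    R apply(K key, V value);
}

// Hypothetical stand-in for Kafka Streams' KeyValue pair.
final class Pair<K, V> {
    final K key;
    final V value;

    Pair(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

public class MapNullCheck {
    // Applies the mapper and fails fast with a descriptive message if it returns null,
    // instead of letting a later dereference throw a bare NullPointerException.
    static <K, V, K1, V1> Pair<K1, V1> mapRecord(Mapper<K, V, Pair<K1, V1>> mapper, K key, V value) {
        Pair<K1, V1> result = mapper.apply(key, value);
        return Objects.requireNonNull(result,
            "The mapper returned null for key=" + key + ", value=" + value + ", which is not allowed.");
    }

    public static void main(String[] args) {
        Mapper<String, Integer, Pair<String, Integer>> upper = (k, v) -> new Pair<>(k.toUpperCase(), v + 1);
        Mapper<String, Integer, Pair<String, Integer>> broken = (k, v) -> null;

        Pair<String, Integer> ok = mapRecord(upper, "a", 1);
        System.out.println(ok.key + "=" + ok.value); // prints "A=2"

        try {
            mapRecord(broken, "b", 2);
        } catch (NullPointerException e) {
            // The message now names the offending record instead of being empty.
            System.out.println(e.getMessage());
        }
    }
}
```

Note that `Objects.requireNonNull(obj, message)` still throws a `NullPointerException`, just with a descriptive message; the actual PR may choose a different exception type.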
[GitHub] [kafka] clolov commented on pull request #11214: KAFKA-12994 Migrate JoinWindowsTest and SessionWindowsTest to new API
clolov commented on pull request #11214: URL: https://github.com/apache/kafka/pull/11214#issuecomment-902458908 @ableegoldman - I am kindly requesting a review :)
[GitHub] [kafka] yangdaixai closed pull request #11240: KAFKA-13175; test
yangdaixai closed pull request #11240: URL: https://github.com/apache/kafka/pull/11240
[GitHub] [kafka] yangdaixai commented on pull request #11226: KAFKA-13175; Optimization TopicExistsException,When a topic is marked…
yangdaixai commented on pull request #11226: URL: https://github.com/apache/kafka/pull/11226#issuecomment-902474644 @ijuma