[GitHub] [kafka] C0urante opened a new pull request, #13137: KAFKA-10586: Intra-cluster communication for MirrorMaker 2
C0urante opened a new pull request, #13137: URL: https://github.com/apache/kafka/pull/13137 [Jira](https://issues.apache.org/jira/browse/KAFKA-10586) Implements the internal REST API changes described in [KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters). A lot of the diff here comes from shuffling existing parts of the code base into new parts, without significantly altering them. This is necessary in order to make the current REST API logic for Kafka Connect more extensible and reusable, which in turn allows us to add logic for the dedicated Mirror Maker 2 REST API with less effort. Shuffled but not extensively rewritten portions include: - `WorkerConfig` properties being extracted into the new `RestServerConfig` class - Pulling out some REST forwarding logic previously internal to the `ConnectorsResource` class into the new `HerderRequestHandler` class - Pulling out all REST logic for internal endpoints from the `ConnectorsResource` class into the new `InternalClusterResource` class These changes should not affect the docs generated by, e.g., the `DistributedConfig` class, but should allow other `AbstractConfig` classes to easily add properties related to the (public- or internal-facing) Connect REST API. An integration test is also added that runs a multi-node, dedicated Mirror Maker 2 cluster with exactly-once support enabled in order to test out both code paths related to intra-cluster communication. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
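For readers unfamiliar with how Connect builds its config docs, the idea of letting several `AbstractConfig` classes share REST-related properties can be sketched with Kafka's public `ConfigDef` API. The helper name `addPublicRestConfigs` below is an assumption for illustration only and is not the actual `RestServerConfig` API; the two property names are existing Connect worker settings.

```java
import org.apache.kafka.common.config.ConfigDef;

// Hypothetical shape of a shared REST config holder; the method name is illustrative only.
public final class RestServerConfigSketch {

    // A worker config class (e.g. DistributedConfig) or a MirrorMaker 2 config class could
    // call this while building its ConfigDef, so REST properties are defined in one place.
    public static ConfigDef addPublicRestConfigs(final ConfigDef baseConfigDef) {
        return baseConfigDef
            .define("listeners", ConfigDef.Type.LIST, "http://:8083",
                    ConfigDef.Importance.LOW,
                    "Comma-separated list of listener URIs for the REST API.")
            .define("rest.advertised.host.name", ConfigDef.Type.STRING, null,
                    ConfigDef.Importance.LOW,
                    "Hostname advertised to other workers for internal REST requests.");
    }

    public static void main(String[] args) {
        final ConfigDef def = addPublicRestConfigs(new ConfigDef());
        System.out.println(def.names()); // [listeners, rest.advertised.host.name]
    }
}
```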
[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-10586: Intra-cluster communication for Mirror Maker 2
C0urante commented on PR #13137: URL: https://github.com/apache/kafka/pull/13137#issuecomment-1398797302 @gharris1727 @viktorsomogyi @mimaison would you mind taking a look at this when you have a moment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request, #13138: MINOR: Small cleanups in refactored consumer implementation
hachikuji opened a new pull request, #13138: URL: https://github.com/apache/kafka/pull/13138 This patch contains a few cleanups in the new refactored consumer logic: - Use `CompletableFuture` instead of `RequestFuture` in `NetworkClientDelegate`. This is a much more extensible API and it avoids tying the new implementation to `ConsumerNetworkClient`. - Fix call to `isReady` in `NetworkClientDelegate`. We need the call to `ready` to initiate the connection. - Ensure backoff is enforced even after successful `FindCoordinator` request. This avoids a tight loop while metadata is converging after a coordinator change. - `RequestState` was incorrectly using the reconnect backoff as the retry backoff. In fact, we don't currently have a retry backoff max, so the use of `ExponentialBackoff` is unnecessary, but I've left it since we may add this in https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients. - Minor cleanups in test cases to avoid unused classes/fields. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
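The first bullet above, replacing `RequestFuture` with `CompletableFuture`, is easier to picture with a small standalone sketch (plain JDK code, not the actual `NetworkClientDelegate`): callers compose follow-up work with standard `CompletableFuture` methods instead of a bespoke listener interface, and nothing ties the code to `ConsumerNetworkClient`.

```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureSketch {

    // Stand-in for an unsent request whose response will arrive asynchronously.
    static class UnsentRequest {
        final CompletableFuture<String> future = new CompletableFuture<>();
    }

    public static void main(String[] args) {
        UnsentRequest request = new UnsentRequest();

        // Follow-up work is chained with JDK primitives rather than a custom callback API.
        request.future
            .thenApply(response -> "handled: " + response)
            .whenComplete((result, error) -> {
                if (error != null) {
                    System.out.println("request failed: " + error.getMessage());
                } else {
                    System.out.println(result);
                }
            });

        // The network thread completes the future when the response (or a failure) arrives.
        request.future.complete("FindCoordinator response");
    }
}
```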
[GitHub] [kafka] philipnee commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation
philipnee commented on code in PR #13138: URL: https://github.com/apache/kafka/pull/13138#discussion_r1082970121 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -97,8 +97,8 @@ private void trySend(final long currentTimeMs) { unsent.timer.update(currentTimeMs); if (unsent.timer.isExpired()) { iterator.remove(); -unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( -"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); +unsent.callback.onFailure(new TimeoutException( +"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")); Review Comment: maybe ` "Failed to send request after " + unsent.timer.timeoutMs() + " ms."` I'm not sure why I added an extra `+ " "` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
kirktrue commented on code in PR #12813: URL: https://github.com/apache/kafka/pull/12813#discussion_r1082997576 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons // thus it is not safe to reassign the sequence. failBatch(batch, response, batch.attempts() < this.retries); } -if (error.exception() instanceof InvalidMetadataException) { +if (error.exception() instanceof InvalidMetadataException || error.exception() instanceof TimeoutException) { Review Comment: I believe it is necessary and added a comment to the code to explain why. LMK if that makes sense. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
kirktrue commented on code in PR #12813: URL: https://github.com/apache/kafka/pull/12813#discussion_r1082997268 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse response, Map
[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
kirktrue commented on code in PR #12813: URL: https://github.com/apache/kafka/pull/12813#discussion_r1082998126 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ## @@ -2519,7 +2519,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedExcep Map responseMap = new HashMap<>(); responseMap.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L)); -client.respond(new ProduceResponse(responseMap)); +client.respond(new ProduceResponse(responseMap), true, true); Review Comment: Ah! I reverted that change and added a dedicated `testMetadataRefreshOnRequestTimeout` test method to `SenderTest`. Let me know if that new test is sufficient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation
hachikuji commented on code in PR #13138: URL: https://github.com/apache/kafka/pull/13138#discussion_r1083069990 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -97,8 +97,8 @@ private void trySend(final long currentTimeMs) { unsent.timer.update(currentTimeMs); if (unsent.timer.isExpired()) { iterator.remove(); -unsent.callback.ifPresent(c -> c.onFailure(new TimeoutException( -"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms."))); +unsent.callback.onFailure(new TimeoutException( +"Failed to send request after " + unsent.timer.timeoutMs() + " " + "ms.")); Review Comment: Ack. Will fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription
ableegoldman commented on PR #13134: URL: https://github.com/apache/kafka/pull/13134#issuecomment-1398999167 It's just timing out; there's no error beyond that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription
ableegoldman merged PR #13134: URL: https://github.com/apache/kafka/pull/13134 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription
ableegoldman commented on PR #13134: URL: https://github.com/apache/kafka/pull/13134#issuecomment-1399019480 Merged to trunk and cherrypicked to 3.4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #13132: MINOR: fix warnings in Streams javadocs
ableegoldman merged PR #13132: URL: https://github.com/apache/kafka/pull/13132 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee opened a new pull request, #13139: MINOR: Update outdated documentation for the SubscriptionState
philipnee opened a new pull request, #13139: URL: https://github.com/apache/kafka/pull/13139 The current documentation indicates two positions are tracked, but these positions were removed a few years ago. Now we use a single position to track the last consumed record. Updated the documentation to reflect the current state. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request, #13140: KAFKA-14644: Process should crash after failure in Raft IO thread
hachikuji opened a new pull request, #13140: URL: https://github.com/apache/kafka/pull/13140 Unexpected errors caught in the Raft IO thread should cause the process to stop. This is similar to the handling of exceptions in the controller. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #13139: MINOR: Update outdated documentation for the SubscriptionState
guozhangwang merged PR #13139: URL: https://github.com/apache/kafka/pull/13139 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13140: KAFKA-14644: Process should crash after failure in Raft IO thread
cmccabe commented on code in PR #13140: URL: https://github.com/apache/kafka/pull/13140#discussion_r1083120749 ## core/src/main/scala/kafka/raft/RaftManager.scala: ## @@ -47,19 +47,27 @@ import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec, NON_ROUTA import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog} import org.apache.kafka.server.common.serialization.RecordSerde import org.apache.kafka.server.util.KafkaScheduler +import org.apache.kafka.server.fault.FaultHandler import scala.jdk.CollectionConverters._ object KafkaRaftManager { class RaftIoThread( client: KafkaRaftClient[_], -threadNamePrefix: String +threadNamePrefix: String, +fatalFaultHandler: FaultHandler ) extends ShutdownableThread( name = threadNamePrefix + "-io-thread", isInterruptible = false ) { override def doWork(): Unit = { - client.poll() + try { +client.poll() + } catch { +case t: Throwable => + fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t) Review Comment: suggest doing `throw fatalFaultHandler.handleFault(...)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
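The suggestion above works because `handleFault` is designed to return an exception the caller can throw. A minimal sketch of that pattern is below; the `FaultHandler` interface here is a simplified stand-in (assumed shape), not the real `org.apache.kafka.server.fault.FaultHandler`, and the work loop is reduced to a single call.

```java
// Simplified stand-in for a fault handler whose handleFault returns a throwable result.
interface FaultHandler {
    RuntimeException handleFault(String message, Throwable cause);
}

public class RaftIoLoopSketch {
    private final FaultHandler fatalFaultHandler;

    public RaftIoLoopSketch(FaultHandler fatalFaultHandler) {
        this.fatalFaultHandler = fatalFaultHandler;
    }

    void doWork(Runnable pollOnce) {
        try {
            pollOnce.run();
        } catch (Throwable t) {
            // Report the fault (which can trigger a process exit) and rethrow so the
            // IO thread itself stops immediately, per the review suggestion above.
            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
        }
    }

    public static void main(String[] args) {
        FaultHandler crashOnFault = (msg, cause) -> {
            System.err.println("FATAL: " + msg + ": " + cause);
            return new RuntimeException(msg, cause); // a real handler might halt the process here
        };
        new RaftIoLoopSketch(crashOnFault).doWork(() -> {
            throw new IllegalStateException("boom");
        });
    }
}
```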
[GitHub] [kafka] cmccabe commented on a diff in pull request #13140: KAFKA-14644: Process should crash after failure in Raft IO thread
cmccabe commented on code in PR #13140: URL: https://github.com/apache/kafka/pull/13140#discussion_r1083122187 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -221,16 +225,23 @@ class RaftManagerTest { @Test def testUncaughtExceptionInIoThread(): Unit = { val raftClient = mock(classOf[KafkaRaftClient[String]]) -val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft") +val faultHandler = mock(classOf[FaultHandler]) Review Comment: easier to just use `MockFaultHandler`; you can then look at `MockFaultHandler.firstException` to at least see if it has the `Unexpected error in raft IO thread` text -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation
guozhangwang commented on code in PR #13138: URL: https://github.com/apache/kafka/pull/13138#discussion_r1083125061 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -149,11 +156,14 @@ private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator.host(), coordinator.port()); log.info("Discovered group coordinator {}", coordinator); -coordinatorRequestState.reset(); +coordinatorRequestState.onSuccessfulAttempt(currentTimeMs); } -private void onFailedCoordinatorResponse(final Exception exception, final long currentTimeMs) { -coordinatorRequestState.updateLastFailedAttempt(currentTimeMs); +private void onFailedResponse( +final long currentTimeMs, +final Throwable exception +) { +coordinatorRequestState.onFailedAttempt(currentTimeMs); Review Comment: How about moving this to the end of the function as well, since `nonRetriableErrorHandler.handle(exception);` may take time? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -72,17 +73,16 @@ public NetworkClientDelegate( * @param currentTimeMs current time * @return a list of client response */ -public List poll(final long timeoutMs, final long currentTimeMs) { +public void poll(final long timeoutMs, final long currentTimeMs) { trySend(currentTimeMs); long pollTimeoutMs = timeoutMs; if (!unsentRequests.isEmpty()) { pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs); } -List res = this.client.poll(pollTimeoutMs, currentTimeMs); + +this.client.poll(pollTimeoutMs, currentTimeMs); checkDisconnects(); -wakeup(); Review Comment: We can also remove the function in https://github.com/apache/kafka/pull/13138/files#diff-a5be3b830b3475fd019bb6c218b7b81d5f5f25f77587f33e48c92c4dc4271ed5R168 as well right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java: ## @@ -72,17 +73,16 @@ public NetworkClientDelegate( * @param currentTimeMs current time * @return a list of client response */ -public List poll(final long timeoutMs, final long currentTimeMs) { +public void poll(final long timeoutMs, final long currentTimeMs) { trySend(currentTimeMs); long pollTimeoutMs = timeoutMs; if (!unsentRequests.isEmpty()) { pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs); } -List res = this.client.poll(pollTimeoutMs, currentTimeMs); + +this.client.poll(pollTimeoutMs, currentTimeMs); checkDisconnects(); -wakeup(); Review Comment: NVM, the client is not exposed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions
guozhangwang commented on code in PR #12813: URL: https://github.com/apache/kafka/pull/12813#discussion_r1083134655 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ## @@ -549,7 +549,13 @@ private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType private void handleProduceResponse(ClientResponse response, Map batches, long now) { RequestHeader requestHeader = response.requestHeader(); int correlationId = requestHeader.correlationId(); -if (response.wasDisconnected()) { +if (response.wasTimedOut()) { +log.trace("Cancelled request with header {} due to node {} being disconnected due to timeout", Review Comment: very nit: `due to the last request timed out`? just to distinguish from connection timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue closed pull request #12945: KAFKA-14365: Refactor Fetcher to allow different implementations
kirktrue closed pull request #12945: KAFKA-14365: Refactor Fetcher to allow different implementations URL: https://github.com/apache/kafka/pull/12945 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13139: MINOR: Update outdated documentation for the SubscriptionState
philipnee commented on PR #13139: URL: https://github.com/apache/kafka/pull/13139#issuecomment-1399093410 Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request, #13141: MINOR: fix flaky integrations tests by using 60s default timeout for startup
ableegoldman opened a new pull request, #13141: URL: https://github.com/apache/kafka/pull/13141 The timeouts used for starting up Streams and waiting for the RUNNING state are all over the place across our integration tests, with some as low as 15s (which are unsurprisingly rather flaky). We use 60s as the default timeout for other APIs in the `IntegrationTestUtils`, so we should do the same for `#startApplicationAndWaitUntilRunning`. I also noticed that we have several versions of that exact API in `StreamsTestUtils`, so I migrated everything over to `IntegrationTestUtils#startApplicationAndWaitUntilRunning` and added a few overloads for ease of use, including one for single KafkaStreams apps and one for using the default timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
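A sketch of the overload pattern described above; the class and method names and the 60-second constant are assumptions for illustration, not the actual `IntegrationTestUtils` signatures.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;

// Illustrative only: one "full" helper that does the waiting, plus convenience overloads.
public final class StartupTimeoutSketch {
    static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofSeconds(60);

    // Full variant: waits for every app in the list to reach RUNNING (body stubbed out here).
    static void startAndWaitUntilRunning(List<String> apps, Duration timeout) {
        System.out.println("waiting up to " + timeout.getSeconds() + "s for " + apps);
    }

    // Convenience overload: single app, default timeout.
    static void startAndWaitUntilRunning(String app) {
        startAndWaitUntilRunning(Collections.singletonList(app), DEFAULT_STARTUP_TIMEOUT);
    }

    public static void main(String[] args) {
        startAndWaitUntilRunning("my-streams-app");
    }
}
```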
[GitHub] [kafka] vcrfxia opened a new pull request, #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management
vcrfxia opened a new pull request, #13142: URL: https://github.com/apache/kafka/pull/13142 This PR refactors how the list of open iterators for RocksDB stores is managed. Prior to this PR, the `openIterators` list was passed into the constructor for the iterators themselves, allowing `RocksDbIterator.close()` to remove the iterator from the `openIterators` list. After this PR, the iterators themselves will not know about lists of open iterators. Instead, a generic close callback is exposed, and it's the responsibility of the store that creates a new iterator to set the callback on the iterator, to ensure that closing an iterator removes the iterator from the list of open iterators. This refactor is desirable because it enables more flexible iterator lifecycle management. Building on top of this, RocksDBStore is updated with an option to allow the user (i.e., the caller of methods such as `range()` and `prefixScan()` which return iterators) to pass a custom `openIterators` list for the new iterator to be stored in. This will allow for a new Segments implementation where multiple Segments can share the same RocksDBStore instance, while having each Segment manage its own open iterators. (See for more.) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
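The close-callback idea in the description above can be shown with a small generic sketch (not the actual `RocksDbIterator` code): the iterator no longer knows about any open-iterator list; the store that hands it out decides which list to maintain and wires removal in via the callback.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Illustrative iterator with a pluggable close callback instead of a baked-in openIterators list.
class CallbackIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private Runnable closeCallback = () -> { };

    CallbackIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    void onClose(Runnable callback) {
        this.closeCallback = callback;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    @Override public void close() { closeCallback.run(); }
}

// The store (or an individual segment) chooses the open-iterator list when creating iterators.
class StoreSketch {
    private final Set<CallbackIterator<String>> openIterators =
        Collections.synchronizedSet(new HashSet<>());

    CallbackIterator<String> range(Iterator<String> raw) {
        CallbackIterator<String> iter = new CallbackIterator<>(raw);
        openIterators.add(iter);
        iter.onClose(() -> openIterators.remove(iter));
        return iter;
    }

    int openIteratorCount() {
        return openIterators.size();
    }
}
```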
[GitHub] [kafka] vcrfxia opened a new pull request, #13143: KAFKA-14491: [3/N] Add logical key value segments
vcrfxia opened a new pull request, #13143: URL: https://github.com/apache/kafka/pull/13143 (This PR is stacked on https://github.com/apache/kafka/pull/13142. The first commit does not need to be reviewed separately.) Today's KeyValueSegments create a new RocksDB instance for each KeyValueSegment. This PR introduces an analogous LogicalKeyValueSegments implementation, with corresponding LogicalKeyValueSegment, which shares a single physical RocksDB instance across all "logical" segments. This will be used for the RocksDB versioned store implementation proposed in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
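One plausible way for many "logical" segments to share a single physical store is to namespace keys with a per-segment prefix; the sketch below illustrates that idea with an in-memory map and is an assumption for illustration, not the actual `LogicalKeyValueSegment` implementation.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// One shared "physical" store; each logical segment prepends its id to every key it writes.
class SharedPhysicalStore {
    final NavigableMap<String, byte[]> data = new TreeMap<>();
}

class LogicalSegmentSketch {
    private final SharedPhysicalStore physical;
    private final String prefix;

    LogicalSegmentSketch(SharedPhysicalStore physical, long segmentId) {
        this.physical = physical;
        this.prefix = segmentId + "/";
    }

    void put(String key, byte[] value) {
        physical.data.put(prefix + key, value);
    }

    byte[] get(String key) {
        return physical.data.get(prefix + key);
    }

    // Dropping a logical segment clears only its own key range, not the shared store.
    void destroy() {
        physical.data.subMap(prefix, prefix + Character.MAX_VALUE).clear();
    }
}

public class LogicalSegmentsDemo {
    public static void main(String[] args) {
        SharedPhysicalStore store = new SharedPhysicalStore();
        LogicalSegmentSketch s1 = new LogicalSegmentSketch(store, 1);
        LogicalSegmentSketch s2 = new LogicalSegmentSketch(store, 2);
        s1.put("k", new byte[]{1});
        s2.put("k", new byte[]{2});
        System.out.println(store.data.size()); // 2 -- same key, different logical segments
        s1.destroy();
        System.out.println(store.data.size()); // 1 -- only segment 1's data was removed
    }
}
```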
[GitHub] [kafka] hachikuji commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation
hachikuji commented on code in PR #13138: URL: https://github.com/apache/kafka/pull/13138#discussion_r1083199766 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -149,11 +156,14 @@ private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator.host(), coordinator.port()); log.info("Discovered group coordinator {}", coordinator); -coordinatorRequestState.reset(); +coordinatorRequestState.onSuccessfulAttempt(currentTimeMs); } -private void onFailedCoordinatorResponse(final Exception exception, final long currentTimeMs) { -coordinatorRequestState.updateLastFailedAttempt(currentTimeMs); +private void onFailedResponse( +final long currentTimeMs, +final Throwable exception +) { +coordinatorRequestState.onFailedAttempt(currentTimeMs); Review Comment: Hmm, I think it seems ok to log the time of the failed response. Any time spent in `nonRetriableErrorHandler.handle(exception)` can count toward the backoff. Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
mjsax commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1083223501 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ +static boolean isEmpty(final byte[] segmentValue) { +return segmentValue.length <= TIMESTAMP_SIZE; +} + +/** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. + * + * @return the timestamp of the earliest record in the provided segment. + */ +static long getMinTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); +} + +/** + * @return the deserialized segment value + */ +static SegmentValue deserialize(final byte[] segmentValue) { +return new PartiallyDeserializedSegmentValue(segmentValue); +} + +/** + * Creates a new segment value that contains the provided record. 
+ * + * @param value the record value + * @param validFrom the record's timestamp + * @param validTo the record's validTo timestamp + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithRecord( +final byte[] value, final long validFrom, final long validTo) { +return new PartiallyDeserializedSegmentValue(value, validFrom, validTo); +} + +/** + * Creates a new empty segment value. + * + * @param timestamp the timestamp of the tombstone for this empty segment value + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithTombstone(final long timestamp) { +return new PartiallyDeserializedSegmentValue(timestamp); +} + +interface SegmentValue { + +/** + * @return whether the segment is empty. See + * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details. + */ +boolean isEmpty(); + +/** + * Finds the latest record in this segment with timestamp not exceeding the provided + * timestamp bound. This method requires that the provided timestamp bound exists in + * this segment, i.e., the segment is not empty, and the provided timestamp bound is + * at least minTimestamp and is smaller than nextTimestamp. +
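Note that the angle-bracketed placeholders in the value-format javadoc above were stripped in rendering; from the constants and accessors shown (`TIMESTAMP_SIZE` = 8, `nextTimestamp` at offset 0, `minTimestamp` at offset 8), the layout appears to be next_timestamp, then min_timestamp, then a reverse-timestamp-sorted list of (timestamp, value_size) pairs, then the concatenated values. The sketch below only exercises the degenerate "empty" case described in the javadoc, using the same offsets; it is illustrative, not the real formatter.

```java
import java.nio.ByteBuffer;

public class SegmentValueLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;

    // An "empty" segment value holds only the tombstone's timestamp (cf. isEmpty() above).
    static byte[] newTombstoneOnlySegment(long tombstoneTimestamp) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE).putLong(tombstoneTimestamp).array();
    }

    static boolean isEmpty(byte[] segmentValue) {
        return segmentValue.length <= TIMESTAMP_SIZE;
    }

    static long getNextTimestamp(byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(0);
    }

    public static void main(String[] args) {
        byte[] segment = newTombstoneOnlySegment(12345L);
        System.out.println(isEmpty(segment));          // true
        System.out.println(getNextTimestamp(segment)); // 12345
    }
}
```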
[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
mjsax commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1083224137 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ +static boolean isEmpty(final byte[] segmentValue) { +return segmentValue.length <= TIMESTAMP_SIZE; +} + +/** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. + * + * @return the timestamp of the earliest record in the provided segment. + */ +static long getMinTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); +} + +/** + * @return the deserialized segment value + */ +static SegmentValue deserialize(final byte[] segmentValue) { +return new PartiallyDeserializedSegmentValue(segmentValue); +} + +/** + * Creates a new segment value that contains the provided record. 
+ * + * @param value the record value + * @param validFrom the record's timestamp + * @param validTo the record's validTo timestamp + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithRecord( +final byte[] value, final long validFrom, final long validTo) { +return new PartiallyDeserializedSegmentValue(value, validFrom, validTo); +} + +/** + * Creates a new empty segment value. + * + * @param timestamp the timestamp of the tombstone for this empty segment value + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithTombstone(final long timestamp) { +return new PartiallyDeserializedSegmentValue(timestamp); +} + +interface SegmentValue { + +/** + * @return whether the segment is empty. See + * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details. + */ +boolean isEmpty(); + +/** + * Finds the latest record in this segment with timestamp not exceeding the provided + * timestamp bound. This method requires that the provided timestamp bound exists in + * this segment, i.e., the segment is not empty, and the provided timestamp bound is + * at least minTimestamp and is smaller than nextTimestamp. +
[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
mjsax commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1081924210 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ +static boolean isEmpty(final byte[] segmentValue) { +return segmentValue.length <= TIMESTAMP_SIZE; +} + +/** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. + * + * @return the timestamp of the earliest record in the provided segment. + */ +static long getMinTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); +} + +/** + * @return the deserialized segment value + */ +static SegmentValue deserialize(final byte[] segmentValue) { +return new PartiallyDeserializedSegmentValue(segmentValue); +} + +/** + * Creates a new segment value that contains the provided record. 
+ * + * @param value the record value + * @param validFrom the record's timestamp + * @param validTo the record's validTo timestamp + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithRecord( +final byte[] value, final long validFrom, final long validTo) { +return new PartiallyDeserializedSegmentValue(value, validFrom, validTo); +} + +/** + * Creates a new empty segment value. + * + * @param timestamp the timestamp of the tombstone for this empty segment value + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithTombstone(final long timestamp) { +return new PartiallyDeserializedSegmentValue(timestamp); +} + +interface SegmentValue { + +/** + * @return whether the segment is empty. See + * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details. + */ +boolean isEmpty(); + +/** + * Finds the latest record in this segment with timestamp not exceeding the provided + * timestamp bound. This method requires that the provided timestamp bound exists in + * this segment, i.e., the segment is not empty, and the provided timestamp bound is + * at least minTimestamp and is smaller than nextTimestamp. +
[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
mjsax commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1083231834 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. Review Comment: Thanks for the explanation. It seems to be "weird" edge case, because `nextTimestamp` is usually the "largest validTo" of the segment, but we not store the "validFrom" of the tombstone in it. In general, I prefer to have some "invariant" as it makes it simpler to reason about the code, but introducing this edge case void the invariant that `nextTimestamp` is the "largest validTo" of the segment. Maybe it's ok to go with the current code, but it seems we need to clearly document this in the top level JavaDoc -- otherwise it might be very hard to understand the code because a empty segment is encoded differently to a non-empty segment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yufeiyan1220 commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown
yufeiyan1220 commented on PR #13125: URL: https://github.com/apache/kafka/pull/13125#issuecomment-1399206848 > Seems like we aren't particularly consistent at removing these metrics and sensors, fetcher would be another example. Mind making the clean up more comprehensive? I have made the change to ensure all consumer metrics are removed after shutting down. As for the producer, I found that all producer metrics are already closed, so I don't need to add anything more for the producer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
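A minimal sketch of the kind of cleanup being discussed, using Kafka's `Metrics`/`Sensor` API; the sensor and group names are made up for illustration and are not the actual consumer coordinator metric names.

```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

// Illustrative: register a sensor on construction and remove it again on close(),
// so nothing lingers in the metrics registry after the consumer shuts down.
public class CoordinatorMetricsSketch implements AutoCloseable {
    private final Metrics metrics;
    private final Sensor commitLatency;

    public CoordinatorMetricsSketch(Metrics metrics) {
        this.metrics = metrics;
        this.commitLatency = metrics.sensor("commit-latency");
        this.commitLatency.add(
            metrics.metricName("commit-latency-avg", "consumer-coordinator-metrics"),
            new Avg());
    }

    @Override
    public void close() {
        // Removing the sensor also removes the metrics registered through it; metrics added
        // directly via metrics.addMetric(...) would need a matching metrics.removeMetric(...).
        metrics.removeSensor("commit-latency");
    }
}
```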
[GitHub] [kafka] nizhikov opened a new pull request, #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close
nizhikov opened a new pull request, #13144: URL: https://github.com/apache/kafka/pull/13144 `ConnectorClientConfigOverridePolicy` implements `AutoCloseable`, but its close method is never called. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
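A sketch of the kind of fix implied here, using `Utils.closeQuietly`; the surrounding worker/herder wiring is simplified away and only the close-on-shutdown idea is shown.

```java
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;

// Illustrative shutdown path: the policy is AutoCloseable, so stopping the worker should
// close it along with the other plugins it loaded.
public class PolicyCloseSketch {
    private final ConnectorClientConfigOverridePolicy clientConfigOverridePolicy;

    public PolicyCloseSketch(ConnectorClientConfigOverridePolicy policy) {
        this.clientConfigOverridePolicy = policy;
    }

    public void stop() {
        // closeQuietly logs and swallows exceptions so shutdown can continue.
        Utils.closeQuietly(clientConfigOverridePolicy, "connector client config override policy");
    }
}
```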
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
vamossagar12 commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083271427 ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -771,8 +772,8 @@ object ConfigCommand extends Logging { .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) -val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") -val describeOpt = parser.accepts("describe", "List configs for the given entity.") +val alterOpt: OptionSpec[String] = parser.accepts("alter", "Alter the configuration for the entity.").withOptionalArg() +val describeOpt: OptionSpec[String] = parser.accepts("describe", "List configs for the given entity.").withOptionalArg() Review Comment: The addition of `OptionSpec` and `withOptionalArg` is not necessary strictly right? ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore + * as it also checks whether the version needs to be printed, but + * refactoring this would have meant changing all command line tools + * and unnecessarily increased the blast radius of this change. 
+ * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ +public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) { +if (isPrintHelpNeeded(commandOpts)) { +printUsageAndDie(commandOpts.parser, message); +} +if (isPrintVersionNeeded(commandOpts)) { +printVersionAndDie(); +} +} + +/** + * Check that all the listed options are present. + */ +public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec... requiredList) { +for (OptionSpec arg : requiredList) { +if (!options.has(arg)) { +printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg)); +} +} +} + +/** + * Check that none of the listed options are present. + */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +OptionSpec... invalidOptions) { +if (options.has(usedOption)) { +for (OptionSpec arg : invalidOptions) { +if (options.has(arg)) { +printUsageAndDie(parser,
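For context, the helpers in the diff above wrap the usual joptsimple pattern for `--help` and `--version` handling; the standalone sketch below shows that underlying pattern directly (it is not the new `CommandLineUtils` API).

```java
import java.io.IOException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;

public class HelpVersionSketch {
    public static void main(String[] args) throws IOException {
        OptionParser parser = new OptionParser();
        parser.accepts("help", "Print usage information.");
        parser.accepts("version", "Display version information.");

        OptionSet options = parser.parse(args);

        // Equivalent of isPrintHelpNeeded: no arguments at all, or --help was given.
        if (args.length == 0 || options.has("help")) {
            parser.printHelpOn(System.out);
            System.exit(0);
        }
        // Equivalent of isPrintVersionNeeded: --version was given.
        if (options.has("version")) {
            System.out.println("version: <version string here>");
            System.exit(0);
        }
    }
}
```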
[GitHub] [kafka] vamossagar12 commented on pull request #11592: KAFKA-13501: Avoid state restore via rebalance if standbys are enabled
vamossagar12 commented on PR #11592: URL: https://github.com/apache/kafka/pull/11592#issuecomment-1399237152 @mjsax , This is a very old PR of mine which didn't get merged. On the ticket I see `new-streams-runtime-should-fix` added as a label. Is this fix needed anymore or should I close it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 closed pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata
vamossagar12 closed pull request #9756: KAFKA-10652: Adding size based linger semnatics to Raft metadata URL: https://github.com/apache/kafka/pull/9756 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13121: MINOR: Remove some connect tests from Java 17 block list
ijuma commented on PR #13121: URL: https://github.com/apache/kafka/pull/13121#issuecomment-1399275209 The JDK 17 test failures are not among the ones I unblocked in this PR: > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[1] true 10 min 1 > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[2] false 1 min 11 sec1 > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[2] false -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #13121: MINOR: Remove some connect tests from Java 17 block list
ijuma merged PR #13121: URL: https://github.com/apache/kafka/pull/13121 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083318714 ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -771,8 +772,8 @@ object ConfigCommand extends Logging { .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) -val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") -val describeOpt = parser.accepts("describe", "List configs for the given entity.") +val alterOpt: OptionSpec[String] = parser.accepts("alter", "Alter the configuration for the entity.").withOptionalArg() +val describeOpt: OptionSpec[String] = parser.accepts("describe", "List configs for the given entity.").withOptionalArg() Review Comment: Correct, it is a left over from a previous attempt. Reverting. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1399304541 @clolov @vamossagar12 I took your suggestions. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083319516 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore + * as it also checks whether the version needs to be printed, but + * refactoring this would have meant changing all command line tools + * and unnecessarily increased the blast radius of this change. + * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ +public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) { +if (isPrintHelpNeeded(commandOpts)) { +printUsageAndDie(commandOpts.parser, message); +} +if (isPrintVersionNeeded(commandOpts)) { +printVersionAndDie(); +} +} + +/** + * Check that all the listed options are present. + */ +public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec... requiredList) { +for (OptionSpec arg : requiredList) { +if (!options.has(arg)) { +printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg)); +} +} +} + +/** + * Check that none of the listed options are present. 
+ */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +OptionSpec... invalidOptions) { +if (options.has(usedOption)) { +for (OptionSpec arg : invalidOptions) { +if (options.has(arg)) { +printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg)); +} +} +} +} + +/** + * Check that none of the listed options are present. + */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +Set> invalidOptions) { +OptionSpec[] array = new OptionSpec[invalidOptions.size()]; +invalidOptions.toArray(array); +checkInvalidArgs(parser, options, usedOption, array); +} + +/** + * Check that none of the listed options are present with the combination of used options. + */ +public static void
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
vamossagar12 commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083413458 ## core/src/main/scala/kafka/utils/ToolsUtils.scala: ## @@ -64,4 +65,18 @@ object ToolsUtils { println(s"%-${maxLengthOfDisplayName}s : $specifier".format(metricName, value)) } } + + /** + * This is a simple wrapper around `CommandLineUtils.printUsageAndDie`. + * It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`. + * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]] + * and [[kafka.tools.ConsoleProducer]] are migrated. + * + * @param parser Command line options parser. + * @param message Error message. + */ + def printUsageAndDie(parser: OptionParser, message: String): Nothing = { Review Comment: I am thinking if we can add another parameter to this to say if we want to do an Exit(1) from this method or not. I see some places where Exit(1) is not being invoked from the scala layer. ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -801,15 +801,21 @@ object ConsumerGroupCommand extends Logging { partitionsToReset.map { topicPartition => logStartOffsets.get(topicPartition) match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) -case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") +case _ => { + CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") + Exit.exit(1) Review Comment: Thanks @fvaleri . Wondering if these and other similar lines can be changed to use ToolsUtils. printUsageAndDie()? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
vamossagar12 commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083413458 ## core/src/main/scala/kafka/utils/ToolsUtils.scala: ## @@ -64,4 +65,18 @@ object ToolsUtils { println(s"%-${maxLengthOfDisplayName}s : $specifier".format(metricName, value)) } } + + /** + * This is a simple wrapper around `CommandLineUtils.printUsageAndDie`. + * It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`. + * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]] + * and [[kafka.tools.ConsoleProducer]] are migrated. + * + * @param parser Command line options parser. + * @param message Error message. + */ + def printUsageAndDie(parser: OptionParser, message: String): Nothing = { Review Comment: I am wondering if we can add another parameter to this method to indicate whether we want to do an Exit(1) from it or not. I see some places where Exit(1) is not being invoked from the tools classes. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
vamossagar12 commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1399432431 > @clolov @vamossagar12 took your suggestions. Thanks. Thanks @fvaleri ! Couple of minor comments. This should be good to go after that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance
vamossagar12 commented on PR #12561: URL: https://github.com/apache/kafka/pull/12561#issuecomment-1399432804 Hey @C0urante, I was wondering: should the exponential backoff that we have introduced as part of this PR be mentioned somewhere in the docs? I am asking since this is a deviation from how things worked prior to this change. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison opened a new pull request, #13145: MINOR: Remove duplicate empty string check
mimaison opened a new pull request, #13145: URL: https://github.com/apache/kafka/pull/13145 `Values.parseString()` handles empty strings and returns the same `SchemaAndValue` [[0]](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L383-L389) so we don't need to do the check in `toConnectHeader()`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
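To illustrate the simplification described in the PR above, here is a minimal sketch with a hypothetical method name (not the actual Connect header converter code): once `Values.parseString()` itself copes with empty input, the caller can delegate directly without its own empty-string guard.

```java
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;

public class HeaderParsingSketch {

    // Hypothetical stand-in for the converter method discussed in the PR: because
    // Values.parseString() already handles empty strings and returns a SchemaAndValue,
    // no separate empty-string check is needed before delegating to it.
    static SchemaAndValue toConnectHeader(String rawValue) {
        return Values.parseString(rawValue);
    }
}
```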
[GitHub] [kafka] mimaison commented on a diff in pull request #13094: MINOR: Various cleanups in client tests
mimaison commented on code in PR #13094: URL: https://github.com/apache/kafka/pull/13094#discussion_r1083436604 ## clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java: ## @@ -331,17 +331,15 @@ public void testStressfulSituation() throws Exception { 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, CompressionType.NONE, 0); List threads = new ArrayList<>(); for (int i = 0; i < numThreads; i++) { -threads.add(new Thread() { -public void run() { -for (int i = 0; i < msgs; i++) { -try { -accum.append(topic, i % numParts, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster); -} catch (Exception e) { -e.printStackTrace(); -} +threads.add(new Thread(() -> { +for (int i1 = 0; i1 < msgs; i1++) { Review Comment: Good spot! I had not realized IntelliJ had automatically named it `i1` which is a bit strange. I renamed it to `j`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
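As a self-contained illustration of the refactor being discussed (not the Kafka test itself): a lambda body, unlike an anonymous class body, cannot redeclare a local variable that already exists in the enclosing scope, which is why the inner loop counter needs a new name such as `j`.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadLambdaSketch {
    public static void main(String[] args) throws InterruptedException {
        int numThreads = 4;
        int msgs = 10;
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            // With an anonymous Thread subclass the inner loop could also be named `i`
            // (shadowing the outer variable); inside a lambda that would not compile,
            // hence the rename to `j`.
            threads.add(new Thread(() -> {
                for (int j = 0; j < msgs; j++) {
                    System.out.println(Thread.currentThread().getName() + " appending record " + j);
                }
            }));
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
    }
}
```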
[GitHub] [kafka] mimaison merged pull request #13080: KAFKA-14575: Move ClusterTool to tools module
mimaison merged PR #13080: URL: https://github.com/apache/kafka/pull/13080 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hgeraldino opened a new pull request, #13146: KIP-991 Add deletedConnector flag when stopping tasks
hgeraldino opened a new pull request, #13146: URL: https://github.com/apache/kafka/pull/13146 Companion PR for [KIP-901: Add flag connectorDeleted flag when stopping task](https://cwiki.apache.org/confluence/display/KAFKA/KIP-901%3A+Add+flag+connectorDeleted+flag+when+stopping+task) This PR: * Introduces a new `default void stop(boolean deletedConnector)` method to the `connect.connector.Task` interface, to indicate that a task is being stopped due to the connector being deleted. * Patches the existing unit test, and introduces a new test case to validate the new logic ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
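For context, a minimal sketch of the backwards-compatible pattern the PR describes. The interface below is illustrative only; the real `Task` interface has additional methods such as `start` and `version`.

```java
public interface Task {

    void stop();

    // New overload from the KIP: deletedConnector indicates the task is being stopped
    // because its connector was deleted. The default delegates to the existing stop(),
    // so current Task implementations keep working without any code changes.
    default void stop(boolean deletedConnector) {
        stop();
    }
}
```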
[GitHub] [kafka] ableegoldman commented on pull request #13141: MINOR: fix flaky integrations tests by using 60s default timeout for startup
ableegoldman commented on PR #13141: URL: https://github.com/apache/kafka/pull/13141#issuecomment-1399645849 test failures are unrelated, will merge to trunk and cherrypick to 3.4 to help stabilize the release -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #13141: MINOR: fix flaky integrations tests by using 60s default timeout for startup
ableegoldman merged PR #13141: URL: https://github.com/apache/kafka/pull/13141 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sgn2607 commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
sgn2607 commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r1083630069 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -169,6 +172,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) +.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs Review Comment: @dadufour I'm facing a similar issue with consumer group offset handling during failback; thank you for raising it in the Strimzi forum, which helped me land in this thread. @mimaison I thought I would check whether there is a KIP to handle this behavior in MM2, or whether it has already been fixed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
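For reference, the `flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))` line quoted in the diff above is the Java 8 idiom for dropping empty `Optional`s from a stream (Java 9+ could use `Optional::stream`). A standalone sketch with a stand-in `checkpoint` step:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OptionalFlatMapSketch {

    public static void main(String[] args) {
        List<String> rawOffsets = Arrays.asList("5", "not-a-number", "7");

        // Map each element to an Optional and keep only the present values,
        // mirroring how checkpoints without an offset-sync are filtered out.
        List<Integer> checkpoints = rawOffsets.stream()
                .map(OptionalFlatMapSketch::checkpoint)
                .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
                .collect(Collectors.toList());

        System.out.println(checkpoints); // prints [5, 7]
    }

    // Stand-in for MirrorCheckpointTask#checkpoint: empty means "nothing to emit".
    static Optional<Integer> checkpoint(String raw) {
        try {
            return Optional.of(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```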
[GitHub] [kafka] abbccdda closed pull request #8725: KAFKA-9608: Transaction Event Simulation Test
abbccdda closed pull request #8725: KAFKA-9608: Transaction Event Simulation Test URL: https://github.com/apache/kafka/pull/8725 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083709872 ## core/src/main/scala/kafka/utils/ToolsUtils.scala: ## @@ -64,4 +65,18 @@ object ToolsUtils { println(s"%-${maxLengthOfDisplayName}s : $specifier".format(metricName, value)) } } + + /** + * This is a simple wrapper around `CommandLineUtils.printUsageAndDie`. + * It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`. + * Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]] + * and [[kafka.tools.ConsoleProducer]] are migrated. + * + * @param parser Command line options parser. + * @param message Error message. + */ + def printUsageAndDie(parser: OptionParser, message: String): Nothing = { Review Comment: In the original code we always call exit, as the method name implies, so I think it's fine. ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -801,15 +801,21 @@ object ConsumerGroupCommand extends Logging { partitionsToReset.map { topicPartition => logStartOffsets.get(topicPartition) match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) -case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") +case _ => { + CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") + Exit.exit(1) Review Comment: Yes, there were some replacements that I missed. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12818: URL: https://github.com/apache/kafka/pull/12818#issuecomment-1399926748 Thank you very much for the review and merge! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools
vamossagar12 commented on code in PR #13127: URL: https://github.com/apache/kafka/pull/13127#discussion_r1083769752 ## build.gradle: ## @@ -1757,6 +1757,7 @@ project(':tools') { archivesBaseName = "kafka-tools" dependencies { +implementation project(':core') Review Comment: Btw @fvaleri, unrelated to this PR, but there seems to be another core dependency from `ReplicaVerificationTools`. Please check my comment on https://issues.apache.org/jira/browse/KAFKA-14583. I can file an issue and migrate that one as well if you can confirm it. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
vamossagar12 commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1399976298 Thanks @fvaleri . LGTM. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #13147: MINOR: temporarily disable the 'false' parameter of SmokeTestDriverIntegrationTest
ableegoldman commented on PR #13147: URL: https://github.com/apache/kafka/pull/13147#issuecomment-1400043590 cc @lucasbru @guozhangwang @mjsax -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request, #13147: MINOR: temporarily disable the 'false' parameter of SmokeTestDriverIntegrationTest
ableegoldman opened a new pull request, #13147: URL: https://github.com/apache/kafka/pull/13147 Need to get a clean build for 3.4 and this test has been extremely flaky. I'm looking into the failure as well, and want to pinpoint whether it's the `true` build that's broken or it's the parameterization itself causing this -- thus, let's start by temporarily disabling the `false` parameter first. See [this comment/KAFKA-14533](https://issues.apache.org/jira/browse/KAFKA-14533?focusedCommentId=17679729&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17679729) for more details -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
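A generic JUnit 5 sketch of the approach (the real `SmokeTestDriverIntegrationTest` may be wired differently; names here are hypothetical): narrowing the value source temporarily disables the `false` run while keeping `true` in CI.

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SmokeTestParameterSketch {

    // Originally this would be @ValueSource(booleans = {true, false}); restricting it to
    // {true} temporarily disables the flaky `false` case without touching the test body.
    @ParameterizedTest
    @ValueSource(booleans = {true})
    void shouldPassSmokeTest(boolean flag) {
        assertTrue(runSmokeTest(flag));
    }

    private boolean runSmokeTest(boolean flag) {
        return true; // placeholder for the actual smoke-test driver
    }
}
```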
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1083847838 ## clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java: ## @@ -26,21 +26,25 @@ import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.ByteBufferOutputStream; -import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.DataInputStream; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; public class ZstdFactory { +/** + * Default compression level + */ +private static final int DEFAULT_COMPRESSION_LEVEL = 3; Review Comment: Removed this change from this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1083831592 ## clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java: ## @@ -273,20 +272,32 @@ public int partitionLeaderEpoch() { public DataInputStream recordInputStream(BufferSupplier bufferSupplier) { final ByteBuffer buffer = this.buffer.duplicate(); buffer.position(RECORDS_OFFSET); -return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier)); +final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier); +return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream); } private CloseableIterator compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) { final DataInputStream inputStream = recordInputStream(bufferSupplier); if (skipKeyValue) { // this buffer is used to skip length delimited fields like key, value, headers -byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE]; +final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize()); Review Comment: I mistakenly stated in the PR description that I had already changed this; I will fix that. I would like to make this (skipping) a separate code change, because I think it would be a performance regression: it adds an extra buffer allocation. The zstd-jni implementation of skip() (de)allocates a "skip" buffer (from the buffer pool) for skipping [1]. In the current implementation, by contrast, we read all data into the same 16KB output buffer (which is allocated only once). In both cases, the amount of data copied from native to Java is the same. The only difference is whether we read & skip in our code or in zstd-jni code. Pushing skip down to zstd-jni would become beneficial once it is further pushed down to the native layer. [1] https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStreamNoFinalizer.java#L228 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
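As a rough illustration of the tradeoff described above (generic code, not the Kafka iterator): skipping via `InputStream.skip()` leaves scratch-buffer management to the stream implementation (zstd-jni in this case), while the current approach drains the bytes through a single buffer that the caller allocates once and reuses.

```java
import java.io.IOException;
import java.io.InputStream;

public class SkipStrategiesSketch {

    // Strategy 1: delegate to skip(); the stream implementation may allocate or borrow
    // its own scratch buffer to discard the bytes.
    static void skipViaStream(InputStream in, long bytes) throws IOException {
        long remaining = bytes;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
            } else if (in.read() < 0) {
                break; // end of stream
            } else {
                remaining--;
            }
        }
    }

    // Strategy 2: read-and-discard into a buffer owned by the caller, allocated once and
    // reused, which is what the existing 16KB scratch buffer does.
    static void skipViaRead(InputStream in, long bytes, byte[] scratch) throws IOException {
        long remaining = bytes;
        while (remaining > 0) {
            int read = in.read(scratch, 0, (int) Math.min(scratch.length, remaining));
            if (read < 0) {
                break; // end of stream
            }
            remaining -= read;
        }
    }
}
```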
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1083852068 ## clients/src/main/java/org/apache/kafka/common/record/CompressionType.java: ## @@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) { return new ByteBufferInputStream(buffer); } + +@Override +public int getRecommendedDOutSize() { +return 2 * 1024; // 2KB Review Comment: Changed it to throw an UnsupportedOperationException in the abstract class; the uncompressed case no longer overrides this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
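A small sketch of the pattern being described, with illustrative names and values rather than the actual `CompressionType` enum: the base method throws for codecs that have no decompression buffer, and only the compressed variants override it.

```java
public enum CodecSketch {

    // Uncompressed data never needs a decompression buffer, so it relies on the base
    // method below and does not override it.
    NONE,

    GZIP {
        @Override
        public int recommendedDecompressionBufferSize() {
            return 16 * 1024; // illustrative value only
        }
    },

    ZSTD {
        @Override
        public int recommendedDecompressionBufferSize() {
            return 16 * 1024; // illustrative value only
        }
    };

    // Asking for a buffer size on a codec that has none is a programming error,
    // so fail loudly instead of returning an arbitrary default.
    public int recommendedDecompressionBufferSize() {
        throw new UnsupportedOperationException("No decompression buffer for " + name());
    }
}
```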
[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
clolov commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083906152 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore + * as it also checks whether the version needs to be printed, but + * refactoring this would have meant changing all command line tools + * and unnecessarily increased the blast radius of this change. + * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ +public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) { +if (isPrintHelpNeeded(commandOpts)) { +printUsageAndDie(commandOpts.parser, message); +} +if (isPrintVersionNeeded(commandOpts)) { +printVersionAndDie(); +} +} + +/** + * Check that all the listed options are present. + */ +public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec... requiredList) { +for (OptionSpec arg : requiredList) { +if (!options.has(arg)) { +printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg)); +} +} +} + +/** + * Check that none of the listed options are present. 
+ */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +OptionSpec... invalidOptions) { +if (options.has(usedOption)) { +for (OptionSpec arg : invalidOptions) { +if (options.has(arg)) { +printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg)); +} +} +} +} + +/** + * Check that none of the listed options are present. + */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +Set> invalidOptions) { +OptionSpec[] array = new OptionSpec[invalidOptions.size()]; +invalidOptions.toArray(array); +checkInvalidArgs(parser, options, usedOption, array); +} + +/** + * Check that none of the listed options are present with the combination of used options. + */ +public static void c
[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
clolov commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083909966 ## core/src/main/scala/kafka/tools/ConsoleConsumer.scala: ## @@ -352,9 +353,11 @@ object ConsoleConsumer extends Logging { } else if (options.has(offsetOpt)) CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.") -def invalidOffset(offset: String): Nothing = +def invalidOffset(offset: String): Nothing = { CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + "'earliest', 'latest', or a non-negative long.") + Exit.exit(1) Review Comment: A similar question about using `ToolsUtils.printUsageAndDie(...)` here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
clolov commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083920980 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore + * as it also checks whether the version needs to be printed, but + * refactoring this would have meant changing all command line tools + * and unnecessarily increased the blast radius of this change. + * + * @param commandOpts Acceptable options for a command + * @param message Message to display on successful check + */ +public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) { +if (isPrintHelpNeeded(commandOpts)) { +printUsageAndDie(commandOpts.parser, message); +} +if (isPrintVersionNeeded(commandOpts)) { +printVersionAndDie(); +} +} + +/** + * Check that all the listed options are present. + */ +public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec... requiredList) { +for (OptionSpec arg : requiredList) { +if (!options.has(arg)) { +printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg)); +} +} +} + +/** + * Check that none of the listed options are present. 
+ */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +OptionSpec... invalidOptions) { +if (options.has(usedOption)) { +for (OptionSpec arg : invalidOptions) { +if (options.has(arg)) { +printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg)); +} +} +} +} + +/** + * Check that none of the listed options are present. + */ +public static void checkInvalidArgs(OptionParser parser, +OptionSet options, +OptionSpec usedOption, +Set> invalidOptions) { +OptionSpec[] array = new OptionSpec[invalidOptions.size()]; +invalidOptions.toArray(array); +checkInvalidArgs(parser, options, usedOption, array); +} + +/** + * Check that none of the listed options are present with the combination of used options. + */ +public static void c
[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…
mimaison commented on code in PR #11748: URL: https://github.com/apache/kafka/pull/11748#discussion_r1083928100 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -169,6 +172,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) +.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs Review Comment: @sgn2607 I'm not aware of anybody working on this. I don't think there's even a Jira clearly stating this requirement. Feel free to work on it if you want. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
clolov commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1083908982 ## core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala: ## @@ -100,6 +101,7 @@ object ZkSecurityMigrator extends Logging { false case _ => CommandLineUtils.printUsageAndDie(opts.parser, usageMessage) +Exit.exit(1) Review Comment: Could you use the new `ToolsUtils.printUsageAndDie(...)` which you introduced? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #10826: KAFKA-7632: Support Compression Level
clolov commented on PR #10826: URL: https://github.com/apache/kafka/pull/10826#issuecomment-1400189273 Hello @dongjinleekr! What is the current state of this pull request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #13059: MINOR: KafkaConfig should not expose internal config when queried for non-internal values
mimaison merged PR #13059: URL: https://github.com/apache/kafka/pull/13059 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #13094: MINOR: Various cleanups in client tests
mimaison merged PR #13094: URL: https://github.com/apache/kafka/pull/13094 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1084030517 ## core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala: ## @@ -100,6 +101,7 @@ object ZkSecurityMigrator extends Logging { false case _ => CommandLineUtils.printUsageAndDie(opts.parser, usageMessage) +Exit.exit(1) Review Comment: Sure. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1084093810 ## clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java: ## @@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) { } }; -// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance -// in cases where the caller reads a small number of bytes (potentially a single byte). -return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), -bufferPool), 16 * 1024); +// We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using +// `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the +// caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI +// calls. +return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool)); Review Comment: 1. Thanks for pointing out. This is an artifact of some other changes I was trying to do. Fix it now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1084097512 ## clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java: ## @@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) { } }; -// Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance -// in cases where the caller reads a small number of bytes (potentially a single byte). -return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), -bufferPool), 16 * 1024); +// We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using +// `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the +// caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI +// calls. +return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool)); Review Comment: 2. For the broker, the number of JNI calls remains the same: prior to this change we made JNI calls in chunks of 16KB (using BufferedInputStream), and now we make JNI calls in chunks of 16KB based on the decompression buffer size. For the consumer, the number of JNI calls *will change*: earlier the consumer made multiple calls in chunks of 16KB (using BufferedInputStream), and now it makes a single call to read all of the data. Note that the consumer does not use the "skipKeyValueIterator" variation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
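A self-contained sketch of the difference described here, under the assumption that each read against the underlying stream stands in for one (relatively expensive) JNI call: a `BufferedInputStream` smooths many small reads into fixed 16KB chunks, whereas a single pre-sized bulk read touches the underlying stream once.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ChunkedReadSketch {

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[64 * 1024]; // stand-in for a decompressed batch

        // Chunked access: many one-byte reads are served from the 16KB buffer,
        // so the underlying stream is only consulted once per chunk.
        try (InputStream in = new BufferedInputStream(new ByteArrayInputStream(payload), 16 * 1024)) {
            int count = 0;
            while (in.read() != -1) {
                count++;
            }
            System.out.println("read " + count + " bytes one at a time through the buffer");
        }

        // Bulk access: one large read into a pre-sized array reaches the underlying
        // (in-memory) stream a single time.
        try (InputStream in = new ByteArrayInputStream(payload)) {
            byte[] all = new byte[payload.length];
            int n = in.read(all, 0, all.length);
            System.out.println("read " + n + " bytes in one call");
        }
    }
}
```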
[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
divijvaidya commented on PR #13135: URL: https://github.com/apache/kafka/pull/13135#issuecomment-1400407941 TODO (will update the PR in a short while): 1. Add a benchmark for the case where a batch contains a single 10-byte message. 2. Test consumer performance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance
C0urante commented on PR #12561: URL: https://github.com/apache/kafka/pull/12561#issuecomment-1400491466 @vamossagar12 I don't think it's necessary to call this out anywhere unless it's caused unexpected issues with our users. The intention behind the exponential backoff is to avoid rebalance storms but I'm still not sure when those would ever realistically happen, so until we see otherwise, I'd prefer to leave it as an internal implementation detail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante opened a new pull request, #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions
C0urante opened a new pull request, #13148: URL: https://github.com/apache/kafka/pull/13148 [Jira](https://issues.apache.org/jira/browse/KAFKA-14645) If we don't switch to the classloader of a plugin before loading its `ConfigDef`, then classloading bugs can appear for, e.g., properties with the `CLASS` type. See https://github.com/kcctl/kcctl/issues/266 for an instance of this kind of bug. This PR adds a classloader swap in the part of the code base that's responsible for servicing requests to the `GET /connector-plugins//config` endpoint. The existing monolithic unit test for this logic is broken out into dedicated individual unit tests for each kind of connector plugin; this is done to avoid having to reset expectations on the mocked `Plugins` object when verifying calls to `withClassLoader`, since in the unit testing environment the same classloader may be used for multiple different plugin types. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
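As a rough illustration of the classloader swap described above (the method and class names below are hypothetical, not Connect's actual `Plugins` API), the essential pattern is to install the plugin's classloader as the thread context classloader while the plugin's `ConfigDef` is built, then restore the previous loader so that `CLASS`-typed properties resolve against the plugin's own bundle:

```java
import java.util.function.Supplier;

// Hypothetical helper, for illustration only.
public final class PluginClassLoaderSwapSketch {

    static <T> T withPluginClassLoader(ClassLoader pluginLoader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(pluginLoader);
        try {
            // e.g. instantiate the plugin and read its ConfigDef here
            return action.get();
        } finally {
            // always restore the worker's loader, even if the plugin throws
            current.setContextClassLoader(previous);
        }
    }
}
```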
[GitHub] [kafka] C0urante commented on a diff in pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions
C0urante commented on code in PR #13148: URL: https://github.com/apache/kafka/pull/13148#discussion_r1084283540 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -821,13 +827,14 @@ public List connectorPluginConfig(String pluginName) { default: throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); } -} catch (ClassNotFoundException cnfe) { -throw new NotFoundException("Unknown plugin " + pluginName + "."); -} -for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { -results.add(AbstractHerder.convertConfigKey(configKey)); +List results = new ArrayList<>(); +for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) { +results.add(AbstractHerder.convertConfigKey(configKey)); +} +return results; +} catch (ClassNotFoundException e) { +throw new ConnectException("Failed to load plugin class or one of its dependencies", e); Review Comment: IMO an HTTP 500 response (which will be the result of throwing a `ConnectException` here) is more warranted in this place since we've already checked to see that the plugin type is recognized on the worker by this point. If things break here, it's more likely that the worker or one of its plugins is set up or packaged incorrectly than that the user issued a bad request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
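To make the 404-vs-500 distinction in the comment above concrete, here is a rough JAX-RS sketch of how thrown exception types turn into HTTP status codes; Connect's real exception mapper is more involved, so treat this as an illustration rather than its actual implementation.

```java
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;

// Illustrative mapper only, not Connect's real one.
public class StatusMappingSketch implements ExceptionMapper<Exception> {
    @Override
    public Response toResponse(Exception e) {
        if (e instanceof WebApplicationException) {
            // e.g. a NotFoundException -> 404: the client asked for a plugin the worker doesn't know about
            return ((WebApplicationException) e).getResponse();
        }
        // anything else (e.g. a ConnectException for a broken plugin classpath) -> 500:
        // the worker or its plugins are misconfigured, not the client's request
        return Response.serverError().entity(e.getMessage()).build();
    }
}
```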
[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
ijuma commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515 ## clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java: ## @@ -273,20 +272,32 @@ public int partitionLeaderEpoch() { public DataInputStream recordInputStream(BufferSupplier bufferSupplier) { final ByteBuffer buffer = this.buffer.duplicate(); buffer.position(RECORDS_OFFSET); -return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier)); +final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier); +return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream); } private CloseableIterator compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) { final DataInputStream inputStream = recordInputStream(bufferSupplier); if (skipKeyValue) { // this buffer is used to skip length delimited fields like key, value, headers -byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE]; +final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize()); Review Comment: Since we cache buffers per thread, I think you mean we will use two buffers instead of one per thread (for the zstd case). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
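To make the "two buffers per thread" point concrete, here is a deliberately simplified stand-in for a caching buffer supplier (the real one is `org.apache.kafka.common.utils.BufferSupplier`, whose details differ): released buffers are cached by capacity, so a skip buffer and a zstd decompression buffer of different sizes requested on the same thread each stay resident.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified, illustrative supplier; each thread is assumed to own its own instance.
final class CachingBufferSupplierSketch {
    private final Map<Integer, Deque<ByteBuffer>> free = new HashMap<>();

    ByteBuffer get(int capacity) {
        Deque<ByteBuffer> cached = free.get(capacity);
        ByteBuffer buffer = cached == null ? null : cached.pollFirst();
        // Reuse a previously released buffer of the same capacity, or allocate a new one.
        return buffer != null ? buffer : ByteBuffer.allocate(capacity);
    }

    void release(ByteBuffer buffer) {
        buffer.clear();
        // Keep the buffer around for the next get() with the same capacity.
        free.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>()).addLast(buffer);
    }
}
```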
[GitHub] [kafka] ijuma commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
ijuma commented on code in PR #12781: URL: https://github.com/apache/kafka/pull/12781#discussion_r1084369349 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ## @@ -98,7 +98,7 @@ private Optional> producer; private TopicAdmin admin; -private Thread thread; Review Comment: We have to mutate this variable from a test? Generally, it's better to expose a method for what you need to do versus exposing the mutable variable in this way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
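A small sketch of the alternative the reviewer is suggesting (names here are hypothetical, not the actual `KafkaBasedLog` members): instead of widening the visibility of the mutable `thread` field so a test can poke it, expose a narrowly scoped method for the one thing the test needs.

```java
// Illustrative only; the real KafkaBasedLog work-thread handling differs.
public class WorkThreadOwnerSketch {
    private Thread workThread;

    void start() {
        workThread = new Thread(() -> { /* consume and dispatch records */ });
        workThread.start();
    }

    // Package-private hook for tests: they can wait for the thread to finish
    // without being able to reassign or otherwise mutate the field.
    void awaitWorkThread(long timeoutMs) throws InterruptedException {
        Thread t = workThread;
        if (t != null) {
            t.join(timeoutMs);
        }
    }
}
```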
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1082994727 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ +static boolean isEmpty(final byte[] segmentValue) { +return segmentValue.length <= TIMESTAMP_SIZE; +} + +/** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. Review Comment: No, `nextTimestamp` is always present, even if the segment is empty. In the case where the segment is empty, we still need to store the tombstone's timestamp somewhere, and we choose to store it in `nextTimestamp` since `nextTimestamp` is serialized first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
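The "empty segment" case discussed above can be summarized with a small, illustrative sketch consistent with the helper methods shown in the diff (this is not the actual formatter class): when the only record version for a key is a tombstone, the serialized segment degenerates to the single 8-byte `nextTimestamp` holding the tombstone's timestamp, which is why `isEmpty` can be a simple length check.

```java
import java.nio.ByteBuffer;

// Illustration of the encoding described in the javadoc above.
public final class EmptySegmentSketch {
    private static final int TIMESTAMP_SIZE = 8;

    // An "empty" segment stores only the tombstone's timestamp, as nextTimestamp.
    static byte[] tombstoneOnlySegment(long tombstoneTimestamp) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE).putLong(tombstoneTimestamp).array();
    }

    static long getNextTimestamp(byte[] segmentValue) {
        return ByteBuffer.wrap(segmentValue).getLong(0);
    }

    static boolean isEmpty(byte[] segmentValue) {
        return segmentValue.length <= TIMESTAMP_SIZE;
    }
}
```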
[GitHub] [kafka] erichaagdev opened a new pull request, #13149: Remove unnecessary asCollection causing eager dependency resolution
erichaagdev opened a new pull request, #13149: URL: https://github.com/apache/kafka/pull/13149 The call to `asCollection()` causes several configurations to be resolved eagerly, and potentially unnecessarily. Dropping `asCollection()` ensures the configurations are only resolved when they are needed. Before: https://scans.gradle.com/s/ivl2vfw2sq252/performance/dependency-resolution#dependency-resolution-configuration-user-initiated After: https://scans.gradle.com/s/uxkjpjvylz6qw/performance/dependency-resolution#dependency-resolution-configuration-user-initiated ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
cmccabe commented on code in PR #13116: URL: https://github.com/apache/kafka/pull/13116#discussion_r1084419497 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel, val describableTopicNames = getDescribableTopics.apply(allowedTopicNames).asJava val effectiveRequest = request.duplicate() val iterator = effectiveRequest.topics().iterator() +var totalRequestedPartitionCount = 0 +val defaultPartitionCont = config.numPartitions.intValue() while (iterator.hasNext) { val creatableTopic = iterator.next() if (duplicateTopicNames.contains(creatableTopic.name()) || !authorizedTopicNames.contains(creatableTopic.name())) { iterator.remove() - } -} -controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response => + } else { +if (!creatableTopic.assignments().isEmpty) { + totalRequestedPartitionCount += creatableTopic.assignments().size() +} else if (creatableTopic.numPartitions() > 0) + totalRequestedPartitionCount += creatableTopic.numPartitions() +else + totalRequestedPartitionCount += defaultPartitionCont + } +} +val future = try { + if (!effectiveRequest.validateOnly()) +controllerMutationQuota.record(totalRequestedPartitionCount) + controller.createTopics(context, effectiveRequest, describableTopicNames) +} catch { + case e: ThrottlingQuotaExceededException => +val apiError = ApiError.fromThrowable(e) +val data = new CreateTopicsResponseData +effectiveRequest.topics().forEach(topic => + data.topics.add(new CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message))) +data.setThrottleTimeMs(e.throttleTimeMs()) +CompletableFuture.completedFuture(data) +} Review Comment: I don't think you need to catch this here, since we have a catch block on the request handler that will deal with "send back error X to response Y" If you do have a catch block here you'd certainly need a return statement as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft
cmccabe commented on PR #13116: URL: https://github.com/apache/kafka/pull/13116#issuecomment-1400816232 > Also note that I didn't implement the full logic -- QuorumController has additional checks that could in fact cause a specific topic request to be rejected (e.g. explicit assignments not contiguous partition numbers starting with 0). This could lead to a calculation of partition counts for throttling purposes that exceeds the actual partition counts requested -- which could cause throttling in cases where technically it should not. I think arguably if you are making requests that fail, your quota should be dinged for those requests. Surely we don't want to let the guy spinning in a tight loop making tons of bad requests go unthrottled? I would maybe even include requests that fail authorization in the quota, although I know the ZK implementation might not. Authorization is expensive; you should get charged for wasting the authorizer's time with bad requests. Maybe we should charge for validateOnly requests as well. They are almost as expensive as non-validateOnly requests... the only difference is no records written. > The benefit of the Scala approach is that we avoid doing work in the controller itself when throttling does occur. We don't generate all of the Metadata Log records, for example. Note that createPartitions and deleteTopics will have to get access to the partition counts, which currently I think is not available. One big issue is that ControllerMutationQuota takes some locks, and I am worried that the controller thread could get blocked behind those locks. This is similar to the Authorizer where, for better or worse, we went with a blocking implementation that would be too slow to call from the quorum controller thread (for example, the Authorizer implementation might send out a blocking request to ZK or other external system, etc.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close
C0urante commented on code in PR #13144: URL: https://github.com/apache/kafka/pull/13144#discussion_r1084433485 ## connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java: ## @@ -27,7 +27,7 @@ public abstract class BaseConnectorClientConfigOverridePolicyTest { -protected abstract ConnectorClientConfigOverridePolicy policyToTest(); Review Comment: Good eye! ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -227,9 +227,14 @@ public class DistributedHerderTest { private SinkConnectorConfig conn1SinkConfig; private SinkConnectorConfig conn1SinkConfigUpdated; private short connectProtocolVersion; +private boolean connectorClientConfigOverridePolicyClosed = false; private final ConnectorClientConfigOverridePolicy -noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); - +noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy() { +@Override +public void close() { +connectorClientConfigOverridePolicyClosed = true; +} +}; Review Comment: I like this testing strategy overall, but I think this logic can go into a dedicated class, similar to the [SampleSinkConnector class](https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java) that we use for unit tests that require a sink connector. The `SampleConnectorClientConfigOverridePolicy` class (feel free to rename) could then expose a public `boolean isClosed()` method, which would replace the `connectorClientConfigOverridePolicyClosed` field here. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -151,6 +151,11 @@ protected void stopServices() { this.configBackingStore.stop(); this.worker.stop(); this.connectorExecutor.shutdown(); +try { +this.connectorClientConfigOverridePolicy.close(); +} catch (Exception e) { +log.warn("Exception while stop connectorClientConfigOverridePolicy:", e); +} Review Comment: This can be a one-liner with the `Utils` class: ```suggestion Utils.closeQuietly(this.connectorClientConfigOverridePolicy, "connector client config override policy"); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
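To sketch the dedicated test helper suggested in the first review comment above (the class name and placement are up to the test suite), the policy simply records whether `close()` has been invoked and exposes that via `isClosed()`:

```java
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;

// Hypothetical test helper, roughly what the review comment describes.
public class SampleConnectorClientConfigOverridePolicy extends NoneConnectorClientConfigOverridePolicy {
    private volatile boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    // Lets tests assert that the herder closed the policy during shutdown.
    public boolean isClosed() {
        return closed;
    }
}
```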
[GitHub] [kafka] C0urante merged pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks
C0urante merged PR #12802: URL: https://github.com/apache/kafka/pull/12802 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
gharris1727 commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1084435808 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null * @param statusBackingStore the backing store for statuses; may not be null * @param configBackingStore the backing store for connector configurations; may not be null - * @param restUrlthe URL of this herder's REST API; may not be null + * @param restUrlthe URL of this herder's REST API; may not be null, but may be an arbitrary placeholder + * value if this worker does not expose a REST API + * @param restClient a REST client that can be used to issue requests to other workers in the cluster; may + * be null if inter-worker communication is not enabled * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden *in connector configurations; may not be null + * @param restNamespace zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, * after all services and resources owned by this herder are stopped */ +// TODO: Do we really need two separate public constructors? Review Comment: interesting, the for-testing constructor appears unused, and has been for as long as it's existed (ever since #321). We can push this refactor out to a different PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2
C0urante commented on code in PR #13137: URL: https://github.com/apache/kafka/pull/13137#discussion_r1084460160 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable { * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null * @param statusBackingStore the backing store for statuses; may not be null * @param configBackingStore the backing store for connector configurations; may not be null - * @param restUrlthe URL of this herder's REST API; may not be null + * @param restUrlthe URL of this herder's REST API; may not be null, but may be an arbitrary placeholder + * value if this worker does not expose a REST API + * @param restClient a REST client that can be used to issue requests to other workers in the cluster; may + * be null if inter-worker communication is not enabled * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden *in connector configurations; may not be null + * @param restNamespace zero or more path elements to prepend to the paths of forwarded REST requests; may be empty, but not null * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped}, * after all services and resources owned by this herder are stopped */ +// TODO: Do we really need two separate public constructors? Review Comment: Blegh, this comment was left in from an earlier draft where there was an additional constructor. I refactored it out before opening the PR, but apparently forgot to remove the comment. Took it out now, thanks for catching! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation
guozhangwang commented on code in PR #13138: URL: https://github.com/apache/kafka/pull/13138#discussion_r1084463267 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java: ## @@ -149,11 +156,14 @@ private void onSuccessfulResponse(final FindCoordinatorResponseData.Coordinator coordinator.host(), coordinator.port()); log.info("Discovered group coordinator {}", coordinator); -coordinatorRequestState.reset(); +coordinatorRequestState.onSuccessfulAttempt(currentTimeMs); } -private void onFailedCoordinatorResponse(final Exception exception, final long currentTimeMs) { -coordinatorRequestState.updateLastFailedAttempt(currentTimeMs); +private void onFailedResponse( +final long currentTimeMs, +final Throwable exception +) { +coordinatorRequestState.onFailedAttempt(currentTimeMs); Review Comment: Yeah, that makes sense too. I was thinking that in `onSuccessfulAttempt` we record the time at the end of the function, so why not do the same here; but I think it does not matter much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] philipnee commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown
philipnee commented on PR #13125: URL: https://github.com/apache/kafka/pull/13125#issuecomment-1400882904 hey @cmccabe - would you have time to take a look at this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1084484753 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. Review Comment: That's fair. I added an extra paragraph into the top-level javadoc just now. LMK what you think. > In general, I prefer to have some "invariant" as it makes it simpler to reason about the code, but introducing this edge case void the invariant that `nextTimestamp` is the "largest validTo" of the segment. I completely agree, but this "empty segment" case really is an edge case which cannot be combined with the general case. 
Here's some more context (possibly too much detail 🙂) on why we can't use `nextTimestamp = ` in the empty segment case, which is what we'd need to maintain that `nextTimestamp` is always the largest validTo of the segment: In the implementation of the versioned store itself (PR coming soon), the latest record version for a particular key will be stored in a "latest value store," and older record versions will be stored in "segment stores" based on their validTo timestamps, except when the latest record version (for a particular key) is a tombstone. In this case, we don't want to store the tombstone in the latest value store because there's no expiry mechanism for the latest value store (and the tombstones might accumulate indefinitely). So, the tombstone is stored in a segment. But if these special tombstones have `validTo = infinity`, then this violates the invariant that "record versions are stored into segments based on their validTo timestamp." (We don't want to repeatedly move the tombstone into earlier and earlier segments as newer segments are created, because the whole point of putting them into a segment is so that they eventually expire.) Violating this invariant is a big problem for store efficiency. Suppose a new record version is put to the store later, long after a tombstone has been put (but before the tombstone has expired). In order to find the tombstone and update its validTo timestamp, we'd have to check every single segment (until we find an existing record version). We'd have to do this for every single put, since we wouldn't know whether the latest record version is a tombstone or not. In contrast, if we allow the validTo timestamp for the tombstone of the empty segment to be the tombstone's
[GitHub] [kafka] nizhikov closed pull request #12574: KAFKA-13908 Rethrow ExecutionException to preserve original cause
nizhikov closed pull request #12574: KAFKA-13908 Rethrow ExecutionException to preserve original cause URL: https://github.com/apache/kafka/pull/12574 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
vcrfxia commented on code in PR #13126: URL: https://github.com/apache/kafka/pull/13126#discussion_r1084488305 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java: ## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** + * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}. + * The value format is: + * + * + + , reverse-sorted by timestamp> + + * + * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to + * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size} + * is always set to -1 for the tombstone case, though this need not be true in general. + */ +final class RocksDBVersionedStoreSegmentValueFormatter { +private static final int TIMESTAMP_SIZE = 8; +private static final int VALUE_SIZE = 4; + +/** + * @return the validTo timestamp of the latest record in the provided segment + */ +static long getNextTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(0); +} + +/** + * Returns whether the provided segment is "empty." An empty segment is one that + * contains only a single tombstone with no validTo timestamp specified. In this case, + * the serialized segment contains only the timestamp of the tombstone (stored as the segment's + * {@code nextTimestamp}) and nothing else. + * + * This can happen if, e.g., the only record inserted for a particular key is + * a tombstone. In this case, the tombstone must be stored in a segment + * (as the latest value store does not store tombstones), but also has no validTo + * timestamp associated with it. + * + * @return whether the segment is "empty" + */ +static boolean isEmpty(final byte[] segmentValue) { +return segmentValue.length <= TIMESTAMP_SIZE; +} + +/** + * Requires that the segment is not empty. Caller is responsible for verifying that this + * is the case before calling this method. + * + * @return the timestamp of the earliest record in the provided segment. + */ +static long getMinTimestamp(final byte[] segmentValue) { +return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); +} + +/** + * @return the deserialized segment value + */ +static SegmentValue deserialize(final byte[] segmentValue) { +return new PartiallyDeserializedSegmentValue(segmentValue); +} + +/** + * Creates a new segment value that contains the provided record. 
+ * + * @param value the record value + * @param validFrom the record's timestamp + * @param validTo the record's validTo timestamp + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithRecord( +final byte[] value, final long validFrom, final long validTo) { +return new PartiallyDeserializedSegmentValue(value, validFrom, validTo); +} + +/** + * Creates a new empty segment value. + * + * @param timestamp the timestamp of the tombstone for this empty segment value + * @return the newly created segment value + */ +static SegmentValue newSegmentValueWithTombstone(final long timestamp) { +return new PartiallyDeserializedSegmentValue(timestamp); +} + +interface SegmentValue { + +/** + * @return whether the segment is empty. See + * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details. + */ +boolean isEmpty(); + +/** + * Finds the latest record in this segment with timestamp not exceeding the provided + * timestamp bound. This method requires that the provided timestamp bound exists in + * this segment, i.e., the segment is not empty, and the provided timestamp bound is + * at least minTimestamp and is smaller than nextTimestamp. +
[GitHub] [kafka] vcrfxia commented on pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store
vcrfxia commented on PR #13126: URL: https://github.com/apache/kafka/pull/13126#issuecomment-1400904752 Thanks for your reviews, @mjsax ! I pushed another commit just now to incorporate your latest suggestions (javadocs changes only) and responded inline to the main points of discussion. The build passed on the previous go (besides flaky integration test failures) so if the latest changes look good to you, I believe we should be ready to merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on a diff in pull request #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close
nizhikov commented on code in PR #13144: URL: https://github.com/apache/kafka/pull/13144#discussion_r1084491140 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java: ## @@ -227,9 +227,14 @@ public class DistributedHerderTest { private SinkConnectorConfig conn1SinkConfig; private SinkConnectorConfig conn1SinkConfigUpdated; private short connectProtocolVersion; +private boolean connectorClientConfigOverridePolicyClosed = false; private final ConnectorClientConfigOverridePolicy -noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); - +noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy() { +@Override +public void close() { +connectorClientConfigOverridePolicyClosed = true; +} +}; Review Comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close
nizhikov commented on PR #13144: URL: https://github.com/apache/kafka/pull/13144#issuecomment-1400908925 @C0urante Thanks for the review. I've applied your suggestions. Please take a look one more time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13120: MINOR: Connect Javadocs improvements
gharris1727 commented on code in PR #13120: URL: https://github.com/apache/kafka/pull/13120#discussion_r1084469223 ## connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java: ## @@ -23,25 +23,25 @@ import java.util.List; /** - * An interface for enforcing a policy on overriding of client configs via the connector configs. - * - * Common use cases are ability to provide principal per connector, sasl.jaas.config + * An interface for enforcing a policy on overriding of Kafka client configs via the connector configs. + * + * Common use cases are ability to provide principal per connector, sasl.jaas.config * and/or enforcing that the producer/consumer configurations for optimizations are within acceptable ranges. */ public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable { /** - * Worker will invoke this while constructing the producer for the SourceConnectors, DLQ for SinkConnectors and the consumer for the - * SinkConnectors to validate if all of the overridden client configurations are allowed per the - * policy implementation. This would also be invoked during the validation of connector configs via the Rest API. - * + * Workers will invoke this while constructing producer for SourceConnectors, DLQs for SinkConnectors and + * consumers for SinkConnectors to validate if all of the overridden client configurations are allowed per the Review Comment: We probably don't need to enumerate all of the use-cases for kafka clients here, and can keep that scoped to the ConnectorClientConfigRequest javadoc. ```suggestion * Workers will invoke this before configuring per-connector Kafka admin, producer, and consumer * client instances to validate if all of the overridden client configurations are allowed per the ``` ## connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java: ## @@ -131,8 +131,8 @@ public void reconfigure(Map props) { /** * Validate the connector configuration values against configuration definitions. * @param connectorConfigs the provided configuration values - * @return List of Config, each Config contains the updated configuration information given - * the current configuration values. + * @return {@link Config}, essentially a list of {@link ConfigValue}s containing the updated configuration + * information given the current configuration values. Review Comment: ```suggestion * @return a parsed and validated {@link Config} containing any relevant validation errors with the raw * {@code connectorConfigs} which should prevent this configuration from being used. ``` ## connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java: ## @@ -88,15 +88,15 @@ public void initialize(SinkTaskContext context) { public abstract void start(Map props); /** - * Put the records in the sink. Usually this should send the records to the sink asynchronously - * and immediately return. - * + * Put the records in the sink. This should either write them to the downstream system or batch them for + * later writing Review Comment: I agree that it's not necessary for this javadoc to prescribe asynchronous behavior, but it should certainly point out the pitfall that asynchronous users need to take special care. ```suggestion * Put the records in the sink. 
If this method returns before the records are durably written, * the task must implement {@link #flush(Map)} or {@link #preCommit(Map)} to ensure that * only durably written record offsets are committed, and that no records are dropped during failures. ``` ## connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java: ## @@ -44,25 +44,25 @@ public ConnectorClientConfigRequest( } /** - * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SOURCE}. - * Provides Config with prefix {@code consumer.override.} for {@link ConnectorType#SINK}. - * Provides Config with prefix {@code producer.override.} for {@link ConnectorType#SINK} for DLQ. - * Provides Config with prefix {@code admin.override.} for {@link ConnectorType#SINK} for DLQ. + * Provides Config with prefix "{@code producer.override.}" for {@link ConnectorType#SOURCE}. + * Provides Config with prefix "{@code consumer.override.}" for {@link ConnectorType#SINK}. + * Provides Config with prefix "{@code producer.override.}" for {@link ConnectorType#SINK} for DLQ. + * Provides Config with prefix "{@code admin.override.}" for {@link ConnectorType#SINK} for DLQ. Review Comment: We should also mention some more recent usages of this that haven't been updated in this documentation: * Admin for fencing zomb