[GitHub] [kafka] C0urante opened a new pull request, #13137: KAFKA-10586: Intra-cluster communication for MirrorMaker 2

2023-01-20 Thread via GitHub


C0urante opened a new pull request, #13137:
URL: https://github.com/apache/kafka/pull/13137

   [Jira](https://issues.apache.org/jira/browse/KAFKA-10586)
   
   Implements the internal REST API changes described in 
[KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters).
   
   A lot of the diff here comes from shuffling existing parts of the code base 
into new homes, without significantly altering them. This is necessary to make 
the current REST API logic for Kafka Connect more extensible and reusable, 
which in turn allows us to add logic for the dedicated Mirror Maker 2 REST API 
with less effort.
   
   Shuffled but not extensively rewritten portions include:
   - Extracting `WorkerConfig` properties into the new `RestServerConfig` 
class
   - Pulling out some REST forwarding logic previously internal to the 
`ConnectorsResource` class into the new `HerderRequestHandler` class
   - Pulling out all REST logic for internal endpoints from the 
`ConnectorsResource` class into the new `InternalClusterResource` class
   
   These changes should not affect the docs generated by, e.g., the 
`DistributedConfig` class, but should allow other `AbstractConfig` classes to 
easily add properties related to the (public- or internal-facing) Connect REST 
API.
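
   To illustrate that last point, here is a rough sketch (the helper name, the 
property chosen, and its default/importance are illustrative assumptions, not 
the actual `RestServerConfig` API): a shared helper adds REST-related 
definitions to a `ConfigDef` so any `AbstractConfig` subclass can opt in 
without duplicating them.

```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

// Hypothetical helper: adds REST-related properties to any ConfigDef.
final class RestPropsSketch {
    static ConfigDef addRestProperties(ConfigDef def) {
        // "listeners" is one example of a REST-facing property; default/importance are assumptions.
        return def.define("listeners", ConfigDef.Type.LIST, "http://:8083",
                ConfigDef.Importance.LOW, "Comma-separated list of listener URIs for the REST API");
    }
}

// Hypothetical AbstractConfig subclass that opts in to the shared REST properties.
class DedicatedMirrorMakerConfigSketch extends AbstractConfig {
    private static final ConfigDef CONFIG = RestPropsSketch.addRestProperties(new ConfigDef());

    DedicatedMirrorMakerConfigSketch(Map<String, String> props) {
        super(CONFIG, props);
    }
}
```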
   
   An integration test is also added that runs a multi-node, dedicated Mirror 
Maker 2 cluster with exactly-once support enabled in order to test out both 
code paths related to intra-cluster communication.





[GitHub] [kafka] C0urante commented on pull request #13137: KAFKA-10586: Intra-cluster communication for Mirror Maker 2

2023-01-20 Thread via GitHub


C0urante commented on PR #13137:
URL: https://github.com/apache/kafka/pull/13137#issuecomment-1398797302

   @gharris1727 @viktorsomogyi @mimaison would you mind taking a look at this 
when you have a moment?





[GitHub] [kafka] hachikuji opened a new pull request, #13138: MINOR: Small cleanups in refactored consumer implementation

2023-01-20 Thread via GitHub


hachikuji opened a new pull request, #13138:
URL: https://github.com/apache/kafka/pull/13138

   This patch contains a few cleanups in the new refactored consumer logic:
   
   - Use `CompletableFuture` instead of `RequestFuture` in 
`NetworkClientDelegate`. This is a much more extensible API and avoids tying 
the new implementation to `ConsumerNetworkClient` (see the sketch after this list).
   - Fix call to `isReady` in `NetworkClientDelegate`. We need the call to 
`ready` to initiate the connection.
   - Ensure backoff is enforced even after a successful `FindCoordinator` 
request. This avoids a tight loop while metadata is converging after a 
coordinator change.
   - `RequestState` was incorrectly using the reconnect backoff as the retry 
backoff. In fact, we don't currently have a retry backoff max, so the use of 
`ExponentialBackoff` is unnecessary, but I've left it since we may add this in 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients.
 
   - Minor cleanups in test cases to avoid unused classes/fields.
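
   As a rough illustration of the `CompletableFuture` point above (plain Java, 
not the actual `NetworkClientDelegate` types), a future-backed unsent request 
lets callers chain completion handling instead of registering listeners on a 
`RequestFuture`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an unsent request whose result is exposed as a CompletableFuture.
final class UnsentRequestSketch {
    final CompletableFuture<String> future = new CompletableFuture<>();

    void onResponse(String response) {
        future.complete(response);
    }

    void onFailure(RuntimeException error) {
        future.completeExceptionally(error);
    }

    public static void main(String[] args) {
        UnsentRequestSketch request = new UnsentRequestSketch();
        // Callers compose follow-up work instead of implementing a listener interface.
        request.future.whenComplete((response, error) -> {
            if (error != null) {
                System.out.println("request failed: " + error.getMessage());
            } else {
                System.out.println("request completed: " + response);
            }
        });
        request.onResponse("ok");
    }
}
```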
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] philipnee commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation

2023-01-20 Thread via GitHub


philipnee commented on code in PR #13138:
URL: https://github.com/apache/kafka/pull/13138#discussion_r1082970121


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -97,8 +97,8 @@ private void trySend(final long currentTimeMs) {
 unsent.timer.update(currentTimeMs);
 if (unsent.timer.isExpired()) {
 iterator.remove();
-unsent.callback.ifPresent(c -> c.onFailure(new 
TimeoutException(
-"Failed to send request after " + 
unsent.timer.timeoutMs() + " " + "ms.")));
+unsent.callback.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() 
+ " " + "ms."));

Review Comment:
   maybe 
   ` "Failed to send request after " + unsent.timer.timeoutMs() + " ms."`
   
   I'm not sure why I added an extra `+ " "`.






[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-01-20 Thread via GitHub


kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1082997576


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -641,7 +643,7 @@ private void completeBatch(ProducerBatch batch, 
ProduceResponse.PartitionRespons
 // thus it is not safe to reassign the sequence.
 failBatch(batch, response, batch.attempts() < this.retries);
 }
-if (error.exception() instanceof InvalidMetadataException) {
+if (error.exception() instanceof InvalidMetadataException || 
error.exception() instanceof TimeoutException) {

Review Comment:
   I believe it is necessary and added a comment to the code to explain why. 
LMK if that makes sense. Thanks!






[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-01-20 Thread via GitHub


kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1082997268


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -552,9 +552,11 @@ private void handleProduceResponse(ClientResponse 
response, Map

[GitHub] [kafka] kirktrue commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-01-20 Thread via GitHub


kirktrue commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1082998126


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -2519,7 +2519,7 @@ public void testInflightBatchesExpireOnDeliveryTimeout() 
throws InterruptedExcep
 
 Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
 responseMap.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
-client.respond(new ProduceResponse(responseMap));
+client.respond(new ProduceResponse(responseMap), true, true);

Review Comment:
   Ah! I reverted that change and added a dedicated 
`testMetadataRefreshOnRequestTimeout` test method to `SenderTest`. Let me know 
if that new test is sufficient.






[GitHub] [kafka] hachikuji commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation

2023-01-20 Thread via GitHub


hachikuji commented on code in PR #13138:
URL: https://github.com/apache/kafka/pull/13138#discussion_r1083069990


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -97,8 +97,8 @@ private void trySend(final long currentTimeMs) {
 unsent.timer.update(currentTimeMs);
 if (unsent.timer.isExpired()) {
 iterator.remove();
-unsent.callback.ifPresent(c -> c.onFailure(new 
TimeoutException(
-"Failed to send request after " + 
unsent.timer.timeoutMs() + " " + "ms.")));
+unsent.callback.onFailure(new TimeoutException(
+"Failed to send request after " + unsent.timer.timeoutMs() 
+ " " + "ms."));

Review Comment:
   Ack. Will fix.






[GitHub] [kafka] ableegoldman commented on pull request #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription

2023-01-20 Thread via GitHub


ableegoldman commented on PR #13134:
URL: https://github.com/apache/kafka/pull/13134#issuecomment-1398999167

   It's just timing out, there's no error beyond that





[GitHub] [kafka] ableegoldman merged pull request #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription

2023-01-20 Thread via GitHub


ableegoldman merged PR #13134:
URL: https://github.com/apache/kafka/pull/13134





[GitHub] [kafka] ableegoldman commented on pull request #13134: MINOR: fix flaky StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription

2023-01-20 Thread via GitHub


ableegoldman commented on PR #13134:
URL: https://github.com/apache/kafka/pull/13134#issuecomment-1399019480

   Merged to trunk and cherry-picked to 3.4





[GitHub] [kafka] ableegoldman merged pull request #13132: MINOR: fix warnings in Streams javadocs

2023-01-20 Thread via GitHub


ableegoldman merged PR #13132:
URL: https://github.com/apache/kafka/pull/13132





[GitHub] [kafka] philipnee opened a new pull request, #13139: MINOR: Update outdated documentation for the SubscriptionState

2023-01-20 Thread via GitHub


philipnee opened a new pull request, #13139:
URL: https://github.com/apache/kafka/pull/13139

   The current documentation indicates that two positions are tracked, but 
these positions were removed a few years ago. Now we use a single position to 
track the last consumed record. Updated the documentation to reflect the 
current state.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji opened a new pull request, #13140: KAFKA-14644: Process should crash after failure in Raft IO thread

2023-01-20 Thread via GitHub


hachikuji opened a new pull request, #13140:
URL: https://github.com/apache/kafka/pull/13140

   Unexpected errors caught in the Raft IO thread should cause the process to 
stop. This is similar to the handling of exceptions in the controller.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang merged pull request #13139: MINOR: Update outdated documentation for the SubscriptionState

2023-01-20 Thread via GitHub


guozhangwang merged PR #13139:
URL: https://github.com/apache/kafka/pull/13139





[GitHub] [kafka] cmccabe commented on a diff in pull request #13140: KAFKA-14644: Process should crash after failure in Raft IO thread

2023-01-20 Thread via GitHub


cmccabe commented on code in PR #13140:
URL: https://github.com/apache/kafka/pull/13140#discussion_r1083120749


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -47,19 +47,27 @@ import org.apache.kafka.raft.RaftConfig.{AddressSpec, 
InetAddressSpec, NON_ROUTA
 import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, 
LeaderAndEpoch, RaftClient, RaftConfig, RaftRequest, ReplicatedLog}
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.server.fault.FaultHandler
 
 import scala.jdk.CollectionConverters._
 
 object KafkaRaftManager {
   class RaftIoThread(
 client: KafkaRaftClient[_],
-threadNamePrefix: String
+threadNamePrefix: String,
+fatalFaultHandler: FaultHandler
   ) extends ShutdownableThread(
 name = threadNamePrefix + "-io-thread",
 isInterruptible = false
   ) {
 override def doWork(): Unit = {
-  client.poll()
+  try {
+client.poll()
+  } catch {
+case t: Throwable =>
+  fatalFaultHandler.handleFault("Unexpected error in raft IO thread", 
t)

Review Comment:
   suggest doing `throw fatalFaultHandler.handleFault(...)`
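
   A minimal sketch of that suggestion (the fault handler interface below is a 
locally defined stand-in; the assumption is that `handleFault` returns the 
exception it builds so the caller can rethrow it):

```java
// Locally defined stand-in for the fault handler used in the PR.
interface FaultHandlerSketch {
    RuntimeException handleFault(String message, Throwable cause);
}

final class RaftIoWorkSketch {
    private final FaultHandlerSketch fatalFaultHandler;

    RaftIoWorkSketch(FaultHandlerSketch fatalFaultHandler) {
        this.fatalFaultHandler = fatalFaultHandler;
    }

    void doWork(Runnable poll) {
        try {
            poll.run();
        } catch (Throwable t) {
            // Rethrowing what the handler returns keeps the thread's failure path explicit
            // while still routing the fault to the handler (which may terminate the process).
            throw fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
        }
    }
}
```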






[GitHub] [kafka] cmccabe commented on a diff in pull request #13140: KAFKA-14644: Process should crash after failure in Raft IO thread

2023-01-20 Thread via GitHub


cmccabe commented on code in PR #13140:
URL: https://github.com/apache/kafka/pull/13140#discussion_r1083122187


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -221,16 +225,23 @@ class RaftManagerTest {
   @Test
   def testUncaughtExceptionInIoThread(): Unit = {
 val raftClient = mock(classOf[KafkaRaftClient[String]])
-val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft")
+val faultHandler = mock(classOf[FaultHandler])

Review Comment:
   easier to just use `MockFaultHandler`
   
   you can then look at `MockFaultHandler.firstException` to at least see if it 
has the `Unexpected error in raft IO thread` text






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation

2023-01-20 Thread via GitHub


guozhangwang commented on code in PR #13138:
URL: https://github.com/apache/kafka/pull/13138#discussion_r1083125061


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -149,11 +156,14 @@ private void onSuccessfulResponse(final 
FindCoordinatorResponseData.Coordinator
 coordinator.host(),
 coordinator.port());
 log.info("Discovered group coordinator {}", coordinator);
-coordinatorRequestState.reset();
+coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
 }
 
-private void onFailedCoordinatorResponse(final Exception exception, final 
long currentTimeMs) {
-coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+private void onFailedResponse(
+final long currentTimeMs,
+final Throwable exception
+) {
+coordinatorRequestState.onFailedAttempt(currentTimeMs);

Review Comment:
   How about moving this to the end of the function as well, since 
`nonRetriableErrorHandler.handle(exception);` may take time?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -72,17 +73,16 @@ public NetworkClientDelegate(
  * @param currentTimeMs current time
  * @return a list of client response
  */
-public List<ClientResponse> poll(final long timeoutMs, final long 
currentTimeMs) {
+public void poll(final long timeoutMs, final long currentTimeMs) {
 trySend(currentTimeMs);
 
 long pollTimeoutMs = timeoutMs;
 if (!unsentRequests.isEmpty()) {
 pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
 }
-List<ClientResponse> res = this.client.poll(pollTimeoutMs, 
currentTimeMs);
+
+this.client.poll(pollTimeoutMs, currentTimeMs);
 checkDisconnects();
-wakeup();

Review Comment:
   We can also remove the function in 
https://github.com/apache/kafka/pull/13138/files#diff-a5be3b830b3475fd019bb6c218b7b81d5f5f25f77587f33e48c92c4dc4271ed5R168
 as well right?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##
@@ -72,17 +73,16 @@ public NetworkClientDelegate(
  * @param currentTimeMs current time
  * @return a list of client response
  */
-public List<ClientResponse> poll(final long timeoutMs, final long 
currentTimeMs) {
+public void poll(final long timeoutMs, final long currentTimeMs) {
 trySend(currentTimeMs);
 
 long pollTimeoutMs = timeoutMs;
 if (!unsentRequests.isEmpty()) {
 pollTimeoutMs = Math.min(retryBackoffMs, pollTimeoutMs);
 }
-List<ClientResponse> res = this.client.poll(pollTimeoutMs, 
currentTimeMs);
+
+this.client.poll(pollTimeoutMs, currentTimeMs);
 checkDisconnects();
-wakeup();

Review Comment:
   NVM, the client is not exposed.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12813: KAFKA-14317: ProduceRequest timeouts are logged as network exceptions

2023-01-20 Thread via GitHub


guozhangwang commented on code in PR #12813:
URL: https://github.com/apache/kafka/pull/12813#discussion_r1083134655


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -549,7 +549,13 @@ private boolean awaitNodeReady(Node node, 
FindCoordinatorRequest.CoordinatorType
 private void handleProduceResponse(ClientResponse response, 
Map<TopicPartition, ProducerBatch> batches, long now) {
 RequestHeader requestHeader = response.requestHeader();
 int correlationId = requestHeader.correlationId();
-if (response.wasDisconnected()) {
+if (response.wasTimedOut()) {
+log.trace("Cancelled request with header {} due to node {} being 
disconnected due to timeout",

Review Comment:
   very nit: `due to the last request timed out`? just to distinguish from 
connection timeout.






[GitHub] [kafka] kirktrue closed pull request #12945: KAFKA-14365: Refactor Fetcher to allow different implementations

2023-01-20 Thread via GitHub


kirktrue closed pull request #12945: KAFKA-14365: Refactor Fetcher to allow 
different implementations
URL: https://github.com/apache/kafka/pull/12945





[GitHub] [kafka] philipnee commented on pull request #13139: MINOR: Update outdated documentation for the SubscriptionState

2023-01-20 Thread via GitHub


philipnee commented on PR #13139:
URL: https://github.com/apache/kafka/pull/13139#issuecomment-1399093410

   Thanks!





[GitHub] [kafka] ableegoldman opened a new pull request, #13141: MINOR: fix flaky integrations tests by using 60s default timeout for startup

2023-01-20 Thread via GitHub


ableegoldman opened a new pull request, #13141:
URL: https://github.com/apache/kafka/pull/13141

   The timeouts used for starting up Streams and waiting for the RUNNING state 
are all over the place across our integration tests, with some as low as 15s 
(which are unsurprisingly rather flaky). We use 60s as the default timeout for 
other APIs in `IntegrationTestUtils`, so we should do the same for 
`#startApplicationAndWaitUntilRunning`.
   
   I also noticed that we have several versions of that exact API in 
`StreamsTestUtils`, so I migrated everything over to 
`IntegrationTestUtils#startApplicationAndWaitUntilRunning` and added a few 
overloads for ease of use, including one for single KafkaStreams apps and one 
for using the default timeout (see the sketch below).
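
   A sketch of the overload shape described above (hypothetical signatures; the 
real helpers live in `IntegrationTestUtils` and take `KafkaStreams` instances 
rather than the `Runnable` stand-in used here):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: overloads funnel into one implementation with a 60s default timeout.
final class StartupTimeoutSketch {
    static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofSeconds(60);

    static void startApplicationAndWaitUntilRunning(List<Runnable> streamsApps) {
        startApplicationAndWaitUntilRunning(streamsApps, DEFAULT_STARTUP_TIMEOUT);
    }

    static void startApplicationAndWaitUntilRunning(Runnable singleApp, Duration timeout) {
        startApplicationAndWaitUntilRunning(Collections.singletonList(singleApp), timeout);
    }

    static void startApplicationAndWaitUntilRunning(List<Runnable> streamsApps, Duration timeout) {
        // The real implementation starts each KafkaStreams instance and polls its state
        // until RUNNING, failing the test if the timeout elapses first.
        streamsApps.forEach(Runnable::run);
    }
}
```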





[GitHub] [kafka] vcrfxia opened a new pull request, #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management

2023-01-20 Thread via GitHub


vcrfxia opened a new pull request, #13142:
URL: https://github.com/apache/kafka/pull/13142

   This PR refactors how the list of open iterators for RocksDB stores is 
managed. Prior to this PR, the `openIterators` list was passed into the 
constructor for the iterators themselves, allowing `RocksDbIterator.close()` to 
remove the iterator from the `openIterators` list. After this PR, the iterators 
themselves will not know about lists of open iterators. Instead, a generic 
close callback is exposed, and it's the responsibility of the store that 
creates a new iterator to set the callback on the iterator, to ensure that 
closing an iterator removes the iterator from the list of open iterators.
   
   This refactor is desirable because it enables more flexible iterator 
lifecycle management. Building on top of this, RocksDBStore is updated with an 
option to allow the user (i.e., the caller of methods such as `range()` and 
`prefixScan()` which return iterators) to pass a custom `openIterators` list 
for the new iterator to be stored in. This will allow for a new Segments 
implementation where multiple Segments can share the same RocksDBStore 
instance, while having each Segment manage its own open iterators. (See 
 for more.)
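
   A compact sketch of both ideas (generic Java with hypothetical names, not 
the actual `RocksDbIterator`/`RocksDBStore` code): the store, not the iterator, 
decides which open-iterator collection the iterator is tracked in, and a close 
callback removes it again.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Iterator that knows nothing about open-iterator lists; it only runs a callback on close.
final class CallbackIteratorSketch<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private Runnable onClose = () -> { };

    CallbackIteratorSketch(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    void onClose(Runnable callback) {
        this.onClose = callback;
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    @Override public void close() { onClose.run(); }
}

final class StoreSketch {
    private final Set<CallbackIteratorSketch<String>> defaultOpenIterators = new HashSet<>();

    // Callers (e.g. a logical segment) may supply their own open-iterator set.
    CallbackIteratorSketch<String> range(Set<CallbackIteratorSketch<String>> openIterators) {
        CallbackIteratorSketch<String> iterator =
            new CallbackIteratorSketch<>(Collections.<String>emptyList().iterator());
        openIterators.add(iterator);
        iterator.onClose(() -> openIterators.remove(iterator));
        return iterator;
    }

    CallbackIteratorSketch<String> range() {
        return range(defaultOpenIterators);
    }
}
```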
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] vcrfxia opened a new pull request, #13143: KAFKA-14491: [3/N] Add logical key value segments

2023-01-20 Thread via GitHub


vcrfxia opened a new pull request, #13143:
URL: https://github.com/apache/kafka/pull/13143

   (This PR is stacked on https://github.com/apache/kafka/pull/13142. The first 
commit does not need to be reviewed separately.)
   
   Today's KeyValueSegments create a new RocksDB instance for each 
KeyValueSegment. This PR introduces an analogous LogicalKeyValueSegments 
implementation, with corresponding LogicalKeyValueSegment, which shares a 
single physical RocksDB instance across all "logical" segments. This will be 
used for the RocksDB versioned store implementation proposed in 
[KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores).
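
   A rough sketch of the sharing idea (the key-prefixing scheme and names below 
are illustrative assumptions, not the exact `LogicalKeyValueSegment` layout): 
each logical segment namespaces its keys with a segment id so that many 
segments can live in one shared physical store.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

final class LogicalSegmentSketch {
    // Stand-in for the single shared physical store (the real store would be RocksDB).
    private final Map<ByteBuffer, byte[]> sharedPhysicalStore;
    private final long segmentId;

    LogicalSegmentSketch(Map<ByteBuffer, byte[]> sharedPhysicalStore, long segmentId) {
        this.sharedPhysicalStore = sharedPhysicalStore;
        this.segmentId = segmentId;
    }

    private ByteBuffer prefixed(byte[] key) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + key.length);
        buffer.putLong(segmentId);
        buffer.put(key);
        buffer.flip();
        return buffer;
    }

    void put(byte[] key, byte[] value) {
        sharedPhysicalStore.put(prefixed(key), value);
    }

    byte[] get(byte[] key) {
        return sharedPhysicalStore.get(prefixed(key));
    }

    public static void main(String[] args) {
        Map<ByteBuffer, byte[]> shared = new TreeMap<>();
        LogicalSegmentSketch segment0 = new LogicalSegmentSketch(shared, 0L);
        LogicalSegmentSketch segment1 = new LogicalSegmentSketch(shared, 1L);
        segment0.put("k".getBytes(), "v0".getBytes());
        segment1.put("k".getBytes(), "v1".getBytes());
        // Same user key, different logical segments, one physical store.
        System.out.println(new String(segment0.get("k".getBytes())) + " / "
            + new String(segment1.get("k".getBytes())));
    }
}
```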
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] hachikuji commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation

2023-01-20 Thread via GitHub


hachikuji commented on code in PR #13138:
URL: https://github.com/apache/kafka/pull/13138#discussion_r1083199766


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -149,11 +156,14 @@ private void onSuccessfulResponse(final 
FindCoordinatorResponseData.Coordinator
 coordinator.host(),
 coordinator.port());
 log.info("Discovered group coordinator {}", coordinator);
-coordinatorRequestState.reset();
+coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
 }
 
-private void onFailedCoordinatorResponse(final Exception exception, final 
long currentTimeMs) {
-coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+private void onFailedResponse(
+final long currentTimeMs,
+final Throwable exception
+) {
+coordinatorRequestState.onFailedAttempt(currentTimeMs);

Review Comment:
   Hmm, I think it seems ok to log the time of the failed response. Any time 
spent in `nonRetriableErrorHandler.handle(exception)` can count toward the 
backoff. Does that make sense?






[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-20 Thread via GitHub


mjsax commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1083223501


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for 
verifying that this
+ * is the case before calling this method.
+ *
+ * @return the timestamp of the earliest record in the provided segment.
+ */
+static long getMinTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+}
+
+/**
+ * @return the deserialized segment value
+ */
+static SegmentValue deserialize(final byte[] segmentValue) {
+return new PartiallyDeserializedSegmentValue(segmentValue);
+}
+
+/**
+ * Creates a new segment value that contains the provided record.
+ *
+ * @param value the record value
+ * @param validFrom the record's timestamp
+ * @param validTo the record's validTo timestamp
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithRecord(
+final byte[] value, final long validFrom, final long validTo) {
+return new PartiallyDeserializedSegmentValue(value, validFrom, 
validTo);
+}
+
+/**
+ * Creates a new empty segment value.
+ *
+ * @param timestamp the timestamp of the tombstone for this empty segment 
value
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+return new PartiallyDeserializedSegmentValue(timestamp);
+}
+
+interface SegmentValue {
+
+/**
+ * @return whether the segment is empty. See
+ * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} 
for details.
+ */
+boolean isEmpty();
+
+/**
+ * Finds the latest record in this segment with timestamp not 
exceeding the provided
+ * timestamp bound. This method requires that the provided timestamp 
bound exists in
+ * this segment, i.e., the segment is not empty, and the provided 
timestamp bound is
+ * at least minTimestamp and is smaller than nextTimestamp.
+   

[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-20 Thread via GitHub


mjsax commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1083224137


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for 
verifying that this
+ * is the case before calling this method.
+ *
+ * @return the timestamp of the earliest record in the provided segment.
+ */
+static long getMinTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+}
+
+/**
+ * @return the deserialized segment value
+ */
+static SegmentValue deserialize(final byte[] segmentValue) {
+return new PartiallyDeserializedSegmentValue(segmentValue);
+}
+
+/**
+ * Creates a new segment value that contains the provided record.
+ *
+ * @param value the record value
+ * @param validFrom the record's timestamp
+ * @param validTo the record's validTo timestamp
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithRecord(
+final byte[] value, final long validFrom, final long validTo) {
+return new PartiallyDeserializedSegmentValue(value, validFrom, 
validTo);
+}
+
+/**
+ * Creates a new empty segment value.
+ *
+ * @param timestamp the timestamp of the tombstone for this empty segment 
value
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+return new PartiallyDeserializedSegmentValue(timestamp);
+}
+
+interface SegmentValue {
+
+/**
+ * @return whether the segment is empty. See
+ * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} 
for details.
+ */
+boolean isEmpty();
+
+/**
+ * Finds the latest record in this segment with timestamp not 
exceeding the provided
+ * timestamp bound. This method requires that the provided timestamp 
bound exists in
+ * this segment, i.e., the segment is not empty, and the provided 
timestamp bound is
+ * at least minTimestamp and is smaller than nextTimestamp.
+   

[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-20 Thread via GitHub


mjsax commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1081924210


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for 
verifying that this
+ * is the case before calling this method.
+ *
+ * @return the timestamp of the earliest record in the provided segment.
+ */
+static long getMinTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+}
+
+/**
+ * @return the deserialized segment value
+ */
+static SegmentValue deserialize(final byte[] segmentValue) {
+return new PartiallyDeserializedSegmentValue(segmentValue);
+}
+
+/**
+ * Creates a new segment value that contains the provided record.
+ *
+ * @param value the record value
+ * @param validFrom the record's timestamp
+ * @param validTo the record's validTo timestamp
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithRecord(
+final byte[] value, final long validFrom, final long validTo) {
+return new PartiallyDeserializedSegmentValue(value, validFrom, 
validTo);
+}
+
+/**
+ * Creates a new empty segment value.
+ *
+ * @param timestamp the timestamp of the tombstone for this empty segment 
value
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+return new PartiallyDeserializedSegmentValue(timestamp);
+}
+
+interface SegmentValue {
+
+/**
+ * @return whether the segment is empty. See
+ * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} 
for details.
+ */
+boolean isEmpty();
+
+/**
+ * Finds the latest record in this segment with timestamp not 
exceeding the provided
+ * timestamp bound. This method requires that the provided timestamp 
bound exists in
+ * this segment, i.e., the segment is not empty, and the provided 
timestamp bound is
+ * at least minTimestamp and is smaller than nextTimestamp.
+   

[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-20 Thread via GitHub


mjsax commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1083231834


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ * <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.

Review Comment:
   Thanks for the explanation.
   
   It seems to be "weird" edge case, because `nextTimestamp` is usually the 
"largest validTo" of the segment, but we not store the "validFrom" of the 
tombstone in it.
   
   In general, I prefer to have some "invariant" as it makes it simpler to 
reason about the code, but introducing this edge case void the invariant that 
`nextTimestamp` is the "largest validTo" of the segment.
   
   Maybe it's ok to go with the current code, but it seems we need to clearly 
document this in the top level JavaDoc -- otherwise it might be very hard to 
understand the code because a empty segment is encoded differently to a 
non-empty segment.






[GitHub] [kafka] yufeiyan1220 commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown

2023-01-21 Thread via GitHub


yufeiyan1220 commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1399206848

   > Seems like we aren't particularly consistent at removing these metrics and 
sensors, fetcher would be another example. Mind making the clean up more 
comprehensive?
   
   I have made the change to ensure all consumer metrics are removed after 
shutting down. As for the producer, I found that all producer metrics are 
already cleaned up on close, so I don't need to add anything there.
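
   A small sketch of that cleanup pattern using the public `Metrics` API (the 
sensor names below are placeholders, not the actual consumer sensors):

```java
import org.apache.kafka.common.metrics.Metrics;

// Hypothetical sketch: every sensor registered by a consumer component is removed
// again in close(), so no metrics outlive the client.
final class ConsumerMetricsCleanupSketch implements AutoCloseable {
    private final Metrics metrics;

    ConsumerMetricsCleanupSketch(Metrics metrics) {
        this.metrics = metrics;
        metrics.sensor("commit-latency");      // placeholder sensor names
        metrics.sensor("rebalance-latency");
    }

    @Override
    public void close() {
        // Metrics#removeSensor also removes the metrics registered through the sensor.
        metrics.removeSensor("commit-latency");
        metrics.removeSensor("rebalance-latency");
    }
}
```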





[GitHub] [kafka] nizhikov opened a new pull request, #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close

2023-01-21 Thread via GitHub


nizhikov opened a new pull request, #13144:
URL: https://github.com/apache/kafka/pull/13144

   `ConnectorClientConfigOverridePolicy` implements `AutoCloseable`, but its 
`close` method is never called.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-21 Thread via GitHub


vamossagar12 commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083271427


##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -771,8 +772,8 @@ object ConfigCommand extends Logging {
   .withRequiredArg
   .describedAs("command config property file")
   .ofType(classOf[String])
-val alterOpt = parser.accepts("alter", "Alter the configuration for the 
entity.")
-val describeOpt = parser.accepts("describe", "List configs for the given 
entity.")
+val alterOpt: OptionSpec[String] = parser.accepts("alter", "Alter the 
configuration for the entity.").withOptionalArg()
+val describeOpt: OptionSpec[String] = parser.accepts("describe", "List 
configs for the given entity.").withOptionalArg()

Review Comment:
   The addition of `OptionSpec` and `withOptionalArg` is not strictly 
necessary, right?



##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * from command line, if `--version` is specified on the command line
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore
+ * as it also checks whether the version needs to be printed, but
+ * refactoring this would have meant changing all command line tools
+ * and unnecessarily increased the blast radius of this change.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @param message Message to display on successful check
+ */
+public static void printHelpAndExitIfNeeded(CommandDefaultOptions 
commandOpts, String message) {
+if (isPrintHelpNeeded(commandOpts)) {
+printUsageAndDie(commandOpts.parser, message);
+}
+if (isPrintVersionNeeded(commandOpts)) {
+printVersionAndDie();
+}
+}
+
+/**
+ * Check that all the listed options are present.
+ */
+public static void checkRequiredArgs(OptionParser parser, OptionSet 
options, OptionSpec... requiredList) {
+for (OptionSpec arg : requiredList) {
+if (!options.has(arg)) {
+printUsageAndDie(parser, String.format("Missing required 
argument \"%s\"", arg));
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+OptionSpec... invalidOptions) {
+if (options.has(usedOption)) {
+for (OptionSpec arg : invalidOptions) {
+if (options.has(arg)) {
+printUsageAndDie(parser, 

[GitHub] [kafka] vamossagar12 commented on pull request #11592: KAFKA-13501: Avoid state restore via rebalance if standbys are enabled

2023-01-21 Thread via GitHub


vamossagar12 commented on PR #11592:
URL: https://github.com/apache/kafka/pull/11592#issuecomment-1399237152

   @mjsax, this is a very old PR of mine that didn't get merged. On the 
ticket I see the `new-streams-runtime-should-fix` label. Is this fix still 
needed, or should I close it?





[GitHub] [kafka] vamossagar12 closed pull request #9756: KAFKA-10652: Adding size based linger semantics to Raft metadata

2023-01-21 Thread via GitHub


vamossagar12 closed pull request #9756: KAFKA-10652: Adding size based linger 
semantics to Raft metadata
URL: https://github.com/apache/kafka/pull/9756





[GitHub] [kafka] ijuma commented on pull request #13121: MINOR: Remove some connect tests from Java 17 block list

2023-01-21 Thread via GitHub


ijuma commented on PR #13121:
URL: https://github.com/apache/kafka/pull/13121#issuecomment-1399275209

   The failed JDK 17 tests are not among the ones I unblocked in this PR:
   
   > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[1] true (10 min)
   > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[2] false (1 min 11 sec)
   > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.[2] false





[GitHub] [kafka] ijuma merged pull request #13121: MINOR: Remove some connect tests from Java 17 block list

2023-01-21 Thread via GitHub


ijuma merged PR #13121:
URL: https://github.com/apache/kafka/pull/13121





[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-21 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083318714


##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -771,8 +772,8 @@ object ConfigCommand extends Logging {
   .withRequiredArg
   .describedAs("command config property file")
   .ofType(classOf[String])
-val alterOpt = parser.accepts("alter", "Alter the configuration for the 
entity.")
-val describeOpt = parser.accepts("describe", "List configs for the given 
entity.")
+val alterOpt: OptionSpec[String] = parser.accepts("alter", "Alter the 
configuration for the entity.").withOptionalArg()
+val describeOpt: OptionSpec[String] = parser.accepts("describe", "List 
configs for the given entity.").withOptionalArg()

Review Comment:
   Correct, it is a left over from a previous attempt. Reverting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-21 Thread via GitHub


fvaleri commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1399304541

   @clolov @vamossagar12 I took your suggestions. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-21 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083319516


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * from command line, if `--version` is specified on the command line
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore
+ * as it also checks whether the version needs to be printed, but
+ * refactoring this would have meant changing all command line tools
+ * and unnecessarily increased the blast radius of this change.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @param message Message to display on successful check
+ */
+public static void printHelpAndExitIfNeeded(CommandDefaultOptions 
commandOpts, String message) {
+if (isPrintHelpNeeded(commandOpts)) {
+printUsageAndDie(commandOpts.parser, message);
+}
+if (isPrintVersionNeeded(commandOpts)) {
+printVersionAndDie();
+}
+}
+
+/**
+ * Check that all the listed options are present.
+ */
+public static void checkRequiredArgs(OptionParser parser, OptionSet 
options, OptionSpec... requiredList) {
+for (OptionSpec arg : requiredList) {
+if (!options.has(arg)) {
+printUsageAndDie(parser, String.format("Missing required 
argument \"%s\"", arg));
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+OptionSpec... invalidOptions) {
+if (options.has(usedOption)) {
+for (OptionSpec arg : invalidOptions) {
+if (options.has(arg)) {
+printUsageAndDie(parser, String.format("Option \"%s\" 
can't be used with option \"%s\"", usedOption, arg));
+}
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+Set<OptionSpec<?>> invalidOptions) {
+OptionSpec[] array = new OptionSpec[invalidOptions.size()];
+invalidOptions.toArray(array);
+checkInvalidArgs(parser, options, usedOption, array);
+}
+
+/**
+ * Check that none of the listed options are present with the combination 
of used options.
+ */
+public static void 

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-22 Thread via GitHub


vamossagar12 commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083413458


##
core/src/main/scala/kafka/utils/ToolsUtils.scala:
##
@@ -64,4 +65,18 @@ object ToolsUtils {
 println(s"%-${maxLengthOfDisplayName}s : 
$specifier".format(metricName, value))
 }
   }
+
+  /**
+   * This is a simple wrapper around `CommandLineUtils.printUsageAndDie`.
+   * It is needed for tools migration (KAFKA-14525), as there is no Java 
equivalent for return type `Nothing`.
+   * Can be removed once [[kafka.admin.ConsumerGroupCommand]], 
[[kafka.tools.ConsoleConsumer]]
+   * and [[kafka.tools.ConsoleProducer]] are migrated.
+   *
+   * @param parser Command line options parser.
+   * @param message Error message.
+   */
+  def printUsageAndDie(parser: OptionParser, message: String): Nothing = {

Review Comment:
   I am wondering if we can add another parameter to this method to indicate whether it should call Exit(1) or not. I see some places where Exit(1) is not being invoked from the Scala layer. 



##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -801,15 +801,21 @@ object ConsumerGroupCommand extends Logging {
 partitionsToReset.map { topicPartition =>
   logStartOffsets.get(topicPartition) match {
 case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error 
getting starting offset of topic partition: $topicPartition")
+case _ => {
+  CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting 
starting offset of topic partition: $topicPartition")
+  Exit.exit(1)

Review Comment:
   Thanks @fvaleri. Wondering if these and other similar lines can be changed to use ToolsUtils.printUsageAndDie()? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-22 Thread via GitHub


vamossagar12 commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083413458


##
core/src/main/scala/kafka/utils/ToolsUtils.scala:
##
@@ -64,4 +65,18 @@ object ToolsUtils {
 println(s"%-${maxLengthOfDisplayName}s : 
$specifier".format(metricName, value))
 }
   }
+
+  /**
+   * This is a simple wrapper around `CommandLineUtils.printUsageAndDie`.
+   * It is needed for tools migration (KAFKA-14525), as there is no Java 
equivalent for return type `Nothing`.
+   * Can be removed once [[kafka.admin.ConsumerGroupCommand]], 
[[kafka.tools.ConsoleConsumer]]
+   * and [[kafka.tools.ConsoleProducer]] are migrated.
+   *
+   * @param parser Command line options parser.
+   * @param message Error message.
+   */
+  def printUsageAndDie(parser: OptionParser, message: String): Nothing = {

Review Comment:
   I am wondering if we can add another parameter to this method to indicate whether it should call Exit(1) or not. I see some places where Exit(1) is not being invoked from the tools classes. WDYT?
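
   A minimal Java sketch of this idea, for illustration only: `printUsageAndMaybeDie` and its `exitOnError` flag are hypothetical names, not part of this PR. It also shows why an optional exit changes the contract: the helper can no longer be typed as "never returns" (the Scala `Nothing` return type).

```java
// Hypothetical sketch of an "optionally exit" usage helper; not merged Kafka code.
import java.io.IOException;
import joptsimple.OptionParser;
import org.apache.kafka.common.utils.Exit;

public final class UsageHelperSketch {
    // Prints the error message and usage; terminates the JVM only when exitOnError is true.
    public static void printUsageAndMaybeDie(OptionParser parser, String message, boolean exitOnError) throws IOException {
        System.err.println(message);
        parser.printHelpOn(System.err);
        if (exitOnError) {
            Exit.exit(1);
        }
    }
}
```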



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-22 Thread via GitHub


vamossagar12 commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1399432431

   > @clolov @vamossagar12 took your suggestions. Thanks.
   
   Thanks @fvaleri ! Couple of minor comments. This should be good to go after 
that!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2023-01-22 Thread via GitHub


vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1399432804

   Hey @C0urante, I was thinking: should the exponential backoff we introduced as part of this PR be documented somewhere? I'm asking since this is a deviation from how things worked before. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison opened a new pull request, #13145: MINOR: Remove duplicate empty string check

2023-01-22 Thread via GitHub


mimaison opened a new pull request, #13145:
URL: https://github.com/apache/kafka/pull/13145

   `Values.parseString()` handles empty strings and returns the same 
`SchemaAndValue` 
[[0]](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L383-L389)
 so we don't need to do the check in `toConnectHeader()`. 
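
   A hedged, standalone illustration of the behaviour relied on here (the class name `EmptyStringHeaderExample` is invented for the example; `Values.parseString` and `SchemaAndValue` are the real Connect API types):

```java
// Demonstrates that parsing an empty string yields a usable SchemaAndValue,
// so a caller does not need its own empty-string special case.
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;

public class EmptyStringHeaderExample {
    public static void main(String[] args) {
        SchemaAndValue parsed = Values.parseString("");
        // Expected: a string-typed schema and an empty string value.
        System.out.println(parsed.schema() + " -> '" + parsed.value() + "'");
    }
}
```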
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13094: MINOR: Various cleanups in client tests

2023-01-22 Thread via GitHub


mimaison commented on code in PR #13094:
URL: https://github.com/apache/kafka/pull/13094#discussion_r1083436604


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -331,17 +331,15 @@ public void testStressfulSituation() throws Exception {
 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, 
CompressionType.NONE, 0);
 List<Thread> threads = new ArrayList<>();
 for (int i = 0; i < numThreads; i++) {
-threads.add(new Thread() {
-public void run() {
-for (int i = 0; i < msgs; i++) {
-try {
-accum.append(topic, i % numParts, 0L, key, value, 
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), 
cluster);
-} catch (Exception e) {
-e.printStackTrace();
-}
+threads.add(new Thread(() -> {
+for (int i1 = 0; i1 < msgs; i1++) {

Review Comment:
   Good spot! I had not realized IntelliJ had automatically named it `i1` which 
is a bit strange. I renamed it to `j`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-22 Thread via GitHub


mimaison merged PR #13080:
URL: https://github.com/apache/kafka/pull/13080


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino opened a new pull request, #13146: KIP-991 Add deletedConnector flag when stopping tasks

2023-01-22 Thread via GitHub


hgeraldino opened a new pull request, #13146:
URL: https://github.com/apache/kafka/pull/13146

   Companion PR for [KIP-901: Add flag connectorDeleted flag when stopping 
task](https://cwiki.apache.org/confluence/display/KAFKA/KIP-901%3A+Add+flag+connectorDeleted+flag+when+stopping+task)
   This PR:
   
   * Introduces a new `default void stop(boolean deletedConnector)` method to the `connect.connector.Task` interface, to indicate that a task is being stopped due to the connector being deleted (see the sketch after this list).
   * Patches the existing unit test, and introduces a new test case to validate 
the new logic
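
   A hedged sketch of the shape of this change, trimmed to the relevant methods; the parameter name comes from the description above, and the default body is an assumption about how existing implementations stay source-compatible, not the merged code:

```java
// Sketch only: the real org.apache.kafka.connect.connector.Task interface has more methods.
public interface Task {
    void stop();

    // Proposed addition: true means the task is stopping because its connector was deleted.
    // Assumed default delegates to the existing stop() so current implementations keep working.
    default void stop(boolean deletedConnector) {
        stop();
    }
}
```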
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #13141: MINOR: fix flaky integrations tests by using 60s default timeout for startup

2023-01-22 Thread via GitHub


ableegoldman commented on PR #13141:
URL: https://github.com/apache/kafka/pull/13141#issuecomment-1399645849

   test failures are unrelated, will merge to trunk and cherrypick to 3.4 to 
help stabilize the release


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #13141: MINOR: fix flaky integrations tests by using 60s default timeout for startup

2023-01-22 Thread via GitHub


ableegoldman merged PR #13141:
URL: https://github.com/apache/kafka/pull/13141


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sgn2607 commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2023-01-22 Thread via GitHub


sgn2607 commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r1083630069


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   @dadufour I'm facing a similar issue w.r.t. the consumer group offset handling during failback; thank you for raising it in the Strimzi forum, which helped me land on this thread. @mimaison I thought I'd check whether you have any idea if there is a KIP to handle this behavior in MM2, or whether it is already fixed? 
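
   For reference, a small self-contained illustration of the `flatMap` idiom used in the quoted diff (class name invented for the example); it is the pre-Java 9 equivalent of `Optional::stream`, dropping empty Optionals from the stream:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OptionalFlatMapExample {
    public static void main(String[] args) {
        List<String> present = Stream.of(Optional.of("a"), Optional.<String>empty(), Optional.of("b"))
                // Keep only the values that are actually present, mirroring the checkpoint filtering above.
                .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
                .collect(Collectors.toList());
        System.out.println(present); // prints [a, b]
    }
}
```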



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sgn2607 commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2023-01-22 Thread via GitHub


sgn2607 commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r1083630069


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   @dadufour I'm facing a similar issue w.r.t. the consumer group offset handling during failback; thank you for raising it in the Strimzi forum, which helped me land on this thread. @mimaison I thought I'd check whether you have any idea if there is a KIP to handle this behavior in MM2, or whether it is already fixed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] sgn2607 commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2023-01-22 Thread via GitHub


sgn2607 commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r1083630069


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   @dadufour I'm facing a similar issue w.r.t. the consumer group offset handling during failback; thank you for raising it in the Strimzi forum, which helped me land on this thread. @mimaison I thought I'd check whether there is a KIP to handle this behavior in MM2, or whether it is already fixed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] abbccdda closed pull request #8725: KAFKA-9608: Transaction Event Simulation Test

2023-01-22 Thread via GitHub


abbccdda closed pull request #8725: KAFKA-9608: Transaction Event Simulation 
Test
URL: https://github.com/apache/kafka/pull/8725


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-22 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083709872


##
core/src/main/scala/kafka/utils/ToolsUtils.scala:
##
@@ -64,4 +65,18 @@ object ToolsUtils {
 println(s"%-${maxLengthOfDisplayName}s : 
$specifier".format(metricName, value))
 }
   }
+
+  /**
+   * This is a simple wrapper around `CommandLineUtils.printUsageAndDie`.
+   * It is needed for tools migration (KAFKA-14525), as there is no Java 
equivalent for return type `Nothing`.
+   * Can be removed once [[kafka.admin.ConsumerGroupCommand]], 
[[kafka.tools.ConsoleConsumer]]
+   * and [[kafka.tools.ConsoleProducer]] are migrated.
+   *
+   * @param parser Command line options parser.
+   * @param message Error message.
+   */
+  def printUsageAndDie(parser: OptionParser, message: String): Nothing = {

Review Comment:
   In the original code we always call exit, as the method name implies, so I 
think it's fine.



##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -801,15 +801,21 @@ object ConsumerGroupCommand extends Logging {
 partitionsToReset.map { topicPartition =>
   logStartOffsets.get(topicPartition) match {
 case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error 
getting starting offset of topic partition: $topicPartition")
+case _ => {
+  CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting 
starting offset of topic partition: $topicPartition")
+  Exit.exit(1)

Review Comment:
   Yes, there were some replacements that I missed. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12818: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2023-01-22 Thread via GitHub


clolov commented on PR #12818:
URL: https://github.com/apache/kafka/pull/12818#issuecomment-1399926748

   Thank you very much for the review and merge!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13127: Kafka 14586: Moving StreamResetter to tools

2023-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13127:
URL: https://github.com/apache/kafka/pull/13127#discussion_r1083769752


##
build.gradle:
##
@@ -1757,6 +1757,7 @@ project(':tools') {
   archivesBaseName = "kafka-tools"
 
   dependencies {
+implementation project(':core')

Review Comment:
   btw @fvaleri, unrelated to this PR, but there seems to be another core dependency from `ReplicaVerificationTools`. Please check my comment on https://issues.apache.org/jira/browse/KAFKA-14583. I can file an issue and migrate that one as well if you can confirm the same. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


vamossagar12 commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1399976298

   Thanks @fvaleri . LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #13147: MINOR: temporarily disable the 'false' parameter of SmokeTestDriverIntegrationTest

2023-01-23 Thread via GitHub


ableegoldman commented on PR #13147:
URL: https://github.com/apache/kafka/pull/13147#issuecomment-1400043590

   cc @lucasbru @guozhangwang @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman opened a new pull request, #13147: MINOR: temporarily disable the 'false' parameter of SmokeTestDriverIntegrationTest

2023-01-23 Thread via GitHub


ableegoldman opened a new pull request, #13147:
URL: https://github.com/apache/kafka/pull/13147

   Need to get a clean build for 3.4 and this test has been extremely flaky. 
I'm looking into the failure as well, and want to pinpoint whether it's the 
`true` build that's broken or it's the parameterization itself causing this -- 
thus, let's start by temporarily disabling the `false` parameter first.
   
   See [this 
comment/KAFKA-14533](https://issues.apache.org/jira/browse/KAFKA-14533?focusedCommentId=17679729&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17679729)
 for more details
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083847838


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+/**
+ * Default compression level
+ */
+private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Removed this change from this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083831592


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
 public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
 final ByteBuffer buffer = this.buffer.duplicate();
 buffer.position(RECORDS_OFFSET);
-return new DataInputStream(compressionType().wrapForInput(buffer, 
magic(), bufferSupplier));
+final InputStream decompressedStream = 
compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+return decompressedStream instanceof DataInputStream ? 
(DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
 }
 
 private CloseableIterator<Record> compressedIterator(BufferSupplier 
bufferSupplier, boolean skipKeyValue) {
 final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
 if (skipKeyValue) {
 // this buffer is used to skip length delimited fields like key, 
value, headers
-byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+final ByteBuffer skipBuffer = 
bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I mistakenly stated in the description that I had already changed this; I will fix the description.
   
   I would like to make this (skipping) a separate code change, because I think it would be a performance regression: it adds an extra buffer allocation.
   
   The zstd-jni implementation of skip() (de)allocates a "skip" buffer (from its buffer pool) for skipping [1]. In the current implementation, by contrast, we read all the data into the same 16KB output buffer, which is allocated only once. In both cases the amount of data copied from native to Java is the same; the only difference is whether the read-and-skip happens in our code or in zstd-jni's code. Pushing the skip down to zstd-jni would become beneficial once zstd-jni pushes it further down to the native layer.
   
   [1] https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStreamNoFinalizer.java#L228
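
   To make the trade-off concrete, a hedged sketch of the two skipping strategies discussed above; the names are invented for illustration, error handling is simplified, and this is neither the Kafka nor the zstd-jni implementation:

```java
import java.io.IOException;
import java.io.InputStream;

final class SkipStrategiesSketch {
    // Read-and-discard into one caller-owned buffer, reused across calls (like the existing 16KB skip array).
    static void skipViaRead(InputStream in, int bytesToSkip, byte[] reusableBuffer) throws IOException {
        while (bytesToSkip > 0) {
            int read = in.read(reusableBuffer, 0, Math.min(bytesToSkip, reusableBuffer.length));
            if (read < 0) throw new IOException("Stream ended before skipping finished");
            bytesToSkip -= read;
        }
    }

    // Delegate to the stream; whether this avoids an extra buffer depends entirely on the stream implementation.
    static void skipViaStream(InputStream in, long bytesToSkip) throws IOException {
        long remaining = bytesToSkip;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped <= 0) throw new IOException("Stream refused to skip further");
            remaining -= skipped;
        }
    }
}
```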
 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1083852068


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream 
buffer, byte messageVer
 public InputStream wrapForInput(ByteBuffer buffer, byte 
messageVersion, BufferSupplier decompressionBufferSupplier) {
 return new ByteBufferInputStream(buffer);
 }
+
+@Override
+public int getRecommendedDOutSize() {
+return 2 * 1024; // 2KB

Review Comment:
   Changed it to throw an UnsupportedOperationException in the abstract definition, and the uncompressed case no longer overrides this method.
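
   A hedged sketch of the resulting pattern (names and the 16KB figure are simplified assumptions, not the actual `CompressionType` enum): the shared definition throws, and only codecs that actually decompress override it.

```java
// Illustration only: the base enum method throws, specific constants override it.
enum CompressionSketch {
    NONE,
    ZSTD {
        @Override
        int recommendedDOutSize() {
            return 16 * 1024; // illustrative value, an assumption for this sketch
        }
    };

    int recommendedDOutSize() {
        throw new UnsupportedOperationException("Only meaningful for compressed types");
    }
}
```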



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


clolov commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083906152


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * from command line, if `--version` is specified on the command line
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore
+ * as it also checks whether the version needs to be printed, but
+ * refactoring this would have meant changing all command line tools
+ * and unnecessarily increased the blast radius of this change.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @param message Message to display on successful check
+ */
+public static void printHelpAndExitIfNeeded(CommandDefaultOptions 
commandOpts, String message) {
+if (isPrintHelpNeeded(commandOpts)) {
+printUsageAndDie(commandOpts.parser, message);
+}
+if (isPrintVersionNeeded(commandOpts)) {
+printVersionAndDie();
+}
+}
+
+/**
+ * Check that all the listed options are present.
+ */
+public static void checkRequiredArgs(OptionParser parser, OptionSet 
options, OptionSpec... requiredList) {
+for (OptionSpec arg : requiredList) {
+if (!options.has(arg)) {
+printUsageAndDie(parser, String.format("Missing required 
argument \"%s\"", arg));
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+OptionSpec... invalidOptions) {
+if (options.has(usedOption)) {
+for (OptionSpec arg : invalidOptions) {
+if (options.has(arg)) {
+printUsageAndDie(parser, String.format("Option \"%s\" 
can't be used with option \"%s\"", usedOption, arg));
+}
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+Set<OptionSpec<?>> invalidOptions) {
+OptionSpec[] array = new OptionSpec[invalidOptions.size()];
+invalidOptions.toArray(array);
+checkInvalidArgs(parser, options, usedOption, array);
+}
+
+/**
+ * Check that none of the listed options are present with the combination 
of used options.
+ */
+public static void c

[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


clolov commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083909966


##
core/src/main/scala/kafka/tools/ConsoleConsumer.scala:
##
@@ -352,9 +353,11 @@ object ConsoleConsumer extends Logging {
 } else if (options.has(offsetOpt))
   CommandLineUtils.printUsageAndDie(parser, "The partition is required 
when offset is specified.")
 
-def invalidOffset(offset: String): Nothing =
+def invalidOffset(offset: String): Nothing = {
   CommandLineUtils.printUsageAndDie(parser, s"The provided offset value 
'$offset' is incorrect. Valid values are " +
 "'earliest', 'latest', or a non-negative long.")
+  Exit.exit(1)

Review Comment:
   A similar question about using `ToolsUtils.printUsageAndDie(...)` here as 
well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


clolov commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083920980


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * from command line, if `--version` is specified on the command line
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore
+ * as it also checks whether the version needs to be printed, but
+ * refactoring this would have meant changing all command line tools
+ * and unnecessarily increased the blast radius of this change.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @param message Message to display on successful check
+ */
+public static void printHelpAndExitIfNeeded(CommandDefaultOptions 
commandOpts, String message) {
+if (isPrintHelpNeeded(commandOpts)) {
+printUsageAndDie(commandOpts.parser, message);
+}
+if (isPrintVersionNeeded(commandOpts)) {
+printVersionAndDie();
+}
+}
+
+/**
+ * Check that all the listed options are present.
+ */
+public static void checkRequiredArgs(OptionParser parser, OptionSet 
options, OptionSpec... requiredList) {
+for (OptionSpec arg : requiredList) {
+if (!options.has(arg)) {
+printUsageAndDie(parser, String.format("Missing required 
argument \"%s\"", arg));
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+OptionSpec... invalidOptions) {
+if (options.has(usedOption)) {
+for (OptionSpec arg : invalidOptions) {
+if (options.has(arg)) {
+printUsageAndDie(parser, String.format("Option \"%s\" 
can't be used with option \"%s\"", usedOption, arg));
+}
+}
+}
+}
+
+/**
+ * Check that none of the listed options are present.
+ */
+public static void checkInvalidArgs(OptionParser parser,
+OptionSet options,
+OptionSpec usedOption,
+Set<OptionSpec<?>> invalidOptions) {
+OptionSpec[] array = new OptionSpec[invalidOptions.size()];
+invalidOptions.toArray(array);
+checkInvalidArgs(parser, options, usedOption, array);
+}
+
+/**
+ * Check that none of the listed options are present with the combination 
of used options.
+ */
+public static void c

[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


clolov commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083906152


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+    /**
+     * Check if there are no options or the `--help` option is present on the command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the help check condition
+     */
+    public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
+        return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
+    }
+
+    /**
+     * Check if the `--version` option is present on the command line.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @return true on matching the version check condition
+     */
+    public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
+        return commandOpts.options.has(commandOpts.versionOpt);
+    }
+
+    /**
+     * Check and print a help message if there are no options or the `--help` option
+     * is present on the command line; if `--version` is specified on the command line,
+     * print version information and exit.
+     * NOTE: The function name is not strictly speaking correct anymore
+     * as it also checks whether the version needs to be printed, but
+     * refactoring this would have meant changing all command line tools
+     * and unnecessarily increased the blast radius of this change.
+     *
+     * @param commandOpts Acceptable options for a command
+     * @param message Message to display on successful check
+     */
+    public static void printHelpAndExitIfNeeded(CommandDefaultOptions commandOpts, String message) {
+        if (isPrintHelpNeeded(commandOpts)) {
+            printUsageAndDie(commandOpts.parser, message);
+        }
+        if (isPrintVersionNeeded(commandOpts)) {
+            printVersionAndDie();
+        }
+    }
+
+    /**
+     * Check that all the listed options are present.
+     */
+    public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec<?>... requiredList) {
+        for (OptionSpec<?> arg : requiredList) {
+            if (!options.has(arg)) {
+                printUsageAndDie(parser, String.format("Missing required argument \"%s\"", arg));
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        OptionSpec<?>... invalidOptions) {
+        if (options.has(usedOption)) {
+            for (OptionSpec<?> arg : invalidOptions) {
+                if (options.has(arg)) {
+                    printUsageAndDie(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg));
+                }
+            }
+        }
+    }
+
+    /**
+     * Check that none of the listed options are present.
+     */
+    public static void checkInvalidArgs(OptionParser parser,
+                                        OptionSet options,
+                                        OptionSpec<?> usedOption,
+                                        Set<OptionSpec<?>> invalidOptions) {
+        OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
+        invalidOptions.toArray(array);
+        checkInvalidArgs(parser, options, usedOption, array);
+    }
+
+    /**
+     * Check that none of the listed options are present with the combination of used options.
+     */
+    public static void c
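
   For reference, a minimal sketch of how a command line tool would drive these helpers. The tool and option names below are illustrative only, and it assumes `CommandDefaultOptions` keeps the args/parser/options/helpOpt/versionOpt shape that the helper code above already relies on:

   ```java
   import joptsimple.OptionSpec;
   import org.apache.kafka.server.util.CommandDefaultOptions;
   import org.apache.kafka.server.util.CommandLineUtils;

   // Hypothetical tool; not part of the PR.
   public class SomeTool {

       static class SomeToolOptions extends CommandDefaultOptions {
           final OptionSpec<String> configOpt;

           SomeToolOptions(String[] args) {
               super(args);
               configOpt = parser.accepts("config", "Path to a properties file")
                       .withRequiredArg()
                       .ofType(String.class);
               options = parser.parse(args);
           }
       }

       public static void main(String[] args) {
           SomeToolOptions opts = new SomeToolOptions(args);
           // Print help/version and exit early if requested.
           CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool does something useful.");
           // Fail fast if mandatory options are missing.
           CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.configOpt);
           // ... actual tool logic would go here ...
       }
   }
   ```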

[GitHub] [kafka] mimaison commented on a diff in pull request #11748: KAFKA-12635: Don't emit checkpoints for partitions without any offset…

2023-01-23 Thread via GitHub


mimaison commented on code in PR #11748:
URL: https://github.com/apache/kafka/pull/11748#discussion_r1083928100


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -169,6 +172,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do 
not emit checkpoints for partitions that don't have offset-syncs

Review Comment:
   @sgn2607 I'm not aware of anybody working on this. I don't think there's 
even a Jira clearly stating this requirement. Feel free to work on it if you 
want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


clolov commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1083908982


##
core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala:
##
@@ -100,6 +101,7 @@ object ZkSecurityMigrator extends Logging {
 false
   case _ =>
 CommandLineUtils.printUsageAndDie(opts.parser, usageMessage)
+Exit.exit(1)

Review Comment:
   Could you use the new `ToolsUtils.printUsageAndDie(...)` which you 
introduced?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #10826: KAFKA-7632: Support Compression Level

2023-01-23 Thread via GitHub


clolov commented on PR #10826:
URL: https://github.com/apache/kafka/pull/10826#issuecomment-1400189273

   Hello @dongjinleekr! What is the current state of this pull request?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13059: MINOR: KafkaConfig should not expose internal config when queried for non-internal values

2023-01-23 Thread via GitHub


mimaison merged PR #13059:
URL: https://github.com/apache/kafka/pull/13059


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13094: MINOR: Various cleanups in client tests

2023-01-23 Thread via GitHub


mimaison merged PR #13094:
URL: https://github.com/apache/kafka/pull/13094


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-23 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1084030517


##
core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala:
##
@@ -100,6 +101,7 @@ object ZkSecurityMigrator extends Logging {
 false
   case _ =>
 CommandLineUtils.printUsageAndDie(opts.parser, usageMessage)
+Exit.exit(1)

Review Comment:
   Sure. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084093810


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
 }
 };
 
-// Set output buffer (uncompressed) to 16 KB (none by default) to 
ensure reasonable performance
-// in cases where the caller reads a small number of bytes 
(potentially a single byte).
-return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new 
ByteBufferInputStream(buffer),
-bufferPool), 16 * 1024);
+// We do not use an intermediate buffer to store the decompressed 
data as a result of JNI read() calls using
+// `ZstdInputStreamNoFinalizer` here. Every read() call to 
`DataInputStream` will be a JNI call and the
+// caller is expected to balance the tradeoff between reading 
large amount of data vs. making multiple JNI
+// calls.
+return new DataInputStream(new ZstdInputStreamNoFinalizer(new 
ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   1. Thanks for pointing out. This is an artifact of some other changes I was 
trying to do. Fix it now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084097512


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
 }
 };
 
-// Set output buffer (uncompressed) to 16 KB (none by default) to 
ensure reasonable performance
-// in cases where the caller reads a small number of bytes 
(potentially a single byte).
-return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new 
ByteBufferInputStream(buffer),
-bufferPool), 16 * 1024);
+// We do not use an intermediate buffer to store the decompressed 
data as a result of JNI read() calls using
+// `ZstdInputStreamNoFinalizer` here. Every read() call to 
`DataInputStream` will be a JNI call and the
+// caller is expected to balance the tradeoff between reading 
large amount of data vs. making multiple JNI
+// calls.
+return new DataInputStream(new ZstdInputStreamNoFinalizer(new 
ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   2. For the broker, the number of JNI calls remains the same: prior to this 
change we were making JNI calls in chunks of 16KB (using BufferedInputStream), 
and we are still making JNI calls in 16KB chunks, now driven by the decompression 
buffer size.
   
   For the consumer, the number of JNI calls *will change*. Earlier, the consumer 
made multiple calls in chunks of 16KB (using BufferedInputStream); now it makes 
one call to read the entire data. Note that the consumer does not use the 
"skipKeyValueIterator" variation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


divijvaidya commented on PR #13135:
URL: https://github.com/apache/kafka/pull/13135#issuecomment-1400407941

   TODO (will update PR in a short while) - 
   
   1. Add benchmark for case when batch contains single 10 byte message
   2. Test consumer performance
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2023-01-23 Thread via GitHub


C0urante commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1400491466

   @vamossagar12 I don't think it's necessary to call this out anywhere unless 
it's caused unexpected issues with our users. The intention behind the 
exponential backoff is to avoid rebalance storms but I'm still not sure when 
those would ever realistically happen, so until we see otherwise, I'd prefer to 
leave it as an internal implementation detail.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions

2023-01-23 Thread via GitHub


C0urante opened a new pull request, #13148:
URL: https://github.com/apache/kafka/pull/13148

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14645)
   
   If we don't switch to the classloader of a plugin before loading its 
`ConfigDef`, then classloading bugs can appear for, e.g., properties with the 
`CLASS` type. See https://github.com/kcctl/kcctl/issues/266 for an instance of 
this kind of bug.
   
   This PR adds a classloader swap in the part of the code base that's 
responsible for servicing requests to the `GET 
/connector-plugins//config` endpoint.
   
   The existing monolithic unit test for this logic is broken out into 
dedicated individual unit tests for each kind of connector plugin; this is done 
to avoid having to reset expectations on the mocked `Plugins` object when 
verifying calls to `withClassLoader`, since in the unit testing environment the 
same classloader may be used for multiple different plugin types.
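   
   For readers unfamiliar with the pattern, the fix boils down to a context-classloader swap around the `ConfigDef` lookup. A hedged sketch of the idea, not the PR's exact code (how the worker obtains `pluginLoader`, e.g. via `Plugins`, is outside this snippet):
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.connect.connector.Connector;

   // Sketch only: swap the context classloader while the plugin's ConfigDef is loaded.
   static ConfigDef configDefWithPluginLoader(Connector connector, ClassLoader pluginLoader) {
       ClassLoader saved = Thread.currentThread().getContextClassLoader();
       Thread.currentThread().setContextClassLoader(pluginLoader);
       try {
           // Class-typed defaults in the ConfigDef now resolve against the plugin's own
           // classloader instead of the worker's application classloader.
           return connector.config();
       } finally {
           Thread.currentThread().setContextClassLoader(saved);
       }
   }
   ```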


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13148: KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions

2023-01-23 Thread via GitHub


C0urante commented on code in PR #13148:
URL: https://github.com/apache/kafka/pull/13148#discussion_r1084283540


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -821,13 +827,14 @@ public List connectorPluginConfig(String 
pluginName) {
 default:
 throw new BadRequestException("Invalid plugin type " + 
pluginType + ". Valid types are sink, source, converter, header_converter, 
transformation, predicate.");
 }
-} catch (ClassNotFoundException cnfe) {
-throw new NotFoundException("Unknown plugin " + pluginName + ".");
-}
-for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) 
{
-results.add(AbstractHerder.convertConfigKey(configKey));
+List results = new ArrayList<>();
+for (ConfigDef.ConfigKey configKey : 
configDefs.configKeys().values()) {
+results.add(AbstractHerder.convertConfigKey(configKey));
+}
+return results;
+} catch (ClassNotFoundException e) {
+throw new ConnectException("Failed to load plugin class or one of 
its dependencies", e);

Review Comment:
   IMO an HTTP 500 response (which will be the result of throwing a 
`ConnectException` here) is more warranted in this place since we've already 
checked to see that the plugin type is recognized on the worker by this point. 
If things break here, it's more likely that the worker or one of its plugins is 
set up or packaged incorrectly than that the user issued a bad request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
 public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
 final ByteBuffer buffer = this.buffer.duplicate();
 buffer.position(RECORDS_OFFSET);
-return new DataInputStream(compressionType().wrapForInput(buffer, 
magic(), bufferSupplier));
+final InputStream decompressedStream = 
compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+return decompressedStream instanceof DataInputStream ? 
(DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
 }
 
 private CloseableIterator compressedIterator(BufferSupplier 
bufferSupplier, boolean skipKeyValue) {
 final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
 if (skipKeyValue) {
 // this buffer is used to skip length delimited fields like key, 
value, headers
-byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+final ByteBuffer skipBuffer = 
bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   Since we cache buffers per thread, I think you mean we will use two buffers 
instead of one per thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
 public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
 final ByteBuffer buffer = this.buffer.duplicate();
 buffer.position(RECORDS_OFFSET);
-return new DataInputStream(compressionType().wrapForInput(buffer, 
magic(), bufferSupplier));
+final InputStream decompressedStream = 
compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+return decompressedStream instanceof DataInputStream ? 
(DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
 }
 
 private CloseableIterator compressedIterator(BufferSupplier 
bufferSupplier, boolean skipKeyValue) {
 final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
 if (skipKeyValue) {
 // this buffer is used to skip length delimited fields like key, 
value, headers
-byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+final ByteBuffer skipBuffer = 
bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   Since we cache buffers per thread, I think you mean we will use two buffers 
instead of one per thread (for the zstd case).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest

2023-01-23 Thread via GitHub


ijuma commented on code in PR #12781:
URL: https://github.com/apache/kafka/pull/12781#discussion_r1084369349


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -98,7 +98,7 @@
 private Optional> producer;
 private TopicAdmin admin;
 
-private Thread thread;

Review Comment:
   We have to mutate this variable from a test? Generally, it's better to 
expose a method for what you need to do versus exposing the mutable variable in 
this way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1082994727


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ *  <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for 
verifying that this
+ * is the case before calling this method.

Review Comment:
   No, `nextTimestamp` is always present, even if the segment is empty. In the 
case where the segment is empty, we still need to store the tombstone's 
timestamp somewhere, and we choose to store it in `nextTimestamp` since 
`nextTimestamp` is serialized first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] erichaagdev opened a new pull request, #13149: Remove unnecessary asCollection causing eager dependency resolution

2023-01-23 Thread via GitHub


erichaagdev opened a new pull request, #13149:
URL: https://github.com/apache/kafka/pull/13149

   The call to `asCollection()` causes several configurations to be resolved 
eagerly, and potentially unnecessarily. Dropping `asCollection()` ensures the 
configurations are only resolved when they are needed.
   
   Before: 
https://scans.gradle.com/s/ivl2vfw2sq252/performance/dependency-resolution#dependency-resolution-configuration-user-initiated
   After: 
https://scans.gradle.com/s/uxkjpjvylz6qw/performance/dependency-resolution#dependency-resolution-configuration-user-initiated
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-01-23 Thread via GitHub


cmccabe commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1084419497


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -392,14 +394,36 @@ class ControllerApis(val requestChannel: RequestChannel,
 val describableTopicNames = 
getDescribableTopics.apply(allowedTopicNames).asJava
 val effectiveRequest = request.duplicate()
 val iterator = effectiveRequest.topics().iterator()
+var totalRequestedPartitionCount = 0
+val defaultPartitionCont = config.numPartitions.intValue()
 while (iterator.hasNext) {
   val creatableTopic = iterator.next()
   if (duplicateTopicNames.contains(creatableTopic.name()) ||
   !authorizedTopicNames.contains(creatableTopic.name())) {
 iterator.remove()
-  }
-}
-controller.createTopics(context, effectiveRequest, 
describableTopicNames).thenApply { response =>
+  } else {
+if (!creatableTopic.assignments().isEmpty) {
+  totalRequestedPartitionCount += creatableTopic.assignments().size()
+} else if (creatableTopic.numPartitions() > 0)
+  totalRequestedPartitionCount += creatableTopic.numPartitions()
+else
+  totalRequestedPartitionCount += defaultPartitionCont
+  }
+}
+val future = try {
+  if (!effectiveRequest.validateOnly())
+controllerMutationQuota.record(totalRequestedPartitionCount)
+  controller.createTopics(context, effectiveRequest, describableTopicNames)
+} catch {
+  case e: ThrottlingQuotaExceededException =>
+val apiError = ApiError.fromThrowable(e)
+val data = new CreateTopicsResponseData
+effectiveRequest.topics().forEach(topic =>
+  data.topics.add(new 
CreateTopicsResponseData.CreatableTopicResult().setName(topic.name).setErrorCode(apiError.error.code).setErrorMessage(apiError.message)))
+data.setThrottleTimeMs(e.throttleTimeMs())
+CompletableFuture.completedFuture(data)
+}

Review Comment:
   I don't think you need to catch this here, since we have a catch block on 
the request handler that will deal with "send back error X to response Y"
   
   If you do have a catch block here you'd certainly need a return statement as 
well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-01-23 Thread via GitHub


cmccabe commented on PR #13116:
URL: https://github.com/apache/kafka/pull/13116#issuecomment-1400816232

   > Also note that I didn't implement the full logic -- QuorumController has 
additional checks that could in fact cause a specific topic request to be 
rejected (e.g. explicit assignments not contiguous partition numbers starting 
with 0). This could lead to a calculation of partition counts for throttling 
purposes that exceeds the actual partition counts requested -- which could 
cause throttling in cases where technically it should not.
   
   I think arguably if you are making requests that fail, your quota should be 
dinged for those requests. Surely we don't want to let the guy spinning in a 
tight loop making tons of bad requests go un-throttled? I would even maybe 
include requests that fail authorization in the quota, although I know the ZK 
implementation might not. Authorization is expensive; you should get charged 
for wasting the authorizer's time with bad requests.
   
   Maybe we should charge for validateOnly requests as well. They are almost as 
expensive as non-validateOnly requests... the only difference is no records 
written.
   
   > The benefit of the Scala approach is that we avoid doing work in the 
controller itself when throttling does occur. We don't generate all of the 
Metadata Log records, for example. Note that createPartitions and deleteTopics 
will have to get access to the partition counts, which currently I think is not 
available.
   
   One big issue is that ControllerMutationQuota takes some locks, and I am 
worried that the controller thread could get blocked behind those locks. This 
is similar to the Authorizer where for better or worse, we went with a blocking 
implementation that would be too slow to call from the quorum controller thread 
(for example, the Authorizer implementation might send out a blocking request 
to ZK or other external system, etc.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close

2023-01-23 Thread via GitHub


C0urante commented on code in PR #13144:
URL: https://github.com/apache/kafka/pull/13144#discussion_r1084433485


##
connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java:
##
@@ -27,7 +27,7 @@
 
 public abstract class BaseConnectorClientConfigOverridePolicyTest {
 
-protected abstract ConnectorClientConfigOverridePolicy  policyToTest();

Review Comment:
   Good eye!



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -227,9 +227,14 @@ public class DistributedHerderTest {
 private SinkConnectorConfig conn1SinkConfig;
 private SinkConnectorConfig conn1SinkConfigUpdated;
 private short connectProtocolVersion;
+private boolean connectorClientConfigOverridePolicyClosed = false;
 private final ConnectorClientConfigOverridePolicy
-noneConnectorClientConfigOverridePolicy = new 
NoneConnectorClientConfigOverridePolicy();
-
+noneConnectorClientConfigOverridePolicy = new 
NoneConnectorClientConfigOverridePolicy() {
+@Override
+public void close() {
+connectorClientConfigOverridePolicyClosed = true;
+}
+};

Review Comment:
   I like this testing strategy overall, but I think this logic can go into a 
dedicated class, similar to the [SampleSinkConnector 
class](https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SampleSinkConnector.java)
 that we use for unit tests that require a sink connector.
   
   The `SampleConnectorClientConfigOverridePolicy` class (feel free to rename) 
could then expose a public `boolean isClosed()` method, which would replace the 
`connectorClientConfigOverridePolicyClosed` field here.
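   
   Something along these lines, as a sketch of the suggested test helper (the class name follows the suggestion above and does not exist in the code base yet):
   
   ```java
   // Test-only policy that records whether close() was invoked.
   public class SampleConnectorClientConfigOverridePolicy extends NoneConnectorClientConfigOverridePolicy {

       private volatile boolean closed = false;

       @Override
       public void close() {
           closed = true;
       }

       public boolean isClosed() {
           return closed;
       }
   }
   ```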



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -151,6 +151,11 @@ protected void stopServices() {
 this.configBackingStore.stop();
 this.worker.stop();
 this.connectorExecutor.shutdown();
+try {
+this.connectorClientConfigOverridePolicy.close();
+} catch (Exception e) {
+log.warn("Exception while stop 
connectorClientConfigOverridePolicy:", e);
+}

Review Comment:
   This can be a one-liner with the `Utils` class:
   
   ```suggestion
   Utils.closeQuietly(this.connectorClientConfigOverridePolicy, 
"connector client config override policy");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2023-01-23 Thread via GitHub


C0urante merged PR #12802:
URL: https://github.com/apache/kafka/pull/12802


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-23 Thread via GitHub


gharris1727 commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1084435808


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
  * @param kafkaClusterId the identifier of the Kafka cluster to use 
for internal topics; may not be null
  * @param statusBackingStore the backing store for statuses; may not be 
null
  * @param configBackingStore the backing store for connector 
configurations; may not be null
- * @param restUrlthe URL of this herder's REST API; may not be 
null
+ * @param restUrlthe URL of this herder's REST API; may not be 
null, but may be an arbitrary placeholder
+ *   value if this worker does not expose a REST 
API
+ * @param restClient a REST client that can be used to issue 
requests to other workers in the cluster; may
+ *   be null if inter-worker communication is not 
enabled
  * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
  *in connector configurations; 
may not be null
+ * @param restNamespace  zero or more path elements to prepend to the 
paths of forwarded REST requests; may be empty, but not null
  * @param uponShutdown   any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
  *   after all services and resources owned by 
this herder are stopped
  */
+// TODO: Do we really need two separate public constructors?

Review Comment:
   interesting, the for-testing constructor appears unused, and has been for as 
long as it's existed (ever since #321).
   We can push this refactor out to a different PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-23 Thread via GitHub


C0urante commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1084460160


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -228,12 +229,17 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
  * @param kafkaClusterId the identifier of the Kafka cluster to use 
for internal topics; may not be null
  * @param statusBackingStore the backing store for statuses; may not be 
null
  * @param configBackingStore the backing store for connector 
configurations; may not be null
- * @param restUrlthe URL of this herder's REST API; may not be 
null
+ * @param restUrlthe URL of this herder's REST API; may not be 
null, but may be an arbitrary placeholder
+ *   value if this worker does not expose a REST 
API
+ * @param restClient a REST client that can be used to issue 
requests to other workers in the cluster; may
+ *   be null if inter-worker communication is not 
enabled
  * @param connectorClientConfigOverridePolicy the policy specifying the 
client configuration properties that may be overridden
  *in connector configurations; 
may not be null
+ * @param restNamespace  zero or more path elements to prepend to the 
paths of forwarded REST requests; may be empty, but not null
  * @param uponShutdown   any {@link AutoCloseable} objects that should 
be closed when this herder is {@link #stop() stopped},
  *   after all services and resources owned by 
this herder are stopped
  */
+// TODO: Do we really need two separate public constructors?

Review Comment:
   Blegh, this comment was left in from an earlier draft where there was an 
additional constructor. I refactored it out before opening the PR, but 
apparently forgot to remove the comment. Took it out now, thanks for catching!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13138: MINOR: Small cleanups in refactored consumer implementation

2023-01-23 Thread via GitHub


guozhangwang commented on code in PR #13138:
URL: https://github.com/apache/kafka/pull/13138#discussion_r1084463267


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##
@@ -149,11 +156,14 @@ private void onSuccessfulResponse(final 
FindCoordinatorResponseData.Coordinator
 coordinator.host(),
 coordinator.port());
 log.info("Discovered group coordinator {}", coordinator);
-coordinatorRequestState.reset();
+coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
 }
 
-private void onFailedCoordinatorResponse(final Exception exception, final 
long currentTimeMs) {
-coordinatorRequestState.updateLastFailedAttempt(currentTimeMs);
+private void onFailedResponse(
+final long currentTimeMs,
+final Throwable exception
+) {
+coordinatorRequestState.onFailedAttempt(currentTimeMs);

Review Comment:
   Yeah, that makes sense too. I was thinking that since `onSuccessfulAttempt` records 
the time at the end of the function we could do the same here, but I think it 
does not matter much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13125: KAFKA-14626: Kafka Consumer Coordinator does not cleanup all metrics after shutdown

2023-01-23 Thread via GitHub


philipnee commented on PR #13125:
URL: https://github.com/apache/kafka/pull/13125#issuecomment-1400882904

   hey @cmccabe - would you have time to take a look at this?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1084484753


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ *  <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.

Review Comment:
   That's fair. I added an extra paragraph into the top-level javadoc just now. 
LMK what you think.
   
   > In general, I prefer to have some "invariant" as it makes it simpler to 
reason about the code, but introducing this edge case void the invariant that 
`nextTimestamp` is the "largest validTo" of the segment.
   
   I completely agree, but this "empty segment" case really is an edge case 
which cannot be combined with the general case. Here's some more context 
(possibly too much detail 🙂) on why we can't use `nextTimestamp = 
` in the empty segment case, which is what we'd need to 
maintain that `nextTimestamp` is always the largest validTo of the segment:
   
   In the implementation of the versioned store itself (PR coming soon), the 
latest record version for a particular key will be stored in a "latest value 
store," and older record versions will be stored in "segment stores" based on 
their validTo timestamps, except when the latest record version (for a 
particular key) is a tombstone. In this case, we don't want to store the 
tombstone in the latest value store because there's no expiry mechanism for the 
latest value store (and the tombstones might accumulate indefinitely). So, the 
tombstone is stored in a segment. But if these special tombstones have `validTo 
= infinity`, then this violates the invariant that "record versions are stored 
into segments based on their validTo timestamp." (We don't want to repeatedly 
move the tombstone into earlier and earlier segments as newer segments are 
created, because the whole point of putting them into a segment is so that they 
eventually expire.) Violating this invariant is a big problem for store
  efficiency. Suppose a new record version is put to the store later, long 
after a tombstone has been put (but before the tombstone has expired). In order 
to find the tombstone and update its validTo timestamp, we'd have to check 
every single segment (until we find an existing record version). We'd have to 
do this for every single put, since we wouldn't know whether the latest record 
version is a tombstone or not.
   
   In contrast, if we allow the validTo timestamp for the tombstone of the 
empty segment to be the tombstone's 

[GitHub] [kafka] nizhikov closed pull request #12574: KAFKA-13908 Rethrow ExecutionException to preserve original cause

2023-01-23 Thread via GitHub


nizhikov closed pull request #12574: KAFKA-13908 Rethrow ExecutionException to 
preserve original cause
URL: https://github.com/apache/kafka/pull/12574


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1084488305


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in 
segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * 
+ *  <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * 
+ * Negative {@code value_size} is used to indicate that the value stored is a 
tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In 
practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in 
general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+private static final int TIMESTAMP_SIZE = 8;
+private static final int VALUE_SIZE = 4;
+
+/**
+ * @return the validTo timestamp of the latest record in the provided 
segment
+ */
+static long getNextTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(0);
+}
+
+/**
+ * Returns whether the provided segment is "empty." An empty segment is 
one that
+ * contains only a single tombstone with no validTo timestamp specified. 
In this case,
+ * the serialized segment contains only the timestamp of the tombstone 
(stored as the segment's
+ * {@code nextTimestamp}) and nothing else.
+ * 
+ * This can happen if, e.g., the only record inserted for a particular key 
is
+ * a tombstone. In this case, the tombstone must be stored in a segment
+ * (as the latest value store does not store tombstones), but also has no 
validTo
+ * timestamp associated with it.
+ *
+ * @return whether the segment is "empty"
+ */
+static boolean isEmpty(final byte[] segmentValue) {
+return segmentValue.length <= TIMESTAMP_SIZE;
+}
+
+/**
+ * Requires that the segment is not empty. Caller is responsible for 
verifying that this
+ * is the case before calling this method.
+ *
+ * @return the timestamp of the earliest record in the provided segment.
+ */
+static long getMinTimestamp(final byte[] segmentValue) {
+return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+}
+
+/**
+ * @return the deserialized segment value
+ */
+static SegmentValue deserialize(final byte[] segmentValue) {
+return new PartiallyDeserializedSegmentValue(segmentValue);
+}
+
+/**
+ * Creates a new segment value that contains the provided record.
+ *
+ * @param value the record value
+ * @param validFrom the record's timestamp
+ * @param validTo the record's validTo timestamp
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithRecord(
+final byte[] value, final long validFrom, final long validTo) {
+return new PartiallyDeserializedSegmentValue(value, validFrom, 
validTo);
+}
+
+/**
+ * Creates a new empty segment value.
+ *
+ * @param timestamp the timestamp of the tombstone for this empty segment 
value
+ * @return the newly created segment value
+ */
+static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+return new PartiallyDeserializedSegmentValue(timestamp);
+}
+
+interface SegmentValue {
+
+/**
+ * @return whether the segment is empty. See
+ * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} 
for details.
+ */
+boolean isEmpty();
+
+/**
+ * Finds the latest record in this segment with timestamp not 
exceeding the provided
+ * timestamp bound. This method requires that the provided timestamp 
bound exists in
+ * this segment, i.e., the segment is not empty, and the provided 
timestamp bound is
+ * at least minTimestamp and is smaller than nextTimestamp.
+ 

[GitHub] [kafka] vcrfxia commented on pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-23 Thread via GitHub


vcrfxia commented on PR #13126:
URL: https://github.com/apache/kafka/pull/13126#issuecomment-1400904752

   Thanks for your reviews, @mjsax ! I pushed another commit just now to 
incorporate your latest suggestions (javadocs changes only) and responded 
inline to the main points of discussion. The build passed on the previous go 
(besides flaky integration test failures) so if the latest changes look good to 
you, I believe we should be ready to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on a diff in pull request #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close

2023-01-23 Thread via GitHub


nizhikov commented on code in PR #13144:
URL: https://github.com/apache/kafka/pull/13144#discussion_r1084491140


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -227,9 +227,14 @@ public class DistributedHerderTest {
 private SinkConnectorConfig conn1SinkConfig;
 private SinkConnectorConfig conn1SinkConfigUpdated;
 private short connectProtocolVersion;
+private boolean connectorClientConfigOverridePolicyClosed = false;
 private final ConnectorClientConfigOverridePolicy
-noneConnectorClientConfigOverridePolicy = new 
NoneConnectorClientConfigOverridePolicy();
-
+noneConnectorClientConfigOverridePolicy = new 
NoneConnectorClientConfigOverridePolicy() {
+@Override
+public void close() {
+connectorClientConfigOverridePolicyClosed = true;
+}
+};

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #13144: KAFKA-14463 Invoke of ConnectorClientConfigOverridePolicy#close

2023-01-23 Thread via GitHub


nizhikov commented on PR #13144:
URL: https://github.com/apache/kafka/pull/13144#issuecomment-1400908925

   @C0urante Thanks for the review. I've applied your suggestions. Please, take 
a look one more time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13120: MINOR: Connect Javadocs improvements

2023-01-23 Thread via GitHub


gharris1727 commented on code in PR #13120:
URL: https://github.com/apache/kafka/pull/13120#discussion_r1084469223


##
connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java:
##
@@ -23,25 +23,25 @@
 import java.util.List;
 
 /**
- * An interface for enforcing a policy on overriding of client configs via 
the connector configs.
- *
- * Common use cases are ability to provide principal per connector, 
sasl.jaas.config
+ * An interface for enforcing a policy on overriding of Kafka client configs 
via the connector configs.
+ * 
+ * Common use cases are ability to provide principal per connector, 
sasl.jaas.config
  * and/or enforcing that the producer/consumer configurations for 
optimizations are within acceptable ranges.
  */
 public interface ConnectorClientConfigOverridePolicy extends Configurable, 
AutoCloseable {
 
 
 /**
- * Worker will invoke this while constructing the producer for the 
SourceConnectors,  DLQ for SinkConnectors and the consumer for the
- * SinkConnectors to validate if all of the overridden client 
configurations are allowed per the
- * policy implementation. This would also be invoked during the validation 
of connector configs via the Rest API.
- *
+ * Workers will invoke this while constructing producer for 
SourceConnectors, DLQs for SinkConnectors and
+ * consumers for SinkConnectors to validate if all of the overridden 
client configurations are allowed per the

Review Comment:
   We probably don't need to enumerate all of the use-cases for kafka clients 
here, and can keep that scoped to the ConnectorClientConfigRequest javadoc.
   ```suggestion
* Workers will invoke this before configuring per-connector Kafka 
admin, producer, and consumer
* client instances to validate if all of the overridden client 
configurations are allowed per the
   ```



##
connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java:
##
@@ -131,8 +131,8 @@ public void reconfigure(Map props) {
 /**
  * Validate the connector configuration values against configuration 
definitions.
  * @param connectorConfigs the provided configuration values
- * @return List of Config, each Config contains the updated configuration 
information given
- * the current configuration values.
+ * @return {@link Config}, essentially a list of {@link ConfigValue}s 
containing the updated configuration
+ * information given the current configuration values.

Review Comment:
   ```suggestion
* @return a parsed and validated {@link Config} containing any relevant 
validation errors with the raw
* {@code connectorConfigs} which should prevent this configuration from 
being used.
   ```



##
connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java:
##
@@ -88,15 +88,15 @@ public void initialize(SinkTaskContext context) {
 public abstract void start(Map props);
 
 /**
- * Put the records in the sink. Usually this should send the records to 
the sink asynchronously
- * and immediately return.
- *
+ * Put the records in the sink. This should either write them to the 
downstream system or batch them for
+ * later writing

Review Comment:
   I agree that it's not necessary for this javadoc to prescribe asynchronous 
behavior, but it should certainly point out the pitfall that asynchronous users 
need to take special care.
   ```suggestion
* Put the records in the sink. If this method returns before the 
records are durably written,
* the task must implement {@link #flush(Map)} or {@link 
#preCommit(Map)} to ensure that
* only durably written record offsets are committed, and that no 
records are dropped during failures.
   ```
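   
   To make the pitfall concrete, a sketch of an asynchronous sink that only commits offsets its client has made durable (the nested `RemoteClient` type is a placeholder for the downstream system's client, not a Connect API):
   
   ```java
   import java.util.Collection;
   import java.util.Map;

   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.connect.sink.SinkRecord;
   import org.apache.kafka.connect.sink.SinkTask;

   /** Sketch: put() hands records off asynchronously; preCommit() only exposes durable offsets. */
   public class AsyncExampleSinkTask extends SinkTask {

       /** Placeholder for the downstream system's client; purely illustrative. */
       interface RemoteClient extends AutoCloseable {
           void writeAsync(Collection<SinkRecord> records);
           Map<TopicPartition, OffsetAndMetadata> durableOffsets();
           @Override void close();
       }

       private RemoteClient client;

       @Override
       public void start(Map<String, String> props) {
           // Wiring up the client from the task config is out of scope for this sketch.
       }

       // Illustration only: inject the hypothetical client.
       void setClient(RemoteClient client) {
           this.client = client;
       }

       @Override
       public void put(Collection<SinkRecord> records) {
           client.writeAsync(records); // returns before the records are durable
       }

       @Override
       public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
           // Only expose offsets the downstream system has acknowledged; anything newer is
           // redelivered after a failure instead of being silently dropped.
           return client.durableOffsets();
       }

       @Override
       public void stop() {
           client.close();
       }

       @Override
       public String version() {
           return "sketch";
       }
   }
   ```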



##
connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java:
##
@@ -44,25 +44,25 @@ public ConnectorClientConfigRequest(
 }
 
 /**
- * Provides Config with prefix {@code producer.override.} for {@link 
ConnectorType#SOURCE}.
- * Provides Config with prefix {@code consumer.override.} for {@link 
ConnectorType#SINK}.
- * Provides Config with prefix {@code producer.override.} for {@link 
ConnectorType#SINK} for DLQ.
- * Provides Config with prefix {@code admin.override.} for {@link 
ConnectorType#SINK} for DLQ.
+ * Provides Config with prefix "{@code producer.override.}" for {@link 
ConnectorType#SOURCE}.
+ * Provides Config with prefix "{@code consumer.override.}" for {@link 
ConnectorType#SINK}.
+ * Provides Config with prefix "{@code producer.override.}" for {@link 
ConnectorType#SINK} for DLQ.
+ * Provides Config with prefix "{@code admin.override.}" for {@link 
ConnectorType#SINK} for DLQ.

Review Comment:
   We should also mention some more recent usages of this that haven't been 
updated in this documentation:
   * Admin for fencing zomb
