[GitHub] [kafka] abbccdda commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming

2020-07-24 Thread GitBox


abbccdda commented on pull request #8907:
URL: https://github.com/apache/kafka/pull/8907#issuecomment-663817294


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439


   @junrao 
   
   I have rebased this PR to include fix of ```group_mode_transactions_test```. 
Could you run system tests again? Except for ```streams_eos_test``` and 
transaction tests, other tests work well on my local.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439


   @junrao 
   
   I have rebased this PR to include fix of ```group_mode_transactions_test```. 
Could you run system tests again? Except for ```streams_eos_test```, 
```streams.streams_eos_test``` and transaction tests, other tests work well on 
my local.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-24 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439


   @junrao 
   
   I have rebased this PR to include fix of ```group_mode_transactions_test```. 
Could you run system tests again? Except for ```streams_eos_test```, 
```streams_standby_replica_test``` and transaction tests, other tests work well 
on my local.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] AshishRoyJava commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE

2020-07-25 Thread GitBox


AshishRoyJava commented on pull request #9034:
URL: https://github.com/apache/kafka/pull/9034#issuecomment-663825200


   All checks are failing due to some other test case failure. Can anyone look 
into the logs please?
   @abbccdda 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance

2020-07-25 Thread GitBox


huxihx commented on pull request #9071:
URL: https://github.com/apache/kafka/pull/9071#issuecomment-663829002


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE

2020-07-25 Thread GitBox


chia7712 commented on pull request #9034:
URL: https://github.com/apache/kafka/pull/9034#issuecomment-663833458


   @AshishRoyJava Nice finding and fixing!
   
   > AbstractProcessorContext topic() throws NullPointerException when 
modifying a state store within the DSL from a punctuator
   
   Could you share more details? It seems to me it is weird to see a 
```ProcessorRecordContext``` carrying ```null``` topic.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rootex- commented on pull request #6700: KAFKA-7817 ConsumerGroupCommand Regex Feature

2020-07-25 Thread GitBox


rootex- commented on pull request #6700:
URL: https://github.com/apache/kafka/pull/6700#issuecomment-663834666


   @abbccdda Yes, sure 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Mathieu1124 opened a new pull request #9076: MINOR - fix typo

2020-07-25 Thread GitBox


Mathieu1124 opened a new pull request #9076:
URL: https://github.com/apache/kafka/pull/9076


   *FIx typo in ProducerConfig*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance

2020-07-25 Thread GitBox


huxihx commented on pull request #9071:
URL: https://github.com/apache/kafka/pull/9071#issuecomment-663842491


   @omkreddy  Thanks for the review, merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx merged pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance

2020-07-25 Thread GitBox


huxihx merged pull request #9071:
URL: https://github.com/apache/kafka/pull/9071


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…

2020-07-25 Thread GitBox


chia7712 opened a new pull request #9077:
URL: https://github.com/apache/kafka/pull/9077


   The constants used to represent no_partition and no_replica are important 
factor in creating topic. Hence, we should have consistent reference to code 
base.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgroothuijsen opened a new pull request #9078: KAFKA-10132: Return correct value types for MBean attributes

2020-07-25 Thread GitBox


rgroothuijsen opened a new pull request #9078:
URL: https://github.com/apache/kafka/pull/9078


   Currently, JMX outputs all metrics as having type `double`, even if they are 
strings or other types of numbers. This PR gets the type from the metric's 
value if possible, with `null` as a fallback.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgroothuijsen commented on pull request #9078: KAFKA-10132: Return correct value types for MBean attributes

2020-07-25 Thread GitBox


rgroothuijsen commented on pull request #9078:
URL: https://github.com/apache/kafka/pull/9078#issuecomment-663854901


   I'm not entirely sure about returning raw nulls as a fallback, however, or 
how permissive it should be about null values in general. Thoughts?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-25 Thread GitBox


vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460404740



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Thanks, @mjsax , that's fair.
   
   I promise I'm no equivocating here; I'm just trying to understand why my 
intuition is that this is wrong.
   
   It seems like maybe the fundamental problem here is that we can't 
distinguish among a successful poll that returns no data, a failure to poll, 
and a pending async fetch request. The one thing we know is that the end offset 
is beyond our current position, so there _should_ be data to poll, so we can 
assume that an empty return means either that the fetch failed internally or it 
hasn't completed yet.
   
   Stepping back, this seems to be related to the problem of task idling, in 
which it's pointless to "idle" for a time so short that we have no chance to 
actually get a response back from the broker.
   
   I feel like this is substantially my fault from #4855 / KIP-266. The purpose 
of making this API completely async was to avoid harming liveness in situations 
where we might have a relatively strict deadline. But that's not the case here.
   
   I guess the "poor man's" solution we're going for here is to block poll for 
at least long enough to allow for a complete fetch round-trip from the broker. 
If we know that there was a round-trip, and we didn't get any data, then we can 
conclude that there was an error (since we know there is data to get). Since we 
can't know that there was a round trip, we weaken the condition to: if we know 
it's been long enough that there should have been a round-trip and we don't get 
data, we conclude there was probably an error.
   
   In your KIP, we specified we would start the task timer _after_ the first 
error, so it seems like we really want to just block the poll for the 
round-trip time, and then apply your "update deadline, etc." function. I'm with 
you now that to get the round-trip time, we have to extract some config(s) from 
the Consumer. This is a pretty awkward hack, but now that I've thought it 
through, it seems the best we can do. Maybe we can mull it over and file an 
improvement jira for the Consumer to improve use cases like this.
   
   Anyway, it seems like the "poll time" config is irrelevant, we just need to 
know what config to grab that corresponds to completing a fetch request with 
high probability. It seems like we shouldn't need to update metadata, so we 
would send a fetch request on the first poll call, and we just need to block 
for whatever time bounds the fetch response time.
   
   I'm honestly not sure what timeout would be best here. It looks like the 
ConsumerNetworkClient will just wait for a response until it gets a 
"disconnect" (L598). Is that a socket timeout? I'm not sure.





Thi

[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-25 Thread GitBox


vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460404740



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Thanks, @mjsax , that's fair.
   
   I promise I'm not equivocating here; I'm just trying to figure out what my 
intuition is trying to tell me.
   
   It seems like maybe the fundamental problem here is that we can't 
distinguish among a successful poll that returns no data, a failure to poll, 
and a pending async fetch request. The one thing we know is that the end offset 
is beyond our current position, so there _should_ be data to poll, so we can 
assume that an empty return means either that the fetch failed internally or it 
hasn't completed yet.
   
   Stepping back, this seems to be related to the problem of task idling, in 
which it's pointless to "idle" for a time so short that we have no chance to 
actually get a response back from the broker.
   
   I feel like this is substantially my fault from #4855 / KIP-266. The purpose 
of making this API completely async was to avoid harming liveness in situations 
where we might have a relatively strict deadline. But that's not the case here.
   
   I guess the "poor man's" solution we're going for here is to block poll for 
at least long enough to allow for a complete fetch round-trip from the broker. 
If we know that there was a round-trip, and we didn't get any data, then we can 
conclude that there was an error (since we know there is data to get). Since we 
can't know that there was a round trip, we weaken the condition to: if we know 
it's been long enough that there should have been a round-trip and we don't get 
data, we conclude there was probably an error.
   
   In your KIP, we specified we would start the task timer _after_ the first 
error, so it seems like we really want to just block the poll for the 
round-trip time, and then apply your "update deadline, etc." function. I'm with 
you now that to get the round-trip time, we have to extract some config(s) from 
the Consumer. This is a pretty awkward hack, but now that I've thought it 
through, it seems the best we can do. Maybe we can mull it over and file an 
improvement jira for the Consumer to improve use cases like this.
   
   Anyway, it seems like the "poll time" config is irrelevant, we just need to 
know what config to grab that corresponds to completing a fetch request with 
high probability. It seems like we shouldn't need to update metadata, so we 
would send a fetch request on the first poll call, and we just need to block 
for whatever time bounds the fetch response time.
   
   I'm honestly not sure what timeout would be best here. It looks like the 
ConsumerNetworkClient will just wait for a response until it gets a 
"disconnect" (L598). Is that a socket timeout? I'm not sure.





Th

[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663867610


   Java 11 failed with
   
   ```
   08:16:38 org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest 
> shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
   08:16:38 java.lang.AssertionError: Did not receive all 20 records from 
topic multiPartitionOutputTopic within 6 ms
   08:16:38 Expected: is a value equal to or greater than <20>
   08:16:38  but: <0> was less than <20>
   ```
   
   Java 14 failed with 
   
   ```
   
   06:37:03 kafka.network.SocketServerTest > testIdleConnection FAILED
   06:37:03 org.scalatest.exceptions.TestFailedException: Failed to close 
idle channel
   
   ```
   
   Java 8 failed with
   
   ```
   java.lang.AssertionError: Did not receive all 168 records from topic 
multiPartitionOutputTopic within 6 ms
   Expected: is a value equal to or greater than <168>
but: <167> was less than <168>
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna edited a comment on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


cadonna edited a comment on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663867610


   Java 11 failed with
   
   ```
   08:16:38 org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest 
> shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
   08:16:38 java.lang.AssertionError: Did not receive all 20 records from 
topic multiPartitionOutputTopic within 6 ms
   08:16:38 Expected: is a value equal to or greater than <20>
   08:16:38  but: <0> was less than <20>
   ```
   
   Java 14 failed with 
   
   ```
   
   06:37:03 kafka.network.SocketServerTest > testIdleConnection FAILED
   06:37:03 org.scalatest.exceptions.TestFailedException: Failed to close 
idle channel
   
   ```
   
   Java 8 failed with
   
   ```
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   
   java.lang.AssertionError: Did not receive all 168 records from topic 
multiPartitionOutputTopic within 6 ms
   Expected: is a value equal to or greater than <168>
but: <167> was less than <168>
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on a change in pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#discussion_r460416614



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -672,11 +677,9 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 adminClient.createTopics(
   Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
-TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
10, acks = -1)
+TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
1000, acks = -1)

Review comment:
   The broker only knows about batches, not individual records.

##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -672,11 +677,9 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 adminClient.createTopics(
   Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
-TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
10, acks = -1)
+TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
1000, acks = -1)

Review comment:
   The broker only works in terms of record batches, not individual records.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


lbradstreet commented on a change in pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#discussion_r460416611



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -672,11 +677,9 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 adminClient.createTopics(
   Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
-TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
10, acks = -1)
+TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
1000, acks = -1)
 
 val brokerIds = servers.map(_.config.brokerId)
-TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, 
Set(tp), throttleBytes = 1)

Review comment:
   I simply meant that we still need the replication throttle so the 
reassignment doesn't complete before we perform our checks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on a change in pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#discussion_r460416797



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -673,10 +678,14 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
   Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
 TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
10, acks = -1)
+Thread.sleep(10)

Review comment:
   We can call `flush` on the producer to force it to send the messages.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on a change in pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#discussion_r460417143



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -673,10 +678,14 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
   Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
 TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 
10, acks = -1)
+Thread.sleep(10)

Review comment:
   Looking a bit more, we actually close the producer after we generate the 
messages, so I don't think this sleep is needed at all.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters

2020-07-25 Thread GitBox


abbccdda commented on pull request #9052:
URL: https://github.com/apache/kafka/pull/9052#issuecomment-663869941


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE

2020-07-25 Thread GitBox


abbccdda commented on pull request #9034:
URL: https://github.com/apache/kafka/pull/9034#issuecomment-663870109


   Test failure should be non related.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE

2020-07-25 Thread GitBox


abbccdda merged pull request #9034:
URL: https://github.com/apache/kafka/pull/9034


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming

2020-07-25 Thread GitBox


abbccdda commented on pull request #8907:
URL: https://github.com/apache/kafka/pull/8907#issuecomment-663870524


   Test failure non related.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming

2020-07-25 Thread GitBox


abbccdda merged pull request #8907:
URL: https://github.com/apache/kafka/pull/8907


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


chia7712 commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663870629







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871416


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9079: KAFKA-10308 fix flaky core/round_trip_fault_test.py

2020-07-25 Thread GitBox


chia7712 opened a new pull request #9079:
URL: https://github.com/apache/kafka/pull/9079


   Creating a topic may fail (due to timeout) in running system tests. However, 
```RoundTripWorker``` does not ignore ```TopicExistsException``` which makes 
```round_trip_fault_test.py``` be a flaky one.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871604


   https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma removed a comment on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma removed a comment on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871604


   https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871642


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #5708: Minor: remove WorkerCoordinatorMetrics and instantiate the metrics in the constructor of WorkerCoordinator

2020-07-25 Thread GitBox


chia7712 closed pull request #5708:
URL: https://github.com/apache/kafka/pull/5708


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #8137: KAFKA-8967 Flaky test kafka.api.SaslSslAdminIntegrationTest.testCreat…

2020-07-25 Thread GitBox


chia7712 closed pull request #8137:
URL: https://github.com/apache/kafka/pull/8137


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-25 Thread GitBox


mjsax commented on pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#issuecomment-663905239


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-25 Thread GitBox


mjsax commented on pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#issuecomment-663905333


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


mjsax commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663906774


   @chia7712 Only committers can trigger Jenkins builds.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


mjsax commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663906866


   Java 8 passed the tests in question are know to be flaky. Might be good 
enough? Leave it to @vvcephei to make a call and merge.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-25 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   I agree. The "issue" is really that `poll()` does _not_ throw a 
`TimeoutException`... Also, because we do manual assignment, `poll()` would 
never return "early" as it never need to wait for joining a consumer group. -- 
However, compare to `max.task.idle.ms`, we are in a better situation here, 
because we poll() for only a single partition at a time.
   
   I also agree, that applying `task.timeout.ms` should start _after_ we got a 
first timeout -- this was how the original code worked that you criticized as:
   > Man, this is confusing.
   
   And I agree, that the code was not straightforward to understand. But if we 
think it's the right thing to do, I am also happy to add it back :)
   
   I am also not an expert on all consumer internals, but from my 
understanding, fetch requests are send async in general, and if a fetch request 
fails, the consumer would actually not retry it but a retry would be triggered 
by the next `poll()` call. If there is no data available (ie, fetch request did 
not return yet) when `poll()` is called, the consumer would block internally 
until `poll(Duration)` timeout expires or until a fetch request returns 
(whatever comes first).
   
   Furthermore, before `poll()` returns, it always check if a fetch request is 
in-flight or not, and sends one if not.
   
   Thus, on the verify first call to `poll()` we know that no fetch request can 
be in-flight and we also know that `poll()` would send one, and block until it 
returns or `poll(Duration)` expired. Thus, if `poll()` does not block for at 
least `request.timeout.ms`, and we get empty back we don't know which case 
holds, however, if we use the request timeout, it seems that we _know_ if the 
fetch was successful or did time out? We also know, that a fetch request will 
be inflight after `poll()` returns. Thus, for any consecutive `poll()` applying 
request timeout also ensures that we know if the request was successful or not.
   
   I guess the only difference to what I just described to my original code 
was, that I uses `pollTime + requestTimeout`.
   
   Bottom line: I am not 100% sure what you propose? Should we go with the 
original design? Or with the new design? -- In the end, I think we don't need a 
follow up PR, and we can just try to get it right in this PR. I don't see any 
benefit in splitting it up into 2 PRs (because, as mentioned above, we fetch 
for a single partitions and thus it's a different case compared to 
`max.task.idle.ms` scenario).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For qu

[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


chia7712 commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663912187


   > Only committers can trigger Jenkins builds.
   
   It seems I can trigger the jenkins by comment “ Test this please”. Let me 
test it again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


chia7712 commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663912465


   Jenkins is running now :(
   
   Maybe it does not exclude the committers from other Apache projects. At any 
rate, I will not trigger Jenkins in the future.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9049: MINOR: fix scala warnings

2020-07-25 Thread GitBox


ijuma commented on a change in pull request #9049:
URL: https://github.com/apache/kafka/pull/9049#discussion_r460453337



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -344,11 +342,14 @@ object ConfigCommand extends Config {
 val sensitiveEntries = newEntries.filter(_._2.value == null)
 if (sensitiveEntries.nonEmpty)
   throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-val newConfig = new JConfig(newEntries.asJava.values)
+
+val alterLogLevelEntries = (newEntries.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+  ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+  ).asJavaCollection
 
 val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
 val alterOptions = new 
AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+adminClient.incrementalAlterConfigs(Map(configResource -> 
alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

Review comment:
   Some broker versions don't support incremental alter configs, so you 
would be breaking compatibility by making this change.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9049: MINOR: fix scala warnings

2020-07-25 Thread GitBox


ijuma commented on a change in pull request #9049:
URL: https://github.com/apache/kafka/pull/9049#discussion_r460453389



##
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##
@@ -384,8 +380,8 @@ class AclAuthorizer extends Authorizer with Logging {
 
 val prefixed = new ArrayBuffer[AclEntry]
 aclCacheSnapshot
-  .from(new ResourcePattern(resourceType, resourceName, 
PatternType.PREFIXED))
-  .to(new ResourcePattern(resourceType, resourceName.take(1), 
PatternType.PREFIXED))
+  .rangeFrom(new ResourcePattern(resourceType, resourceName, 
PatternType.PREFIXED))
+  .rangeTo(new ResourcePattern(resourceType, resourceName.take(1), 
PatternType.PREFIXED))

Review comment:
   Scala 2.12 doesn't have these methods.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9063: MINOR: Fixed deprecated Gradle build Properties.

2020-07-25 Thread GitBox


ijuma commented on pull request #9063:
URL: https://github.com/apache/kafka/pull/9063#issuecomment-663913074


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9063: MINOR: Fixed deprecated Gradle build Properties.

2020-07-25 Thread GitBox


ijuma commented on a change in pull request #9063:
URL: https://github.com/apache/kafka/pull/9063#discussion_r460453530



##
File path: build.gradle
##
@@ -304,7 +304,7 @@ subprojects {
   }
 
   test {
-maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors() 
as int

Review comment:
   What is the reason for the various `as` additions?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-663913281


   https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9079: KAFKA-10308 fix flaky core/round_trip_fault_test.py

2020-07-25 Thread GitBox


ijuma commented on pull request #9079:
URL: https://github.com/apache/kafka/pull/9079#issuecomment-663913254


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma removed a comment on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-25 Thread GitBox


ijuma removed a comment on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-663913281


   https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #9080: MINOR: Recommend Java 11

2020-07-25 Thread GitBox


ijuma opened a new pull request #9080:
URL: https://github.com/apache/kafka/pull/9080


   Java 11 has been recommended for a while, but the ops section had not been 
updated.
   Also tweak the text a bit to read better.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-25 Thread GitBox


jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r460467261



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()
+}
+
+controller.eventManager.put(processedEvent)
+controller.eventManager.put(preemptedEvent)
+controller.eventManager.put(preemptedEvent)
+
+doAnswer((_: InvocationOnMock) => {
+  latch.countDown()
+}).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
   Got it. Sounds good.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-25 Thread GitBox


mjsax commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663926510


   Ah yes. I work for all committers -- it's not project specific. Missed that 
you are HBase Committer/PMC :) 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-25 Thread GitBox


jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r460468241



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1986,101 +1965,125 @@ private[controller] class ControllerStats extends 
KafkaMetricsGroup {
 
 sealed trait ControllerEvent {
   def state: ControllerState
+  def preempt(): Unit

Review comment:
   I would add a documentation comment to this method explaining that this 
method will not be executed by the controller thread but instead it will be 
executed by some other thread.

##
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new 
ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new 
ControllerEventThread(ControllerEventThreadName)

Review comment:
   This comment applies to `clearAndPut`:
   
   ```scala
 def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
   queue.forEach(_.preempt(processor))
   queue.clear()
   put(event)
 }
   ```
   I think there is a bug here where at most one event will be process twice. 
Once by `_preempt(processor)` and once by `doWork`. I think we can fix this 
concurrency bug if we use `LinkedBlockingQueue::drainTo`. E.g.
   
   ```scala
 def clearAndPut(event: ControllerEvent): QueuedEvent = {
   val events = ...;
   
   inLock(putLock) {
 queue.drainTo(events)
 put(event)
   }
   
   event.forEach(_.preempt(processor))
 }
   ```

##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()

Review comment:
   This method should be executed by the thread that is running this test. 
If you agree, no need to use an `AtomicInteger`.

##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()
+}
+
+controller.eventManager.put(processedEvent)
+controller.eventManager.put(preemptedEvent)
+controller.eventManager.put(preemptedEvent)
+
+doAnswer((_: InvocationOnMock) => {
+  latch.countDown()
+}).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
   The important part is that we want to do `latch.countDown` after 
`initiateShutdown` has been called so that the controller thread doesn't pick 
up a new event because `isRunning` is `false`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-25 Thread GitBox


jsancio commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r460468239



##
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##
@@ -77,7 +77,7 @@ class ControllerEventManager(controllerId: Int,
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new 
ControllerEventThread(ControllerEventThreadName)
+  private[controller] var thread = new 
ControllerEventThread(ControllerEventThreadName)

Review comment:
   This comment applies to `clearAndPut`:
   
   ```scala
 def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
   queue.forEach(_.preempt(processor))
   queue.clear()
   put(event)
 }
   ```
   I think there is a bug here where at most one event will be process twice. 
Once by `_preempt(processor)` and once by `doWork`. I think we can fix this 
concurrency bug if we use `LinkedBlockingQueue::drainTo`. E.g.
   
   ```scala
 def clearAndPut(event: ControllerEvent): QueuedEvent = {
   val preemptedEvents = ...;
   
   inLock(putLock) {
 queue.drainTo(preemptedEvents)
 put(event)
   }
   
   preemtedEvents.forEach(_.preempt(processor))
 }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming

2020-07-25 Thread GitBox


vitojeng commented on pull request #8907:
URL: https://github.com/apache/kafka/pull/8907#issuecomment-663928064


   Thanks @abbccdda !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-26 Thread GitBox


showuon edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663329158


   @mimaison , thanks for your comments. I've updated in this commit: 
https://github.com/apache/kafka/pull/9029/commits/5345c6835ef42da973b794634d9b8d65f27ee80a.
 What I did are:
   1. remove unused `time` variable and import
   2. Use simple `for` loop instead of lambda loop
   3. We'll wait for the consumer consume all 100 records before continuing
   
   For other improvements, @ning2008wisc will address in KAFKA-10304. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2020-07-26 Thread GitBox


ijuma commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-664002055


   Green build, merging to trunk and 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9022: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2020-07-26 Thread GitBox


ijuma merged pull request #9022:
URL: https://github.com/apache/kafka/pull/9022


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #9022: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress

2020-07-26 Thread GitBox


ijuma edited a comment on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-664002055


   Green build, merging to trunk, 2.6 and 2.5.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sasakitoa opened a new pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-26 Thread GitBox


sasakitoa opened a new pull request #9081:
URL: https://github.com/apache/kafka/pull/9081


   This PR will change KafkaProducer#sendOffsetsToTransaction to be affected by 
max.block.ms to avoid blocking infinitively.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #9082: MINOR: Update dependencies for Kafka 2.7 (part 1)

2020-07-26 Thread GitBox


ijuma opened a new pull request #9082:
URL: https://github.com/apache/kafka/pull/9082


   I left out updates that could be risky. Preliminary testing indicates
   we can build (including spotBugs) and run tests with Java 15 with
   these changes. I will do more thorough testing once Java 15 reaches
   release candidate stage in a few weeks.
   
   Minor updates with mostly bug fixes:
   - Scala: 2.12.11 -> 2.12.12
   - Bouncy castle: 1.64 -> 1.66
   - HttpClient: 4.5.11 -> 4.5.12
   - Mockito: 3.3.3 -> 3.4.4
   - Netty: 4.5.10 -> 4.5.11
   - Snappy: 1.1.7.3 -> 1.1.7.6
   - Zstd: 1.4.5-2 -> 1.4.5-6
   
   Gradle plugin upgrades:
   - Gradle versions: 0.28.0 -> 0.29.0
   - Git: 4.0.1 -> 4.0.2
   - Scoverage: 1.4.1 -> 1.4.2
   - Shadow: 5.2.0 -> 6.0.0
   - Test Retry: 1.1.5 -> 1.1.6
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgroothuijsen removed a comment on pull request #9078: KAFKA-10132: Return correct value types for MBean attributes

2020-07-26 Thread GitBox


rgroothuijsen removed a comment on pull request #9078:
URL: https://github.com/apache/kafka/pull/9078#issuecomment-663854901


   I'm not entirely sure about returning raw nulls as a fallback, however, or 
how permissive it should be about null values in general. Thoughts?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-26 Thread GitBox


mjsax merged pull request #9075:
URL: https://github.com/apache/kafka/pull/9075


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9031: KAFKA-10298: replace abstract Windows with a proper interface

2020-07-26 Thread GitBox


vvcephei commented on a change in pull request #9031:
URL: https://github.com/apache/kafka/pull/9031#discussion_r460562257



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/EnumerableWindowDefinition.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+
+import java.time.Duration;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
+
+/**
+ * The window specification for fixed size windows that is used to define 
window boundaries and grace period.
+ * 
+ * Grace period defines how long to wait on out-of-order events. That is, 
windows will continue to accept new records until {@code stream_time >= 
window_end + grace_period}.
+ * Records that arrive after the grace period passed are considered 
late and will not be processed but are dropped.
+ * 
+ * @param  type of the window instance
+ */
+public interface EnumerableWindowDefinition {
+
+
+/**
+ * List all possible windows that contain the provided timestamp,
+ * indexed by non-negative window start timestamps.
+ *
+ * @param timestamp the timestamp window should get created for
+ * @return a map of {@code windowStartTimestamp -> Window} entries
+ */
+Map windowsFor(final long timestamp);
+
+/**
+ * Return the size of the specified windows in milliseconds.

Review comment:
   ```suggestion
* Return an upper bound on the size of windows in milliseconds.
* Used to determine the lower bound on store retention time.
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9080: MINOR: Recommend Java 11

2020-07-26 Thread GitBox


ijuma merged pull request #9080:
URL: https://github.com/apache/kafka/pull/9080


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException

2020-07-26 Thread GitBox


mjsax commented on pull request #9075:
URL: https://github.com/apache/kafka/pull/9075#issuecomment-664031539


   Merged to `trunk` and cherry-picked to `2.6` branch.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-26 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   I agree. The "issue" is really that `poll()` does _not_ throw a 
`TimeoutException`... Also, because we do manual assignment, `poll()` would 
never return "early" as it never need to wait for joining a consumer group. -- 
However, compare to `max.task.idle.ms`, we are in a better situation here, 
because we poll() for only a single partition at a time.
   
   I also agree, that applying `task.timeout.ms` should start _after_ we got a 
first timeout -- this was how the original code worked that you criticized as:
   > Man, this is confusing.
   
   And I agree, that the code was not straightforward to understand. But if we 
think it's the right thing to do, I am also happy to add it back :)
   
   I am also not an expert on all consumer internals, but from my 
understanding, fetch requests are send async in general, and if a fetch request 
fails, the consumer would actually not retry it but a retry would be triggered 
by the next `poll()` call. If there is no data available (ie, fetch request did 
not return yet) when `poll()` is called, the consumer would block internally 
until `poll(Duration)` timeout expires or until a fetch request returns 
(whatever comes first).
   
   Furthermore, before `poll()` returns, it always check if a fetch request is 
in-flight or not, and sends one if not.
   
   Thus, on the verify first call to `poll()` we know that no fetch request can 
be in-flight and we also know that `poll()` would send one, and block until it 
returns or `poll(Duration)` expires. Thus, if `poll()` does not block for at 
least `request.timeout.ms`, and we get empty back we don't know which case 
holds, however, if we use the request timeout, it seems that we _know_ if the 
fetch was successful or did time out? We also know, that a fetch request will 
be inflight after `poll()` returns. Thus, for any consecutive `poll()` applying 
request timeout also ensures that we know if the request was successful or not.
   
   I guess the only difference to what I just described to my original code 
was, that I uses `pollTime + requestTimeout`.
   
   Bottom line: I am not 100% sure what you propose? Should we go with the 
original design? Or with the new design? -- In the end, I think we don't need a 
follow up PR, and we can just try to get it right in this PR. I don't see any 
benefit in splitting it up into 2 PRs (because, as mentioned above, we fetch 
for a single partitions and thus it's a different case compared to 
`max.task.idle.ms` scenario).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For qu

[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-26 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   I agree. The "issue" is really that `poll()` does _not_ throw a 
`TimeoutException`... Also, because we do manual assignment, `poll()` would 
never return "early" as it never need to wait for joining a consumer group. -- 
However, compare to `max.task.idle.ms`, we are in a better situation here, 
because we poll() for only a single partition at a time.
   
   I also agree, that applying `task.timeout.ms` should start _after_ we got a 
first timeout -- this was how the original code worked that you criticized as:
   > Man, this is confusing.
   
   And I agree, that the code was not straightforward to understand. But if we 
think it's the right thing to do, I am also happy to add it back :)
   
   I am also not an expert on all consumer internals, but from my 
understanding, fetch requests are send async in general, and if a fetch request 
fails, the consumer would actually not retry it but a retry would be triggered 
by the next `poll()` call. If there is no data available (ie, fetch request did 
not return yet) when `poll()` is called, the consumer would block internally 
until `poll(Duration)` timeout expires or until a fetch request returns 
(whatever comes first).
   
   Furthermore, before `poll()` returns, it always check if a fetch request is 
in-flight or not, and sends one if not.
   
   Thus, on the verify first call to `poll()` we know that no fetch request can 
be in-flight and we also know that `poll()` would send one, and block until it 
returns or `poll(Duration)` expires. Thus, if `poll()` does not block for at 
least `request.timeout.ms`, and we get an empty result back we don't know which 
case holds, however, if we use the request timeout, it seems that we _know_ if 
the fetch was successful or did time out? We also know, that a fetch request 
will be inflight after `poll()` returns. Thus, for any consecutive `poll()` 
applying request timeout also ensures that we know if the request was 
successful or not.
   
   I guess the only difference to what I just described to my original code 
was, that I uses `pollTime + requestTimeout`.
   
   Bottom line: I am not 100% sure what you propose? Should we go with the 
original design? Or with the new design? -- In the end, I think we don't need a 
follow up PR, and we can just try to get it right in this PR. I don't see any 
benefit in splitting it up into 2 PRs (because, as mentioned above, we fetch 
for a single partitions and thus it's a different case compared to 
`max.task.idle.ms` scenario).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comme

[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-26 Thread GitBox


ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664045085


   Hi @showuon thanks for your work, a minor thing - do you mind to consolidate 
/ merge the current 4 commits into 1 commit? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #9068: MINOR: INFO log4j when request re-join

2020-07-26 Thread GitBox


guozhangwang merged pull request #9068:
URL: https://github.com/apache/kafka/pull/9068


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-26 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460588651



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -93,8 +93,8 @@ public boolean isActive() {
 public void initializeIfNeeded() {
 if (state() == State.CREATED) {
 StateManagerUtil.registerStateStores(log, logPrefix, topology, 
stateMgr, stateDirectory, processorContext);
-
 // initialize the snapshot with the current offsets as we don't 
need to commit then until they change

Review comment:
   Ack





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-26 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460590891



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
 static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
   You mean when the accumulated pending data is larger than the memtable, 
we should checkpoint?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-26 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460592249



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -454,6 +456,41 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to 
changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
   My plan is actually to remove the `flushCache` once we decoupled caching 
with emitting (see the TODO comment on the caller).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-26 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460599173



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -49,6 +61,31 @@
 this.stateDirectory = stateDirectory;
 }
 
+protected void initializeCheckpoint() {
+// we will delete the local checkpoint file after registering the 
state stores and loading them into the
+// state manager, therefore we should initialize the snapshot as empty 
to indicate over-write checkpoint needed
+offsetSnapshotSinceLastFlush = Collections.emptyMap();
+}
+
+/**
+ * The following exceptions maybe thrown from the state manager flushing 
call
+ *
+ * @throws TaskMigratedException recoverable error sending changelog 
records that would cause the task to be removed
+ * @throws StreamsException fatal error when flushing the state store, for 
example sending changelog records failed
+ *  or flushing state store get IO errors; such 
error should cause the thread to die
+ */
+protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+final Map offsetSnapshot = 
stateMgr.changelogOffsets();
+if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, 
offsetSnapshotSinceLastFlush, offsetSnapshot)) {
+// since there's no written offsets we can checkpoint with empty 
map,
+// and the state's current offset would be used to checkpoint
+stateMgr.flush();
+stateMgr.checkpoint(Collections.emptyMap());

Review comment:
   I see what did you mean now. Will do.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-26 Thread GitBox


huxihx commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r460608228



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -687,7 +687,7 @@ public void sendOffsetsToTransaction(Map offs
 throwIfProducerClosed();
 TransactionalRequestResult result = 
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
 sender.wakeup();
-result.await();
+result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);

Review comment:
   javadoc for this method should be updated as well.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huxihx commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-26 Thread GitBox


huxihx commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r460609876



##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+val offsets = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata]().asJava
+offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+try {
+  producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
   Using `Map(new TopicPartition(topic1, 0) -> new 
OffsetAndMetadata(0)).asJava` is better. No need to import 
scala.collection.mutable package.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Mathieu1124 commented on pull request #9076: MINOR - fix typo

2020-07-26 Thread GitBox


Mathieu1124 commented on pull request #9076:
URL: https://github.com/apache/kafka/pull/9076#issuecomment-664081255


   > I think it was grammatically correct as it stood before this change. This 
change, if it were to occur, would have to state `if the leader fails` (the 
current change says `if the leader fail`, which is incorrect grammar). So I 
believe this is a choice between two grammatically correct possibilities. I 
personally don't see the need for a change.
   
   Reasonable



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Mathieu1124 closed pull request #9076: MINOR - fix typo

2020-07-26 Thread GitBox


Mathieu1124 closed pull request #9076:
URL: https://github.com/apache/kafka/pull/9076


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-26 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460621030



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -243,18 +242,24 @@ public void handleAssignment(final Map> activeTasks,
 
 for (final Task task : tasksToClose) {
 try {
-if (task.isActive()) {
-// Active tasks are revoked and suspended/committed during 
#handleRevocation
-if (!task.state().equals(State.SUSPENDED)) {
-log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-  task.id(), task.state());
-throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-}
-} else {
-task.suspend();
-task.prepareCommit();
-task.postCommit();
+// Always try to first suspend and commit the task before 
closing it;
+// some tasks may already be suspended which should be a no-op.
+//
+// Also since active tasks should already be suspended / 
committed and
+// standby tasks should have no offsets to commit, we should 
expect nothing to commit
+task.suspend();
+
+final Map offsets = 
task.prepareCommit();
+
+if (!offsets.isEmpty()) {
+log.error("Task {} should has been committed prior to 
attempting to close, but it reports non-empty offsets {} to commit",

Review comment:
   I've reordered in handleRevocation so that commit is called before 
suspension; also even if commit failed with TaskMigrated, we would still try to 
continue completing the suspension so in handleAssignment we should have all 
active tasks suspended or should be closed dirty.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…

2020-07-26 Thread GitBox


abbccdda commented on pull request #9077:
URL: https://github.com/apache/kafka/pull/9077#issuecomment-664098665


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…

2020-07-26 Thread GitBox


abbccdda commented on pull request #9077:
URL: https://github.com/apache/kafka/pull/9077#issuecomment-664098712


   got 2/3 in previous run



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9049: MINOR: fix scala warnings

2020-07-26 Thread GitBox


abbccdda commented on pull request #9049:
URL: https://github.com/apache/kafka/pull/9049#issuecomment-664102269


   Sg, will close the PR @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda closed pull request #9049: MINOR: fix scala warnings

2020-07-26 Thread GitBox


abbccdda closed pull request #9049:
URL: https://github.com/apache/kafka/pull/9049


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…

2020-07-26 Thread GitBox


chia7712 commented on pull request #9077:
URL: https://github.com/apache/kafka/pull/9077#issuecomment-664103549


   @abbccdda thanks for reviews!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]

2020-07-26 Thread GitBox


guozhangwang opened a new pull request #9083:
URL: https://github.com/apache/kafka/pull/9083


   Should be reviewed after https://github.com/apache/kafka/pull/8964 is 
merged, in which we first commit (flush) then suspend.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-26 Thread GitBox


showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647


   hi @ning2008wisc , thanks for your suggestion. but I don't think it's good 
to merge all the 4 commits into 1. I think the commit history is a kind of 
records to keep why the author did this change at that time. Someone can always 
know why we did this change by checking the commit history or the PR records. 
Also, the reviewer can know which commits have reviewed, and which one is new 
added to address reviewer's previous comments (like the above comments I left 
to @mimaison ).
   
   I know there's some debate over that, but I think if this project(Kafka) 
doesn't have this rule, it should be fine to keep it as is. Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #9084: MINOR: Preserve Kafka exception from RebalanceListener [Do Not Merge]

2020-07-26 Thread GitBox


guozhangwang opened a new pull request #9084:
URL: https://github.com/apache/kafka/pull/9084


   Some of the rebalance listener may be implemented by Kafka as well, e.g. 
Connect and Streams, and if the exception thrown is actually a KafkaException, 
then we should not wrap it but directly throw the exception from the listener.
   
   Unit tests to be added; cc @abbccdda to review.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-26 Thread GitBox


showuon edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647


   hi @ning2008wisc , thanks for your suggestion. but I don't think it's good 
to merge all the 4 commits into 1. I think the commit history is a kind of 
records to keep why the author did this change at that time. Someone can always 
know why we did this change by checking the commit history or the PR records. 
Also, the reviewer can know which commits have reviewed, and which one is new 
added (like the above comments I left to @mimaison ).
   
   I know there's some debate over that, but I think if this project(Kafka) 
doesn't have this rule, it should be fine to keep it as is. Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-26 Thread GitBox


showuon edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647


   hi @ning2008wisc , thanks for your suggestion. but I don't think it's good 
to merge all the 4 commits into 1. I think the commit history is a kind of 
records to keep why the author did this change at that time. Someone can always 
know why we did this change by checking the commit history or the PR records. 
Also, the reviewer can know which commits have reviewed, and which one is new 
added (like the above comments I left to @mimaison ).
   
   I know there's some debate over that, but I think if this project(Kafka) 
doesn't have this rule, I'd prefer to keep it as is. Thank you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9084: MINOR: Preserve Kafka exception from RebalanceListener [Do Not Merge]

2020-07-26 Thread GitBox


abbccdda commented on a change in pull request #9084:
URL: https://github.com/apache/kafka/pull/9084#discussion_r460671888



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -728,7 +728,11 @@ protected void onJoinPrepare(int generation, String 
memberId) {
 subscriptions.resetGroupSubscription();
 
 if (exception != null) {
-throw new KafkaException("User rebalance callback throws an 
error", exception);
+if (exception instanceof KafkaException) {
+throw (KafkaException) exception;

Review comment:
   Why do we need this cast?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-07-27 Thread GitBox


viktorsomogyi commented on pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#issuecomment-664175913


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-27 Thread GitBox


viktorsomogyi commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-664176258


   @ijuma would you please review this or suggest who is the right committer to 
help with this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-27 Thread GitBox


ijuma commented on pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#issuecomment-664180174


   @mimaison, would you be able to review this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand

2020-07-27 Thread GitBox


ijuma commented on a change in pull request #4090:
URL: https://github.com/apache/kafka/pull/4090#discussion_r460711206



##
File path: core/src/test/scala/unit/kafka/utils/JsonTest.scala
##
@@ -41,14 +42,23 @@ class JsonTest {
 val jnf = JsonNodeFactory.instance
 
 assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf
+assertEquals(Json.tryParseFull("{}"), Right(JsonValue(new 
ObjectNode(jnf
+org.junit.Assert.assertThrows(classOf[IllegalArgumentException], () => 
Json.tryParseFull(null))
+org.junit.Assert.assertThrows(classOf[IllegalArgumentException], () => 
Json.tryParseBytes(null))

Review comment:
   Why can't we just say `assertThrows` here?

##
File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
##
@@ -30,7 +30,7 @@ import org.apache.kafka.common.{Node, TopicPartition, 
TopicPartitionInfo, TopicP
 import org.junit.Assert.{assertEquals, assertFalse, assertThrows, assertTrue}
 import org.junit.function.ThrowingRunnable
 import org.junit.rules.Timeout
-import org.junit.{After, Assert, Before, Rule, Test}
+import org.junit._

Review comment:
   We should not use a wildcard imports here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-27 Thread GitBox


ijuma commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460714125



##
File path: checkstyle/suppressions.xml
##
@@ -17,8 +17,14 @@
   files="(ApiMessageType).java|MessageDataGenerator.java"/>
 
+
 
 
+

Review comment:
   It seems really brittle to have line numbers in the suppression. Could 
we:
   1. Remove the `lines` part.
   2. Give a name to the `Regexp` so that we only exclude the one related to 
`System.exit` (not sure if this is possible)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-27 Thread GitBox


ijuma commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460714547



##
File path: 
streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
##
@@ -153,8 +154,8 @@ public void run() {
 streams.start();
 latch.await();
 } catch (final Throwable e) {
-System.exit(1);
+Exit.exit(1);
 }
-System.exit(0);
+Exit.exit(0);

Review comment:
   I think it was intentional not to use an internal class in demos that 
others may copy.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-27 Thread GitBox


ijuma commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460714731



##
File path: tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
##
@@ -241,7 +240,7 @@ public static VerifiableProducer 
createFromArgs(ArgumentParser parser, String[]
 } else {
 parser.printHelp();
 // Can't use `Exit.exit` here because it didn't exist until 
0.11.0.0.
-System.exit(0);
+Exit.exit(0);

Review comment:
   Did you read the comment immediately above this line?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-27 Thread GitBox


ijuma commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460714731



##
File path: tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
##
@@ -241,7 +240,7 @@ public static VerifiableProducer 
createFromArgs(ArgumentParser parser, String[]
 } else {
 parser.printHelp();
 // Can't use `Exit.exit` here because it didn't exist until 
0.11.0.0.
-System.exit(0);
+Exit.exit(0);

Review comment:
   Did you read the comment immediately above this line? Only applies to 
`VerifiableConsumer`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-27 Thread GitBox


ijuma commented on a change in pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#discussion_r460714731



##
File path: tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
##
@@ -241,7 +240,7 @@ public static VerifiableProducer 
createFromArgs(ArgumentParser parser, String[]
 } else {
 parser.printHelp();
 // Can't use `Exit.exit` here because it didn't exist until 
0.11.0.0.
-System.exit(0);
+Exit.exit(0);

Review comment:
   Did you read the comment immediately above this line? Also applies to 
`VerifiableConsumer`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-27 Thread GitBox


ijuma commented on pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#issuecomment-664201898


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   3   4   5   6   7   8   9   10   >