[GitHub] [kafka] abbccdda commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming
abbccdda commented on pull request #8907: URL: https://github.com/apache/kafka/pull/8907#issuecomment-663817294 retest this please
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439 @junrao I have rebased this PR to include the fix for `group_mode_transactions_test`. Could you run the system tests again? Except for `streams_eos_test` and the transaction tests, the other tests pass locally for me.
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439 @junrao I have rebased this PR to include the fix for `group_mode_transactions_test`. Could you run the system tests again? Except for `streams_eos_test`, `streams.streams_eos_test`, and the transaction tests, the other tests pass locally for me.
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663818439 @junrao I have rebased this PR to include the fix for `group_mode_transactions_test`. Could you run the system tests again? Except for `streams_eos_test`, `streams_standby_replica_test`, and the transaction tests, the other tests pass locally for me.
[GitHub] [kafka] AshishRoyJava commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE
AshishRoyJava commented on pull request #9034: URL: https://github.com/apache/kafka/pull/9034#issuecomment-663825200 All checks are failing due to an unrelated test case failure. Can anyone look into the logs, please? @abbccdda
[GitHub] [kafka] huxihx commented on pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance
huxihx commented on pull request #9071: URL: https://github.com/apache/kafka/pull/9071#issuecomment-663829002 retest this please
[GitHub] [kafka] chia7712 commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE
chia7712 commented on pull request #9034: URL: https://github.com/apache/kafka/pull/9034#issuecomment-663833458 @AshishRoyJava Nice find and fix!

> AbstractProcessorContext topic() throws NullPointerException when modifying a state store within the DSL from a punctuator

Could you share more details? It seems odd to me to see a `ProcessorRecordContext` carrying a `null` topic.
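For context, the scenario named in the PR title can be reproduced with a punctuator that writes into a state store: the punctuation callback runs without a current record, so the record context has no source topic. A minimal sketch, assuming a hypothetical processor and store name ("counts"); this is illustrative, not the PR's test code:

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor illustrating the NPE scenario from KAFKA-10246.
class PunctuatingProcessor extends AbstractProcessor<String, String> {
    private KeyValueStore<String, Long> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, Long>) context.getStateStore("counts");
        // The punctuation callback has no current record, so the record
        // context carries no source topic -- the case that surfaced as an
        // NPE in AbstractProcessorContext#topic().
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> store.put("last-punctuation", timestamp));
    }

    @Override
    public void process(final String key, final String value) {
        store.put(key, context().timestamp()); // normal path: record context is populated
    }
}
```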
[GitHub] [kafka] rootex- commented on pull request #6700: KAFKA-7817 ConsumerGroupCommand Regex Feature
rootex- commented on pull request #6700: URL: https://github.com/apache/kafka/pull/6700#issuecomment-663834666 @abbccdda Yes, sure
[GitHub] [kafka] Mathieu1124 opened a new pull request #9076: MINOR - fix typo
Mathieu1124 opened a new pull request #9076: URL: https://github.com/apache/kafka/pull/9076 *Fix typo in ProducerConfig*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] huxihx commented on pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance
huxihx commented on pull request #9071: URL: https://github.com/apache/kafka/pull/9071#issuecomment-663842491 @omkreddy Thanks for the review, merging to trunk.
[GitHub] [kafka] huxihx merged pull request #9071: KAFKA-10305: Print usage when parsing fails for ConsumerPerformance
huxihx merged pull request #9071: URL: https://github.com/apache/kafka/pull/9071
[GitHub] [kafka] chia7712 opened a new pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…
chia7712 opened a new pull request #9077: URL: https://github.com/apache/kafka/pull/9077 The constants representing "no partition count" and "no replication factor" are an important factor in topic creation, so the code base should reference them consistently.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
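For reference, the Admin API expresses "use the broker defaults" through `Optional.empty()` rather than sentinel constants; a minimal sketch (broker address and topic name are placeholders):

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithDefaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Optional.empty() means "use the broker's default partition count
            // and replication factor" -- the cases the removed sentinel
            // constants used to mark.
            NewTopic topic = new NewTopic("demo-topic", Optional.empty(), Optional.empty());
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```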
[GitHub] [kafka] rgroothuijsen opened a new pull request #9078: KAFKA-10132: Return correct value types for MBean attributes
rgroothuijsen opened a new pull request #9078: URL: https://github.com/apache/kafka/pull/9078 Currently, JMX outputs all metrics as having type `double`, even if they are strings or other types of numbers. This PR gets the type from the metric's value if possible, with `null` as a fallback.
[GitHub] [kafka] rgroothuijsen commented on pull request #9078: KAFKA-10132: Return correct value types for MBean attributes
rgroothuijsen commented on pull request #9078: URL: https://github.com/apache/kafka/pull/9078#issuecomment-663854901 I'm not entirely sure about returning raw nulls as a fallback, however, or how permissive it should be about null values in general. Thoughts?
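A minimal sketch of the approach under discussion; the helper class and method here are hypothetical, not the PR's actual code. It derives the MBean attribute type from the metric's current value and falls back to `null`:

```java
import javax.management.MBeanAttributeInfo;

import org.apache.kafka.common.metrics.KafkaMetric;

final class MBeanAttributeTypes {

    // Hypothetical helper: report the runtime type of the metric's current
    // value instead of hard-coding "double"; null when no value is available.
    static MBeanAttributeInfo attributeInfo(final String name, final KafkaMetric metric) {
        final Object value = metric.metricValue();
        final String type = value == null ? null : value.getClass().getName();
        return new MBeanAttributeInfo(name, type,
            metric.metricName().description(), true, false, false);
    }
}
```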
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460404740

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

```java
// Diff context (reconstructed): restoreState(...) after the change under review
for (final TopicPartition topicPartition : topicPartitions) {
    globalConsumer.assign(Collections.singletonList(topicPartition));
    long offset;
    final Long checkpoint = checkpointFileCache.get(topicPartition);
    if (checkpoint != null) {
        globalConsumer.seek(topicPartition, checkpoint);
        offset = checkpoint;
    } else {
        globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
        final AtomicLong position = new AtomicLong();
        retryUntilSuccessOrThrowOnTaskTimeout(
            () -> position.set(globalConsumer.position(topicPartition)),
            String.format(
                "Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
                topicPartition
            )
        );
        offset = position.get();
    }
    final Long highWatermark = highWatermarks.get(topicPartition);
    final RecordBatchingStateRestoreCallback stateRestoreAdapter =
        StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
    stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
    long restoreCount = 0L;
    long deadlineMs = NO_DEADLINE;
    while (offset < highWatermark) {
        try {
            final ConsumerRecords records = globalConsumer.poll(pollTime);
            if (records.isEmpty()) {
                if (taskTimeoutMs == 0L) {
                    deadlineMs = maybeUpdateDeadlineOrThrow(
```

Review comment: Thanks, @mjsax, that's fair. I promise I'm not equivocating here; I'm just trying to figure out what my intuition is telling me. It seems like the fundamental problem is that we can't distinguish among a successful poll that returns no data, a failed poll, and a pending async fetch request. The one thing we know is that the end offset is beyond our current position, so there _should_ be data to poll, and we can assume that an empty return means either that the fetch failed internally or that it hasn't completed yet. Stepping back, this seems related to the problem of task idling, in which it's pointless to "idle" for a time so short that we have no chance of actually getting a response back from the broker. I feel like this is substantially my fault from #4855 / KIP-266. The purpose of making this API completely async was to avoid harming liveness in situations where we might have a relatively strict deadline, but that's not the case here. I guess the "poor man's" solution we're going for is to block poll for at least long enough to allow a complete fetch round trip to the broker. If we knew there was a round trip and we didn't get any data, we could conclude that there was an error (since we know there is data to get). Since we can't know that there was a round trip, we weaken the condition to: if it's been long enough that there should have been a round trip and we don't get data, we conclude there was probably an error. In your KIP, we specified that we would start the task timer _after_ the first error, so it seems like we really want to just block the poll for the round-trip time and then apply your "update deadline, etc." function. I'm with you now that to get the round-trip time, we have to extract some config(s) from the Consumer.

This is a pretty awkward hack, but now that I've thought it through, it seems the best we can do. Maybe we can mull it over and file an improvement jira for the Consumer to improve use cases like this. Anyway, it seems like the "poll time" config is irrelevant; we just need to know what config to grab that corresponds to completing a fetch request with high probability. It seems like we shouldn't need to update metadata, so we would send a fetch request on the first poll call, and we just need to block for whatever time bounds the fetch response time. I'm honestly not sure what timeout would be best here. It looks like the ConsumerNetworkClient will just wait for a response until it gets a "disconnect" (L598). Is that a socket timeout? I'm not sure.
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663867610 Java 11 failed with
```
08:16:38 org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
08:16:38     java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms
08:16:38     Expected: is a value equal to or greater than <20>
08:16:38          but: <0> was less than <20>
```
Java 14 failed with
```
06:37:03 kafka.network.SocketServerTest > testIdleConnection FAILED
06:37:03     org.scalatest.exceptions.TestFailedException: Failed to close idle channel
```
Java 8 failed with
```
java.lang.AssertionError: Did not receive all 168 records from topic multiPartitionOutputTopic within 6 ms
Expected: is a value equal to or greater than <168>
     but: <167> was less than <168>
```
[GitHub] [kafka] cadonna edited a comment on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna edited a comment on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663867610 Java 11 failed with
```
08:16:38 org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true] FAILED
08:16:38     java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms
08:16:38     Expected: is a value equal to or greater than <20>
08:16:38          but: <0> was less than <20>
```
Java 14 failed with
```
06:37:03 kafka.network.SocketServerTest > testIdleConnection FAILED
06:37:03     org.scalatest.exceptions.TestFailedException: Failed to close idle channel
```
Java 8 failed with
```
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
java.lang.AssertionError: Did not receive all 168 records from topic multiPartitionOutputTopic within 6 ms
Expected: is a value equal to or greater than <168>
     but: <167> was less than <168>
```
[GitHub] [kafka] ijuma commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on a change in pull request #9022: URL: https://github.com/apache/kafka/pull/9022#discussion_r460416614

## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

```diff
 adminClient.createTopics(
   Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
-TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 1000, acks = -1)
```

Review comment: The broker only works in terms of record batches, not individual records.
[GitHub] [kafka] lbradstreet commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
lbradstreet commented on a change in pull request #9022: URL: https://github.com/apache/kafka/pull/9022#discussion_r460416611

## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

```diff
 adminClient.createTopics(
   Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
-TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 1000, acks = -1)
 val brokerIds = servers.map(_.config.brokerId)
-TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1)
```

Review comment: I simply meant that we still need the replication throttle so the reassignment doesn't complete before we perform our checks.
[GitHub] [kafka] ijuma commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on a change in pull request #9022: URL: https://github.com/apache/kafka/pull/9022#discussion_r460416797

## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

```diff
 Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
 TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+Thread.sleep(10)
```

Review comment: We can call `flush` on the producer to force it to send the messages.
[GitHub] [kafka] ijuma commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on a change in pull request #9022: URL: https://github.com/apache/kafka/pull/9022#discussion_r460417143

## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

```diff
 Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
 waitForTopicCreated(testTopicName)
 TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1)
+Thread.sleep(10)
```

Review comment: Looking a bit more, we actually close the producer after we generate the messages, so I don't think this sleep is needed at all.
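For illustration, a minimal sketch of the suggestion (broker address, topic, and records are placeholders): `flush()` blocks until all buffered records have been sent, and `close()` flushes as well, so a fixed sleep adds nothing:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FlushInsteadOfSleep {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test-topic", "key-" + i, "value-" + i));
            }
            producer.flush(); // blocks until all buffered records are acknowledged
        } // close() also flushes, which is why a Thread.sleep is redundant
    }
}
```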
[GitHub] [kafka] abbccdda commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters
abbccdda commented on pull request #9052: URL: https://github.com/apache/kafka/pull/9052#issuecomment-663869941 retest this please
[GitHub] [kafka] abbccdda commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE
abbccdda commented on pull request #9034: URL: https://github.com/apache/kafka/pull/9034#issuecomment-663870109 The test failure appears unrelated.
[GitHub] [kafka] abbccdda merged pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE
abbccdda merged pull request #9034: URL: https://github.com/apache/kafka/pull/9034
[GitHub] [kafka] abbccdda commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming
abbccdda commented on pull request #8907: URL: https://github.com/apache/kafka/pull/8907#issuecomment-663870524 The test failure is unrelated.
[GitHub] [kafka] abbccdda merged pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming
abbccdda merged pull request #8907: URL: https://github.com/apache/kafka/pull/8907
[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
chia7712 commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663870629
[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871416 ok to test
[GitHub] [kafka] chia7712 opened a new pull request #9079: KAFKA-10308 fix flaky core/round_trip_fault_test.py
chia7712 opened a new pull request #9079: URL: https://github.com/apache/kafka/pull/9079 Creating a topic may fail (due to a timeout) when running system tests. However, `RoundTripWorker` does not ignore `TopicExistsException`, which makes `round_trip_fault_test.py` flaky.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
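The gist of the proposed fix, as a minimal sketch; the helper is hypothetical, not `RoundTripWorker`'s actual code. A timed-out create may still have succeeded on the broker, so a retry should treat `TopicExistsException` as success:

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

final class IdempotentTopicCreation {

    // Hypothetical helper: a timed-out create may have succeeded on the
    // broker, so a retry can legitimately see TopicExistsException.
    static void createTopicIgnoringExists(final Admin admin, final NewTopic topic)
        throws InterruptedException, ExecutionException {
        try {
            admin.createTopics(Collections.singletonList(topic)).all().get();
        } catch (final ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
            // The topic is already there -- treat as success.
        }
    }
}
```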
[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871604 https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/
[GitHub] [kafka] ijuma removed a comment on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma removed a comment on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871604 https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/
[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-663871642 retest this please
[GitHub] [kafka] chia7712 closed pull request #5708: Minor: remove WorkerCoordinatorMetrics and instantiate the metrics in the constructor of WorkerCoordinator
chia7712 closed pull request #5708: URL: https://github.com/apache/kafka/pull/5708
[GitHub] [kafka] chia7712 closed pull request #8137: KAFKA-8967 Flaky test kafka.api.SaslSslAdminIntegrationTest.testCreat…
chia7712 closed pull request #8137: URL: https://github.com/apache/kafka/pull/8137
[GitHub] [kafka] mjsax commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on pull request #9075: URL: https://github.com/apache/kafka/pull/9075#issuecomment-663905239 Retest this please.
[GitHub] [kafka] mjsax commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on pull request #9075: URL: https://github.com/apache/kafka/pull/9075#issuecomment-663905333 Retest this please.
[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
mjsax commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663906774 @chia7712 Only committers can trigger Jenkins builds.
[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
mjsax commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663906866 Java 8 passed; the failing tests are known to be flaky. Might be good enough? I'll leave it to @vvcephei to make the call and merge.
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java

```java
// Diff context (same hunk as above), key lines:
final ConsumerRecords records = globalConsumer.poll(pollTime);
if (records.isEmpty()) {
    if (taskTimeoutMs == 0L) {
        deadlineMs = maybeUpdateDeadlineOrThrow(
```

Review comment: I agree. The "issue" is really that `poll()` does _not_ throw a `TimeoutException`... Also, because we do manual assignment, `poll()` would never return "early", as it never needs to wait to join a consumer group. However, compared to `max.task.idle.ms`, we are in a better situation here, because we poll() for only a single partition at a time. I also agree that applying `task.timeout.ms` should start _after_ we get a first timeout; that is how the original code worked, which you criticized as:

> Man, this is confusing.

And I agree that the code was not straightforward to understand. But if we think it's the right thing to do, I am happy to add it back :) I am not an expert on all the consumer internals, but from my understanding, fetch requests are sent asynchronously in general, and if a fetch request fails, the consumer does not retry it; a retry is triggered by the next `poll()` call. If no data is available when `poll()` is called (i.e., no fetch request has returned yet), the consumer blocks internally until the `poll(Duration)` timeout expires or a fetch request returns, whichever comes first. Furthermore, before `poll()` returns, it always checks whether a fetch request is in flight, and sends one if not. Thus, on the very first call to `poll()` we know that no fetch request can be in flight, and we also know that `poll()` will send one and block until it returns or `poll(Duration)` expires. Thus, if `poll()` does not block for at least `request.timeout.ms` and we get an empty result back, we don't know which case holds; however, if we use the request timeout, it seems that we _know_ whether the fetch succeeded or timed out. We also know that a fetch request will be in flight after `poll()` returns. Thus, for any consecutive `poll()`, applying the request timeout likewise ensures that we know whether the request was successful. I guess the only difference between what I just described and my original code is that I used `pollTime + requestTimeout`. Bottom line: I am not 100% sure what you propose. Should we go with the original design or the new design? In the end, I think we don't need a follow-up PR; we can just try to get it right in this PR. I don't see any benefit in splitting it into two PRs (because, as mentioned above, we fetch a single partition, so it's a different case compared to the `max.task.idle.ms` scenario).
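A minimal sketch of the blocking behavior being debated; the helper is illustrative only, using the consumer's `request.timeout.ms` (default 30000) to bound the first poll so that an empty result more likely signals a failed fetch than a pending one:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final class BoundedRestorePoll {

    // Illustrative only: poll long enough for a full fetch round trip, so an
    // empty result (when we know data exists below the high watermark) likely
    // means the fetch failed rather than "hasn't completed yet".
    static <K, V> ConsumerRecords<K, V> pollAtLeastOneRoundTrip(
        final KafkaConsumer<K, V> consumer, final Properties consumerConfig) {
        final long requestTimeoutMs = Long.parseLong(
            consumerConfig.getProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
        return consumer.poll(Duration.ofMillis(requestTimeoutMs));
    }
}
```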
[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
chia7712 commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663912187

> Only committers can trigger Jenkins builds.

It seems I can trigger Jenkins by commenting “Test this please”. Let me test it again.
[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
chia7712 commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663912465 Jenkins is running now :( Maybe it does not exclude committers from other Apache projects. At any rate, I will not trigger Jenkins in the future.
[GitHub] [kafka] ijuma commented on a change in pull request #9049: MINOR: fix scala warnings
ijuma commented on a change in pull request #9049: URL: https://github.com/apache/kafka/pull/9049#discussion_r460453337

## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala

```diff
 val sensitiveEntries = newEntries.filter(_._2.value == null)
 if (sensitiveEntries.nonEmpty)
   throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
-val newConfig = new JConfig(newEntries.asJava.values)
+
+val alterLogLevelEntries = (newEntries.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
+  ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+  ).asJavaCollection
 val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
 val alterOptions = new AlterConfigsOptions().timeoutMs(3).validateOnly(false)
-adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
```

Review comment: Some broker versions don't support incremental alter configs, so you would be breaking compatibility by making this change.
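A minimal sketch of the compatibility concern; the fallback wiring is illustrative, not `ConfigCommand`'s actual code. `incrementalAlterConfigs` was added in Kafka 2.3, so a client that must support older brokers falls back to the deprecated `alterConfigs`:

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;

final class CompatibleAlterConfigs {

    static void alter(final Admin admin,
                      final Map<ConfigResource, Collection<AlterConfigOp>> incremental,
                      final Map<ConfigResource, Config> legacy)
        throws InterruptedException, ExecutionException {
        try {
            admin.incrementalAlterConfigs(incremental).all().get();
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof UnsupportedVersionException) {
                // Broker predates IncrementalAlterConfigs (pre-2.3):
                // use the deprecated full-replace API instead.
                admin.alterConfigs(legacy).all().get();
            } else {
                throw e;
            }
        }
    }
}
```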
[GitHub] [kafka] ijuma commented on a change in pull request #9049: MINOR: fix scala warnings
ijuma commented on a change in pull request #9049: URL: https://github.com/apache/kafka/pull/9049#discussion_r460453389

## File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala

```diff
 val prefixed = new ArrayBuffer[AclEntry]
 aclCacheSnapshot
-  .from(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
-  .to(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
+  .rangeFrom(new ResourcePattern(resourceType, resourceName, PatternType.PREFIXED))
+  .rangeTo(new ResourcePattern(resourceType, resourceName.take(1), PatternType.PREFIXED))
```

Review comment: Scala 2.12 doesn't have these methods.
[GitHub] [kafka] ijuma commented on pull request #9063: MINOR: Fixed deprecated Gradle build Properties.
ijuma commented on pull request #9063: URL: https://github.com/apache/kafka/pull/9063#issuecomment-663913074 ok to test
[GitHub] [kafka] ijuma commented on a change in pull request #9063: MINOR: Fixed deprecated Gradle build Properties.
ijuma commented on a change in pull request #9063: URL: https://github.com/apache/kafka/pull/9063#discussion_r460453530

## File path: build.gradle

```diff
 test {
-  maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors()
+  maxParallelForks = userMaxForks ?: Runtime.runtime.availableProcessors() as int
```

Review comment: What is the reason for the various `as` additions?
[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-663913281 https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/
[GitHub] [kafka] ijuma commented on pull request #9079: KAFKA-10308 fix flaky core/round_trip_fault_test.py
ijuma commented on pull request #9079: URL: https://github.com/apache/kafka/pull/9079#issuecomment-663913254 retest this please
[GitHub] [kafka] ijuma removed a comment on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
ijuma removed a comment on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-663913281 https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/
[GitHub] [kafka] ijuma opened a new pull request #9080: MINOR: Recommend Java 11
ijuma opened a new pull request #9080: URL: https://github.com/apache/kafka/pull/9080 Java 11 has been recommended for a while, but the ops section had not been updated. Also tweak the text a bit to read better.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jsancio commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r460467261

## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala

```scala
@Test
def testPreemptionOnControllerShutdown(): Unit = {
  servers = makeServers(1, enableControlledShutdown = false)
  val controller = getController().kafkaController
  val count = new AtomicInteger(2)
  val latch = new CountDownLatch(1)
  val spyThread = spy(controller.eventManager.thread)
  controller.eventManager.setControllerEventThread(spyThread)
  val processedEvent = new MockEvent(ControllerState.TopicChange) {
    override def process(): Unit = latch.await()
    override def preempt(): Unit = {}
  }
  val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
    override def process(): Unit = {}
    override def preempt(): Unit = count.decrementAndGet()
  }

  controller.eventManager.put(processedEvent)
  controller.eventManager.put(preemptedEvent)
  controller.eventManager.put(preemptedEvent)

  doAnswer((_: InvocationOnMock) => {
    latch.countDown()
  }).doCallRealMethod().when(spyThread).awaitShutdown()
```

Review comment: Got it. Sounds good.
[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
mjsax commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663926510 Ah yes, it works for all committers; it's not project specific. I missed that you are an HBase Committer/PMC :)
[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jsancio commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r460468241

## File path: core/src/main/scala/kafka/controller/KafkaController.scala

```diff
 sealed trait ControllerEvent {
   def state: ControllerState
+  def preempt(): Unit
```

Review comment: I would add a documentation comment to this method explaining that it will not be executed by the controller thread but by some other thread.

## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala

```diff
 private val putLock = new ReentrantLock()
 private val queue = new LinkedBlockingQueue[QueuedEvent]
 // Visible for test
-private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)
```

Review comment: This comment applies to `clearAndPut`:
```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  queue.forEach(_.preempt(processor))
  queue.clear()
  put(event)
}
```
I think there is a bug here where at most one event may be processed twice: once by `_.preempt(processor)` and once by `doWork`. I think we can fix this concurrency bug by using `LinkedBlockingQueue::drainTo`. E.g.
```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = {
  val events = ...
  inLock(putLock) {
    queue.drainTo(events)
    put(event)
  }
  events.forEach(_.preempt(processor))
}
```

## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala

```scala
// testPreemptionOnControllerShutdown (full test shown above), key lines:
val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
  override def process(): Unit = {}
  override def preempt(): Unit = count.decrementAndGet()
}
```

Review comment: This method should be executed by the thread that is running this test. If you agree, there is no need to use an `AtomicInteger`.

## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala

```scala
// testPreemptionOnControllerShutdown (full test shown above), key lines:
doAnswer((_: InvocationOnMock) => {
  latch.countDown()
}).doCallRealMethod().when(spyThread).awaitShutdown()
```

Review comment: The important part is that we want to call `latch.countDown` after `initiateShutdown` has been called, so that the controller thread doesn't pick up a new event because `isRunning` is `false`.
[GitHub] [kafka] jsancio commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jsancio commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r460468239

## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala

```diff
 private val putLock = new ReentrantLock()
 private val queue = new LinkedBlockingQueue[QueuedEvent]
 // Visible for test
-private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)
```

Review comment: This comment applies to `clearAndPut`:
```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
  queue.forEach(_.preempt(processor))
  queue.clear()
  put(event)
}
```
I think there is a bug here where at most one event may be processed twice: once by `_.preempt(processor)` and once by `doWork`. I think we can fix this concurrency bug by using `LinkedBlockingQueue::drainTo`. E.g.
```scala
def clearAndPut(event: ControllerEvent): QueuedEvent = {
  val preemptedEvents = ...
  inLock(putLock) {
    queue.drainTo(preemptedEvents)
    put(event)
  }
  preemptedEvents.forEach(_.preempt(processor))
}
```
[GitHub] [kafka] vitojeng commented on pull request #8907: MINOR: code cleanup for `VOut` inconsistent naming
vitojeng commented on pull request #8907: URL: https://github.com/apache/kafka/pull/8907#issuecomment-663928064 Thanks @abbccdda !
[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon edited a comment on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-663329158 @mimaison, thanks for your comments. I've updated the PR in this commit: https://github.com/apache/kafka/pull/9029/commits/5345c6835ef42da973b794634d9b8d65f27ee80a. What I did:
1. Removed the unused `time` variable and import.
2. Used a simple `for` loop instead of a lambda loop.
3. Wait for the consumer to consume all 100 records before continuing.
The other improvements will be addressed by @ning2008wisc in KAFKA-10304. Thanks.
[GitHub] [kafka] ijuma commented on pull request #9022: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
ijuma commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-664002055 Green build, merging to trunk and 2.6.
[GitHub] [kafka] ijuma merged pull request #9022: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
ijuma merged pull request #9022: URL: https://github.com/apache/kafka/pull/9022
[GitHub] [kafka] ijuma edited a comment on pull request #9022: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
ijuma edited a comment on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-664002055 Green build, merging to trunk, 2.6 and 2.5. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sasakitoa opened a new pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
sasakitoa opened a new pull request #9081: URL: https://github.com/apache/kafka/pull/9081 This PR changes KafkaProducer#sendOffsetsToTransaction to respect max.block.ms, to avoid blocking indefinitely. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
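For context, a hedged sketch of the client-side behavior this PR aims for (the config keys are standard producer configs; the bounded-timeout behavior of `sendOffsetsToTransaction` is what the PR proposes, not what released clients did at the time):
```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class BoundedSendOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-txn-id");
        // Upper bound for blocking producer calls; with the proposed change,
        // sendOffsetsToTransaction would give up after this long as well.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                props, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.sendOffsetsToTransaction(
                        Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0)),
                        "test-group");
                producer.commitTransaction();
            } catch (TimeoutException e) {
                // With the fix, an unreachable broker surfaces here instead of hanging.
                producer.abortTransaction();
            }
        }
    }
}
```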
[GitHub] [kafka] ijuma opened a new pull request #9082: MINOR: Update dependencies for Kafka 2.7 (part 1)
ijuma opened a new pull request #9082: URL: https://github.com/apache/kafka/pull/9082 I left out updates that could be risky. Preliminary testing indicates we can build (including spotBugs) and run tests with Java 15 with these changes. I will do more thorough testing once Java 15 reaches release candidate stage in a few weeks. Minor updates with mostly bug fixes: - Scala: 2.12.11 -> 2.12.12 - Bouncy castle: 1.64 -> 1.66 - HttpClient: 4.5.11 -> 4.5.12 - Mockito: 3.3.3 -> 3.4.4 - Netty: 4.5.10 -> 4.5.11 - Snappy: 1.1.7.3 -> 1.1.7.6 - Zstd: 1.4.5-2 -> 1.4.5-6 Gradle plugin upgrades: - Gradle versions: 0.28.0 -> 0.29.0 - Git: 4.0.1 -> 4.0.2 - Scoverage: 1.4.1 -> 1.4.2 - Shadow: 5.2.0 -> 6.0.0 - Test Retry: 1.1.5 -> 1.1.6 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rgroothuijsen removed a comment on pull request #9078: KAFKA-10132: Return correct value types for MBean attributes
rgroothuijsen removed a comment on pull request #9078: URL: https://github.com/apache/kafka/pull/9078#issuecomment-663854901 I'm not entirely sure about returning raw nulls as a fallback, however, or how permissive it should be about null values in general. Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax merged pull request #9075: URL: https://github.com/apache/kafka/pull/9075 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9031: KAFKA-10298: replace abstract Windows with a proper interface
vvcephei commented on a change in pull request #9031: URL: https://github.com/apache/kafka/pull/9031#discussion_r460562257 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/EnumerableWindowDefinition.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; + +import java.time.Duration; +import java.util.Map; + +import static org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS; + +/** + * The window specification for fixed size windows that is used to define window boundaries and grace period. + * + * Grace period defines how long to wait on out-of-order events. That is, windows will continue to accept new records until {@code stream_time >= window_end + grace_period}. + * Records that arrive after the grace period has passed are considered late and will not be processed but are dropped. + * + * @param <W> type of the window instance + */ +public interface EnumerableWindowDefinition<W extends Window> { + + +/** + * List all possible windows that contain the provided timestamp, + * indexed by non-negative window start timestamps. + * + * @param timestamp the timestamp window should get created for + * @return a map of {@code windowStartTimestamp -> Window} entries + */ +Map<Long, W> windowsFor(final long timestamp); + +/** + * Return the size of the specified windows in milliseconds. Review comment: ```suggestion * Return an upper bound on the size of windows in milliseconds. * Used to determine the lower bound on store retention time. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
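To make the `windowsFor` contract concrete, here is a hypothetical sketch for tumbling windows (the class name and the `TimeWindow` stand-in are illustrative, not the PR's actual types): each timestamp maps to exactly one window, indexed by its window start.
```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical implementation sketch of the proposed windowsFor(timestamp)
// contract, specialized to non-overlapping (tumbling) windows.
public class TumblingWindowsSketch {
    static final class TimeWindow {
        final long startMs;
        final long endMs;
        TimeWindow(long startMs, long endMs) { this.startMs = startMs; this.endMs = endMs; }
    }

    private final long sizeMs;

    public TumblingWindowsSketch(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Tumbling windows do not overlap, so each timestamp falls into exactly
    // one window, keyed by its non-negative start timestamp. A hopping-window
    // implementation would instead return one entry per overlapping window.
    public Map<Long, TimeWindow> windowsFor(final long timestamp) {
        final long windowStart = (timestamp / sizeMs) * sizeMs;
        final Map<Long, TimeWindow> windows = new LinkedHashMap<>();
        windows.put(windowStart, new TimeWindow(windowStart, windowStart + sizeMs));
        return windows;
    }
}
```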
[GitHub] [kafka] ijuma merged pull request #9080: MINOR: Recommend Java 11
ijuma merged pull request #9080: URL: https://github.com/apache/kafka/pull/9080 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9075: KAFKA-10306: GlobalThread should fail on InvalidOffsetException
mjsax commented on pull request #9075: URL: https://github.com/apache/kafka/pull/9075#issuecomment-664031539 Merged to `trunk` and cherry-picked to `2.6` branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r460449901 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: I agree. The "issue" is really that `poll()` does _not_ throw a `TimeoutException`... Also, because we do manual assignment, `poll()` would never return "early", as it never needs to wait to join a consumer group. -- However, compared to `max.task.idle.ms`, we are in a better situation here, because we poll() for only a single partition at a time. I also agree that applying `task.timeout.ms` should start _after_ we got a first timeout -- this is how the original code worked, which you criticized as: > Man, this is confusing. And I agree that the code was not straightforward to understand. But if we think it's the right thing to do, I am also happy to add it back :) I am also not an expert on all consumer internals, but from my understanding, fetch requests are sent asynchronously in general, and if a fetch request fails, the consumer would actually not retry it; a retry would be triggered by the next `poll()` call. If there is no data available (i.e., the fetch request has not returned yet) when `poll()` is called, the consumer blocks internally until the `poll(Duration)` timeout expires or a fetch request returns (whichever comes first). Furthermore, before `poll()` returns, it always checks whether a fetch request is in flight, and sends one if not. Thus, on the very first call to `poll()` we know that no fetch request can be in flight, and we also know that `poll()` would send one and block until it returns or `poll(Duration)` expires. Thus, if `poll()` does not block for at least `request.timeout.ms` and we get an empty result back, we don't know which case holds; however, if we use the request timeout, it seems that we _know_ whether the fetch was successful or timed out. We also know that a fetch request will be in flight after `poll()` returns. 
Thus, for any subsequent `poll()`, applying the request timeout also ensures that we know whether the request was successful or not. I guess the only difference between what I just described and my original code is that I used `pollTime + requestTimeout`. Bottom line: I am not 100% sure what you propose. Should we go with the original design? Or with the new design? -- In the end, I think we don't need a follow-up PR, and we can just try to get it right in this PR. I don't see any benefit in splitting it up into 2 PRs (because, as mentioned above, we fetch for a single partition and thus it's a different case compared to the `max.task.idle.ms` scenario). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
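A schematic sketch of the retry-with-deadline pattern under discussion (the names `pollUntilDeadline` and `taskTimeoutMs` are hypothetical helpers, not the actual Streams code, and the `taskTimeoutMs == 0` fail-fast branch is omitted for brevity):
```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.TimeoutException;

class DeadlinePollLoop {
    static final long NO_DEADLINE = -1L;

    // Retry polling until progress is made, giving up once task.timeout.ms
    // has elapsed since the first empty poll.
    static <K, V> ConsumerRecords<K, V> pollUntilDeadline(Consumer<K, V> consumer,
                                                          Duration pollTime,
                                                          long taskTimeoutMs) {
        long deadlineMs = NO_DEADLINE;
        while (true) {
            ConsumerRecords<K, V> records = consumer.poll(pollTime);
            if (!records.isEmpty()) {
                return records; // progress was made; the deadline resets
            }
            long nowMs = System.currentTimeMillis();
            if (deadlineMs == NO_DEADLINE) {
                // First empty poll: start the task.timeout.ms clock.
                deadlineMs = nowMs + taskTimeoutMs;
            } else if (nowMs >= deadlineMs) {
                throw new TimeoutException(
                    "Global task did not make progress within task.timeout.ms");
            }
        }
    }
}
```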
[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-664045085 Hi @showuon thanks for your work. A minor thing: do you mind consolidating / merging the current 4 commits into 1 commit? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #9068: MINOR: INFO log4j when request re-join
guozhangwang merged pull request #9068: URL: https://github.com/apache/kafka/pull/9068 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460588651 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -93,8 +93,8 @@ public boolean isActive() { public void initializeIfNeeded() { if (state() == State.CREATED) { StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); - // initialize the snapshot with the current offsets as we don't need to commit then until they change Review comment: Ack This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460590891 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -38,13 +41,39 @@ */ final class StateManagerUtil { static final String CHECKPOINT_FILE_NAME = ".checkpoint"; +static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; Review comment: You mean when the accumulated pending data is larger than the memtable, we should checkpoint? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460592249 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -454,6 +456,41 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { Review comment: My plan is actually to remove the `flushCache` once we decouple caching from emitting (see the TODO comment on the caller). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460599173 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -49,6 +61,31 @@ this.stateDirectory = stateDirectory; } +protected void initializeCheckpoint() { +// we will delete the local checkpoint file after registering the state stores and loading them into the +// state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed +offsetSnapshotSinceLastFlush = Collections.emptyMap(); +} + +/** + * The following exceptions may be thrown from the state manager flushing call + * + * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed + * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed + * or flushing state store get IO errors; such error should cause the thread to die + */ +protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { +final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets(); +if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) { +// since there's no written offsets we can checkpoint with empty map, +// and the state's current offset would be used to checkpoint +stateMgr.flush(); +stateMgr.checkpoint(Collections.emptyMap()); Review comment: I see what you meant now. Will do. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
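A hypothetical sketch of the kind of delta check `StateManagerUtil.checkpointNeeded` performs, using the 10,000-offset threshold from the earlier diff (the signature and logic here are illustrative, not the merged implementation):
```java
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

class CheckpointPolicy {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    // Checkpoint when forced, or when enough new data has been written to the
    // changelogs since the last checkpoint to justify the file-write cost.
    static boolean checkpointNeeded(boolean enforce,
                                    Map<TopicPartition, Long> offsetsAtLastCheckpoint,
                                    Map<TopicPartition, Long> currentOffsets) {
        if (enforce) {
            return true;
        }
        long totalDelta = 0L;
        for (Map.Entry<TopicPartition, Long> entry : currentOffsets.entrySet()) {
            long previous = offsetsAtLastCheckpoint.getOrDefault(entry.getKey(), 0L);
            totalDelta += entry.getValue() - previous;
        }
        return totalDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }
}
```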
[GitHub] [kafka] huxihx commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
huxihx commented on a change in pull request #9081: URL: https://github.com/apache/kafka/pull/9081#discussion_r460608228 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -687,7 +687,7 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, throwIfProducerClosed(); TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata); sender.wakeup(); -result.await(); +result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS); Review comment: The javadoc for this method should be updated as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] huxihx commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
huxihx commented on a change in pull request #9081: URL: https://github.com/apache/kafka/pull/9081#discussion_r460609876 ## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ## @@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness { TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset") } + @Test(expected = classOf[TimeoutException]) + def testSendOffsetsToTransactionTimeout(): Unit = { +val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000) +producer.initTransactions() +producer.beginTransaction() +producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes)) + +for (i <- 0 until servers.size) + killBroker(i) + +val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]().asJava +offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0)) +try { + producer.sendOffsetsToTransaction(offsets, "test-group") Review comment: Using `Map(new TopicPartition(topic1, 0) -> new OffsetAndMetadata(0)).asJava` is better. No need to import the scala.collection.mutable package. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Mathieu1124 commented on pull request #9076: MINOR - fix typo
Mathieu1124 commented on pull request #9076: URL: https://github.com/apache/kafka/pull/9076#issuecomment-664081255 > I think it was grammatically correct as it stood before this change. This change, if it were to occur, would have to state `if the leader fails` (the current change says `if the leader fail`, which is incorrect grammar). So I believe this is a choice between two grammatically correct possibilities. I personally don't see the need for a change. Reasonable This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Mathieu1124 closed pull request #9076: MINOR - fix typo
Mathieu1124 closed pull request #9076: URL: https://github.com/apache/kafka/pull/9076 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r460621030 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, for (final Task task : tasksToClose) { try { -if (task.isActive()) { -// Active tasks are revoked and suspended/committed during #handleRevocation -if (!task.state().equals(State.SUSPENDED)) { -log.error("Active task {} should be suspended prior to attempting to close but was in {}", - task.id(), task.state()); -throw new IllegalStateException("Active task " + task.id() + " should have been suspended"); -} -} else { -task.suspend(); -task.prepareCommit(); -task.postCommit(); +// Always try to first suspend and commit the task before closing it; +// some tasks may already be suspended which should be a no-op. +// +// Also since active tasks should already be suspended / committed and +// standby tasks should have no offsets to commit, we should expect nothing to commit +task.suspend(); + +final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit(); + +if (!offsets.isEmpty()) { +log.error("Task {} should have been committed prior to attempting to close, but it reports non-empty offsets {} to commit", Review comment: I've reordered in handleRevocation so that commit is called before suspension; also, even if the commit fails with TaskMigrated, we still try to complete the suspension, so in handleAssignment we should have all active tasks either suspended or closed dirty. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…
abbccdda commented on pull request #9077: URL: https://github.com/apache/kafka/pull/9077#issuecomment-664098665 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…
abbccdda commented on pull request #9077: URL: https://github.com/apache/kafka/pull/9077#issuecomment-664098712 got 2/3 in previous run This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9049: MINOR: fix scala warnings
abbccdda commented on pull request #9049: URL: https://github.com/apache/kafka/pull/9049#issuecomment-664102269 Sg, will close the PR @ijuma This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda closed pull request #9049: MINOR: fix scala warnings
abbccdda closed pull request #9049: URL: https://github.com/apache/kafka/pull/9049 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9077: MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACT…
chia7712 commented on pull request #9077: URL: https://github.com/apache/kafka/pull/9077#issuecomment-664103549 @abbccdda thanks for reviews! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang opened a new pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]
guozhangwang opened a new pull request #9083: URL: https://github.com/apache/kafka/pull/9083 Should be reviewed after https://github.com/apache/kafka/pull/8964 is merged, in which we first commit (flush) then suspend. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647 hi @ning2008wisc , thanks for your suggestion, but I don't think it's good to merge all 4 commits into 1. I think the commit history is a kind of record that keeps why the author made each change at that time. Someone can always learn why we made a change by checking the commit history or the PR records. Also, the reviewer can know which commits have been reviewed, and which ones were newly added to address the reviewer's previous comments (like the comments I left for @mimaison above). I know there's some debate over that, but I think if this project (Kafka) doesn't have such a rule, it should be fine to keep it as is. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang opened a new pull request #9084: MINOR: Preserve Kafka exception from RebalanceListener [Do Not Merge]
guozhangwang opened a new pull request #9084: URL: https://github.com/apache/kafka/pull/9084 Some rebalance listeners may be implemented by Kafka itself, e.g. in Connect and Streams, and if the exception thrown is actually a KafkaException, then we should not wrap it but directly rethrow it from the listener. Unit tests to be added; cc @abbccdda to review. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon edited a comment on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647 hi @ning2008wisc , thanks for your suggestion, but I don't think it's good to merge all 4 commits into 1. I think the commit history is a kind of record that keeps why the author made each change at that time. Someone can always learn why we made a change by checking the commit history or the PR records. Also, the reviewer can know which commits have been reviewed, and which ones were newly added (like the comments I left for @mimaison above). I know there's some debate over that, but I think if this project (Kafka) doesn't have such a rule, it should be fine to keep it as is. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon edited a comment on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-664133647 hi @ning2008wisc , thanks for your suggestion, but I don't think it's good to merge all 4 commits into 1. I think the commit history is a kind of record that keeps why the author made each change at that time. Someone can always learn why we made a change by checking the commit history or the PR records. Also, the reviewer can know which commits have been reviewed, and which ones were newly added (like the comments I left for @mimaison above). I know there's some debate over that, but if this project (Kafka) doesn't have such a rule, I'd prefer to keep it as is. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9084: MINOR: Preserve Kafka exception from RebalanceListener [Do Not Merge]
abbccdda commented on a change in pull request #9084: URL: https://github.com/apache/kafka/pull/9084#discussion_r460671888 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -728,7 +728,11 @@ protected void onJoinPrepare(int generation, String memberId) { subscriptions.resetGroupSubscription(); if (exception != null) { -throw new KafkaException("User rebalance callback throws an error", exception); +if (exception instanceof KafkaException) { +throw (KafkaException) exception; Review comment: Why do we need this cast? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
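For readers following along, the diff is an instance of the common rethrow-or-wrap pattern; a generic, self-contained sketch (the class and method names are illustrative):
```java
import org.apache.kafka.common.KafkaException;

class CallbackErrorHandling {
    // Rethrow-or-wrap: preserve a KafkaException's concrete type, and wrap
    // anything else so callers still see a KafkaException. The cast is needed
    // because the variable is statically typed as the checked Exception:
    // `throw exception;` would not compile without a `throws` clause, while
    // KafkaException extends RuntimeException and can be thrown freely.
    static void rethrow(Exception exception) {
        if (exception instanceof KafkaException) {
            throw (KafkaException) exception;
        }
        throw new KafkaException("User rebalance callback throws an error", exception);
    }
}
```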
[GitHub] [kafka] viktorsomogyi commented on pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
viktorsomogyi commented on pull request #4090: URL: https://github.com/apache/kafka/pull/4090#issuecomment-664175913 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
viktorsomogyi commented on pull request #8730: URL: https://github.com/apache/kafka/pull/8730#issuecomment-664176258 @ijuma would you please review this or suggest who is the right committer to help with this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
ijuma commented on pull request #8730: URL: https://github.com/apache/kafka/pull/8730#issuecomment-664180174 @mimaison, would you be able to review this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #4090: [KAFKA-6084] Propagate JSON parsing errors in ReassignPartitionsCommand
ijuma commented on a change in pull request #4090: URL: https://github.com/apache/kafka/pull/4090#discussion_r460711206 ## File path: core/src/test/scala/unit/kafka/utils/JsonTest.scala ## @@ -41,14 +42,23 @@ class JsonTest { val jnf = JsonNodeFactory.instance assertEquals(Json.parseFull("{}"), Some(JsonValue(new ObjectNode(jnf)))) +assertEquals(Json.tryParseFull("{}"), Right(JsonValue(new ObjectNode(jnf)))) +org.junit.Assert.assertThrows(classOf[IllegalArgumentException], () => Json.tryParseFull(null)) +org.junit.Assert.assertThrows(classOf[IllegalArgumentException], () => Json.tryParseBytes(null)) Review comment: Why can't we just say `assertThrows` here? ## File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala ## @@ -30,7 +30,7 @@ import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, TopicP import org.junit.Assert.{assertEquals, assertFalse, assertThrows, assertTrue} import org.junit.function.ThrowingRunnable import org.junit.rules.Timeout -import org.junit.{After, Assert, Before, Rule, Test} +import org.junit._ Review comment: We should not use wildcard imports here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit
ijuma commented on a change in pull request #9067: URL: https://github.com/apache/kafka/pull/9067#discussion_r460714125 ## File path: checkstyle/suppressions.xml ## @@ -17,8 +17,14 @@ files="(ApiMessageType).java|MessageDataGenerator.java"/> + + Review comment: It seems really brittle to have line numbers in the suppression. Could we: 1. Remove the `lines` part. 2. Give a name to the `Regexp` so that we only exclude the one related to `System.exit` (not sure if this is possible) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
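As background on why the tests route through `Exit` rather than `System.exit`: Kafka's `org.apache.kafka.common.utils.Exit` lets a test substitute the exit procedure instead of killing the JVM. A sketch of that usage, assuming the `setExitProcedure`/`resetExitProcedure` API:
```java
import org.apache.kafka.common.utils.Exit;

public class ExitInTests {
    public static void main(String[] args) {
        // Replace the default System.exit behavior so the exit code can be
        // observed from a test without terminating the JVM.
        Exit.setExitProcedure((statusCode, message) -> {
            throw new IllegalStateException("exit(" + statusCode + ") called: " + message);
        });
        try {
            Exit.exit(1); // would have terminated the JVM via System.exit
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        } finally {
            // Restore the real exit behavior for subsequent code.
            Exit.resetExitProcedure();
        }
    }
}
```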
[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit
ijuma commented on a change in pull request #9067: URL: https://github.com/apache/kafka/pull/9067#discussion_r460714547 ## File path: streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java ## @@ -153,8 +154,8 @@ public void run() { streams.start(); latch.await(); } catch (final Throwable e) { -System.exit(1); +Exit.exit(1); } -System.exit(0); +Exit.exit(0); Review comment: I think it was intentional not to use an internal class in demos that others may copy. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9067: MINOR: Streams integration tests should not call exit
ijuma commented on a change in pull request #9067: URL: https://github.com/apache/kafka/pull/9067#discussion_r460714731 ## File path: tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java ## @@ -241,7 +240,7 @@ public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] } else { parser.printHelp(); // Can't use `Exit.exit` here because it didn't exist until 0.11.0.0. -System.exit(0); +Exit.exit(0); Review comment: Did you read the comment immediately above this line? Also applies to `VerifiableConsumer`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion
ijuma commented on pull request #8936: URL: https://github.com/apache/kafka/pull/8936#issuecomment-664201898 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org