[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r412754269

## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

@@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) {
         return Collections.singletonMap(error, 1);
     }

+    protected Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
+        return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
+    }
+
     protected Map<Errors, Integer> errorCounts(Collection<Errors> errors) {

Review comment: @chia7712 you're right, but the only two remaining callers of this method are for RPCs which haven't been converted to the message generator. There's little benefit to changing them when there are already PRs for converting those RPCs, and I'm planning to remove this method entirely once those PRs have been merged. I guess I could mark this method as `@Deprecated`. Relatedly, there's only a single caller of the `apiErrorCounts(Map errors)` method, which is also for a not-yet-converted RPC with an open PR. If this gets merged first I'll be able to remove `apiErrorCounts(Map errors)` in that PR.

## File path: clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java

@@ -52,10 +51,9 @@ protected Struct toStruct(short version) {
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();

Review comment: @chia7712 what I've tried to do in this PR so far is:
* Change `for` stmt + updateErrorCounts to use `forEach` consistently
* Change calls to `errorCounts(Collection)` to `errorCounts(Stream)`

I've not tried to change all code to use either `forEach` or `errorCounts(Stream)`. Obviously we could do that, but @ijuma seems happy enough with continuing to have these two ways to do it.
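For readers skimming the thread, the stream-based counter under discussion is a one-pass `groupingBy`/`summingInt` collect. A minimal, self-contained sketch of the idiom (using a stand-in enum rather than Kafka's real `org.apache.kafka.common.protocol.Errors`):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ErrorCountsDemo {
    // Simplified stand-in for org.apache.kafka.common.protocol.Errors.
    enum Errors { NONE, NOT_LEADER_FOR_PARTITION, UNKNOWN_TOPIC_OR_PARTITION }

    // Counts occurrences of each error in a single pass over the stream,
    // mirroring the groupingBy + summingInt idiom from the PR.
    static Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
        return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
    }

    public static void main(String[] args) {
        Map<Errors, Integer> counts = errorCounts(
            Stream.of(Errors.NONE, Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION));
        // e.g. {NONE=2, NOT_LEADER_FOR_PARTITION=1} (HashMap, so order is unspecified)
        System.out.println(counts);
    }
}
```

The advantage over collecting into an intermediate `List<Errors>` is that nested topic/partition responses can be flat-mapped straight into the collector without materializing a temporary collection.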
[GitHub] [kafka] dajac commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
dajac commented on a change in pull request #8311: URL: https://github.com/apache/kafka/pull/8311#discussion_r412287554

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, ...)
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());

Review comment: `topicPartition` is not used except for getting the topic and the partition above. `replica.topic()` and `replica.partition()` could be directly used instead.

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -2207,21 +2212,33 @@
-            if (!replicaAssignmentByBroker.containsKey(brokerId))
-                replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-            replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+            AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId,
+                key -> new AlterReplicaLogDirsRequestData());

Review comment: nit: Could we rename `value` to something like `alterReplicaLogDirs`?

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -2207,21 +2212,33 @@
+            AlterReplicaLogDir alterReplicaLogDir = value.dirs().find(logDir);
+            if (alterReplicaLogDir == null) {
+                alterReplicaLogDir = new AlterReplicaLogDir();
+                alterReplicaLogDir.setPath(logDir);
+                value.dirs().add(alterReplicaLogDir);
+            }
+            AlterReplicaLogDirTopic alterReplicaLogDirTopic = alterReplicaLogDir.topics().find(topicPartition.topic());
+            if (alterReplicaLogDirTopic == null) {
+                alterReplicaLogDirTopic = new AlterReplicaLogDirTopic();
+                alterReplicaLogDir.topics().add(alterReplicaLogDirTopic);
+            }
+            alterReplicaLogDirTopic.setName(topicPartition.topic())

Review comment: `setName` could be done only once within the if statement.
## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java ## @@ -17,122 +17,53 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka
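The refactor being reviewed above replaces the classic check-then-insert-then-lookup sequence with `Map.computeIfAbsent`. A hedged sketch of the idiom with plain placeholder types (not the actual `AlterReplicaLogDirsRequestData` message classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByBrokerDemo {
    public static void main(String[] args) {
        Map<Integer, List<String>> logDirsByBroker = new HashMap<>();

        // Old style: check, insert an empty container, then look it up again.
        int brokerId = 1;
        if (!logDirsByBroker.containsKey(brokerId))
            logDirsByBroker.put(brokerId, new ArrayList<>());
        logDirsByBroker.get(brokerId).add("/data/kafka-logs-a");

        // computeIfAbsent: a single lookup, container created on demand.
        logDirsByBroker.computeIfAbsent(2, key -> new ArrayList<>())
                       .add("/data/kafka-logs-b");

        System.out.println(logDirsByBroker);
    }
}
```

Besides being shorter, `computeIfAbsent` avoids hashing the key three times, which is why reviews commonly suggest it for this grouping pattern.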
[GitHub] [kafka] dajac commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
dajac commented on a change in pull request #8509: URL: https://github.com/apache/kafka/pull/8509#discussion_r412807377 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1552,179 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { +val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest) +val request = buildRequest(updateMetadataRequest) + +val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + +EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) +EasyMock.expect(replicaManager.maybeUpdateMetadataCache( + EasyMock.eq(request.context.correlationId), + EasyMock.anyObject() +)).andStubReturn( + Seq() +) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) +EasyMock.replay(replicaManager, controller, requestChannel) + +createKafkaApis().handleUpdateMetadataRequest(request) +val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse) + .asInstanceOf[UpdateMetadataResponse] +assertEquals(expectedError, updateMetadataResponse.error()) +EasyMock.verify(replicaManager) + } + + @Test + def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { +val controllerId = 2 +val controllerEpoch = 6 +val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() +val partitionStates = Seq( + new LeaderAndIsrRequestData.LeaderAndIsrPartitionState() +.setTopicName("topicW") +.setPartitionIndex(1) +.setControllerEpoch(1) +.setLeader(0) +.setLeaderEpoch(1) +.setIsr(asList(0, 1)) +.setZkVersion(2) +.setReplicas(asList(0, 1, 2)) +.setIsNew(false) +).asJava +val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder( + ApiKeys.LEADER_AND_ISR.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + partitionStates, + asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091)) +).build() +val request = buildRequest(leaderAndIsrRequest) +val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code) + .setPartitionErrors(asList())) + 
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) +EasyMock.expect(replicaManager.becomeLeaderOrFollower( + EasyMock.eq(request.context.correlationId), + EasyMock.anyObject(), + EasyMock.anyObject() +)).andStubReturn( + response +) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) +EasyMock.replay(replicaManager, controller, requestChannel) + +createKafkaApis().handleLeaderAndIsrRequest(request) +val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse) + .asInstanceOf[LeaderAndIsrResponse] +assertEquals(expectedError, leaderAndIsrResponse.error()) +EasyMock.verify(replicaManager) + } + + @Test + def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) Review comment: This and the two above should use `testStopReplicaRequest` instead of `testUpdateMetadataRequest`. ## File path: core/src/test/sc
[jira] [Created] (KAFKA-9902) Java client API cannot retrieve all of the information that kafka-consumer-groups.sh outputs
startjava created KAFKA-9902: Summary: Java client API cannot retrieve all of the information that kafka-consumer-groups.sh outputs Key: KAFKA-9902 URL: https://issues.apache.org/jira/browse/KAFKA-9902 Project: Kafka Issue Type: Test Reporter: startjava

Why does the Java client API not offer a method corresponding to the command: kafka-consumer-groups.sh --bootstrap-server localhost:9081 --describe --group test ? I cannot get the GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST and CLIENT-ID columns together in one call; from searching the documentation I learned they have to be fetched separately, which is very troublesome for us developers, and this need is very common.
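There is indeed no single client call that mirrors the shell tool's table; the usual workaround stitches together Admin and Consumer calls. A hedged sketch of that composition (the broker address localhost:9081 and group name test are taken from the report; error handling omitted):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DescribeGroupDemo {
    public static void main(String[] args) throws Exception {
        String group = "test";
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9081");

        try (Admin admin = Admin.create(props)) {
            // CURRENT-OFFSET per partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();

            // CONSUMER-ID / HOST / CLIENT-ID per group member.
            ConsumerGroupDescription description =
                admin.describeConsumerGroups(Collections.singletonList(group))
                     .describedGroups().get(group).get();
            description.members().forEach(m ->
                System.out.printf("member=%s host=%s clientId=%s assigned=%s%n",
                    m.consumerId(), m.host(), m.clientId(), m.assignment().topicPartitions()));

            // LOG-END-OFFSET via a throwaway consumer; LAG = end offset - committed offset.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
                committed.forEach((tp, offset) ->
                    System.out.printf("%s current=%d end=%d lag=%d%n",
                        tp, offset.offset(), endOffsets.get(tp),
                        endOffsets.get(tp) - offset.offset()));
            }
        }
    }
}
```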
[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
leonardge commented on a change in pull request #8524: URL: https://github.com/apache/kafka/pull/8524#discussion_r412857024 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1066,6 +1066,7 @@ class KafkaController(val config: KafkaConfig, // do this check only if the broker is live and there are no partitions being reassigned currently // and preferred replica election is not in progress val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && + controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr.contains(leaderBroker) && Review comment: Done.
[GitHub] [kafka] harshitshah4 edited a comment on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
harshitshah4 edited a comment on issue #8512: URL: https://github.com/apache/kafka/pull/8512#issuecomment-616683601 @omkreddy @hachikuji can you review the changes?
[GitHub] [kafka] harshitshah4 removed a comment on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
harshitshah4 removed a comment on issue #8512: URL: https://github.com/apache/kafka/pull/8512#issuecomment-617756461 I feel that there are more methods that may require similar changes.
[GitHub] [kafka] harshitshah4 commented on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
harshitshah4 commented on issue #8512: URL: https://github.com/apache/kafka/pull/8512#issuecomment-617756461 I feel that there are more methods that may require similar changes.
[GitHub] [kafka] tombentley commented on issue #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
tombentley commented on issue #8311: URL: https://github.com/apache/kafka/pull/8311#issuecomment-617764518 Thanks @dajac, there were some useful comments and good spots there. I've pushed some fixes.
[GitHub] [kafka] chia7712 commented on a change in pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids
chia7712 commented on a change in pull request #8530: URL: https://github.com/apache/kafka/pull/8530#discussion_r412959693

## File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java

@@ -119,7 +124,7 @@ public void shouldRestoreStateFromSourceTopic() throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
-        final Properties props = props(APPID);
+        final Properties props = props(APPID + name.getMethodName());

Review comment: the following app id should be changed as well. (I can't add a comment to the line for this PR)

```java
private void setCommittedOffset(final String topic, final int limitDelta) {
    final Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
```

## File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java

@@ -107,6 +109,9 @@ private Properties props(final String applicationId) {
         return streamsConfiguration;
     }
+    @Rule
+    public TestName name = new TestName();

Review comment: the changelog topics are created by ```BeforeClass``` so it seems we need to add ```Before``` to create changelogs for each different *method name*.
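The `TestName` rule in the diff is standard JUnit 4; it exposes the currently running test method's name, which is what makes the application id unique per test. A minimal sketch of the pattern (the APPID value and the assertion are illustrative only):

```java
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.junit.Assert.assertTrue;

public class UniqueAppIdTest {
    private static final String APPID = "restore-test";

    // JUnit 4 rule: populated with the test method's name before each test runs.
    @Rule
    public TestName name = new TestName();

    private String appId() {
        // e.g. "restore-test-shouldRestoreState" -- unique per test method, so
        // each test gets its own consumer group, offsets, and internal topics.
        return APPID + "-" + name.getMethodName();
    }

    @Test
    public void shouldRestoreState() {
        assertTrue(appId().endsWith("shouldRestoreState"));
    }
}
```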
[GitHub] [kafka] stanislavkozlovski commented on issue #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
stanislavkozlovski commented on issue #8524: URL: https://github.com/apache/kafka/pull/8524#issuecomment-617770854 After discussing online, we figured there isn't an easy way to test this scenario. There's significant work to be done to make KafkaController unit-testable. Good catch on the `testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled`! I think this change looks good. Let's wait for @hachikuji or @junrao to take a look.
[GitHub] [kafka] ijuma commented on issue #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on issue #8517: URL: https://github.com/apache/kafka/pull/8517#issuecomment-617772737 ok to test
[GitHub] [kafka] ijuma commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
ijuma commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r412973974

## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java

@@ -66,14 +64,12 @@ public int throttleTimeMs() {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
         Errors topLevelErr = Errors.forCode(data.errorCode());
-        counts.put(topLevelErr, counts.getOrDefault(topLevelErr, 0) + 1);
+        updateErrorCounts(counts, topLevelErr);
-        for (ReassignableTopicResponse topicResponse : data.responses()) {
-            for (ReassignablePartitionResponse partitionResponse : topicResponse.partitions()) {
-                Errors error = Errors.forCode(partitionResponse.errorCode());
-                counts.put(error, counts.getOrDefault(error, 0) + 1);
-            }
-        }
+        data.responses().forEach(topicResponse ->
+            topicResponse.partitions().forEach(partitionResponse ->
+                updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode()))

Review comment: Indent.

## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java

@@ -66,14 +64,12 @@ public int throttleTimeMs() {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
         Errors topLevelErr = Errors.forCode(data.errorCode());

Review comment: Maybe inline.

## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java

@@ -95,13 +94,8 @@ public OffsetCommitResponseData data() {
     @Override
     public Map<Errors, Integer> errorCounts() {
-        List<Errors> errors = new ArrayList<>();
-        for (OffsetCommitResponseTopic topic : data.topics()) {
-            for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                errors.add(Errors.forCode(partition.errorCode()));
-            }
-        }
-        return errorCounts(errors);
+        return errorCounts(data.topics().stream().flatMap(topicResult -> topicResult.partitions().stream())

Review comment: Is there a reason why we are not doing the map within the flatMap?

## File path: clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java

@@ -91,7 +91,9 @@ public int throttleTimeMs() {
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors().values());
+        return errorCounts(data.topics().stream()
+            .flatMap(topic -> topic.partitions().stream())

Review comment: Any reason why we don't do the map inside the flatMap?
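The recurring question ("why not do the map within the flatMap?") is about where the element transformation happens; both shapes produce the same stream. A hedged sketch with simplified stand-in types (not Kafka's generated response classes):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapShapeDemo {
    static class Partition {
        final short errorCode;
        Partition(short errorCode) { this.errorCode = errorCode; }
    }

    static class Topic {
        final List<Partition> partitions;
        Topic(List<Partition> partitions) { this.partitions = partitions; }
    }

    // Shape 1: flatMap to partitions first, then map each one to its error code.
    static Stream<Short> mapOutsideFlatMap(List<Topic> topics) {
        return topics.stream()
                     .flatMap(t -> t.partitions.stream())
                     .map(p -> p.errorCode);
    }

    // Shape 2 (the one suggested in review): do the map within the flatMap,
    // so the flatMap lambda already yields the final element type.
    static Stream<Short> mapInsideFlatMap(List<Topic> topics) {
        return topics.stream()
                     .flatMap(t -> t.partitions.stream().map(p -> p.errorCode));
    }

    public static void main(String[] args) {
        List<Topic> topics = Arrays.asList(
            new Topic(Arrays.asList(new Partition((short) 0), new Partition((short) 3))));
        System.out.println(mapOutsideFlatMap(topics).collect(Collectors.toList())); // [0, 3]
        System.out.println(mapInsideFlatMap(topics).collect(Collectors.toList()));  // [0, 3]
    }
}
```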
[jira] [Commented] (KAFKA-9652) Throttle time metric needs to be updated for KIP-219
[ https://issues.apache.org/jira/browse/KAFKA-9652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089674#comment-17089674 ] Ismael Juma commented on KAFKA-9652: [~agam] Are you working on this? While looking at another issue, I fixed it in a local branch. I can submit a PR if you haven't started on it. > Throttle time metric needs to be updated for KIP-219 > > > Key: KAFKA-9652 > URL: https://issues.apache.org/jira/browse/KAFKA-9652 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Agam Brahma >Priority: Major > > KIP-219 changed the throttling logic so that responses are returned > immediately. The logic for updating the throttle time in `RequestChannel` > appears to have not been updated to reflect this change and instead reflects > the old behavior where the timing is based on the time between remote > completion and response completion. This means the metric will pretty much > always show negligible throttling.
[jira] [Assigned] (KAFKA-9589) LogValidatorTest#testLogAppendTimeNonCompressedV2 is not executed and does not pass
[ https://issues.apache.org/jira/browse/KAFKA-9589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Ge reassigned KAFKA-9589: -- Assignee: Wang Ge > LogValidatorTest#testLogAppendTimeNonCompressedV2 is not executed and does > not pass > --- > > Key: KAFKA-9589 > URL: https://issues.apache.org/jira/browse/KAFKA-9589 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Wang Ge >Priority: Major > > The LogValidatorTest#testLogAppendTimeNonCompressedV2 test does not execute > because it's missing a '@Test' annotation. > When executed locally, it fails with the following error: > {code:java} > java.lang.AssertionError: The offset of max timestamp should be 0 > Expected :0 > Actual :2 > {code}
[jira] [Created] (KAFKA-9903) ShutdownableThread's isRunning check is wrong when the thread exits due to a non-fatal exception
shilin Lu created KAFKA-9903: Summary: ShutdownableThread's isRunning check is wrong when the thread exits due to a non-fatal exception Key: KAFKA-9903 URL: https://issues.apache.org/jira/browse/KAFKA-9903 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.3.1 Reporter: shilin Lu Attachments: image-2020-04-22-21-28-03-154.png

h2. 1. Bug

{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}

def isRunning: Boolean = {
  shutdownInitiated.getCount() != 0
}
{code}

1. When a replica fetcher thread hits an exception that is not a FatalExitError, the thread exits and runs the finally block (counting down the shutdownComplete latch), but shutdownInitiated is never counted down.

2. Because of 1, shutdownInitiated stays at 1, and isRunning only checks shutdownInitiated != 0, so using this method to judge the thread's status gives the wrong answer.

3. isRunning is used in shutdownIdleFetcherThreads, processFetchRequest, controller request sending, and elsewhere; as a result a dead thread may never be removed and pending work may never complete.

h2. 2. Bugfix

As in the following code, also count down shutdownInitiated in the finally block:

{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownInitiated.countDown()
    shutdownComplete.countDown()
  }
  info("Stopped")
}
{code}
[jira] [Commented] (KAFKA-9903) ShutdownableThread's isRunning check is wrong when the thread exits due to a non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089688#comment-17089688 ] shilin Lu commented on KAFKA-9903: -- [~guozhang] [~ijuma] please take a look at this issue, thanks. If it makes sense, I will create a PR for this issue.

> ShutdownableThread's isRunning check is wrong when the thread exits due to a non-fatal exception
> ---
>
> Key: KAFKA-9903
> URL: https://issues.apache.org/jira/browse/KAFKA-9903
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.3.1
> Reporter: shilin Lu
> Priority: Major
> Attachments: image-2020-04-22-21-28-03-154.png
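To make the reported failure mode concrete, here is a simplified, hypothetical Java analogue of the lifecycle (not Kafka's actual ShutdownableThread): if the finally block only counted down the completion latch, isRunning() would keep returning true after the thread died; counting down both latches, as the bugfix proposes, restores the invariant.

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownableThreadDemo extends Thread {
    private final CountDownLatch shutdownInitiated = new CountDownLatch(1);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    // Same check as the report: "running" means shutdownInitiated not yet counted down.
    public boolean isRunning() {
        return shutdownInitiated.getCount() != 0;
    }

    @Override
    public void run() {
        try {
            while (isRunning()) {
                throw new RuntimeException("doWork failed"); // simulate a non-fatal error
            }
        } catch (RuntimeException e) {
            // swallowed, like the `case e: Throwable` branch in the report
        } finally {
            // The fix: count down BOTH latches so isRunning() reflects reality.
            shutdownInitiated.countDown();
            shutdownComplete.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownableThreadDemo t = new ShutdownableThreadDemo();
        t.start();
        t.join();
        // Prints false with the fix; without the shutdownInitiated countDown
        // in finally, it would still print true after the thread had died.
        System.out.println("isRunning=" + t.isRunning());
    }
}
```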
[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r413002568

## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java

@@ -95,13 +94,8 @@ public OffsetCommitResponseData data() {
     @Override
     public Map<Errors, Integer> errorCounts() {
-        List<Errors> errors = new ArrayList<>();
-        for (OffsetCommitResponseTopic topic : data.topics()) {
-            for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                errors.add(Errors.forCode(partition.errorCode()));
-            }
-        }
-        return errorCounts(errors);
+        return errorCounts(data.topics().stream().flatMap(topicResult -> topicResult.partitions().stream())

Review comment: No _good_ reason. Good spot.
[GitHub] [kafka] tombentley commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on issue #8417: URL: https://github.com/apache/kafka/pull/8417#issuecomment-617793326 @ijuma fixed your comments.
[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r413002862

## File path: clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java

@@ -91,7 +91,9 @@ public int throttleTimeMs() {
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors().values());
+        return errorCounts(data.topics().stream()
+            .flatMap(topic -> topic.partitions().stream())

Review comment: As above.
[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
ijuma commented on issue #8417: URL: https://github.com/apache/kafka/pull/8417#issuecomment-617797442 ok to test
[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
ijuma commented on issue #8417: URL: https://github.com/apache/kafka/pull/8417#issuecomment-617798142 retest this please
[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
ijuma commented on issue #8417: URL: https://github.com/apache/kafka/pull/8417#issuecomment-617798318 @tombentley can you please update the PR description to summarize the perf testing you did?
[GitHub] [kafka] tombentley commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on issue #8417: URL: https://github.com/apache/kafka/pull/8417#issuecomment-617802853 @ijuma done.
[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
ijuma commented on issue #8417: URL: https://github.com/apache/kafka/pull/8417#issuecomment-617808555 Perfect, thanks!
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413074912

## File path: core/src/main/scala/kafka/utils/DelayedItem.scala

@@ -21,24 +21,19 @@ import java.util.concurrent._
 import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment: We can, I was just trying to avoid the replica fetcher having to pass Time.SYSTEM in everywhere. If you prefer that I can change it. Is there any downside to what I did, though?
[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413076253

## File path: core/src/main/scala/kafka/utils/DelayedItem.scala

@@ -21,24 +21,19 @@ import java.util.concurrent._
 import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment: It should pass the actual `time` instance so that we can mock if we want. It has one in its constructor already:
```scala
class ReplicaFetcherThread(name: String,
                           fetcherId: Int,
                           sourceBroker: BrokerEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           metrics: Metrics,
                           time: Time,
```
[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413077069

## File path: core/src/main/scala/kafka/utils/DelayedItem.scala

@@ -21,24 +21,19 @@ import java.util.concurrent._
 import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment: Does `time` need to be a public `val`?
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413085641 ## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ## @@ -21,24 +21,19 @@ import java.util.concurrent._ import org.apache.kafka.common.utils.Time -import scala.math._ +class DelayedItem(val delayMs: Long, val time: Time) extends Logging { Review comment: Good point. I'll make it private.
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413086046 ## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ## @@ -21,24 +21,19 @@ import java.util.concurrent._ import org.apache.kafka.common.utils.Time -import scala.math._ +class DelayedItem(val delayMs: Long, val time: Time) extends Logging { + def this(delayMs: Long) = this(delayMs, Time.SYSTEM) Review comment: Thanks, yes, I was just coming back to say your suggestion was better for testability reasons.
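Two themes run through this review: computing delays from a monotonic clock (nanoseconds) rather than wall-clock time, and injecting the clock so tests can control it. A hedged Java sketch of both, using a minimal stand-in interface instead of Kafka's org.apache.kafka.common.utils.Time:

```java
import java.util.concurrent.TimeUnit;

public class DelayedItemDemo {
    // Minimal stand-in for org.apache.kafka.common.utils.Time.
    interface Clock {
        long nanoseconds();
    }

    static class DelayedItem {
        private final Clock clock;
        private final long dueNs;

        // The clock is injected, so tests can substitute a fake that
        // advances deterministically instead of sleeping.
        DelayedItem(long delayMs, Clock clock) {
            this.clock = clock;
            // Monotonic time is immune to wall-clock jumps (NTP, manual changes),
            // so the delay cannot expire early or get stuck.
            this.dueNs = clock.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        boolean isExpired() {
            return clock.nanoseconds() >= dueNs;
        }
    }

    public static void main(String[] args) {
        // Fake clock for tests: advances only when told to.
        final long[] nowNs = {0};
        Clock fake = () -> nowNs[0];

        DelayedItem item = new DelayedItem(100, fake);
        System.out.println(item.isExpired()); // false
        nowNs[0] += TimeUnit.MILLISECONDS.toNanos(100);
        System.out.println(item.isExpired()); // true
    }
}
```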
[GitHub] [kafka] hachikuji commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on a change in pull request #8509: URL: https://github.com/apache/kafka/pull/8509#discussion_r413091037 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -3084,12 +3084,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown // if the controller hasn't been upgraded to use KIP-380 if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false -else { - val curBrokerEpoch = controller.brokerEpoch - if (brokerEpochInRequest < curBrokerEpoch) true - else if (brokerEpochInRequest == curBrokerEpoch) false - else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch") -} +else brokerEpochInRequest < controller.brokerEpoch Review comment: Short comment here may be helpful about the case where the controller sees the epoch bump first.
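For context, the simplified check accepts equal or newer broker epochs and rejects only strictly older ones, because the controller can observe a broker's re-registration (and hence its new epoch) before the broker itself does. A hedged sketch of the logic (the -1 sentinel value is assumed from AbstractControlRequest):

```java
public class BrokerEpochCheckDemo {
    static final long UNKNOWN_BROKER_EPOCH = -1L; // assumed sentinel, pre-KIP-380 controllers

    // Returns true if the control request should be rejected as stale.
    static boolean isBrokerEpochStale(long epochInRequest, long currentBrokerEpoch) {
        if (epochInRequest == UNKNOWN_BROKER_EPOCH)
            return false; // controller not upgraded to KIP-380: nothing to validate
        // Strictly older epochs are stale; equal OR NEWER epochs are accepted,
        // since the controller may see the broker's epoch bump first.
        return epochInRequest < currentBrokerEpoch;
    }

    public static void main(String[] args) {
        System.out.println(isBrokerEpochStale(9, 10));  // true  -> STALE_BROKER_EPOCH
        System.out.println(isBrokerEpochStale(10, 10)); // false -> accepted
        System.out.println(isBrokerEpochStale(11, 10)); // false -> accepted (newer)
    }
}
```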
[jira] [Created] (KAFKA-9904) Use ThreadLocalConcurrent to Replace Random
David Mollitor created KAFKA-9904: - Summary: Use ThreadLocalConcurrent to Replace Random Key: KAFKA-9904 URL: https://issues.apache.org/jira/browse/KAFKA-9904 Project: Kafka Issue Type: Improvement Reporter: David Mollitor https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html
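The linked Javadoc is for java.util.concurrent.ThreadLocalRandom ("ThreadLocalConcurrent" in the summary is a typo). The motivation: a shared java.util.Random serializes concurrent callers on a single atomically-updated seed, while ThreadLocalRandom keeps per-thread state. A small sketch:

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomDemo {
    // Shared Random: every nextInt() does a CAS on the same internal seed,
    // so heavily concurrent callers contend on it.
    static final Random SHARED = new Random();

    public static void main(String[] args) {
        int a = SHARED.nextInt(100);

        // ThreadLocalRandom: no shared state, no contention. Never seed it
        // manually and never cache the instance across threads; always go
        // through ThreadLocalRandom.current() at the point of use.
        int b = ThreadLocalRandom.current().nextInt(100);

        System.out.println(a + " " + b);
    }
}
```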
[GitHub] [kafka] belugabehr opened a new pull request #8531: KAFAKA-9904: Use ThreadLocalConcurrent to Replace Random
belugabehr opened a new pull request #8531: URL: https://github.com/apache/kafka/pull/8531 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna opened a new pull request #8532: HOTFIX: Fix broker bounce system tests
cadonna opened a new pull request #8532: URL: https://github.com/apache/kafka/pull/8532 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on issue #8532: URL: https://github.com/apache/kafka/pull/8532#issuecomment-617869532 Call for review: @mjsax @abbccdda
[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on a change in pull request #8532: URL: https://github.com/apache/kafka/pull/8532#discussion_r413110487 ## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ## @@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3): # Start test harness self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) -self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads) +self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads) Review comment: Is it enough to specify the processing guarantee as `at_least_once` here, or do you also want this test to include all processing guarantees in the test matrix?
[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on a change in pull request #8532: URL: https://github.com/apache/kafka/pull/8532#discussion_r413116285 ## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ## @@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3): # Start test harness self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) -self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads) +self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads) Review comment: On a different note, now that the processing guarantee can be passed to the service, do we still need `StreamsEosTestJobRunnerService` and `StreamsComplexEosTestJobRunnerService`?
[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on issue #8532: URL: https://github.com/apache/kafka/pull/8532#issuecomment-617883911 Streams system tests run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3914
[GitHub] [kafka] leonardge opened a new pull request #8533: Fixed bug in log validator tests.
leonardge opened a new pull request #8533: URL: https://github.com/apache/kafka/pull/8533 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on issue #8454: KAFKA-9844; Maximum number of members within a group is not always enforced due to a race condition in join group
dajac commented on issue #8454: URL: https://github.com/apache/kafka/pull/8454#issuecomment-617907578 @hachikuji Could we get this one merged?
[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps
[ https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089869#comment-17089869 ] Matthias J. Sax commented on KAFKA-6817: [~bob-barrett] – do you think KIP-360 fully addresses this issue? > UnknownProducerIdException when writing messages with old timestamps > > > Key: KAFKA-6817 > URL: https://issues.apache.org/jira/browse/KAFKA-6817 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.0 >Reporter: Odin Standal >Priority: Major > > We are seeing the following exception in our Kafka application: > {code:java} > ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer > due to the following error: org.apache.kafka.streams.errors.StreamsException: > task [0_0] Abort sending since an error caught with a previous record (key > 22 value some-value timestamp 1519200902670) to topic > exactly-once-test-topic- v2 due to This exception is raised by the broker if > it could not locate the producer metadata associated with the producerId in > question. This could happen if, for instance, the producer's records were > deleted because their retention time had elapsed. Once the last records of > the producerId are removed, the producer's metadata is removed from the > broker, and future appends by the producer will return this exception. at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at > java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.kafka.common.errors.UnknownProducerIdException > {code} > We discovered this error when we had the need to reprocess old messages. See > more details on > [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827] > We have reproduced the error with a smaller example application. The error > occurs after 10 minutes of producing messages that have old timestamps (type > 1 year old). 
The topic we are writing to has a retention.ms set to 1 year so we are expecting the messages to stay there. After digging through the ProducerStateManager-code in the Kafka source code we have a theory of what might be wrong. The ProducerStateManager.removeExpiredProducers() seems to remove producers from memory erroneously when processing records which are older than the maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` configuration), which is set by default to 7 days.
[GitHub] [kafka] guozhangwang commented on a change in pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids
guozhangwang commented on a change in pull request #8530: URL: https://github.com/apache/kafka/pull/8530#discussion_r413168976 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java ## @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.integration; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; -import org.apache.kafka.test.TestUtils; -import org.junit.Test; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - - -public class KTableKTableForeignKeyJoinPseudoTopicTest { Review comment: I moved this test into the non-integration unit tests. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java ## @@ -80,14 +83,18 @@ public static void setUpBeforeAllTests() throws Exception { STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); +STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); Review comment: For this test, we did need to reuse the created topics and hence I reduced the session / heartbeat timeout so that their rebalance timeout could be much smaller. ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java ## @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.streams.integration; Review comment: I merged this test with another as a non-integration test, since it uses TTD and does not really create a cluster :) ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java ## @@ -104,6 +104,8 @@ private Properties props(final String applicationId) { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); +streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); Review comment: Similar here, for this test I reduced the session / heartbeat timeout so that their rebalance timeout could be much smaller. I think it is simpler than changing a bunch of changelogs / source / sink / and app ids.
[GitHub] [kafka] vinothchandar opened a new pull request #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
vinothchandar opened a new pull request #8534: URL: https://github.com/apache/kafka/pull/8534 - Added additional synchronization and increased timeouts to handle flakiness - Added some precautionary retries when trying to obtain the lag map Backport of PR #8076. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vinothchandar commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()
vinothchandar commented on issue #8462: URL: https://github.com/apache/kafka/pull/8462#issuecomment-617931148 Opened a simple backport here: #8534. We can focus on that for fixing the test flakiness itself. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #8495: MINOR: downgrade test should wait for ISR rejoin between rolls
abbccdda commented on a change in pull request #8495: URL: https://github.com/apache/kafka/pull/8495#discussion_r413199411 ## File path: tests/kafkatest/tests/core/downgrade_test.py ## @@ -26,10 +27,12 @@ from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion class TestDowngrade(EndToEndTest): +PARTITIONS = 3 +REPLICATION_FACTOR = 3 TOPIC_CONFIG = { -"partitions": 3, -"replication-factor": 3, +"partitions": PARTITIONS, Review comment: Is it necessary to define these as constants? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()
guozhangwang commented on issue #8462: URL: https://github.com/apache/kafka/pull/8462#issuecomment-617948771 > Opened a simple backport here: #8534. We can focus on that for fixing the test flakiness itself. Sounds good, thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
guozhangwang commented on issue #8534: URL: https://github.com/apache/kafka/pull/8534#issuecomment-617949190 @vinothchandar this PR is still in the `work in progress` state; is it ready for review and merge? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vinothchandar commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
vinothchandar commented on issue #8534: URL: https://github.com/apache/kafka/pull/8534#issuecomment-617950145 Will remove the draft status in a little bit. I'm running the test again on the 2.5 branch many times to confirm the flakiness is gone. It will be ready for review and merge then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #8495: MINOR: downgrade test should wait for ISR rejoin between rolls
lbradstreet commented on a change in pull request #8495: URL: https://github.com/apache/kafka/pull/8495#discussion_r413216403 ## File path: tests/kafkatest/tests/core/downgrade_test.py ## @@ -26,10 +27,12 @@ from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, DEV_BRANCH, KafkaVersion class TestDowngrade(EndToEndTest): +PARTITIONS = 3 +REPLICATION_FACTOR = 3 TOPIC_CONFIG = { -"partitions": 3, -"replication-factor": 3, +"partitions": PARTITIONS, Review comment: Yes, see https://github.com/apache/kafka/pull/8495/files#diff-11673271221c04c0861f9c5e074a9783R80 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug
[ https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089926#comment-17089926 ] Guozhang Wang commented on KAFKA-9903: -- Hello Shilin, Thanks for the report; I think it is indeed an issue. However, note that our `isThreadFailed: Boolean = isShutdownComplete && !isShutdownInitiated` is actually checking that the thread did not actively initiate a shutdown but has been shut down, meaning it was shut down due to an error, and hence we cannot simply count down shutdownInitiated in the finally block. Instead, I think a more appropriate fix would be to change the logic of `isRunning: Boolean = !isShutdownInitiated` to `isRunning: Boolean = !isShutdownInitiated && !isShutdownComplete`, so that if the thread terminates due to an error, the `isRunning` boolean returns false. WDYT? > kafka ShutdownableThread judge thread isRuning status has some bug > --- > > Key: KAFKA-9903 > URL: https://issues.apache.org/jira/browse/KAFKA-9903 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.1 >Reporter: shilin Lu >Priority: Major > Attachments: image-2020-04-22-21-28-03-154.png > > > h2. 1. Bug > {code:java} > override def run(): Unit = { > isStarted = true > info("Starting") > try { > while (isRunning) > doWork() > } catch { > case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) > case e: Throwable => > if (isRunning) > error("Error due to", e) > } finally { > shutdownInitiated.countDown() > shutdownComplete.countDown() > } > info("Stopped") > } > def isRunning: Boolean = { > shutdownInitiated.getCount() != 0 > }{code} > 1. When the replica thread hits an exception which is not a FatalExitError, the thread > will exit and run the finally logic (counting down the shutdownComplete > CountDownLatch), but shutdownInitiated is never counted down. > 2. Given 1, shutdownInitiated stays at 1, and the isRunning logic judges whether the thread > is running via shutdownInitiated != 0, so judging the thread status through > this method is wrong. > 3. The isRunning method is used in shutdownIdleFetcherThreads, > processFetchRequest, controller request sending and elsewhere; this may leave > threads that cannot be removed and work that cannot be done. > h2. 2. Bugfix > Just like the following code, count down shutdownInitiated in the finally logic > > {code:java} > override def run(): Unit = { > isStarted = true > info("Starting") > try { > while (isRunning) > doWork() > } catch { > case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) > case e: Throwable => > if (isRunning) > error("Error due to", e) > } finally { > shutdownInitiated.countDown() > shutdownComplete.countDown() > } > info("Stopped") > } > {code} > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
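To make the proposal concrete for readers skimming the thread, here is a minimal sketch of the two-latch arrangement and the proposed predicate, rendered in Java for illustration (the real class is Scala's kafka.utils.ShutdownableThread; names are abbreviated and this is not the committed code):

```java
import java.util.concurrent.CountDownLatch;

// Minimal model of ShutdownableThread's two latches and the proposed check.
public class ShutdownableThreadSketch extends Thread {
    private final CountDownLatch shutdownInitiated = new CountDownLatch(1);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    @Override
    public void run() {
        try {
            while (isRunning()) {
                // doWork() would go here; an uncaught error exits the loop
            }
        } finally {
            // On the error path only the completion latch is counted down,
            // which is what left the old predicate stuck at "running".
            shutdownComplete.countDown();
        }
    }

    // Old predicate: stays true forever if the thread died from an error.
    public boolean isRunningOld() {
        return shutdownInitiated.getCount() != 0;
    }

    // Guozhang's proposal: a completed shutdown also means "not running".
    public boolean isRunning() {
        return shutdownInitiated.getCount() != 0 && shutdownComplete.getCount() != 0;
    }

    public void initiateShutdown() {
        shutdownInitiated.countDown();
    }
}
```

With the old predicate, a caller polling isRunningOld() after an error-exit would wait forever; the proposed predicate turns false as soon as the finally block runs.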
[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug
[ https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089936#comment-17089936 ] shilin Lu commented on KAFKA-9903: -- OK, I think your advice makes sense. I will create a PR for this issue. [~guozhang], can you assign this issue to me? Thank you! > kafka ShutdownableThread judge thread isRuning status has some bug > --- > > Key: KAFKA-9903 > URL: https://issues.apache.org/jira/browse/KAFKA-9903 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.1 >Reporter: shilin Lu >Priority: Major > Attachments: image-2020-04-22-21-28-03-154.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] lushilin opened a new pull request #8535: KAFKA-9903
lushilin opened a new pull request #8535: URL: https://github.com/apache/kafka/pull/8535 shutdownComplete is counted down in the finally block when the thread shuts down due to an error, and in this case the thread is not running. So the isRunning logic should check both isShutdownInitiated and isShutdownComplete. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lushilin commented on issue #8535: KAFKA-9903
lushilin commented on issue #8535: URL: https://github.com/apache/kafka/pull/8535#issuecomment-617984080 @guozhangwang @ijuma @junrao PTAL, thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug
[ https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089973#comment-17089973 ] shilin Lu commented on KAFKA-9903: -- Pull request: [https://github.com/apache/kafka/pull/8535], please take a look. > kafka ShutdownableThread judge thread isRuning status has some bug > --- > > Key: KAFKA-9903 > URL: https://issues.apache.org/jira/browse/KAFKA-9903 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.1 >Reporter: shilin Lu >Priority: Major > Attachments: image-2020-04-22-21-28-03-154.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #8445: KAFKA-9823: Remember the sent generation for the coordinator request
hachikuji commented on a change in pull request #8445: URL: https://github.com/apache/kafka/pull/8445#discussion_r413250770 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -466,24 +477,193 @@ public void testSyncGroupRequestWithFencedInstanceIdException() { } @Test -public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws InterruptedException { +public void testJoinGroupUnknownMemberResponseWithOldGeneration() throws InterruptedException { setupCoordinator(); -mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +joinGroup(); -final int generation = 1; +final AbstractCoordinator.Generation currGen = coordinator.generation(); -mockClient.prepareResponse(joinGroupFollowerResponse(generation, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE)); -mockClient.prepareResponse(syncGroupResponse(Errors.NONE)); +RequestFuture future = coordinator.sendJoinGroupRequest(); -coordinator.ensureActiveGroup(); +TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000, +"The join-group request was not sent in time after"); Review comment: nit: after.. what? I think you can drop "in time after." Here is the assertion that is used: ``` assertThat("Condition not met within timeout " + maxWaitMs + ". " + conditionDetails, testCondition.conditionMet()); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
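For reference, the call with the suggested wording would read as follows; this is a sketch of the reviewed test line (it relies on the test's mockClient field), not the final committed code:

```java
// waitForCondition prepends "Condition not met within timeout 2000. " to the
// detail string, so the message should stand alone without "in time after".
TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000,
    "The join-group request was not sent");
```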
[GitHub] [kafka] rhauch opened a new pull request #8536: KAFKA-9883: Add better error message when REST API forwards a request and leader is not known
rhauch opened a new pull request #8536: URL: https://github.com/apache/kafka/pull/8536 When the Connect worker forwards a REST API request to the leader, it might get back a `RequestTargetException` that suggests the worker should forward the request to a different worker. This can happen when the leader changes, and the worker that receives the original request forwards the request to the worker that it thinks is the current leader, but that worker is not the current leader. In most cases, the worker that received the forwarded request includes the URL of the current leader, but it is possible (albeit rare) that the worker doesn’t know the current leader and will include a null leader URL in the resulting `RequestTargetException`. When this rare case happens, the user gets a null pointer exception in their response and the NPE is logged. Instead, the worker should catch this condition and provide a more useful error message, similar to other existing error messages that might occur. Added a unit test that verifies this corner case is caught and this particular NPE does not occur. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
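A minimal sketch of the guard being described, with an illustrative method, class, and message (the actual names, status code, and exception type used in ConnectorsResource may differ):

```java
import java.net.URI;
import javax.ws.rs.core.UriBuilder;

// Hypothetical guard: UriBuilder.fromUri((String) null) is what produced the
// IllegalArgumentException ("uriTemplate" parameter is null) seen by users.
final class ForwardGuardSketch {
    static URI resolveForwardTarget(String leaderUrl, String path) {
        if (leaderUrl == null) {
            // Fail with an actionable message instead of an opaque 500.
            throw new IllegalStateException(
                "Cannot complete request because the leader is currently unknown; please retry");
        }
        return UriBuilder.fromUri(leaderUrl).path(path).build();
    }
}
```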
[jira] [Assigned] (KAFKA-9883) Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null
[ https://issues.apache.org/jira/browse/KAFKA-9883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-9883: Assignee: Randall Hauch > Connect request to restart task can result in IllegalArgumentError: > "uriTemplate" parameter is null > --- > > Key: KAFKA-9883 > URL: https://issues.apache.org/jira/browse/KAFKA-9883 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Minor > > When attempting to restart a connector, the following is logged by Connect: > > {code:java} > ERROR Uncaught exception in REST call to > /connectors/my-connector/tasks/0/restart > (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper) > java.lang.IllegalArgumentException: "uriTemplate" parameter is null. > at > org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189) > at > org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72) > at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:96) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:263) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:298) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:218) > {code} > Resubmitting the restart REST request will usually resolve the problem. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vinothchandar commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
vinothchandar commented on issue #8534: URL: https://github.com/apache/kafka/pull/8534#issuecomment-618019176 Ran it over 600 times without issues. @guozhangwang it's ready. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on issue #8482: KAFKA-9863: update the deprecated --zookeeper option in the documentation into --bootstrap-server
cmccabe commented on issue #8482: URL: https://github.com/apache/kafka/pull/8482#issuecomment-618027980 LGTM. Thanks, @showuon, @rondagostino This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
apovzner commented on a change in pull request #8509: URL: https://github.com/apache/kafka/pull/8509#discussion_r413337972 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1552,179 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { +val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest) +val request = buildRequest(updateMetadataRequest) + +val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + +EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) +EasyMock.expect(replicaManager.maybeUpdateMetadataCache( + EasyMock.eq(request.context.correlationId), + EasyMock.anyObject() +)).andStubReturn( + Seq() +) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) +EasyMock.replay(replicaManager, controller, requestChannel) + +createKafkaApis().handleUpdateMetadataRequest(request) +val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse) + .asInstanceOf[UpdateMetadataResponse] +assertEquals(expectedError, updateMetadataResponse.error()) +EasyMock.verify(replicaManager) + } + + @Test + def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { +val controllerId = 2 +val controllerEpoch = 6 +val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() +val partitionStates = Seq( + new LeaderAndIsrRequestData.LeaderAndIsrPartitionState() +.setTopicName("topicW") +.setPartitionIndex(1) +.setControllerEpoch(1) +.setLeader(0) +.setLeaderEpoch(1) +.setIsr(asList(0, 1)) +.setZkVersion(2) +.setReplicas(asList(0, 1, 2)) +.setIsNew(false) +).asJava +val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder( + ApiKeys.LEADER_AND_ISR.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + partitionStates, + asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091)) +).build() +val request = buildRequest(leaderAndIsrRequest) +val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code) + .setPartitionErrors(asList())) + 
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) +EasyMock.expect(replicaManager.becomeLeaderOrFollower( + EasyMock.eq(request.context.correlationId), + EasyMock.anyObject(), + EasyMock.anyObject() +)).andStubReturn( + response +) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) +EasyMock.replay(replicaManager, controller, requestChannel) + +createKafkaApis().handleLeaderAndIsrRequest(request) +val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse) + .asInstanceOf[LeaderAndIsrResponse] +assertEquals(expectedError, leaderAndIsrResponse.error()) +EasyMock.verify(replicaManager) + } + + @Test + def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) Review comment: thanks for catching this and others below! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
apovzner commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618044488 @dajac and @hachikuji thanks for your comments, I addressed them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on issue #8530: KAFKA-9388: Refactor integration tests to always use different application ids
guozhangwang commented on issue #8530: URL: https://github.com/apache/kafka/pull/8530#issuecomment-618044905 The Jenkins failures are due to known flaky tests, I'm going to merge the PR as is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9388) Flaky Test StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables
[ https://issues.apache.org/jira/browse/KAFKA-9388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9388. -- Fix Version/s: 2.6.0 Assignee: Guozhang Wang Resolution: Fixed > Flaky Test > StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables > - > > Key: KAFKA-9388 > URL: https://issues.apache.org/jira/browse/KAFKA-9388 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.5.0 >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Critical > Labels: flaky-test > Fix For: 2.6.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/122/testReport/junit/org.apache.kafka.streams.integration/StandbyTaskCreationIntegrationTest/shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. At > least one client did not reach state RUNNING with active tasks and stand-by > tasks: Client 1 is NOT OK, client 2 is NOT OK. at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:24) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:369) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356) at > org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.waitUntilBothClientAreOK(StandbyTaskCreationIntegrationTest.java:178) > at > org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(StandbyTaskCreationIntegrationTest.java:141){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8369) Generate an immutable Map view for generated messages with a map key
[ https://issues.apache.org/jira/browse/KAFKA-8369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090056#comment-17090056 ] Colin McCabe commented on KAFKA-8369: - So, the collection is the way it is to avoid having separate key and value classes. We could possibly provide an option in the JSON to generate separate key and value objects if we really want that. > Generate an immutable Map view for generated messages with a map key > > > Key: KAFKA-8369 > URL: https://issues.apache.org/jira/browse/KAFKA-8369 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > When using the "mapKey" feature, we get an ImplicitLinkedHashCollection which > can be used like a map using the `find()` API. The benefit of this is > hopefully avoiding a conversion to another type when handled by the broker, > but it is a little cumbersome to work with, so we often end up doing the > conversion anyway. One improvement would be to provide a way to convert this > collection to an immutable Map view so that it is easier to work with > directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
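One shape the requested "immutable Map view" could take, sketched with a hypothetical generated class (TopicData and its name() key field are stand-ins for any message class using "mapKey"; this copies into a map rather than wrapping find()):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class MapViewSketch {
    // Stand-in for a generated message class whose "mapKey" field is name().
    interface TopicData {
        String name();
    }

    // Copy-based immutable view; a live view would instead delegate to find().
    static Map<String, TopicData> asMapView(Iterable<? extends TopicData> topics) {
        Map<String, TopicData> byName = new LinkedHashMap<>();
        for (TopicData topic : topics) {
            byName.put(topic.name(), topic);
        }
        return Collections.unmodifiableMap(byName);
    }
}
```

The copy keeps callers simple at the cost of one allocation per conversion, which is exactly the trade-off the ticket is weighing against the `find()`-based access.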
[jira] [Created] (KAFKA-9905) The equals functions for generated classes should compare all fields
Colin McCabe created KAFKA-9905: --- Summary: The equals functions for generated classes should compare all fields Key: KAFKA-9905 URL: https://issues.apache.org/jira/browse/KAFKA-9905 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe The equals functions for generated classes should compare all fields, to avoid confusion. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070266 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070328 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on issue #8504: KAFKA-9298: reuse mapped stream error in joins
mjsax commented on issue #8504: URL: https://github.com/apache/kafka/pull/8504#issuecomment-618073161 > Ideally, the fix should be to generate a repartition topic name each time to avoid such issues. But IMHO that ship has already sailed because by introducing a new name generation will cause compatibility issues for existing topologies. Why is that? Because such a topology would hit the bug, it could never have been deployed, and thus nobody can actually be running such a topology? In fact, shouldn't we "burn" an index even if a name is provided (IIRC, we do this in some cases)? I agree, though, that merging repartition topics (as proposed in (1)) should be done if possible (it's a historic artifact that we did not merge them in the past, and IMHO we should not make the same mistake again). For (2), it's a tricky question because the different names are used for different stores and changelog topics (i.e., their main purpose?); it seems to be a "nasty side effect" if we ended up with two repartition topics for this case. Of course, given the new `repartition()` operator, a user can work around it by using it after `map()` and before calling `join()`, as sketched below. Just brainstorming here about what the impact could be and what tradeoff we want to pick. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
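For context, the user-side workaround mentioned above would look roughly like this (topic names, serdes, and join details are illustrative, not taken from the PR):

```java
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

final class RepartitionWorkaroundSketch {
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Repartition once, explicitly named, right after map(); both joins
        // then read from the same repartition topic instead of tripping the bug.
        KStream<String, String> mapped = builder.<String, String>stream("input")
            .map((k, v) -> KeyValue.pair(v, k))
            .repartition(Repartitioned.as("mapped"));
        KStream<String, String> left = builder.stream("left");
        KStream<String, String> right = builder.stream("right");
        mapped.join(left, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMinutes(5)));
        mapped.join(right, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMinutes(5)));
        return builder.build();
    }
}
```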
[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
mjsax commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r413380852 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ## @@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor, null, optimizableRepartitionNodeBuilder); -final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build(); -builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode); +if (repartitionNode == null || !name.equals(repartitionName)) { Review comment: Hmmm... I am wondering if just bumping the index would be sufficient, and whether the optimizer would then merge the nodes automatically? I am also not sure about the code structure: so far, the DSL layer does not know much about optimizations (even if we "leak" a little bit into it, as we build up the `StreamsGraphNode` graph...). We would push some optimization decisions into the DSL layer, thus spreading out "optimization code". On the other hand, just inserting one `OptimizableRepartitionNode` is much more efficient than inserting multiple and letting the optimizer remove them later. I am also wondering if we could do the same for other repartition topics? Last question: this method is also used for stream-table joins and thus, if one joins a stream with two tables, would this change be backward incompatible? Or would two stream-table joins fail with the same `InvalidTopologyException`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol
guozhangwang commented on a change in pull request #8326: URL: https://github.com/apache/kafka/pull/8326#discussion_r413374355 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ## @@ -17,157 +17,109 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; -import static org.apache.kafka.common.protocol.types.Type.INT32; - public class AddPartitionsToTxnRequest extends AbstractRequest { -private static final String TOPICS_KEY_NAME = "topics"; -private static final String PARTITIONS_KEY_NAME = "partitions"; - -private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema( -TRANSACTIONAL_ID, -PRODUCER_ID, -PRODUCER_EPOCH, -new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( -TOPIC_NAME, -new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32, -"The partitions to add to the transaction.")); - -/** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. 
- */ -private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0; - -public static Schema[] schemaVersions() { -return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1}; -} + +public final AddPartitionsToTxnRequestData data; public static class Builder extends AbstractRequest.Builder { -private final String transactionalId; -private final long producerId; -private final short producerEpoch; -private final List partitions; +public final AddPartitionsToTxnRequestData data; -public Builder(String transactionalId, long producerId, short producerEpoch, List partitions) { +public Builder(final AddPartitionsToTxnRequestData data) { super(ApiKeys.ADD_PARTITIONS_TO_TXN); -this.transactionalId = transactionalId; -this.producerId = producerId; -this.producerEpoch = producerEpoch; -this.partitions = partitions; +this.data = data; +} + +public Builder(final String transactionalId, + final long producerId, + final short producerEpoch, + final List partitions) { +super(ApiKeys.ADD_PARTITIONS_TO_TXN); + +Map> partitionMap = new HashMap<>(); +for (TopicPartition topicPartition : partitions) { +String topicName = topicPartition.topic(); + +List subPartitions = partitionMap.getOrDefault(topicName, +new ArrayList<>()); +subPartitions.add(topicPartition.partition()); +partitionMap.put(topicName, subPartitions); +} + +AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection(); +for (Map.Entry> partitionEntry : partitionMap.entrySet()) { +topics.add(new AddPartitionsToTxnTopic() + .setName(partitionEntry.getKey()) + .setPartitions(partitionEntry.getValue())); +} + +this.data = new AddPartitionsToTxnRequestData() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setTopics(topics); } @Override public AddPartitionsToTxnRequest build(short version) { -return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions); +return new AddPartitionsToTxnRequest(data, version); }
[GitHub] [kafka] C0urante commented on issue #8069: KAFKA-9374: Make connector interactions asynchronous
C0urante commented on issue #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-618097440 @ncliang I've made some updates to the PR and rebased on the latest trunk; would you be willing to do another pass? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
guozhangwang commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r413417021 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() { validateOffsetsAsync(partitionsToValidate); } +/** + * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition + * with the epoch less than or equal to the epoch the partition last saw. + * + * Requests are grouped by Node for efficiency. + */ +private void validateOffsetsAsync(Map partitionsToValidate) { +final Map> regrouped = +regroupFetchPositionsByLeader(partitionsToValidate); + +regrouped.forEach((node, fetchPositions) -> { +if (node.isEmpty()) { +metadata.requestUpdate(); +return; +} + +NodeApiVersions nodeApiVersions = apiVersions.get(node.idString()); +if (nodeApiVersions == null) { +client.tryConnect(node); +return; +} + +if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { +log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " + + "support the required protocol version (introduced in Kafka 2.3)", +fetchPositions.keySet()); +completeAllValidations(fetchPositions); +return; +} + +// We need to get the client epoch state before sending out the leader epoch request, and use it to +// decide whether we need to validate offsets. +if (!metadata.hasReliableLeaderEpochs()) { +log.debug("Skipping validation of fetch offsets for partitions {} since the provided leader broker " + + "is not reliable", fetchPositions.keySet()); +completeAllValidations(fetchPositions); +return; +} + +subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs); + +RequestFuture future = +offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions); + +future.addListener(new RequestFutureListener() { +@Override +public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) { +Map truncationWithoutResetPolicy = new HashMap<>(); +if (!offsetsResult.partitionsToRetry().isEmpty()) { + subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs); +metadata.requestUpdate(); +} + +// For each OffsetsForLeader response, check if the end-offset is lower than our current offset +// for the partition. If so, it means we have experienced log truncation and need to reposition +// that partition's offset. +// +// In addition, check whether the returned offset and epoch are valid. If not, then we should treat +// it as out of range and update metadata for rediscovery. +offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> { +if (respEndOffset.hasUndefinedEpochOrOffset()) { +// Should attempt to find the new leader in the next try. +log.debug("Requesting metadata update for partition {} due to undefined epoch or offset {}", Review comment: nit: `... or offset {} from OffsetsForLeaderEpoch response` ## File path: clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java ## @@ -86,4 +84,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(error, leaderEpoch, endOffset); } + +public boolean hasUndefinedEpochOrOffset() { +return this.endOffset == UNDEFINED_EPOCH_OFFSET || Review comment: For my own understanding: if endOffset is UNDEFINED the epoch should always be UNDEFINED too? If that's the case we can just rely on `leaderEpoch` alone? 
## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() { validateOffsetsAsync(partitionsToValidate); } +/** + * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition + * with the epoch less than or equal to the epoch the partition last saw. + * + * Requests are grouped by Node for efficiency. +
[GitHub] [kafka] guozhangwang commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
guozhangwang commented on issue #8534: URL: https://github.com/apache/kafka/pull/8534#issuecomment-618104185 Thanks @vinothchandar !! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
junrao commented on a change in pull request #8524: URL: https://github.com/apache/kafka/pull/8524#discussion_r413436495 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig, val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && - controllerContext.allTopics.contains(tp.topic)) + controllerContext.allTopics.contains(tp.topic) && + controllerContext.partitionLeadershipInfo.get(tp).forall(l => l.leaderAndIsr.isr.contains(leaderBroker)) Review comment: The preferred leader election also checks for live brokers. So, perhaps we could just call PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection() here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
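The election helper junrao points to reduces to picking the head of the assignment only when that replica is both alive and in the ISR; rendered in Java for illustration (the real method lives in Scala's PartitionLeaderElectionAlgorithms, so treat this as a sketch of its logic, not its code):

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class PreferredElectionSketch {
    // Roughly the logic of preferredReplicaPartitionLeaderElection: the
    // preferred replica is the first assigned replica, elected only if it
    // is live and present in the ISR.
    static Optional<Integer> elect(List<Integer> assignment,
                                   List<Integer> isr,
                                   Set<Integer> liveReplicas) {
        return assignment.stream().findFirst()
            .filter(id -> liveReplicas.contains(id) && isr.contains(id));
    }
}
```

Calling that helper directly would subsume both the liveness check and the ISR-containment check added in the diff.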
[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called ?
[ https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiang Zhang updated KAFKA-9906: --- Summary: Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called ? (was: Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is called ?) > Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is > called ? > -- > > Key: KAFKA-9906 > URL: https://issues.apache.org/jira/browse/KAFKA-9906 > Project: Kafka > Issue Type: Improvement >Reporter: Xiang Zhang >Priority: Major > > I was reading code in LogSegment.scala and I found the code in -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is called ?
Xiang Zhang created KAFKA-9906: -- Summary: Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is called ? Key: KAFKA-9906 URL: https://issues.apache.org/jira/browse/KAFKA-9906 Project: Kafka Issue Type: Improvement Reporter: Xiang Zhang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is called ?
[ https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiang Zhang updated KAFKA-9906: --- Description: I was reading code in LogSegment.scala and I found the code in > Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is > called ? > > > Key: KAFKA-9906 > URL: https://issues.apache.org/jira/browse/KAFKA-9906 > Project: Kafka > Issue Type: Improvement >Reporter: Xiang Zhang >Priority: Major > > I was reading code in LogSegment.scala and I found the code in -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called ?
[ https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiang Zhang updated KAFKA-9906: --- Description: I was reading code in LogSegment.scala and I found the code below: {code:java} def append(largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { ... val appendedBytes = log.append(records) if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex.append(largestOffset, physicalPosition) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) bytesSinceLastIndexEntry = 0 } bytesSinceLastIndexEntry += records.sizeInBytes } {code} When bytesSinceLastIndexEntry > indexIntervalBytes, we update the offsetIndex and maybe the timeIndex and set bytesSinceLastIndexEntry to zero, which makes sense to me because we just updated the index. However, following that, bytesSinceLastIndexEntry is incremented by records.sizeInBytes, which I find confusing since the records are appended before the index is updated. Maybe it should work like this: {code:java} if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex.append(largestOffset, physicalPosition) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) bytesSinceLastIndexEntry = 0 } else { bytesSinceLastIndexEntry += records.sizeInBytes }{code} Sorry if I misunderstood this. was: I was reading code in LogSegment.scala and I found the code in > Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is > called ? > -- > > Key: KAFKA-9906 > URL: https://issues.apache.org/jira/browse/KAFKA-9906 > Project: Kafka > Issue Type: Improvement >Reporter: Xiang Zhang >Priority: Major > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly in LogSegment.append()?
[ https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiang Zhang updated KAFKA-9906: --- Summary: Is bytesSinceLastIndexEntry updated correctly in LogSegment.append()? (was: Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called ?) > Is bytesSinceLastIndexEntry updated correctly in LogSegment.append()? > - > > Key: KAFKA-9906 > URL: https://issues.apache.org/jira/browse/KAFKA-9906 > Project: Kafka > Issue Type: Improvement >Reporter: Xiang Zhang >Priority: Major > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
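To see what is at stake in the question above, here is a neutral sketch that counts index entries under both byte-accounting variants (batch sizes are made up; this only illustrates the difference between the two orderings, it does not judge which one LogSegment intends):

```java
final class IndexIntervalSketch {
    // countCurrentBatchAfterReset = true models the existing code (the batch
    // appended in the same round as an index entry still counts toward the
    // next entry); false models the "else" variant proposed in the ticket.
    static int indexEntries(int[] batchSizes, int indexIntervalBytes,
                            boolean countCurrentBatchAfterReset) {
        int bytesSinceLastIndexEntry = 0;
        int entries = 0;
        for (int size : batchSizes) {
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                entries++;                       // write an index entry
                bytesSinceLastIndexEntry = 0;
                if (!countCurrentBatchAfterReset) {
                    continue;                    // proposed variant skips this batch
                }
            }
            bytesSinceLastIndexEntry += size;
        }
        return entries;
    }
}
```

Running both variants over the same batch sizes shows the proposed "else" version spaces index entries slightly further apart, since each entry-writing round contributes no bytes toward the next entry.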
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r413442881 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() { validateOffsetsAsync(partitionsToValidate); } +/** + * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition + * with the epoch less than or equal to the epoch the partition last saw. + * + * Requests are grouped by Node for efficiency. + */ +private void validateOffsetsAsync(Map partitionsToValidate) { +final Map> regrouped = +regroupFetchPositionsByLeader(partitionsToValidate); + +regrouped.forEach((node, fetchPositions) -> { +if (node.isEmpty()) { +metadata.requestUpdate(); +return; +} + +NodeApiVersions nodeApiVersions = apiVersions.get(node.idString()); +if (nodeApiVersions == null) { +client.tryConnect(node); +return; +} + +if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { +log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " + + "support the required protocol version (introduced in Kafka 2.3)", +fetchPositions.keySet()); +completeAllValidations(fetchPositions); +return; +} + +// We need to get the client epoch state before sending out the leader epoch request, and use it to +// decide whether we need to validate offsets. +if (!metadata.hasReliableLeaderEpochs()) { Review comment: Np, will do This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-9857) Failed to build image ducker-ak-openjdk-8 on arm
[ https://issues.apache.org/jira/browse/KAFKA-9857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jiamei xie updated KAFKA-9857: -- Component/s: core > Failed to build image ducker-ak-openjdk-8 on arm > > > Key: KAFKA-9857 > URL: https://issues.apache.org/jira/browse/KAFKA-9857 > Project: Kafka > Issue Type: Bug > Components: build, core, system tests >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > It failed to build image ducker-ak-openjdk-8 on arm and below is its log. > This issue is to fix it. > kafka/tests/docker$ ./run_tests.sh > Sending build context to Docker daemon 53.76kB > Step 1/43 : ARG jdk_version=openjdk:8 > Step 2/43 : FROM $jdk_version > 8: Pulling from library/openjdk > no matching manifest for linux/arm64/v8 in the manifest list entries > docker failed > + die 'ducker-ak up failed' > + echo ducker-ak up failed > ducker-ak up failed > + exit 1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9857) Failed to build image ducker-ak-openjdk-8 on arm
[ https://issues.apache.org/jira/browse/KAFKA-9857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jiamei xie updated KAFKA-9857: -- Component/s: build > Failed to build image ducker-ak-openjdk-8 on arm > > > Key: KAFKA-9857 > URL: https://issues.apache.org/jira/browse/KAFKA-9857 > Project: Kafka > Issue Type: Bug > Components: build, system tests >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9804) Extract ConsumerPerform config into one file
[ https://issues.apache.org/jira/browse/KAFKA-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiamei xie updated KAFKA-9804:
------------------------------
    Component/s: tools

> Extract ConsumerPerform config into one file
> ---------------------------------------------
>
>                 Key: KAFKA-9804
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9804
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, tools
>            Reporter: jiamei xie
>            Assignee: jiamei xie
>            Priority: Major
>
> Configs for the producer have been extracted out of PerfConfig in https://github.com/apache/kafka/pull/3613/commits. The configs remaining in PerfConfig are consumer configs, and ConsumerPerformance also has its own consumer configs. Splitting these configs across two classes is not concise, so we can put all of them into the class ConsumerPerformance.ConsumerPerfConfig.
[GitHub] [kafka] jiameixie commented on issue #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm
jiameixie commented on issue #8489: URL: https://github.com/apache/kafka/pull/8489#issuecomment-618132157

@guozhangwang @ijuma @junrao PTAL, thanks
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r413468323

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
## @@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
             case KAFKA_STORAGE_ERROR:
             case OFFSET_NOT_AVAILABLE:
             case LEADER_NOT_AVAILABLE:
-                logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",

Review comment: It is exactly the same as the handling of the subsequent cases.
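For readers unfamiliar with the pattern being discussed: grouped case labels fall through into one shared handling block, which is why the removed per-label debug line was redundant. A minimal illustration with a simplified stand-in enum (not Kafka's actual Errors enum or handler):

public class FallthroughExample {
    // Simplified stand-in for Kafka's Errors enum, for illustration only.
    enum Errors { KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE, LEADER_NOT_AVAILABLE, NONE }

    // All three labels share the single block below, so any logging or retry
    // logic placed there applies uniformly to every label in the group.
    static boolean isRetriable(Errors error) {
        switch (error) {
            case KAFKA_STORAGE_ERROR:
            case OFFSET_NOT_AVAILABLE:
            case LEADER_NOT_AVAILABLE:
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isRetriable(Errors.OFFSET_NOT_AVAILABLE)); // true
        System.out.println(isRetriable(Errors.NONE));                 // false
    }
}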
[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618147106

retest this please
[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol
abbccdda commented on a change in pull request #8326: URL: https://github.com/apache/kafka/pull/8326#discussion_r413471840

## File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
## @@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;

 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;

 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-            TRANSACTIONAL_ID,
-            PRODUCER_ID,
-            PRODUCER_EPOCH,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32)))),
-                    "The partitions to add to the transaction."));
-
-    /**
-     * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
-     */
-    private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-    }
+
+    public final AddPartitionsToTxnRequestData data;

     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
-        private final String transactionalId;
-        private final long producerId;
-        private final short producerEpoch;
-        private final List<TopicPartition> partitions;
+        public final AddPartitionsToTxnRequestData data;

-        public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+        public Builder(final AddPartitionsToTxnRequestData data) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.transactionalId = transactionalId;
-            this.producerId = producerId;
-            this.producerEpoch = producerEpoch;
-            this.partitions = partitions;
+            this.data = data;
+        }
+
+        public Builder(final String transactionalId,
+                       final long producerId,
+                       final short producerEpoch,
+                       final List<TopicPartition> partitions) {
+            super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+            Map<String, List<Integer>> partitionMap = new HashMap<>();
+            for (TopicPartition topicPartition : partitions) {
+                String topicName = topicPartition.topic();
+
+                List<Integer> subPartitions = partitionMap.getOrDefault(topicName,
+                    new ArrayList<>());
+                subPartitions.add(topicPartition.partition());
+                partitionMap.put(topicName, subPartitions);
+            }
+
+            AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
+            for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
+                topics.add(new AddPartitionsToTxnTopic()
+                               .setName(partitionEntry.getKey())
+                               .setPartitions(partitionEntry.getValue()));
+            }
+
+            this.data = new AddPartitionsToTxnRequestData()
+                .setTransactionalId(transactionalId)
+                .setProducerId(producerId)
+                .setProducerEpoch(producerEpoch)
+                .setTopics(topics);
         }

         @Override
         public AddPartitionsToTxnRequest build(short version) {
-            return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+            return new AddPartitionsToTxnRequest(data, version);
         }
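The per-topic grouping in the new Builder above is a common pattern. Here is a standalone sketch of just that step, using computeIfAbsent instead of getOrDefault plus put; the nested TopicPartition record is a simplified stand-in for Kafka's class, for illustration only:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByTopicExample {
    record TopicPartition(String topic, int partition) {}

    // Bucket partition ids under their topic name in a single pass.
    static Map<String, List<Integer>> groupByTopic(List<TopicPartition> partitions) {
        Map<String, List<Integer>> byTopic = new HashMap<>();
        for (TopicPartition tp : partitions) {
            byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<TopicPartition> partitions = List.of(
            new TopicPartition("foo", 0), new TopicPartition("foo", 1),
            new TopicPartition("bar", 0));
        System.out.println(groupByTopic(partitions)); // {bar=[0], foo=[0, 1]}
    }
}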
[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol
abbccdda commented on a change in pull request #8326: URL: https://github.com/apache/kafka/pull/8326#discussion_r413473580

## File path: core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
## @@ -27,9 +27,9 @@ import org.junit.{Before, Test}

 import scala.jdk.CollectionConverters._

-class AddPartitionsToTxnRequestTest extends BaseRequestTest {
-  private val topic1 = "foobartopic"
-  val numPartitions = 3
+class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {

Review comment: Because its name has a conflict with our newly added `AddPartitionsToTxnRequestTest`; just want to clarify that.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores
ableegoldman commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r391363596

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
## @@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final String threadClientId, final
                 partitions
             );

-            if (threadProducer == null) {
-                final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
-                final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
-                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
-                log.info("Creating producer client for task {}", taskId);
-                taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
-            }
-
-            final RecordCollector recordCollector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                consumer,
-                threadProducer != null ?
-                    new StreamsProducer(threadProducer, false, logContext, applicationId) :
-                    new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
-                config.defaultProductionExceptionHandler(),
-                EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-                streamsMetrics
-            );
-
-            final Task task = new StreamTask(
+            createdTasks.add(createStreamTask(
                 taskId,
                 partitions,
-                topology,
                 consumer,
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                time,
+                logContext,
                 stateManager,
-                recordCollector
-            );
-
-            log.trace("Created task {} with assigned partitions {}", taskId, partitions);
-            createdTasks.add(task);
-            createTaskSensor.record();
+                topology));
         }
         return createdTasks;
     }

+    private StreamTask createStreamTask(final TaskId taskId,
+                                        final Set<TopicPartition> partitions,
+                                        final Consumer<byte[], byte[]> consumer,
+                                        final LogContext logContext,
+                                        final ProcessorStateManager stateManager,
+                                        final ProcessorTopology topology) {
+        if (threadProducer == null) {
+            final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
+            final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
+            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
+            log.info("Creating producer client for task {}", taskId);
+            taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
+        }
+
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            consumer,
+            threadProducer != null ?
+                new StreamsProducer(threadProducer, false, logContext, applicationId) :
+                new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
+            config.defaultProductionExceptionHandler(),
+            EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+            streamsMetrics
+        );
+
+        final StreamTask task = new StreamTask(
+            taskId,
+            partitions,
+            topology,
+            consumer,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            stateManager,
+            recordCollector
+        );
+
+        log.trace("Created task {} with assigned partitions {}", taskId, partitions);
+        createTaskSensor.record();
+        return task;
+    }
+
+    StreamTask convertStandbyToActive(final StandbyTask standbyTask,
+                                      final Set<TopicPartition> partitions,
+                                      final Consumer<byte[], byte[]> consumer) {
+        return createStreamTask(
+            standbyTask.id,
+            partitions,
+            consumer,
+            getLogContext(standbyTask.id),
+            standbyTask.stateMgr,
+            standbyTask.topology);

Review comment: The `topology` is created but never initialized for a standby, therefore we don't need to worry about closing it and can reuse it here.
[GitHub] [kafka] chia7712 commented on issue #5935: KAFKA-7665: Replace BaseConsumerRecord with ConsumerRecord in MM
chia7712 commented on issue #5935: URL: https://github.com/apache/kafka/pull/5935#issuecomment-618162318

@huxihx Are you still working on this? I'd like to complete both the KIP and this PR :)
[GitHub] [kafka] jiameixie commented on issue #8529: KAFKA-9901:Fix streams_broker_bounce_test error
jiameixie commented on issue #8529: URL: https://github.com/apache/kafka/pull/8529#issuecomment-618169833

@guozhangwang @ijuma @junrao PTAL, thanks
[jira] [Updated] (KAFKA-9893) Configurable TCP connection timeout and improve the initial metadata fetch
[ https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Tan updated KAFKA-9893:
-----------------------------
    Summary: Configurable TCP connection timeout and improve the initial metadata fetch  (was: Configurable TCP connection timeout for AdminClient)

> Configurable TCP connection timeout and improve the initial metadata fetch
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-9893
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9893
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Cheng Tan
>            Assignee: Cheng Tan
>            Priority: Major
>
> We do not currently allow connection timeouts to be defined within AdminClient, and as a result we rely on the default OS settings to determine whether a broker is inactive before selecting an alternate broker from bootstrap.
> In the case of a connection timeout on the initial handshake, and where tcp_syn_retries is the default (6), we won't time out an unresponsive broker until ~127s, while the client will time out sooner (~120s).
> Reducing tcp_syn_retries would mitigate the issue depending on the number of unresponsive brokers within the bootstrap list, though that setting is applied system-wide, so it would be better if we could instead configure connection timeouts for AdminClient.
> The use case where this came up was a customer performing DC failover tests with a stretch cluster.
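As context for the ~127s figure quoted above: it follows from the kernel's exponential SYN retransmission backoff, assuming Linux's default initial retransmission timeout of 1 second (the exact value is OS-dependent). With tcp_syn_retries = 6, a connect attempt waits through the initial SYN plus six retransmissions before failing:

\[
1 + 2 + 4 + 8 + 16 + 32 + 64 \;=\; \sum_{k=0}^{6} 2^{k} \;=\; 2^{7} - 1 \;=\; 127\ \text{s}
\]

which is why the OS-level failure (~127s) lands just after the client's own request timeout (~120s).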
[GitHub] [kafka] jiameixie commented on issue #8446: KAFKA-9804:Extract consumer configs out of PerfConfig
jiameixie commented on issue #8446: URL: https://github.com/apache/kafka/pull/8446#issuecomment-618169937

@guozhangwang @ijuma @junrao PTAL, thanks
[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous
C0urante commented on a change in pull request #8069: URL: https://github.com/apache/kafka/pull/8069#discussion_r413499718

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
## @@ -263,17 +267,20 @@ public boolean startConnector(
                 Plugins.compareAndSwapLoaders(savedLoader);
                 workerMetricsGroup.recordConnectorStartupFailure();
                 statusListener.onFailure(connName, t);
-                return false;
+                onConnectorStateChange.onCompletion(t, null);
+                return;
             }
+            workerConnector.transitionTo(initialState, onConnectorStateChange);

Review comment: This part still needs some work; it's in an inconsistent state because I modified `Worker::startConnector` to have no return value and instead communicate all success or failure of the connector startup through the callback, but I haven't yet taken care of issues such as: possibly invoking the callback twice (once in this method, and once in the `WorkerConnector` instance); making sure to swap plugin classloaders at the right times; and preventing a possible race with the check for whether the connector already exists, based on whether its name is present as a key in the `connectors` map.
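One common way to address the double-invocation concern described above is to wrap the callback so that only the first completion is delivered, whichever component fires it. A hypothetical sketch, not Connect's actual implementation, with a simplified Callback interface:

import java.util.concurrent.atomic.AtomicBoolean;

public class OnceCallbackExample {
    // Simplified stand-in for Connect's callback type.
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    // Wrap a callback so repeated completions after the first are ignored.
    static <V> Callback<V> once(Callback<V> delegate) {
        AtomicBoolean completed = new AtomicBoolean(false);
        return (error, result) -> {
            // compareAndSet guarantees exactly one caller wins, even if the
            // worker and the connector race to complete the same callback.
            if (completed.compareAndSet(false, true)) {
                delegate.onCompletion(error, result);
            }
        };
    }
}

With such a wrapper, both the worker and the `WorkerConnector` could safely attempt to complete the same startup callback.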
[jira] [Updated] (KAFKA-9804) Extract ConsumerPerform config into one file
[ https://issues.apache.org/jira/browse/KAFKA-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiamei xie updated KAFKA-9804:
------------------------------
    Labels: newbie pull-request-available (was: )

> Extract ConsumerPerform config into one file
> ---------------------------------------------
>
>                 Key: KAFKA-9804
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9804
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, tools
>            Reporter: jiamei xie
>            Assignee: jiamei xie
>            Priority: Major
>              Labels: newbie, pull-request-available
>
> Configs for the producer have been extracted out of PerfConfig in https://github.com/apache/kafka/pull/3613/commits. The configs remaining in PerfConfig are consumer configs, and ConsumerPerformance also has its own consumer configs. Splitting these configs across two classes is not concise, so we can put all of them into the class ConsumerPerformance.ConsumerPerfConfig.
[jira] [Updated] (KAFKA-9893) Configurable TCP connection timeout and improve the initial metadata fetch
[ https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Tan updated KAFKA-9893:
-----------------------------
    Description:
This issue has two parts:
# Support the TCP connection timeout described in KIP-601.
# Currently, the LeastLoadedNodeProvider might provide an offline/invalid node when none of the nodes provided in the --bootstrap-server option is connected. The Cluster class shuffles the nodes to balance the initial pressure (I guess), and the LeastLoadedNodeProvider will then always provide the same node, which is the last node after shuffling. Consequently, though we may provide several bootstrap servers, we might hit a timeout if that server is down.

The implementation strategy for 1 is described in KIP-601.

The solution for 2 is to implement round-robin candidate node selection when every node is unconnected. We can either
# shuffle the nodes every time we hit the "no node connected" status, or
# keep the status of each node's try count and clear it after any of the nodes gets connected.

  was:
We do not currently allow connection timeouts to be defined within AdminClient, and as a result we rely on the default OS settings to determine whether a broker is inactive before selecting an alternate broker from bootstrap.

In the case of a connection timeout on the initial handshake, and where tcp_syn_retries is the default (6), we won't time out an unresponsive broker until ~127s, while the client will time out sooner (~120s).

Reducing tcp_syn_retries would mitigate the issue depending on the number of unresponsive brokers within the bootstrap list, though that setting is applied system-wide, so it would be better if we could instead configure connection timeouts for AdminClient.

The use case where this came up was a customer performing DC failover tests with a stretch cluster.

> Configurable TCP connection timeout and improve the initial metadata fetch
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-9893
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9893
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Cheng Tan
>            Assignee: Cheng Tan
>            Priority: Major
>
> This issue has two parts:
> # Support the TCP connection timeout described in KIP-601.
> # Currently, the LeastLoadedNodeProvider might provide an offline/invalid node when none of the nodes provided in the --bootstrap-server option is connected. The Cluster class shuffles the nodes to balance the initial pressure (I guess), and the LeastLoadedNodeProvider will then always provide the same node, which is the last node after shuffling. Consequently, though we may provide several bootstrap servers, we might hit a timeout if that server is down.
> The implementation strategy for 1 is described in KIP-601.
> The solution for 2 is to implement round-robin candidate node selection when every node is unconnected. We can either
> # shuffle the nodes every time we hit the "no node connected" status, or
> # keep the status of each node's try count and clear it after any of the nodes gets connected.
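A minimal sketch of the round-robin idea in part 2 above, purely illustrative; Node and the connectivity check are simplified stand-ins, not Kafka's actual NetworkClient internals:

import java.util.List;
import java.util.function.Predicate;

public class CandidateSelectionExample {
    // Simplified stand-in for org.apache.kafka.common.Node.
    record Node(int id) {}

    private final List<Node> nodes;
    private int nextIndex = 0;

    CandidateSelectionExample(List<Node> nodes) {
        this.nodes = List.copyOf(nodes);
    }

    // Prefer a node that already has a working connection; otherwise rotate
    // through the candidates so a single unresponsive bootstrap server cannot
    // absorb every connection attempt.
    Node leastLoadedNode(Predicate<Node> isConnected) {
        for (Node node : nodes) {
            if (isConnected.test(node)) {
                return node;
            }
        }
        Node candidate = nodes.get(nextIndex % nodes.size());
        nextIndex++;
        return candidate;
    }
}

The rotating index plays the same role as re-shuffling on every "no node connected" event: successive lookups try different candidates instead of repeatedly returning the last node in a fixed ordering.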
[GitHub] [kafka] ijuma commented on issue #8533: KAFKA-9589: Fixed bug in V2 log validator tests
ijuma commented on issue #8533: URL: https://github.com/apache/kafka/pull/8533#issuecomment-618192475

ok to test
[jira] [Created] (KAFKA-9907) Switch default build to Scala 2.13
Ismael Juma created KAFKA-9907:
----------------------------------

             Summary: Switch default build to Scala 2.13
                 Key: KAFKA-9907
                 URL: https://issues.apache.org/jira/browse/KAFKA-9907
             Project: Kafka
          Issue Type: Improvement
            Reporter: Ismael Juma
            Assignee: Ismael Juma
             Fix For: 2.6.0


Scala 2.13.2 introduces support for suppressing warnings, which makes it possible to enable fatal warnings. This is useful enough from a development perspective to justify the change.

In addition, Scala 2.13.2 also ships a Vector implementation with significant performance improvements and encodes String matches as switches.