[jira] [Commented] (KAFKA-10182) Change number of partitions of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-10182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139155#comment-17139155 ] huxihx commented on KAFKA-10182: You could use the `Admin.createPartitions` API to increase the partition count for this internal topic. > Change number of partitions of __consumer_offsets > -- > > Key: KAFKA-10182 > URL: https://issues.apache.org/jira/browse/KAFKA-10182 > Project: Kafka > Issue Type: Improvement >Reporter: Xu Zhang >Priority: Major > > Currently the number of partitions of {{__consumer_offsets}} cannot be changed for the lifetime of the > cluster, and it's generally a really bad idea to change it after the topic is > initially created, because the hashing of consumer group names to partitions would > change, which means the group coordinator would have no history. > > Is there a way to change the number of partitions for __consumer_offsets?
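For illustration, huxihx's suggestion looks roughly like this with the Java Admin client (the bootstrap address and target partition count here are placeholder assumptions, and the caveat from the ticket still applies: the group-name hashing changes, so the coordinator loses the existing offset history):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreaseConsumerOffsetsPartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // increaseTo() takes the new *total* partition count, not a delta.
            admin.createPartitions(Collections.singletonMap(
                    "__consumer_offsets", NewPartitions.increaseTo(100)))
                 .all().get();
        }
    }
}
```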
[GitHub] [kafka] mimaison commented on pull request #8878: MINOR: Generator config-specific HTML ids
mimaison commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-645861700 I think it's a useful fix but it's worth noting it will break all existing links. These links were only added a couple of releases ago too, so it's probably acceptable.
[GitHub] [kafka] tombentley commented on pull request #8878: MINOR: Generator config-specific HTML ids
tombentley commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-645868566 Another thought I had about improving the docs (though not for this PR) would be to have an index of links in alphabetical order at the start of each section (e.g. after the "Broker Configs" header and before the table). This would make it much simpler to find a specific config (if you knew its name). Currently you can click on the top-level ToC to get to, e.g., the "Broker Configs" section, but then you have to scroll for pages, searching (the configs are not in alphabetical order), until you find the one you want. With an index it would be two clicks, one (binary) search of a sorted list, and no scrolling. We might not want this for all configs, but for the longer ones I think it would be valuable.
[jira] [Assigned] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-9509: Assignee: Luke Chen (was: Sanjana Kaundinya) > Fix flaky test MirrorConnectorsIntegrationTest.testReplication > -- > > Key: KAFKA-9509 > URL: https://issues.apache.org/jira/browse/KAFKA-9509 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Sanjana Kaundinya >Assignee: Luke Chen >Priority: Major > Fix For: 2.5.0 > > > The test > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication > is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of > when the connectors and tasks are started up. The fix would be to wait, once the > connectors are started up, until the REST endpoint returns a positive number of > tasks, so that we can be confident testing can start.
[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139217#comment-17139217 ] Luke Chen commented on KAFKA-9509: -- [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4650/log/?start=0] [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1578/log/?start=0] [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/226/log/?start=0] [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1579/log/?start=0] org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testReplication FAILED java.lang.RuntimeException: Could not find enough records. found 0, expected 100 at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:222) [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4651/log/?start=0] org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testReplication FAILED java.lang.RuntimeException: Could not find enough records. found 0, expected 100 at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:218) > Fix flaky test MirrorConnectorsIntegrationTest.testReplication > -- > > Key: KAFKA-9509 > URL: https://issues.apache.org/jira/browse/KAFKA-9509 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Sanjana Kaundinya >Assignee: Luke Chen >Priority: Major > Fix For: 2.5.0 > > > The test > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication > is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of > when the connectors and tasks are started up. The fix would be to wait, once the > connectors are started up, until the REST endpoint returns a positive number of > tasks, so that we can be confident testing can start.
[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139218#comment-17139218 ] Luke Chen commented on KAFKA-9509: -- I'm taking it over since it has failed quite often recently, and it also failed my PR testing!! :< > Fix flaky test MirrorConnectorsIntegrationTest.testReplication > -- > > Key: KAFKA-9509 > URL: https://issues.apache.org/jira/browse/KAFKA-9509 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Affects Versions: 2.4.0 >Reporter: Sanjana Kaundinya >Assignee: Luke Chen >Priority: Major > Fix For: 2.5.0 > > > The test > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication > is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of > when the connectors and tasks are started up. The fix would be to wait, once the > connectors are started up, until the REST endpoint returns a positive number of > tasks, so that we can be confident testing can start.
[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
d8tltanc commented on pull request #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-645877170 Thanks @abbccdda for the suggestions on the utility class. @skaundinya15 @ijuma I've opened this new PR for the KIP-580 exponential retry backoff implementation. I've finalized the patch and it's ready for review. Please take your time and feel free to leave comments. Thanks.
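For context, KIP-580 replaces the client's static retry backoff with an exponentially growing, jittered one. A minimal sketch of that idea (the field names and the 20% jitter factor follow the KIP's description; they are not taken from this PR's code):

```java
import java.util.concurrent.ThreadLocalRandom;

public final class ExponentialBackoffSketch {
    private final long retryBackoffMs;     // initial backoff, e.g. retry.backoff.ms
    private final long retryBackoffMaxMs;  // upper bound, e.g. retry.backoff.max.ms

    public ExponentialBackoffSketch(long retryBackoffMs, long retryBackoffMaxMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    // The backoff doubles with each failed attempt, is capped at the max,
    // and gets +/-20% random jitter to avoid synchronized retry storms.
    public long backoff(int attempts) {
        double exp = retryBackoffMs * Math.pow(2, attempts);
        double capped = Math.min(exp, retryBackoffMaxMs);
        double jitter = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
        return (long) (capped * jitter);
    }
}
```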
[GitHub] [kafka] d8tltanc edited a comment on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
d8tltanc edited a comment on pull request #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-645877170
[GitHub] [kafka] showuon opened a new pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
showuon opened a new pull request #8894: URL: https://github.com/apache/kafka/pull/8894 The test MirrorConnectorsIntegrationTest.testReplication has failed too often recently. It failed at least 6 times in today's (6/18) trunk builds (I didn't check all failed builds), and it also failed my PR testing! I think it should be fixed soon to save developers' time. The test exercises MM2 replication, and recently every failure was because the records had not yet been replicated to the primary/backup cluster, so the consumer could not retrieve them in time. In this PR, I add retries around these consumer.poll calls (up to 3 retries to poll the records), keeping the original timeout for each attempt; a sketch of the idea follows below. It should make the tests more stable. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
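A hedged sketch of the retry idea (the helper name and structure are illustrative, not the PR's actual code; it assumes EmbeddedKafkaCluster.consume throws a RuntimeException when it cannot find enough records, as the stack traces on KAFKA-9509 show):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;

final class ConsumeRetryHelper {
    // Retry the consume up to three times, keeping the original per-attempt
    // timeout, and rethrow the last failure if every attempt comes up short.
    static ConsumerRecords<byte[], byte[]> consumeWithRetries(
            EmbeddedKafkaCluster cluster, int expected, long timeoutMs, String... topics) {
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= 3; attempt++) {
            try {
                return cluster.consume(expected, timeoutMs, topics);
            } catch (RuntimeException e) {
                lastFailure = e; // records not replicated yet; try again
            }
        }
        throw lastFailure;
    }
}
```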
[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
showuon commented on pull request #8894: URL: https://github.com/apache/kafka/pull/8894#issuecomment-645886831 @ryannedolan @skaundinya15 @kkonstantine , could you review this PR to fix the flaky test? Thanks.
[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters
mimaison commented on pull request #8604: URL: https://github.com/apache/kafka/pull/8604#issuecomment-645910843 ok to test
[GitHub] [kafka] gunnarmorling opened a new pull request #8895: KAFKA-8398 Avoiding NPE in ByteBufferUnmapper#unmap()
gunnarmorling opened a new pull request #8895: URL: https://github.com/apache/kafka/pull/8895 Regularly seeing this NPE when shutting down the broker in our tests. I *think* simply returning early in this case should suffice. Hey @ijuma, could you take a look at this one perhaps? Thanks!
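A minimal sketch of the early-return fix being proposed (the signature is approximated from the public ByteBufferUnmapper sources; treat the body as illustrative, not the actual patch):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

public final class ByteBufferUnmapperSketch {
    public static void unmap(String resourceDescription, ByteBuffer buffer) throws IOException {
        if (buffer == null) {
            // Nothing was mapped, so there is nothing to unmap; returning
            // early avoids the NPE seen at broker shutdown.
            return;
        }
        // ... existing platform-specific cleaner invocation on `buffer` ...
    }
}
```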
[jira] [Created] (KAFKA-10183) MirrorMaker creates duplicate messages in target cluster
Liraz Sharaby created KAFKA-10183: - Summary: MirrorMaker creates duplicate messages in target cluster Key: KAFKA-10183 URL: https://issues.apache.org/jira/browse/KAFKA-10183 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.5.0, 2.4.0 Environment: Centos7.7 Reporter: Liraz Sharaby Issue: MirrorMaker creates a consumer-producer pair per server listed in bootstrap.servers (MirrorMaker config), resulting in duplicate messages in the target cluster. When specifying 3 bootstrap servers, the target topic will have 3 times the messages its source does. When specifying a single bootstrap server, only 1 consumer-producer pair is created, and the message count is identical in the source and target topics.
[GitHub] [kafka] ryannedolan commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
ryannedolan commented on a change in pull request #8894: URL: https://github.com/apache/kafka/pull/8894#discussion_r442313614 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -207,23 +212,45 @@ public void close() { backup.stop(); } +// throw exception after 3 retries, and print expected error messages +private void assertEqualsWithConsumeRetries(final String errorMsg, Review comment: fwiw this doesn't adhere to the Kafka style guide (looks like Kafka Streams to me) ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -207,23 +212,45 @@ public void close() { backup.stop(); } +// throw exception after 3 retries, and print expected error messages +private void assertEqualsWithConsumeRetries(final String errorMsg, +final int numRecordsProduces, +final int timeout, +final ClusterType clusterType, +final String... topics) throws InterruptedException { +int retries = 3; +while (retries-- > 0) { +try { +int actualNum = clusterType == ClusterType.PRIMARY ? +primary.kafka().consume(numRecordsProduces, timeout, topics).count() : Review comment: these are really strange side-effects to have in an assert statement. I see what you are trying to do, but this is probably not the way to do it.
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r442330606 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ## @@ -106,12 +107,29 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest } } - def createGroupMembers(groupPrefix: String): Set[GroupMember] = { -(0 until nGroups).flatMap { i => - new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members -}.toSet + /** + * make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap. + * Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset. + */ + private val txnLock = new ReentrantLock + private val allGroupMembers = mutable.ArrayBuffer[GroupMember]() + + def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = { +val groupPartitionId = groupCoordinator.partitionFor(groupId) +groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId) +val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId, + groupPartitionId = groupPartitionId, + leader = i == 0, + // same producerId to tests more on transactional conflicts. + producerId = 1000, + txnLock = txnLock)) +allGroupMembers ++= members Review comment: Since createGroupMembers() is called in multiple tests, it seems we will be accumulating allGroupMembers across tests. That seems unexpected?
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r442337184 ## File path: core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala ## @@ -106,12 +107,29 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest } } - def createGroupMembers(groupPrefix: String): Set[GroupMember] = { -(0 until nGroups).flatMap { i => - new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members -}.toSet + /** + * make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap. + * Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset. + */ + private val txnLock = new ReentrantLock + private val allGroupMembers = mutable.ArrayBuffer[GroupMember]() + + def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = { +val groupPartitionId = groupCoordinator.partitionFor(groupId) +groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId) +val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId, + groupPartitionId = groupPartitionId, + leader = i == 0, + // same producerId to tests more on transactional conflicts. + producerId = 1000, + txnLock = txnLock)) +allGroupMembers ++= members Review comment: JUnit, by default, creates a new instance for each test case, so ```allGroupMembers``` is always a new one for each test case.
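chia7712's point is easy to demonstrate outside Kafka: JUnit 4 constructs a fresh instance of the test class for every test method, so instance fields never leak between tests. A minimal self-contained Java illustration of the same behavior the Scala test relies on:

```java
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;
import org.junit.Test;

public class FreshInstancePerTestDemo {
    // Re-initialized for every @Test method, because JUnit 4 builds a new
    // FreshInstancePerTestDemo instance per test.
    private final List<String> members = new ArrayList<>();

    @Test
    public void firstTestSeesEmptyList() {
        assertEquals(0, members.size());
        members.add("member-1"); // does not leak into the other test
    }

    @Test
    public void secondTestAlsoSeesEmptyList() {
        assertEquals(0, members.size()); // passes regardless of test order
    }
}
```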
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125726 retest this please
[GitHub] [kafka] ijuma commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
ijuma commented on pull request #7929: URL: https://github.com/apache/kafka/pull/7929#issuecomment-646140719 ok to test
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139533#comment-17139533 ] Guozhang Wang commented on KAFKA-10005: --- So just to give a quick summary, my proposal is primarily three-fold: 1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to add batches of records as SST files directly; this replaces the benefit of bulk loading. 2) move restoration off the stream thread to a different thread (pool), for both restoring active tasks and updating standby tasks. 3) if needed, also disable compaction during restoration and do a one-phase full compaction when it completes. We already have an internal BulkLoadStore interface, which e.g. RocksDBStore extends; we can leverage that interface to "toggle" restoration mode for 1) and 3) above. cc [~cadonna] > Decouple RestoreListener from RestoreCallback and not enable bulk loading for > RocksDB > - > > Key: KAFKA-10005 > URL: https://issues.apache.org/jira/browse/KAFKA-10005 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > > In Kafka Streams we have two restoration callbacks: > * RestoreCallback (BatchingRestoreCallback): specified per-store via > registration to specify the logic of applying a batch of records read from > the changelog to the store. Used for both updating standby tasks and > restoring active tasks. > * RestoreListener: specified per-instance via `setRestoreListener`, to > specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`. > As we can see, these two callbacks serve quite different purposes; however, > today we allow users to register a per-store RestoreCallback that also > implements the RestoreListener. Such mixing is actually motivated by > Streams' internal usage to enable / disable bulk loading inside RocksDB. For > users, however, it is less meaningful to specify a callback that is also a > listener, since `onRestoreStart / End` has the storeName passed in, so users > can just define different listening logic for different stores if needed. > On the other hand, this mixing of two callbacks forces Streams to check > internally whether the passed-in per-store callback also implements the listener, > and if so trigger its calls, which increases the complexity. Besides, > toggling RocksDB bulk loading requires us to open / close / reopen / > re-close the store 4 times during restoration, which could also be costly. > Given that we have KIP-441 in place, I think we should consider different > ways other than toggling bulk loading during restoration for Streams (e.g. > using different threads for restoration). > The proposal for this ticket is to completely decouple the listener from the > callback -- i.e. we would not presume users pass in a callback function > that implements both RestoreCallback and RestoreListener, and also for > RocksDB we replace the bulk loading mechanism with other ways of > optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
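For reference, the {{addFileWithFileInfo}} call mentioned in 1) corresponds to the external-file ingestion API in current RocksDB Java bindings (SstFileWriter plus ingestExternalFile). A sketch under that assumption, with placeholder paths and keys:

```java
import java.util.Collections;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileWriter;

public class SstIngestSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/restore-demo")) {
            // 1) Write a sorted batch of restored records into an SST file.
            try (SstFileWriter writer = new SstFileWriter(new EnvOptions(), options)) {
                writer.open("/tmp/batch-0001.sst");
                writer.put("key-1".getBytes(), "value-1".getBytes()); // keys must be sorted
                writer.put("key-2".getBytes(), "value-2".getBytes());
                writer.finish();
            }
            // 2) Ingest the file directly, bypassing the memtable and WAL,
            //    which is the effect bulk loading was approximating.
            db.ingestExternalFile(
                    Collections.singletonList("/tmp/batch-0001.sst"),
                    new IngestExternalFileOptions());
        }
    }
}
```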
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442354307 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) { return state; } +/** + * Get the id set of nodes which are in CONNECTING state + */ +public Set<String> connectingNodes() { +return this.connectingNodes; +} + +/** + * Get the timestamp of the latest connection attempt of a given node + * @param id the connection to fetch the state for + */ +public long lastConnectAttemptMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState == null ? 0 : nodeState.lastConnectAttemptMs; +} + +public long connectionSetupTimeoutMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState.connectionSetupTimeoutMs; Review comment: No. The caller will ensure that the node is in the connecting state. I'll add an IllegalStateException here.
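The guard d8tltanc describes would look roughly like this (a sketch against the quoted diff; the exception message is an assumption):

```java
public long connectionSetupTimeoutMs(String id) {
    NodeConnectionState nodeState = this.nodeState.get(id);
    if (nodeState == null) {
        // Callers are expected to query only nodes in the connecting state,
        // so a missing entry indicates a programming error.
        throw new IllegalStateException("No connection state found for node " + id);
    }
    return nodeState.connectionSetupTimeoutMs;
}
```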
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) { return state; } +/** + * Get the id set of nodes which are in CONNECTING state + */ +public Set<String> connectingNodes() { +return this.connectingNodes; +} + +/** + * Get the timestamp of the latest connection attempt of a given node + * @param id the connection to fetch the state for + */ +public long lastConnectAttemptMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState == null ? 0 : nodeState.lastConnectAttemptMs; +} + +public long connectionSetupTimeoutMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState.connectionSetupTimeoutMs; +} + +/** + * Test if the connection to the given node has reached its timeout + * @param id the connection to fetch the state for + * @param now the current time in ms + */ +public boolean isConnectionSetupTimeout(String id, long now) { +return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id); Review comment: Good catch. I'll make the logic record it in both `connecting` and `disconnected`.
[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10184: -- Priority: Minor (was: Major) > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.pr
[jira] [Created] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
Guozhang Wang created KAFKA-10184: - Summary: Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores Key: KAFKA-10184 URL: https://issues.apache.org/jira/browse/KAFKA-10184 Project: Kafka Issue Type: Bug Components: streams, unit tests Reporter: Guozhang Wang {code} Stacktrace java.lang.AssertionError: Condition not met within timeout 12. Input records haven't all been written to the changelog: 442 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) at org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.Dele
[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10184: -- Issue Type: Test (was: Bug) > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Major > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.pro
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) { return state; } +/** + * Get the id set of nodes which are in CONNECTING state + */ +public Set<String> connectingNodes() { +return this.connectingNodes; +} + +/** + * Get the timestamp of the latest connection attempt of a given node + * @param id the connection to fetch the state for + */ +public long lastConnectAttemptMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState == null ? 0 : nodeState.lastConnectAttemptMs; +} + +public long connectionSetupTimeoutMs(String id) { +NodeConnectionState nodeState = this.nodeState.get(id); +return nodeState.connectionSetupTimeoutMs; +} + +/** + * Test if the connection to the given node has reached its timeout + * @param id the connection to fetch the state for + * @param now the current time in ms + */ +public boolean isConnectionSetupTimeout(String id, long now) { +return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id); Review comment: I think so. The `lastConnectAttemptMs` is updated in both `connecting` (Line 145 & Line 157) and `disconnected`.
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442362673 ## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ## @@ -103,6 +103,12 @@ Utils.join(SecurityProtocol.names(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds."; Review comment: Makes sense. I'll change the description and remove the defaults from the doc.
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442363632 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) { } else if (connectionStates.isPreparingConnection(node.idString())) { foundConnecting = node; } else if (canConnect(node, now)) { -foundCanConnect = node; +if (foundCanConnect == null || + this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) > + this.connectionStates.lastConnectAttemptMs(node.idString())) { +foundCanConnect = node; +} Review comment: Yes
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139543#comment-17139543 ] Sophie Blee-Goldman commented on KAFKA-10184: - Yeah, it's failing on the setup and hasn't even gotten to the real test at all cc/ [~vvcephei] seems like "500" was still too high :/ > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoad
[GitHub] [kafka] edoardocomar commented on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted
edoardocomar commented on pull request #4204: URL: https://github.com/apache/kafka/pull/4204#issuecomment-646162106 After @hachikuji's fixes in https://github.com/apache/kafka/pull/8586, the metrics are no longer ticked at the end of a DelayedFetch, so the time window for topic deletion is almost non-existent and the only guard code needed is left in `KafkaApis`, as shown by the unit test added by this PR. This PR is now tiny and it would be nice to have it merged :-)
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139546#comment-17139546 ] Sophie Blee-Goldman commented on KAFKA-10184: - [~vvcephei] Could we maybe do something like "write as many records as you can in the 12ms timeout"? Since as you said the whole point is just to make sure we have enough records to represent a "reasonably large" number, if it takes 2 minutes to write only 100 records then those 100 records represent a heavy load (apparently...) > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch
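A minimal sketch of the "write as many records as you can" idea from the comment above; the producer setup, topic name, and deadline handling are illustrative assumptions, not the test's actual code:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical helper: produce until a wall-clock deadline rather than
    // producing a fixed count and hoping it fits inside the timeout.
    static int produceUntilDeadline(final KafkaProducer<String, String> producer,
                                    final String topic,
                                    final long deadlineMs) {
        int produced = 0;
        while (System.currentTimeMillis() < deadlineMs) {
            producer.send(new ProducerRecord<>(topic, "key-" + produced, "value-" + produced));
            produced++;
        }
        producer.flush();
        return produced; // the test would then wait for exactly this many changelog records
    }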
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139548#comment-17139548 ] Sophie Blee-Goldman commented on KAFKA-10184: - Or instead (or in addition to the above) maybe we should wait for the streams to be in RUNNING before we start the timeout for writing records > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor -- This message was sent by Atlassian Jira (v8.3.4#803005)
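A sketch of that second suggestion: block until the Streams instance reaches RUNNING before starting the clock on record writes. `TestUtils.waitForCondition` is the same utility that appears in the stack trace above; the wait bound here is an assumed value:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.test.TestUtils;

    static void waitUntilRunning(final KafkaStreams streams) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> streams.state() == KafkaStreams.State.RUNNING,
            60_000L, // assumed bound; only after this succeeds would the produce timeout start
            "Streams client never reached RUNNING");
    }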
[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
hachikuji commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646168538 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
hachikuji commented on pull request #8672: URL: https://github.com/apache/kafka/pull/8672#issuecomment-646173735 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442372873 ## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ## @@ -103,6 +103,12 @@ Utils.join(SecurityProtocol.names(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds."; + +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms"; +public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the initial socket connection to be built. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. The default value will be 127 seconds."; Review comment: Refactored This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
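For illustration, the two configs proposed in this hunk would be set like any other client config; the constant names come from the diff above (still under review at this point) and the values are the documented defaults:

    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;

    Properties props = new Properties();
    // 10 seconds before an in-flight connection attempt is closed
    props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000");
    // exponential growth of that timeout is capped at 127 seconds
    props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "127000");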
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442375424 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -786,6 +808,29 @@ private void handleAbortedSends(List responses) { abortedSends.clear(); } +/** + * Handle socket channel connection timeout. The timeout will hit iff a connection + * stays at the ConnectionState.CONNECTING state longer than the timeout value, + * as indicated by ClusterConnectionStates.NodeConnectionState. + * + * @param responses The list of responses to update + * @param now The current time + */ +private void handleTimeoutConnections(List responses, long now) { +Set connectingNodes = connectionStates.connectingNodes(); +for (String nodeId: connectingNodes) { Review comment: Refactored This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
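A condensed sketch of what the hunk above does: on every poll, close any channel that has sat in the CONNECTING state longer than its timeout. `connectingNodes()` is from the diff; `isConnectionSetupTimedOut` and the disconnection handling are assumed names, not the PR's exact code:

    // Sketch only: walk the nodes still connecting and time out the stragglers.
    private void handleTimeoutConnections(final List<ClientResponse> responses, final long now) {
        for (final String nodeId : connectionStates.connectingNodes()) {
            if (connectionStates.isConnectionSetupTimedOut(nodeId, now)) { // assumed helper
                // closing the channel marks the node disconnected and fails in-flight requests
                processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
            }
        }
    }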
[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on a change in pull request #8891: URL: https://github.com/apache/kafka/pull/8891#discussion_r442378048 ## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ## @@ -930,6 +935,38 @@ object ReassignPartitionsCommand extends Logging { (brokerListToReassign, topicsToReassign) } + /** + * The entry point for --alter-throttles. At least one throttle value must be provided. + * + * @param admin The Admin instance to use + * @param interBrokerThrottle The new inter-broker throttle or -1 to leave it unchanged + * @param logDirThrottle The new alter-log-dir throttle or -1 to leave it unchanged + */ + def alterThrottles(admin: Admin, + interBrokerThrottle: Long, + logDirThrottle: Long): Unit = { +if (interBrokerThrottle < 0 && logDirThrottle < 0) { + throw new TerseReassignmentFailureException("No valid throttle values provided to --alter-throttle") Review comment: It would be good to include the flags needed to pass in a throttle in this message This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442382471 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java ## @@ -149,6 +155,16 @@ atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) + .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG, +Type.LONG, +10 * 1000, +Importance.MEDIUM, + CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC) + .define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG, +Type.LONG, +127 * 1000, Review comment: Sounds good. Will refactor. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10185) Streams should log summarized restoration information at info level
John Roesler created KAFKA-10185: Summary: Streams should log summarized restoration information at info level Key: KAFKA-10185 URL: https://issues.apache.org/jira/browse/KAFKA-10185 Project: Kafka Issue Type: Task Components: streams Reporter: John Roesler Assignee: John Roesler -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442387660 ## File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class GeometricProgressionTest { +@Test +public void testGeometricProgression() { +long scaleFactor = 100; +int ratio = 2; +long termMax = 2000; +double jitter = 0.2; +GeometricProgression geometricProgression = new GeometricProgression( +scaleFactor, ratio, termMax, jitter +); + +for (int i = 0; i <= 100; i++) { +for (int n = 0; n <= 4; n++) { +assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n), +scaleFactor * Math.pow(ratio, n) * jitter); +} +System.out.println(geometricProgression.term(5)); Review comment: Oh, right. I missed removing it. ## File path: clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class GeometricProgressionTest { +@Test +public void testGeometricProgression() { +long scaleFactor = 100; +int ratio = 2; +long termMax = 2000; +double jitter = 0.2; +GeometricProgression geometricProgression = new GeometricProgression( +scaleFactor, ratio, termMax, jitter +); + +for (int i = 0; i <= 100; i++) { +for (int n = 0; n <= 4; n++) { +assertEquals(scaleFactor * Math.pow(ratio, n), geometricProgression.term(n), +scaleFactor * Math.pow(ratio, n) * jitter); +} +System.out.println(geometricProgression.term(5)); Review comment: Oh, right. I forgot removing it. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
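For the parameters in the quoted test (scaleFactor=100, ratio=2, termMax=2000, jitter=0.2), the jitter-free expectations work out as follows; term(5) is where the termMax cap takes over:

    // term(n) = scaleFactor * ratio^n, capped so it never exceeds termMax:
    // term(0) = 100 * 2^0 = 100
    // term(1) = 100 * 2^1 = 200
    // term(2) = 100 * 2^2 = 400
    // term(3) = 100 * 2^3 = 800
    // term(4) = 100 * 2^4 = 1600
    // term(5) -> capped near termMax = 2000, since expMax = log2(2000/100) ~= 4.32
    // The jitter of 0.2 then scales each term by a random factor in [0.8, 1.2],
    // which is why the assertions above use a tolerance of term * jitter.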
[GitHub] [kafka] vvcephei opened a new pull request #8896: KAFKA-10185: Restoration info logging
vvcephei opened a new pull request #8896: URL: https://github.com/apache/kafka/pull/8896 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442389711 ## File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * An util class for exponential backoff, backoff, etc... + * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n + * If scaleFactor is greater or equal than termMax, a constant term of will be provided + * This class is thread-safe + */ +public class GeometricProgression { Review comment: Good idea. Will go for `ExponentialBackoff` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on a change in pull request #8683: URL: https://github.com/apache/kafka/pull/8683#discussion_r442394058 ## File path: clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * An util class for exponential backoff, backoff, etc... + * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * (ratio) ^ n + * If scaleFactor is greater or equal than termMax, a constant term of will be provided + * This class is thread-safe + */ +public class GeometricProgression { +private final int ratio; +private final double expMax; +private final long scaleFactor; +private final double jitter; + +public GeometricProgression(long scaleFactor, int ratio, long termMax, double jitter) { +this.scaleFactor = scaleFactor; +this.ratio = ratio; +this.jitter = jitter; +this.expMax = termMax > scaleFactor ? +Math.log(termMax / (double) Math.max(scaleFactor, 1)) / Math.log(ratio) : 0; +} + +public long term(long n) { Review comment: As we noticed in your earlier comments, the same value of `attempts` may correspond to different terms. connection_timeout = constant * 2 ^ (attempts) reconnect_backoff = constant * 2 ^ (attempts - 1) (in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1) So I think using `retries` or `attempts` instead of `n` might also confuse people. Shall we think of another naming? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
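Following the rename discussed above, a sketch of what the helper could look like as `ExponentialBackoff`; this paraphrases the quoted `GeometricProgression` code with the suggested naming, so treat the details as an approximation rather than the merged implementation:

    import java.util.concurrent.ThreadLocalRandom;

    public class ExponentialBackoff {
        private final long initialInterval;
        private final int multiplier;
        private final double expMax;
        private final double jitter;

        public ExponentialBackoff(long initialInterval, int multiplier, long maxInterval, double jitter) {
            this.initialInterval = initialInterval;
            this.multiplier = multiplier;
            this.jitter = jitter;
            // cap the exponent so initialInterval * multiplier^exp never exceeds maxInterval
            this.expMax = maxInterval > initialInterval
                ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
                : 0;
        }

        public long backoff(long attempts) {
            if (expMax == 0) {
                return initialInterval; // no room to grow: constant backoff
            }
            double exp = Math.min(attempts, expMax);
            double term = initialInterval * Math.pow(multiplier, exp);
            double randomFactor = jitter == 0 ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
            return (long) (randomFactor * term);
        }
    }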
[GitHub] [kafka] hachikuji commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
hachikuji commented on pull request #8891: URL: https://github.com/apache/kafka/pull/8891#issuecomment-646209534 Had some discussion offline with @cmccabe . The intention in KIP-455 is to use --additional to resubmit the reassignment and change the quota. I found that this did not work as expected when I tried it, so let me try to modify this patch so that the new integration tests use this behavior. Possibly we don't need --alter-throttle, but the tests and improved documentation would still be useful. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
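For reference, resubmitting an in-flight reassignment with a new quota via --additional would look roughly like this; the bootstrap address, JSON file, and throttle value are placeholders:

    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
        --reassignment-json-file reassignment.json \
        --execute --additional --throttle 50000000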
[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging
vvcephei commented on a change in pull request #8896: URL: https://github.com/apache/kafka/pull/8896#discussion_r442397523 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -415,19 +418,20 @@ public void restore() { // for restoring active and updating standby we may prefer different poll time // in order to make sure we call the main consumer#poll in time. // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern -polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime); +polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime); Review comment: trivial cleanup ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -415,19 +418,20 @@ public void restore() { // for restoring active and updating standby we may prefer different poll time // in order to make sure we call the main consumer#poll in time. // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern -polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime); +polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime); } catch (final InvalidOffsetException e) { -log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " + +log.warn("Encountered " + e.getClass().getName() + +" fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " + "the consumer's position has fallen out of the topic partition offset range because the topic was " + "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" + -" it later.", e.getClass().getName(), e.partitions()); +" it later.", e); Review comment: Added the exception itself as the "cause" of the warning. The actual message of the IOE is actually pretty good at explaining the root cause. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -446,6 +450,38 @@ public void restore() { } maybeUpdateLimitOffsetsForStandbyChangelogs(); + +maybeLogRestorationProgress(); Review comment: This is the main change. Once every ten seconds, we will log the progress for each active restoring changelog. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -415,19 +418,20 @@ public void restore() { // for restoring active and updating standby we may prefer different poll time // in order to make sure we call the main consumer#poll in time. // TODO: once we move ChangelogReader to a separate thread this may no longer be a concern -polledRecords = restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? Duration.ZERO : pollTime); +polledRecords = restoreConsumer.poll(state == ChangelogReaderState.STANDBY_UPDATING ? 
Duration.ZERO : pollTime); } catch (final InvalidOffsetException e) { -log.warn("Encountered {} fetching records from restore consumer for partitions {}, it is likely that " + +log.warn("Encountered " + e.getClass().getName() + +" fetching records from restore consumer for partitions " + e.partitions() + ", it is likely that " + "the consumer's position has fallen out of the topic partition offset range because the topic was " + "truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing" + -" it later.", e.getClass().getName(), e.partitions()); +" it later.", e); final Map> taskWithCorruptedChangelogs = new HashMap<>(); for (final TopicPartition partition : e.partitions()) { final TaskId taskId = changelogs.get(partition).stateManager.taskId(); taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> new HashSet<>()).add(partition); } -throw new TaskCorruptedException(taskWithCorruptedChangelogs); +throw new TaskCorruptedException(taskWithCorruptedChangelogs, e); Review comment:
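A condensed sketch of the rate-limited progress logging described in the review above; the ten-second interval comes from the comment, while the field and helper names (`activeRestoringChangelogs`, `restoredSoFar`, `totalToRestore`) are hypothetical stand-ins for the PR's actual bookkeeping:

    private static final long RESTORE_LOG_INTERVAL_MS = 10_000L;
    private long lastRestoreLogMs = 0L;

    private void maybeLogRestorationProgress(final long now) {
        if (now - lastRestoreLogMs >= RESTORE_LOG_INTERVAL_MS) {
            lastRestoreLogMs = now;
            // one summarized info-level line per active restoring changelog, every ten seconds
            for (final TopicPartition partition : activeRestoringChangelogs()) {
                log.info("Restoration in progress for changelog {}: restored {} of {} records",
                         partition, restoredSoFar(partition), totalToRestore(partition));
            }
        }
    }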
[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc commented on pull request #8683: URL: https://github.com/apache/kafka/pull/8683#issuecomment-646217062 Thanks, @dajac for the comments. I've modified the PR per your suggestions. @rajinisivaram Do you think we can start testing? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8876: KAFKA-10167: use the admin client to read end-offset
guozhangwang merged pull request #8876: URL: https://github.com/apache/kafka/pull/8876 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8876: KAFKA-10167: use the admin client to read end-offset
guozhangwang commented on pull request #8876: URL: https://github.com/apache/kafka/pull/8876#issuecomment-646234217 Cherry-pick to 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139916#comment-17139916 ] Sophie Blee-Goldman commented on KAFKA-10005: - I've been thinking about this lately and I'm not quite convinced we should move active restoration to a separate thread as well (as opposed to only standbys). With KIP-441, the majority of restoration will actually be done as a standby task. Only the last (hopefully-trivial) tail end of the changelog will be restored as an active task. Is that worth the overhead of thread synchronization to hand off tasks between the restore thread(s) and the main one? I'm not sure. > Decouple RestoreListener from RestoreCallback and not enable bulk loading for > RocksDB > - > > Key: KAFKA-10005 > URL: https://issues.apache.org/jira/browse/KAFKA-10005 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > > In Kafka Streams we have two restoration callbacks: > * RestoreCallback (BatchingRestoreCallback): specified per-store via > registration to specify the logic of applying a batch of records read from > the changelog to the store. Used for both updating standby tasks and > restoring active tasks. > * RestoreListener: specified per-instance via `setRestoreListener`, to > specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`. > As we can see, these two callbacks are for quite different purposes; however, > today we allow users to register a per-store RestoreCallback which also > implements the RestoreListener. Such weird mixing is actually motivated by > Streams' internal usage to enable / disable bulk loading inside RocksDB. For > users, however, it is less meaningful to specify a callback to be a listener, > since `onRestoreStart / End` has the storeName passed in, so that users > can just define different listening logic if needed for different stores. > On the other hand, this mixing of two callbacks forces Streams to check > internally if the passed-in per-store callback also implements the listener, > and if so trigger its calls, which increases the complexity. Besides, > toggling RocksDB bulk loading requires us to open / close / reopen / > reclose 4 times during the restoration, which could also be costly. > Given that we have KIP-441 in place, I think we should consider different > ways other than toggling bulk loading during restoration for Streams (e.g. > using different threads for restoration). > The proposal for this ticket is to completely decouple the listener from the > callback -- i.e. we would not presume users pass in a callback function > that implements both RestoreCallback and RestoreListener, and also for > RocksDB we replace the bulk loading mechanism with other ways of > optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139919#comment-17139919 ] Guozhang Wang commented on KAFKA-10005: --- I think even if we only move standbys to separate threads, there is still synchronization required for IQ and rebalance needs, but I think we need to get to a POC to have a clear idea of how much thread synchronization overhead would be incurred. We can discuss this further if we see that the synchronization overhead with just standby versus standby + active is very different. > Decouple RestoreListener from RestoreCallback and not enable bulk loading for > RocksDB > - > > Key: KAFKA-10005 > URL: https://issues.apache.org/jira/browse/KAFKA-10005 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139533#comment-17139533 ] Guozhang Wang edited comment on KAFKA-10005 at 6/18/20, 6:58 PM: - So just to have a quick summary, my proposal is primarily threefold: 1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to add a batch of records as SST files directly; this is to replace the impact of bulk loading. 2) move the restoration off the stream thread to a different thread (pool), for both restoring active tasks as well as updating standby tasks. 3) if needed, we also disable compaction during the restoration, and do a one-phase full compaction when we complete. I'm keeping it as "optional" for now since disabling compaction has both pros and cons, and if we have good performance from 1/2) alone then maybe we can afford to keep compaction enabled. We already have an internal BulkLoadStore interface which e.g. RocksDBStore extends; we can leverage that interface to "toggle" restoration mode for 1) and 3) above. cc [~cadonna] was (Author: guozhang): So just to have a quick summary, my proposal is primarily threefold: 1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to add a batch of records as SST files directly; this is to replace the impact of bulk loading. 2) move the restoration off the stream thread to a different thread (pool), for both restoring active tasks as well as updating standby tasks. 3) if needed, we also disable compaction during the restoration, and do a one-phase full compaction when we complete. We already have an internal BulkLoadStore interface which e.g. RocksDBStore extends; we can leverage that interface to "toggle" restoration mode for 1) and 3) above. cc [~cadonna] > Decouple RestoreListener from RestoreCallback and not enable bulk loading for > RocksDB > - > > Key: KAFKA-10005 > URL: https://issues.apache.org/jira/browse/KAFKA-10005 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139925#comment-17139925 ] Sophie Blee-Goldman commented on KAFKA-10005: - Yeah, I didn't mean to imply we'd be synchronization-free with only standbys in a separate thread. But at least we'd only need to sync at rebalances, whereas with restoration in a separate thread we'll need to continually check for newly-restored tasks that should be taken over by the main thread. Anyway, I agree: a POC and maybe some benchmarking should have the last word. > Decouple RestoreListener from RestoreCallback and not enable bulk loading for > RocksDB > - > > Key: KAFKA-10005 > URL: https://issues.apache.org/jira/browse/KAFKA-10005 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139926#comment-17139926 ] Rohan Desai commented on KAFKA-10179: - I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms, like projecting just the fields of interest), then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different, since it represents different data from the source topic. This has consequences in practice when used with a schema registry using the Confluent serializers. If we use the same topic, `serialize` might register a different schema with the source subject, which we probably don't want. I think the technically correct thing to do (though this is of course more expensive) would be (when the source table is optimized) to deserialize and serialize each record when restoring. Another issue that I think exists (I need to try to reproduce it), which deserializing/serializing would solve, is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However, if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
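Context for the schema-registry concern above: Confluent's default TopicNameStrategy derives the subject from whatever topic name is handed to the serde, so passing the changelog name instead of the source topic points the serializer at a different subject. A toy illustration (the topic names here are assumptions for the example):

    // With the default TopicNameStrategy, the value subject is "<topic>-value".
    static String valueSubject(final String topic) {
        return topic + "-value";
    }
    // valueSubject("source-topic")           -> "source-topic-value"
    //   (the subject the source records were registered under)
    // valueSubject("app-id-store-changelog") -> "app-id-store-changelog-value"
    //   (a brand-new subject, where serialize() may register an unwanted schema)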
[GitHub] [kafka] skaundinya15 commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test
skaundinya15 commented on a change in pull request #8894: URL: https://github.com/apache/kafka/pull/8894#discussion_r442445728 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -207,23 +212,45 @@ public void close() { backup.stop(); } +// throw exception after 3 retries, and print expected error messages +private void assertEqualsWithConsumeRetries(final String errorMsg, +final int numRecordsProduces, +final int timeout, +final ClusterType clusterType, +final String... topics) throws InterruptedException { +int retries = 3; +while (retries-- > 0) { +try { +int actualNum = clusterType == ClusterType.PRIMARY ? +primary.kafka().consume(numRecordsProduces, timeout, topics).count() : +backup.kafka().consume(numRecordsProduces, timeout, topics).count(); +if (numRecordsProduces == actualNum) +return; +} catch (Throwable e) { +log.error("Could not find enough records with {} retries left", retries, e); +} +} +throw new InterruptedException(errorMsg); +} + @Test public void testReplication() throws InterruptedException { MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary")); MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup")); -assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED, Review comment: I'd agree with @ryannedolan here. We could use the `waitForCondition` in `TestUtils.java` to wait for the necessary condition instead. More details on that are here: https://github.com/apache/kafka/blob/d8cc6fe8e36329c647736773d9d66de89c447409/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L370-L371 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
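A sketch of that suggestion: let `TestUtils.waitForCondition` own the polling instead of a hand-rolled retry loop. The constants mirror the test's fields, while the overall wait bound and topic name are assumed values:

    // Retries (and swallows transient failures) until the condition holds or the bound expires.
    TestUtils.waitForCondition(
        () -> primary.kafka().consume(NUM_RECORDS_PRODUCED, 5_000, "test-topic-1").count()
                  == NUM_RECORDS_PRODUCED,
        60_000L, // assumed overall bound
        "Records were not produced to primary cluster in time");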
[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
dajac commented on pull request #8672: URL: https://github.com/apache/kafka/pull/8672#issuecomment-646271944 @hachikuji I just rebased and fixed the build issue. Could you re-trigger jenkins please? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10185) Streams should log summarized restoration information at info level
[ https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139955#comment-17139955 ] Boyang Chen commented on KAFKA-10185: - Could we add the context to this ticket? > Streams should log summarized restoration information at info level > --- > > Key: KAFKA-10185 > URL: https://issues.apache.org/jira/browse/KAFKA-10185 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139963#comment-17139963 ] Mateusz Jadczyk commented on KAFKA-9891: LGTM thanks for looking into it and making sure that 2.5.1/2.6 should be safe to use > Invalid state store content after task migration with exactly_once and > standby replicas > --- > > Key: KAFKA-9891 > URL: https://issues.apache.org/jira/browse/KAFKA-9891 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 2.5.0, 2.4.1 >Reporter: Mateusz Jadczyk >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, > state_store_operations.txt, tasks_assignment.txt > > > We have a simple command id deduplication mechanism (very similar to the one > from Kafka Streams examples) based on Kafka Streams State Stores. It stores > command ids from the past hour in _persistentWindowStore_. We encountered a > problem with the store if there's an exception thrown later in that topology. > We run 3 nodes using docker, each with multiple threads set for this > particular Streams Application. > The business flow is as follows (performed within a single subtopology): > * a valid command is sent with command id > (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active > task 1_2. First node in the topology analyses if this is a duplicate by > checking in the state store (_COMMAND_ID_STORE_), if not puts the command id > in the state store and processes the command properly. > * an invalid command is sent with the same key but new command id > (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the > duplicated command id is performed, it's not a duplicate, command id is put > into the state store. Next node in the topology throws an exception which > causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, > offsets are not committed. I double checked for the changelog topic - > relevant messages are not committed. Therefore, the changelog topic contains > only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and > not the one which caused a failure. > * in the meantime a standby task 1_2 running on NODE 3 replicated > _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local > _COMMAND_ID_STORE_ > * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. > It checks if this command id is a duplicate - no, it isn't - tries to process > the faulty command and throws an exception. Again, transaction aborted, all > looks fine. > * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, > *it is a duplicate!* Even though the transaction has been aborted and the > changelog doesn't contain this command id: > _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._ > > After digging into the Streams logs and some discussion on ([Stack > Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) > we concluded it has something to do with checkpoint files. Here are the > detailed logs relevant to checkpoint files. 
> > {code:java} > NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task > [1_2] Checkpointable offsets read from checkpoint: {} > NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task > [1_2] Restoring state store COMMAND_ID_STORE from changelog topic > Processor-COMMAND_ID_STORE-changelog at checkpoint null > NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] > o.a.k.s.p.i.ProcessorStateManager : stream-thread > [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] > standby-task [1_2] Checkpointable offsets read from checkpoint: {} > NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] > o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] > o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp > /tmp/kafka-streams/Processor/1_2/.checkpoint > NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] > o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file > /tmp/kafka-streams/Processor/1_2/
[jira] [Assigned] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sats reassigned KAFKA-10160: Assignee: sats > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Pavol Ipoth >Assignee: sats >Priority: Major > Labels: configuration, kafka, mirror-maker > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,] > according to this, producer/consumer-level properties should be configured as > e.g. somesource->sometarget.consumer.client.id. I try to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success; the consumer always tries to fetch from earliest. Not sure if this is a bug or my > misconfiguration, but then at least some update to the docs would be useful -- This message was sent by Atlassian Jira (v8.3.4#803005)
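For reference, the override the reporter describes would sit in mm2.properties like this; the cluster aliases are the reporter's, and whether the flow-scoped consumer prefix is actually honored is exactly what this ticket questions:

    clusters = somesource, sometarget
    somesource->sometarget.enabled = true
    # flow-scoped form from the report:
    somesource->sometarget.consumer.auto.offset.reset = latest
    # cluster-scoped form that is also commonly tried:
    somesource.consumer.auto.offset.reset = latest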
[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-646303241 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster
vvcephei commented on a change in pull request #8892: URL: https://github.com/apache/kafka/pull/8892#discussion_r442497609 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -148,27 +150,35 @@ private final TopicPartition t3p2 = new TopicPartition("topic3", 2); private final TopicPartition t3p3 = new TopicPartition("topic3", 3); -private final List infos = asList( -new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) -); - -private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS); +private final List partitionInfos = getPartitionInfos(3, 3); +{ +partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])); +} private final Cluster metadata = new Cluster( "cluster", Collections.singletonList(Node.noNode()), -infos, +partitionInfos, emptySet(), -emptySet()); +emptySet() +); + +/* Used by the scale test for large apps/clusters */ +private static final int NUM_TOPICS_XL = 10; +private static final int NUM_PARTITIONS_PER_TOPIC_XL = 1_000; +private static final int NUM_CONSUMERS_XL = 100; +private static final List TOPICS_LIST_XL = new ArrayList<>(); +private static final Map CHANGELOG_END_OFFSETS_XL = new HashMap<>(); +private static final List PARTITION_INFOS_XL = getPartitionInfos(NUM_TOPICS_XL, NUM_PARTITIONS_PER_TOPIC_XL); +private static final Cluster CLUSTER_METADATA_XL = new Cluster( +"cluster", +Collections.singletonList(Node.noNode()), +PARTITION_INFOS_XL, +emptySet(), +emptySet() +); Review comment: If there's a whole set of constants only used by one test, one might wonder whether that test shouldn't just be in its own class... 
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -148,27 +150,35 @@ private final TopicPartition t3p2 = new TopicPartition("topic3", 2); private final TopicPartition t3p3 = new TopicPartition("topic3", 3); -private final List<PartitionInfo> infos = asList( -new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), -new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) -); - -private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS); +private final List<PartitionInfo> partitionInfos = getPartitionInfos(3, 3); +{ +partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])); +} Review comment: Can you just pass this as an argument to `getPartitionInfos` so that we can do all the initialization in the assignment instead of needing an initialization block? The fact that this field is used in another field initialization statement makes the initialization block kind of questionable, since you have to read the JVM spec to know whether this block executes before or after the usage. Alternatively, maybe the prior code was actually better, because you can see exactly what data you're testing with, instead of having to go read another method to understand what `getPartitionInfos(3, 3)` might mean. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
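For readers following the diff, a plausible shape of the `getPartitionInfos` helper it introduces, reconstructed from the hard-coded `asList(...)` block it replaces (not necessarily the PR's exact code):

```java
// Builds PartitionInfos for topics "topic1".."topicN" with the given number
// of partitions each, mirroring the removed hard-coded list.
private static List<PartitionInfo> getPartitionInfos(final int numTopics,
                                                     final int numPartitionsPerTopic) {
    final List<PartitionInfo> infos = new ArrayList<>();
    for (int topic = 1; topic <= numTopics; topic++) {
        for (int partition = 0; partition < numPartitionsPerTopic; partition++) {
            infos.add(new PartitionInfo("topic" + topic, partition,
                                        Node.noNode(), new Node[0], new Node[0]));
        }
    }
    return infos;
}
```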
[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8881: KIP-557: Add emit on change support to Kafka Streams
ConcurrencyPractitioner commented on pull request #8881: URL: https://github.com/apache/kafka/pull/8881#issuecomment-646304787 @vvcephei Think you have time to look at this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments
dajac opened a new pull request #8897: URL: https://github.com/apache/kafka/pull/8897 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vinothchandar opened a new pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests
vinothchandar opened a new pull request #8898: URL: https://github.com/apache/kafka/pull/8898 http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-17--001.1592453352--vinothchandar--KC342-ducktape--e64cc463b/report.html Both ThrottlingTest and ReassignPartitionsTest, which invoke these methods, pass locally and have passed twice in CI. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging
vvcephei commented on a change in pull request #8896: URL: https://github.com/apache/kafka/pull/8896#discussion_r442529975 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -496,8 +539,9 @@ private void bufferChangelogRecords(final ChangelogMetadata changelogMetadata, f } else { changelogMetadata.bufferedRecords.add(record); final long offset = record.offset(); -if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) +if (changelogMetadata.restoreEndOffset == null || offset < changelogMetadata.restoreEndOffset) { changelogMetadata.bufferedLimitIndex = changelogMetadata.bufferedRecords.size(); +} Review comment: I've rolled back a bunch of accidental formatting changes, but left the ones that are actually code style compliance issues (like using brackets around conditional bodies). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on pull request #8891: URL: https://github.com/apache/kafka/pull/8891#issuecomment-646334265 The refactors to the tool generally look good. I thought we always allowed throttles to be set to 0 for some reason, though? It looks like this change removes that ability, which we probably don't want... although I doubt many people are using it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on a change in pull request #8891: URL: https://github.com/apache/kafka/pull/8891#discussion_r442533488 ## File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ## @@ -1685,10 +1702,9 @@ object ReassignPartitionsCommand extends Logging { opts.cancelOpt -> collection.immutable.Seq( opts.reassignmentJsonFileOpt ), - opts.listOpt -> collection.immutable.Seq( - ) + opts.listOpt -> collection.immutable.Seq.empty Review comment: seems to break the symmetry a bit, doesn't it? Although, I don't feel strongly about this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging
vvcephei commented on a change in pull request #8896: URL: https://github.com/apache/kafka/pull/8896#discussion_r442531300 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java ## @@ -223,6 +227,7 @@ public void shouldInitializeChangelogAndCheckForCompletion() { @Test public void shouldPollWithRightTimeout() { EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes(); + EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 5L)); Review comment: This is moderately obnoxious... The addition of logging these values means that these tests will get a NullPointerException unless we mock this call, but the mock is irrelevant to the test outcome. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted
[ https://issues.apache.org/jira/browse/KAFKA-10169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10169: --- Assignee: Sophie Blee-Goldman > KafkaException: Failing batch since transaction was aborted > --- > > Key: KAFKA-10169 > URL: https://issues.apache.org/jira/browse/KAFKA-10169 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > We've seen the following exception in our eos-beta test application recently: > {code:java} > [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic > stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task > 1_2 due to: [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > org.apache.kafka.common.KafkaException: Failing batch since transaction was > aborted [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > Exception handler choose to FAIL the processing, no more records would be > sent. at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at > java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] > (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) > Caused by: org.apache.kafka.common.KafkaException: Failing batch since > transaction was aborted at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) > ... 3 more > {code} > Somewhat unclear if this is an issue with eos-beta specifically, or just eos > in general. But several threads have died over the course of a few days in > the eos-beta application, while none so far have died on the eos-alpha > application. > It's also unclear (at least to me) whether this is definitely an issue in > Streams or possibly a bug in the producer (or even the broker, although that > seems unlikely) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
Sophie Blee-Goldman created KAFKA-10186: --- Summary: Aborting transaction with pending data should throw non-fatal exception Key: KAFKA-10186 URL: https://issues.apache.org/jira/browse/KAFKA-10186 Project: Kafka Issue Type: Improvement Components: producer Reporter: Sophie Blee-Goldman Currently if you try to abort a transaction with any pending (non-flushed) data, the send exception is set to {code:java} KafkaException("Failing batch since transaction was aborted"){code} This exception type is generally considered fatal, but this is a valid state to be in -- the point of throwing the exception is to alert that the records will not be sent, not that you are in an unrecoverable error state. We should throw a different (possibly new) type of exception here to distinguish from fatal and recoverable errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
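To make the reported situation concrete, a minimal sketch (assumes an already-initialized transactional {{KafkaProducer<String, String> producer}}; topic and values are illustrative):
{code:java}
producer.beginTransaction();
// send() is asynchronous; nothing has forced this batch to be flushed yet
Future<RecordMetadata> pending = producer.send(new ProducerRecord<>("topic", "key", "value"));
producer.abortTransaction(); // a deliberate, recoverable action
try {
    pending.get();
} catch (ExecutionException e) {
    // e.getCause() is KafkaException("Failing batch since transaction was aborted"),
    // which reads as fatal even though nothing unrecoverable has happened
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
{code}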
[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment
cmccabe commented on a change in pull request #8891: URL: https://github.com/apache/kafka/pull/8891#discussion_r442535630 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -647,8 +647,14 @@ class KafkaController(val config: KafkaConfig, info(s"Skipping reassignment of $tp since the topic is currently being deleted") new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") } else { -val assignedReplicas = controllerContext.partitionReplicaAssignment(tp) -if (assignedReplicas.nonEmpty) { +val assignment = controllerContext.partitionFullReplicaAssignment(tp) +if (assignment == ReplicaAssignment.empty) { + new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist.") +} else if (assignment == reassignment) { Review comment: It seems like the thing to agree on is the final state, right? This comparison is taking into account the current replica set which may change over the course of the reassignment... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10186: Labels: needs-kip newbie newbie++ (was: needs-kip) > Aborting transaction with pending data should throw non-fatal exception > --- > > Key: KAFKA-10186 > URL: https://issues.apache.org/jira/browse/KAFKA-10186 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > Currently if you try to abort a transaction with any pending (non-flushed) > data, the send exception is set to > {code:java} > KafkaException("Failing batch since transaction was aborted"){code} > This exception type is generally considered fatal, but this is a valid state > to be in -- the point of throwing the exception is to alert that the records > will not be sent, not that you are in an unrecoverable error state. > We should throw a different (possibly new) type of exception here to > distinguish from fatal and recoverable errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
mumrah commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646338829 @hachikuji yea that's the check I was referring to (where we disregard the fetch response, errors included). Do you think any of the errors we handle besides OOOR are worth handling in the case that we're no longer in the FETCHING state? Like maybe one of the errors that triggers a metadata update? However, that might be adding complexity for little gain. I'm fine with it either way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request #8899: MINOR: Gate test coverage plugin behind Gradle property
ijuma opened a new pull request #8899: URL: https://github.com/apache/kafka/pull/8899 Most builds don't require test coverage output, so it's wasteful to spend cycles tracking coverage information for each method invoked. I ran a quick test on a fast desktop machine; the absolute difference will be larger on a slower machine. The tests were executed after `./gradlew clean` and with a gradlew daemon that was started just before the test (and mildly warmed up with `./gradlew clean` again). `./gradlew unitTest --continue --profile`: * With coverage enabled: 6m32s * With coverage disabled: 5m47s I ran the same test twice and the results were within 2s of each other, so reasonably consistent. A roughly 11% reduction (392s down to 347s) in the time taken to run the unit tests is a reasonable gain with little downside, so I think this is a good change. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
hachikuji commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646348733 @mumrah Hmm, I think I like the current approach of discarding the response if we're no longer in the same state in which the fetch state was sent. Mainly because it's simple. Arguably we could do something more refined. For example, a topic authorization error is still going to be relevant even if the partition is being reset. However, since we're talking about rare cases, it doesn't seem too worthwhile to try and optimize; worst case, we'll send the request again and get the same error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
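A minimal model of the "discard if the state changed" approach being discussed (names and structure are illustrative, not the consumer's actual internals):

```java
import java.util.concurrent.atomic.AtomicLong;

// Tag each in-flight request with the state epoch at send time; bump the epoch
// on any state transition; drop responses whose epoch no longer matches.
public class StaleResponseGuard {
    private final AtomicLong stateEpoch = new AtomicLong();

    public long epochForSend() {
        return stateEpoch.get();
    }

    public void onStateTransition() {
        stateEpoch.incrementAndGet();
    }

    // False means drop the response, data and errors alike; worst case the
    // request is re-sent and the same error is observed again.
    public boolean stillRelevant(final long epochAtSend) {
        return stateEpoch.get() == epochAtSend;
    }
}
```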
[jira] [Resolved] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed
[ https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-10167. --- Resolution: Fixed > Streams EOS-Beta should not try to get end-offsets as read-committed > > > Key: KAFKA-10167 > URL: https://issues.apache.org/jira/browse/KAFKA-10167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0 > > > This is a bug discovered with the new EOS protocol (KIP-447); here's the > context: > In Streams, when we are assigned new active tasks, we first try > to restore the state from the changelog topic all the way to the log-end offset, and only then > transit from the `restoring` to the `running` state to start processing the task. > Before KIP-447, the end-offset call was only triggered after we had passed the > synchronization barrier at the txn-coordinator, which guarantees that the > txn-marker has been sent and received (otherwise we would error with > CONCURRENT_TRANSACTIONS and let the producer retry); when the txn-marker > is received, it also means that the marker has been fully replicated, which > in turn guarantees that the data written before that marker has been fully > replicated. As a result, when we send the list-offset request with the `read-committed` > flag, we are guaranteed that the returned offset == LSO == high-watermark. > After KIP-447, however, we do not fence on the txn-coordinator but on the > group-coordinator upon offset-fetch, and the group-coordinator returns > the fetched offset right after it has received and replicated the txn-marker > sent to it. However, since txn-markers are sent to different brokers in > parallel, and even within the same broker markers of different partitions are > appended / replicated independently as well, when the fetch-offset request > returns it is NOT guaranteed that the LSO on other data partitions has > been advanced as well. Hence in that case the `endOffset` call may > return a smaller offset, causing data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
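For context, the `endOffset` call in question behaves like the following consumer sketch; under read_committed the returned value is the LSO, which is exactly why the marker-propagation ordering above matters (configuration shown is illustrative):
{code:java}
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "restore-sketch");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    TopicPartition changelog = new TopicPartition("app-store-changelog", 0);
    // Under read_committed this returns the LSO, not the high-watermark, so it
    // can lag behind if txn-markers haven't been fully replicated everywhere yet.
    Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(changelog));
    System.out.println(end);
}
{code}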
[GitHub] [kafka] ableegoldman opened a new pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close
ableegoldman opened a new pull request #8900: URL: https://github.com/apache/kafka/pull/8900 If there's any pending data and we haven't flushed the producer when we abort a transaction, a KafkaException is returned for the previous `send`. This is a bit misleading, since the situation is not an unrecoverable error, so the KafkaException is really non-fatal. For now, we should just catch and swallow this in the RecordCollector (see also: [KAFKA-10186](https://issues.apache.org/jira/browse/KAFKA-10186)). The reason we ended up aborting an un-flushed transaction was the combination of (a) always aborting the ongoing transaction when any task is closed/revoked, and (b) only committing (and flushing) if at least one of the revoked tasks needs to be committed. Given the above, we can end up with an ongoing transaction that isn't committed since none of the revoked tasks have any data in the transaction. We then abort the transaction anyway when those tasks are closed. So in addition to swallowing this exception, we should avoid unnecessarily aborting data for tasks that haven't been revoked. We can handle this by splitting the RecordCollector's `close` into a dirty and a clean flavor: if dirty, we need to abort the transaction, since it may be dirty due to the commit attempt failing. But if clean, we can skip aborting the transaction, since we know that either we just committed (and thus there is no ongoing transaction to abort), or else the transaction in flight contains no data from the tasks being closed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
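A minimal sketch of the clean/dirty split described above (the interface shape is illustrative, not the PR's exact code):

```java
// Only the dirty path aborts: a clean close relies on the invariant that the
// transaction was either just committed or contains no data from these tasks.
interface RecordCollector {
    void closeClean(); // nothing to abort by construction
    void closeDirty(); // commit may have failed; abort the possibly-dirty transaction
}
```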
[GitHub] [kafka] vinothchandar commented on pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests
vinothchandar commented on pull request #8898: URL: https://github.com/apache/kafka/pull/8898#issuecomment-646360446 @cmccabe Please take a pass when you can.. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
mumrah commented on pull request #8822: URL: https://github.com/apache/kafka/pull/8822#issuecomment-646361035 Sounds good to me This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8871: MINOR: code cleanup for inconsistent naming
mjsax commented on pull request #8871: URL: https://github.com/apache/kafka/pull/8871#issuecomment-646365697 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs
mjsax commented on pull request #8752: URL: https://github.com/apache/kafka/pull/8752#issuecomment-646367241 @sneakyburro Any update on this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140069#comment-17140069 ] Arun R commented on KAFKA-10186: I would love to take a look if no one else is looking at it. > Aborting transaction with pending data should throw non-fatal exception > --- > > Key: KAFKA-10186 > URL: https://issues.apache.org/jira/browse/KAFKA-10186 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > Currently if you try to abort a transaction with any pending (non-flushed) > data, the send exception is set to > {code:java} > KafkaException("Failing batch since transaction was aborted"){code} > This exception type is generally considered fatal, but this is a valid state > to be in -- the point of throwing the exception is to alert that the records > will not be sent, not that you are in an unrecoverable error state. > We should throw a different (possibly new) type of exception here to > distinguish from fatal and recoverable errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646368944 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
hachikuji commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-646369171 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140073#comment-17140073 ] Jason Gustafson commented on KAFKA-10114: - [~ibinyami] Hmm, this is still not very clear to me. > I think records were not timed out because the sequence that times them out > happens after you have a producer id. The network thread is stuck on > maybeWaitForProducerId (called from Sender.java:306) while the relevant > failBatch invocation is only called from sendProducerData(), which is executed > after maybeWaitForProducerId (called from Sender.java:334) The logic in `awaitNodeReady` is not blocked on batch completion, but on request completion. We do not need `failBatch` in order to complete a pending request and free up room for additional requests. If you can reproduce this, it would be very helpful to see TRACE-level logging from the producer. > Kafka producer stuck after broker crash > --- > > Key: KAFKA-10114 > URL: https://issues.apache.org/jira/browse/KAFKA-10114 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.1, 2.4.1 >Reporter: Itamar Benjamin >Priority: Critical > > Today two of our Kafka brokers crashed (cluster of 3 brokers), and producers > were not able to send new messages. After the brokers started again, all producers > resumed sending data except for a single one. > At the beginning, the producer rejected all new messages with a TimeoutException: > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation > {code} > > Then after some time the exception changed to > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory > within the configured max blocking time 6 ms. > {code} > > > jstack shows kafka-producer-network-thread is waiting to get a producer id: > > {code:java} > "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 > cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 > sleeping [0x7ff55c177000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(java.base@11.0.1/Native Method) > at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296) > at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41) > at > org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.lang.Thread.run(java.base@11.0.1/Thread.java:834) Locked > ownable synchronizers: > - None > {code} > > Digging into maybeWaitForProducerId(): it waits until some broker is ready > (the awaitNodeReady function), which in turn calls leastLoadedNode() on > NetworkClient. This one iterates over all brokers and checks whether a request can > be sent to each of them using canSendRequest(). > This is the code for canSendRequest(): > > {code:java} > return connectionStates.isReady(node, now) && selector.isChannelReady(node) > && inFlightRequests.canSendMore(node) > {code} > > > Using some debugging tools, I saw that this expression always evaluates to false, > since the last part (canSendMore) is false. 
> > This is the code for canSendMore: > {code:java} > public boolean canSendMore(String node) { > Deque<NetworkClient.InFlightRequest> queue = requests.get(node); > return queue == null || queue.isEmpty() || > (queue.peekFirst().send.completed() && > queue.size() < this.maxInFlightRequestsPerConnection); > } > {code} > > > I verified that > {code:java} > queue.peekFirst().send.completed() > {code} > is true, and that leads to the livelock: since the request queues are full for > all nodes, a new request to check broker availability and reconnect to it > cannot be submitted. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075 ] Matthias J. Sax commented on KAFKA-10179: - {quote}I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different, since it represents different data from the source topic. For this case, the source-topic-changelog optimization does not apply, and the store would always have its own changelog topic. And thus, the input-topic schema registered in the SR should not be "touched", and the write to the changelog topic should register a new schema using the changelog topic name. Thus, no naming issue in SR should happen. {quote} The source-topic-changelog optimization really only applies if the data in the input topic is exactly the same as in the changelog topic, and thus we avoid creating the changelog topic. To ensure this, we don't allow any processing to happen in between. The data would be deserialized and re-serialized using the same Serde (this is an inefficiency we pay, as we also need to send the deserialized data downstream for further processing). {quote}Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However, if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error. {quote} This is a known issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037 > State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables > - > > Key: KAFKA-10179 > URL: https://issues.apache.org/jira/browse/KAFKA-10179 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 2.7.0 > > > {{MeteredKeyValueStore}} passes the name of the changelog topic of the state > store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for > optimized source tables the changelog topic is the source topic. > Most serdes do not use the topic name passed to them. However, if the serdes > actually use the topic name for (de)serialization, e.g., when Kafka Streams > is used with Confluent's Schema Registry, a > {{org.apache.kafka.common.errors.SerializationException}} is thrown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
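To illustrate why the topic name passed to a serde matters at all, a minimal sketch of a topic-sensitive serializer (registry-backed serdes such as Confluent's derive their subject from this parameter, so it is not merely informational; the class below is hypothetical):
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// A serializer that, like Schema Registry serdes, keys its behavior on the
// topic parameter. Passing "<appId>-<store>-changelog" instead of the source
// topic would make it resolve a different (wrong) subject.
public class TopicKeyedSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(final String topic, final String data) {
        final String subject = topic + "-value"; // common subject-naming convention
        return (subject + ":" + data).getBytes(StandardCharsets.UTF_8);
    }
}
{code}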
[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…
mjsax commented on a change in pull request #8887: URL: https://github.com/apache/kafka/pull/8887#discussion_r442574113 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -741,29 +741,23 @@ void shutdown(final boolean clean) { for (final Task task : tasks.values()) { if (task.isActive()) { -try { - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); -} catch (final RuntimeException e) { -if (clean) { -firstException.compareAndSet(null, e); -} else { -log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e); -} -} +executeAndMaybeSwallow( +clean, +() -> activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()), +e -> firstException.compareAndSet(null, e), +e -> log.warn("Ignoring an exception while closing task " + task.id() + " producer.", e) +); } } tasks.clear(); -try { -activeTaskCreator.closeThreadProducerIfNeeded(); -} catch (final RuntimeException e) { -if (clean) { -firstException.compareAndSet(null, e); -} else { -log.warn("Ignoring an exception while closing thread producer.", e); -} -} +executeAndMaybeSwallow( +clean, +() -> activeTaskCreator.closeThreadProducerIfNeeded(), Review comment: This is a lambda. Do you mean method reference? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…
mjsax commented on a change in pull request #8887: URL: https://github.com/apache/kafka/pull/8887#discussion_r442574574 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -1048,4 +1042,28 @@ public String toString(final String indent) { Set<TaskId> lockedTaskDirectories() { return Collections.unmodifiableSet(lockedTaskDirectories); } + +public static void executeAndMaybeSwallow(final boolean clean, + final Runnable runnable, + final java.util.function.Consumer<RuntimeException> actionIfClean, + final java.util.function.Consumer<RuntimeException> actionIfNotClean) { Review comment: I am wondering if adding this method to `TaskManager` is the best choice (cf https://issues.apache.org/jira/browse/KAFKA-10055). Thoughts? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
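From the two call sites in the earlier hunk, the extracted utility's body is presumably along these lines (a reconstruction from the diff, not necessarily the committed code):

```java
public static void executeAndMaybeSwallow(final boolean clean,
                                          final Runnable runnable,
                                          final java.util.function.Consumer<RuntimeException> actionIfClean,
                                          final java.util.function.Consumer<RuntimeException> actionIfNotClean) {
    try {
        runnable.run();
    } catch (final RuntimeException e) {
        if (clean) {
            actionIfClean.accept(e);    // e.g. record it as the first exception of a clean shutdown
        } else {
            actionIfNotClean.accept(e); // e.g. log and swallow during an unclean shutdown
        }
    }
}
```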
[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state
mjsax commented on pull request #8886: URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376402 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state
mjsax commented on pull request #8886: URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376357 Java 8 passed. Java 11: ``` kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Issue Comment Deleted] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arun R updated KAFKA-10186: --- Comment: was deleted (was: I would love to take a look if no one else is looking at it.) > Aborting transaction with pending data should throw non-fatal exception > --- > > Key: KAFKA-10186 > URL: https://issues.apache.org/jira/browse/KAFKA-10186 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > Currently if you try to abort a transaction with any pending (non-flushed) > data, the send exception is set to > {code:java} > KafkaException("Failing batch since transaction was aborted"){code} > This exception type is generally considered fatal, but this is a valid state > to be in -- the point of throwing the exception is to alert that the records > will not be sent, not that you are in an unrecoverable error state. > We should throw a different (possibly new) type of exception here to > distinguish from fatal and recoverable errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[ https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140082#comment-17140082 ] Matthias J. Sax commented on KAFKA-8266: The other ticket is resolved, but I just saw another failure for this test: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7034/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/] {quote}org.scalatest.exceptions.TestFailedException: The remaining consumers in the group could not fetch the expected records at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:329){quote} \cc [~dajac] > Improve > `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup` > > > Key: KAFKA-8266 > URL: https://issues.apache.org/jira/browse/KAFKA-8266 > Project: Kafka > Issue Type: Test >Reporter: Jason Gustafson >Priority: Major > > Some additional validation could be done after the member gets kicked out. > The main thing is showing that the group can continue to consume data and > commit offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
hachikuji merged pull request #8822: URL: https://github.com/apache/kafka/pull/8822 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10113. - Fix Version/s: 2.6.0 Resolution: Fixed > LogTruncationException sets fetch offsets incorrectly > - > > Key: KAFKA-10113 > URL: https://issues.apache.org/jira/browse/KAFKA-10113 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.6.0 > > > LogTruncationException, which extends OffsetOutOfRangeException, takes the > divergent offsets in the constructor. These are the first offsets known to > diverge from what the consumer read. These are then passed to the > OffsetOutOfRangeException incorrectly as the out of range fetch offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
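To spell out the distinction the ticket describes (a sketch; the offsets are illustrative):
{code:java}
TopicPartition tp = new TopicPartition("topic", 0);

// Where the consumer was actually fetching when truncation was detected...
Map<TopicPartition, Long> fetchOffsets = Collections.singletonMap(tp, 50L);

// ...versus the first offsets known to diverge from what the consumer read.
Map<TopicPartition, OffsetAndMetadata> divergentOffsets =
    Collections.singletonMap(tp, new OffsetAndMetadata(42L));

// The bug: the divergent offsets were handed to OffsetOutOfRangeException as if
// they were the out-of-range fetch offsets, misreporting the consumer's position.
{code}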
[jira] [Updated] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-10113: Affects Version/s: 2.5.0 2.4.1 > LogTruncationException sets fetch offsets incorrectly > - > > Key: KAFKA-10113 > URL: https://issues.apache.org/jira/browse/KAFKA-10113 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.5.0, 2.4.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.6.0 > > > LogTruncationException, which extends OffsetOutOfRangeException, takes the > divergent offsets in the constructor. These are the first offsets known to > diverge from what the consumer read. These are then passed to the > OffsetOutOfRangeException incorrectly as the out of range fetch offsets. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
[ https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140088#comment-17140088 ] John Roesler commented on KAFKA-10184: -- What in the world... How can we not have processed even 500 records in two minutes? I agree waiting for start up first would probably help. Do we have any logs that could confirm the hypothesis that the startup phase is eating up a bunch of our timeout? We should probably decrease the size of the records down from a whopping 1kB. My intent was to bridge the integration and system test worlds by creating “realistic” data here, but maybe that was expecting too much of the CIT infrastructure. > Flaky > HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores > -- > > Key: KAFKA-10184 > URL: https://issues.apache.org/jira/browse/KAFKA-10184 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Minor > > {code} > Stacktrace > java.lang.AssertionError: Condition not met within timeout 12. Input > records haven't all been written to the changelog: 442 > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149) > at > org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43