[jira] [Commented] (KAFKA-10182) Change number of partitions of __consumer_offsets

2020-06-18 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139155#comment-17139155
 ] 

huxihx commented on KAFKA-10182:


You could use the `Admin.createPartitions` API to increase the partition count for this 
internal topic.
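
Roughly along these lines with the Java AdminClient (the bootstrap address and the 
target partition count below are just placeholders):

{code}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreaseOffsetsPartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Ask the brokers to grow __consumer_offsets to 100 partitions (example target).
            Map<String, NewPartitions> request =
                Collections.singletonMap("__consumer_offsets", NewPartitions.increaseTo(100));
            admin.createPartitions(request).all().get();
        }
    }
}
{code}

Note that, as the description below points out, doing this re-hashes group names to 
different partitions, so the group coordinator loses its history.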

> Change number of partitions of __consumer_offsets 
> --
>
> Key: KAFKA-10182
> URL: https://issues.apache.org/jira/browse/KAFKA-10182
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xu Zhang
>Priority: Major
>
> Currently the number of partitions of {{__consumer_offsets}} cannot be changed for the 
> lifetime of the cluster, and it's generally a really bad idea to change the number of 
> partitions for __consumer_offsets after it is initially created, because the hashing 
> of consumer group names to partitions would change, which means the group 
> coordinator would have no history.
>  
> Is there a way to change the number of partitions for __consumer_offsets? 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-06-18 Thread GitBox


mimaison commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-645861700


   I think it's a useful fix but it's worth noting it will break all existing 
links. These links were only added a couple of releases ago too, so it's 
probably acceptable.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-06-18 Thread GitBox


tombentley commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-645868566


   Another thought I had about improving the docs (though not for this PR) 
would be to have an index of links in alphabetical order at the start of each 
section (e.g. after "Broker Configs" header and before the table). This would 
make it much simpler to find a specific config (if you knew its name). 
Currently you can click on the top level ToC to get to, e.g., the "Broker 
Configs" section, but then have to scroll for pages searching (and they're not 
in alphabetical order) until you find the one you want. With an index it would 
be two clicks, one (binary) search of a sorted list and no scrolling. We might 
not want this for all configs, but for the longer ones I think it would be 
valuable.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-06-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-9509:


Assignee: Luke Chen  (was: Sanjana Kaundinya)

> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sanjana Kaundinya
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix for this would be to 
> wait, once the connectors are started up, until the REST endpoint returns a 
> positive number of tasks, so that we can be confident it is safe to start testing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-06-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139217#comment-17139217
 ] 

Luke Chen commented on KAFKA-9509:
--

[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4650/log/?start=0]
[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1578/log/?start=0]
[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/226/log/?start=0]
[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1579/log/?start=0]

 

org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testReplication FAILED
 java.lang.RuntimeException: Could not find enough records. found 0, expected 
100
 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:222)

[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4651/log/?start=0]


org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testReplication FAILED
 java.lang.RuntimeException: Could not find enough records. found 0, expected 
100
 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:435)
 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:218)

> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sanjana Kaundinya
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix for this would be to 
> wait, once the connectors are started up, until the REST endpoint returns a 
> positive number of tasks, so that we can be confident it is safe to start testing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-06-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139218#comment-17139218
 ] 

Luke Chen commented on KAFKA-9509:
--

I'm taking it over since it has failed quite often recently, and it also failed 
my PR testing!! :<

 

> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Sanjana Kaundinya
>Assignee: Luke Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix for this would be to 
> wait, once the connectors are started up, until the REST endpoint returns a 
> positive number of tasks, so that we can be confident it is safe to start testing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-06-18 Thread GitBox


d8tltanc commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-645877170


   Thanks @abbccdda for the suggestions on the utility class.
   
   @skaundinya15 @ijuma I've opened this new PR for the KIP-580 exponential retry 
backoff implementation. I've finalized the patch and it's ready for review. 
Please take your time and feel free to leave comments. Thanks.
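   
   For anyone skimming, here is a generic sketch of exponential backoff with jitter; 
it is only an illustration of the idea, not necessarily the exact formula or config 
names the KIP/PR ends up with:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative exponential backoff with jitter; not the exact KIP-580 formula. */
final class ExponentialBackoffSketch {
    private final long baseMs; // initial backoff, e.g. a retry.backoff-style setting
    private final long maxMs;  // upper bound on any single backoff

    ExponentialBackoffSketch(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    /** Backoff for the given number of prior failures, doubled each time and capped at maxMs. */
    long backoffMs(int failures) {
        double exp = baseMs * Math.pow(2, failures);
        double jitter = ThreadLocalRandom.current().nextDouble(0.8, 1.2); // +/- 20% jitter
        return (long) Math.min(maxMs, exp * jitter);
    }
}
```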



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-06-18 Thread GitBox


d8tltanc edited a comment on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-645877170







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


showuon opened a new pull request #8894:
URL: https://github.com/apache/kafka/pull/8894


   The test MirrorConnectorsIntegrationTest.testReplication has failed too often 
recently. It failed at least 6 times (I didn't check all failed builds) in 
today's (6/18) trunk build, and also failed my PR testing! I think it should be 
fixed soon to save developers' time. 
   
   The test exercises MM2 replication. Recently, it has failed because the 
records were not yet replicated to the primary/backup cluster, so the consumer 
could not retrieve them in time. In this PR, I add retries around these 
consumer poll calls (up to 3 retries) while keeping the original assertions. 
This should make the test more stable.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


showuon commented on pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#issuecomment-645886831


   @ryannedolan @skaundinya15 @kkonstantine, could you review this PR to fix the 
flaky test? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8604: KIP-597: MirrorMaker2 internal topics Formatters

2020-06-18 Thread GitBox


mimaison commented on pull request #8604:
URL: https://github.com/apache/kafka/pull/8604#issuecomment-645910843


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gunnarmorling opened a new pull request #8895: KAFKA-8398 Avoiding NPE in ByteBufferUnmapper#unmap()

2020-06-18 Thread GitBox


gunnarmorling opened a new pull request #8895:
URL: https://github.com/apache/kafka/pull/8895


   Regularly seeing this NPE when shutting down the broker in our tests. I 
*think* simply returning early in this case should suffice.
   
   Hey @ijuma, could you take a look at this one perhaps? Thanks!
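   
   For context, here is a rough sketch of the kind of early-return guard I mean; the 
method shape is just illustrative, the actual change is in the PR diff:

```java
import java.nio.ByteBuffer;

final class UnmapGuardSketch {
    /** Skip unmapping when no buffer was ever mapped instead of dereferencing null. */
    static void unmap(String resourceDescription, ByteBuffer buffer) {
        if (buffer == null) {
            return; // shutdown path: nothing was mapped, so there is nothing to release
        }
        // ... the existing platform-specific unmap logic for the mapped buffer follows here ...
    }
}
```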



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10183) MirrorMaker creates duplicate messages in target cluster

2020-06-18 Thread Liraz Sharaby (Jira)
Liraz Sharaby created KAFKA-10183:
-

 Summary: MirrorMaker creates duplicate messages in target cluster
 Key: KAFKA-10183
 URL: https://issues.apache.org/jira/browse/KAFKA-10183
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.5.0, 2.4.0
 Environment: Centos7.7
Reporter: Liraz Sharaby


Issue: MirrorMaker creates a consumer-producer pair per server listed in 
bootstrap.servers (MirrorMaker config), resulting in duplicate messages in the 
target cluster.

When specifying 3 bootstrap servers, the target topic will have 3 times the 
messages its source does.
When specifying a single bootstrap server, only 1 consumer-producer pair is 
created, and the message count is identical in the source and target topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ryannedolan commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


ryannedolan commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442313614



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,

Review comment:
   fwiw this doesn't adhere to the Kafka style guide (looks like Kafka Streams 
to me)

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,
+                                                final int numRecordsProduces,
+                                                final int timeout,
+                                                final ClusterType clusterType,
+                                                final String... topics) throws InterruptedException {
+        int retries = 3;
+        while (retries-- > 0) {
+            try {
+                int actualNum = clusterType == ClusterType.PRIMARY ?
+                    primary.kafka().consume(numRecordsProduces, timeout, topics).count() :

Review comment:
   these are really strange side-effects to have in an assert statement. I 
see what you are trying to do, but this is probably not the way to do it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ryannedolan commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


ryannedolan commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442314691



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -207,23 +212,45 @@ public void close() {
         backup.stop();
     }
 
+    // throw exception after 3 retries, and print expected error messages
+    private void assertEqualsWithConsumeRetries(final String errorMsg,
+                                                final int numRecordsProduces,
+                                                final int timeout,
+                                                final ClusterType clusterType,
+                                                final String... topics) throws InterruptedException {
+        int retries = 3;
+        while (retries-- > 0) {
+            try {
+                int actualNum = clusterType == ClusterType.PRIMARY ?
+                    primary.kafka().consume(numRecordsProduces, timeout, topics).count() :

Review comment:
   these are really strange side-effects to have in an assert statement. I 
see what you are trying to do, but this is probably not the way to do it.
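
   Just to illustrate what I mean (made-up helper, not a prescription): keep the 
consuming/waiting in a plain helper that returns a count, and let the test do an 
ordinary assertEquals on the result, so the assert itself has no side-effects:

```java
    // Hypothetical helper inside this test class: retry the consume, return the count,
    // and let the caller assert on it. Nothing here pretends to be an assertion.
    private int consumeCountWithRetries(final int expected,
                                        final long perAttemptTimeoutMs,
                                        final int maxAttempts,
                                        final String... topics) throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return primary.kafka().consume(expected, perAttemptTimeoutMs, topics).count();
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    throw e; // give up after the last attempt
                }
            }
        }
        throw new AssertionError("unreachable");
    }
```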





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-18 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r442330606



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -106,12 +107,29 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  /**
+   * make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap.
+   * Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset.
+   */
+  private val txnLock = new ReentrantLock
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId,
+      groupPartitionId = groupPartitionId,
+      leader = i == 0,
+      // same producerId to tests more on transactional conflicts.
+      producerId = 1000,
+      txnLock = txnLock))
+    allGroupMembers ++= members

Review comment:
   Since createGroupMembers() is called in multiple tests, it seems we will 
be accumulating allGroupMembers across tests. That seems unexpected?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-18 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r442337184



##
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
##
@@ -106,12 +107,29 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
   }
 
-  def createGroupMembers(groupPrefix: String): Set[GroupMember] = {
-    (0 until nGroups).flatMap { i =>
-      new Group(s"$groupPrefix$i", nMembersPerGroup, groupCoordinator, replicaManager).members
-    }.toSet
+  /**
+   * make CompleteTxnOperation and CommitTxnOffsetsOperation complete atomically since they don't typically overlap.
+   * Otherwise CompleteTxnOperation may see a pending offsetAndMetadata without an appendedBatchOffset.
+   */
+  private val txnLock = new ReentrantLock
+  private val allGroupMembers = mutable.ArrayBuffer[GroupMember]()
+
+  def groupMembers(groupId: String, nMembers: Int, groupCoordinator: GroupCoordinator): Seq[GroupMember] = {
+    val groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+    val members = (0 until nMembers).map(i => new GroupMember(groupId = groupId,
+      groupPartitionId = groupPartitionId,
+      leader = i == 0,
+      // same producerId to tests more on transactional conflicts.
+      producerId = 1000,
+      txnLock = txnLock))
+    allGroupMembers ++= members

Review comment:
   JUnit, by default, creates a new instance for each test case, so 
```allGroupMembers``` is always a new one for each test case. 
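   
   A tiny illustration of that lifecycle (JUnit 4; the class and field names below 
are made up):

```java
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

public class FreshInstancePerTestExample {
    // Re-created for every test method, because JUnit 4 instantiates the class once per test.
    private final List<String> members = new ArrayList<>();

    @Test
    public void firstTestSeesAnEmptyList() {
        assertTrue(members.isEmpty());
        members.add("m1");
    }

    @Test
    public void secondTestAlsoSeesAnEmptyList() {
        assertTrue(members.isEmpty()); // not polluted by the other test's add()
    }
}
```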





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125726


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646125880


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646126053


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-06-18 Thread GitBox


ijuma commented on pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#issuecomment-646140719


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139533#comment-17139533
 ] 

Guozhang Wang commented on KAFKA-10005:
---

So just to have a quick summary, my proposal primarily has three parts:

1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to 
add batches of records as SST files directly; this replaces the effect of 
bulk loading.
2) move the restoration off the stream thread to a different thread (pool), for 
both restoring active tasks as well as updating standby tasks.
3) if needed, also disable compaction during the restoration, and do a 
one-phase full compaction when we complete.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore 
extends; we can leverage that interface to "toggle" restoration mode for 1) and 
3) above.

cc [~cadonna]
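
For 1), a rough sketch of the SST-file route using the RocksJava SstFileWriter / 
ingestExternalFile surface (the exact call mentioned above, addFileWithFileInfo, 
may differ from what's shown here; paths, keys and options are illustrative only):

{code}
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileWriter;

public class SstIngestSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/restore-sketch")) {

            // 1) Write a batch of restored records into an external SST file (keys must be sorted).
            String sstPath = "/tmp/restore-sketch-batch.sst";
            try (EnvOptions envOptions = new EnvOptions();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                writer.put("key-1".getBytes(StandardCharsets.UTF_8), "value-1".getBytes(StandardCharsets.UTF_8));
                writer.put("key-2".getBytes(StandardCharsets.UTF_8), "value-2".getBytes(StandardCharsets.UTF_8));
                writer.finish();
            }

            // 2) Ingest the file directly instead of issuing individual puts during restoration.
            try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
                db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
            }
        }
    }
}
{code}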

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes; however, 
> today we allow users to register a per-store RestoreCallback which also 
> implements the RestoreListener. Such weird mixing is actually motivated by 
> Streams' internal usage to enable / disable bulk loading inside RocksDB. For 
> users, however, it is less meaningful to specify a callback to be a listener 
> since `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks forces Streams to check 
> internally if the passed-in per-store callback also implements the listener, 
> and if so trigger its calls, which increases the complexity. Besides, 
> toggling RocksDB bulk loading requires us to open / close / reopen / 
> reclose the store 4 times during the restoration, which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442354307



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;

Review comment:
   No. The caller will ensure that the node is in the connecting state. 
I'll add an IllegalStateException here.
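   
   Something along these lines, in the context of the hunk above (sketch only, not 
the final patch):

```java
    public long connectionSetupTimeoutMs(String id) {
        NodeConnectionState nodeState = this.nodeState.get(id);
        if (nodeState == null) {
            // Callers are expected to only ask about nodes that currently have a connection state.
            throw new IllegalStateException("No connection state found for node " + id);
        }
        return nodeState.connectionSetupTimeoutMs;
    }
```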





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   Good catch. I'll make the logic record it in both `connecting` and 
`disconnected`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10184:
--
Priority: Minor  (was: Major)

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.pr

[jira] [Created] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10184:
-

 Summary: Flaky 
HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
 Key: KAFKA-10184
 URL: https://issues.apache.org/jira/browse/KAFKA-10184
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Guozhang Wang


{code}
Stacktrace
java.lang.AssertionError: Condition not met within timeout 12. Input 
records haven't all been written to the changelog: 442
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
at 
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
at 
org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.Dele

[jira] [Updated] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10184:
--
Issue Type: Test  (was: Bug)

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.pro

[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
and `disconnected`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
and `disconnected`. (Line 145 & Line 157)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442355123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -357,6 +398,36 @@ private NodeConnectionState nodeState(String id) {
         return state;
     }
 
+    /**
+     * Get the id set of nodes which are in CONNECTING state
+     */
+    public Set<String> connectingNodes() {
+        return this.connectingNodes;
+    }
+
+    /**
+     * Get the timestamp of the latest connection attempt of a given node
+     * @param id the connection to fetch the state for
+     */
+    public long lastConnectAttemptMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState == null ? 0 : nodeState.lastConnectAttemptMs;
+    }
+
+    public long connectionSetupTimeoutMs(String id) {
+        NodeConnectionState nodeState = this.nodeState.get(id);
+        return nodeState.connectionSetupTimeoutMs;
+    }
+
+    /**
+     * Test if the connection to the given node has reached its timeout
+     * @param id the connection to fetch the state for
+     * @param now the current time in ms
+     */
+    public boolean isConnectionSetupTimeout(String id, long now) {
+        return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);

Review comment:
   I think so. The `lastConnectAttemptMs` is updated in both `connecting` 
(Line 145 & Line 157) and `disconnected`. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442362673



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
         Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
+    public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the initial socket connection to be built. If the connection is not built before the timeout elapses the network client will close the socket channel. The default value will be 10 seconds.";

Review comment:
   Makes sense. I'll change the description and remove the defaults in the 
doc.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442363632



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -678,7 +696,11 @@ public Node leastLoadedNode(long now) {
             } else if (connectionStates.isPreparingConnection(node.idString())) {
                 foundConnecting = node;
             } else if (canConnect(node, now)) {
-                foundCanConnect = node;
+                if (foundCanConnect == null ||
+                        this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
+                        this.connectionStates.lastConnectAttemptMs(node.idString())) {
+                    foundCanConnect = node;
+                }

Review comment:
   Yes





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139543#comment-17139543
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

Yeah, it's failing on the setup and hasn't even gotten to the real test at all. 
cc/ [~vvcephei] seems like "500" was still too high :/

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoad

[GitHub] [kafka] edoardocomar commented on pull request #4204: KAFKA-5238: BrokerTopicMetrics can be recreated after topic is deleted

2020-06-18 Thread GitBox


edoardocomar commented on pull request #4204:
URL: https://github.com/apache/kafka/pull/4204#issuecomment-646162106


   After @hachikuji's fixes in https://github.com/apache/kafka/pull/8586, 
the metrics are no longer ticked at the end of a DelayedFetch, so the time 
window for topic deletion is almost non-existent and the only guard code needed 
is left in `KafkaApis`, as shown by the unit test added by this PR.
   This PR is now tiny, and it would be nice to have it merged :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139546#comment-17139546
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

[~vvcephei] Could we maybe do something like "write as many records as you can 
within the 12ms timeout"? Since, as you said, the whole point is just to make 
sure we have enough records to represent a "reasonably large" number: if it 
takes 2 minutes to write only 100 records, then those 100 records already 
represent a heavy load (apparently...)
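
A rough sketch of that idea, in case it helps the discussion (the producer, topic, and limits here are placeholders, not the actual test fixtures): produce until either a record cap or the deadline is hit, and let the assertion use whatever count was actually written.

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical helper, not the real test code: write as many records as fit
// within the deadline and report how many were written, so the changelog
// assertion can compare against that count instead of a hard-coded one.
final class DeadlineBoundedWriter {
    static int produceUntil(final KafkaProducer<String, String> producer,
                            final String topic,
                            final int maxRecords,
                            final long deadlineMs) {
        int written = 0;
        while (System.currentTimeMillis() < deadlineMs && written < maxRecords) {
            producer.send(new ProducerRecord<>(topic, "key" + written, "value" + written));
            written++;
        }
        producer.flush();
        return written;
    }
}
{code}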

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch

[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139548#comment-17139548
 ] 

Sophie Blee-Goldman commented on KAFKA-10184:
-

Or instead (or in addition to the above), maybe we should wait for the Streams 
instances to be in RUNNING before we start the timeout for writing the records.
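
A minimal sketch of that, assuming the test holds the KafkaStreams instances (the helper itself is hypothetical; KafkaStreams.state() and TestUtils.waitForCondition are existing APIs):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.test.TestUtils;

// Hypothetical helper: block until the instance reports RUNNING before the
// record-writing timeout starts ticking.
final class StreamsTestSupport {
    static void awaitRunning(final KafkaStreams streams, final Duration timeout) throws InterruptedException {
        TestUtils.waitForCondition(
            () -> streams.state() == KafkaStreams.State.RUNNING,
            timeout.toMillis(),
            "Streams instance did not reach RUNNING within " + timeout);
    }
}
{code}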

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(Context

[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646168538


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-18 Thread GitBox


hachikuji commented on pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#issuecomment-646173735







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442372873



##
File path: 
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##
@@ -103,6 +103,12 @@
 Utils.join(SecurityProtocol.names(), ", ") + ".";
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = 
"socket.connection.setup.timeout.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The 
amount of time the client will wait for the initial socket connection to be 
built. If the connection is not built before the timeout elapses the network 
client will close the socket channel. The default value will be 10 seconds.";
+
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = 
"socket.connection.setup.timeout.max.ms";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = 
"The maximum amount of time the client will wait for the initial socket 
connection to be built. The connection setup timeout will increase 
exponentially for each consecutive connection failure up to this maximum. To 
avoid connection storms, a randomization factor of 0.2 will be applied to the 
backoff resulting in a random range between 20% below and 20% above the 
computed value. The default value will be 127 seconds.";

Review comment:
   Refactored
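
   For readers following along, a minimal sketch of how a client could set the proposed timeouts once this lands (the config keys are taken from the doc strings above and may still change during review):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Sketch only: the two socket.connection.setup.* keys come from the proposed
// patch above and are not part of a released client yet.
final class ConnectionSetupTimeoutExample {
    static Properties producerProps() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put("socket.connection.setup.timeout.ms", "10000");       // initial per-connection timeout
        props.put("socket.connection.setup.timeout.max.ms", "127000");  // cap for the exponential growth
        return props;
    }
}
```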





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442375424



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -786,6 +808,29 @@ private void handleAbortedSends(List 
responses) {
 abortedSends.clear();
 }
 
+/**
+ * Handle socket channel connection timeout. The timeout will hit iff a 
connection
+ * stays at the ConnectionState.CONNECTING state longer than the timeout 
value,
+ * as indicated by ClusterConnectionStates.NodeConnectionState.
+ *
+ * @param responses The list of responses to update
+ * @param now The current time
+ */
+private void handleTimeoutConnections(List responses, long 
now) {
+Set connectingNodes = connectionStates.connectingNodes();
+for (String nodeId: connectingNodes) {

Review comment:
   Refactored





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442378048



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -930,6 +935,38 @@ object ReassignPartitionsCommand extends Logging {
 (brokerListToReassign, topicsToReassign)
   }
 
+  /**
+   * The entry point for --alter-throttles. At least one throttle value must 
be provided.
+   *
+   * @param admin The Admin instance to use
+   * @param interBrokerThrottle The new inter-broker throttle or -1 to leave 
it unchanged
+   * @param logDirThrottle The new alter-log-dir throttle or -1 to leave it 
unchanged
+   */
+  def alterThrottles(admin: Admin,
+ interBrokerThrottle: Long,
+ logDirThrottle: Long): Unit = {
+if (interBrokerThrottle < 0 && logDirThrottle < 0) {
+  throw new TerseReassignmentFailureException("No valid throttle values 
provided to --alter-throttle")

Review comment:
   It would be good to include the flags needed to pass in a throttle in 
this message





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442382471



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -149,6 +155,16 @@
 atLeast(0),
 Importance.MEDIUM,
 REQUEST_TIMEOUT_MS_DOC)
+
.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_CONFIG,
+Type.LONG,
+10 * 1000,
+Importance.MEDIUM,
+
CommonClientConfigs.SOCKET_CONNECTIONS_SETUP_TIMEOUT_MS_DOC)
+
.define(SOCKET_CONNECTIONS_SETUP_TIMEOUT_MAX_MS_CONFIG,
+Type.LONG,
+127 * 1000,

Review comment:
   Sounds good. WIll refactor.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10185:


 Summary: Streams should log summarized restoration information at 
info level
 Key: KAFKA-10185
 URL: https://issues.apache.org/jira/browse/KAFKA-10185
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442387660



##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+@Test
+public void testGeometricProgression() {
+long scaleFactor = 100;
+int ratio = 2;
+long termMax = 2000;
+double jitter = 0.2;
+GeometricProgression geometricProgression = new GeometricProgression(
+scaleFactor, ratio, termMax, jitter
+);
+
+for (int i = 0; i <= 100; i++) {
+for (int n = 0; n <= 4; n++) {
+assertEquals(scaleFactor * Math.pow(ratio, n), 
geometricProgression.term(n),
+scaleFactor * Math.pow(ratio, n) * jitter);
+}
+System.out.println(geometricProgression.term(5));

Review comment:
   Oh, right. I missed removing it.

##
File path: 
clients/src/test/java/org/apache/kafka/common/utils/GeometricProgressionTest.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class GeometricProgressionTest {
+@Test
+public void testGeometricProgression() {
+long scaleFactor = 100;
+int ratio = 2;
+long termMax = 2000;
+double jitter = 0.2;
+GeometricProgression geometricProgression = new GeometricProgression(
+scaleFactor, ratio, termMax, jitter
+);
+
+for (int i = 0; i <= 100; i++) {
+for (int n = 0; n <= 4; n++) {
+assertEquals(scaleFactor * Math.pow(ratio, n), 
geometricProgression.term(n),
+scaleFactor * Math.pow(ratio, n) * jitter);
+}
+System.out.println(geometricProgression.term(5));

Review comment:
   Oh, right. I forgot removing it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei opened a new pull request #8896:
URL: https://github.com/apache/kafka/pull/8896


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442389711



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A util class for exponential backoff, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be 
provided.
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
   Good idea. Will go for `ExponentialBackoff`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r442394058



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A util class for exponential backoff, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be 
provided.
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public GeometricProgression(long scaleFactor, int ratio, long termMax, 
double jitter) {
+this.scaleFactor = scaleFactor;
+this.ratio = ratio;
+this.jitter = jitter;
+this.expMax = termMax > scaleFactor ?
+Math.log(termMax / (double) Math.max(scaleFactor, 1)) / 
Math.log(ratio) : 0;
+}
+
+public long term(long n) {

Review comment:
   As we noticed in your earlier comments, the same value of `attempts` may 
correspond to different terms.
   connection_timeout = constant * 2 ^ (attempts)
   reconnect_backoff = constant * 2 ^ (attempts - 1)
   (in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1) 
   So I think using `retries` or `attempts` instead of `n` might also confuse 
people. Shall we think of another naming? 

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * A util class for exponential backoff, etc.
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater than or equal to termMax, a constant term will be 
provided.
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public GeometricProgression(long scaleFactor, int ratio, long termMax, 
double jitter) {
+this.scaleFactor = scaleFactor;
+this.ratio = ratio;
+this.jitter = jitter;
+this.expMax = termMax > scaleFactor ?
+Math.log(termMax / (double) Math.max(scaleFactor, 1)) / 
Math.log(ratio) : 0;
+}
+
+public long term(long n) {

Review comment:
   As we noticed in your earlier comments, the same value of `attempts` may 
correspond to different terms.
   
   connection_timeout = constant * 2 ^ (attempts)
   reconnect_backoff = constant * 2 ^ (attempts - 1)
   (in KIP-580) retry_backoff = constant * 2 ^ (attempts - 1) 
   
   So I think using `retries` or `attempts` instead of `n` might also confuse 
people. Shall we think of another naming? 
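
   To make the naming question concrete, a small usage sketch of the class from this diff (constructor arguments as defined above; 10 s and 127 s are the proposed connection-setup defaults):

```java
// Sketch only: term(0) is the first connection-setup timeout, term(1) the second,
// and so on, each drawn with ±20% jitter and capped near termMax.
GeometricProgression connectionSetupTimeout =
    new GeometricProgression(10_000L, 2, 127_000L, 0.2);

long firstTimeoutMs  = connectionSetupTimeout.term(0);  // roughly 10 000 ms
long secondTimeoutMs = connectionSetupTimeout.term(1);  // roughly 20 000 ms
long cappedTimeoutMs = connectionSetupTimeout.term(10); // capped around 127 000 ms
```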





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructur

[GitHub] [kafka] hachikuji commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


hachikuji commented on pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#issuecomment-646209534


   Had some discussion offline with @cmccabe . The intention in KIP-455 is to 
use --additional to resubmit the reassignment and change the quota. I found 
that this did not work as expected when I tried it, so let me try to modify 
this patch so that the new integration tests use this behavior. Possibly we 
don't need --alter-throttle, but the tests and improved documentation would 
still be useful.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442397523



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -415,19 +418,20 @@ public void restore() {
 // for restoring active and updating standby we may prefer 
different poll time
 // in order to make sure we call the main consumer#poll in 
time.
 // TODO: once we move ChangelogReader to a separate thread 
this may no longer be a concern
-polledRecords = 
restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? 
Duration.ZERO : pollTime);
+polledRecords = restoreConsumer.poll(state == 
ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
   trivial cleanup

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -415,19 +418,20 @@ public void restore() {
 // for restoring active and updating standby we may prefer 
different poll time
 // in order to make sure we call the main consumer#poll in 
time.
 // TODO: once we move ChangelogReader to a separate thread 
this may no longer be a concern
-polledRecords = 
restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? 
Duration.ZERO : pollTime);
+polledRecords = restoreConsumer.poll(state == 
ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
 } catch (final InvalidOffsetException e) {
-log.warn("Encountered {} fetching records from restore 
consumer for partitions {}, it is likely that " +
+log.warn("Encountered " + e.getClass().getName() +
+" fetching records from restore consumer for partitions " 
+ e.partitions() + ", it is likely that " +
 "the consumer's position has fallen out of the topic 
partition offset range because the topic was " +
 "truncated or compacted on the broker, marking the 
corresponding tasks as corrupted and re-initializing" +
-" it later.", e.getClass().getName(), e.partitions());
+" it later.", e);

Review comment:
Added the exception itself as the "cause" of the warning. The message of the 
InvalidOffsetException is actually pretty good at explaining the root cause.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -446,6 +450,38 @@ public void restore() {
 }
 
 maybeUpdateLimitOffsetsForStandbyChangelogs();
+
+maybeLogRestorationProgress();

Review comment:
   This is the main change. Once every ten seconds, we will log the 
progress for each active restoring changelog.
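
   For readers skimming the diff, the gating pattern being described is roughly this (a toy sketch; the real code tracks per-changelog offsets inside StoreChangelogReader):

```java
import java.util.Map;

// Toy sketch of "log progress at most once every ten seconds"; names are illustrative.
final class RestorationProgressLogger {
    private static final long LOG_INTERVAL_MS = 10_000L;
    private long lastLogMs = 0L;

    void maybeLog(final long nowMs, final Map<String, Long> remainingPerChangelog) {
        if (nowMs - lastLogMs < LOG_INTERVAL_MS) {
            return; // stay quiet between intervals
        }
        lastLogMs = nowMs;
        remainingPerChangelog.forEach((changelog, remaining) ->
            System.out.printf("Restoration in progress for %s: %d records remaining%n", changelog, remaining));
    }
}
```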

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -415,19 +418,20 @@ public void restore() {
 // for restoring active and updating standby we may prefer 
different poll time
 // in order to make sure we call the main consumer#poll in 
time.
 // TODO: once we move ChangelogReader to a separate thread 
this may no longer be a concern
-polledRecords = 
restoreConsumer.poll(state.equals(ChangelogReaderState.STANDBY_UPDATING) ? 
Duration.ZERO : pollTime);
+polledRecords = restoreConsumer.poll(state == 
ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);
 } catch (final InvalidOffsetException e) {
-log.warn("Encountered {} fetching records from restore 
consumer for partitions {}, it is likely that " +
+log.warn("Encountered " + e.getClass().getName() +
+" fetching records from restore consumer for partitions " 
+ e.partitions() + ", it is likely that " +
 "the consumer's position has fallen out of the topic 
partition offset range because the topic was " +
 "truncated or compacted on the broker, marking the 
corresponding tasks as corrupted and re-initializing" +
-" it later.", e.getClass().getName(), e.partitions());
+" it later.", e);
 
 final Map> 
taskWithCorruptedChangelogs = new HashMap<>();
 for (final TopicPartition partition : e.partitions()) {
 final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
 taskWithCorruptedChangelogs.computeIfAbsent(taskId, k -> 
new HashSet<>()).add(partition);
 }
-throw new TaskCorruptedException(taskWithCorruptedChangelogs);
+throw new TaskCorruptedException(taskWithCorruptedChangelogs, 
e);

Review comment:
  

[GitHub] [kafka] d8tltanc commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-18 Thread GitBox


d8tltanc commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-646217062


   Thanks, @dajac for the comments. I've modified the PR per your suggestions. 
   @rajinisivaram Do you think we can start testing?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8876: KAFKA-10167: use the admin client to read end-offset

2020-06-18 Thread GitBox


guozhangwang merged pull request #8876:
URL: https://github.com/apache/kafka/pull/8876


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8876: KAFKA-10167: use the admin client to read end-offset

2020-06-18 Thread GitBox


guozhangwang commented on pull request #8876:
URL: https://github.com/apache/kafka/pull/8876#issuecomment-646234217


   Cherry-pick to 2.6.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139916#comment-17139916
 ] 

Sophie Blee-Goldman commented on KAFKA-10005:
-

I've been thinking about this lately, and I'm not quite convinced we should move 
active-task restoration to a separate thread as well (as opposed to only 
standbys). With KIP-441, the 
majority of restoration will actually be done as a standby task. Only the last 
(hopefully-trivial) tail end of the changelog will be restored as an active 
task. Is that worth the overhead of thread synchronization to hand off tasks 
between the restore thread(s) and the main one? I'm not sure

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139919#comment-17139919
 ] 

Guozhang Wang commented on KAFKA-10005:
---

I think even if we only move standbys to separate threads, there is still 
synchronization required for IQ and rebalance needs, but I think we need to get 
to a POC to have a clear idea of how much thread synchronization overhead would be 
incurred. We can discuss this further if we see that the synchronization 
overhead with just standbys versus standbys + active is very different.

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139533#comment-17139533
 ] 

Guozhang Wang edited comment on KAFKA-10005 at 6/18/20, 6:58 PM:
-

So just to have a quick summary, my proposal is primarily in three folds:

1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to 
add batches of records as SST files directly (see the sketch below); this 
replaces the effect of bulk loading.
2) move the restoration off the stream thread to a different thread (pool), for 
both restoring active tasks as well as updating standby tasks.
3) if needed, we also disable compaction during the restoration, and do a 
one-phase full compaction when we complete. I'm keeping it as "optional" for 
now since disabling compaction has both pros and cons, and if we have good 
performance from 1/2) alone then maybe we can afford to keep compaction enabled.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore 
extends; we can leverage that interface to "toggle" restoration mode for 1) and 
3) above.

cc [~cadonna]
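
For point 1), the sketch below shows roughly what the SST-file path looks like with the RocksJava API (SstFileWriter plus ingestExternalFile); this is generic RocksDB usage, not the eventual Streams integration, and the paths and key/value bytes are placeholders.

{code:java}
import java.util.Collections;
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileWriter;

// Rough sketch: write a restored batch into an SST file and ingest it directly,
// instead of applying individual puts. Keys must be added in sorted order.
final class SstIngestSketch {
    static void ingestBatch(final RocksDB db, final Options options,
                            final byte[][] keys, final byte[][] values) throws RocksDBException {
        final String sstPath = "/tmp/restore-batch.sst"; // placeholder location
        try (final SstFileWriter writer = new SstFileWriter(new EnvOptions(), options)) {
            writer.open(sstPath);
            for (int i = 0; i < keys.length; i++) {
                writer.put(keys[i], values[i]);
            }
            writer.finish();
        }
        db.ingestExternalFile(Collections.singletonList(sstPath), new IngestExternalFileOptions());
    }
}
{code}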


was (Author: guozhang):
So just to have a quick summary, my proposal is primarily in three folds:

1) use {{db.addFileWithFileInfo(externalSstFileInfo)}} during restoration to 
add batch of records as SST files directly, this is to replace the impact of 
bulk loading.
2) move the restoration off the stream thread to a different thread (pool), for 
both restoring active tasks as well as updating standby tasks.
3) if needed, we also disable compaction during the restoration, and do a 
one-phase full compaction when we complete.

We already have an internal BulkLoadStore interface which e.g. RocksDBStore 
extends, we can leverage that interface to "toggle" restoration mode for 1) and 
3) above.

cc [~cadonna]

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-06-18 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139925#comment-17139925
 ] 

Sophie Blee-Goldman commented on KAFKA-10005:
-

Yeah, I didn't mean to imply we'd be synchronization-free with only standbys in 
a separate thread. But at least we'd only need to sync at rebalances, whereas 
with restoration in a separate thread we'll need to continually check for 
newly-restored tasks that should be taken over by the main thread. Anyway, I 
agree: a POC and maybe some benchmarking should have the last word.

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however 
> today we allow user's to register a per-store RestoreCallback which is also 
> implementing the RestoreListener. Such weird mixing is actually motivated by 
> Streams internal usage to enable / disable bulk loading inside RocksDB. For 
> user's however this is less meaningful to specify a callback to be a listener 
> since the `onRestoreStart / End` has the storeName passed in, so that users 
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check 
> internally if the passed in per-store callback is also implementing listener, 
> and if yes trigger their calls, which increases the complexity. Besides, 
> toggle rocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different 
> ways other than toggle bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from 
> callback -- i.e. we would not presume users passing in a callback function 
> that implements both RestoreCallback and RestoreListener, and also for 
> RocksDB we replace the bulk loading mechanism with other ways of 
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139926#comment-17139926
 ] 

Rohan Desai commented on KAFKA-10179:
-

I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we take the topic data, validate/transform it by 
deserializing it (which might apply some transforms, like projecting just the fields 
of interest), then serialize it and write it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

This has consequences in practice when used with a schema registry using the 
confluent serializers. If we use the same topic, `serialize` might register a 
different schema with the source subject, which we probably don't want.

I think the technically correct thing to do (though this is of course more 
expensive) would be (when the source table is optimized) to deserialize and 
serialize each record when restoring.

Another issue that I think exists (I need to try to reproduce it), and that 
deserializing/serializing would solve, is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
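
A rough sketch of the deserialize-then-reserialize idea (illustrative only; where this hook would live in the restoration path is exactly the open question, and the serdes and topic names are placeholders):

{code:java}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative: round-trip each restored record through the serdes, passing the
// source topic to deserialize (so validation matches normal processing) and the
// store's changelog topic to serialize (so schema registration targets the store subject).
final class RoundTripRestore<V> {
    private final Deserializer<V> sourceDeserializer;
    private final Serializer<V> storeSerializer;

    RoundTripRestore(final Deserializer<V> sourceDeserializer, final Serializer<V> storeSerializer) {
        this.sourceDeserializer = sourceDeserializer;
        this.storeSerializer = storeSerializer;
    }

    byte[] restoreValue(final String sourceTopic, final String storeTopic, final byte[] rawValue) {
        // Deserializing here is what surfaces bad source records at restore time, so the
        // same skip-or-fail policy used in normal processing could be applied around this call.
        final V value = sourceDeserializer.deserialize(sourceTopic, rawValue);
        return storeSerializer.serialize(storeTopic, value);
    }
}
{code}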

 

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] skaundinya15 commented on a change in pull request #8894: KAFKA-9509: Add retries for mirrorClient consume records to fix flaky test

2020-06-18 Thread GitBox


skaundinya15 commented on a change in pull request #8894:
URL: https://github.com/apache/kafka/pull/8894#discussion_r442445728



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -207,23 +212,45 @@ public void close() {
 backup.stop();
 }
 
+// throw exception after 3 retries, and print expected error messages
+private void assertEqualsWithConsumeRetries(final String errorMsg,
+final int numRecordsProduces,
+final int timeout,
+final ClusterType clusterType,
+final String... topics) throws 
InterruptedException {
+int retries = 3;
+while (retries-- > 0) {
+try {
+int actualNum = clusterType == ClusterType.PRIMARY ?
+primary.kafka().consume(numRecordsProduces, timeout, 
topics).count() :
+backup.kafka().consume(numRecordsProduces, timeout, 
topics).count();
+if (numRecordsProduces == actualNum)
+return;
+} catch (Throwable e) {
+log.error("Could not find enough records with {} retries 
left", retries, e);
+}
+}
+throw new InterruptedException(errorMsg);
+}
+
 @Test
 public void testReplication() throws InterruptedException {
 MirrorClient primaryClient = new 
MirrorClient(mm2Config.clientConfig("primary"));
 MirrorClient backupClient = new 
MirrorClient(mm2Config.clientConfig("backup"));
 
-assertEquals("Records were not produced to primary cluster.", 
NUM_RECORDS_PRODUCED,

Review comment:
   I'd agree with @ryannedolan here. We could use the `waitForCondition` helper in 
`TestUtils.java` to wait for the necessary condition instead. More 
details on that are here: 
https://github.com/apache/kafka/blob/d8cc6fe8e36329c647736773d9d66de89c447409/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L370-L371
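
   For illustration, the retry helper above could then collapse into something like this (sketch only; the constants and topic name are placeholders standing in for the surrounding test's values):

```java
// Sketch of the suggestion: let TestUtils.waitForCondition own the retrying.
TestUtils.waitForCondition(
    () -> {
        try {
            return primary.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_TIMEOUT_MS, "test-topic-1")
                          .count() == NUM_RECORDS_PRODUCED;
        } catch (final Throwable t) {
            return false; // not enough records yet; keep waiting
        }
    },
    OVERALL_TIMEOUT_MS,
    "Records were not produced to primary cluster.");
```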





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-18 Thread GitBox


dajac commented on pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#issuecomment-646271944


   @hachikuji I just rebased and fixed the build issue. Could you re-trigger 
jenkins please?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10185) Streams should log summarized restoration information at info level

2020-06-18 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139955#comment-17139955
 ] 

Boyang Chen commented on KAFKA-10185:
-

Could we add the context to this ticket?

> Streams should log summarized restoration information at info level
> ---
>
> Key: KAFKA-10185
> URL: https://issues.apache.org/jira/browse/KAFKA-10185
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-06-18 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139963#comment-17139963
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


LGTM, thanks for looking into it and for making sure that 2.5.1/2.6 should be safe 
to use.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.5.0, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/

[jira] [Assigned] (KAFKA-10160) Kafka MM2 consumer configuration

2020-06-18 Thread sats (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sats reassigned KAFKA-10160:


Assignee: sats

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,]
>  According to this, producer/consumer-level properties should be configured 
> as e.g. somesource->sometarget.consumer.client.id. I tried to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from the earliest offset. Not sure 
> if this is a bug or my misconfiguration, but at least some update to the docs would be useful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-06-18 Thread GitBox


vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-646303241


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r442497609



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -148,27 +150,35 @@
 private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
 private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
-private final List infos = asList(
-new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-);
-
-private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, 
EMPTY_TASKS, EMPTY_TASKS);
+private final List partitionInfos = getPartitionInfos(3, 3);
+{
+partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new 
Node[0], new Node[0]));
+}
 
 private final Cluster metadata = new Cluster(
 "cluster",
 Collections.singletonList(Node.noNode()),
-infos,
+partitionInfos,
 emptySet(),
-emptySet());
+emptySet()
+);
+
+/* Used by the scale test for large apps/clusters */
+private static final int NUM_TOPICS_XL = 10;
+private static final int NUM_PARTITIONS_PER_TOPIC_XL = 1_000;
+private static final int NUM_CONSUMERS_XL = 100;
+private static final List TOPICS_LIST_XL = new ArrayList<>();
+private static final Map CHANGELOG_END_OFFSETS_XL = 
new HashMap<>();
+private static final List PARTITION_INFOS_XL = 
getPartitionInfos(NUM_TOPICS_XL, NUM_PARTITIONS_PER_TOPIC_XL);
+private static final Cluster CLUSTER_METADATA_XL = new Cluster(
+"cluster",
+Collections.singletonList(Node.noNode()),
+PARTITION_INFOS_XL,
+emptySet(),
+emptySet()
+);

Review comment:
   If there's a whole set of constants only used by one test, one might 
wonder whether that test shouldn't just be in its own class...

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -148,27 +150,35 @@
 private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
 private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
-private final List infos = asList(
-new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new 
Node[0]),
-new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-);
-
-private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, 
EMPTY_TASKS, EMPTY_TASKS);
+private final List partitionInfos = getPartitionInfos(3, 3);
+{
+partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new 
Node[0], new Node[0]));
+}

Review comment:
   Can you just pass this as an argument to `getPartitionInfos` so that we 
can do all the initialization in the assignment instead of needing an 
initialization block? The fact that this field is used in another field 
initialization statement makes the initialization block kind of questionable, 
since you have to read the JVM spec to know if this block executes before or 
after the usage.
   
   Alternatively, maybe the prior code was actually better, because you can see 
exactly what data you're testing with, instead of having to go read another 
method to understand what `getPartitionInfos(3, 3)` might mean.
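   
   For example, one illustrative way to keep everything in the field assignment (a sketch only; the `Stream`/`Collectors`/`PartitionInfo`/`Node` imports are elided and `getPartitionInfos` is the PR's own helper):
   
   ```java
   // Sketch only: append the extra "topic3" partition in the same expression that builds the list,
   // so no instance-initializer block is needed.
   private final List<PartitionInfo> partitionInfos = Stream.concat(
           getPartitionInfos(3, 3).stream(),
           Stream.of(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])))
       .collect(Collectors.toList());
   ```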





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and

[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8881: KIP-557: Add emit on change support to Kafka Streams

2020-06-18 Thread GitBox


ConcurrencyPractitioner commented on pull request #8881:
URL: https://github.com/apache/kafka/pull/8881#issuecomment-646304787


   @vvcephei Think you have time to look at this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #8897: MINOR; Use the automated protocol for the Consumer Protocol's subscriptions and assignments

2020-06-18 Thread GitBox


dajac opened a new pull request #8897:
URL: https://github.com/apache/kafka/pull/8897


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar opened a new pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests

2020-06-18 Thread GitBox


vinothchandar opened a new pull request #8898:
URL: https://github.com/apache/kafka/pull/8898


   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-17--001.1592453352--vinothchandar--KC342-ducktape--e64cc463b/report.html
   
   Both ThrottlingTest and ReassignPartitionsTest, which invoke these methods, 
pass locally and twice in CI. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442529975



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##
@@ -496,8 +539,9 @@ private void bufferChangelogRecords(final ChangelogMetadata 
changelogMetadata, f
 } else {
 changelogMetadata.bufferedRecords.add(record);
 final long offset = record.offset();
-if (changelogMetadata.restoreEndOffset == null || offset < 
changelogMetadata.restoreEndOffset)
+if (changelogMetadata.restoreEndOffset == null || offset < 
changelogMetadata.restoreEndOffset) {
 changelogMetadata.bufferedLimitIndex = 
changelogMetadata.bufferedRecords.size();
+}

Review comment:
   I've rolled back a bunch of accidental formatting changes, but left the 
ones that are actually code style compliance issues (like using brackets around 
conditional bodies).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#issuecomment-646334265


   The refactors to the tool generally look good.  I thought we always allowed 
throttles to be set to 0 for some reason, though?  Looks like this change 
removes that ability which we probably don't want... although I doubt many 
people are using it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442533488



##
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##
@@ -1685,10 +1702,9 @@ object ReassignPartitionsCommand extends Logging {
   opts.cancelOpt -> collection.immutable.Seq(
 opts.reassignmentJsonFileOpt
   ),
-  opts.listOpt -> collection.immutable.Seq(
-  )
+  opts.listOpt -> collection.immutable.Seq.empty

Review comment:
   seems to break the symmetry a bit, doesn't it?  Although, I don't feel 
strongly about this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8896: KAFKA-10185: Restoration info logging

2020-06-18 Thread GitBox


vvcephei commented on a change in pull request #8896:
URL: https://github.com/apache/kafka/pull/8896#discussion_r442531300



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
##
@@ -223,6 +227,7 @@ public void 
shouldInitializeChangelogAndCheckForCompletion() {
 @Test
 public void shouldPollWithRightTimeout() {
 
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
+
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 
5L));

Review comment:
   This is moderately obnoxious... The addition of logging these values 
means that these tests will get a NullPointerException unless we mock this 
call, but the mock is irrelevant to the test outcome.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10169) KafkaException: Failing batch since transaction was aborted

2020-06-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10169:
---

Assignee: Sophie Blee-Goldman

> KafkaException: Failing batch since transaction was aborted
> ---
>
> Key: KAFKA-10169
> URL: https://issues.apache.org/jira/browse/KAFKA-10169
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> We've seen the following exception in our eos-beta test application recently:
> {code:java}
> [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic 
> stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-25-changelog for task 
> 1_2 due to: [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> org.apache.kafka.common.KafkaException: Failing batch since transaction was 
> aborted [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> Exception handler choose to FAIL the processing, no more records would be 
> sent. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:185)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1347)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) at 
> java.lang.Thread.run(Thread.java:748) [2020-06-13T00:09:14-07:00] 
> (streams-soak-2-6-all-fixes-eos-beta_soak_i-0ae30dd12c4fb7018_streamslog) 
> Caused by: org.apache.kafka.common.KafkaException: Failing batch since 
> transaction was aborted at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423)
>  ... 3 more
> {code}
> Somewhat unclear if this is an issue with eos-beta specifically, or just eos 
> in general. But several threads have died over the course of a few days in 
> the eos-beta application, while none so far have died on the eos-alpha 
> application.
> It's also unclear (at least to me) whether this is definitely an issue in 
> Streams or possibly a bug in the producer (or even the broker, although that 
> seems unlikely)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10186:
---

 Summary: Aborting transaction with pending data should throw 
non-fatal exception
 Key: KAFKA-10186
 URL: https://issues.apache.org/jira/browse/KAFKA-10186
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Sophie Blee-Goldman


Currently if you try to abort a transaction with any pending (non-flushed) 
data, the send exception is set to
{code:java}
 KafkaException("Failing batch since transaction was aborted"){code}
This exception type is generally considered fatal, but this is a valid state to 
be in -- the point of throwing the exception is to alert that the records will 
not be sent, not that you are in an unrecoverable error state.

We should throw a different (possibly new) type of exception here to 
distinguish from fatal and recoverable errors.
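
For illustration, a minimal sketch of how this surfaces to a caller (bootstrap address, transactional id, and topic are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AbortWithPendingDataSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // The record below is still buffered (not flushed) when the abort happens.
            producer.send(new ProducerRecord<>("some-topic", "key", "value"), (metadata, exception) -> {
                // Today this callback sees KafkaException("Failing batch since transaction was aborted"),
                // even though aborting with pending data is a valid, recoverable situation.
                if (exception != null) {
                    System.err.println("Record not sent: " + exception);
                }
            });
            producer.abortTransaction();
        }
    }
}
{code}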



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #8891: KAFKA-10143; Restore ability to change throttle of active reassignment

2020-06-18 Thread GitBox


cmccabe commented on a change in pull request #8891:
URL: https://github.com/apache/kafka/pull/8891#discussion_r442535630



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -647,8 +647,14 @@ class KafkaController(val config: KafkaConfig,
 info(s"Skipping reassignment of $tp since the topic is currently being 
deleted")
 new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does 
not exist.")
   } else {
-val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
-if (assignedReplicas.nonEmpty) {
+val assignment = controllerContext.partitionFullReplicaAssignment(tp)
+if (assignment == ReplicaAssignment.empty) {
+  new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does 
not exist.")
+} else if (assignment == reassignment) {

Review comment:
   It seems like the thing to agree on is the final state, right?  This 
comparison is taking into account the current replica set which may change over 
the course of the reassignment...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10186:

Labels: needs-kip newbie newbie++  (was: needs-kip)

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


mumrah commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646338829


   @hachikuji yea that's the check I was referring to (where we disregard the 
fetch response, errors included). Do you think any of the errors we handle 
besides OOOR are worth handling in the case that we're no longer in the 
FETCHING state? Like maybe one of the errors that triggers a metadata update?
   
   However, that might be adding complexity for little gain. I'm fine with it 
either way.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #8899: MINOR: Gate test coverage plugin behind Gradle property

2020-06-18 Thread GitBox


ijuma opened a new pull request #8899:
URL: https://github.com/apache/kafka/pull/8899


   Most builds don't require test coverage output, so it's wasteful
   to spend cycles tracking coverage information for each method
   invoked.
   
   I ran a quick test on a fast desktop machine; the absolute
   difference will be larger on a slower machine. The tests were
   executed after `./gradlew clean` and with a gradlew daemon
   that was started just before the test (and mildly warmed up
   with `./gradlew clean` again).
   
   `./gradlew unitTest --continue --profile`:
   * With coverage enabled: 6m32s
   * With coverage disabled: 5m47s
   
   I ran the same test twice and the results were within 2s of
   each other, so reasonably consistent.
   
   The ~11% reduction (6m32s to 5m47s) in the time taken to run the
   unit tests is a reasonable gain with little downside, so I think
   this is a good change.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646348733


   @mumrah Hmm, I think I like the current approach of discarding the response 
if we're no longer in the same state in which the fetch state was sent. Mainly 
because it's simple. Arguably we could do something more refined. For example, 
a topic authorization error is still going to be relevant even if the partition 
is being reset. However, since we're talking about rare cases, it doesn't seem 
too worthwhile to try and optimize; worst case, we'll send the request again 
and get the same error.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10167) Streams EOS-Beta should not try to get end-offsets as read-committed

2020-06-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10167.
---
Resolution: Fixed

> Streams EOS-Beta should not try to get end-offsets as read-committed
> 
>
> Key: KAFKA-10167
> URL: https://issues.apache.org/jira/browse/KAFKA-10167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0
>
>
> This is a bug discovered with the new EOS protocol (KIP-447), here's the 
> context:
> In Streams when we are assigned with the new active tasks, we would first try 
> to restore the state from the changelog topic all the way to the log end 
> offset, and then we can transit from the `restoring` to the `running` state 
> to start processing the task.
> Before KIP-447, the end-offset call is only triggered after we've passed the 
> synchronization barrier at the txn-coordinator which would guarantee that the 
> txn-marker has been sent and received (otherwise we would error with 
> CONCURRENT_TRANSACTIONS and let the producer retry), and when the txn-marker 
> is received, it also means that the marker has been fully replicated, which 
> in turn guarantees that the data written before that marker has been fully 
> replicated. As a result, when we send the list-offset with `read-committed` 
> flag we are guaranteed that the returned offset == LSO == high-watermark.
> After KIP-447 however, we do not fence on the txn-coordinator but on 
> group-coordinator upon offset-fetch, and the group-coordinator would return 
> the fetching offset right after it has received and replicated the txn-marker 
> sent to it. However, since the txn-markers are sent to different brokers in 
> parallel, and even within the same broker markers of different partitions are 
> appended / replicated independently as well, when the fetch-offset request 
> returns it is NOT guaranteed that the LSO on the other data partitions has 
> advanced as well. Hence in that case the `endOffset` call may 
> return a smaller offset, causing data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman opened a new pull request #8900: KAFKA-10169: swallow non-fatal KafkaException and don't abort transaction during clean close

2020-06-18 Thread GitBox


ableegoldman opened a new pull request #8900:
URL: https://github.com/apache/kafka/pull/8900


   If there's any pending data and we haven't flushed the producer when we 
abort a transaction, a KafkaException is returned for the previous `send`. This 
is a bit misleading, since the situation is not an unrecoverable error and so 
the KafkaException is really non-fatal. For now, we should just catch and 
swallow this in the RecordCollector (see also: 
[KAFKA-10186](https://issues.apache.org/jira/browse/KAFKA-10186))
   
   The reason we ended up aborting an un-flushed transaction was due to the 
combination of
   a. always aborting the ongoing transaction when any task is closed/revoked
   b. only committing (and flushing) if at least one of the revoked tasks needs 
to be committed
   
   Given the above, we can end up with an ongoing transaction that isn't 
committed since none of the revoked tasks have any data in the transaction. We 
then abort the transaction anyway, when those tasks are closed. So in addition 
to the above (swallowing this exception), we should avoid unnecessarily 
aborting data for tasks that haven't been revoked.
   
   We can handle this by splitting the RecordCollector's `close` into a dirty 
and clean flavor: if dirty, we need to abort the transaction since it may be 
dirty due to the commit attempt failing. But if clean, we can skip aborting the 
transaction since we know that either we just committed and thus there is no 
ongoing transaction to abort, or else the transaction in flight contains no 
data from the tasks being closed.
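   
   A rough sketch of that split (names and shape are illustrative, not the actual Streams interfaces):
   
   ```java
   // Illustrative only: separate the clean and dirty close paths so a clean close never aborts.
   abstract class RecordCollectorCloseSketch {
       abstract void abortTransaction();
   
       // Clean close: we either just committed, or the in-flight transaction contains
       // no data from the tasks being closed, so there is nothing to abort.
       void closeClean() { }
   
       // Dirty close: the commit attempt may have failed, so abort the ongoing transaction to be safe.
       void closeDirty() {
           abortTransaction();
       }
   }
   ```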



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar commented on pull request #8898: KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests

2020-06-18 Thread GitBox


vinothchandar commented on pull request #8898:
URL: https://github.com/apache/kafka/pull/8898#issuecomment-646360446


   @cmccabe Please take a pass when you can.. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


mumrah commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646361035


   Sounds good to me



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8871: MINOR: code cleanup for inconsistent naming

2020-06-18 Thread GitBox


mjsax commented on pull request #8871:
URL: https://github.com/apache/kafka/pull/8871#issuecomment-646365697


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-06-18 Thread GitBox


mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-646367241


   @sneakyburro Any update on this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Arun R (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140069#comment-17140069
 ] 

Arun R commented on KAFKA-10186:


I would love to take a look if no one else is looking at it.

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646368944


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-18 Thread GitBox


hachikuji commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-646369171







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash

2020-06-18 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140073#comment-17140073
 ] 

Jason Gustafson commented on KAFKA-10114:
-

[~ibinyami] Hmm, this is still not very clear to me.

> I think records were not timed out because the sequence that times them out 
> happens after you have a producer id. Network thread is stuck on 
> maybeWaitForProducerId(called from Sender.java:306) while the relevant 
> failBatch invocation is only called from sendProducerData() which is executed 
> after maybeWaitForProducerId (called from Sender.java:334)

The logic in `awaitNodeReady` is not blocked on batch completion, but request 
completion. We do not need `failBatch` in order to complete a pending request 
and free up room for additional requests. If you can reproduce this, it would 
be very helpful to see TRACE level logging from the producer.

> Kafka producer stuck after broker crash
> ---
>
> Key: KAFKA-10114
> URL: https://issues.apache.org/jira/browse/KAFKA-10114
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Itamar Benjamin
>Priority: Critical
>
> Today two of our kafka brokers crashed (cluster of 3 brokers), and producers 
> were not able to send new messages. After brokers started again all producers 
> resumed sending data except for a single one.
> At the beginning, the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation
> {code}
>  
> then after some time the exception changed to
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 6 ms.
> {code}
>  
>  
> jstack shows kafka-producer-network-thread is waiting to get producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 
> cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 
> sleeping [0x7ff55c177000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
> at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
> at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
> at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
> at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)   Locked 
> ownable synchronizers:
> - None
> {code}
>  
> Digging into maybeWaitForProducerId(), it waits until some broker is ready 
> (the awaitNodeReady function), which in turn calls leastLoadedNode() on 
> NetworkClient. This one iterates over all brokers and checks if a request can 
> be sent to it using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) 
> && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools I saw this expression always evaluates to false 
> since the last part (canSendMore) is false. 
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
> Deque queue = requests.get(node);
> return queue == null || queue.isEmpty()
> || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>  
>  
> I verified 
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the live lock - since request queues are full for 
> all nodes, a new request to check broker availability and reconnect to it 
> cannot be submitted.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075
 ] 

Matthias J. Sax commented on KAFKA-10179:
-

{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

For this case, the soure-topic-changelog optimization does no apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new scheme using the changelog topic name. 
Thus, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is inefficiency we pay, as we also need to send the 
de-serialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a know issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes 
> {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.
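
To make the failure mode concrete, a minimal sketch of a serializer that, like the Schema Registry serdes, actually uses the topic name it is handed (the class and the subject naming are illustrative):
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative sketch: this serializer derives a registry "subject" from the topic name it is given.
// If the store hands it "<application ID>-<store name>-changelog" while the schema was registered
// under the source topic, the lookup fails and a SerializationException surfaces.
public class TopicAwareSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(final String topic, final String data) {
        final String subject = topic + "-value"; // e.g. how Schema Registry serdes name subjects
        return (subject + ":" + data).getBytes(StandardCharsets.UTF_8);
    }
}
{code}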



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075
 ] 

Matthias J. Sax edited comment on KAFKA-10179 at 6/19/20, 12:42 AM:


{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

For this case, the soure-topic-changelog optimization does no apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new scheme using the changelog topic name. 
Thus, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is inefficiency we pay, as we also need to send the 
de-serialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a known issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037


was (Author: mjsax):
{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

For this case, the soure-topic-changelog optimization does no apply, and the 
store would always have its own changelog topic. And thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new scheme using the changelog topic name. 
Thus, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is inefficiency we pay, as we also need to send the 
de-serialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that 
deserializing/serializing would solve is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a know issue and tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes 
> {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.

[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

2020-06-18 Thread GitBox


mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r442574113



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -741,29 +741,23 @@ void shutdown(final boolean clean) {
 
 for (final Task task : tasks.values()) {
 if (task.isActive()) {
-try {
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-} catch (final RuntimeException e) {
-if (clean) {
-firstException.compareAndSet(null, e);
-} else {
-log.warn("Ignoring an exception while closing task " + 
task.id() + " producer.", e);
-}
-}
+executeAndMaybeSwallow(
+clean,
+() -> 
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()),
+e -> firstException.compareAndSet(null, e),
+e -> log.warn("Ignoring an exception while closing task " 
+ task.id() + " producer.", e)
+);
 }
 }
 
 tasks.clear();
 
-try {
-activeTaskCreator.closeThreadProducerIfNeeded();
-} catch (final RuntimeException e) {
-if (clean) {
-firstException.compareAndSet(null, e);
-} else {
-log.warn("Ignoring an exception while closing thread 
producer.", e);
-}
-}
+executeAndMaybeSwallow(
+clean,
+() -> activeTaskCreator.closeThreadProducerIfNeeded(),

Review comment:
   This is a lambda. Do you mean method reference? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8887: KAFKA-10135: Extract Task#executeAndMaybeSwallow to be a general utility function into TaskManager…

2020-06-18 Thread GitBox


mjsax commented on a change in pull request #8887:
URL: https://github.com/apache/kafka/pull/8887#discussion_r442574574



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1048,4 +1042,28 @@ public String toString(final String indent) {
 Set lockedTaskDirectories() {
 return Collections.unmodifiableSet(lockedTaskDirectories);
 }
+
+public static void executeAndMaybeSwallow(final boolean clean,
+  final Runnable runnable,
+  final 
java.util.function.Consumer actionIfClean,
+  final 
java.util.function.Consumer actionIfNotClean) {

Review comment:
   I am wondering if adding this method to `TaskManager` is the best choice 
(cf https://issues.apache.org/jira/browse/KAFKA-10055). Thoughts?
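   
   For reference, a sketch of what such a utility can look like, based on the signature and the call sites in this diff (the real implementation may differ):
   
   ```java
   // Sketch based on the signature above; illustrative only.
   public static void executeAndMaybeSwallow(final boolean clean,
                                             final Runnable runnable,
                                             final java.util.function.Consumer<RuntimeException> actionIfClean,
                                             final java.util.function.Consumer<RuntimeException> actionIfNotClean) {
       try {
           runnable.run();
       } catch (final RuntimeException e) {
           if (clean) {
               actionIfClean.accept(e);    // e.g. remember the first exception so the caller can rethrow it later
           } else {
               actionIfNotClean.accept(e); // e.g. log and swallow during an unclean shutdown
           }
       }
   }
   ```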





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state

2020-06-18 Thread GitBox


mjsax commented on pull request #8886:
URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376402


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8886: KAFKA-9891: fix corrupted StandbyTask state

2020-06-18 Thread GitBox


mjsax commented on pull request #8886:
URL: https://github.com/apache/kafka/pull/8886#issuecomment-646376357


   Java 8 passed.
   Java 11:
   ```
   
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Issue Comment Deleted] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-06-18 Thread Arun R (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arun R updated KAFKA-10186:
---
Comment: was deleted

(was: I would love to take a look if no one else is looking at it.)

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-06-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140082#comment-17140082
 ] 

Matthias J. Sax commented on KAFKA-8266:


The other ticket is resolved, but I just saw another failure for this test: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7034/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/]
{quote}org.scalatest.exceptions.TestFailedException: The remaining consumers in the group could not fetch the expected records
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
  at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
  at org.scalatest.Assertions.fail(Assertions.scala:1091)
  at org.scalatest.Assertions.fail$(Assertions.scala:1087)
  at org.scalatest.Assertions$.fail(Assertions.scala:1389)
  at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:329){quote}
\cc [~dajac]

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-18 Thread GitBox


hachikuji merged pull request #8822:
URL: https://github.com/apache/kafka/pull/8822


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly

2020-06-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10113.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> LogTruncationException sets fetch offsets incorrectly
> -
>
> Key: KAFKA-10113
> URL: https://issues.apache.org/jira/browse/KAFKA-10113
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.6.0
>
>
> LogTruncationException, which extends OffsetOutOfRangeException, takes the 
> divergent offsets in the constructor. These are the first offsets known to 
> diverge from what the consumer read. These are then passed to the 
> OffsetOutOfRangeException incorrectly as the out of range fetch offsets. 
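For context, a sketch of how a caller consumes the divergent offsets once they 
are carried separately from the fetch offsets; divergentOffsets() is assumed 
here to be the accessor on LogTruncationException, and the surrounding names 
are illustrative:

{code}
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TruncationHandling {

    // The divergent offsets (where local history stops matching the leader)
    // and the fetch offsets (what the consumer was trying to read) are
    // different things, which is why passing the former to the parent
    // OffsetOutOfRangeException as fetch offsets was a bug.
    public static void pollOnce(KafkaConsumer<String, String> consumer) {
        try {
            consumer.poll(Duration.ofSeconds(1));
        } catch (LogTruncationException e) {
            // divergentOffsets() assumed as the accessor name for illustration.
            Map<TopicPartition, OffsetAndMetadata> divergent = e.divergentOffsets();
            // Reset each truncated partition to the last offset known to agree
            // with the leader, not to the unrelated fetch position.
            divergent.forEach((tp, om) -> consumer.seek(tp, om.offset()));
        }
    }
}
{code}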



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10113) LogTruncationException sets fetch offsets incorrectly

2020-06-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10113:

Affects Version/s: 2.5.0
   2.4.1

> LogTruncationException sets fetch offsets incorrectly
> -
>
> Key: KAFKA-10113
> URL: https://issues.apache.org/jira/browse/KAFKA-10113
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.6.0
>
>
> LogTruncationException, which extends OffsetOutOfRangeException, takes the 
> divergent offsets in the constructor. These are the first offsets known to 
> diverge from what the consumer read. These are then passed to the 
> OffsetOutOfRangeException incorrectly as the out of range fetch offsets. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2020-06-18 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140088#comment-17140088
 ] 

John Roesler commented on KAFKA-10184:
--

What in the world... How can we not have processed even 500 records in two 
minutes?

I agree that waiting for startup first would probably help. Do we have any logs 
that could confirm the hypothesis that the startup phase is eating up a bunch 
of our timeout?

We should probably decrease the record size from a whopping 1 kB. 
My intent was to bridge the integration and system test worlds by creating 
“realistic” data here, but maybe that was expecting too much of the CIT 
infrastructure. 
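A small sketch of what waiting for startup first could look like, using the 
same TestUtils.waitForCondition helper that appears in the stack trace below; 
the class and method names are illustrative:

{code}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.test.TestUtils;

public class StartupGate {

    // Block until every Streams instance reports RUNNING before the test
    // starts producing input, so rebalancing/startup time is not charged
    // against the verification timeout.
    public static void waitForAllRunning(KafkaStreams[] instances, long timeoutMs)
            throws InterruptedException {
        for (KafkaStreams streams : instances) {
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                timeoutMs,
                "Streams instance did not reach RUNNING in time");
        }
    }
}
{code}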

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Minor
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 120000. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43
