[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


tombentley commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r412754269



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
##
@@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) {
         return Collections.singletonMap(error, 1);
     }
 
+    protected Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
+        return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
+    }
+
     protected Map<Errors, Integer> errorCounts(Collection<Errors> errors) {

Review comment:
   @chia7712 you're right, but the only two remaining callers of this method are for RPCs which haven't been converted to the message generator. There's little benefit to changing them when there are already PRs converting those RPCs, and I'm planning to remove this method entirely once those PRs have been merged. I guess I could mark this method as `@Deprecated`.
   
   Relatedly, there's only a single caller of the `apiErrorCounts(Map errors)` method, which is also for a not-yet-converted RPC with an open PR. If this gets merged first I'll be able to remove `apiErrorCounts(Map errors)` in that PR. 
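   
   For reference, the counting idiom in the new overload is just `Collectors.groupingBy` plus `Collectors.summingInt`; here is a self-contained sketch (the enum below is a stand-in for Kafka's `Errors`, not the real class):
   
   ```java
   import java.util.Map;
   import java.util.stream.Collectors;
   import java.util.stream.Stream;
   
   public class ErrorCountsSketch {
       enum Errors { NONE, STALE_BROKER_EPOCH, INVALID_REQUEST }
   
       // Same idiom as errorCounts(Stream): group equal values and add 1
       // per occurrence, producing a Map<Errors, Integer>.
       static Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
           return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
       }
   
       public static void main(String[] args) {
           System.out.println(errorCounts(Stream.of(Errors.NONE, Errors.NONE, Errors.INVALID_REQUEST)));
           // e.g. {NONE=2, INVALID_REQUEST=1} (map iteration order is unspecified)
       }
   }
   ```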

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
##
@@ -52,10 +51,9 @@ protected Struct toStruct(short version) {
     @Override
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();

Review comment:
   @chia7712 what I've tried to do in this PR so far is:
   
   * Change `for` statements + `updateErrorCounts` to use `forEach` consistently
   * Change calls to `errorCounts(Collection)` to `errorCounts(Stream)`
   
   I've not tried to change all the code to use either `forEach` or `errorCounts(Stream)`. Obviously we could do that, but @ijuma seems happy enough with continuing to have these two ways of doing it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-04-22 Thread GitBox


dajac commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r412287554



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
 
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());

Review comment:
   `topicPartition` is only used to get the topic and the partition here; `replica.topic()` and `replica.partition()` could be used directly instead.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
 
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
-            if (!replicaAssignmentByBroker.containsKey(brokerId))
-                replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-            replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+            AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId,

Review comment:
   nit: Could we rename `value` to something like `alterReplicaLogDirs`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment,
 
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
-            if (!replicaAssignmentByBroker.containsKey(brokerId))
-                replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-            replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+            AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId,
+                key -> new AlterReplicaLogDirsRequestData());
+            AlterReplicaLogDir alterReplicaLogDir = value.dirs().find(logDir);
+            if (alterReplicaLogDir == null) {
+                alterReplicaLogDir = new AlterReplicaLogDir();
+                alterReplicaLogDir.setPath(logDir);
+                value.dirs().add(alterReplicaLogDir);
+            }
+            AlterReplicaLogDirTopic alterReplicaLogDirTopic = alterReplicaLogDir.topics().find(topicPartition.topic());
+            if (alterReplicaLogDirTopic == null) {
+                alterReplicaLogDirTopic = new AlterReplicaLogDirTopic();
+                alterReplicaLogDir.topics().add(alterReplicaLogDirTopic);
+            }
+            alterReplicaLogDirTopic.setName(topicPartition.topic())

Review comment:
   `setName` could be done only once within the if statement.
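   
   For illustration, the pattern under review with the rename and the single `setName` applied (a fragment, not standalone: the `AlterReplicaLogDirs*` types are the generated protocol classes named in the diff, and their `find`/`add`/setter signatures are assumed from it):
   
   ```java
   // One request-data object per broker, created on first use.
   AlterReplicaLogDirsRequestData alterReplicaLogDirs =
       replicaAssignmentByBroker.computeIfAbsent(brokerId, key -> new AlterReplicaLogDirsRequestData());
   
   // One entry per log dir within that broker's request.
   AlterReplicaLogDir dir = alterReplicaLogDirs.dirs().find(logDir);
   if (dir == null) {
       dir = new AlterReplicaLogDir().setPath(logDir);
       alterReplicaLogDirs.dirs().add(dir);
   }
   
   // One entry per topic within that dir; setName only needs to happen
   // here, when the topic entry is first created.
   AlterReplicaLogDirTopic topic = dir.topics().find(replica.topic());
   if (topic == null) {
       topic = new AlterReplicaLogDirTopic().setName(replica.topic());
       dir.topics().add(topic);
   }
   topic.partitions().add(replica.partition());
   ```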

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
##
@@ -17,122 +17,53 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import org.apache.kafka

[GitHub] [kafka] dajac commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


dajac commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r412807377



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, 
brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, 
brokerEpochInRequest)
+val request = buildRequest(updateMetadataRequest)
+
+val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+  EasyMock.eq(request.context.correlationId),
+  EasyMock.anyObject()
+)).andStubReturn(
+  Seq()
+)
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+EasyMock.replay(replicaManager, controller, requestChannel)
+
+createKafkaApis().handleUpdateMetadataRequest(request)
+val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, 
updateMetadataRequest, capturedResponse)
+  .asInstanceOf[UpdateMetadataResponse]
+assertEquals(expectedError, updateMetadataResponse.error())
+EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+val controllerId = 2
+val controllerEpoch = 6
+val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+val partitionStates = Seq(
+  new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+.setTopicName("topicW")
+.setPartitionIndex(1)
+.setControllerEpoch(1)
+.setLeader(0)
+.setLeaderEpoch(1)
+.setIsr(asList(0, 1))
+.setZkVersion(2)
+.setReplicas(asList(0, 1, 2))
+.setIsNew(false)
+).asJava
+val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+  ApiKeys.LEADER_AND_ISR.latestVersion,
+  controllerId,
+  controllerEpoch,
+  brokerEpochInRequest,
+  partitionStates,
+  asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+).build()
+val request = buildRequest(leaderAndIsrRequest)
+val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+  .setErrorCode(Errors.NONE.code)
+  .setPartitionErrors(asList()))
+
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+  EasyMock.eq(request.context.correlationId),
+  EasyMock.anyObject(),
+  EasyMock.anyObject()
+)).andStubReturn(
+  response
+)
+
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+EasyMock.replay(replicaManager, controller, requestChannel)
+
+createKafkaApis().handleLeaderAndIsrRequest(request)
+val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, 
leaderAndIsrRequest, capturedResponse)
+  .asInstanceOf[LeaderAndIsrResponse]
+assertEquals(expectedError, leaderAndIsrResponse.error())
+EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)

Review comment:
   This and the two above should use `testStopReplicaRequest` instead of 
`testUpdateMetadataRequest`.

##
File path: core/src/test/sc

[jira] [Created] (KAFKA-9902) java client api can not completely take out the kafka-consumer-groups.sh output of information

2020-04-22 Thread startjava (Jira)
startjava created KAFKA-9902:


 Summary: java client api can not completely take out the 
kafka-consumer-groups.sh output of information
 Key: KAFKA-9902
 URL: https://issues.apache.org/jira/browse/KAFKA-9902
 Project: Kafka
  Issue Type: Test
Reporter: startjava


Why does the java client api not provide a method corresponding to the command:
kafka-consumer-groups.sh --bootstrap-server localhost:9081 --describe --group test
I cannot get the GROUP, TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG, CONSUMER-ID, HOST and CLIENT-ID columns together in one call; from searching I learned each piece has to be fetched separately, which is very troublesome for developers, and this need is very common.
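
For what it is worth, the pieces can be combined by hand today. A minimal sketch (assuming a broker at localhost:9081, a group named "test", and a client at 2.5+, where Admin.create exists; older releases use AdminClient.create): committed offsets (CURRENT-OFFSET) come from Admin#listConsumerGroupOffsets, log-end offsets (LOG-END-OFFSET) from KafkaConsumer#endOffsets, and member details (CONSUMER-ID, HOST, CLIENT-ID) would come from Admin#describeConsumerGroups, omitted here for brevity.
{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GroupLagSketch {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9081");
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9081");
        try (Admin admin = Admin.create(adminProps);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                 consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // Committed offsets per partition for the group (CURRENT-OFFSET)
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("test").partitionsToOffsetAndMetadata().get();
            // Log-end offsets for the same partitions (LOG-END-OFFSET)
            Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());
            committed.forEach((tp, om) -> System.out.printf("%s current=%d end=%d lag=%d%n",
                tp, om.offset(), end.get(tp), end.get(tp) - om.offset()));
        }
    }
}
{code}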

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-22 Thread GitBox


leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r412857024



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1066,6 +1066,7 @@ class KafkaController(val config: KafkaConfig,
         // do this check only if the broker is live and there are no partitions being reassigned currently
         // and preferred replica election is not in progress
         val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
+          controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr.contains(leaderBroker) &&

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] harshitshah4 edited a comment on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2020-04-22 Thread GitBox


harshitshah4 edited a comment on issue #8512:
URL: https://github.com/apache/kafka/pull/8512#issuecomment-616683601


   @omkreddy @hachikuji can you review the changes?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] harshitshah4 removed a comment on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2020-04-22 Thread GitBox


harshitshah4 removed a comment on issue #8512:
URL: https://github.com/apache/kafka/pull/8512#issuecomment-617756461


   I feel that there are more methods that may require similar changes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] harshitshah4 commented on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2020-04-22 Thread GitBox


harshitshah4 commented on issue #8512:
URL: https://github.com/apache/kafka/pull/8512#issuecomment-617756461


   I feel that there are more methods that may require similar changes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on issue #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-04-22 Thread GitBox


tombentley commented on issue #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-617764518


   Thanks @dajac, there were some useful comments and good spots there. I've 
pushed some fixes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids

2020-04-22 Thread GitBox


chia7712 commented on a change in pull request #8530:
URL: https://github.com/apache/kafka/pull/8530#discussion_r412959693



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##
@@ -119,7 +124,7 @@ public void shouldRestoreStateFromSourceTopic() throws 
Exception {
 final AtomicInteger numReceived = new AtomicInteger(0);
 final StreamsBuilder builder = new StreamsBuilder();
 
-final Properties props = props(APPID);
+final Properties props = props(APPID + name.getMethodName());

Review comment:
   the following app id should be changed as well. (I can't add a comment to that line in this PR)
   ```java
   private void setCommittedOffset(final String topic, final int limitDelta) {
       final Properties consumerConfig = new Properties();
       consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
       consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##
@@ -107,6 +109,9 @@ private Properties props(final String applicationId) {
 return streamsConfiguration;
 }
 
+@Rule
+public TestName name = new TestName();

Review comment:
   the changelog topics are created by ```BeforeClass``` so it seems we 
need to add ```Before``` to create changelog for different *method name*.
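   
   For reference, a minimal sketch of that shape (a hypothetical test class; the rule is JUnit 4's `TestName`):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.StreamsConfig;
   import org.junit.Before;
   import org.junit.Rule;
   import org.junit.rules.TestName;
   
   public class RestoreIntegrationTestSketch {
       private static final String APPID = "restore-test-";
   
       @Rule
       public final TestName name = new TestName();
   
       private String appId;
   
       @Before
       public void setUp() {
           // Unique application id per test method, so state directories and
           // changelog topics do not collide across tests.
           appId = APPID + name.getMethodName();
           // ... create the changelog/input topics for this appId here ...
       }
   
       private Properties props() {
           final Properties p = new Properties();
           p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
           return p;
       }
   }
   ```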





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on issue #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-22 Thread GitBox


stanislavkozlovski commented on issue #8524:
URL: https://github.com/apache/kafka/pull/8524#issuecomment-617770854


   After discussing online, we figured there isn't an easy way to test this scenario. There's significant work to be done to make KafkaController unit-testable.
   
   Good catch on the `testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled`!
   
   I think this change looks good. Let's wait for @hachikuji or @junrao to take a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on issue #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


ijuma commented on issue #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-617772737


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


ijuma commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r412973974



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
##
@@ -66,14 +64,12 @@ public int throttleTimeMs() {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
         Errors topLevelErr = Errors.forCode(data.errorCode());
-        counts.put(topLevelErr, counts.getOrDefault(topLevelErr, 0) + 1);
+        updateErrorCounts(counts, topLevelErr);
 
-        for (ReassignableTopicResponse topicResponse : data.responses()) {
-            for (ReassignablePartitionResponse partitionResponse : topicResponse.partitions()) {
-                Errors error = Errors.forCode(partitionResponse.errorCode());
-                counts.put(error, counts.getOrDefault(error, 0) + 1);
-            }
-        }
+        data.responses().forEach(topicResponse ->
+            topicResponse.partitions().forEach(partitionResponse ->
+                updateErrorCounts(counts, Errors.forCode(partitionResponse.errorCode()))

Review comment:
   Indent.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionReassignmentsResponse.java
##
@@ -66,14 +64,12 @@ public int throttleTimeMs() {
     public Map<Errors, Integer> errorCounts() {
         Map<Errors, Integer> counts = new HashMap<>();
         Errors topLevelErr = Errors.forCode(data.errorCode());

Review comment:
   Maybe inline.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
##
@@ -95,13 +94,8 @@ public OffsetCommitResponseData data() {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        List<Errors> errors = new ArrayList<>();
-        for (OffsetCommitResponseTopic topic : data.topics()) {
-            for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                errors.add(Errors.forCode(partition.errorCode()));
-            }
-        }
-        return errorCounts(errors);
+        return errorCounts(data.topics().stream().flatMap(topicResult -> topicResult.partitions().stream())

Review comment:
   Is there a reason why we are not doing the map within the flatMap?
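   
   For illustration, the shape being suggested (a fragment assuming the surrounding `OffsetCommitResponse` class and the generated accessors shown in the diff):
   
   ```java
   @Override
   public Map<Errors, Integer> errorCounts() {
       // Map to Errors inside the flatMap, so the stream handed to
       // errorCounts is already a Stream<Errors>.
       return errorCounts(data.topics().stream()
           .flatMap(topic -> topic.partitions().stream()
               .map(partition -> Errors.forCode(partition.errorCode()))));
   }
   ```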

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
##
@@ -91,7 +91,9 @@ public int throttleTimeMs() {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors().values());
+        return errorCounts(data.topics().stream()
+            .flatMap(topic -> topic.partitions().stream())

Review comment:
   Any reason why we don't do the map inside the flatMap?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9652) Throttle time metric needs to be updated for KIP-219

2020-04-22 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089674#comment-17089674
 ] 

Ismael Juma commented on KAFKA-9652:


[~agam] Are you working on this? While looking at another issue, I fixed it in 
a local branch. I can submit a PR if you haven't started on it.

> Throttle time metric needs to be updated for KIP-219
> 
>
> Key: KAFKA-9652
> URL: https://issues.apache.org/jira/browse/KAFKA-9652
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Agam Brahma
>Priority: Major
>
> KIP-219 changed the throttling logic so that responses are returned 
> immediately. The logic for updating the throttle time in `RequestChannel` 
> appears to have not been updated to reflect this change and instead reflects 
> the old behavior where the timing is based on the time between remote 
> completion and response completion. This means the metric will pretty much 
> always show negligible throttling.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9589) LogValidatorTest#testLogAppendTimeNonCompressedV2 is not executed and does not pass

2020-04-22 Thread Wang Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Ge reassigned KAFKA-9589:
--

Assignee: Wang Ge

> LogValidatorTest#testLogAppendTimeNonCompressedV2 is not executed and does 
> not pass
> ---
>
> Key: KAFKA-9589
> URL: https://issues.apache.org/jira/browse/KAFKA-9589
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Wang Ge
>Priority: Major
>
> The LogValidatorTest#testLogAppendTimeNonCompressedV2 test does not execute 
> because it's missing a '@Test' annotation.
> When executed locally, it fails with the following error:
> {code:java}
> java.lang.AssertionError: The offset of max timestamp should be 0 
> Expected :0
> Actual   :2
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug

2020-04-22 Thread shilin Lu (Jira)
shilin Lu created KAFKA-9903:


 Summary: kafka ShutdownableThread  judge thread isRuning status 
has some bug
 Key: KAFKA-9903
 URL: https://issues.apache.org/jira/browse/KAFKA-9903
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.1
Reporter: shilin Lu
 Attachments: image-2020-04-22-21-28-03-154.png

h2. 1.bug
{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}

def isRunning: Boolean = {
  shutdownInitiated.getCount() != 0
}{code}
1. When a replica fetcher thread hits an exception that is not a FatalExitError, the thread exits and runs the finally block (counting down the shutdownComplete latch), but shutdownInitiated is never counted down.

2. Because of 1, shutdownInitiated stays at 1, and isRunning only checks shutdownInitiated != 0, so judging the thread's status through this method gives the wrong answer.

3. isRunning is used in shutdownIdleFetcherThreads, processFetchRequest, controller request sending and elsewhere, so a dead thread may never be removed and some work may never get done.
h2. 2.bugfix

As in the following code, count down shutdownInitiated in the finally block as well:

 
{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
while (isRunning)
  doWork()
  } catch {
case e: FatalExitError =>
  shutdownInitiated.countDown()
  shutdownComplete.countDown()
  info("Stopped")
  Exit.exit(e.statusCode())
case e: Throwable =>
  if (isRunning)
error("Error due to", e)
  } finally {
shutdownInitiated.countDown()
shutdownComplete.countDown()
  }
  info("Stopped")
}
{code}
 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug

2020-04-22 Thread shilin Lu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089688#comment-17089688
 ] 

shilin Lu commented on KAFKA-9903:
--

[~guozhang] [~ijuma] please take a look at this issue, thanks. If it makes sense, I will create a PR for it.

> kafka ShutdownableThread  judge thread isRuning status has some bug
> ---
>
> Key: KAFKA-9903
> URL: https://issues.apache.org/jira/browse/KAFKA-9903
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.1
>Reporter: shilin Lu
>Priority: Major
> Attachments: image-2020-04-22-21-28-03-154.png
>
>
> h2. 1.bug
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> def isRunning: Boolean = {
>   shutdownInitiated.getCount() != 0
> }{code}
> 1. When a replica fetcher thread hits an exception that is not a FatalExitError, the thread exits and runs the finally block (counting down the shutdownComplete latch), but shutdownInitiated is never counted down.
> 2. Because of 1, shutdownInitiated stays at 1, and isRunning only checks shutdownInitiated != 0, so judging the thread's status through this method gives the wrong answer.
> 3. isRunning is used in shutdownIdleFetcherThreads, processFetchRequest, controller request sending and elsewhere, so a dead thread may never be removed and some work may never get done.
> h2. 2.bugfix
> As in the following code, count down shutdownInitiated in the finally block as well:
>  
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


tombentley commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r413002568



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
##
@@ -95,13 +94,8 @@ public OffsetCommitResponseData data() {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        List<Errors> errors = new ArrayList<>();
-        for (OffsetCommitResponseTopic topic : data.topics()) {
-            for (OffsetCommitResponsePartition partition : topic.partitions()) {
-                errors.add(Errors.forCode(partition.errorCode()));
-            }
-        }
-        return errorCounts(errors);
+        return errorCounts(data.topics().stream().flatMap(topicResult -> topicResult.partitions().stream())

Review comment:
   No _good_ reason. Good spot.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


tombentley commented on issue #8417:
URL: https://github.com/apache/kafka/pull/8417#issuecomment-617793326


   @ijuma fixed your comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


tombentley commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r413002862



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
##
@@ -91,7 +91,9 @@ public int throttleTimeMs() {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(errors().values());
+        return errorCounts(data.topics().stream()
+            .flatMap(topic -> topic.partitions().stream())

Review comment:
   As above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


ijuma commented on issue #8417:
URL: https://github.com/apache/kafka/pull/8417#issuecomment-617797442


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


ijuma commented on issue #8417:
URL: https://github.com/apache/kafka/pull/8417#issuecomment-617798142


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


ijuma commented on issue #8417:
URL: https://github.com/apache/kafka/pull/8417#issuecomment-617798318


   @tombentley can you please update the PR description to summarize the perf 
testing you did?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


tombentley commented on issue #8417:
URL: https://github.com/apache/kafka/pull/8417#issuecomment-617802853


   @ijuma done.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on issue #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


ijuma commented on issue #8417:
URL: https://github.com/apache/kafka/pull/8417#issuecomment-617808555


   Perfect, thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413074912



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   We can, I was just trying to avoid the replica fetcher having to pass Time.SYSTEM in everywhere. If you prefer that, I can change it. Is there any downside to what I did, though?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413076253



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   It should pass the actual `time` instance so that we can mock if we 
want. It has one in its constructor already:
   
   ```scala
   class ReplicaFetcherThread(name: String,
  fetcherId: Int,
  sourceBroker: BrokerEndPoint,
  brokerConfig: KafkaConfig,
  failedPartitions: FailedPartitions,
  replicaMgr: ReplicaManager,
  metrics: Metrics,
  time: Time,
   ```
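   
   As an aside, a compact Java analogue of the testability point (a hypothetical class; the real `DelayedItem` is Scala): injecting `Time` lets tests pass `MockTime`, and `hiResClockMs()` is derived from the monotonic nanosecond clock rather than wall-clock time.
   
   ```java
   import org.apache.kafka.common.utils.Time;
   
   // Hypothetical sketch, not Kafka's actual class.
   class DelayedItemSketch {
       private final Time time;   // injected so tests can substitute MockTime
       private final long dueMs;  // due time on the monotonic clock
   
       DelayedItemSketch(long delayMs, Time time) {
           this.time = time;
           // hiResClockMs() is based on nanoseconds(), a monotonic source,
           // so the delay is immune to wall-clock adjustments.
           this.dueMs = time.hiResClockMs() + delayMs;
       }
   
       long remainingMs() {
           return Math.max(dueMs - time.hiResClockMs(), 0);
       }
   }
   ```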





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413077069



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Does `time` need to be a `val`?

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Does `time` need to be a public `val`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413085641



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Good point. I'll make it private.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413086046



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   Thanks, yes, I was just coming back to say your suggestion was better 
for testability reasons.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r413091037



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3084,12 +3084,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
     // if the controller hasn't been upgraded to use KIP-380
     if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
-    else {
-      val curBrokerEpoch = controller.brokerEpoch
-      if (brokerEpochInRequest < curBrokerEpoch) true
-      else if (brokerEpochInRequest == curBrokerEpoch) false
-      else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
-    }
+    else brokerEpochInRequest < controller.brokerEpoch

Review comment:
   Short comment here may be helpful about the case where the controller 
sees the epoch bump first.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9904) Use ThreadLocalConcurrent to Replace Random

2020-04-22 Thread David Mollitor (Jira)
David Mollitor created KAFKA-9904:
-

 Summary: Use ThreadLocalConcurrent to Replace Random
 Key: KAFKA-9904
 URL: https://issues.apache.org/jira/browse/KAFKA-9904
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mollitor


https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html
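
For context, a minimal sketch of the substitution (a hypothetical call site; the proposal is java.util.concurrent.ThreadLocalRandom, despite the "ThreadLocalConcurrent" typo in the summary):
{code:java}
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomSketch {
    // Before: each call allocates a Random (or threads contend on a shared
    // one, whose seed is updated with a CAS loop).
    static long before() {
        return new Random().nextLong();
    }

    // After: ThreadLocalRandom.current() returns the calling thread's own
    // generator, so there is no allocation and no contention.
    static long after() {
        return ThreadLocalRandom.current().nextLong();
    }

    public static void main(String[] args) {
        System.out.println(before());
        System.out.println(after());
    }
}
{code}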



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] belugabehr opened a new pull request #8531: KAFAKA-9904: Use ThreadLocalConcurrent to Replace Random

2020-04-22 Thread GitBox


belugabehr opened a new pull request #8531:
URL: https://github.com/apache/kafka/pull/8531


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna opened a new pull request #8532:
URL: https://github.com/apache/kafka/pull/8532


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on issue #8532:
URL: https://github.com/apache/kafka/pull/8532#issuecomment-617869532


   Call for review: @mjsax @abbccdda 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r413110487



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, "at_least_once", num_threads)

Review comment:
   Is it enough to specify the processing guarantee as `at_least_once` here 
or do you want also for this test to include all processing guarantees in the 
test matrix?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r413116285



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, "at_least_once", num_threads)

Review comment:
   On a different note, now that the processing guarantee can be passed to 
the service, do we still need `StreamsEosTestJobRunnerService` and 
`StreamsComplexEosTestJobRunnerService`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on issue #8532:
URL: https://github.com/apache/kafka/pull/8532#issuecomment-617883911


   Streams system tests run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3914



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] leonardge opened a new pull request #8533: Fixed bug in log validator tests.

2020-04-22 Thread GitBox


leonardge opened a new pull request #8533:
URL: https://github.com/apache/kafka/pull/8533


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on issue #8454: KAFKA-9844; Maximum number of members within a group is not always enforced due to a race condition in join group

2020-04-22 Thread GitBox


dajac commented on issue #8454:
URL: https://github.com/apache/kafka/pull/8454#issuecomment-617907578


   @hachikuji Could we get this one merged?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2020-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089869#comment-17089869
 ] 

Matthias J. Sax commented on KAFKA-6817:


[~bob-barrett] – do you think KIP-360 fully addresses this issue?

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (~1 year old). The topic we are writing to has a retention.ms set to 1 year so
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager-code in the Kafka source code 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory erroneously when processing records which are older than the 
> maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
> configuration), which is set by default to 7 days. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8530: KAFKA-9388: Refactor integration tests to always use different application ids

2020-04-22 Thread GitBox


guozhangwang commented on a change in pull request #8530:
URL: https://github.com/apache/kafka/pull/8530#discussion_r413168976



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
##
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class KTableKTableForeignKeyJoinPseudoTopicTest {

Review comment:
   I moved this test into the non-integration unit tests.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
##
@@ -80,14 +83,18 @@ public static void setUpBeforeAllTests() throws Exception {
 STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5);
 
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
WallclockTimestampExtractor.class);
+STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);

Review comment:
   For this test, we did need to reuse the created topics and hence I 
reduced the session / heartbeat timeout so that their rebalance timeout could 
be much smaller.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.integration;

Review comment:
   I merged this test with another as a non-integration test, since it uses 
TTD and does not really create a cluster :)

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
##
@@ -104,6 +104,8 @@ private Properties props(final String applicationId) {
 
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
 streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000);
 streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
1000);

Review comment:
   Similar here: for this test I reduced the session / heartbeat timeout so 
that the rebalance timeout could be much smaller. I think it is simpler than 
changing a bunch of changelog / source / sink topic names and app ids.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar opened a new pull request #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-04-22 Thread GitBox


vinothchandar opened a new pull request #8534:
URL: https://github.com/apache/kafka/pull/8534


   - Added additional synchronization and increased timeouts to handle flakiness
   - Added some precautionary retries when trying to obtain the lag map
   
   Backport of PR #8076.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()

2020-04-22 Thread GitBox


vinothchandar commented on issue #8462:
URL: https://github.com/apache/kafka/pull/8462#issuecomment-617931148


   Opened a simple backport here: #8534. We can focus on that for fixing the 
test flakiness itself.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8495: MINOR: downgrade test should wait for ISR rejoin between rolls

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8495:
URL: https://github.com/apache/kafka/pull/8495#discussion_r413199411



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -26,10 +27,12 @@
 from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, 
LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, 
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, 
DEV_BRANCH, KafkaVersion
 
 class TestDowngrade(EndToEndTest):
+PARTITIONS = 3
+REPLICATION_FACTOR = 3
 
 TOPIC_CONFIG = {
-"partitions": 3,
-"replication-factor": 3,
+"partitions": PARTITIONS,

Review comment:
   Is it necessary to define these as constants?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()

2020-04-22 Thread GitBox


guozhangwang commented on issue #8462:
URL: https://github.com/apache/kafka/pull/8462#issuecomment-617948771


   > Opened a simple backport here #8534.. We can focus on that for fixing the 
test flakiness itself.
   
   Sounds good, thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-04-22 Thread GitBox


guozhangwang commented on issue #8534:
URL: https://github.com/apache/kafka/pull/8534#issuecomment-617949190


   @vinothchandar this PR is still in the `work in progress` state, is that 
ready for review and merge?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-04-22 Thread GitBox


vinothchandar commented on issue #8534:
URL: https://github.com/apache/kafka/pull/8534#issuecomment-617950145


   Will remove the draft status in a little bit. I'm running the test again on 
the 2.5 branch many times to confirm the flakiness is gone. It will be ready 
for review and merge then.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8495: MINOR: downgrade test should wait for ISR rejoin between rolls

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8495:
URL: https://github.com/apache/kafka/pull/8495#discussion_r413216403



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -26,10 +27,12 @@
 from kafkatest.version import LATEST_0_9, LATEST_0_10, LATEST_0_10_0, 
LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, 
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, V_0_9_0_0, V_0_11_0_0, 
DEV_BRANCH, KafkaVersion
 
 class TestDowngrade(EndToEndTest):
+PARTITIONS = 3
+REPLICATION_FACTOR = 3
 
 TOPIC_CONFIG = {
-"partitions": 3,
-"replication-factor": 3,
+"partitions": PARTITIONS,

Review comment:
   Yes, see 
https://github.com/apache/kafka/pull/8495/files#diff-11673271221c04c0861f9c5e074a9783R80





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug

2020-04-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089926#comment-17089926
 ] 

Guozhang Wang commented on KAFKA-9903:
--

Hello Shilin,

Thanks for the report, and I think it is an issue indeed. However, note that our 
`isThreadFailed: Boolean = isShutdownComplete && !isShutdownInitiated` actually 
checks that the thread has been shut down without having actively initiated the 
shutdown, i.e. that it shut down due to an error; hence we cannot simply count 
down shutdownInitiated in the finally block.

Instead, I think a more appropriate fix would be to change the logic of 
`isRunning: Boolean = !isShutdownInitiated` to `isRunning: Boolean = 
!isShutdownInitiated && !isShutdownComplete`, so that if the thread terminates 
due to errors, the `isRunning` boolean can return false. WDYT?
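
As a rough illustration only (a Java analogue using CountDownLatches, not the actual Scala ShutdownableThread), the proposed semantics would look like:

{code:java}
import java.util.concurrent.CountDownLatch;

// Sketch: report isRunning == true only while a shutdown has neither been
// requested nor already completed, so a thread that died from an error
// (shutdownComplete counted down in the finally block) is not "running".
class ShutdownableThreadSketch {
    private final CountDownLatch shutdownInitiated = new CountDownLatch(1);
    private final CountDownLatch shutdownComplete = new CountDownLatch(1);

    boolean isShutdownInitiated() { return shutdownInitiated.getCount() == 0; }
    boolean isShutdownComplete()  { return shutdownComplete.getCount() == 0; }

    boolean isRunning() { return !isShutdownInitiated() && !isShutdownComplete(); }
}
{code}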

> kafka ShutdownableThread  judge thread isRuning status has some bug
> ---
>
> Key: KAFKA-9903
> URL: https://issues.apache.org/jira/browse/KAFKA-9903
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.1
>Reporter: shilin Lu
>Priority: Major
> Attachments: image-2020-04-22-21-28-03-154.png
>
>
> h2. 1.bug
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> def isRunning: Boolean = {
>   shutdownInitiated.getCount() != 0
> }{code}
> 1. When the replica thread hits an exception that is not a FatalExitError, the
> thread will exit and run the finally logic (counting down the shutdownComplete
> CountDownLatch), but shutdownInitiated is not counted down.
> 2. Given 1, shutdownInitiated stays at 1, and the isRunning logic judges whether
> the thread is running purely through shutdownInitiated != 0, so judging the
> thread status this way is wrong.
> 3. The isRunning method is used in shutdownIdleFetcherThreads,
> processFetchRequest, controller request sending and elsewhere, which may cause
> a thread to never be removed and some work to never complete.
> h2. 2.bugfix
> As in the following code, count down shutdownInitiated in the finally logic
>  
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug

2020-04-22 Thread shilin Lu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089936#comment-17089936
 ] 

shilin Lu commented on KAFKA-9903:
--

OK, I think your advice makes sense. I will create a PR for this issue. 
[~guozhang], can you assign this issue to me? Thank you!

> kafka ShutdownableThread  judge thread isRuning status has some bug
> ---
>
> Key: KAFKA-9903
> URL: https://issues.apache.org/jira/browse/KAFKA-9903
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.1
>Reporter: shilin Lu
>Priority: Major
> Attachments: image-2020-04-22-21-28-03-154.png
>
>
> h2. 1.bug
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> def isRunning: Boolean = {
>   shutdownInitiated.getCount() != 0
> }{code}
> 1. When the replica thread hits an exception that is not a FatalExitError, the
> thread will exit and run the finally logic (counting down the shutdownComplete
> CountDownLatch), but shutdownInitiated is not counted down.
> 2. Given 1, shutdownInitiated stays at 1, and the isRunning logic judges whether
> the thread is running purely through shutdownInitiated != 0, so judging the
> thread status this way is wrong.
> 3. The isRunning method is used in shutdownIdleFetcherThreads,
> processFetchRequest, controller request sending and elsewhere, which may cause
> a thread to never be removed and some work to never complete.
> h2. 2.bugfix
> As in the following code, count down shutdownInitiated in the finally logic
>  
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lushilin opened a new pull request #8535: KAFKA-9903

2020-04-22 Thread GitBox


lushilin opened a new pull request #8535:
URL: https://github.com/apache/kafka/pull/8535


   shutdownComplete will count down in the finally block when the thread shuts 
down due to an error, and in that case the thread is not running.
   So the isRunning logic should check both isShutdownInitiated and isShutdownComplete.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lushilin commented on issue #8535: KAFKA-9903

2020-04-22 Thread GitBox


lushilin commented on issue #8535:
URL: https://github.com/apache/kafka/pull/8535#issuecomment-617984080


   @guozhangwang @ijuma @junrao PTAL, thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9903) kafka ShutdownableThread judge thread isRuning status has some bug

2020-04-22 Thread shilin Lu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089973#comment-17089973
 ] 

shilin Lu commented on KAFKA-9903:
--

Pull request: [https://github.com/apache/kafka/pull/8535]. Please take a look.

> kafka ShutdownableThread  judge thread isRuning status has some bug
> ---
>
> Key: KAFKA-9903
> URL: https://issues.apache.org/jira/browse/KAFKA-9903
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.1
>Reporter: shilin Lu
>Priority: Major
> Attachments: image-2020-04-22-21-28-03-154.png
>
>
> h2. 1.bug
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> def isRunning: Boolean = {
>   shutdownInitiated.getCount() != 0
> }{code}
> 1. When the replica thread hits an exception that is not a FatalExitError, the
> thread will exit and run the finally logic (counting down the shutdownComplete
> CountDownLatch), but shutdownInitiated is not counted down.
> 2. Given 1, shutdownInitiated stays at 1, and the isRunning logic judges whether
> the thread is running purely through shutdownInitiated != 0, so judging the
> thread status this way is wrong.
> 3. The isRunning method is used in shutdownIdleFetcherThreads,
> processFetchRequest, controller request sending and elsewhere, which may cause
> a thread to never be removed and some work to never complete.
> h2. 2.bugfix
> As in the following code, count down shutdownInitiated in the finally logic
>  
> {code:java}
> override def run(): Unit = {
>   isStarted = true
>   info("Starting")
>   try {
> while (isRunning)
>   doWork()
>   } catch {
> case e: FatalExitError =>
>   shutdownInitiated.countDown()
>   shutdownComplete.countDown()
>   info("Stopped")
>   Exit.exit(e.statusCode())
> case e: Throwable =>
>   if (isRunning)
> error("Error due to", e)
>   } finally {
> shutdownInitiated.countDown()
> shutdownComplete.countDown()
>   }
>   info("Stopped")
> }
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8445: KAFKA-9823: Remember the sent generation for the coordinator request

2020-04-22 Thread GitBox


hachikuji commented on a change in pull request #8445:
URL: https://github.com/apache/kafka/pull/8445#discussion_r413250770



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -466,24 +477,193 @@ public void 
testSyncGroupRequestWithFencedInstanceIdException() {
 }
 
 @Test
-public void testHeartbeatUnknownMemberResponseDuringRebalancing() throws 
InterruptedException {
+public void testJoinGroupUnknownMemberResponseWithOldGeneration() throws 
InterruptedException {
 setupCoordinator();
-mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+joinGroup();
 
-final int generation = 1;
+final AbstractCoordinator.Generation currGen = 
coordinator.generation();
 
-mockClient.prepareResponse(joinGroupFollowerResponse(generation, 
memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.NONE));
-mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+RequestFuture future = coordinator.sendJoinGroupRequest();
 
-coordinator.ensureActiveGroup();
+TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 
2000,
+"The join-group request was not sent in time after");

Review comment:
   nit: after.. what? I think you can drop "in time after." Here is the 
assertion that is used:
   ```
   assertThat("Condition not met within timeout " + maxWaitMs + ". 
" + conditionDetails,
   testCondition.conditionMet());
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch opened a new pull request #8536: KAFKA-9883: Add better error message when REST API forwards a request and leader is not known

2020-04-22 Thread GitBox


rhauch opened a new pull request #8536:
URL: https://github.com/apache/kafka/pull/8536


   When the Connect worker forwards a REST API request to the leader, it might 
get back a `RequestTargetException` that suggests the worker should forward the 
request to a different worker. This can happen when the leader changes, and the 
worker that receives the original request forwards the request to the worker 
that it thinks is the current leader, but that worker is not the current 
leader. In most cases, the worker that received the forwarded 
request includes the URL of the current leader, but it is possible (albeit 
rare) that the worker doesn’t know the current leader and will include a null 
leader URL in the resulting `RequestTargetException`.
   
   When this rare case happens, the user gets a null pointer exception in their 
response and the NPE is logged. Instead, the worker should catch this condition 
and provide a more useful error message that is similar to other existing error 
messages that might occur.
   
   Added a unit test that verifies this corner case is caught and this 
particular NPE does not occur.
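
   For reference, here is a sketch of the kind of guard described above (names are illustrative, not the exact patch):
   
   ```java
   import javax.ws.rs.core.Response;
   
   import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
   
   // Sketch: translate an unknown leader URL into a clear error response
   // instead of letting a NullPointerException escape to the caller.
   class ForwardGuardSketch {
       static String checkLeaderUrl(String forwardUrl) {
           if (forwardUrl == null) {
               throw new ConnectRestException(Response.Status.CONFLICT.getStatusCode(),
                   "Cannot complete request because the leader is currently unknown; "
                       + "this can happen during a rebalance. Please retry.");
           }
           return forwardUrl;
       }
   }
   ```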
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9883) Connect request to restart task can result in IllegalArgumentError: "uriTemplate" parameter is null

2020-04-22 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-9883:


Assignee: Randall Hauch

> Connect request to restart task can result in IllegalArgumentError: 
> "uriTemplate" parameter is null
> ---
>
> Key: KAFKA-9883
> URL: https://issues.apache.org/jira/browse/KAFKA-9883
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Minor
>
> When attempting to restart a connector, the following is logged by Connect:
>  
> {code:java}
> ERROR Uncaught exception in REST call to 
> /connectors/my-connector/tasks/0/restart 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper)
> java.lang.IllegalArgumentException: "uriTemplate" parameter is null.
> at 
> org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189)
> at 
> org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72)
> at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:96)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:263)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:298)
> at 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:218)
> {code}
> Resubmitting the restart REST request will usually resolve the problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vinothchandar commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-04-22 Thread GitBox


vinothchandar commented on issue #8534:
URL: https://github.com/apache/kafka/pull/8534#issuecomment-618019176


   Ran it over 600 times without issues. @guozhangwang it's ready. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on issue #8482: KAFKA-9863: update the deprecated --zookeeper option in the documentation into --bootstrap-server

2020-04-22 Thread GitBox


cmccabe commented on issue #8482:
URL: https://github.com/apache/kafka/pull/8482#issuecomment-618027980


   LGTM.  Thanks, @showuon , @rondagostino 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


apovzner commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r413337972



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, 
brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, 
brokerEpochInRequest)
+val request = buildRequest(updateMetadataRequest)
+
+val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+  EasyMock.eq(request.context.correlationId),
+  EasyMock.anyObject()
+)).andStubReturn(
+  Seq()
+)
+
+
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+EasyMock.replay(replicaManager, controller, requestChannel)
+
+createKafkaApis().handleUpdateMetadataRequest(request)
+val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, 
updateMetadataRequest, capturedResponse)
+  .asInstanceOf[UpdateMetadataResponse]
+assertEquals(expectedError, updateMetadataResponse.error())
+EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, 
Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, 
Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: 
Long, expectedError: Errors): Unit = {
+val controllerId = 2
+val controllerEpoch = 6
+val capturedResponse: Capture[RequestChannel.Response] = 
EasyMock.newCapture()
+val partitionStates = Seq(
+  new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+.setTopicName("topicW")
+.setPartitionIndex(1)
+.setControllerEpoch(1)
+.setLeader(0)
+.setLeaderEpoch(1)
+.setIsr(asList(0, 1))
+.setZkVersion(2)
+.setReplicas(asList(0, 1, 2))
+.setIsNew(false)
+).asJava
+val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+  ApiKeys.LEADER_AND_ISR.latestVersion,
+  controllerId,
+  controllerEpoch,
+  brokerEpochInRequest,
+  partitionStates,
+  asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+).build()
+val request = buildRequest(leaderAndIsrRequest)
+val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+  .setErrorCode(Errors.NONE.code)
+  .setPartitionErrors(asList()))
+
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+  EasyMock.eq(request.context.correlationId),
+  EasyMock.anyObject(),
+  EasyMock.anyObject()
+)).andStubReturn(
+  response
+)
+
+
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+EasyMock.replay(replicaManager, controller, requestChannel)
+
+createKafkaApis().handleLeaderAndIsrRequest(request)
+val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, 
leaderAndIsrRequest, capturedResponse)
+  .asInstanceOf[LeaderAndIsrResponse]
+assertEquals(expectedError, leaderAndIsrResponse.error())
+EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, 
Errors.NONE)

Review comment:
   thanks for catching this and others below!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] apovzner commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


apovzner commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618044488


   @dajac and @hachikuji thanks for your comments, I addressed them.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on issue #8530: KAFKA-9388: Refactor integration tests to always use different application ids

2020-04-22 Thread GitBox


guozhangwang commented on issue #8530:
URL: https://github.com/apache/kafka/pull/8530#issuecomment-618044905


   The Jenkins failures are due to known flaky tests, so I'm going to merge the 
PR as is.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9388) Flaky Test StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables

2020-04-22 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9388.
--
Fix Version/s: 2.6.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> Flaky Test 
> StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables
> -
>
> Key: KAFKA-9388
> URL: https://issues.apache.org/jira/browse/KAFKA-9388
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.5.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/122/testReport/junit/org.apache.kafka.streams.integration/StandbyTaskCreationIntegrationTest/shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. At 
> least one client did not reach state RUNNING with active tasks and stand-by 
> tasks: Client 1 is NOT OK, client 2 is NOT OK. at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:24) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:369) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356) at 
> org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.waitUntilBothClientAreOK(StandbyTaskCreationIntegrationTest.java:178)
>  at 
> org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(StandbyTaskCreationIntegrationTest.java:141){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8369) Generate an immutable Map view for generated messages with a map key

2020-04-22 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17090056#comment-17090056
 ] 

Colin McCabe commented on KAFKA-8369:
-

So, the collection is the way it is to avoid having separate key and value 
classes.  We could possibly provide an option in the JSON to generate separate 
key and value objects if we really want that.
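
For illustration, a sketch of the kind of immutable Map view being asked for (names are hypothetical, not the generated classes):

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch: expose a keyed collection of generated message elements as an
// unmodifiable Map so callers can use get() instead of find().
class MapViewSketch {
    static <K, V> Map<K, V> asMapView(Collection<V> elements, Function<V, K> keyFn) {
        return Collections.unmodifiableMap(
            elements.stream().collect(Collectors.toMap(keyFn, Function.identity())));
    }
}
{code}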

> Generate an immutable Map view for generated messages with a map key
> 
>
> Key: KAFKA-8369
> URL: https://issues.apache.org/jira/browse/KAFKA-8369
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> When using the "mapKey" feature, we get an ImplicitLinkedHashCollection which 
> can be used like a map using the `find()` API. The benefit of this is 
> hopefully avoiding a conversion to another type when handled by the broker, 
> but it is a little cumbersome to work with, so we often end up doing the 
> conversion anyway. One improvement would be to provide a way to convert this 
> collection to an immutable Map view so that it is easier to work with 
> directly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9905) The equals functions for generated classes should compare all fields

2020-04-22 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-9905:
---

 Summary: The equals functions for generated classes should compare 
all fields
 Key: KAFKA-9905
 URL: https://issues.apache.org/jira/browse/KAFKA-9905
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


The equals functions for generated classes should compare all fields, to avoid 
confusion.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070266


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070328







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on issue #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-22 Thread GitBox


mjsax commented on issue #8504:
URL: https://github.com/apache/kafka/pull/8504#issuecomment-618073161


   > Ideally, the fix should be to generate a repartition topic name each time 
to avoid such issues. But IMHO that ship has already sailed, because introducing 
new name generation would cause compatibility issues for existing topologies. 
   
   Why that? Because such a topology would hit the bug, it could never be 
deployed, and thus nobody can actually run such a topology? In fact, shouldn't 
we "burn" an index even if a name is provided (IIRC, we do this in some cases)?
   
   I agree, though, that merging repartition topics (as proposed in (1)) should 
be done if possible (it's a historic artifact that we did not merge them in the 
past, and IMHO we should not make the same mistake again?).
   
   For (2), it's a tricky question, because the different names are used for 
different stores and changelog topics (i.e., their main purpose?); it seems to 
be a "nasty side effect" if we ended up with two repartition topics for this 
case. Of course, given the new `repartition()` operator, a user can work around 
it by using it after `map()` and before calling `join()` (see the sketch 
below). Just brainstorming here what the impact could be and what tradeoff we 
want to pick.
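
   For reference, a minimal sketch of that workaround (illustrative topology, not code from this PR):
   
   ```java
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;
   import org.apache.kafka.streams.kstream.Repartitioned;
   
   // Sketch: repartition once, explicitly named, right after map(), so both
   // joins reuse a single repartition topic instead of generated names.
   class RepartitionSketch {
       static StreamsBuilder build() {
           StreamsBuilder builder = new StreamsBuilder();
           KTable<String, String> tableA = builder.table("table-a");
           KTable<String, String> tableB = builder.table("table-b");
           KStream<String, String> mapped = builder.<String, String>stream("input")
               .map((k, v) -> KeyValue.pair(v, v))
               .repartition(Repartitioned.as("mapped-repartition"));
           mapped.join(tableA, (v, a) -> v + a).to("out-a");
           mapped.join(tableB, (v, b) -> v + b).to("out-b");
           return builder;
       }
   }
   ```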



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-22 Thread GitBox


mjsax commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r413380852



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor 
topicExtractor,
 null,
 optimizableRepartitionNodeBuilder);
 
-final OptimizableRepartitionNode optimizableRepartitionNode = 
optimizableRepartitionNodeBuilder.build();
-builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
   Hmmm... I am wondering if just bumping the index would be sufficient and 
the optimizer would merge the node automatically?
   
   I am also not sure about the code structure: so far, the DSL layer does not 
know much about optimizations (even if we "leak" a little bit into it, as we 
build up the `StreamsGraphNode` graph). Wouldn't we push some optimization 
decisions into the DSL layer, thus spreading out "optimization code"? On the 
other hand, just inserting one `OptimizableRepartitionNode` is much more 
efficient than inserting multiple and letting the optimizer remove them later.
   
   I am also wondering, if we could do the same for other repartition topics? 
   
   Last question: this method is also used for stream-table joins and thus, if 
one joins a stream with two tables, would this change be backward incompatible? 
Or would two stream-table joins fail with the same `InvalidTopologyException`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

2020-04-22 Thread GitBox


guozhangwang commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413374355



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-private static final String TOPICS_KEY_NAME = "topics";
-private static final String PARTITIONS_KEY_NAME = "partitions";
-
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-TRANSACTIONAL_ID,
-PRODUCER_ID,
-PRODUCER_EPOCH,
-new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-TOPIC_NAME,
-new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32,
-"The partitions to add to the transaction."));
-
-/**
- * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
- */
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = 
ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-public static Schema[] schemaVersions() {
-return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, 
ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-}
+
+public final AddPartitionsToTxnRequestData data;
 
 public static class Builder extends 
AbstractRequest.Builder {
-private final String transactionalId;
-private final long producerId;
-private final short producerEpoch;
-private final List partitions;
+public final AddPartitionsToTxnRequestData data;
 
-public Builder(String transactionalId, long producerId, short 
producerEpoch, List partitions) {
+public Builder(final AddPartitionsToTxnRequestData data) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.transactionalId = transactionalId;
-this.producerId = producerId;
-this.producerEpoch = producerEpoch;
-this.partitions = partitions;
+this.data = data;
+}
+
+public Builder(final String transactionalId,
+   final long producerId,
+   final short producerEpoch,
+   final List partitions) {
+super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+Map> partitionMap = new HashMap<>();
+for (TopicPartition topicPartition : partitions) {
+String topicName = topicPartition.topic();
+
+List subPartitions = 
partitionMap.getOrDefault(topicName,
+new ArrayList<>());
+subPartitions.add(topicPartition.partition());
+partitionMap.put(topicName, subPartitions);
+}
+
+AddPartitionsToTxnTopicCollection topics = new 
AddPartitionsToTxnTopicCollection();
+for (Map.Entry> partitionEntry : 
partitionMap.entrySet()) {
+topics.add(new AddPartitionsToTxnTopic()
+   .setName(partitionEntry.getKey())
+   .setPartitions(partitionEntry.getValue()));
+}
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(version, transactionalId, 
producerId, producerEpoch, partitions);
+return new AddPartitionsToTxnRequest(data, version);
 }

[GitHub] [kafka] C0urante commented on issue #8069: KAFKA-9374: Make connector interactions asynchronous

2020-04-22 Thread GitBox


C0urante commented on issue #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-618097440


   @ncliang I've made some updates to the PR and rebased on the latest trunk; 
would you be willing to do another pass?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-22 Thread GitBox


guozhangwang commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413417021



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {
+final Map> 
regrouped =
+regroupFetchPositionsByLeader(partitionsToValidate);
+
+regrouped.forEach((node, fetchPositions) -> {
+if (node.isEmpty()) {
+metadata.requestUpdate();
+return;
+}
+
+NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+if (nodeApiVersions == null) {
+client.tryConnect(node);
+return;
+}
+
+if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+log.debug("Skipping validation of fetch offsets for partitions 
{} since the broker does not " +
+  "support the required protocol version 
(introduced in Kafka 2.3)",
+fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+// We need to get the client epoch state before sending out the 
leader epoch request, and use it to
+// decide whether we need to validate offsets.
+if (!metadata.hasReliableLeaderEpochs()) {
+log.debug("Skipping validation of fetch offsets for partitions 
{} since the provided leader broker " +
+  "is not reliable", fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+subscriptions.setNextAllowedRetry(fetchPositions.keySet(), 
time.milliseconds() + requestTimeoutMs);
+
+RequestFuture 
future =
+offsetsForLeaderEpochClient.sendAsyncRequest(node, 
fetchPositions);
+
+future.addListener(new 
RequestFutureListener() {
+@Override
+public void 
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+Map 
truncationWithoutResetPolicy = new HashMap<>();
+if (!offsetsResult.partitionsToRetry().isEmpty()) {
+
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), 
time.milliseconds() + retryBackoffMs);
+metadata.requestUpdate();
+}
+
+// For each OffsetsForLeader response, check if the 
end-offset is lower than our current offset
+// for the partition. If so, it means we have experienced 
log truncation and need to reposition
+// that partition's offset.
+//
+// In addition, check whether the returned offset and 
epoch are valid. If not, then we should treat
+// it as out of range and update metadata for rediscovery.
+offsetsResult.endOffsets().forEach((respTopicPartition, 
respEndOffset) -> {
+if (respEndOffset.hasUndefinedEpochOrOffset()) {
+// Should attempt to find the new leader in the 
next try.
+log.debug("Requesting metadata update for 
partition {} due to undefined epoch or offset {}",

Review comment:
   nit: `... or offset {} from OffsetsForLeaderEpoch response`

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
##
@@ -86,4 +84,9 @@ public boolean equals(Object o) {
 public int hashCode() {
 return Objects.hash(error, leaderEpoch, endOffset);
 }
+
+public boolean hasUndefinedEpochOrOffset() {
+return this.endOffset == UNDEFINED_EPOCH_OFFSET || this.leaderEpoch == UNDEFINED_EPOCH;

Review comment:
   For my own understanding: if endOffset is UNDEFINED the epoch should 
always be UNDEFINED too? If that's the case we can just rely on `leaderEpoch` 
alone?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+

[GitHub] [kafka] guozhangwang commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-04-22 Thread GitBox


guozhangwang commented on issue #8534:
URL: https://github.com/apache/kafka/pull/8534#issuecomment-618104185


   Thanks @vinothchandar !!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-22 Thread GitBox


junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r413436495



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig,
 val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp 
=> controllerContext.isReplicaOnline(leaderBroker, tp) &&
   controllerContext.partitionsBeingReassigned.isEmpty &&
   !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-  controllerContext.allTopics.contains(tp.topic))
+  controllerContext.allTopics.contains(tp.topic) &&
+  controllerContext.partitionLeadershipInfo.get(tp).forall(l => 
l.leaderAndIsr.isr.contains(leaderBroker))

Review comment:
   The preferred leader election also checks for live brokers. So, perhaps 
we could just call 
PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection() 
here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called ?

2020-04-22 Thread Xiang Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiang Zhang updated KAFKA-9906:
---
Summary: Is bytesSinceLastIndexEntry updated correctly when 
LogSegment.append() is called ?  (was: Is bytesSinceLastIndexEntry updated 
correct when LogSegment.append() is called ?)

> Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is 
> called ?
> --
>
> Key: KAFKA-9906
> URL: https://issues.apache.org/jira/browse/KAFKA-9906
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>
> I was reading code in LogSegment.scala and I found the code in 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is called ?

2020-04-22 Thread Xiang Zhang (Jira)
Xiang Zhang created KAFKA-9906:
--

 Summary: Is bytesSinceLastIndexEntry updated correct when 
LogSegment.append() is called ?
 Key: KAFKA-9906
 URL: https://issues.apache.org/jira/browse/KAFKA-9906
 Project: Kafka
  Issue Type: Improvement
Reporter: Xiang Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is called ?

2020-04-22 Thread Xiang Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiang Zhang updated KAFKA-9906:
---
Description: I was reading code in LogSegment.scala and I found the code in 

> Is bytesSinceLastIndexEntry updated correct when LogSegment.append() is 
> called ?
> 
>
> Key: KAFKA-9906
> URL: https://issues.apache.org/jira/browse/KAFKA-9906
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>
> I was reading code in LogSegment.scala and I found the code in 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called ?

2020-04-22 Thread Xiang Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiang Zhang updated KAFKA-9906:
---
Description: 
I was reading code in LogSegment.scala and I found the code below:

 
{code:java}
def append(largestOffset: Long,
   largestTimestamp: Long,
   shallowOffsetOfMaxTimestamp: Long,
   records: MemoryRecords): Unit = {
  ...
  val appendedBytes = log.append(records)
  if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
  }
  bytesSinceLastIndexEntry += records.sizeInBytes
}

{code}
When bytesSinceLastIndexEntry > indexIntervalBytes, we update the offsetIndex 
and maybe the timeIndex and set bytesSinceLastIndexEntry to zero, which makes 
sense to me because we just updated the index. However, following that, 
bytesSinceLastIndexEntry is incremented by records.sizeInBytes, which I find 
confusing since the records are appended before the index is updated. Maybe it 
should work like this:
{code:java}
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
  offsetIndex.append(largestOffset, physicalPosition)
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
  bytesSinceLastIndexEntry = 0
} else {
  bytesSinceLastIndexEntry += records.sizeInBytes
}{code}
 Sorry if I misunderstood this.

  was:I was reading code in LogSegment.scala and I found the code in 


> Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is 
> called ?
> --
>
> Key: KAFKA-9906
> URL: https://issues.apache.org/jira/browse/KAFKA-9906
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>
> I was reading code in LogSegment.scala and I found the code below:
>  
> {code:java}
> def append(largestOffset: Long,
>largestTimestamp: Long,
>shallowOffsetOfMaxTimestamp: Long,
>records: MemoryRecords): Unit = {
>   ...
>   val appendedBytes = log.append(records)
>   if (bytesSinceLastIndexEntry > indexIntervalBytes) {
> offsetIndex.append(largestOffset, physicalPosition)
> timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
> bytesSinceLastIndexEntry = 0
>   }
>   bytesSinceLastIndexEntry += records.sizeInBytes
> }
> {code}
> When bytesSinceLastIndexEntry > indexIntervalBytes, we update the offsetIndex 
> and maybe the timeIndex and set bytesSinceLastIndexEntry to zero, which makes 
> sense to me because we just updated the index. However, following that, 
> bytesSinceLastIndexEntry is incremented by records.sizeInBytes, which I find 
> confusing since the records are appended before the index is updated. Maybe 
> it should work like this:
> {code:java}
> if (bytesSinceLastIndexEntry > indexIntervalBytes) {
>   offsetIndex.append(largestOffset, physicalPosition)
>   timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
>   bytesSinceLastIndexEntry = 0
> } else {
>   bytesSinceLastIndexEntry += records.sizeInBytes
> }{code}
>  Sorry if I misunderstood this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly in LogSegment.append()?

2020-04-22 Thread Xiang Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiang Zhang updated KAFKA-9906:
---
Summary: Is bytesSinceLastIndexEntry updated correctly in 
LogSegment.append()?  (was: Is bytesSinceLastIndexEntry updated correctly when 
LogSegment.append() is called ?)

> Is bytesSinceLastIndexEntry updated correctly in LogSegment.append()?
> -
>
> Key: KAFKA-9906
> URL: https://issues.apache.org/jira/browse/KAFKA-9906
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>
> I was reading code in LogSegment.scala and I found the code below:
>  
> {code:java}
> def append(largestOffset: Long,
>largestTimestamp: Long,
>shallowOffsetOfMaxTimestamp: Long,
>records: MemoryRecords): Unit = {
>   ...
>   val appendedBytes = log.append(records)
>   if (bytesSinceLastIndexEntry > indexIntervalBytes) {
> offsetIndex.append(largestOffset, physicalPosition)
> timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
> bytesSinceLastIndexEntry = 0
>   }
>   bytesSinceLastIndexEntry += records.sizeInBytes
> }
> {code}
> When bytesSinceLastIndexEntry > indexIntervalBytes, we update the offsetIndex 
> and maybe the timeIndex and set bytesSinceLastIndexEntry to zero, which makes 
> sense to me because we just updated the index. However, following that, 
> bytesSinceLastIndexEntry is incremented by records.sizeInBytes, which I find 
> confusing since the records are appended before the index is updated. Maybe 
> it should work like this:
> {code:java}
> if (bytesSinceLastIndexEntry > indexIntervalBytes) {
>   offsetIndex.append(largestOffset, physicalPosition)
>   timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
>   bytesSinceLastIndexEntry = 0
> } else {
>   bytesSinceLastIndexEntry += records.sizeInBytes
> }{code}
>  Sorry if I misunderstood this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413442881



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {
+final Map> 
regrouped =
+regroupFetchPositionsByLeader(partitionsToValidate);
+
+regrouped.forEach((node, fetchPositions) -> {
+if (node.isEmpty()) {
+metadata.requestUpdate();
+return;
+}
+
+NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+if (nodeApiVersions == null) {
+client.tryConnect(node);
+return;
+}
+
+if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+log.debug("Skipping validation of fetch offsets for partitions 
{} since the broker does not " +
+  "support the required protocol version 
(introduced in Kafka 2.3)",
+fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+// We need to get the client epoch state before sending out the 
leader epoch request, and use it to
+// decide whether we need to validate offsets.
+if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
   Np, will do





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9857) Failed to build image ducker-ak-openjdk-8 on arm

2020-04-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-9857:
--
Component/s: core

> Failed to build image ducker-ak-openjdk-8 on arm
> 
>
> Key: KAFKA-9857
> URL: https://issues.apache.org/jira/browse/KAFKA-9857
> Project: Kafka
>  Issue Type: Bug
>  Components: build, core, system tests
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> It failed to build the image ducker-ak-openjdk-8 on arm; its log is below. 
> This issue is to fix it.
> kafka/tests/docker$ ./run_tests.sh
> Sending build context to Docker daemon  53.76kB
> Step 1/43 : ARG jdk_version=openjdk:8
> Step 2/43 : FROM $jdk_version
> 8: Pulling from library/openjdk
> no matching manifest for linux/arm64/v8 in the manifest list entries
> docker failed
> + die 'ducker-ak up failed'
> + echo ducker-ak up failed
> ducker-ak up failed
> + exit 1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9857) Failed to build image ducker-ak-openjdk-8 on arm

2020-04-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-9857:
--
Component/s: build

> Failed to build image ducker-ak-openjdk-8 on arm
> 
>
> Key: KAFKA-9857
> URL: https://issues.apache.org/jira/browse/KAFKA-9857
> Project: Kafka
>  Issue Type: Bug
>  Components: build, system tests
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> It failed to build the image ducker-ak-openjdk-8 on arm; its log is below. 
> This issue is to fix it.
> kafka/tests/docker$ ./run_tests.sh
> Sending build context to Docker daemon  53.76kB
> Step 1/43 : ARG jdk_version=openjdk:8
> Step 2/43 : FROM $jdk_version
> 8: Pulling from library/openjdk
> no matching manifest for linux/arm64/v8 in the manifest list entries
> docker failed
> + die 'ducker-ak up failed'
> + echo ducker-ak up failed
> ducker-ak up failed
> + exit 1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9804) Extract ConsumerPerformance config into one file

2020-04-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-9804:
--
Component/s: tools

> Extract ConsumerPerformance config into one file
> 
>
> Key: KAFKA-9804
> URL: https://issues.apache.org/jira/browse/KAFKA-9804
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, tools
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Producer configs were extracted out of PerfConfig in 
> https://github.com/apache/kafka/pull/3613/commits. The configs remaining in 
> PerfConfig are consumer configs, and ConsumerPerformance also has consumer 
> configs. Splitting these configs across two classes is not concise, so we can 
> put all of them into the class ConsumerPerformance.ConsumerPerfConfig.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jiameixie commented on issue #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-04-22 Thread GitBox


jiameixie commented on issue #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-618132157


   @guozhangwang @ijuma @junrao PTAL, thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413468323



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
##
@@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
 case KAFKA_STORAGE_ERROR:
 case OFFSET_NOT_AVAILABLE:
 case LEADER_NOT_AVAILABLE:
-logger().debug("Attempt to fetch offsets for partition {} 
failed due to {}, retrying.",

Review comment:
   It is exactly the same as the handling of the subsequent cases.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618147106


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413471840



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-private static final String TOPICS_KEY_NAME = "topics";
-private static final String PARTITIONS_KEY_NAME = "partitions";
-
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-TRANSACTIONAL_ID,
-PRODUCER_ID,
-PRODUCER_EPOCH,
-new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-TOPIC_NAME,
-new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32,
-"The partitions to add to the transaction."));
-
-/**
- * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
- */
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = 
ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-public static Schema[] schemaVersions() {
-return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, 
ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-}
+
+public final AddPartitionsToTxnRequestData data;
 
 public static class Builder extends 
AbstractRequest.Builder {
-private final String transactionalId;
-private final long producerId;
-private final short producerEpoch;
-private final List partitions;
+public final AddPartitionsToTxnRequestData data;
 
-public Builder(String transactionalId, long producerId, short 
producerEpoch, List partitions) {
+public Builder(final AddPartitionsToTxnRequestData data) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.transactionalId = transactionalId;
-this.producerId = producerId;
-this.producerEpoch = producerEpoch;
-this.partitions = partitions;
+this.data = data;
+}
+
+public Builder(final String transactionalId,
+   final long producerId,
+   final short producerEpoch,
+   final List partitions) {
+super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+Map> partitionMap = new HashMap<>();
+for (TopicPartition topicPartition : partitions) {
+String topicName = topicPartition.topic();
+
+List subPartitions = 
partitionMap.getOrDefault(topicName,
+new ArrayList<>());
+subPartitions.add(topicPartition.partition());
+partitionMap.put(topicName, subPartitions);
+}
+
+AddPartitionsToTxnTopicCollection topics = new 
AddPartitionsToTxnTopicCollection();
+for (Map.Entry> partitionEntry : 
partitionMap.entrySet()) {
+topics.add(new AddPartitionsToTxnTopic()
+   .setName(partitionEntry.getKey())
+   .setPartitions(partitionEntry.getValue()));
+}
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(version, transactionalId, 
producerId, producerEpoch, partitions);
+return new AddPartitionsToTxnRequest(data, version);
 }
 
 

[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413473580



##
File path: 
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
##
@@ -27,9 +27,9 @@ import org.junit.{Before, Test}
 
 import scala.jdk.CollectionConverters._
 
-class AddPartitionsToTxnRequestTest extends BaseRequestTest {
-  private val topic1 = "foobartopic"
-  val numPartitions = 3
+class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {

Review comment:
   Because its name conflicts with the newly added 
`AddPartitionsToTxnRequestTest`; just wanted to clarify that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

2020-04-22 Thread GitBox


ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r391363596



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final 
String threadClientId, final
 partitions
 );
 
-if (threadProducer == null) {
-final String taskProducerClientId = 
getTaskProducerClientId(threadId, taskId);
-final Map producerConfigs = 
config.getProducerConfigs(taskProducerClientId);
-producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
-log.info("Creating producer client for task {}", taskId);
-taskProducers.put(taskId, 
clientSupplier.getProducer(producerConfigs));
-}
-
-final RecordCollector recordCollector = new RecordCollectorImpl(
-logContext,
-taskId,
-consumer,
-threadProducer != null ?
-new StreamsProducer(threadProducer, false, logContext, 
applicationId) :
-new StreamsProducer(taskProducers.get(taskId), true, 
logContext, applicationId),
-config.defaultProductionExceptionHandler(),
-
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-streamsMetrics
-);
-
-final Task task = new StreamTask(
+createdTasks.add(createStreamTask(
 taskId,
 partitions,
-topology,
 consumer,
-config,
-streamsMetrics,
-stateDirectory,
-cache,
-time,
+logContext,
 stateManager,
-recordCollector
-);
-
-log.trace("Created task {} with assigned partitions {}", taskId, 
partitions);
-createdTasks.add(task);
-createTaskSensor.record();
+topology));
 }
 return createdTasks;
 }
 
+private StreamTask createStreamTask(final TaskId taskId,
+final Set partitions,
+final Consumer 
consumer,
+final LogContext logContext,
+final ProcessorStateManager 
stateManager,
+final ProcessorTopology topology) {
+if (threadProducer == null) {
+final String taskProducerClientId = 
getTaskProducerClientId(threadId, taskId);
+final Map producerConfigs = 
config.getProducerConfigs(taskProducerClientId);
+producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
+log.info("Creating producer client for task {}", taskId);
+taskProducers.put(taskId, 
clientSupplier.getProducer(producerConfigs));
+}
+
+final RecordCollector recordCollector = new RecordCollectorImpl(
+logContext,
+taskId,
+consumer,
+threadProducer != null ?
+new StreamsProducer(threadProducer, false, logContext, 
applicationId) :
+new StreamsProducer(taskProducers.get(taskId), true, logContext, 
applicationId),
+config.defaultProductionExceptionHandler(),
+
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+streamsMetrics
+);
+
+final StreamTask task = new StreamTask(
+taskId,
+partitions,
+topology,
+consumer,
+config,
+streamsMetrics,
+stateDirectory,
+cache,
+time,
+stateManager,
+recordCollector
+);
+
+log.trace("Created task {} with assigned partitions {}", taskId, 
partitions);
+createTaskSensor.record();
+return task;
+}
+
+StreamTask convertStandbyToActive(final StandbyTask standbyTask,
+  final Set partitions,
+  final Consumer consumer) 
{
+return createStreamTask(
+standbyTask.id,
+partitions,
+consumer,
+getLogContext(standbyTask.id),
+standbyTask.stateMgr,
+standbyTask.topology);

Review comment:
   The `topology` is created but never initialized for a standby, therefore 
we don't need to worry about closing it and can reuse it here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] chia7712 commented on issue #5935: KAFKA-7665: Replace BaseConsumerRecord with ConsumerRecord in MM

2020-04-22 Thread GitBox


chia7712 commented on issue #5935:
URL: https://github.com/apache/kafka/pull/5935#issuecomment-618162318


   @huxihx Are you still working on this? I'd like to complete both the KIP and 
this PR :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jiameixie commented on issue #8529: KAFKA-9901:Fix streams_broker_bounce_test error

2020-04-22 Thread GitBox


jiameixie commented on issue #8529:
URL: https://github.com/apache/kafka/pull/8529#issuecomment-618169833


   @guozhangwang @ijuma @junrao PTAL, thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9893) Configurable TCP connection timeout and improve the initial metadata fetch

2020-04-22 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9893:
-
Summary: Configurable TCP connection timeout and improve the initial 
metadata fetch  (was: Configurable TCP connection timeout for AdminClient)

> Configurable TCP connection timeout and improve the initial metadata fetch
> --
>
> Key: KAFKA-9893
> URL: https://issues.apache.org/jira/browse/KAFKA-9893
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> We do not currently allow for connection timeouts to be defined within 
> AdminClient, and as a result rely on the default OS settings to determine 
> whether a broker is inactive before selecting an alternate broker from 
> bootstrap.
> In the case of a connection timeout on initial handshake, and where 
> tcp_syn_retries is the default (6), we won't time out an unresponsive broker 
> until ~127s - while the client will time out sooner (~120s).
> Reducing tcp_syn_retries should mitigate the issue depending on the number of 
> unresponsive brokers within the bootstrap, though it would be applied 
> system-wide, and it would be good if we could instead configure connection 
> timeouts for AdminClient.
> The use case where this came up was a customer performing DC failover tests 
> with a stretch cluster.
>  
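
For the record, the ~127s figure falls out of the kernel's exponential SYN 
retransmission backoff (a back-of-the-envelope sketch, assuming the Linux 
default initial retransmission timeout of 1s):
{code:java}
// tcp_syn_retries = 6: the SYN is retransmitted with a doubling timeout, so
// the connection attempt only gives up after 1+2+4+8+16+32+64 = 127 seconds.
val synTimeoutsSeconds = (0 to 6).map(1 << _) // Vector(1, 2, 4, 8, 16, 32, 64)
println(synTimeoutsSeconds.sum)               // 127
{code}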



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jiameixie commented on issue #8446: KAFKA-9804:Extract consumer configs out of PerfConfig

2020-04-22 Thread GitBox


jiameixie commented on issue #8446:
URL: https://github.com/apache/kafka/pull/8446#issuecomment-618169937


   @guozhangwang @ijuma @junrao PTAL, thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-04-22 Thread GitBox


C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r413499718



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -263,17 +267,20 @@ public boolean startConnector(
 Plugins.compareAndSwapLoaders(savedLoader);
 workerMetricsGroup.recordConnectorStartupFailure();
 statusListener.onFailure(connName, t);
-return false;
+onConnectorStateChange.onCompletion(t, null);
+return;
 }
+workerConnector.transitionTo(initialState, onConnectorStateChange);

Review comment:
   This part still needs some work; it's in an inconsistent state because I 
modified `Worker::startConnector` to have no return value and instead 
communicate success or failure of the connector startup through the callback. 
I haven't yet taken care of issues like possibly invoking the callback twice 
(once in this method, and once in the `WorkerConnector` instance), making sure 
to swap plugin classloaders at the right times, and preventing a possible race 
with the check for whether the connector already exists, based on whether its 
name is present as a key in the `connectors` map.
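
   On the double-invocation point, a common guard is to wrap the callback so 
that only the first invocation wins. A minimal sketch (illustrative only, in 
Scala rather than the runtime's Java, and `OnceCallback` is a made-up name):
```scala
import java.util.concurrent.atomic.AtomicBoolean

// Wraps a completion callback so only the first invocation is delivered;
// a second call (e.g. from both Worker and WorkerConnector) becomes a no-op.
class OnceCallback[T](delegate: (Throwable, T) => Unit) {
  private val invoked = new AtomicBoolean(false)

  def onCompletion(error: Throwable, result: T): Unit =
    if (invoked.compareAndSet(false, true)) delegate(error, result)
}
```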





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9804) Extract ConsumerPerformance config into one file

2020-04-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-9804:
--
Labels: newbie pull-request-available  (was: )

> Extract ConsumerPerformance config into one file
> 
>
> Key: KAFKA-9804
> URL: https://issues.apache.org/jira/browse/KAFKA-9804
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, tools
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>  Labels: newbie, pull-request-available
>
> Producer configs were extracted out of PerfConfig in 
> https://github.com/apache/kafka/pull/3613/commits. The configs remaining in 
> PerfConfig are consumer configs, and ConsumerPerformance also has consumer 
> configs. Splitting these configs across two classes is not concise, so we can 
> put all of them into the class ConsumerPerformance.ConsumerPerfConfig.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9893) Configurable TCP connection timeout and improve the initial metadata fetch

2020-04-22 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9893:
-
Description: 
This issue has two parts:
 # Support TCP connection timeout described in KIP-601
 # Currently, the LeastLoadedNodeProvider might provide an offline/invalid node 
when none of the nodes given in the --bootstrap-server option is connected. The 
Cluster class shuffles the nodes to balance the initial pressure (I guess), and 
the LeastLoadedNodeProvider will then always provide the same node, namely the 
last node after shuffling. Consequently, even though we may provide several 
bootstrap servers, we can still hit a timeout if that one server is shut down.

The implementation strategy for 1 is described in KIP-601

The solution for 2 is to implement round-robin candidate node selection when 
no node is connected. We can either
 # shuffle the nodes every time we hit the "no node connected" status, or
 # track each node's try count and clear the counts once any node connects, as 
in the sketch below.
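
A toy sketch of the second option (illustrative only; `CandidateSelector` and 
`tryTimes` are made-up names, and the actual client is written in Java):
{code:java}
import scala.collection.mutable

// While no node is connected, always hand out the least-tried candidate so
// retries rotate through the whole bootstrap list instead of pinning one node.
class CandidateSelector(nodes: Seq[String]) {
  private val tryTimes = mutable.Map.from(nodes.map(_ -> 0))

  def nextCandidate(): String = {
    val (node, _) = tryTimes.minBy(_._2) // fewest tries first, ties arbitrary
    tryTimes(node) += 1
    node
  }

  // Once any node connects, clear the counters so the next outage starts fresh.
  def onConnected(): Unit = nodes.foreach(n => tryTimes(n) = 0)
}
{code}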

 

  was:
We do not currently allow for connection timeouts to be defined within 
AdminClient, and as a result rely on the default OS settings to determine 
whether a broker is inactive before selecting an alternate broker from 
bootstrap.

In the case of a connection timeout on initial handshake, and where 
tcp_syn_retries is the default (6), we won't time out an unresponsive broker 
until ~127s - while the client will time out sooner (~120s).

Reducing tcp_syn_retries should mitigate the issue depending on the number of 
unresponsive brokers within the bootstrap, though it would be applied 
system-wide, and it would be good if we could instead configure connection 
timeouts 
for AdminClient.

The use case where this came up was a customer performing DC failover tests 
with a stretch cluster.

 


> Configurable TCP connection timeout and improve the initial metadata fetch
> --
>
> Key: KAFKA-9893
> URL: https://issues.apache.org/jira/browse/KAFKA-9893
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> This issue has two parts:
>  # Support TCP connection timeout described in KIP-601
>  # Currently, the LeastLoadedNodeProvider might provide an offline/invalid 
> node when none of the nodes given in the --bootstrap-server option is 
> connected. The Cluster class shuffles the nodes to balance the initial 
> pressure (I guess), and the LeastLoadedNodeProvider will then always provide 
> the same node, namely the last node after shuffling. Consequently, even 
> though we may provide several bootstrap servers, we can still hit a timeout 
> if that one server is shut down.
> The implementation strategy for 1 is described in KIP-601
> The solution for 2 is to implement round-robin candidate node selection 
> when no node is connected. We can either
>  # shuffle the nodes every time we hit the "no node connected" status, or
>  # track each node's try count and clear the counts once any node connects.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on issue #8533: KAFKA-9589: Fixed bug in V2 log validator tests

2020-04-22 Thread GitBox


ijuma commented on issue #8533:
URL: https://github.com/apache/kafka/pull/8533#issuecomment-618192475


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9907) Switch default build to Scala 2.13

2020-04-22 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-9907:
--

 Summary: Switch default build to Scala 2.13
 Key: KAFKA-9907
 URL: https://issues.apache.org/jira/browse/KAFKA-9907
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 2.6.0


Scala 2.13.2 introduces support for suppressing warnings, which makes it 
possible to enable fatal warnings. This is useful enough from a development 
perspective to justify this change.
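
Concretely, the new `@nowarn` annotation lets a justified warning be silenced 
at the use site instead of failing the whole build once warnings are fatal. A 
small sketch (the identifiers below are made up):
{code:java}
import scala.annotation.nowarn

object Example {
  @deprecated("use newApi instead", "1.0")
  def oldApi(): Unit = ()

  // Silences just this deliberate use of a deprecated API; every other
  // deprecation warning in the build still escalates to an error.
  @nowarn("cat=deprecation")
  def legacyCall(): Unit = oldApi()
}
{code}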

In addition, Scala 2.13.2 ships a Vector implementation with significant 
performance improvements and encodes String pattern matches as switches.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

