[GitHub] [kafka] rondagostino commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin

2020-10-05 Thread GitBox


rondagostino commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499595845



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+    final @Nullable List<@NotNull String> users,
+    final @NotNull DescribeUserScramCredentialsOptions options
+) {
     final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
     final long now = time.milliseconds();
     Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
         new LeastLoadedNodeProvider()) {
         @Override
         public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
             return new DescribeUserScramCredentialsRequest.Builder(
-                new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->

Review comment:
   Not 100% sure the parens are needed, but either they are, or adding them increases clarity/decreases confusion.
   ```suggestion
                new DescribeUserScramCredentialsRequestData().setUsers((users == null ? Collections.emptyList() : users).stream().map(user ->
   ```
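
   For illustration, a minimal standalone sketch (not from the PR; names are illustrative) of the precedence question: without the parens, the whole `users.stream().map(...)` expression is the false branch of the conditional, whereas the parenthesized form picks the list first and then streams it either way.
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.stream.Collectors;

   class TernaryParensSketch {
       // Hypothetical helper; `users` may be null, mirroring the PR discussion.
       static List<String> upper(List<String> users) {
           // Parenthesized form: choose the (possibly empty) list first, then stream it.
           return (users == null ? Collections.<String>emptyList() : users)
                   .stream()
                   .map(String::toUpperCase)
                   .collect(Collectors.toList());
       }
   }
   ```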





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin

2020-10-05 Thread GitBox


Fleshgrinder commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499643570



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+    final @Nullable List<@NotNull String> users,
+    final @NotNull DescribeUserScramCredentialsOptions options
+) {
     final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
     final long now = time.milliseconds();
     Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
         new LeastLoadedNodeProvider()) {
         @Override
         public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
             return new DescribeUserScramCredentialsRequest.Builder(
-                new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->

Review comment:
   This way we would also run the empty list through _stream_, which is not required. I tried to make the change with as little impact as possible, but if I were to write it myself I would do it as follows:
   
   ```java
   Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
       @Override
       public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) {
           final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData();

           if (users != null) {
               final List<UserName> userNames = new ArrayList<>(users.size());
               for (final String user : users) {
                   userNames.add(new UserName().setName(user));
               }
               requestData.setUsers(userNames);
           }

           return new DescribeUserScramCredentialsRequest.Builder(requestData);
       }
   ```
   
   This is more code, yes, and it is not using _stream_ anymore, but it is clearer, faster, and allocates less: the loop allocates only the `ArrayList`, while the stream pipeline also allocates the stream and collector machinery.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin

2020-10-05 Thread GitBox


rondagostino commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499647441



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+    final @Nullable List<@NotNull String> users,
+    final @NotNull DescribeUserScramCredentialsOptions options
+) {
     final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
     final long now = time.milliseconds();
     Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
         new LeastLoadedNodeProvider()) {
         @Override
         public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
             return new DescribeUserScramCredentialsRequest.Builder(
-                new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->

Review comment:
   I’m thinking it would be best to fix this bug in a separate “MINOR: fix potential NPE...” PR, since this PR may or may not get merged. Do you agree? If so, either I can do it or feel free to do it yourself; I’ll go with whichever you prefer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-05 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703675565


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-10-05 Thread GitBox


bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703675276


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin

2020-10-05 Thread GitBox


Fleshgrinder commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499643570



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+    final @Nullable List<@NotNull String> users,
+    final @NotNull DescribeUserScramCredentialsOptions options
+) {
     final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
     final long now = time.milliseconds();
     Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
         new LeastLoadedNodeProvider()) {
         @Override
         public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
             return new DescribeUserScramCredentialsRequest.Builder(
-                new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->

Review comment:
   This way we would also run the empty list through _stream_, which is not required. I tried to make the change with as little impact as possible, but if I were to write it myself I would do it as follows:
   
   ```java
   Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
       @Override
       public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) {
           final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData();

           if (users != null && !users.isEmpty()) {
               final List<UserName> userNames = new ArrayList<>(users.size());
               for (final String user : users) {
                   userNames.add(new UserName().setName(user));
               }
               requestData.setUsers(userNames);
           }

           return new DescribeUserScramCredentialsRequest.Builder(requestData);
       }
   ```
   
   This is more code, yes, and it is not using _stream_ anymore, but it is clearer, faster, and allocates less: the loop allocates only the `ArrayList`, while the stream pipeline also allocates the stream and collector machinery.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin

2020-10-05 Thread GitBox


Fleshgrinder commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499654983



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
 }
 
 @Override
-public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+    final @Nullable List<@NotNull String> users,
+    final @NotNull DescribeUserScramCredentialsOptions options
+) {
     final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
     final long now = time.milliseconds();
     Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
         new LeastLoadedNodeProvider()) {
         @Override
         public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
             return new DescribeUserScramCredentialsRequest.Builder(
-                new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->

Review comment:
   Happy to help, PR incoming. 😊 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


bbejeck commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499670799



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,10 +633,20 @@ public final void addInternalTopic(final String topicName,
 }
 
 public final void copartitionSources(final Collection<String> sourceNodes) {
-    copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+    copartitionSourceGroups.add(new HashSet<>(sourceNodes));
 }
 
-public void validateCopartition() {
+public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+                                                     final String optimizedNodeName) {
+    for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+        if (copartitionSourceGroup.contains(replacedNodeName)) {
+            copartitionSourceGroup.remove(replacedNodeName);
+            copartitionSourceGroup.add(optimizedNodeName);
+        }
+    }
+}
+
+public synchronized void validateCopartition() {

Review comment:
   nit: I think we can remove `synchronized` here as well





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9369: KAFKA-4715: Ignore case of CompressionType and OffsetResetStrategy

2020-10-05 Thread GitBox


vvcephei commented on pull request #9369:
URL: https://github.com/apache/kafka/pull/9369#issuecomment-703701703


   Thanks for the PR @Fleshgrinder ,
   
   The code change looks good to me. Should we have some tests for this?
   
   Thanks!
   -John



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder commented on pull request #9369: KAFKA-4715: Ignore case of CompressionType and OffsetResetStrategy

2020-10-05 Thread GitBox


Fleshgrinder commented on pull request #9369:
URL: https://github.com/apache/kafka/pull/9369#issuecomment-703703414


   > The code change looks good to me. Should we have some tests for this?
   
   Always I'd say. 😊 
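
   A minimal sketch (hypothetical test class; `CompressionType.forName` is the real parser, and the case-insensitive behavior is what this PR adds) of what such a test could look like:
   ```java
   import static org.junit.Assert.assertEquals;

   import org.apache.kafka.common.record.CompressionType;
   import org.junit.Test;

   public class CompressionTypeCaseTest {
       @Test
       public void shouldIgnoreCaseWhenParsingNames() {
           // Both spellings should resolve to the same constant.
           assertEquals(CompressionType.GZIP, CompressionType.forName("gzip"));
           assertEquals(CompressionType.GZIP, CompressionType.forName("GZIP"));
       }
   }
   ```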



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder opened a new pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials

2020-10-05 Thread GitBox


Fleshgrinder opened a new pull request #9374:
URL: https://github.com/apache/kafka/pull/9374


   This bug was detected as part of #9370, and @rondagostino and I decided that it should be fixed right away rather than waiting for the other PR to eventually get merged (or not). This PR not only handles `null` properly everywhere but also rewrites the function to be more readable (and more efficient, though I think that is not important here). I extended the existing test to go through all possible permutations of the `users` argument to make sure that we never get an NPE.
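
   For context, a rough sketch (hypothetical names, not the PR's actual test code) of the `users` permutations such a test needs to cover:
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   class UsersPermutationsSketch {
       // null, empty, single-element, and multi-element inputs must all avoid the NPE.
       static final List<List<String>> USERS_PERMUTATIONS = Arrays.asList(
               null,
               Collections.emptyList(),
               Collections.singletonList("user0"),
               Arrays.asList("user0", "user1"));
   }
   ```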



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials

2020-10-05 Thread GitBox


Fleshgrinder commented on a change in pull request #9374:
URL: https://github.com/apache/kafka/pull/9374#discussion_r499687848



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4166,10 +4167,22 @@ public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String>

Review comment:
   `users.stream()` is the NPE source
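
   A two-line illustration of that failure mode (fragment; names as in the PR):
   ```java
   final List<String> users = null;
   users.stream(); // NullPointerException before any request is built
   ```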





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499691112



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,10 +633,20 @@ public final void addInternalTopic(final String topicName,
 }
 
 public final void copartitionSources(final Collection<String> sourceNodes) {
-    copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+    copartitionSourceGroups.add(new HashSet<>(sourceNodes));
 }
 
-public void validateCopartition() {
+public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+                                                     final String optimizedNodeName) {
+    for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+        if (copartitionSourceGroup.contains(replacedNodeName)) {
+            copartitionSourceGroup.remove(replacedNodeName);
+            copartitionSourceGroup.add(optimizedNodeName);
+        }
+    }
+}
+
+public synchronized void validateCopartition() {

Review comment:
   Sorry, somehow missed this one. On it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499692083



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,10 +633,20 @@ public final void addInternalTopic(final String topicName,
 }
 
 public final void copartitionSources(final Collection<String> sourceNodes) {
-    copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+    copartitionSourceGroups.add(new HashSet<>(sourceNodes));
 }
 
-public void validateCopartition() {
+public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+                                                     final String optimizedNodeName) {
+    for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+        if (copartitionSourceGroup.contains(replacedNodeName)) {
+            copartitionSourceGroup.remove(replacedNodeName);
+            copartitionSourceGroup.add(optimizedNodeName);
+        }
+    }
+}
+
+public synchronized void validateCopartition() {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-10-05 Thread GitBox


RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499715151



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -244,7 +244,8 @@ class Log(@volatile private var _dir: File,
   val producerIdExpirationCheckIntervalMs: Int,
   val topicPartition: TopicPartition,
   val producerStateManager: ProducerStateManager,
-  logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
+  logDirFailureChannel: LogDirFailureChannel,
+  val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {

Review comment:
   Done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10574) Infinite loop in SimpleHeaderConverter and Values classes

2020-10-05 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-10574:
-

 Summary: Infinite loop in SimpleHeaderConverter and Values classes
 Key: KAFKA-10574
 URL: https://issues.apache.org/jira/browse/KAFKA-10574
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.3.1, 2.4.0, 2.2.2, 2.2.1, 
2.3.0, 2.1.1, 2.2.0, 2.1.0, 2.0.1, 2.0.0, 1.1.1, 1.1.0, 1.1.2, 2.0.2
Reporter: Chris Egerton
Assignee: Chris Egerton


A header value with the byte sequence {{0xEF, 0xBF, 0xBF}} will cause an 
infinite loop in the {{Values::parseString}} method. Since that method is 
invoked by the default header converter ({{SimpleHeaderConverter}}), any sink 
record with that byte array will, by default, cause a sink task reading that 
record to stall forever.

This occurs because that byte sequence, when parsed as a UTF-8 string and then 
read by a 
[StringCharacterIterator|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html],
 causes the 
[CharacterIterator.DONE|https://docs.oracle.com/javase/8/docs/api/java/text/CharacterIterator.html#DONE]
 character to be returned from 
[StringCharacterIterator::current|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#current--],
 
[StringCharacterIterator::next|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#next--],
 etc., and a check for that character is used by the {{Values}} class for its 
parsing logic.
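
A standalone sketch (assuming Java 8+) of why this byte sequence confuses the iterator: {{0xEF, 0xBF, 0xBF}} is the UTF-8 encoding of U+FFFF, which is exactly the value of {{CharacterIterator.DONE}}:

{code:java}
import java.nio.charset.StandardCharsets;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;

public class DoneCharDemo {
    public static void main(String[] args) {
        byte[] bytes = new byte[] { (byte) 0xEF, (byte) 0xBF, (byte) 0xBF };
        String str = new String(bytes, StandardCharsets.UTF_8); // decodes to "\uFFFF"
        CharacterIterator it = new StringCharacterIterator(str);
        // current() reports DONE even though the index is 0, not past the end:
        System.out.println(it.current() == CharacterIterator.DONE); // true
        System.out.println(it.getIndex() < it.getEndIndex());       // true
    }
}
{code}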



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10574) Infinite loop in SimpleHeaderConverter and Values classes

2020-10-05 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208163#comment-17208163
 ] 

Chris Egerton edited comment on KAFKA-10574 at 10/5/20, 4:18 PM:
-

This can be reproduced by adding the following test case to the {{ValuesTest}} 
class (beware that running this test will cause an infinite loop and it will 
need to be terminated manually):

{code:java}
@Test
public void shouldNotEncounterInfiniteLoop() {
byte[] bytes = new byte[] { -17, -65,  -65 };
String str = new String(bytes, StandardCharsets.UTF_8);
Values.parseString(str);
}
{code}



was (Author: chrisegerton):
This can be reproduced by adding the following test case to the {{ValuesTest}} 
class:

 
{code:java}
@Test
public void shouldNotEncounterInfiniteLoop() {
byte[] bytes = new byte[] { -17, -65,  -65 };
String str = new String(bytes, StandardCharsets.UTF_8);
Values.parseString(str);
}
{code}


> Infinite loop in SimpleHeaderConverter and Values classes
> -
>
> Key: KAFKA-10574
> URL: https://issues.apache.org/jira/browse/KAFKA-10574
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 
> 2.0.2, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> A header value with the byte sequence {{0xEF, 0xBF, 0xBF}} will cause an 
> infinite loop in the {{Values::parseString}} method. Since that method is 
> invoked by the default header converter ({{SimpleHeaderConverter}}), any sink 
> record with that byte array will, by default, cause a sink task reading that 
> record to stall forever.
> This occurs because that byte sequence, when parsed as a UTF-8 string and 
> then read by a 
> [StringCharacterIterator|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html],
>  causes the 
> [CharacterIterator.DONE|https://docs.oracle.com/javase/8/docs/api/java/text/CharacterIterator.html#DONE]
>  character to be returned from 
> [StringCharacterIterator::current|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#current--],
>  
> [StringCharacterIterator::next|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#next--],
>  etc., and a check for that character is used by the {{Values}} class for its 
> parsing logic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-10-05 Thread GitBox


RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499718784



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -4447,9 +4504,10 @@ class LogTest {
 
   private def recoverAndCheck(config: LogConfig,
   expectedKeys: Iterable[Long],
-  expectDeletedFiles: Boolean = true): Log = {
+  expectDeletedFiles: Boolean = true,

Review comment:
   I think you meant the `hadCleanShutdown` parameter I added. I did not 
add the `expectDeletedFiles` parameter.
   I will remove the new parameter and add a comment here to indicate that the 
method always assumes we had a hard reset.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10574) Infinite loop in SimpleHeaderConverter and Values classes

2020-10-05 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208163#comment-17208163
 ] 

Chris Egerton commented on KAFKA-10574:
---

This can be reproduced by adding the following test case to the {{ValuesTest}} 
class:

 
{code:java}
@Test
public void shouldNotEncounterInfiniteLoop() {
byte[] bytes = new byte[] { -17, -65,  -65 };
String str = new String(bytes, StandardCharsets.UTF_8);
Values.parseString(str);
}
{code}


> Infinite loop in SimpleHeaderConverter and Values classes
> -
>
> Key: KAFKA-10574
> URL: https://issues.apache.org/jira/browse/KAFKA-10574
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 
> 2.0.2, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> A header value with the byte sequence {{0xEF, 0xBF, 0xBF}} will cause an 
> infinite loop in the {{Values::parseString}} method. Since that method is 
> invoked by the default header converter ({{SimpleHeaderConverter}}), any sink 
> record with that byte array will, by default, cause a sink task reading that 
> record to stall forever.
> This occurs because that byte sequence, when parsed as a UTF-8 string and 
> then read by a 
> [StringCharacterIterator|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html],
>  causes the 
> [CharacterIterator.DONE|https://docs.oracle.com/javase/8/docs/api/java/text/CharacterIterator.html#DONE]
>  character to be returned from 
> [StringCharacterIterator::current|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#current--],
>  
> [StringCharacterIterator::next|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#next--],
>  etc., and a check for that character is used by the {{Values}} class for its 
> parsing logic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-10-05 Thread GitBox


RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499724455



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -2882,11 +2953,8 @@ class LogTest {
 records.foreach(segment.append _)
 segment.close()
 
-// Create clean shutdown file so that we do not split during the load
-createCleanShutdownFile()
-
 val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000)
-val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
+val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue, lastShutdownClean = true)

Review comment:
   Removed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499729181



##
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect a record and the exception received.
+ * @param exception the actual exception
+ */
+StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);

Review comment:
   Hmmm, that makes sense mostly. However, it seems that to handle the application shutdown case we need to keep the thread alive to trigger a rebalance. Could we somehow remove the option of shutdown for some cases?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-10-05 Thread GitBox


RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499731061



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -3073,9 +3139,8 @@ class LogTest {
 // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
 // clean shutdown file exists.
 recoveryPoint = log.logEndOffset
-log = createLog(logDir, logConfig)
+log = createLog(logDir, logConfig, lastShutdownClean = true)

Review comment:
   Removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499732219



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##
@@ -54,6 +54,8 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
 if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
     log.error("Received error code {}", assignmentErrorCode.get());
     throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+} else if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+    streamThread.shutdown(); //TODO: 663 should set client to error if all streams are dead

Review comment:
   That is legacy; removed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499735461



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
 final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
 streams.start();
 try {
-    streams.setUncaughtExceptionHandler(null);
+    streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);

Review comment:
   The test isn't really related to null; it checks whether the client is in the correct state to set the handler.
   
   The reason I added the cast is that the compiler complains otherwise. I added the same test for the new handler below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499737675



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -529,8 +541,7 @@ public void run() {
 }
 }
 
-log.error("Encountered the following exception during processing " +
-    "and the thread is going to shut down: ", e);
+handleStreamsUncaughtException(e);
 throw e;

Review comment:
   You are right. It caused a problem with clients with a single thread as well. I moved it into the run loop and this resolved the problems.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


C0urante opened a new pull request #9375:
URL: https://github.com/apache/kafka/pull/9375


   [Jira](https://issues.apache.org/jira/browse/KAFKA-10574)
   
   The special byte sequence `0xEF, 0xBF, 0xBF`, when parsed as a UTF-8 string, 
causes the `StringCharacterIterator` to return `CharacterIterator.DONE` from 
its `next()`, `current()`, and other similar methods. This caused an infinite 
loop in the `Values` class whenever that byte sequence was encountered.
   
   The fix is pretty simple. To see if we're at the end of a string, we compare `StringCharacterIterator::getIndex` to `StringCharacterIterator::getEndIndex`, instead of comparing the last-read character to `CharacterIterator.DONE` (since that character may occur in strings that we're parsing).
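
   A sketch of the changed check (assumed shape; the real `Values` parsing loop does more than this):
   ```java
   // Before (sketch): a U+FFFF character in the input looks like end-of-input.
   //   while (iterator.current() != CharacterIterator.DONE) { ... }
   // After (sketch): compare positions instead, which the input cannot spoof.
   while (iterator.getIndex() < iterator.getEndIndex()) {
       char c = iterator.current();
       // ... parsing logic for c ...
       iterator.next();
   }
   ```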
   
   I've added a unit test that replicates this bug (when the fix in the 
`Values` class is not present). I gave it a timeout of five seconds in case 
someone accidentally re-creates the infinite loop in order to save some time 
and potential confusion.
   
   All existing unit tests for the `Values` class pass with this change.
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


C0urante commented on pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#issuecomment-703756981


   @rhauch @kkonstantine can one of you take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499739384



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -436,6 +496,8 @@ private void maybeSetError() {
 }
 
 if (setState(State.ERROR)) {
+metrics.close();

Review comment:
   Maybe when we transition out of error we should restart them. I don't think the metrics thread really matters, but I closed the state cleaner because @cadonna thought it might cause state loss we would want to avoid.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499739935



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -550,6 +561,10 @@ void runLoop() {
 // until the rebalance is completed before we close and commit the tasks
 while (isRunning() || taskManager.isRebalanceInProgress()) {
     try {
+        if (shutdownRequested.get()) {
+            sendShutdownRequest(shutdownTypeRequested);
+            return;

Review comment:
   You are right again. See previous response.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499740251



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+ * terminates due to an uncaught exception.
+ *
+ * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {

Review comment:
   Should I also change the old handler? It uses the same name.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499741079



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
 final UUID processId,
 final String userEndPoint,
 final Map<TaskId, Long> taskOffsetSums) {
+    this(version, latestSupportedVersion, processId, userEndPoint, taskOffsetSums, new AtomicInteger(0));
+}
+
+public SubscriptionInfo(final int version,
+                        final int latestSupportedVersion,
+                        final UUID processId,
+                        final String userEndPoint,
+                        final Map<TaskId, Long> taskOffsetSums,
+                        final AtomicInteger shutdownRequested) {

Review comment:
   sure





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r485886423



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -0,0 +1,242 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {

Review comment:
   There's already another `StreamTableIntegrationTest` present, but it 
works with `TopologyTestDriver` so I thought it would be better and easier to 
keep them separate.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation

2020-10-05 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208194#comment-17208194
 ] 

Sagar Rao commented on KAFKA-10559:
---

hey [~ableegoldman], I can pick this one up if needed. Is there anything more that you would want to add apart from the nicely worded description?

> Don't shutdown the entire app upon TimeoutException during internal topic 
> validation
> 
>
> Key: KAFKA-10559
> URL: https://issues.apache.org/jira/browse/KAFKA-10559
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0
>
>
> During some of the KIP-572 work, we made things pretty brittle by changing 
> the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` 
> error code and shut down the entire application if a TimeoutException is hit 
> during the internal topic creation/validation.
> Internal topic validation occurs during every rebalance, and we have seen it 
> time out on topic discovery in unstable environments. So shutting down the 
> entire application seems like a step in the wrong direction, and antithetical 
> to the goal of KIP-572 (improving the resiliency of Streams in the face of 
> TimeoutExceptions)
> I'm not totally sure what the previous behavior was, but it seems to me we 
> have three options:
>  # Rethrow the TimeoutException and allow it to kill the thread
>  # Swallow the TimeoutException and retry the rebalance indefinitely
>  # Some combination of the above: swallow the TimeoutException but don't 
> retry indefinitely:
>  ## Start a timer and allow retrying rebalances for up the configured 
> task.timeout.ms, the timeout config introduced in KIP-572
>  ## Retry for some constant number of rebalances
> I think if we go with option 3, then shutting down the entire application is 
> relatively more palatable, as we have given the environment a chance to 
> stabilize.
> But, killing the thread still seems preferable, given the two new features 
> that are coming out soon: the ability to start up new threads, and the 
> improved exception handler that allows the user to choose to shut down the 
> entire application if that's really what they want. Once users have this 
> level of control over the application, we should allow them to decide how 
> they want to handle exceptional cases like this, rather than forcing an 
> option on them (eg shutdown everything) 
>  
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix 
> (eg we do option 1 in 2.7, but plan to implement option 3 eventually)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException

2020-10-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-7334:
---
Labels: newbie  (was: )

> Suggest changing config for state.dir in case of FileNotFoundException
> --
>
> Key: KAFKA-7334
> URL: https://issues.apache.org/jira/browse/KAFKA-7334
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Priority: Major
>  Labels: newbie
>
> Quoting stack trace from KAFKA-5998 :
> {code}
> WARN [2018-08-22 03:17:03,745] 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager: task 
> [0_45] Failed to write offset checkpoint file to 
> /tmp/kafka-streams/
> {{ /0_45/.checkpoint: {}}}
> {{ ! java.nio.file.NoSuchFileException: 
> /tmp/kafka-streams//0_45/.checkpoint.tmp}}
> {{ ! at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}}
> {{ ! at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}}
> {code}
> When state.dir is left at default configuration, there is a chance that 
> certain files under the state directory are cleaned by OS since the default 
> dir starts with /tmp/kafka-streams.
> [~mjsax] and I proposed to suggest user, through exception message, to change 
> the location for state.dir .
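
For example, a minimal sketch (the path below is illustrative, not a recommendation from the ticket) of overriding the default:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirConfigSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        // Point state.dir somewhere the OS tmp cleaner will not touch:
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return props;
    }
}
{code}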



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


kkonstantine commented on a change in pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#discussion_r499750739



##
File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
##
@@ -73,6 +76,15 @@
 INT_LIST.add(-987654321);
 }
 
+@Test(timeout = 5000)
+public void shouldNotEncounterInfiniteLoop() {
+byte[] bytes = new byte[] { -17, -65,  -65 };

Review comment:
   we need a comment here to explain things. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


C0urante commented on a change in pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#discussion_r499754411



##
File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
##
@@ -73,6 +76,15 @@
 INT_LIST.add(-987654321);
 }
 
+@Test(timeout = 5000)
+public void shouldNotEncounterInfiniteLoop() {
+byte[] bytes = new byte[] { -17, -65,  -65 };

Review comment:
   👍  good call, will add





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString

2020-10-05 Thread GitBox


C0urante commented on pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#issuecomment-703775022


   Thanks @kkonstantine, I've added a comment and addressed the Checkstyle 
issues.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10436) Implement KIP-478 Topology changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10436:
-
Component/s: streams

> Implement KIP-478 Topology changes
> --
>
> Key: KAFKA-10436
> URL: https://issues.apache.org/jira/browse/KAFKA-10436
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Component/s: streams-test-utils

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, streams-test-utils
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Component/s: (was: streams-test-utils)

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Component/s: streams

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10535:
-
Component/s: streams

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10535:
-
Fix Version/s: 2.7.0

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10536:
-
Component/s: streams

> KIP-478: Implement KStream changes
> --
>
> Key: KAFKA-10536
> URL: https://issues.apache.org/jira/browse/KAFKA-10536
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10535.
--
Resolution: Fixed

> KIP-478: Implement StateStoreContext and Record
> ---
>
> Key: KAFKA-10535
> URL: https://issues.apache.org/jira/browse/KAFKA-10535
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10536:
-
Fix Version/s: 2.7.0

> KIP-478: Implement KStream changes
> --
>
> Key: KAFKA-10536
> URL: https://issues.apache.org/jira/browse/KAFKA-10536
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10437:
-
Fix Version/s: 2.7.0

> KIP-478: Implement test-utils changes
> -
>
> Key: KAFKA-10437
> URL: https://issues.apache.org/jira/browse/KAFKA-10437
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10537) Convert KStreamImpl filters to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10537:
-
Component/s: streams

> Convert KStreamImpl filters to new PAPI
> ---
>
> Key: KAFKA-10537
> URL: https://issues.apache.org/jira/browse/KAFKA-10537
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10542) Convert KTable maps to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10542:
-
Component/s: streams

> Convert KTable maps to new PAPI
> ---
>
> Key: KAFKA-10542
> URL: https://issues.apache.org/jira/browse/KAFKA-10542
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10543) Convert KTable joins to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10543:
-
Component/s: streams

> Convert KTable joins to new PAPI
> 
>
> Key: KAFKA-10543
> URL: https://issues.apache.org/jira/browse/KAFKA-10543
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10541) Convert KTable filters to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10541:
-
Component/s: streams

> Convert KTable filters to new PAPI
> --
>
> Key: KAFKA-10541
> URL: https://issues.apache.org/jira/browse/KAFKA-10541
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10540) Convert KStream aggregations to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10540:
-
Component/s: streams

> Convert KStream aggregations to new PAPI
> 
>
> Key: KAFKA-10540
> URL: https://issues.apache.org/jira/browse/KAFKA-10540
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10539) Convert KStreamImpl joins to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10539:
-
Component/s: streams

> Convert KStreamImpl joins to new PAPI
> -
>
> Key: KAFKA-10539
> URL: https://issues.apache.org/jira/browse/KAFKA-10539
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10538) Convert KStreamImpl maps to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10538:
-
Component/s: streams

> Convert KStreamImpl maps to new PAPI
> 
>
> Key: KAFKA-10538
> URL: https://issues.apache.org/jira/browse/KAFKA-10538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10544) Convert KTable aggregations to new PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10544:
-
Component/s: streams

> Convert KTable aggregations to new PAPI
> ---
>
> Key: KAFKA-10544
> URL: https://issues.apache.org/jira/browse/KAFKA-10544
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10546) KIP-478: Deprecate old PAPI

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10546:
-
Component/s: streams

> KIP-478: Deprecate old PAPI
> ---
>
> Key: KAFKA-10546
> URL: https://issues.apache.org/jira/browse/KAFKA-10546
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> Can't be done until after the DSL internals are migrated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10562) Delegate the store wrappers to the new init method

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10562:
-
Component/s: streams

> Delegate the store wrappers to the new init method
> --
>
> Key: KAFKA-10562
> URL: https://issues.apache.org/jira/browse/KAFKA-10562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10562) KIP-478: Delegate the store wrappers to the new init method

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10562:
-
Summary: KIP-478: Delegate the store wrappers to the new init method  (was: 
Delegate the store wrappers to the new init method)

> KIP-478: Delegate the store wrappers to the new init method
> ---
>
> Key: KAFKA-10562
> URL: https://issues.apache.org/jira/browse/KAFKA-10562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10530) kafka-streams-application-reset misses some internal topics

2020-10-05 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10530.
--
Resolution: Duplicate

Closing now, since this seems like a duplicate report, and visual code 
inspection indicates it should have been fixed.

If you do still see this [~oweiler], please feel free to re-open the ticket.

> kafka-streams-application-reset misses some internal topics
> ---
>
> Key: KAFKA-10530
> URL: https://issues.apache.org/jira/browse/KAFKA-10530
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.6.0
>Reporter: Oliver Weiler
>Priority: Major
>
> While the {{kafka-streams-application-reset}} tool works in most cases, it 
> misses some internal topics when using {{Foreign Key Table-Table Joins}}.
> After execution, there are still two internal topics left which were not 
> deleted
> {code}
> bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic
> bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer 
> bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic
> {code}
> The reason seems to be {{StreamsResetter.isInternalTopic}}, which requires 
> internal topics to end with {{-changelog}} or {{-repartition}} (which the 
> topics listed above don't).
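
For reference, a minimal sketch of the suffix-based check described above; the
actual {{StreamsResetter}} source may differ, and the method and parameter
names here are illustrative:

{code:java}
// Sketch only: mirrors the suffix check described in this ticket. Foreign-key
// join topics (e.g. ...-SUBSCRIPTION-REGISTRATION-...-topic) end in "-topic",
// so a check like this would not classify them as internal.
private static boolean isInternalTopic(final String topicName, final String applicationId) {
    return topicName.startsWith(applicationId + "-")
        && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
{code}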



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10555) Improve client state machine

2020-10-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208231#comment-17208231
 ] 

Sophie Blee-Goldman commented on KAFKA-10555:
-

Just to clarify, I do agree with Matthias that we shouldn't transition to ERROR 
if the last stream thread is removed via the new removeStreamThread() method.

I thought we were only considering transitioning to ERROR if the last thread 
died, but transitioning to NOT_RUNNING if the last thread was removed by the 
user. This seems consistent with the current behavior and maintains the same 
semantic meaning of the ERROR state, imo. I don't think we can say that 
"transitioning to ERROR if the last thread is removed" is following the current 
behavior, because there is no way to remove a thread at the moment. So, we 
should just do what makes the most sense for this case. Personally I think that 
would be to transition to NOT_RUNNING, since this is not an error or 
exceptional case but rather a valid user action.

I also agree with something that [~vvcephei] suggested earlier, which is that 
this should be part of the KIP discussion. At the very least, we should raise 
the final proposal on the discussion thread in case there are any objections.

> Improve client state machine
> 
>
> Key: KAFKA-10555
> URL: https://issues.apache.org/jira/browse/KAFKA-10555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The KafkaStreams client exposes its state to the user for monitoring purpose 
> (ie, RUNNING, REBALANCING etc). The state of the client depends on the 
> state(s) of the internal StreamThreads that have their own states.
> Furthermore, the client state has impact on what the user can do with the 
> client. For example, active task can only be queried in RUNNING state and 
> similar.
> With KIP-671 and KIP-663 we improved error handling capabilities and allow 
> adding/removing stream threads dynamically. We allow adding/removing threads 
> only in RUNNING and REBALANCING state. This puts us in a "weird" position, 
> because if we enter ERROR state (ie, if the last thread dies), we cannot add 
> new threads any longer. However, if we have multiple threads and one dies, we 
> don't enter ERROR state and do allow recovering the thread.
> Before the KIPs the definition of ERROR state was clear; however, with both 
> KIPs it seems that we should revisit its semantics.
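
For context, the state guard under discussion is of roughly this shape (a
sketch with illustrative names, not the actual implementation):

{code:java}
public synchronized boolean addStreamThread() {
    // Today, adding a thread is only allowed while the client is live,
    // i.e. in RUNNING or REBALANCING, which excludes ERROR.
    if (state != State.RUNNING && state != State.REBALANCING) {
        return false;
    }
    // ... create and start the new StreamThread ...
    return true;
}
{code}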



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9585) Flaky Test: LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization

2020-10-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9585.
--
Resolution: Cannot Reproduce

> Flaky Test: 
> LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization
> 
>
> Key: KAFKA-9585
> URL: https://issues.apache.org/jira/browse/KAFKA-9585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> Failed for me locally with 
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 12. Should 
> obtain non-empty lag information eventually
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499753420



##
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
   "rack":"dc1",
-  "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+  "features": {"feature1": {"min_version": 1, "first_active_version": 1, 
"max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, 
"max_version": 4}}

Review comment:
   Should we revert the changes here?

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absen

[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


bbejeck commented on pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#issuecomment-703803731


   @lkokhreidze, thanks for the quick update. I'll make another pass soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-10-05 Thread GitBox


soondenana commented on pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#issuecomment-703805880


   There was an error when building `streams.examples`:
   
   ```
   [2020-10-05T08:40:05.722Z] [ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on 
project standalone-pom: A Maven project already exists in the directory 
/home/jenkins/workspace/Kafka_kafka-pr_PR-9347/streams/quickstart/test-streams-archetype/streams.examples
 -> [Help 1]
   ```
   
   The failure is not related to this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation

2020-10-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208257#comment-17208257
 ] 

Sophie Blee-Goldman commented on KAFKA-10559:
-

[~sagarrao] Yeah, go ahead! This should be a pretty small PR so it would be 
great if we could knock it out in the next week or two. Just ping me when it's 
ready.

For the PR itself, I think it sounds reasonable to just rethrow the 
TimeoutException to kill the thread. The "add/recover stream thread" 
functionality will probably slip 2.7, but it'll be implemented soon. So we 
don't really need to go out of our way to save a single thread from death in 
rare circumstances imo
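
For illustration, the rethrow approach could look roughly like the sketch below
at the point where the assignor creates/validates internal topics. The name
{{validateInternalTopics}} is a hypothetical stand-in, not the actual call site:

{code:java}
try {
    // Hypothetical stand-in for the assignor's internal topic creation/validation.
    validateInternalTopics();
} catch (final TimeoutException timeoutException) { // org.apache.kafka.common.errors.TimeoutException
    // Rethrow so only this stream thread dies, instead of sending
    // INCOMPLETE_SOURCE_TOPIC_METADATA and shutting down every instance.
    log.error("Internal topic validation timed out", timeoutException);
    throw timeoutException;
}
{code}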

> Don't shutdown the entire app upon TimeoutException during internal topic 
> validation
> 
>
> Key: KAFKA-10559
> URL: https://issues.apache.org/jira/browse/KAFKA-10559
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.7.0
>
>
> During some of the KIP-572 work, we made things pretty brittle by changing 
> the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` 
> error code and shut down the entire application if a TimeoutException is hit 
> during the internal topic creation/validation.
> Internal topic validation occurs during every rebalance, and we have seen it 
> time out on topic discovery in unstable environments. So shutting down the 
> entire application seems like a step in the wrong direction, and antithetical 
> to the goal of KIP-572 (improving the resiliency of Streams in the face of 
> TimeoutExceptions)
> I'm not totally sure what the previous behavior was, but it seems to me we 
> have three options:
>  # Rethrow the TimeoutException and allow it to kill the thread
>  # Swallow the TimeoutException and retry the rebalance indefinitely
>  # Some combination of the above: swallow the TimeoutException but don't 
> retry indefinitely:
>  ## Start a timer and allow retrying rebalances for up to the configured 
> task.timeout.ms, the timeout config introduced in KIP-572
>  ## Retry for some constant number of rebalances
> I think if we go with option 3, then shutting down the entire application is 
> relatively more palatable, as we have given the environment a chance to 
> stabilize.
> But, killing the thread still seems preferable, given the two new features 
> that are coming out soon: the ability to start up new threads, and the 
> improved exception handler that allows the user to choose to shut down the 
> entire application if that's really what they want. Once users have this 
> level of control over the application, we should allow them to decide how 
> they want to handle exceptional cases like this, rather than forcing an 
> option on them (eg shutdown everything) 
>  
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix 
> (eg we do option 1 in 2.7, but plan to implement option 3 eventually)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation

2020-10-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10559:
---

Assignee: Sagar Rao

> Don't shutdown the entire app upon TimeoutException during internal topic 
> validation
> 
>
> Key: KAFKA-10559
> URL: https://issues.apache.org/jira/browse/KAFKA-10559
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Blocker
> Fix For: 2.7.0
>
>
> During some of the KIP-572 work, we made things pretty brittle by changing 
> the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` 
> error code and shut down the entire application if a TimeoutException is hit 
> during the internal topic creation/validation.
> Internal topic validation occurs during every rebalance, and we have seen it 
> time out on topic discovery in unstable environments. So shutting down the 
> entire application seems like a step in the wrong direction, and antithetical 
> to the goal of KIP-572 (improving the resiliency of Streams in the face of 
> TimeoutExceptions)
> I'm not totally sure what the previous behavior was, but it seems to me we 
> have three options:
>  # Rethrow the TimeoutException and allow it to kill the thread
>  # Swallow the TimeoutException and retry the rebalance indefinitely
>  # Some combination of the above: swallow the TimeoutException but don't 
> retry indefinitely:
>  ## Start a timer and allow retrying rebalances for up to the configured 
> task.timeout.ms, the timeout config introduced in KIP-572
>  ## Retry for some constant number of rebalances
> I think if we go with option 3, then shutting down the entire application is 
> relatively more palatable, as we have given the environment a chance to 
> stabilize.
> But, killing the thread still seems preferable, given the two new features 
> that are coming out soon: the ability to start up new threads, and the 
> improved exception handler that allows the user to choose to shut down the 
> entire application if that's really what they want. Once users have this 
> level of control over the application, we should allow them to decide how 
> they want to handle exceptional cases like this, rather than forcing an 
> option on them (eg shutdown everything) 
>  
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix 
> (eg we do option 1 in 2.7, but plan to implement option 3 eventually)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] piotrrzysko commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment

2020-10-05 Thread GitBox


piotrrzysko commented on pull request #9371:
URL: https://github.com/apache/kafka/pull/9371#issuecomment-703823226


   Hi @stanislavkozlovski, would you mind taking a look at this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499810462



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499811265



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises support for.
+   * Each broker advertises the version ranges of its own supported features 
in its own
+   * BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there was an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has now been upgraded to a newer version that supports 
the feature versioning
+   *system (KIP-584). But the IBP config is still set to lower than 
KAFKA_2_7_IV0, and may be
+   *set to a higher value later. In this case, we want to start with no 
finalized features and
+   *allow the user to finalize them whenever they are ready i.e. in the 
future whenever the
+   *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, 
then the user could start
+   *finalizing the features. This process ensures we do not enable all the 
possible features
+   *immediately after an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent.
+   *- If the node is absent, it will react by creating a FeatureZNode 
with disabled status
+   *  and empty finalized features.
+   *- Otherwise, if a node already exists in enabled status then the 
controller will just
+   *  flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled.
+   * - If the node is in disabled status, the controller won’t upgrade 
all features immediately.
+   *   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499811752



##
File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
##
@@ -185,7 +185,7 @@ class BrokerEndPointTest {
   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
   "rack":"dc1",
-  "features": {"feature1": {"min_version": 1, "max_version": 2}, 
"feature2": {"min_version": 2, "max_version": 4}}
+  "features": {"feature1": {"min_version": 1, "first_active_version": 1, 
"max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, 
"max_version": 4}}

Review comment:
   Done. Nice catch!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch merged pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-10-05 Thread GitBox


rhauch merged pull request #9347:
URL: https://github.com/apache/kafka/pull/9347


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499812076



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 
versionRan

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499816373



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 
versionRan

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499816619



##
File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
##
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.{Optional, Properties}
+import java.util.concurrent.ExecutionException
+
+import kafka.api.KAFKA_2_7_IV0
+import kafka.utils.TestUtils
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
+import kafka.utils.TestUtils.waitUntilTrue
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, 
FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.common.errors.InvalidRequestException
+import org.apache.kafka.common.feature.FinalizedVersionRange
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.message.UpdateFeaturesRequestData
+import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{UpdateFeaturesRequest, 
UpdateFeaturesResponse}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Test
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertNotNull, assertTrue}
+import org.scalatest.Assertions.intercept
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+class UpdateFeaturesTest extends BaseRequestTest {
+
+  override def brokerCount = 3
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+props.put(KafkaConfig.InterBrokerProtocolVersionProp, 
KAFKA_2_7_IV0.toString)
+  }
+
+  private def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
SupportedVersionRange(1, 3))))
+  }
+
+  private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new 
FinalizedVersionRange(1, 2))))
+  }
+
+  private def updateSupportedFeatures(
+features: Features[SupportedVersionRange], targetServers: 
Set[KafkaServer]): Unit = {
+targetServers.foreach(s => {
+  s.brokerFeatures.setSupportedFeatures(features)
+  s.zkClient.updateBrokerInfo(s.createBrokerInfo)
+})
+
+// Wait until updates to all BrokerZNode supported features propagate to 
the controller.
+val brokerIds = targetServers.map(s => s.config.brokerId)
+waitUntilTrue(
+  () => servers.exists(s => {
+if (s.kafkaController.isActive) {
+  s.kafkaController.controllerContext.liveOrShuttingDownBrokers
+.filter(b => brokerIds.contains(b.id))
+.forall(b => {
+  b.features.equals(features)
+})
+} else {
+  false
+}
+  }),
+  "Controller did not get broker updates")
+  }
+
+  private def updateSupportedFeaturesInAllBrokers(features: 
Features[SupportedVersionRange]): Unit = {
+updateSupportedFeatures(features, Set[KafkaServer]() ++ servers)
+  }
+
+  private def updateFeatureZNode(features: Features[FinalizedVersionRange]): 
Int = {
+val server = serverForId(0).get
+val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features)
+val newVersion = server.zkClient.updateFeatureZNode(newNode)
+servers.foreach(s => {
+  s.featureCache.waitUntilEpochOrThrow(newVersion, 
s.config.zkConnectionTimeoutMs)
+})
+newVersion
+  }
+
+  private def getFeatureZNode(): FeatureZNode = {
+val (mayBeFeatureZNodeBytes, version) = 
serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path)
+assertNotEquals(version, ZkVersion.UnknownVersion)
+FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+  }
+
+  private def finalizedFeatures(features: java.util.Map[String, 
org.apache.kafka.clients.admin.FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+Features.finalizedFeatures(features.asScala.map {
+  case(name, versionRange) =>
+(name, new FinalizedVersionRange(versionRange.minVersionLevel(), 
versionRan

[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-05 Thread GitBox


dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-703838454


   Hi @guozhangwang, can you trigger a new build? These look like flaky tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10531) KafkaBasedLog can sleep for negative values

2020-10-05 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10531:
--
Fix Version/s: 2.5.2
   2.7.0

> KafkaBasedLog can sleep for negative values
> ---
>
> Key: KAFKA-10531
> URL: https://issues.apache.org/jira/browse/KAFKA-10531
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
> Fix For: 2.7.0, 2.5.2, 2.6.1
>
>
> {{time.milliseconds}} is not monotonic, so this code can throw:
> {{java.lang.IllegalArgumentException: timeout value is negative}}
>  
> {code:java}
> long started = time.milliseconds();
> while (partitionInfos == null && time.milliseconds() - started < 
> CREATE_TOPIC_TIMEOUT_MS) {
> partitionInfos = consumer.partitionsFor(topic);
> Utils.sleep(Math.min(time.milliseconds() - started, 1000));
> }
> {code}
> We need to check for a negative value before sleeping.
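
One way to guard the sleep (a sketch, not necessarily the merged fix) is to
clamp the computed duration to a non-negative value:

{code:java}
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
    partitionInfos = consumer.partitionsFor(topic);
    // Math.max(0, ...) ensures Thread.sleep never receives a negative argument,
    // even if the clock jumps backwards between the two milliseconds() calls.
    Utils.sleep(Math.max(0, Math.min(time.milliseconds() - started, 1000)));
}
{code}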



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499836489



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {

Review comment:
   Oh, sure. Now I know why you picked this name :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499838819



##
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect a record and the exception received.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);

Review comment:
   That's a good question. Maybe we could just document that that option is 
an unsuitable response to an Error, and also log an `ERROR` message if you 
select it in response to an Error. It's not _always_ bad to ignore an Error, 
but it usually is. We can leave it to users to decide what they want to do.
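   
   For illustration, a hypothetical sketch of that `ERROR` log (the names and the surrounding dispatch are assumptions, not the actual patch):
   ```java
   // Sketch: if the chosen response would swallow a java.lang.Error,
   // surface it loudly before proceeding.
   if (throwable instanceof Error) {
       log.error("The uncaught exception handler elected to continue after a java.lang.Error; " +
           "the JVM may be in an unrecoverable state.", throwable);
   }
   ```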





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499842547



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -436,6 +496,8 @@ private void maybeSetError() {
 }
 
 if (setState(State.ERROR)) {
+metrics.close();

Review comment:
   It certainly might. I'm just wary of how far into the uncanny valley 
we're going here. Streams is going to be put into a state that's very similar 
to the one that `close()` produces, but not identical. What will then happen 
when they _do_ call close? What will happen when we realize that something else 
needs to be done as part of closing the instance (will we even remember that we 
should consider doing it here as well)?
   
   OTOH, we could instead change direction on the "error-vs-shutdown" debate 
and just make all these methods call `close(ZERO)` instead. Then, the _real_ 
close method will be invoked, and Streams will go through the well-known 
transition through `PENDING_SHUTDOWN` to `NOT_RUNNING`.
   
   It would then be a problem for a later date (after KIP-663) if someone 
wanted to request that the app instead stop all running threads, so they 
can manually call "addThread" later.
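   
   A minimal sketch of that alternative, assuming the handler dispatch shown elsewhere in this PR (illustrative only, not the actual patch):
   ```java
   case SHUTDOWN_KAFKA_STREAMS_CLIENT:
       // Route through the real close so the well-known transition
       // (PENDING_SHUTDOWN -> NOT_RUNNING) and all of close()'s cleanup run.
       close(Duration.ZERO);
       break;
   ```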





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499843780



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
         final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         try {
-            streams.setUncaughtExceptionHandler(null);
+            streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);

Review comment:
   Ah, gotcha. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r499847021



##
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -715,7 +747,58 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     doAnswer((_: InvocationOnMock) => {
       latch.countDown()
     }).doCallRealMethod().when(spyThread).awaitShutdown()
-    controller.shutdown() 
+    controller.shutdown()
+  }
+
+  private def testControllerFeatureZNodeSetup(initialZNode: Option[FeatureZNode],
+                                              interBrokerProtocolVersion: ApiVersion): Unit = {
+    val versionBeforeOpt = initialZNode match {
+      case Some(node) =>
+        zkClient.createFeatureZNode(node)
+        Some(zkClient.getDataAndVersion(FeatureZNode.path)._2)
+      case None =>
+        Option.empty
+    }
+    servers = makeServers(1, interBrokerProtocolVersion = Some(interBrokerProtocolVersion))
+    TestUtils.waitUntilControllerElected(zkClient)

Review comment:
   Done. Please take a look at the fix. I've added logic to wait for the 
processing of a dummy event just after waiting for controller election. I'm 
hoping this will make sure the controller failover logic has completed before 
the test proceeds to make assertions.
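   
   For illustration, the general shape of that wait (a sketch assuming some hypothetical enqueue hook on the controller's event queue; the actual API in the fix differs):
   ```scala
   // Enqueue a no-op event and block until the controller thread has processed it;
   // once it runs, everything queued ahead of it (e.g. failover logic) is done.
   val processed = new CountDownLatch(1)
   enqueueControllerEvent(() => processed.countDown()) // hypothetical hook
   assertTrue(processed.await(15, TimeUnit.SECONDS))
   ```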





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup

2020-10-05 Thread GitBox


vvcephei commented on pull request #9323:
URL: https://github.com/apache/kafka/pull/9323#issuecomment-703867815


   Cherry-picked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

2020-10-05 Thread GitBox


vvcephei commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-703867931


   Cherry-picked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite

2020-10-05 Thread GitBox


vvcephei commented on pull request #8353:
URL: https://github.com/apache/kafka/pull/8353#issuecomment-703869908


   cherry-picked to 2.5



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik edited a comment on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-10-05 Thread GitBox


kowshik edited a comment on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-703873486


   @junrao Thanks for the review! I've addressed the latest comments in 
e55358fd1a00f12ef98fc4d2d649a297ddf146da . The PR is ready for another pass.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite

2020-10-05 Thread GitBox


vvcephei commented on pull request #8353:
URL: https://github.com/apache/kafka/pull/8353#issuecomment-703874939


   Cherry-picked to 2.4 and 2.3



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499868891



##
File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect a record and the exception received.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);

Review comment:
   That seems like a fine solution. It will still attempt the shutdown, but it 
may fail. As long as we warn, I guess it will work.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499871031



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -436,6 +496,8 @@ private void maybeSetError() {
 }
 
 if (setState(State.ERROR)) {
+metrics.close();

Review comment:
   We did have it this way in the KIP. If we stick with this for now, I think 
we can clear this up easily when we decide what we want to do with the states 
in general, when we take care of the discussion in KIP-663.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-05 Thread GitBox


wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499871426



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh
         }
     }
 
+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {
+        final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh);
+        synchronized (stateLock) {
+            if (state == State.CREATED) {
+                for (final StreamThread thread : threads) {
+                    if (eh != null) {
+                        thread.setStreamsUncaughtExceptionHandler(handler);
+                    } else {
+                        final StreamsUncaughtExceptionHandler defaultHandler = exception ->
+                            StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD;
+                        thread.setStreamsUncaughtExceptionHandler(defaultHandler);
+                    }
+                }
+            } else {
+                throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " +
+                    "Current state is: " + state);
+            }
+        }
+    }
+
+    private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e,
+                                                                                                                   final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+        final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e);
+        switch (action) {
+            case SHUTDOWN_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                    "and the thread is going to shut down: ", e);
+                break;
+            case REPLACE_STREAM_THREAD:
+                log.error("Encountered the following exception during processing " +
+                    "and the stream thread will be replaced: ", e); // TODO: add then remove, wait until 663 is merged
+                break;
+            case SHUTDOWN_KAFKA_STREAMS_CLIENT:
+                log.error("Encountered the following exception during processing " +
+                    "and the client is going to shut down: ", e);
+                for (final StreamThread streamThread : threads) {
+                    streamThread.shutdown();
+                }

Review comment:
   I am okay with renaming, but I will wait for everything else to be 
cleared up to see if it is still necessary.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9376: MINOR: Remove `TargetVoters` from `DescribeQuorum`

2020-10-05 Thread GitBox


hachikuji opened a new pull request #9376:
URL: https://github.com/apache/kafka/pull/9376


   This field is left over from the early days of the KIP, when it covered 
reassignment. Since the API is not exposed yet, there should be no harm in 
updating the first version.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2020-10-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208316#comment-17208316
 ] 

Sophie Blee-Goldman commented on KAFKA-5998:


[~sandeep.lakdaw...@gmail.com] are you running all three instances on the same 
machine with a shared state directory? And/or using /tmp as the state directory?
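
For reference, a minimal sketch of giving each instance its own durable state directory instead of the default under /tmp (paths are illustrative):
{code:java}
final Properties props = new Properties();
// One dedicated directory per instance, on persistent storage.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/instance-1");
{code}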

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.2.2, 2.4.0, 2.3.1
>
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I started getting the exception below for some of the partitions. I have 16 
> partitions, and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.intern

[GitHub] [kafka] hachikuji merged pull request #9349: MINOR: add proper checks to KafkaConsumer.groupMetadata

2020-10-05 Thread GitBox


hachikuji merged pull request #9349:
URL: https://github.com/apache/kafka/pull/9349


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package

2020-10-05 Thread GitBox


hachikuji opened a new pull request #9377:
URL: https://github.com/apache/kafka/pull/9377


   To avoid confusion, since it is only used by `TestRaftServer`, this PR moves 
`RaftRequestHandler` to the `tools` package.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7421) Deadlock in Kafka Connect

2020-10-05 Thread Kyle Leiby (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208344#comment-17208344
 ] 

Kyle Leiby commented on KAFKA-7421:
---

Hi all, we've been encountering a similar deadlock (I think the same as the one 
[~xakassi] is seeing). We are running a single Debezium JAR inside a 
{{confluentinc/cp-kafka-connect-base:5.5.1-1-deb8}} container. We tried several 
5.x Debian 8 images and encountered the deadlocks in all of them.

Here's the relevant portion from an example thread dump:
{code:java}
Found one Java-level deadlock:
=
"StartAndStopExecutor-connect-1-2":
  waiting to lock monitor 0x7f2b68001d58 (object 0xc118c3c8, a 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
  which is held by "StartAndStopExecutor-connect-1-1"
"StartAndStopExecutor-connect-1-1":
  waiting to lock monitor 0x7f2b68001eb8 (object 0xc510, a 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader),
  which is held by "StartAndStopExecutor-connect-1-2"

Java stack information for the threads listed above:
===
"StartAndStopExecutor-connect-1-2":
at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
- waiting to lock <0xc118c3c8> (a 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:397)
at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
- locked <0xc6a9e908> (a java.lang.Object)
at 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
- locked <0xc6a9e908> (a java.lang.Object)
- locked <0xc510> (a 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:215)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:209)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:432)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:127)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1201)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1197)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"StartAndStopExecutor-connect-1-1":
at 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
- waiting to lock <0xc510> (a 
org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:394)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:215)
at 
org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:209)
at 
org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:251)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1229)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:127)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1245)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1241)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(Thre

[GitHub] [kafka] guozhangwang commented on pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package

2020-10-05 Thread GitBox


guozhangwang commented on pull request #9377:
URL: https://github.com/apache/kafka/pull/9377#issuecomment-703906935


   LGTM!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #9342: MINOR: Update doc for raft state metrics

2020-10-05 Thread GitBox


guozhangwang merged pull request #9342:
URL: https://github.com/apache/kafka/pull/9342


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered

2020-10-05 Thread GitBox


bbejeck commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499908753



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {
+private static final int NUM_BROKERS = 1;
+
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+private String tableTopic;
+private String inputTopic;
+private String outputTopic;
+private String applicationId;
+
+private Properties streamsConfiguration;
+
+@Rule
+public TestName testName = new TestName();
+
+@Parameterized.Parameter
+public String topologyOptimization;
+
+@Parameterized.Parameters(name = "Optimization = {0}")
+public static Collection topologyOptimization() {
+return Arrays.asList(new String[][]{
+{StreamsConfig.OPTIMIZE},
+{StreamsConfig.NO_OPTIMIZATION}
+});
+}
+
+@Before
+public void before() throws InterruptedException {
+streamsConfiguration = new Properties();
+
+final String safeTestName = safeUniqueTestName(getClass(), testName);
+
+tableTopic = "table-topic" + safeTestName;
+inputTopic = "stream-topic-" + safeTestName;
+outputTopic = "output-topic-" + safeTestName;
+applicationId = "app-" + safeTestName;
+
+CLUSTER.createTopic(inputTopic, 4, 1);
+CLUSTER.createTopic(tableTopic, 2, 1);
+CLUSTER.createTopic(outputTopic, 4, 1);
+
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
