[GitHub] [kafka] rondagostino commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin
rondagostino commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499595845

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
     }

     @Override
-    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+    public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+        final @Nullable List<@NotNull String> users,
+        final @NotNull DescribeUserScramCredentialsOptions options
+    ) {
         final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
             @Override
             public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeUserScramCredentialsRequest.Builder(
-                    new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                    new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->
```

Review comment:

Not 100% sure the parens are needed, but either they are needed, or adding them increases clarity and decreases confusion.

```suggestion
                    new DescribeUserScramCredentialsRequestData().setUsers((users == null ? Collections.emptyList() : users).stream().map(user ->
```
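For readers following the thread: the two forms parse differently rather than one being invalid. Without the parentheses, `.stream()` binds to `users` alone, so the map/collect chain runs only on the non-null branch; with them, whichever list the conditional selects is streamed (including the empty one). A minimal, self-contained sketch of the difference; plain `String` lists stand in for the admin-client request types, which are not used here:

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class TernaryParens {
    public static void main(String[] args) {
        List<String> users = null; // the NPE-prone input from the review thread

        // Without parens: .stream() binds to the else-branch only, so the
        // null case short-circuits straight to the empty list.
        List<String> a = users == null
            ? Collections.emptyList()
            : users.stream().map(String::trim).collect(Collectors.toList());

        // With parens: the conditional picks a list first, and the selected
        // list (possibly the empty one) is then streamed.
        List<String> b = (users == null ? Collections.<String>emptyList() : users)
            .stream().map(String::trim).collect(Collectors.toList());

        System.out.println(a + " " + b); // prints: [] []
    }
}
```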
[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin
Fleshgrinder commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499643570

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
     }

     @Override
-    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+    public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+        final @Nullable List<@NotNull String> users,
+        final @NotNull DescribeUserScramCredentialsOptions options
+    ) {
         final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
             @Override
             public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeUserScramCredentialsRequest.Builder(
-                    new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                    new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->
```

Review comment:

This way we would also run the empty list through _stream_, which is not required. I tried to make the change with as little impact as possible, but if I were to write it then I would do it as follows:

```java
Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
    new LeastLoadedNodeProvider()) {
    @Override
    public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) {
        final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData();
        if (users != null) {
            final List<UserName> userNames = new ArrayList<>(users.size());
            for (final String user : users) {
                userNames.add(new UserName().setName(user));
            }
            requestData.setUsers(userNames);
        }
        return new DescribeUserScramCredentialsRequest.Builder(requestData);
    }
```

This is more code, yes, and it is not using _stream_ anymore, but it is clearer, faster, and allocates less.
[GitHub] [kafka] rondagostino commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin
rondagostino commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499647441

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
     }

     @Override
-    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+    public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+        final @Nullable List<@NotNull String> users,
+        final @NotNull DescribeUserScramCredentialsOptions options
+    ) {
         final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
             @Override
             public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeUserScramCredentialsRequest.Builder(
-                    new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                    new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->
```

Review comment:

I'm thinking it would be best to fix this bug in a separate "MINOR: fix potential NPE..." PR since this PR may or may not get merged. Do you agree? If so, either I can do it, or feel free to do it — I'll go with whichever you wish.
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703675565

ok to test
[GitHub] [kafka] bbejeck commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
bbejeck commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-703675276

retest this please
[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin
Fleshgrinder commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499643570

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
     }

     @Override
-    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+    public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+        final @Nullable List<@NotNull String> users,
+        final @NotNull DescribeUserScramCredentialsOptions options
+    ) {
         final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
             @Override
             public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeUserScramCredentialsRequest.Builder(
-                    new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                    new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->
```

Review comment:

This way we would also run the empty list through _stream_, which is not required. I tried to make the change with as little impact as possible, but if I were to write it then I would do it as follows:

```java
Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
    new LeastLoadedNodeProvider()) {
    @Override
    public DescribeUserScramCredentialsRequest.Builder createRequest(final int timeoutMs) {
        final DescribeUserScramCredentialsRequestData requestData = new DescribeUserScramCredentialsRequestData();
        if (users != null && !users.isEmpty()) {
            final List<UserName> userNames = new ArrayList<>(users.size());
            for (final String user : users) {
                userNames.add(new UserName().setName(user));
            }
            requestData.setUsers(userNames);
        }
        return new DescribeUserScramCredentialsRequest.Builder(requestData);
    }
```

This is more code, yes, and it is not using _stream_ anymore, but it is clearer, faster, and allocates less.
[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9370: KAFKA-9234: Added Nullability Annotations to Admin
Fleshgrinder commented on a change in pull request #9370:
URL: https://github.com/apache/kafka/pull/9370#discussion_r499654983

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

```diff
@@ -4160,15 +4231,18 @@ void handleFailure(Throwable throwable) {
     }

     @Override
-    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
+    public @NotNull DescribeUserScramCredentialsResult describeUserScramCredentials(
+        final @Nullable List<@NotNull String> users,
+        final @NotNull DescribeUserScramCredentialsOptions options
+    ) {
         final KafkaFutureImpl<DescribeUserScramCredentialsResponseData> dataFuture = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
         Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
             new LeastLoadedNodeProvider()) {
             @Override
             public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) {
                 return new DescribeUserScramCredentialsRequest.Builder(
-                    new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+                    new DescribeUserScramCredentialsRequestData().setUsers(users == null ? Collections.emptyList() : users.stream().map(user ->
```

Review comment:

Happy to help, PR incoming. 😊
[GitHub] [kafka] bbejeck commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499670799

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

```diff
@@ -633,10 +633,20 @@ public final void addInternalTopic(final String topicName,
     }

     public final void copartitionSources(final Collection<String> sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+        copartitionSourceGroups.add(new HashSet<>(sourceNodes));
     }

-    public void validateCopartition() {
+    public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+                                                         final String optimizedNodeName) {
+        for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+            if (copartitionSourceGroup.contains(replacedNodeName)) {
+                copartitionSourceGroup.remove(replacedNodeName);
+                copartitionSourceGroup.add(optimizedNodeName);
+            }
+        }
+    }
+
+    public synchronized void validateCopartition() {
```

Review comment:

nit: I think we can remove `synchronized` here as well
[GitHub] [kafka] vvcephei commented on pull request #9369: KAFKA-4715: Ignore case of CompressionType and OffsetResetStrategy
vvcephei commented on pull request #9369:
URL: https://github.com/apache/kafka/pull/9369#issuecomment-703701703

Thanks for the PR, @Fleshgrinder!

The code change looks good to me. Should we have some tests for this?

Thanks,
-John
[GitHub] [kafka] Fleshgrinder commented on pull request #9369: KAFKA-4715: Ignore case of CompressionType and OffsetResetStrategy
Fleshgrinder commented on pull request #9369:
URL: https://github.com/apache/kafka/pull/9369#issuecomment-703703414

> The code change looks good to me. Should we have some tests for this?

Always, I'd say. 😊
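For reference, the kind of test being asked for might look like the sketch below, assuming `CompressionType.forName` is the parsing entry point that KAFKA-4715 makes case-insensitive (the test class name and the spellings covered are illustrative):

```java
import static org.junit.Assert.assertEquals;

import org.apache.kafka.common.record.CompressionType;
import org.junit.Test;

public class CompressionTypeCaseTest {
    @Test
    public void shouldResolveCompressionTypeRegardlessOfCase() {
        // All casings of "gzip" should map to the same constant once
        // name lookup ignores case.
        assertEquals(CompressionType.GZIP, CompressionType.forName("gzip"));
        assertEquals(CompressionType.GZIP, CompressionType.forName("GZIP"));
        assertEquals(CompressionType.GZIP, CompressionType.forName("GZip"));
    }
}
```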
[GitHub] [kafka] Fleshgrinder opened a new pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials
Fleshgrinder opened a new pull request #9374:
URL: https://github.com/apache/kafka/pull/9374

This bug was detected as part of #9370, and @rondagostino and I decided that it should be fixed right away rather than waiting to see whether the other PR eventually gets merged. I not only handle `null` properly everywhere but also rewrote the function to be more readable (and more efficient, though I think that is not important here).

I extended the existing test to go through all possible permutations of the users argument to make sure that we never get an NPE.
[GitHub] [kafka] Fleshgrinder commented on a change in pull request #9374: MINOR: Fix NPE in KafkaAdminClient.describeUserScramCredentials
Fleshgrinder commented on a change in pull request #9374:
URL: https://github.com/apache/kafka/pull/9374#discussion_r499687848

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
## @@ -4166,10 +4167,22 @@ public DescribeUserScramCredentialsResult describeUserScramCredentials(List

Review comment:

`users.stream()` is the NPE source
[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499691112

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

```diff
@@ -633,10 +633,20 @@ public final void addInternalTopic(final String topicName,
     }

     public final void copartitionSources(final Collection<String> sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+        copartitionSourceGroups.add(new HashSet<>(sourceNodes));
     }

-    public void validateCopartition() {
+    public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+                                                         final String optimizedNodeName) {
+        for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+            if (copartitionSourceGroup.contains(replacedNodeName)) {
+                copartitionSourceGroup.remove(replacedNodeName);
+                copartitionSourceGroup.add(optimizedNodeName);
+            }
+        }
+    }
+
+    public synchronized void validateCopartition() {
```

Review comment:

Sorry, somehow missed this one. On it.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r499692083

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java

```diff
@@ -633,10 +633,20 @@ public final void addInternalTopic(final String topicName,
     }

     public final void copartitionSources(final Collection<String> sourceNodes) {
-        copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+        copartitionSourceGroups.add(new HashSet<>(sourceNodes));
     }

-    public void validateCopartition() {
+    public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+                                                         final String optimizedNodeName) {
+        for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+            if (copartitionSourceGroup.contains(replacedNodeName)) {
+                copartitionSourceGroup.remove(replacedNodeName);
+                copartitionSourceGroup.add(optimizedNodeName);
+            }
+        }
+    }
+
+    public synchronized void validateCopartition() {
```

Review comment:

Done.
[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown
RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499715151

## File path: core/src/main/scala/kafka/log/Log.scala

```diff
@@ -244,7 +244,8 @@ class Log(@volatile private var _dir: File,
           val producerIdExpirationCheckIntervalMs: Int,
           val topicPartition: TopicPartition,
           val producerStateManager: ProducerStateManager,
-          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
+          logDirFailureChannel: LogDirFailureChannel,
+          val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
```

Review comment:

Done
[jira] [Created] (KAFKA-10574) Infinite loop in SimpleHeaderConverter and Values classes
Chris Egerton created KAFKA-10574:
-------------------------------------

             Summary: Infinite loop in SimpleHeaderConverter and Values classes
                 Key: KAFKA-10574
                 URL: https://issues.apache.org/jira/browse/KAFKA-10574
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.3.1, 2.4.0, 2.2.2, 2.2.1, 2.3.0, 2.1.1, 2.2.0, 2.1.0, 2.0.1, 2.0.0, 1.1.1, 1.1.0, 1.1.2, 2.0.2
            Reporter: Chris Egerton
            Assignee: Chris Egerton

A header value with the byte sequence {{0xEF, 0xBF, 0xBF}} will cause an infinite loop in the {{Values::parseString}} method. Since that method is invoked by the default header converter ({{SimpleHeaderConverter}}), any sink record with that byte array will, by default, cause a sink task reading that record to stall forever.

This occurs because that byte sequence, when parsed as a UTF-8 string and then read by a [StringCharacterIterator|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html], causes the [CharacterIterator.DONE|https://docs.oracle.com/javase/8/docs/api/java/text/CharacterIterator.html#DONE] character to be returned from [StringCharacterIterator::current|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#current--], [StringCharacterIterator::next|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#next--], etc., and a check for that character is used by the {{Values}} class for its parsing logic.
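Background on why this exact byte sequence is pathological: {{0xEF, 0xBF, 0xBF}} is the UTF-8 encoding of U+FFFF, and {{CharacterIterator.DONE}} is defined as the char constant U+FFFF, so a real character in the input compares equal to the iterator's end-of-text sentinel. A small standalone demonstration:
{code:java}
import java.nio.charset.StandardCharsets;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;

public class DoneCollision {
    public static void main(String[] args) {
        byte[] bytes = new byte[] { (byte) 0xEF, (byte) 0xBF, (byte) 0xBF };
        String str = new String(bytes, StandardCharsets.UTF_8); // decodes to the single char U+FFFF

        CharacterIterator iter = new StringCharacterIterator(str);
        // The first (and only) character equals the end-of-text sentinel...
        System.out.println(iter.current() == CharacterIterator.DONE); // true
        // ...even though the iterator is not actually past the end.
        System.out.println(iter.getIndex() < iter.getEndIndex());     // true
    }
}
{code}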
[jira] [Comment Edited] (KAFKA-10574) Infinite loop in SimpleHeaderConverter and Values classes
[ https://issues.apache.org/jira/browse/KAFKA-10574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208163#comment-17208163 ]

Chris Egerton edited comment on KAFKA-10574 at 10/5/20, 4:18 PM:
-----------------------------------------------------------------

This can be reproduced by adding the following test case to the {{ValuesTest}} class (beware that running this test will cause an infinite loop and it will need to be terminated manually):
{code:java}
@Test
public void shouldNotEncounterInfiniteLoop() {
    byte[] bytes = new byte[] { -17, -65, -65 };
    String str = new String(bytes, StandardCharsets.UTF_8);
    Values.parseString(str);
}
{code}

was (Author: chrisegerton):
This can be reproduced by adding the following test case to the {{ValuesTest}} class:
{code:java}
@Test
public void shouldNotEncounterInfiniteLoop() {
    byte[] bytes = new byte[] { -17, -65, -65 };
    String str = new String(bytes, StandardCharsets.UTF_8);
    Values.parseString(str);
}
{code}

> Infinite loop in SimpleHeaderConverter and Values classes
> ---------------------------------------------------------
>
>                 Key: KAFKA-10574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10574
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> A header value with the byte sequence {{0xEF, 0xBF, 0xBF}} will cause an infinite loop in the {{Values::parseString}} method. Since that method is invoked by the default header converter ({{SimpleHeaderConverter}}), any sink record with that byte array will, by default, cause a sink task reading that record to stall forever.
>
> This occurs because that byte sequence, when parsed as a UTF-8 string and then read by a [StringCharacterIterator|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html], causes the [CharacterIterator.DONE|https://docs.oracle.com/javase/8/docs/api/java/text/CharacterIterator.html#DONE] character to be returned from [StringCharacterIterator::current|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#current--], [StringCharacterIterator::next|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#next--], etc., and a check for that character is used by the {{Values}} class for its parsing logic.
[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown
RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499718784

## File path: core/src/test/scala/unit/kafka/log/LogTest.scala

```diff
@@ -4447,9 +4504,10 @@ class LogTest {
   private def recoverAndCheck(config: LogConfig,
                               expectedKeys: Iterable[Long],
-                              expectDeletedFiles: Boolean = true): Log = {
+                              expectDeletedFiles: Boolean = true,
```

Review comment:

I think you meant the `hadCleanShutdown` parameter I added. I did not add the `expectDeletedFiles` parameter. I will remove the new parameter and add a comment here to indicate that the method always assumes we had a hard reset.
[jira] [Commented] (KAFKA-10574) Infinite loop in SimpleHeaderConverter and Values classes
[ https://issues.apache.org/jira/browse/KAFKA-10574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208163#comment-17208163 ]

Chris Egerton commented on KAFKA-10574:
---------------------------------------

This can be reproduced by adding the following test case to the {{ValuesTest}} class:
{code:java}
@Test
public void shouldNotEncounterInfiniteLoop() {
    byte[] bytes = new byte[] { -17, -65, -65 };
    String str = new String(bytes, StandardCharsets.UTF_8);
    Values.parseString(str);
}
{code}

> Infinite loop in SimpleHeaderConverter and Values classes
> ---------------------------------------------------------
>
>                 Key: KAFKA-10574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10574
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0, 1.1.1, 1.1.2, 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1, 2.5.0, 2.4.1, 2.6.0, 2.5.1
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> A header value with the byte sequence {{0xEF, 0xBF, 0xBF}} will cause an infinite loop in the {{Values::parseString}} method. Since that method is invoked by the default header converter ({{SimpleHeaderConverter}}), any sink record with that byte array will, by default, cause a sink task reading that record to stall forever.
>
> This occurs because that byte sequence, when parsed as a UTF-8 string and then read by a [StringCharacterIterator|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html], causes the [CharacterIterator.DONE|https://docs.oracle.com/javase/8/docs/api/java/text/CharacterIterator.html#DONE] character to be returned from [StringCharacterIterator::current|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#current--], [StringCharacterIterator::next|https://docs.oracle.com/javase/8/docs/api/java/text/StringCharacterIterator.html#next--], etc., and a check for that character is used by the {{Values}} class for its parsing logic.
[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown
RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499724455

## File path: core/src/test/scala/unit/kafka/log/LogTest.scala

```diff
@@ -2882,11 +2953,8 @@ class LogTest {
     records.foreach(segment.append _)
     segment.close()

-    // Create clean shutdown file so that we do not split during the load
-    createCleanShutdownFile()
-
     val logConfig = LogTest.createLogConfig(indexIntervalBytes = 1, fileDeleteDelayMs = 1000)
-    val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue)
+    val log = createLog(logDir, logConfig, recoveryPoint = Long.MaxValue, lastShutdownClean = true)
```

Review comment:

Removed
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499729181

## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java

```diff
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+    /**
+     * Inspect a record and the exception received.
+     * @param exception the actual exception
+     */
+    StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception);
```

Review comment:

Hmmm, that makes sense mostly. However, it seems that to handle the application-shutdown case we need to keep the thread alive to trigger a rebalance. Could we somehow remove the option of shutdown for some cases?
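To make the shape under discussion concrete, here is a self-contained sketch of how such a handler might be used. The interface and its response constants below are stand-ins with hypothetical names, not the PR's final API:

```java
// Stand-in for the interface under review; the Response constants are
// hypothetical names, not the PR's final API.
interface StreamsUncaughtExceptionHandler {
    Response handle(Exception exception);

    enum Response { SHUTDOWN_STREAM_THREAD, SHUTDOWN_APPLICATION }
}

public class HandlerSketch {
    public static void main(String[] args) {
        // Decide per exception whether losing one thread is enough or the
        // whole application should be brought down.
        final StreamsUncaughtExceptionHandler handler = exception ->
            exception instanceof IllegalStateException
                ? StreamsUncaughtExceptionHandler.Response.SHUTDOWN_APPLICATION
                : StreamsUncaughtExceptionHandler.Response.SHUTDOWN_STREAM_THREAD;

        System.out.println(handler.handle(new RuntimeException("boom"))); // SHUTDOWN_STREAM_THREAD
    }
}
```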
[GitHub] [kafka] RamanVerma commented on a change in pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown
RamanVerma commented on a change in pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#discussion_r499731061

## File path: core/src/test/scala/unit/kafka/log/LogTest.scala

```diff
@@ -3073,9 +3139,8 @@ class LogTest {
     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
     // clean shutdown file exists.
     recoveryPoint = log.logEndOffset
-    log = createLog(logDir, logConfig)
+    log = createLog(logDir, logConfig, lastShutdownClean = true)
```

Review comment:

Removed.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499732219

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java

```diff
@@ -54,6 +54,8 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
             log.error("Received error code {}", assignmentErrorCode.get());
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
+            streamThread.shutdown(); //TODO: 663 should set client to error if all streams are dead
```

Review comment:

that is legacy, removed
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499735461

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

```diff
@@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
         final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
         streams.start();
         try {
-            streams.setUncaughtExceptionHandler(null);
+            streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);
```

Review comment:

The test isn't really related to null; it's checking whether the client is in the correct state to set the handler. The reason I added the cast is that the compiler complains otherwise. I added the same test for the new handler below.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499737675

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
@@ -529,8 +541,7 @@ public void run() {
                 }
             }

-            log.error("Encountered the following exception during processing " +
-                "and the thread is going to shut down: ", e);
+            handleStreamsUncaughtException(e);
             throw e;
```

Review comment:

You are right. It caused a problem with clients with a single thread as well. I moved it into the run loop and this resolved the problems.
[GitHub] [kafka] C0urante opened a new pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante opened a new pull request #9375:
URL: https://github.com/apache/kafka/pull/9375

[Jira](https://issues.apache.org/jira/browse/KAFKA-10574)

The special byte sequence `0xEF, 0xBF, 0xBF`, when parsed as a UTF-8 string, causes the `StringCharacterIterator` to return `CharacterIterator.DONE` from its `next()`, `current()`, and other similar methods. This caused an infinite loop in the `Values` class whenever that byte sequence was encountered.

The fix is pretty simple. To see if we're at the end of a string, we compare `StringCharacterIterator::getIndex` to `StringCharacterIterator::getEndIndex`, instead of comparing the last-read character to `CharacterIterator.DONE` (since that character may occur in strings that we're parsing).

I've added a unit test that replicates this bug (when the fix in the `Values` class is not present). I gave it a timeout of five seconds in case someone accidentally re-creates the infinite loop, in order to save some time and potential confusion. All existing unit tests for the `Values` class pass with this change.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
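For illustration, a minimal standalone sketch of that index-based termination check (the `countChars` method is a hypothetical stand-in, not the actual `Values` internals):

```java
import java.text.StringCharacterIterator;

public class SafeIteration {
    // Hypothetical parser loop: visits every character, including U+FFFF,
    // by terminating on position rather than on the DONE sentinel.
    static int countChars(String input) {
        StringCharacterIterator iter = new StringCharacterIterator(input);
        int count = 0;
        while (iter.getIndex() < iter.getEndIndex()) {
            // iter.current() may legitimately equal CharacterIterator.DONE
            // here: the string really contains U+FFFF and we are not at the
            // end, which is exactly why position is the safe check.
            count++;
            iter.next();
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countChars("\uFFFFabc")); // 4: all characters visited
    }
}
```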
[GitHub] [kafka] C0urante commented on pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante commented on pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#issuecomment-703756981

@rhauch @kkonstantine can one of you take a look?
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499739384

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

```diff
@@ -436,6 +496,8 @@ private void maybeSetError() {
         }

         if (setState(State.ERROR)) {
+            metrics.close();
```

Review comment:

Maybe when we transition out of error we should restart them. I don't think the metrics thread really matters, but I closed the state cleaner because @cadonna thought it might cause state loss we would want to avoid.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499739935

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

```diff
@@ -550,6 +561,10 @@ void runLoop() {
         // until the rebalance is completed before we close and commit the tasks
         while (isRunning() || taskManager.isRebalanceInProgress()) {
             try {
+                if (shutdownRequested.get()) {
+                    sendShutdownRequest(shutdownTypeRequested);
+                    return;
```

Review comment:

You are right again; see my previous response.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499740251

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

```diff
@@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
         }
     }

+    /**
+     * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
+     * terminates due to an uncaught exception.
+     *
+     * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler
+     * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
+     */
+    public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) {
```

Review comment:

Should I also change the old handler? It uses the same name.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r499741079

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java

```diff
@@ -82,11 +83,21 @@
     public SubscriptionInfo(final int version,
                             final int latestSupportedVersion,
                             final UUID processId,
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums) {
+        this(version, latestSupportedVersion, processId, userEndPoint, taskOffsetSums, new AtomicInteger(0));
+    }
+
+    public SubscriptionInfo(final int version,
+                            final int latestSupportedVersion,
+                            final UUID processId,
+                            final String userEndPoint,
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final AtomicInteger shutdownRequested) {
```

Review comment:

sure
[GitHub] [kafka] lkokhreidze commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
lkokhreidze commented on a change in pull request #9237:
URL: https://github.com/apache/kafka/pull/9237#discussion_r485886423

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java

```diff
@@ -0,0 +1,242 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {
```

Review comment:

There's already another `StreamTableIntegrationTest` present, but it works with `TopologyTestDriver`, so I thought it would be better and easier to keep them separate.
[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation
[ https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208194#comment-17208194 ]

Sagar Rao commented on KAFKA-10559:
-----------------------------------

hey [~ableegoldman], I can pick this one up if needed. Is there anything more that you would want to add apart from the nicely worded description?

> Don't shutdown the entire app upon TimeoutException during internal topic validation
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10559
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 2.7.0
>
> During some of the KIP-572 work, we made things pretty brittle by changing the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` error code and shut down the entire application if a TimeoutException is hit during the internal topic creation/validation.
>
> Internal topic validation occurs during every rebalance, and we have seen it time out on topic discovery in unstable environments. So shutting down the entire application seems like a step in the wrong direction, and antithetical to the goal of KIP-572 (improving the resiliency of Streams in the face of TimeoutExceptions).
>
> I'm not totally sure what the previous behavior was, but it seems to me we have three options:
> # Rethrow the TimeoutException and allow it to kill the thread
> # Swallow the TimeoutException and retry the rebalance indefinitely
> # Some combination of the above: swallow the TimeoutException but don't retry indefinitely:
> ## Start a timer and allow retrying rebalances for up to the configured task.timeout.ms, the timeout config introduced in KIP-572
> ## Retry for some constant number of rebalances
>
> I think if we go with option 3, then shutting down the entire application is relatively more palatable, as we have given the environment a chance to stabilize.
>
> But killing the thread still seems preferable, given the two new features that are coming out soon: the ability to start up new threads, and the improved exception handler that allows the user to choose to shut down the entire application if that's really what they want. Once users have this level of control over the application, we should allow them to decide how they want to handle exceptional cases like this, rather than forcing an option on them (e.g. shutdown everything).
>
> Imo we should fix this before 2.7 comes out, even if it's just a partial fix (e.g. we do option 1 in 2.7, but plan to implement option 3 eventually).
[jira] [Updated] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sophie Blee-Goldman updated KAFKA-7334:
---------------------------------------
    Labels: newbie  (was: )

> Suggest changing config for state.dir in case of FileNotFoundException
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-7334
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7334
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ted Yu
>            Priority: Major
>              Labels: newbie
>
> Quoting stack trace from KAFKA-5998:
> {code}
> WARN [2018-08-22 03:17:03,745] org.apache.kafka.streams.processor.internals.ProcessorStateManager: task [0_45] Failed to write offset checkpoint file to /tmp/kafka-streams/…/0_45/.checkpoint: {}
> ! java.nio.file.NoSuchFileException: /tmp/kafka-streams/…/0_45/.checkpoint.tmp
> ! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> ! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> {code}
> When state.dir is left at the default configuration, there is a chance that certain files under the state directory are cleaned by the OS, since the default dir starts with /tmp/kafka-streams.
> [~mjsax] and I proposed to suggest to the user, through the exception message, to change the location for state.dir.
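The fix being asked for is an exception message pointing users at a one-line config change; for reference, a minimal sketch of that change on the application side (the path shown is only an example):
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class StateDirExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // Move state.dir off /tmp so the OS cannot reap checkpoint files;
        // pick a path that suits the host.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        // ... set application.id, bootstrap.servers, etc., then build the
        // KafkaStreams instance with these properties.
    }
}
{code}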
[GitHub] [kafka] kkonstantine commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
kkonstantine commented on a change in pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#discussion_r499750739

## File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java

```diff
@@ -73,6 +76,15 @@
         INT_LIST.add(-987654321);
     }

+    @Test(timeout = 5000)
+    public void shouldNotEncounterInfiniteLoop() {
+        byte[] bytes = new byte[] { -17, -65, -65 };
```

Review comment:

we need a comment here to explain things.
[GitHub] [kafka] C0urante commented on a change in pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante commented on a change in pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#discussion_r499754411

## File path: connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java

```diff
@@ -73,6 +76,15 @@
         INT_LIST.add(-987654321);
     }

+    @Test(timeout = 5000)
+    public void shouldNotEncounterInfiniteLoop() {
+        byte[] bytes = new byte[] { -17, -65, -65 };
```

Review comment:

👍 good call, will add
[GitHub] [kafka] C0urante commented on pull request #9375: KAFKA-10574: Fix infinite loop in Values::parseString
C0urante commented on pull request #9375:
URL: https://github.com/apache/kafka/pull/9375#issuecomment-703775022

Thanks @kkonstantine, I've added a comment and addressed the Checkstyle issues.
[jira] [Updated] (KAFKA-10436) Implement KIP-478 Topology changes
[ https://issues.apache.org/jira/browse/KAFKA-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10436:
---------------------------------
    Component/s: streams

> Implement KIP-478 Topology changes
> ----------------------------------
>
>                 Key: KAFKA-10436
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10436
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10437:
---------------------------------
    Component/s: streams-test-utils

> KIP-478: Implement test-utils changes
> -------------------------------------
>
>                 Key: KAFKA-10437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10437
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams, streams-test-utils
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10437:
---------------------------------
    Component/s: (was: streams-test-utils)

> KIP-478: Implement test-utils changes
> -------------------------------------
>
>                 Key: KAFKA-10437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10437
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10437:
---------------------------------
    Component/s: streams

> KIP-478: Implement test-utils changes
> -------------------------------------
>
>                 Key: KAFKA-10437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10437
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record
[ https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10535:
---------------------------------
    Component/s: streams

> KIP-478: Implement StateStoreContext and Record
> -----------------------------------------------
>
>                 Key: KAFKA-10535
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10535
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record
[ https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10535:
---------------------------------
    Fix Version/s: 2.7.0

> KIP-478: Implement StateStoreContext and Record
> -----------------------------------------------
>
>                 Key: KAFKA-10535
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10535
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.7.0
[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes
[ https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10536:
---------------------------------
    Component/s: streams

> KIP-478: Implement KStream changes
> ----------------------------------
>
>                 Key: KAFKA-10536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10536
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Resolved] (KAFKA-10535) KIP-478: Implement StateStoreContext and Record
[ https://issues.apache.org/jira/browse/KAFKA-10535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-10535.
----------------------------------
    Resolution: Fixed

> KIP-478: Implement StateStoreContext and Record
> -----------------------------------------------
>
>                 Key: KAFKA-10535
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10535
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10536) KIP-478: Implement KStream changes
[ https://issues.apache.org/jira/browse/KAFKA-10536?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10536:
---------------------------------
    Fix Version/s: 2.7.0

> KIP-478: Implement KStream changes
> ----------------------------------
>
>                 Key: KAFKA-10536
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10536
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.7.0
[jira] [Updated] (KAFKA-10437) KIP-478: Implement test-utils changes
[ https://issues.apache.org/jira/browse/KAFKA-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10437:
---------------------------------
    Fix Version/s: 2.7.0

> KIP-478: Implement test-utils changes
> -------------------------------------
>
>                 Key: KAFKA-10437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10437
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.7.0
[jira] [Updated] (KAFKA-10537) Convert KStreamImpl filters to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10537:
---------------------------------
    Component/s: streams

> Convert KStreamImpl filters to new PAPI
> ---------------------------------------
>
>                 Key: KAFKA-10537
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10537
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
[jira] [Updated] (KAFKA-10542) Convert KTable maps to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10542: - Component/s: streams > Convert KTable maps to new PAPI > --- > > Key: KAFKA-10542 > URL: https://issues.apache.org/jira/browse/KAFKA-10542 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10543) Convert KTable joins to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10543: - Component/s: streams > Convert KTable joins to new PAPI > > > Key: KAFKA-10543 > URL: https://issues.apache.org/jira/browse/KAFKA-10543 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10541) Convert KTable filters to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10541: - Component/s: streams > Convert KTable filters to new PAPI > -- > > Key: KAFKA-10541 > URL: https://issues.apache.org/jira/browse/KAFKA-10541 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10540) Convert KStream aggregations to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10540: - Component/s: streams > Convert KStream aggregations to new PAPI > > > Key: KAFKA-10540 > URL: https://issues.apache.org/jira/browse/KAFKA-10540 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10539) Convert KStreamImpl joins to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10539: - Component/s: streams > Convert KStreamImpl joins to new PAPI > - > > Key: KAFKA-10539 > URL: https://issues.apache.org/jira/browse/KAFKA-10539 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10538) Convert KStreamImpl maps to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10538: - Component/s: streams > Convert KStreamImpl maps to new PAPI > > > Key: KAFKA-10538 > URL: https://issues.apache.org/jira/browse/KAFKA-10538 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10544) Convert KTable aggregations to new PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10544: - Component/s: streams > Convert KTable aggregations to new PAPI > --- > > Key: KAFKA-10544 > URL: https://issues.apache.org/jira/browse/KAFKA-10544 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10546) KIP-478: Deprecate old PAPI
[ https://issues.apache.org/jira/browse/KAFKA-10546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10546: - Component/s: streams > KIP-478: Deprecate old PAPI > --- > > Key: KAFKA-10546 > URL: https://issues.apache.org/jira/browse/KAFKA-10546 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Can't be done until after the DSL internals are migrated. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10562) Delegate the store wrappers to the new init method
[ https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10562: - Component/s: streams > Delegate the store wrappers to the new init method > -- > > Key: KAFKA-10562 > URL: https://issues.apache.org/jira/browse/KAFKA-10562 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10562) KIP-478: Delegate the store wrappers to the new init method
[ https://issues.apache.org/jira/browse/KAFKA-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10562: - Summary: KIP-478: Delegate the store wrappers to the new init method (was: Delegate the store wrappers to the new init method) > KIP-478: Delegate the store wrappers to the new init method > --- > > Key: KAFKA-10562 > URL: https://issues.apache.org/jira/browse/KAFKA-10562 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10530) kafka-streams-application-reset misses some internal topics
[ https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10530. -- Resolution: Duplicate Closing now, since this seems like a duplicate report, and visual code inspection indicates it should have been fixed. If you do still see this [~oweiler], please feel free to re-open the ticket. > kafka-streams-application-reset misses some internal topics > --- > > Key: KAFKA-10530 > URL: https://issues.apache.org/jira/browse/KAFKA-10530 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 2.6.0 >Reporter: Oliver Weiler >Priority: Major > > While the {{kafka-streams-application-reset}} tool works in most cases, it > misses some internal topics when using {{Foreign Key Table-Table Joins}}. > After execution, there are still two internal topics left which were not > deleted: > {code} > bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic > bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer > bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic > {code} > The reason seems to be {{StreamsResetter.isInternalTopic}}, which requires > the internal topic to end with {{-changelog}} or {{-repartition}} (which the > mentioned topics don't). -- This message was sent by Atlassian Jira (v8.3.4#803005)
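For context on the detection rule mentioned in that report, a minimal sketch of a broader check is below. It additionally matches the FK-join subscription topics by application-id prefix plus a `-topic` suffix. The method shape and the extra suffix rule are illustrative assumptions, not the actual StreamsResetter code.

```java
// Hypothetical sketch, not the real StreamsResetter.isInternalTopic:
// besides -changelog/-repartition, also treat the "-topic" suffix used by
// KTABLE-FK-JOIN subscription topics as internal, provided the name
// carries the application id as its prefix.
public final class InternalTopicCheckSketch {
    public static boolean isInternalTopic(final String topicName, final String applicationId) {
        return topicName.startsWith(applicationId + "-")
            && (topicName.endsWith("-changelog")
                || topicName.endsWith("-repartition")
                || topicName.endsWith("-topic"));
    }
}
```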
[jira] [Commented] (KAFKA-10555) Improve client state machine
[ https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208231#comment-17208231 ] Sophie Blee-Goldman commented on KAFKA-10555: - Just to clarify, I do agree with Matthias that we shouldn't transit to ERROR if the last stream thread is removed via the new removeStreamThread() method. I thought we were only considering transiting to ERROR if the last thread died, but to transit to NOT_RUNNING if the last thread was removed by the user. This seems consistent with the current behavior and maintains the same semantic meaning of the ERROR state, imo. I don't think we can say that "transiting to ERROR if the last thread is removed" is following the current behavior, because there is no way to remove a thread at the moment. So, we should just do what makes the most sense for this case. Personally I think that would be to transit to NOT_RUNNING, since this is not an error or exceptional case but rather a valid user action. I also agree with something that [~vvcephei] suggested earlier, which is that this should be part of the KIP discussion. At the very least, we should raise the final proposal on the discussion thread in case there are any objections. > Improve client state machine > > > Key: KAFKA-10555 > URL: https://issues.apache.org/jira/browse/KAFKA-10555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > The KafkaStreams client exposes its state to the user for monitoring purposes > (ie, RUNNING, REBALANCING etc). The state of the client depends on the > state(s) of the internal StreamThreads that have their own states. > Furthermore, the client state has an impact on what the user can do with the > client. For example, active tasks can only be queried in RUNNING state and > similar. > With KIP-671 and KIP-663 we improved error handling capabilities and now allow > adding/removing stream threads dynamically. We allow adding/removing threads only > in RUNNING and REBALANCING state. This puts us in a "weird" position, because > if we enter ERROR state (ie, if the last thread dies), we cannot add new > threads any longer. However, if we have multiple threads and one dies, we > don't enter ERROR state and do allow recovering the thread. > Before the KIPs, the definition of ERROR state was clear; however, with both > KIPs it seems that we should revisit its semantics. -- This message was sent by Atlassian Jira (v8.3.4#803005)
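To make the distinction under discussion concrete, here is an illustrative sketch (names are assumptions, not the KafkaStreams implementation) of the two transitions proposed when the thread count reaches zero:

```java
// Illustrative only: ERROR when the last thread dies on its own,
// NOT_RUNNING when the user removes the last thread deliberately.
class ClientStateSketch {
    enum State { RUNNING, REBALANCING, ERROR, NOT_RUNNING }
    private State state = State.RUNNING;

    void onLastThreadGone(final boolean removedByUser) {
        // A deliberate removeStreamThread() call is a valid user action,
        // so it should not carry the semantics of a failure.
        state = removedByUser ? State.NOT_RUNNING : State.ERROR;
    }
}
```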
[jira] [Resolved] (KAFKA-9585) Flaky Test: LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization
[ https://issues.apache.org/jira/browse/KAFKA-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-9585. -- Resolution: Cannot Reproduce > Flaky Test: > LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization > > > Key: KAFKA-9585 > URL: https://issues.apache.org/jira/browse/KAFKA-9585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test > > Failed for me locally with > {noformat} > java.lang.AssertionError: Condition not met within timeout 12. Should > obtain non-empty lag information eventually > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
junrao commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499753420 ## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ## @@ -185,7 +185,7 @@ class BrokerEndPointTest { "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, "rack":"dc1", - "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} + "features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}} Review comment: Should we revert the changes here? ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises support for. + * Each broker advertises the version ranges of its own supported features in its own + * BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. 
Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has now been upgraded to a newer version that supports the feature versioning + *system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be + *set to a higher value later. In this case, we want to start with no finalized features and + *allow the user to finalize them whenever they are ready i.e. in the future whenever the + *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start + *finalizing the features. This process ensures we do not enable all the possible features + *immediately after an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absen
[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on pull request #9237: URL: https://github.com/apache/kafka/pull/9237#issuecomment-703803731 @lkokhreidze, thanks for the quick update. I'll make another pass soon. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] soondenana commented on pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on pull request #9347: URL: https://github.com/apache/kafka/pull/9347#issuecomment-703805880 There was an error when building `streams.examples`: ``` [2020-10-05T08:40:05.722Z] [ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.0:generate (default-cli) on project standalone-pom: A Maven project already exists in the directory /home/jenkins/workspace/Kafka_kafka-pr_PR-9347/streams/quickstart/test-streams-archetype/streams.examples -> [Help 1] ``` The failure is not related to this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation
[ https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208257#comment-17208257 ] Sophie Blee-Goldman commented on KAFKA-10559: - [~sagarrao] Yeah, go ahead! This should be a pretty small PR so it would be great if we could knock it out in the next week or two. Just ping me when it's ready. For the PR itself, I think it sounds reasonable to just rethrow the TimeoutException to kill the thread. The "add/recover stream thread" functionality will probably slip 2.7, but it'll be implemented soon. So we don't really need to go out of our way to save a single thread from death in rare circumstances, imo. > Don't shutdown the entire app upon TimeoutException during internal topic > validation > > > Key: KAFKA-10559 > URL: https://issues.apache.org/jira/browse/KAFKA-10559 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.7.0 > > > During some of the KIP-572 work, we made things pretty brittle by changing > the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` > error code and shut down the entire application if a TimeoutException is hit > during the internal topic creation/validation. > Internal topic validation occurs during every rebalance, and we have seen it > time out on topic discovery in unstable environments. So shutting down the > entire application seems like a step in the wrong direction, and antithetical > to the goal of KIP-572 (improving the resiliency of Streams in the face of > TimeoutExceptions) > I'm not totally sure what the previous behavior was, but it seems to me we > have three options: > # Rethrow the TimeoutException and allow it to kill the thread > # Swallow the TimeoutException and retry the rebalance indefinitely > # Some combination of the above: swallow the TimeoutException but don't > retry indefinitely: > ## Start a timer and allow retrying rebalances for up to the configured > task.timeout.ms, the timeout config introduced in KIP-572 > ## Retry for some constant number of rebalances > I think if we go with option 3, then shutting down the entire application is > relatively more palatable, as we have given the environment a chance to > stabilize. > But, killing the thread still seems preferable, given the two new features > that are coming out soon: the ability to start up new threads, and the > improved exception handler that allows the user to choose to shut down the > entire application if that's really what they want. Once users have this > level of control over the application, we should allow them to decide how > they want to handle exceptional cases like this, rather than forcing an > option on them (eg shutdown everything) > > Imo we should fix this before 2.7 comes out, even if it's just a partial fix > (eg we do option 1 in 2.7, but plan to implement option 3 eventually) -- This message was sent by Atlassian Jira (v8.3.4#803005)
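As a rough illustration of option 3 above (the timer variant), the sketch below tolerates rebalance-time timeouts until `task.timeout.ms` has elapsed and only then rethrows; every name here is an assumption, not the StreamsPartitionAssignor code:

```java
import org.apache.kafka.common.errors.TimeoutException;

// Hedged sketch of option 3a: swallow TimeoutExceptions hit during
// internal topic validation until task.timeout.ms has elapsed, then
// rethrow so only the thread (not the whole application) dies.
class TopicValidationRetrySketch {
    private long firstTimeoutMs = -1L;

    void onValidationTimeout(final TimeoutException e, final long nowMs, final long taskTimeoutMs) {
        if (firstTimeoutMs < 0) {
            firstTimeoutMs = nowMs; // start the clock on the first timeout
        }
        if (nowMs - firstTimeoutMs >= taskTimeoutMs) {
            firstTimeoutMs = -1L; // reset before giving up
            throw e;              // the environment never stabilized
        }
        // Otherwise swallow: the next rebalance retries the validation.
    }
}
```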
[jira] [Assigned] (KAFKA-10559) Don't shutdown the entire app upon TimeoutException during internal topic validation
[ https://issues.apache.org/jira/browse/KAFKA-10559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10559: --- Assignee: Sagar Rao > Don't shutdown the entire app upon TimeoutException during internal topic > validation > > > Key: KAFKA-10559 > URL: https://issues.apache.org/jira/browse/KAFKA-10559 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Blocker > Fix For: 2.7.0 > > > During some of the KIP-572 work, we made things pretty brittle by changing > the StreamsPartitionAssignor to send the `INCOMPLETE_SOURCE_TOPIC_METADATA` > error code and shut down the entire application if a TimeoutException is hit > during the internal topic creation/validation. > Internal topic validation occurs during every rebalance, and we have seen it > time out on topic discovery in unstable environments. So shutting down the > entire application seems like a step in the wrong direction, and antithetical > to the goal of KIP-572 (improving the resiliency of Streams in the face of > TimeoutExceptions) > I'm not totally sure what the previous behavior was, but it seems to me we > have three options: > # Rethrow the TimeoutException and allow it to kill the thread > # Swallow the TimeoutException and retry the rebalance indefinitely > # Some combination of the above: swallow the TimeoutException but don't > retry indefinitely: > ## Start a timer and allow retrying rebalances for up to the configured > task.timeout.ms, the timeout config introduced in KIP-572 > ## Retry for some constant number of rebalances > I think if we go with option 3, then shutting down the entire application is > relatively more palatable, as we have given the environment a chance to > stabilize. > But, killing the thread still seems preferable, given the two new features > that are coming out soon: the ability to start up new threads, and the > improved exception handler that allows the user to choose to shut down the > entire application if that's really what they want. Once users have this > level of control over the application, we should allow them to decide how > they want to handle exceptional cases like this, rather than forcing an > option on them (eg shutdown everything) > > Imo we should fix this before 2.7 comes out, even if it's just a partial fix > (eg we do option 1 in 2.7, but plan to implement option 3 eventually) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] piotrrzysko commented on pull request #9371: KAFKA-10510: Validate replication factor consistency on reassignment
piotrrzysko commented on pull request #9371: URL: https://github.com/apache/kafka/pull/9371#issuecomment-703823226 Hi @stanislavkozlovski, would you mind taking a look at this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499810462 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises support for. + * Each broker advertises the version ranges of its own supported features in its own + * BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has now been upgraded to a newer version that supports the feature versioning + *system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be + *set to a higher value later. 
In this case, we want to start with no finalized features and + *allow the user to finalize them whenever they are ready i.e. in the future whenever the + *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start + *finalizing the features. This process ensures we do not enable all the possible features + *immediately after an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. + *- If the node is absent, it will react by creating a FeatureZNode with disabled status + * and empty finalized features. + *- Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. + * - If the node is in disabled status, the controller won’t upgrade all features immediately. + *
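The bootstrap cases spelled out in this comment block reduce to a small decision tree. Below is a self-contained Java sketch of that logic (the real code is Scala in KafkaController.scala; the enum and method here are local stand-ins, not the kafka.zk classes):

```java
// Hedged sketch of the FeatureZNode bootstrap decision described above.
class FeatureZNodeBootstrapSketch {
    enum Status { ABSENT, ENABLED, DISABLED }

    Status decide(final boolean ibpAtLeast27, final Status existing) {
        if (ibpAtLeast27) {
            if (existing == Status.ABSENT) {
                // Case 1: new cluster bootstrap -- create an enabled node
                // that finalizes every supported feature immediately.
                return Status.ENABLED;
            }
            return existing; // remaining enabled-IBP cases elided
        }
        // Case 2: IBP below KAFKA_2_7_IV0 -- create a disabled node, or
        // flip an existing enabled one to disabled and clear the
        // finalized features.
        return Status.DISABLED;
    }
}
```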
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499811265 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -272,6 +281,161 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises support for. + * Each broker advertises the version ranges of its own supported features in its own + * BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there was an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has now been upgraded to a newer version that supports the feature versioning + *system (KIP-584). But the IBP config is still set to lower than KAFKA_2_7_IV0, and may be + *set to a higher value later. 
In this case, we want to start with no finalized features and + *allow the user to finalize them whenever they are ready i.e. in the future whenever the + *user sets IBP config to be greater than or equal to KAFKA_2_7_IV0, then the user could start + *finalizing the features. This process ensures we do not enable all the possible features + *immediately after an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. + *- If the node is absent, it will react by creating a FeatureZNode with disabled status + * and empty finalized features. + *- Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. + * - If the node is in disabled status, the controller won’t upgrade all features immediately. + *
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499811752 ## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ## @@ -185,7 +185,7 @@ class BrokerEndPointTest { "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, "rack":"dc1", - "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} + "features": {"feature1": {"min_version": 1, "first_active_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "first_active_version": 2, "max_version": 4}} Review comment: Done. Nice catch! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch merged pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
rhauch merged pull request #9347: URL: https://github.com/apache/kafka/pull/9347 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499812076 ## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ## @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Optional, Properties} +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { +props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { +Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3 + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2 + } + + private def updateSupportedFeatures( +features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { +targetServers.foreach(s => { + s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) +}) + +// Wait until updates to all BrokerZNode supported features propagate to the controller. 
+val brokerIds = targetServers.map(s => s.config.brokerId) +waitUntilTrue( + () => servers.exists(s => { +if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers +.filter(b => brokerIds.contains(b.id)) +.forall(b => { + b.features.equals(features) +}) +} else { + false +} + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { +updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { +val server = serverForId(0).get +val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) +val newVersion = server.zkClient.updateFeatureZNode(newNode) +servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) +}) +newVersion + } + + private def getFeatureZNode(): FeatureZNode = { +val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) +assertNotEquals(version, ZkVersion.UnknownVersion) +FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(features.asScala.map { + case(name, versionRange) => +(name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRan
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499816373 ## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ## @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Optional, Properties} +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { +props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { +Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3 + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2 + } + + private def updateSupportedFeatures( +features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { +targetServers.foreach(s => { + s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) +}) + +// Wait until updates to all BrokerZNode supported features propagate to the controller. 
+val brokerIds = targetServers.map(s => s.config.brokerId) +waitUntilTrue( + () => servers.exists(s => { +if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers +.filter(b => brokerIds.contains(b.id)) +.forall(b => { + b.features.equals(features) +}) +} else { + false +} + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { +updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { +val server = serverForId(0).get +val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) +val newVersion = server.zkClient.updateFeatureZNode(newNode) +servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) +}) +newVersion + } + + private def getFeatureZNode(): FeatureZNode = { +val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) +assertNotEquals(version, ZkVersion.UnknownVersion) +FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(features.asScala.map { + case(name, versionRange) => +(name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRan
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499816619 ## File path: core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala ## @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Optional, Properties} +import java.util.concurrent.ExecutionException + +import kafka.api.KAFKA_2_7_IV0 +import kafka.utils.TestUtils +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion} +import kafka.utils.TestUtils.waitUntilTrue +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult} +import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.feature.FinalizedVersionRange +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.message.UpdateFeaturesRequestData +import org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKeyCollection +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{UpdateFeaturesRequest, UpdateFeaturesResponse} +import org.apache.kafka.common.utils.Utils +import org.junit.Test +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue} +import org.scalatest.Assertions.intercept + +import scala.jdk.CollectionConverters._ +import scala.reflect.ClassTag +import scala.util.matching.Regex + +class UpdateFeaturesTest extends BaseRequestTest { + + override def brokerCount = 3 + + override def brokerPropertyOverrides(props: Properties): Unit = { +props.put(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_2_7_IV0.toString) + } + + private def defaultSupportedFeatures(): Features[SupportedVersionRange] = { +Features.supportedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new SupportedVersionRange(1, 3 + } + + private def defaultFinalizedFeatures(): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry("feature_1", new FinalizedVersionRange(1, 2 + } + + private def updateSupportedFeatures( +features: Features[SupportedVersionRange], targetServers: Set[KafkaServer]): Unit = { +targetServers.foreach(s => { + s.brokerFeatures.setSupportedFeatures(features) + s.zkClient.updateBrokerInfo(s.createBrokerInfo) +}) + +// Wait until updates to all BrokerZNode supported features propagate to the controller. 
+val brokerIds = targetServers.map(s => s.config.brokerId) +waitUntilTrue( + () => servers.exists(s => { +if (s.kafkaController.isActive) { + s.kafkaController.controllerContext.liveOrShuttingDownBrokers +.filter(b => brokerIds.contains(b.id)) +.forall(b => { + b.features.equals(features) +}) +} else { + false +} + }), + "Controller did not get broker updates") + } + + private def updateSupportedFeaturesInAllBrokers(features: Features[SupportedVersionRange]): Unit = { +updateSupportedFeatures(features, Set[KafkaServer]() ++ servers) + } + + private def updateFeatureZNode(features: Features[FinalizedVersionRange]): Int = { +val server = serverForId(0).get +val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, features) +val newVersion = server.zkClient.updateFeatureZNode(newNode) +servers.foreach(s => { + s.featureCache.waitUntilEpochOrThrow(newVersion, s.config.zkConnectionTimeoutMs) +}) +newVersion + } + + private def getFeatureZNode(): FeatureZNode = { +val (mayBeFeatureZNodeBytes, version) = serverForId(0).get.zkClient.getDataAndVersion(FeatureZNode.path) +assertNotEquals(version, ZkVersion.UnknownVersion) +FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + } + + private def finalizedFeatures(features: java.util.Map[String, org.apache.kafka.clients.admin.FinalizedVersionRange]): Features[FinalizedVersionRange] = { +Features.finalizedFeatures(features.asScala.map { + case(name, versionRange) => +(name, new FinalizedVersionRange(versionRange.minVersionLevel(), versionRan
[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
dima5rr commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-703838454 Hi @guozhangwang, can you trigger a new build? These look like flaky tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10531) KafkaBasedLog can sleep for negative values
[ https://issues.apache.org/jira/browse/KAFKA-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10531: -- Fix Version/s: 2.5.2 2.7.0 > KafkaBasedLog can sleep for negative values > --- > > Key: KAFKA-10531 > URL: https://issues.apache.org/jira/browse/KAFKA-10531 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Vikas Singh >Assignee: Vikas Singh >Priority: Major > Fix For: 2.7.0, 2.5.2, 2.6.1 > > > {{time.milliseconds}} is not monotonic, so this code can throw: > {{java.lang.IllegalArgumentException: timeout value is negative}} > > {code:java} > long started = time.milliseconds(); > while (partitionInfos == null && time.milliseconds() - started < > CREATE_TOPIC_TIMEOUT_MS) { > partitionInfos = consumer.partitionsFor(topic); > Utils.sleep(Math.min(time.milliseconds() - started, 1000)); > } > {code} > We need to check for negative values before sleeping. -- This message was sent by Atlassian Jira (v8.3.4#803005)
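One way to guard the loop quoted in the ticket is to clamp the computed duration to a non-negative value before sleeping. This is a sketch of the idea, not necessarily the exact patch merged for KAFKA-10531:

```java
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
    partitionInfos = consumer.partitionsFor(topic);
    // Clamp to >= 0 so a backwards clock jump can never hand Utils.sleep
    // a negative duration (which throws IllegalArgumentException).
    Utils.sleep(Math.max(0, Math.min(time.milliseconds() - started, 1000)));
}
```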
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499836489 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { Review comment: Oh, sure. Now I know why you picked this name :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499838819 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect a record and the exception received. + * @param exception the actual exception + */ +StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); Review comment: That's a good question. Maybe we could just document that that option is an unsuitable response to an Error, and also log an `ERROR` message if you select it in response to an Error. It's not _always_ bad to ignore an Error, but it usually is. We can leave it to users to decide what they want to do. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
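For readers following the API shape, a minimal example implementation of the proposed interface might look as follows. The `SHUTDOWN_CLIENT` constant is an assumption based on the KIP discussion, not a final name:

```java
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Hedged example: log the failure and ask Streams to shut down the client.
public class ShutdownOnErrorHandler implements StreamsUncaughtExceptionHandler {
    @Override
    public StreamsUncaughtExceptionHandlerResponse handle(final Exception exception) {
        System.err.println("Uncaught exception in stream thread: " + exception);
        return StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_CLIENT; // assumed constant
    }
}
```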
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499842547 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -436,6 +496,8 @@ private void maybeSetError() { } if (setState(State.ERROR)) { +metrics.close(); Review comment: It certainly might. I'm just wary of how far into the uncanny valley we're going here. Streams is going to be put into a state that's very similar to the one that `close()` produces, but not identical. What will then happen when they _do_ call close? What will happen when we realize that something else needs to be done as part of closing the instance (will we even remember that we should consider doing it here as well)? OTOH, we could instead change direction on the "error-vs-shutdown" debase and just make all these methods call `close(ZERO)` instead. Then, the _real_ close method will be invoked, and Streams will go through a well-known transition through `PENDING_SHUTDOWN` to `NOT_RUNNING`. It would then be a problem for a later date (after KIP-663) if someone wanted to request that instead the app should stop all running threads so they can manually call "addThread" later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
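The alternative raised here is small enough to sketch; `Duration.ZERO` is the standard java.time constant, and the helper is an assumption about the surrounding KafkaStreams internals, not merged code:

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

final class ShutdownSketch {
    // Sketch of the "just close" alternative: route fatal-error handling
    // through the real shutdown path so the state machine walks
    // PENDING_SHUTDOWN -> NOT_RUNNING and metrics, threads, and the state
    // directory are closed exactly once.
    static void shutdownOnFatalError(final KafkaStreams streams) {
        streams.close(Duration.ZERO); // returns immediately; threads stop asynchronously
    }
}
```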
[GitHub] [kafka] vvcephei commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
vvcephei commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499843780 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -616,7 +616,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); try { -streams.setUncaughtExceptionHandler(null); + streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null); Review comment: Ah, gotcha. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r499847021 ## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ## @@ -715,7 +747,58 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { doAnswer((_: InvocationOnMock) => { latch.countDown() }).doCallRealMethod().when(spyThread).awaitShutdown() -controller.shutdown() +controller.shutdown() + } + + private def testControllerFeatureZNodeSetup(initialZNode: Option[FeatureZNode], + interBrokerProtocolVersion: ApiVersion): Unit = { +val versionBeforeOpt = initialZNode match { + case Some(node) => +zkClient.createFeatureZNode(node) +Some(zkClient.getDataAndVersion(FeatureZNode.path)._2) + case None => +Option.empty +} +servers = makeServers(1, interBrokerProtocolVersion = Some(interBrokerProtocolVersion)) +TestUtils.waitUntilControllerElected(zkClient) Review comment: Done. Please take a look at the fix. I've added logic to wait for processing on a dummy event just after waiting for controller election. I'm hoping this will make sure the controller failover logic is completed before the test proceeds further to make assertions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
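The barrier kowshik describes can be sketched generically, in Java for illustration (the actual test is Scala, and the controller uses its own event manager rather than an ExecutorService, so this is an assumption about the shape, not the real code):
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Generic sketch of the "dummy event" technique: submit a no-op to the same
// single-threaded queue and wait for it. Because the queue is FIFO, every
// event enqueued earlier (e.g. controller failover handling) has completed
// once the no-op runs.
final class EventQueueBarrier {
    static void awaitQuiescence(final ExecutorService singleThreadedQueue)
            throws InterruptedException {
        final CountDownLatch processed = new CountDownLatch(1);
        singleThreadedQueue.execute(processed::countDown); // the dummy event
        if (!processed.await(30, TimeUnit.SECONDS)) {
            throw new AssertionError("event queue did not drain in time");
        }
    }
}
```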
[GitHub] [kafka] vvcephei commented on pull request #9323: KAFKA-10514: Fix unit test for state directory cleanup
vvcephei commented on pull request #9323: URL: https://github.com/apache/kafka/pull/9323#issuecomment-703867815 Cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually
vvcephei commented on pull request #9262: URL: https://github.com/apache/kafka/pull/9262#issuecomment-703867931 Cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite
vvcephei commented on pull request #8353: URL: https://github.com/apache/kafka/pull/8353#issuecomment-703869908 cherry-picked to 2.5 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik edited a comment on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik edited a comment on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-703873486 @junrao Thanks for the review! I've addressed the latest comments in e55358fd1a00f12ef98fc4d2d649a297ddf146da . The PR is ready for another pass. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8353: KAFKA-9764: Remove stream simple benchmark suite
vvcephei commented on pull request #8353: URL: https://github.com/apache/kafka/pull/8353#issuecomment-703874939 Cherry-picked to 2.4 and 2.3 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499868891 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect a record and the exception received. + * @param exception the actual exception + */ +StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handle(final Exception exception); Review comment: That seems to be a fine solution. It will still attempt the shutdown but may fail. As long as we warn I guess it will work This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499871031 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -436,6 +496,8 @@ private void maybeSetError() { } if (setState(State.ERROR)) { +metrics.close(); Review comment: We did have it this way in the KIP. If we stick to this for now, I think we can clear this up easily when we decide what we want to do with the states in general, when we take care of the discussion in KIP-663. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r499871426 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +368,62 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly + * terminates due to an uncaught exception. + * + * @param eh the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler eh) { +final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, eh); +synchronized (stateLock) { +if (state == State.CREATED) { +for (final StreamThread thread : threads) { +if (eh != null) { +thread.setStreamsUncaughtExceptionHandler(handler); +} else { +final StreamsUncaughtExceptionHandler defaultHandler = exception -> + StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_STREAM_THREAD; + thread.setStreamsUncaughtExceptionHandler(defaultHandler); +} +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse handleStreamsUncaughtException(final Exception e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse action = streamsUncaughtExceptionHandler.handle(e); +switch (action) { +case SHUTDOWN_STREAM_THREAD: +log.error("Encountered the following exception during processing " + +"and the thread is going to shut down: ", e); +break; +case REPLACE_STREAM_THREAD: +log.error("Encountered the following exception during processing " + +"and the stream thread will be replaced: ", e); //TODO: add then remove, wait until 663 is merged +break; +case SHUTDOWN_KAFKA_STREAMS_CLIENT: +log.error("Encountered the following exception during processing " + +"and the client is going to shut down: ", e); +for (final StreamThread streamThread: threads) { +streamThread.shutdown(); +} Review comment: I am okay with renaming, but I will wait for everything else to be cleared up to see if it is still necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
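For reference, a usage sketch against the API shape shown in this diff; the PR is still under review here, so the method and enum names may differ in the released version, and the topic and config values below are placeholders:
```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Install a handler that shuts down the whole client on any uncaught
// exception. Per the diff above, this must happen in CREATED state.
public final class HandlerUsageSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic"); // placeholder topology

        final Properties props = new Properties();
        props.put("application.id", "handler-demo");
        props.put("bootstrap.servers", "localhost:9092");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Must be installed before start(), or an IllegalStateException is thrown.
        streams.setUncaughtExceptionHandler(exception ->
            StreamsUncaughtExceptionHandler.StreamsUncaughtExceptionHandlerResponse.SHUTDOWN_KAFKA_STREAMS_CLIENT);
        streams.start();
    }
}
```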
[GitHub] [kafka] hachikuji opened a new pull request #9376: MINOR: Remove `TargetVoters` from `DescribeQuorum`
hachikuji opened a new pull request #9376: URL: https://github.com/apache/kafka/pull/9376 This field is left over from the early days of the KIP, when it covered reassignment. Since the API is not exposed yet, there should be no harm in updating the first version. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208316#comment-17208316 ] Sophie Blee-Goldman commented on KAFKA-5998:
[~sandeep.lakdaw...@gmail.com] are you running all three instances on the same machine with a shared state directory? And/or using /tmp as the state directory?
> /.checkpoint.tmp Not found exception
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
> Reporter: Yogesh BG
> Assignee: John Roesler
> Priority: Critical
> Fix For: 2.2.2, 2.4.0, 2.3.1
>
> Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, exc.txt, props.txt, streams.txt
>
> I have one kafka broker and one kafka stream running... I am running its since two days under load of around 2500 msgs per second.. On third day am getting below exception for some of the partitions, I have 16 partitions only 0_0 and 0_1 gives this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.intern
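If a shared or /tmp-based state directory turns out to be the cause Sophie is probing for, the usual remedy is to give each instance its own durable state directory. A sketch, as an assumption about the fix rather than part of the ticket (config values are placeholders):
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// When several instances share a machine, give each one its own state
// directory outside /tmp so checkpoint files are not cleaned up or
// overwritten underneath a running instance.
final class StateDirConfigSketch {
    static Properties withIsolatedStateDir(final String instanceId) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Unique, durable path per instance; avoid the default under /tmp.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/" + instanceId);
        return props;
    }
}
```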
[GitHub] [kafka] hachikuji merged pull request #9349: MINOR: add proper checks to KafkaConsumer.groupMetadata
hachikuji merged pull request #9349: URL: https://github.com/apache/kafka/pull/9349 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package
hachikuji opened a new pull request #9377: URL: https://github.com/apache/kafka/pull/9377 Since `RaftRequestHandler` is only used by `TestRaftServer`, this PR moves it to the `tools` package to avoid confusion. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7421) Deadlock in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17208344#comment-17208344 ] Kyle Leiby commented on KAFKA-7421:
---
Hi all, we've been encountering a similar deadlock (I think the same as the one [~xakassi] is seeing). We are running a single Debezium JAR inside a {{confluentinc/cp-kafka-connect-base:5.5.1-1-deb8}} container. We tried several 5.x Debian 8 images, and encounter the deadlocks in all of them. Here's the relevant portion from an example thread dump:
{code:java}
Found one Java-level deadlock:
=============================
"StartAndStopExecutor-connect-1-2":
  waiting to lock monitor 0x7f2b68001d58 (object 0xc118c3c8, a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader),
  which is held by "StartAndStopExecutor-connect-1-1"
"StartAndStopExecutor-connect-1-1":
  waiting to lock monitor 0x7f2b68001eb8 (object 0xc510, a org.apache.kafka.connect.runtime.isolation.PluginClassLoader),
  which is held by "StartAndStopExecutor-connect-1-2"

Java stack information for the threads listed above:
===================================================
"StartAndStopExecutor-connect-1-2":
  at java.lang.ClassLoader.loadClass(ClassLoader.java:404)
  - waiting to lock <0xc118c3c8> (a org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
  at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:397)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
  - locked <0xc6a9e908> (a java.lang.Object)
  at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
  - locked <0xc6a9e908> (a java.lang.Object)
  - locked <0xc510> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719)
  at org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:215)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:209)
  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:432)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1186)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:127)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1201)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1197)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
"StartAndStopExecutor-connect-1-1":
  at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:91)
  - waiting to lock <0xc510> (a org.apache.kafka.connect.runtime.isolation.PluginClassLoader)
  at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.loadClass(DelegatingClassLoader.java:394)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:719)
  at org.apache.kafka.connect.runtime.ConnectorConfig.enrich(ConnectorConfig.java:311)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:215)
  at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:209)
  at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:251)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1229)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:127)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1245)
  at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1241)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Thre
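The mechanism in the dump above is a classic lock-ordering deadlock: one thread holds the delegating classloader's monitor and wants the plugin classloader's, while the other holds them in the opposite order. A generic, self-contained illustration of that pattern (not Connect's actual code; running it deadlocks by design):
```java
// Two threads acquire the same two monitors in opposite order; each ends up
// holding one lock while waiting forever for the other.
public final class LockOrderDeadlockDemo {
    private static final Object DELEGATING = new Object();
    private static final Object PLUGIN = new Object();

    public static void main(final String[] args) {
        new Thread(() -> {
            synchronized (DELEGATING) {
                pause();                      // let the other thread lock PLUGIN
                synchronized (PLUGIN) { }     // blocks: PLUGIN is held below
            }
        }).start();
        new Thread(() -> {
            synchronized (PLUGIN) {
                pause();
                synchronized (DELEGATING) { } // blocks: DELEGATING is held above
            }
        }).start();
    }

    private static void pause() {
        try { Thread.sleep(100); } catch (final InterruptedException ignored) { }
    }
}
```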
[GitHub] [kafka] guozhangwang commented on pull request #9377: MINOR: Move `RaftRequestHandler` to `tools` package
guozhangwang commented on pull request #9377: URL: https://github.com/apache/kafka/pull/9377#issuecomment-703906935 LGTM! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #9342: MINOR: Update doc for raft state metrics
guozhangwang merged pull request #9342: URL: https://github.com/apache/kafka/pull/9342 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on a change in pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on a change in pull request #9237: URL: https://github.com/apache/kafka/pull/9237#discussion_r499908753 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Named; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(value = Parameterized.class) +@Category({IntegrationTest.class}) +public class StreamTableJoinTopologyOptimizationIntegrationTest { +private static final int NUM_BROKERS = 1; + +@ClassRule +public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + +private String tableTopic; +private String inputTopic; +private String outputTopic; +private String 
applicationId; + +private Properties streamsConfiguration; + +@Rule +public TestName testName = new TestName(); + +@Parameterized.Parameter +public String topologyOptimization; + +@Parameterized.Parameters(name = "Optimization = {0}") +public static Collection topologyOptimization() { +return Arrays.asList(new String[][]{ +{StreamsConfig.OPTIMIZE}, +{StreamsConfig.NO_OPTIMIZATION} +}); +} + +@Before +public void before() throws InterruptedException { +streamsConfiguration = new Properties(); + +final String safeTestName = safeUniqueTestName(getClass(), testName); + +tableTopic = "table-topic" + safeTestName; +inputTopic = "stream-topic-" + safeTestName; +outputTopic = "output-topic-" + safeTestName; +applicationId = "app-" + safeTestName; + +CLUSTER.createTopic(inputTopic, 4, 1); +CLUSTER.createTopic(tableTopic, 2, 1); +CLUSTER.createTopic(outputTopic, 4, 1); + +streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); +streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); +streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_