[jira] [Created] (KAFKA-13592) Fix flaky test ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions
David Jacot created KAFKA-13592: --- Summary: Fix flaky test ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions Key: KAFKA-13592 URL: https://issues.apache.org/jira/browse/KAFKA-13592 Project: Kafka Issue Type: Bug Reporter: David Jacot {noformat} java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:529) at scala.None$.get(Option.scala:527) at kafka.controller.ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions(ControllerIntegrationTest.scala:1239){noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dajac opened a new pull request #11670: MINOR: Update year in NOTICE
dajac opened a new pull request #11670: URL: https://github.com/apache/kafka/pull/11670 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11670: MINOR: Update year in NOTICE
dajac commented on pull request #11670: URL: https://github.com/apache/kafka/pull/11670#issuecomment-1010783804 Thanks @rajinisivaram! Merging it without waiting for the build to complete as the build does not verify the NOTICE file.
[GitHub] [kafka] dajac merged pull request #11670: MINOR: Update year in NOTICE
dajac merged pull request #11670: URL: https://github.com/apache/kafka/pull/11670
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474389#comment-17474389 ] Aleksandr Sorokoumov commented on KAFKA-6502: - [~jadireddi] Are you actively working on this issue? If not, I would like to give it a try. > Kafka streams deserialization handler not committing offsets on error records > - > > Key: KAFKA-6502 > URL: https://issues.apache.org/jira/browse/KAFKA-6502 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Soby Chacko >Assignee: Jagadesh Adireddi >Priority: Minor > > See this StackOverflow issue: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler] > and this comment: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899] > I am trying to use the LogAndContinueExceptionHandler on deserialization. It > works fine when an error occurs by successfully logging and continuing. > However, on a continuous stream of errors, it seems like these messages are > not committed and on a restart of the application they reappear again. It is > more problematic if I try to send the messages in error to a DLQ. On a > restart, they are sent again to DLQ. As soon as I have a good record coming > in, it looks like the offset moves further and not seeing the already logged > messages again after a restart. > I reproduced this behavior by running the sample provided here: > [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java] > I changed the incoming value Serde to > {{Serdes.Integer().getClass().getName()}} to force a deserialization error on > input and reduced the commit interval to just 1 second. Also added the > following to the config. > {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, > LogAndContinueExceptionHandler.class);}}. 
> It looks like when deserialization exceptions occur, this flag is never set > to be true here: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228]. > It only becomes true once processing succeeds. That might be the reason why > commit is not happening even after I manually call processorContext#commit().
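The reproduction described above boils down to a handful of configuration overrides. A minimal sketch of that configuration, using plain string keys so it stands alone (the class name is hypothetical; the keys are the standard Kafka Streams config names):

```java
import java.util.Properties;

public class DeserializationErrorRepro {
    // Builds the Streams configuration the reporter describes: an Integer serde
    // against String-encoded input to force deserialization errors, the
    // log-and-continue handler, and a short commit interval.
    public static Properties build() {
        Properties p = new Properties();
        p.put("default.value.serde",
              "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        p.put("default.deserialization.exception.handler",
              "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        p.put("commit.interval.ms", "1000"); // commit every second
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("commit.interval.ms"));
    }
}
```

With this setup, every incoming String record fails deserialization, is logged and skipped, yet the consumed offsets are never committed, which matches the restart behaviour reported above.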
[GitHub] [kafka] twobeeb commented on pull request #11575: KAFKA-13511: Update TimestampConverter support unix epoch as millis, micros, and seconds
twobeeb commented on pull request #11575: URL: https://github.com/apache/kafka/pull/11575#issuecomment-1011011375 Hi @mimaison, I haven't received any update on [KIP-808](https://lists.apache.org/thread/t51lmxjdt3k4y990s2c378529lwtt0q0), which I created as per your suggestion. Could you be of any help?
[GitHub] [kafka] mimaison commented on pull request #11575: KAFKA-13511: Update TimestampConverter support unix epoch as millis, micros, and seconds
mimaison commented on pull request #11575: URL: https://github.com/apache/kafka/pull/11575#issuecomment-1011045546 Sorry @twobeeb, I'll try to take a look this afternoon. Thanks
[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474548#comment-17474548 ] Seungchan Ahn commented on KAFKA-13217: --- Hi [~ableegoldman] [~guozhang] 👋 I just opened [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group] for this issue. FYI > Reconsider skipping the LeaveGroup on close() or add an overload that does so > - > > Key: KAFKA-13217 > URL: https://issues.apache.org/jira/browse/KAFKA-13217 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Seungchan Ahn >Priority: Major > Labels: needs-kip, newbie, newbie++ > > In Kafka Streams, when an instance is shut down via the close() API, we > intentionally skip sending a LeaveGroup request. This is because often the > shutdown is not due to a scaling down event but instead some transient > closure, such as during a rolling bounce. In cases where the instance is > expected to start up again shortly after, we originally wanted to avoid that > member's tasks from being redistributed across the remaining group members > since this would disturb the stable assignment and could cause unnecessary > state migration and restoration. We also hoped > to limit the disruption to just a single rebalance, rather than forcing the > group to rebalance once when the member shuts down and then again when it > comes back up. So it's really an optimization for the case in which the > shutdown is temporary. > > That said, many of those optimizations are no longer necessary or at least > much less useful given recent features and improvements. 
For example > rebalances are now lightweight so skipping the 2nd rebalance is not as worth > optimizing for, and the new assignor will take into account the actual > underlying state for each task/partition assignment, rather than just the > previous assignment, so the assignment should be considerably more stable > across bounces and rolling restarts. > > Given that, it might be time to reconsider this optimization. Alternatively, > we could introduce another form of the close() API that forces the member to > leave the group, to be used in event of actual scale down rather than a > transient bounce.
[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474548#comment-17474548 ] Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 2:02 PM: - Hi [~ableegoldman] [~guozhang] 👋 I just opened the [discussion thread|https://lists.apache.org/thread/ydh5j9qzgvxb5go8onmb44p3sjz8x9vt] of new [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group] for this issue. FYI was (Author: JIRAUSER283196): Hi [~ableegoldman] [~guozhang] 👋 I just opened [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group] for this issue. FYI
[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474548#comment-17474548 ] Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 2:04 PM: - Hi [~ableegoldman] [~guozhang] 👋 I just opened the [discussion thread|https://lists.apache.org/thread/ydh5j9qzgvxb5go8onmb44p3sjz8x9vt] of new [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group] for this issue. FYI _KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces the member to leave the consumer group_ was (Author: JIRAUSER283196): Hi [~ableegoldman] [~guozhang] 👋 I just opened the [discussion thread|https://lists.apache.org/thread/ydh5j9qzgvxb5go8onmb44p3sjz8x9vt] of new [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group] for this issue. FYI
[GitHub] [kafka] dajac opened a new pull request #11671: KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS
dajac opened a new pull request #11671: URL: https://github.com/apache/kafka/pull/11671 At the moment, the `NetworkClient` will remain stuck in the `CHECKING_API_VERSIONS` state forever if the `Channel` does not become ready. To prevent this from happening, this patch changes the logic to transition to the `CHECKING_API_VERSIONS` state only when the `ApiVersionsRequest` is queued to be sent out. With this, the connection will time out if the `Channel` does not become ready within the connection setup timeout. Once the `ApiVersionsRequest` is queued up, the request timeout takes over. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
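The ordering change the patch describes can be modelled in a few lines (a toy model, not the actual `NetworkClient` code; all names here are illustrative): the state transition is tied to queueing the request, so a connection whose channel never becomes ready stays in a state that the connection setup timeout covers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ConnectionModel {
    enum State { CONNECTING, CHECKING_API_VERSIONS, READY }

    State state = State.CONNECTING;
    final Queue<String> inFlight = new ArrayDeque<>();

    // Old behaviour (per the PR description): the state flipped to
    // CHECKING_API_VERSIONS even if the channel never became ready, and no
    // timeout applied. New behaviour, modelled here: the transition happens
    // only when the ApiVersionsRequest is actually queued to be sent.
    void channelReady() {
        inFlight.add("ApiVersionsRequest");
        state = State.CHECKING_API_VERSIONS; // request timeout takes over from here
    }

    public static void main(String[] args) {
        ConnectionModel c = new ConnectionModel();
        System.out.println(c.state); // still CONNECTING: connection setup timeout applies
        c.channelReady();
        System.out.println(c.state); // CHECKING_API_VERSIONS once the request is queued
    }
}
```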
[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest
dajac commented on a change in pull request #11571: URL: https://github.com/apache/kafka/pull/11571#discussion_r783185731 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3907,6 +3910,79 @@ public void testRemoveMembersFromGroup() throws Exception { } } +@Test +public void testRemoveMembersFromGroupReason() throws Exception { +final Cluster cluster = mockCluster(3, 0); +final Time time = new MockTime(); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) { + +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); +env.kafkaClient().prepareResponse(body -> { +if (!(body instanceof LeaveGroupRequest)) { +return false; +} +LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data(); + +return leaveGroupRequest.members().stream().allMatch( +member -> member.reason().equals(LEAVE_GROUP_REASON + ": testing remove members reason") +); +}, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( +Arrays.asList( +new MemberResponse().setGroupInstanceId("instance-1"), +new MemberResponse().setGroupInstanceId("instance-2") +)) +)); + +Collection membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2")); + +RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(membersToRemove); +options.reason("testing remove members reason"); + +final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup( +GROUP_ID, options); + +assertNull(result.all().get()); +} +} + +@Test +public void testRemoveMembersFromGroupDefaultReason() throws Exception { Review comment: The code duplication is a bit annoying here, don't you think? 
I wonder if we could define a helper method, say `testRemoveMembersFromGroup(String reason, String expectedReason)`. Then we could call it from within the two test cases: * `testRemoveMembersFromGroup("testing remove members reason", LEAVE_GROUP_REASON + ": testing remove members reason")` * `testRemoveMembersFromGroup(null, LEAVE_GROUP_REASON)` - passing `null` is somewhat equivalent to not setting the reason at all. Otherwise, we could pass the `RemoveMembersFromConsumerGroupOptions` to the helper method. What do you think? -- This is an automated message from the Apache Git Service.
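In miniature, the consolidation being suggested looks like this (an illustrative sketch only; the helper shape matches the suggestion, but the default-reason string below is a placeholder, not the real `LEAVE_GROUP_REASON` value from `KafkaAdminClient`):

```java
public class RemoveReasonSketch {
    // Placeholder for the client's default leave reason; the real constant
    // lives in KafkaAdminClient and its exact value is not shown here.
    static final String LEAVE_GROUP_REASON = "default leave reason";

    // Mirrors the suggested testRemoveMembersFromGroup(reason, expectedReason)
    // shape: null means "reason not set", anything else gets appended to the
    // default prefix, so both test cases collapse into one parameterized call.
    static String effectiveReason(String reason) {
        return reason == null ? LEAVE_GROUP_REASON
                              : LEAVE_GROUP_REASON + ": " + reason;
    }

    public static void main(String[] args) {
        System.out.println(effectiveReason("testing remove members reason"));
        System.out.println(effectiveReason(null));
    }
}
```

The helper would build the `RemoveMembersFromConsumerGroupOptions`, install the request matcher against `expectedReason`, and run the admin call, so each test case is a one-line invocation.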
[GitHub] [kafka] jeffkbkim commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest
jeffkbkim commented on a change in pull request #11571: URL: https://github.com/apache/kafka/pull/11571#discussion_r783200432 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java Review comment: you're totally right, will update
[GitHub] [kafka] gabrieljones commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
gabrieljones commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1011241406 Confluent also has a fork of log4j 1: https://mvnrepository.com/artifact/io.confluent/confluent-log4j/1.2.17-cp6 The GitHub repo seems to have disappeared though.
[GitHub] [kafka] mimaison opened a new pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1
mimaison opened a new pull request #11672: URL: https://github.com/apache/kafka/pull/11672 I've fully replaced easymock with mockito in core. As it's a pretty large change, I'm splitting it into a few PRs (2 or 3) so reviewing it is not a complete nightmare. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison opened a new pull request #11673: KAFKA-13577: Replace easymock with mockito in kafka:core - part 2
mimaison opened a new pull request #11673: URL: https://github.com/apache/kafka/pull/11673 Follow up from https://github.com/apache/kafka/pull/11672 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison opened a new pull request #11674: KAFKA-13577: Replace easymock with mockito in kafka:core - part 3
mimaison opened a new pull request #11674: URL: https://github.com/apache/kafka/pull/11674 Follow up from https://github.com/apache/kafka/pull/11672 and https://github.com/apache/kafka/pull/11673 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] lmr3796 commented on pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics
lmr3796 commented on pull request #11669: URL: https://github.com/apache/kafka/pull/11669#issuecomment-1011328979 Hey, thanks for the review @ijuma. Since I don't have write access, I'm wondering what the process for merging the patch would be.
[jira] [Assigned] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade
[ https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10173: --- Assignee: Matthias J. Sax (was: John Roesler) > BufferUnderflowException during Kafka Streams Upgrade > - > > Key: KAFKA-10173 > URL: https://issues.apache.org/jira/browse/KAFKA-10173 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Karsten Schnitter >Assignee: Matthias J. Sax >Priority: Blocker > Labels: suppress > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I > followed the steps described in the upgrade guide and set the property > {{migrate.from=2.3}}. On my dev system with just one running instance I got > the following exception: > {noformat} > stream-thread [0-StreamThread-2] Encountered the following error during > processing: > java.nio.BufferUnderflowException: null > at java.base/java.nio.HeapByteBuffer.get(Unknown Source) > at java.base/java.nio.ByteBuffer.get(Unknown Source) > at > org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94) > at > org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83) > at > org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368) > at > org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) > at > org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {noformat} > I figured out that this problem only occurs for stores where I use the > suppress feature. If I rename the changelog topics during the migration, the > problem will not occur.
[jira] [Assigned] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade
[ https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-10173: --- Assignee: John Roesler (was: Matthias J. Sax)
[jira] [Commented] (KAFKA-13574) NotLeaderOrFollowerException thrown for a successful send
[ https://issues.apache.org/jira/browse/KAFKA-13574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474900#comment-17474900 ] Kyle Kingsbury commented on KAFKA-13574:

Another case like this: I'm seeing transactions fail with `InvalidTxnStateException: The producer attempted a transactional operation in an invalid state.`, which *sounds* like it should be a definite failure, but... sometimes appears to actually commit. Is this error intended to be definite? Is this documented anywhere?

> NotLeaderOrFollowerException thrown for a successful send
> ---------------------------------------------------------
>
> Key: KAFKA-13574
> URL: https://issues.apache.org/jira/browse/KAFKA-13574
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.0.0
> Environment: openjdk version "11.0.13" 2021-10-19
> Reporter: Kyle Kingsbury
> Priority: Minor
> Labels: error-handling
>
> With org.apache.kafka/kafka-clients 3.0.0, under rare circumstances involving
> multiple node and network failures, I've observed a call to `producer.send()`
> throw `NotLeaderOrFollowerException` for a message which later appears in
> `consumer.poll()` return values.
>
> I don't have a reliable repro case for this yet, but the case I hit involved
> retries=1000, acks=all, and idempotence enabled. I suspect what might be
> happening here is that an initial attempt to send the message makes it to the
> server and is committed, but the acknowledgement is lost, e.g. due to timeout;
> the Kafka producer then automatically retries the send attempt, and on that
> retry hits a NotLeaderOrFollowerException, which is thrown back to the
> caller. If we interpret NotLeaderOrFollowerException as a definite failure,
> then this would constitute an aborted read.
>
> I've seen issues like this in a number of databases around client- or
> server-internal retry mechanisms, and I think the thing to do is: rather than
> throwing the most *recent* error, throw the {*}most indefinite{*}. That way
> clients know that their request may have actually succeeded, and they won't
> (e.g.) attempt to re-submit a non-idempotent request again.
>
> As a side note: is there... perhaps documentation on which errors in Kafka
> are supposed to be definite vs. indefinite? NotLeaderOrFollowerException is a
> subclass of RetriableException, but it looks like RetriableException is more
> about transient vs. permanent errors than whether it's safe to retry.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
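The "throw the most indefinite error" idea above can be sketched in isolation. This is a hypothetical illustration, not kafka-clients code: all class and method names (`IndefiniteRetry`, `sendWithRetries`, `Attempt`) are invented for the example. The key point is that a retry loop must remember whether *any* attempt reached the wire; once one has, a later failure can no longer prove the write didn't commit, so the caller should see an indefinite error even when the latest exception sounds definite.

```java
// Sketch only: models a client-side retry loop that classifies the final
// failure as definite (write certainly absent) or indefinite (write may
// have committed). Names here are hypothetical, not the producer's API.
public class IndefiniteRetry {

    /** Thrown when no attempt ever reached the broker: definitely not committed. */
    public static class DefiniteSendException extends RuntimeException {
        public DefiniteSendException(String m) { super(m); }
    }

    /** Thrown when some attempt reached the broker: the write may have committed. */
    public static class IndefiniteSendException extends RuntimeException {
        public IndefiniteSendException(String m, Throwable cause) { super(m, cause); }
    }

    /** One send attempt; sets reachedWire[0] once the request is handed to the network. */
    public interface Attempt { boolean run(boolean[] reachedWire); }

    public static boolean sendWithRetries(Attempt attempt, int maxRetries) {
        boolean[] reachedWire = {false};
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.run(reachedWire);
            } catch (RuntimeException e) {
                last = e;  // remember the most recent error, keep retrying
            }
        }
        if (reachedWire[0]) {
            // An earlier attempt may have committed even though the *last*
            // attempt failed (e.g. with a NotLeaderOrFollower-style error).
            throw new IndefiniteSendException("send may or may not have committed", last);
        }
        throw new DefiniteSendException("send definitely failed: " + last.getMessage());
    }
}
```

Under this scheme a caller catching `DefiniteSendException` may safely re-submit, while `IndefiniteSendException` signals that re-submitting a non-idempotent request could duplicate it.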
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474910#comment-17474910 ] Matthias J. Sax commented on KAFKA-13289:

{quote}However it seems the dropping of messages is based on "partition time", i.e. per partition?
{quote}
Yes. The state store tracks the "max timestamp seen" and applies the retention period accordingly, i.e., older data will be dropped.

{quote}As we don't have monotonically increasing timestamps across keys, that would mean we have an actual problem...
{quote}
Yes. This statement differs from your previous one, in which you claimed not to have out-of-order data. You clearly do have out-of-order data (out-of-order data is defined per partition, not per key).

{quote}So a solution would perhaps be to increase the grace
{quote}
Correct. This will increase the store retention time accordingly and allow it to accept out-of-order data.

> Bulk processing correctly ordered input data through a join with
> kafka-streams results in `Skipping record for expired segment`
> ----------------------------------------------------------------
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0
> Reporter: Matthew Sheppard
> Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the
> following message many times...
>
> {noformat}
> WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore -
> Skipping record for expired segment.
> {noformat}
>
> ...and data which I expect to have been joined through a leftJoin step
> appears to be lost.
>
> I've seen this in practice either when my application has been shut down for
> a while and then is brought back up, or when I've used something like the
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
> in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000
> messages to two topics spaced an hour apart (with the original timestamps in
> order), then having kafka streams select a key for them and try to leftJoin
> the two rekeyed streams.
>
> Self-contained source code for that reproduction is available at
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
>
> The actual kafka-streams topology in there looks like this.
>
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
>
> ...and the eventual output I produce looks like this...
>
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
>
> ...whereas, given the input data, I expect to see every row end with the two
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null
> values for many values, since that's the expected semantics of kafka-streams
> left join, at least until
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
>
> I've noticed that if I set a very large grace value on the join window the
> problem is solved, but since the input I provide is not out of order I did
> not expect to need to do that, and I'm wary of the resource requirements of
> doing so in practice on an application with a lot of volume.
>
> My suspicion is that something is happen
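The store behavior Matthias describes above can be modeled in a few lines. This is a self-contained sketch, not kafka-streams code: `ExpiredSegmentSketch` is an invented name, and the real `AbstractRocksDBSegmentedBytesStore` is far more involved. It shows why input that is in order *per key* can still be dropped: the store tracks the maximum timestamp seen per partition ("stream time"), and any record older than stream time minus the retention period (window size + grace) is skipped regardless of key.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of per-partition stream-time retention: records that fall more
// than retentionMs behind the max timestamp seen so far are dropped.
public class ExpiredSegmentSketch {
    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    public final List<String> stored = new ArrayList<>();
    public final List<String> skipped = new ArrayList<>();

    public ExpiredSegmentSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    public void put(String key, long timestampMs) {
        // Stream time only moves forward, across ALL keys in the partition.
        observedStreamTime = Math.max(observedStreamTime, timestampMs);
        if (timestampMs < observedStreamTime - retentionMs) {
            // This is the point where the real store logs
            // "Skipping record for expired segment."
            skipped.add(key + "@" + timestampMs);
        } else {
            stored.add(key + "@" + timestampMs);
        }
    }
}
```

For example, with 5s retention, a record for key "b" at t=1s arrives after key "a" at t=10s has already advanced stream time: "b" is in order for its own key, but 9s behind the partition's stream time, so it is skipped. Increasing grace (and hence retention) is what makes such interleavings survive.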
[GitHub] [kafka] wcarlson5 opened a new pull request #11675: KAFKA-12648: POC for committing tasks on error
wcarlson5 opened a new pull request #11675: URL: https://github.com/apache/kafka/pull/11675

If a task has an exception while processing, this commits any tasks that were successfully processed before entering the error handling logic.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
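The behavior the PR describes can be sketched abstractly. This is a hypothetical illustration of the idea, not the actual Streams task-management code: the `Task` interface and `processAll` helper are invented for the example. The point is ordering: when one task throws, the tasks that already processed successfully in this round are committed *before* control passes to the error handler, so their progress is not lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch: commit successfully processed tasks before entering error handling.
public class CommitOnError {
    public interface Task {
        String id();
        void process();
    }

    /** Processes tasks in order; on failure, commits the successful ones
     *  first and then rethrows into the error-handling path. */
    public static List<String> processAll(List<Task> tasks, Consumer<String> commit) {
        List<String> done = new ArrayList<>();
        for (Task t : tasks) {
            try {
                t.process();
                done.add(t.id());
            } catch (RuntimeException e) {
                // Commit everything that succeeded so far...
                for (String id : done) commit.accept(id);
                // ...then surface the failure to the error handler.
                throw e;
            }
        }
        for (String id : done) commit.accept(id);
        return done;
    }
}
```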
[jira] [Created] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds
Steven Schlansker created KAFKA-13593:

Summary: ThrottledChannelReaper slows broker shutdown by multiple seconds
Key: KAFKA-13593
URL: https://issues.apache.org/jira/browse/KAFKA-13593
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 3.0.0
Reporter: Steven Schlansker

We run an embedded KRaft broker in integration tests, to test that our Producer / Consumers are all hooked up and verify our overall pipeline. While trying to deliver CI speed improvements, we noticed that the majority of time for a small test is actually spent in Kafka broker shutdown. From the logs, it looks like the ClientQuotaManager has multiple ThrottledChannelReaper threads and each of them takes up to a second to shut down.

{code:java}
2022-01-12T15:26:31.932Z [main] INFO kafka.log.LogManager - Shutdown complete.
2022-01-12T15:26:31.934Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Shutting down
2022-01-12T15:26:32.311Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Shutdown completed
2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: Stopped
2022-01-12T15:26:32.311Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Shutting down
2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Stopped
2022-01-12T15:26:33.312Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Produce]: Shutdown completed
2022-01-12T15:26:33.312Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Shutting down
2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Stopped
2022-01-12T15:26:34.315Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Request]: Shutdown completed
2022-01-12T15:26:34.315Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Shutting down
2022-01-12T15:26:35.317Z [main] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Shutdown completed
2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-ControllerMutation]: Stopped{code}

Inspecting the code, the ThrottledChannelReaper threads are marked as not interruptible, so ShutdownableThread does not interrupt the worker on shutdown. Unfortunately, the doWork method polls with a 1 second timeout, so you wait up to 1s for every type of quota, in this case for a total of almost 4s.

While this is not a problem for production Kafka brokers, where instances are expected to stay up for months, in tests that expect to spin up and down it is easily noticed and adds real overhead to CI.

Suggested possible remediations:
* Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
* ThrottledChannelReaper overrides initiateShutdown and, after calling {{super.initiateShutdown}}, enqueues a {{null}} element on the delayQueue to force the worker thread to notice the shutdown state
* Reduce the 1s poll timeout to something small (carries an overhead penalty for all users, so this is less desirable), or make it configurable so we can set it to e.g. 10ms in unit tests
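The second remediation above can be sketched without any Kafka dependencies. This is a hypothetical illustration, not the actual kafka.server code: the names `ReaperSketch`, `initiateShutdown`, and the sentinel are invented (and a `LinkedBlockingQueue` with a non-null sentinel stands in for the real `DelayQueue`, which rejects null elements). The idea is simply that a worker blocked in a 1-second `poll` can be woken immediately by enqueuing a wake-up element during shutdown.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: sentinel-based shutdown wake-up for a poll-with-timeout worker.
public class ReaperSketch {
    private static final Object SHUTDOWN_SENTINEL = new Object();
    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Worker loop, analogous to repeated doWork() calls. */
    public void run() {
        while (running.get()) {
            Object item;
            try {
                item = queue.poll(1, TimeUnit.SECONDS);  // the 1s wait at issue
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (item == null || item == SHUTDOWN_SENTINEL) continue;  // timeout or wake-up
            // ... expire the throttled channel represented by item ...
        }
    }

    /** Flip the flag, then wake the poll so shutdown doesn't wait up to 1s. */
    public void initiateShutdown() {
        running.set(false);
        queue.offer(SHUTDOWN_SENTINEL);
    }
}
```

Without the `queue.offer` call, each worker would linger in `poll` for up to a second after `running` flips to false, which is exactly the per-reaper delay visible in the log above.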
[GitHub] [kafka] hachikuji commented on a change in pull request #11666: KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade`
hachikuji commented on a change in pull request #11666: URL: https://github.com/apache/kafka/pull/11666#discussion_r783575296

## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
## @@ -1127,27 +1127,29 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertEquals(None, topicIdAfterCreate)
-    val emptyTopicId = controller.controllerContext.topicIds.get("t")
-    assertEquals(None, emptyTopicId)
+    assertEquals(None, zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))
+    assertEquals(None, controller.controllerContext.topicIds.get(tp.topic))
     servers(controllerId).shutdown()
     servers(controllerId).awaitShutdown()
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertNotEquals(emptyTopicId, topicIdAfterUpgrade)
+
+    var topicIdAfterUpgrade = Option.empty[Uuid]
+    TestUtils.waitUntilTrue(() => {

Review comment: Might be able to use `computeUntilTrue` instead?
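The polling pattern the reviewer suggests can be sketched generically. This is a hedged illustration, not Kafka's Scala `TestUtils`: the class name `PollUntil` and the exact parameters are invented here; the assumed semantics are "recompute a value until a predicate accepts it or a timeout elapses", which removes the need for a mutable `var` outside the wait loop.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Sketch: compute a value repeatedly until a predicate accepts it,
// returning Optional.empty() if the deadline passes first.
public class PollUntil {
    public static <T> Optional<T> computeUntilTrue(Supplier<T> compute,
                                                   Predicate<T> accept,
                                                   long timeoutMs,
                                                   long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            T value = compute.get();
            if (accept.test(value)) return Optional.of(value);       // got what we wanted
            if (System.currentTimeMillis() >= deadline) return Optional.empty();
            try {
                Thread.sleep(pauseMs);                               // back off and retry
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
    }
}
```

Compared with a `waitUntilTrue` plus a captured `var`, the computed value is returned directly, so the test body reads as a single expression.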
[jira] [Updated] (KAFKA-13549) Add "delete interval" config
[ https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13549:

Description:
KIP-811: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+min.repartition.purge.interval.ms+to+Kafka+Streams]

Kafka Streams uses "delete record" requests to aggressively purge data from repartition topics. Those requests are sent each time we commit.

For at-least-once with a default commit interval of 30 seconds, this works fine. However, for exactly-once with a default commit interval of 100ms, it's very aggressive. The main issue is broker side, because the broker logs every "delete record" request, and thus broker logs are spammed if EOS is enabled.

We should consider adding a new config (e.g. `delete.record.interval.ms` or similar) to have a dedicated config for "delete record" requests, to decouple it from the commit interval config and allow purging data less aggressively, even if the commit interval is small, to avoid the broker-side log spamming.

was:
KIP-811: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+separate+delete.interval.ms+to+Kafka+Streams]

Kafka Streams uses "delete record" requests to aggressively purge data from repartition topics. Those request are sent each time we commit.

For at-least-once with a default commit interval of 30 seconds, this works fine. However, for exactly-once with a default commit interval of 100ms, it's very aggressive. The main issue is broker side, because the broker logs every "delete record" request, and thus broker logs are spammed if EOS is enabled.

We should consider to add a new config (eg `delete.record.interval.ms` or similar) to have a dedicated config for "delete record" requests, to decouple it from the commit interval config and allow to purge data less aggressively, even if the commit interval is small to avoid the broker side log spamming.
> Add "delete interval" config
> ----------------------------
>
> Key: KAFKA-13549
> URL: https://issues.apache.org/jira/browse/KAFKA-13549
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Nicholas Telford
> Priority: Major
> Labels: kip
>
> KIP-811:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+min.repartition.purge.interval.ms+to+Kafka+Streams]
>
> Kafka Streams uses "delete record" requests to aggressively purge data from
> repartition topics. Those requests are sent each time we commit.
>
> For at-least-once with a default commit interval of 30 seconds, this works
> fine. However, for exactly-once with a default commit interval of 100ms, it's
> very aggressive. The main issue is broker side, because the broker logs every
> "delete record" request, and thus broker logs are spammed if EOS is enabled.
>
> We should consider adding a new config (e.g. `delete.record.interval.ms` or
> similar) to have a dedicated config for "delete record" requests, to decouple
> it from the commit interval config and allow purging data less aggressively,
> even if the commit interval is small, to avoid the broker-side log spamming.
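The decoupling proposed above is essentially a rate limit on purge requests. The sketch below is a hypothetical illustration, not Streams code: `PurgeThrottle` and `maybePurge` are invented names, and the interval corresponds conceptually to KIP-811's `repartition.purge.interval.ms`. Commits happen as often as the commit interval dictates, but a delete-records request is only issued when the separate, longer purge interval has elapsed, so 100ms EOS commits no longer generate a broker log line per commit.

```java
// Sketch: issue a "delete records" purge at most once per purge interval,
// independent of how frequently commits occur.
public class PurgeThrottle {
    private final long purgeIntervalMs;
    private long lastPurgeMs = Long.MIN_VALUE;  // no purge issued yet

    public PurgeThrottle(long purgeIntervalMs) {
        this.purgeIntervalMs = purgeIntervalMs;
    }

    /** Called on every commit; returns true only when a purge should be sent. */
    public boolean maybePurge(long nowMs) {
        if (lastPurgeMs == Long.MIN_VALUE || nowMs - lastPurgeMs >= purgeIntervalMs) {
            lastPurgeMs = nowMs;
            return true;   // send the delete-records request
        }
        return false;      // commit without purging
    }
}
```

With a 30s purge interval and 100ms commits, roughly 1 in 300 commits triggers a purge instead of every one of them.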