[jira] [Created] (KAFKA-13592) Fix flaky test ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions

2022-01-12 Thread David Jacot (Jira)
David Jacot created KAFKA-13592:
---

 Summary: Fix flaky test 
ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions
 Key: KAFKA-13592
 URL: https://issues.apache.org/jira/browse/KAFKA-13592
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot


{noformat}
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at 
kafka.controller.ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions(ControllerIntegrationTest.scala:1239){noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac opened a new pull request #11670: MINOR: Update year in NOTICE

2022-01-12 Thread GitBox


dajac opened a new pull request #11670:
URL: https://github.com/apache/kafka/pull/11670


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11670: MINOR: Update year in NOTICE

2022-01-12 Thread GitBox


dajac commented on pull request #11670:
URL: https://github.com/apache/kafka/pull/11670#issuecomment-1010783804


   Thanks @rajinisivaram! Merging it without waiting for the build to complete 
as the build does not verify the NOTICE file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11670: MINOR: Update year in NOTICE

2022-01-12 Thread GitBox


dajac merged pull request #11670:
URL: https://github.com/apache/kafka/pull/11670


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2022-01-12 Thread Aleksandr Sorokoumov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474389#comment-17474389
 ] 

Aleksandr Sorokoumov commented on KAFKA-6502:
-

[~jadireddi] Are you actively working on this issue? If not, I would like to 
give it a try.

> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Assignee: Jagadesh Adireddi
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when an error occurs: the record is logged and processing continues. 
> However, on a continuous stream of errors, these messages are apparently not 
> committed, and on a restart of the application they reappear. It is 
> more problematic if I try to send the failed messages to a DLQ: on a 
> restart, they are sent to the DLQ again. As soon as a good record comes 
> in, the offset moves forward and I no longer see the already-logged 
> messages after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input, reduced the commit interval to just 1 second, and added the 
> following to the config:
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>  LogAndContinueExceptionHandler.class);}}
>  It looks like when deserialization exceptions occur, this flag is never set 
> to true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds. That might be the reason why 
> the commit does not happen even after I manually call processorContext#commit().
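
For reference, the configuration described above looks roughly like this (a 
sketch only; the application id and bootstrap address are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class HandlerConfigSketch {
    public static Properties buildConfig() {
        Properties streamsConfiguration = new Properties();
        // Placeholders: use your real application id and bootstrap servers.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-repro");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Log deserialization errors and keep processing instead of failing the task.
        streamsConfiguration.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class);
        // Short commit interval, as in the reproduction described above.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        return streamsConfiguration;
    }
}
{code}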



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] twobeeb commented on pull request #11575: KAFKA-13511: Update TimestampConverter support unix epoch as millis, micros, and seconds

2022-01-12 Thread GitBox


twobeeb commented on pull request #11575:
URL: https://github.com/apache/kafka/pull/11575#issuecomment-1011011375


   Hi @mimaison, 
   I haven't received any update on the 
[KIP-808](https://lists.apache.org/thread/t51lmxjdt3k4y990s2c378529lwtt0q0) 
which I created as per your suggestion.
   
   Could you be of any help?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11575: KAFKA-13511: Update TimestampConverter support unix epoch as millis, micros, and seconds

2022-01-12 Thread GitBox


mimaison commented on pull request #11575:
URL: https://github.com/apache/kafka/pull/11575#issuecomment-1011045546


   Sorry @twobeeb, I'll try to take a look this afternoon. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-12 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474548#comment-17474548
 ] 

Seungchan Ahn commented on KAFKA-13217:
---

Hi [~ableegoldman] [~guozhang] 👋

I just opened a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]
 for this issue. FYI

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Seungchan Ahn
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling-down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid having 
> that member's tasks redistributed across the remaining group members, since 
> this would disturb the stable assignment and could cause unnecessary 
> state migration and restoration. We also hoped 
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization for the case in which the 
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary, or at least 
> much less useful, given recent features and improvements. For example, 
> rebalances are now lightweight, so skipping the second rebalance is less 
> worth optimizing for, and the new assignor takes into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively, 
> we could introduce another form of the close() API that forces the member to 
> leave the group, to be used in the event of an actual scale-down rather than a 
> transient bounce.
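
For a sense of what such an overload could look like, here is an illustrative 
sketch (the exact API shape is what KIP-812 is meant to settle; 
{{CloseOptions}} below is a hypothetical name):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

public class CloseOnScaleDownSketch {
    // Hypothetical options-style overload: ask the member to send a LeaveGroup
    // on close because this instance is being scaled down and will not return.
    public static void shutDownForScaleDown(KafkaStreams streams) {
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
            .leaveGroup(true)
            .timeout(Duration.ofSeconds(30));
        streams.close(options);
    }
}
{code}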



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-12 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474548#comment-17474548
 ] 

Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 2:02 PM:
-

Hi [~ableegoldman] [~guozhang] 👋

I just opened the [discussion 
thread|https://lists.apache.org/thread/ydh5j9qzgvxb5go8onmb44p3sjz8x9vt] of the new 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]
 for this issue. FYI


was (Author: JIRAUSER283196):
Hi [~ableegoldman] [~guozhang] 👋

I just opened a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]
 for this issue. FYI

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Seungchan Ahn
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling-down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid having 
> that member's tasks redistributed across the remaining group members, since 
> this would disturb the stable assignment and could cause unnecessary 
> state migration and restoration. We also hoped 
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization for the case in which the 
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary, or at least 
> much less useful, given recent features and improvements. For example, 
> rebalances are now lightweight, so skipping the second rebalance is less 
> worth optimizing for, and the new assignor takes into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively, 
> we could introduce another form of the close() API that forces the member to 
> leave the group, to be used in the event of an actual scale-down rather than a 
> transient bounce.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-12 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474548#comment-17474548
 ] 

Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 2:04 PM:
-

Hi [~ableegoldman] [~guozhang] 👋

I just opened the [discussion 
thread|https://lists.apache.org/thread/ydh5j9qzgvxb5go8onmb44p3sjz8x9vt] of the new 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]
 for this issue. FYI

_KIP-812: Introduce another form of the `KafkaStreams.close()` API that forces 
the member to leave the consumer group_


was (Author: JIRAUSER283196):
Hi [~ableegoldman] [~guozhang] 👋

I just opened the [discussion 
thread|https://lists.apache.org/thread/ydh5j9qzgvxb5go8onmb44p3sjz8x9vt] of the new 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]
 for this issue. FYI

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Seungchan Ahn
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling-down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid having 
> that member's tasks redistributed across the remaining group members, since 
> this would disturb the stable assignment and could cause unnecessary 
> state migration and restoration. We also hoped 
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization for the case in which the 
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary, or at least 
> much less useful, given recent features and improvements. For example, 
> rebalances are now lightweight, so skipping the second rebalance is less 
> worth optimizing for, and the new assignor takes into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively, 
> we could introduce another form of the close() API that forces the member to 
> leave the group, to be used in the event of an actual scale-down rather than a 
> transient bounce.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac opened a new pull request #11671: KAFKA-13388; Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2022-01-12 Thread GitBox


dajac opened a new pull request #11671:
URL: https://github.com/apache/kafka/pull/11671


   At the moment, the `NetworkClient` will remain stuck in the 
`CHECKING_API_VERSIONS` state forever if the `Channel` does not become ready. 
To prevent this, the patch changes the logic to transition to 
`CHECKING_API_VERSIONS` only when the `ApiVersionsRequest` is queued to be 
sent out. With this, the connection will time out if the `Channel` does not 
become ready within the connection setup timeout. Once the `ApiVersionsRequest` 
is queued up, the request timeout takes over.
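
   A compact model of the change, for illustration only (the class and field 
names below are stand-ins, not the real `NetworkClient` code):

```java
import java.util.ArrayDeque;
import java.util.Queue;

class ConnectionStateSketch {
    enum State { CONNECTING, CHECKING_API_VERSIONS, READY }

    private State state = State.CONNECTING;
    private final Queue<String> sendQueue = new ArrayDeque<>();
    private final long connectStartMs;
    private final long connectionSetupTimeoutMs;

    ConnectionStateSketch(long nowMs, long connectionSetupTimeoutMs) {
        this.connectStartMs = nowMs;
        this.connectionSetupTimeoutMs = connectionSetupTimeoutMs;
    }

    // Stay in CONNECTING (covered by the connection setup timeout) until the
    // ApiVersionsRequest is actually queued; only then enter
    // CHECKING_API_VERSIONS, at which point the request timeout takes over.
    boolean poll(long nowMs, boolean channelReady) {
        if (state == State.CONNECTING) {
            if (channelReady) {
                sendQueue.add("ApiVersionsRequest");
                state = State.CHECKING_API_VERSIONS;
            } else if (nowMs - connectStartMs > connectionSetupTimeoutMs) {
                return false; // connection setup timed out: close the connection
            }
        }
        return true;
    }
}
```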
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2022-01-12 Thread GitBox


dajac commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r783185731



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3907,6 +3910,79 @@ public void testRemoveMembersFromGroup() throws Exception {
         }
     }
 
+    @Test
+    public void testRemoveMembersFromGroupReason() throws Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(body -> {
+                if (!(body instanceof LeaveGroupRequest)) {
+                    return false;
+                }
+                LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data();
+
+                return leaveGroupRequest.members().stream().allMatch(
+                    member -> member.reason().equals(LEAVE_GROUP_REASON + ": testing remove members reason")
+                );
+            }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
+                Arrays.asList(
+                    new MemberResponse().setGroupInstanceId("instance-1"),
+                    new MemberResponse().setGroupInstanceId("instance-2")
+                ))
+            ));
+
+            Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));
+
+            RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(membersToRemove);
+            options.reason("testing remove members reason");
+
+            final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup(
+                GROUP_ID, options);
+
+            assertNull(result.all().get());
+        }
+    }
+
+    @Test
+    public void testRemoveMembersFromGroupDefaultReason() throws Exception {

Review comment:
   The code duplication is a bit annoying here, don't you think? I wonder 
if we could define a helper method, say `testRemoveMembersFromGroup(String 
reason, String expectedReason)`. Then we could call it from within the two test 
cases:
   * `testRemoveMembersFromGroup("testing remove members reason", 
LEAVE_GROUP_REASON + ": testing remove members reason")`
   * `testRemoveMembersFromGroup(null, LEAVE_GROUP_REASON)` - passing `null` is 
somewhat equivalent to not setting the reason at all. Otherwise, we could pass 
the `RemoveMembersFromConsumerGroupOptions` to the helper method.
   
   What do you think?
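
   For illustration, the helper could look roughly like this (just a sketch 
derived from the diff above; the exact shape is of course up to you):

```java
// Hypothetical helper consolidating the two tests; passing a null reason
// means "not set", so the default LEAVE_GROUP_REASON is what we expect.
private void testRemoveMembersFromGroup(String reason, String expectedReason) throws Exception {
    final Cluster cluster = mockCluster(3, 0);
    final Time time = new MockTime();

    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
        env.kafkaClient().prepareResponse(body -> {
            if (!(body instanceof LeaveGroupRequest)) {
                return false;
            }
            LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data();
            // Every member in the request should carry the expected reason.
            return leaveGroupRequest.members().stream().allMatch(
                member -> member.reason().equals(expectedReason)
            );
        }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
            Arrays.asList(
                new MemberResponse().setGroupInstanceId("instance-1"),
                new MemberResponse().setGroupInstanceId("instance-2")
            ))
        ));

        Collection<MemberToRemove> membersToRemove = Arrays.asList(
            new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));
        RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(membersToRemove);
        if (reason != null) {
            options.reason(reason);
        }

        assertNull(env.adminClient().removeMembersFromConsumerGroup(GROUP_ID, options).all().get());
    }
}
```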
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeffkbkim commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2022-01-12 Thread GitBox


jeffkbkim commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r783200432



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3907,6 +3910,79 @@ public void testRemoveMembersFromGroup() throws Exception {
         }
     }
 
+    @Test
+    public void testRemoveMembersFromGroupReason() throws Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(body -> {
+                if (!(body instanceof LeaveGroupRequest)) {
+                    return false;
+                }
+                LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data();
+
+                return leaveGroupRequest.members().stream().allMatch(
+                    member -> member.reason().equals(LEAVE_GROUP_REASON + ": testing remove members reason")
+                );
+            }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
+                Arrays.asList(
+                    new MemberResponse().setGroupInstanceId("instance-1"),
+                    new MemberResponse().setGroupInstanceId("instance-2")
+                ))
+            ));
+
+            Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));
+
+            RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(membersToRemove);
+            options.reason("testing remove members reason");
+
+            final RemoveMembersFromConsumerGroupResult result = env.adminClient().removeMembersFromConsumerGroup(
+                GROUP_ID, options);
+
+            assertNull(result.all().get());
+        }
+    }
+
+    @Test
+    public void testRemoveMembersFromGroupDefaultReason() throws Exception {

Review comment:
   you're totally right, will update




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gabrieljones commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-01-12 Thread GitBox


gabrieljones commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1011241406


   Confluent also has a fork of log4j 1:
   https://mvnrepository.com/artifact/io.confluent/confluent-log4j/1.2.17-cp6
   The GitHub repo seems to have disappeared, though.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison opened a new pull request #11672: KAFKA-13577: Replace easymock with mockito in kafka:core - part 1

2022-01-12 Thread GitBox


mimaison opened a new pull request #11672:
URL: https://github.com/apache/kafka/pull/11672


   I've fully replaced easymock with mockito in core. As it's a pretty large 
change, I'm splitting it into a few PRs (2 or 3) so reviewing it is not a 
complete nightmare.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison opened a new pull request #11673: KAFKA-13577: Replace easymock with mockito in kafka:core - part 2

2022-01-12 Thread GitBox


mimaison opened a new pull request #11673:
URL: https://github.com/apache/kafka/pull/11673


   Follow up from https://github.com/apache/kafka/pull/11672
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison opened a new pull request #11674: KAFKA-13577: Replace easymock with mockito in kafka:core - part 3

2022-01-12 Thread GitBox


mimaison opened a new pull request #11674:
URL: https://github.com/apache/kafka/pull/11674


   Follow up from https://github.com/apache/kafka/pull/11672 and 
https://github.com/apache/kafka/pull/11673
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lmr3796 commented on pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics

2022-01-12 Thread GitBox


lmr3796 commented on pull request #11669:
URL: https://github.com/apache/kafka/pull/11669#issuecomment-1011328979


   Hey, thanks for the review @ijuma.
   Since I don't have write access, I'm wondering what the process would be 
for merging the patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade

2022-01-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10173:
---

Assignee: Matthias J. Sax  (was: John Roesler)

> BufferUnderflowException during Kafka Streams Upgrade
> -
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Karsten Schnitter
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: suppress
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{migrate.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during 
> processing:
> java.nio.BufferUnderflowException: null
>   at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>   at java.base/java.nio.ByteBuffer.get(Unknown Source)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem does not occur. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-10173) BufferUnderflowException during Kafka Streams Upgrade

2022-01-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10173:
---

Assignee: John Roesler  (was: Matthias J. Sax)

> BufferUnderflowException during Kafka Streams Upgrade
> -
>
> Key: KAFKA-10173
> URL: https://issues.apache.org/jira/browse/KAFKA-10173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Karsten Schnitter
>Assignee: John Roesler
>Priority: Blocker
>  Labels: suppress
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{migrate.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during 
> processing:
> java.nio.BufferUnderflowException: null
>   at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>   at java.base/java.nio.ByteBuffer.get(Unknown Source)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>   at 
> org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>   at 
> org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem does not occur. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13574) NotLeaderOrFollowerException thrown for a successful send

2022-01-12 Thread Kyle Kingsbury (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474900#comment-17474900
 ] 

Kyle Kingsbury commented on KAFKA-13574:


Another case like this: I'm seeing transactions fail with 
`InvalidTxnStateException: The producer attempted a transactional operation in 
an invalid state.`, which *sounds* like it should be a definite failure, but 
sometimes appears to actually commit. Is this error intended to be definite? Is 
this documented anywhere?

> NotLeaderOrFollowerException thrown for a successful send
> -
>
> Key: KAFKA-13574
> URL: https://issues.apache.org/jira/browse/KAFKA-13574
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
> Environment: openjdk version "11.0.13" 2021-10-19
>Reporter: Kyle Kingsbury
>Priority: Minor
>  Labels: error-handling
>
> With org.apache.kafka/kafka-clients 3.0.0, under rare circumstances involving 
> multiple node and network failures, I've observed a call to `producer.send()` 
> throw `NotLeaderOrFollowerException` for a message which later appears in 
> `consumer.poll()` return values.
> I don't have a reliable repro case for this yet, but the case I hit involved 
> retries=1000, acks=all, and idempotence enabled. I suspect what might be 
> happening here is that an initial attempt to send the message makes it to the 
> server and is committed, but the acknowledgement is lost e.g. due to timeout; 
> the Kafka producer then automatically retries the send attempt, and on that 
> retry hits a NotLeaderOrFollowerException, which is thrown back to the 
> caller. If we interpret NotLeaderOrFollowerException as a definite failure, 
> then this would constitute an aborted read.
> I've seen issues like this in a number of databases around client or 
> server-internal retry mechanisms, and I think the thing to do is: rather than 
> throwing the most *recent* error, throw the *most indefinite*. That way 
> clients know that their request may have actually succeeded, and they won't 
> (e.g.) attempt to re-submit a non-idempotent request again.
> As a side note: is there... perhaps documentation on which errors in Kafka 
> are supposed to be definite vs indefinite? NotLeaderOrFollowerException is a 
> subclass of RetriableException, but it looks like RetriableException is more 
> about transient vs permanent errors than whether it's safe to retry.
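
To make the distinction concrete, a cautious client might classify send 
outcomes along these lines (a sketch; the {{Outcome}} taxonomy is illustrative, 
not a Kafka API):
{code:java}
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class SendOutcomeSketch {
    enum Outcome { ACKNOWLEDGED, DEFINITE_FAILURE, INDEFINITE }

    static Outcome send(KafkaProducer<String, String> producer,
                        ProducerRecord<String, String> record) {
        try {
            producer.send(record).get();
            return Outcome.ACKNOWLEDGED;
        } catch (ExecutionException e) {
            // NotLeaderOrFollowerException extends RetriableException; per the
            // report it can surface even when the write was committed, so a
            // cautious caller records it as indefinite rather than failed.
            return e.getCause() instanceof RetriableException
                ? Outcome.INDEFINITE
                : Outcome.DEFINITE_FAILURE;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.INDEFINITE;
        }
    }
}
{code}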



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474910#comment-17474910
 ] 

Matthias J. Sax commented on KAFKA-13289:
-

{quote}However it seems the dropping of messages is based on "partition time", 
i.e. per partition?
{quote}
Yes. The state store tracks the "max timestamp seen" and applies the retention 
period accordingly, i.e., older data will be dropped.
{quote}As we don't have monotonically increasing timestamps across keys, that 
would mean we have an actual problem...
{quote}
Yes. This differs from your previous statement, in which you claimed not to 
have out-of-order data. You clearly have out-of-order data (out-of-order data 
is defined per partition, not per key).
{quote}So a solution would perhaps be to increase the grace
{quote}
Correct. This will increase the store retention time accordingly and allow the 
store to accept out-of-order data.
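
Concretely, with the 2.8-era API from the reproduction below, that means 
widening the grace period on the join window (the one-hour value is an 
arbitrary example, not a recommendation):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class GraceSketch {
    // Grace must cover the maximum expected out-of-order-ness per partition.
    static final JoinWindows JOIN_WINDOW = JoinWindows.of(Duration.ofSeconds(5))
        .grace(Duration.ofHours(1));
}
{code}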

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm wary of the resource requirements of 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happen

[GitHub] [kafka] wcarlson5 opened a new pull request #11675: KAFKA-12648: POC for committing tasks on error

2022-01-12 Thread GitBox


wcarlson5 opened a new pull request #11675:
URL: https://github.com/apache/kafka/pull/11675


   If a task throws an exception while processing, this change commits any 
tasks that were successfully processed before entering the error handling 
logic.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13593) ThrottledChannelReaper slows broker shutdown by multiple seconds

2022-01-12 Thread Steven Schlansker (Jira)
Steven Schlansker created KAFKA-13593:
-

 Summary: ThrottledChannelReaper slows broker shutdown by multiple 
seconds
 Key: KAFKA-13593
 URL: https://issues.apache.org/jira/browse/KAFKA-13593
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 3.0.0
Reporter: Steven Schlansker


We run an embedded KRaft broker in integration tests, to test that our Producer 
/ Consumers are all hooked up and verify our overall pipeline.

While trying to deliver CI speed improvements, we noticed that the majority of 
time for a small test is actually spent in Kafka broker shutdown. From the 
logs, it looks like the ClientQuotaManager has multiple ThrottledChannelReaper 
threads and each of them takes up to a second to shut down.
{code:java}
2022-01-12T15:26:31.932Z [main] INFO  kafka.log.LogManager - Shutdown complete.
2022-01-12T15:26:31.934Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Shutting down
2022-01-12T15:26:32.311Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Shutdown completed
2022-01-12T15:26:32.311Z [ThrottledChannelReaper-Fetch] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - [ThrottledChannelReaper-Fetch]: 
Stopped
2022-01-12T15:26:32.311Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Shutting down
2022-01-12T15:26:33.312Z [ThrottledChannelReaper-Produce] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Stopped
2022-01-12T15:26:33.312Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Produce]: Shutdown completed
2022-01-12T15:26:33.312Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Shutting down
2022-01-12T15:26:34.315Z [ThrottledChannelReaper-Request] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Stopped
2022-01-12T15:26:34.315Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-Request]: Shutdown completed
2022-01-12T15:26:34.315Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Shutting down
2022-01-12T15:26:35.317Z [main] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Shutdown completed
2022-01-12T15:26:35.317Z [ThrottledChannelReaper-ControllerMutation] INFO  
k.s.ClientQuotaManager$ThrottledChannelReaper - 
[ThrottledChannelReaper-ControllerMutation]: Stopped{code}
Inspecting the code, the ThrottledChannelReaper threads are marked as not 
interruptible, so ShutdownableThread does not interrupt the worker on shutdown. 
Unfortunately, the doWork method polls with a 1 second timeout. So you wait up 
to 1s for every type of quota, in this case for a total of almost 4s.

 

While this is not a problem for production Kafka brokers, where instances are 
expected to stay up for months, in tests that expect to spin up and down it is 
easily noticed and adds real overhead to CI.

 

Suggested possible remediations:
 * Allow ThrottledChannelReaper to be interrupted by ShutdownableThread
 * Have ThrottledChannelReaper override initiateShutdown and, after calling 
{{super.initiateShutdown}}, enqueue a {{null}} element on the delayQueue to 
force the worker thread to notice the shutdown state (see the sketch below)
 * Reduce the 1s poll timeout to something smaller (this carries an overhead 
penalty for all users, so it is less desirable), or make it configurable so it 
can be set to e.g. 10ms in unit tests
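
A generic sketch of the second remediation (illustrative names only; the real 
classes are Kafka's Scala ShutdownableThread and 
ClientQuotaManager$ThrottledChannelReaper). Since 
java.util.concurrent.DelayQueue rejects null, the sketch enqueues an 
already-expired sentinel to the same effect:
{code:java}
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ReaperShutdownSketch extends Thread {
    private final DelayQueue<ExpiringItem> queue = new DelayQueue<>();
    private volatile boolean running = true;

    void initiateShutdown() {
        running = false;
        // Wake the blocked poll() immediately with an already-expired sentinel
        // instead of waiting out the remainder of the 1-second timeout.
        queue.add(new ExpiringItem(0));
    }

    @Override
    public void run() {
        while (running) {
            try {
                // The 1-second wait here is what adds up to ~4s at shutdown.
                ExpiringItem item = queue.poll(1, TimeUnit.SECONDS);
                if (item != null && running) {
                    System.out.println("expired: " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    static class ExpiringItem implements Delayed {
        private final long deadlineMs;

        ExpiringItem(long delayMs) {
            this.deadlineMs = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
{code}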



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] hachikuji commented on a change in pull request #11666: KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade`

2022-01-12 Thread GitBox


hachikuji commented on a change in pull request #11666:
URL: https://github.com/apache/kafka/pull/11666#discussion_r783575296



##
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -1127,27 +1127,29 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertEquals(None, topicIdAfterCreate)
-    val emptyTopicId = controller.controllerContext.topicIds.get("t")
-    assertEquals(None, emptyTopicId)
+    assertEquals(None, zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))
+    assertEquals(None, controller.controllerContext.topicIds.get(tp.topic))
 
     servers(controllerId).shutdown()
     servers(controllerId).awaitShutdown()
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertNotEquals(emptyTopicId, topicIdAfterUpgrade)
+
+    var topicIdAfterUpgrade = Option.empty[Uuid]
+    TestUtils.waitUntilTrue(() => {

Review comment:
   Might be able to use `computeUntilTrue` instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13549) Add "delete interval" config

2022-01-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13549:

Description: 
KIP-811: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+min.repartition.purge.interval.ms+to+Kafka+Streams]

Kafka Streams uses "delete record" requests to aggressively purge data from 
repartition topics. Those requests are sent each time we commit.

For at-least-once with a default commit interval of 30 seconds, this works 
fine. However, for exactly-once with a default commit interval of 100ms, it is 
very aggressive. The main issue is on the broker side: the broker logs every 
"delete record" request, so broker logs are spammed if EOS is enabled.

We should consider adding a new config (e.g. `delete.record.interval.ms` or 
similar) to have a dedicated config for "delete record" requests, decoupling it 
from the commit interval and allowing data to be purged less aggressively even 
when the commit interval is small, to avoid the broker-side log spam.

  was:
KIP-811: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+separate+delete.interval.ms+to+Kafka+Streams]

Kafka Streams uses "delete record" requests to aggressively purge data from 
repartition topics. Those requests are sent each time we commit.

For at-least-once with a default commit interval of 30 seconds, this works 
fine. However, for exactly-once with a default commit interval of 100ms, it is 
very aggressive. The main issue is on the broker side: the broker logs every 
"delete record" request, so broker logs are spammed if EOS is enabled.

We should consider adding a new config (e.g. `delete.record.interval.ms` or 
similar) to have a dedicated config for "delete record" requests, decoupling it 
from the commit interval and allowing data to be purged less aggressively even 
when the commit interval is small, to avoid the broker-side log spam.


> Add "delete interval" config
> 
>
> Key: KAFKA-13549
> URL: https://issues.apache.org/jira/browse/KAFKA-13549
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip
>
> KIP-811: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+min.repartition.purge.interval.ms+to+Kafka+Streams]
> Kafka Streams uses "delete record" requests to aggressively purge data from 
> repartition topics. Those requests are sent each time we commit.
> For at-least-once with a default commit interval of 30 seconds, this works 
> fine. However, for exactly-once with a default commit interval of 100ms, it is 
> very aggressive. The main issue is on the broker side: the broker logs every 
> "delete record" request, so broker logs are spammed if EOS is enabled.
> We should consider adding a new config (e.g. `delete.record.interval.ms` or 
> similar) to have a dedicated config for "delete record" requests, decoupling it 
> from the commit interval and allowing data to be purged less aggressively even 
> when the commit interval is small, to avoid the broker-side log spam.
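
To illustrate the intent, a config along these lines would let the two 
intervals diverge (the key name follows the KIP-811 title above; the final name 
and default were still under discussion, so treat this as a sketch):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class PurgeIntervalSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        // EOS-style frequent commits...
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        // ...while purging repartition topics at most every 30 seconds.
        // Hypothetical key, taken from the KIP-811 title above.
        props.put("repartition.purge.interval.ms", 30000);
        return props;
    }
}
{code}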



--
This message was sent by Atlassian Jira
(v8.20.1#820001)