[jira] [Created] (KAFKA-13581) Error getting old protocol
chenzhongyu created KAFKA-13581:
-----------------------------------

             Summary: Error getting old protocol
                 Key: KAFKA-13581
                 URL: https://issues.apache.org/jira/browse/KAFKA-13581
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 2.5.0
            Reporter: chenzhongyu
             Fix For: 2.5.0


In this code, oldProtocols will always equal the newly supplied protocols, because knownStaticMember has already been updated by the time it is read. So I think oldProtocols should be captured before updateMember is called.

{code:java}
  private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                             newMemberId: String,
                                             groupInstanceId: Option[String],
                                             protocols: List[(String, Array[Byte])],
                                             responseCallback: JoinCallback): Unit = {
    val oldMemberId = group.getStaticMemberId(groupInstanceId)
    info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
      s"old member id $oldMemberId will be removed.")

    val currentLeader = group.leaderOrNull
    val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
    // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
    // New heartbeat shall be scheduled with new member id.
    completeAndScheduleNextHeartbeatExpiration(group, member)

    val knownStaticMember = group.get(newMemberId)
    group.updateMember(knownStaticMember, protocols, responseCallback)
    val oldProtocols = knownStaticMember.supportedProtocols

    group.currentState match {
      case Stable =>
        // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
        // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
        val selectedProtocolOfNextGeneration = group.selectProtocol
        if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
          info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
          val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
          groupManager.storeGroup(group, groupAssignment, error => {
            if (error != Errors.NONE) {
              warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

              // Failed to persist member.id of the given static member, revert the update of the static member in the group.
              group.updateMember(knownStaticMember, oldProtocols, null)
              val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
              completeAndScheduleNextHeartbeatExpiration(group, oldMember)
              responseCallback(JoinGroupResult(
                List.empty,
                memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = currentLeader,
                error = error
              ))
            } else {
              group.maybeInvokeJoinCallback(member, JoinGroupResult(
                members = List.empty,
                memberId = newMemberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                // We want to avoid current leader performing trivial assignment while the group
                // is in stable stage, because the new assignment in leader's next sync call
                // won't be broadcast by a stable group. This could be guaranteed by
                // always returning the old leader id so that the current leader won't assume itself
                // as a leader based on the returned message, since the new member.id won't match
                // returned leader id, therefore no assignment will be performed.
                leaderId = currentLeader,
                error = Errors.NONE))
            }
          })
        } else {
          maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
        }

      case CompletingRebalance =>
        // if the group is in after-sync stage, upon getting a new join-group of a known static member
        // we should still trigger a new rebalance, since the old member may already be sent to the leader
        // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
        // with the new replaced member id. As a result the new member id would not get any assignment.
{code}
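A minimal sketch of the reordering suggested in the description (not a reviewed patch; the surrounding method is assumed unchanged): capture the member's previous protocols before updateMember overwrites them, so that the revert path in the storeGroup error callback restores the genuinely old protocols rather than the new ones.

{code:java}
    val knownStaticMember = group.get(newMemberId)
    // Capture the protocols the static member supported *before* this rejoin,
    // so a failed storeGroup can revert to them instead of the just-applied ones.
    val oldProtocols = knownStaticMember.supportedProtocols
    group.updateMember(knownStaticMember, protocols, responseCallback)
{code}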
[GitHub] [kafka] showuon opened a new pull request #11661: [WIP] MINOR: shutdown thread test
showuon opened a new pull request #11661:
URL: https://github.com/apache/kafka/pull/11661

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] gurinderu closed pull request #11662: Add github ci
gurinderu closed pull request #11662:
URL: https://github.com/apache/kafka/pull/11662
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471710#comment-17471710 ]

Eugen Dück commented on KAFKA-13289:
------------------------------------

Meanwhile I was able to reproduce the "Skipping record for expired segment" messages with our actual Kafka Streams app on my laptop. It occurred despite "max.task.idle.ms = Long.MAX". I'm not yet sure exactly how to reproduce it: I fed 1 million messages into the input topics a couple of times 3 days ago and saw no "Skipping" messages then, but today (having meanwhile restarted the Kafka instance, ZooKeeper, and the Kafka apps) the "Skipping" messages were being logged. I still don't know the exact circumstances that trigger it.

The app uses a 0 ms join window with a 0 ms grace period (see http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e). If I were to increase the join window or grace period (as Matthew said would fix the issue), the outer join would involve so many more left/right pairs that the impact on performance would surely be immense, as we have many tens of thousands of messages per second.

Here's a question - could a large difference between the message timestamp (used for joining) and the system time be causing the "Skipping record" messages?
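For reference, a minimal sketch of the setup described in this comment (a 0 ms join window with a 0 ms grace period, and max.task.idle.ms set to the maximum), written against the Kafka Streams 2.x API; the application id, bootstrap address, topic names, and String serdes are hypothetical placeholders, not taken from the report:

{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class ZeroGraceJoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "zero-grace-join");   // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // "max.task.idle.ms = Long.MAX" as mentioned in the comment
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, Long.MAX_VALUE);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");   // hypothetical
        KStream<String, String> right = builder.stream("right-topic"); // hypothetical

        // 0 ms join window with a 0 ms grace period, as described in the comment
        left.leftJoin(right,
                (l, r) -> l + "/" + r,
                JoinWindows.of(Duration.ZERO).grace(Duration.ZERO))
            .to("joined-topic"); // hypothetical

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}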
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471729#comment-17471729 ]

Eugen Dück commented on KAFKA-13289:
------------------------------------

After reading [https://lists.apache.org/thread/lqb1xn83ls0z0g81lpqklj809yxbpnx2], which was mentioned by Matthew, I want to note that in my case the "Skipping record for expired segment" messages in AbstractRocksDBSegmentedBytesStore do NOT occur on startup. Here's the procedure I used:

# start up the Kafka Streams app
# send messages to input topics
# observe "WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment." messages

> Bulk processing correctly ordered input data through a join with
> kafka-streams results in `Skipping record for expired segment`
> -----------------------------------------------------------------
>
>                 Key: KAFKA-13289
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13289
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Matthew Sheppard
>            Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following
> message many times...
> {noformat}
> WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore -
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step
> appears to be lost.
> I've seen this in practice either when my application has been shut down for
> a while and then is brought back up, or when I've used something like the
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
> in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000
> messages to two topics spaced an hour apart (with the original timestamps in
> order), then having kafka streams select a key for them and try to leftJoin
> the two rekeyed streams.
> Self-contained source code for that reproduction is available at
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
>
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
>
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
>
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
>
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null
> values for many values since that's the expected semantics of kafka-streams
> left join, at least until
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the
> problem is solved, but since the input I provide is not out of order I did
> not expect to need to do that, and I'm wary of the resource requirements of
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is
> processed it causes the stream time to be pushed forward to the newest
> message in that partition, meaning when the next partition is then examined
> it is found to contain many records which are 'too old' compa
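The quoted description mentions that a very large grace value on the join window works around the problem. As a rough illustration only (assuming the rest of the quoted topology is unchanged, and using an arbitrary one-year grace value), the quoted joinWindow definition would become:

{code:java}
// Same 5-second join window as the quoted reproduction, but with a very large
// grace period so records lagging behind stream time are not treated as expired.
// Duration.ofDays(365) is an arbitrary illustrative value, not a recommendation.
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5))
        .grace(Duration.ofDays(365));
{code}

As the quoted text itself notes, this comes with a resource cost on high-volume applications, since the join buffers records for the full window plus grace period.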
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471729#comment-17471729 ]

Eugen Dück edited comment on KAFKA-13289 at 1/10/22, 7:46 AM:
--------------------------------------------------------------

After reading [https://lists.apache.org/thread/lqb1xn83ls0z0g81lpqklj809yxbpnx2], which was mentioned by Matthew, I want to note that in my case (and of course also in Matthew's self-contained case) the "Skipping record for expired segment" messages do NOT occur on startup. Here's the procedure I used:

# start up the Kafka Streams app
# wait a couple of seconds
# send messages to input topics
# observe "WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment." messages

was (Author: eugendueck):

After reading [https://lists.apache.org/thread/lqb1xn83ls0z0g81lpqklj809yxbpnx2], which was mentioned by Matthew, I want to note that in my case the "Skipping record for expired segment" messages in AbstractRocksDBSegmentedBytesStore do NOT occur on startup. Here's the procedure I used:

# start up the Kafka Streams app
# send messages to input topics
# observe "WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - Skipping record for expired segment." messages