[jira] [Created] (KAFKA-13581) Error getting old protocol

2022-01-09 Thread chenzhongyu (Jira)
chenzhongyu created KAFKA-13581:
---

 Summary: Error getting old protocol
 Key: KAFKA-13581
 URL: https://issues.apache.org/jira/browse/KAFKA-13581
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.0
Reporter: chenzhongyu
 Fix For: 2.5.0


In this case, oldProtocols will always equal the new protocols, because 
knownStaticMember has already been updated by that point. So I think oldProtocols 
should be assigned before the call to updateMember.
{code:java}
private def updateStaticMemberAndRebalance(group: GroupMetadata,
                                           newMemberId: String,
                                           groupInstanceId: Option[String],
                                           protocols: List[(String, Array[Byte])],
                                           responseCallback: JoinCallback): Unit = {
  val oldMemberId = group.getStaticMemberId(groupInstanceId)
  info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, " +
    s"assigning new member id $newMemberId, while old member id $oldMemberId will be removed.")

  val currentLeader = group.leaderOrNull
  val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
  // Heartbeat of the old member id will expire without effect since the group no longer
  // contains that member id. A new heartbeat shall be scheduled with the new member id.
  completeAndScheduleNextHeartbeatExpiration(group, member)

  val knownStaticMember = group.get(newMemberId)
  group.updateMember(knownStaticMember, protocols, responseCallback)
  // Problem reported here: supportedProtocols was already overwritten by updateMember above,
  // so oldProtocols always equals the new protocols and the rollback below cannot restore
  // the previous ones. The snapshot should be taken before calling updateMember.
  val oldProtocols = knownStaticMember.supportedProtocols

  group.currentState match {
    case Stable =>
      // Check whether the group's selectedProtocol of the next generation will change. If not,
      // simply store the group to persist the updated static member; if it will, trigger a
      // rebalance to keep the group's assignment and selectedProtocol consistent.
      val selectedProtocolOfNextGeneration = group.selectProtocol
      if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
        info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
        val groupAssignment: Map[String, Array[Byte]] =
          group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
        groupManager.storeGroup(group, groupAssignment, error => {
          if (error != Errors.NONE) {
            warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")

            // Failed to persist the member.id of the given static member; revert the update
            // of the static member in the group.
            group.updateMember(knownStaticMember, oldProtocols, null)
            val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
            completeAndScheduleNextHeartbeatExpiration(group, oldMember)
            responseCallback(JoinGroupResult(
              List.empty,
              memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              leaderId = currentLeader,
              error = error
            ))
          } else {
            group.maybeInvokeJoinCallback(member, JoinGroupResult(
              members = List.empty,
              memberId = newMemberId,
              generationId = group.generationId,
              protocolType = group.protocolType,
              protocolName = group.protocolName,
              // We want to avoid the current leader performing a trivial assignment while the
              // group is in Stable stage, because the new assignment in the leader's next sync
              // call won't be broadcast by a stable group. This is guaranteed by always
              // returning the old leader id, so that the current leader won't assume itself to
              // be the leader based on the returned message: the new member.id won't match the
              // returned leader id, so no assignment will be performed.
              leaderId = currentLeader,
              error = Errors.NONE))
          }
        })
      } else {
        maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member " +
          s"${member.memberId} with instance id $groupInstanceId joined with change of protocol")
      }
    case CompletingRebalance =>
      // If the group is in the after-sync stage, upon getting a new join-group request from a
      // known static member we should still trigger a new rebalance, since the old member id may
      // already have been sent to the leader for assignment; when the assignment gets back there
      // would be a mismatch between the old member id and the new replaced member id, and as a
      // result the new member id would not get any assignment.
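
The reported ordering problem can be shown in miniature. The sketch below uses hypothetical stand-in classes, not the actual Kafka coordinator types: when the snapshot of the "old" value is taken after an in-place update, the rollback path can only restore the value it was meant to discard.

```java
import java.util.List;

public class SnapshotOrdering {
    // Hypothetical stand-in for MemberMetadata's mutable supportedProtocols field.
    static class Member {
        List<String> supportedProtocols;
        Member(List<String> protocols) { this.supportedProtocols = protocols; }
    }

    // Overwrites the field in place, like group.updateMember does.
    static void updateMember(Member m, List<String> protocols) {
        m.supportedProtocols = protocols;
    }

    public static void main(String[] args) {
        // Buggy order: the snapshot taken AFTER the update sees the new protocols,
        // so a later rollback using it would be a no-op.
        Member buggy = new Member(List.of("range"));
        updateMember(buggy, List.of("roundrobin"));
        List<String> oldProtocolsBuggy = buggy.supportedProtocols;
        System.out.println(oldProtocolsBuggy);   // [roundrobin]

        // Fixed order: the snapshot taken BEFORE the update preserves the old
        // protocols, so the rollback can actually restore them.
        Member fixed = new Member(List.of("range"));
        List<String> oldProtocols = fixed.supportedProtocols;
        updateMember(fixed, List.of("roundrobin"));
        System.out.println(oldProtocols);        // [range]
    }
}
```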

[GitHub] [kafka] showuon opened a new pull request #11661: [WIP] MINOR: shutdown thread test

2022-01-09 Thread GitBox


showuon opened a new pull request #11661:
URL: https://github.com/apache/kafka/pull/11661


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gurinderu closed pull request #11662: Add github ci

2022-01-09 Thread GitBox


gurinderu closed pull request #11662:
URL: https://github.com/apache/kafka/pull/11662


   






[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-09 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471710#comment-17471710
 ] 

Eugen Dück commented on KAFKA-13289:


Meanwhile I could reproduce the "Skipping record for expired segment" messages 
in our actual Kafka Streams app on my laptop. It occurred despite 
"max.task.idle.ms = Long.MAX_VALUE". I am not sure yet exactly how to reproduce 
it: I tried feeding 1 million messages into the input topics a couple of times 
3 days ago and saw no "Skipping" messages, but today (having meanwhile restarted 
the Kafka instance, ZooKeeper, and the Kafka apps), "Skipping" messages were 
being logged.

Still don't know the exact circumstances to reproduce it.

The app uses a 0ms join window with a 0ms grace period. If I were to increase 
the join window or grace period (as Matthew said would fix the issue), the outer 
join would involve so many more left/right pairs that the impact on performance 
would surely be immense, as we have many tens of thousands of messages per 
second.

Here's a question: could a large difference between the message timestamp (used 
for joining) and the system time be causing the "Skipping record" messages?
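
For context on that question: the WARN under discussion is emitted when a record's timestamp falls behind observed stream time by more than the store's retention, which is derived from window size plus grace period. The following is a simplified sketch of that check (not the actual store code; the numbers are illustrative), showing how a large gap between message timestamp and stream time leads to skipped records:

```java
public class ExpiredSegmentCheck {
    // Simplified version of the condition behind
    // "Skipping record for expired segment": a record whose timestamp is older
    // than observedStreamTime - retention cannot land in any live segment.
    static boolean isExpired(long recordTs, long observedStreamTime, long retentionMs) {
        return recordTs < observedStreamTime - retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 5_000;   // e.g. 5s join window + 0ms grace
        long streamTime = 100_000;  // max record timestamp seen so far by the task

        System.out.println(isExpired(96_000, streamTime, retentionMs)); // false: within retention
        System.out.println(isExpired(90_000, streamTime, retentionMs)); // true: dropped with the WARN
    }
}
```

With a 0ms window and 0ms grace, retention is minimal, so any record even slightly behind stream time is vulnerable to being skipped.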

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
>     .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>     .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>     rekeyedRightStream,
>     (left, right) -> left + "/" + right,
>     joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-Impro
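
The "left/null" rows in the quoted output can be illustrated with a toy windowed left join (a deliberately simplified sketch, not the Streams implementation: no grace period and no eager left/null emission, which is why real output can show both "left/null" and the joined pair for the same key):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedLeftJoinSketch {
    // Toy record: (timestamp, key, value).
    record Rec(long ts, String key, String val) {}

    // For each left record, emit "left/right" when a right record with the same
    // key falls within the join window; otherwise emit "left/null".
    static List<String> leftJoin(List<Rec> lefts, List<Rec> rights, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : lefts) {
            boolean matched = false;
            for (Rec r : rights) {
                if (r.key().equals(l.key()) && Math.abs(r.ts() - l.ts()) <= windowMs) {
                    out.add(l.val() + "/" + r.val());
                    matched = true;
                }
            }
            if (!matched) out.add(l.val() + "/null");
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> lefts = List.of(new Rec(0, "525", "525,left"),
                                  new Rec(0, "526", "526,left"));
        // Only key 525 has a matching right record inside the 5s window.
        List<Rec> rights = List.of(new Rec(3_000, "525", "525,right"));
        System.out.println(leftJoin(lefts, rights, 5_000));
    }
}
```

A right record that arrives (or is processed) outside the window, or after the segment has expired, leaves the left side permanently unjoined, which matches the reported symptom.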

[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-09 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471729#comment-17471729
 ] 

Eugen Dück commented on KAFKA-13289:


After reading 
[https://lists.apache.org/thread/lqb1xn83ls0z0g81lpqklj809yxbpnx2] that was 
mentioned by Matthew, I want to note that in my case the "Skipping record for 
expired segment" messages in AbstractRocksDBSegmentedBytesStore do NOT occur on 
startup.

Here's the procedure I used
 # start up the Kafka Streams app
 # send messages to input topics
 # observe "WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment." messages

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
>
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm weary of the resource requirements 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compa

[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-09 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17471729#comment-17471729
 ] 

Eugen Dück edited comment on KAFKA-13289 at 1/10/22, 7:46 AM:
--

After reading 
[https://lists.apache.org/thread/lqb1xn83ls0z0g81lpqklj809yxbpnx2], which was 
mentioned by Matthew, I want to note that in my case (and of course also in 
Matthew's self-contained case) the "Skipping record for expired segment" 
messages do NOT occur on startup.

Here's the procedure I used
 # start up the Kafka Streams app
 # wait a couple of seconds
 # send messages to input topics
 # observe "WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment." messages


was (Author: eugendueck):
After reading 
[https://lists.apache.org/thread/lqb1xn83ls0z0g81lpqklj809yxbpnx2] that was 
mentioned by Matthew, I want to note that in my case the "Skipping record for 
expired segment" messages in AbstractRocksDBSegmentedBytesStore do NOT occur on 
startup.

Here's the procedure I used
 # start up the Kafka Streams app
 # send messages to input topics
 # observe "WARN 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
Skipping record for expired segment." messages

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`