[GitHub] [kafka] GOODBOY008 commented on pull request #11644: KAFKA-13433:JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.
GOODBOY008 commented on pull request #11644: URL: https://github.com/apache/kafka/pull/11644#issuecomment-1004601251 @ewencp Please cc.
[GitHub] [kafka] mimaison commented on a change in pull request #11642: MINOR: Improve Connect docs
mimaison commented on a change in pull request #11642: URL: https://github.com/apache/kafka/pull/11642#discussion_r777904250

## File path: docs/connect.html ##

@@ -41,7 +41,7 @@ Running Kafka Connect

In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:

-> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
+> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

Review comment: Good point! I've pushed an update
[GitHub] [kafka] richard-axual commented on a change in pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams
richard-axual commented on a change in pull request #11535: URL: https://github.com/apache/kafka/pull/11535#discussion_r777961628

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##

@@ -1097,15 +1097,20 @@ long decodeTimestamp(final String encryptedString) {
     if (encryptedString.isEmpty()) {
         return RecordQueue.UNKNOWN;
     }
-    final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
-    final byte version = buffer.get();
-    switch (version) {
-        case LATEST_MAGIC_BYTE:
-            return buffer.getLong();
-        default:
-            log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
-                LATEST_MAGIC_BYTE, version);
-            return RecordQueue.UNKNOWN;
+    try {
+        final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+        final byte version = buffer.get();
+        switch (version) {
+            case LATEST_MAGIC_BYTE:
+                return buffer.getLong();
+            default:
+                log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
+                    LATEST_MAGIC_BYTE, version);
+                return RecordQueue.UNKNOWN;
+        }
+    } catch (final IllegalArgumentException argumentException) {

Review comment: @mjsax I've updated the implementation, can you take a look again?
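For readers skimming the archive, a standalone sketch of how the patched method reads after this change. LATEST_MAGIC_BYTE, UNKNOWN, and the warning output below are stand-ins for the fields that already exist in StreamTask/RecordQueue, and the body of the catch block is an assumption, since the quoted diff is cut off right after the catch clause:

```java
import java.nio.ByteBuffer;
import java.util.Base64;

final class TimestampDecoder {
    private static final byte LATEST_MAGIC_BYTE = 1; // stand-in value
    private static final long UNKNOWN = -1L;          // stand-in for RecordQueue.UNKNOWN

    static long decodeTimestamp(final String encodedString) {
        if (encodedString.isEmpty()) {
            return UNKNOWN;
        }
        try {
            // Base64.getDecoder().decode() throws IllegalArgumentException on malformed
            // input, which is the failure the new try/catch absorbs.
            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encodedString));
            final byte version = buffer.get();
            switch (version) {
                case LATEST_MAGIC_BYTE:
                    return buffer.getLong();
                default:
                    System.err.printf("Unsupported offset metadata version. Supported %d, found %d.%n",
                        LATEST_MAGIC_BYTE, version);
                    return UNKNOWN;
            }
        } catch (final IllegalArgumentException argumentException) {
            // Assumed handling: treat undecodable metadata as unknown rather than failing the task.
            return UNKNOWN;
        }
    }
}
```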
[GitHub] [kafka] mimaison merged pull request #11564: MINOR: improve consoleProducer option description
mimaison merged pull request #11564: URL: https://github.com/apache/kafka/pull/11564
[GitHub] [kafka] showuon opened a new pull request #11647: KAFKA-13567: adminClient exponential backoff implementation for KIP-580
showuon opened a new pull request #11647: URL: https://github.com/apache/kafka/pull/11647 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mdshadabin commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
mdshadabin commented on pull request #10056: URL: https://github.com/apache/kafka/pull/10056#issuecomment-1004827722 > I still see this issue with Kafka 2.8.0. A few days back I downloaded the Kafka 2.8.0 source code (from https://kafka.apache.org/downloads ) and built the binaries (following https://github.com/apache/kafka/tree/2.8 ). The build succeeds after changing the Gradle plugin URL in gradle/buildscript.gradle (since https://dl.bintray.com/content/netflixoss/external-gradle-plugins/ is down, I changed it to https://plugins.gradle.org/m2/ ), but the unit tests are failing. I have attached the unit test result: [index.html.zip](https://github.com/apache/kafka/files/7808015/index.html.zip). My env details: Gradle 6.8.3; java version "11.0.13" 2021-10-19 LTS; Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370); Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode); Scala code runner version 2.13.0
[GitHub] [kafka] lhunyady commented on pull request #11333: KAFKA-13306: Null connector config value passes validation, but fails creation
lhunyady commented on pull request #11333: URL: https://github.com/apache/kafka/pull/11333#issuecomment-1004848659 Thank you @C0urante, I have updated the pull request with your valuable suggestions.
[jira] [Commented] (KAFKA-13574) NotLeaderOrFollowerException thrown for a successful send
[ https://issues.apache.org/jira/browse/KAFKA-13574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468697#comment-17468697 ] Kyle Kingsbury commented on KAFKA-13574: > NOT_LEADER_OR_FOLLOWER is considered an indefinite error Aha! In that case this isn't a bug at all, and we can close it! :) Is there documentation for which errors are definite vs indefinite? I've been relying on the javadocs and asking kafka users how they interpret these errors, and that may have led me astray here. > NotLeaderOrFollowerException thrown for a successful send > - > > Key: KAFKA-13574 > URL: https://issues.apache.org/jira/browse/KAFKA-13574 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 > Environment: openjdk version "11.0.13" 2021-10-19 >Reporter: Kyle Kingsbury >Priority: Minor > Labels: error-handling > > With org.apache.kafka/kafka-clients 3.0.0, under rare circumstances involving > multiple node and network failures, I've observed a call to `producer.send()` > throw `NotLeaderOrFollowerException` for a message which later appears in > `consumer.poll()` return values. > I don't have a reliable repro case for this yet, but the case I hit involved > retries=1000, acks=all, and idempotence enabled. I suspect what might be > happening here is that an initial attempt to send the message makes it to the > server and is committed, but the acknowledgement is lost e.g. due to timeout; > the Kafka producer then automatically retries the send attempt, and on that > retry hits a NotLeaderOrFollowerException, which is thrown back to the > caller. If we interpret NotLeaderOrFollowerException as a definite failure, > then this would constitute an aborted read. > I've seen issues like this in a number of databases around client or > server-internal retry mechanisms, and I think the thing to do is: rather than > throwing the most *recent* error, throw the {*}most indefinite{*}. That way > clients know that their request may have actually succeeded, and they won't > (e.g.) attempt to re-submit a non-idempotent request again. > As a side note: is there... perhaps documentation on which errors in Kafka > are supposed to be definite vs indefinite? NotLeaderOrFollowerException is a > subclass of RetriableException, but it looks like RetriableException is more > about transient vs permanent errors than whether it's safe to retry. -- This message was sent by Atlassian Jira (v8.20.1#820001)
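A hedged illustration of what treating NOT_LEADER_OR_FOLLOWER as an indefinite error means on the caller side; the bootstrap address, topic name, and serializer settings below are illustrative and not taken from the ticket:

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;

public class IndefiniteErrorExample {
    public static void main(final String[] args) throws InterruptedException {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.send(new ProducerRecord<>("test-topic", "k", "v")).get();
            } catch (final ExecutionException e) {
                if (e.getCause() instanceof NotLeaderOrFollowerException) {
                    // Indefinite outcome: an internal retry may already have been committed
                    // on the broker, so the record can still show up in consumer.poll()
                    // even though send() surfaced an error here. Blindly re-submitting a
                    // non-idempotent request at this point could duplicate it.
                }
            }
        }
    }
}
{code}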
[GitHub] [kafka] liuchang0520 commented on pull request #11641: MINOR: update the comment for Utils.atomicMoveWithFallback
liuchang0520 commented on pull request #11641: URL: https://github.com/apache/kafka/pull/11641#issuecomment-1004972135 Hi @showuon , can you help me to merge this PR? I don't have write access.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls
guozhangwang commented on a change in pull request #11609: URL: https://github.com/apache/kafka/pull/11609#discussion_r778297858

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java ##

@@ -296,9 +297,10 @@ private boolean hasPartitionsForAnyTopics(final List topicNames, final S }
 private Set getStoresOnHost(final Map> storeToSourceTopics,
-                            final Set sourceTopicPartitions, final String topologyName) {
+                            final Set sourceTopicPartitions,
+                            final String topologyName) {
     final InternalTopologyBuilder builder = topologyMetadata.lookupBuilderForNamedTopology(topologyName);
-    final Set sourceTopicNames = builder.sourceTopicNames();
+    final Collection sourceTopicNames = builder.sourceTopicCollection();

Review comment: Ack! Thanks.
[GitHub] [kafka] guozhangwang commented on a change in pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2
guozhangwang commented on a change in pull request #11624: URL: https://github.com/apache/kafka/pull/11624#discussion_r778303747 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -365,109 +395,200 @@ public void beforeTest() { final Properties streamsConfig = streamsConfiguration( cache, log, -storeToTest.name() +storeToTest.name(), +kind ); final StreamsBuilder builder = new StreamsBuilder(); -if (supplier instanceof KeyValueBytesStoreSupplier) { -final Materialized> materialized = -Materialized.as((KeyValueBytesStoreSupplier) supplier); +if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) { +setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder); +} else if (Objects.equals(kind, "PAPI") && supplier instanceof KeyValueBytesStoreSupplier) { +setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, builder); +} else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) { +setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder); +} else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) { +throw new AssumptionViolatedException("Case not implemented yet"); +} else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) { +setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder); +} else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) { +throw new AssumptionViolatedException("Case not implemented yet"); +} else { +throw new AssertionError("Store supplier is an unrecognized type."); +} -if (cache) { -materialized.withCachingEnabled(); -} else { -materialized.withCachingDisabled(); -} +// Don't need to wait for running, since tests can use iqv2 to wait until they +// get a valid response. -if (log) { -materialized.withLoggingEnabled(Collections.emptyMap()); -} else { -materialized.withCachingDisabled(); -} +kafkaStreams = +IntegrationTestUtils.getStartedStreams( +streamsConfig, +builder, +true +); +} -if (storeToTest.global()) { -builder.globalTable( -INPUT_TOPIC_NAME, -Consumed.with(Serdes.Integer(), Serdes.Integer()), -materialized -); -} else { -builder.table( -INPUT_TOPIC_NAME, -Consumed.with(Serdes.Integer(), Serdes.Integer()), -materialized -); -} -} else if (supplier instanceof WindowBytesStoreSupplier) { -final Materialized> materialized = -Materialized.as((WindowBytesStoreSupplier) supplier); +private void setUpSessionDSLTopology(final SessionBytesStoreSupplier supplier, + final StreamsBuilder builder) { +final Materialized> materialized = +Materialized.as(supplier); -if (cache) { -materialized.withCachingEnabled(); -} else { -materialized.withCachingDisabled(); -} +if (cache) { +materialized.withCachingEnabled(); +} else { +materialized.withCachingDisabled(); +} -if (log) { -materialized.withLoggingEnabled(Collections.emptyMap()); -} else { -materialized.withCachingDisabled(); -} +if (log) { +materialized.withLoggingEnabled(Collections.emptyMap()); +} else { +materialized.withCachingDisabled(); Review comment: withLoggingDisabled? 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## @@ -365,109 +395,200 @@ public void beforeTest() { final Properties streamsConfig = streamsConfiguration( cache, log, -storeToTest.name() +storeToTest.name(), +kind ); final StreamsBuilder builder = new StreamsBuilder(); -if (supplier instanceof KeyValueBytesStoreSupplier) { -final Materialized> materialized = -Materialized.as((KeyValueBytesStoreSupplier) supplier); +if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) { +setUpKeyVa
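A minimal sketch of the flag handling the review comment points at, with the suggested fix applied (withLoggingDisabled instead of withCachingDisabled in the `log` else-branch); the helper name and open generic parameters are illustrative, not part of the actual test:

```java
import java.util.Collections;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;

final class MaterializedFlags {
    // Mirrors the cache/log flag handling in the test; generic parameters stay open
    // because the store type varies per test case.
    static <K, V, S extends StateStore> Materialized<K, V, S> apply(final Materialized<K, V, S> materialized,
                                                                    final boolean cache,
                                                                    final boolean log) {
        if (cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            // The fix the review comment asks for: disable logging here, not caching.
            materialized.withLoggingDisabled();
        }
        return materialized;
    }
}
```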
[GitHub] [kafka] guozhangwang commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2
guozhangwang commented on pull request #11624: URL: https://github.com/apache/kafka/pull/11624#issuecomment-1005074050 Hey @vvcephei , please feel free to merge after addressing the comments and resolving the conflicts.
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468879#comment-17468879 ] Matthias J. Sax commented on KAFKA-13289: - {quote}However in the cases mentioned here, there is a lack of output, not an early emission. So I fear these issues are different. {quote} Thanks for double checking. I just wanted to ensure, that those `null` results are not interpret as missing data (there could be a later inner join result). The log line clearly indicates that a record was dropped on the floor: we do a put() into the state store, but the record is too old, and the state store drop it. We know that timestamp synchronization in v2.8 still has some gaps. I was also just looking into the GitHub repos, and both don't use `max.task.idle.ms` config. Can you try to increase this config and/or switch to 3.0? Cf [https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization] > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Matthew Sheppard >Priority: Minor > > When pushing bulk data through a kafka-steams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this. > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream leftStream = > builder.stream(leftTopic); > final KStream rightStream = > builder.stream(rightTopic); > final KStream rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... 
> 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ... > {code} > ...where as, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null. > Note that I understand it's expected that we initially get the left/null > values for many values since that's the expected semantics of kafka-streams > left join, at least until > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious > I've noticed that if I set a very large grace value on the join window the > problem is solved, but since the input I provide is not out of order I did > not expect to need to do that, and I'm weary of the resource requirements > doing so in practice on an applicat
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468912#comment-17468912 ] Eugen Dück commented on KAFKA-13289: [~mjsax] I tried setting `max.task.idle.ms` like so: streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "" + Long.MAX_VALUE); However, the "Skipping record for expired segment." messages are logged nevertheless. I also tried lower values like 1 million, to no avail. This test runs on Kafka 2.8.0 > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Matthew Sheppard > Priority: Minor
[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005247992 Thanks @vamossagar12 , I do not have further comments now, re-triggering the jenkins tests.
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468913#comment-17468913 ] Eugen Dück commented on KAFKA-13289: [~mjsax] I just retried with confluentinc/cp-kafka:7.0.1, i.e. Kafka 3.0, and "Skipping record for expired segment." still gets logged. > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Matthew Sheppard > Priority: Minor
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468912#comment-17468912 ] Eugen Dück edited comment on KAFKA-13289 at 1/4/22, 11:34 PM: -- [~mjsax] I tried setting `max.task.idle.ms` like so: streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "" + Long.MAX_VALUE); However, the "Skipping record for expired segment." messages are logged nevertheless. I also tried lower values like 1 million, to no avail. This test runs on confluentinc/cp-kafka:5.4.3 (Kafka 2.8) was (Author: eugendueck): [~mjsax] I tried setting `max.task.idle.ms` like so: streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "" + Long.MAX_VALUE); However, the "Skipping record for expired segment." messages are logged nevertheless. I also tried lower values like 1 million, to no avail. This test runs on Kafka 2.8.0 > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Matthew Sheppard > Priority: Minor
[GitHub] [kafka] vvcephei commented on a change in pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2
vvcephei commented on a change in pull request #11624: URL: https://github.com/apache/kafka/pull/11624#discussion_r778456441 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java ## +if (log) { +materialized.withLoggingEnabled(Collections.emptyMap()); +} else { +materialized.withCachingDisabled(); Review comment: Oops! Thanks for the catch!
[GitHub] [kafka] vvcephei commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2
vvcephei commented on pull request #11624: URL: https://github.com/apache/kafka/pull/11624#issuecomment-1005250957 Thanks, @guozhangwang !
[GitHub] [kafka] guozhangwang commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
guozhangwang commented on pull request #11460: URL: https://github.com/apache/kafka/pull/11460#issuecomment-1005257929 Thanks @RivenSun2 for the improvement and @showuon for the reviews, LGTM to clarify it.
[GitHub] [kafka] guozhangwang merged pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics
guozhangwang merged pull request #11460: URL: https://github.com/apache/kafka/pull/11460
[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005259979 @vamossagar12 there are some compilation errors in Jenkins, could you rebase from the latest trunk and check what the issue is? Please ping me once that's resolved.
[GitHub] [kafka] ccding commented on pull request #11641: MINOR: update the comment for Utils.atomicMoveWithFallback
ccding commented on pull request #11641: URL: https://github.com/apache/kafka/pull/11641#issuecomment-1005331649 cc @junrao
[jira] [Commented] (KAFKA-13570) Fallback of unsupported versions of ApiVersionsRequest and RequestHeader?
[ https://issues.apache.org/jira/browse/KAFKA-13570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468965#comment-17468965 ] dengziming commented on KAFKA-13570: I don't think it's a simple problem for a server to properly handle a higher-version RPC from a client, so it's the client's responsibility to handle it. Most Kafka clients will fall back to an older RPC version on an UnsupportedVersionException, but that is not guaranteed. > Fallback of unsupported versions of ApiVersionsRequest and RequestHeader? > - > > Key: KAFKA-13570 > URL: https://issues.apache.org/jira/browse/KAFKA-13570 > Project: Kafka > Issue Type: Improvement >Reporter: Fredrik Arvidsson >Priority: Minor > > I've gone through the protocol documentation and the source code, but I can't > find any explicit documentation stating how clients and brokers are handling > the scenario when the client sends higher versions of ApiVersionsRequest and > RequestHeader which the broker doesn't understand. I've seen hints in > discussions that the broker would fallback to returning version 0 of the > ApiVersionsResponse in this scenario. > Are there any documentation that explains these scenarios? -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468913#comment-17468913 ] Eugen Dück edited comment on KAFKA-13289 at 1/5/22, 2:38 AM: - [~mjsax] I just retried with confluentinc/cp-kafka:7.0.1, i.e. Kafka 3.0, and max.task.idle.ms = Long.MAX. "Skipping record for expired segment." still gets logged. was (Author: eugendueck): [~mjsax] I just retried with confluentinc/cp-kafka:7.0.1, i.e. Kafka 3.0, and "Skipping record for expired segment." still gets logged. > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Matthew Sheppard > Priority: Minor
[jira] [Commented] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
[ https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469004#comment-17469004 ] Luke Chen commented on KAFKA-13563: --- [~guozhang] [~ableegoldman], you might want to check this issue, and the WIP PR: [https://github.com/apache/kafka/pull/11631] . Thank you. > FindCoordinatorFuture never get cleared in non-group mode( consumer#assign) > --- > > Key: KAFKA-13563 > URL: https://issues.apache.org/jira/browse/KAFKA-13563 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.1, 3.0.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: kafka.zip > > > In KAFKA-10793, we fix the race condition when lookup coordinator by clearing > the _findCoordinatorFuture_ when handling the result, rather than in the > listener callbacks. It works well under consumer group mode (i.e. > Consumer#subscribe), but we found when user is using non consumer group mode > (i.e. Consumer#assign) with group id provided (for offset commitment, so that > there will be consumerCoordinator created), the _findCoordinatorFuture_ will > never be cleared in some situations, and cause the offset committing keeps > getting NOT_COORDINATOR error. > > After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places: > # heartbeat thread > # AbstractCoordinator#ensureCoordinatorReady > But in non consumer group mode with group id provided, there will be no > (1)heartbeat thread , and it only call > (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to > fetch committed offset position. That is, after 2nd lookupCoordinator call, > we have no chance to clear the _findCoordinatorFuture_ . > > To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear > the _findCoordinatorFuture_ in the future listener. So, I think we can fix > this issue by calling AbstractCoordinator#ensureCoordinatorReady when > coordinator unknown in non consumer group case, under each Consumer#poll. > > Reproduce steps: > > 1. Start a 3 Broker cluster with a Topic having Replicas=3. > 2. Start a Client with Producer and Consumer (with Consumer#assign(), not > subscribe, and provide a group id) communicating over the Topic. > 3. Stop the Broker that is acting as the Group Coordinator. > 4. Observe successful Rediscovery of new Group Coordinator. > 5. Restart the stopped Broker. > 6. Stop the Broker that became the new Group Coordinator at step 4. > 7. Observe "Rediscovery will be attempted" message but no "Discovered group > coordinator" message. > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
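A hedged sketch of the "non consumer group mode" setup the ticket describes (manual assignment plus a group.id so offsets are committed, which creates a ConsumerCoordinator but no heartbeat thread); the broker address, topic, and group name are illustrative:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignModeRepro {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assign-mode-group");       // group id provided for offset commits
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no subscribe(), so no heartbeat thread is started.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            while (true) {
                consumer.poll(Duration.ofMillis(500));
                // Per the ticket, once the coordinator is lost and the stale FindCoordinator
                // future is never cleared, these commits keep failing with NOT_COORDINATOR.
                consumer.commitSync();
            }
        }
    }
}
{code}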
[jira] [Updated] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
[ https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13563: -- Affects Version/s: 3.0.0 2.7.1 > FindCoordinatorFuture never get cleared in non-group mode( consumer#assign) > --- > > Key: KAFKA-13563 > URL: https://issues.apache.org/jira/browse/KAFKA-13563 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.7.1, 3.0.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: kafka.zip > > > In KAFKA-10793, we fix the race condition when lookup coordinator by clearing > the _findCoordinatorFuture_ when handling the result, rather than in the > listener callbacks. It works well under consumer group mode (i.e. > Consumer#subscribe), but we found when user is using non consumer group mode > (i.e. Consumer#assign) with group id provided (for offset commitment, so that > there will be consumerCoordinator created), the _findCoordinatorFuture_ will > never be cleared in some situations, and cause the offset committing keeps > getting NOT_COORDINATOR error. > > After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places: > # heartbeat thread > # AbstractCoordinator#ensureCoordinatorReady > But in non consumer group mode with group id provided, there will be no > (1)heartbeat thread , and it only call > (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to > fetch committed offset position. That is, after 2nd lookupCoordinator call, > we have no chance to clear the _findCoordinatorFuture_ . > > To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear > the _findCoordinatorFuture_ in the future listener. So, I think we can fix > this issue by calling AbstractCoordinator#ensureCoordinatorReady when > coordinator unknown in non consumer group case, under each Consumer#poll. > > Reproduce steps: > > 1. Start a 3 Broker cluster with a Topic having Replicas=3. > 2. Start a Client with Producer and Consumer (with Consumer#assign(), not > subscribe, and provide a group id) communicating over the Topic. > 3. Stop the Broker that is acting as the Group Coordinator. > 4. Observe successful Rediscovery of new Group Coordinator. > 5. Restart the stopped Broker. > 6. Stop the Broker that became the new Group Coordinator at step 4. > 7. Observe "Rediscovery will be attempted" message but no "Discovered group > coordinator" message. > > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] vamossagar12 commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
vamossagar12 commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005361882 @guozhangwang , it's due to the usage of deprecated configs and gradle having a check against it. ``` [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:61: warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, [2022-01-04T23:35:30.575Z] ^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:56: warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, [2022-01-04T23:35:30.575Z] ^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:115: warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] if (isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) { [2022-01-04T23:35:30.575Z]^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:116: warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); [2022-01-04T23:35:30.575Z] ^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:117: warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize); [2022-01-04T23:35:30.575Z] ^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:119: warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] maxBufferedSize = globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) [2022-01-04T23:35:30.575Z] ^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:120: warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] ? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : -1; [2022-01-04T23:35:30.575Z] ^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:123: warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { [2022-01-04T23:35:30.575Z]^ [2022-01-04T23:35:30.575Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:124: warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in org.apache.kafka.streams.StreamsConfig has been deprecated [2022-01-04T23:35:30.575Z] cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); [2022-01-04T23:35:30.575Z] ^
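The warnings above come from javac's standard deprecation check rather than anything Kafka-specific: any reference to a @Deprecated member is flagged, and the build is configured to surface those warnings. A small standalone illustration (the class and constant names here are made up for the sketch, though "input.buffer.max.bytes" is the new config name from this PR):
```java
// Illustration of the "[deprecation]" warnings seen in the build log above.
// OldConfigs is a made-up stand-in for StreamsConfig, not real Kafka code.
class OldConfigs {
    @Deprecated
    static final String OLD_NAME = "buffered.records.per.partition";
    static final String NEW_NAME = "input.buffer.max.bytes";
}

class Caller {
    // javac: warning: [deprecation] OLD_NAME in OldConfigs has been deprecated
    String stillUsingOld() {
        return OldConfigs.OLD_NAME;
    }

    // Either migrate to the replacement constant...
    String migrated() {
        return OldConfigs.NEW_NAME;
    }

    // ...or acknowledge the usage explicitly if it must remain for compatibility.
    @SuppressWarnings("deprecation")
    String acknowledged() {
        return OldConfigs.OLD_NAME;
    }
}
```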
[GitHub] [kafka] mjsax opened a new pull request #11648: MINOR: add default-replication-factor to MockAdminClient
mjsax opened a new pull request #11648: URL: https://github.com/apache/kafka/pull/11648 MockAdminClient should add `default.replication.factor` to the config of each node. Otherwise, `describeConfigs()` does not return the default replication factor correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
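For reference, the kind of lookup this affects, reading a broker default via `describeConfigs()`, looks roughly like the sketch below. It uses only the regular Admin API; the `admin` instance (which could be a MockAdminClient in a test) and the broker id "0" are assumptions for the example, not code from the PR.
```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

class DefaultReplicationFactorLookup {
    // Reads "default.replication.factor" for broker "0"; the MockAdminClient change is
    // about making a lookup like this return a sensible value in tests.
    static String defaultReplicationFactor(Admin admin) throws InterruptedException, ExecutionException {
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
        Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
        ConfigEntry entry = config.get("default.replication.factor");
        return entry == null ? null : entry.value();
    }
}
```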
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469010#comment-17469010 ] Matthias J. Sax commented on KAFKA-13289: - You can reproduce it with the code in the github repo linked above? Let me try to check it out and reproduce locally to see what's going on. Not sure how quickly I will find time though. – Thanks for your patience and collaboration! > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0 > Reporter: Matthew Sheppard > Priority: Minor > > When pushing bulk data through a kafka-streams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this. > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, String> leftStream = builder.stream(leftTopic); > final KStream<String, String> rightStream = builder.stream(rightTopic); > final KStream<String, String> rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream<String, String> rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream<String, String> joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... > 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ... > {code} > ...whereas, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null. 
> Note that I understand it's expected that we initially get the left/null > values for many values since that's the expected semantics of kafka-streams > left join, at least until > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious > I've noticed that if I set a very large grace value on the join window the > problem is solved, but since the input I provide is not out of order I did > not expect to need to do that, and I'm wary of the resource requirements of > doing so in practice on an application with a lot of volume. > My suspicion is that something is happening such that when one partition is > processed it causes the stream time to be pushed forward to the newest > message in that partition, meaning when the next partition is then examined > it is found to contain many records which are 'too old' compared to the > stream time. > I ran across this discussion thread which seems to cover the same issue > http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e > and had a req
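Regarding the "very large grace value" workaround mentioned above, the knob in question is the grace period on the join window. A sketch of what that change looks like; the 24-hour value is arbitrary for illustration, and as the reporter notes, a larger grace period means more state retained.
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

class GraceExample {
    // Same 5-second join window as the reproduction, but with an explicit (large) grace
    // period so records arriving "late" relative to stream time are still joined.
    static JoinWindows windowWithGrace() {
        return JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofHours(24));
    }
}
{code}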
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469016#comment-17469016 ] Eugen Dück commented on KAFKA-13289: Yes, if you use Matthew's repo, which gives the "Skipping record" logs: [https://github.com/mattsheppard/ins14809/|https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java] Setting max.task.idle.ms is straightforward, and so is using confluentinc/cp-kafka:7.0.1
[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11424: URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005381382 @vamossagar12 Note that `TopologyConfig` is an internal class for now, and hence should not use deprecated values. I think we need to update this class to use the new config directly and remove the old ones --- since it is internal, this would not break any compatibility. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
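A sketch of the direction suggested here, defining only the new config in the class's `ConfigDef` so no deprecated constants are referenced. The constant name, default value, and doc string below are placeholders for illustration, not the actual Kafka code.
```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

class TopologyConfigSketch {
    // Placeholder constant; the real definition would live in StreamsConfig.
    static final String INPUT_BUFFER_MAX_BYTES_CONFIG = "input.buffer.max.bytes";

    // Define only the new config so this internal class no longer touches deprecated constants.
    static final ConfigDef CONFIG = new ConfigDef()
        .define(INPUT_BUFFER_MAX_BYTES_CONFIG,
                Type.LONG,
                512 * 1024 * 1024L,      // placeholder default
                Importance.MEDIUM,
                "Maximum total bytes buffered across all input partitions (placeholder doc).");
}
```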
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469016#comment-17469016 ] Eugen Dück edited comment on KAFKA-13289 at 1/5/22, 4:53 AM: - That would be awesome! > You can reproduce with the code in the github repo linked above? Yes, if you use Matthew's repo, which gives the "Skipping record" logs: [https://github.com/mattsheppard/ins14809/|https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java] Setting max.task.idle.ms is straightforward, and so is the change to using confluentinc/cp-kafka:7.0.1 was (Author: eugendueck): Yes, if you use Matthew's repo, which gives the "Skipping record" logs: [https://github.com/mattsheppard/ins14809/|https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java] Setting max.task.idle.ms is straightforward, and so is using confluentinc/cp-kafka:7.0.1
[GitHub] [kafka] guozhangwang commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…
guozhangwang commented on pull request #11340: URL: https://github.com/apache/kafka/pull/11340#issuecomment-1005381862 Retriggering Jenkins tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
guozhangwang commented on a change in pull request #11631: URL: https://github.com/apache/kafka/pull/11631#discussion_r778549345 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -517,15 +517,13 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } } } else { -// For manually assigned partitions, if there are no ready nodes, await metadata. +// For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. -// awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop. -// When group management is used, metadata wait is already performed for this scenario as -// coordinator is unknown, hence this check is not required. -if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { -client.awaitMetadataUpdate(timer); +// awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. +if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { +return false; Review comment: I'm wondering if this fix is a bit overkill, e.g. for those consumers who do not even set the group ID and do not rely on the coordinator for anything, the `coordinatorUnknown()` would always return true while `ensureCoordinatorReady` would send a discover coordinator request with `null` group id. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
[ https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469023#comment-17469023 ] Guozhang Wang commented on KAFKA-13563: --- Hi [~showuon] thanks for the report, I've reviewed the PR and left some comments. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] guozhangwang commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
guozhangwang commented on pull request #11627: URL: https://github.com/apache/kafka/pull/11627#issuecomment-1005389524 Thanks @showuon , I've made a quick pass and it looks promising to me. I'm pinging @dajac and @kirktrue who would be the best reviewers for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
showuon commented on pull request #11631: URL: https://github.com/apache/kafka/pull/11631#issuecomment-1005407357 @guozhangwang , answering your question below: > I'm wondering if this fix is a bit overkill, e.g. for those consumers who do not even set the group ID and do not rely on the coordinator for anything, the coordinatorUnknown() would always return true while ensureCoordinatorReady would send a discover coordinator request with null group id. No, if the consumer doesn't provide a group id config value (the default is null), we won't create a `consumerCoordinator` in the consumer. That is, if the group id is provided, it's either with consumer group management (via Consumer#subscribe), or with manual assignment (via Consumer#assign) with offset commit enabled. REF: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L775 And before this PR, we would wait for a metadata update if no nodes were available, to avoid a busy loop in non consumer group mode.
```java
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
    client.awaitMetadataUpdate(timer);
}
```
After the change, we do what group management does: call `ensureCoordinatorReady` when the coordinator is unknown. This way, we can also make sure to handle the `FindCoordinatorFuture` properly (and clear it) inside `ensureCoordinatorReady`. Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode
showuon edited a comment on pull request #11631: URL: https://github.com/apache/kafka/pull/11631#issuecomment-1005407357 @guozhangwang , thanks for your comment. Answering your question below: > I'm wondering if this fix is a bit overkill, e.g. for those consumers who do not even set the group ID and do not rely on the coordinator for anything, the coordinatorUnknown() would always return true while ensureCoordinatorReady would send a discover coordinator request with null group id. No, if the consumer doesn't provide a group id config value (the default is null), we won't create a `consumerCoordinator` in the consumer. That is, if the group id is provided, it's either with consumer group management (via Consumer#subscribe), or with manual assignment (via Consumer#assign) with offset commit enabled. REF: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L775 And before this PR, we would wait for a metadata update if no nodes were available, to avoid a busy loop in non consumer group mode.
```java
if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
    client.awaitMetadataUpdate(timer);
}
```
After the change, we do what group management does: call `ensureCoordinatorReady` when the coordinator is unknown. This way, we can also make sure to handle the `FindCoordinatorFuture` properly (and clear it) inside `ensureCoordinatorReady`. Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown:
[ https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RivenSun updated KAFKA-13422: - Reviewer: Guozhang Wang (was: Luke Chen) > Even if the correct username and password are configured, when ClientBroker > or KafkaClient tries to establish a SASL connection to ServerBroker, an > exception is thrown: (Authentication failed: Invalid username or password) > -- > > Key: KAFKA-13422 > URL: https://issues.apache.org/jira/browse/KAFKA-13422 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 2.7.1, 3.0.0 >Reporter: RivenSun >Priority: Major > Attachments: CustomerAuthCallbackHandler.java, > LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png > > > > h1. Foreword: > When deploying a Kafka cluster with a higher version (2.7.1), I encountered > an exception of communication identity authentication failure between > brokers. In the current latest version 3.0.0, this problem can also be > reproduced. > h1. Problem recurring: > h2. 1)broker Version is 3.0.0 > h3. The content of kafka_server_jaas.conf of each broker is exactly the same, > the content is as follows: > > > {code:java} > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_alice="alice"; > org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin_scram" > password="admin_scram_password"; > > }; > {code} > > > h3. broker server.properties: > One of the broker configuration files is provided, and the content of the > configuration files of other brokers is only different from the localPublicIp > of advertised.listeners. > > {code:java} > broker.id=1 > broker.rack=us-east-1a > advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669 > log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2 > zookeeper.connect=*** > listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669 > listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL > listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler > #ssl config > ssl.keystore.password=*** > ssl.key.password=*** > ssl.truststore.password=*** > ssl.keystore.location=*** > ssl.truststore.location=*** > ssl.client.auth=none > ssl.endpoint.identification.algorithm= > #broker communicate config > #security.inter.broker.protocol=SASL_PLAINTEXT > inter.broker.listener.name=INTERNAL_SSL > sasl.mechanism.inter.broker.protocol=PLAIN > #sasl authentication config > sasl.kerberos.service.name=kafka > sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI > delegation.token.master.key=*** > delegation.token.expiry.time.ms=8640 > delegation.token.max.lifetime.ms=31536 > {code} > > > Then start all brokers at the same time. Each broker has actually been > started successfully, but when establishing a connection between the > controller node and all brokers, the identity authentication has always > failed. The connection between brokers cannot be established normally, > causing the entire Kafka cluster to be unable to provide external services. > h3. 
The server log keeps printing abnormally like crazy: > The real ip sensitive information of the broker in the log, I use ** > instead of here > > {code:java} > [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Started socket server acceptors and processors > (kafka.network.SocketServer) > [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started > (kafka.server.KafkaServer) > [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > [2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /** (A
[jira] [Commented] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469084#comment-17469084 ] RivenSun commented on KAFKA-13422: -- Hi [~guozhang] [~rsivaram] [~ijuma] I think this issue is a problem worth discussing. Can you give any suggestions? Thanks a lot.