[GitHub] [kafka] GOODBOY008 commented on pull request #11644: KAFKA-13433:JsonConverter's method convertToJson when field is optional with default value and value is null, return default value.

2022-01-04 Thread GitBox


GOODBOY008 commented on pull request #11644:
URL: https://github.com/apache/kafka/pull/11644#issuecomment-1004601251


   @ewencp Please cc.






[GitHub] [kafka] mimaison commented on a change in pull request #11642: MINOR: Improve Connect docs

2022-01-04 Thread GitBox


mimaison commented on a change in pull request #11642:
URL: https://github.com/apache/kafka/pull/11642#discussion_r777904250



##
File path: docs/connect.html
##
@@ -41,7 +41,7 @@ Running Kafka Connect
In standalone mode all work is performed in a single process. This 
configuration is simpler to setup and get started with and may be useful in 
situations where only one worker makes sense (e.g. collecting log files), but 
it does not benefit from some of the features of Kafka Connect such as fault 
tolerance. You can start a standalone process with the following command:
 
 
-> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
+> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
 

Review comment:
   Good point! I've pushed an update








[GitHub] [kafka] richard-axual commented on a change in pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams

2022-01-04 Thread GitBox


richard-axual commented on a change in pull request #11535:
URL: https://github.com/apache/kafka/pull/11535#discussion_r777961628



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -1097,15 +1097,20 @@ long decodeTimestamp(final String encryptedString) {
         if (encryptedString.isEmpty()) {
             return RecordQueue.UNKNOWN;
         }
-        final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
-        final byte version = buffer.get();
-        switch (version) {
-            case LATEST_MAGIC_BYTE:
-                return buffer.getLong();
-            default:
-                log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
-                         LATEST_MAGIC_BYTE, version);
-                return RecordQueue.UNKNOWN;
+        try {
+            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
+            final byte version = buffer.get();
+            switch (version) {
+                case LATEST_MAGIC_BYTE:
+                    return buffer.getLong();
+                default:
+                    log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.",
+                             LATEST_MAGIC_BYTE, version);
+                    return RecordQueue.UNKNOWN;
+            }
+        } catch (final IllegalArgumentException argumentException) {

Review comment:
   @mjsax I've updated the implementation, can you take a look again?
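   For reference, a minimal self-contained sketch (not the Kafka source itself; the class name, constants, and values are made up for illustration) of the behavior the patch guards against: `Base64.getDecoder().decode(String)` throws `IllegalArgumentException` on malformed input, so wrapping the decode in a try/catch lets corrupted offset metadata degrade to an "unknown" timestamp instead of crashing the caller.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

public class TimestampDecodeSketch {
    static final byte LATEST_MAGIC_BYTE = 1;   // assumed value, for illustration only
    static final long UNKNOWN = -1L;           // stands in for RecordQueue.UNKNOWN

    static long decode(final String encoded) {
        if (encoded.isEmpty()) {
            return UNKNOWN;
        }
        try {
            // decode() throws IllegalArgumentException if the string is not valid Base64
            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
            final byte version = buffer.get();
            return version == LATEST_MAGIC_BYTE ? buffer.getLong() : UNKNOWN;
        } catch (final IllegalArgumentException e) {
            return UNKNOWN;                    // corrupted metadata no longer propagates an exception
        }
    }

    public static void main(final String[] args) {
        System.out.println(decode("not-base64!"));  // prints -1 instead of throwing
    }
}
```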








[GitHub] [kafka] mimaison merged pull request #11564: MINOR: improve consoleProducer option description

2022-01-04 Thread GitBox


mimaison merged pull request #11564:
URL: https://github.com/apache/kafka/pull/11564


   






[GitHub] [kafka] showuon opened a new pull request #11647: KAFKA-13567: adminClient exponential backoff implementation for KIP-580

2022-01-04 Thread GitBox


showuon opened a new pull request #11647:
URL: https://github.com/apache/kafka/pull/11647


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
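   The PR body above is still the template, but for context here is a generic, hedged sketch of exponential backoff with jitter of the kind KIP-580 proposes for client retries. The formula, constants, and config names referenced in the comments are assumptions for illustration, not taken from this PR.

```java
import java.util.concurrent.ThreadLocalRandom;

public class ExponentialBackoffSketch {
    // Conceptual stand-ins for client retry configs; concrete values are assumptions.
    static final long INITIAL_BACKOFF_MS = 100;   // e.g. a retry.backoff.ms-style setting
    static final long MAX_BACKOFF_MS = 1000;      // e.g. a retry.backoff.max.ms-style cap
    static final double JITTER = 0.2;             // +/-20% to avoid synchronized retries

    static long backoffMs(final int failedAttempts) {
        final double exp = Math.min(MAX_BACKOFF_MS,
                INITIAL_BACKOFF_MS * Math.pow(2, Math.max(0, failedAttempts - 1)));
        final double factor = 1 + JITTER * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) Math.min(MAX_BACKOFF_MS, exp * factor);
    }

    public static void main(final String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.printf("attempt %d -> wait ~%d ms%n", attempt, backoffMs(attempt));
        }
    }
}
```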
   






[GitHub] [kafka] mdshadabin commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2022-01-04 Thread GitBox


mdshadabin commented on pull request #10056:
URL: https://github.com/apache/kafka/pull/10056#issuecomment-1004827722


   I still see this issue with Kafka 2.8.0.
   
   A few days back I downloaded the Kafka 2.8.0 source code (from 
https://kafka.apache.org/downloads) and built the binaries (following 
https://github.com/apache/kafka/tree/2.8). The build succeeds after changing the 
Gradle plugin URL in gradle/buildscript.gradle (since the URL 
'https://dl.bintray.com/content/netflixoss/external-gradle-plugins/' is down, I 
changed it to 'https://plugins.gradle.org/m2/'), but the unit tests are failing. 
I have attached the unit test results: 
[index.html.zip](https://github.com/apache/kafka/files/7808015/index.html.zip)
   
   
   My env details:
   
   Gradle 6.8.3
   
   java version "11.0.13" 2021-10-19 LTS
   Java(TM) SE Runtime Environment 18.9 (build 11.0.13+10-LTS-370)
   Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.13+10-LTS-370, mixed mode)
   
   Scala code runner version 2.13.0 






[GitHub] [kafka] lhunyady commented on pull request #11333: KAFKA-13306: Null connector config value passes validation, but fails creation

2022-01-04 Thread GitBox


lhunyady commented on pull request #11333:
URL: https://github.com/apache/kafka/pull/11333#issuecomment-1004848659


   Thank you @C0urante, I have updated the pull request with your valuable 
suggestions.






[jira] [Commented] (KAFKA-13574) NotLeaderOrFollowerException thrown for a successful send

2022-01-04 Thread Kyle Kingsbury (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468697#comment-17468697
 ] 

Kyle Kingsbury commented on KAFKA-13574:


> NOT_LEADER_OR_FOLLOWER is considered an indefinite error

Aha! In that case this isn't a bug at all, and we can close it! :)

Is there documentation for which errors are definite vs indefinite? I've been 
relying on the javadocs and asking kafka users how they interpret these errors, 
and that may have led me astray here.

> NotLeaderOrFollowerException thrown for a successful send
> -
>
> Key: KAFKA-13574
> URL: https://issues.apache.org/jira/browse/KAFKA-13574
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
> Environment: openjdk version "11.0.13" 2021-10-19
>Reporter: Kyle Kingsbury
>Priority: Minor
>  Labels: error-handling
>
> With org.apache.kafka/kafka-clients 3.0.0, under rare circumstances involving 
> multiple node and network failures, I've observed a call to `producer.send()` 
> throw `NotLeaderOrFollowerException` for a message which later appears in 
> `consumer.poll()` return values.
> I don't have a reliable repro case for this yet, but the case I hit involved 
> retries=1000, acks=all, and idempotence enabled. I suspect what might be 
> happening here is that an initial attempt to send the message makes it to the 
> server and is committed, but the acknowledgement is lost e.g. due to timeout; 
> the Kafka producer then automatically retries the send attempt, and on that 
> retry hits a NotLeaderOrFollowerException, which is thrown back to the 
> caller. If we interpret NotLeaderOrFollowerException as a definite failure, 
> then this would constitute an aborted read.
> I've seen issues like this in a number of databases around client or 
> server-internal retry mechanisms, and I think the thing to do is: rather than 
> throwing the most *recent* error, throw the {*}most indefinite{*}. That way 
> clients know that their request may have actually succeeded, and they won't 
> (e.g.) attempt to re-submit a non-idempotent request again.
> As a side note: is there... perhaps documentation on which errors in Kafka 
> are supposed to be definite vs indefinite? NotLeaderOrFollowerException is a 
> subclass of RetriableException, but it looks like RetriableException is more 
> about transient vs permanent errors than whether it's safe to retry.
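For concreteness, a minimal sketch of the producer settings the report mentions (acks=all, retries=1000, idempotence enabled); the bootstrap address and topic name are assumptions. Under this setup the send callback can observe a NotLeaderOrFollowerException from an internal retry even though an earlier attempt may already have been committed, which is the aborted-read scenario the report suspects.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DefiniteVsIndefiniteSendSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "1000");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The callback may see NotLeaderOrFollowerException from a retry even if an
            // earlier attempt was committed, i.e. the error is "indefinite".
            producer.send(new ProducerRecord<>("test-topic", "k", "v"), (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("send failed (possibly indefinitely): " + exception);
                }
            });
            producer.flush();
        }
    }
}
{code}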





[GitHub] [kafka] liuchang0520 commented on pull request #11641: MINOR: update the comment for Utils.atomicMoveWithFallback

2022-01-04 Thread GitBox


liuchang0520 commented on pull request #11641:
URL: https://github.com/apache/kafka/pull/11641#issuecomment-1004972135


   Hi @showuon ,  can you help me to merge this PR? I don't have write access.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11609: KAFKA-12648: fixes for query APIs and blocking calls

2022-01-04 Thread GitBox


guozhangwang commented on a change in pull request #11609:
URL: https://github.com/apache/kafka/pull/11609#discussion_r778297858



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
##
@@ -296,9 +297,10 @@ private boolean hasPartitionsForAnyTopics(final List topicNames, final S
     }
 
     private Set getStoresOnHost(final Map> storeToSourceTopics,
-                                final Set sourceTopicPartitions, final String topologyName) {
+                                final Set sourceTopicPartitions,
+                                final String topologyName) {
         final InternalTopologyBuilder builder = topologyMetadata.lookupBuilderForNamedTopology(topologyName);
-        final Set sourceTopicNames = builder.sourceTopicNames();
+        final Collection sourceTopicNames = builder.sourceTopicCollection();

Review comment:
   Ack! Thanks.








[GitHub] [kafka] guozhangwang commented on a change in pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

2022-01-04 Thread GitBox


guozhangwang commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r778303747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -365,109 +395,200 @@ public void beforeTest() {
 final Properties streamsConfig = streamsConfiguration(
 cache,
 log,
-storeToTest.name()
+storeToTest.name(),
+kind
 );
 
 final StreamsBuilder builder = new StreamsBuilder();
-if (supplier instanceof KeyValueBytesStoreSupplier) {
-final Materialized> 
materialized =
-Materialized.as((KeyValueBytesStoreSupplier) supplier);
+if (Objects.equals(kind, "DSL") && supplier instanceof 
KeyValueBytesStoreSupplier) {
+setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "PAPI") && supplier instanceof 
KeyValueBytesStoreSupplier) {
+setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "DSL") && supplier instanceof 
WindowBytesStoreSupplier) {
+setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "PAPI") && supplier instanceof 
WindowBytesStoreSupplier) {
+throw new AssumptionViolatedException("Case not implemented yet");
+} else if (Objects.equals(kind, "DSL") && supplier instanceof 
SessionBytesStoreSupplier) {
+setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "PAPI") && supplier instanceof 
SessionBytesStoreSupplier) {
+throw new AssumptionViolatedException("Case not implemented yet");
+} else {
+throw new AssertionError("Store supplier is an unrecognized 
type.");
+}
 
-if (cache) {
-materialized.withCachingEnabled();
-} else {
-materialized.withCachingDisabled();
-}
+// Don't need to wait for running, since tests can use iqv2 to wait 
until they
+// get a valid response.
 
-if (log) {
-materialized.withLoggingEnabled(Collections.emptyMap());
-} else {
-materialized.withCachingDisabled();
-}
+kafkaStreams =
+IntegrationTestUtils.getStartedStreams(
+streamsConfig,
+builder,
+true
+);
+}
 
-if (storeToTest.global()) {
-builder.globalTable(
-INPUT_TOPIC_NAME,
-Consumed.with(Serdes.Integer(), Serdes.Integer()),
-materialized
-);
-} else {
-builder.table(
-INPUT_TOPIC_NAME,
-Consumed.with(Serdes.Integer(), Serdes.Integer()),
-materialized
-);
-}
-} else if (supplier instanceof WindowBytesStoreSupplier) {
-final Materialized> 
materialized =
-Materialized.as((WindowBytesStoreSupplier) supplier);
+private void setUpSessionDSLTopology(final SessionBytesStoreSupplier 
supplier,
+ final StreamsBuilder builder) {
+final Materialized> 
materialized =
+Materialized.as(supplier);
 
-if (cache) {
-materialized.withCachingEnabled();
-} else {
-materialized.withCachingDisabled();
-}
+if (cache) {
+materialized.withCachingEnabled();
+} else {
+materialized.withCachingDisabled();
+}
 
-if (log) {
-materialized.withLoggingEnabled(Collections.emptyMap());
-} else {
-materialized.withCachingDisabled();
-}
+if (log) {
+materialized.withLoggingEnabled(Collections.emptyMap());
+} else {
+materialized.withCachingDisabled();

Review comment:
   withLoggingDisabled?
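   For clarity, a small sketch of what the corrected branch presumably looks like (the helper class and method below are illustrative, not the committed fix): the else branch should call withLoggingDisabled() rather than withCachingDisabled().

```java
import java.util.Collections;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;

public class LoggingToggleSketch {
    // Applies the cache/log flags the test parameterizes over; the logging else-branch
    // is the fix the reviewer is asking about.
    static <K, V, S extends StateStore> Materialized<K, V, S> configure(
            final Materialized<K, V, S> materialized, final boolean cache, final boolean log) {
        if (cache) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }
        if (log) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();   // was withCachingDisabled() by mistake
        }
        return materialized;
    }
}
```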

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -365,109 +395,200 @@ public void beforeTest() {
 final Properties streamsConfig = streamsConfiguration(
 cache,
 log,
-storeToTest.name()
+storeToTest.name(),
+kind
 );
 
 final StreamsBuilder builder = new StreamsBuilder();
-if (supplier instanceof KeyValueBytesStoreSupplier) {
-final Materialized> 
materialized =
-Materialized.as((KeyValueBytesStoreSupplier) supplier);
+if (Objects.equals(kind, "DSL") && supplier instanceof 
KeyValueBytesStoreSupplier) {
+setUpKeyVa

[GitHub] [kafka] guozhangwang commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#issuecomment-1005074050


   Hey @vvcephei , please feel free to merge after addressing the comments and 
resolving the conflicts.






[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468879#comment-17468879
 ] 

Matthias J. Sax commented on KAFKA-13289:
-

{quote}However in the cases mentioned here, there is a lack of output, not an 
early emission. So I fear these issues are different.
{quote}
Thanks for double checking. I just wanted to ensure that those `null` results 
are not interpreted as missing data (there could be a later inner join result).
 
The log line clearly indicates that a record was dropped on the floor: we do a 
put() into the state store, but the record is too old, and the state store drops it.
We know that timestamp synchronization in v2.8 still has some gaps. I was also 
just looking into the GitHub repos, and neither uses the `max.task.idle.ms` 
config. Can you try increasing this config and/or switching to 3.0? Cf. 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization]
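A minimal sketch of the suggested configuration change; the concrete value is an assumption, not a recommendation:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class MaxTaskIdleSketch {
    public static void main(final String[] args) {
        final Properties streamsConfiguration = new Properties();
        // Let tasks wait (here: up to 10s, an assumed value) for data in all input
        // partitions before advancing stream time.
        streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 10_000L);
        System.out.println(streamsConfiguration);
    }
}
{code}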
 

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream leftStream = 
> builder.stream(leftTopic);
> final KStream rightStream = 
> builder.stream(rightTopic);
> final KStream rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...where as, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm weary of the resource requirements 
> doing so in practice on an applicat

[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468912#comment-17468912
 ] 

Eugen Dück commented on KAFKA-13289:


[~mjsax] 
I tried setting `max.task.idle.ms` like so:

streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "" + Long.MAX_VALUE);

However, the "Skipping record for expired segment." messages are logged 
nevertheless. I also tried lower values like 1 million, to no avail. This test 
runs on Kafka 2.8.0

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream leftStream = 
> builder.stream(leftTopic);
> final KStream rightStream = 
> builder.stream(rightTopic);
> final KStream rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...where as, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm weary of the resource requirements 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across this disc

[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005247992


   Thanks @vamossagar12 , I do not have further comments now, re-triggering the 
jenkins tests.






[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468913#comment-17468913
 ] 

Eugen Dück commented on KAFKA-13289:


[~mjsax] 

I just retried with confluentinc/cp-kafka:7.0.1, i.e. Kafka 3.0, and "Skipping 
record for expired segment." still gets logged.

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream leftStream = 
> builder.stream(leftTopic);
> final KStream rightStream = 
> builder.stream(rightTopic);
> final KStream rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...where as, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm weary of the resource requirements 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across this discussion thread which seems to cover the same issue 
> http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e
>  and had a request from [~cadonna] for a reproduction case, so I'm hoping my 
> example above might make the issue ea

[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468912#comment-17468912
 ] 

Eugen Dück edited comment on KAFKA-13289 at 1/4/22, 11:34 PM:
--

[~mjsax] 
I tried setting `max.task.idle.ms` like so:

streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "" + Long.MAX_VALUE);

However, the "Skipping record for expired segment." messages are logged 
nevertheless. I also tried lower values like 1 million, to no avail. This test 
runs on confluentinc/cp-kafka:5.4.3 (Kafka 2.8)


was (Author: eugendueck):
[~mjsax] 
I tried setting `max.task.idle.ms` like so:

streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, "" + Long.MAX_VALUE);

However, the "Skipping record for expired segment." messages are logged 
nevertheless. I also tried lower values like 1 million, to no avail. This test 
runs on Kafka 2.8.0

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream leftStream = 
> builder.stream(leftTopic);
> final KStream rightStream = 
> builder.stream(rightTopic);
> final KStream rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...where as, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem

[GitHub] [kafka] vvcephei commented on a change in pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

2022-01-04 Thread GitBox


vvcephei commented on a change in pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#discussion_r778456441



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -365,109 +395,200 @@ public void beforeTest() {
 final Properties streamsConfig = streamsConfiguration(
 cache,
 log,
-storeToTest.name()
+storeToTest.name(),
+kind
 );
 
 final StreamsBuilder builder = new StreamsBuilder();
-if (supplier instanceof KeyValueBytesStoreSupplier) {
-final Materialized> 
materialized =
-Materialized.as((KeyValueBytesStoreSupplier) supplier);
+if (Objects.equals(kind, "DSL") && supplier instanceof 
KeyValueBytesStoreSupplier) {
+setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "PAPI") && supplier instanceof 
KeyValueBytesStoreSupplier) {
+setUpKeyValuePAPITopology((KeyValueBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "DSL") && supplier instanceof 
WindowBytesStoreSupplier) {
+setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "PAPI") && supplier instanceof 
WindowBytesStoreSupplier) {
+throw new AssumptionViolatedException("Case not implemented yet");
+} else if (Objects.equals(kind, "DSL") && supplier instanceof 
SessionBytesStoreSupplier) {
+setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, 
builder);
+} else if (Objects.equals(kind, "PAPI") && supplier instanceof 
SessionBytesStoreSupplier) {
+throw new AssumptionViolatedException("Case not implemented yet");
+} else {
+throw new AssertionError("Store supplier is an unrecognized 
type.");
+}
 
-if (cache) {
-materialized.withCachingEnabled();
-} else {
-materialized.withCachingDisabled();
-}
+// Don't need to wait for running, since tests can use iqv2 to wait 
until they
+// get a valid response.
 
-if (log) {
-materialized.withLoggingEnabled(Collections.emptyMap());
-} else {
-materialized.withCachingDisabled();
-}
+kafkaStreams =
+IntegrationTestUtils.getStartedStreams(
+streamsConfig,
+builder,
+true
+);
+}
 
-if (storeToTest.global()) {
-builder.globalTable(
-INPUT_TOPIC_NAME,
-Consumed.with(Serdes.Integer(), Serdes.Integer()),
-materialized
-);
-} else {
-builder.table(
-INPUT_TOPIC_NAME,
-Consumed.with(Serdes.Integer(), Serdes.Integer()),
-materialized
-);
-}
-} else if (supplier instanceof WindowBytesStoreSupplier) {
-final Materialized> 
materialized =
-Materialized.as((WindowBytesStoreSupplier) supplier);
+private void setUpSessionDSLTopology(final SessionBytesStoreSupplier 
supplier,
+ final StreamsBuilder builder) {
+final Materialized> 
materialized =
+Materialized.as(supplier);
 
-if (cache) {
-materialized.withCachingEnabled();
-} else {
-materialized.withCachingDisabled();
-}
+if (cache) {
+materialized.withCachingEnabled();
+} else {
+materialized.withCachingDisabled();
+}
 
-if (log) {
-materialized.withLoggingEnabled(Collections.emptyMap());
-} else {
-materialized.withCachingDisabled();
-}
+if (log) {
+materialized.withLoggingEnabled(Collections.emptyMap());
+} else {
+materialized.withCachingDisabled();

Review comment:
   Oops! Thanks for the catch!








[GitHub] [kafka] vvcephei commented on pull request #11624: KAFKA-13553: add PAPI KV store tests for IQv2

2022-01-04 Thread GitBox


vvcephei commented on pull request #11624:
URL: https://github.com/apache/kafka/pull/11624#issuecomment-1005250957


   Thanks, @guozhangwang !






[GitHub] [kafka] guozhangwang commented on pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11460:
URL: https://github.com/apache/kafka/pull/11460#issuecomment-1005257929


   Thanks @RivenSun2 for the improvement and @showuon for the reviews, LGTM to 
clarify it.






[GitHub] [kafka] guozhangwang merged pull request #11460: KAFKA-13425: Optimization of KafkaConsumer#pause semantics

2022-01-04 Thread GitBox


guozhangwang merged pull request #11460:
URL: https://github.com/apache/kafka/pull/11460


   






[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005259979


   @vamossagar12 there are some compilation errors in Jenkins, could you rebase 
from the latest trunk and check what the issue is? Please ping me once that's 
resolved.






[GitHub] [kafka] ccding commented on pull request #11641: MINOR: update the comment for Utils.atomicMoveWithFallback

2022-01-04 Thread GitBox


ccding commented on pull request #11641:
URL: https://github.com/apache/kafka/pull/11641#issuecomment-1005331649


   cc @junrao 






[jira] [Commented] (KAFKA-13570) Fallback of unsupported versions of ApiVersionsRequest and RequestHeader?

2022-01-04 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468965#comment-17468965
 ] 

dengziming commented on KAFKA-13570:


I don't think it's simple for a server to properly handle a higher-version RPC 
from a client, so it is the client's responsibility to handle it. Most Kafka 
clients will fall back to an older RPC version on an UnsupportedVersionException, 
but that is not guaranteed.
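A hedged sketch of the client-side fallback pattern described above; the Rpc interface and version numbers are hypothetical, only UnsupportedVersionException is a real Kafka class:

{code:java}
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class VersionFallbackSketch {
    interface Rpc { void send(short version); }   // hypothetical stand-in for a client RPC

    static void sendWithFallback(final Rpc rpc, final short latest, final short older) {
        try {
            rpc.send(latest);
        } catch (final UnsupportedVersionException e) {
            // Most clients retry with an older version, but the protocol does not
            // guarantee that every client does this.
            rpc.send(older);
        }
    }
}
{code}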

> Fallback of unsupported versions of ApiVersionsRequest and RequestHeader?
> -
>
> Key: KAFKA-13570
> URL: https://issues.apache.org/jira/browse/KAFKA-13570
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Fredrik Arvidsson
>Priority: Minor
>
> I've gone through the protocol documentation and the source code, but I can't 
> find any explicit documentation stating how clients and brokers are handling 
> the scenario when the client sends higher versions of ApiVersionsRequest and 
> RequestHeader which the broker doesn't understand. I've seen hints in 
> discussions that the broker would fallback to returning version 0 of the 
> ApiVersionsResponse in this scenario.
> Are there any documentation that explains these scenarios?





[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468913#comment-17468913
 ] 

Eugen Dück edited comment on KAFKA-13289 at 1/5/22, 2:38 AM:
-

[~mjsax] 

I just retried with confluentinc/cp-kafka:7.0.1, i.e. Kafka 3.0, and 
max.task.idle.ms = Long.MAX. "Skipping record for expired segment." still gets 
logged.


was (Author: eugendueck):
[~mjsax] 

I just retried with confluentinc/cp-kafka:7.0.1, i.e. Kafka 3.0, and "Skipping 
record for expired segment." still gets logged.

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-rool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream leftStream = 
> builder.stream(leftTopic);
> final KStream rightStream = 
> builder.stream(rightTopic);
> final KStream rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...where as, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm weary of the resource requirements 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across this discussion thread which seems to cover the same issue 
> http://mail

[jira] [Commented] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2022-01-04 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469004#comment-17469004
 ] 

Luke Chen commented on KAFKA-13563:
---

[~guozhang] [~ableegoldman], you might want to check this issue, and the WIP 
PR:  [https://github.com/apache/kafka/pull/11631] . Thank you.
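For context, a minimal sketch of the non-group-mode setup the issue below describes (Consumer#assign plus a group.id used only for offset commits); the broker address, topic, partition, and group name are assumptions:

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignModeSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                 // group id provided, so a coordinator is used for commits
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(new TopicPartition("my-topic", 0)));       // assign(), not subscribe()
            consumer.poll(Duration.ofMillis(100));
            // Per the report, commits can keep failing with NOT_COORDINATOR once the
            // coordinator future is never cleared after a coordinator change.
            consumer.commitSync();
        }
    }
}
{code}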

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fix the race condition when lookup coordinator by clearing 
> the _findCoordinatorFuture_ when handling the result, rather than in the 
> listener callbacks. It works well under consumer group mode (i.e. 
> Consumer#subscribe), but we found when user is using non consumer group mode 
> (i.e. Consumer#assign) with group id provided (for offset commitment, so that 
> there will be consumerCoordinator created), the _findCoordinatorFuture_ will 
> never be cleared in some situations, and cause the offset committing keeps 
> getting NOT_COORDINATOR error.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with group id provided, there will be no 
> (1)heartbeat thread , and it only call 
> (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to 
> fetch committed offset position. That is, after 2nd lookupCoordinator call, 
> we have no chance to clear the _findCoordinatorFuture_ .
>  
> To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear 
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix 
> this issue by calling AbstractCoordinator#ensureCoordinatorReady when 
> coordinator unknown in non consumer group case, under each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  





[jira] [Updated] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2022-01-04 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13563:
--
Affects Version/s: 3.0.0
   2.7.1

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fix the race condition when lookup coordinator by clearing 
> the _findCoordinatorFuture_ when handling the result, rather than in the 
> listener callbacks. It works well under consumer group mode (i.e. 
> Consumer#subscribe), but we found when user is using non consumer group mode 
> (i.e. Consumer#assign) with group id provided (for offset commitment, so that 
> there will be consumerCoordinator created), the _findCoordinatorFuture_ will 
> never be cleared in some situations, and cause the offset committing keeps 
> getting NOT_COORDINATOR error.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with group id provided, there will be no 
> (1)heartbeat thread , and it only call 
> (2)AbstractCoordinator#ensureCoordinatorReady when 1st time consumer wants to 
> fetch committed offset position. That is, after 2nd lookupCoordinator call, 
> we have no chance to clear the _findCoordinatorFuture_ .
>  
> To avoid the race condition as KAFKA-10793 mentioned, it's not safe to clear 
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix 
> this issue by calling AbstractCoordinator#ensureCoordinatorReady when 
> coordinator unknown in non consumer group case, under each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  





[GitHub] [kafka] vamossagar12 commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-04 Thread GitBox


vamossagar12 commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005361882


   @guozhangwang , it's due to the usage of deprecated configs and gradle 
having a check against it.
   
   ```
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:61:
 warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] 
.define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
   
   [2022-01-04T23:35:30.575Z] ^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:56:
 warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z]  
.define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
   
   [2022-01-04T23:35:30.575Z]  ^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:115:
 warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] if 
(isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) {
   
   [2022-01-04T23:35:30.575Z]^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:116:
 warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] maxBufferedSize = 
getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
   
   [2022-01-04T23:35:30.575Z]  ^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:117:
 warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] log.info("Topology {} is overriding 
{} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, 
maxBufferedSize);
   
   [2022-01-04T23:35:30.575Z]   
   ^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:119:
 warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
   
   [2022-01-04T23:35:30.575Z]   
  ^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:120:
 warning: [deprecation] BUFFERED_RECORDS_PER_PARTITION_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] ? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
   
   [2022-01-04T23:35:30.575Z]   
 ^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:123:
 warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] if 
(isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) {
   
   [2022-01-04T23:35:30.575Z]^
   
   [2022-01-04T23:35:30.575Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11424/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java:124:
 warning: [deprecation] CACHE_MAX_BYTES_BUFFERING_CONFIG in 
org.apache.kafka.streams.StreamsConfig has been deprecated
   
   [2022-01-04T23:35:30.575Z] cacheSize = 
getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
   
   [2022-01-04T23:35:30.575Z] ^
   

[GitHub] [kafka] mjsax opened a new pull request #11648: MINOR: add default-replication-factor to MockAdminClient

2022-01-04 Thread GitBox


mjsax opened a new pull request #11648:
URL: https://github.com/apache/kafka/pull/11648


   MockAdminClient should add `default.replication.factor` to the config of 
each node. Otherwise, `describeConfigs()` does not return the default 
replication factor correctly.
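   
   For context, a minimal sketch of how a caller would read that value via the 
Admin API (bootstrap address and broker id below are placeholders):
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.Config;
   import org.apache.kafka.common.config.ConfigResource;
   
   public class DescribeDefaultReplicationFactor {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
               Map<ConfigResource, Config> configs =
                       admin.describeConfigs(Collections.singleton(broker)).all().get();
               // With the MockAdminClient change, the same lookup should also work in tests.
               System.out.println(configs.get(broker).get("default.replication.factor").value());
           }
       }
   }
   ```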


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469010#comment-17469010
 ] 

Matthias J. Sax commented on KAFKA-13289:
-

You can reproduce it with the code in the GitHub repo linked above? Let me try 
to check it out and reproduce it locally to see what's going on. Not sure how 
quickly I will find time though. – Thanks for your patience and collaboration!

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm wary of the resource requirements of 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across this discussion thread which seems to cover the same issue 
> http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQozL__ANoo3=h57q6z3y4hzt...@mail.gmail.com%3e
>  and had a req
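
As a concrete sketch of the grace-period workaround mentioned in the description 
above (the two-hour grace value is arbitrary and purely illustrative):

{code:java}
// Continues the topology quoted above (imports and stream definitions as there).
// JoinWindows.of(...).grace(...) matches the 2.8.x API; newer releases offer
// JoinWindows.ofTimeDifferenceAndGrace(...) instead.
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5))
        .grace(Duration.ofHours(2));

final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
        rekeyedRightStream,
        (left, right) -> left + "/" + right,
        joinWindow);
{code}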

[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469016#comment-17469016
 ] 

Eugen Dück commented on KAFKA-13289:


Yes, if you use Matthew's repo, which gives the "Skipping record" logs:  
[https://github.com/mattsheppard/ins14809/|https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java]

Setting max.task.idle.ms is straightforward, and so is using 
confluentinc/cp-kafka:7.0.1
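
For reference, a minimal sketch of setting that property on the Streams app (the 
10-second value is illustrative only):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class IdlingConfigSketch {
    // Task idling: a task waits (up to the configured time) for data from all of
    // its input partitions before advancing stream time, so one partition cannot
    // race stream time ahead of the others. 10s is an arbitrary example value.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 10_000L);  // "max.task.idle.ms"
        // ... plus application.id, bootstrap.servers, and the other usual settings
        return props;
    }
}
{code}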

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm wary of the resource requirements of 
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across this discussion thread which seems to cover the same issue 
> http://mail-archives.apache.org/mod_mbox/kafka-users/202002.mbox/%3cCAB0tB9p_vijMS18jWXBqp7TQoz

[GitHub] [kafka] guozhangwang commented on pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#issuecomment-1005381382


   @vamossagar12 Note that `TopologyConfig` is an internal class for now, and 
hence should not use deprecated values. I think we need to update this class to 
use the new config directly and remove the old ones --- since it is internal, 
this would not break any compatibility.
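   
   A rough sketch of that direction (the config name follows the PR title, 
"input.buffer.max.bytes"; the default value and description are illustrative, 
not the actual patch):
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigDef.Importance;
   import org.apache.kafka.common.config.ConfigDef.Type;
   
   public class TopologyConfigSketch {
       // Define the new byte-based bound directly instead of the deprecated
       // BUFFERED_RECORDS_PER_PARTITION_CONFIG constant.
       static final ConfigDef CONFIG = new ConfigDef()
               .define("input.buffer.max.bytes",
                       Type.LONG,
                       512 * 1024 * 1024L,
                       Importance.MEDIUM,
                       "Maximum number of bytes buffered across the input partitions of a topology.");
   }
   ```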


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-04 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469016#comment-17469016
 ] 

Eugen Dück edited comment on KAFKA-13289 at 1/5/22, 4:53 AM:
-

That would be awesome!

> You can reproduce it with the code in the GitHub repo linked above? 


Yes, if you use Matthew's repo, which gives the "Skipping record" logs:  
[https://github.com/mattsheppard/ins14809/|https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java]

Setting max.task.idle.ms is straightforward, and so is the change to using 
confluentinc/cp-kafka:7.0.1


was (Author: eugendueck):
Yes, if you use Matthew's repo, which gives the "Skipping record" logs:  
[https://github.com/mattsheppard/ins14809/|https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java]

Setting max.task.idle.ms is straightforward, and so is using 
confluentinc/cp-kafka:7.0.1

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
> .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
> rekeyedRightStream,
> (left, right) -> left + "/" + right,
> joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the 
> problem is solved, but since the input I provide is not out of order I did 
> not expect to need to do that, and I'm wary of the resource requirements of 
> doing so in practice on an application with a lot of volume.
> My susp

[GitHub] [kafka] guozhangwang commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#issuecomment-1005381862


   Retriggering jenkins tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-01-04 Thread GitBox


guozhangwang commented on a change in pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#discussion_r778549345



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -517,15 +517,13 @@ public boolean poll(Timer timer, boolean 
waitForJoinGroup) {
 }
 }
 } else {
-// For manually assigned partitions, if there are no ready nodes, 
await metadata.
+// For manually assigned partitions, if coordinator is unknown, 
make sure we lookup one and await metadata.
 // If connections to all nodes fail, wakeups triggered while 
attempting to send fetch
 // requests result in polls returning immediately, causing a tight 
loop of polls. Without
 // the wakeup, poll() with no channels would block for the 
timeout, delaying re-connection.
-// awaitMetadataUpdate() initiates new connections with configured 
backoff and avoids the busy loop.
-// When group management is used, metadata wait is already 
performed for this scenario as
-// coordinator is unknown, hence this check is not required.
-if (metadata.updateRequested() && 
!client.hasReadyNodes(timer.currentTimeMs())) {
-client.awaitMetadataUpdate(timer);
+// awaitMetadataUpdate() in ensureCoordinatorReady initiates new 
connections with configured backoff and avoids the busy loop.
+if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
+return false;

Review comment:
   I'm wondering if this fix is a bit overkill: e.g. for consumers that do not 
even set the group ID and do not rely on the coordinator for anything, 
`coordinatorUnknown()` would always return true, and `ensureCoordinatorReady` 
would send a discover-coordinator request with a `null` group id. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2022-01-04 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469023#comment-17469023
 ] 

Guozhang Wang commented on KAFKA-13563:
---

Hi [~showuon] thanks for the report, I've reviewed the PR and left some comment.

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fixed the race condition when looking up the coordinator by 
> clearing the _findCoordinatorFuture_ when handling the result, rather than in 
> the listener callbacks. It works well under consumer group mode (i.e. 
> Consumer#subscribe), but we found that when a user runs in non consumer group 
> mode (i.e. Consumer#assign) with a group id provided (for offset commits, so 
> that a consumerCoordinator is created), the _findCoordinatorFuture_ will 
> never be cleared in some situations, causing offset commits to keep getting 
> NOT_COORDINATOR errors.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with a group id provided, there is no 
> (1) heartbeat thread, and we only call 
> (2) AbstractCoordinator#ensureCoordinatorReady the first time the consumer 
> wants to fetch the committed offset position. That is, after the 2nd 
> lookupCoordinator call, we have no chance to clear the _findCoordinatorFuture_.
>  
> To avoid the race condition mentioned in KAFKA-10793, it's not safe to clear 
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix 
> this issue by calling AbstractCoordinator#ensureCoordinatorReady whenever the 
> coordinator is unknown in the non consumer group case, under each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580

2022-01-04 Thread GitBox


guozhangwang commented on pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#issuecomment-1005389524


   Thanks @showuon , I've made a quick pass and it looks promising to me. I'm 
pinging @dajac and @kirktrue who would be the best reviewers for this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-01-04 Thread GitBox


showuon commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1005407357


   @guozhangwang , answering your question below:
   
   > I'm wondering if this fix is a bit overkill, e.g. for those consumers who 
do not even set the group ID and do not rely on the coordinator for anything, 
the coordinatorUnknown() would always return true while ensureCoordinatorReady 
would send a discover coordinator request with null group id.
   
   No, if the consumer doesn't provide a group id config value (the default is 
null), we won't create a `consumerCoordinator` in the consumer. That is, if the 
group id is provided, it's either used with consumer group management (via 
Consumer#subscribe) or with manual assignment (via Consumer#assign) with offset 
commits enabled. 
   
   REF: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L775
   
   And before this PR, we would wait for a metadata update if no nodes were 
available, to avoid a busy loop in non consumer group mode.
   
   ```java
   if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
       client.awaitMetadataUpdate(timer);
   }
   ```
   
   After the change, we do what the group management path does and call 
`ensureCoordinatorReady` when the coordinator is unknown. This way, we can also 
make sure the `FindCoordinatorFuture` is handled properly (and cleared) inside 
`ensureCoordinatorReady`.
   
   Does that make sense?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #11631: [WIP] KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-01-04 Thread GitBox


showuon edited a comment on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1005407357


   @guozhangwang , thanks for your comment. Answering your question below:
   
   > I'm wondering if this fix is a bit overkill, e.g. for those consumers who 
do not even set the group ID and do not rely on the coordinator for anything, 
the coordinatorUnknown() would always return true while ensureCoordinatorReady 
would send a discover coordinator request with null group id.
   
   No, if the consumer doesn't provide a group id config value (the default is 
null), we won't create a `consumerCoordinator` in the consumer. That is, if the 
group id is provided, it's either used with consumer group management (via 
Consumer#subscribe) or with manual assignment (via Consumer#assign) with offset 
commits enabled. 
   
   REF: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L775
   
   And before this PR, we would wait for a metadata update if no nodes were 
available, to avoid a busy loop in non consumer group mode.
   
   ```java
   if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
       client.awaitMetadataUpdate(timer);
   }
   ```
   
   After the change, we do what the group management path does and call 
`ensureCoordinatorReady` when the coordinator is unknown. This way, we can also 
make sure the `FindCoordinatorFuture` is handled properly (and cleared) inside 
`ensureCoordinatorReady`.
   
   Does that make sense?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown:

2022-01-04 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13422:
-
Reviewer: Guozhang Wang  (was: Luke Chen)

> Even if the correct username and password are configured, when ClientBroker 
> or KafkaClient tries to establish a SASL connection to ServerBroker, an 
> exception is thrown: (Authentication failed: Invalid username or password)
> --
>
> Key: KAFKA-13422
> URL: https://issues.apache.org/jira/browse/KAFKA-13422
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.7.1, 3.0.0
>Reporter: RivenSun
>Priority: Major
> Attachments: CustomerAuthCallbackHandler.java, 
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster with a newer version (2.7.1), I encountered an 
> inter-broker authentication failure. The problem can also be reproduced in the 
> current latest version, 3.0.0.
> h1. Reproducing the problem:
> h2. 1) Broker version is 3.0.0
> h3. The content of kafka_server_jaas.conf is exactly the same on every broker, 
> as follows:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. broker server.properties:
> One broker's configuration file is provided below; the other brokers' 
> configuration files differ only in the localPublicIp used in 
> advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=8640
> delegation.token.max.lifetime.ms=31536
> {code}
>  
>  
> Then start all brokers at the same time. Each broker actually starts 
> successfully, but when the controller node tries to establish connections to 
> the brokers, authentication always fails. Connections between brokers cannot 
> be established normally, so the entire Kafka cluster is unable to provide 
> external services.
> h3. The server log keeps printing this error over and over:
> The broker's real IP in the log is sensitive information, so I use ** 
> instead of it here.
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /** (A
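
For reference, a minimal Java sketch of a client supplying SASL/PLAIN credentials 
for the `alice` user defined in the JAAS file above (addresses are placeholders; 
this only illustrates the client-side wiring, it is not a fix for this report):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SaslPlainClientSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The report's SASL_SSL listener runs on port 9889; the host is a placeholder.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localPublicIp:9889");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Credentials correspond to user_alice="alice" in the quoted JAAS file.
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"alice\" password=\"alice\";");
        // ssl.truststore.* settings omitted for brevity.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A successful metadata fetch implies SASL authentication succeeded.
            producer.partitionsFor("test-topic").forEach(System.out::println);
        }
    }
}
{code}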

[jira] [Commented] (KAFKA-13422) Even if the correct username and password are configured, when ClientBroker or KafkaClient tries to establish a SASL connection to ServerBroker, an exception is thrown

2022-01-04 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17469084#comment-17469084
 ] 

RivenSun commented on KAFKA-13422:
--

Hi [~guozhang] [~rsivaram] [~ijuma],
I think this issue is worth discussing. Could you give any suggestions?
Thanks a lot.

> Even if the correct username and password are configured, when ClientBroker 
> or KafkaClient tries to establish a SASL connection to ServerBroker, an 
> exception is thrown: (Authentication failed: Invalid username or password)
> --
>
> Key: KAFKA-13422
> URL: https://issues.apache.org/jira/browse/KAFKA-13422
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.7.1, 3.0.0
>Reporter: RivenSun
>Priority: Major
> Attachments: CustomerAuthCallbackHandler.java, 
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>  
> h1. Foreword:
> When deploying a Kafka cluster with a newer version (2.7.1), I encountered an 
> inter-broker authentication failure. The problem can also be reproduced in the 
> current latest version, 3.0.0.
> h1. Reproducing the problem:
> h2. 1) Broker version is 3.0.0
> h3. The content of kafka_server_jaas.conf is exactly the same on every broker, 
> as follows:
>  
>  
> {code:java}
> KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="admin_scram_password";
>  
> };
> {code}
>  
>  
> h3. broker server.properties:
> One broker's configuration file is provided below; the other brokers' 
> configuration files differ only in the localPublicIp used in 
> advertised.listeners.
>  
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=8640
> delegation.token.max.lifetime.ms=31536
> {code}
>  
>  
> Then start all brokers at the same time. Each broker actually starts 
> successfully, but when the controller node tries to establish connections to 
> the brokers, authentication always fails. Connections between brokers cannot 
> be established normally, so the entire Kafka cluster is unable to provide 
> external services.
> h3. The server log keeps printing this error over and over:
> The broker's real IP in the log is sensitive information, so I use ** 
> instead of it here.
>  
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Started socket server acceptors and processors 
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, 
> nodeId=3] Failed authentication with /** (Authentication failed: Invalid 
> username or password) (org.apache.kafka.c