[GitHub] [kafka] dajac commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


dajac commented on a change in pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#discussion_r694568169



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) {
 else if (!future.isRetriable())
 throw exception;
 
-resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));

Review comment:
   I have double-checked the error handling as well, and I agree with the change: all the errors that require a rejoin are explicitly handled in the `JoinResponseHandler` and the `SyncGroupResponseHandler`. The change looks good to me.
   
   I agree with @guozhangwang that resetting `rejoinNeeded` would make the whole logic a bit cleaner.
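A toy model of the control flow under review may help: a retriable failure is simply retried and the member state survives, on the assumption (stated in the comment above) that every error requiring a rejoin is already handled, with its own state reset, in the response handlers. `JoinLoopModel`, `Result`, and `generationId` are hypothetical names for illustration, not `AbstractCoordinator` internals.

```java
import java.util.Deque;

// Hypothetical toy model of the join loop discussed in KAFKA-13214; not AbstractCoordinator code.
final class JoinLoopModel {
    enum Result { SUCCESS, RETRIABLE_ERROR, FATAL_ERROR }

    int generationId = 5; // hypothetical member state that should survive retriable errors
    int attempts = 0;     // number of join attempts made so far

    // Consumes one simulated response per attempt until the join succeeds or responses run out.
    boolean joinGroupIfNeeded(final Deque<Result> responses) {
        while (!responses.isEmpty()) {
            attempts++;
            final Result result = responses.poll();
            if (result == Result.SUCCESS) {
                return true;
            } else if (result == Result.FATAL_ERROR) {
                throw new IllegalStateException("non-retriable join failure");
            }
            // RETRIABLE_ERROR: retry without resetting generationId. Errors that need a
            // rejoin are assumed to reset state in their own response handlers.
        }
        return false;
    }
}
```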




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-24 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403611#comment-17403611
 ] 

Bruno Cadonna commented on KAFKA-13195:
---

[~mjsax] [~tchiotludo] Can we close this ticket?

> StateSerde don't honor DeserializationExceptionHandler
> --
>
> Key: KAFKA-13195
> URL: https://issues.apache.org/jira/browse/KAFKA-13195
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ludo
>Priority: Major
>
> Kafka Streams allows configuring a
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling].
>  
> When you use a StateStore, most messages will be copies of the original
> messages in an internal topic and will mostly use the same serializer, even if the
> message is of another type.
> You can see
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
> that StateSerde uses the raw Deserializer and does not honor
> {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}.
> This crashes the application (the exception reaches the handler set with
> {{setUncaughtExceptionHandler}}).
> I think the state store must behave the same way as the
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stack trace (from Kafka Streams 2.6.1):
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>  at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>  at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>  at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>  at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>  at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
>  at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
>  at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
>  at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
>  at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
>  at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)
>  at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)
>  at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
>  at com.fasterxml.jackson.data

[GitHub] [kafka] dajac merged pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions

2021-08-24 Thread GitBox


dajac merged pull request #11230:
URL: https://github.com/apache/kafka/pull/11230


   






[jira] [Resolved] (KAFKA-12840) Removing `compact` cleaning on a topic should abort on-going compactions

2021-08-24 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-12840.
-
Fix Version/s: 3.1.0
 Reviewer: Jun Rao
   Resolution: Fixed

> Removing `compact` cleaning on a topic should abort on-going compactions
> 
>
> Key: KAFKA-12840
> URL: https://issues.apache.org/jira/browse/KAFKA-12840
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.1.0
>
>
> When `compact` is removed from the `cleanup.policy` of a topic, the 
> compactions of that topic should be aborted.
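The condition that triggers the abort can be sketched as a comparison of the old and new `cleanup.policy` values, which are comma-separated lists such as `compact,delete`. `CleanupPolicySketch` and `compactionRemoved` are hypothetical names; the broker's log cleaner performs the actual abort, which is not shown here.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: detects when a cleanup.policy update drops "compact".
final class CleanupPolicySketch {
    // Parses a comma-separated cleanup.policy value such as "compact,delete".
    static Set<String> parse(final String policy) {
        return Arrays.stream(policy.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // True when the old policy included "compact" and the new one does not,
    // i.e. when on-going compactions of the topic should be aborted.
    static boolean compactionRemoved(final String oldPolicy, final String newPolicy) {
        return parse(oldPolicy).contains("compact") && !parse(newPolicy).contains("compact");
    }
}
```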



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2021-08-24 Thread Yanwen Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanwen Lin reassigned KAFKA-10038:
--

Assignee: Yanwen Lin

> ConsumerPerformance.scala supports the setting of client.id
> ---
>
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 2.1.1
> Environment: Trunk branch
>Reporter: tigertan
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: newbie, performance
>
> ConsumerPerformance.scala should support setting "client.id". This is a
> reasonable requirement, and the way "console consumer" and "console producer"
> handle "client.id" can be unified. "client.id" defaults to
> "perf-consumer-client".
> We often use client.id in quotas. If the kafka-producer-perf-test.sh script
> supports setting "client.id", we can do
> quota testing through scripts without writing our own consumer programs.
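As a sketch of the requested behavior, assuming a hypothetical `PerfConsumerProps` helper, the perf consumer would use the caller's `client.id` when one is given and fall back to `perf-consumer-client` otherwise. `bootstrap.servers`, `group.id`, and `client.id` are standard consumer configuration keys. Because client quotas can be keyed by `client.id`, a quota set for the chosen id would then apply to the perf-test consumer.

```java
import java.util.Properties;

// Hypothetical helper, not ConsumerPerformance code: builds perf-test consumer properties.
final class PerfConsumerProps {
    static final String DEFAULT_CLIENT_ID = "perf-consumer-client"; // default named in KAFKA-10038

    // Uses the caller's client.id when provided, otherwise the default.
    static Properties build(final String bootstrapServers, final String groupId, final String clientIdOrNull) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        final boolean hasClientId = clientIdOrNull != null && !clientIdOrNull.isEmpty();
        props.setProperty("client.id", hasClientId ? clientIdOrNull : DEFAULT_CLIENT_ID);
        return props;
    }
}
```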





[GitHub] [kafka] kowshik opened a new pull request #11253: MINOR: Improve local variable name in UnifiedLog.maybeIncrementFirstUnstableOffset

2021-08-24 Thread GitBox


kowshik opened a new pull request #11253:
URL: https://github.com/apache/kafka/pull/11253


   It looked odd that the code has a local variable named `updatedFirstStableOffset` that is used to update `MergedLog.firstUnstableOffsetMetadata`. This PR renames the local variable to `updatedFirstUnstableOffset`, which is more aligned with the `MergedLog` attribute being updated.
   
   **Tests:**
   Relying on existing unit & integration tests.






[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jlprat commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r694629688



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
##
@@ -68,6 +70,12 @@ public void testMap() {
 }
 }
 
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamMap supplier = new KStreamMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   Same as previous comment

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   As the whole point of this PR is to provide better messages, I would also check in the test that the exception has the new enhanced message. Something like:
   ```suggestion
   final Record<String, Integer> record = new Record<>("K", 0, 0L);
   final NullPointerException exception = assertThrows(NullPointerException.class, () -> supplier.get().process(record));
   assertEquals(String.format("KeyValueMapper can't return null from mapping the record: %s", record), exception.getMessage());
   ```
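On the production side, the check being tested can be sketched with `Objects.requireNonNull(T, Supplier<String>)`, which builds the message only when the value is actually null. `NullCheckedMapper.mapOrFail` is a hypothetical helper that takes a plain `BiFunction` instead of the Streams `KeyValueMapper` and formats the key and value instead of a `Record`; the PR's real implementation may differ.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical helper illustrating a fail-fast null check with a descriptive message.
final class NullCheckedMapper {
    // Applies the mapper; throws NullPointerException naming the input if the result is null.
    static <K, V, R> R mapOrFail(final BiFunction<K, V, R> mapper, final K key, final V value) {
        final R result = mapper.apply(key, value);
        return Objects.requireNonNull(result, () -> String.format(
                "KeyValueMapper can't return null from mapping the record: key=%s, value=%s", key, value));
    }
}
```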








[GitHub] [kafka] cadonna commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-08-24 Thread GitBox


cadonna commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r694622105



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -249,7 +251,7 @@ public int maxFileOpeningThreads() {
 
 @Override
 public Options setMaxTotalWalSize(final long maxTotalWalSize) {
-dbOptions.setMaxTotalWalSize(maxTotalWalSize);
+LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. Setting option 'maxTotalWalSize' will be ignored");

Review comment:
   Could you please add tests that verify the log messages? You can find an example of how to verify log messages in `KTableSourceTest#kTableShouldLogAndMeterOnSkippedRecords()`.
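`KTableSourceTest` relies on Kafka's own log-capturing test utility, which is not reproduced here. As a self-contained illustration of the same idea, the sketch below attaches a `java.util.logging.Handler` to a named logger for the duration of a test and collects the messages logged at a given level. `LogCapture` is a hypothetical helper; Streams logs through SLF4J, so a real test would use the project's existing utility instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical test helper: collects records published to one java.util.logging Logger.
final class LogCapture implements AutoCloseable {
    private final Logger logger; // strong reference keeps the logger (and its handler) alive
    private final List<LogRecord> records = new ArrayList<>();
    private final Handler handler = new Handler() {
        @Override
        public void publish(final LogRecord record) {
            synchronized (records) {
                records.add(record);
            }
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    };

    private LogCapture(final Logger logger) {
        this.logger = logger;
        logger.addHandler(handler);
    }

    // Starts capturing everything logged to the named logger.
    static LogCapture forLogger(final String name) {
        return new LogCapture(Logger.getLogger(name));
    }

    // Returns the raw messages of all captured records at exactly the given level.
    List<String> messagesAt(final Level level) {
        final List<String> messages = new ArrayList<>();
        synchronized (records) {
            for (final LogRecord record : records) {
                if (record.getLevel().equals(level)) {
                    messages.add(record.getMessage());
                }
            }
        }
        return messages;
    }

    // Detaches the handler so later tests are unaffected.
    @Override
    public void close() {
        logger.removeHandler(handler);
    }
}
```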

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -61,12 +62,13 @@
  *
  * This class do the translation between generic {@link Options} into {@link DBOptions} and {@link ColumnFamilyOptions}.
  */
-public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options {
+class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options {

Review comment:
   I think this is not strictly needed since the constructor is already 
package-private.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -304,7 +306,7 @@ public String walDir() {
 
 @Override
 public Options setWalDir(final String walDir) {
-dbOptions.setWalDir(walDir);
+LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. Setting option 'walDir' will be ignored");

Review comment:
   A logging helper method sounds reasonable. IMO the return can stay as it 
is, because we would not save too much. But I do not have strong feelings about 
it.
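The logging helper discussed here could look roughly like this: one method builds and logs the warning, while each setter keeps its own `return this;`. The class name and `logIgnoreWalOption` are hypothetical, and `java.util.logging` stands in for the SLF4J logger used in Streams; the warning text is taken from the diff.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the RocksDB options adapter; only the logging pattern matters here.
class WalOptionsAdapterSketch {
    private static final Logger LOGGER = Logger.getLogger(WalOptionsAdapterSketch.class.getName());

    // Builds the warning text for any ignored WAL-related option (wording taken from the diff).
    static String ignoredWalOptionMessage(final String option) {
        return "WAL is explicitly disabled by Streams in RocksDB. Setting option '" + option + "' will be ignored";
    }

    // Shared helper so every WAL setter logs the same way.
    private static void logIgnoreWalOption(final String option) {
        LOGGER.warning(ignoredWalOptionMessage(option));
    }

    WalOptionsAdapterSketch setWalDir(final String walDir) {
        logIgnoreWalOption("walDir");
        return this; // the value is deliberately not applied
    }

    WalOptionsAdapterSketch setMaxTotalWalSize(final long maxTotalWalSize) {
        logIgnoreWalOption("maxTotalWalSize");
        return this; // the value is deliberately not applied
    }
}
```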

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -178,15 +178,15 @@ void openDB(final Map configs, final File stateDir) {
 
 // Setup statistics before the database is opened, otherwise the statistics are not updated
 // with the measurements from Rocks DB
-maybeSetUpStatistics(configs);
+setStatisticsIfNeeded(configs);
 
 openRocksDB(dbOptions, columnFamilyOptions);
 open = true;
 
 addValueProvidersToMetricsRecorder();
 }
 
-private void maybeSetUpStatistics(final Map configs) {
+private void setStatisticsIfNeeded(final Map configs) {

Review comment:
   `maybeDoSomething()` is also used in a lot of places. So I agree with 
@abbccdda.








[GitHub] [kafka] dajac commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests

2021-08-24 Thread GitBox


dajac commented on pull request #11073:
URL: https://github.com/apache/kafka/pull/11073#issuecomment-904481315


   @jolshan All the builds have failed. Could you take a look?






[GitHub] [kafka] dajac merged pull request #11127: KAFKA-13134: Give up group metadata lock before sending heartbeat response

2021-08-24 Thread GitBox


dajac merged pull request #11127:
URL: https://github.com/apache/kafka/pull/11127


   






[jira] [Resolved] (KAFKA-13134) Heartbeat Request high lock contention

2021-08-24 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13134.
-
Fix Version/s: 3.1.0
 Reviewer: David Jacot
   Resolution: Fixed

> Heartbeat Request high lock contention
> --
>
> Key: KAFKA-13134
> URL: https://issues.apache.org/jira/browse/KAFKA-13134
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
> Fix For: 3.1.0
>
>
> On a cluster with a high heartbeat rate, a lock profile showed high contention
> for the GroupMetadata lock.
> We can significantly reduce this by invoking the response callback outside of
> the group metadata lock.
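The fix described above follows a common pattern: mutate shared state while holding the lock, release it, and only then run the callback, so that response handling does not lengthen the critical section. The Java sketch below is hypothetical (the group coordinator in this code base is Scala with its own locking), and `GroupSketch` and `handleHeartbeat` are illustrative names.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch: update group state under the lock, invoke the callback outside it.
final class GroupSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int heartbeats = 0; // state guarded by lock

    void handleHeartbeat(final Consumer<String> responseCallback) {
        final String error;
        lock.lock();
        try {
            heartbeats++;   // critical section: only the state update
            error = "NONE";
        } finally {
            lock.unlock();
        }
        responseCallback.accept(error); // runs without holding the group lock
    }

    boolean isLockHeldByCurrentThread() {
        return lock.isHeldByCurrentThread();
    }

    int heartbeats() {
        lock.lock();
        try {
            return heartbeats;
        } finally {
            lock.unlock();
        }
    }
}
```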





[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-24 Thread Ludo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403737#comment-17403737
 ] 

Ludo commented on KAFKA-13195:
--

[~cadonna]: I think there is still room for improvement in handling 
SerializationException, if the community is interested in the feature. 

[~mjsax]: Just a thought: in my particular case, I have a way to catch the 
exception, since I use a custom Transformer with the store API (see 
[here|https://github.com/kestra-io/kestra/blob/d21a94ddbd56b1610161451c392afe0829f8f412/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/FlowWithTriggerTransformer.java#L37]),
 but what about a KTable or GlobalKTable? I think the state store fetch 
is done internally, and I don't think I have any way to catch the exception in 
that case. 

In that case, the dedicated exception handler could be really helpful: the 
underlying topic data will never change, and there is no way to handle it 
properly (except maybe a custom deserializer that emits null, but as I remember 
that logs a warning).
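The workaround mentioned above, a deserializer that returns null instead of throwing, can be sketched as a wrapper. To stay self-contained, the sketch wraps a plain `Function<byte[], T>` rather than Kafka's `Deserializer` interface and records warnings in a list instead of logging; `NullOnErrorDeserializer` is a hypothetical name. One trade-off: callers can no longer tell a skipped corrupt value apart from a genuinely absent one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical wrapper: turns deserialization failures into null values plus a recorded warning.
final class NullOnErrorDeserializer<T> {
    private final Function<byte[], T> inner; // stands in for a real Kafka Deserializer
    private final List<String> warnings = new ArrayList<>();

    NullOnErrorDeserializer(final Function<byte[], T> inner) {
        this.inner = inner;
    }

    // Returns the decoded value, or null (recording a warning) if decoding throws.
    T deserialize(final byte[] data) {
        try {
            return inner.apply(data);
        } catch (final RuntimeException e) {
            warnings.add("Skipping value that could not be deserialized: " + e.getMessage());
            return null;
        }
    }

    List<String> warnings() {
        return warnings;
    }
}
```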


[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


cadonna commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r694676118



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
 acquireAndEnsureOpen();
 try {
 maybeThrowInvalidGroupIdException();
-Map offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+final Map offsets;
+long start = time.nanoseconds();
+try {
+offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
+} finally {
+kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start);
+}

Review comment:
   Could you please add unit tests for this change?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time
 acquireAndEnsureOpen();
 try {
 maybeThrowInvalidGroupIdException();

Review comment:
   Why do you exclude this check from the measured time here but include it above? The same applies to `offsets.forEach(this::updateLastSeenEpochIfNewer)`.
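The measurement in the diff wraps the blocking call in try/finally, so the elapsed time is recorded even when the call throws; which statements sit inside the timed region decides whether checks such as `maybeThrowInvalidGroupIdException()` are counted. A generic version with an injectable clock, so it can be unit-tested, might look like this. `BlockedTime.timed` is hypothetical; in the client the clock would be `time.nanoseconds()` and the recorder something like `kafkaConsumerMetrics.recordCommitted`.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper: times a blocking operation and always reports the elapsed nanoseconds.
final class BlockedTime {
    static <T> T timed(final LongSupplier nanoClock, final LongConsumer recorder, final Supplier<T> op) {
        final long start = nanoClock.getAsLong();
        try {
            return op.get();
        } finally {
            // Recorded on both normal return and exception, like the diff's finally block.
            recorder.accept(nanoClock.getAsLong() - start);
        }
    }
}
```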

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.Map;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;

Review comment:
   ```suggestion
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.CumulativeSum;
   
   import java.util.Map;
   ```

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -699,7 +706,9 @@ public void sendOffsetsToTransaction(Map offs
 throwIfProducerClosed();
 TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
 sender.wakeup();

Review comment:
   Why are those lines not included in the measurement?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
 }
 }
 
+private double getAndAssertDuration(KafkaProducer producer, String name, double floor) {
+double value = getMetricValue(producer, name);
+assertTrue(value > floor);
+return value;
+}
+
+@Test
+public void testMeasureTransactionDurations() {
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+try (KafkaProducer producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time)) {
+producer.initTransactions();
+assertTrue(getMetricValue(producer, "txn-init-time-total") > 99);

Review comment:
   I am not sure I understand this verificati

[GitHub] [kafka] cadonna commented on pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


cadonna commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-904548251


   The title of the PR should start with the Jira ID, e.g., KAFKA-1234.






[jira] [Assigned] (KAFKA-2424) Consider introducing lint-like tool for Scala

2021-08-24 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reassigned KAFKA-2424:
-

Assignee: Josep Prat

> Consider introducing lint-like tool for Scala
> -
>
> Key: KAFKA-2424
> URL: https://issues.apache.org/jira/browse/KAFKA-2424
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ismael Juma
>Assignee: Josep Prat
>Priority: Major
>  Labels: newbie
>
> Typesafe is working on abide and the first release is expected next month:
> https://github.com/scala/scala-abide
> An alternative is scapegoat:
> https://github.com/sksamuel/scalac-scapegoat-plugin





[GitHub] [kafka] jlprat opened a new pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat opened a new pull request #11254:
URL: https://github.com/apache/kafka/pull/11254


   Introduces Scalafix (https://scalacenter.github.io/scalafix/), a linter
   with some rewrite capabilities on top.
   
   Running the `checkScalafix` Gradle task prints a report of all
   broken rules.
   
   Running the `scalafix` Gradle task applies some rewrites to the files,
   for example import ordering and annotating return types.
   This change uses the official Gradle plugin for Scalafix, and Scalafix
   itself is maintained by the Scala Center.
   
   Current rules checked:
   - Do not allow `final var`
   - Explicit return types for public and protected methods (for public
     `val` and `var` the change was too big; this can be done at a later
     time)
   - Avoid procedure syntax in Scala
   - Avoid use of `val` in for comprehensions (it's a deprecated feature)
   
   *Known limitations*: when automatically inferring return types,
   Scalafix might fail and pick a type that is "too wide", e.g. `Any` instead of
   the correct one. After letting Scalafix fix the files, contributors
   should still review the changes and validate that they are correct.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-2424) Consider introducing lint-like tool for Scala

2021-08-24 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403765#comment-17403765
 ] 

Josep Prat commented on KAFKA-2424:
---

I know this is a very old task, but there is still no linter in the build.

I configured and added Scalafix in the submitted PR.






[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat commented on pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#issuecomment-904569475


   @cadonna, @ijuma and @vvcephei, if any of you has time to take a look at this PR, I'd be thankful.






[GitHub] [kafka] jlprat commented on a change in pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat commented on a change in pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#discussion_r694776794



##
File path: Jenkinsfile
##
@@ -20,7 +20,7 @@
 def doValidation() {
   sh """
 ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \
-spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
+spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain checkScalafix rat \

Review comment:
   Added the new task in the Jenkins job script

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1737,11 +1737,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   Set.empty[String]
   }
 
-  def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
-  def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
-  def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
-  def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
-  def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
+  def interBrokerListenerName: ListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
+  def interBrokerSecurityProtocol: SecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
+  def controlPlaneListenerName: Option[ListenerName] = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
+  def controlPlaneSecurityProtocol: Option[SecurityProtocol] = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }

Review comment:
   Scalafix inferred the wrong type for these two (`Option[Any]` for both), so I needed to correct them manually.








[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat commented on pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#issuecomment-904572739


   As far as I understand, Jenkins will not pick up my Jenkinsfile modification in this run, so it may be worth keeping a close eye on this once it's merged.






[GitHub] [kafka] dajac commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs

2021-08-24 Thread GitBox


dajac commented on a change in pull request #11104:
URL: https://github.com/apache/kafka/pull/11104#discussion_r694816464



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -332,6 +329,9 @@ public FetchRequestData build() {
 iter.remove();
 // Indicate that we no longer want to listen to this partition.
 removed.add(topicPartition);
+// If we do not have this topic ID in the session, we can not use topic IDs

Review comment:
   nit: `.` at the end of the sentence.
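   The rule in the new code comment (a request cannot use topic IDs if it names a partition whose topic ID the session does not know, including a partition being forgotten) can be illustrated with a small plain-Java sketch. It is not the `FetchSessionHandler` implementation: `ForgottenTopicIdSketch`, `canUseTopicIds` and the map-based session model are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid.ZERO_UUID`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class ForgottenTopicIdSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID, meaning "no topic ID known".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical check: the request may address topics by ID only if every topic it
    // mentions, whether still fetched or being forgotten, has a known ID in the session.
    static boolean canUseTopicIds(Map<String, UUID> sessionTopicIds,
                                  Set<String> fetchedTopics,
                                  Set<String> forgottenTopics) {
        for (Set<String> topics : List.of(fetchedTopics, forgottenTopics)) {
            for (String topic : topics) {
                UUID id = sessionTopicIds.getOrDefault(topic, ZERO_UUID);
                if (ZERO_UUID.equals(id)) {
                    // This topic can only be named by its topic name, so fall back to names.
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, UUID> ids = Map.of("foo", UUID.randomUUID());
        System.out.println(canUseTopicIds(ids, Set.of(), Set.of("foo"))); // true
        System.out.println(canUseTopicIds(ids, Set.of(), Set.of("bar"))); // false
    }
}
```

   Forgotten partitions are checked as strictly as fetched ones here because, in this model, a forgotten partition without a known ID could only be expressed by name.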

##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
 new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
 // Should have the same session ID and next epoch, but can no longer use topic IDs.
 // The receiving broker will close the session if we were previously using topic IDs.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@Test
+public void testIdUsageWithAllForgottenPartitions() {
+// We want to test when all topics are removed from the session
+List<Boolean> useTopicIdsTests = Arrays.asList(true, false);

Review comment:
   It would be great if we could use `@ParameterizedTest` here.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
 new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
 // Should have the same session ID and next epoch, but can no longer use topic IDs.
 // The receiving broker will close the session if we were previously using topic IDs.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType);
+assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType);
 assertFalse(data2.canUseTopicIds());
 });
 }
 
+@Test
+public void testIdUsageWithAllForgottenPartitions() {
+// We want to test when all topics are removed from the session
+List<Boolean> useTopicIdsTests = Arrays.asList(true, false);
+useTopicIdsTests.forEach(useTopicIds -> {
+Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
+Short respVer = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12;

Review comment:
   nit: `respVer` -> `responseVersion`?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java
##
@@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() {
 
 // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID.
 FetchSessionHandler.Builder builder2 = handler.newBuilder();
-builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID,
+builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID,
 new FetchRequest.PartitionData(10, 110, 210, Optional.empty()));
 FetchSessionHandler.FetchRequestData data2 = builder2.build();
 // Should have the same session ID and next epoch, but can no longer use topic IDs.
 // The receiving broker will close the session if we were previously using topic IDs.
 assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType);
-assertEquals(1, data2.metadata().epoch(),

[GitHub] [kafka] dajac commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-08-24 Thread GitBox


dajac commented on a change in pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#discussion_r694843292



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -159,10 +159,11 @@ private void handleError(
 Set groupsToRetry
 ) {
 switch (error) {
-// If the coordinator is in the middle of loading, then we just need to retry.
+// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry.
 case COORDINATOR_LOAD_IN_PROGRESS:
+case REBALANCE_IN_PROGRESS:
 log.debug("OffsetCommit request for group id {} failed because the coordinator" +
-" is still in the process of loading state. Will retry.", groupId.idValue);
+" is still in the process of loading state or the group is rebalancing. Will retry.", groupId.idValue);

Review comment:
   I wonder if it would be preferable to use a more generic message that clearly mentions the error encountered: `OffsetCommit request for group id {} returned error {}. Will retry.`. Without mentioning the received error, we don't really know what happened, so the log is not that useful.








[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat commented on pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#issuecomment-904646319


   Failure was https://issues.apache.org/jira/browse/KAFKA-9897






[GitHub] [kafka] jlprat edited a comment on pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat edited a comment on pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#issuecomment-904646319


   Failure was https://issues.apache.org/jira/browse/KAFKA-9897






[GitHub] [kafka] jlprat edited a comment on pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-08-24 Thread GitBox


jlprat edited a comment on pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#issuecomment-904646319


   Failure was https://issues.apache.org/jira/browse/KAFKA-13128






[GitHub] [kafka] showuon commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-08-24 Thread GitBox


showuon commented on a change in pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#discussion_r694870996



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -159,10 +159,11 @@ private void handleError(
 Set groupsToRetry
 ) {
 switch (error) {
-// If the coordinator is in the middle of loading, then we just need to retry.
+// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry.
 case COORDINATOR_LOAD_IN_PROGRESS:
+case REBALANCE_IN_PROGRESS:
 log.debug("OffsetCommit request for group id {} failed because the coordinator" +
-" is still in the process of loading state. Will retry.", groupId.idValue);
+" is still in the process of loading state or the group is rebalancing. Will retry.", groupId.idValue);

Review comment:
   Makes sense to me.








[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-08-24 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-904662073


   > excuse me for the delay
   
   Never mind! :) 
   And thanks for the comment. I've updated the PR. Thank you.






[GitHub] [kafka] dajac commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-08-24 Thread GitBox


dajac commented on a change in pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#discussion_r694876130



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -158,25 +158,24 @@ private void handleError(
 Set groupsToUnmap,
 Set groupsToRetry
 ) {
+final String requestErrorMsg = "OffsetCommit request for group id {} returned error {}. Will retry.";

Review comment:
   I would rather keep it inline to stay consistent with the other cases.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -158,25 +158,24 @@ private void handleError(
 Set groupsToUnmap,
 Set groupsToRetry
 ) {
+final String requestErrorMsg = "OffsetCommit request for group id {} returned error {}. Will retry.";
 switch (error) {
-// If the coordinator is in the middle of loading, then we just need to retry.
+// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry.
 case COORDINATOR_LOAD_IN_PROGRESS:
-log.debug("OffsetCommit request for group id {} failed because the coordinator" +
-" is still in the process of loading state. Will retry.", groupId.idValue);
+case REBALANCE_IN_PROGRESS:
+log.debug(requestErrorMsg, groupId.idValue, error);
 groupsToRetry.add(groupId);
 break;
 
 // If the coordinator is not available, then we unmap and retry.
 case COORDINATOR_NOT_AVAILABLE:
 case NOT_COORDINATOR:
-log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
-groupId.idValue, error);
+log.debug(requestErrorMsg, groupId.idValue, error);

Review comment:
   Perhaps, we could say ` Will rediscover the coordinator and retry.` 
in this case. What do you think?
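   Taken together, the two suggestions (always name the received error, and say explicitly when the coordinator will be rediscovered) could look roughly like the plain-Java sketch below. `GroupError`, the string-keyed group sets and this `handleError` signature are hypothetical simplifications, not the actual `AlterConsumerGroupOffsetsHandler` code, and the sketch assumes the caller looks the coordinator up again for every unmapped group before retrying.

```java
import java.util.HashSet;
import java.util.Set;

public class OffsetCommitErrorRoutingSketch {
    // Hypothetical subset of the error codes discussed above (not Kafka's Errors enum).
    enum GroupError { COORDINATOR_LOAD_IN_PROGRESS, REBALANCE_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR }

    // Routes a failed OffsetCommit for one group and returns the debug message that
    // would be logged. Every message names the error that was actually received.
    static String handleError(String groupId, GroupError error,
                              Set<String> groupsToUnmap, Set<String> groupsToRetry) {
        switch (error) {
            // Transient states on the current coordinator: keep it and retry later.
            case COORDINATOR_LOAD_IN_PROGRESS:
            case REBALANCE_IN_PROGRESS:
                groupsToRetry.add(groupId);
                return String.format("OffsetCommit request for group id %s returned error %s. Will retry.",
                        groupId, error);
            // The cached coordinator is stale: unmap it so it is looked up again before retrying.
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                groupsToUnmap.add(groupId);
                return String.format("OffsetCommit request for group id %s returned error %s. "
                        + "Will rediscover the coordinator and retry.", groupId, error);
            default:
                throw new IllegalStateException("Unhandled error " + error);
        }
    }

    public static void main(String[] args) {
        Set<String> unmap = new HashSet<>();
        Set<String> retry = new HashSet<>();
        // prints: OffsetCommit request for group id my-group returned error NOT_COORDINATOR. Will rediscover the coordinator and retry.
        System.out.println(handleError("my-group", GroupError.NOT_COORDINATOR, unmap, retry));
    }
}
```

   Grouping the cases by fall-through keeps one log statement per recovery action, while the `%s` for the error keeps the two retriable causes distinguishable in the logs.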








[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

2021-08-24 Thread GitBox


dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r694960458



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
 InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
 } else {
   stateChangeLogger.info(
-s"Skipped the become-follower state change after marking its partition as " +
+"Skipped the become-follower state change after marking its partition as " +
 s"follower for partition $tp with id ${info.topicId} and partition state $state."
   )
 }
 }
   }
   changedPartitions.add(partition)
 } catch {
-  case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+  case e: KafkaStorageException =>
+// If there is an offline log directory, a Partition object may have been created by
+// `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+// to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+// to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+markPartitionOffline(tp)

Review comment:
   Good point. I've added a few unit tests.
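   The failure mode described in the new code comment, where a partition entry is registered before creating its local log fails, can be sketched in plain Java. All names here (`OfflinePartitionSketch`, `StorageException`, `PartitionState`, `makeFollower`, `createLogIfNotExists`) are hypothetical stand-ins for `KafkaStorageException`, `Partition` and `OfflinePartition`; the sketch only shows the catch block replacing the half-initialized entry with an explicit offline marker.

```java
import java.util.HashMap;
import java.util.Map;

public class OfflinePartitionSketch {
    // Hypothetical stand-in for KafkaStorageException.
    static final class StorageException extends RuntimeException {
        StorageException(String message) { super(message); }
    }

    // Hypothetical states: CREATED plays the role of the "empty Partition object".
    enum PartitionState { CREATED, ONLINE, OFFLINE }

    final Map<String, PartitionState> allPartitions = new HashMap<>();
    private final boolean logDirOffline;

    OfflinePartitionSketch(boolean logDirOffline) {
        this.logDirOffline = logDirOffline;
    }

    private void createLogIfNotExists(String tp) {
        if (logDirOffline) {
            throw new StorageException("Log directory for " + tp + " is offline");
        }
    }

    void makeFollower(String tp) {
        try {
            // The partition entry is registered before the local log exists...
            allPartitions.putIfAbsent(tp, PartitionState.CREATED);
            // ...so a storage failure here would otherwise leave that entry behind.
            createLogIfNotExists(tp);
            allPartitions.put(tp, PartitionState.ONLINE);
        } catch (StorageException e) {
            // Replace the half-initialized entry with an explicit offline marker.
            allPartitions.put(tp, PartitionState.OFFLINE);
        }
    }

    public static void main(String[] args) {
        OfflinePartitionSketch broker = new OfflinePartitionSketch(true);
        broker.makeFollower("foo-0");
        System.out.println(broker.allPartitions.get("foo-0")); // OFFLINE
    }
}
```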








[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`

2021-08-24 Thread GitBox


dajac commented on a change in pull request #11225:
URL: https://github.com/apache/kafka/pull/11225#discussion_r694960197



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig,
 InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset))
 } else {
   stateChangeLogger.info(
-s"Skipped the become-follower state change after marking its partition as " +
+"Skipped the become-follower state change after marking its partition as " +
 s"follower for partition $tp with id ${info.topicId} and partition state $state."
   )
 }
 }
   }
   changedPartitions.add(partition)
 } catch {
-  case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " +
+  case e: KafkaStorageException =>
+// If there is an offline log directory, a Partition object may have been created by
+// `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due
+// to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition
+// to an empty Partition object. We need to map this topic-partition to OfflinePartition instead.
+markPartitionOffline(tp)
+stateChangeLogger.error(s"Unable to start fetching $tp " +
+  s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e)
+replicaFetcherManager.addFailedPartition(tp)
+error(s"Error while making broker the follower for partition $tp in dir " +

Review comment:
   No... I only added it because the other cases have it. I have removed all of them.








[GitHub] [kafka] jasonyanwenl commented on pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jasonyanwenl commented on pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#issuecomment-904751684


   > LGTM! Thanks for the fix!
   > One suggestion: please try not to email the whole dev group to request a code review next time (unless it's an urgent PR). If every PR owner sends an email to the dev group for code review, you can imagine how many emails dev group members will receive. Please try to mention the possible reviewers' names (could be more than one) in the PR, and try again next week (or in 2 weeks, or more) if there is no response. Hope that helps, and welcome to the Kafka community! :)
   
   Hi @showuon, thanks for your suggestion! Actually, when I checked the official site "[How to Contribute](https://kafka.apache.org/contributing)", I found it said "**Nag us if we aren't doing our job...**", and when I clicked the link "**Nag us**", it directly opened an email window. So I guess we may need to update that website to avoid suggesting that people send PR review requests directly to the entire dev mailing list (or make it clearer that this is for urgent fixes).






[GitHub] [kafka] splett2 opened a new pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.

2021-08-24 Thread GitBox


splett2 opened a new pull request #11255:
URL: https://github.com/apache/kafka/pull/11255


   ### What
   The controller can skip sending `updateMetadataRequest` during the broker 
failure callback if there are offline partitions and the deleted brokers don't 
host any partitions. Looking at the logic, I'm not sure why the if check is 
checking for partitionsWithOfflineLeader. This seems like a bug which may mean 
we're sending additional `updateMetadataRequests` on broker shutdowns.
   
   ### Testing
   Added an integration test for the failure scenario. The controller 
integration test suite passes locally with my change.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Assigned] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation

2021-08-24 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-13215:
--

Assignee: Walker Carlson

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
> ---
>
> Key: KAFKA-13215
> URL: https://issues.apache.org/jira/browse/KAFKA-13215
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Konstantine Karantasis
>Assignee: Walker Carlson
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144)
> {code}





[jira] [Commented] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation

2021-08-24 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403921#comment-17403921
 ] 

Walker Carlson commented on KAFKA-13215:


[GitHub Pull Request #11083|https://github.com/apache/kafka/pull/11083]

This PR should take care of this ticket as well.






[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jasonyanwenl commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695002209



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> 
supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   Thanks for your review! I found that we mix two packages: `org.junit.Assert` and `org.junit.jupiter.api.Assertions`. The `assertThrows` method has a different API in each of them:
   
   For `org.junit.Assert.assertThrows`, the [API](https://junit.org/junit4/javadoc/4.13/org/junit/Assert.html#assertThrows(java.lang.String,%20java.lang.Class,%20org.junit.function.ThrowingRunnable)) is:
   
   ```java
   public static <T extends Throwable> T assertThrows(String message,
                                                      Class<T> expectedThrowable,
                                                      ThrowingRunnable runnable)
   ```
   
   For `org.junit.jupiter.api.Assertions.assertThrows`, the [API](https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#assertThrows-java.lang.Class-org.junit.jupiter.api.function.Executable-java.lang.String-) is:
   ```java
   public static <T extends Throwable> T assertThrows(Class<T> expectedType,
                                                      Executable executable,
                                                      String message)
   ```
   
   I searched our Kafka code base and found [another place](https://github.com/apache/kafka/blob/9565a529e08d7aa36beac02c8e6115bcc87d2dc7/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java#L521) using `org.junit.Assert.assertThrows` to assert the exception message, so I followed it and used the same API here.
   
   Please let me know if this is not proper.
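   For reference, both JUnit 4.13 and JUnit 5 `assertThrows` return the thrown exception, and in both the `String message` argument is the failure message reported if the assertion fails. Checking the exception's own message is therefore usually done on the returned value. The plain-Java sketch below illustrates that pattern together with the kind of null check this PR adds; `NullCheckedFlatMapSketch`, `flatMap`, `expectThrows` and the exception text are hypothetical, not the Kafka Streams or JUnit code.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

public class NullCheckedFlatMapSketch {
    // Hypothetical, simplified flatMap step: reject a null mapper result right away with a
    // descriptive message instead of failing later with a bare NullPointerException.
    static <K, V, R> List<R> flatMap(K key, V value, BiFunction<K, V, List<R>> mapper) {
        return Objects.requireNonNull(mapper.apply(key, value),
                "KeyValueMapper returned null, which is not allowed");
    }

    // Minimal stand-in for JUnit's assertThrows: it returns the thrown exception,
    // so the caller can assert on its message afterwards.
    static <T extends Throwable> T expectThrows(Class<T> expectedType, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (expectedType.isInstance(t)) {
                return expectedType.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t, t);
        }
        throw new AssertionError("Expected " + expectedType.getName() + " to be thrown");
    }

    public static void main(String[] args) {
        NullPointerException e = expectThrows(NullPointerException.class,
                () -> flatMap("K", 0, (k, v) -> null));
        System.out.println(e.getMessage()); // KeyValueMapper returned null, which is not allowed
    }
}
```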








[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jasonyanwenl commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695002209



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> 
supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   Thanks for your review! I found that we mix two packages: `org.junit.Assert` and `org.junit.jupiter.api.Assertions`. The `assertThrows` method has a different API in each of them:
   
   For `org.junit.Assert.assertThrows`, the [API](https://junit.org/junit4/javadoc/4.13/org/junit/Assert.html#assertThrows(java.lang.String,%20java.lang.Class,%20org.junit.function.ThrowingRunnable)) is:
   
   ```java
   public static <T extends Throwable> T assertThrows(String message,
                                                      Class<T> expectedThrowable,
                                                      ThrowingRunnable runnable)
   ```
   
   For `org.junit.jupiter.api.Assertions.assertThrows`, the [API](https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#assertThrows-java.lang.Class-org.junit.jupiter.api.function.Executable-java.lang.String-) is:
   ```java
   public static <T extends Throwable> T assertThrows(Class<T> expectedType,
                                                      Executable executable,
                                                      String message)
   ```
   
   I searched our Kafka code base and found [another place](https://github.com/apache/kafka/blob/9565a529e08d7aa36beac02c8e6115bcc87d2dc7/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java#L521) using `org.junit.Assert.assertThrows` to assert the exception message, so I followed it and used the same API here.
   
   Please let me know if this is not proper.








[jira] [Resolved] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation

2021-08-24 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson resolved KAFKA-13215.

Resolution: Fixed






[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jasonyanwenl commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695002209



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> 
supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   @jlprat Thanks for your review! I found we mixed the using of two 
packages: `org.junit.Assert` and `org.junit.jupiter.api.Assertions`.  The 
method `asssertThrows` in those two packages have different api:
   
   For `org.junit.Assert.assertThrows`, the 
[api](https://junit.org/junit4/javadoc/4.13/org/junit/Assert.html#assertThrows(java.lang.String,%20java.lang.Class,%20org.junit.function.ThrowingRunnable))
 is:
   
   ```java
   public static <T extends Throwable> T assertThrows(String message,
                                                      Class<T> expectedThrowable,
                                                      ThrowingRunnable runnable)
   ```
   
   For `org.junit.jupiter.api.Assertions.assertThrows`, the 
[api](https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#assertThrows-java.lang.Class-org.junit.jupiter.api.function.Executable-java.lang.String-)
 is:
   ```java
   public static <T extends Throwable> T assertThrows(Class<T> expectedType,
                                                      Executable executable,
                                                      String message)
   ```
   
   I searched our Kafka code base and found [another place](https://github.com/apache/kafka/blob/9565a529e08d7aa36beac02c8e6115bcc87d2dc7/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java#L521) that uses `org.junit.Assert.assertThrows` to assert an exception message, so I followed it and used the same API here.
   
   Please let me know if this is not proper.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-08-24 Thread GitBox


wcarlson5 commented on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-904782977


   @kkonstantine I saw you made a ticket for a failing test. I think this PR should fix it for 3.0 as well if cherry-picked.






[GitHub] [kafka] wcarlson5 edited a comment on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests

2021-08-24 Thread GitBox


wcarlson5 edited a comment on pull request #11083:
URL: https://github.com/apache/kafka/pull/11083#issuecomment-904782977


   @kkonstantine I saw you made a ticket for a failing test. I think this PR should fix it for 3.0 as well if cherry-picked.
   https://issues.apache.org/jira/browse/KAFKA-13215






[GitHub] [kafka] hachikuji merged pull request #11246: MINOR: Improve controlled shutdown logging

2021-08-24 Thread GitBox


hachikuji merged pull request #11246:
URL: https://github.com/apache/kafka/pull/11246


   






[GitHub] [kafka] hachikuji commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


hachikuji commented on a change in pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#discussion_r695012453



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) {
 else if (!future.isRetriable())
 throw exception;
 
-resetStateAndRejoin(String.format("rebalance failed with 
retriable error %s", exception));

Review comment:
   Thanks for the suggestion. Updated.








[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jlprat commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695017818



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> 
supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   Actually, my code suggestion was wrong: the overloaded method doesn't do what I expected it to do.
   The message it takes is only printed if the assertion fails.
   
   That should do the trick!
   ```java
   final Throwable throwable = assertThrows(String.format("KeyValueMapper can't return null from mapping the record: %s", record), NullPointerException.class,
       () -> supplier.get().process(record));
   assertEquals(String.format("KeyValueMapper can't return null from mapping the record: %s", record), throwable.getMessage());
   ```
   Sorry for the confusion my first comment caused.
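The point about the message argument can be illustrated without depending on JUnit. In the self-contained sketch below, `assertThrows` is a hypothetical stand-in that mimics the documented behavior of JUnit 4's `assertThrows(String message, Class<T>, ThrowingRunnable)`: the message only labels the failure when nothing (or the wrong type) is thrown, and it is never compared with the exception's own message. The `process` method is likewise hypothetical, modeled on the null-check message quoted in the review.

```java
import java.util.Objects;

final class AssertThrowsSketch {

    // Hypothetical functional interface modeled on JUnit 4's ThrowingRunnable.
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    // Mimics JUnit 4 semantics: `message` only labels the failure; it is NOT
    // matched against the thrown exception's message.
    static <T extends Throwable> T assertThrows(String message,
                                                Class<T> expectedThrowable,
                                                ThrowingRunnable runnable) {
        try {
            runnable.run();
        } catch (Throwable actual) {
            if (expectedThrowable.isInstance(actual)) {
                return expectedThrowable.cast(actual);
            }
            throw new AssertionError(message + ": unexpected exception type " + actual.getClass().getName(), actual);
        }
        throw new AssertionError(message + ": expected " + expectedThrowable.getName() + " to be thrown");
    }

    // Hypothetical processor step that rejects a null mapper result.
    static void process(String record, Object mapperResult) {
        Objects.requireNonNull(mapperResult,
            String.format("KeyValueMapper can't return null from mapping the record: %s", record));
    }

    public static void main(String[] args) {
        String record = "Record(K, 0)";
        // Passes even though "any text at all" has nothing to do with the exception message.
        NullPointerException npe = assertThrows("any text at all", NullPointerException.class,
            () -> process(record, null));
        // The exception message has to be checked explicitly on the returned exception.
        String expected = String.format("KeyValueMapper can't return null from mapping the record: %s", record);
        if (!expected.equals(npe.getMessage())) {
            throw new AssertionError("unexpected message: " + npe.getMessage());
        }
        System.out.println(npe.getMessage());
    }
}
```

JUnit 5's `Assertions.assertThrows(Class<T>, Executable, String)` takes the failure message last but gives it the same "only on failure" meaning, so in both libraries the returned exception is what should be inspected.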








[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jlprat commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695019986



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,12 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+final Record record = new Record<>("K", 0, 0L);
+assertThrows(String.format("KeyValueMapper can't return null from 
mapping the record: %s", record),
+NullPointerException.class, () -> 
supplier.get().process(record));

Review comment:
   Copying the suggestion here so it's easy to find:
   
   ```suggestion
      final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record));
      assertEquals(String.format("KeyValueMapper can't return null from mapping the record: %s", record), throwable.getMessage());
   ```








[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jlprat commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695017818



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> 
supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   Actually, my code suggestion was wrong: the overloaded method doesn't do what I expected it to do.
   The message it takes is only printed if the assertion fails.
   
   That should do the trick!
   ```java
   final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record));
   assertEquals(String.format("KeyValueMapper can't return null from mapping the record: %s", record), throwable.getMessage());
   ```
   Sorry for the confusion my first comment caused.








[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests

2021-08-24 Thread GitBox


jolshan commented on pull request #11073:
URL: https://github.com/apache/kafka/pull/11073#issuecomment-904805600


   Looks like an issue with more recent changes. I'll fix it up.






[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-24 Thread GitBox


jasonyanwenl commented on a change in pull request #11241:
URL: https://github.com/apache/kafka/pull/11241#discussion_r695031580



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
##
@@ -86,4 +88,10 @@ public void testFlatMap() {
 assertEquals(expected[i], 
supplier.theCapturedProcessor().processed().get(i));
 }
 }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new 
KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> 
supplier.get().process(new Record<>("K", 0, 0L)));

Review comment:
   @jlprat I see. I didn't notice this either, LOL. Fixed!








[GitHub] [kafka] rondagostino commented on pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests

2021-08-24 Thread GitBox


rondagostino commented on pull request #11238:
URL: https://github.com/apache/kafka/pull/11238#issuecomment-904824877


   > we can have ceil method to use, which will be easier 
   
   Good suggestion! Done. Here are some system test runs showing that the new code is syntactically correct (and I observed the correct kill behavior).
   
   ```
   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=COLOCATED_KRAFT.quorum_size=1
   status: PASS
   run time:   1 minute 49.278 seconds
   

   test_id:
kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=COLOCATED_KRAFT.quorum_size=3
   status: PASS
   run time:   2 minutes 6.602 seconds
   ```
   
   Thanks for the review!






[GitHub] [kafka] abbccdda commented on a change in pull request #11047: MINOR: Remove unnecessary code for WindowStoreBuilder.

2021-08-24 Thread GitBox


abbccdda commented on a change in pull request #11047:
URL: https://github.com/apache/kafka/pull/11047#discussion_r695050765



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
##
@@ -36,7 +36,6 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier 
storeSupplier,
   final Serde valueSerde,
   final Time time) {
 super(storeSupplier.name(), keySerde, valueSerde, time);
-Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");

Review comment:
   @tang7526 Thanks, could we do something similar to https://github.com/apache/kafka/blob/d30b4e51513e6938970020bf1e894c983447ef8f/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java#L36 here?
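The removed line could never fire as intended: a Java constructor's `super(...)` call must be its first statement (in the Java versions Kafka built with at the time), so a null check placed after it runs too late, and `storeSupplier.name()` inside the `super(...)` call would already have thrown a bare `NullPointerException`. Wrapping the argument in `Objects.requireNonNull(...)` inside the `super(...)` call validates it first. The classes below are hypothetical simplifications, not the real `WindowStoreBuilder` or its parent.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins for a store supplier and builder hierarchy.
interface StoreSupplier {
    String name();
}

abstract class BaseStoreBuilder {
    final String name;

    BaseStoreBuilder(final String name) {
        this.name = Objects.requireNonNull(name, "name can't be null");
    }
}

final class SketchWindowStoreBuilder extends BaseStoreBuilder {
    private final StoreSupplier storeSupplier;

    SketchWindowStoreBuilder(final StoreSupplier storeSupplier) {
        // The check runs before storeSupplier.name() is dereferenced,
        // so callers get a descriptive message instead of a bare NPE.
        super(Objects.requireNonNull(storeSupplier, "storeSupplier can't be null").name());
        this.storeSupplier = storeSupplier;
    }

    public static void main(String[] args) {
        System.out.println(new SketchWindowStoreBuilder(() -> "my-store").name); // my-store
        try {
            new SketchWindowStoreBuilder(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // storeSupplier can't be null
        }
    }
}
```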








[GitHub] [kafka] ijuma commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


ijuma commented on pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#issuecomment-904843864


   @guozhangwang @dajac does the last commit from Jason look good?






[GitHub] [kafka] guozhangwang commented on pull request #11149: KAFKA-1234: KIP-761, add total blocked time metric to streams

2021-08-24 Thread GitBox


guozhangwang commented on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-904852557


   > The title of the PR should start with the Jira ID, i.e., KAFKA-1234.
   
   Just to explain the context here: we have a browser plugin for AK tickets that can redirect from a PR directly to the ticket URL, but that script relies on the PR title following the pattern `KAFKA-1234: blah blah`.
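The plugin's code is not shown here. As a rough illustration of why the exact title format matters, the hypothetical helper below extracts a Jira key only when the title starts with `KAFKA-<number>`, so a title that begins with `KIP-761:` yields no ticket link. The regular expression is an assumption, not the plugin's actual logic.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PrTitleJiraKey {
    // Assumed pattern: a Jira key at the very start of the title, e.g. "KAFKA-1234: blah blah".
    private static final Pattern LEADING_JIRA_KEY = Pattern.compile("^(KAFKA-\\d+)\\b");

    static Optional<String> jiraKey(String prTitle) {
        Matcher m = LEADING_JIRA_KEY.matcher(prTitle);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    static Optional<String> ticketUrl(String prTitle) {
        return jiraKey(prTitle).map(key -> "https://issues.apache.org/jira/browse/" + key);
    }

    public static void main(String[] args) {
        // A leading Jira key produces a link; a leading KIP reference does not.
        System.out.println(ticketUrl("KAFKA-1234: KIP-761, add total blocked time metric to streams"));
        System.out.println(ticketUrl("KIP-761: add total blocked time metric to streams"));
    }
}
```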






[GitHub] [kafka] guozhangwang edited a comment on pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


guozhangwang edited a comment on pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#issuecomment-904852557


   > The title of the PR should start with the Jira ID, i.e., KAFKA-1234.
   
   Just to explain the context here: we have a browser plugin for AK tickets that can redirect from a PR directly to the ticket URL, but that script relies on the PR title following the pattern `KAFKA-1234: blah blah`. Perhaps we have not created a JIRA ticket for this KIP yet?






[GitHub] [kafka] guozhangwang commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


guozhangwang commented on pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#issuecomment-904853346


   Yup, I approved it last night.






[GitHub] [kafka] ijuma commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


ijuma commented on pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#issuecomment-904853838


   The commit metadata said it was added 1 hour ago, hence why I asked :)






[GitHub] [kafka] guozhangwang commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


guozhangwang commented on pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#issuecomment-904860153


   Yeah, I reviewed that last commit as well. BTW, it is not a correctness issue anyway, so I would even have been happy to merge it as-is :)






[GitHub] [kafka] hachikuji merged pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-24 Thread GitBox


hachikuji merged pull request #11231:
URL: https://github.com/apache/kafka/pull/11231


   






[jira] [Resolved] (KAFKA-13214) Consumer should not reset group state after disconnect

2021-08-24 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13214.
-
Fix Version/s: 2.8.1
   2.7.2
   3.0.0
   Resolution: Fixed

> Consumer should not reset group state after disconnect
> --
>
> Key: KAFKA-13214
> URL: https://issues.apache.org/jira/browse/KAFKA-13214
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.8.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> When the consumer disconnects from the coordinator while a rebalance is in 
> progress, we currently reset the memberId and generation. The coordinator 
> then must await the session timeout in order to expire the old memberId. This 
> was apparently a regression from 
> https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478.
>  It would be better to keep the memberId/generation.
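A minimal sketch of the behavior change described in this ticket, using hypothetical names (`GroupMemberState`, `onJoinFailure`) rather than the real `AbstractCoordinator` API, and assuming `""` and `-1` as the "unknown member" and "no generation" placeholders: a retriable failure such as a coordinator disconnect keeps the member id and generation so the next JoinGroup can reuse them, instead of leaving the old member for the coordinator to expire after the session timeout. Only a non-retriable failure propagates.

```java
final class GroupMemberState {
    // Assumed placeholder values for "no member id yet" and "no generation".
    static final String UNKNOWN_MEMBER_ID = "";
    static final int NO_GENERATION = -1;

    String memberId = UNKNOWN_MEMBER_ID;
    int generationId = NO_GENERATION;
    boolean rejoinNeeded = true;

    void onJoinSuccess(String memberId, int generationId) {
        this.memberId = memberId;
        this.generationId = generationId;
        this.rejoinNeeded = false;
    }

    // Hypothetical handler for a failed rebalance attempt.
    void onJoinFailure(RuntimeException error, boolean retriable) {
        if (!retriable) {
            throw error;
        }
        // Retriable (e.g. disconnect): keep memberId/generationId and simply retry the join.
        rejoinNeeded = true;
    }

    // Simplified stand-in for what the removed call did on every retriable error.
    void resetStateAndRejoin() {
        memberId = UNKNOWN_MEMBER_ID;
        generationId = NO_GENERATION;
        rejoinNeeded = true;
    }

    public static void main(String[] args) {
        GroupMemberState state = new GroupMemberState();
        state.onJoinSuccess("member-1", 5);
        state.onJoinFailure(new RuntimeException("coordinator disconnected"), true);
        System.out.println(state.memberId + " / " + state.generationId); // member-1 / 5
    }
}
```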





[GitHub] [kafka] rondagostino opened a new pull request #11256: KAFKA-13224: Expose broker.id and node.id in config originals map

2021-08-24 Thread GitBox


rondagostino opened a new pull request #11256:
URL: https://github.com/apache/kafka/pull/11256


   Plugins may expect `broker.id` to exist as a key in the config's various 
originals()-related maps, but with KRaft we rely solely on `node.id` for the 
broker's ID, and with the Zk-based brokers we provide the option to specify 
`node.id` in addition to (or as a full replacement for) `broker.id`.  There are 
multiple problems related to this switch to `node.id`:
   
   - We do not enforce consistency between explicitly-specified `broker.id` and 
`node.id` properties in the config – it is entirely possible right now that we 
could set `broker.id=0` and also set `node.id=1`, and the broker will use 1 for 
its ID. This is confusing at best; the broker should detect this inconsistency 
and fail to start with a ConfigException.
   - When `node.id` is set, both that value and any explicitly-set `broker.id` 
value will exist in the config's **originals()-related maps**. Downstream 
components are often configured based on these maps, and they may ask for the 
`broker.id`, so downstream components may be misconfigured if the values 
differ, or they may fail during configuration if no `broker.id` key exists in 
the map at all.
   - The config's **values()-related maps** will contain either the 
explicitly-specified `broker.id` value or the default value of -1. When 
`node.id` is set, both that value (which cannot be negative) and the 
(potentially -1) `broker.id` value will exist in the config's values()-related 
maps. Downstream components are often configured based on these maps, and they 
may ask for the `broker.id`, so downstream components may be misconfigured if 
the `broker.id` value differs from the broker's true ID.
   
   The broker should detect inconsistency between explicitly-specified 
`broker.id` and `node.id` values and fail startup accordingly. It should also 
ensure that the config's originals()- and values()-related maps contain the 
same mapped values for both `broker.id` and `node.id` keys when at least one is 
specified.
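A minimal Java sketch of the two rules just stated, under several assumptions: ids are compared as trimmed strings (originals may hold either `String` or `Integer` values), a mismatch raises `IllegalArgumentException` as a stand-in for Kafka's `ConfigException`, and a new map is returned rather than mutating the input. `BrokerIdReconciler` is hypothetical; the actual change lives in Scala in `KafkaConfig` and also covers the values()-related maps.

```java
import java.util.HashMap;
import java.util.Map;

final class BrokerIdReconciler {
    static final String BROKER_ID = "broker.id";
    static final String NODE_ID = "node.id";

    // Returns a copy of `originals` in which broker.id and node.id carry the same
    // value whenever at least one of them was specified.
    static Map<String, Object> reconcile(Map<String, Object> originals) {
        Object brokerId = originals.get(BROKER_ID);
        Object nodeId = originals.get(NODE_ID);
        Map<String, Object> result = new HashMap<>(originals);
        if (brokerId != null && nodeId != null) {
            // Both explicitly set: they must agree, otherwise refuse to start.
            if (!String.valueOf(brokerId).trim().equals(String.valueOf(nodeId).trim())) {
                throw new IllegalArgumentException(
                    "Inconsistent " + BROKER_ID + "=" + brokerId + " and " + NODE_ID + "=" + nodeId);
            }
        } else if (nodeId != null) {
            result.put(BROKER_ID, nodeId); // expose node.id under broker.id for plugins
        } else if (brokerId != null) {
            result.put(NODE_ID, brokerId);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(NODE_ID, "1");
        System.out.println(reconcile(props).get(BROKER_ID)); // 1
    }
}
```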
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] rondagostino commented on a change in pull request #11256: KAFKA-13224: Expose consistent broker.id and node.id in config values/originals maps

2021-08-24 Thread GitBox


rondagostino commented on a change in pull request #11256:
URL: https://github.com/apache/kafka/pull/11256#discussion_r695176785



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1405,21 +1405,95 @@ class KafkaConfig(val props: java.util.Map[_, _], 
doLog: Boolean, dynamicConfigO
   // We make it part of each instance rather than the object to facilitate 
testing.
   private val zkClientConfigViaSystemProperties = new ZKClientConfig()
 
-  override def originals: util.Map[String, AnyRef] =
-if (this eq currentConfig) super.originals else currentConfig.originals
+  private def maybeMutateOriginalsToMakeBrokerIdAndNodeIdMatch(map: 
util.Map[String, AnyRef]): util.Map[String, AnyRef] = {

Review comment:
   This is pretty much the same as 
`maybeMutateOriginalsStringsToMakeBrokerIdAndNodeIdMatch()` below, but I was 
unable to consolidate them into a single method due to type-related errors.
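For comparison only: in Java, duplication between a `Map<String, Object>` variant and a `Map<String, String>` variant can often be folded into one method that is generic in the value type, because such a method only copies values between keys of the same map. The sketch below is hypothetical and has not been tried against the Scala code; whether the same idea avoids the type-related errors mentioned above is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class IdKeyMirror {
    // Works for Map<String, Object> and Map<String, String> alike: only values of
    // type V are moved between keys, so no cast is needed.
    static <V> Map<String, V> mirrorIdKeys(Map<String, V> map) {
        V brokerId = map.get("broker.id");
        V nodeId = map.get("node.id");
        if (brokerId != null && nodeId == null) {
            Map<String, V> copy = new HashMap<>(map);
            copy.put("node.id", brokerId);
            return copy;
        }
        if (nodeId != null && brokerId == null) {
            Map<String, V> copy = new HashMap<>(map);
            copy.put("broker.id", nodeId);
            return copy;
        }
        return map; // nothing to mirror (neither key set, or both already present)
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put("node.id", 3);
        Map<String, String> originalsStrings = new HashMap<>();
        originalsStrings.put("broker.id", "3");
        System.out.println(mirrorIdKeys(originals).get("broker.id"));
        System.out.println(mirrorIdKeys(originalsStrings).get("node.id"));
    }
}
```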








[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests

2021-08-24 Thread GitBox


jolshan commented on pull request #11073:
URL: https://github.com/apache/kafka/pull/11073#issuecomment-905024994


   Fixed my issue locally, but it looks like the 2.8 branch is broken in general.
   ```
   > Task :streams:compileTestJava
   
/Users/jolshan/kafka/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java:73:
 error: stop() has private access in EmbeddedKafkaCluster
   CLUSTER.stop();
  ^
   1 error
   ```
   
   Will try to contact the relevant person.






[GitHub] [kafka] jolshan commented on pull request #11248: HOTFIX: Fix null pointer when getting metric value in MetricsReporter

2021-08-24 Thread GitBox


jolshan commented on pull request #11248:
URL: https://github.com/apache/kafka/pull/11248#issuecomment-905028262


   Hi. This PR seems to have broken the 2.8 branch. When I try to run checkstyle, I see:
   ```
   > Task :streams:compileTestJava
   
/Users/jolshan/kafka/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java:73:
 error: stop() has private access in EmbeddedKafkaCluster
   CLUSTER.stop();
   ```
   
   I believe that file was added in this commit. 
   






[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft

2021-08-24 Thread GitBox


jsancio commented on a change in pull request #11186:
URL: https://github.com/apache/kafka/pull/11186#discussion_r687906243



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -90,8 +90,7 @@ class BrokerServer(
 
   this.logIdent = logContext.logPrefix
 
-  val lifecycleManager: BrokerLifecycleManager =
-new BrokerLifecycleManager(config, time, threadNamePrefix)
+  private var lifecycleManager: BrokerLifecycleManager = null

Review comment:
   Why did we move this to `startup`? We seem to check for `null` in the `shutdown` method for a few of these fields; should we do the same for this field?
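When a field moves from construction time into `startup()`, `shutdown()` can run while it is still `null` (for example if startup failed early or never ran), which is why the same `null` guard used for the other fields is suggested. A hypothetical Java sketch of that pattern; `LifecycleManager` here is a stand-in, and the real `BrokerServer` is Scala.

```java
final class LazyServerSketch {
    // Hypothetical component standing in for BrokerLifecycleManager.
    static final class LifecycleManager implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    private LifecycleManager lifecycleManager = null; // created in startup()

    void startup() {
        lifecycleManager = new LifecycleManager();
    }

    void shutdown() {
        // Guard: shutdown() may be called even if startup() never ran or failed early.
        if (lifecycleManager != null) {
            lifecycleManager.close();
        }
    }

    LifecycleManager lifecycleManager() {
        return lifecycleManager;
    }

    public static void main(String[] args) {
        new LazyServerSketch().shutdown(); // no NullPointerException
        LazyServerSketch server = new LazyServerSketch();
        server.startup();
        server.shutdown();
        System.out.println(server.lifecycleManager().closed); // true
    }
}
```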

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
##
@@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 ApiError apiError = ApiError.fromThrowable(e);
 List electionResults = new ArrayList<>();
 
-for (TopicPartitions topic : data.topicPartitions()) {
-ReplicaElectionResult electionResult = new ReplicaElectionResult();
+if (data.topicPartitions() != null) {

Review comment:
   When is `data.topicPartitions()` `null`?

##
File path: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
##
@@ -86,44 +79,22 @@ abstract class ZooKeeperTestHarness extends Logging {
 object ZooKeeperTestHarness {
   val ZkClientEventThreadSuffix = "-EventThread"
 
-  // Threads which may cause transient failures in subsequent tests if not 
shutdown.
-  // These include threads which make connections to brokers and may cause 
issues
-  // when broker ports are reused (e.g. auto-create topics) as well as threads
-  // which reset static JAAS configuration.

Review comment:
   I see. Maybe move this comment to the test utility function you created.

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1914,4 +1933,26 @@ object TestUtils extends Logging {
 )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+val unexpectedThreadNames = Set(
+  ControllerEventManager.ControllerEventThreadName,
+  KafkaProducer.NETWORK_THREAD_PREFIX,
+  AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+  AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+  ZooKeeperTestHarness.ZkClientEventThreadSuffix
+)

Review comment:
   Is there some insight into why we chose this specific set of threads?

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1914,4 +1933,26 @@ object TestUtils extends Logging {
 )
   }
 
+  def verifyNoUnexpectedThreads(context: String): Unit = {
+val unexpectedThreadNames = Set(
+  ControllerEventManager.ControllerEventThreadName,
+  KafkaProducer.NETWORK_THREAD_PREFIX,
+  AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+  AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+  ZooKeeperTestHarness.ZkClientEventThreadSuffix
+)
+
+def unexpectedThreads: Set[String] = {
+  val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => 
thread.getName)
+  allThreads.filter(t => unexpectedThreadNames.exists(s => 
t.contains(s))).toSet
+}
+
+def printUnexpectedThreads: String = {
+  val unexpected = unexpectedThreads
+  s"Found ${unexpected.size} unexpected threads during $context: 
${unexpected.mkString("`", ",", "`")}"
+}
+
+TestUtils.waitUntilTrue(() => unexpectedThreads.isEmpty, 
printUnexpectedThreads)

Review comment:
   This is probably unlikely to cause any issues in the thread, but the set of threads checked can differ from the set of threads printed, because `unexpectedThreads` is recomputed on each call. Maybe we can use `computeUntilTrue`.
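The idea behind this suggestion is to compute the thread snapshot once per attempt and report exactly the value that failed the check. A Java sketch of that pattern follows; `computeUntilTrue` here is a hypothetical helper (Kafka's own `TestUtils.computeUntilTrue` is Scala, and its exact signature is not reproduced), and the thread-name fragment in `main` is made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class ComputeUntilTrueSketch {
    // Recomputes `compute` until `predicate` holds or `timeoutMs` elapses, and returns
    // the last computed value together with whether the predicate was satisfied.
    static <T> Map.Entry<T, Boolean> computeUntilTrue(Supplier<T> compute, Predicate<T> predicate,
                                                      long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            T value = compute.get();
            if (predicate.test(value)) {
                return new SimpleEntry<>(value, true);
            }
            if (System.currentTimeMillis() >= deadline) {
                return new SimpleEntry<>(value, false);
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new SimpleEntry<>(value, false);
            }
        }
    }

    // Names of live threads whose name contains any of the given fragments.
    static Set<String> matchingThreads(Set<String> fragments) {
        return Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> fragments.stream().anyMatch(name::contains))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> fragments = Set.of("hypothetical-unexpected-thread");
        Map.Entry<Set<String>, Boolean> result =
            computeUntilTrue(() -> matchingThreads(fragments), Set::isEmpty, 1_000, 100);
        if (!result.getValue()) {
            // The message reports exactly the snapshot that failed the check.
            throw new AssertionError("Found " + result.getKey().size() + " unexpected threads: " + result.getKey());
        }
        System.out.println("no unexpected threads");
    }
}
```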

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -123,8 +123,8 @@
 public class ReplicationControlManager {
 
 static class TopicControlInfo {
-private final String name;
-private final Uuid id;
+final String name;
+final Uuid id;

Review comment:
   How about adding accessor methods instead? The nice thing about accessor methods is that we can easily use them as functions, e.g. `Optional<Uuid> id = maybeTopicControlInfo.map(TopicControlInfo::id);`
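A small illustration of the accessor-method suggestion. It uses `java.util.UUID` in place of Kafka's `Uuid` and a simplified, hypothetical `TopicControlInfo`, just to show that fields can stay private while accessors still compose with `Optional.map` through method references.

```java
import java.util.Optional;
import java.util.UUID;

final class TopicControlInfoSketch {
    // Simplified stand-in: fields stay private, accessors expose them.
    static final class TopicControlInfo {
        private final String name;
        private final UUID id;

        TopicControlInfo(String name, UUID id) {
            this.name = name;
            this.id = id;
        }

        String name() {
            return name;
        }

        UUID id() {
            return id;
        }
    }

    static Optional<UUID> topicId(Optional<TopicControlInfo> maybeTopicControlInfo) {
        // Accessors can be passed as functions, e.g. to Optional.map.
        return maybeTopicControlInfo.map(TopicControlInfo::id);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(topicId(Optional.of(new TopicControlInfo("foo", id))).get().equals(id)); // true
        System.out.println(topicId(Optional.empty()).isPresent()); // false
    }
}
```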

##
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##
@@ -1622,19 +1623,37 @@ object TestUtils extends Logging {
 waitForLeaderToBecome(client, topicPartition, None)
   }
 
-  def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, 
leader: Option[Int]): Unit = {
+  def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+waitUntilTrue(() => {
+  val nodes = client.describeCluster().nodes().get()
+  nodes.asScala.exists(_.id == brokerId)
+}, s"Timed out waiting for brokerId $brokerId to come online")
+  }
+
+  def waitForLeaderToBecome(
+client: Admin,
+topicPartition: TopicPartition,
+expe

[jira] [Updated] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-08-24 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13010:
---
Fix Version/s: (was: 3.1.0)
   3.0.0

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
> ---
>
> Key: KAFKA-13010
> URL: https://issues.apache.org/jira/browse/KAFKA-13010
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Walker Carlson
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.0.0
>
> Attachments: 
> TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf
>
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation

2021-08-24 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13215:
---
Fix Version/s: (was: 3.1.0)
   3.0.0

> Flaky test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
> ---
>
> Key: KAFKA-13215
> URL: https://issues.apache.org/jira/browse/KAFKA-13215
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Konstantine Karantasis
>Assignee: Walker Carlson
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> Integration test {{test 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
>  sometimes fails with
> {code:java}
> java.lang.AssertionError: only one task
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163)
>   at 
> org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144)
> {code}





[GitHub] [kafka] ableegoldman commented on pull request #11248: HOTFIX: Fix null pointer when getting metric value in MetricsReporter

2021-08-24 Thread GitBox


ableegoldman commented on pull request #11248:
URL: https://github.com/apache/kafka/pull/11248#issuecomment-905084553


   @jolshan ah, whoops. Thanks for the heads up. I'll get a patch ready.






[GitHub] [kafka] ableegoldman opened a new pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster

2021-08-24 Thread GitBox


ableegoldman opened a new pull request #11257:
URL: https://github.com/apache/kafka/pull/11257


   A backport of #11248 broke the 2.8 build because it uses the 
`EmbeddedKafkaCluster#stop` method, which is still private on 2.8. It seems we made 
it public when we upgraded to JUnit5 on the 3.0 branch and had to remove the 
ExternalResource that was previously responsible for calling `start()` and 
`stop()` for this class via the `@ClassRule` annotation, which is no longer available. 
   
   Rather than adapt this test to the 2.8 style by migrating it to use 
`@ClassRule` as well, I opted to simply make the `stop()` method public on 2.8 too 
(its analogue `start()` has always been public anyway). This should 
hopefully save future backports that include integration tests from 
having to manually adapt the test, or from accidentally breaking the build as 
happened here.






[GitHub] [kafka] ableegoldman commented on pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster

2021-08-24 Thread GitBox


ableegoldman commented on pull request #11257:
URL: https://github.com/apache/kafka/pull/11257#issuecomment-905096193


   @jolshan 






[GitHub] [kafka] ableegoldman commented on pull request #11248: HOTFIX: Fix null pointer when getting metric value in MetricsReporter

2021-08-24 Thread GitBox


ableegoldman commented on pull request #11248:
URL: https://github.com/apache/kafka/pull/11248#issuecomment-905097291


   Fix is available here: https://github.com/apache/kafka/pull/11257
   
   This also got me digging into why we had to make `stop()` public in 3.0, and 
why we now seem to be required to manually invoke both `start()` and `stop()` in every 
single integration test... which is incredibly error-prone (or rather, 
prone to resource leaks). I fixed the 2.8 build in a way that should prevent 
future backports from breaking the 2.8 branch specifically, but I'm going to 
look into improving how we do this in 3.0+ so we don't need to worry about 
breaking branches older than 2.8, or leaking resources by forgetting to clean 
up this cluster... 😬 
   
   Anyways, thanks very much for bringing this up!






[GitHub] [kafka] ableegoldman commented on a change in pull request #9858: KAFKA-12173 Migrate streams:streams-scala module to JUnit 5

2021-08-24 Thread GitBox


ableegoldman commented on a change in pull request #9858:
URL: https://github.com/apache/kafka/pull/9858#discussion_r695328381



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -75,9 +76,19 @@
 @Category(IntegrationTest.class)
 public class AdjustStreamThreadCountTest {
 
-@ClassRule
 public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
 
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();

Review comment:
   Hey @chia7712 , sorry for never bothering to take a look at this PR 
until just now, but I had a question about this. I know it was necessary to 
remove the `ExternalResource` feature that used to be responsible for calling 
`start()` and `stop()` for us in the integration tests since `@ClassRule` was 
removed in JUnit5, but that was really quite a loss since this now leaves us 
vulnerable to
   1) resource leaks due to forgetting to clean up the EmbeddedKafkaCluster in 
an integration test, or doing so incorrectly (e.g. such that a test 
failure might skip the cleanup stage, a mistake that we've certainly 
encountered in our tests in the past)
   2) breaking compatibility of integration tests across older branches, so 
that if we ever need to backport a fix that includes an integration test -- 
which many will/should do -- we can easily break the build of older branches by 
accident. See for example [#11257](https://github.com/apache/kafka/pull/11257), 
which is the reason I started digging into this 🙂. Even if we remember to fix this 
during the backport, it's just an extra hassle.
   
   Now I'm certainly not an expert in all things JUnit, but a cursory glance 
suggests we can replicate the old behavior, in which the EmbeddedKafkaCluster is 
automatically started/stopped, without the need for this `@Before/AfterClass` 
boilerplate code in every integration test. I believe it's possible to do so 
using JUnit 5 extensions (the `@ExtendWith`/`@RegisterExtension` annotations). Can we try to port the 
EmbeddedKafkaCluster back to an automated lifecycle with these so we can clean 
up the Streams integration tests?
   
   cc @ijuma @vvcephei who may be more familiar with these constructs and 
how/when/why to use them
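For reference, JUnit 5 exposes lifecycle hooks to extensions through interfaces such as `BeforeAllCallback` and `AfterAllCallback`, which is what an `@ExtendWith`-based cluster would likely implement. The underlying guarantee (the cluster is stopped even when a test fails) can be sketched without JUnit using `AutoCloseable` and try-with-resources. `ToyCluster` and `LifecycleDemo` are hypothetical, and this is a swapped-in technique, not how `EmbeddedKafkaCluster` works today.

```java
import java.util.List;

// Hypothetical stand-in for an embedded test cluster; it only records lifecycle events.
final class ToyCluster implements AutoCloseable {
    final List<String> events;

    ToyCluster(List<String> events) {
        this.events = events;
        events.add("start"); // "started" on construction
    }

    @Override
    public void close() {
        events.add("stop"); // runs on every exit path when used with try-with-resources
    }
}

final class LifecycleDemo {
    // Runs a test body against a fresh cluster; the cluster is stopped even if the body throws.
    static void withCluster(List<String> events, Runnable testBody) {
        try (ToyCluster cluster = new ToyCluster(events)) {
            testBody.run();
        }
    }
}
```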








[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests

2021-08-24 Thread GitBox


jolshan commented on pull request #11073:
URL: https://github.com/apache/kafka/pull/11073#issuecomment-905114876


   fix here: https://github.com/apache/kafka/pull/11257
   






[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695341815



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java
##
@@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
 metricGroupName,
 "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."),
 new Avg());
+
+this.commitSyncSensor = metrics.sensor("commit-sync-time-total");
+this.commitSyncSensor.add(
+metrics.metricName("commit-sync-time-total", metricGroupName),
+new CumulativeSum()
+);
+
+this.committedSensor = metrics.sensor("committed-time-total");
+this.committedSensor.add(
+metrics.metricName("committed-time-total", metricGroupName),
+new CumulativeSum()
+);

Review comment:
   ack - will defer to a follow-up PR
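The diff registers `commit-sync-time-total` and `committed-time-total` sensors backed by `CumulativeSum`, i.e. running totals of time spent in those calls. A stdlib-only sketch of that idea follows; `CumulativeTimeSensor` is hypothetical and does not use Kafka's `Metrics`/`Sensor` API.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, stdlib-only analogue of a sensor backed by a cumulative sum:
// every recorded duration is added to a running total that is never reset.
final class CumulativeTimeSensor {
    private final LongAdder totalNanos = new LongAdder();

    void record(long elapsedNanos) {
        totalNanos.add(elapsedNanos);
    }

    // Times a blocking call and records its duration, even if the call throws.
    void time(Runnable blockingCall) {
        long start = System.nanoTime();
        try {
            blockingCall.run();
        } finally {
            record(System.nanoTime() - start);
        }
    }

    long totalNanos() {
        return totalNanos.sum();
    }
}
```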








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-24 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r695342570



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() {
 }
 }
 
+private double getAndAssertDuration(KafkaProducer producer, String name, double floor) {
+double value = getMetricValue(producer, name);
+assertTrue(value > floor);
+return value;
+}
+
+@Test
+public void testMeasureTransactionDurations() {
+Map<String, Object> configs = new HashMap<>();
+configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
+configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1);
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1));
+client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+new StringSerializer(), metadata, client, null, time)) {
+producer.initTransactions();
+assertTrue(getMetricValue(producer, "txn-init-time-total") > 99);

Review comment:
   I'm verifying that something was measured and that it's at least 1 tick 
of the clock. The clock is shared between multiple threads (e.g. the I/O 
threads), so the number of ticks depends on which threads get scheduled while 
we're in `initTransactions`.
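The reasoning above relies on a clock that advances every time it is read. A minimal sketch of such an auto-ticking clock follows, assuming (as I understand `MockTime(1)` to behave) that each read advances time by one tick; `AutoTickClock` is hypothetical. Because every reader advances the shared clock, reads from other threads inflate the measured duration, which is why the test only asserts a lower bound.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical clock that advances by a fixed tick every time it is read,
// loosely modelled on the auto-tick behaviour described for MockTime(1).
final class AutoTickClock {
    private final AtomicLong nowMs = new AtomicLong();
    private final long autoTickMs;

    AutoTickClock(long autoTickMs) {
        this.autoTickMs = autoTickMs;
    }

    // Each read advances the clock, so any two reads differ by at least one tick.
    long milliseconds() {
        return nowMs.addAndGet(autoTickMs);
    }
}
```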








[GitHub] [kafka] showuon commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-08-24 Thread GitBox


showuon commented on a change in pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#discussion_r695353891



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java
##
@@ -158,25 +158,24 @@ private void handleError(
 Set groupsToUnmap,
 Set groupsToRetry
 ) {
+final String requestErrorMsg = "OffsetCommit request for group id {} returned error {}. Will retry.";
 switch (error) {
-// If the coordinator is in the middle of loading, then we just need to retry.
+// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry.
 case COORDINATOR_LOAD_IN_PROGRESS:
-log.debug("OffsetCommit request for group id {} failed because the coordinator" +
-" is still in the process of loading state. Will retry.", groupId.idValue);
+case REBALANCE_IN_PROGRESS:
+log.debug(requestErrorMsg, groupId.idValue, error);
 groupsToRetry.add(groupId);
 break;
 
 // If the coordinator is not available, then we unmap and retry.
 case COORDINATOR_NOT_AVAILABLE:
 case NOT_COORDINATOR:
-log.debug("OffsetCommit request for group id {} returned error {}. Will retry.",
-groupId.idValue, error);
+log.debug(requestErrorMsg, groupId.idValue, error);

Review comment:
   Looks good to me. Thanks.
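The diff relies on `switch` fall-through so that several errors share one handler and one log template. A sketch of that routing follows, with a hypothetical `CommitError` enum and plain `String` group ids standing in for the handler's actual types.

```java
import java.util.Set;

// Hypothetical subset of error codes; the names mirror those in the diff.
enum CommitError { COORDINATOR_LOAD_IN_PROGRESS, REBALANCE_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR, UNKNOWN }

final class ErrorRouting {
    // Routes a group to the retry or unmap set; returns false for errors not handled here.
    static boolean route(CommitError error, String groupId,
                         Set<String> groupsToRetry, Set<String> groupsToUnmap) {
        switch (error) {
            // Fall-through: both errors mean "same coordinator, try again later".
            case COORDINATOR_LOAD_IN_PROGRESS:
            case REBALANCE_IN_PROGRESS:
                groupsToRetry.add(groupId);
                return true;
            // The coordinator is unavailable or has moved: unmap so it is looked up again.
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                groupsToUnmap.add(groupId);
                return true;
            default:
                return false;
        }
    }
}
```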








[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error

2021-08-24 Thread GitBox


showuon commented on pull request #11086:
URL: https://github.com/apache/kafka/pull/11086#issuecomment-905139875


   @dajac, thanks for the comment. I've updated the PR.






[GitHub] [kafka] kowshik commented on pull request #11253: MINOR: Improve local variable name in UnifiedLog.maybeIncrementFirstUnstableOffset

2021-08-24 Thread GitBox


kowshik commented on pull request #11253:
URL: https://github.com/apache/kafka/pull/11253#issuecomment-905142894


   cc @junrao @ijuma for review






[GitHub] [kafka] ableegoldman commented on pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster

2021-08-24 Thread GitBox


ableegoldman commented on pull request #11257:
URL: https://github.com/apache/kafka/pull/11257#issuecomment-905150624


   Failures are unrelated and known to be flaky on older branches 
(`connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining`) 
-- will merge to unblock the 2.8 build






[GitHub] [kafka] ableegoldman merged pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster

2021-08-24 Thread GitBox


ableegoldman merged pull request #11257:
URL: https://github.com/apache/kafka/pull/11257


   






[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2021-08-24 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404165#comment-17404165
 ] 

Yanwen Lin commented on KAFKA-10038:


Hi [~tigertan], I'm working on this issue and trying to understand the 
requirement here:
 * It seems that *ConsumerPerformance*, ProducerPerformance, ConsoleConsumer and 
ConsoleProducer all already accept the *client.id* property. 
We can put it in a config file and pass that via an option like 
consumer.config/producer.config (I tried this and it works). *So is this 
task really about providing a default client.id when none is set?*
 * You also mentioned that we can unify the way ConsoleConsumer and 
ConsoleProducer handle client.id. What do you mean by handling, given that 
only ConsoleProducer sets a default client.id? *By "unify", do you mean we 
should provide a default client.id in ConsoleConsumer as well?*
 * Why does providing a client.id help with quota testing? What is the 
connection between the two?

Hi [~showuon], since this is a simple change (providing a default client.id when none 
is set), *may I ask why we need a KIP here?*

Thanks!

> ConsumerPerformance.scala supports the setting of client.id
> ---
>
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 2.1.1
> Environment: Trunk branch
>Reporter: tigertan
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: newbie, performance
>
> ConsumerPerformance.scala supports the setting of "client.id", which is a 
> reasonable requirement, and the way "console consumer" and "console producer" 
> handle "client.id" can be unified. "client.id" defaults to 
> "perf-consumer-client".
> We often use client.id in quotas, if the script of 
> kafka-producer-perf-test.sh supports the setting of "client.id" , we can do 
> quota testing through scripts without writing our own consumer programs. 





[jira] [Commented] (KAFKA-12933) Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled

2021-08-24 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404166#comment-17404166
 ] 

Yanwen Lin commented on KAFKA-12933:


Hi [~mjsax], should we mark this ticket as Resolved? [~david.mao] 
has already raised and merged a PR for it: 
[https://github.com/apache/kafka/pull/11244]

CC: [~ijuma]

> Flaky test 
> ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled
> -
>
> Key: KAFKA-12933
> URL: https://issues.apache.org/jira/browse/KAFKA-12933
> Project: Kafka
>  Issue Type: Test
>  Components: admin
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
>   at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
>   at kafka.admin.ReassignPartitionsIntegrationTest.executeAndVerifyReassignment(ReassignPartitionsIntegrationTest.scala:130)
>   at kafka.admin.ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled(ReassignPartitionsIntegrationTest.scala:74){quote}





[jira] [Commented] (KAFKA-1935) Consumer should use a separate socket for Coordinator connection

2021-08-24 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-1935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404167#comment-17404167
 ] 

Yanwen Lin commented on KAFKA-1935:
---

Hi [~guozhang], is this still an issue? If so, I'd like to work on it.

> Consumer should use a separate socket for Coordinator connection
> 
>
> Key: KAFKA-1935
> URL: https://issues.apache.org/jira/browse/KAFKA-1935
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> KAFKA-1925 is just a quick fix for this issue; we need to let the consumer 
> create separate sockets to the same server for the coordinator / broker 
> roles.
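One way to get a separate socket to the same broker is to give the coordinator role its own connection id. The sketch below uses a hypothetical `ConnectionIds` helper; I believe the current consumer uses a similar `Integer.MAX_VALUE - nodeId` trick for the coordinator node, but check `AbstractCoordinator` before relying on that.

```java
// Hypothetical helper: derive distinct connection ids for the same broker
// depending on the role the connection serves.
final class ConnectionIds {
    enum Role { BROKER, COORDINATOR }

    static int connectionId(int brokerId, Role role) {
        // Broker ids are small non-negative ints, so mirroring them from the top of the
        // int range gives the coordinator role a separate id (and hence a separate socket).
        return role == Role.COORDINATOR ? Integer.MAX_VALUE - brokerId : brokerId;
    }
}
```

This assumes broker ids stay far below `Integer.MAX_VALUE`, so the mirrored ids cannot collide with real ones.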





[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2021-08-24 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404175#comment-17404175
 ] 

Yanwen Lin commented on KAFKA-6579:
---

Hi [~teamurko], are you still working on this? If not, I'd like to give it a shot.

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.
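The shared-base-class pattern described in the ticket can be sketched as follows. To keep it dependency-free, `SimpleStore`, `AbstractSimpleStoreTest` and `TreeMapStoreTest` are hypothetical and the "tests" are plain methods rather than JUnit `@Test` methods; a real consolidation would mirror `AbstractKeyValueStoreTest` instead.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal store interface standing in for the window/session store APIs.
interface SimpleStore {
    void put(String key, String value);
    String get(String key);
}

// Shared base class: test logic is written once; subclasses only supply the store.
abstract class AbstractSimpleStoreTest {
    protected abstract SimpleStore createStore();

    // Each "test" returns normally on success and throws AssertionError on failure.
    void shouldReturnLastPutValue() {
        SimpleStore store = createStore();
        store.put("k", "v1");
        store.put("k", "v2");
        if (!"v2".equals(store.get("k"))) throw new AssertionError("expected v2");
    }

    void shouldReturnNullForMissingKey() {
        if (createStore().get("missing") != null) throw new AssertionError("expected null");
    }

    void runAll() {
        shouldReturnLastPutValue();
        shouldReturnNullForMissingKey();
    }
}

// One implementation under test; every other store would add its own small subclass.
final class TreeMapStore implements SimpleStore {
    private final Map<String, String> data = new TreeMap<>();

    public void put(String key, String value) { data.put(key, value); }

    public String get(String key) { return data.get(key); }
}

final class TreeMapStoreTest extends AbstractSimpleStoreTest {
    @Override
    protected SimpleStore createStore() { return new TreeMapStore(); }
}
```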





[jira] [Issue Comment Deleted] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2021-08-24 Thread Yanwen Lin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanwen Lin updated KAFKA-6579:
--
Comment: was deleted

(was: Hi [~teamurko], are you still working on this? If not, I'd like to give a 
shot.)






[jira] [Commented] (KAFKA-13032) Impossible stacktrace

2021-08-24 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404177#comment-17404177
 ] 

Yanwen Lin commented on KAFKA-13032:


Hi [~mjsax], I have raised a PR for this and it has been approved. Could you 
please take a look and merge it if it looks good? :) Thanks!

PR link: [https://github.com/apache/kafka/pull/11241]

> Impossible stacktrace
> -
>
> Key: KAFKA-13032
> URL: https://issues.apache.org/jira/browse/KAFKA-13032
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Niclas Hedhman
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: beginner, easy-fix
>
> I am presented with a stacktrace that does not touch my code at a single point, 
> so it is incredibly difficult to figure out where the problem could be. 
> I think more RuntimeExceptions need to be caught, with each level adding any 
> information that gives an additional hint of where we are.
> For instance, each node could prepend its reference/name and one would have a 
> chance to see where we are...
> ```
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_140, processor=KSTREAM-SOURCE-00, topic=_poll, 
> partition=140, offset=0, stacktrace=java.lang.NullPointerException
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>   at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>   at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>   at 
> org.apache.kafka.streams.processor.internals.me

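The suggestion in KAFKA-13032 that each node prepend its name can be sketched as below. `NamedNode` and `NodeException` are hypothetical and far simpler than the Streams `ProcessorNode`; the point is only that each level catching and re-wrapping the exception yields a path such as `SOURCE -> MAP` alongside the original cause.

```java
import java.util.function.UnaryOperator;

// Hypothetical exception carrying the chain of node names the failure passed through.
final class NodeException extends RuntimeException {
    final String path;

    NodeException(String path, Throwable rootCause) {
        super("Exception in processor path: " + path, rootCause);
        this.path = path;
    }
}

// Hypothetical processor node: applies its logic, then forwards to an optional child.
final class NamedNode {
    private final String name;
    private final UnaryOperator<String> logic;
    private final NamedNode child; // null for the last node

    NamedNode(String name, UnaryOperator<String> logic, NamedNode child) {
        this.name = name;
        this.logic = logic;
        this.child = child;
    }

    String process(String record) {
        String out;
        try {
            out = logic.apply(record);
        } catch (RuntimeException e) {
            throw new NodeException(name, e); // failure inside this node's own logic
        }
        if (child == null) {
            return out;
        }
        try {
            return child.process(out);
        } catch (NodeException e) {
            // Prepend this node's name so the final message shows the whole path.
            throw new NodeException(name + " -> " + e.path, e.getCause());
        }
    }
}
```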
[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3

2021-08-24 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404179#comment-17404179
 ] 

Yanwen Lin commented on KAFKA-5666:
---

Hi [~RensGroothuijsen], are you still working on this? If not, I'd like to give 
it a try.

Hi [~yevabyzek], is this still an issue?

> Need feedback to user if consumption fails due to 
> offsets.topic.replication.factor=3
> 
>
> Key: KAFKA-5666
> URL: https://issues.apache.org/jira/browse/KAFKA-5666
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Priority: Major
>  Labels: newbie, usability
>
> Introduced in 0.11: The offsets.topic.replication.factor broker config is now 
> enforced upon auto topic creation. Internal auto topic creation will fail 
> with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets 
> this replication factor requirement.
> Issue: The default is offsets.topic.replication.factor=3, but in 
> development and docker environments where there is only 1 broker, the offsets 
> topic will fail to be created when a consumer tries to consume and no records 
> will be returned.  As a result, the user experience is bad.  The user may 
> have no idea about this setting change and enforcement, and they just see 
> that `kafka-console-consumer` hangs with ZERO output. It is true that the 
> broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of 
> alive brokers '1' does not meet the required replication factor '3' for the 
> offsets topic (configured via 'offsets.topic.replication.factor'). This error 
> can be ignored if the cluster is starting up and not all brokers are up yet. 
> (kafka.server.KafkaApis)}}) but many users do not have access to the log 
> files or know how to get them.
> Suggestion: give feedback to the user/app if offsets topic cannot be created. 
>  For example, after some timeout.
> Workaround:
> Set offsets.topic.replication.factor=1
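The broker-side condition quoted in the log message above can be expressed as a simple check, which is roughly what the ticket asks to surface to clients. `OffsetsTopicCheck` is hypothetical, and its message text simply reuses the broker log line quoted in the description.

```java
import java.util.Optional;

// Hypothetical pre-check mirroring the condition in the quoted broker log message:
// the offsets topic can only be created once enough brokers are alive.
final class OffsetsTopicCheck {
    static Optional<String> check(int aliveBrokers, int replicationFactor) {
        if (aliveBrokers >= replicationFactor) {
            return Optional.empty(); // enough brokers: the topic can be created
        }
        return Optional.of("Number of alive brokers '" + aliveBrokers
                + "' does not meet the required replication factor '" + replicationFactor
                + "' for the offsets topic (configured via 'offsets.topic.replication.factor').");
    }
}
```

With a single development broker, `check(1, 3)` yields the error while `check(1, 1)` passes, which matches the suggested workaround of lowering the replication factor to 1.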





[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2021-08-24 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404185#comment-17404185
 ] 

Luke Chen commented on KAFKA-10038:
---

If it's just providing a default value when none is set, then no KIP is needed. 
Thanks.
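A default-if-unset `client.id` can be sketched with `java.util.Properties`. `ClientIdDefaults` is a hypothetical helper; the default `perf-consumer-client` comes from the ticket description, and the key matches `CommonClientConfigs.CLIENT_ID_CONFIG` (`client.id`), written as a literal to keep the sketch dependency-free.

```java
import java.util.Properties;

// Hypothetical helper: keep a user-supplied client.id, otherwise fall back to a default.
final class ClientIdDefaults {
    static final String CLIENT_ID_CONFIG = "client.id";
    static final String DEFAULT_CONSUMER_CLIENT_ID = "perf-consumer-client"; // default named in the ticket

    static Properties withDefaultClientId(Properties userProps) {
        Properties props = new Properties();
        props.putAll(userProps); // copy so the caller's Properties are not mutated
        // putIfAbsent never overwrites, so an explicit client.id from a config file wins.
        props.putIfAbsent(CLIENT_ID_CONFIG, DEFAULT_CONSUMER_CLIENT_ID);
        return props;
    }
}
```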



