[GitHub] [kafka] dajac commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
dajac commented on a change in pull request #11231: URL: https://github.com/apache/kafka/pull/11231#discussion_r694568169 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) { else if (!future.isRetriable()) throw exception; -resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); Review comment: I have double-checked the error handling as well and I do agree with the change, as all the errors that require a rejoin are explicitly handled in the `JoinResponseHandler` and the `SyncGroupResponseHandler`. The change looks good to me. I do agree with @guozhangwang that resetting `rejoinNeeded` would make the whole logic a bit cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
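For context, a simplified sketch of the control flow after the change discussed above; the names and structure are approximated from the diff, not the actual `AbstractCoordinator` code:

```java
// Simplified sketch (assumed names, not the real implementation): on a
// retriable error the loop now just backs off and retries the JoinGroup,
// instead of resetting generation/member state via resetStateAndRejoin().
boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (future.succeeded()) {
            return true;
        }
        final RuntimeException exception = future.exception();
        if (!future.isRetriable()) {
            throw exception;
        }
        // Retriable: keep the current member id / generation and retry.
        timer.sleep(retryBackoffMs);
    }
    return false;
}
```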
[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403611#comment-17403611 ] Bruno Cadonna commented on KAFKA-13195: --- [~mjsax] [~tchiotludo] Can we close this ticket? > StateSerde don't honor DeserializationExceptionHandler > -- > > Key: KAFKA-13195 > URL: https://issues.apache.org/jira/browse/KAFKA-13195 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Ludo >Priority: Major > > Kafka Streams allows configuring a > [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling] > > When you are using a StateStore, most messages will be a copy of the original > message in an internal topic and will mostly use the same serializer, even if the > message is of another type. > You can see > [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] > that StateSerdes uses the raw Deserializer and does not honor the > {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, > leading the application to crash (reaching the > {{setUncaughtExceptionHandler}} method). > I think the state store should have the same behavior as the > {{RecordDeserializer}} and honor the DeserializationExceptionHandler. > > Stacktrace (coming from kafka stream 2.6.1) : > > {code:java} > Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=1_14, processor=workertaskjoined-repartition-source, > topic=kestra_executor-workertaskjoined-repartition, partition=14, > offset=167500, > stacktrace=org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize > value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String > "txt": not one of the values accepted for Enum class: > [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: > (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through > reference chain: > io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"]) > at > com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67) > at > com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851) > at > com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188) > at > com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197) > at >
com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137) > at > com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107) > at > com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28) > at > com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449) > at > com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405) > at > com.fasterxml.jackson.data
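To illustrate the behavior the ticket above asks for, here is a rough sketch of how a state-store value deserializer could delegate to the configured handler instead of letting the exception escape. This is hypothetical code, not the actual StateSerdes implementation, and it elides the processor context and raw record that the real handler callback receives:

```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;

// Hypothetical wrapper: route deserialization failures for state-store values
// through the configured handler, mirroring what RecordDeserializer does for
// input records.
final class HandlerAwareValueReader<V> {
    private final String topic;
    private final Deserializer<V> valueDeserializer;
    private final DeserializationExceptionHandler handler;

    HandlerAwareValueReader(final String topic,
                            final Deserializer<V> valueDeserializer,
                            final DeserializationExceptionHandler handler) {
        this.topic = topic;
        this.valueDeserializer = valueDeserializer;
        this.handler = handler;
    }

    V valueFrom(final byte[] rawValue) {
        try {
            return valueDeserializer.deserialize(topic, rawValue);
        } catch (final RuntimeException fatalException) {
            // handler.handle() normally also gets the processor context and
            // the raw ConsumerRecord; both are elided (null) in this sketch.
            if (handler.handle(null, null, fatalException)
                    == DeserializationHandlerResponse.CONTINUE) {
                return null; // skip the corrupted state entry
            }
            throw new StreamsException("Deserialization error in state store", fatalException);
        }
    }
}
```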
[GitHub] [kafka] dajac merged pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions
dajac merged pull request #11230: URL: https://github.com/apache/kafka/pull/11230 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12840) Removing `compact` cleaning on a topic should abort on-going compactions
[ https://issues.apache.org/jira/browse/KAFKA-12840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-12840. - Fix Version/s: 3.1.0 Reviewer: Jun Rao Resolution: Fixed > Removing `compact` cleaning on a topic should abort on-going compactions > > > Key: KAFKA-12840 > URL: https://issues.apache.org/jira/browse/KAFKA-12840 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.1.0 > > > When `compact` is removed from the `cleanup.policy` of a topic, the > compactions of that topic should be aborted. -- This message was sent by Atlassian Jira (v8.3.4#803005)
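As a usage note, removing `compact` from a topic's cleanup policy is typically done through an incremental config update. A minimal Admin-client sketch follows; the topic name and bootstrap address are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DropCompactPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Switch cleanup.policy from "compact" (or "compact,delete") to "delete";
            // with KAFKA-12840 fixed, in-flight compactions of the topic are aborted.
            AlterConfigOp dropCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                Collections.singletonMap(topic, Collections.singletonList(dropCompact)))
                .all().get();
        }
    }
}
```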
[jira] [Assigned] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanwen Lin reassigned KAFKA-10038: -- Assignee: Yanwen Lin > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 2.1.1 > Environment: Trunk branch >Reporter: tigertan >Assignee: Yanwen Lin >Priority: Minor > Labels: newbie, performance > > ConsumerPerformance.scala should support setting "client.id", which is a > reasonable requirement; the way "console consumer" and "console producer" > handle "client.id" could then be unified. "client.id" defaults to > "perf-consumer-client". > We often use client.id in quotas; if the kafka-producer-perf-test.sh script > supports setting "client.id", we can do quota testing through scripts > without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
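For illustration, this is roughly what the perf tool would set on its internal consumer once the option exists. The bootstrap address and group id are placeholders; only `client.id` is the point of the ticket:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PerfConsumerConfigExample {
    public static KafkaConsumer<byte[], byte[]> build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-consumer-group");     // placeholder
        // Broker quotas can be keyed on client.id, so a configurable value
        // lets quota tests target a specific quota entity.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
```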
[GitHub] [kafka] kowshik opened a new pull request #11253: MINOR: Improve local variable name in UnifiedLog.maybeIncrementFirstUnstableOffset
kowshik opened a new pull request #11253: URL: https://github.com/apache/kafka/pull/11253 It looked odd that the code has a local variable named `updatedFirstStableOffset` which is used to update `MergedLog.firstUnstableOffsetMetadata`. This PR renames the local variable to `updatedFirstUnstableOffset`, which is better aligned with the `MergedLog` attribute being updated. **Tests:** Relying on existing unit & integration tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jlprat commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r694629688

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java

@@ -68,6 +70,12 @@ public void testMap() { } }
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamMap supplier = new KStreamMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L)));

Review comment: Same as previous comment

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java

@@ -86,4 +88,10 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } }
+
+@Test
+public void testKeyValueMapperResultNotNull() {
+final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null);
+assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L)));

Review comment: As the whole point of this PR is to provide better messages, I would also check in the test that the exception has the new enhanced message. Something like:

```suggestion
final Record record = new Record<>("K", 0, 0L);
assertThrows(NullPointerException.class, () -> supplier.get().process(record),
    String.format("KeyValueMapper can't return null from mapping the record: %s", record));
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
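For reference, a hedged sketch of what the checked processor side presumably looks like with the PR's null check in place; the names and exact structure of the KStreamFlatMap internals are approximated, not verbatim:

```java
// Approximate sketch of the null check under review (not the verbatim code).
@Override
public void process(final Record<KIn, VIn> record) {
    final Iterable<? extends KeyValue<? extends KOut, ? extends VOut>> newKeyValues =
        mapper.apply(record.key(), record.value());
    if (newKeyValues == null) {
        throw new NullPointerException(
            "KeyValueMapper can't return null from mapping the record: " + record);
    }
    for (final KeyValue<? extends KOut, ? extends VOut> newPair : newKeyValues) {
        context().forward(record.withKey(newPair.key).withValue(newPair.value));
    }
}
```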
[GitHub] [kafka] cadonna commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
cadonna commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r694622105 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -249,7 +251,7 @@ public int maxFileOpeningThreads() { @Override public Options setMaxTotalWalSize(final long maxTotalWalSize) { -dbOptions.setMaxTotalWalSize(maxTotalWalSize); +LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. Setting option 'maxTotalWalSize' will be ignored"); Review comment: Could you please add tests that verify the log messages? You can find an example how to verify log messages in `KTableSourceTest#kTableShouldLogAndMeterOnSkippedRecords()`. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -61,12 +62,13 @@ * * This class do the translation between generic {@link Options} into {@link DBOptions} and {@link ColumnFamilyOptions}. */ -public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options { +class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter extends Options { Review comment: I think this is not strictly needed since the constructor is already package-private. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -304,7 +306,7 @@ public String walDir() { @Override public Options setWalDir(final String walDir) { -dbOptions.setWalDir(walDir); +LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. Setting option 'walDir' will be ignored"); Review comment: A logging helper method sounds reasonable. IMO the return can stay as it is, because we would not save too much. But I do not have strong feelings about it. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -178,15 +178,15 @@ void openDB(final Map configs, final File stateDir) { // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpStatistics(configs); +setStatisticsIfNeeded(configs); openRocksDB(dbOptions, columnFamilyOptions); open = true; addValueProvidersToMetricsRecorder(); } -private void maybeSetUpStatistics(final Map configs) { +private void setStatisticsIfNeeded(final Map configs) { Review comment: `maybeDoSomething()` is also used in a lot of places. So I agree with @abbccdda. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
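A possible shape for the logging helper discussed in the review above; the method name is hypothetical and the actual PR may structure this differently:

```java
// Hypothetical helper for the repeated WAL warnings in the adapter.
private void logIgnoredWalOption(final String optionName) {
    LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. " +
        "Setting option '{}' will be ignored", optionName);
}

@Override
public Options setMaxTotalWalSize(final long maxTotalWalSize) {
    logIgnoredWalOption("maxTotalWalSize");
    return this;
}

@Override
public Options setWalDir(final String walDir) {
    logIgnoredWalOption("walDir");
    return this;
}
```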
[GitHub] [kafka] dajac commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
dajac commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-904481315 @jolshan All the builds have failed. Could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #11127: KAFKA-13134: Give up group metadata lock before sending heartbeat response
dajac merged pull request #11127: URL: https://github.com/apache/kafka/pull/11127 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13134) Heartbeat Request high lock contention
[ https://issues.apache.org/jira/browse/KAFKA-13134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-13134. - Fix Version/s: 3.1.0 Reviewer: David Jacot Resolution: Fixed > Heartbeat Request high lock contention > -- > > Key: KAFKA-13134 > URL: https://issues.apache.org/jira/browse/KAFKA-13134 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: David Mao >Assignee: David Mao >Priority: Minor > Fix For: 3.1.0 > > > On a cluster with high heartbeat rate, a lock profile showed high contention > for the GroupMetadata lock. > We can significantly reduce this by invoking the response callback outside of > the group metadata lock. -- This message was sent by Atlassian Jira (v8.3.4#803005)
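The general shape of that fix, as an illustrative Java sketch (the real coordinator code is Scala, and the names here are hypothetical):

```java
import java.util.function.Consumer;
import org.apache.kafka.common.message.HeartbeatResponseData;

final class HeartbeatExample {
    // Illustrative only: do the state mutation under the group lock, but
    // invoke the (potentially slow) response callback after releasing it,
    // so heartbeats no longer serialize on the GroupMetadata lock.
    static void handleHeartbeat(final Object groupLock,
                                final Consumer<HeartbeatResponseData> responseCallback) {
        final HeartbeatResponseData response;
        synchronized (groupLock) {
            response = new HeartbeatResponseData(); // stands in for completing the heartbeat
        }
        responseCallback.accept(response); // invoked outside the lock
    }
}
```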
[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403737#comment-17403737 ] Ludo commented on KAFKA-13195: -- [~cadonna]: I think there is still room for improvement in handling SerializationException, if the community is interested in the feature. [~mjsax]: Just a thought: in my particular case, I have a way to catch the exception since I use a custom Transformer with the store API (see [here|https://github.com/kestra-io/kestra/blob/d21a94ddbd56b1610161451c392afe0829f8f412/runner-kafka/src/main/java/io/kestra/runner/kafka/streams/FlowWithTriggerTransformer.java#L37]), but what about a KTable or GlobalKTable? I think the fetch from the state store is done internally, so I don't think I have any way to catch the exception in that case. Here the special ExceptionHandler could be really helpful: the underlying topic data will never change and there is no way to handle it properly (maybe a custom deserializer that emits null, but that logs a warning as I remember). > StateSerde don't honor DeserializationExceptionHandler > -- > > Key: KAFKA-13195 > URL: https://issues.apache.org/jira/browse/KAFKA-13195 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Ludo >Priority: Major > > Kafka Streams allows configuring a > [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling] > > When you are using a StateStore, most messages will be a copy of the original > message in an internal topic and will mostly use the same serializer, even if the > message is of another type. > You can see > [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] > that StateSerdes uses the raw Deserializer and does not honor the > {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, > leading the application to crash (reaching the > {{setUncaughtExceptionHandler}} method). > I think the state store should have the same behavior as the > {{RecordDeserializer}} and honor the DeserializationExceptionHandler. > > Stacktrace (coming from kafka stream 2.6.1) : > > {code:java} > Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !org.apache.kafka.streams.errors.StreamsException: Exception caught in > process.
taskId=1_14, processor=workertaskjoined-repartition-source, > topic=kestra_executor-workertaskjoined-repartition, partition=14, > offset=167500, > stacktrace=org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize > value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String > "txt": not one of the values accepted for Enum class: > [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: > (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through > reference chain: > io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"]) > at > com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67) > at > com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851) > at > com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188) > at > com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197) > at > com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137) > at > com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107) > at > com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeseri
[GitHub] [kafka] cadonna commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
cadonna commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r694676118 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); -Map offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); +final Map offsets; +long start = time.nanoseconds(); +try { +offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); +} finally { +kafkaConsumerMetrics.recordCommitted(time.nanoseconds() - start); +} Review comment: Could you please add unit tests for this change? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -1873,7 +1875,13 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); Review comment: Why do you exclude this check in the measured time here but include it above? Similar applies to `offsets.forEach(this::updateLastSeenEpochIfNewer)`. ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.producer.internals; + +import java.util.Map; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.CumulativeSum; Review comment: ```suggestion import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.CumulativeSum; import java.util.Map; ``` ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -699,7 +706,9 @@ public void sendOffsetsToTransaction(Map offs throwIfProducerClosed(); TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata); sender.wakeup(); Review comment: Why are those lines not included in the measurement? 
## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() { } } +private double getAndAssertDuration(KafkaProducer producer, String name, double floor) { +double value = getMetricValue(producer, name); +assertTrue(value > floor); +return value; +} + +@Test +public void testMeasureTransactionDurations() { +Map configs = new HashMap<>(); +configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id"); +configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1); +configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); +Time time = new MockTime(1); +MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); +ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + +MockClient client = new MockClient(time, metadata); +client.updateMetadata(initialUpdateResponse); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1)); +client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); + +try (KafkaProducer producer = kafkaProducer(configs, new StringSerializer(), +new StringSerializer(), metadata, client, null, time)) { +producer.initTransactions(); +assertTrue(getMetricValue(producer, "txn-init-time-total") > 99); Review comment: I am not sure I understand this verificati
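The pattern under review in this thread, factored as a generic helper for clarity; the helper name is hypothetical, and the PR itself inlines the try/finally around each blocking call:

```java
import java.util.function.Supplier;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

final class BlockedTimeExample {
    // Measure wall-clock time spent in a blocking call and record it even if
    // the call throws, matching the try/finally shape in the diff above.
    static <T> T measureBlocked(final Time time, final Sensor sensor, final Supplier<T> call) {
        final long start = time.nanoseconds();
        try {
            return call.get();
        } finally {
            sensor.record(time.nanoseconds() - start);
        }
    }
}
```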
[GitHub] [kafka] cadonna commented on pull request #11149: KIP-761: add total blocked time metric to streams
cadonna commented on pull request #11149: URL: https://github.com/apache/kafka/pull/11149#issuecomment-904548251 The title of the PR should start with the Jira ID, i.e., KAFKA-1234. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-2424) Consider introducing lint-like tool for Scala
[ https://issues.apache.org/jira/browse/KAFKA-2424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat reassigned KAFKA-2424: - Assignee: Josep Prat > Consider introducing lint-like tool for Scala > - > > Key: KAFKA-2424 > URL: https://issues.apache.org/jira/browse/KAFKA-2424 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ismael Juma >Assignee: Josep Prat >Priority: Major > Labels: newbie > > Typesafe is working on abide and the first release is expected next month: > https://github.com/scala/scala-abide > An alternative is scapegoat: > https://github.com/sksamuel/scalac-scapegoat-plugin -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jlprat opened a new pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat opened a new pull request #11254: URL: https://github.com/apache/kafka/pull/11254

Introduces Scalafix, a linter with some rewrite capabilities on top (https://scalacenter.github.io/scalafix/). Running the `checkScalafix` gradle task prints a report with all broken rules; running the `scalafix` gradle task applies some rewrites to the files, for example import ordering and annotating return types. This change uses the official gradle plugin for Scalafix, and Scalafix itself is maintained by the Scala Center.

Current rules checked:
- Do not allow `final var`
- Explicit return types for public and protected methods (for public `val` and `var` the change was too big; this can be done at a later time)
- Avoid procedure syntax in Scala
- Avoid use of val in for comprehensions (it's a deprecated feature)

*Known limitations*: when automatically inferring return types, Scalafix might fail and pick a "too wide" type, i.e. `Any` instead of the correct one. After letting Scalafix fix the files, contributors should still review the changes and validate that they are correct.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-2424) Consider introducing lint-like tool for Scala
[ https://issues.apache.org/jira/browse/KAFKA-2424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403765#comment-17403765 ] Josep Prat commented on KAFKA-2424: --- I know this is a very old task, but there is still no linter in the build. I configured and added Scalafix in the submitted PR. > Consider introducing lint-like tool for Scala > - > > Key: KAFKA-2424 > URL: https://issues.apache.org/jira/browse/KAFKA-2424 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ismael Juma >Assignee: Josep Prat >Priority: Major > Labels: newbie > > Typesafe is working on abide and the first release is expected next month: > https://github.com/scala/scala-abide > An alternative is scapegoat: > https://github.com/sksamuel/scalac-scapegoat-plugin -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat commented on pull request #11254: URL: https://github.com/apache/kafka/pull/11254#issuecomment-904569475 @cadonna, @ijuma and @vvcephei, if any of you has time to take a look at this PR, I'd be thankful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on a change in pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat commented on a change in pull request #11254: URL: https://github.com/apache/kafka/pull/11254#discussion_r694776794 ## File path: Jenkinsfile ## @@ -20,7 +20,7 @@ def doValidation() { sh """ ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ -spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ +spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain checkScalafix rat \ Review comment: Added the new task in the Jenkins job script. ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1737,11 +1737,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO Set.empty[String] } - def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 - def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 - def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName } - def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol } - def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) + def interBrokerListenerName: ListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 + def interBrokerSecurityProtocol: SecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 + def controlPlaneListenerName: Option[ListenerName] = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName } + def controlPlaneSecurityProtocol: Option[SecurityProtocol] = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol } Review comment: These two were incorrectly inferred by Scalafix; I needed to correct them manually. Scalafix inferred `Option[Any]` for both. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat commented on pull request #11254: URL: https://github.com/apache/kafka/pull/11254#issuecomment-904572739 As far as I understand, my Jenkinsfile modification will not be taken into consideration by Jenkins in this run; it may be worth keeping a close eye on this once it's merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11104: KAFKA-13079: Forgotten Topics in Fetch Requests may incorrectly use topic IDs
dajac commented on a change in pull request #11104: URL: https://github.com/apache/kafka/pull/11104#discussion_r694816464 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -332,6 +329,9 @@ public FetchRequestData build() { iter.remove(); // Indicate that we no longer want to listen to this partition. removed.add(topicPartition); +// If we do not have this topic ID in the session, we can not use topic IDs Review comment: nit: `.` at the end of the sentence. ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() { // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID, +builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); // Should have the same session ID and next epoch, but can no longer use topic IDs. // The receiving broker will close the session if we were previously using topic IDs. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); -assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType); +assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@Test +public void testIdUsageWithAllForgottenPartitions() { +// We want to test when all topics are removed from the session +List useTopicIdsTests = Arrays.asList(true, false); Review comment: It would be great if we could use `@ParameterizedTest` here. ## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() { // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID, +builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); // Should have the same session ID and next epoch, but can no longer use topic IDs. // The receiving broker will close the session if we were previously using topic IDs. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); -assertEquals(1, data2.metadata().epoch(), "Did not close session when " + testType); +assertEquals(1, data2.metadata().epoch(), "Did not have correct epoch when " + testType); assertFalse(data2.canUseTopicIds()); }); } +@Test +public void testIdUsageWithAllForgottenPartitions() { +// We want to test when all topics are removed from the session +List useTopicIdsTests = Arrays.asList(true, false); +useTopicIdsTests.forEach(useTopicIds -> { +Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID; +Short respVer = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12; Review comment: nit: `respVer` -> `responseVersion`?
## File path: clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java ## @@ -443,17 +443,50 @@ public void testIdUsageRevokedOnIdDowngrade() { // Try to remove a topic ID from an existing topic partition (0) or add a new topic partition (1) without an ID. FetchSessionHandler.Builder builder2 = handler.newBuilder(); -builder2.add(new TopicPartition("foo", 0), Uuid.ZERO_UUID, +builder2.add(new TopicPartition("foo", partition), Uuid.ZERO_UUID, new FetchRequest.PartitionData(10, 110, 210, Optional.empty())); FetchSessionHandler.FetchRequestData data2 = builder2.build(); // Should have the same session ID and next epoch, but can no longer use topic IDs. // The receiving broker will close the session if we were previously using topic IDs. assertEquals(123, data2.metadata().sessionId(), "Did not use same session when " + testType); -assertEquals(1, data2.metadata().epoch(),
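dajac's `@ParameterizedTest` suggestion would look roughly like this; the test body is the former forEach lambda and is elided here:

```java
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiKeys;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class FetchSessionHandlerParamExample {
    // JUnit 5 runs this once per boolean, replacing the manual forEach loop.
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testIdUsageWithAllForgottenPartitions(final boolean useTopicIds) {
        final Uuid topicId = useTopicIds ? Uuid.randomUuid() : Uuid.ZERO_UUID;
        final short responseVersion = useTopicIds ? ApiKeys.FETCH.latestVersion() : 12;
        // ... body of the former lambda goes here, unchanged
    }
}
```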
[GitHub] [kafka] dajac commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
dajac commented on a change in pull request #11086: URL: https://github.com/apache/kafka/pull/11086#discussion_r694843292 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ## @@ -159,10 +159,11 @@ private void handleError( Set groupsToRetry ) { switch (error) { -// If the coordinator is in the middle of loading, then we just need to retry. +// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry. case COORDINATOR_LOAD_IN_PROGRESS: +case REBALANCE_IN_PROGRESS: log.debug("OffsetCommit request for group id {} failed because the coordinator" + -" is still in the process of loading state. Will retry.", groupId.idValue); +" is still in the process of loading state or the group is rebalancing. Will retry.", groupId.idValue); Review comment: I wonder if it would be preferable to use a more generic message which clearly mentions the error encountered: `OffsetCommit request for group id {} returned error {}. Will retry.`. Without mentioning the received error, we don't really know what happened, so the log is not that useful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat commented on pull request #11254: URL: https://github.com/apache/kafka/pull/11254#issuecomment-904646319 Failure was https://issues.apache.org/jira/browse/KAFKA-9897p -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat edited a comment on pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat edited a comment on pull request #11254: URL: https://github.com/apache/kafka/pull/11254#issuecomment-904646319 Failure was https://issues.apache.org/jira/browse/KAFKA-9897 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat edited a comment on pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat edited a comment on pull request #11254: URL: https://github.com/apache/kafka/pull/11254#issuecomment-904646319 Failure was https://issues.apache.org/jira/browse/KAFKA-13128 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
showuon commented on a change in pull request #11086: URL: https://github.com/apache/kafka/pull/11086#discussion_r694870996 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ## @@ -159,10 +159,11 @@ private void handleError( Set groupsToRetry ) { switch (error) { -// If the coordinator is in the middle of loading, then we just need to retry. +// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry. case COORDINATOR_LOAD_IN_PROGRESS: +case REBALANCE_IN_PROGRESS: log.debug("OffsetCommit request for group id {} failed because the coordinator" + -" is still in the process of loading state. Will retry.", groupId.idValue); +" is still in the process of loading state or the group is rebalancing. Will retry.", groupId.idValue); Review comment: Makes sense to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
showuon commented on pull request #11086: URL: https://github.com/apache/kafka/pull/11086#issuecomment-904662073

> excuse me for the delay

Never mind! :) And thanks for the comment. I've updated the PR. Thank you.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
dajac commented on a change in pull request #11086: URL: https://github.com/apache/kafka/pull/11086#discussion_r694876130 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ## @@ -158,25 +158,24 @@ private void handleError( Set groupsToUnmap, Set groupsToRetry ) { +final String requestErrorMsg = "OffsetCommit request for group id {} returned error {}. Will retry."; Review comment: I would rather keep it inline to stay consistent with the other cases. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ## @@ -158,25 +158,24 @@ private void handleError( Set groupsToUnmap, Set groupsToRetry ) { +final String requestErrorMsg = "OffsetCommit request for group id {} returned error {}. Will retry."; switch (error) { -// If the coordinator is in the middle of loading, then we just need to retry. +// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry. case COORDINATOR_LOAD_IN_PROGRESS: -log.debug("OffsetCommit request for group id {} failed because the coordinator" + -" is still in the process of loading state. Will retry.", groupId.idValue); +case REBALANCE_IN_PROGRESS: +log.debug(requestErrorMsg, groupId.idValue, error); groupsToRetry.add(groupId); break; // If the coordinator is not available, then we unmap and retry. case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: -log.debug("OffsetCommit request for group id {} returned error {}. Will retry.", -groupId.idValue, error); +log.debug(requestErrorMsg, groupId.idValue, error); Review comment: Perhaps, we could say ` Will rediscover the coordinator and retry.` in this case. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
dajac commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r694960458 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig, InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset)) } else { stateChangeLogger.info( -s"Skipped the become-follower state change after marking its partition as " + +"Skipped the become-follower state change after marking its partition as " + s"follower for partition $tp with id ${info.topicId} and partition state $state." ) } } } changedPartitions.add(partition) } catch { - case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " + + case e: KafkaStorageException => +// If there is an offline log directory, a Partition object may have been created by +// `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due +// to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition +// to an empty Partition object. We need to map this topic-partition to OfflinePartition instead. +markPartitionOffline(tp) Review comment: Good point. I've added a few unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11225: MINOR; Small optimizations in `ReplicaManager#becomeLeaderOrFollower`
dajac commented on a change in pull request #11225: URL: https://github.com/apache/kafka/pull/11225#discussion_r694960197 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -2207,15 +2198,28 @@ class ReplicaManager(val config: KafkaConfig, InitialFetchState(leaderEndPoint, partition.getLeaderEpoch, fetchOffset)) } else { stateChangeLogger.info( -s"Skipped the become-follower state change after marking its partition as " + +"Skipped the become-follower state change after marking its partition as " + s"follower for partition $tp with id ${info.topicId} and partition state $state." ) } } } changedPartitions.add(partition) } catch { - case e: Throwable => stateChangeLogger.error(s"Unable to start fetching ${tp} " + + case e: KafkaStorageException => +// If there is an offline log directory, a Partition object may have been created by +// `getOrCreatePartition()` before `createLogIfNotExists()` failed to create local replica due +// to KafkaStorageException. In this case `ReplicaManager.allPartitions` will map this topic-partition +// to an empty Partition object. We need to map this topic-partition to OfflinePartition instead. +markPartitionOffline(tp) +stateChangeLogger.error(s"Unable to start fetching $tp " + + s"with topic ID ${info.topicId} due to a storage error ${e.getMessage}", e) +replicaFetcherManager.addFailedPartition(tp) +error(s"Error while making broker the follower for partition $tp in dir " + Review comment: No... I've put it because the other cases have it. I have removed all of them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonyanwenl commented on pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl commented on pull request #11241: URL: https://github.com/apache/kafka/pull/11241#issuecomment-904751684

> LGTM! Thanks for the fix! One suggestion: please try not to email the whole dev group to request a code review next time (unless it's an urgent PR). If every PR owner sends an email to the dev group for code review, you can imagine how many emails dev group members would receive. Please try to mention the possible reviewers' names (could be more than one) in the PR, and try again next week (or in 2 weeks, or more) if there is no response. Hope that helps, and welcome to the Kafka community! :)

Hi @showuon, thanks for your suggestion! Actually, when I checked the official site "[How to Contribute](https://kafka.apache.org/contributing)", I found it said "**Nag us if we aren't doing our job...**", and when I clicked the link "**Nag us**", it directly pops up an emailing window. So I guess we may need to update that website to avoid suggesting people directly send email to the entire dev group for PR review requests (or to be clearer that this is for urgent fixes).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] splett2 opened a new pull request #11255: KAFKA-13225: Controller skips sending UpdateMetadataRequest when no change in partition state.
splett2 opened a new pull request #11255: URL: https://github.com/apache/kafka/pull/11255

### What
The controller can skip sending `updateMetadataRequest` during the broker failure callback if there are offline partitions and the deleted brokers don't host any partitions. Looking at the logic, I'm not sure why the if check is checking for partitionsWithOfflineLeader. This seems like a bug which may mean we're sending additional `updateMetadataRequests` on broker shutdowns.

### Testing
Added an integration test for the failure scenario. The controller integration test suite passes locally with my change.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-13215: -- Assignee: Walker Carlson > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Assignee: Walker Carlson >Priority: Major > Labels: flaky-test > Fix For: 3.1.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403921#comment-17403921 ] Walker Carlson commented on KAFKA-13215: [GitHub Pull Request #11083|https://github.com/apache/kafka/pull/11083] This PR should take care of this ticket as well. > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Assignee: Walker Carlson >Priority: Major > Labels: flaky-test > Fix For: 3.1.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695002209 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,10 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L))); Review comment: Thanks for your review! I found we mixed the using of two packages: `org.junit.Assert` and `org.junit.jupiter.api.Assertions`. The method `asssertThrows` in those two packages have different api: For `org.junit.Assert.assertThrows`, the [api](https://junit.org/junit4/javadoc/4.13/org/junit/Assert.html#assertThrows(java.lang.String,%20java.lang.Class,%20org.junit.function.ThrowingRunnable)) is: ```java public static T assertThrows(String message, Class expectedThrowable, ThrowingRunnable runnable) ``` For `org.junit.jupiter.api.Assertions.assertThrows`, the [api](https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#assertThrows-java.lang.Class-org.junit.jupiter.api.function.Executable-java.lang.String-) is: ```java public static T assertThrows(Class expectedType, Executable executable, String message) ``` I searched our Kafka code base and found there is [another place](https://github.com/apache/kafka/blob/9565a529e08d7aa36beac02c8e6115bcc87d2dc7/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java#L521) using the package `org.junit.Assert.assertThrows` to assert exception message so I follow that to use the same api as well here. Please let me know if this is not proper. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson resolved KAFKA-13215. Resolution: Fixed > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Assignee: Walker Carlson >Priority: Major > Labels: flaky-test > Fix For: 3.1.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695002209 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,10 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L))); Review comment: @jlprat Thanks for your review! I found we mix usage of two packages: `org.junit.Assert` and `org.junit.jupiter.api.Assertions`. The method `assertThrows` in these two packages has a different API: For `org.junit.Assert.assertThrows`, the [api](https://junit.org/junit4/javadoc/4.13/org/junit/Assert.html#assertThrows(java.lang.String,%20java.lang.Class,%20org.junit.function.ThrowingRunnable)) is: ```java public static <T extends Throwable> T assertThrows(String message, Class<T> expectedThrowable, ThrowingRunnable runnable) ``` For `org.junit.jupiter.api.Assertions.assertThrows`, the [api](https://junit.org/junit5/docs/5.0.1/api/org/junit/jupiter/api/Assertions.html#assertThrows-java.lang.Class-org.junit.jupiter.api.function.Executable-java.lang.String-) is: ```java public static <T extends Throwable> T assertThrows(Class<T> expectedType, Executable executable, String message) ``` I searched our Kafka code base and found [another place](https://github.com/apache/kafka/blob/9565a529e08d7aa36beac02c8e6115bcc87d2dc7/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java#L521) that uses `org.junit.Assert.assertThrows` to assert an exception message, so I followed it and used the same API here. Please let me know if this is not appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
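For readers comparing the two overloads quoted above, the parameter order is the only real difference. A minimal standalone sketch (class and method names here are illustrative, not from the PR), assuming both JUnit 4.13+ and JUnit 5 are on the classpath:
```java
// Hypothetical sketch contrasting the two assertThrows overloads discussed above.
public class AssertThrowsSketch {
    static void boom() { throw new NullPointerException("boom"); }

    public static void main(String[] args) {
        // JUnit 4: failure message first, expected type second, ThrowingRunnable last
        NullPointerException e4 = org.junit.Assert.assertThrows(
            "expected an NPE", NullPointerException.class, AssertThrowsSketch::boom);

        // JUnit 5: expected type first, Executable second, failure message last
        NullPointerException e5 = org.junit.jupiter.api.Assertions.assertThrows(
            NullPointerException.class, AssertThrowsSketch::boom, "expected an NPE");

        // Both return the caught exception, so its message can be asserted separately
        System.out.println(e4.getMessage() + " / " + e5.getMessage());
    }
}
```
In both frameworks the message argument is only printed when the assertion fails; asserting on the exception's own message is done on the returned exception, as the review thread below works out.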
[GitHub] [kafka] wcarlson5 commented on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests
wcarlson5 commented on pull request #11083: URL: https://github.com/apache/kafka/pull/11083#issuecomment-904782977 @kkonstantine I saw you made a ticket for a failing test. I think this PR should fix it for 3.0 as well if cherry-picked -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 edited a comment on pull request #11083: KAFKA-13010: Retry getting tasks incase of rebalance for TaskMetadata tests
wcarlson5 edited a comment on pull request #11083: URL: https://github.com/apache/kafka/pull/11083#issuecomment-904782977 @kkonstantine I saw you made a ticket for a failing test. I think this PR should fix it for 3.0 as well if cherry-picked. https://issues.apache.org/jira/browse/KAFKA-13215 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #11246: MINOR: Improve controlled shutdown logging
hachikuji merged pull request #11246: URL: https://github.com/apache/kafka/pull/11246 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
hachikuji commented on a change in pull request #11231: URL: https://github.com/apache/kafka/pull/11231#discussion_r695012453 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) { else if (!future.isRetriable()) throw exception; -resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception)); Review comment: Thanks for the suggestion. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jlprat commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695019986 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,12 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +final Record record = new Record<>("K", 0, 0L); +assertThrows(String.format("KeyValueMapper can't return null from mapping the record: %s", record), +NullPointerException.class, () -> supplier.get().process(record)); Review comment: Just copying over the suggestion to here, so it's easy to find ```suggestion final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jlprat commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695017818 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,10 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L))); Review comment: Actually I was wrong with my code suggestion, the overloaded method doesn't do what I expected it to do. The message it takes is only to print if the assertion fails. That should do the trick! ``` final Throwable throwable = assertThrows(NullPointerException.class, () -> supplier.get().process(record)); assertEquals(throwable.getMessage(), String.format("KeyValueMapper can't return null from mapping the record: %s", record)); ``` Sorry about the confusion I brought with the first comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
jolshan commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-904805600 Looks like an issue with more recent changes. I'll fix it up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jasonyanwenl commented on a change in pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper
jasonyanwenl commented on a change in pull request #11241: URL: https://github.com/apache/kafka/pull/11241#discussion_r695031580 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ## @@ -86,4 +88,10 @@ public void testFlatMap() { assertEquals(expected[i], supplier.theCapturedProcessor().processed().get(i)); } } + +@Test +public void testKeyValueMapperResultNotNull() { +final KStreamFlatMap supplier = new KStreamFlatMap<>((key, value) -> null); +assertThrows(NullPointerException.class, () -> supplier.get().process(new Record<>("K", 0, 0L))); Review comment: @jlprat I see. I didn't notice this either, LOL. Fixed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests
rondagostino commented on pull request #11238: URL: https://github.com/apache/kafka/pull/11238#issuecomment-904824877 > we can have ceil method to use, which will be easier Good suggestion! Done. Here are some system test runs that show the new code is syntactically correct (and I observed the correct kill behavior). ``` test_id: kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=COLOCATED_KRAFT.quorum_size=1 status: PASS run time: 1 minute 49.278 seconds test_id: kafkatest.sanity_checks.test_bounce.TestBounce.test_simple_run.metadata_quorum=COLOCATED_KRAFT.quorum_size=3 status: PASS run time: 2 minutes 6.602 seconds ``` Thanks for the review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #11047: MINOR: Remove unnecessary code for WindowStoreBuilder.
abbccdda commented on a change in pull request #11047: URL: https://github.com/apache/kafka/pull/11047#discussion_r695050765 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java ## @@ -36,7 +36,6 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier, final Serde<K> keySerde, final Serde<V> valueSerde, final Time time) { super(storeSupplier.name(), keySerde, valueSerde, time); -Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); Review comment: @tang7526 Thanks, could we do something similar to https://github.com/apache/kafka/blob/d30b4e51513e6938970020bf1e894c983447ef8f/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java#L36 here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
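For reference, the SessionStoreBuilder line linked above inlines the null check in the `super()` call, since a statement cannot precede `super()` in a constructor. A stripped-down sketch of that shape, with hypothetical class names standing in for the real builders:
```java
import java.util.Objects;

// Hypothetical, simplified builder shapes; the real builders also carry serdes and time.
interface StoreSupplierSketch { String name(); }

abstract class AbstractStoreBuilderSketch {
    final String name;
    AbstractStoreBuilderSketch(final String name) { this.name = name; }
}

public class WindowStoreBuilderSketch extends AbstractStoreBuilderSketch {
    WindowStoreBuilderSketch(final StoreSupplierSketch storeSupplier) {
        // The null check runs while super()'s arguments are evaluated, so a null
        // supplier fails fast with a clear message instead of an NPE on .name()
        super(Objects.requireNonNull(storeSupplier, "storeSupplier can't be null").name());
    }

    public static void main(String[] args) {
        try {
            new WindowStoreBuilderSketch(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints: storeSupplier can't be null
        }
    }
}
```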
[GitHub] [kafka] ijuma commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
ijuma commented on pull request #11231: URL: https://github.com/apache/kafka/pull/11231#issuecomment-904843864 @guozhangwang @dajac does the last commit from Jason look good? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11149: KAFKA-1234: KIP-761, add total blocked time metric to streams
guozhangwang commented on pull request #11149: URL: https://github.com/apache/kafka/pull/11149#issuecomment-904852557 > The title of the PR should start with the Jira ID, i.e., KAFKA-1234. Just to explain the context here, we have a browser plugin for AK tickets which can re-direct from PR directly to the ticket URL, but that script relies on the PR title to follow the pattern of `KAFKA-1234: blah blah`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang edited a comment on pull request #11149: KIP-761: add total blocked time metric to streams
guozhangwang edited a comment on pull request #11149: URL: https://github.com/apache/kafka/pull/11149#issuecomment-904852557 > The title of the PR should start with the Jira ID, i.e., KAFKA-1234. Just to explain the context here, we have a browser plugin for AK tickets which can re-direct from PR directly to the ticket URL, but that script relies on the PR title to follow the pattern of `KAFKA-1234: blah blah`. For this KIP, we probably have not created a JIRA ticket yet? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
guozhangwang commented on pull request #11231: URL: https://github.com/apache/kafka/pull/11231#issuecomment-904853346 Yup, I approved it last night. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
ijuma commented on pull request #11231: URL: https://github.com/apache/kafka/pull/11231#issuecomment-904853838 The commit metadata said it was added 1 hour ago, which is why I asked :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
guozhangwang commented on pull request #11231: URL: https://github.com/apache/kafka/pull/11231#issuecomment-904860153 Yeah, I reviewed that last commit as well. Btw, it is not a correctness issue anyway, so I was even happy to merge it as-is :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance
hachikuji merged pull request #11231: URL: https://github.com/apache/kafka/pull/11231 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13214) Consumer should not reset group state after disconnect
[ https://issues.apache.org/jira/browse/KAFKA-13214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13214. - Fix Version/s: 2.8.1 2.7.2 3.0.0 Resolution: Fixed > Consumer should not reset group state after disconnect > -- > > Key: KAFKA-13214 > URL: https://issues.apache.org/jira/browse/KAFKA-13214 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.8.0 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > When the consumer disconnects from the coordinator while a rebalance is in > progress, we currently reset the memberId and generation. The coordinator > then must await the session timeout in order to expire the old memberId. This > was apparently a regression from > https://github.com/apache/kafka/commit/7e7bb184d2abe34280a7f0eb0f0d9fc0e32389f2#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R478. > It would be better to keep the memberId/generation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino opened a new pull request #11256: KAFKA-13224: Expose broker.id and node.id in config originals map
rondagostino opened a new pull request #11256: URL: https://github.com/apache/kafka/pull/11256 Plugins may expect `broker.id` to exist as a key in the config's various originals()-related maps, but with KRaft we rely solely on `node.id` for the broker's ID, and with the Zk-based brokers we provide the option to specify `node.id` in addition to (or as a full replacement for) `broker.id`. There are multiple problems related to this switch to `node.id`: - We do not enforce consistency between explicitly-specified `broker.id` and `node.id` properties in the config – it is entirely possible right now that we could set `broker.id=0` and also set `node.id=1`, and the broker will use 1 for its ID. This is confusing at best; the broker should detect this inconsistency and fail to start with a ConfigException. - When `node.id` is set, both that value and any explicitly-set `broker.id` value will exist in the config's **originals()-related maps**. Downstream components are often configured based on these maps, and they may ask for the `broker.id`, so downstream components may be misconfigured if the values differ, or they may fail during configuration if no `broker.id` key exists in the map at all. - The config's **values()-related maps** will contain either the explicitly-specified `broker.id` value or the default value of -1. When `node.id` is set, both that value (which cannot be negative) and the (potentially -1) `broker.id` value will exist in the config's values()-related maps. Downstream components are often configured based on these maps, and they may ask for the `broker.id`, so downstream components may be misconfigured if the `broker.id` value differs from the broker's true ID. The broker should detect inconsistency between explicitly-specified `broker.id` and `node.id` values and fail startup accordingly. It should also ensure that the config's originals()- and values()-related maps contain the same mapped values for both `broker.id` and `node.id` keys when at least one is specified. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
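A rough sketch of the consistency check this description calls for; the real logic lives in the Scala `KafkaConfig`, so the class name, method name, and exception type here are illustrative only:
```java
import java.util.Map;

// Illustrative only: mirrors the validation described above, not Kafka's actual KafkaConfig code.
public class NodeIdConsistencySketch {
    static int effectiveNodeId(Map<String, String> props) {
        final String brokerId = props.get("broker.id");
        final String nodeId = props.get("node.id");
        // Explicitly setting both to different values should fail startup
        if (brokerId != null && nodeId != null && !brokerId.equals(nodeId))
            throw new IllegalArgumentException(
                "broker.id (" + brokerId + ") and node.id (" + nodeId + ") must match when both are set");
        if (brokerId == null && nodeId == null)
            throw new IllegalArgumentException("one of broker.id or node.id must be set");
        return Integer.parseInt(nodeId != null ? nodeId : brokerId);
    }

    public static void main(String[] args) {
        System.out.println(effectiveNodeId(Map.of("broker.id", "1", "node.id", "1"))); // prints 1
        System.out.println(effectiveNodeId(Map.of("broker.id", "0", "node.id", "1"))); // throws
    }
}
```
Once a check like this passes, the originals()- and values()-related maps can be populated with the same value under both keys, which is the second half of the fix described above.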
[GitHub] [kafka] rondagostino commented on a change in pull request #11256: KAFKA-13224: Expose consistent broker.id and node.id in config values/originals maps
rondagostino commented on a change in pull request #11256: URL: https://github.com/apache/kafka/pull/11256#discussion_r695176785 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1405,21 +1405,95 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO // We make it part of each instance rather than the object to facilitate testing. private val zkClientConfigViaSystemProperties = new ZKClientConfig() - override def originals: util.Map[String, AnyRef] = -if (this eq currentConfig) super.originals else currentConfig.originals + private def maybeMutateOriginalsToMakeBrokerIdAndNodeIdMatch(map: util.Map[String, AnyRef]): util.Map[String, AnyRef] = { Review comment: This is pretty much the same as `maybeMutateOriginalsStringsToMakeBrokerIdAndNodeIdMatch()` below, but I was unable to consolidate them into a single method due to type-related errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
jolshan commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-905024994 Fixed my issue locally, but it looks like the 2.8 branch is broken in general. ``` > Task :streams:compileTestJava /Users/jolshan/kafka/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java:73: error: stop() has private access in EmbeddedKafkaCluster CLUSTER.stop(); ^ 1 error ``` Will try to contact the relevant person. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11248: HOTFIX: Fix null pointer when getting metric value in MetricsReporter
jolshan commented on pull request #11248: URL: https://github.com/apache/kafka/pull/11248#issuecomment-905028262 Hi. This PR seemed to break the 2.8 branch. When I try to run checkstyle I see: ``` > Task :streams:compileTestJava /Users/jolshan/kafka/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java:73: error: stop() has private access in EmbeddedKafkaCluster CLUSTER.stop(); ``` I believe that file was added in this commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11186: KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft
jsancio commented on a change in pull request #11186: URL: https://github.com/apache/kafka/pull/11186#discussion_r687906243 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -90,8 +90,7 @@ class BrokerServer( this.logIdent = logContext.logPrefix - val lifecycleManager: BrokerLifecycleManager = -new BrokerLifecycleManager(config, time, threadNamePrefix) + private var lifecycleManager: BrokerLifecycleManager = null Review comment: Why did we move this to `startup`? We seem to check for `null` in the `shutdown` method for a few of these fields; should we do the same for this field? ## File path: clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java ## @@ -103,20 +103,22 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { ApiError apiError = ApiError.fromThrowable(e); List<ReplicaElectionResult> electionResults = new ArrayList<>(); -for (TopicPartitions topic : data.topicPartitions()) { -ReplicaElectionResult electionResult = new ReplicaElectionResult(); +if (data.topicPartitions() != null) { Review comment: When is `data.topicPartitions()` `null`? ## File path: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ## @@ -86,44 +79,22 @@ abstract class ZooKeeperTestHarness extends Logging { object ZooKeeperTestHarness { val ZkClientEventThreadSuffix = "-EventThread" - // Threads which may cause transient failures in subsequent tests if not shutdown. - // These include threads which make connections to brokers and may cause issues - // when broker ports are reused (e.g. auto-create topics) as well as threads - // which reset static JAAS configuration. Review comment: I see. Maybe move this comment to the test utility function you created. ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1914,4 +1933,26 @@ object TestUtils extends Logging { ) } + def verifyNoUnexpectedThreads(context: String): Unit = { +val unexpectedThreadNames = Set( + ControllerEventManager.ControllerEventThreadName, + KafkaProducer.NETWORK_THREAD_PREFIX, + AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), + AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, + ZooKeeperTestHarness.ZkClientEventThreadSuffix +) Review comment: Is there some insight into why this specific set of threads? ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1914,4 +1933,26 @@ object TestUtils extends Logging { ) } + def verifyNoUnexpectedThreads(context: String): Unit = { +val unexpectedThreadNames = Set( + ControllerEventManager.ControllerEventThreadName, + KafkaProducer.NETWORK_THREAD_PREFIX, + AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(), + AbstractCoordinator.HEARTBEAT_THREAD_PREFIX, + ZooKeeperTestHarness.ZkClientEventThreadSuffix +) + +def unexpectedThreads: Set[String] = { + val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName) + allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet +} + +def printUnexpectedThreads: String = { + val unexpected = unexpectedThreads + s"Found ${unexpected.size} unexpected threads during $context: ${unexpected.mkString("`", ",", "`")}" +} + +TestUtils.waitUntilTrue(() => unexpectedThreads.isEmpty, printUnexpectedThreads) Review comment: Probably unlikely to cause any issues, but the set of threads checked is different from the set of threads printed. Maybe we can use `computeUntilTrue`.
## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -123,8 +123,8 @@ public class ReplicationControlManager { static class TopicControlInfo { -private final String name; -private final Uuid id; +final String name; +final Uuid id; Review comment: How about adding access methods instead? The nice thing about access methods is that we can easily use them as functions. E.g. `Optional<Uuid> id = maybeTopicControlInfo.map(TopicControlInfo::id);` ## File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala ## @@ -1622,19 +1623,37 @@ object TestUtils extends Logging { waitForLeaderToBecome(client, topicPartition, None) } - def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = { + def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = { +waitUntilTrue(() => { + val nodes = client.describeCluster().nodes().get() + nodes.asScala.exists(_.id == brokerId) +}, s"Timed out waiting for brokerId $brokerId to come online") + } + + def waitForLeaderToBecome( +client: Admin, +topicPartition: TopicPartition, +expe
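For context on the `verifyNoUnexpectedThreads` helper being reviewed: it boils down to scanning live thread names for prefixes known to leak across tests. A hedged Java rendering of the Scala helper (the prefix list here is abridged, and the names are illustrative):
```java
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of the thread-leak check under review; the real prefix list is longer.
public class ThreadLeakCheckSketch {
    static final Set<String> UNEXPECTED_PREFIXES =
        Set.of("controller-event-thread", "kafka-producer-network-thread");

    static Set<String> unexpectedThreads() {
        // Compute the offending set once, so the set that is checked is the same
        // set that gets printed (avoiding the mismatch jsancio points out above)
        return Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> UNEXPECTED_PREFIXES.stream().anyMatch(name::contains))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("unexpected threads: " + unexpectedThreads());
    }
}
```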
[jira] [Updated] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
[ https://issues.apache.org/jira/browse/KAFKA-13010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13010: --- Fix Version/s: (was: 3.1.0) 3.0.0 > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation() > --- > > Key: KAFKA-13010 > URL: https://issues.apache.org/jira/browse/KAFKA-13010 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Bruno Cadonna >Assignee: Walker Carlson >Priority: Major > Labels: flaky-test > Fix For: 3.0.0 > > Attachments: > TaskMetadataIntegrationTest#shouldReportCorrectEndOffsetInformation.rtf > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13215) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation
[ https://issues.apache.org/jira/browse/KAFKA-13215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13215: --- Fix Version/s: (was: 3.1.0) 3.0.0 > Flaky test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation > --- > > Key: KAFKA-13215 > URL: https://issues.apache.org/jira/browse/KAFKA-13215 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Konstantine Karantasis >Assignee: Walker Carlson >Priority: Major > Labels: flaky-test > Fix For: 3.0.0 > > > Integration test {{test > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}} > sometimes fails with > {code:java} > java.lang.AssertionError: only one task > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:163) > at > org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectEndOffsetInformation(TaskMetadataIntegrationTest.java:144) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #11248: HOTFIX: Fix null pointer when getting metric value in MetricsReporter
ableegoldman commented on pull request #11248: URL: https://github.com/apache/kafka/pull/11248#issuecomment-905084553 @jolshan ah, whoops. Thanks for the heads up. I'll get a patch ready -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster
ableegoldman opened a new pull request #11257: URL: https://github.com/apache/kafka/pull/11257 A backport of #11248 broke the 2.8 build due to usage of the `EmbeddedKafkaCluster#stop` method, which used to be private. It seems we made this public when we upgraded to JUnit5 on the 3.0 branch and had to remove the ExternalResource that was previously responsible for calling `start()` and `stop()` for this class using the no-longer-available `@ClassRule` annotation. Rather than adapt this test to the 2.8 style by migrating it to use `@ClassRule` as well, I opted to just make the `stop()` method public as well (since its analogue `start()` has always been public anyway). This should hopefully prevent any future backports that include integration tests from having to manually go in and adapt the test, or accidentally break the build as happened here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
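For anyone unfamiliar with the 2.8-era mechanism referenced in this description: JUnit 4's `@ClassRule` drove the cluster lifecycle automatically through `ExternalResource`. A minimal sketch of that shape, with a stub standing in for the real cluster class:
```java
import org.junit.ClassRule;
import org.junit.rules.ExternalResource;

public class ClassRuleLifecycleSketch {
    // Stub standing in for EmbeddedKafkaCluster
    static class ClusterStub {
        void start() { System.out.println("cluster started"); }
        void stop() { System.out.println("cluster stopped"); }
    }

    static final ClusterStub CLUSTER = new ClusterStub();

    // JUnit 4 runs before()/after() exactly once around the test class,
    // so individual tests never had to call start()/stop() themselves.
    @ClassRule
    public static final ExternalResource LIFECYCLE = new ExternalResource() {
        @Override protected void before() { CLUSTER.start(); }
        @Override protected void after() { CLUSTER.stop(); }
    };
}
```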
[GitHub] [kafka] ableegoldman commented on pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster
ableegoldman commented on pull request #11257: URL: https://github.com/apache/kafka/pull/11257#issuecomment-905096193 @jolshan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11248: HOTFIX: Fix null pointer when getting metric value in MetricsReporter
ableegoldman commented on pull request #11248: URL: https://github.com/apache/kafka/pull/11248#issuecomment-905097291 Fix is available here: https://github.com/apache/kafka/pull/11257 This also got me digging into why we had to make `stop()` public in 3.0, and why we now seem to be required to manually invoke both `start()` and `stop()` in every single integration test... which is incredibly error-prone (or rather, resource-leakage-prone). I fixed the 2.8 build in a way that should prevent future backports from breaking the 2.8 branch specifically, but I'm going to look into improving how we do this in 3.0+ so we don't need to worry about breaking branches older than 2.8, or leaking resources by forgetting to clean up this cluster... 😬 Anyway, thanks very much for bringing this up! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9858: KAFKA-12173 Migrate streams:streams-scala module to JUnit 5
ableegoldman commented on a change in pull request #9858: URL: https://github.com/apache/kafka/pull/9858#discussion_r695328381 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -75,9 +76,19 @@ @Category(IntegrationTest.class) public class AdjustStreamThreadCountTest { -@ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); Review comment: Hey @chia7712 , sorry for never bothering to take a look at this PR until just now, but I had a question about this. I know it was necessary to remove the `ExternalResource` feature that used to be responsible for calling `start()` and `stop()` for us in the integration tests since `@ClassRule` was removed in JUnit5, but that was really quite a loss since this now leaves us vulnerable to 1) resource leaks due to forgetting to clean up the EmbeddedKafkaCluster in an integration test, or doing so in an incorrect way (eg such that a test failure might skip the cleanup stage, a mistake that we've certainly encountered in our tests in the past) 2) breaking compatibility of integration tests across older branches, so that if we ever need to backport a fix that includes an integration test -- which many will/should do -- we can easily break the build of older branches by accident. See for example [#11257](https://github.com/apache/kafka/pull/11257): aka the reason I started digging into this 🙂 . Even if we remember to fix this during the backport, it's just an extra hassle. Now I'm certainly not an expert in all things JUnit, but a cursory glance suggests we can replicate the old behavior in which the EmbeddedKafkaCluster is automatically started/stopped without the need for this `@Before/AfterClass` boilerplate code in every integration test. I believe it's possible to do so using the `@Extension/ExtendWith` annotations. Can we try to port the EmbeddedKafkaCluster back to an automated lifecycle with these so we can clean up the Streams integration tests? cc @ijuma @vvcephei who may be more familiar with these constructs and how/when/why to use them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
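A sketch of the JUnit 5 analogue floated above, using the `BeforeAllCallback`/`AfterAllCallback` extension points; the extension and cluster names are placeholders, not an existing Kafka class:
```java
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Placeholder names; a real version would wrap EmbeddedKafkaCluster.
public class EmbeddedClusterExtension implements BeforeAllCallback, AfterAllCallback {
    static class ClusterStub { void start() {} void stop() {} }

    private final ClusterStub cluster = new ClusterStub();

    @Override
    public void beforeAll(ExtensionContext context) {
        cluster.start();
    }

    @Override
    public void afterAll(ExtensionContext context) {
        // Runs even when tests fail, so a forgotten manual stop() can't leak brokers
        cluster.stop();
    }
}
// Test classes would opt in with: @ExtendWith(EmbeddedClusterExtension.class)
```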
[GitHub] [kafka] jolshan commented on pull request #11073: KAFKA-13092: Perf regression in LISR requests
jolshan commented on pull request #11073: URL: https://github.com/apache/kafka/pull/11073#issuecomment-905114876 fix here: https://github.com/apache/kafka/pull/11257 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r695341815 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetrics.java ## @@ -63,6 +66,18 @@ public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) { metricGroupName, "The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records."), new Avg()); + +this.commitSyncSensor = metrics.sensor("commit-sync-time-total"); +this.commitSyncSensor.add( +metrics.metricName("commit-sync-time-total", metricGroupName), +new CumulativeSum() +); + +this.committedSensor = metrics.sensor("committed-time-total"); +this.committedSensor.add( +metrics.metricName("committed-time-total", metricGroupName), +new CumulativeSum() +); Review comment: ack - will defer to a follow-up PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
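For readers following KIP-761: to see how a `CumulativeSum` sensor like the ones registered in this diff becomes a total-blocked-time metric, here is a runnable sketch against the public `org.apache.kafka.common.metrics` API. The group name and the sleep standing in for the blocking `commitSync()` are illustrative:
```java
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;

public class BlockedTimeSketch {
    public static void main(String[] args) throws Exception {
        try (Metrics metrics = new Metrics()) {
            Sensor commitSyncSensor = metrics.sensor("commit-sync-time-total");
            commitSyncSensor.add(
                metrics.metricName("commit-sync-time-total", "sketch-metrics"),
                new CumulativeSum());

            // Time the blocking call and record the elapsed nanos; CumulativeSum
            // adds every recorded value, yielding a monotonically growing total.
            long start = System.nanoTime();
            Thread.sleep(5); // stand-in for the blocking commitSync()
            commitSyncSensor.record(System.nanoTime() - start);

            metrics.metrics().forEach((name, metric) ->
                System.out.println(name.name() + " = " + metric.metricValue()));
        }
    }
}
```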
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r695342570 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -905,6 +971,57 @@ public void testSendTxnOffsetsWithGroupId() { } } +private double getAndAssertDuration(KafkaProducer<String, String> producer, String name, double floor) { +double value = getMetricValue(producer, name); +assertTrue(value > floor); +return value; +} + +@Test +public void testMeasureTransactionDurations() { +Map<String, Object> configs = new HashMap<>(); +configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id"); +configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1); +configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); +Time time = new MockTime(1); +MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); +ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + +MockClient client = new MockClient(time, metadata); +client.updateMetadata(initialUpdateResponse); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "some.id", host1)); +client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE)); + +try (KafkaProducer<String, String> producer = kafkaProducer(configs, new StringSerializer(), +new StringSerializer(), metadata, client, null, time)) { +producer.initTransactions(); +assertTrue(getMetricValue(producer, "txn-init-time-total") > 99); Review comment: I'm verifying that something was measured and that it's at least 1 tick of the clock. The clock is shared between multiple threads (e.g. the io threads) so the number of ticks depends on what threads get scheduled while we're in `initTransactions`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
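A note on the `MockTime(1)` behavior described in this reply: every read of the clock auto-advances it by the tick size, so elapsed "time" counts clock queries rather than wall time. A tiny sketch (`MockTime` ships in the clients test artifact):
```java
import org.apache.kafka.common.utils.MockTime;

public class MockTimeTickSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime(1); // auto-advance 1 ms per query
        long t0 = time.milliseconds();
        long t1 = time.milliseconds();
        // Prints 1: each read ticked the clock even though no real time passed.
        // With io threads also querying the same clock, the total inside a call
        // like initTransactions() depends on scheduling, hence floor assertions.
        System.out.println(t1 - t0);
    }
}
```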
[GitHub] [kafka] showuon commented on a change in pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
showuon commented on a change in pull request #11086: URL: https://github.com/apache/kafka/pull/11086#discussion_r695353891 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AlterConsumerGroupOffsetsHandler.java ## @@ -158,25 +158,24 @@ private void handleError( Set<CoordinatorKey> groupsToUnmap, Set<CoordinatorKey> groupsToRetry ) { +final String requestErrorMsg = "OffsetCommit request for group id {} returned error {}. Will retry."; switch (error) { -// If the coordinator is in the middle of loading, then we just need to retry. +// If the coordinator is in the middle of loading, or rebalance is in progress, then we just need to retry. case COORDINATOR_LOAD_IN_PROGRESS: -log.debug("OffsetCommit request for group id {} failed because the coordinator" + -" is still in the process of loading state. Will retry.", groupId.idValue); +case REBALANCE_IN_PROGRESS: +log.debug(requestErrorMsg, groupId.idValue, error); groupsToRetry.add(groupId); break; // If the coordinator is not available, then we unmap and retry. case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: -log.debug("OffsetCommit request for group id {} returned error {}. Will retry.", -groupId.idValue, error); +log.debug(requestErrorMsg, groupId.idValue, error); Review comment: Good to me. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11086: KAFKA-13103: add REBALANCE_IN_PROGRESS error as retriable error
showuon commented on pull request #11086: URL: https://github.com/apache/kafka/pull/11086#issuecomment-905139875 @dajac , thanks for the comment. I've updated the PR. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on pull request #11253: MINOR: Improve local variable name in UnifiedLog.maybeIncrementFirstUnstableOffset
kowshik commented on pull request #11253: URL: https://github.com/apache/kafka/pull/11253#issuecomment-905142894 cc @junrao @ijuma for review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster
ableegoldman commented on pull request #11257: URL: https://github.com/apache/kafka/pull/11257#issuecomment-905150624 Failures are unrelated and known to be flaky on older branches (`connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining`) -- will merge to unblock the 2.8 build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #11257: HOTFIX: fix backport of #11248 by future-proofing the EmbeddedKafkaCluster
ableegoldman merged pull request #11257: URL: https://github.com/apache/kafka/pull/11257 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404165#comment-17404165 ] Yanwen Lin commented on KAFKA-10038: Hi [~tigertan], I'm working on this issue and trying to understand the requirement here: * It seems that *ConsumerPerformance*, ProducerPerformance, ConsoleConsumer, and ConsoleProducer all already accept the property *client.id*. We can add it to a config file and pass it via an option like consumer.config/producer.config (tried this and it works). *So do you mean this task is more about: if no client.id is set, we should provide a default one?* * You also mentioned that we can unify the way ConsoleConsumer and ConsoleProducer handle the client.id. What do you mean by handling, given that only ConsoleProducer sets a default client.id? *By "unify", do you mean we should provide a default client.id in ConsoleConsumer as well?* * Why does providing client.id help with quota testing? What is the connection between the two? Hi [~showuon], since this is a simple change (providing a default if client.id is not set), *may I ask why we need a KIP here?* Thanks! > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 2.1.1 > Environment: Trunk branch >Reporter: tigertan >Assignee: Yanwen Lin >Priority: Minor > Labels: newbie, performance > > ConsumerPerformance.scala supports the setting of "client.id", which is a > reasonable requirement, and the way "console consumer" and "console producer" > handle "client.id" can be unified. "client.id" defaults to > "perf-consumer-client". > We often use client.id in quotas, if the script of > kafka-producer-perf-test.sh supports the setting of "client.id" , we can do > quota testing through scripts without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
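To make the first bullet concrete: setting `client.id` through consumer properties is exactly what a file passed via --consumer.config feeds into. A hedged sketch of the equivalent programmatic setup; the bootstrap address and group name are placeholders:
```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClientIdSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-group");              // placeholder
        // The id that broker-side quotas match on; the ticket asks for a
        // default like "perf-consumer-client" when nothing is supplied.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println("consumer created with client.id=perf-consumer-client");
        }
    }
}
```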
[jira] [Commented] (KAFKA-12933) Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled
[ https://issues.apache.org/jira/browse/KAFKA-12933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404166#comment-17404166 ] Yanwen Lin commented on KAFKA-12933: Hi [~mjsax], I guess we shall mark this ticket as Resolved because [~david.mao] has already raised and merged a PR for this? [https://github.com/apache/kafka/pull/11244] CC: [~ijuma] > Flaky test > ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled > - > > Key: KAFKA-12933 > URL: https://issues.apache.org/jira/browse/KAFKA-12933 > Project: Kafka > Issue Type: Test > Components: admin >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > {quote}org.opentest4j.AssertionFailedError: expected: but was: > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at > kafka.admin.ReassignPartitionsIntegrationTest.executeAndVerifyReassignment(ReassignPartitionsIntegrationTest.scala:130) > at > kafka.admin.ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled(ReassignPartitionsIntegrationTest.scala:74){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-1935) Consumer should use a separate socket for Coordinator connection
[ https://issues.apache.org/jira/browse/KAFKA-1935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404167#comment-17404167 ] Yanwen Lin commented on KAFKA-1935: --- Hi [~guozhang], is this still an issue currently? If so, I'd like to work on this. > Consumer should use a separate socket for Coordinator connection > > > Key: KAFKA-1935 > URL: https://issues.apache.org/jira/browse/KAFKA-1935 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: Guozhang Wang >Priority: Major > Labels: newbie > > KAFKA-1925 is just a quick-fix of this issue, we need to let consumer to be > able to create separate sockets for the same server for coordinator / broker > roles. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class
[ https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404175#comment-17404175 ] Yanwen Lin commented on KAFKA-6579: --- Hi [~teamurko], are you still working on this? If not, I'd like to give it a shot. > Consolidate window store and session store unit tests into a single class > - > > Key: KAFKA-6579 > URL: https://issues.apache.org/jira/browse/KAFKA-6579 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie, unit-test > > For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared > among all its implementations; however for window and session stores, each > class has its own independent unit test classes that do not share the test > coverage. In fact, many of these test classes share the same unit test > functions (e.g. {{RocksDBWindowStoreTest}}, > {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}). > It is better to use the same pattern as for key value stores to consolidate > these test functions into a shared base class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-6579) Consolidate window store and session store unit tests into a single class
[ https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yanwen Lin updated KAFKA-6579: -- Comment: was deleted (was: Hi [~teamurko], are you still working on this? If not, I'd like to give it a shot.) > Consolidate window store and session store unit tests into a single class > - > > Key: KAFKA-6579 > URL: https://issues.apache.org/jira/browse/KAFKA-6579 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie, unit-test > > For key value stores, we have an {{AbstractKeyValueStoreTest}} that is shared > among all their implementations; however for window and session stores, each > implementation has its own independent unit test class, and these do not share > test coverage. In fact, many of these test classes share the same unit test > functions (e.g. {{RocksDBWindowStoreTest}}, > {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}). > It is better to use the same pattern as for key value stores and consolidate > these test functions into a shared base class. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13032) Impossible stacktrace
[ https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404177#comment-17404177 ] Yanwen Lin commented on KAFKA-13032: Hi [~mjsax], I have raised a PR for this and it has been approved. Could you please take a look and merge it? :) Thanks! PR link: [https://github.com/apache/kafka/pull/11241] > Impossible stacktrace > - > > Key: KAFKA-13032 > URL: https://issues.apache.org/jira/browse/KAFKA-13032 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Niclas Hedhman >Assignee: Yanwen Lin >Priority: Minor > Labels: beginner, easy-fix > > I am presented with a stacktrace that has not a single touch point in my > code, so it is incredibly difficult to figure out where the problem could be. > I think more RuntimeExceptions need to be caught and enriched with > information at each level, providing any additional hint of where we are. > For instance, each node could prepend its reference/name and one would have a > chance to see where we are... > ``` > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_140, processor=KSTREAM-SOURCE-00, topic=_poll, > partition=140, offset=0, stacktrace=java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.me
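The suggestion in the ticket (each node prepending its reference/name) amounts to catching RuntimeExceptions at every processor node and rethrowing them with the node's name attached, so a trace that contains no user frames still identifies the failing node. A minimal hypothetical sketch; ProcessorNodeException and processWithContext are illustrative names, not Streams API:

```java
public class NamedProcessorExample {

    // Illustrative wrapper exception carrying the failing node's name.
    static class ProcessorNodeException extends RuntimeException {
        ProcessorNodeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Run a node's logic, rethrowing any RuntimeException with the node name
    // prepended so the failure site is identifiable from the message alone.
    static void processWithContext(String nodeName, Runnable body) {
        try {
            body.run();
        } catch (RuntimeException e) {
            throw new ProcessorNodeException(
                    "Error in processor node [" + nodeName + "]", e);
        }
    }

    public static void main(String[] args) {
        processWithContext("KSTREAM-MAP-0000000001", () -> {
            throw new NullPointerException("mapped value was null");
        });
    }
}
```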
[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3
[ https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404179#comment-17404179 ] Yanwen Lin commented on KAFKA-5666: --- Hi [~RensGroothuijsen], are you still working on this? If not, I'd like to give it a try. Hi [~yevabyzek], is this still an issue currently? > Need feedback to user if consumption fails due to > offsets.topic.replication.factor=3 > > > Key: KAFKA-5666 > URL: https://issues.apache.org/jira/browse/KAFKA-5666 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 0.11.0.0 >Reporter: Yeva Byzek >Priority: Major > Labels: newbie, usability > > Introduced in 0.11: the offsets.topic.replication.factor broker config is now > enforced upon auto topic creation. Internal auto topic creation will fail > with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets > this replication factor requirement. > Issue: the default is offsets.topic.replication.factor=3, but in > development and docker environments where there is only 1 broker, the offsets > topic will fail to be created when a consumer tries to consume, and no records > will be returned. As a result, the user experience is bad. The user may > have no idea about this setting change and enforcement, and they just see > that `kafka-console-consumer` hangs with ZERO output. It is true that the > broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of > alive brokers '1' does not meet the required replication factor '3' for the > offsets topic (configured via 'offsets.topic.replication.factor'). This error > can be ignored if the cluster is starting up and not all brokers are up yet. > (kafka.server.KafkaApis)}}) but many users do not have access to the log > files or know how to get them. > Suggestion: give feedback to the user/app if the offsets topic cannot be > created, for example after some timeout. > Workaround: > Set offsets.topic.replication.factor=1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
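Until the broker gives better feedback, an application can approximate the suggested check itself: compare the number of alive brokers against the offsets topic's required replication factor before consuming. A hedged sketch with the Admin client; the required factor is hardcoded here as an assumption, since the broker-side setting is not generally readable without describeConfigs permissions:

```java
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class OffsetsTopicPreflightCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Count the brokers the cluster currently reports as alive.
            Collection<Node> brokers = admin.describeCluster().nodes().get();
            int requiredFactor = 3; // assumed broker-side offsets.topic.replication.factor
            if (brokers.size() < requiredFactor) {
                System.err.printf(
                        "Only %d broker(s) alive, but the offsets topic needs replication "
                                + "factor %d; consumers will hang until it can be created.%n",
                        brokers.size(), requiredFactor);
            }
        }
    }
}
```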
[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404185#comment-17404185 ] Luke Chen commented on KAFKA-10038: --- If it's just to provide a default value when one is not provided, then no KIP is needed. Thanks. > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 2.1.1 > Environment: Trunk branch >Reporter: tigertan >Assignee: Yanwen Lin >Priority: Minor > Labels: newbie, performance > > ConsumerPerformance.scala should support setting "client.id", which is a > reasonable requirement, and the way "console consumer" and "console producer" > handle "client.id" can be unified. "client.id" would default to > "perf-consumer-client". > We often use client.id in quotas; if the kafka-consumer-perf-test.sh script > supported setting "client.id", we could do quota testing through scripts > without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
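What the ticket asks for is small: honor a user-supplied client.id and otherwise fall back to the default named above. A minimal Java sketch of that fallback, using only the public ConsumerConfig constant (the withClientId helper is hypothetical):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ClientIdDefaultExample {

    // Use the user-supplied client.id when present; otherwise fall back to
    // the default proposed in the ticket so quota matching stays predictable.
    static Properties withClientId(Properties props, String userClientId) {
        String clientId = (userClientId == null || userClientId.isEmpty())
                ? "perf-consumer-client"
                : userClientId;
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withClientId(new Properties(), null);
        System.out.println(props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
    }
}
```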