[GitHub] [kafka] dajac commented on pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter
dajac commented on pull request #11201: URL: https://github.com/apache/kafka/pull/11201#issuecomment-897421076 @xvrl Thanks for the patch. I think that ordering tags makes sense. However, I am a little concerned by the potential implications of changing this. I vaguely recall having to craft mbean names with tags in a specific order to make the Datadog agent work, for instance. Could it break existing metric collectors if the order changes? I might be completely wrong though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
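For context on the concern above: JMX treats the key properties of an ObjectName as an unordered set for equality, but the literal string form preserves insertion order, and some collectors match on the literal string rather than the canonical form. A minimal, self-contained illustration (the metric type and tags here are made up, not real Kafka metrics):

```java
import javax.management.ObjectName;

public class TagOrderDemo {
    public static void main(String[] args) throws Exception {
        // The same logical MBean, with its tags listed in two different orders.
        ObjectName a = new ObjectName("kafka.server:type=BrokerTopicMetrics,topic=t,partition=0");
        ObjectName b = new ObjectName("kafka.server:type=BrokerTopicMetrics,partition=0,topic=t");

        System.out.println(a.equals(b));  // true: equality compares canonical forms
        System.out.println(a.getCanonicalName().equals(b.getCanonicalName()));  // true: canonical form sorts keys
        System.out.println(a.getKeyPropertyListString()
            .equals(b.getKeyPropertyListString()));  // false: the literal form keeps creation order
    }
}
```

So a collector that queries with real JMX semantics is unaffected by reordering, while one that string-matches the emitted name could break, which is exactly the risk raised above.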
[jira] [Updated] (KAFKA-12835) Topic IDs can mismatch on brokers (after interbroker protocol version update)
[ https://issues.apache.org/jira/browse/KAFKA-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-12835: Fix Version/s: 2.8.1 > Topic IDs can mismatch on brokers (after interbroker protocol version update) > - > > Key: KAFKA-12835 > URL: https://issues.apache.org/jira/browse/KAFKA-12835 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 >Reporter: Ivan Yurchenko >Assignee: Justine Olshan >Priority: Major > Fix For: 3.0.0, 2.8.1 > > > We had a Kafka cluster running version 2.8 with the interbroker protocol set to > 2.7. It had a number of topics and everything was fine. > Then we decided to update the interbroker protocol to 2.8 by the following > procedure: > 1. Run new brokers with the interbroker protocol set to 2.8. > 2. Move the data from the old brokers to the new ones (normal partition > reassignment API). > 3. Decommission the old brokers. > At stage 2 we hit a problem: old brokers started failing on > {{LeaderAndIsrRequest}} handling with > {code:java} > ERROR [Broker id=<...>] Topic Id in memory: <...> does not match the topic Id > for partition <...> provided in the request: <...>. (state.change.logger) > {code} > for multiple topics. The topics were not recreated. > We checked the {{partition.metadata}} files and the IDs there were indeed different > from the values in ZooKeeper. It was fixed by deleting the metadata files > (and letting them be recreated). > > The logs, unfortunately, didn't show anything that might point to the cause > of the issue (or it happened longer ago than we retain logs). > We also tried to reproduce this, but without success. > If the community can point out what to check or beware of in the future, that would be > great. We'll be happy to provide additional information if needed. Thank > you! > Sorry that this ticket might not be very actionable. We hope to at least > raise awareness of this issue. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
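For anyone wanting to run the same check the reporter describes, the partition.metadata file written by 2.8 brokers is, as far as I can tell, a small two-line text file ("version: 0" and "topic_id: <uuid>"), so the on-disk ID can be extracted and compared against ZooKeeper with a few lines. A rough sketch under that format assumption; the log directory path is hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PartitionMetadataCheck {
    public static void main(String[] args) throws IOException {
        // Hypothetical path: point at <log.dirs>/<topic>-<partition>/partition.metadata
        Path file = Path.of("/var/kafka-logs/my-topic-0/partition.metadata");
        for (String line : Files.readAllLines(file)) {
            if (line.startsWith("topic_id: ")) {
                // Compare this against the topic ID recorded in ZooKeeper
                System.out.println("on-disk topic ID: " + line.substring("topic_id: ".length()));
            }
        }
    }
}
```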
[jira] [Commented] (KAFKA-13014) KAFKA-Stream stucked when the offset is no more existing
[ https://issues.apache.org/jira/browse/KAFKA-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397957#comment-17397957 ] Ahmed Toumi commented on KAFKA-13014: - thanks [~mjsax]: but the problem was more complicated than I thought. It's a problem of mixing exactly_once with a state store. When the application crashed without waiting for the stream to shut down, the transaction for the latest messages of the changelog topic was aborted, so when we restarted the Kafka Streams application the topology waited to reload the local RocksDB store before starting to consume messages. And the bug is there, because the check uses consumer metadata "topic.lastoffset" == current_consumer_offset, but it should be: consumer metadata "topic.last_commited_message_and_transaction_offset" == current_consumer_offset. I fixed that by switching to at_least_once, but I think it was fixed in 2.7.1. > KAFKA-Stream stucked when the offset is no more existing > > > Key: KAFKA-13014 > URL: https://issues.apache.org/jira/browse/KAFKA-13014 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, offset manager, streams >Affects Versions: 2.7.0 > Environment: PROD >Reporter: Ahmed Toumi >Priority: Major > Attachments: image-2021-06-30-11-10-31-028.png > > > We have Kafka Streams with multiple instances and threads. > This application consumes from a lot of topics. > One of the topic partitions wasn't accessible for a day and the retention of > the topic is 4 hours. > After fixing the problem, the Kafka Streams application is trying to consume from an > offset that does not exist anymore: > * Kafka-consumer-group describe: > !image-2021-06-30-11-10-31-028.png! > We can see that the current offset that the KS is waiting for is *59754934* > but the new first offset of this partition is *264896001*.
> The problem is that the Kafka Streams application does not throw any exception; > the log below is the only thing I'm seeing: > > {code:java} > 08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer > clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, > groupId=talaria-data-mixed-prod] Updating assignment with08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer > clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, > groupId=talaria-data-mixed-prod] Updating assignment with Assigned > partitions: [adm__article_ean_repartition_v3-10, > adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, > adm__article_stock_repartition_v3-10] Current owned partitions: > [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, > adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] > Added partitions (assigned - owned): [] Revoked partitions (owned - > assigned): [] 08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer > clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, > groupId=talaria-data-mixed-prod] Notifying assignor about the new > Assignment(partitions=[adm__article_stock_repartition_v3-10, > adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, > adm__article_ean_repartition_v3-10], userDataSize=398)08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer] > No followup rebalance was requested, resetting the rebalance > schedule.08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.s.p.internals.TaskManager - stream-thread > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > Handle new assignment with: New active tasks: [0_10] New standby tasks: > [0_17, 0_21] Existing active tasks: [0_10] Existing standby tasks: [0_17, > 0_21]08:44:53.924 > [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] > INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer > clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, > groupId=talaria-data-mixed-prod] Adding newly assigned partitions: > {code} > > PS: broker Kafka version: 5.3.4-ccs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] xdgrulez commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows
xdgrulez commented on pull request #10897: URL: https://github.com/apache/kafka/pull/10897#issuecomment-897524395 Hi, will do that as soon as I have some time - and sorry for not finding time yet… If anyone else has some more time on their hands: you basically just have to spot all the tests checking for the respective warn log messages and change them to debug… Best, Ralph > Am 28.07.2021 um 23:27 schrieb A. Sophie Blee-Goldman ***@***.***>: > > > Hey @xdgrulez , this PR has a number of test failures. It looks like there are quite a few tests that are verifying the existence of this specific message, and failing because the logs default to INFO. Can you look into those and then run the full set of streams tests once you think you've fixed everything to make sure no new failures have come up since then? > Thanks! > > — > You are receiving this because you were mentioned. > Reply to this email directly, view it on GitHub, or unsubscribe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
showuon commented on a change in pull request #11200: URL: https://github.com/apache/kafka/pull/11200#discussion_r687624316

## File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   @nowarn("cat=deprecation")
   private def validateValues(): Unit = {
+    val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int]
+    if (nodeIdValue >= 0) {
+      val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]
+      if (brokerIdValue != Defaults.BrokerId && brokerIdValue != nodeIdValue) {
+        throw new ConfigException(s"The values for broker.id ($brokerIdValue) and node.id ($nodeIdValue) must be the same if both are specified")
+      }
+    }

Review comment: Do you think we can put this check into the below else block, for the KRaft-based metadata quorum case only?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] socutes opened a new pull request #11205: KAFKA-10900: Add metrics enumerated in KIP-630
socutes opened a new pull request #11205: URL: https://github.com/apache/kafka/pull/11205 KIP-630 enumerates a few metrics; this change makes sure that those metrics are implemented. It adds the following metrics:
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs | A histogram of the amount of time it took to generate a snapshot.
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs | A histogram of the amount of time it took to load the snapshot.
kafka.controller:type=KafkaController,name=SnapshotLag | The number of offsets between the largest snapshot offset and the high-watermark.
kafka.controller:type=KafkaController,name=SnapshotSizeBytes | Size of the latest snapshot in bytes.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
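Once these land, they should be readable like any other Kafka MBean. A hedged sketch of polling two of them in-process via the platform MBean server; this must run inside the controller JVM, and the attribute names depend on the metrics reporter in use ("Value" is typical for a Yammer gauge, "Mean" for a histogram):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class SnapshotMetricsProbe {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();

        // Gauge: size of the latest snapshot in bytes.
        ObjectName size = new ObjectName("kafka.controller:type=KafkaController,name=SnapshotSizeBytes");
        System.out.println("SnapshotSizeBytes = " + server.getAttribute(size, "Value"));

        // Histogram: time taken to generate a snapshot.
        ObjectName latency = new ObjectName("kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs");
        System.out.println("GenSnapshotLatencyMs mean = " + server.getAttribute(latency, "Mean"));
    }
}
```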
[GitHub] [kafka] showuon opened a new pull request #11206: MINOR: Update streams doc to close KeyValueIterator in example code
showuon opened a new pull request #11206: URL: https://github.com/apache/kafka/pull/11206 Update the streams doc:
1. Close `KeyValueIterator` in example code (screenshot: https://user-images.githubusercontent.com/43372967/129201334-326ff4f9-e06e-426d-9d2d-ce70e13cbf9c.png)
2. Fix the example code indent (screenshot: https://user-images.githubusercontent.com/43372967/129201406-632b62a5-9062-4a1f-b529-103f92966541.png)
3. Add missing comma between key/value (screenshot: https://user-images.githubusercontent.com/43372967/129201511-e69b6225-9e8c-4f08-b9a2-615aee21e52b.png)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
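The first fix matters because `KeyValueIterator` extends `Closeable` and can pin RocksDB resources until it is closed, so the documented pattern should be try-with-resources. A minimal sketch of what the corrected example looks like; the store name and types are illustrative:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IteratorCloseExample {
    static void printAll(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.keyValueStore()));

        // try-with-resources guarantees the iterator is closed and its resources freed
        try (KeyValueIterator<String, Long> all = store.all()) {
            while (all.hasNext()) {
                KeyValue<String, Long> next = all.next();
                System.out.println("count for " + next.key + ": " + next.value);
            }
        }
    }
}
```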
[GitHub] [kafka] showuon commented on pull request #11206: MINOR: Update streams doc to close KeyValueIterator in example code
showuon commented on pull request #11206: URL: https://github.com/apache/kafka/pull/11206#issuecomment-897622337 @bbejeck , please help review. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ryannedolan commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection
ryannedolan commented on a change in pull request #10277: URL: https://github.com/apache/kafka/pull/10277#discussion_r687741760

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
         String source = replicationPolicy.topicSource(topic);
         if (source == null) {
             return false;
-        } else if (source.equals(sourceAndTarget.target())) {
+        }
+
+        // Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+        final boolean condition;
+        if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment: Moving isCycle to ReplicationPolicy might make sense. Wondering what the use-case would be for a custom isCycle, though?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
Ludo created KAFKA-13195: Summary: StateSerde don't honor DeserializationExceptionHandler Key: KAFKA-13195 URL: https://issues.apache.org/jira/browse/KAFKA-13195 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.8.0 Reporter: Ludo Kafka Streams allows configuring a [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling]. When you are using a StateStore, most messages will be copies of the original message in an internal topic and will mostly use the same serializer, even if the message is of another type. You can see [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] that StateSerdes uses the raw Deserializer and does not honor the {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}. This crashes the application (reaching the {{setUncaughtExceptionHandler}} method). I think the state store must have the same behavior as the {{RecordDeserializer}} and honor the DeserializationExceptionHandler. Stacktrace (coming from Kafka Streams 2.6.1): {code:java} Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing !org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_14, processor=workertaskjoined-repartition-source, topic=kestra_executor-workertaskjoined-repartition, partition=14, offset=167500, stacktrace=org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String "txt": not one of the values accepted for Enum class: [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through reference chain: io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"]) at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67) at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851) at com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079) at com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327) at com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214) at com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188) at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197) at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137) at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107) at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263) at
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357) at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244) at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28) at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449) at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195) at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609) at
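For reference, this is the handler mechanism the report says StateSerdes bypasses. Configuring it for source-topic deserialization looks like the following minimal sketch; the application id and bootstrap servers are illustrative, and LogAndContinueExceptionHandler is the built-in handler that skips bad records instead of crashing:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class HandlerConfigExample {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        // Honored when deserializing records from source topics; per this
        // report, state-store (changelog) deserialization does not consult it.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
```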
[GitHub] [kafka] tvainika commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection
tvainika commented on a change in pull request #10277: URL: https://github.com/apache/kafka/pull/10277#discussion_r687772348

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
         String source = replicationPolicy.topicSource(topic);
         if (source == null) {
             return false;
-        } else if (source.equals(sourceAndTarget.target())) {
+        }
+
+        // Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+        final boolean condition;
+        if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment: Currently `MirrorSourceConnector.isCycle` is only called from `MirrorSourceConnector.shouldReplicateTopic`, which is the method that needs to make a decision based on the `ReplicationPolicy` used. However, I'm out of ideas on how to detect this without adding some methods to the `ReplicationPolicy` interface, and modifying the interface feels like a somewhat bigger effort.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
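One possible shape for such an interface change, purely as a sketch: a default method on the policy, modeled on the existing MirrorSourceConnector logic, that identity-style policies could override. The sub-interface name and method below are hypothetical and not part of the actual Connect API:

```java
import org.apache.kafka.connect.mirror.ReplicationPolicy;

// Hypothetical extension; ReplicationPolicy itself has no isCycle method today.
public interface CycleAwareReplicationPolicy extends ReplicationPolicy {

    // Returns true if replicating `topic` into `targetCluster` would create a cycle.
    // The default mirrors MirrorSourceConnector.isCycle; an
    // IdentityReplicationPolicy-style implementation could override it.
    default boolean isCycle(String targetCluster, String topic) {
        String source = topicSource(topic);
        if (source == null) {
            return false;
        } else if (source.equals(targetCluster)) {
            return true;
        } else {
            return isCycle(targetCluster, upstreamTopic(topic));
        }
    }
}
```

This would keep the cycle check next to the naming scheme that defines it, which seems to be the direction both comments above are circling.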
[jira] [Created] (KAFKA-13196) MirrorMaker 2 not always starting tasks after upgrade 2.4.0 -> 2.7.1
Jozef Vilcek created KAFKA-13196: Summary: MirrorMaker 2 not always starting tasks after upgrade 2.4.0 -> 2.7.1 Key: KAFKA-13196 URL: https://issues.apache.org/jira/browse/KAFKA-13196 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.7.1 Reporter: Jozef Vilcek I am using MirrorMaker 2.0 and running it via the [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up a `DistributedHerder` and will use a non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am starting up MirrorMaker it does not always start tasks - just the connector is executing. With some amount of stopping and starting, it will eventually start tasks too. After a bit of digging I noticed that, in an attempt to configure the connector's tasks, the code ends up in this [1] branch, where the configure request is being forwarded to the leader. For some reason, task configuration is not done on the leader. However, MirrorMaker does not run a RestServer and therefore that request will never succeed. I am not sure what is going on or why it seems to work better on 2.4.0. I noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the connector start sequence [2]. I believe the root cause is a change in the startup procedure of connectors in general. In version 2.4, when the connector is started by the leader here [1], it will immediately set up and submit the configuration for the connector's tasks. However, in 2.7.1, it is more asynchronous. The connector is started here [2]. Does this look like a bug? Or am I doing something wrong? [1] https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494 [2] https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always starting tasks
[ https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jozef Vilcek updated KAFKA-13196: - Summary: MirrorMaker 2 not always starting tasks (was: MirrorMaker 2 not always starting tasks after upgrade 2.4.0 -> 2.7.1) > MirrorMaker 2 not always starting tasks > --- > > Key: KAFKA-13196 > URL: https://issues.apache.org/jira/browse/KAFKA-13196 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.1 >Reporter: Jozef Vilcek >Priority: Major > > I am using MirrorMaker 2.0 and running it via the [ > MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] > class. This method will start up a `DistributedHerder` and will use a > non-functional `advertisedUrl`, and therefore workers can not talk to each > other and coordinate. > After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am > starting up MirrorMaker it does not always start tasks - just the connector is > executing. With some amount of stopping and starting, it will eventually start tasks > too. > After a bit of digging I noticed that, in an attempt to configure the connector's > tasks, the code ends up in this [1] branch, where the configure request is being > forwarded to the leader. For some reason, task configuration is not done on the > leader. However, MirrorMaker does not run a RestServer and therefore that > request will never succeed. > I am not sure what is going on or why it seems to work better on 2.4.0. I > noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the > connector start sequence [2]. > I believe the root cause is a change in the startup procedure of connectors in > general. In version 2.4, when the connector is started by the leader here [1], it > will immediately set up and submit the configuration for the connector's tasks. > However, in 2.7.1, it is more asynchronous. The connector is started here [2]. > Does this look like a bug? Or am I doing something wrong? > > [1] > https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494 > [2] > https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks
[ https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jozef Vilcek updated KAFKA-13196: - Description: I am using MirrorMaker 2.0 and running it via the [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up a `DistributedHerder` and will use a non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am starting up MirrorMaker it does not always start tasks - just the connector is executing. With some amount of stopping and starting, it will eventually start tasks too. After a bit of digging I noticed that, in an attempt to configure the connector's tasks, the code ends up in this [1] branch, where the configure request is being forwarded to the leader. For some reason, task configuration is not done on the leader. However, MirrorMaker does not run a RestServer and therefore that request will never succeed. I am not sure what is going on or why it seems to work better on 2.4.0. I noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the connector start sequence [2]. Does this look like a bug? Or am I doing something wrong? [1] [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] [2] [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] was: I am using MirrorMaker 2.0 and running it via the [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up a `DistributedHerder` and will use a non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am starting up MirrorMaker it does not always start tasks - just the connector is executing. With some amount of stopping and starting, it will eventually start tasks too. After a bit of digging I noticed that, in an attempt to configure the connector's tasks, the code ends up in this [1] branch, where the configure request is being forwarded to the leader. For some reason, task configuration is not done on the leader. However, MirrorMaker does not run a RestServer and therefore that request will never succeed. I am not sure what is going on or why it seems to work better on 2.4.0. I noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the connector start sequence [2]. I believe the root cause is a change in the startup procedure of connectors in general. In version 2.4, when the connector is started by the leader here [1], it will immediately set up and submit the configuration for the connector's tasks. However, in 2.7.1, it is more asynchronous. The connector is started here [2]. Does this look like a bug? Or am I doing something wrong?
[1] https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494 [2] https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236 > MirrorMaker 2 not always start tasks > --- > > Key: KAFKA-13196 > URL: https://issues.apache.org/jira/browse/KAFKA-13196 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.1 >Reporter: Jozef Vilcek >Priority: Major > > I am using MirrorMaker 2.0 and running it via the [ > MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] > class. This method will start up a `DistributedHerder` and will use a > non-functional `advertisedUrl`, and therefore workers can not talk to each > other and coordinate. > After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am > starting up MirrorMaker it does not always start tasks - just the connector is > executing. With some amount of stopping and starting, it will eventually start tasks > too. > After a bit of digging I noticed that, in an attempt to configure the connector's > tasks, the code ends up in this [1] branch, where the configure request is being > forwarded to the leader. For some reason, task configuration is not done on the > leader. However, MirrorMaker does not run a RestServer and therefore that > request will never succeed. > I am not sure what is going on or why it seems to work better on 2.4.0. I > noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the > connector start sequence
[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks
[ https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jozef Vilcek updated KAFKA-13196: - Description: I am using MirrorMaker 2.0 and running it via the [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up a `DistributedHerder` and will use a non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am starting up MirrorMaker it does not always start tasks - just the connector is executing. With some amount of stopping and starting, it will eventually start tasks too. After a bit of digging I noticed that, in an attempt to configure the connector's tasks, the code ends up in this [1] branch, where the configure request is being forwarded to the leader. For some reason, task configuration is not done on the leader. However, MirrorMaker does not run a RestServer and therefore that request will never succeed. I am not sure what is going on or why it seems to work better on 2.4.0. I noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the connector start sequence [2]. [1] [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] [2] [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] was: I am using MirrorMaker 2.0 and running it via the [ MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] class. This method will start up a `DistributedHerder` and will use a non-functional `advertisedUrl`, and therefore workers can not talk to each other and coordinate. After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am starting up MirrorMaker it does not always start tasks - just the connector is executing. With some amount of stopping and starting, it will eventually start tasks too. After a bit of digging I noticed that, in an attempt to configure the connector's tasks, the code ends up in this [1] branch, where the configure request is being forwarded to the leader. For some reason, task configuration is not done on the leader. However, MirrorMaker does not run a RestServer and therefore that request will never succeed. I am not sure what is going on or why it seems to work better on 2.4.0. I noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the connector start sequence [2]. Does this look like a bug? Or am I doing something wrong? [1] [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] [2] [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] > MirrorMaker 2 not always start tasks > > > Key: KAFKA-13196 > URL: https://issues.apache.org/jira/browse/KAFKA-13196 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.1 >Reporter: Jozef Vilcek >Priority: Major > > I am using MirrorMaker 2.0 and running it via the [ > MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] > class.
This method will start up a `DistributedHerder` and will use a > non-functional `advertisedUrl`, and therefore workers can not talk to each > other and coordinate. > After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am > starting up MirrorMaker it does not always start tasks - just the connector is > executing. With some amount of stopping and starting, it will eventually start tasks > too. > After a bit of digging I noticed that, in an attempt to configure the connector's > tasks, the code ends up in this [1] branch, where the configure request is being > forwarded to the leader. For some reason, task configuration is not done on the > leader. However, MirrorMaker does not run a RestServer and therefore that > request will never succeed. > I am not sure what is going on or why it seems to work better on 2.4.0. I > noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the > connector start sequence [2]. > > > [1] > [https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494] > [2] > [https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks
[ https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jozef Vilcek updated KAFKA-13196: - Summary: MirrorMaker 2 not always start tasks (was: MirrorMaker 2 not always starting tasks) > MirrorMaker 2 not always start tasks > > > Key: KAFKA-13196 > URL: https://issues.apache.org/jira/browse/KAFKA-13196 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.1 >Reporter: Jozef Vilcek >Priority: Major > > I am using MirrorMaker 2.0 and running it via the [ > MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java] > class. This method will start up a `DistributedHerder` and will use a > non-functional `advertisedUrl`, and therefore workers can not talk to each > other and coordinate. > After upgrading from version `2.4.0` to `2.7.1` I did notice that when I am > starting up MirrorMaker it does not always start tasks - just the connector is > executing. With some amount of stopping and starting, it will eventually start tasks > too. > After a bit of digging I noticed that, in an attempt to configure the connector's > tasks, the code ends up in this [1] branch, where the configure request is being > forwarded to the leader. For some reason, task configuration is not done on the > leader. However, MirrorMaker does not run a RestServer and therefore that > request will never succeed. > I am not sure what is going on or why it seems to work better on 2.4.0. I > noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the > connector start sequence [2]. > I believe the root cause is a change in the startup procedure of connectors in > general. In version 2.4, when the connector is started by the leader here [1], it > will immediately set up and submit the configuration for the connector's tasks. > However, in 2.7.1, it is more asynchronous. The connector is started here [2]. > Does this look like a bug? Or am I doing something wrong? > > [1] > https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494 > [2] > https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation
Tommy Becker created KAFKA-13197: Summary: KStream-GlobalKTable join semantics don't match documentation Key: KAFKA-13197 URL: https://issues.apache.org/jira/browse/KAFKA-13197 Project: Kafka Issue Type: Bug Affects Versions: 2.7.0 Reporter: Tommy Becker As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was changed. It appears the change was intended to merely relax a requirement but it actually broke backwards compatibility. Although it does allow {{null}} keys and values in the KStream to be joined, it now excludes {{null}} results of the {{KeyValueMapper}}. We have an application which can return {{null}} from the {{KeyValueMapper}} for non-null keys in the KStream, and relies on these nulls being passed to the {{ValueJoiner}}. Indeed the javadoc still explicitly says this is done: {quote}If a KStream input record key or value is null the record will not be included in the join operation and thus no output record will be added to the resulting KStream. If keyValueMapper returns null implying no match exists, a null value will be provided to ValueJoiner. {quote} Both these statements are incorrect. I think the new behavior is worse than the previous/documented behavior. It feels more reasonable to have a non-null stream record map to a null join key (our use-case is event-enhancement where the incoming record doesn't have the join field), than the reverse. -- This message was sent by Atlassian Jira (v8.3.4#803005)
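For concreteness, the documented (pre-change) semantics matter for code like the following left join, where the key selector can legitimately return null for a record that lacks the join field. Topics, types, and the selector logic here are all illustrative:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalJoinExample {
    static void build(StreamsBuilder builder) {
        KStream<String, String> events = builder.stream("events");
        GlobalKTable<String, String> profiles = builder.globalTable("profiles");

        events.leftJoin(profiles,
                // Key selector: returns null when the event carries no join field.
                // Per the documented semantics, a null mapping should still reach the
                // joiner with a null table value; per KAFKA-13197 the record is dropped.
                (key, value) -> value.contains(":") ? value.split(":")[0] : null,
                // Value joiner: must tolerate a null profile under the documented behavior.
                (event, profile) -> profile == null ? event : event + "|" + profile)
              .to("enriched-events");
    }
}
```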
[GitHub] [kafka] SkyTianTian commented on pull request #10277: KAFKA-9914: Fix replication cycle detection
SkyTianTian commented on pull request #10277: URL: https://github.com/apache/kafka/pull/10277#issuecomment-897736948 Hi, is there anyone who knows any workaround to avoid this heartbeat issue? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
jsancio commented on a change in pull request #11200: URL: https://github.com/apache/kafka/pull/11200#discussion_r687854292

## File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   @nowarn("cat=deprecation")
   private def validateValues(): Unit = {
+    val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int]
+    if (nodeIdValue >= 0) {
+      val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]

Review comment:
```suggestion
    val nodeIdValue = getInt(KafkaConfig.NodeIdProp)
    if (nodeIdValue >= 0) {
      val brokerIdValue = getInt(KafkaConfig.BrokerIdProp)
```
For consistency, this type prefers to use `getInt`, which does the casting.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] SkyTianTian edited a comment on pull request #10277: KAFKA-9914: Fix replication cycle detection
SkyTianTian edited a comment on pull request #10277: URL: https://github.com/apache/kafka/pull/10277#issuecomment-897736948 Hi, is there anyone who knows any workaround to avoid this heartbeat issue temporarily? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
jsancio commented on a change in pull request #11191: URL: https://github.com/apache/kafka/pull/11191#discussion_r687047710

## File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -161,6 +163,14 @@ public void deactivate() {
         return brokerRegistrations;
     }

+    Set<Integer> fencedBrokerIds() {

Review comment: Looks like this method is only used for tests. How about moving this to `ReplicationControlManagerTest`?

## File path: metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##
@@ -420,22 +419,22 @@ long nextCheckTimeNs() {
     }

     /**
-     * Find the stale brokers which haven't heartbeated in a long time, and which need to
-     * be fenced.
+     * Check if the oldest broker to have hearbeated has already violated the
+     * sessionTimeoutNs timeout and needs to be fenced.
      *
-     * @return A list of node IDs.
+     * @return An Optional broker node id.
      */
-    List<Integer> findStaleBrokers() {
-        List<Integer> nodes = new ArrayList<>();
+    Optional<Integer> findOneStaleBroker() {
         BrokerHeartbeatStateIterator iterator = unfenced.iterator();
-        while (iterator.hasNext()) {
+        if (iterator.hasNext()) {

Review comment: Isn't `unfenced.first()` supposed to return the oldest heartbeat? If so, can we use that method and check for `null`?

## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@
+    @Test
+    public void testFenceMultipleBrokers() throws Throwable {
+        int brokerCount = 5;
+        int brokersToKeepUnfenced = 1;
+        short replicationFactor = 5;
+        Long sessionTimeoutSec = 1L;
+        Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv =
+                new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+                ListenerCollection listeners = new ListenerCollection();
+                listeners.add(new Listener().setName("PLAINTEXT").
+                    setHost("localhost").setPort(9092));
+                QuorumController active = controlEnv.activeController();
+                Map<Integer, Long> brokerEpochs = new HashMap<>();
+                for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+                    CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
+                        new BrokerRegistrationRequestData().
+                            setBrokerId(brokerId).
+                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+                            setListeners(listeners));
+                    brokerEpochs.put(brokerId, reply.get().epoch());
+                }
+
+                // Brokers are only registered but still fenced
+                // Topic creation with no available unfenced brokers should fail
+                CreateTopicsRequestData createTopicsRequestData =
+                    new CreateTopicsRequestData().setTopics(
+                        new CreatableTopicCollection(Collections.singleton(
+                            new CreatableTopic().setName("foo").setNumPartitions(1).
+                                setReplicationFactor(replicationFactor)).iterator()));
+                CreateTopicsResponseData createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+                    createTopicsResponseData.topics().find("foo").errorCode());
+                assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " +
+                    "are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage());
+
+                // Unfence all brokers
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                createTopicsResponseData = active.createTopics(
+                    createTopicsRequestData).get();
+                assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode());
+                Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
+                sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+                Thread.sleep(sleepMillis);

Review comment: Can we remove this sleep? What are we trying to test with this sleep?

## File path: metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -
[GitHub] [kafka] socutes closed pull request #11205: KAFKA-10900: Add metrics enumerated in KIP-630
socutes closed pull request #11205: URL: https://github.com/apache/kafka/pull/11205 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13188) Release the memory back into MemoryPool
[ https://issues.apache.org/jira/browse/KAFKA-13188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398148#comment-17398148 ] Alok Nikhil commented on KAFKA-13188: - [~luwang] I see that the consumer-side memory pool management KIP was never implemented - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-81:+Bound+Fetch+memory+usage+in+the+consumer] The default is `MemoryPool.NONE`. How did LinkedIn measure the performance here? Is there a customization / config that allows the consumer to use a different memory pool? > Release the memory back into MemoryPool > --- > > Key: KAFKA-13188 > URL: https://issues.apache.org/jira/browse/KAFKA-13188 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Alok Nikhil >Priority: Major > Fix For: 3.0.1 > > > Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to > the linkedin/kafka repo hosting Apache Kafka 2.4. > The change is about releasing memory back to the MemoryPool for the Kafka > consumer, and his benchmark showed significant improvement in terms of memory > graduating from Young Gen and being promoted to Old Gen. > Given the benefit, the change should also be added to trunk. -- This message was sent by Atlassian Jira (v8.3.4#803005)
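For readers unfamiliar with the interface in question: `org.apache.kafka.common.memory.MemoryPool` is a small allocate/release abstraction, and the hotfix is about making the consumer actually call `release` so buffers can be reclaimed. A minimal usage sketch (buffer size and usage are arbitrary):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.MemoryPool;

public class PoolUsageSketch {
    public static void main(String[] args) {
        // MemoryPool.NONE is the default: tryAllocate just heap-allocates
        // and release is a no-op, so nothing is actually pooled.
        MemoryPool pool = MemoryPool.NONE;
        ByteBuffer buffer = pool.tryAllocate(16 * 1024); // bounded pools may return null when exhausted
        try {
            buffer.putInt(42); // ... fill with network data ...
        } finally {
            pool.release(buffer); // the hotfix's point: hand the buffer back for reuse
        }
    }
}
```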
[GitHub] [kafka] hachikuji commented on pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
hachikuji commented on pull request #11191: URL: https://github.com/apache/kafka/pull/11191#issuecomment-897785278 @niket-goel I'm inclined to merge this as is so that we don't need to wait for another build. Perhaps you can submit a small follow-up to address @jsancio 's comments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
hachikuji merged pull request #11191: URL: https://github.com/apache/kafka/pull/11191 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] socutes opened a new pull request #11207: KAFKA-10900: Add metrics enumerated in KIP-630
socutes opened a new pull request #11207: URL: https://github.com/apache/kafka/pull/11207 KIP-630 enumerates a few metrics; this change makes sure that those metrics are implemented. It adds the following metrics:
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs | A histogram of the amount of time it took to generate a snapshot.
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs | A histogram of the amount of time it took to load the snapshot.
kafka.controller:type=KafkaController,name=SnapshotLag | The number of offsets between the largest snapshot offset and the high-watermark.
kafka.controller:type=KafkaController,name=SnapshotSizeBytes | Size of the latest snapshot in bytes.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] socutes commented on pull request #11207: KAFKA-10900: Add metrics enumerated in KIP-630
socutes commented on pull request #11207: URL: https://github.com/apache/kafka/pull/11207#issuecomment-897797158 @jsancio @cmccabe please help review this pr. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13016) Interpret snapshot header version to correctly parse the snapshot
[ https://issues.apache.org/jira/browse/KAFKA-13016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-13016: -- Assignee: loboxu > Interpret snapshot header version to correctly parse the snapshot > - > > Key: KAFKA-13016 > URL: https://issues.apache.org/jira/browse/KAFKA-13016 > Project: Kafka > Issue Type: Sub-task >Reporter: Niket Goel >Assignee: loboxu >Priority: Minor > > https://issues.apache.org/jira/browse/KAFKA-12952 adds delimiters to the > snapshots. These delimiters serve as start and end markers for the snapshots > and also contain some metadata information about the snapshots. The snapshot > consumers need to interpret the version within the header to correctly parse > the schema of the snapshot being consumed or throw meaningful errors when > consuming incompatible snapshot versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12956) Validate the snapshot id when the state machine freeze a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-12956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-12956: -- Assignee: loboxu > Validate the snapshot id when the state machine freeze a snapshot > - > > Key: KAFKA-12956 > URL: https://issues.apache.org/jira/browse/KAFKA-12956 > Project: Kafka > Issue Type: Sub-task >Reporter: Haoran Xuan >Assignee: loboxu >Priority: Major > > This is similar to KAFKA-10800; in this PR, optionally validate the snapshot > id when `onSnapshotFrozen` is called. The validation logic will be > implemented in KAFKA-10800, and this Jira is supposed to directly call that > logic. > Currently, `onSnapshotFrozen` can be called by `KafkaRaftClient` and > `SnapshotWriter`. Do not validate if it is called by `KafkaRaftClient` when > it is downloading a snapshot from the leader; do validate if it is called by > `SnapshotWriter`, which implies generating a specific snapshot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] niket-goel commented on pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time
niket-goel commented on pull request #11191: URL: https://github.com/apache/kafka/pull/11191#issuecomment-897806958 @hachikuji Will do; I'll submit a follow-up PR. Thanks for merging this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] niket-goel opened a new pull request #11208: MINOR: Refactored BrokerHeartbeatManager::findOneStaleBroker to not use
niket-goel opened a new pull request #11208: URL: https://github.com/apache/kafka/pull/11208 Some code refactoring based on https://github.com/apache/kafka/pull/11191 feedback -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398209#comment-17398209 ] Jason Gustafson commented on KAFKA-13162: - This has turned out to be more work than expected. I am going to change the target version to 3.0.1. cc [~kkonstantine] > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.0 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller
[ https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13162: Fix Version/s: (was: 3.0.0) 3.0.1 > ElectLeader API must be forwarded to Controller > --- > > Key: KAFKA-13162 > URL: https://issues.apache.org/jira/browse/KAFKA-13162 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 3.0.1 > > > We're missing the logic to forward ElectLeaders requests to the controller. > This means that `kafka-leader-election.sh` does not work correctly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly
[ https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-13173. -- Resolution: Fixed > KRaft controller does not handle simultaneous broker expirations correctly > -- > > Key: KAFKA-13173 > URL: https://issues.apache.org/jira/browse/KAFKA-13173 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Niket Goel >Priority: Blocker > Fix For: 3.0.0 > > > In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current > stale replicas and attempt to remove them from the ISR. However, when > multiple expirations occur at once, we do not properly accumulate the ISR > changes. For example, I ran a test where the ISR of a partition was > initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at > the same time. The records that were generated by `fenceStaleBrokers` were > the following: > {code} > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), > ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, > topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, > removingReplicas=null, addingReplicas=null) at version 0), > ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)] > {code} > First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the > record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing > of broker 3 is handled. So we did not account for the fact that we had > already fenced broker 2 in the request. > A simple solution for now is to change the logic to handle fencing only one > broker at a time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
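The core of the bug described above is easy to see outside the controller: each per-broker change was computed against the committed partition state rather than against the state produced by earlier records in the same batch. An illustrative sketch, with plain lists standing in for the real metadata records:

```java
import java.util.ArrayList;
import java.util.List;

public class IsrAccumulationDemo {
    public static void main(String[] args) {
        List<Integer> committedIsr = List.of(1, 2, 3); // ISR before any fencing
        List<Integer> staleBrokers = List.of(2, 3);    // both time out at once

        // Buggy: each record is derived from the committed state, so the
        // second record silently re-adds the broker the first one removed.
        for (int stale : staleBrokers) {
            List<Integer> newIsr = new ArrayList<>(committedIsr);
            newIsr.remove(Integer.valueOf(stale));
            System.out.println("record: isr=" + newIsr); // [1, 3] then [1, 2]
        }

        // Fixed (fence one broker at a time): fold each change into a working copy.
        List<Integer> workingIsr = new ArrayList<>(committedIsr);
        for (int stale : staleBrokers) {
            workingIsr.remove(Integer.valueOf(stale));
            System.out.println("record: isr=" + workingIsr); // [1, 3] then [1]
        }
    }
}
```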
[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller
[ https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-13142: - Fix Version/s: (was: 3.0.0) 3.0.1 Affects Version/s: 3.0.0 > KRaft brokers do not validate dynamic configs before forwarding them to > controller > -- > > Key: KAFKA-13142 > URL: https://issues.apache.org/jira/browse/KAFKA-13142 > Project: Kafka > Issue Type: Task > Components: kraft >Affects Versions: 3.0.0 >Reporter: Ryan Dielhenn >Assignee: Ryan Dielhenn >Priority: Blocker > Fix For: 3.0.1 > > > The KRaft brokers are not currently validating dynamic configs before > forwarding them to the controller. To ensure that KRaft clusters are easily > upgradable it would be a good idea to validate dynamic configs in the first > release of KRaft so that invalid dynamic configs are never stored. -- This message was sent by Atlassian Jira (v8.3.4#803005)
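The gap described here is easy to picture. A hedged sketch of the missing step follows — the key and the range check are illustrative examples, not the actual KRaft code path:
{code:java}
import java.util.Map;

// Illustrative only: reject bad values on the broker before forwarding the
// alter-configs request to the controller, so invalid configs are never stored.
public class DynamicConfigGateSketch {
    static void validateBeforeForward(Map<String, String> configs) {
        String v = configs.get("message.max.bytes"); // example key
        if (v != null && Integer.parseInt(v) < 0) {  // a parse failure also rejects
            throw new IllegalArgumentException("message.max.bytes must be non-negative");
        }
        // ... per-key validation for the remaining entries, then forward
    }
}
{code}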
[GitHub] [kafka] junrao commented on a change in pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None
junrao commented on a change in pull request #11199: URL: https://github.com/apache/kafka/pull/11199#discussion_r687993862 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging { // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( - // we do not clean beyond the first unstable offset - log.firstUnstableOffset, + // we do not clean beyond the last stable offset Review comment: This is an existing issue. But could we update the comment in line 593 to include last stable offset too? ## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ## @@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) +log.updateHighWatermark(50) + +val lastCleanOffset = Some(0L) +val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) +assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.") +assertEquals(log.highWatermark, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the hwm.") Review comment: Since the description of the test says bounded by LSO, should we change log.highWatermark to log.lastStableOffset and the error message accordingly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
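For readers following along, the bound being discussed can be illustrated with a rough sketch. The real logic is Scala in `LogCleanerManager`; the names and the `OptionalLong` shape below are simplifications, not the actual API.
```java
import java.util.OptionalLong;

// Sketch: with no open transactions, the last stable offset falls back to the
// high watermark, so cleaning ends up bounded by both LSO and HWM.
public class CleanableOffsetSketch {
    public static void main(String[] args) {
        long highWatermark = 50L;
        OptionalLong firstUnstableOffset = OptionalLong.empty(); // no in-flight txns
        long lastStableOffset = firstUnstableOffset.orElse(highWatermark);
        long firstUncleanableDirtyOffset = Math.min(lastStableOffset, highWatermark);
        System.out.println(firstUncleanableDirtyOffset); // 50
    }
}
```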
[GitHub] [kafka] rondagostino commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
rondagostino commented on a change in pull request #11200: URL: https://github.com/apache/kafka/pull/11200#discussion_r688014091 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO @nowarn("cat=deprecation") private def validateValues(): Unit = { +val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int] +if (nodeIdValue >= 0) { + val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] + if (brokerIdValue != Defaults.BrokerId && brokerIdValue != nodeIdValue) { +throw new ConfigException(s"The values for broker.id ($brokerIdValue) and node.id ($nodeIdValue) must be the same if both are specified") + } +} Review comment: Good question. It's legal to set node.id for the ZooKeeper case, so I think this inconsistency check is applicable in all cases and should be done here, at the top. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
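As a concrete illustration of the check, here is a config that the validation above is meant to reject. The property values are hypothetical; the exception message in the comment mirrors the patch.
```java
import java.util.Properties;

// Both ids set, but different: expected to fail validation (illustrative only).
public class NodeIdConsistencySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // node.id is legal in ZK mode too
        props.put("broker.id", "1");
        props.put("node.id", "2"); // mismatch
        // Constructing KafkaConfig with these props would be expected to throw:
        // ConfigException("The values for broker.id (1) and node.id (2) must be the same if both are specified")
    }
}
```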
[GitHub] [kafka] rondagostino commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values
rondagostino commented on a change in pull request #11200: URL: https://github.com/apache/kafka/pull/11200#discussion_r688016827 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO @nowarn("cat=deprecation") private def validateValues(): Unit = { +val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int] +if (nodeIdValue >= 0) { + val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] Review comment: > prefers to use getInt Good catch -- fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None
lbradstreet commented on a change in pull request #11199: URL: https://github.com/apache/kafka/pull/11199#discussion_r688051673 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging { // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( - // we do not clean beyond the first unstable offset - log.firstUnstableOffset, + // we do not clean beyond the last stable offset Review comment: Makes sense. I've updated it with a better explanation of what we bound by. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None
lbradstreet commented on a change in pull request #11199: URL: https://github.com/apache/kafka/pull/11199#discussion_r688051998 ## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ## @@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) +log.updateHighWatermark(50) + +val lastCleanOffset = Some(0L) +val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) +assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.") +assertEquals(log.highWatermark, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the hwm.") Review comment: This is what I intended since the lastStableOffset should equal the highWatermark, but I think it makes sense to keep the intent. Instead I have checked that log.highWatermark == log.lastStableOffset in the line prior to it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13198) TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord
Jose Armando Garcia Sancio created KAFKA-13198: -- Summary: TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord Key: KAFKA-13198 URL: https://issues.apache.org/jira/browse/KAFKA-13198 Project: Kafka Issue Type: Bug Components: kraft, replication Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 3.0.0 In KRaft, when a replica gets reassigned away from a topic partition, we are not notifying the {{ReplicaManager}} to stop the replica. One solution is to track those topic partition ids when processing {{PartitionChangeRecord}} and to return them as {{deleted}} when the replica manager calls {{calculateDeltaChanges}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
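A hedged sketch of the proposed direction follows. The types and method names are illustrative, not the actual `TopicsDelta` API.
{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative: remember partitions the local replica was reassigned away from
// while replaying PartitionChangeRecords, and surface them as deleted later.
public class DeltaChangesSketch {
    private final int localBrokerId;
    private final Set<String> locallyRemoved = new HashSet<>();

    DeltaChangesSketch(int localBrokerId) {
        this.localBrokerId = localBrokerId;
    }

    void replayPartitionChange(String topicPartition, List<Integer> newReplicas) {
        if (!newReplicas.contains(localBrokerId)) {
            locallyRemoved.add(topicPartition); // ReplicaManager should stop this replica
        }
    }

    Set<String> calculateDeltaChanges() {
        return Collections.unmodifiableSet(locallyRemoved);
    }
}
{code}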
[GitHub] [kafka] junrao merged pull request #11154: KAFKA-13068: Rename Log to UnifiedLog
junrao merged pull request #11154: URL: https://github.com/apache/kafka/pull/11154 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13068) Rename Log to UnifiedLog
[ https://issues.apache.org/jira/browse/KAFKA-13068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13068. - Fix Version/s: 3.1.0 Resolution: Fixed Merged the PR to trunk > Rename Log to UnifiedLog > > > Key: KAFKA-13068 > URL: https://issues.apache.org/jira/browse/KAFKA-13068 > Project: Kafka > Issue Type: Sub-task >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > Fix For: 3.1.0 > > > Once KAFKA-12554 is completed, we can rename Log -> UnifiedLog as described > in the doc: > [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688142016 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { this.userCallback.onCompletion(metadata, exception); } } + +private static class KafkaProducerMetrics { +private static final String FLUSH = "flush"; +private static final String TXN_INIT = "txn-init"; +private static final String TXN_BEGIN = "txn-begin"; +private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; +private static final String TXN_COMMIT = "txn-commit"; +private static final String TXN_ABORT = "txn-abort"; +private static final String TOTAL_TIME_SUFFIX = "-time-total"; + +final Map tags; +final Metrics metrics; +final Sensor initTimeSensor; +final Sensor beginTimeSensor; +final Sensor flushTimeSensor; +final Sensor sendOffsetsSensor; +final Sensor commitSensor; +final Sensor abortSensor; + +private KafkaProducerMetrics(Metrics metrics) { +this.metrics = metrics; +this.tags = this.metrics.config().tags(); +this.flushTimeSensor = newLatencySensor(FLUSH); +this.initTimeSensor = newLatencySensor(TXN_INIT); +this.beginTimeSensor = newLatencySensor(TXN_BEGIN); +this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS); +this.commitSensor = newLatencySensor(TXN_COMMIT); +this.abortSensor = newLatencySensor(TXN_ABORT); +} + +private Sensor newLatencySensor(String name) { +Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX); +sensor.add( +metrics.metricName(name + TOTAL_TIME_SUFFIX, ProducerMetrics.GROUP, tags), +new CumulativeSum() +); +return sensor; +} + +private void recordFlush(long duration) { +flushTimeSensor.record(duration); +} + +private void recordInit(long duration) { +initTimeSensor.record(duration); +} + +private void recordBegin(long duration) { Review comment: nit: better leave the full name as recordBeginTxn/AbortTxn/CommitTxn. ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { this.userCallback.onCompletion(metadata, exception); } } + +private static class KafkaProducerMetrics { +private static final String FLUSH = "flush"; +private static final String TXN_INIT = "txn-init"; +private static final String TXN_BEGIN = "txn-begin"; +private static final String TXN_SEND_OFFSETS = "txn-send-offsets"; +private static final String TXN_COMMIT = "txn-commit"; +private static final String TXN_ABORT = "txn-abort"; +private static final String TOTAL_TIME_SUFFIX = "-time-total"; + +final Map tags; +final Metrics metrics; +final Sensor initTimeSensor; +final Sensor beginTimeSensor; Review comment: Ditto here; better rename it to `beginTxn` / `commitTxn` / `abortTxn`? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -178,12 +180,39 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } +final long start = Time.SYSTEM.nanoseconds(); producer.close(); +final long closeTime = Time.SYSTEM.nanoseconds() - start; + +oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer); producer = clientSupplier.getProducer(eosV2ProducerConfigs); transactionInitialized = false; } +private static double getMetricValue(final Map metrics, + final String name) { +return metrics.keySet().stream() +.filter(n -> n.name().equals(name)) +.findFirst() Review comment: Maybe worth checking there's only one element after the filtering? It should not be expected to have more than one right? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version
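For context on how the sensors in the diff above are driven, here is a minimal usage sketch. The `recordFlush` hook refers to the `KafkaProducerMetrics` class under review and is commented out as hypothetical; the `Time` handling is simplified.
```java
import org.apache.kafka.common.utils.Time;

// Illustrative timing pattern around a blocking producer call.
public class FlushTimingSketch {
    static long timedFlushNanos(org.apache.kafka.clients.producer.Producer<String, String> producer,
                                Time time) {
        long start = time.nanoseconds();
        producer.flush();                        // the blocking call being measured
        long elapsed = time.nanoseconds() - start;
        // producerMetrics.recordFlush(elapsed); // hypothetical hook into KafkaProducerMetrics
        return elapsed;
    }
}
```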
[GitHub] [kafka] lbradstreet commented on pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None
lbradstreet commented on pull request #11199: URL: https://github.com/apache/kafka/pull/11199#issuecomment-898057637 > @lbradstreet : Thanks for the updated PR. LGTM. Are the 29 test failures related to this PR? @junrao yeah, they're looking related. Let me clean those up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming opened a new pull request #11209: KAFKA-12465: Logic about inconsistent cluster id
dengziming opened a new pull request #11209: URL: https://github.com/apache/kafka/pull/11209 *More detailed description of your change* When handling a response, an invalid cluster id is fatal unless a previous response contained a valid cluster id. Note that this is not perfect (see https://github.com/apache/kafka/pull/10289#discussion_r595378358), but it is the best approach as far as I can see, because we can catch misconfiguration early. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
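A rough sketch of the rule as described — the class and field names are hypothetical; the real logic sits in the raft request/response handling:
```java
// Illustrative: an inconsistent cluster id is fatal unless this node has already
// observed a valid cluster id in an earlier response from the peer.
public class ClusterIdCheckSketch {
    private boolean sawValidClusterId = false;

    void handleResponseClusterId(String received, String expected) {
        if (received == null) {
            return; // older peers may omit the cluster id; not evidence either way
        }
        if (received.equals(expected)) {
            sawValidClusterId = true;
            return;
        }
        if (!sawValidClusterId) {
            // Never saw a valid id from this peer: treat as fatal misconfiguration.
            throw new IllegalStateException(
                "Inconsistent cluster id " + received + ", expected " + expected);
        }
        // Otherwise log and tolerate: a previously consistent peer is unlikely misconfigured.
    }
}
```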
[GitHub] [kafka] dengziming commented on pull request #11209: KAFKA-12465: Logic about inconsistent cluster id
dengziming commented on pull request #11209: URL: https://github.com/apache/kafka/pull/11209#issuecomment-898134690 Hi @hachikuji @jsancio , PTAL. I also moved the logic for `UNKNOWN_TOPIC_OR_PARTITION` in handleFetchSnapshot to `RaftUtil.java`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11208: MINOR: Refactored BrokerHeartbeatManager::findOneStaleBroker to not use
showuon commented on a change in pull request #11208: URL: https://github.com/apache/kafka/pull/11208#discussion_r688206957 ## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ## @@ -200,6 +200,14 @@ void registerBrokers(Integer... brokerIds) throws Exception { } } +Set fencedBrokerIds() { +return clusterControl.brokerRegistrations().values() Review comment: Since it's now inside the test class, we can declare it as `private`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688236582 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -590,9 +593,11 @@ else if (acks != -1) public void initTransactions() { throwIfNoTransactionManager(); throwIfProducerClosed(); +long now = time.nanoseconds(); Review comment: ah - I thought we were measuring in nanos. Not sure where I got that impression. I'll change to millis -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688239443 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -178,12 +180,39 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } +final long start = Time.SYSTEM.nanoseconds(); Review comment: ah good call! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688240676 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() { } } +private static Double getMetricValue(final KafkaProducer producer, final String name) { +Metrics metrics = producer.metrics; +Metric metric = metrics.metric(metrics.metricName(name, "producer-metrics")); +return (Double) metric.metricValue(); +} + +@Test +public void testFlushMeasureLatency() { +Map configs = new HashMap<>(); +configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + +Time time = new MockTime(1); +MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); +ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + +MockClient client = new MockClient(time, metadata); +client.updateMetadata(initialUpdateResponse); + +try (KafkaProducer producer = kafkaProducer( +configs, +new StringSerializer(), +new StringSerializer(), +metadata, +client, +null, +time +)) { +producer.flush(); +double first = getMetricValue(producer, "flush-time-total"); +assertTrue(first > 99.0); Review comment: It's using mock time, so the value here is well-known (should be 1 second). I'm using > rather than equalTo because I don't want the test to fail spuriously on floating point rounding errors. It would probably be better to use [isCloseTo](http://hamcrest.org/JavaHamcrest/javadoc/1.3/org/hamcrest/number/IsCloseTo.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688240994 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java ## @@ -178,12 +180,39 @@ public void resetProducer() { throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode); } +final long start = Time.SYSTEM.nanoseconds(); producer.close(); +final long closeTime = Time.SYSTEM.nanoseconds() - start; + +oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer); producer = clientSupplier.getProducer(eosV2ProducerConfigs); transactionInitialized = false; } +private static double getMetricValue(final Map metrics, + final String name) { +return metrics.keySet().stream() +.filter(n -> n.name().equals(name)) +.findFirst() Review comment: yeah it should always be one -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688242729 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() { } } +public void addThreadLevelImmutableMetric(final String name, +final String description, +final String threadId, +final T value) { +final MetricName metricName = metrics.metricName( +name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId)); +synchronized (threadLevelMetrics) { Review comment: not sure I follow the question here - we are using `threadLevelMetrics` to track the metrics for each thread so they can be cleaned up later on when the thread exits. What's wrong with using the thread id for that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r68825 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { +final Consumer consumer; +final Consumer restoreConsumer; +final Supplier producerTotalBlockedTime; + +StreamsThreadTotalBlockedTime( +final Consumer consumer, +final Consumer restoreConsumer, +final Supplier producerTotalBlockedTime +) { +this.consumer = consumer; +this.restoreConsumer = restoreConsumer; +this.producerTotalBlockedTime = producerTotalBlockedTime; +} + +final double getMetricValue( Review comment: I tried doing it this way at first, but found it hard to loop over the producers in `TaskManager/Tasks/ActiveTaskCreator` without breaking those abstractions by adding methods to return the producers so we could get the metrics out. So then I went the route of having the total blocked time metric implementation ask `TaskManager` for its total blocked time component. > we can also use this in unit test e.g. https://github.com/apache/kafka/pull/11149/files#diff-599de0f96fbd5ba6b3d919881426269fc72fe8bbe8e2436fab87d9abe84e8dbaR735 What do you mean here? This is the producer's unit test, and this method is computing total blocked time for a streams app. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688245686 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -590,9 +593,11 @@ else if (acks != -1) public void initTransactions() { throwIfNoTransactionManager(); throwIfProducerClosed(); +long now = time.nanoseconds(); Review comment: Actually now I remember why - the bufferpool and selector total blocked times are all being measured in nanos and use the suffix `time-total`. So I chose the naming convention and unit accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
rodesai commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688255651 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() { } } +private static Double getMetricValue(final KafkaProducer producer, final String name) { +Metrics metrics = producer.metrics; +Metric metric = metrics.metric(metrics.metricName(name, "producer-metrics")); +return (Double) metric.metricValue(); +} + +@Test +public void testFlushMeasureLatency() { +Map configs = new HashMap<>(); +configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); + +Time time = new MockTime(1); +MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1)); +ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); + +MockClient client = new MockClient(time, metadata); +client.updateMetadata(initialUpdateResponse); + +try (KafkaProducer producer = kafkaProducer( +configs, +new StringSerializer(), +new StringSerializer(), +metadata, +client, +null, +time +)) { +producer.flush(); +double first = getMetricValue(producer, "flush-time-total"); +assertTrue(first > 99.0); Review comment: Ah, actually this doesn't work because the mock time is passed to and used by the other client threads, so the value is not predictable. So the best we can do is assert that at least one tick (100 nanoseconds) has passed. I'll update the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
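For anyone unfamiliar with the auto-tick behavior being discussed, a small sketch follows; the numbers are illustrative.
```java
import org.apache.kafka.common.utils.MockTime;

// MockTime(autoTickMs = 1) advances the clock on each lookup. When background
// client threads also read the clock, the total advance is unpredictable, so
// tests can only assert a lower bound rather than an exact latency value.
public class MockTimeSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime(1);
        long before = time.milliseconds();
        long after = time.milliseconds(); // auto-ticked by at least 1 ms
        assert after > before;
    }
}
```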
[jira] [Resolved] (KAFKA-12155) Delay increasing the log start offset
[ https://issues.apache.org/jira/browse/KAFKA-12155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming resolved KAFKA-12155. Resolution: Fixed > Delay increasing the log start offset > - > > Key: KAFKA-12155 > URL: https://issues.apache.org/jira/browse/KAFKA-12155 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: David Arthur >Priority: Major > > The implementation in [https://github.com/apache/kafka/pull/9816] increases > the log start offset as soon as a snapshot is created that is greater than > the log start offset. This is correct but causes some inefficiency in some > cases. > # Any follower, voter or observer, with an end offset between the leader's > log start offset and the leader's latest snapshot will get invalidated. This > will cause those followers to fetch the new snapshot and reload their state > machines. > # Any {{Listener}} or state machine that has a {{nextExpectedOffset()}} less > than the latest snapshot will get invalidated. This will cause the state > machine to have to reload its state from the latest snapshot. > To minimize the frequency of these reloads KIP-630 proposes adding the > following configuration: > * {{metadata.start.offset.lag.time.max.ms}} - The maximum amount of time > that the leader will wait for an offset to get replicated to all of the live > replicas before advancing the {{LogStartOffset}}. See section “When to > Increase the LogStartOffset”. The default is 7 days. > This description and implementation should be extended to also apply to the > state machine, or {{Listener}}. The local log start offset should be > increased when every {{ListenerContext}}'s {{nextExpectedOffset()}} is > greater than the offset of the latest snapshot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
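The condition in the last paragraph can be stated compactly. A hedged sketch, with illustrative names rather than the actual raft client API:
{code:java}
import java.util.List;

// Illustrative: the local log start offset may only advance once every listener
// has consumed past the latest snapshot's offset.
public class LogStartOffsetSketch {
    static boolean canAdvanceLogStartOffset(long latestSnapshotOffset,
                                            List<Long> listenerNextExpectedOffsets) {
        return listenerNextExpectedOffsets.stream()
            .allMatch(next -> next > latestSnapshotOffset);
    }
}
{code}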
[GitHub] [kafka] kamalcph commented on pull request #11197: 28x TS changes
kamalcph commented on pull request #11197: URL: https://github.com/apache/kafka/pull/11197#issuecomment-898206391 retest -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13199) Make Task extend Versioned
Xianghu Wang created KAFKA-13199: Summary: Make Task extend Versioned Key: KAFKA-13199 URL: https://issues.apache.org/jira/browse/KAFKA-13199 Project: Kafka Issue Type: Wish Reporter: Xianghu Wang Since `Task` is versioned, we can make it extend `Versioned` directly -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13199) Make Task extends Versioned
[ https://issues.apache.org/jira/browse/KAFKA-13199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xianghu Wang updated KAFKA-13199: - Summary: Make Task extends Versioned (was: Make Task extend Versioned) > Make Task extends Versioned > --- > > Key: KAFKA-13199 > URL: https://issues.apache.org/jira/browse/KAFKA-13199 > Project: Kafka > Issue Type: Wish >Reporter: Xianghu Wang >Priority: Major > > Since `Task` is versioned, we can make it extends `Versioned` directly -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wangxianghu opened a new pull request #11210: KAFKA-13199: Make Task extends Versioned
wangxianghu opened a new pull request #11210: URL: https://github.com/apache/kafka/pull/11210 Since `Task` is versioned, we can make it extend `Versioned` directly; there is no need to introduce `String version()` again ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688267294 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -590,9 +593,11 @@ else if (acks != -1) public void initTransactions() { throwIfNoTransactionManager(); throwIfProducerClosed(); +long now = time.nanoseconds(); Review comment: hmm, I know that in the selector we use nanoseconds, but then the suffix should be `time-total-ns`; not sure when that got changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688267856 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import java.util.Map; +import java.util.function.Supplier; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; + +public class StreamsThreadTotalBlockedTime { +final Consumer consumer; +final Consumer restoreConsumer; +final Supplier producerTotalBlockedTime; + +StreamsThreadTotalBlockedTime( +final Consumer consumer, +final Consumer restoreConsumer, +final Supplier producerTotalBlockedTime +) { +this.consumer = consumer; +this.restoreConsumer = restoreConsumer; +this.producerTotalBlockedTime = producerTotalBlockedTime; +} + +final double getMetricValue( Review comment: Sounds good then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams
guozhangwang commented on a change in pull request #11149: URL: https://github.com/apache/kafka/pull/11149#discussion_r688268274 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() { } } +public void addThreadLevelImmutableMetric(final String name, +final String description, +final String threadId, +final T value) { +final MetricName metricName = metrics.metricName( +name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId)); +synchronized (threadLevelMetrics) { Review comment: Oh it's totally fine to use the thread id, it's just that for other thread-level metrics we would prefix the thread id with either `internal` or `external`, i.e. the "key" would not just be the thread id itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter
showuon commented on a change in pull request #11201: URL: https://github.com/apache/kafka/pull/11201#discussion_r688286213 ## File path: clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java ## @@ -76,6 +77,21 @@ public void testJmxRegistration() throws Exception { } } +@Test +public void testMbeanTagOrdering() { +Map tags = new HashMap<>(); +tags.put("tag_a", "x"); +tags.put("tag_b", "y"); +tags.put("tag_c", "z"); +tags.put("tag_d", "1,2"); +tags.put("tag_e", ""); +tags.put("tag_f", "3"); Review comment: Although `HashMap` doesn't guarantee element order, the current test might make people think it passes because the elements are inserted in key order. Could you put the keys in an unordered sequence, so that we can make sure your fix works? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
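One way to apply the suggestion, assuming the rest of the test stays the same — the insertion order below is deliberately scrambled:
```java
import java.util.HashMap;
import java.util.Map;

public class TagOrderSketch {
    // Keys inserted out of alphabetical order, so an alphabetically ordered mbean
    // name can only come from the reporter's own sorting, not from insertion order.
    static Map<String, String> scrambledTags() {
        Map<String, String> tags = new HashMap<>();
        tags.put("tag_d", "1,2");
        tags.put("tag_a", "x");
        tags.put("tag_f", "3");
        tags.put("tag_b", "y");
        tags.put("tag_e", "");
        tags.put("tag_c", "z");
        return tags;
    }
}
```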
[GitHub] [kafka] showuon commented on pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter
showuon commented on pull request #11201: URL: https://github.com/apache/kafka/pull/11201#issuecomment-898234132 Also, there are some `metrics` related tests failed. You might need to take a look. ``` Build / ARM / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics Build / ARM / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics Build / ARM / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics Build / ARM / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics Build / ARM / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org