[GitHub] [kafka] dajac commented on pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter

2021-08-12 Thread GitBox


dajac commented on pull request #11201:
URL: https://github.com/apache/kafka/pull/11201#issuecomment-897421076


   @xvrl Thanks for the patch. I think that ordering the tags makes sense. However, 
I am a little concerned about the potential implications of changing this. I 
vaguely recall having to craft MBean names with tags in a specific order to 
make the Datadog agent work, for instance. Could it break existing metric 
collectors if the order changes? I might be completely wrong, though.
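To make the concern concrete, the sketch below uses the JDK's `javax.management.ObjectName`. JMX itself treats two names that differ only in key order as the same MBean, because equality is defined on the canonical form, which sorts the keys. The key list as written, however, keeps its original order, so a collector that matches on the literal string could be sensitive to ordering. The MBean names here are illustrative and not taken from the PR.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class MBeanTagOrder {
    // Parses a name, turning the checked exception into an unchecked one for brevity.
    static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(name, e);
        }
    }

    // Key properties exactly as written at creation time (order preserved).
    static String asWritten(String name) {
        return parse(name).getKeyPropertyListString();
    }

    // Canonical form: JMX sorts the keys lexicographically.
    static String canonical(String name) {
        return parse(name).getCanonicalName();
    }

    // ObjectName equality compares canonical forms, so key order is ignored.
    static boolean sameMBean(String a, String b) {
        return parse(a).equals(parse(b));
    }

    public static void main(String[] args) {
        String a = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=orders";
        String b = "kafka.server:topic=orders,name=MessagesInPerSec,type=BrokerTopicMetrics";
        System.out.println(sameMBean(a, b)); // true
        System.out.println(asWritten(a));    // type=BrokerTopicMetrics,name=MessagesInPerSec,topic=orders
        System.out.println(asWritten(b));    // topic=orders,name=MessagesInPerSec,type=BrokerTopicMetrics
        System.out.println(canonical(a));    // kafka.server:name=MessagesInPerSec,topic=orders,type=BrokerTopicMetrics
    }
}
```

Sorting tags in the reporter therefore changes only the string form; anything comparing `ObjectName` instances is unaffected.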


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12835) Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-08-12 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-12835:

Fix Version/s: 2.8.1

> Topic IDs can mismatch on brokers (after interbroker protocol version update)
> -
>
> Key: KAFKA-12835
> URL: https://issues.apache.org/jira/browse/KAFKA-12835
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: Ivan Yurchenko
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> We had a Kafka cluster running version 2.8 with the interbroker protocol set to 
> 2.7. It had a number of topics and everything was fine.
> Then we decided to update the interbroker protocol to 2.8 with the following 
> procedure:
> 1. Run new brokers with the interbroker protocol set to 2.8.
> 2. Move the data from the old brokers to the new ones (normal partition 
> reassignment API).
> 3. Decommission the old brokers.
> At stage 2 we hit the problem: the old brokers started failing on 
> {{LeaderAndIsrRequest}} handling with
> {code:java}
> ERROR [Broker id=<...>] Topic Id in memory: <...> does not match the topic Id 
> for partition <...> provided in the request: <...>. (state.change.logger)
> {code}
> for multiple topics. The topics were not recreated.
> We checked the {{partition.metadata}} files, and the IDs there were indeed different 
> from the values in ZooKeeper. It was fixed by deleting the metadata files 
> (and letting them be recreated).
>  
> Unfortunately, the logs didn't show anything that might point to the cause 
> of the issue (or it happened longer ago than we retain logs).
> We also tried to reproduce this, without success.
> If the community can point out what to check or beware of in the future, that would 
> be great. We'll be happy to provide additional information if needed. Thank 
> you! 
> Sorry for a ticket that might not be very actionable. We hope to at least 
> raise awareness of this issue.
>  
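The manual check the reporter describes (comparing the topic ID in each partition's `partition.metadata` file with the ID held in ZooKeeper) can be sketched as below. This is a minimal sketch assuming the file consists of `key: value` lines such as `version: 0` and `topic_id: ...`; that layout should be verified against the broker version in use, and the IDs in the test are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class PartitionMetadataCheck {
    // Parses "key: value" lines. ASSUMPTION: this matches the partition.metadata layout.
    static Map<String, String> parse(String contents) {
        Map<String, String> fields = new HashMap<>();
        for (String line : contents.split("\\R")) {
            int sep = line.indexOf(':');
            if (sep > 0) {
                fields.put(line.substring(0, sep).trim(), line.substring(sep + 1).trim());
            }
        }
        return fields;
    }

    // Describes the mismatch between the on-disk ID and the expected (e.g. ZooKeeper) ID,
    // or returns empty if they agree.
    static Optional<String> mismatch(String fileContents, String expectedTopicId) {
        String onDisk = parse(fileContents).get("topic_id");
        if (onDisk == null) {
            return Optional.of("no topic_id in file");
        }
        if (onDisk.equals(expectedTopicId)) {
            return Optional.empty();
        }
        return Optional.of("on disk: " + onDisk + ", expected: " + expectedTopicId);
    }
}
```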



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13014) KAFKA-Stream stucked when the offset is no more existing

2021-08-12 Thread Ahmed Toumi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17397957#comment-17397957
 ] 

Ahmed Toumi commented on KAFKA-13014:
-

Thanks [~mjsax], but the problem was more complicated than I thought.

It's a problem of mixing exactly_once with a state store.

When the application crashed without waiting for the stream to shut down, the 
transaction of the latest message in the changelog topic was aborted, so when 
we restarted the Kafka Streams application, the topology waited to reload the local RocksDB 
store before starting to consume messages.

And that is where the bug is, because the check uses the consumer metadata:
"topic.lastoffset" == current_consumer_offset

But it should be:

"topic.last_committed_message_and_transaction_offset" == current_consumer_offset

I fixed it by switching to at_least_once, but I think it was fixed in 
2.7.1.
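The author's point can be illustrated with a toy log. Under exactly_once, each transaction ends with a commit or abort marker that occupies an offset but is never returned to the application, so "last record offset + 1" can stay below the end offset forever, while the consumer's position does reach it. This is a simplified model of the comment above, not Kafka Streams' actual restoration code; `Entry` and both helper methods are hypothetical.

```java
import java.util.List;

public class TxnRestoreCheck {
    // HYPOTHETICAL log entry: either a data record or a transaction control marker.
    static final class Entry {
        final long offset;
        final boolean controlMarker;

        Entry(long offset, boolean controlMarker) {
            this.offset = offset;
            this.controlMarker = controlMarker;
        }
    }

    // Offset of the last data record the application would see (markers are skipped).
    static long lastDataOffset(List<Entry> log) {
        long last = -1;
        for (Entry e : log) {
            if (!e.controlMarker) {
                last = e.offset;
            }
        }
        return last;
    }

    // Consumer position after reading everything: one past the last entry, markers included.
    static long positionAfterReading(List<Entry> log) {
        return log.isEmpty() ? 0 : log.get(log.size() - 1).offset + 1;
    }

    public static void main(String[] args) {
        // Offsets 0 and 2 hold data; 1 and 3 hold commit/abort markers.
        List<Entry> log = List.of(
                new Entry(0, false), new Entry(1, true),
                new Entry(2, false), new Entry(3, true));
        long endOffset = 4;
        // Record-based check: 2 + 1 == 4 is false, so "restoration" never completes.
        System.out.println(lastDataOffset(log) + 1 == endOffset);
        // Position-based check: 4 == 4 is true.
        System.out.println(positionAfterReading(log) == endOffset);
    }
}
```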

> KAFKA-Stream stucked when the offset is no more existing
> 
>
> Key: KAFKA-13014
> URL: https://issues.apache.org/jira/browse/KAFKA-13014
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, offset manager, streams
>Affects Versions: 2.7.0
> Environment: PROD
>Reporter: Ahmed Toumi
>Priority: Major
> Attachments: image-2021-06-30-11-10-31-028.png
>
>
> We have a Kafka Streams application with multiple instances and threads.
> This application consumes from many topics.
> One of the topic partitions wasn't accessible for a day, and the retention of 
> the topic is 4 hours.
> After fixing the problem, the application is trying to consume from an 
> offset that does not exist anymore:
>  * Output of kafka-consumer-groups --describe:
> !image-2021-06-30-11-10-31-028.png!
> We can see that the current offset that the application is waiting for is *59754934*, 
> but the new first offset of this partition is *264896001*.
> The problem is that the application does not throw any exception;
> this is the only log I'm seeing:
>  
> {code:java}
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Updating assignment with
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Updating assignment with Assigned partitions: [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] Current owned partitions: [adm__article_ean_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_stock_repartition_v3-10] Added partitions (assigned - owned): [] Revoked partitions (owned - assigned): []
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Notifying assignor about the new Assignment(partitions=[adm__article_stock_repartition_v3-10, adm__article_sign_repartition_v3-10, adm__article_itm_repartition_v3-10, adm__article_ean_repartition_v3-10], userDataSize=398)
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer] No followup rebalance was requested, resetting the rebalance schedule.
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.s.p.internals.TaskManager - stream-thread [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] Handle new assignment with: New active tasks: [0_10] New standby tasks: [0_17, 0_21] Existing active tasks: [0_10] Existing standby tasks: [0_17, 0_21]
> 08:44:53.924 [talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=talaria-data-mixed-prod-c3d6ac16-516c-49ee-a34e-bde5f3f629dc-StreamThread-2-consumer, groupId=talaria-data-mixed-prod] Adding newly assigned partitions: 
> {code}
>  
> FYI: Kafka broker version: 5.3.4-ccs
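In the situation described, the committed offset (59754934) lies below the partition's new log start offset (264896001). A Kafka consumer handles such an out-of-range position according to its `auto.offset.reset` setting (`earliest`, `latest`, or `none`). The sketch below models only that decision. It is a simplification, and the log end offset used in it is invented.

```java
public class OffsetResetSketch {
    // Mirrors the three auto.offset.reset choices of the Kafka consumer.
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Returns the offset to resume from, given the committed offset and the
    // partition's current log start/end offsets.
    static long resolvePosition(long committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed >= logStart && committed <= logEnd) {
            return committed; // still inside the log: resume where we left off
        }
        switch (policy) {
            case EARLIEST:
                return logStart; // jump to the oldest retained record
            case LATEST:
                return logEnd;   // skip to the end and only read new records
            default:
                throw new IllegalStateException("Offset " + committed
                        + " is outside [" + logStart + ", " + logEnd + "]");
        }
    }

    public static void main(String[] args) {
        long committed = 59_754_934L;  // from the report
        long logStart = 264_896_001L;  // from the report
        long logEnd = 300_000_000L;    // HYPOTHETICAL: not given in the report
        System.out.println(resolvePosition(committed, logStart, logEnd, ResetPolicy.EARLIEST)); // 264896001
    }
}
```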





[GitHub] [kafka] xdgrulez commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows

2021-08-12 Thread GitBox


xdgrulez commented on pull request #10897:
URL: https://github.com/apache/kafka/pull/10897#issuecomment-897524395


   Hi,
   
   I will do that as soon as I have some time, and sorry for not finding time 
yet…
   
   If anyone else has some more time on their hands: you would basically 
just have to find all tests checking for the respective WARN log messages 
and change them to DEBUG…
   
   Best, Ralph
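Why the tests break can be illustrated with the JDK's `java.util.logging`: once a message is demoted below the level the test logger runs at (the quoted review notes the logs default to INFO), a capturing handler never sees it, so assertions that look for it fail. Kafka's own tests capture log4j output instead; `java.util.logging`, where `FINE` roughly plays the role of DEBUG, is used here only to keep the sketch dependency-free, and the logger name is made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogLevelCapture {
    // Collects every record that reaches it, like a test log appender.
    static final class Capture extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord r) { records.add(r); }
        @Override public void flush() {}
        @Override public void close() {}
    }

    // Logs one message at FINE and reports whether a capture handler saw it
    // when the logger is set to the given level.
    static boolean captured(Level loggerLevel) {
        Logger logger = Logger.getLogger("sketch.window"); // HYPOTHETICAL logger name
        logger.setUseParentHandlers(false);
        Capture capture = new Capture();
        capture.setLevel(Level.ALL);
        logger.addHandler(capture);
        logger.setLevel(loggerLevel);
        try {
            logger.fine("Skipping record for expired window.");
        } finally {
            logger.removeHandler(capture);
        }
        return capture.records.stream().anyMatch(r -> r.getLevel() == Level.FINE);
    }

    public static void main(String[] args) {
        System.out.println(captured(Level.INFO)); // false: filtered before reaching the handler
        System.out.println(captured(Level.FINE)); // true
    }
}
```

A test that still wants to assert on the demoted message therefore has to lower the level for that logger first.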
   
   > On 28.07.2021 at 23:27, A. Sophie Blee-Goldman ***@***.***> wrote:
   > 
   > 
   > Hey @xdgrulez , this PR has a number of test failures. It looks like there 
are quite a few tests that are verifying the existence of this specific 
message, and failing because the logs default to INFO. Can you look into those 
and then run the full set of streams tests once you think you've fixed 
everything to make sure no new failures have come up since then?
   > Thanks!
   






[GitHub] [kafka] showuon commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-12 Thread GitBox


showuon commented on a change in pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#discussion_r687624316



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   @nowarn("cat=deprecation")
   private def validateValues(): Unit = {
+    val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int]
+    if (nodeIdValue >= 0) {
+      val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]
+      if (brokerIdValue != Defaults.BrokerId && brokerIdValue != nodeIdValue) {
+        throw new ConfigException(s"The values for broker.id ($brokerIdValue) and node.id ($nodeIdValue) must be the same if both are specified")
+      }
+    }

Review comment:
   Do you think we could move this check into the else block below, so that it 
applies only to the KRaft-based metadata quorum case? 
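The rule added in the diff can be restated as a small standalone Java function, which makes its accepted cases explicit. This sketch assumes `Defaults.BrokerId` is -1 (an "unset" sentinel) and uses `IllegalArgumentException` in place of Kafka's `ConfigException`; it does not address where the check should live, which is the question raised in the review.

```java
public class NodeIdValidation {
    // ASSUMPTION: -1 means "broker.id not set", standing in for Defaults.BrokerId in the diff.
    static final int UNSET_BROKER_ID = -1;

    // Rejects configs where node.id and an explicitly set broker.id disagree.
    static void validate(int nodeId, int brokerId) {
        if (nodeId >= 0 && brokerId != UNSET_BROKER_ID && brokerId != nodeId) {
            throw new IllegalArgumentException("The values for broker.id (" + brokerId
                    + ") and node.id (" + nodeId + ") must be the same if both are specified");
        }
    }

    public static void main(String[] args) {
        validate(1, 1);   // both set and equal: accepted
        validate(1, -1);  // broker.id unset: accepted
        validate(-1, 5);  // node.id unset: this check does not apply
        try {
            validate(1, 2);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```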








[GitHub] [kafka] socutes opened a new pull request #11205: KAFKA-10900: Add metrics enumerated in KIP-630

2021-08-12 Thread GitBox


socutes opened a new pull request #11205:
URL: https://github.com/apache/kafka/pull/11205


   KIP-630 enumerates a few metrics. This PR makes sure that those metrics are 
implemented by adding the following:
   
   
   kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs | A histogram of the amount of time it took to generate a snapshot.
   kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs | A histogram of the amount of time it took to load the snapshot.
   kafka.controller:type=KafkaController,name=SnapshotLag | The number of offsets between the largest snapshot offset and the high-watermark.
   kafka.controller:type=KafkaController,name=SnapshotSizeBytes | Size of the latest snapshot in bytes.
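As a worked example of the `SnapshotLag` definition: with a hypothetical high-watermark of 1500 and a largest snapshot offset of 1200, the lag is 1500 - 1200 = 300 offsets. Whether the snapshot offset is counted inclusively or exclusively could shift this by one; the sketch simply follows the wording of the description.

```java
public class SnapshotLagSketch {
    // SnapshotLag as described in the PR: the distance, in offsets, between the
    // high-watermark and the largest snapshot offset.
    static long snapshotLag(long highWatermark, long largestSnapshotOffset) {
        return highWatermark - largestSnapshotOffset;
    }

    public static void main(String[] args) {
        // Illustrative values only.
        System.out.println(snapshotLag(1_500L, 1_200L)); // 300
    }
}
```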
   
   






[GitHub] [kafka] showuon opened a new pull request #11206: MINOR: Update streams doc to close KeyValueIterator in example code

2021-08-12 Thread GitBox


showuon opened a new pull request #11206:
URL: https://github.com/apache/kafka/pull/11206


   Update the streams doc:
   1. Close the `KeyValueIterator` in the example code
   https://user-images.githubusercontent.com/43372967/129201334-326ff4f9-e06e-426d-9d2d-ce70e13cbf9c.png
   
   2. Fix the indentation of the example code
   https://user-images.githubusercontent.com/43372967/129201406-632b62a5-9062-4a1f-b529-103f92966541.png
   
   
   3. Add the missing comma between key/value
   https://user-images.githubusercontent.com/43372967/129201511-e69b6225-9e8c-4f08-b9a2-615aee21e52b.png
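The first change is about resource handling: a store iterator holds resources until it is closed, so example code should close it once iteration is done. The sketch below shows the try-with-resources idiom with a hypothetical stand-in interface, `ClosableIterator`, modeled loosely on the shape assumed for `KeyValueIterator` (an `Iterator` that is also `Closeable`, with `close()` declared without a checked exception) rather than on the real Kafka Streams types.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

public class CloseIteratorSketch {
    // HYPOTHETICAL stand-in for a store iterator that must be closed after use.
    interface ClosableIterator<T> extends Iterator<T>, Closeable {
        @Override
        void close(); // narrowed: no checked IOException, so no catch block is needed
    }

    // Counts close() calls so the effect of try-with-resources is observable.
    static int closedCount = 0;

    static ClosableIterator<String> range(List<String> keys) {
        Iterator<String> it = keys.iterator();
        return new ClosableIterator<String>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public String next() { return it.next(); }
            @Override public void close() { closedCount++; } // a real store would release resources here
        };
    }

    static int countKeys(List<String> keys) {
        int n = 0;
        // try-with-resources closes the iterator even if the loop throws.
        try (ClosableIterator<String> iter = range(keys)) {
            while (iter.hasNext()) {
                iter.next();
                n++;
            }
        }
        return n;
    }
}
```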
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] showuon commented on pull request #11206: MINOR: Update streams doc to close KeyValueIterator in example code

2021-08-12 Thread GitBox


showuon commented on pull request #11206:
URL: https://github.com/apache/kafka/pull/11206#issuecomment-897622337


   @bbejeck, please help review. Thanks.






[GitHub] [kafka] ryannedolan commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-08-12 Thread GitBox


ryannedolan commented on a change in pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#discussion_r687741760



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
 String source = replicationPolicy.topicSource(topic);
 if (source == null) {
 return false;
-} else if (source.equals(sourceAndTarget.target())) {
+}
+
+// Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+final boolean condition;
+if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment:
   Moving isCycle to ReplicationPolicy might make sense. I wonder what the 
use case for a custom isCycle would be, though.
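For context, name-based cycle detection of the kind `isCycle` performs can be modeled as below. This is a minimal sketch assuming a prefix-style naming scheme (`source.topic`, with `.` as separator) and a recursive walk through a hypothetical `upstreamTopic` helper; it is not the connector's actual code. It also suggests why `IdentityReplicationPolicy` presumably needs special handling: when replicated topics keep their original names, there is no source prefix to inspect.

```java
public class CycleDetectionSketch {
    // HYPOTHETICAL prefix-based naming: "source.topic" means "topic" was replicated from "source".
    static final String SEPARATOR = ".";

    static String topicSource(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(0, i);
    }

    static String upstreamTopic(String topic) {
        int i = topic.indexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(i + SEPARATOR.length());
    }

    // A topic is a cycle if any hop in its replication history came from the target cluster.
    static boolean isCycle(String topic, String targetCluster) {
        String source = topicSource(topic);
        if (source == null) {
            return false; // no prefix left: an original topic
        }
        if (source.equals(targetCluster)) {
            return true;
        }
        return isCycle(upstreamTopic(topic), targetCluster);
    }

    public static void main(String[] args) {
        System.out.println(isCycle("us-west.us-east.orders", "us-east")); // true
        System.out.println(isCycle("us-west.orders", "us-east"));         // false
    }
}
```

A limitation of any scheme like this is that a topic whose own name contains the separator (for example `orders.v2`) would be misread as replicated.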








[jira] [Created] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-12 Thread Ludo (Jira)
Ludo created KAFKA-13195:


 Summary: StateSerde don't honor DeserializationExceptionHandler
 Key: KAFKA-13195
 URL: https://issues.apache.org/jira/browse/KAFKA-13195
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Ludo


Kafka Streams allows configuring a 
[DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling].

When you are using a StateStore, most messages will be copies of the original 
messages in an internal topic, and they will mostly use the same serializer, even if the 
message is of another type. 

You can see 
[here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
that StateSerdes uses the raw Deserializer and does not honor 
{{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}.

This leads to the application crashing (the exception reaches the handler set via {{setUncaughtExceptionHandler}}).

I think the state store should behave the same way as 
{{RecordDeserializer}} and honor the DeserializationExceptionHandler.
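What the reporter asks for amounts to wrapping the raw deserializer so that failures are routed through a handler that can either skip the entry or fail. The sketch below shows that pattern with hypothetical types (`HandledDeserializer`, `Handler`, `Response`) that only echo the CONTINUE/FAIL choice; the real Kafka Streams `DeserializationExceptionHandler` has a different signature.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Function;

public class HandledDeserializer<T> {
    // HYPOTHETICAL responses, echoing the CONTINUE/FAIL choice of a deserialization handler.
    enum Response { CONTINUE, FAIL }

    // HYPOTHETICAL handler: decides what to do with bytes that failed to deserialize.
    interface Handler {
        Response handle(byte[] raw, RuntimeException error);
    }

    private final Function<byte[], T> inner;
    private final Handler handler;

    HandledDeserializer(Function<byte[], T> inner, Handler handler) {
        this.inner = inner;
        this.handler = handler;
    }

    // Empty means "skip this entry"; with FAIL the original exception propagates.
    Optional<T> deserialize(byte[] raw) {
        try {
            return Optional.ofNullable(inner.apply(raw));
        } catch (RuntimeException e) {
            if (handler.handle(raw, e) == Response.CONTINUE) {
                return Optional.empty();
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        Function<byte[], Integer> parseInt = b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8));
        HandledDeserializer<Integer> lenient = new HandledDeserializer<>(parseInt, (raw, e) -> Response.CONTINUE);
        System.out.println(lenient.deserialize("42".getBytes(StandardCharsets.UTF_8)));  // Optional[42]
        System.out.println(lenient.deserialize("txt".getBytes(StandardCharsets.UTF_8))); // Optional.empty
    }
}
```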

 

Stacktrace (from Kafka Streams 2.6.1):

 
{code:java}
Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
!Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
!org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_14, processor=workertaskjoined-repartition-source, 
topic=kestra_executor-workertaskjoined-repartition, partition=14, 
offset=167500, 
stacktrace=org.apache.kafka.common.errors.SerializationException: 
com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
"txt": not one of the values accepted for Enum class: [NEWLINE_DELIMITED_JSON, 
AVRO, PARQUET, CSV] at [Source: (byte[])"{[truncated 1270 bytes]; line: 1, 
column: 1187] (through reference chain: 
io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
 at 
com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
 at 
com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
 at 
com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
 at 
com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
 at 
com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
 at 
com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
 at 
com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
 at 
com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
 at 
com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
 at 
com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
 at 
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
 at 
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
 at 
com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
 at 
com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:362)
 at 
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:195)
 at 
com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:322)
 at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4593)
 at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609) 
at

[GitHub] [kafka] tvainika commented on a change in pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-08-12 Thread GitBox


tvainika commented on a change in pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#discussion_r687772348



##
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
##
@@ -489,7 +489,17 @@ boolean isCycle(String topic) {
 String source = replicationPolicy.topicSource(topic);
 if (source == null) {
 return false;
-} else if (source.equals(sourceAndTarget.target())) {
+}
+
+// Fix for https://issues.apache.org/jira/browse/KAFKA-9914
+final boolean condition;
+if (replicationPolicy instanceof IdentityReplicationPolicy) {

Review comment:
   Currently `MirrorSourceConnector.isCycle` is only called from 
`MirrorSourceConnector.shouldReplicateTopic`, which is the method that needs to 
make a decision based on the `ReplicationPolicy` used. However, I'm out of ideas for how 
to detect this without adding some methods to the `ReplicationPolicy` interface, and 
modifying the interface feels like a somewhat bigger effort.








[jira] [Created] (KAFKA-13196) MirrorMaker 2 not always starting tasks after upgrade 2.4.0 -> 2.7.1

2021-08-12 Thread Jozef Vilcek (Jira)
Jozef Vilcek created KAFKA-13196:


 Summary: MirrorMaker 2 not always starting tasks after upgrade 
2.4.0 -> 2.7.1
 Key: KAFKA-13196
 URL: https://issues.apache.org/jira/browse/KAFKA-13196
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.7.1
Reporter: Jozef Vilcek


I am using MirrorMaker 2.0 and running it via the 
[MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
class. This class starts up a `DistributedHerder` that uses a 
non-functional `advertisedUrl`, and therefore workers cannot talk to each 
other and coordinate.

After upgrading from version `2.4.0` to `2.7.1`, I noticed that when I 
start up MirrorMaker, it does not always start tasks; only the connector is 
executing. After stopping and starting it a number of times, it eventually starts 
the tasks too.

After a bit of digging, I noticed that when attempting to configure the connector's 
tasks, the code ends up in this [1] branch, where the configuration request is 
forwarded to the leader. For some reason, task configuration is not done on the 
leader. However, MirrorMaker does not include a RestServer, and therefore that 
request will never succeed.
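The forwarding path described above can be modeled as follows: a worker that is not the leader has to hand task configurations to the leader over REST, so if that channel does not work, the request fails however often it is retried. This is a hypothetical simplification of the report, not `DistributedHerder`'s actual logic; `Outcome` and `publishTaskConfigs` are made-up names.

```java
public class TaskConfigRouting {
    // HYPOTHETICAL outcomes of trying to publish a connector's task configurations.
    enum Outcome { WRITTEN_BY_LEADER, FORWARDED_TO_LEADER, FAILED }

    // The leader writes task configs itself; followers must forward them over REST.
    static Outcome publishTaskConfigs(boolean isLeader, boolean leaderRestReachable) {
        if (isLeader) {
            return Outcome.WRITTEN_BY_LEADER;
        }
        return leaderRestReachable ? Outcome.FORWARDED_TO_LEADER : Outcome.FAILED;
    }

    public static void main(String[] args) {
        // Without a working REST endpoint, a follower's attempts can never succeed.
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("attempt " + attempt + ": " + publishTaskConfigs(false, false));
        }
        // Only a worker that is itself the leader gets its task configs written.
        System.out.println(publishTaskConfigs(true, false));
    }
}
```

Under this model, tasks would start only on runs where the configuring worker happens to be the leader, which would presumably fit the "eventually starts after some restarts" behavior in the report.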

I am not sure what is going on or why it seems to work better on 2.4.0. I 
noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the 
connector start sequence [2].

I believe the root cause is a change in the startup procedure of connectors in 
general. In version 2.4, when the connector is started by the leader here [1], it 
immediately sets up and submits the configuration for the connector's tasks. However, 
in 2.7.1 this is more asynchronous. The connector is started here [2] 

Does this look like a bug? Or am I doing something wrong?

 

[1] 
https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494

[2] 
https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236





[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always starting tasks

2021-08-12 Thread Jozef Vilcek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated KAFKA-13196:
-
Summary: MirrorMaker 2 not always starting tasks  (was: MirrorMaker 2 not 
always starting tasks after upgrade 2.4.0 -> 2.7.1)

> MirrorMaker 2 not always starting tasks
> ---
>
> Key: KAFKA-13196
> URL: https://issues.apache.org/jira/browse/KAFKA-13196
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.1
>Reporter: Jozef Vilcek
>Priority: Major
>





[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks

2021-08-12 Thread Jozef Vilcek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated KAFKA-13196:
-
Description: 
I am using MirrorMaker 2.0 and running it via the 
[MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
class. This class starts up a `DistributedHerder` that uses a 
non-functional `advertisedUrl`, and therefore workers cannot talk to each 
other and coordinate.

After upgrading from version `2.4.0` to `2.7.1`, I noticed that when I 
start up MirrorMaker, it does not always start tasks; only the connector is 
executing. After stopping and starting it a number of times, it eventually starts 
the tasks too.

After a bit of digging, I noticed that when attempting to configure the connector's 
tasks, the code ends up in this [1] branch, where the configuration request is 
forwarded to the leader. For some reason, task configuration is not done on the 
leader. However, MirrorMaker does not include a RestServer, and therefore that 
request will never succeed.

I am not sure what is going on or why it seems to work better on 2.4.0. I 
noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the 
connector start sequence [2].

 

Does this look like a bug? Or am I doing something wrong?

 

[1] 
[https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494]

[2] 
[https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236]



> MirrorMaker 2 not always start tasks
> 
>
> Key: KAFKA-13196
> URL: https://issues.apache.org/jira/browse/KAFKA-13196
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.1
>Reporter: Jozef Vilcek
>Priority: Major
>

[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks

2021-08-12 Thread Jozef Vilcek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated KAFKA-13196:
-
Description: 
I am using MirrorMaker 2.0 and running it via the 
[MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
class. This class starts up a `DistributedHerder` that uses a 
non-functional `advertisedUrl`, and therefore workers cannot talk to each 
other and coordinate.

After upgrading from version `2.4.0` to `2.7.1`, I noticed that when I 
start up MirrorMaker, it does not always start tasks; only the connector is 
executing. After stopping and starting it a number of times, it eventually starts 
the tasks too.

After a bit of digging, I noticed that when attempting to configure the connector's 
tasks, the code ends up in this [1] branch, where the configuration request is 
forwarded to the leader. For some reason, task configuration is not done on the 
leader. However, MirrorMaker does not include a RestServer, and therefore that 
request will never succeed.

I am not sure what is going on or why it seems to work better on 2.4.0. I 
noticed that the connector start procedure involved fewer callbacks on 2.4.0 in the 
connector start sequence [2].

 

 

[1] 
[https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494]

[2] 
[https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236]

  was:
I am using MirrorMaker 2.0 and running it via the [ 
MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
 class. This method starts up a `DistributedHerder` that uses a 
non-functional `advertisedUrl`, so workers cannot talk to each other and 
coordinate.

After upgrading from version `2.4.0` to `2.7.1`, I noticed that when I start 
up MirrorMaker it does not always start tasks; only the connector is 
executing. After stopping and starting it a number of times, it eventually 
starts the tasks too.

After a bit of digging, I noticed that when attempting to configure the 
connector's tasks, the code ends up in this [1] branch, where the configure 
request is forwarded to the leader. For some reason, task configuration is 
not done on the leader. However, MirrorMaker does not include a RestServer, 
so that request will never succeed.

I am not sure what is going on, or why it seems to work better on 2.4.0. I 
noticed that the connector start procedure on 2.4.0 involved fewer callbacks 
in the connector start sequence [2].

 

Does this look like a bug? Or am I doing something wrong?

 

[1] 
[https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494]

[2] 
[https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236]






[jira] [Updated] (KAFKA-13196) MirrorMaker 2 not always start tasks

2021-08-12 Thread Jozef Vilcek (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jozef Vilcek updated KAFKA-13196:
-
Summary: MirrorMaker 2 not always start tasks  (was: MirrorMaker 2 not 
always starting tasks)

> MirrorMaker 2 not always start tasks
> 
>
> Key: KAFKA-13196
> URL: https://issues.apache.org/jira/browse/KAFKA-13196
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.1
>Reporter: Jozef Vilcek
>Priority: Major
>
> I am using MirrorMaker 2.0 and running it via the [ 
> MirrorMaker.java|https://github.com/apache/kafka/blob/2.7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java]
>  class. This method starts up a `DistributedHerder` that uses a 
> non-functional `advertisedUrl`, so workers cannot talk to each other and 
> coordinate.
> After upgrading from version `2.4.0` to `2.7.1`, I noticed that when I start 
> up MirrorMaker it does not always start tasks; only the connector is 
> executing. After stopping and starting it a number of times, it eventually 
> starts the tasks too.
> After a bit of digging, I noticed that when attempting to configure the 
> connector's tasks, the code ends up in this [1] branch, where the configure 
> request is forwarded to the leader. For some reason, task configuration is 
> not done on the leader. However, MirrorMaker does not include a RestServer, 
> so that request will never succeed.
> I am not sure what is going on, or why it seems to work better on 2.4.0. I 
> noticed that the connector start procedure on 2.4.0 involved fewer callbacks 
> in the connector start sequence [2].
> I believe the root cause is a change in the startup procedure of connectors 
> in general. In version 2.4, when a connector is started by the leader here 
> [1], it immediately sets up and submits the configuration for the 
> connector's tasks. However, in 2.7.1, it is more asynchronous. Connector is 
> started here [2] 
> Does this look like a bug? Or am I doing something wrong?
>  
> [1] 
> https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1494
> [2] 
> https://github.com/apache/kafka/blob/2.4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1236



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13197) KStream-GlobalKTable join semantics don't match documentation

2021-08-12 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-13197:


 Summary: KStream-GlobalKTable join semantics don't match 
documentation
 Key: KAFKA-13197
 URL: https://issues.apache.org/jira/browse/KAFKA-13197
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.0
Reporter: Tommy Becker


As part of KAFKA-10277, the behavior of KStream-GlobalKTable joins was changed. 
It appears the change was intended to merely relax a requirement but it 
actually broke backwards compatibility. Although it does allow {{null}} keys 
and values in the KStream to be joined, it now excludes {{null}} results of the 
{{KeyValueMapper}}. We have an application which can return {{null}} from the 
{{KeyValueMapper}} for non-null keys in the KStream, and relies on these nulls 
being passed to the {{ValueJoiner}}. Indeed the javadoc still explicitly says 
this is done:
{quote}If a KStream input record key or value is null the record will not be 
included in the join operation and thus no output record will be added to the 
resulting KStream.
 If keyValueMapper returns null implying no match exists, a null value will be 
provided to ValueJoiner.
{quote}
Both these statements are incorrect.

I think the new behavior is worse than the previous/documented behavior. It 
seems more reasonable to allow a non-null stream record to map to a null join 
key (our use case is event enhancement, where the incoming record doesn't have 
the join field) than the reverse.
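The difference between the two rule sets can be made concrete with a small, library-free model. The sketch below does not call the Kafka Streams API. The class, record, and method names are hypothetical, and the model only encodes the behavior as the issue describes it. The quoted javadoc appears to describe the left-join variant, in which a missing match hands a null table value to the `ValueJoiner`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of a KStream-GlobalKTable left join; NOT the Kafka Streams API.
final class GlobalJoinModel {

    /** A stream record; key and value may be null. */
    record Rec(String key, String value) {}

    /** Rules as stated in the javadoc quoted in KAFKA-13197. */
    static List<String> documented(List<Rec> stream, Map<String, String> table,
                                   Function<Rec, String> keyMapper,
                                   BiFunction<String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            // A null stream key or value means the record is not joined at all.
            if (r.key() == null || r.value() == null) {
                continue;
            }
            String joinKey = keyMapper.apply(r);
            // A null join key means "no match": the joiner still runs with a null table value.
            String tableValue = (joinKey == null) ? null : table.get(joinKey);
            out.add(joiner.apply(r.value(), tableValue));
        }
        return out;
    }

    /** Rules the reporter observes after KAFKA-10277. */
    static List<String> reported(List<Rec> stream, Map<String, String> table,
                                 Function<Rec, String> keyMapper,
                                 BiFunction<String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        for (Rec r : stream) {
            // Null stream keys/values are now accepted...
            String joinKey = keyMapper.apply(r);
            // ...but a null join key drops the record before the joiner is called.
            if (joinKey == null) {
                continue;
            }
            out.add(joiner.apply(r.value(), table.get(joinKey)));
        }
        return out;
    }
}
```

Take a mapper that returns `null` for a record with no join field. Under the documented rules, that record still reaches the joiner (for example as `"v2+null"`). Under the reported rules it silently disappears, which is the enrichment case the reporter relies on.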





[GitHub] [kafka] SkyTianTian commented on pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-08-12 Thread GitBox


SkyTianTian commented on pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#issuecomment-897736948


   Hi, does anyone know of a workaround to avoid this heartbeat issue?






[GitHub] [kafka] jsancio commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-12 Thread GitBox


jsancio commented on a change in pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#discussion_r687854292



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   @nowarn("cat=deprecation")
   private def validateValues(): Unit = {
+val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int]
+if (nodeIdValue >= 0) {
+  val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]

Review comment:
   ```suggestion
   val nodeIdValue = getInt(KafkaConfig.NodeIdProp)
   if (nodeIdValue >= 0) {
 val brokerIdValue = getInt(KafkaConfig.BrokerIdProp)
   ```
   For consistency, this type prefers to use `getInt` which does the casting. 








[GitHub] [kafka] SkyTianTian edited a comment on pull request #10277: KAFKA-9914: Fix replication cycle detection

2021-08-12 Thread GitBox


SkyTianTian edited a comment on pull request #10277:
URL: https://github.com/apache/kafka/pull/10277#issuecomment-897736948


   Hi, does anyone know of a temporary workaround to avoid this heartbeat issue?






[GitHub] [kafka] jsancio commented on a change in pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-12 Thread GitBox


jsancio commented on a change in pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#discussion_r687047710



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -161,6 +163,14 @@ public void deactivate() {
 return brokerRegistrations;
 }
 
+Set fencedBrokerIds() {

Review comment:
   Looks like this method is only used for tests. How about moving this to 
`ReplicationControlManagerTest`?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
##
@@ -420,22 +419,22 @@ long nextCheckTimeNs() {
 }
 
 /**
- * Find the stale brokers which haven't heartbeated in a long time, and which need to
- * be fenced.
+ * Check if the oldest broker to have hearbeated has already violated the
+ * sessionTimeoutNs timeout and needs to be fenced.
  *
- * @return  A list of node IDs.
+ * @return  An Optional broker node id.
  */
-List findStaleBrokers() {
-List nodes = new ArrayList<>();
+Optional findOneStaleBroker() {
 BrokerHeartbeatStateIterator iterator = unfenced.iterator();
-while (iterator.hasNext()) {
+if (iterator.hasNext()) {

Review comment:
   Isn't `unfenced.first()` supposed to return the oldest heartbeat? If so, 
can we use that method and check for `null`?
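The reviewer's suggestion has a simple shape: keep unfenced brokers ordered by their last heartbeat, look only at the head, and treat an empty collection as "nothing to fence". The sketch below is a hypothetical stand-in, not Kafka's `BrokerHeartbeatManager`. It uses `java.util.PriorityQueue`, whose `peek()` returns `null` when the queue is empty. By contrast, `java.util.TreeSet.first()` throws `NoSuchElementException`, so the null check only works with a collection that returns `null`.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;

// Hypothetical sketch, not Kafka's BrokerHeartbeatManager: unfenced brokers are kept
// ordered by last heartbeat time, so the head of the queue is the oldest heartbeat.
final class StaleBrokerSketch {
    static final class BrokerState {
        final int id;
        final long lastContactNs;

        BrokerState(int id, long lastContactNs) {
            this.id = id;
            this.lastContactNs = lastContactNs;
        }
    }

    private final long sessionTimeoutNs;
    private final PriorityQueue<BrokerState> unfenced =
        new PriorityQueue<>(Comparator.comparingLong((BrokerState b) -> b.lastContactNs));

    StaleBrokerSketch(long sessionTimeoutNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
    }

    /** Records a heartbeat, replacing any older entry for the same broker. */
    void heartbeat(int brokerId, long nowNs) {
        unfenced.removeIf(b -> b.id == brokerId);
        unfenced.add(new BrokerState(brokerId, nowNs));
    }

    /** Removes a broker from the unfenced set once it has been fenced. */
    void fence(int brokerId) {
        unfenced.removeIf(b -> b.id == brokerId);
    }

    /** Returns the oldest broker only if it has exceeded the session timeout. */
    Optional<Integer> findOneStaleBroker(long nowNs) {
        BrokerState oldest = unfenced.peek(); // null when no broker is unfenced
        if (oldest == null) {
            return Optional.empty();
        }
        return nowNs - oldest.lastContactNs > sessionTimeoutNs
            ? Optional.of(oldest.id)
            : Optional.empty();
    }
}
```

Calling `findOneStaleBroker` and then `fence` in a loop fences stale brokers one at a time, which matches the approach taken in this PR.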

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -173,6 +176,121 @@ private void testDelayedConfigurationOperations(LocalLogManagerTestEnv logEnv,
 assertEquals(Collections.singletonMap(BROKER0, ApiError.NONE), future1.get());
 }
 
+@Test
+public void testFenceMultipleBrokers() throws Throwable {
+int brokerCount = 5;
+int brokersToKeepUnfenced = 1;
+short replicationFactor = 5;
+Long sessionTimeoutSec = 1L;
+Long sleepMillis = (sessionTimeoutSec * 1000) / 2;
+
+try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
+try (QuorumControllerTestEnv controlEnv =
+new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS), Optional.of(sessionTimeoutSec))) {
+ListenerCollection listeners = new ListenerCollection();
+listeners.add(new Listener().setName("PLAINTEXT").
+setHost("localhost").setPort(9092));
+QuorumController active = controlEnv.activeController();
+Map brokerEpochs = new HashMap<>();
+for (int brokerId = 0; brokerId < brokerCount; brokerId++) {
+CompletableFuture reply = active.registerBroker(
+new BrokerRegistrationRequestData().
+setBrokerId(brokerId).
+setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
+setListeners(listeners));
+brokerEpochs.put(brokerId, reply.get().epoch());
+}
+
+// Brokers are only registered but still fenced
+// Topic creation with no available unfenced brokers should fail
+CreateTopicsRequestData createTopicsRequestData =
+new CreateTopicsRequestData().setTopics(
+new CreatableTopicCollection(Collections.singleton(
+new CreatableTopic().setName("foo").setNumPartitions(1).
+setReplicationFactor(replicationFactor)).iterator()));
+CreateTopicsResponseData createTopicsResponseData = active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
+createTopicsResponseData.topics().find("foo").errorCode());
+assertEquals("Unable to replicate the partition " + replicationFactor + " time(s): All brokers " +
+"are currently fenced.", createTopicsResponseData.topics().find("foo").errorMessage());
+
+// Unfence all brokers
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+createTopicsResponseData = active.createTopics(
+createTopicsRequestData).get();
+assertEquals(Errors.NONE.code(), createTopicsResponseData.topics().find("foo").errorCode());
+Uuid topicIdFoo = createTopicsResponseData.topics().find("foo").topicId();
+sendBrokerheartbeat(active, brokerCount, brokerEpochs);
+Thread.sleep(sleepMillis);

Review comment:
   Can we remove this sleep? What are we trying to test with this sleep?

##
File path: 
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
##
@@ -

[GitHub] [kafka] socutes closed pull request #11205: KAFKA-10900: Add metrics enumerated in KIP-630

2021-08-12 Thread GitBox


socutes closed pull request #11205:
URL: https://github.com/apache/kafka/pull/11205


   






[jira] [Commented] (KAFKA-13188) Release the memory back into MemoryPool

2021-08-12 Thread Alok Nikhil (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398148#comment-17398148
 ] 

Alok Nikhil commented on KAFKA-13188:
-

[~luwang] I see that the consumer side memory pool management KIP was never 
implemented - 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-81:+Bound+Fetch+memory+usage+in+the+consumer]

The default is `MemoryPool.NONE`. How did LinkedIn measure the performance 
here? Is there a customization / config that allows the consumer to use a 
different memory pool?

> Release the memory back into MemoryPool
> ---
>
> Key: KAFKA-13188
> URL: https://issues.apache.org/jira/browse/KAFKA-13188
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Alok Nikhil
>Priority: Major
> Fix For: 3.0.1
>
>
> Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to 
> the linkedin/kafka repo, which hosts Apache Kafka 2.4.
> The change releases memory back to the MemoryPool for the Kafka consumer, 
> and his benchmark showed a significant reduction in the amount of memory 
> graduating from Young Gen and being promoted to Old Gen.
> Given the benefit, the change should also be added to trunk.
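The general idea of "releasing memory back into the pool" can be illustrated with a small fixed-size buffer pool. This sketch assumes the hotfix works by reusing released buffers instead of allocating fresh ones for every fetch. It deliberately does not use Kafka's `MemoryPool` interface or reproduce the LinkedIn patch. `ReusingBufferPool` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical fixed-size buffer pool that illustrates reuse after release;
// it is not Kafka's MemoryPool interface or the LinkedIn hotfix.
final class ReusingBufferPool {
    private final int bufferSize;
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private int allocations = 0;

    ReusingBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Hands out a pooled buffer if one is free, otherwise allocates a new one. */
    ByteBuffer acquire() {
        ByteBuffer buffer = free.pollFirst();
        if (buffer == null) {
            allocations++;
            return ByteBuffer.allocate(bufferSize);
        }
        buffer.clear(); // reset position and limit before reuse
        return buffer;
    }

    /** Returns a buffer to the pool so the next acquire() can reuse it. */
    void release(ByteBuffer buffer) {
        if (buffer.capacity() == bufferSize) {
            free.addFirst(buffer);
        }
    }

    /** Number of fresh heap allocations performed so far. */
    int allocations() {
        return allocations;
    }
}
```

Every buffer that is released and then reused is one fewer short-lived allocation for the garbage collector to track. That is the kind of effect the benchmark in the issue reportedly measured.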





[GitHub] [kafka] hachikuji commented on pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-12 Thread GitBox


hachikuji commented on pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#issuecomment-897785278


   @niket-goel I'm inclined to merge this as is so that we don't need to wait 
for another build. Perhaps you can submit a small follow-up to address @jsancio's comments?






[GitHub] [kafka] hachikuji merged pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-12 Thread GitBox


hachikuji merged pull request #11191:
URL: https://github.com/apache/kafka/pull/11191


   






[GitHub] [kafka] socutes opened a new pull request #11207: KAFKA-10900: Add metrics enumerated in KIP-630

2021-08-12 Thread GitBox


socutes opened a new pull request #11207:
URL: https://github.com/apache/kafka/pull/11207


   KIP-630 enumerates a few metrics. This PR makes sure that those metrics are 
implemented by adding the following:
   
   kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs: a histogram of the amount of time it took to generate a snapshot.
   kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs: a histogram of the amount of time it took to load the snapshot.
   kafka.controller:type=KafkaController,name=SnapshotLag: the number of offsets between the largest snapshot offset and the high-watermark.
   kafka.controller:type=KafkaController,name=SnapshotSizeBytes: the size of the latest snapshot in bytes.
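The metric names above are standard JMX object names, so the JDK's `javax.management.ObjectName` can parse and query them. The sketch below copies the names from the PR description. Whether they are actually registered depends on the Kafka version that ships them, and the query only sees MBeans in the local JVM; a remote client would need a JMX connector instead. The helper class and its method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Set;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// MBean names copied from the PR description; the helpers are hypothetical.
final class SnapshotMetricNames {
    static final List<String> NAMES = List.of(
        "kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs",
        "kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs",
        "kafka.controller:type=KafkaController,name=SnapshotLag",
        "kafka.controller:type=KafkaController,name=SnapshotSizeBytes");

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Not a valid MBean name: " + name, e);
        }
    }

    /** The value of the "name" key property, e.g. "SnapshotLag". */
    static String metricName(String mbean) {
        return parse(mbean).getKeyProperty("name");
    }

    /** The JMX domain, e.g. "kafka.controller". */
    static String domain(String mbean) {
        return parse(mbean).getDomain();
    }

    /** Matching MBeans registered in this JVM (empty outside a running controller). */
    static Set<ObjectName> registeredControllerMBeans() {
        return ManagementFactory.getPlatformMBeanServer()
            .queryNames(parse("kafka.controller:type=KafkaController,*"), null);
    }
}
```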
   
   






[GitHub] [kafka] socutes commented on pull request #11207: KAFKA-10900: Add metrics enumerated in KIP-630

2021-08-12 Thread GitBox


socutes commented on pull request #11207:
URL: https://github.com/apache/kafka/pull/11207#issuecomment-897797158


   @jsancio @cmccabe please help review this PR. Thanks!






[jira] [Assigned] (KAFKA-13016) Interpret snapshot header version to correctly parse the snapshot

2021-08-12 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-13016:
--

Assignee: loboxu

> Interpret snapshot header version to correctly parse the snapshot
> -
>
> Key: KAFKA-13016
> URL: https://issues.apache.org/jira/browse/KAFKA-13016
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Niket Goel
>Assignee: loboxu
>Priority: Minor
>
> https://issues.apache.org/jira/browse/KAFKA-12952 adds delimiters to the 
> snapshots. These delimiters serve as start and end markers for the snapshots 
> and also contain some metadata information about the snapshots. The snapshot 
> consumers need to interpret the version within the header to correctly parse 
> the schema of the snapshot being consumed or throw meaningful errors when 
> consuming incompatible snapshot versions.
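Interpreting the header version can be sketched as a lookup from version to parser. An unknown version fails with an explicit message instead of being misread. Everything below is hypothetical: the class name, the single supported version `0`, and the string-returning parser are illustrative only and are not the real snapshot schema or reader.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: versions and parsers are illustrative, not Kafka's snapshot reader.
final class SnapshotVersionDispatch {
    // One parser per header version this reader understands.
    static final Map<Short, Function<byte[], String>> PARSERS = Map.of(
        (short) 0, body -> "parsed " + body.length + " bytes with the v0 schema");

    static String parse(short headerVersion, byte[] body) {
        Function<byte[], String> parser = PARSERS.get(headerVersion);
        if (parser == null) {
            // Fail loudly instead of misreading an unknown layout.
            throw new IllegalArgumentException("Unsupported snapshot header version "
                + headerVersion + "; supported versions: " + PARSERS.keySet());
        }
        return parser.apply(body);
    }
}
```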





[jira] [Assigned] (KAFKA-12956) Validate the snapshot id when the state machine freeze a snapshot

2021-08-12 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12956:
--

Assignee: loboxu

> Validate the snapshot id when the state machine freeze a snapshot
> -
>
> Key: KAFKA-12956
> URL: https://issues.apache.org/jira/browse/KAFKA-12956
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Haoran Xuan
>Assignee: loboxu
>Priority: Major
>
> This is similar to KAFKA-10800: in this PR, optionally validate the snapshot 
> id when `onSnapshotFrozen` is called. The validation logic will be 
> implemented in KAFKA-10800, and this Jira is supposed to call that logic 
> directly.
> Currently, `onSnapshotFrozen` can be called by `KafkaRaftClient` and 
> `SnapshotWriter`. Do not validate when it is called by `KafkaRaftClient` while 
> it is downloading a snapshot from the leader; do validate when it is called by 
> `SnapshotWriter`, which implies generating a specific snapshot.





[GitHub] [kafka] niket-goel commented on pull request #11191: KAFKA-13173 Making the controller fence stale brokers one at a time if multiple stale brokers are discovered at the same time

2021-08-12 Thread GitBox


niket-goel commented on pull request #11191:
URL: https://github.com/apache/kafka/pull/11191#issuecomment-897806958


   @hachikuji Will submit a follow-up PR. Thanks for merging this.






[GitHub] [kafka] niket-goel opened a new pull request #11208: MINOR: Refactored BrokerHeartbeatManager::findOneStaleBroker to not use

2021-08-12 Thread GitBox


niket-goel opened a new pull request #11208:
URL: https://github.com/apache/kafka/pull/11208


   Some code refactoring based on the feedback on https://github.com/apache/kafka/pull/11191






[jira] [Commented] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-12 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398209#comment-17398209
 ] 

Jason Gustafson commented on KAFKA-13162:
-

This has turned out to be more work than expected. I am going to change the 
target version to 3.0.1. cc [~kkonstantine]

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.0
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.





[jira] [Updated] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13162:

Fix Version/s: (was: 3.0.0)
   3.0.1

> ElectLeader API must be forwarded to Controller
> ---
>
> Key: KAFKA-13162
> URL: https://issues.apache.org/jira/browse/KAFKA-13162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 3.0.1
>
>
> We're missing the logic to forward ElectLeaders requests to the controller. 
> This means that `kafka-leader-election.sh` does not work correctly.





[jira] [Resolved] (KAFKA-13173) KRaft controller does not handle simultaneous broker expirations correctly

2021-08-12 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13173.
--
Resolution: Fixed

> KRaft controller does not handle simultaneous broker expirations correctly
> --
>
> Key: KAFKA-13173
> URL: https://issues.apache.org/jira/browse/KAFKA-13173
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Niket Goel
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In `ReplicationControlManager.fenceStaleBrokers`, we find all of the current 
> stale replicas and attempt to remove them from the ISR. However, when 
> multiple expirations occur at once, we do not properly accumulate the ISR 
> changes. For example, I ran a test where the ISR of a partition was 
> initialized to [1, 2, 3]. Then I triggered a timeout of replicas 2 and 3 at 
> the same time. The records that were generated by `fenceStaleBrokers` were 
> the following:
> {code}
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 3], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=2, epoch=102) at version 0), 
> ApiMessageAndVersion(PartitionChangeRecord(partitionId=0, topicId=_seg8hBuSymBHUQ1sMKr2g, isr=[1, 2], leader=1, replicas=null, removingReplicas=null, addingReplicas=null) at version 0), 
> ApiMessageAndVersion(FenceBrokerRecord(id=3, epoch=103) at version 0)]
> {code}
> First the ISR is shrunk to [1, 3] as broker 2 is fenced. We also see the 
> record to fence broker 2. Then the ISR is modified to [1, 2] as the fencing 
> of broker 3 is handled. So we did not account for the fact that we had 
> already fenced broker 2 in the request. 
> A simple solution for now is to change the logic to handle fencing only one 
> broker at a time. 
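The failure mode is easy to reproduce with plain lists. If each fenced broker's ISR change is computed from the original ISR `[1, 2, 3]`, fencing brokers 2 and 3 yields `[1, 3]` and then `[1, 2]`, exactly as in the records above. If each change starts from the previous result, the ISR ends at `[1]`. The helper names are hypothetical, and leader changes are ignored. As we read it, fencing one broker at a time has the same effect as the accumulating version: each ISR change sees the one before it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of KAFKA-13173; not ReplicationControlManager code.
final class IsrFencingSketch {

    /** Buggy pattern: every change is computed from the ORIGINAL ISR. */
    static List<List<Integer>> independentChanges(List<Integer> isr, List<Integer> fenced) {
        List<List<Integer>> records = new ArrayList<>();
        for (int broker : fenced) {
            List<Integer> next = new ArrayList<>(isr);
            next.remove(Integer.valueOf(broker)); // remove by value, not by index
            records.add(next);
        }
        return records;
    }

    /** Accumulating pattern: each change starts from the previous result. */
    static List<Integer> accumulatedIsr(List<Integer> isr, List<Integer> fenced) {
        List<Integer> current = new ArrayList<>(isr);
        for (int broker : fenced) {
            current.remove(Integer.valueOf(broker));
        }
        return current;
    }
}
```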





[jira] [Updated] (KAFKA-13142) KRaft brokers do not validate dynamic configs before forwarding them to controller

2021-08-12 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-13142:
-
Fix Version/s: (was: 3.0.0)
   3.0.1
Affects Version/s: 3.0.0

> KRaft brokers do not validate dynamic configs before forwarding them to 
> controller
> --
>
> Key: KAFKA-13142
> URL: https://issues.apache.org/jira/browse/KAFKA-13142
> Project: Kafka
>  Issue Type: Task
>  Components: kraft
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.1
>
>
> The KRaft brokers are not currently validating dynamic configs before 
> forwarding them to the controller. To ensure that KRaft clusters are easily 
> upgradable it would be a good idea to validate dynamic configs in the first 
> release of KRaft so that invalid dynamic configs are never stored.





[GitHub] [kafka] junrao commented on a change in pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None

2021-08-12 Thread GitBox


junrao commented on a change in pull request #11199:
URL: https://github.com/apache/kafka/pull/11199#discussion_r687993862



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging {
 // may be cleaned
 val firstUncleanableDirtyOffset: Long = Seq(
 
-  // we do not clean beyond the first unstable offset
-  log.firstUnstableOffset,
+  // we do not clean beyond the last stable offset

Review comment:
   This is an existing issue. But could we update the comment in line 593 
to include last stable offset too?

##
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##
@@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging {
 while(log.numberOfSegments < 8)
   log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
+log.updateHighWatermark(50)
+
+val lastCleanOffset = Some(0L)
+val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
+assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
+assertEquals(log.highWatermark, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the hwm.")

Review comment:
   Since the description of the test says bounded by LSO, should we change 
log.highWatermark to log.lastStableOffset and the error message accordingly? 
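Assume the LSO is the first unstable offset when that offset lies below the high watermark, and the high watermark otherwise. Under that assumption, the change in this PR can be modelled with plain longs. The sketch ignores the compaction-lag bound and uses hypothetical names rather than Kafka's log classes.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model of the bound discussed in PR #11199.
final class CleanableBoundSketch {

    /** LSO: the first unstable offset when it is below the HWM, otherwise the HWM. */
    static long lastStableOffset(OptionalLong firstUnstableOffset, long highWatermark) {
        if (firstUnstableOffset.isPresent() && firstUnstableOffset.getAsLong() < highWatermark) {
            return firstUnstableOffset.getAsLong();
        }
        return highWatermark;
    }

    /** Old bound: with no unstable offset, only the active segment limits cleaning. */
    static long firstUncleanableUsingFirstUnstable(OptionalLong firstUnstableOffset,
                                                   long activeSegmentBaseOffset) {
        long bound = activeSegmentBaseOffset;
        if (firstUnstableOffset.isPresent()) {
            bound = Math.min(bound, firstUnstableOffset.getAsLong());
        }
        return bound;
    }

    /** New bound: always capped by the LSO, which itself never exceeds the HWM. */
    static long firstUncleanableUsingLso(OptionalLong firstUnstableOffset,
                                         long highWatermark,
                                         long activeSegmentBaseOffset) {
        return Math.min(activeSegmentBaseOffset, lastStableOffset(firstUnstableOffset, highWatermark));
    }
}
```

Consider a log with no unstable offset, an HWM of 50, and a hypothetical active segment starting at 100. The old bound allows cleaning up to offset 100, past the HWM, while the new bound stops at 50.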








[GitHub] [kafka] rondagostino commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-12 Thread GitBox


rondagostino commented on a change in pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#discussion_r688014091



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   @nowarn("cat=deprecation")
   private def validateValues(): Unit = {
+val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int]
+if (nodeIdValue >= 0) {
+  val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]
+  if (brokerIdValue != Defaults.BrokerId && brokerIdValue != nodeIdValue) {
+throw new ConfigException(s"The values for broker.id ($brokerIdValue) and node.id ($nodeIdValue) must be the same if both are specified")
+  }
+}

Review comment:
   Good question.  It's legal to set node.id for the ZooKeeper case, so I 
think this inconsistency check is applicable in all cases and should be done 
here, at the top.
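A minimal stand-alone sketch of the check being discussed, in plain Java rather than the Scala in `KafkaConfig`. It assumes `-1` is the "unset" `broker.id` default (standing in for `Defaults.BrokerId`) and that an unset `node.id` is negative; `IdConsistencyCheck` is a hypothetical name.

```java
/**
 * Hypothetical, stand-alone version of the broker.id/node.id consistency rule.
 * Assumption: -1 plays the role of Defaults.BrokerId ("broker.id not set").
 */
final class IdConsistencyCheck {
    static final int UNSET_BROKER_ID = -1; // assumed default, not read from Kafka

    private IdConsistencyCheck() { }

    /** Throws if node.id and broker.id are both set and differ; otherwise returns normally. */
    static void validate(int nodeId, int brokerId) {
        // Only compare when node.id was explicitly configured (non-negative).
        // The check is mode-independent: it applies to ZooKeeper and KRaft alike.
        if (nodeId >= 0 && brokerId != UNSET_BROKER_ID && brokerId != nodeId) {
            // KafkaConfig throws ConfigException; IllegalArgumentException keeps this self-contained.
            throw new IllegalArgumentException("The values for broker.id (" + brokerId
                + ") and node.id (" + nodeId + ") must be the same if both are specified");
        }
    }
}
```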








[GitHub] [kafka] rondagostino commented on a change in pull request #11200: KAFKA-13192: Prevent inconsistent broker.id/node.id values

2021-08-12 Thread GitBox


rondagostino commented on a change in pull request #11200:
URL: https://github.com/apache/kafka/pull/11200#discussion_r688016827



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1905,6 +1905,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   @nowarn("cat=deprecation")
   private def validateValues(): Unit = {
+val nodeIdValue = values.get(KafkaConfig.NodeIdProp).asInstanceOf[Int]
+if (nodeIdValue >= 0) {
+  val brokerIdValue = values.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]

Review comment:
   > prefers to use getInt
   
   Good catch -- fixed.








[GitHub] [kafka] lbradstreet commented on a change in pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None

2021-08-12 Thread GitBox


lbradstreet commented on a change in pull request #11199:
URL: https://github.com/apache/kafka/pull/11199#discussion_r688051673



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging {
 // may be cleaned
 val firstUncleanableDirtyOffset: Long = Seq(
 
-  // we do not clean beyond the first unstable offset
-  log.firstUnstableOffset,
+  // we do not clean beyond the last stable offset

Review comment:
   Makes sense. I've updated it with a better explanation of what we bound 
by.
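As the PR title suggests, the point is that a missing first unstable offset must not remove the bound: the last stable offset falls back to the high watermark when no transaction is open. A simplified, hypothetical sketch of that bound (`CleaningBound` is illustrative and omits the compaction-lag and other inputs the real `cleanableOffsets` considers):

```java
import java.util.OptionalLong;
import java.util.stream.LongStream;

/**
 * Simplified, hypothetical model of the cleaning bound. It ignores the
 * compaction-lag and other inputs that LogCleanerManager also considers.
 */
final class CleaningBound {
    private CleaningBound() { }

    /** LSO: the first unstable offset if it is below the high watermark, else the high watermark. */
    static long lastStableOffset(OptionalLong firstUnstableOffset, long highWatermark) {
        return firstUnstableOffset.isPresent()
            ? Math.min(firstUnstableOffset.getAsLong(), highWatermark)
            : highWatermark;
    }

    /** The cleaner must not go past the LSO (and hence the HWM) or into the active segment. */
    static long firstUncleanableDirtyOffset(OptionalLong firstUnstableOffset,
                                            long highWatermark,
                                            long activeSegmentBaseOffset) {
        return LongStream.of(
                lastStableOffset(firstUnstableOffset, highWatermark),
                activeSegmentBaseOffset)
            .min()
            .getAsLong();
    }
}
```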








[GitHub] [kafka] lbradstreet commented on a change in pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None

2021-08-12 Thread GitBox


lbradstreet commented on a change in pull request #11199:
URL: https://github.com/apache/kafka/pull/11199#discussion_r688051998



##
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##
@@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging {
 while(log.numberOfSegments < 8)
   log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
+log.updateHighWatermark(50)
+
+val lastCleanOffset = Some(0L)
+val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
+assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
+assertEquals(log.highWatermark, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the hwm.")

Review comment:
   This is what I intended since the lastStableOffset should equal the 
highWatermark, but I think it makes sense to keep the intent. Instead I have 
checked that log.highWatermark == log.lastStableOffset in the line prior to it.








[jira] [Created] (KAFKA-13198) TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord

2021-08-12 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13198:
--

 Summary: TopicsDelta doesn't update deleted topic when processing PartitionChangeRecord
 Key: KAFKA-13198
 URL: https://issues.apache.org/jira/browse/KAFKA-13198
 Project: Kafka
  Issue Type: Bug
  Components: kraft, replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


In KRaft, when a replica gets reassigned away from a topic partition, we are not 
notifying the {{ReplicaManager}} to stop the replica.

One solution is to track those topic partition ids when processing 
{{PartitionChangeRecord}} and to return them as {{deleted}} when the replica 
manager calls {{calculateDeltaChanges}}.
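The proposal can be sketched as follows. This is a hypothetical, plain-Java model, not the actual `TopicsDelta` code: the class `LocalReplicaChanges`, its methods, and the `topic-partition` string key are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of the KAFKA-13198 proposal: while replaying partition
 * changes, remember partitions whose new replica set no longer contains this
 * broker, and report them as "deleted" for the local replica manager.
 */
final class LocalReplicaChanges {
    private final int localBrokerId;
    private final Set<String> deleted = new HashSet<>();

    LocalReplicaChanges(int localBrokerId) {
        this.localBrokerId = localBrokerId;
    }

    /** Called once per partition change; the string key format is illustrative only. */
    void onPartitionChange(String partitionKey, List<Integer> oldReplicas, List<Integer> newReplicas) {
        if (oldReplicas.contains(localBrokerId) && !newReplicas.contains(localBrokerId)) {
            deleted.add(partitionKey);    // replica moved away: the local copy should be stopped
        } else if (newReplicas.contains(localBrokerId)) {
            deleted.remove(partitionKey); // reassigned back before the delta was applied
        }
    }

    /** Returns a copy so callers cannot mutate the tracked state. */
    Set<String> deletedPartitions() {
        return new HashSet<>(deleted);
    }
}
```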



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #11154: KAFKA-13068: Rename Log to UnifiedLog

2021-08-12 Thread GitBox


junrao merged pull request #11154:
URL: https://github.com/apache/kafka/pull/11154


   






[jira] [Resolved] (KAFKA-13068) Rename Log to UnifiedLog

2021-08-12 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13068.
-
Fix Version/s: 3.1.0
   Resolution: Fixed

Merged the PR to trunk

> Rename Log to UnifiedLog
> 
>
> Key: KAFKA-13068
> URL: https://issues.apache.org/jira/browse/KAFKA-13068
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
> Fix For: 3.1.0
>
>
> Once KAFKA-12554 is completed, we can rename Log -> UnifiedLog as described 
> in the doc:  
> [https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit#].





[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688142016



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 this.userCallback.onCompletion(metadata, exception);
 }
 }
+
+private static class KafkaProducerMetrics {
+private static final String FLUSH = "flush";
+private static final String TXN_INIT = "txn-init";
+private static final String TXN_BEGIN = "txn-begin";
+private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+private static final String TXN_COMMIT = "txn-commit";
+private static final String TXN_ABORT = "txn-abort";
+private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+final Map tags;
+final Metrics metrics;
+final Sensor initTimeSensor;
+final Sensor beginTimeSensor;
+final Sensor flushTimeSensor;
+final Sensor sendOffsetsSensor;
+final Sensor commitSensor;
+final Sensor abortSensor;
+
+private KafkaProducerMetrics(Metrics metrics) {
+this.metrics = metrics;
+this.tags = this.metrics.config().tags();
+this.flushTimeSensor = newLatencySensor(FLUSH);
+this.initTimeSensor = newLatencySensor(TXN_INIT);
+this.beginTimeSensor = newLatencySensor(TXN_BEGIN);
+this.sendOffsetsSensor = newLatencySensor(TXN_SEND_OFFSETS);
+this.commitSensor = newLatencySensor(TXN_COMMIT);
+this.abortSensor = newLatencySensor(TXN_ABORT);
+}
+
+private Sensor newLatencySensor(String name) {
+Sensor sensor = metrics.sensor(name + TOTAL_TIME_SUFFIX);
+sensor.add(
+metrics.metricName(name + TOTAL_TIME_SUFFIX, ProducerMetrics.GROUP, tags),
+new CumulativeSum()
+);
+return sensor;
+}
+
+private void recordFlush(long duration) {
+flushTimeSensor.record(duration);
+}
+
+private void recordInit(long duration) {
+initTimeSensor.record(duration);
+}
+
+private void recordBegin(long duration) {

Review comment:
   nit: better leave the full name as recordBeginTxn/AbortTxn/CommitTxn.
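A plain-Java sketch of the pattern `KafkaProducerMetrics` follows: one cumulative `<op>-time-total` value per blocking operation, using the full `recordBeginTxn`-style names the nit suggests. It does not use Kafka's `Metrics`, `Sensor`, or `CumulativeSum` classes; `BlockingTimeTotals` is a hypothetical stand-in, and durations are assumed to be in nanoseconds, in line with the later discussion on this PR.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for the KafkaProducerMetrics idea: one cumulative
 * "<op>-time-total" counter per blocking operation (models CumulativeSum only).
 */
final class BlockingTimeTotals {
    private static final String TOTAL_TIME_SUFFIX = "-time-total";
    private final Map<String, Double> totals = new HashMap<>();

    /** Adds the duration to the running total for this operation. */
    private void record(String op, long durationNs) {
        totals.merge(op + TOTAL_TIME_SUFFIX, (double) durationNs, Double::sum);
    }

    // Full names, following the review nit (recordBeginTxn rather than recordBegin).
    void recordFlush(long durationNs) { record("flush", durationNs); }
    void recordInitTxn(long durationNs) { record("txn-init", durationNs); }
    void recordBeginTxn(long durationNs) { record("txn-begin", durationNs); }
    void recordCommitTxn(long durationNs) { record("txn-commit", durationNs); }
    void recordAbortTxn(long durationNs) { record("txn-abort", durationNs); }

    /** Returns the accumulated total, or 0 if the operation never ran. */
    double total(String metricName) {
        return totals.getOrDefault(metricName, 0.0);
    }
}
```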

##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -1369,4 +1385,67 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
 this.userCallback.onCompletion(metadata, exception);
 }
 }
+
+private static class KafkaProducerMetrics {
+private static final String FLUSH = "flush";
+private static final String TXN_INIT = "txn-init";
+private static final String TXN_BEGIN = "txn-begin";
+private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
+private static final String TXN_COMMIT = "txn-commit";
+private static final String TXN_ABORT = "txn-abort";
+private static final String TOTAL_TIME_SUFFIX = "-time-total";
+
+final Map tags;
+final Metrics metrics;
+final Sensor initTimeSensor;
+final Sensor beginTimeSensor;

Review comment:
   Ditto here; better rename it to `beginTxn` / `commitTxn` / `abortTxn`?

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -178,12 +180,39 @@ public void resetProducer() {
 throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
 }
 
+final long start = Time.SYSTEM.nanoseconds();
 producer.close();
+final long closeTime = Time.SYSTEM.nanoseconds() - start;
+
+oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer);
 
 producer = clientSupplier.getProducer(eosV2ProducerConfigs);
 transactionInitialized = false;
 }
 
+private static double getMetricValue(final Map metrics,
+ final String name) {
+return metrics.keySet().stream()
+.filter(n -> n.name().equals(name))
+.findFirst()

Review comment:
   Maybe worth checking that there's only one element after the filtering? We 
shouldn't expect more than one, right?
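Following the suggestion, a lookup can fail loudly unless exactly one metric matches. A hypothetical sketch, assuming metrics are keyed by plain `group:name` strings instead of Kafka's `MetricName` (which also carries a group and tags, so several entries can share a name):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper; keys are assumed to be "group:name" strings in this sketch. */
final class MetricLookup {
    private MetricLookup() { }

    /**
     * Returns the value of the single metric whose name matches, failing loudly
     * if zero or several metrics (for example, in different groups) match.
     */
    static double singleMetricValue(Map<String, Double> metrics, String name) {
        List<Double> matches = metrics.entrySet().stream()
            .filter(e -> e.getKey().endsWith(":" + name))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
        if (matches.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one metric named " + name + " but found " + matches.size());
        }
        return matches.get(0);
    }
}
```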

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version

[GitHub] [kafka] lbradstreet commented on pull request #11199: KAFKA-13194: bound cleaning by both LSO and HWM when firstUnstableOffsetMetadata is None

2021-08-12 Thread GitBox


lbradstreet commented on pull request #11199:
URL: https://github.com/apache/kafka/pull/11199#issuecomment-898057637


   > @lbradstreet : Thanks for the updated PR. LGTM. Are the 29 test failures 
related to this PR?
   
   @junrao yeah, they're looking related. Let me clean those up.






[GitHub] [kafka] dengziming opened a new pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

2021-08-12 Thread GitBox


dengziming opened a new pull request #11209:
URL: https://github.com/apache/kafka/pull/11209


   *More detailed description of your change*
   When handling a response, an invalid cluster id is fatal unless a previous 
response contained a valid cluster id.
   Note that this is not a perfect solution (see 
https://github.com/apache/kafka/pull/10289#discussion_r595378358), 
   but it is the best option I can see because it lets us catch 
misconfiguration early.
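The rule in the description can be modelled as a small state machine. This is a hypothetical sketch, not the code in the PR: `ClusterIdCheck`, its `Outcome` values, and treating a later mismatch as non-fatal are assumptions about how the rule could be expressed.

```java
import java.util.Objects;

/**
 * Hypothetical model of the rule: a mismatched cluster id is fatal unless an
 * earlier response already carried the expected cluster id.
 */
final class ClusterIdCheck {
    /** Illustrative outcomes, not Kafka error codes. */
    enum Outcome { OK, FATAL, NON_FATAL }

    private final String expectedClusterId;
    private boolean seenValidResponse = false;

    ClusterIdCheck(String expectedClusterId) {
        this.expectedClusterId = Objects.requireNonNull(expectedClusterId);
    }

    Outcome onResponse(String responseClusterId) {
        if (expectedClusterId.equals(responseClusterId)) {
            seenValidResponse = true;
            return Outcome.OK;
        }
        // Before any valid response, a mismatch most likely means misconfiguration,
        // so fail fast; afterwards, assume it is transient and keep going.
        return seenValidResponse ? Outcome.NON_FATAL : Outcome.FATAL;
    }
}
```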
   
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dengziming commented on pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

2021-08-12 Thread GitBox


dengziming commented on pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#issuecomment-898134690


   Hi @hachikuji @jsancio , PTAL.  I also moved the logic for 
`UNKNOWN_TOPIC_OR_PARTITION`  in handleFetchSnapshot to `RaftUtil.java`.






[GitHub] [kafka] showuon commented on a change in pull request #11208: MINOR: Refactored BrokerHeartbeatManager::findOneStaleBroker to not use

2021-08-12 Thread GitBox


showuon commented on a change in pull request #11208:
URL: https://github.com/apache/kafka/pull/11208#discussion_r688206957



##
File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##
@@ -200,6 +200,14 @@ void registerBrokers(Integer... brokerIds) throws Exception {
 }
 }
 
+Set fencedBrokerIds() {
+return clusterControl.brokerRegistrations().values()

Review comment:
   Since it's now inside the test class, we can declare it as `private`.








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688236582



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -590,9 +593,11 @@ else if (acks != -1)
 public void initTransactions() {
 throwIfNoTransactionManager();
 throwIfProducerClosed();
+long now = time.nanoseconds();

Review comment:
   ah - I thought we were measuring in nanos. Not sure where I got that 
impression. I'll change to millis








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688239443



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -178,12 +180,39 @@ public void resetProducer() {
 throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
 }
 
+final long start = Time.SYSTEM.nanoseconds();

Review comment:
   ah good call!








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688240676



##
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
 }
 }
 
+private static Double getMetricValue(final KafkaProducer producer, final String name) {
+Metrics metrics = producer.metrics;
+Metric metric =  metrics.metric(metrics.metricName(name, "producer-metrics"));
+return (Double) metric.metricValue();
+}
+
+@Test
+public void testFlushMeasureLatency() {
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+try (KafkaProducer producer = kafkaProducer(
+configs,
+new StringSerializer(),
+new StringSerializer(),
+metadata,
+client,
+null,
+time
+)) {
+producer.flush();
+double first = getMetricValue(producer, "flush-time-total");
+assertTrue(first > 99.0);

Review comment:
   It's using mock time, so the value here is well-known (should be 1 
second). I'm using > rather than equalTo because I don't want the test to fail 
spuriously on floating point rounding errors. It would probably be better to 
use 
[isCloseTo](http://hamcrest.org/JavaHamcrest/javadoc/1.3/org/hamcrest/number/IsCloseTo.html)
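Where the expected value really is deterministic, a tolerance-based comparison avoids both spurious floating-point failures and an overly loose `>` check. A minimal plain-Java equivalent of Hamcrest's `closeTo(expected, error)` matcher (the helper name `Approx.isCloseTo` is hypothetical):

```java
/**
 * Hypothetical plain-Java equivalent of a Hamcrest closeTo(expected, error)
 * check, for comparing floating-point metric values without exact equality.
 */
final class Approx {
    private Approx() { }

    /** True when |actual - expected| is within the allowed error (inclusive). */
    static boolean isCloseTo(double actual, double expected, double error) {
        return Math.abs(actual - expected) <= error;
    }
}
```

With Hamcrest on the classpath, the same check reads `assertThat(first, closeTo(expected, 1e-6))`. It only helps when the measured value is predictable in the first place.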








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688240994



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##
@@ -178,12 +180,39 @@ public void resetProducer() {
 throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
 }
 
+final long start = Time.SYSTEM.nanoseconds();
 producer.close();
+final long closeTime = Time.SYSTEM.nanoseconds() - start;
+
+oldProducerTotalBlockedTime += closeTime + totalBlockedTime(producer);
 
 producer = clientSupplier.getProducer(eosV2ProducerConfigs);
 transactionInitialized = false;
 }
 
+private static double getMetricValue(final Map metrics,
+ final String name) {
+return metrics.keySet().stream()
+.filter(n -> n.name().equals(name))
+.findFirst()

Review comment:
   yeah it should always be one








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688242729



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
 }
 }
 
+public  void addThreadLevelImmutableMetric(final String name,
+final String description,
+final String threadId,
+final T value) {
+final MetricName metricName = metrics.metricName(
+name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+synchronized (threadLevelMetrics) {

Review comment:
   not sure I follow the question here - we are using `threadLevelMetrics` 
to track the metrics for each thread so they can be cleaned up later on when 
the thread exits. What's wrong with using the thread id for that?
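The purpose described here (per-thread bookkeeping so that metrics can be removed when a thread exits) can be sketched in plain Java. `ThreadMetricRegistry` is hypothetical; it folds tags into a `threadId:name` string key and guards both maps with the same `threadLevelMetrics` lock, mirroring the `synchronized` block in the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical registry: remember which metrics each thread registered so
 * that all of them can be removed when that thread exits.
 */
final class ThreadMetricRegistry {
    private final Map<String, Deque<String>> threadLevelMetrics = new HashMap<>();
    private final Map<String, Object> registered = new HashMap<>();

    <T> void addThreadLevelImmutableMetric(String name, String threadId, T value) {
        synchronized (threadLevelMetrics) {
            String key = threadId + ":" + name; // tags folded into the key for illustration
            registered.put(key, value);
            threadLevelMetrics.computeIfAbsent(threadId, id -> new ArrayDeque<>()).push(key);
        }
    }

    /** Drops every metric registered under this thread id. */
    void removeAllThreadLevelMetrics(String threadId) {
        synchronized (threadLevelMetrics) {
            Deque<String> keys = threadLevelMetrics.remove(threadId);
            if (keys != null) {
                keys.forEach(registered::remove);
            }
        }
    }

    int size() {
        synchronized (threadLevelMetrics) {
            return registered.size();
        }
    }
}
```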








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r68825



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+final Consumer consumer;
+final Consumer restoreConsumer;
+final Supplier producerTotalBlockedTime;
+
+StreamsThreadTotalBlockedTime(
+final Consumer consumer,
+final Consumer restoreConsumer,
+final Supplier producerTotalBlockedTime
+) {
+this.consumer = consumer;
+this.restoreConsumer = restoreConsumer;
+this.producerTotalBlockedTime = producerTotalBlockedTime;
+}
+
+final double getMetricValue(

Review comment:
   I tried doing it this way at first, but found it hard to loop over the 
producers in `TaskManager/Tasks/ActiveTaskCreator` without breaking those 
abstractions by adding methods to return the producers so we could get the 
metrics out. So I went the route of having the total blocked time metric 
implementation ask `TaskManager` for its total blocked time component.
   
   > we can also use this in unit test e.g. 
https://github.com/apache/kafka/pull/11149/files#diff-599de0f96fbd5ba6b3d919881426269fc72fe8bbe8e2436fab87d9abe84e8dbaR735
   
   What do you mean here? This is the producer's unit test, and this method 
computes total blocked time for a Streams app.
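One way to read the design described here: the thread-level total is a sum of components that are re-read on every call, with the producer part supplied from outside (for example, by `TaskManager`) rather than by iterating over producers. A hypothetical sketch; which client metrics back each `DoubleSupplier` is deliberately left open.

```java
import java.util.function.DoubleSupplier;

/**
 * Hypothetical composition of a per-thread total blocked time from the main
 * consumer, the restore consumer, and all producers. Each supplier stands in
 * for reading the relevant client metric(s).
 */
final class ThreadBlockedTime {
    private final DoubleSupplier consumerBlockedNs;
    private final DoubleSupplier restoreConsumerBlockedNs;
    private final DoubleSupplier producerTotalBlockedNs; // e.g. supplied by the task manager

    ThreadBlockedTime(DoubleSupplier consumerBlockedNs,
                      DoubleSupplier restoreConsumerBlockedNs,
                      DoubleSupplier producerTotalBlockedNs) {
        this.consumerBlockedNs = consumerBlockedNs;
        this.restoreConsumerBlockedNs = restoreConsumerBlockedNs;
        this.producerTotalBlockedNs = producerTotalBlockedNs;
    }

    /** Re-reads every component on each call, so the result is always current. */
    double compute() {
        return consumerBlockedNs.getAsDouble()
            + restoreConsumerBlockedNs.getAsDouble()
            + producerTotalBlockedNs.getAsDouble();
    }
}
```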








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688245686



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -590,9 +593,11 @@ else if (acks != -1)
 public void initTransactions() {
 throwIfNoTransactionManager();
 throwIfProducerClosed();
+long now = time.nanoseconds();

Review comment:
   Actually, now I remember why: the buffer pool and selector total blocked 
times are all measured in nanos and use the suffix `time-total`. So I chose 
the naming convention and unit accordingly.
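For reference, timing a blocking call against an injectable monotonic nanosecond clock, and converting to milliseconds only when needed, might look like this. `BlockingTimer` is a hypothetical helper; passing the clock as a `LongSupplier` keeps it testable with a fake clock.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical timing helper for blocking calls. */
final class BlockingTimer {
    private BlockingTimer() { }

    /** Runs the action and returns the elapsed nanoseconds according to the given clock. */
    static long timeNanos(LongSupplier nanoClock, Runnable action) {
        long start = nanoClock.getAsLong();
        action.run();
        return nanoClock.getAsLong() - start;
    }

    /** Converts for reporting; truncates toward zero like TimeUnit does. */
    static long toMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }
}
```

In production code the clock could be `System::nanoTime`, for example `BlockingTimer.timeNanos(System::nanoTime, producer::flush)` for some producer instance.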








[GitHub] [kafka] rodesai commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


rodesai commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688255651



##
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -730,6 +732,41 @@ public void testFlushCompleteSendOfInflightBatches() {
 }
 }
 
+private static Double getMetricValue(final KafkaProducer producer, final String name) {
+Metrics metrics = producer.metrics;
+Metric metric =  metrics.metric(metrics.metricName(name, "producer-metrics"));
+return (Double) metric.metricValue();
+}
+
+@Test
+public void testFlushMeasureLatency() {
+Map configs = new HashMap<>();
+configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+Time time = new MockTime(1);
+MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
+
+MockClient client = new MockClient(time, metadata);
+client.updateMetadata(initialUpdateResponse);
+
+try (KafkaProducer producer = kafkaProducer(
+configs,
+new StringSerializer(),
+new StringSerializer(),
+metadata,
+client,
+null,
+time
+)) {
+producer.flush();
+double first = getMetricValue(producer, "flush-time-total");
+assertTrue(first > 99.0);

Review comment:
   Ah, actually this doesn't work because the mock time is passed to and 
used by the other client threads, so the value is not predictable. The best 
we can do is assert that at least one tick (100 nanoseconds) has passed. I'll 
update the test.
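Why only a lower bound is safe can be illustrated with a hypothetical auto-ticking clock, similar in spirit to the `MockTime` used in the test but not Kafka's implementation: every read advances time by one tick, so any read made by another thread between the start and end of a measurement inflates the measured interval.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical auto-ticking clock: every read returns the current time and
 * then advances it by one tick. Reads from other threads also advance it.
 */
final class AutoTickClock {
    private final AtomicLong nowNs = new AtomicLong();
    private final long tickNs;

    AutoTickClock(long tickNs) {
        this.tickNs = tickNs;
    }

    /** Returns the current time, then advances it by one tick (thread-safe). */
    long nanoseconds() {
        return nowNs.getAndAdd(tickNs);
    }
}
```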








[jira] [Resolved] (KAFKA-12155) Delay increasing the log start offset

2021-08-12 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-12155.

Resolution: Fixed

> Delay increasing the log start offset
> -
>
> Key: KAFKA-12155
> URL: https://issues.apache.org/jira/browse/KAFKA-12155
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: David Arthur
>Priority: Major
>
> The implementation in [https://github.com/apache/kafka/pull/9816] increases 
> the log start offset as soon as a snapshot is created that is greater than 
> the log start offset. This is correct but causes some inefficiency in some 
> cases.
>  # Any follower (voter or observer) with an end offset between the leader's 
> log start offset and the leader's latest snapshot will get invalidated. This 
> will cause those followers to fetch the new snapshot and reload their state 
> machines.
>  # Any {{Listener}} or state machine that has a {{nextExpectedOffset()}} less 
> than the latest snapshot will get invalidated. This will cause the state 
> machine to have to reload its state from the latest snapshot.
> To minimize the frequency of these reloads KIP-630 proposes adding the 
> following configuration:
>  * {{metadata.start.offset.lag.time.max.ms}} - The maximum amount of time 
> that the leader will wait for an offset to get replicated to all of the live 
> replicas before advancing the {{LogStartOffset}}. See section “When to 
> Increase the LogStartOffset”. The default is 7 days.
> This description and implementation should be extended to also apply to the 
> state machine, or {{Listener}}. The local log start offset should be 
> increased when the {{nextExpectedOffset()}} of every {{ListenerContext}} is 
> greater than the offset of the latest snapshot.
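The two conditions in the description can be sketched as predicates. This is a hypothetical illustration: `LogStartOffsetPolicy` and its parameters are invented for the example, and treating "replicated" as a replica end offset strictly greater than the snapshot offset is an assumption.

```java
import java.util.Collection;

/** Hypothetical predicates for when the log start offset may move to the latest snapshot. */
final class LogStartOffsetPolicy {
    private LogStartOffsetPolicy() { }

    /** Listener rule: every listener's next expected offset is past the snapshot. */
    static boolean listenersCaughtUp(long snapshotOffset, Collection<Long> nextExpectedOffsets) {
        return nextExpectedOffsets.stream().allMatch(next -> next > snapshotOffset);
    }

    /**
     * Replica rule: all live replicas have the offset, or the leader has already
     * waited the configured maximum (metadata.start.offset.lag.time.max.ms).
     */
    static boolean replicasCaughtUpOrTimedOut(long snapshotOffset,
                                              Collection<Long> liveReplicaEndOffsets,
                                              long waitedMs,
                                              long lagTimeMaxMs) {
        boolean replicated = liveReplicaEndOffsets.stream().allMatch(end -> end > snapshotOffset);
        return replicated || waitedMs >= lagTimeMaxMs;
    }
}
```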





[GitHub] [kafka] kamalcph commented on pull request #11197: 28x TS changes

2021-08-12 Thread GitBox


kamalcph commented on pull request #11197:
URL: https://github.com/apache/kafka/pull/11197#issuecomment-898206391


   retest






[jira] [Created] (KAFKA-13199) Make Task extend Versioned

2021-08-12 Thread Xianghu Wang (Jira)
Xianghu Wang created KAFKA-13199:


 Summary: Make Task extend Versioned
 Key: KAFKA-13199
 URL: https://issues.apache.org/jira/browse/KAFKA-13199
 Project: Kafka
  Issue Type: Wish
Reporter: Xianghu Wang


Since `Task` is versioned, we can make it extend `Versioned` directly.





[jira] [Updated] (KAFKA-13199) Make Task extends Versioned

2021-08-12 Thread Xianghu Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xianghu Wang updated KAFKA-13199:
-
Summary: Make Task extends Versioned  (was: Make Task extend Versioned)

> Make Task extends Versioned
> ---
>
> Key: KAFKA-13199
> URL: https://issues.apache.org/jira/browse/KAFKA-13199
> Project: Kafka
>  Issue Type: Wish
>Reporter: Xianghu Wang
>Priority: Major
>
> Since `Task` is versioned, we can make it extend `Versioned` directly.





[GitHub] [kafka] wangxianghu opened a new pull request #11210: KAFKA-13199: Make Task extends Versioned

2021-08-12 Thread GitBox


wangxianghu opened a new pull request #11210:
URL: https://github.com/apache/kafka/pull/11210


   Since `Task` is versioned, we can make it extend `Versioned` directly; there is
no need to declare `String version()` again.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
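A minimal sketch of what the proposed change amounts to, assuming the issue refers to Kafka Connect's `Task` interface and a `Versioned` interface that declares `String version()`. Both interfaces are redeclared locally (with `Task` trimmed to `start`/`stop`) so the sketch compiles on its own; `EchoTask` is a hypothetical implementation, not part of Kafka.

```java
import java.util.Map;

// Local stand-in for the Versioned interface: a single version() accessor.
interface Versioned {
    String version();
}

// After the change, Task inherits version() from Versioned instead of
// redeclaring String version() itself.
interface Task extends Versioned {
    void start(Map<String, String> props);

    void stop();
}

// Hypothetical implementation: one version() method satisfies both interfaces.
final class EchoTask implements Task {
    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // No-op for the sketch.
    }

    @Override
    public void stop() {
        // No-op for the sketch.
    }
}
```

Because `Task` is now a `Versioned`, code that only needs a version string can accept either type without a cast.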






[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688267294



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -590,9 +593,11 @@ else if (acks != -1)
     public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        long now = time.nanoseconds();

Review comment:
   Hmm, I know that in the selector we use nanoseconds, but that metric should be
`time-total-ns`; not sure when that got changed.
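A minimal sketch of the pattern under discussion: read a nanosecond clock before a blocking call and add the elapsed time to a running total whose name carries the unit. `BlockedTimeRecorder` and the `txn-init-time-ns-total` name are illustrative assumptions, not the code or metric names the PR settled on; the clock is injected so the sketch can be tested without sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class BlockedTimeRecorder {
    // Illustrative name: the "-ns" part records the unit in the metric name.
    static final String METRIC_NAME = "txn-init-time-ns-total";

    private final LongSupplier nanoClock;
    private final AtomicLong totalNanos = new AtomicLong();

    BlockedTimeRecorder(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    // Runs a blocking call and adds its elapsed time, in nanoseconds, to the
    // total, even if the call throws.
    void timeBlocking(Runnable blockingCall) {
        long start = nanoClock.getAsLong();
        try {
            blockingCall.run();
        } finally {
            totalNanos.addAndGet(nanoClock.getAsLong() - start);
        }
    }

    long totalNanos() {
        return totalNanos.get();
    }
}
```

In production code the clock would be `System::nanoTime`; keeping the total in nanoseconds and converting only when reporting avoids mixing units in one metric.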








[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688267856



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsThreadTotalBlockedTime.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+public class StreamsThreadTotalBlockedTime {
+    final Consumer consumer;
+    final Consumer restoreConsumer;
+    final Supplier producerTotalBlockedTime;
+
+    StreamsThreadTotalBlockedTime(
+        final Consumer consumer,
+        final Consumer restoreConsumer,
+        final Supplier producerTotalBlockedTime
+    ) {
+        this.consumer = consumer;
+        this.restoreConsumer = restoreConsumer;
+        this.producerTotalBlockedTime = producerTotalBlockedTime;
+    }
+
+    final double getMetricValue(

Review comment:
   Sounds good then.
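The constructor in this diff takes the main consumer, the restore consumer, and a supplier for the producer's blocked time. A minimal sketch of how such a class might combine them into one thread-level total, under stated assumptions: plain `Map<String, Double>` snapshots stand in for the clients' metric maps, and the metric names summed here are illustrative placeholders, not necessarily the ones the PR reads.

```java
import java.util.Map;
import java.util.function.Supplier;

final class TotalBlockedTimeSketch {
    // Placeholder names for per-client blocked-time totals (assumption).
    static final String[] CONSUMER_BLOCKED_METRICS = {"io-wait-time-ns-total", "io-time-ns-total"};

    private TotalBlockedTimeSketch() { }

    // Sums the named metrics from one client's snapshot; a missing metric
    // counts as 0 so a client that never blocked contributes nothing.
    static double sumOf(Map<String, Double> metrics, String... names) {
        double total = 0.0;
        for (String name : names) {
            total += metrics.getOrDefault(name, 0.0);
        }
        return total;
    }

    // Thread-level total: main consumer + restore consumer + producer.
    static double totalBlockedTimeNs(Map<String, Double> consumerMetrics,
                                     Map<String, Double> restoreConsumerMetrics,
                                     Supplier<Double> producerTotalBlockedTime) {
        return sumOf(consumerMetrics, CONSUMER_BLOCKED_METRICS)
            + sumOf(restoreConsumerMetrics, CONSUMER_BLOCKED_METRICS)
            + producerTotalBlockedTime.get();
    }
}
```

Passing the producer side as a `Supplier` keeps the total current even if the thread's producer changes, which may be why the diff uses one rather than a fixed producer reference.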








[GitHub] [kafka] guozhangwang commented on a change in pull request #11149: KIP-761: add total blocked time metric to streams

2021-08-12 Thread GitBox


guozhangwang commented on a change in pull request #11149:
URL: https://github.com/apache/kafka/pull/11149#discussion_r688268274



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -200,6 +201,30 @@ public RocksDBMetricsRecordingTrigger rocksDBMetricsRecordingTrigger() {
 }
 }
 
+public <T> void addThreadLevelImmutableMetric(final String name,
+final String description,
+final String threadId,
+final T value) {
+final MetricName metricName = metrics.metricName(
+name, THREAD_LEVEL_GROUP, description, threadLevelTagMap(threadId));
+synchronized (threadLevelMetrics) {

Review comment:
   Oh, it's totally fine to use the thread id; it's just that for other
thread-level metrics we prefix the thread id with either `internal` or
`external`, i.e. the "key" is not just the thread id itself.
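A minimal sketch of the keying convention the comment describes, where registrations are stored under a prefixed thread key rather than the bare thread id. The comment does not give the exact key format, so `INTERNAL_PREFIX`, `EXTERNAL_PREFIX`, the `"."` separator, and the `ThreadLevelMetricRegistry` class are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class ThreadLevelMetricRegistry {
    // Hypothetical prefixes; only "internal"/"external" come from the comment.
    static final String INTERNAL_PREFIX = "internal";
    static final String EXTERNAL_PREFIX = "external";

    // Metric names registered per prefixed thread key, kept so they can be
    // removed together when the thread shuts down.
    private final Map<String, Deque<String>> threadLevelMetrics = new HashMap<>();

    static String threadKey(boolean internal, String threadId) {
        return (internal ? INTERNAL_PREFIX : EXTERNAL_PREFIX) + "." + threadId;
    }

    void addMetric(boolean internal, String threadId, String metricName) {
        String key = threadKey(internal, threadId);
        synchronized (threadLevelMetrics) {
            threadLevelMetrics.computeIfAbsent(key, k -> new ArrayDeque<>()).push(metricName);
        }
    }

    int metricCount(boolean internal, String threadId) {
        synchronized (threadLevelMetrics) {
            Deque<String> names = threadLevelMetrics.get(threadKey(internal, threadId));
            return names == null ? 0 : names.size();
        }
    }
}
```

With this scheme, the same thread id registered as internal and as external maps to two separate entries, which is the distinction the reviewer points out.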








[GitHub] [kafka] showuon commented on a change in pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter

2021-08-12 Thread GitBox


showuon commented on a change in pull request #11201:
URL: https://github.com/apache/kafka/pull/11201#discussion_r688286213



##
File path: clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
##
@@ -76,6 +77,21 @@ public void testJmxRegistration() throws Exception {
 }
 }
 
+@Test
+public void testMbeanTagOrdering() {
+Map<String, String> tags = new HashMap<>();
+tags.put("tag_a", "x");
+tags.put("tag_b", "y");
+tags.put("tag_c", "z");
+tags.put("tag_d", "1,2");
+tags.put("tag_e", "");
+tags.put("tag_f", "3");

Review comment:
   Although `HashMap` doesn't guarantee element order, the current test
might make people think it passes only because the keys are inserted in
key order. Could you insert the keys in an unordered sequence, so that we can
be sure your fix works?
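A minimal sketch of the idea behind the fix and the suggested test: build the MBean name from tags sorted by key, then insert the keys out of order to show the result does not depend on insertion order. `MBeanNameSketch` is hypothetical and deliberately skips the JMX value quoting and sanitizing that a real reporter needs (for example for a value such as `1,2`), so it uses only simple tag values.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

final class MBeanNameSketch {
    private MBeanNameSketch() { }

    // Builds "domain:type=group,k1=v1,k2=v2" with tags sorted by key, so the
    // name no longer depends on HashMap iteration or insertion order.
    static String mbeanName(String domain, String group, Map<String, String> tags) {
        StringJoiner joiner = new StringJoiner(",");
        joiner.add("type=" + group);
        for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
            joiner.add(tag.getKey() + "=" + tag.getValue());
        }
        return domain + ":" + joiner;
    }
}
```

In a test, inserting `tag_f`, `tag_a`, `tag_c`, `tag_b` in that order and still expecting `tag_a,tag_b,tag_c,tag_f` in the name checks the sorting itself rather than a lucky insertion order.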








[GitHub] [kafka] showuon commented on pull request #11201: MINOR: fix mbean tag name ordering in JMX reporter

2021-08-12 Thread GitBox


showuon commented on pull request #11201:
URL: https://github.com/apache/kafka/pull/11201#issuecomment-898234132


   Also, some `metrics`-related tests failed. You might need to take
a look.
   ```
   Build / ARM / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics
   Build / ARM / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics
   Build / ARM / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics
   Build / ARM / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics
   Build / ARM / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.processor.internals.StreamTaskTest.testMetrics
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredKeyValueStoreTest.testMetrics
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredSessionStoreTest.testMetrics
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreTest.testMetrics
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.state.internals.MeteredWindowStoreTest.testMetrics
   ```

