[jira] [Created] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)
jiamei xie created KAFKA-10126:
--

 Summary: Remove unused options in ConsumerPerformance
 Key: KAFKA-10126
 URL: https://issues.apache.org/jira/browse/KAFKA-10126
 Project: Kafka
  Issue Type: Bug
Reporter: jiamei xie
Assignee: jiamei xie


The numThreadsOpt and numFetchersOpt options are unused in ConsumerPerformance, so 
testing performance against the number of threads is a waste of time. They should be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7888) kafka cluster not recovering - Shrinking ISR from 14,13 to 13 (kafka.cluster.Partition) continuously

2020-06-09 Thread Youssef BOUZAIENNE (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-7888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129031#comment-17129031
 ] 

Youssef BOUZAIENNE commented on KAFKA-7888:
---

I'm facing the same issue on 2.4.1

> kafka cluster not recovering - Shrinking ISR from 14,13 to 13 
> (kafka.cluster.Partition) continuously
> ---
>
> Key: KAFKA-7888
> URL: https://issues.apache.org/jira/browse/KAFKA-7888
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, replication, zkclient
>Affects Versions: 2.1.0
> Environment: using kafka_2.12-2.1.0
> 3 ZKs 3 Broker cluster, using 3 boxes (1 ZK and 1 broker on each box), 
> default.replication factor: 2, 
> offset replication factor was 1 when the error happened, increased to 2 after 
> seeing this error by reassigning-partitions.
> compression: default (producer) on broker but sending gzip from producers.
> linux (redhat) ext4, kafka logs on single local disk
>Reporter: Kemal ERDEN
>Priority: Major
> Attachments: combined.log, producer.log
>
>
> We're seeing the following repeating logs on our kafka cluster from time to 
> time, which seem to cause messages expiring on producers and the cluster 
> going into a non-recoverable state. The only fix seems to be to restart the 
> brokers.
> {{Shrinking ISR from 14,13 to 13 (kafka.cluster.Partition)}}
>  {{Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)}}
>  and later on the following log is repeated:
> {{Got user-level KeeperException when processing sessionid:0xe046aa4f8e6 
> type:setData cxid:0x2df zxid:0xa01fd txntype:-1 reqpath:n/a Error 
> Path:/brokers/topics/ucTrade/partitions/6/state Error:KeeperErrorCode = 
> BadVersion for /brokers/topics/ucTrade/partitions/6/state}}
> We haven't interfered with any of the brokers/zookeepers whilst this happened.
> I've attached a combined log which represents a combination of controller, 
> server and state change logs from each broker (ids 13,14 and 15, log files 
> have the suffix b13, b14, b15 respectively)
> We have increased the heaps from 1g to 6g for the brokers and from 512m to 4g 
> for the zookeepers since this happened, but we're not sure if it is relevant. The 
> ZK logs are unfortunately overwritten so we can't provide those.
> We produce varying message sizes and some messages are relatively large (6mb), 
> but we use compression on the producers (set to gzip).
> I've attached some logs from one of our producers as well.
> producer.properties that we've changed:
> spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
> spring.kafka.producer.compression-type=gzip
> spring.kafka.producer.retries=5
> spring.kafka.producer.acks=-1
> spring.kafka.producer.batch-size=1048576
> spring.kafka.producer.properties.linger.ms=200
> spring.kafka.producer.properties.request.timeout.ms=60
> spring.kafka.producer.properties.max.block.ms=24
> spring.kafka.producer.properties.max.request.size=104857600
>  
>  
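
For readers trying to reproduce, a rough plain-Java sketch of a producer with the 
spring.kafka.producer.* overrides listed above (compression, acks, retries, batch size, 
linger, max request size). The bootstrap address, value serializer, and payload are 
assumptions, the topic name is taken from the error log above, and the truncated 
timeout values are not reproduced:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class GzipProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap address; the report does not include the broker list.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Value serializer is an assumption; only the key serializer is listed above.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Settings corresponding to the spring.kafka.producer.* overrides above.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.ACKS_CONFIG, "all");          // acks=-1
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 200);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 104857600);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "ucTrade" is the topic named in the error logs quoted above.
            producer.send(new ProducerRecord<>("ucTrade", "key", "large gzip-compressed payload"));
            producer.flush();
        }
    }
}
{code}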



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-06-09 Thread Karthik (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karthik updated KAFKA-7500:
---
Comment: was deleted

(was: Hi All, I'm unable to find a document for MM2. Any leads to the 
documentation would be appreciated. Thanks in advance.)

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread Chia-Ping Tsai (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129042#comment-17129042
 ] 

Chia-Ping Tsai commented on KAFKA-10126:


How about making numThreadsOpt work? Maybe multiple consumers?

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10127) kafka cluster not recovering - Shrinking ISR continuously

2020-06-09 Thread Youssef BOUZAIENNE (Jira)
Youssef BOUZAIENNE created KAFKA-10127:
--

 Summary: kafka cluster not recovering - Shrinking ISR continuously
 Key: KAFKA-10127
 URL: https://issues.apache.org/jira/browse/KAFKA-10127
 Project: Kafka
  Issue Type: Bug
  Components: replication, zkclient
Affects Versions: 2.4.1
 Environment: using kafka version 2.4.1 and zookeeper version 3.5.7
Reporter: Youssef BOUZAIENNE


We are facing an issue from time to time where our kafka cluster goes 
into a weird state and we see the following log repeating:

 

[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 
620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 
1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). 
Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). 
(kafka.cluster.Partition)

 

 

Before that, our zookeeper session expired, which led us to that state.

 

After we increased these two values we encounter the issue less frequently, but it 
still appears from time to time and the only solution is restarting the kafka 
service on all brokers:

zookeeper.session.timeout.ms=18000

replica.lag.time.max.ms=3

 

Any help on that please?

 

  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129056#comment-17129056
 ] 

jiamei xie commented on KAFKA-10126:


Ok. Thanks. I'll have a look at it. Use consumer groups? It seems the consumer 
number can't be bigger than the partition number, so the thread number can't be 
bigger than the partition number either.

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread Chia-Ping Tsai (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129067#comment-17129067
 ] 

Chia-Ping Tsai commented on KAFKA-10126:


> Use consumer group?

Yep. Should we enable the user to control the number of groups? (for example, 10 
consumers with 2 groups)

>  It seems the consumer number can't be bigger than the partition number.

Maybe add a new argument to create the topic with a specified number of partitions. It 
seems to me both numbers are important factors for performance.

> So the thread number can't be bigger than partition number either.

We can throw an exception or print a warning.

BTW, this issue requires a KIP if you want to remove/add arguments :)
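
To make the proposal concrete, a minimal sketch (plain Java, not the actual ConsumerPerformance 
tool, which would need a KIP as noted) of what a working numThreadsOpt plus a group-count 
argument could do, e.g. 10 consumers spread across 2 groups as suggested above. The topic, 
bootstrap address, counts, and run duration are placeholders:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class MultiThreadedConsumerSketch {
    public static void main(String[] args) throws InterruptedException {
        int numThreads = 10;   // hypothetical --threads value
        int numGroups = 2;     // hypothetical --groups value
        Thread[] workers = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            // Spread the threads round-robin across the consumer groups.
            final String groupId = "perf-group-" + (i % numGroups);
            workers[i] = new Thread(() -> {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
                long records = 0;
                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("perf-test-topic"));
                    long deadline = System.currentTimeMillis() + 60_000; // fixed 60s test window
                    while (System.currentTimeMillis() < deadline) {
                        records += consumer.poll(Duration.ofMillis(100)).count();
                    }
                }
                System.out.printf("%s consumed %d records%n", Thread.currentThread().getName(), records);
            }, "consumer-" + i);
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
    }
}
{code}

Within a single group, threads beyond the partition count would sit idle, which is where 
the warning or exception mentioned above would apply.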

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving

2020-06-09 Thread William Reynolds (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129070#comment-17129070
 ] 

William Reynolds commented on KAFKA-10105:
--

Hi James, that looks really similar. If it isn't the exact thing, I would 
suspect that it is another symptom of the group coordinator change that we hit.

> Regression in group coordinator dealing with flaky clients joining while 
> leaving
> 
>
> Key: KAFKA-10105
> URL: https://issues.apache.org/jira/browse/KAFKA-10105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker
> Kafka 2.4.1 on jre 11 on debian 9 in docker
>Reporter: William Reynolds
>Priority: Major
>
> Since the upgrade of a cluster from 1.1.0 to 2.4.1 the broker no longer deals 
> correctly with a consumer sending a join after a leave.
> What happens now is that if a consumer sends a leave and then follows up by 
> trying to send a join again as it is shutting down, the group coordinator adds 
> the leaving member to the group but never seems to heartbeat that member.
> Since the consumer is then gone, when it joins again after starting it is 
> added as a new member, but the zombie member is still there and is included in the 
> partition assignment, which means that those partitions never get consumed 
> from. What can also happen is that one of the zombies becomes group leader, so the 
> rebalance gets stuck forever and the group is entirely blocked.
> I have not been able to track down where this got introduced between 1.1.0 
> and 2.4.1 but I will look further into this. Unfortunately the logs are 
> essentially silent about the zombie members and I only had INFO level logging 
> on during the issue. By stopping all the consumers in the group and 
> restarting the broker coordinating that group we could get back to a working 
> state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9935) Kafka not releasing member from Consumer Group

2020-06-09 Thread William Reynolds (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129072#comment-17129072
 ] 

William Reynolds commented on KAFKA-9935:
-

It does look related to me. Your reproduction looks similar to what we hit and 
also to what it looks like in the tools.
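
For anyone comparing symptoms, a minimal sketch of checking the group state and members 
programmatically with the AdminClient, essentially what the CLI tools report. The bootstrap 
address is a placeholder; the group id is the one from the report quoted below:

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;

public class GroupStateSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Placeholder bootstrap address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        String groupId = "TraderAutomationViewServer_workaround_stuck_rebalance";
        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription description = admin
                    .describeConsumerGroups(Collections.singletonList(groupId))
                    .describedGroups()
                    .get(groupId)
                    .get();
            // A group stuck as described here would keep reporting a rebalancing state.
            System.out.println("state = " + description.state());
            for (MemberDescription member : description.members()) {
                System.out.printf("member %s client %s host %s%n",
                        member.consumerId(), member.clientId(), member.host());
            }
        }
    }
}
{code}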

> Kafka not releasing member from Consumer Group
> --
>
> Key: KAFKA-9935
> URL: https://issues.apache.org/jira/browse/KAFKA-9935
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.1
> Environment: Linux
>Reporter: Steve Kecskes
>Priority: Major
>
> Hello. I am experiencing an issue where Kafka is not releasing members from a 
> consumer group when the member crashes. The consumer group is then stuck in 
> rebalancing state indefinitely.
> In this consumer group, there is only 1 client. This client has the following 
> related settings:
> {code:java}
> auto.commit.interval.ms = 5000
>  auto.offset.reset = earliest
>  bootstrap.servers = [austgkafka01.hk.eclipseoptions.com:9092]
>  check.crcs = true
>  client.dns.lookup = default
>  client.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427-0
>  connections.max.idle.ms = 54
>  default.api.timeout.ms = 6
>  enable.auto.commit = true
>  exclude.internal.topics = true
>  fetch.max.bytes = 52428800
>  fetch.max.wait.ms = 500
>  fetch.min.bytes = 1
>  group.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427
>  heartbeat.interval.ms = 3000
>  interceptor.classes = []
>  internal.leave.group.on.close = true
>  isolation.level = read_uncommitted
>  key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>  max.partition.fetch.bytes = 1048576
>  max.poll.interval.ms = 30
>  max.poll.records = 1
>  metadata.max.age.ms = 30
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 3
>  partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>  receive.buffer.bytes = 16777216
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 3
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 6
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  send.buffer.bytes = 131072
>  session.timeout.ms = 1
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>  ssl.endpoint.identification.algorithm = https
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLS
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
>  value.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> {code}
> If the client crashes (not a graceful exit from group) the group remains in 
> the following state indefinitely.
> {code}
> Warning: Consumer group 
> 'TraderAutomationViewServer_workaround_stuck_rebalance' is rebalancing.
> GROUP                                                  TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> TraderAutomationViewServer_workaround_stuck_rebalance  EventAdjustedVols  10         6984061         7839599         855538    -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  VolMetrics         8          128459531       143736443       15276912  -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  EventAdjustedVols  12         7216495         8106030         889535    -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  VolMetrics         6          122921729       137377358       14455629  -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  EventAdjustedVols  14         5457618         6171142         713524    -            -     -
> TraderAutomationViewServer_workaround_stuck_rebalance  VolMetrics         4          125647891

[jira] [Commented] (KAFKA-10107) Producer snapshots LSO used in certain situations which can lead to data loss on compacted topics as LSO breach occurs and early offsets cleaned

2020-06-09 Thread William Reynolds (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129077#comment-17129077
 ] 

William Reynolds commented on KAFKA-10107:
--

Still working on getting the logs out, apologies for the delay.

> Producer snapshots LSO used in certain situations which can lead to data loss 
> on compacted topics as LSO breach occurs and early offsets cleaned
> 
>
> Key: KAFKA-10107
> URL: https://issues.apache.org/jira/browse/KAFKA-10107
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log cleaner
>Affects Versions: 2.4.1
> Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker
> Kafka 2.4.1 on jre 11 on debian 9 in docker
>Reporter: William Reynolds
>Priority: Major
>
> While upgrading a 1.1.0 cluster to 2.4.1 and also adding an interbroker port 
> using SSL, we ran into a situation where producer snapshot offsets get set as 
> the log start offset and then logs truncate to nothing across 2 relatively unsafe 
> restarts.
>  
> Here is the timeline of what we did to trigger this
> Broker 40 is shutdown as first to go to 2.4.1 and switch to interbroker port 
> 9094.
>  As it shuts down it writes producer snapshots
>  Broker 40 starts on 2.4.1, loads the snapshots then compares checkpointed 
> offsets to log start offset and finds them to be invalid (exact reason 
> unknown but looks to be producer snapshot load related)
>  On broker 40 all topics show an offset reset like this 2020-05-18 
> 15:22:21,106] WARN Resetting first dirty offset of topic-name-60 to log start 
> offset 6009368 since the checkpointed offset 5952382 is invalid. 
> (kafka.log.LogCleanerManager$)" which then triggers log cleanup on broker 40 
> for all these topics which is where the data is lost
>  At this point only partitions led by broker 40 have lost data and would be 
> failing for client lookups on older data but this can't spread as 40 has 
> interbroker port 9094 and brokers 50 and 60 have interbroker port 9092
>  I stop start brokers 50 and 60 in quick succession to take them to 2.4.1 and 
> onto the new interbroker port 9094
>  This leaves broker 40 as the in sync replica for all but a couple of 
> partitions which aren't on 40 at all shown in the attached image
>  Brokers 50 and 60 start and then take their start offset from leader (or if 
> there was no leader pulls from recovery on returning broker 50 or 60) and so 
> all the replicas also clean logs to remove data to catch up to broker 40 as 
> that is the in sync replica
>  Then I shutdown 40 and 50 leading to 60 leading all partitions it holds and 
> then we see this happen across all of those partitions
>  "May 18, 2020 @ 
> 15:48:28.252",hostname-1,30438,apache-kafka:2.4.1,"[2020-05-18 15:48:28,251] 
> INFO [Log partition=topic-name-60, dir=/kafka-topic-data] Loading producer 
> state till offset 0 with message format version 2 (kafka.log.Log)" 
>  "May 18, 2020 @ 
> 15:48:28.252",hostname-1,30438,apache-kafka:2.4.1,"[2020-05-18 15:48:28,252] 
> INFO [Log partition=topic-name-60, dir=/kafka-topic-data] Completed load of 
> log with 1 segments, log start offset 0 and log end offset 0 in 2 ms 
> (kafka.log.Log)"
>  "May 18, 2020 @ 15:48:45.883",hostname,7805,apache-kafka:2.4.1,"[2020-05-18 
> 15:48:45,883] WARN [ReplicaFetcher replicaId=50, leaderId=60, fetcherId=0] 
> Leader or replica is on protocol version where leader epoch is not considered 
> in the OffsetsForLeaderEpoch response. The leader's offset 0 will be used for 
> truncation in topic-name-60. (kafka.server.ReplicaFetcherThread)" 
>  "May 18, 2020 @ 15:48:45.883",hostname,7805,apache-kafka:2.4.1,"[2020-05-18 
> 15:48:45,883] INFO [Log partition=topic-name-60, dir=/kafka-topic-data] 
> Truncating to offset 0 (kafka.log.Log)"
>  
> I believe the truncation has always been a problem but recent 
> https://issues.apache.org/jira/browse/KAFKA-6266 fix allowed truncation to 
> actually happen where it wouldn't have before. 
>  The producer snapshots setting as log start offset is a mystery to me, so any 
> light you could shed on why that happened and how to avoid it would be great.
>  
> I am sanitising full logs and will upload here soon



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129079#comment-17129079
 ] 

jiamei xie commented on KAFKA-10126:


Thanks. Got it. What about numFetchersOpt? Can it be removed?

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129108#comment-17129108
 ] 

jiamei xie commented on KAFKA-10126:


> maybe add new argument to create topic with specify number of partitions. It 
> seems to me both numbers are an important factor of performance.
 It seems that the topic could be is created by the producer or 
kafka-topics.sh.  So is proper to add new arguments to create topic ?

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129108#comment-17129108
 ] 

jiamei xie edited comment on KAFKA-10126 at 6/9/20, 10:11 AM:
--

> maybe add new argument to create topic with specify number of partitions. It 
> seems to me both numbers are an important factor of performance.
 It seems that the topic could be created by the producer or kafka-topics.sh.  
So is proper to add new arguments to create topic ?


was (Author: adally):
> maybe add new argument to create topic with specify number of partitions. It 
> seems to me both numbers are an important factor of performance.
 It seems that the topic could be is created by the producer or 
kafka-topics.sh.  So is proper to add new arguments to create topic ?

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10126) Remove unused options in ConsumerPerformance

2020-06-09 Thread jiamei xie (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129108#comment-17129108
 ] 

jiamei xie edited comment on KAFKA-10126 at 6/9/20, 10:12 AM:
--

> maybe add new argument to create topic with specify number of partitions. It 
> seems to me both numbers are an important factor of performance.
 It seems that the topic could be created by the producer or kafka-topics.sh.  
So is it proper to add new arguments to create topic ?


was (Author: adally):
> maybe add new argument to create topic with specify number of partitions. It 
> seems to me both numbers are an important factor of performance.
 It seems that the topic could be created by the producer or kafka-topics.sh.  
So is proper to add new arguments to create topic ?

> Remove unused options in ConsumerPerformance
> 
>
> Key: KAFKA-10126
> URL: https://issues.apache.org/jira/browse/KAFKA-10126
> Project: Kafka
>  Issue Type: Bug
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. 
> It's a waste of time to test performance vs threads number. So removing it is 
> needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10128) MM2 - Delete topics when config sync is enabled

2020-06-09 Thread Karthik (Jira)
Karthik created KAFKA-10128:
---

 Summary: MM2 - Delete topics when config sync is enabled
 Key: KAFKA-10128
 URL: https://issues.apache.org/jira/browse/KAFKA-10128
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect, mirrormaker
Affects Versions: 2.5.0
Reporter: Karthik


A topic deleted on one region is not being deleted on the other and, with that, it is 
being recreated in the case of an active-active deployment.

 

Logs:

*Test Check delete Topic*

 

*Delete Command*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete 
--topic im115

 

*Source cluster - Before*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list

__consumer_offsets

heartbeats

im115

im32

mm2-configs.us-east.internal

mm2-offset-syncs.us-east.internal

mm2-offsets.us-east.internal

mm2-status.us-east.internal

us-east.checkpoints.internal

us-east.heartbeats

us-east.im115

us-east.us-east-offsets

us-west-offsets

 

*Source cluster -* *After delete*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list

__consumer_offsets

heartbeats

im32

mm2-configs.us-east.internal

mm2-offset-syncs.us-east.internal

mm2-offsets.us-east.internal

mm2-status.us-east.internal

us-east.checkpoints.internal

us-east.heartbeats

us-east.im115

us-east.us-east-offsets

us-west-offsets

 

*Dest Cluster - Before*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
    __consumer_offsets

heartbeats

im115

mm2-configs.us-west.internal

mm2-offset-syncs.us-west.internal

mm2-offsets.us-west.internal

mm2-status.us-west.internal

us-east-offsets

us-west.checkpoints.internal

us-west.heartbeats

us-west.im115

us-west.im32

us-west.us-west-offsets

 ** 

*Dest Cluster -* *After Delete*

*Did not delete*

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
__consumer_offsets
heartbeats
im115
mm2-configs.us-west.internal
mm2-offset-syncs.us-west.internal
mm2-offsets.us-west.internal
mm2-status.us-west.internal
us-east-offsets
us-west.checkpoints.internal
us-west.heartbeats
us-west.im115
us-west.im32
us-west.us-west-offsets

 

With that, after the config refresh interval (minutes), the deleted topic is being 
replicated back on the source cluster:

/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
__consumer_offsets
heartbeats
im115
im32
mm2-configs.us-east.internal
mm2-offset-syncs.us-east.internal
mm2-offsets.us-east.internal
mm2-status.us-east.internal
us-east.checkpoints.internal
us-east.heartbeats
us-east.im115
us-east.us-east-offsets
us-west-offsets



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10128) MM2 - Delete topics when config sync is enabled

2020-06-09 Thread Karthik (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129139#comment-17129139
 ] 

Karthik commented on KAFKA-10128:
-

It is also the other way around: even a deleted mirrored topic is being 
recreated.

> MM2 - Delete topics when config sync is enabled
> ---
>
> Key: KAFKA-10128
> URL: https://issues.apache.org/jira/browse/KAFKA-10128
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.5.0
>Reporter: Karthik
>Priority: Minor
>
> Topics being deleted on one region is not being deleted and with that its 
> being recreated incase of a active-active deployment
>  
> Logs:
> *Test Check delete Topic*
>  
> *Delete Command*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete 
> --topic im115
>  
> *Source cluster - Before*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im115
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets
>  
> *Source cluster -* *After delete*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets
>  
> *Dest Cluster - Before*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list   
>   __consumer_offsets
> heartbeats
> im115
> mm2-configs.us-west.internal
> mm2-offset-syncs.us-west.internal
> mm2-offsets.us-west.internal
> mm2-status.us-west.internal
> us-east-offsets
> us-west.checkpoints.internal
> us-west.heartbeats
> us-west.im115
> us-west.im32
> us-west.us-west-offsets
>  ** 
> *Dest Cluster -* *After Delete*
> *Did not delete*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
> __consumer_offsets
> heartbeats
> im115
> mm2-configs.us-west.internal
> mm2-offset-syncs.us-west.internal
> mm2-offsets.us-west.internal
> mm2-status.us-west.internal
> us-east-offsets
> us-west.checkpoints.internal
> us-west.heartbeats
> us-west.im115
> us-west.im32
> us-west.us-west-offsets
>  
> With that after the config refresh mins, the deleted topic is being 
> replicated back on the source cluster
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im115
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10128) MM2 - Delete topics when config sync is enabled

2020-06-09 Thread Karthik (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129139#comment-17129139
 ] 

Karthik edited comment on KAFKA-10128 at 6/9/20, 11:34 AM:
---

It is also the other way around: even a deleted mirrored topic is being 
recreated.

Update:

Even after deleting all the copies, the topics are being recreated after the refresh.

 

*using third cluster for MM2


was (Author: kurs):
It also the other way around where even deleted mirrored topic is also being 
recreated

> MM2 - Delete topics when config sync is enabled
> ---
>
> Key: KAFKA-10128
> URL: https://issues.apache.org/jira/browse/KAFKA-10128
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.5.0
>Reporter: Karthik
>Priority: Minor
>
> Topics being deleted on one region is not being deleted and with that its 
> being recreated incase of a active-active deployment
>  
> Logs:
> *Test Check delete Topic*
>  
> *Delete Command*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete 
> --topic im115
>  
> *Source cluster - Before*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im115
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets
>  
> *Source cluster -* *After delete*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets
>  
> *Dest Cluster - Before*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list   
>   __consumer_offsets
> heartbeats
> im115
> mm2-configs.us-west.internal
> mm2-offset-syncs.us-west.internal
> mm2-offsets.us-west.internal
> mm2-status.us-west.internal
> us-east-offsets
> us-west.checkpoints.internal
> us-west.heartbeats
> us-west.im115
> us-west.im32
> us-west.us-west-offsets
>  ** 
> *Dest Cluster -* *After Delete*
> *Did not delete*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
> __consumer_offsets
> heartbeats
> im115
> mm2-configs.us-west.internal
> mm2-offset-syncs.us-west.internal
> mm2-offsets.us-west.internal
> mm2-status.us-west.internal
> us-east-offsets
> us-west.checkpoints.internal
> us-west.heartbeats
> us-west.im115
> us-west.im32
> us-west.us-west-offsets
>  
> With that after the config refresh mins, the deleted topic is being 
> replicated back on the source cluster
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im115
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10128) MM2 - Delete topics when config sync is enabled

2020-06-09 Thread Karthik (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Karthik updated KAFKA-10128:

Reviewer: Ryanne Dolan

> MM2 - Delete topics when config sync is enabled
> ---
>
> Key: KAFKA-10128
> URL: https://issues.apache.org/jira/browse/KAFKA-10128
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.5.0
>Reporter: Karthik
>Priority: Minor
>
> Topics being deleted on one region is not being deleted and with that its 
> being recreated incase of a active-active deployment
>  
> Logs:
> *Test Check delete Topic*
>  
> *Delete Command*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete 
> --topic im115
>  
> *Source cluster - Before*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im115
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets
>  
> *Source cluster -* *After delete*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets
>  
> *Dest Cluster - Before*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list   
>   __consumer_offsets
> heartbeats
> im115
> mm2-configs.us-west.internal
> mm2-offset-syncs.us-west.internal
> mm2-offsets.us-west.internal
> mm2-status.us-west.internal
> us-east-offsets
> us-west.checkpoints.internal
> us-west.heartbeats
> us-west.im115
> us-west.im32
> us-west.us-west-offsets
>  ** 
> *Dest Cluster -* *After Delete*
> *Did not delete*
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list 
> __consumer_offsets
> heartbeats
> im115
> mm2-configs.us-west.internal
> mm2-offset-syncs.us-west.internal
> mm2-offsets.us-west.internal
> mm2-status.us-west.internal
> us-east-offsets
> us-west.checkpoints.internal
> us-west.heartbeats
> us-west.im115
> us-west.im32
> us-west.us-west-offsets
>  
> With that after the config refresh mins, the deleted topic is being 
> replicated back on the source cluster
> /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list
> __consumer_offsets
> heartbeats
> im115
> im32
> mm2-configs.us-east.internal
> mm2-offset-syncs.us-east.internal
> mm2-offsets.us-east.internal
> mm2-status.us-east.internal
> us-east.checkpoints.internal
> us-east.heartbeats
> us-east.im115
> us-east.us-east-offsets
> us-west-offsets



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempts. Making those class containing 
retriable data inherit from an abstract class {{Retriable.}}This class will 
record the number of failed attempts. I already wrapped the exponential 
backoff/timeout util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the and returns the backoff/timeout value at the corresponding 
level. There’re two main usage patterns.

{{}}

{{}}
 # For those async retries, the data often stays in a queue. We will make the 
class inherit from the {{Retriable}} and record failure when a 
{{RetriableException}} happens.
 # For those synchronous retires, the backoff is often implemented in a 
blocking poll/loop, we won’t need the inheritance and will just record the 
failedAttempts using a local variable of generic data type (Long).

Producer side:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch, which already has an attribute attempts recording the number of 
failed attempts.
 # 

 

 

{{}}

  was:In {{KafkaAdminClient}}, we will have to modify the way the retry backoff 
is calculated for the calls that have failed and need to be retried. From the 
current static retry backoff, we have to introduce a mechanism for all calls 
that upon failure, the next retry time is dynamically calculated.


> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempts. Making those class 
> containing retriable data inherit from an abstract class {{Retriable.}}This 
> class will record the number of failed attempts. I already wrapped the 
> exponential backoff/timeout util class in my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the and returns the backoff/timeout value at the corresponding 
> level. There’re two main usage patterns.
> {{}}
> {{}}
>  # For those async retries, the data often stays in a queue. We will make the 
> class inherit from the {{Retriable}} and record failure when a 
> {{RetriableException}} happens.
>  # For those synchronous retires, the backoff is often implemented in a 
> blocking poll/loop, we won’t need the inheritance and will just record the 
> failedAttempts using a local variable of generic data type (Long).
> Producer side:
>  # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch, which already has an attribute attempts recording the number 
> of failed attempts.
>  # 
>  
>  
> {{}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retires and blocking loop. The thread will sleep in each 
iteration for 
 # Async retries. In each polling, the retries do not meet the backoff will be 
filtered. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. contains only one initial request and 
only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

Currently, the retry backoff has two main usage patterns.
 # For those async retries, the data often stays in a queue. We will make the 
class inherit from the {{Retriable}} and record failure when a 
{{RetriableException}} happens.
 # For those synchronous retires, the backoff is often implemented in a 
blocking poll/loop, we won’t need the inheritance and will just record the 
failed attempts using a local variable of generic data type (Long).

Producer side:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempts, which will .
 # 

 

 

{{}}

  was:
Design:

The main idea is to bookkeep the failed attempts. Making those class containing 
retriable data inherit from an abstract class {{Retriable.}}This class will 
record the number of failed attempts. I already wrapped the exponential 
backoff/timeout util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the and returns the backoff/timeout value at the corresponding 
level. There’re two main usage patterns.

{{}}

{{}}
 # For those async retries, the data often stays in a queue. We will make the 
class inherit from the {{Retriable}} and record failure when a 
{{RetriableException}} happens.
 # For those synchronous retires, the backoff is often implemented in a 
blocking poll/loop, we won’t need the inheritance and will just record the 
failedAttempts using a local variable of generic data type (Long).

Producer side:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch, which already has an attribute attempts recording the number of 
failed attempts.
 # 

 

 

{{}}


> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retires and blocking loop. The thread will sleep in each 
> iteration for 
>  # Async retries. In each polling, the retries do not meet the backoff will 
> be filtered. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated. (i.e. contains only one initial 
> request and only its retries.)
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, we can make those classes containing retriable data inherit from 
> an abstract class Retriable. Retriable will implement interfaces recording 
> the number of failed attempts. I already wrapped the exponential 
> backoff/timeout util class in my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of failures and returns the backoff/timeout value at 
> the corresponding level.
> Currently, the retry backoff has two main usage patterns.
>  # For those async retries, the data often stays in a queue. We will make the 
> class inherit from the {{Retriable}} and record failure when a 
> {{RetriableException}} happens.
>  # For those synchronous retires, the backoff is often implemented in a 
> blocking poll/loop, we won’t need the inheritance and will just record the 
> 

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retires and blocking loop. The thread will sleep in each 
iteration for 
 # Async retries. In each polling, the retries do not meet the backoff will be 
filtered. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. a set contains only one initial request 
and only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

Currently, the retry backoff has two main usage patterns.
 # For those async retries, the data often stays in a queue. We will make the 
class inherit from the {{Retriable}} and record failure when a 
{{RetriableException}} happens.
 # For those synchronous retires, the backoff is often implemented in a 
blocking poll/loop, we won’t need the inheritance and will just record the 
failed attempts using a local variable of generic data type (Long).

Producer side:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempts, which will .
 # 

 

 

{{}}

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retires and blocking loop. The thread will sleep in each 
iteration for 
 # Async retries. In each polling, the retries do not meet the backoff will be 
filtered. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. contains only one initial request and 
only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

Currently, the retry backoff has two main usage patterns.
 # For those async retries, the data often stays in a queue. We will make the 
class inherit from the {{Retriable}} and record failure when a 
{{RetriableException}} happens.
 # For those synchronous retires, the backoff is often implemented in a 
blocking poll/loop, we won’t need the inheritance and will just record the 
failed attempts using a local variable of generic data type (Long).

Producer side:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempts, which will .
 # 

 

 

{{}}


> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retires and blocking loop. The thread will sleep in each 
> iteration for 
>  # Async retries. In each polling, the retries do not meet the backoff will 
> be filtered. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated. (i.e. a set contains only one 
> initial request and only its retries.)
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, we can make those classes containing retriable data inherit from 
> an abstract class Retriable. Retriable will implement interfaces recording 
> the numb

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempts. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the backoff time.
 # Async retries. In each polling, the retries that do not meet the backoff will be 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. a set contains only one initial request 
and only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.
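
For illustration, a minimal sketch of the kind of utility described here: the backoff grows 
exponentially with the number of failed attempts, is capped at a maximum, and is randomized 
with jitter. Class and parameter names are placeholders, not the actual KIP-601 implementation:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class ExponentialBackoffSketch {
    private final long initialIntervalMs;
    private final double multiplier;
    private final long maxIntervalMs;
    private final double jitter;

    public ExponentialBackoffSketch(long initialIntervalMs, double multiplier,
                                    long maxIntervalMs, double jitter) {
        this.initialIntervalMs = initialIntervalMs;
        this.multiplier = multiplier;
        this.maxIntervalMs = maxIntervalMs;
        this.jitter = jitter;
    }

    /** Backoff for the given number of failed attempts: initial * multiplier^attempts, capped and jittered. */
    public long backoff(long failedAttempts) {
        double exp = initialIntervalMs * Math.pow(multiplier, failedAttempts);
        double capped = Math.min(exp, maxIntervalMs);
        double rand = 1.0 + ThreadLocalRandom.current().nextDouble(-jitter, jitter);
        return (long) (capped * rand);
    }

    public static void main(String[] args) {
        // e.g. 100ms base, doubling per failure, capped at 10s, +/-20% jitter
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(100, 2.0, 10_000, 0.2);
        for (int attempts = 0; attempts <= 8; attempts++) {
            System.out.printf("attempts=%d backoffMs=%d%n", attempts, backoff.backoff(attempts));
        }
    }
}
{code}

An async caller would compare the time since the last attempt against backoff(failedAttempts) 
when filtering retries in a poll, while a synchronous caller would simply sleep for that value 
in its blocking loop.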

 

Changes:

KafkaProducer:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer

 

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retires and blocking loop. The thread will sleep in each 
iteration for 
 # Async retries. In each polling, the retries do not meet the backoff will be 
filtered. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated. (i.e. a set contains only one initial request 
and only its retries.)

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

Currently, the retry backoff has two main usage patterns.
 # For those async retries, the data often stays in a queue. We will make the 
class inherit from the {{Retriable}} and record failure when a 
{{RetriableException}} happens.
 # For those synchronous retires, the backoff is often implemented in a 
blocking poll/loop, we won’t need the inheritance and will just record the 
failed attempts using a local variable of generic data type (Long).

Producer side:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempts, which will .
 # 

 

 

{{}}


> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retires and blocking loop. The thread will sleep in each 
> iteration for 
>  # Async retries. In each polling, the retries do not meet the backoff will 
> be filtered. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated. (i.e. a set contains only one 
> initial request and only its retries.)
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, we can make those classes containing retriable data inherit from 
> an abstract class Retriable. Retriable will implement interfaces recording 
> the number of failed attempts. I already wrapped the exponential 
> backoff/timeout util class in my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of failures and returns the backoff/timeout value at 
> the corresponding level.
>  
> Changes:
> KafkaProducer:
>  # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
> ProducerBa

[jira] [Resolved] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"

2020-06-09 Thread David Arthur (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-9724.
-
Fix Version/s: 2.6.0
 Assignee: David Arthur
   Resolution: Fixed

> Consumer wrongly ignores fetched records "since it no longer has valid 
> position"
> 
>
> Key: KAFKA-9724
> URL: https://issues.apache.org/jira/browse/KAFKA-9724
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.4.0
>Reporter: Oleg Muravskiy
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.6.0
>
> Attachments: consumer.log.xz
>
>
> After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) 
> consumers in a consumer group intermittently stop progressing on assigned 
> partitions, even when there are messages to consume. This is not a permanent 
> condition, as they progress from time to time, but become slower with time, 
> and catch up after restart.
> Here is a sample of 3 consecutive ignored fetches:
> {noformat}
> 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = 
> 538065631, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=16380)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position 
> FetchPosition{offset=538065584, offsetEpoch=Optional[62], 
> currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), 
> epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
> implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 
> (id: 3 rack: null)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = 
> 538065727, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=51864)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> {noformat}
> After which consumer makes progress:
> {noformat}
> 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the

[jira] [Updated] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"

2020-06-09 Thread David Arthur (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-9724:

Fix Version/s: 2.5.1

> Consumer wrongly ignores fetched records "since it no longer has valid 
> position"
> 
>
> Key: KAFKA-9724
> URL: https://issues.apache.org/jira/browse/KAFKA-9724
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.4.0
>Reporter: Oleg Muravskiy
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer.log.xz
>
>
> After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) 
> consumers in a consumer group intermittently stop progressing on assigned 
> partitions, even when there are messages to consume. This is not a permanent 
> condition, as they progress from time to time, but become slower with time, 
> and catch up after restart.
> Here is a sample of 3 consecutive ignored fetches:
> {noformat}
> 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = 
> 538065631, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=16380)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position 
> FetchPosition{offset=538065584, offsetEpoch=Optional[62], 
> currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), 
> epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
> implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 
> (id: 3 rack: null)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = 
> 538065727, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=51864)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> {noformat}
> After which consumer makes progress:
> {noformat}
> 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol ve

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.
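
As a rough, hypothetical sketch of what such an abstract Retriable class could 
look like (the field and method names are assumptions, not the actual KIP-580 
code):

{code:java}
/**
 * Hypothetical sketch of an abstract base class that bookkeeps failed attempts
 * for retriable client state (e.g. a batch or an in-flight request wrapper).
 */
public abstract class Retriable {
    private int failedAttempts = 0;
    private long lastAttemptMs = -1L;

    /** Record a failed attempt at the given wall-clock time. */
    public void onFailedAttempt(long nowMs) {
        failedAttempts++;
        lastAttemptMs = nowMs;
    }

    /** Number of failed attempts so far; this feeds the exponential backoff. */
    public int failedAttempts() {
        return failedAttempts;
    }

    /** True once the given backoff has elapsed since the last failed attempt. */
    public boolean canRetry(long nowMs, long backoffMs) {
        return lastAttemptMs < 0 || nowMs - lastAttemptMs >= backoffMs;
    }
}
{code}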

 

Changes:

KafkaProducer:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # 
 # Partition state request (API_KEY.OFFSET_FOR_LEADER_EPOCH, 

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer

 

 


> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for the retry backoff.
>  # Async retries. In each poll, retries that have not yet met the backoff 
> are filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated (i.e. a set contains only one 
> initial request and its retries).
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, we can make those classes containing retriable data inherit from 
> an abstract class Retriable. Retriable will implement interfaces recording 
> the number of failed attempts. I already wrapped the exponential 
> backoff/timeout util class in my KIP-601 
> [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
>  which takes the number of failures and returns the backoff/timeout value at 
> the corresponding level.
>  
> Changes:
> KafkaProducer:
>  # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
> ProducerBatch in Accumulator, which already has an attribute attempts 
> recording the number of failed attempts.
>  # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
> Retriable and record each failed attempt.
> KafkaConsumer:
>  # 
>  # Partition state request (API_KEY.OFFSET_FOR_LEADER_EPOCH, 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10127) kafka cluster not recovering - Shrinking ISR continuously

2020-06-09 Thread Youssef BOUZAIENNE (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youssef BOUZAIENNE updated KAFKA-10127:
---
Description: 
We are facing an issue from time to time where our Kafka cluster goes into a 
weird state, with the following log repeating:

[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 
620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
 [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 
1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). 
Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). 
(kafka.cluster.Partition)

 

Before that, our ZooKeeper session expired, which led us to that state.

 

After we increased these two values we encounter the issue less frequently, but 
it still appears from time to time and the only solution is a restart of the 
Kafka service on all brokers:

zookeeper.session.timeout.ms=18000

replica.lag.time.max.ms=3

 

Any help on that please  

  was:
We are facing an issue from time to time where our Kafka cluster goes into a 
weird state, with the following log repeating:

 

[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 
620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 
1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). 
Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). 
(kafka.cluster.Partition)

 

 

Before that, our ZooKeeper session expired, which led us to that state.

 

After we increased these two values we encounter the issue less frequently, but 
it still appears from time to time and the only solution is a restart of the 
Kafka service on all brokers:

zookeeper.session.timeout.ms=18000

replica.lag.time.max.ms=3

 

Any help on that please

 

  


> kafka cluster not recovering - Shrinking ISR continuously
> -
>
> Key: KAFKA-10127
> URL: https://issues.apache.org/jira/browse/KAFKA-10127
> Project: Kafka
>  Issue Type: Bug
>  Components: replication, zkclient
>Affects Versions: 2.4.1
> Environment: using kafka version 2.4.1 and zookeeper version 3.5.7
>Reporter: Youssef BOUZAIENNE
>Priority: Major
>
> We are facing an issue from time to time where our Kafka cluster goes into a 
> weird state, with the following log repeating:
> [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 
> 620 not equal to that in zookeeper, skip updating ISR 
> (kafka.cluster.Partition)
>  [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR 
> from 1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 
> 3222741893). Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). 
> (kafka.cluster.Partition)
>  
>  Before that, our ZooKeeper session expired, which led us to that state.
>  
> After we increased these two values we encounter the issue less frequently, 
> but it still appears from time to time and the only solution is a restart of 
> the Kafka service on all brokers:
> zookeeper.session.timeout.ms=18000
> replica.lag.time.max.ms=3
>  
> Any help on that please  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9747) No tasks created for a connector

2020-06-09 Thread Rodrigo (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129374#comment-17129374
 ] 

Rodrigo commented on KAFKA-9747:


I have the same issue here... MongoDB (Atlas), MongoDB Connector and AWS MSK.

> No tasks created for a connector
> 
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
>Reporter: Vit Koma
>Priority: Major
> Attachments: connect-distributed.properties, connect.log
>
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
> (MongoDB) and Confluent S3 connectors. When adding a new connector via the 
> REST API the connector is created in RUNNING state, but no tasks are created 
> for the connector.
> Pausing and resuming the connector does not help. When we stop all workers 
> and then start them again, the tasks are created and everything runs as it 
> should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same 
> behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
> that Debezium is correctly returning a task configuration from the 
> Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": 
> "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
> "mongodb.name": "qa-debezium-comp",
> "mongodb.ssl.enabled": true,
> "collection.whitelist": "converter[.]task",
> "tombstones.on.delete": true
>   }
> }
> S3 Connector:
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
> "connector.class": "io.confluent.connect.s3.S3SinkConnector",
> "topics": "qa-debezium-comp.converter.task",
> "topics.dir": "data/env/qa",
> "s3.region": "eu-west-1",
> "s3.bucket.name": "",
> "flush.size": "15000",
> "rotate.interval.ms": "360",
> "storage.class": "io.confluent.connect.s3.storage.S3Storage",
> "format.class": 
> "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
> "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.DefaultPartitioner",
> "schema.compatibility": "NONE",
> "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> "key.converter.schemas.enable": false,
> "value.converter.schemas.enable": false,
> "transforms": "ExtractDocument",
> 
> "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> The connectors are created using curl: {{curl -X POST -H "Content-Type: 
> application/json" --data @ http:/:10083/connectors}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10129) Fail the QA if there is javadoc error

2020-06-09 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10129:
--

 Summary: Fail the QA if there is javadoc error
 Key: KAFKA-10129
 URL: https://issues.apache.org/jira/browse/KAFKA-10129
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


from [~hachikuji] 
(https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179)

{quote}
One other question I had is whether we should consider making doc failures also 
fail the build?
{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10129) Fail the QA if there is javadoc error

2020-06-09 Thread Chia-Ping Tsai (Jira)

[ 
https://issues.apache.org/jira/browse/KAFKA-10129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129393#comment-17129393
 ] 

Chia-Ping Tsai commented on KAFKA-10129:


The javadoc is part of our public documentation, so we should avoid errors as 
much as possible.

> Fail the QA if there is javadoc error
> -
>
> Key: KAFKA-10129
> URL: https://issues.apache.org/jira/browse/KAFKA-10129
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> from [~hachikuji] 
> (https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179)
> {quote}
> One other question I had is whether we should consider making doc failures 
> also fail the build?
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by inheritance.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the 
current implementation is applying backoff to each topic partition, where the 
backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will 
inherit from Retriable.

Metadata:

 

 AdminClient:
 # AdminClient has its own request abstraction Call. The failed attempts are 
kept by the abstraction. So probably clean the logic a bit by inheritance.

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts.
 # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # 
 # Partition state request (API_KEY.OFFSET_FOR_LEADER_EPOCH, 

 


> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> Design:
> The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
> has two main usage patterns:
>  # Synchronous retries in a blocking loop. The thread will sleep in each 
> iteration for the retry backoff.
>  # Async retries. In each poll, retries that have not yet met the backoff 
> are filtered out. The data class often maintains a 1:1 mapping to a set of 
> requests which are logically associated (i.e. a set contains only one 
> initial request and its retries).
> For type 1, we can utilize a local failure counter of a Java generic data 
> type.
> For case 2, we can make those classes containing retriable data inherit from 
> an abstract class Retriable. Retriable will implement interfaces recording 
> the number of failed attempts. I already wrapped the exponential 
> backoff/timeout util class in my KIP-601 
> [implem

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by inheritance.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the 
current implementation is applying backoff to each topic partition, where the 
backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will 
inherit from Retriable.

Metadata:
 #  Metadata lives as a singleton in many clients. It can inherit from 
Retriable.

 AdminClient:
 # AdminClient has its own request abstraction Call. The failed attempts are 
kept by the abstraction. So probably clean the logic a bit by inheritance.

There are other common usages that look like client.poll(timeout), where the 
timeout passed in is the retry backoff value. We won't change these usages, 
since their underlying logic is nioSelector.select(timeout) and 
nioSelector.selectNow(), which means that if no interested op exists, the 
client will block for the retry backoff milliseconds. This is an optimization 
for when there's no request that needs to be sent but the client is waiting for 
responses. Specifically, if the client fails the in-flight requests before the 
retry backoff milliseconds have passed, it still needs to wait until that 
amount of time has passed, unless there's a new request that needs to be sent.
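
To make the async pattern (case 2) concrete, a self-contained, hedged sketch of 
how entries that have not yet met their exponential backoff could be filtered 
out during a poll (all class, field and method names here are made up for 
illustration and are not the actual client code):

{code:java}
import java.util.List;

/** Illustrative poll-time filtering of async retries; all names are hypothetical. */
public class RetryFilterSketch {

    /** Minimal stand-in for a retriable entry that bookkeeps its failures. */
    public static class Entry {
        int failedAttempts;
        long lastAttemptMs = -1L;
    }

    private final long retryBackoffMs;
    private final long maxBackoffMs;

    public RetryFilterSketch(long retryBackoffMs, long maxBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** Called from the poll loop: only entries whose backoff has elapsed are re-sent. */
    public void maybeResend(List<Entry> inFlight, long nowMs) {
        for (Entry entry : inFlight) {
            long backoffMs = Math.min(maxBackoffMs,
                    retryBackoffMs * (1L << Math.min(entry.failedAttempts, 10)));
            if (entry.lastAttemptMs >= 0 && nowMs - entry.lastAttemptMs < backoffMs)
                continue;      // still inside its exponential backoff window
            resend(entry);     // hypothetical re-send hook
        }
    }

    private void resend(Entry entry) {
        // Hypothetical: re-send the request; on failure the caller would
        // increment entry.failedAttempts and update entry.lastAttemptMs.
    }
}
{code}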

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by inheritance.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the 
current implementation is applying backoff to each topic partition, where the 
backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will 
inherit from Retriable.

Metadata:

 

 AdminClient:
 # AdminClient has its own request

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-09 Thread Cheng Tan (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-
Description: 
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by inheritance.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the 
current implementation is applying backoff to each topic partition, where the 
backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will 
inherit from Retriable.

Metadata:
 #  Metadata lives as a singleton in many clients. It can inherit from 
Retriable.

 AdminClient:
 # AdminClient has its own request abstraction Call. The failed attempts are 
kept by the abstraction. So probably clean the Call class logic a bit by 
inheritance.

There are other common usages that look like client.poll(timeout), where the 
timeout passed in is the retry backoff value. We won't change these usages, 
since their underlying logic is nioSelector.select(timeout) and 
nioSelector.selectNow(), which means that if no interested op exists, the 
client will block for the retry backoff milliseconds. This is an optimization 
for when there's no request that needs to be sent but the client is waiting for 
responses. Specifically, if the client fails the in-flight requests before the 
retry backoff milliseconds have passed, it still needs to wait until that 
amount of time has passed, unless there's a new request that needs to be sent.

 

  was:
Design:

The main idea is to bookkeep the failed attempt. Currently, the retry backoff 
has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread will sleep in each 
iteration for the retry backoff.
 # Async retries. In each poll, retries that have not yet met the backoff are 
filtered out. The data class often maintains a 1:1 mapping to a set of requests 
which are logically associated (i.e. a set contains only one initial request 
and its retries).

For type 1, we can utilize a local failure counter of a Java generic data type.

For case 2, we can make those classes containing retriable data inherit from an 
abstract class Retriable. Retriable will implement interfaces recording the 
number of failed attempts. I already wrapped the exponential backoff/timeout 
util class in my KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]
 which takes the number of failures and returns the backoff/timeout value at 
the corresponding level.

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in Accumulator, which already has an attribute attempts recording 
the number of failed attempts. So probably clean up the logic a little bit by 
hiding the failed attempts property and the getter method by inheritance.
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from 
Retriable and record each failed attempt.

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop.
 # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, 
ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the 
current implementation is applying backoff to each topic partition, where the 
backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will 
inherit from Retriable.

Metadata:
 #  Metadata lives as a singleton in man

[GitHub] [kafka] hachikuji commented on pull request #8664: KAFKA-9716: Clarify meaning of compression rate metrics

2020-06-09 Thread GitBox

hachikuji commented on pull request #8664:
URL: https://github.com/apache/kafka/pull/8664#issuecomment-640947926











[jira] [Updated] (KAFKA-10129) Fail QA if there are javadoc warnings

2020-06-09 Thread Chia-Ping Tsai (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-10129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10129:
---
Summary: Fail QA if there are javadoc warnings  (was: Fail the QA if there 
is javadoc error)

> Fail QA if there are javadoc warnings
> -
>
> Key: KAFKA-10129
> URL: https://issues.apache.org/jira/browse/KAFKA-10129
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> from [~hachikuji] 
> (https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179)
> {quote}
> One other question I had is whether we should consider making doc failures 
> also fail the build?
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #8788: MINOR: Remove unused isSticky assert out from tests only do constrainedAssign

2020-06-09 Thread GitBox

guozhangwang merged pull request #8788:
URL: https://github.com/apache/kafka/pull/8788


   







[GitHub] [kafka] ableegoldman opened a new pull request #8835: MINOR: reduce sizeInBytes for percentiles metrics

2020-06-09 Thread GitBox

ableegoldman opened a new pull request #8835:
URL: https://github.com/apache/kafka/pull/8835


   The total amount of memory per Percentiles metric is actually the 
sizeInBytes * number of samples, where the default number of samples is 2. 
There are also at least 2 Percentiles metrics per task (and more when there are 
multiple sources or multiple terminal nodes). So the total memory we allocate 
for the current e2e latency metrics is `4 * sizeInBytes * numTasks`
   
   The current sizeInBytes we chose was 1MB. This is still not particularly 
large, but may add up and cause unnecessary pressure for users running on lean 
machines.
   
   I reduced the size to 100kB and still found it accurate enough* so we may as 
well reduce the size so we don't have to worry. (*1,000 runs of the randomized 
test all passed, meaning it was accurate to within 20% of the expected value)
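   
   For a rough sense of scale (assuming the default 2 samples and, say, 100 
tasks as a hypothetical example): at 1MB per sample the e2e latency Percentiles 
cost about 4 * 1MB * 100 = 400MB, while at 100kB they cost about 
4 * 100kB * 100 = 40MB.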







[GitHub] [kafka] mimaison commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-06-09 Thread GitBox

mimaison commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-640882951


   retest this please







[GitHub] [kafka] C0urante commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

2020-06-09 Thread GitBox

C0urante commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r436881026



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
 return existingTopics;
 }
 
+/**
+ * Verify the named topic uses only compaction for the cleanup policy.
+ *
+ * @param topic the name of the topic
+ * @param workerTopicConfig the name of the worker configuration that 
specifies the topic name
+ * @return true if the admin client could be used to verify the topic 
setting, or false if
+ * the verification could not be performed, likely because the 
admin client principal
+ * did not have the required permissions or because the broker was 
older than 0.11.0.0
+ * @throws ConfigException if the actual topic setting did not match the 
required setting
+ */
+public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String 
workerTopicConfig,
+String topicPurpose) {
+Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {

Review comment:
   Is it possible that this will also be true if there isn't a cleanup 
policy configured on the topic?
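   
   For context, a minimal sketch of how a topic's cleanup policy can be read 
back with the admin client; this is not the PR's `TopicAdmin` code, and the 
bootstrap servers and topic name below are illustrative:
   
```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class CleanupPolicyCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(props)) {
            // Ask the broker for the effective cleanup.policy of one topic.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "connect-offsets");
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            String policy = config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
            // "compact" passes the check discussed above; "compact,delete" or "delete" would not.
            System.out.println("cleanup.policy = " + policy);
        }
    }
}
```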









[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-06-09 Thread GitBox

tombentley commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-640700423











[GitHub] [kafka] ijuma commented on a change in pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-09 Thread GitBox

ijuma commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436791602



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
 // Now that replica in source log directory has been successfully 
renamed for deletion.
 // Close the log, update checkpoint files, and enqueue this log to be 
deleted.
 sourceLog.close()
-checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, 
ArrayBuffer.empty)
-checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
   Well, they are tightly coupled right? The method name of 
`deleteSnapshotsAfterRecoveryPointCheckpoint` makes it clear that this should 
be called after the recovery point is checkpointed. Generally, we've had bugs 
whenever we left it to the callers to make the same multiple calls in sequence 
in multiple places.
   
   I haven't looked at this PR in detail, so there are probably good reasons to 
change it. Also keep in mind https://github.com/apache/kafka/pull/7929/files 
that tries to improve the approach on how we handle this more generally.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
 // Now that replica in source log directory has been successfully 
renamed for deletion.
 // Close the log, update checkpoint files, and enqueue this log to be 
deleted.
 sourceLog.close()
-checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, 
ArrayBuffer.empty)
-checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
   Well, they are tightly coupled right? The method name of 
`deleteSnapshotsAfterRecoveryPointCheckpoint` makes it clear that this should 
be called after the recovery point is checkpointed. Generally, we've had bugs 
whenever we left it to the callers to make the same multiple calls in sequence 
in multiple places.
   
   I haven't looked at this PR in detail, so there are probably good reasons to 
change it. Also keep in mind https://github.com/apache/kafka/pull/7929/files 
which tries to improve the approach on how we handle this more generally.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
 // Now that replica in source log directory has been successfully 
renamed for deletion.
 // Close the log, update checkpoint files, and enqueue this log to be 
deleted.
 sourceLog.close()
-checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, 
ArrayBuffer.empty)
-checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
   Well, they are tightly coupled right? The method name of 
`deleteSnapshotsAfterRecoveryPointCheckpoint` makes it clear that this should 
be called after the recovery point is checkpointed. Generally, we've had bugs 
whenever we left it to the callers to make the same multiple calls in sequence 
in multiple places.
   
   I haven't looked at this PR in detail, so there are probably good reasons to 
change it. Also keep in mind #7929 which tries to improve the approach on how 
we handle this more generally.









[GitHub] [kafka] jiameixie commented on pull request #8836: KAFKA-10124:Wrong rebalance.time.ms

2020-06-09 Thread GitBox

jiameixie commented on pull request #8836:
URL: https://github.com/apache/kafka/pull/8836#issuecomment-641125571


   @ijuma @huxihx @guozhangwang Calling for a review. Do you think there is a 
good place to update `joinStart`? Thanks.







[GitHub] [kafka] guozhangwang merged pull request #8815: HOTFIX: fix validity check in sticky assignor tests

2020-06-09 Thread GitBox

guozhangwang merged pull request #8815:
URL: https://github.com/apache/kafka/pull/8815


   







[GitHub] [kafka] dajac commented on a change in pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-09 Thread GitBox

dajac commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436704800



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -273,7 +272,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
*  6. If the partition is already paused, a new call to this function
* will increase the paused count by one.
*/
-  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
+  def abortAndPauseCleaning(topicPartition: TopicPartition, partitionDeleted: 
Boolean = false): Unit = {

Review comment:
   Actually, we already have `abortCleaning` for this purpose and 
`abortCleaning` calls `abortAndPauseCleaning`. I was trying to avoid logging a 
message when the partition is deleted, because it does not add much and 
literally floods the log when many partitions are deleted.
   
   While re-looking at this, I have done it differently now. I have found that 
logs were spread between the LogCleanerManager and the LogManager and that we 
were logging when resuming cleaning but not all the time. I have consolidated 
all the cases where we want to log explicitly in helper methods. It also helps 
with not always having to check if `cleaner != null`. Let me know what you 
think. 

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
 if (removedLog != null) {
   //We need to wait until there is no more cleaning task on the log to be 
deleted before actually deleting it.
   if (cleaner != null && !isFuture) {
-cleaner.abortCleaning(topicPartition)
-cleaner.updateCheckpoints(removedLog.parentDirFile)
+cleaner.abortCleaning(topicPartition, partitionDeleted = true)
   }
   removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-  checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, 
ArrayBuffer.empty)

Review comment:
   I totally agree with you. I have tried different ways of keeping better 
encapsulation but have not found a really satisfying way to get there. As 
you pointed out, the involvement of `Partition` makes this complex.
   
   Here is my best idea so far:
   * We remove the calls to `asyncDelete` in the `Partition.delete` method. It 
seems safe to "delete the partition" while keeping the files on disk, given 
that we hold the `replicaStateChangeLock` in the `ReplicaManager`.
   * We use an `asyncDelete` that takes a batch of logs, deletes them, and 
checkpoints.
   * We rename `Partition.delete` to something like `Partition.close`, as the 
log is not really deleted any more.
   
   What do you think?

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
 if (removedLog != null) {
   //We need to wait until there is no more cleaning task on the log to be 
deleted before actually deleting it.
   if (cleaner != null && !isFuture) {
-cleaner.abortCleaning(topicPartition)
-cleaner.updateCheckpoints(removedLog.parentDirFile)
+cleaner.abortCleaning(topicPartition, partitionDeleted = true)
   }
   removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-  checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, 
ArrayBuffer.empty)

Review comment:
   I have implemented it to see. You can check out the last commit.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
 // Now that replica in source log directory has been successfully 
renamed for deletion.
 // Close the log, update checkpoint files, and enqueue this log to be 
deleted.
 sourceLog.close()
-checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, 
ArrayBuffer.empty)
-checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
   Indeed, they are coupled from that perspective. We can keep them 
together in one method though.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -869,17 +903,18 @@ class LogManager(logDirs: Seq[File],
   currentLogs.put(topicPartition, destLog)
   if (cleaner != null) {
 cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-cleaner.resumeCleaning(Seq(topicPartition))
-info(s"Compaction for partition $topicPartition is resumed")
+resumeCleaning(topicPartition)
   }
 
   try {
 sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
 // Now that replica in source log directory has been successfully 
renamed for deletion.
 // Close the log, update checkpoint files, and enqueue this log to be 
deleted.
 sourceLog.close()
-checkpointRec

[GitHub] [kafka] bbejeck merged pull request #8823: MINOR: fix HTML markup

2020-06-09 Thread GitBox

bbejeck merged pull request #8823:
URL: https://github.com/apache/kafka/pull/8823


   







[GitHub] [kafka] chia7712 commented on a change in pull request #8836: KAFKA-10124:Wrong rebalance.time.ms

2020-06-09 Thread GitBox

chia7712 commented on a change in pull request #8836:
URL: https://github.com/apache/kafka/pull/8836#discussion_r437177352



##
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
 var messagesRead = 0L
 var lastBytesRead = 0L
 var lastMessagesRead = 0L
-var joinStart = 0L
 var joinTimeMsInSingleRound = 0L
 
 consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
   def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
   It seems to me that the original code wants to count the elapsed time of 
joining the group. Hence, ```joinStart``` is required, and the following 
```joinStart = System.currentTimeMillis``` is required too. 
   
   Also, the initial value of ```joinStart``` is incorrect. It should be 
equal to ```testStartTime```.
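   
   To illustrate the bookkeeping being discussed, a hedged sketch of measuring 
join time with a rebalance listener (this is plain Java rather than the 
ConsumerPerformance Scala, and the class and field names are illustrative):
   
```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/**
 * Sketch: reset the join start whenever partitions are revoked (a rebalance is
 * starting) and accumulate the elapsed time once partitions are assigned again.
 * The start time is initialized to the test start so the first join is counted.
 */
public class JoinTimeListener implements ConsumerRebalanceListener {
    private final AtomicLong joinTimeMs = new AtomicLong(0L);
    private volatile long joinStartMs;

    public JoinTimeListener(long testStartTimeMs) {
        this.joinStartMs = testStartTimeMs;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        joinStartMs = System.currentTimeMillis();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        joinTimeMs.addAndGet(System.currentTimeMillis() - joinStartMs);
    }

    public long totalJoinTimeMs() {
        return joinTimeMs.get();
    }
}
```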

##
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
 var messagesRead = 0L
 var lastBytesRead = 0L
 var lastMessagesRead = 0L
-var joinStart = 0L
 var joinTimeMsInSingleRound = 0L
 
 consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
   def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
   hmmm, is there a good place to update the start time of the join? 

##
File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala
##
@@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging {
 var messagesRead = 0L
 var lastBytesRead = 0L
 var lastMessagesRead = 0L
-var joinStart = 0L
 var joinTimeMsInSingleRound = 0L
 
 consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
   def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): 
Unit = {
-joinTime.addAndGet(System.currentTimeMillis - joinStart)

Review comment:
   hmmm, is there a good place to update the start time of the join if they 
are NOT called just once?









[GitHub] [kafka] kkonstantine commented on pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

2020-06-09 Thread GitBox

kkonstantine commented on pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#issuecomment-640853227











[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted

2020-06-09 Thread GitBox

dajac commented on pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#issuecomment-641249326


   @hachikuji I have made another pass on the PR based on your input. Could you 
please have a second look at it? I am especially interested by getting your 
view on the overall approach for the checkpointing and the batch deletion.







[GitHub] [kafka] gardnervickers commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-06-09 Thread GitBox

gardnervickers commented on pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#issuecomment-640691697


   In 6152847, I changed the approach taken in this PR to move away from 
managing the producer state snapshot file as part of the segment lifecycle. 
This was done for two reasons: first, it turned out to be a bit difficult to 
reason about producer state snapshot files which did not have a corresponding 
segment file (such as the snapshot taken during shutdown). Second, moving the 
management of deletion to the LogSegment ended up splitting ownership of the 
producer state snapshot file between the `LogSegment` and the 
`ProducerStateManager`, which seemed to complicate when things like truncation 
could happen.
   
   Instead, a call is made to the `ProducerStateManager` to delete any matching 
producer snapshot files when `LogSegment`s are deleted. This occurs during 
retention and as a result of log cleaning (compaction). Producer state 
snapshot files which do not have a corresponding segment will be cleaned 
up on restart (unless they are the latest snapshot file).
   
   I think this ends up being simpler while more closely matching the current 
set of responsibilities, where the `ProducerStateManager` "owns" 
creating/reading/deleting snapshot files, but I'd be interested in hearing 
other opinions 😄.  
   
   cc/ @junrao
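
As a rough illustration of the ownership idea described above — a single owner 
for creating and deleting `<offset>.snapshot` files, with orphans swept on 
restart — here is a hypothetical Java sketch. The class and method names 
(`ProducerStateSnapshots`, `deleteSnapshotForSegment`, `cleanupOrphanedSnapshots`) 
are illustrative and are not Kafka's actual internals.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ProducerStateSnapshots {
    private final Path logDir;

    ProducerStateSnapshots(Path logDir) {
        this.logDir = logDir;
    }

    /** Called when a log segment with the given base offset is deleted. */
    void deleteSnapshotForSegment(long baseOffset) throws IOException {
        Files.deleteIfExists(logDir.resolve(String.format("%020d.snapshot", baseOffset)));
    }

    /**
     * On startup, drop snapshots with no matching segment, keeping the newest one.
     * Assumes well-formed "<20-digit-offset>.snapshot" file names.
     */
    void cleanupOrphanedSnapshots(Set<Long> liveSegmentBaseOffsets) throws IOException {
        List<Path> snapshots;
        try (Stream<Path> files = Files.list(logDir)) {
            snapshots = files.filter(p -> p.getFileName().toString().endsWith(".snapshot"))
                             .sorted()
                             .collect(Collectors.toList());
        }
        for (int i = 0; i < snapshots.size(); i++) {
            Path snapshot = snapshots.get(i);
            long offset = Long.parseLong(snapshot.getFileName().toString().replace(".snapshot", ""));
            boolean isLatest = i == snapshots.size() - 1;
            if (!isLatest && !liveSegmentBaseOffsets.contains(offset)) {
                Files.deleteIfExists(snapshot); // orphaned snapshot: no live segment owns it
            }
        }
    }
}
```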



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-09 Thread GitBox

kowshik commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r437124061



##
File path: core/src/main/scala/kafka/zk/ZkData.scala
##
@@ -81,17 +83,26 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json 
format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or 
above.
+   * - Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
+   *   the apiVersion is 0.10.0.X or above but lesser than 2.6.x.

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/zk/ZkData.scala
##
@@ -81,17 +83,26 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json 
format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or 
above.

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce an optional latch that can be used to notify 
the caller when an
+   *updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+/**
+ * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+ * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+ * exception is raised.
+ *
+ * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
+ * once successfully. A subsequent invocation will raise an exception.
+ *
+ * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+ *   this method is called again after a successful previous 
invocation.
+ * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
+ *   FinalizedFeatureCache.
+ * @throws   RuntimeException, if there was a failure in 
reading/deserializing the
+ *   contents of the feature ZK node.
+ */
+def updateLatestOrThrow(): Unit = {
+  maybeNotifyOnce.foreach(notifier => {
+if (notifier.getCount != 1) {
+  throw new IllegalStateException(
+"Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+}
+  })
+
+  debug(s"Reading feature ZK node at path: $featureZkNodePath")
+  val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+
+  // There are 4 cases:
+  //
+  // (empty dataBytes, valid version

[GitHub] [kafka] Lucent-Wong removed a comment on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…

2020-06-09 Thread GitBox

Lucent-Wong removed a comment on pull request #8453:
URL: https://github.com/apache/kafka/pull/8453#issuecomment-641192433


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sneakyburro commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-06-09 Thread GitBox

sneakyburro commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r437317697



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
 nodeGroup.addAll(value);
 }
 nodeGroup.removeAll(globalNodeGroups());
-
+for (final NodeFactory entry : nodeFactories.values()) {
+if (entry instanceof ProcessorNodeFactory) {
+ProcessorNodeFactory factory = (ProcessorNodeFactory) entry;
+if (factory.supplier.get() == factory.supplier.get()) {
+throw new TopologyException("topology has singleton result 
of ProcessorSupplier " + factory.name);
+}
+}
+}

Review comment:
   That's a great idea. I introduced a util class `TopologyUtil` under the 
processor package.
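
For readers following the thread, here is a self-contained Java sketch of the 
reference-equality check shown in the diff above: call the supplier twice and 
require two distinct instances. The helper below is illustrative only; it is 
not the `TopologyUtil` class added in the PR.

```java
import java.util.function.Supplier;

public class SingletonSupplierCheck {

    // Reject suppliers that hand back the same instance on every call.
    static <T> void requireNewInstanceOnEachCall(String name, Supplier<T> supplier) {
        if (supplier.get() == supplier.get()) {
            throw new IllegalArgumentException(
                "Supplier for '" + name + "' returned the same instance twice; "
                + "it must return a new Processor on every call to get()");
        }
    }

    public static void main(String[] args) {
        Object singleton = new Object();
        Supplier<Object> broken = () -> singleton;   // always the same object
        Supplier<Object> correct = Object::new;      // fresh object per call

        requireNewInstanceOnEachCall("correct-supplier", correct); // passes
        requireNewInstanceOnEachCall("broken-supplier", broken);   // throws
    }
}
```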

##
File path: docs/streams/developer-guide/processor-api.html
##
@@ -439,6 +439,9 @@ Connecting 
Processors and State StoresTopology code, accessing it in the processor’s 
init() method 
will also throw an exception at
 runtime, indicating the state store is not accessible from 
this processor.
+Note that there could be multiple ProcessorContext instances initialize 
your Processor 
during initialization.

Review comment:
   Adopted and revised a little. Thanks!

##
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
  * If {@code supplier} provides stores via {@link 
ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
  * will be added to the topology and connected to this processor 
automatically.
  *
- * @param name the unique name of the processor node
- * @param supplier the supplier used to obtain this node's {@link 
Processor} instance
- * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
- * and process
- * @return itself
+ * @param name  the unique name of the processor node
+ * @param supplier  the supplier used to construct this node's {@link 
Processor} instance; the implementation of supplier
+ *  should return a newly constructed {@link 
Processor} instance inside the scope of the lambda expression.

Review comment:
   I reverted the Javadoc on the parameter and added web-docs-like documentation 
to the main Javadoc for this function. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
 }
 
 final String sendTo = toInternal.child();
+if (currentNode() == null) {
+throw new StreamsException("Current node is unknown when 
forwarding to: " + key);

Review comment:
   Made the change!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…

2020-06-09 Thread GitBox

kkonstantine commented on pull request #8453:
URL: https://github.com/apache/kafka/pull/8453#issuecomment-641102055


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

2020-06-09 Thread GitBox

abbccdda commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r436913810



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -147,6 +147,46 @@ public String toString() {
 }
 }
 
+/**
+ * TopicNode is the a topic node abstraction for graph built with 
TopicNode and TopicsInfo, the graph is useful
+ * when in certain cases traverse is needed. For example, method 
setRepartitionTopicMetadataNumberOfPartitions
+ * internally do a DFS search along with the graph.
+ *
+ TopicNode("t1")  TopicNode("t2")
TopicNode("t6") TopicNode("t7")
+\   /  
  \   /
+  TopicsInfo(source = (t1,t2), sink = (t3,t4)) 
  TopicsInfo(source = (t6,t7), sink = (t4))
+/   \  
  /
+ / \   
   /
+  /\   
/
+  /\   
/
+ /   \/
+ TopicNode("t3") TopicNode("t4")
+\
+ TopicsInfo(source = (t3), sink = ())
+
+ t3 = max(t1,t2)
+ t4 = max(max(t1,t2), max(t6,t7))
+ */
+private static class TopicNode {
+public final String topicName;
+public final Set upStreams; // upStream TopicsInfo's 
sinkTopics contains this
+public final Set downStreams; // downStreams TopicsInfo's 
sourceTopics contains this

Review comment:
   I haven't seen this struct being used.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int 
minReceivedMetadataVersion,
  */
 private void setRepartitionTopicMetadataNumberOfPartitions(final 
Map repartitionTopicMetadata,
final 
Map topicGroups,
-   final Cluster 
metadata) {
-boolean numPartitionsNeeded;
-do {
-numPartitionsNeeded = false;
-
-for (final TopicsInfo topicsInfo : topicGroups.values()) {
-for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
-final Optional maybeNumPartitions = 
repartitionTopicMetadata.get(topicName)
- 
.numberOfPartitions();
-Integer numPartitions = null;
-
-if (!maybeNumPartitions.isPresent()) {
-// try set the number of partitions for this 
repartition topic if it is not set yet
-for (final TopicsInfo otherTopicsInfo : 
topicGroups.values()) {
-final Set otherSinkTopics = 
otherTopicsInfo.sinkTopics;
-
-if (otherSinkTopics.contains(topicName)) {
-// if this topic is one of the sink topics of 
this topology,
-// use the maximum of all its source topic 
partitions as the number of partitions
-for (final String sourceTopicName : 
otherTopicsInfo.sourceTopics) {
-Integer numPartitionsCandidate = null;
-// It is possible the sourceTopic is 
another internal topic, i.e,
-// map().join().join(map())
-if 
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
-if 
(repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent())
 {
-numPartitionsCandidate =
-
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-}
-} else {
-final Integer count = 
metadata.partitionCountForTopic(sourceTopicName);
-if (count == null) {
-throw new IllegalStateException(
-"No partition count found for 
source topic "
-+ sourceTopicName
-  

[GitHub] [kafka] ijuma commented on pull request #8802: MINOR: Fix fetch session epoch comment in `FetchRequest.json`

2020-06-09 Thread GitBox

ijuma commented on pull request #8802:
URL: https://github.com/apache/kafka/pull/8802#issuecomment-640721062


   Request/response names are just labels for humans, so you can change them 
without affecting compatibility.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-09 Thread GitBox

mjsax commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r436864561



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -108,13 +107,20 @@ public void completeRestoration() {
 }
 
 @Override
-public void prepareSuspend() {
-log.trace("No-op prepareSuspend with state {}", state());
+public void suspendDirty() {
+log.trace("No-op suspend dirty with state {}", state());
+if (state() == State.RUNNING) {
+transitionTo(State.SUSPENDED);
+}
 }
 
 @Override
-public void suspend() {
-log.trace("No-op suspend with state {}", state());
+public void suspendCleanAndPrepareCommit() {
+log.trace("No-op suspend clean with state {}", state());
+if (state() == State.RUNNING) {
+transitionTo(State.SUSPENDED);
+}
+prepareCommit();

Review comment:
   When we suspend, we _always_ want to commit.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -151,69 +157,23 @@ public void postCommit() {
 }
 }
 
-@Override
-public void prepareCloseClean() {
-prepareClose(true);
-
-log.info("Prepared clean close");
-}
-
-@Override
-public void prepareCloseDirty() {
-prepareClose(false);
-
-log.info("Prepared dirty close");
-}
-
-/**
- * 1. commit if we are running and clean close;
- * 2. close the state manager.
- *
- * @throws TaskMigratedException all the task has been migrated
- * @throws StreamsException fatal error, should close the thread
- */
-private void prepareClose(final boolean clean) {

Review comment:
   This logic is now followed in `suspendCleanAndPrepareCommit()` that must 
be called before a task can be closed.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -246,82 +245,39 @@ public void completeRestoration() {
 }
 }
 
-/**
- * 
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- *  4. then commit the record collector -- for EOS this is the 
synchronization barrier
- *  5. then checkpoint the state manager -- even if we crash before this 
step, EOS is still guaranteed
- * 
- *
- * @throws TaskMigratedException if committing offsets failed (non-EOS)
- *   or if the task producer got fenced (EOS)
- */
 @Override
-public void prepareSuspend() {

Review comment:
   This is now done via `suspendAndPrepareCommit()`

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -246,82 +245,39 @@ public void completeRestoration() {
 }
 }
 
-/**
- * 
- * the following order must be followed:
- *  1. first close topology to make sure all cached records in the 
topology are processed
- *  2. then flush the state, send any left changelog records
- *  3. then flush the record collector
- *  4. then commit the record collector -- for EOS this is the 
synchronization barrier
- *  5. then checkpoint the state manager -- even if we crash before this 
step, EOS is still guaranteed
- * 
- *
- * @throws TaskMigratedException if committing offsets failed (non-EOS)
- *   or if the task producer got fenced (EOS)
- */
 @Override
-public void prepareSuspend() {
-switch (state()) {
-case CREATED:
-case SUSPENDED:
-// do nothing
-log.trace("Skip prepare suspending since state is {}", 
state());
-
-break;
-
-case RESTORING:
-stateMgr.flush();
-log.info("Prepare suspending restoring");
-
-break;
-
-case RUNNING:
-closeTopology(true);
-
-stateMgr.flush();
-recordCollector.flush();
-
-log.info("Prepare suspending running");
-
-break;
-
-case CLOSED:
-throw new IllegalStateException("Illegal state " + state() + " 
while suspending active task " + id);
-
-default:
-throw new IllegalStateException("Unknown state " + state() + " 
while suspending active task " + id);
-}
+public void suspendDirty() {
+log.info("Suspending dirty");
+suspend(false);
 }
 
 @Override
-public void suspend() {
+public void suspendCleanAndPrepareCommit() {
+log.info("Suspending clean");
+suspend(true);
+

[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management

2020-06-09 Thread GitBox

guozhangwang commented on a change in pull request #8833:
URL: https://github.com/apache/kafka/pull/8833#discussion_r437090376



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) {
 
 void postCommit();
 
-/**
- * @throws TaskMigratedException all the task has been migrated
- * @throws StreamsException fatal error, should close the thread
- */
-void prepareSuspend();
+void suspendDirty();

Review comment:
   It seems to me that the reason we want to have two suspends, and also to 
merge suspendClean with prepareCommit, is that for `StreamTask`, if the state 
is `SUSPENDED` we want to skip prepareCommit. I feel it is a tad cleaner to 
separate them further into a single `suspend` which does not try to call 
prepareCommit, and rely on whether prepareCommit should do anything or not 
based on both the `state` (i.e. only running/restoring/suspended need to commit) 
and the `commitNeeded` flag.
   
   With that we can convert the callers as follows:
   
   1. suspendDirty(): just call suspend(), do not call prepareCommit().
   2. suspendCleanAndPrepareCommit(): 
   2.a) from `task.closeAndRecycleState`: call suspend(), and then call 
prepareCommit(); the latter would check `commitNeeded` and, if it is false, we 
would not try to flush/commit. Hence, if the task has just transitioned from 
another state to suspended, `commitNeeded` should still be true.
   2.b) from `taskManager` directly: same as above, but for this call we always 
follow up with `committableOffsetsAndMetadata` to get the map of offsets, so 
I'm thinking we can merge `prepareCommit` with `committableOffsetsAndMetadata` 
as well: if the state is right and `commitNeeded` is set, execute the 
prepare-commit procedure and accumulate the offsets; otherwise return `null`, 
indicating no offsets need to be committed.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) {
 
 void postCommit();
 
-/**

Review comment:
   I'm wondering if we could merge `committableOffsetsAndMetadata` with 
`prepareCommit` as well, letting the latter return the map? See my other 
comment alongside.
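
A simplified, hypothetical Java sketch of the shape suggested in these two 
comments — a single `prepareCommit()` that returns the committable offsets only 
when the state allows committing and `commitNeeded` is set, and `null` 
otherwise, so `suspend()` itself does no commit work. This is not the actual 
`StreamTask` code, and the offset map type is deliberately simplified.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class TaskCommitSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.RUNNING;
    private boolean commitNeeded = true;
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    void suspend() {
        // plain suspend: only a state transition, no commit work here
        if (state == State.RUNNING || state == State.RESTORING) {
            state = State.SUSPENDED;
        }
    }

    /** Returns offsets to commit, or null when there is nothing to commit. */
    Map<String, Long> prepareCommit() {
        boolean stateAllowsCommit =
            state == State.RUNNING || state == State.RESTORING || state == State.SUSPENDED;
        if (!stateAllowsCommit || !commitNeeded) {
            return null; // caller skips the commit entirely
        }
        // flushing stores and the record collector would happen here in the real task
        commitNeeded = false;
        return Collections.unmodifiableMap(new HashMap<>(pendingOffsets));
    }
}
```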





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

2020-06-09 Thread GitBox

vvcephei commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r436755332



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##
@@ -237,25 +236,6 @@ public void 
shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserP
 verify(metricsRecorder);
 }
 
-@Test
-public void shouldRespectBulkloadOptionsDuringInit() {
-rocksDBStore.init(context, rocksDBStore);
-
-final StateRestoreListener restoreListener = 
context.getRestoreListener(rocksDBStore.name());
-
-restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
-
-assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), 
equalTo(1 << 30));

Review comment:
   Sounds good. I was thrown off by 
`userSpecifiedOptions.prepareForBulkLoad()` because it looks like the user 
could have specified some bulk loading options, but I took another look, and I 
see that that's just the variable name. The actual method is internal, and 
there's no way to configure bulk-loading options specifically as a user. So, 
I'm satisfied. Thanks!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
##
@@ -29,9 +29,12 @@
  * Users desiring stateful operations will need to provide synchronization 
internally in
  * the {@code StateRestorerListener} implementation.
  *
- * When used for monitoring a single {@link StateStore} using either {@link 
AbstractNotifyingRestoreCallback} or
- * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is 
necessary
- * as each StreamThread has its own StateStore instance.
+ * Note that this listener is only registered at the per-client level and 
users can base on the {@code storeName}

Review comment:
   Thanks!

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -210,30 +207,6 @@ public void shouldFindSingleStoreForChangelog() {
 );
 }
 
-@Test
-public void shouldRestoreStoreWithRestoreCallback() {

Review comment:
   Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance

2020-06-09 Thread GitBox

guozhangwang commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r437069073



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   This is a piggy-backed cleanup: we call resetGenerationXXX in the 
join/sync-group handler func and hence should not re-call it again here.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -652,9 +651,10 @@ public void handle(JoinGroupResponse joinResponse, 
RequestFuture fut
 } else if (error == Errors.MEMBER_ID_REQUIRED) {
 // Broker requires a concrete member id to be allowed to join 
the group. Update member id
 // and send another join group request in next cycle.
+String memberId = joinResponse.data().memberId();
+log.debug("Attempt to join group returned {} error. Will set 
the member id as {} and then rejoin", error, memberId);

Review comment:
   Minor log4j improvement.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8838: MINOR: fix flaky TopicCommandWithAdminClientTest.testDescribeUnderRep…

2020-06-09 Thread GitBox

chia7712 commented on a change in pull request #8838:
URL: https://github.com/apache/kafka/pull/8838#discussion_r437289845



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -697,6 +697,10 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic: 
$testTopicName"))
 assertEquals(2, simpleDescribeOutputRows.size)
 
+// let's wait until the LAIR is NOT propagated
+TestUtils.waitUntilTrue(() => 
!adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().containsKey(tp),

Review comment:
   make sure the reassignment is completed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

2020-06-09 Thread GitBox

guozhangwang commented on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-640892231


   Merged to trunk and cherry-picked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bellemare commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-09 Thread GitBox

bellemare commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-640926702


   I swear it ran checkstyle when I compiled it... bah! I guess not. I did run 
./gradlew :streams:test; it had one unrelated error (I think), so let's see how 
it does now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8839: KIP-585: Documentation

2020-06-09 Thread GitBox

tombentley commented on pull request #8839:
URL: https://github.com/apache/kafka/pull/8839#issuecomment-641201269


   @kkonstantine please could you review this, thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

2020-06-09 Thread GitBox

rhauch commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436925315



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
 // can be used to calculate derived sets
 log.debug("Previous assignments: {}", previousAssignment);
+int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+if (previousGenerationId != lastCompletedGenerationId) {
+log.debug("Emptying previous assignments due to generation 
mismatch between previous "
++ "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
   Can we make this a little easier to understand for most users? I think 
it might be sufficient to add some combination of:
   * what this means (e.g., the worker was partitioned and missed at least one 
rebalance round, likely due to a network issue), and 
   * what resulted (e.g., the worker gave up its tasks in case the cluster had 
reassigned them to another worker).
   
   And should this be debug, info, or warn? Warn seems wrong, since the user 
shouldn't need to do anything, but an excessive number of these could signal 
the need for additional tuning. WDYT?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
 log.debug("Found the following connectors and tasks missing from 
previous assignments: "
 + lostAssignments);
 
+if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {
+log.debug("Group size is same between rebalances. Lost assignments 
are probably due to lost SyncGroup "
++ "responses. Treating lost tasks as new tasks");

Review comment:
   Similar comment to that above.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
 // can be used to calculate derived sets
 log.debug("Previous assignments: {}", previousAssignment);
+int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+if (previousGenerationId != lastCompletedGenerationId) {
+log.debug("Emptying previous assignments due to generation 
mismatch between previous "
++ "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
   Sounds good, though it'd be better to have a single (long) log message 
to prevent them from being separated by other log messages from other threads.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
 log.debug("Found the following connectors and tasks missing from 
previous assignments: "
 + lostAssignments);
 
+if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {
+log.debug("Group size is same between rebalances. Lost assignments 
are probably due to lost SyncGroup "
++ "responses. Treating lost tasks as new tasks");

Review comment:
   Sounds good.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
 log.debug("Found the following connectors and tasks missing from 
previous assignments: "
 + lostAssignments);
 
+if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {

Review comment:
   Is it enough to trust that the # of workers has not changed, or should 
we compare the members, via something like:
   ```
   if (previousMembers.equals(memberConfigs.keySet()) && scheduledRebalance <= 
0) {
   ```
   IOW, what happens if one worker disappeared about the same time that an 
operator added a new worker?
   
   IIUC from the integration tests, this logic 

[GitHub] [kafka] vvcephei commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective

2020-06-09 Thread GitBox

vvcephei commented on a change in pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#discussion_r436758351



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -1084,12 +1088,15 @@ private boolean 
populateActiveTaskAndPartitionsLists(final List
 // If the partition is new to this consumer but is still owned 
by another, remove from the assignment
 // until it has been revoked and can safely be reassigned 
according to the COOPERATIVE protocol
 if (newPartitionForConsumer && 
allOwnedPartitions.contains(partition)) {
-log.info("Removing task {} from assignment until it is 
safely revoked in followup rebalance", taskId);
-clientState.unassignActive(taskId);

Review comment:
   Ah, I see. I'll put it back with a comment that we're just using it for 
the internal assertions.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -514,17 +515,24 @@ void handleLostAll() {
 
 /**
  * Compute the offset total summed across all stores in a task. Includes 
offset sum for any tasks we own the
- * lock for, which includes assigned and unassigned tasks we locked in 
{@link #tryToLockAllNonEmptyTaskDirectories()}
- *
- * @return Map from task id to its total offset summed across all state 
stores
+ * lock for, which includes assigned and unassigned tasks we locked in 
{@link #tryToLockAllNonEmptyTaskDirectories()}.
+ * Does not include stateless or non-logged tasks.
  */
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-for (final TaskId id : lockedTaskDirectories) {
+// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
+// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
+// just have an empty changelogOffsets map.
+for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.keySet())) {
 final Task task = tasks.get(id);
 if (task != null) {
-taskOffsetSums.put(id, sumOfChangelogOffsets(id, 
task.changelogOffsets()));
+final Map changelogOffsets = 
task.changelogOffsets();
+if (changelogOffsets.isEmpty()) {
+log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}", id);

Review comment:
   Yep! We don't know that it's stateless, just that it didn't report any 
changelogs.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws 
NoSuchFieldException, Il
 assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
 }
 }
+
+@Test
+public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws 
InterruptedException {
+// NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum
+// value is one minute
+shouldScaleOutWithWarmupTasks(storeName -> 
Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+}
+
+@Test
+public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws 
InterruptedException {
+// NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum

Review comment:
   https://en.wikipedia.org/wiki/Nota_bene :) 

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws 
NoSuchFieldException, Il
 assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
 }
 }
+
+@Test
+public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws 
InterruptedException {
+// NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum
+// value is one minute
+shouldScaleOutWithWarmupTasks(storeName -> 
Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+}
+
+@Test
+public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws 
InterruptedException {
+// NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum
+// value is one minute
+shouldScaleOutWithWarmupTasks(storeName -> 
Materialized.as(Stores.persistentKeyValueStore(storeName)));
+}
+
+private void shouldScaleOutWithWarmupTasks(final Function>> 
materializedFunction) throws InterruptedException {
+final String testId = safeUniqueTestName(getClass(), testName);
+final St

[GitHub] [kafka] guozhangwang commented on pull request #8816: MINOR: Print all removed dynamic members during join complete

2020-06-09 Thread GitBox

guozhangwang commented on pull request #8816:
URL: https://github.com/apache/kafka/pull/8816#issuecomment-640934122







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

2020-06-09 Thread GitBox

feyman2016 commented on a change in pull request #8832:
URL: https://github.com/apache/kafka/pull/8832#discussion_r437529829



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -147,6 +147,46 @@ public String toString() {
 }
 }
 
+/**
+ * TopicNode is the a topic node abstraction for graph built with 
TopicNode and TopicsInfo, the graph is useful

Review comment:
   Fixed

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -147,6 +147,46 @@ public String toString() {
 }
 }
 
+/**
+ * TopicNode is the a topic node abstraction for graph built with 
TopicNode and TopicsInfo, the graph is useful

Review comment:
   Updated

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int 
minReceivedMetadataVersion,
  */
 private void setRepartitionTopicMetadataNumberOfPartitions(final 
Map repartitionTopicMetadata,
final 
Map topicGroups,
-   final Cluster 
metadata) {
-boolean numPartitionsNeeded;
-do {
-numPartitionsNeeded = false;
-
-for (final TopicsInfo topicsInfo : topicGroups.values()) {
-for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
-final Optional maybeNumPartitions = 
repartitionTopicMetadata.get(topicName)
- 
.numberOfPartitions();
-Integer numPartitions = null;
-
-if (!maybeNumPartitions.isPresent()) {
-// try set the number of partitions for this 
repartition topic if it is not set yet
-for (final TopicsInfo otherTopicsInfo : 
topicGroups.values()) {
-final Set otherSinkTopics = 
otherTopicsInfo.sinkTopics;
-
-if (otherSinkTopics.contains(topicName)) {
-// if this topic is one of the sink topics of 
this topology,
-// use the maximum of all its source topic 
partitions as the number of partitions
-for (final String sourceTopicName : 
otherTopicsInfo.sourceTopics) {
-Integer numPartitionsCandidate = null;
-// It is possible the sourceTopic is 
another internal topic, i.e,
-// map().join().join(map())
-if 
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
-if 
(repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent())
 {
-numPartitionsCandidate =
-
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
-}
-} else {
-final Integer count = 
metadata.partitionCountForTopic(sourceTopicName);
-if (count == null) {
-throw new IllegalStateException(
-"No partition count found for 
source topic "
-+ sourceTopicName
-+ ", but it should have 
been."
-);
-}
-numPartitionsCandidate = count;
-}
-
-if (numPartitionsCandidate != null) {
-if (numPartitions == null || 
numPartitionsCandidate > numPartitions) {
-numPartitions = 
numPartitionsCandidate;
-}
-}
-}
-}
-}
-
-// if we still have not found the right number of 
partitions,
-// another iteration is needed
-if (numPartitions == null) {
-numPartitionsNeeded = true;
-} else {
-
repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
-}
-}
+

[GitHub] [kafka] xvrl commented on pull request #8800: MINOR: Fix incorrect GC log size with JDK9+

2020-06-09 Thread GitBox

xvrl commented on pull request #8800:
URL: https://github.com/apache/kafka/pull/8800#issuecomment-640914664


   @ijuma tested this change locally; the broker starts up fine and GC logs 
show up. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wicknicks commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

2020-06-09 Thread GitBox

wicknicks commented on a change in pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#discussion_r436843713



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, 
long errorMaxDelayInMi
 public Future executeFailed(Stage stage, Class executingClass,
   ConsumerRecord 
consumerRecord,
   Throwable error) {
+if (!withinToleranceLimits()) {
+errorHandlingMetrics.recordFailure();
+markAsFailed();
+throw new ConnectException("Tolerance exceeded in the errant 
record reporter", error);
+}
+

Review comment:
   if we attempt an operation and it fails, `recordFailure` will be 
incremented, but `recordError` only tracks the cases where the when we 
encounter a problem that the framework cannot retry or skip. In the first case, 
we may still be able to retry or skip the record. In the `executeFailed` 
scenario, we should `recordFailure()` every time, and only `recordError` only 
when we have to fail the task.
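
A condensed, hypothetical Java sketch of the metric semantics described above: 
every reported bad record bumps the failure metric, while the error metric and 
the failed flag are only touched once the tolerance limit forces the task to 
fail. The names mirror, but are not, the real `RetryWithToleranceOperator` API.

```java
import java.util.concurrent.atomic.AtomicLong;

class ErrantRecordToleranceSketch {
    private final AtomicLong recordFailures = new AtomicLong(); // every failed operation
    private final AtomicLong recordErrors = new AtomicLong();   // only unrecoverable errors
    private final long toleranceLimit;
    private volatile boolean failed = false;

    ErrantRecordToleranceSketch(long toleranceLimit) {
        this.toleranceLimit = toleranceLimit;
    }

    /** Called each time a connector reports a bad record to the errant record reporter. */
    void executeFailed(Throwable error) {
        recordFailures.incrementAndGet();             // always count the failure
        if (recordFailures.get() > toleranceLimit) {  // tolerance exceeded: fail the task
            recordErrors.incrementAndGet();
            failed = true;
            throw new RuntimeException("Tolerance exceeded in the errant record reporter", error);
        }
        // otherwise the record is skipped or retried and processing continues
    }
}
```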

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
##
@@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, 
long errorMaxDelayInMi
 public Future executeFailed(Stage stage, Class executingClass,
   ConsumerRecord 
consumerRecord,
   Throwable error) {
+if (!withinToleranceLimits()) {
+errorHandlingMetrics.recordFailure();
+markAsFailed();
+throw new ConnectException("Tolerance exceeded in the errant 
record reporter", error);

Review comment:
   Since this is called from the task(), is it enough to just raise an 
exception? that may be swallowed by the task, and could continue processing. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

2020-06-09 Thread GitBox

mjsax merged pull request #8803:
URL: https://github.com/apache/kafka/pull/8803


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bellemare commented on a change in pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-09 Thread GitBox

bellemare commented on a change in pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#discussion_r436871862



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##
@@ -243,17 +244,17 @@ private void validateTopologyCanProcessData(final 
StreamsBuilder builder) {
 final String safeTestName = safeUniqueTestName(getClass(), testName);
 config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + 
safeTestName);
 config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
-config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
+config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class.getName());
 config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
 config.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
 try (final TopologyTestDriver topologyTestDriver = new 
TopologyTestDriver(builder.build(), config)) {
-final TestInputTopic aTopic = 
topologyTestDriver.createInputTopic("A", new StringSerializer(), new 
StringSerializer());
-final TestInputTopic bTopic = 
topologyTestDriver.createInputTopic("B", new StringSerializer(), new 
StringSerializer());
-final TestOutputTopic output = 
topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new 
StringDeserializer());
-aTopic.pipeInput("a1", "b1-alpha");
-bTopic.pipeInput("b1", "beta");
-final Map x = output.readKeyValuesToMap();
-assertThat(x, is(Collections.singletonMap("a1", 
"(b1-alpha,(b1-alpha,beta))")));
+final TestInputTopic aTopic = 
topologyTestDriver.createInputTopic("A", new IntegerSerializer(), new 
StringSerializer());
+final TestInputTopic bTopic = 
topologyTestDriver.createInputTopic("B", new IntegerSerializer(), new 
StringSerializer());
+final TestOutputTopic output = 
topologyTestDriver.createOutputTopic("output", new IntegerDeserializer(), new 
StringDeserializer());
+aTopic.pipeInput(1, "1-alpha");
+bTopic.pipeInput(1, "beta");

Review comment:
   Agreed. Good catch.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##
@@ -181,60 +182,60 @@ public void shouldWorkWithDefaultAndProducedSerdes() {
 public void shouldUseExpectedTopicsWithSerde() {
 final String applicationId = "ktable-ktable-joinOnForeignKey";
 final Properties streamsConfig = mkProperties(mkMap(
-mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
-mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:"),
-mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath())
+mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:"),
+mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath())
 ));
 
 final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
 final StreamsBuilder builder = new StreamsBuilder();
 
-final KTable left = builder.table(
-LEFT_TABLE,
-Consumed.with(serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, true),
-  serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, false))
+final KTable left = builder.table(
+LEFT_TABLE,
+Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), 
streamsConfig, true),
+serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, false))
 );
-final KTable right = builder.table(
-RIGHT_TABLE,
-Consumed.with(serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, true),
-  serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, false))
+final KTable right = builder.table(
+RIGHT_TABLE,
+Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), 
streamsConfig, true),
+serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, false))
 );
 
 left.join(
-right,
-value -> value.split("\\|")[1],
-(value1, value2) -> "(" + value1 + "," + value2 + ")",
-Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), 
streamsConfig, false)
-))
-.toStream()
-.to(OUTPUT);
+right,
+value -> Integer.parseInt(value.split("\\|")[1]),
+(value1, value2) -> "(" + value1 + "," + value2 + ")",
+ 

[GitHub] [kafka] abbccdda commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

2020-06-09 Thread GitBox

abbccdda commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436800666



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -1050,4 +1062,43 @@ public String toString() {
 '}';
 }
 }
+
+public static class LogTruncation {
+public final TopicPartition topicPartition;
+public final FetchPosition fetchPosition;
+public final Optional divergentOffsetOpt;
+
+public LogTruncation(TopicPartition topicPartition,
+ FetchPosition fetchPosition,
+ Optional divergentOffsetOpt) {
+this.topicPartition = topicPartition;
+this.fetchPosition = fetchPosition;

Review comment:
   On second thought, I don't feel strongly about it. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] msilb commented on pull request #1596: KAFKA-1543: Change replication factor during partition map generation

2020-06-09 Thread GitBox

msilb commented on pull request #1596:
URL: https://github.com/apache/kafka/pull/1596#issuecomment-641312887


   I see the parent issue has been open since 2014. Any idea when this is 
finally going to be implemented?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8788: MINOR: Remove unused isSticky assert out from tests only do constrainedAssign

2020-06-09 Thread GitBox

mjsax commented on pull request #8788:
URL: https://github.com/apache/kafka/pull/8788#issuecomment-640741662


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #7384: KAFKA-8938 - Connect - Improve Allocations During Struct Validation

2020-06-09 Thread GitBox

kkonstantine commented on pull request #7384:
URL: https://github.com/apache/kafka/pull/7384#issuecomment-641103428


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-06-09 Thread Sophie Blee-Goldman (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9923:
---
Fix Version/s: (was: 2.6.0)

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Blocker
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.
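
For illustration only, a small Java sketch of the key wrapping described in 
this report (not Kafka's actual store internals): each insert appends an 
incrementing 4-byte sequence number to the serialized key, so otherwise 
identical keys become distinct in the unique-key store.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class DuplicateKeyWrapper {
    private final AtomicInteger seq = new AtomicInteger(0);

    // Append a 4-byte, incrementing sequence number to the serialized key.
    byte[] wrap(byte[] serializedKey) {
        return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
                         .put(serializedKey)
                         .putInt(seq.getAndIncrement())
                         .array();
    }

    public static void main(String[] args) {
        DuplicateKeyWrapper wrapper = new DuplicateKeyWrapper();
        byte[] key = "k1".getBytes();
        System.out.println(Arrays.toString(wrapper.wrap(key))); // ..., 0, 0, 0, 0
        System.out.println(Arrays.toString(wrapper.wrap(key))); // ..., 0, 0, 0, 1 -> distinct bytes
    }
}
```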



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9923) Join window store duplicates can be compacted in changelog

2020-06-09 Thread Sophie Blee-Goldman (Jira)

 [ 
https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-9923.

Resolution: Not A Problem

[~cadonna] pointed out that this actually isn't a problem/has been fixed by 
KAFKA-5804. We could certainly stand to do some cleaning up around the 
duplicates handling, but at least we aren't losing data!

> Join window store duplicates can be compacted in changelog 
> ---
>
> Key: KAFKA-9923
> URL: https://issues.apache.org/jira/browse/KAFKA-9923
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> This wrapping occurs at the innermost layer of the store hierarchy, which 
> means the duplicates must first pass through the changelogging layer. At this 
> point the keys are still identical. So, we end up sending the records to the 
> changelog without distinct keys and therefore may lose the older of the 
> duplicates during compaction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it

2020-06-09 Thread GitBox

mjsax commented on pull request #8803:
URL: https://github.com/apache/kafka/pull/8803#issuecomment-640934234


   Merged to `trunk` and cherry-picked to `2.6` branch.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-06-09 Thread GitBox

ijuma commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r436792477



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2237,7 +2209,11 @@ class Log(@volatile private var _dir: File,
 def deleteSegments(): Unit = {
   info(s"Deleting segments ${segments.mkString(",")}")
   maybeHandleIOException(s"Error while deleting segments for 
$topicPartition in dir ${dir.getParent}") {
-segments.foreach(_.deleteIfExists())
+segments.foreach {
+  case segment =>

Review comment:
   Nit: `case` and newline are unnecessary here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-06-09 Thread GitBox

junrao commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r437049072



##
File path: core/src/main/scala/kafka/zk/ZkData.scala
##
@@ -81,17 +83,26 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json 
format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or 
above.

Review comment:
   2.6.x => 2.7.x

##
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler}
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+import scala.util.control.Exception.ignoring
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce an optional latch that can be used to notify 
the caller when an
+   *updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+/**
+ * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+ * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+ * exception is raised.
+ *
+ * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
+ * once successfully. A subsequent invocation will raise an exception.
+ *
+ * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+ *   this method is called again after a successful previous 
invocation.
+ * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
+ *   FinalizedFeatureCache.
+ * @throws   RuntimeException, if there was a failure in 
reading/deserializing the
+ *   contents of the feature ZK node.
+ */
+def updateLatestOrThrow(): Unit = {
+  maybeNotifyOnce.foreach(notifier => {
+if (notifier.getCount != 1) {
+  throw new IllegalStateException(
+"Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+}
+  })
+
+  debug(s"Reading feature ZK node at path: $featureZkNodePath")
+  val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+
+  // There are 4 cases:
+  //
+  // (empty dataBytes, valid version)   => The empty dataBytes will 
fail FeatureZNode deserialization.
+  //   FeatureZNode, when present 
in ZK, can not have empty contents.
+  // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+  //   if dataBytes contains valid 
data.
+  // (empty dataBytes, unknown version) => This is a valid case, and 
this can happen if the FeatureZNode
+  //   does not exist in ZK.
+  // (non-empty dataBytes,

[GitHub] [kafka] chia7712 opened a new pull request #8837: KAFKA-10125 The partition which is removing should be considered to b…

2020-06-09 Thread GitBox

chia7712 opened a new pull request #8837:
URL: https://github.com/apache/kafka/pull/8837


   When a reassignment is still in progress, a replica that is either being 
removed or being added should be considered to be under reassignment. However, 
TopicCommand still prints partitions that have removing replicas (a minimal 
sketch of the intended check is included below).
   
   issue: https://issues.apache.org/jira/browse/KAFKA-10125
   related to d63e0181bb7b9b4f5ed088abc00d7b32aeb0
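   
   A minimal sketch of the intended check, under the assumption (hypothetical helper, not the actual TopicCommand code) that a partition counts as under reassignment whenever it still has adding or removing replicas, and that such partitions should be skipped when reporting under-replicated partitions:
   
   ```java
   import java.util.List;
   
   public class ReassignmentCheckSketch {
       // a partition with any adding OR removing replicas is still mid-reassignment
       static boolean isUnderReassignment(final List<Integer> addingReplicas,
                                          final List<Integer> removingReplicas) {
           return !addingReplicas.isEmpty() || !removingReplicas.isEmpty();
       }
   
       public static void main(final String[] args) {
           // only removing replicas left: still under reassignment, so it should not
           // show up in the --under-replicated-partitions output
           System.out.println(isUnderReassignment(List.of(), List.of(3))); // true
       }
   }
   ```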
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #8802: MINOR: Fix fetch session epoch comment in `FetchRequest.json`

2020-06-09 Thread GitBox

hachikuji merged pull request #8802:
URL: https://github.com/apache/kafka/pull/8802


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes

2020-06-09 Thread GitBox

vvcephei commented on pull request #8764:
URL: https://github.com/apache/kafka/pull/8764#issuecomment-640686536







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Lucent-Wong commented on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…

2020-06-09 Thread GitBox

Lucent-Wong commented on pull request #8453:
URL: https://github.com/apache/kafka/pull/8453#issuecomment-641192263







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #8823: MINOR: fix HTML markup

2020-06-09 Thread GitBox

bbejeck commented on pull request #8823:
URL: https://github.com/apache/kafka/pull/8823#issuecomment-640729994


   Merged #8823 into trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sneakyburro commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-06-09 Thread GitBox

sneakyburro commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-640936812


   > @sneakyburro -- Just a heads up: the code freeze for the 2.6 release is on 
Wednesday. If you want to get the PR into the release, please address the 
review comments soon. Otherwise, this will slip and go into 2.7.
   
   I'll update the PR tonight.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective

2020-06-09 Thread GitBox

ableegoldman commented on a change in pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#discussion_r436825265



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -86,7 +87,7 @@
 private boolean rebalanceInProgress = false;  // if we are in the middle 
of a rebalance, it is not safe to commit
 
 // includes assigned & initialized tasks and unassigned tasks we locked 
temporarily during rebalance
-private Set lockedTaskDirectories = new HashSet<>();
+private final Set lockedTaskDirectories = new HashSet<>();

Review comment:
   Has checkstyle just been dropping the ball lately? Could we be 
unknowingly suppressing this...?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
+import 
org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@Category(IntegrationTest.class)
+public class HighAvailabilityTaskAssignorIntegrationTest {

Review comment:
   Cool, I was kind of hoping you would put this in a separate integration 
test class





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs

2020-06-09 Thread GitBox

mjsax commented on pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#issuecomment-640935840


   @sneakyburro -- Just a heads up: the code freeze for the 2.6 release is on 
Wednesday. If you want to get the PR into the release, please address the 
review comments soon. Otherwise, this will slip and go into 2.7.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgroothuijsen commented on a change in pull request #8664: KAFKA-9716: Clarify meaning of compression rate metrics

2020-06-09 Thread GitBox

rgroothuijsen commented on a change in pull request #8664:
URL: https://github.com/apache/kafka/pull/8664#discussion_r436973523



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java
##
@@ -84,7 +84,7 @@ public SenderMetricsRegistry(Metrics metrics) {
 this.batchSizeMax = createMetricName("batch-size-max",
 "The max number of bytes sent per partition per-request.");
 this.compressionRateAvg = createMetricName("compression-rate-avg",
-"The average compression rate of record batches.");
+"The average compressed-to-uncompressed size ratio of record 
batches.");

Review comment:
   I was in fact going for brevity with my approach, but if more detail 
means increased clarity for the user, then I'm all for it. With the way 
you've defined it, I think there can be little question about the meaning.
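   
   As a hedged aside on what the reworded description implies (toy code, not the actual Sender metrics implementation): the value is the compressed size divided by the uncompressed size of a batch, so smaller values mean better compression.
   
   ```java
   public class CompressionRateSketch {
       // compression rate as described: compressed bytes over uncompressed bytes
       static double compressionRate(final long compressedBytes, final long uncompressedBytes) {
           return (double) compressedBytes / uncompressedBytes;
       }
   
       public static void main(final String[] args) {
           // e.g. a 1000-byte batch that compresses down to 250 bytes has a rate of 0.25
           System.out.println(compressionRate(250, 1000)); // 0.25
       }
   }
   ```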





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance

2020-06-09 Thread GitBox

abbccdda commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r437110527



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -604,6 +605,25 @@ public void 
testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
 assertEquals(newGen, coordinator.generation());
 }
 
+@Test
+public void testHeartbeatSentWhenRebalancing() throws Exception {
+setupCoordinator();
+joinGroup();
+
+final AbstractCoordinator.Generation currGen = 
coordinator.generation();
+
+coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+// the heartbeat thread should be sent out during a rebalance

Review comment:
   remove `thread`

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -652,9 +651,10 @@ public void handle(JoinGroupResponse joinResponse, 
RequestFuture fut
 } else if (error == Errors.MEMBER_ID_REQUIRED) {
 // Broker requires a concrete member id to be allowed to join 
the group. Update member id
 // and send another join group request in next cycle.
+String memberId = joinResponse.data().memberId();
+log.debug("Attempt to join group returned {} error. Will set 
the member id as {} and then rejoin", error, memberId);

Review comment:
   There might be a slight performance gain if we just say "Attempt to join 
group and receive member id required error." instead of passing in the error.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int,
 
   group.currentState match {
 case PreparingRebalance =>
-  updateMemberAndRebalance(group, member, protocols, 
responseCallback)
+  updateMemberAndRebalance(group, member, protocols, s"Member 
${member.memberId} joining group during ${group.currentState}", 
responseCallback)

Review comment:
   Just to confirm, this file only has logging changes right?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation 
generation) {
 public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture future) {
 sensors.heartbeatSensor.record(response.requestLatencyMs());
 Errors error = heartbeatResponse.error();
+
+if (state != MemberState.STABLE) {

Review comment:
   We should still handle fatal exception IMHO, such as 
FencedInstanceIdException

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   Comment here as no better place: on L485 we have this logic:
   ```
   if (joinFuture == null) {
   // fence off the heartbeat thread explicitly so that it cannot 
interfere with the join group.
   // Note that this must come after the call to onJoinPrepare 
since we must be able to continue
   // sending heartbeats if that callback takes some time.
   disableHeartbeatThread();
   ```
   Since we are keeping the heartbeat thread working during a rebalance, would 
disabling the heartbeat thread here be a bit risky? Maybe we could also check 
the member state here to decide whether to disable it.
   

##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -604,6 +605,25 @@ public void 
testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int
 assertEquals(newGen, coordinator.generation());
 }
 
+@Test
+public void testHeartbeatSentWhenRebalancing() throws Exception {
+setupCoordinator();
+joinGroup();
+
+final AbstractCoordinator.Generation currGen = 
coordinator.generation();
+
+coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING);
+
+// the heartbeat thread should be sent out during a rebalance
+mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 
2000,
+"The heartbeat request was not sent");
+assertTrue(coordinator.heartbeat().hasInflight());
+
+mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS));

Review comment:
   Why do we need to respond?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##

[GitHub] [kafka] chia7712 opened a new pull request #8838: MINOR: fix flaky TopicCommandWithAdminClientTest.testDescribeUnderRep…

2020-06-09 Thread GitBox

chia7712 opened a new pull request #8838:
URL: https://github.com/apache/kafka/pull/8838


   
```TopicCommandWithAdminClientTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress```
 fails frequently on my local machine.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy merged pull request #8800: MINOR: Fix incorrect GC log size with JDK9+

2020-06-09 Thread GitBox

omkreddy merged pull request #8800:
URL: https://github.com/apache/kafka/pull/8800


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Lucent-Wong edited a comment on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…

2020-06-09 Thread GitBox

Lucent-Wong edited a comment on pull request #8453:
URL: https://github.com/apache/kafka/pull/8453#issuecomment-641192263


   Looks like the build failure is not related to my changes.
   
   
   > Task :streams:test-utils:integrationTest
   
   > Task :streams:upgrade-system-tests-0100:integrationTest
   
   > Task :streams:upgrade-system-tests-0101:integrationTest
   
   > Task :streams:upgrade-system-tests-0102:integrationTest
   
   > Task :streams:upgrade-system-tests-0110:integrationTest
   
   > Task :streams:upgrade-system-tests-10:integrationTest
   
   > Task :streams:upgrade-system-tests-11:integrationTest
   
   > Task :streams:upgrade-system-tests-20:integrationTest
   
   > Task :streams:upgrade-system-tests-21:integrationTest
   
   > Task :streams:upgrade-system-tests-22:integrationTest
   
   > Task :streams:upgrade-system-tests-23:integrationTest
   
   > Task :streams:upgrade-system-tests-24:integrationTest
   
   > Task :streams:upgrade-system-tests-25:integrationTest
   
   
   
   FAILURE: Build failed with an exception.
   
   
   
   * What went wrong:
   
   Execution failed for task ':streams:unitTest'.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

2020-06-09 Thread GitBox

kkonstantine commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436950305



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
 // can be used to calculate derived sets
 log.debug("Previous assignments: {}", previousAssignment);
+int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+if (previousGenerationId != lastCompletedGenerationId) {
+log.debug("Emptying previous assignments due to generation 
mismatch between previous "
++ "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
   This is related to the leader's internal bookkeeping when it calculates 
a new assignment. It's not related to the tasks that a worker (even the leader) 
is actually running. 
   
   Emptying/clearing the previous assignment might result in some tasks 
shuffling around, because the leader will calculate an assignment from scratch, 
but it doesn't affect running tasks. The newly computed assignment will send 
assignments and/or revocations as needed based on a) what tasks each worker has 
reported running in this round and b) which tasks are configured in the config 
topic. Another way to say this is that the leader won't bother detecting lost 
tasks in this round. Every unassigned task will be treated as a new task.
   
   You are right that the log message doesn't convey that meaning exactly. How 
about:
   ```
   log.debug("Clearing the view of previous assignments due to generation 
mismatch between "
   + "previous generation ID {} and last completed 
generation ID {}. ",
   previousGenerationId, lastCompletedGenerationId);
   log.debug("This can happen if the leader fails to sync the assignment within 
a " 
   + "rebalancing round. The following view of previous 
assignments might be "
   + "outdated and will be ignored by the leader in the 
current computation of " 
   + "new assignments. Possibly outdated previous 
assignments: {}", previousAssignment);
   ```
   
   
   

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
 // can be used to calculate derived sets
 log.debug("Previous assignments: {}", previousAssignment);
+int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+if (previousGenerationId != lastCompletedGenerationId) {
+log.debug("Emptying previous assignments due to generation 
mismatch between previous "
++ "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
   Also, given that the previous assignments are printed in `debug`, I 
think it makes sense to keep these log messages in debug as well. 

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
 log.debug("Found the following connectors and tasks missing from 
previous assignments: "
 + lostAssignments);
 
+if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {
+log.debug("Group size is same between rebalances. Lost assignments 
are probably due to lost SyncGroup "
++ "responses. Treating lost tasks as new tasks");

Review comment:
   How about: 
   
   ```
   log.debug("The number of workers remained the same between rebalances. The 
missing " 
   + "assignments that the leader is detecting are probably 
due to some workers " 
   + "failing to receive the new assignments in the 
previous rebalance. Will "
   + "reassign missing tasks as new tasks");
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 // Base set: The pr

[GitHub] [kafka] guozhangwang merged pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

2020-06-09 Thread GitBox

guozhangwang merged pull request #8676:
URL: https://github.com/apache/kafka/pull/8676


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #8783: KAFKA-10063 UnsupportedOperation when querying cleaner metrics after …

2020-06-09 Thread GitBox

hachikuji merged pull request #8783:
URL: https://github.com/apache/kafka/pull/8783


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

2020-06-09 Thread GitBox

ableegoldman commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r436858916



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
##
@@ -34,7 +34,6 @@
 /**
  * Unregisters and removes the passed in partitions from the set of 
changelogs
  * @param removedPartitions the set of partitions to remove
- * @param triggerOnRestoreEnd whether to trigger the onRestoreEnd callback
  */
-void unregister(final Collection removedPartitions, final 
boolean triggerOnRestoreEnd);
+void unregister(final Collection removedPartitions);

Review comment:
   Nice, this was kind of an ugly hack to begin with, so I'm happy to see it 
go.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8821: [DO NOT MERGE] Reenable flaky EosBetaUpgradeIntegrationTest

2020-06-09 Thread GitBox

mjsax commented on pull request #8821:
URL: https://github.com/apache/kafka/pull/8821#issuecomment-641022736







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #8834: MINOR: Do not disable heartbeat during Rebalance

2020-06-09 Thread GitBox

guozhangwang opened a new pull request #8834:
URL: https://github.com/apache/kafka/pull/8834


   1. Allow the heartbeat thread to send heartbeat requests during a rebalance; in turn, 
when handling responses, ignore the error if the state is not STABLE (a minimal 
sketch of this handling is shown below).
   
   2. Piggy-back a log4j improvement on the broker-side group coordinator to log 
the reason for triggering a rebalance.
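   
   A minimal, self-contained sketch of the response handling described in point 1 (the enum and handler below are simplified stand-ins, not the actual AbstractCoordinator types): fatal errors such as a fenced group.instance.id still surface, as suggested in the review comments above, while other errors are treated as benign when the member is not STABLE, i.e. mid-rebalance.
   
   ```java
   public class HeartbeatDuringRebalanceSketch {
       enum MemberState { UNJOINED, REBALANCING, STABLE }
       enum HeartbeatError { NONE, REBALANCE_IN_PROGRESS, FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID }
   
       static String handle(final HeartbeatError error, final MemberState state) {
           // fatal errors must surface even while a rebalance is in flight
           if (error == HeartbeatError.FENCED_INSTANCE_ID) {
               return "raise fatal exception";
           }
           // while not STABLE (i.e. mid-rebalance), other errors are expected and benign
           if (state != MemberState.STABLE) {
               return "ignore error, keep heartbeating";
           }
           switch (error) {
               case NONE:
                   return "heartbeat ok";
               case REBALANCE_IN_PROGRESS:
                   return "request rejoin";
               default:
                   return "reset generation and rejoin";
           }
       }
   
       public static void main(final String[] args) {
           System.out.println(handle(HeartbeatError.REBALANCE_IN_PROGRESS, MemberState.REBALANCING));
           System.out.println(handle(HeartbeatError.FENCED_INSTANCE_ID, MemberState.REBALANCING));
       }
   }
   ```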
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on a change in pull request #8239: KAFKA-9666: Don't increase transactional epoch when trying to fence if the log append fails

2020-06-09 Thread GitBox

bob-barrett commented on a change in pull request #8239:
URL: https://github.com/apache/kafka/pull/8239#discussion_r437077452



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##
@@ -487,6 +487,33 @@ class TransactionCoordinator(brokerId: Int,
   info(s"Aborting sending of transaction markers and returning 
$error error to client for $transactionalId's EndTransaction request of 
$txnMarkerResult, " +
 s"since appending $newMetadata to transaction log with 
coordinator epoch $coordinatorEpoch failed")
 
+  txnManager.getTransactionState(transactionalId).right.foreach {

Review comment:
   Thanks for the suggestion, @hachikuji! I wound up taking that approach.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] feyman2016 opened a new pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic

2020-06-09 Thread GitBox

feyman2016 opened a new pull request #8832:
URL: https://github.com/apache/kafka/pull/8832


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



