[jira] [Created] (KAFKA-10126) Remove unused options in ConsumerPerformance
jiamei xie created KAFKA-10126: -- Summary: Remove unused options in ConsumerPerformance Key: KAFKA-10126 URL: https://issues.apache.org/jira/browse/KAFKA-10126 Project: Kafka Issue Type: Bug Reporter: jiamei xie Assignee: jiamei xie Options numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance, so testing performance against the number of threads is currently a waste of time. They should be removed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7888) kafka cluster not recovering - Shrinking ISR from 14,13 to 13 (kafka.cluster.Partition) continuously
[ https://issues.apache.org/jira/browse/KAFKA-7888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129031#comment-17129031 ] Youssef BOUZAIENNE commented on KAFKA-7888: --- I'm facing the same issue on 2.4.1 > kafka cluster not recovering - Shrinking ISR from 14,13 to 13 > (kafka.cluster.Partition) continously > --- > > Key: KAFKA-7888 > URL: https://issues.apache.org/jira/browse/KAFKA-7888 > Project: Kafka > Issue Type: Bug > Components: controller, replication, zkclient >Affects Versions: 2.1.0 > Environment: using kafka_2.12-2.1.0 > 3 ZKs 3 Broker cluster, using 3 boxes (1 ZK and 1 broker on each box), > default.replication factor: 2, > offset replication factor was 1 when the error happened, increased to 2 after > seeing this error by reassigning-partitions. > compression: default (producer) on broker but sending gzip from producers. > linux (redhat) etx4 kafka logs on single local disk >Reporter: Kemal ERDEN >Priority: Major > Attachments: combined.log, producer.log > > > we're seeing the following repeating logs on our kafka cluster from time to > time which seems to cause messages expiring on Producers and the cluster > going into a non-recoverable state. The only fix seems to be to restart > brokers. > {{Shrinking ISR from 14,13 to 13 (kafka.cluster.Partition)}} > {{Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition)}} > and later on the following log is repeated: > {{Got user-level KeeperException when processing sessionid:0xe046aa4f8e6 > type:setData cxid:0x2df zxid:0xa01fd txntype:-1 reqpath:n/a Error > Path:/brokers/topics/ucTrade/partitions/6/state Error:KeeperErrorCode = > BadVersion for /brokers/topics/ucTrade/partitions/6/state}} > We haven't interfered with any of the brokers/zookeepers whilst this happened. > I've attached a combined log which represents a combination of controller, > server and state change logs from each broker (ids 13,14 and 15, log files > have the suffix b13, b14, b15 respectively) > We have increased the heaps from 1g to 6g for the brokers and from 512m to 4g > for the zookeepers since this happened but not sure if it is relevant. the ZK > logs are unfortunately overwritten so can't provide those. > We produce varying message sizes but some messages are relatively large (6mb) > but we use compression on the producers (set to gzip). > I've attached some logs from one of our producers as well. > producer.properties that we've changed: > spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer > spring.kafka.producer.compression-type=gzip > spring.kafka.producer.retries=5 > spring.kafka.producer.acks=-1 > spring.kafka.producer.batch-size=1048576 > spring.kafka.producer.properties.linger.ms=200 > spring.kafka.producer.properties.request.timeout.ms=60 > spring.kafka.producer.properties.max.block.ms=24 > spring.kafka.producer.properties.max.request.size=104857600 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
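The producer settings quoted in the report above are Spring Kafka properties. For readers reproducing the setup with the plain Java client, a roughly equivalent configuration sketch is shown below; the broker address and serializers are placeholders, and the timeout values that appear truncated in the report are left at their defaults rather than guessed.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class GzipProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // placeholder
        // Settings that are legible in the report; truncated timeout values are omitted.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.ACKS_CONFIG, "all");                  // acks=-1
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1048576);          // 1 MiB batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 200);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 104857600);  // allows the ~6 MB messages mentioned
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(new ProducerRecord<>("ucTrade", key, value));
        }
    }
}
{code}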
[jira] [Issue Comment Deleted] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Karthik updated KAFKA-7500: --- Comment: was deleted (was: Hi All, Im unable to find a document for MM2, Any leads to the documentation would be appreciated. Thanks in advance. ) > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Affects Versions: 2.4.0 >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Major > Labels: pull-request-available, ready-to-commit > Fix For: 2.4.0 > > Attachments: Active-Active XDCR setup.png > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129042#comment-17129042 ] Chia-Ping Tsai commented on KAFKA-10126: How about making numThreadsOpt work? maybe multiple consumers? > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10127) kafka cluster not recovering - Shrinking ISR continuously
Youssef BOUZAIENNE created KAFKA-10127: -- Summary: kafka cluster not recovering - Shrinking ISR continuously Key: KAFKA-10127 URL: https://issues.apache.org/jira/browse/KAFKA-10127 Project: Kafka Issue Type: Bug Components: replication, zkclient Affects Versions: 2.4.1 Environment: using kafka version 2.4.1 and zookeeper version 3.5.7 Reporter: Youssef BOUZAIENNE We are facing an issue from time to time where our kafka cluster goes into a weird state and we see the following log lines repeating: [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). (kafka.cluster.Partition) Before that our zookeeper session expired, which led us to that state. After we increased the two values below we encounter the issue less frequently, but it still appears from time to time and the only solution is a restart of the kafka service on all brokers: zookeeper.session.timeout.ms=18000 replica.lag.time.max.ms=3 Any help on that please. -- This message was sent by Atlassian Jira (v8.3.4#803005)
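For anyone monitoring a cluster in this state, the ISR of the affected partitions can also be checked from a client rather than only from broker logs. A minimal sketch using the Java AdminClient follows; the bootstrap address and topic name are placeholders, not taken from the reporter's environment.

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class IsrCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, TopicDescription> topics =
                admin.describeTopics(Collections.singletonList("test")).all().get();
            // Report any partition whose ISR is smaller than its replica set.
            topics.forEach((name, desc) -> desc.partitions().forEach(p -> {
                if (p.isr().size() < p.replicas().size()) {
                    System.out.printf("%s-%d under-replicated: isr=%s replicas=%s%n",
                        name, p.partition(), p.isr(), p.replicas());
                }
            }));
        }
    }
}
{code}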
[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129056#comment-17129056 ] jiamei xie commented on KAFKA-10126: Ok. Thanks. I'll have a look at it. Use consumer group? It seems the consumer number can't be bigger than the partition number. So the thread number can't be bigger than partition number either. > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129067#comment-17129067 ] Chia-Ping Tsai commented on KAFKA-10126: > Use consumer group? yep. Should we enable users to control the number of groups? (for example, 10 consumers with 2 groups) > It seems the consumer number can't be bigger than the partition number. maybe add a new argument to create the topic with a specified number of partitions. It seems to me both numbers are important factors of performance. > So the thread number can't be bigger than partition number either. we can throw an exception or print a warning. BTW, this issue requires a KIP if you want to remove/add arguments :) > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
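The "multiple consumers" idea discussed above would roughly mean one KafkaConsumer per thread, all sharing a group id, so the partitions are split across the threads. The sketch below illustrates that shape only; it is not the actual ConsumerPerformance code, and the option names and message count in the comments are assumptions.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class MultiThreadedConsumerPerfSketch {
    public static void main(String[] args) {
        int numThreads = 4;          // would come from a --threads style option (assumption)
        String topic = "perf-test";  // would come from --topic
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.submit(() -> {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-consumer-group");     // shared group
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
                // One KafkaConsumer per thread: the consumer itself is not thread-safe.
                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList(topic));
                    long records = 0;
                    while (records < 1_000_000) { // stand-in for a --messages limit
                        records += consumer.poll(Duration.ofMillis(100)).count();
                    }
                }
            });
        }
        pool.shutdown();
    }
}
{code}

With this shape, any thread beyond the partition count simply sits idle, which is the case where the warning suggested above would apply.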
[jira] [Commented] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving
[ https://issues.apache.org/jira/browse/KAFKA-10105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129070#comment-17129070 ] William Reynolds commented on KAFKA-10105: -- Hi James, that looks really similar. If it isn't the exact thing I would suspect that it is another symptom of the group coord change that we hit > Regression in group coordinator dealing with flaky clients joining while > leaving > > > Key: KAFKA-10105 > URL: https://issues.apache.org/jira/browse/KAFKA-10105 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.1 > Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker > Kafka 2.4.1 on jre 11 on debian 9 in docker >Reporter: William Reynolds >Priority: Major > > Since upgrade of a cluster from 1.1.0 to 2.4.1 the broker no longer deals > correctly with a consumer sending a join after a leave correctly. > What happens no is that if a consumer sends a leaving then follows up by > trying to send a join again as it is shutting down the group coordinator adds > the leaving member to the group but never seems to heartbeat that member. > Since the consumer is then gone when it joins again after starting it is > added as a new member but the zombie member is there and is included in the > partition assignment which means that those partitions never get consumed > from. What can also happen is that one of the zombies gets group leader so > rebalance gets stuck forever and the group is entirely blocked. > I have not been able to track down where this got introduced between 1.1.0 > and 2.4.1 but I will look further into this. Unfortunately the logs are > essentially silent about the zombie mebers and I only had INFO level logging > on during the issue and by stopping all the consumers in the group and > restarting the broker coordinating that group we could get back to a working > state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9935) Kafka not releasing member from Consumer Group
[ https://issues.apache.org/jira/browse/KAFKA-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129072#comment-17129072 ] William Reynolds commented on KAFKA-9935: - It does look related to me. Your reproduction looks similar to what we hit and also what it looks like to the tools > Kafka not releasing member from Consumer Group > -- > > Key: KAFKA-9935 > URL: https://issues.apache.org/jira/browse/KAFKA-9935 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.1 > Environment: Linux >Reporter: Steve Kecskes >Priority: Major > > Hello. I am experiencing an issue where Kafka is not releasing members from a > consumer group when the member crashes. The consumer group is then stuck in > rebalancing state indefinitely. > In this consumer group, there is only 1 client. This client has the following > related settings: > {code:java} > auto.commit.interval.ms = 5000 > auto.offset.reset = earliest > bootstrap.servers = [austgkafka01.hk.eclipseoptions.com:9092] > check.crcs = true > client.dns.lookup = default > client.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427-0 > connections.max.idle.ms = 54 > default.api.timeout.ms = 6 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = TraderAutomationViewServer_workaround_stuck_rebalance_20200427 > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 1 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 16777216 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = https > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > {code} > If the client crashes (not a graceful exit from group) the group remains in > the following state indefinitely. 
> {code} > Warning: Consumer group > 'TraderAutomationViewServer_workaround_stuck_rebalance' is rebalancing. > GROUP TOPIC > PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID > HOSTCLIENT-ID > TraderAutomationViewServer_workaround_stuck_rebalance EventAdjustedVols 10 > 6984061 7839599 855538 - - >- > TraderAutomationViewServer_workaround_stuck_rebalance VolMetrics8 > 128459531 143736443 15276912- - >- > TraderAutomationViewServer_workaround_stuck_rebalance EventAdjustedVols 12 > 7216495 8106030 889535 - - >- > TraderAutomationViewServer_workaround_stuck_rebalance VolMetrics6 > 122921729 137377358 14455629- - >- > TraderAutomationViewServer_workaround_stuck_rebalance EventAdjustedVols 14 > 5457618 6171142 713524 - - >- > TraderAutomationViewServer_workaround_stuck_rebalance VolMetrics4 > 125647891
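Besides kafka-consumer-groups.sh, the stuck group's state and member list can be inspected programmatically, which makes it easier to spot a zombie member holding an assignment. A small sketch with the Java AdminClient is below; the bootstrap address is a placeholder and the group name is taken from the report.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DescribeStuckGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            String group = "TraderAutomationViewServer_workaround_stuck_rebalance";
            ConsumerGroupDescription desc =
                admin.describeConsumerGroups(Collections.singletonList(group))
                     .describedGroups().get(group).get();
            // A group stuck as described would keep reporting a rebalancing state here.
            System.out.println("State: " + desc.state());
            desc.members().forEach(m ->
                System.out.printf("member=%s client=%s host=%s partitions=%s%n",
                    m.consumerId(), m.clientId(), m.host(),
                    m.assignment().topicPartitions()));
        }
    }
}
{code}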
[jira] [Commented] (KAFKA-10107) Producer snapshots LSO used in certain situations which can lead to data loss on compacted topics as LSO breach occurs and early offsets cleaned
[ https://issues.apache.org/jira/browse/KAFKA-10107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129077#comment-17129077 ] William Reynolds commented on KAFKA-10107: -- Still working on getting the logs out, apologies for delay > Producer snapshots LSO used in certain situations which can lead to data loss > on compacted topics as LSO breach occurs and early offsets cleaned > > > Key: KAFKA-10107 > URL: https://issues.apache.org/jira/browse/KAFKA-10107 > Project: Kafka > Issue Type: Bug > Components: core, log cleaner >Affects Versions: 2.4.1 > Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker > Kafka 2.4.1 on jre 11 on debian 9 in docker >Reporter: William Reynolds >Priority: Major > > While upgading a 1.1.0 cluster to 2.4.1 and also adding an interbroker port > using SSL we ran into a situation where producer snapshot offsets get set as > the log start offset then logs truncate to nothing across 2 relatively unsafe > restarts. > > Here is the timeline of what we did to trigger this > Broker 40 is shutdown as first to go to 2.4.1 and switch to interbroker port > 9094. > As it shuts down it writes producer snapshots > Broker 40 starts on 2.4.1, loads the snapshots then compares checkpointed > offsets to log start offset and finds them to be invalid (exact reason > unknown but looks to be producer snapshot load related) > On broker 40 all topics show an offset reset like this 2020-05-18 > 15:22:21,106] WARN Resetting first dirty offset of topic-name-60 to log start > offset 6009368 since the checkpointed offset 5952382 is invalid. > (kafka.log.LogCleanerManager$)" which then triggers log cleanup on broker 40 > for all these topics which is where the data is lost > At this point only partitions led by broker 40 have lost data and would be > failing for client lookups on older data but this can't spread as 40 has > interbroker port 9094 and brokers 50 and 60 have interbroker port 9092 > I stop start brokers 50 and 60 in quick succession to take them to 2.4.1 and > onto the new interbroker port 9094 > This leaves broker 40 as the in sync replica for all but a couple of > partitions which aren't on 40 at all shown in the attached image > Brokers 50 and 60 start and then take their start offset from leader (or if > there was no leader pulls from recovery on returning broker 50 or 60) and so > all the replicas also clean logs to remove data to catch up to broker 40 as > that is the in sync replica > Then I shutdown 40 and 50 leading to 60 leading all partitions it holds and > then we see this happen across all of those partitions > "May 18, 2020 @ > 15:48:28.252",hostname-1,30438,apache-kafka:2.4.1,"[2020-05-18 15:48:28,251] > INFO [Log partition=topic-name-60, dir=/kafka-topic-data] Loading producer > state till offset 0 with message format version 2 (kafka.log.Log)" > "May 18, 2020 @ > 15:48:28.252",hostname-1,30438,apache-kafka:2.4.1,"[2020-05-18 15:48:28,252] > INFO [Log partition=topic-name-60, dir=/kafka-topic-data] Completed load of > log with 1 segments, log start offset 0 and log end offset 0 in 2 ms > (kafka.log.Log)" > "May 18, 2020 @ 15:48:45.883",hostname,7805,apache-kafka:2.4.1,"[2020-05-18 > 15:48:45,883] WARN [ReplicaFetcher replicaId=50, leaderId=60, fetcherId=0] > Leader or replica is on protocol version where leader epoch is not considered > in the OffsetsForLeaderEpoch response. The leader's offset 0 will be used for > truncation in topic-name-60. 
(kafka.server.ReplicaFetcherThread)" > "May 18, 2020 @ 15:48:45.883",hostname,7805,apache-kafka:2.4.1,"[2020-05-18 > 15:48:45,883] INFO [Log partition=topic-name-60, dir=/kafka-topic-data] > Truncating to offset 0 (kafka.log.Log)" > > I believe the truncation has always been a problem but recent > https://issues.apache.org/jira/browse/KAFKA-6266 fix allowed truncation to > actually happen where it wouldn't have before. > The producer snapshots setting as log start offset is a mystery to me so any > light you could shed on why that yhappened and how to avoid would be great. > > I am sanitising full logs and will upload here soon -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129079#comment-17129079 ] jiamei xie commented on KAFKA-10126: Thanks. Got it. What about numFetchersOpt? Can it be removed? > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129108#comment-17129108 ] jiamei xie commented on KAFKA-10126: > maybe add new argument to create topic with specify number of partitions. It > seems to me both numbers are an important factor of performance. It seems that the topic could be is created by the producer or kafka-topics.sh. So is proper to add new arguments to create topic ? > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129108#comment-17129108 ] jiamei xie edited comment on KAFKA-10126 at 6/9/20, 10:11 AM: -- > maybe add new argument to create topic with specify number of partitions. It > seems to me both numbers are an important factor of performance. It seems that the topic could be created by the producer or kafka-topics.sh. So is proper to add new arguments to create topic ? was (Author: adally): > maybe add new argument to create topic with specify number of partitions. It > seems to me both numbers are an important factor of performance. It seems that the topic could be is created by the producer or kafka-topics.sh. So is proper to add new arguments to create topic ? > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10126) Remove unused options in ConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-10126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129108#comment-17129108 ] jiamei xie edited comment on KAFKA-10126 at 6/9/20, 10:12 AM: -- > maybe add new argument to create topic with specify number of partitions. It > seems to me both numbers are an important factor of performance. It seems that the topic could be created by the producer or kafka-topics.sh. So is it proper to add new arguments to create topic ? was (Author: adally): > maybe add new argument to create topic with specify number of partitions. It > seems to me both numbers are an important factor of performance. It seems that the topic could be created by the producer or kafka-topics.sh. So is proper to add new arguments to create topic ? > Remove unused options in ConsumerPerformance > > > Key: KAFKA-10126 > URL: https://issues.apache.org/jira/browse/KAFKA-10126 > Project: Kafka > Issue Type: Bug >Reporter: jiamei xie >Assignee: jiamei xie >Priority: Major > > Option numThreadsOpt and numFetchersOpt are unused in ConsumerPerformance. > It's a waste of time to test performance vs threads number. So removing it is > needed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10128) MM2 - Delete topics when config sync is enabled
Karthik created KAFKA-10128: --- Summary: MM2 - Delete topics when config sync is enabled Key: KAFKA-10128 URL: https://issues.apache.org/jira/browse/KAFKA-10128 Project: Kafka Issue Type: Bug Components: KafkaConnect, mirrormaker Affects Versions: 2.5.0 Reporter: Karthik A topic deleted in one region is not deleted in the other and, with that, it is being recreated in case of an active-active deployment Logs: *Test Check delete Topic* *Delete Command* /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete --topic im115 *Source cluster - Before* /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list __consumer_offsets heartbeats im115 im32 mm2-configs.us-east.internal mm2-offset-syncs.us-east.internal mm2-offsets.us-east.internal mm2-status.us-east.internal us-east.checkpoints.internal us-east.heartbeats us-east.im115 us-east.us-east-offsets us-west-offsets *Source cluster -* *After delete* /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list __consumer_offsets heartbeats im32 mm2-configs.us-east.internal mm2-offset-syncs.us-east.internal mm2-offsets.us-east.internal mm2-status.us-east.internal us-east.checkpoints.internal us-east.heartbeats us-east.im115 us-east.us-east-offsets us-west-offsets *Dest Cluster - Before* /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list __consumer_offsets heartbeats im115 mm2-configs.us-west.internal mm2-offset-syncs.us-west.internal mm2-offsets.us-west.internal mm2-status.us-west.internal us-east-offsets us-west.checkpoints.internal us-west.heartbeats us-west.im115 us-west.im32 us-west.us-west-offsets ** *Dest Cluster -* *After Delete* *Did not delete* /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list __consumer_offsets heartbeats im115 mm2-configs.us-west.internal mm2-offset-syncs.us-west.internal mm2-offsets.us-west.internal mm2-status.us-west.internal us-east-offsets us-west.checkpoints.internal us-west.heartbeats us-west.im115 us-west.im32 us-west.us-west-offsets With that, after the config refresh interval (mins), the deleted topic is replicated back on the source cluster: /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list __consumer_offsets heartbeats im115 im32 mm2-configs.us-east.internal mm2-offset-syncs.us-east.internal mm2-offsets.us-east.internal mm2-status.us-east.internal us-east.checkpoints.internal us-east.heartbeats us-east.im115 us-east.us-east-offsets us-west-offsets -- This message was sent by Atlassian Jira (v8.3.4#803005)
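Until deletes are propagated by MM2 itself, one workaround is to delete the topic and its mirrored counterpart on each cluster explicitly before the next config refresh. The sketch below shows that with the Java AdminClient; the bootstrap addresses and topic names are taken from the report, but whether this is sufficient for a given deployment is an assumption, not a confirmed fix.

{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteMirroredTopic {
    static void deleteOn(String bootstrap, String... topics) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        try (AdminClient admin = AdminClient.create(props)) {
            admin.deleteTopics(Arrays.asList(topics)).all().get();
        }
    }

    public static void main(String[] args) throws Exception {
        // Delete the local topic and the remote-prefixed copy on each cluster,
        // otherwise MM2 re-creates the topic on the next config refresh.
        deleteOn("192.168.0.32:9092", "im115", "us-east.im115");
        deleteOn("192.168.3.115:9092", "im115", "us-west.im115");
    }
}
{code}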
[jira] [Commented] (KAFKA-10128) MM2 - Delete topics when config sync is enabled
[ https://issues.apache.org/jira/browse/KAFKA-10128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129139#comment-17129139 ] Karthik commented on KAFKA-10128: - It also the other way around where even deleted mirrored topic is also being recreated > MM2 - Delete topics when config sync is enabled > --- > > Key: KAFKA-10128 > URL: https://issues.apache.org/jira/browse/KAFKA-10128 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 2.5.0 >Reporter: Karthik >Priority: Minor > > Topics being deleted on one region is not being deleted and with that its > being recreated incase of a active-active deployment > > Logs: > *Test Check delete Topic* > > *Delete Command* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete > --topic im115 > > *Source cluster - Before* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im115 > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets > > *Source cluster -* *After delete* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets > > *Dest Cluster - Before* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list > __consumer_offsets > heartbeats > im115 > mm2-configs.us-west.internal > mm2-offset-syncs.us-west.internal > mm2-offsets.us-west.internal > mm2-status.us-west.internal > us-east-offsets > us-west.checkpoints.internal > us-west.heartbeats > us-west.im115 > us-west.im32 > us-west.us-west-offsets > ** > *Dest Cluster -* *After Delete* > *Did not delete* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list > __consumer_offsets > heartbeats > im115 > mm2-configs.us-west.internal > mm2-offset-syncs.us-west.internal > mm2-offsets.us-west.internal > mm2-status.us-west.internal > us-east-offsets > us-west.checkpoints.internal > us-west.heartbeats > us-west.im115 > us-west.im32 > us-west.us-west-offsets > > With that after the config refresh mins, the deleted topic is being > replicated back on the source cluster > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im115 > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10128) MM2 - Delete topics when config sync is enabled
[ https://issues.apache.org/jira/browse/KAFKA-10128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129139#comment-17129139 ] Karthik edited comment on KAFKA-10128 at 6/9/20, 11:34 AM: --- It also the other way around where even deleted mirrored topic is also being recreated. Update: Even deleting all the copies of topics are being recreated after refresh *using third cluster for MM2 was (Author: kurs): It also the other way around where even deleted mirrored topic is also being recreated > MM2 - Delete topics when config sync is enabled > --- > > Key: KAFKA-10128 > URL: https://issues.apache.org/jira/browse/KAFKA-10128 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 2.5.0 >Reporter: Karthik >Priority: Minor > > Topics being deleted on one region is not being deleted and with that its > being recreated incase of a active-active deployment > > Logs: > *Test Check delete Topic* > > *Delete Command* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete > --topic im115 > > *Source cluster - Before* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im115 > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets > > *Source cluster -* *After delete* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets > > *Dest Cluster - Before* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list > __consumer_offsets > heartbeats > im115 > mm2-configs.us-west.internal > mm2-offset-syncs.us-west.internal > mm2-offsets.us-west.internal > mm2-status.us-west.internal > us-east-offsets > us-west.checkpoints.internal > us-west.heartbeats > us-west.im115 > us-west.im32 > us-west.us-west-offsets > ** > *Dest Cluster -* *After Delete* > *Did not delete* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list > __consumer_offsets > heartbeats > im115 > mm2-configs.us-west.internal > mm2-offset-syncs.us-west.internal > mm2-offsets.us-west.internal > mm2-status.us-west.internal > us-east-offsets > us-west.checkpoints.internal > us-west.heartbeats > us-west.im115 > us-west.im32 > us-west.us-west-offsets > > With that after the config refresh mins, the deleted topic is being > replicated back on the source cluster > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im115 > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10128) MM2 - Delete topics when config sync is enabled
[ https://issues.apache.org/jira/browse/KAFKA-10128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Karthik updated KAFKA-10128: Reviewer: Ryanne Dolan > MM2 - Delete topics when config sync is enabled > --- > > Key: KAFKA-10128 > URL: https://issues.apache.org/jira/browse/KAFKA-10128 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect, mirrormaker >Affects Versions: 2.5.0 >Reporter: Karthik >Priority: Minor > > Topics being deleted on one region is not being deleted and with that its > being recreated incase of a active-active deployment > > Logs: > *Test Check delete Topic* > > *Delete Command* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --delete > --topic im115 > > *Source cluster - Before* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im115 > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets > > *Source cluster -* *After delete* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets > > *Dest Cluster - Before* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list > __consumer_offsets > heartbeats > im115 > mm2-configs.us-west.internal > mm2-offset-syncs.us-west.internal > mm2-offsets.us-west.internal > mm2-status.us-west.internal > us-east-offsets > us-west.checkpoints.internal > us-west.heartbeats > us-west.im115 > us-west.im32 > us-west.us-west-offsets > ** > *Dest Cluster -* *After Delete* > *Did not delete* > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.3.115:9092 --list > __consumer_offsets > heartbeats > im115 > mm2-configs.us-west.internal > mm2-offset-syncs.us-west.internal > mm2-offsets.us-west.internal > mm2-status.us-west.internal > us-east-offsets > us-west.checkpoints.internal > us-west.heartbeats > us-west.im115 > us-west.im32 > us-west.us-west-offsets > > With that after the config refresh mins, the deleted topic is being > replicated back on the source cluster > /opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.0.32:9092 --list > __consumer_offsets > heartbeats > im115 > im32 > mm2-configs.us-east.internal > mm2-offset-syncs.us-east.internal > mm2-offsets.us-east.internal > mm2-status.us-east.internal > us-east.checkpoints.internal > us-east.heartbeats > us-east.im115 > us-east.us-east-offsets > us-west-offsets -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempts. Making those class containing retriable data inherit from an abstract class {{Retriable.}}This class will record the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the and returns the backoff/timeout value at the corresponding level. There’re two main usage patterns. {{}} {{}} # For those async retries, the data often stays in a queue. We will make the class inherit from the {{Retriable}} and record failure when a {{RetriableException}} happens. # For those synchronous retires, the backoff is often implemented in a blocking poll/loop, we won’t need the inheritance and will just record the failedAttempts using a local variable of generic data type (Long). Producer side: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch, which already has an attribute attempts recording the number of failed attempts. # {{}} was:In {{KafkaAdminClient}}, we will have to modify the way the retry backoff is calculated for the calls that have failed and need to be retried. From the current static retry backoff, we have to introduce a mechanism for all calls that upon failure, the next retry time is dynamically calculated. > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > Design: > The main idea is to bookkeep the failed attempts. Making those class > containing retriable data inherit from an abstract class {{Retriable.}}This > class will record the number of failed attempts. I already wrapped the > exponential backoff/timeout util class in my KIP-601 > [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] > which takes the and returns the backoff/timeout value at the corresponding > level. There’re two main usage patterns. > {{}} > {{}} > # For those async retries, the data often stays in a queue. We will make the > class inherit from the {{Retriable}} and record failure when a > {{RetriableException}} happens. > # For those synchronous retires, the backoff is often implemented in a > blocking poll/loop, we won’t need the inheritance and will just record the > failedAttempts using a local variable of generic data type (Long). > Producer side: > # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each > ProducerBatch, which already has an attribute attempts recording the number > of failed attempts. > # > > > {{}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Currently, the retry backoff has two main usage patterns. # For those async retries, the data often stays in a queue. We will make the class inherit from the {{Retriable}} and record failure when a {{RetriableException}} happens. # For those synchronous retires, the backoff is often implemented in a blocking poll/loop, we won’t need the inheritance and will just record the failed attempts using a local variable of generic data type (Long). Producer side: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempts, which will . # {{}} was: Design: The main idea is to bookkeep the failed attempts. Making those class containing retriable data inherit from an abstract class {{Retriable.}}This class will record the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the and returns the backoff/timeout value at the corresponding level. There’re two main usage patterns. {{}} {{}} # For those async retries, the data often stays in a queue. We will make the class inherit from the {{Retriable}} and record failure when a {{RetriableException}} happens. # For those synchronous retires, the backoff is often implemented in a blocking poll/loop, we won’t need the inheritance and will just record the failedAttempts using a local variable of generic data type (Long). Producer side: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch, which already has an attribute attempts recording the number of failed attempts. # {{}} > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retires and blocking loop. The thread will sleep in each > iteration for > # Async retries. 
In each polling, the retries do not meet the backoff will > be filtered. The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. contains only one initial > request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, we can make those classes containing retriable data inherit from > an abstract class Retriable. Retriable will implement interfaces recording > the number of failed attempts. I already wrapped the exponential > backoff/timeout util class in my KIP-601 > [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] > which takes the number of failures and returns the backoff/timeout value at > the corresponding level. > Currently, the retry backoff has two main usage patterns. > # For those async retries, the data often stays in a queue. We will make the > class inherit from the {{Retriable}} and record failure when a > {{RetriableException}} happens. > # For those synchronous retires, the backoff is often implemented in a > blocking poll/loop, we won’t need the inheritance and will just record the >
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Currently, the retry backoff has two main usage patterns. # For those async retries, the data often stays in a queue. We will make the class inherit from the {{Retriable}} and record failure when a {{RetriableException}} happens. # For those synchronous retires, the backoff is often implemented in a blocking poll/loop, we won’t need the inheritance and will just record the failed attempts using a local variable of generic data type (Long). Producer side: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempts, which will . # {{}} was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Currently, the retry backoff has two main usage patterns. # For those async retries, the data often stays in a queue. We will make the class inherit from the {{Retriable}} and record failure when a {{RetriableException}} happens. # For those synchronous retires, the backoff is often implemented in a blocking poll/loop, we won’t need the inheritance and will just record the failed attempts using a local variable of generic data type (Long). Producer side: # Produce request (API_KEY.PRODUCE). 
Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempts, which will . # {{}} > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retires and blocking loop. The thread will sleep in each > iteration for > # Async retries. In each polling, the retries do not meet the backoff will > be filtered. The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. a set contains only one > initial request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, we can make those classes containing retriable data inherit from > an abstract class Retriable. Retriable will implement interfaces recording > the numb
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Currently, the retry backoff has two main usage patterns. # For those async retries, the data often stays in a queue. We will make the class inherit from the {{Retriable}} and record failure when a {{RetriableException}} happens. # For those synchronous retires, the backoff is often implemented in a blocking poll/loop, we won’t need the inheritance and will just record the failed attempts using a local variable of generic data type (Long). Producer side: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempts, which will . 
# {{}} > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retires and blocking loop. The thread will sleep in each > iteration for > # Async retries. In each polling, the retries do not meet the backoff will > be filtered. The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. a set contains only one > initial request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, we can make those classes containing retriable data inherit from > an abstract class Retriable. Retriable will implement interfaces recording > the number of failed attempts. I already wrapped the exponential > backoff/timeout util class in my KIP-601 > [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] > which takes the number of failures and returns the backoff/timeout value at > the corresponding level. > > Changes: > KafkaProducer: > # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each > ProducerBa
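To make the design sketched in the description above concrete, the backoff utility it refers to (take the number of failed attempts, return the wait at that level, capped and jittered) can be written roughly as below. This is an illustrative standalone version, not the actual KIP-601 class, and the parameter values in main are assumptions.

{code:java}
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative sketch: exponential backoff keyed on the number of failed attempts. */
public class ExponentialBackoffSketch {
    private final long initialIntervalMs;
    private final int multiplier;
    private final long maxIntervalMs;
    private final double jitter;

    public ExponentialBackoffSketch(long initialIntervalMs, int multiplier,
                                    long maxIntervalMs, double jitter) {
        this.initialIntervalMs = initialIntervalMs;
        this.multiplier = multiplier;
        this.maxIntervalMs = maxIntervalMs;
        this.jitter = jitter;
    }

    /** Backoff for the given number of failed attempts, capped at maxIntervalMs and randomized. */
    public long backoff(long failedAttempts) {
        double exp = initialIntervalMs * Math.pow(multiplier, failedAttempts);
        double capped = Math.min(exp, maxIntervalMs);
        double rand = 1 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) (capped * rand);
    }

    public static void main(String[] args) {
        ExponentialBackoffSketch backoff = new ExponentialBackoffSketch(100, 2, 1000, 0.2);
        for (long attempts = 0; attempts <= 5; attempts++) {
            System.out.printf("attempts=%d backoffMs=%d%n", attempts, backoff.backoff(attempts));
        }
    }
}
{code}

In usage pattern 2 from the description (async retries kept in a queue), a retry would then be filtered out of a poll while now < lastAttemptMs + backoff(failedAttempts); in pattern 1 the thread would simply sleep for backoff(failedAttempts) each iteration.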
[jira] [Resolved] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"
[ https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-9724. - Fix Version/s: 2.6.0 Assignee: David Arthur Resolution: Fixed > Consumer wrongly ignores fetched records "since it no longer has valid > position" > > > Key: KAFKA-9724 > URL: https://issues.apache.org/jira/browse/KAFKA-9724 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.4.0 >Reporter: Oleg Muravskiy >Assignee: David Arthur >Priority: Major > Fix For: 2.6.0 > > Attachments: consumer.log.xz > > > After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) > consumers in a consumer group intermittently stop progressing on assigned > partitions, even when there are messages to consume. This is not a permanent > condition, as they progress from time to time, but become slower with time, > and catch up after restart. > Here is a sample of 3 consecutive ignored fetches: > {noformat} > 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned > fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = > 538065631, logStartOffset = 485284547, preferredReadReplica = absent, > abortedTransactions = null, recordsSizeInBytes=16380) > 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Ignoring fetched records for partition mrt-rrc10-6 since it no longer has > valid position > 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position > FetchPosition{offset=538065584, offsetEpoch=Optional[62], > currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), > epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null) > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), > implied=(mrt-rrc10-6, 
mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 > (id: 3 rack: null) > 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned > fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = > 538065727, logStartOffset = 485284547, preferredReadReplica = absent, > abortedTransactions = null, recordsSizeInBytes=51864) > 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Ignoring fetched records for partition mrt-rrc10-6 since it no longer has > valid position > 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > {noformat} > After which consumer makes progress: > {noformat} > 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the
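For readers trying to confirm the stalled-consumption symptom described in KAFKA-9724 above, a minimal diagnostic sketch (class and method names are illustrative, not part of the ticket) that compares each assigned partition's position with its end offset; partitions that repeatedly show a growing gap while poll() returns nothing match the behaviour reported here:
{code:java}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: log partitions that have records available but whose
// consumed position is not advancing.
public class StalledPartitionCheck {
    public static void logLag(KafkaConsumer<byte[], byte[]> consumer) {
        Set<TopicPartition> assignment = consumer.assignment();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        for (TopicPartition tp : assignment) {
            long position = consumer.position(tp);              // next offset the consumer will fetch
            long end = endOffsets.getOrDefault(tp, position);   // end offset reported by the broker
            if (end > position) {
                System.out.printf("Partition %s is %d records behind (position=%d, end=%d)%n",
                        tp, end - position, position, end);
            }
        }
    }
}
{code}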
[jira] [Updated] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"
[ https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-9724: Fix Version/s: 2.5.1 > Consumer wrongly ignores fetched records "since it no longer has valid > position" > > > Key: KAFKA-9724 > URL: https://issues.apache.org/jira/browse/KAFKA-9724 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.4.0 >Reporter: Oleg Muravskiy >Assignee: David Arthur >Priority: Major > Fix For: 2.6.0, 2.5.1 > > Attachments: consumer.log.xz > > > After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) > consumers in a consumer group intermittently stop progressing on assigned > partitions, even when there are messages to consume. This is not a permanent > condition, as they progress from time to time, but become slower with time, > and catch up after restart. > Here is a sample of 3 consecutive ignored fetches: > {noformat} > 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned > fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = > 538065631, logStartOffset = 485284547, preferredReadReplica = absent, > abortedTransactions = null, recordsSizeInBytes=16380) > 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Ignoring fetched records for partition mrt-rrc10-6 since it no longer has > valid position > 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol version (introduced in Kafka 2.3) > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position > FetchPosition{offset=538065584, offsetEpoch=Optional[62], > currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), > epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null) > 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), > implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker 
node03.kafka:9092 > (id: 3 rack: null) > 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - > Updating last seen epoch from null to 62 for partition mrt-rrc10-6 > 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned > fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = > 538065727, logStartOffset = 485284547, preferredReadReplica = absent, > abortedTransactions = null, recordsSizeInBytes=51864) > 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Ignoring fetched records for partition mrt-rrc10-6 since it no longer has > valid position > 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - > Committed offset 538065584 for partition mrt-rrc10-6 > {noformat} > After which consumer makes progress: > {noformat} > 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - > Skipping validation of fetch offsets for partitions [mrt-rrc10-1, > mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required > protocol ve
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer: # # Partition state request (API_KEY.OFFSET_FOR_LEADER_EPOCH, was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retires and blocking loop. The thread will sleep in each > iteration for > # Async retries. In each polling, the retries do not meet the backoff will > be filtered. 
The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. a set contains only one > initial request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, we can make those classes containing retriable data inherit from > an abstract class Retriable. Retriable will implement interfaces recording > the number of failed attempts. I already wrapped the exponential > backoff/timeout util class in my KIP-601 > [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] > which takes the number of failures and returns the backoff/timeout value at > the corresponding level. > > Changes: > KafkaProducer: > # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each > ProducerBatch in Accumulator, which already has an attribute attempts > recording the number of failed attempts. > # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from > Retriable and record each failed attempt. > KafkaConsumer: > # > # Partition state request (API_KEY.OFFSET_FOR_LEADER_EPOCH, > -- This message was sent by Atlassian Jira (v8.3.4#803005)
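Since the description above repeatedly refers to an exponential backoff/timeout helper keyed off the number of failed attempts, here is a minimal sketch of such a helper for readers following along; the class name, constants and jitter scheme are illustrative, in the spirit of the KIP-580/KIP-601 discussion rather than the actual patch:
{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Sketch of an exponential backoff helper: the backoff grows with the number of
// failed attempts, is capped at a maximum, and is jittered to avoid synchronized
// retries. Constants are illustrative.
public class ExponentialBackoff {
    private final long initialMs;   // e.g. retry.backoff.ms
    private final int multiplier;   // growth factor per failed attempt
    private final long maxMs;       // e.g. an upper bound such as retry.backoff.max.ms
    private final double jitter;    // +/- fraction of randomness

    public ExponentialBackoff(long initialMs, int multiplier, long maxMs, double jitter) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    // Returns the backoff to apply after `attempts` consecutive failures.
    public long backoff(long attempts) {
        double exp = Math.min(attempts, 32);  // avoid overflow for very large attempt counts
        double term = initialMs * Math.pow(multiplier, exp);
        double randomFactor = 1.0 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) Math.min(term * randomFactor, maxMs);
    }
}
{code}
A caller would record failures per request (or per batch) and feed the attempt count into backoff() before scheduling the next retry.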
[jira] [Updated] (KAFKA-10127) kafka cluster not recovering - Shrinking ISR continously
[ https://issues.apache.org/jira/browse/KAFKA-10127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youssef BOUZAIENNE updated KAFKA-10127: --- Description: We are actually facing issue from time to time where our kafka cluster goes into a weird state where we see the following log repeating [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). (kafka.cluster.Partition) Before that our zookeeper session expired which lead us to that state after we increased this two value we encounter the issue less frequently but it still appears from time to time and the only solution is restart of kafka service on all brokers zookeeper.session.timeout.ms=18000 replica.lag.time.max.ms=3 Any help on that please was: We are actually facing issue from time to time where our kafka cluster goes into a weird state where we see the following log repeating [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion 620 not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR from 1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: 3222741893). Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). (kafka.cluster.Partition) Before that our zookeeper session expired which lead us to that state after we increased this two value we encounter the issue less frequently but it still appears from time to time and the only solution is restart of kafka service on all brokers zookeeper.session.timeout.ms=18000 replica.lag.time.max.ms=3 Any help on that please > kafka cluster not recovering - Shrinking ISR continously > - > > Key: KAFKA-10127 > URL: https://issues.apache.org/jira/browse/KAFKA-10127 > Project: Kafka > Issue Type: Bug > Components: replication, zkclient >Affects Versions: 2.4.1 > Environment: using kafka version 2.4.1 and zookeeper version 3.5.7 >Reporter: Youssef BOUZAIENNE >Priority: Major > > We are actually facing issue from time to time where our kafka cluster goes > into a weird state where we see the following log repeating > [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Cached zkVersion > 620 not equal to that in zookeeper, skip updating ISR > (kafka.cluster.Partition) > [2020-06-06 08:35:48,117] INFO [Partition test broker=1002] Shrinking ISR > from 1006,1002 to 1002. Leader: (highWatermark: 3222733572, endOffset: > 3222741893). Out of sync replicas: (brokerId: 1006, endOffset: 3222733572). > (kafka.cluster.Partition) > > Before that our zookeeper session expired which lead us to that state > > after we increased this two value we encounter the issue less frequently but > it still appears from time to time and the only solution is restart of kafka > service on all brokers > zookeeper.session.timeout.ms=18000 > replica.lag.time.max.ms=3 > > Any help on that please -- This message was sent by Atlassian Jira (v8.3.4#803005)
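For operators investigating the stuck-ISR state described in KAFKA-10127 above, a minimal sketch (the bootstrap address and class name are placeholders, not from the report) that uses the Admin client to list partitions whose ISR has shrunk below the replica set:
{code:java}
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

// Illustrative only: print partitions whose ISR is smaller than the replica set,
// which is the state the broker logs above keep reporting.
public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder address
        try (Admin admin = Admin.create(props)) {
            Collection<String> topics = admin.listTopics().names().get();
            for (TopicDescription description : admin.describeTopics(topics).all().get().values()) {
                for (TopicPartitionInfo partition : description.partitions()) {
                    if (partition.isr().size() < partition.replicas().size()) {
                        System.out.printf("%s-%d ISR=%s replicas=%s%n",
                                description.name(), partition.partition(),
                                partition.isr(), partition.replicas());
                    }
                }
            }
        }
    }
}
{code}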
[jira] [Commented] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129374#comment-17129374 ] Rodrigo commented on KAFKA-9747: I have the same issue here... MongoDB (Atlas), MongoDB Connector and AWS MSK. > No tasks created for a connector > > > Key: KAFKA-9747 > URL: https://issues.apache.org/jira/browse/KAFKA-9747 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 > Environment: OS: Ubuntu 18.04 LTS > Platform: Confluent Platform 5.4 > HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge >Reporter: Vit Koma >Priority: Major > Attachments: connect-distributed.properties, connect.log > > > We are running Kafka Connect in a distributed mode on 3 nodes using Debezium > (MongoDB) and Confluent S3 connectors. When adding a new connector via the > REST API the connector is created in RUNNING state, but no tasks are created > for the connector. > Pausing and resuming the connector does not help. When we stop all workers > and then start them again, the tasks are created and everything runs as it > should. > The issue does not show up if we run only a single node. > The issue is not caused by the connector plugins, because we see the same > behaviour for both Debezium and S3 connectors. Also in debug logs I can see > that Debezium is correctly returning a task configuration from the > Connector.taskConfigs() method. > Connector configuration examples > Debezium: > { > "name": "qa-mongodb-comp-converter-task|1", > "config": { > "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", > "mongodb.hosts": > "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", > "mongodb.name": "qa-debezium-comp", > "mongodb.ssl.enabled": true, > "collection.whitelist": "converter[.]task", > "tombstones.on.delete": true > } > } > S3 Connector: > { > "name": "qa-s3-sink-task|1", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "topics": "qa-debezium-comp.converter.task", > "topics.dir": "data/env/qa", > "s3.region": "eu-west-1", > "s3.bucket.name": "", > "flush.size": "15000", > "rotate.interval.ms": "360", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": > "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "partitioner.class": > "io.confluent.connect.storage.partitioner.DefaultPartitioner", > "schema.compatibility": "NONE", > "key.converter": "org.apache.kafka.connect.json.JsonConverter", > "value.converter": "org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable": false, > "value.converter.schemas.enable": false, > "transforms": "ExtractDocument", > > "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" > } > } > The connectors are created using curl: {{curl -X POST -H "Content-Type: > application/json" --data @ http:/:10083/connectors}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
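For anyone hitting the "RUNNING connector with no tasks" symptom above, a minimal sketch that queries the Connect REST status endpoint; the worker URL and connector name are placeholders (the report's port 10083 is reused purely for illustration):
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative only: fetch /connectors/<name>/status from a Connect worker and print it.
// A response with "state":"RUNNING" and an empty "tasks" array matches the report above.
public class ConnectorStatusCheck {
    public static void main(String[] args) throws IOException {
        String worker = "http://connect-worker:10083";  // placeholder worker URL
        String connector = "my-connector";              // placeholder connector name
        URL url = new URL(worker + "/connectors/" + connector + "/status");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/json");
        try (InputStream in = conn.getInputStream()) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        } finally {
            conn.disconnect();
        }
    }
}
{code}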
[jira] [Created] (KAFKA-10129) Fail the QA if there is javadoc error
Chia-Ping Tsai created KAFKA-10129: -- Summary: Fail the QA if there is javadoc error Key: KAFKA-10129 URL: https://issues.apache.org/jira/browse/KAFKA-10129 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai from [~hachikuji] (https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179) {quote} One other question I had is whether we should consider making doc failures also fail the build? {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10129) Fail the QA if there is javadoc error
[ https://issues.apache.org/jira/browse/KAFKA-10129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17129393#comment-17129393 ] Chia-Ping Tsai commented on KAFKA-10129: The javadoc is part of the public documentation, so we should avoid errors in it as much as possible > Fail the QA if there is javadoc error > - > > Key: KAFKA-10129 > URL: https://issues.apache.org/jira/browse/KAFKA-10129 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > from [~hachikuji] > (https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179) > {quote} > One other question I had is whether we should consider making doc failures > also fail the build? > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)

[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by inheritance. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will inherit from Retribale. Metadata: AdminClient: # AdminClient has its own request abstraction Call. The failed attempts are kept by the abstraction. So probably clean the logic a bit by inheritance. was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (API_KEY.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. # Transaction request (API_KEY..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. 
KafkaConsumer: # # Partition state request (API_KEY.OFFSET_FOR_LEADER_EPOCH, > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > Design: > The main idea is to bookkeep the failed attempt. Currently, the retry backoff > has two main usage patterns: > # Synchronous retires and blocking loop. The thread will sleep in each > iteration for > # Async retries. In each polling, the retries do not meet the backoff will > be filtered. The data class often maintains a 1:1 mapping to a set of > requests which are logically associated. (i.e. a set contains only one > initial request and only its retries.) > For type 1, we can utilize a local failure counter of a Java generic data > type. > For case 2, we can make those classes containing retriable data inherit from > an abstract class Retriable. Retriable will implement interfaces recording > the number of failed attempts. I already wrapped the exponential > backoff/timeout util class in my KIP-601 > [implem
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by inheritance. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will inherit from Retribale. Metadata: # Metadata lives as a singleton in many clients. It can inherit from Retriable. AdminClient: # AdminClient has its own request abstraction Call. The failed attempts are kept by the abstraction. So probably clean the logic a bit by inheritance. There're other common usages look like client.poll(timeout), where the timeout passed in is the retry backoff value. We won't change these usages since its underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), which means if no interested op exists, the client will block retry backoff milliseconds. This is an optimization when there's no request that needs to be sent but the client is waiting for responses. Specifically, if the client fails the inflight requests before the retry backoff milliseconds passed, it still needs to wait until that amount of time passed, unless there's a new request need to be sent. was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. 
For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by inheritance. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will inherit from Retribale. Metadata: AdminClient: # AdminClient has its own request
[jira] [Updated] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Tan updated KAFKA-9800: - Description: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by inheritance. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will inherit from Retribale. Metadata: # Metadata lives as a singleton in many clients. It can inherit from Retriable. AdminClient: # AdminClient has its own request abstraction Call. The failed attempts are kept by the abstraction. So probably clean the Call class logic a bit by inheritance. There're other common usages look like client.poll(timeout), where the timeout passed in is the retry backoff value. We won't change these usages since its underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(), which means if no interested op exists, the client will block retry backoff milliseconds. This is an optimization when there's no request that needs to be sent but the client is waiting for responses. Specifically, if the client fails the inflight requests before the retry backoff milliseconds passed, it still needs to wait until that amount of time passed, unless there's a new request need to be sent. was: Design: The main idea is to bookkeep the failed attempt. Currently, the retry backoff has two main usage patterns: # Synchronous retires and blocking loop. The thread will sleep in each iteration for # Async retries. In each polling, the retries do not meet the backoff will be filtered. The data class often maintains a 1:1 mapping to a set of requests which are logically associated. (i.e. a set contains only one initial request and only its retries.) For type 1, we can utilize a local failure counter of a Java generic data type. 
For case 2, we can make those classes containing retriable data inherit from an abstract class Retriable. Retriable will implement interfaces recording the number of failed attempts. I already wrapped the exponential backoff/timeout util class in my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28] which takes the number of failures and returns the backoff/timeout value at the corresponding level. Changes: KafkaProducer: # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in Accumulator, which already has an attribute attempts recording the number of failed attempts. So probably clean up the logic a little bit by hiding the failed attempts property and the getter method by inheritance. # Transaction request (ApiKeys..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt. KafkaConsumer: # Some synchronous retry use cases. Record the failed attempts in the blocking loop. # Partition state request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Though the actual requests are packed for each node, the current implementation is applying backoff to each topic partition, where the backoff value is kept by TopicPartitionState. Thus, TopicPartitionState will inherit from Retribale. Metadata: # Metadata lives as a singleton in man
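To make the Retriable bookkeeping above concrete, a minimal self-contained sketch under the same assumptions; names and signatures are illustrative, not the actual KIP-580 implementation:
{code:java}
import java.util.function.LongUnaryOperator;

// Sketch of the bookkeeping described above: an abstract base class that records failed
// attempts so that async retry paths can check whether the (exponential) backoff has elapsed.
public abstract class Retriable {
    private long failedAttempts = 0;
    private long lastFailureMs = -1;

    // Called by the request path whenever an attempt fails.
    public void onFailedAttempt(long nowMs) {
        failedAttempts++;
        lastFailureMs = nowMs;
    }

    // Called when an attempt finally succeeds.
    public void reset() {
        failedAttempts = 0;
        lastFailureMs = -1;
    }

    public long failedAttempts() {
        return failedAttempts;
    }

    // `backoffForAttempts` maps the number of failed attempts to a backoff in ms,
    // e.g. an exponential backoff helper like the one sketched earlier.
    public boolean shouldRetry(long nowMs, LongUnaryOperator backoffForAttempts) {
        return lastFailureMs < 0
            || nowMs - lastFailureMs >= backoffForAttempts.applyAsLong(failedAttempts);
    }
}
{code}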
[GitHub] [kafka] hachikuji commented on pull request #8664: KAFKA-9716: Clarify meaning of compression rate metrics
hachikuji commented on pull request #8664: URL: https://github.com/apache/kafka/pull/8664#issuecomment-640947926 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10129) Fail QA if there are javadoc warnings
[ https://issues.apache.org/jira/browse/KAFKA-10129?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-10129: --- Summary: Fail QA if there are javadoc warnings (was: Fail the QA if there is javadoc error) > Fail QA if there are javadoc warnings > - > > Key: KAFKA-10129 > URL: https://issues.apache.org/jira/browse/KAFKA-10129 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > from [~hachikuji] > (https://github.com/apache/kafka/pull/8660#pullrequestreview-425856179) > {quote} > One other question I had is whether we should consider making doc failures > also fail the build? > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang merged pull request #8788: MINOR: Remove unused isSticky assert out from tests only do constrainedAssign
guozhangwang merged pull request #8788: URL: https://github.com/apache/kafka/pull/8788 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #8835: MINOR: reduce sizeInBytes for percentiles metrics
ableegoldman opened a new pull request #8835: URL: https://github.com/apache/kafka/pull/8835 The total amount of memory per Percentiles metric is actually the sizeInBytes * number of samples, where the default number of samples is 2. There are also at least 2 Percentiles metrics per task (and more when there are multiple sources or multiple terminal nodes). So the total memory we allocate for the current e2e latency metrics is `4 * sizeInBytes * numTasks` The current sizeInBytes we chose was 1MB. This is still not particularly large, but may add up and cause unnecessary pressure for users running on lean machines. I reduced the size to 100kB and still found it accurate enough* so we may as well reduce the size so we don't have to worry. (*1,000 runs of the randomized test all passed, meaning it was accurate to within 20% of the expected value) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
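As a quick sanity check of the sizing argument in this PR description, a small arithmetic sketch; the task count is an arbitrary example, not a number from the PR:
```java
// Back-of-the-envelope check of the numbers above:
// memory per Percentiles metric ~= sizeInBytes * samples (default samples = 2),
// with at least 2 such metrics per task, i.e. roughly 4 * sizeInBytes per task.
public class PercentilesMemoryEstimate {
    public static void main(String[] args) {
        int metricsPerTask = 2;
        int samples = 2;
        int numTasks = 50;                 // example task count, not from the PR

        long oldSizeInBytes = 1_000_000L;  // ~1 MB per sample buffer (previous value)
        long newSizeInBytes = 100_000L;    // ~100 kB per sample buffer (proposed value)

        System.out.printf("old: ~%d MB%n", metricsPerTask * samples * oldSizeInBytes * numTasks / 1_000_000);
        System.out.printf("new: ~%d MB%n", metricsPerTask * samples * newSizeInBytes * numTasks / 1_000_000);
        // With 50 tasks: old ~= 200 MB of sample buffers, new ~= 20 MB.
    }
}
```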
[GitHub] [kafka] mimaison commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
mimaison commented on pull request #8312: URL: https://github.com/apache/kafka/pull/8312#issuecomment-640882951 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy
C0urante commented on a change in pull request #8828: URL: https://github.com/apache/kafka/pull/8828#discussion_r436881026 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ## @@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) { return existingTopics; } +/** + * Verify the named topic uses only compaction for the cleanup policy. + * + * @param topic the name of the topic + * @param workerTopicConfig the name of the worker configuration that specifies the topic name + * @return true if the admin client could be used to verify the topic setting, or false if + * the verification could not be performed, likely because the admin client principal + * did not have the required permissions or because the broker was older than 0.11.0.0 + * @throws ConfigException if the actual topic setting did not match the required setting + */ +public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig, +String topicPurpose) { +Set cleanupPolicies = topicCleanupPolicy(topic); +if (cleanupPolicies == null || cleanupPolicies.isEmpty()) { Review comment: Is it possible that this will also be true if there isn't a cleanup policy configured on the topic? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
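For readers following the discussion, a hedged sketch of this kind of cleanup-policy check written against the public Admin API rather than Connect's internal TopicAdmin; class and method names are illustrative, and the behaviour noted in the comments is an assumption about describeConfigs rather than a statement about the PR:
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

// Sketch: read cleanup.policy for a topic and require it to be exactly "compact".
// describeConfigs typically returns the broker default ("delete") even when the topic
// has no explicit override, so an empty/missing result usually means the describe call
// itself could not be performed (permissions or an old broker).
public class CompactPolicyCheck {
    static void requireCompactOnly(Admin admin, String topic) throws ExecutionException, InterruptedException {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        Config config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
        String policy = config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(); // e.g. "compact" or "compact,delete"
        Set<String> policies = Arrays.stream(policy.split(","))
                .map(String::trim)
                .collect(Collectors.toSet());
        if (!policies.equals(Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT))) {
            throw new ConfigException("Topic '" + topic + "' must use cleanup.policy=compact, but found " + policies);
        }
    }
}
```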
[GitHub] [kafka] tombentley commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
tombentley commented on pull request #8312: URL: https://github.com/apache/kafka/pull/8312#issuecomment-640700423 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
ijuma commented on a change in pull request #8672: URL: https://github.com/apache/kafka/pull/8672#discussion_r436791602 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() -checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty) -checkpointLogStartOffsetsInDir(sourceLog.parentDirFile) +checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile) Review comment: Well, they are tightly coupled right? The method name of `deleteSnapshotsAfterRecoveryPointCheckpoint` makes it clear that this should be called after the recovery point is checkpointed. Generally, we've had bugs whenever we left it to the callers to make the same multiple calls in sequence in multiple places. I haven't looked at this PR in detail, so there are probably good reasons to change it. Also keep in mind https://github.com/apache/kafka/pull/7929/files that tries to improve the approach on how we handle this more generally. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() -checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty) -checkpointLogStartOffsetsInDir(sourceLog.parentDirFile) +checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile) Review comment: Well, they are tightly coupled right? The method name of `deleteSnapshotsAfterRecoveryPointCheckpoint` makes it clear that this should be called after the recovery point is checkpointed. Generally, we've had bugs whenever we left it to the callers to make the same multiple calls in sequence in multiple places. I haven't looked at this PR in detail, so there are probably good reasons to change it. Also keep in mind https://github.com/apache/kafka/pull/7929/files which tries to improve the approach on how we handle this more generally. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() -checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty) -checkpointLogStartOffsetsInDir(sourceLog.parentDirFile) +checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile) Review comment: Well, they are tightly coupled right? The method name of `deleteSnapshotsAfterRecoveryPointCheckpoint` makes it clear that this should be called after the recovery point is checkpointed. Generally, we've had bugs whenever we left it to the callers to make the same multiple calls in sequence in multiple places. I haven't looked at this PR in detail, so there are probably good reasons to change it. Also keep in mind #7929 which tries to improve the approach on how we handle this more generally. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jiameixie commented on pull request #8836: KAFKA-10124:Wrong rebalance.time.ms
jiameixie commented on pull request #8836: URL: https://github.com/apache/kafka/pull/8836#issuecomment-641125571 @ijuma @huxihx @guozhangwang Calling for a review. Do you think there is a good place to update `joinStart`? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8815: HOTFIX: fix validity check in sticky assignor tests
guozhangwang merged pull request #8815: URL: https://github.com/apache/kafka/pull/8815 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
dajac commented on a change in pull request #8672: URL: https://github.com/apache/kafka/pull/8672#discussion_r436704800 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -273,7 +272,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], * 6. If the partition is already paused, a new call to this function * will increase the paused count by one. */ - def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = { + def abortAndPauseCleaning(topicPartition: TopicPartition, partitionDeleted: Boolean = false): Unit = { Review comment: Actually, we already have `abortCleaning` for this purpose and `abortCleaning` calls `abortAndPauseCleaning`. I was trying to avoid logging a message when the partition is deleted because it does not bring much and literally flood the log when many partitions are deleted. While re-looking at this, I have done it differently now. I have found that logs were spread between the LogCleanerManager and the LogManager and that we were logging when resuming cleaning but not all the time. I have consolidated all the cases where we want to log explicitly in helper methods. It also helps with not always having to check if `cleaner != null`. Let me know what you think. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File], if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. if (cleaner != null && !isFuture) { -cleaner.abortCleaning(topicPartition) -cleaner.updateCheckpoints(removedLog.parentDirFile) +cleaner.abortCleaning(topicPartition, partitionDeleted = true) } removedLog.renameDir(Log.logDeleteDirName(topicPartition)) - checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty) Review comment: I totally agree with you. I have tried different ways trying to keep a better encapsulation but I have found a really satisfying way to get there. As you pointed out, the involvement of `Partition` makes this complex. Here is my best idea so far: * We remove the calls to `asyncDelete` in the `Partition.delete` method. It seems safe to "delete the partition" while keeping the files on disk while holding the `replicaStateChangeLock` in the `ReplicaManager`. * We use a `asyncDelete` that takes a batch of logs, deletes them and checkpoint. * We rename `Partition.delete` to something like `Partition.close` as the log is not really deleted any more. What do you think? ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File], if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. if (cleaner != null && !isFuture) { -cleaner.abortCleaning(topicPartition) -cleaner.updateCheckpoints(removedLog.parentDirFile) +cleaner.abortCleaning(topicPartition, partitionDeleted = true) } removedLog.renameDir(Log.logDeleteDirName(topicPartition)) - checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty) Review comment: I have implemented it to see. You can check out the last commit. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File], // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. 
sourceLog.close() -checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty) -checkpointLogStartOffsetsInDir(sourceLog.parentDirFile) +checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile) Review comment: Indeed, they are coupled from that perspective. We can keep them together in one method though. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -869,17 +903,18 @@ class LogManager(logDirs: Seq[File], currentLogs.put(topicPartition, destLog) if (cleaner != null) { cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile) -cleaner.resumeCleaning(Seq(topicPartition)) -info(s"Compaction for partition $topicPartition is resumed") +resumeCleaning(topicPartition) } try { sourceLog.renameDir(Log.logDeleteDirName(topicPartition)) // Now that replica in source log directory has been successfully renamed for deletion. // Close the log, update checkpoint files, and enqueue this log to be deleted. sourceLog.close() -checkpointRec
[GitHub] [kafka] bbejeck merged pull request #8823: MINOR: fix HTML markup
bbejeck merged pull request #8823: URL: https://github.com/apache/kafka/pull/8823 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8836: KAFKA-10124:Wrong rebalance.time.ms
chia7712 commented on a change in pull request #8836: URL: https://github.com/apache/kafka/pull/8836#discussion_r437177352 ## File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala ## @@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging { var messagesRead = 0L var lastBytesRead = 0L var lastMessagesRead = 0L -var joinStart = 0L var joinTimeMsInSingleRound = 0L consumer.subscribe(topics.asJava, new ConsumerRebalanceListener { def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { -joinTime.addAndGet(System.currentTimeMillis - joinStart) Review comment: It seems to me that the origin code want to count the elapsed time of joining group. Hence, the ```joinStart``` is required and the following ```joinStart = System.currentTimeMillis``` is required too. For another, the initial value of ```joinStart``` is incorrect. It should be equal to ```testStartTime``` ## File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala ## @@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging { var messagesRead = 0L var lastBytesRead = 0L var lastMessagesRead = 0L -var joinStart = 0L var joinTimeMsInSingleRound = 0L consumer.subscribe(topics.asJava, new ConsumerRebalanceListener { def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { -joinTime.addAndGet(System.currentTimeMillis - joinStart) Review comment: hmmm, is there a good place to update the start time of join? ## File path: core/src/main/scala/kafka/tools/ConsumerPerformance.scala ## @@ -105,16 +105,14 @@ object ConsumerPerformance extends LazyLogging { var messagesRead = 0L var lastBytesRead = 0L var lastMessagesRead = 0L -var joinStart = 0L var joinTimeMsInSingleRound = 0L consumer.subscribe(topics.asJava, new ConsumerRebalanceListener { def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { -joinTime.addAndGet(System.currentTimeMillis - joinStart) Review comment: hmmm, is there a good place to update the start time of join if they are NOT called just once? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
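A minimal sketch of the measurement being debated here: seed the join start time when the consumer first subscribes and restart the clock on each revocation, then accumulate the elapsed time once partitions are assigned. Field and class names are illustrative, not the actual ConsumerPerformance code:
```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Sketch: measure total time spent (re)joining the group.
public class JoinTimeListener implements ConsumerRebalanceListener {
    private final AtomicLong totalJoinTimeMs = new AtomicLong(0);
    private volatile long joinStartMs;

    public JoinTimeListener(long subscribeTimeMs) {
        // The first join has no onPartitionsRevoked callback, so seed the start time
        // with the moment we subscribed (the "testStartTime" mentioned above).
        this.joinStartMs = subscribeTimeMs;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        joinStartMs = System.currentTimeMillis();   // a later rebalance starts here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        totalJoinTimeMs.addAndGet(System.currentTimeMillis() - joinStartMs);
    }

    public long totalJoinTimeMs() {
        return totalJoinTimeMs.get();
    }
}
```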
[GitHub] [kafka] kkonstantine commented on pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group
kkonstantine commented on pull request #8805: URL: https://github.com/apache/kafka/pull/8805#issuecomment-640853227 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #8672: KAFKA-10002; Improve performances of StopReplicaRequest with large number of partitions to be deleted
dajac commented on pull request #8672: URL: https://github.com/apache/kafka/pull/8672#issuecomment-641249326 @hachikuji I have made another pass on the PR based on your input. Could you please have a second look at it? I am especially interested in getting your view on the overall approach for the checkpointing and the batch deletion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on pull request #7929: URL: https://github.com/apache/kafka/pull/7929#issuecomment-640691697 In 6152847, I changed the approach taken in this PR to move away from managing the producer state snapshot file as part of the segment lifecycle. This was done for two reasons, first it turned out to be a bit difficult to reason about producer state snapshot files which did not have a corresponding segment file (such as the snapshot taken during shutdown). Second, moving the management of deletion to the LogSegment ended up splitting ownership of the producer state snapshot file between the `LogSegment` and the `ProducerStateManager`, which seemed to complicate when things like truncation could happen. Instead, a call is made to the `ProducerStateManager` to delete any matching producer snapshot files when `LogSegment`'s are deleted. This occurs during retention and as a result of log cleaning (compaction). For producer state snapshot files which do not have a corresponding segment, they will be cleaned up on restart (unless they are the latest snapshot file). I think this ends up being simpler while more closely matching the current set of responsibilities, where the `ProducerStateManager` "owns" creating/reading/deleting snapshot files, but I'd be interested in hearing other opinions 😄. cc/ @junrao This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
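To illustrate the ownership model described above, a small sketch in which the snapshot-owning component deletes the snapshot file matching a deleted segment's base offset; the class and method names are illustrative, and only the zero-padded "<offset>.snapshot" file-name convention is taken from Kafka's on-disk layout:
```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch, not the actual ProducerStateManager code: when a log segment is
// deleted, delete the producer state snapshot that shares its base offset, if one exists.
public class SnapshotCleanup {
    // e.g. baseOffset 538065584 -> "00000000000538065584.snapshot"
    static Path snapshotPath(Path logDir, long baseOffset) {
        return logDir.resolve(String.format("%020d.snapshot", baseOffset));
    }

    // Called after the corresponding log segment has been scheduled for deletion.
    static void deleteSnapshotIfExists(Path logDir, long baseOffset) throws IOException {
        Files.deleteIfExists(snapshotPath(logDir, baseOffset));
    }
}
```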
[GitHub] [kafka] kowshik commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r437124061 ## File path: core/src/main/scala/kafka/zk/ZkData.scala ## @@ -81,17 +83,26 @@ object BrokerIdsZNode { object BrokerInfo { /** - * Create a broker info with v4 json format (which includes multiple endpoints and rack) if - * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above. + * - Create a broker info with v4 json format (which includes multiple endpoints and rack) if + * the apiVersion is 0.10.0.X or above but lesser than 2.6.x. Review comment: Done. ## File path: core/src/main/scala/kafka/zk/ZkData.scala ## @@ -81,17 +83,26 @@ object BrokerIdsZNode { object BrokerInfo { /** - * Create a broker info with v4 json format (which includes multiple endpoints and rack) if - * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above. Review comment: Done. ## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ## @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler} +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException +import scala.util.control.Exception.ignoring + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to notify the caller when an + *updateOrThrow() operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + +def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + +/** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. 
If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked exactly + * once successfully. A subsequent invocation will raise an exception. + * + * @throws IllegalStateException, if a non-empty notifier was provided in the constructor, and + * this method is called again after a successful previous invocation. + * @throws FeatureCacheUpdateException, if there was an error in updating the + * FinalizedFeatureCache. + * @throws RuntimeException, if there was a failure in reading/deserializing the + * contents of the feature ZK node. + */ +def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { +if (notifier.getCount != 1) { + throw new IllegalStateException( +"Can not notify after updateLatestOrThrow was called more than once successfully.") +} + }) + + debug(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + + // There are 4 cases: + // + // (empty dataBytes, valid version
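The class comment above describes a common pattern: the ZK watcher callback does not update the cache in place; notifications are queued and applied by a single processor thread so that cache updates never race with each other. Below is a minimal, generic Java sketch of that pattern (not the actual Scala listener; all names are illustrative):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Generic sketch: serialize change notifications through one worker thread.
final class SerializedCacheUpdater implements AutoCloseable {
    private final BlockingQueue<Runnable> notifications = new LinkedBlockingQueue<>();
    private final Thread processor = new Thread(this::processLoop, "change-notification-processor");
    private volatile boolean running = true;

    SerializedCacheUpdater() {
        processor.setDaemon(true);
        processor.start();
    }

    // Called from the watcher callback: enqueue the update instead of applying it inline.
    void enqueue(final Runnable cacheUpdate) {
        notifications.add(cacheUpdate);
    }

    private void processLoop() {
        try {
            while (running) {
                notifications.take().run();   // one update at a time, in arrival order
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        running = false;
        processor.interrupt();
    }
}
```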
[GitHub] [kafka] Lucent-Wong removed a comment on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…
Lucent-Wong removed a comment on pull request #8453: URL: https://github.com/apache/kafka/pull/8453#issuecomment-641192433 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sneakyburro commented on a change in pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs
sneakyburro commented on a change in pull request #8752: URL: https://github.com/apache/kafka/pull/8752#discussion_r437317697 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() { nodeGroup.addAll(value); } nodeGroup.removeAll(globalNodeGroups()); - +for (final NodeFactory entry : nodeFactories.values()) { +if (entry instanceof ProcessorNodeFactory) { +ProcessorNodeFactory factory = (ProcessorNodeFactory) entry; +if (factory.supplier.get() == factory.supplier.get()) { +throw new TopologyException("topology has singleton result of ProcessorSupplier " + factory.name); +} +} +} Review comment: That's a great idea. I introduced a util class `TopologyUtil` under processor package. ## File path: docs/streams/developer-guide/processor-api.html ## @@ -439,6 +439,9 @@ Connecting Processors and State StoresTopology code, accessing it in the processor’s init() method will also throw an exception at runtime, indicating the state store is not accessible from this processor. +Note that there could be multiple ProcessorContext instances initialize your Processor during initialization. Review comment: Adopted and revised a little. Thanks! ## File path: streams/src/main/java/org/apache/kafka/streams/Topology.java ## @@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name, * If {@code supplier} provides stores via {@link ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s * will be added to the topology and connected to this processor automatically. * - * @param name the unique name of the processor node - * @param supplier the supplier used to obtain this node's {@link Processor} instance - * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive - * and process - * @return itself + * @param name the unique name of the processor node + * @param supplier the supplier used to construct this node's {@link Processor} instance; the implementation of supplier + * should return a newly constructed {@link Processor} instance inside the scope of the lambda expression. Review comment: I reverted java docs on parameter and added web-docs-like docs on main java docs for this function. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) { } final String sendTo = toInternal.child(); +if (currentNode() == null) { +throw new StreamsException("Current node is unknown when forwarding to: " + key); Review comment: Made the change! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
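For context on the singleton check quoted above: `ProcessorSupplier#get()` must return a new `Processor` instance on every call, because each stream task needs its own processor. A hedged sketch of the difference, using made-up class names (this is not the code under review):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

// Correct: a fresh Processor is constructed on every call to get().
class FreshSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            @Override public void init(final ProcessorContext context) { }
            @Override public void process(final String key, final String value) { /* no-op for illustration */ }
            @Override public void close() { }
        };
    }
}

// Problematic: every call returns the same instance, so several tasks (possibly on
// different threads) would share one Processor. This is the case that the
// factory.supplier.get() == factory.supplier.get() check is meant to reject.
class SingletonSupplier implements ProcessorSupplier<String, String> {
    private final Processor<String, String> instance = new FreshSupplier().get();

    @Override
    public Processor<String, String> get() {
        return instance;
    }
}
```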
[GitHub] [kafka] kkonstantine commented on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…
kkonstantine commented on pull request #8453: URL: https://github.com/apache/kafka/pull/8453#issuecomment-641102055 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic
abbccdda commented on a change in pull request #8832: URL: https://github.com/apache/kafka/pull/8832#discussion_r436913810 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -147,6 +147,46 @@ public String toString() { } } +/** + * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful + * when in certain cases traverse is needed. For example, method setRepartitionTopicMetadataNumberOfPartitions + * internally do a DFS search along with the graph. + * + TopicNode("t1") TopicNode("t2") TopicNode("t6") TopicNode("t7") +\ / \ / + TopicsInfo(source = (t1,t2), sink = (t3,t4)) TopicsInfo(source = (t6,t7), sink = (t4)) +/ \ / + / \ / + /\ / + /\ / + / \/ + TopicNode("t3") TopicNode("t4") +\ + TopicsInfo(source = (t3), sink = ()) + + t3 = max(t1,t2) + t4 = max(max(t1,t2), max(t6,t7)) + */ +private static class TopicNode { +public final String topicName; +public final Set upStreams; // upStream TopicsInfo's sinkTopics contains this +public final Set downStreams; // downStreams TopicsInfo's sourceTopics contains this Review comment: I haven't seen this struct being used. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, */ private void setRepartitionTopicMetadataNumberOfPartitions(final Map repartitionTopicMetadata, final Map topicGroups, - final Cluster metadata) { -boolean numPartitionsNeeded; -do { -numPartitionsNeeded = false; - -for (final TopicsInfo topicsInfo : topicGroups.values()) { -for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { -final Optional maybeNumPartitions = repartitionTopicMetadata.get(topicName) - .numberOfPartitions(); -Integer numPartitions = null; - -if (!maybeNumPartitions.isPresent()) { -// try set the number of partitions for this repartition topic if it is not set yet -for (final TopicsInfo otherTopicsInfo : topicGroups.values()) { -final Set otherSinkTopics = otherTopicsInfo.sinkTopics; - -if (otherSinkTopics.contains(topicName)) { -// if this topic is one of the sink topics of this topology, -// use the maximum of all its source topic partitions as the number of partitions -for (final String sourceTopicName : otherTopicsInfo.sourceTopics) { -Integer numPartitionsCandidate = null; -// It is possible the sourceTopic is another internal topic, i.e, -// map().join().join(map()) -if (repartitionTopicMetadata.containsKey(sourceTopicName)) { -if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) { -numPartitionsCandidate = - repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get(); -} -} else { -final Integer count = metadata.partitionCountForTopic(sourceTopicName); -if (count == null) { -throw new IllegalStateException( -"No partition count found for source topic " -+ sourceTopicName -
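The rule sketched in the diagram above (t3 = max(t1, t2); t4 = max(max(t1, t2), max(t6, t7))) amounts to a memoized depth-first search over the topic graph. A simplified, hypothetical Java sketch of that resolution, assuming an acyclic topic graph (this is not the assignor code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a repartition (sink) topic takes the maximum partition count of the
// source topics feeding it, resolved recursively because a source may itself be another
// repartition topic whose count is not known yet.
final class RepartitionCountSketch {
    // topic -> source topics of the sub-topology that writes to it
    private final Map<String, List<String>> upstreamSources;
    // known partition counts; external topics are pre-filled from cluster metadata
    private final Map<String, Integer> partitionCounts = new HashMap<>();

    RepartitionCountSketch(final Map<String, List<String>> upstreamSources,
                           final Map<String, Integer> externalCounts) {
        this.upstreamSources = upstreamSources;
        this.partitionCounts.putAll(externalCounts);
    }

    int resolve(final String topic) {
        final Integer known = partitionCounts.get(topic);
        if (known != null) {
            return known; // external topic, or a repartition topic resolved earlier
        }
        int max = 0;
        for (final String source : upstreamSources.getOrDefault(topic, Collections.emptyList())) {
            max = Math.max(max, resolve(source));
        }
        partitionCounts.put(topic, max);
        return max;
    }
}
```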
[GitHub] [kafka] ijuma commented on pull request #8802: MINOR: Fix fetch session epoch comment in `FetchRequest.json`
ijuma commented on pull request #8802: URL: https://github.com/apache/kafka/pull/8802#issuecomment-640721062 Request/response names are just labels for humans, so you can change them without affecting compatibility. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
mjsax commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r436864561 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -108,13 +107,20 @@ public void completeRestoration() { } @Override -public void prepareSuspend() { -log.trace("No-op prepareSuspend with state {}", state()); +public void suspendDirty() { +log.trace("No-op suspend dirty with state {}", state()); +if (state() == State.RUNNING) { +transitionTo(State.SUSPENDED); +} } @Override -public void suspend() { -log.trace("No-op suspend with state {}", state()); +public void suspendCleanAndPrepareCommit() { +log.trace("No-op suspend clean with state {}", state()); +if (state() == State.RUNNING) { +transitionTo(State.SUSPENDED); +} +prepareCommit(); Review comment: When we suspend, we _always_ want to commit. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -151,69 +157,23 @@ public void postCommit() { } } -@Override -public void prepareCloseClean() { -prepareClose(true); - -log.info("Prepared clean close"); -} - -@Override -public void prepareCloseDirty() { -prepareClose(false); - -log.info("Prepared dirty close"); -} - -/** - * 1. commit if we are running and clean close; - * 2. close the state manager. - * - * @throws TaskMigratedException all the task has been migrated - * @throws StreamsException fatal error, should close the thread - */ -private void prepareClose(final boolean clean) { Review comment: This logic is now followed in `suspendCleanAndPrepareCommit()` that must be called before a task can be closed. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -246,82 +245,39 @@ public void completeRestoration() { } } -/** - * - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * 4. then commit the record collector -- for EOS this is the synchronization barrier - * 5. then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed - * - * - * @throws TaskMigratedException if committing offsets failed (non-EOS) - * or if the task producer got fenced (EOS) - */ @Override -public void prepareSuspend() { Review comment: This is now done via `suspendAndPrepareCommit()` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -246,82 +245,39 @@ public void completeRestoration() { } } -/** - * - * the following order must be followed: - * 1. first close topology to make sure all cached records in the topology are processed - * 2. then flush the state, send any left changelog records - * 3. then flush the record collector - * 4. then commit the record collector -- for EOS this is the synchronization barrier - * 5. 
then checkpoint the state manager -- even if we crash before this step, EOS is still guaranteed - * - * - * @throws TaskMigratedException if committing offsets failed (non-EOS) - * or if the task producer got fenced (EOS) - */ @Override -public void prepareSuspend() { -switch (state()) { -case CREATED: -case SUSPENDED: -// do nothing -log.trace("Skip prepare suspending since state is {}", state()); - -break; - -case RESTORING: -stateMgr.flush(); -log.info("Prepare suspending restoring"); - -break; - -case RUNNING: -closeTopology(true); - -stateMgr.flush(); -recordCollector.flush(); - -log.info("Prepare suspending running"); - -break; - -case CLOSED: -throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + id); - -default: -throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + id); -} +public void suspendDirty() { +log.info("Suspending dirty"); +suspend(false); } @Override -public void suspend() { +public void suspendCleanAndPrepareCommit() { +log.info("Suspending clean"); +suspend(true); +
[GitHub] [kafka] guozhangwang commented on a change in pull request #8833: KAFKA-9441: remove prepareClose() to simplify task management
guozhangwang commented on a change in pull request #8833: URL: https://github.com/apache/kafka/pull/8833#discussion_r437090376 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ## @@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) { void postCommit(); -/** - * @throws TaskMigratedException all the task has been migrated - * @throws StreamsException fatal error, should close the thread - */ -void prepareSuspend(); +void suspendDirty(); Review comment: It seems to me that the reason we want to have two suspends and also merging the suspendClean with prepareCommit is that for `StreamTask`, if state `SUSPENDED` we want to skip prepareCommit. I feel it is a tad cleaner to separate them further into one `suspend` which does not try to call prepareCommit, and rely on whether prepareCommit should do anything or not based on both `state` (i.e. only running/restoring/suspended need to commit) and `commitNeeded` flag. With that we can convert the callers as follows: 1. suspendDirty(): just call suspend(), do not call prepareCommit(). 2. suspendCleanAndPrepareCommit(): 2.a) from `task.closeAndRecycleState`: call suspend(), and then call prepareCommit(); the second would check `commitNeeded` and if it was false, we would not try to flush / commit. Hence if the task just transited from other states to suspended, then `commitNeeded` should still be true. 2.b) from `taskManager` directly: same as above, but for this call we always follow with a `committableOffsetsAndMetadata` getting the map of offsets, so I'm thinking we can merge `prepareCommit` with `committableOffsetsAndMetadata` as well: if the state is right and `commitNeeded` is set, execute the prepare committing procedure, and accumulate the offsets, otherwise returning `null` indicating no offsets needed to be committed. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ## @@ -125,40 +126,21 @@ public boolean isValidTransition(final State newState) { void postCommit(); -/** Review comment: I'm wondering if we could merge `committableOffsetsAndMetadata` with `prepareCommit` as well, letting the latter to return the map? See my other comment aside. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
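A rough sketch of the merge proposed in the last comment: `prepareCommit()` consults the task state and the `commitNeeded` flag, flushes only when needed, and returns the committable offsets, or `null` when there is nothing to commit. Apart from the names quoted in the comment, everything below is an assumption rather than the real StreamTask code:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical fragment illustrating the suggested control flow.
abstract class TaskSketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    protected boolean commitNeeded;

    abstract State state();
    abstract void flushState();   // assumed helper: flush stores, changelogs and the record collector
    abstract Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata();

    Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (state()) {
            case CREATED:
            case CLOSED:
                return null;               // nothing was processed, nothing to commit
            case RUNNING:
            case RESTORING:
            case SUSPENDED:
                if (!commitNeeded) {
                    return null;           // skip flushing and committing entirely
                }
                flushState();
                return committableOffsetsAndMetadata();
            default:
                throw new IllegalStateException("Unknown state " + state());
        }
    }
}
```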
[GitHub] [kafka] vvcephei commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback
vvcephei commented on a change in pull request #8676: URL: https://github.com/apache/kafka/pull/8676#discussion_r436755332 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java ## @@ -237,25 +236,6 @@ public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserP verify(metricsRecorder); } -@Test -public void shouldRespectBulkloadOptionsDuringInit() { -rocksDBStore.init(context, rocksDBStore); - -final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name()); - -restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L); - -assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30)); Review comment: Sounds good. I was thrown off by `userSpecifiedOptions.prepareForBulkLoad()` because it looks like the user could have specified some bulk loading options, but I took another look, and I see that that's just the variable name. The actual method is internal, and there's no way to configure bulk-loading options specifically as a user. So, I'm satisfied. Thanks! ## File path: streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java ## @@ -29,9 +29,12 @@ * Users desiring stateful operations will need to provide synchronization internally in * the {@code StateRestorerListener} implementation. * - * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or - * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary - * as each StreamThread has its own StateStore instance. + * Note that this listener is only registered at the per-client level and users can base on the {@code storeName} Review comment: Thanks! ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -210,30 +207,6 @@ public void shouldFindSingleStoreForChangelog() { ); } -@Test -public void shouldRestoreStoreWithRestoreCallback() { Review comment: Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
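Since the restore listener discussed above is registered once per client (via `KafkaStreams#setGlobalStateRestoreListener`) rather than per store, an implementation typically branches on the `storeName` argument. A small illustrative example, with a placeholder store name:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Illustrative only: one client-wide listener that singles out a particular store by name.
class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        if ("my-store".equals(storeName)) { // placeholder store name
            System.out.printf("Restoring %s %s from offset %d to %d%n",
                              storeName, topicPartition, startingOffset, endingOffset);
        }
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        // no-op
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                             final long totalRestored) {
        // no-op
    }
}

// Registered once for the whole client, e.g.:
// kafkaStreams.setGlobalStateRestoreListener(new LoggingRestoreListener());
```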
[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance
guozhangwang commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r437069073 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) { } private void recordRebalanceFailure() { -state = MemberState.UNJOINED; Review comment: This is a piggy-backed cleanup: we call resetGenerationXXX in the join/sync-group handler func and hence should not re-call it again here. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -652,9 +651,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut } else if (error == Errors.MEMBER_ID_REQUIRED) { // Broker requires a concrete member id to be allowed to join the group. Update member id // and send another join group request in next cycle. +String memberId = joinResponse.data().memberId(); +log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId); Review comment: Minor log4j improvement. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8838: MINOR: fix flaky TopicCommandWithAdminClientTest.testDescribeUnderRep…
chia7712 commented on a change in pull request #8838: URL: https://github.com/apache/kafka/pull/8838#discussion_r437289845 ## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala ## @@ -697,6 +697,10 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin assertTrue(simpleDescribeOutputRows(0).startsWith(s"Topic: $testTopicName")) assertEquals(2, simpleDescribeOutputRows.size) +// let's wait until the LAIR is NOT propagated +TestUtils.waitUntilTrue(() => !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().containsKey(tp), Review comment: make sure the reassignment is completed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback
guozhangwang commented on pull request #8676: URL: https://github.com/apache/kafka/pull/8676#issuecomment-640892231 Merged to trunk and cherry-picked to 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bellemare commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
bellemare commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-640926702 I swear it ran checkstyle when I compiled it... bah! I guess not. I did run ./gradlew :streams:test; it had one unrelated error (I think), so let's see how it does now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #8839: KIP-585: Documentation
tombentley commented on pull request #8839: URL: https://github.com/apache/kafka/pull/8839#issuecomment-641201269 @kkonstantine please could you review this, thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group
rhauch commented on a change in pull request #8805: URL: https://github.com/apache/kafka/pull/8805#discussion_r436925315 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that // can be used to calculate derived sets log.debug("Previous assignments: {}", previousAssignment); +int lastCompletedGenerationId = coordinator.lastCompletedGenerationId(); +if (previousGenerationId != lastCompletedGenerationId) { +log.debug("Emptying previous assignments due to generation mismatch between previous " ++ "generation ID {} and last completed generation ID {} since the last assignment: {}", +previousGenerationId, lastCompletedGenerationId, previousAssignment); Review comment: Can we make this a little easier to understand for most users? I think it might be sufficient to add some combination of: * what this means (e.g., the worker was partitioned and missed at least one rebalance rounds, likely due to a network issue), and * what resulted (e.g., the workers gave up its tasks in case the cluster had reassigned them to another worker). And, should this be debug or info or warn? Warn seems wrong, since the user shouldn't do anything, but excessive #s of these could signal the need for additional tuning. WDYT? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, log.debug("Found the following connectors and tasks missing from previous assignments: " + lostAssignments); +if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) { +log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup " ++ "responses. Treating lost tasks as new tasks"); Review comment: Similar comment to that above. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that // can be used to calculate derived sets log.debug("Previous assignments: {}", previousAssignment); +int lastCompletedGenerationId = coordinator.lastCompletedGenerationId(); +if (previousGenerationId != lastCompletedGenerationId) { +log.debug("Emptying previous assignments due to generation mismatch between previous " ++ "generation ID {} and last completed generation ID {} since the last assignment: {}", +previousGenerationId, lastCompletedGenerationId, previousAssignment); Review comment: Sounds good, though it'd be better to have a single (long) log message to prevent them from being separated by other log messages from other threads. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, log.debug("Found the following connectors and tasks missing from previous assignments: " + lostAssignments); +if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) { +log.debug("Group size is same between rebalances. 
Lost assignments are probably due to lost SyncGroup " ++ "responses. Treating lost tasks as new tasks"); Review comment: Sounds good. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, log.debug("Found the following connectors and tasks missing from previous assignments: " + lostAssignments); +if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) { Review comment: Is it enough to trust that the # of workers has not changed, or should we compare the members, via something like: ``` if (previousMembers.equals(memberConfigs.keySet()) && scheduledRebalance <= 0) { ``` IOW, what happens if one worker disappeared about the same time that an operator added a new worker? IIUC from the integration tests, this logic
[GitHub] [kafka] vvcephei commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective
vvcephei commented on a change in pull request #8818: URL: https://github.com/apache/kafka/pull/8818#discussion_r436758351 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -1084,12 +1088,15 @@ private boolean populateActiveTaskAndPartitionsLists(final List // If the partition is new to this consumer but is still owned by another, remove from the assignment // until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) { -log.info("Removing task {} from assignment until it is safely revoked in followup rebalance", taskId); -clientState.unassignActive(taskId); Review comment: Ah, I see. I'll put it back with a comment that we're just using it for the internal assertions. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -514,17 +515,24 @@ void handleLostAll() { /** * Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the - * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()} - * - * @return Map from task id to its total offset summed across all state stores + * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}. + * Does not include stateless or non-logged tasks. */ public Map getTaskOffsetSums() { final Map taskOffsetSums = new HashMap<>(); -for (final TaskId id : lockedTaskDirectories) { +// Not all tasks will create directories, and there may be directories for tasks we don't currently own, +// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should +// just have an empty changelogOffsets map. +for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) { final Task task = tasks.get(id); if (task != null) { -taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets())); +final Map changelogOffsets = task.changelogOffsets(); +if (changelogOffsets.isEmpty()) { +log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); Review comment: Yep! We don't know that it's stateless, just that it didn't report any changelogs. 
## File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java ## @@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il assertThat(taskAssignor, instanceOf(MyTaskAssignor.class)); } } + +@Test +public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException { +// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum +// value is one minute +shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName))); +} + +@Test +public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException { +// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum Review comment: https://en.wikipedia.org/wiki/Nota_bene :) ## File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java ## @@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il assertThat(taskAssignor, instanceOf(MyTaskAssignor.class)); } } + +@Test +public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException { +// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum +// value is one minute +shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName))); +} + +@Test +public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException { +// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum +// value is one minute +shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName))); +} + +private void shouldScaleOutWithWarmupTasks(final Function>> materializedFunction) throws InterruptedException { +final String testId = safeUniqueTestName(getClass(), testName); +final St
[GitHub] [kafka] guozhangwang commented on pull request #8816: MINOR: Print all removed dynamic members during join complete
guozhangwang commented on pull request #8816: URL: https://github.com/apache/kafka/pull/8816#issuecomment-640934122 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on a change in pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic
feyman2016 commented on a change in pull request #8832: URL: https://github.com/apache/kafka/pull/8832#discussion_r437529829 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -147,6 +147,46 @@ public String toString() { } } +/** + * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful Review comment: Fixed ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -147,6 +147,46 @@ public String toString() { } } +/** + * TopicNode is the a topic node abstraction for graph built with TopicNode and TopicsInfo, the graph is useful Review comment: Updated ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -513,66 +553,94 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, */ private void setRepartitionTopicMetadataNumberOfPartitions(final Map repartitionTopicMetadata, final Map topicGroups, - final Cluster metadata) { -boolean numPartitionsNeeded; -do { -numPartitionsNeeded = false; - -for (final TopicsInfo topicsInfo : topicGroups.values()) { -for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) { -final Optional maybeNumPartitions = repartitionTopicMetadata.get(topicName) - .numberOfPartitions(); -Integer numPartitions = null; - -if (!maybeNumPartitions.isPresent()) { -// try set the number of partitions for this repartition topic if it is not set yet -for (final TopicsInfo otherTopicsInfo : topicGroups.values()) { -final Set otherSinkTopics = otherTopicsInfo.sinkTopics; - -if (otherSinkTopics.contains(topicName)) { -// if this topic is one of the sink topics of this topology, -// use the maximum of all its source topic partitions as the number of partitions -for (final String sourceTopicName : otherTopicsInfo.sourceTopics) { -Integer numPartitionsCandidate = null; -// It is possible the sourceTopic is another internal topic, i.e, -// map().join().join(map()) -if (repartitionTopicMetadata.containsKey(sourceTopicName)) { -if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) { -numPartitionsCandidate = - repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get(); -} -} else { -final Integer count = metadata.partitionCountForTopic(sourceTopicName); -if (count == null) { -throw new IllegalStateException( -"No partition count found for source topic " -+ sourceTopicName -+ ", but it should have been." -); -} -numPartitionsCandidate = count; -} - -if (numPartitionsCandidate != null) { -if (numPartitions == null || numPartitionsCandidate > numPartitions) { -numPartitions = numPartitionsCandidate; -} -} -} -} -} - -// if we still have not found the right number of partitions, -// another iteration is needed -if (numPartitions == null) { -numPartitionsNeeded = true; -} else { - repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions); -} -} +
[GitHub] [kafka] xvrl commented on pull request #8800: MINOR: Fix incorrect GC log size with JDK9+
xvrl commented on pull request #8800: URL: https://github.com/apache/kafka/pull/8800#issuecomment-640914664 @ijuma tested this change locally, broker starts up fine and gc logs show up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wicknicks commented on a change in pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter
wicknicks commented on a change in pull request #8829: URL: https://github.com/apache/kafka/pull/8829#discussion_r436843713 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java ## @@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi public Future executeFailed(Stage stage, Class executingClass, ConsumerRecord consumerRecord, Throwable error) { +if (!withinToleranceLimits()) { +errorHandlingMetrics.recordFailure(); +markAsFailed(); +throw new ConnectException("Tolerance exceeded in the errant record reporter", error); +} + Review comment: if we attempt an operation and it fails, `recordFailure` will be incremented, but `recordError` only tracks the cases where the when we encounter a problem that the framework cannot retry or skip. In the first case, we may still be able to retry or skip the record. In the `executeFailed` scenario, we should `recordFailure()` every time, and only `recordError` only when we have to fail the task. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java ## @@ -87,6 +87,12 @@ public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMi public Future executeFailed(Stage stage, Class executingClass, ConsumerRecord consumerRecord, Throwable error) { +if (!withinToleranceLimits()) { +errorHandlingMetrics.recordFailure(); +markAsFailed(); +throw new ConnectException("Tolerance exceeded in the errant record reporter", error); Review comment: Since this is called from the task(), is it enough to just raise an exception? that may be swallowed by the task, and could continue processing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
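The metric semantics asked for in the review boil down to: count a failure for every reported record, and count an error (and fail the task) only once the tolerance is exhausted. The stand-in below maps recordFailure/recordError to plain counters; it is a simplified sketch, not the actual RetryWithToleranceOperator:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Simplified stand-in for the operator discussed above; only the failure/error
// bookkeeping is illustrated.
final class ToleranceSketch {
    private final long toleratedFailures;   // e.g. 0 for errors.tolerance=none, Long.MAX_VALUE for all
    private long totalFailures = 0;         // "recordFailure": every failed operation
    private long totalErrors = 0;           // "recordError": only failures that must fail the task

    ToleranceSketch(final long toleratedFailures) {
        this.toleratedFailures = toleratedFailures;
    }

    Future<Void> reportFailedRecord(final Throwable error) {
        totalFailures++;                            // always counted as a failure
        if (totalFailures > toleratedFailures) {    // tolerance exhausted: also an error
            totalErrors++;
            // Raising alone may not be enough if the task swallows the exception,
            // which is the concern in the second review comment above.
            throw new RuntimeException("Tolerance exceeded in the errant record reporter", error);
        }
        // Still within tolerance: the record is reported/skipped and processing continues.
        return CompletableFuture.completedFuture(null);
    }
}
```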
[GitHub] [kafka] mjsax merged pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it
mjsax merged pull request #8803: URL: https://github.com/apache/kafka/pull/8803 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bellemare commented on a change in pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
bellemare commented on a change in pull request #8764: URL: https://github.com/apache/kafka/pull/8764#discussion_r436871862 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java ## @@ -243,17 +244,17 @@ private void validateTopologyCanProcessData(final StreamsBuilder builder) { final String safeTestName = safeUniqueTestName(getClass(), testName); config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + safeTestName); config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy"); -config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); +config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), config)) { -final TestInputTopic aTopic = topologyTestDriver.createInputTopic("A", new StringSerializer(), new StringSerializer()); -final TestInputTopic bTopic = topologyTestDriver.createInputTopic("B", new StringSerializer(), new StringSerializer()); -final TestOutputTopic output = topologyTestDriver.createOutputTopic("output", new StringDeserializer(), new StringDeserializer()); -aTopic.pipeInput("a1", "b1-alpha"); -bTopic.pipeInput("b1", "beta"); -final Map x = output.readKeyValuesToMap(); -assertThat(x, is(Collections.singletonMap("a1", "(b1-alpha,(b1-alpha,beta))"))); +final TestInputTopic aTopic = topologyTestDriver.createInputTopic("A", new IntegerSerializer(), new StringSerializer()); +final TestInputTopic bTopic = topologyTestDriver.createInputTopic("B", new IntegerSerializer(), new StringSerializer()); +final TestOutputTopic output = topologyTestDriver.createOutputTopic("output", new IntegerDeserializer(), new StringDeserializer()); +aTopic.pipeInput(1, "1-alpha"); +bTopic.pipeInput(1, "beta"); Review comment: Agreed. Good catch. 
## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java ## @@ -181,60 +182,60 @@ public void shouldWorkWithDefaultAndProducedSerdes() { public void shouldUseExpectedTopicsWithSerde() { final String applicationId = "ktable-ktable-joinOnForeignKey"; final Properties streamsConfig = mkProperties(mkMap( -mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId), -mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:"), -mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) +mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId), +mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:"), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final StreamsBuilder builder = new StreamsBuilder(); -final KTable left = builder.table( -LEFT_TABLE, -Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), - serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) +final KTable left = builder.table( +LEFT_TABLE, +Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), +serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) ); -final KTable right = builder.table( -RIGHT_TABLE, -Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), - serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) +final KTable right = builder.table( +RIGHT_TABLE, +Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), +serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)) ); left.join( -right, -value -> value.split("\\|")[1], -(value1, value2) -> "(" + value1 + "," + value2 + ")", -Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false) -)) -.toStream() -.to(OUTPUT); +right, +value -> Integer.parseInt(value.split("\\|")[1]), +(value1, value2) -> "(" + value1 + "," + value2 + ")", +
[GitHub] [kafka] abbccdda commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException
abbccdda commented on a change in pull request #8822: URL: https://github.com/apache/kafka/pull/8822#discussion_r436800666 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -1050,4 +1062,43 @@ public String toString() { '}'; } } + +public static class LogTruncation { +public final TopicPartition topicPartition; +public final FetchPosition fetchPosition; +public final Optional divergentOffsetOpt; + +public LogTruncation(TopicPartition topicPartition, + FetchPosition fetchPosition, + Optional divergentOffsetOpt) { +this.topicPartition = topicPartition; +this.fetchPosition = fetchPosition; Review comment: After a second thought, I don't feel strong about it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] msilb commented on pull request #1596: KAFKA-1543: Change replication factor during partition map generation
msilb commented on pull request #1596: URL: https://github.com/apache/kafka/pull/1596#issuecomment-641312887 I see the parent issue has been open since 2014. Any idea when this is finally going to be implemented? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8788: MINOR: Remove unused isSticky assert out from tests only do constrainedAssign
mjsax commented on pull request #8788: URL: https://github.com/apache/kafka/pull/8788#issuecomment-640741662 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #7384: KAFKA-8938 - Connect - Improve Allocations During Struct Validation
kkonstantine commented on pull request #7384: URL: https://github.com/apache/kafka/pull/7384#issuecomment-641103428 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9923: --- Fix Version/s: (was: 2.6.0) > Join window store duplicates can be compacted in changelog > --- > > Key: KAFKA-9923 > URL: https://issues.apache.org/jira/browse/KAFKA-9923 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Bruno Cadonna >Priority: Blocker > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > This wrapping occurs at the innermost layer of the store hierarchy, which > means the duplicates must first pass through the changelogging layer. At this > point the keys are still identical. So, we end up sending the records to the > changelog without distinct keys and therefore may lose the older of the > duplicates during compaction. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-9923. Resolution: Not A Problem [~cadonna] pointed out that this actually isn't a problem/has been fixed by KAFKA-5804. We could certainly stand to do some cleaning up around the duplicates handling but at least we aren't losing data! > Join window store duplicates can be compacted in changelog > --- > > Key: KAFKA-9923 > URL: https://issues.apache.org/jira/browse/KAFKA-9923 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 2.6.0 > > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > This wrapping occurs at the innermost layer of the store hierarchy, which > means the duplicates must first pass through the changelogging layer. At this > point the keys are still identical. So, we end up sending the records to the > changelog without distinct keys and therefore may lose the older of the > duplicates during compaction. -- This message was sent by Atlassian Jira (v8.3.4#803005)
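The key-wrapping trick described in the ticket can be shown with a short, hypothetical sketch (this is not the actual window store code):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.Bytes;

// Illustration only: a store that retains duplicates keeps otherwise identical keys
// distinct by appending an incrementing sequence number to each inserted key.
final class DuplicateKeySketch {
    private int seqnum = 0;

    Bytes wrapForDuplicate(final Bytes key) {
        seqnum++;
        final ByteBuffer buf = ByteBuffer.allocate(key.get().length + Integer.BYTES);
        buf.put(key.get());
        buf.putInt(seqnum);
        return Bytes.wrap(buf.array());
    }
}
```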
[GitHub] [kafka] mjsax commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it
mjsax commented on pull request #8803: URL: https://github.com/apache/kafka/pull/8803#issuecomment-640934234 Merged to `trunk` and cherry-picked to `2.6` branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
ijuma commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r436792477 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2237,7 +2209,11 @@ class Log(@volatile private var _dir: File, def deleteSegments(): Unit = { info(s"Deleting segments ${segments.mkString(",")}") maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") { -segments.foreach(_.deleteIfExists()) +segments.foreach { + case segment => Review comment: Nit: `case` and newline are unnecessary here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
junrao commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r437049072 ## File path: core/src/main/scala/kafka/zk/ZkData.scala ## @@ -81,17 +83,26 @@ object BrokerIdsZNode { object BrokerInfo { /** - * Create a broker info with v4 json format (which includes multiple endpoints and rack) if - * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above. Review comment: 2.6.x => 2.7.x ## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ## @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler} +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException +import scala.util.control.Exception.ignoring + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to notify the caller when an + *updateOrThrow() operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + +def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + +/** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked exactly + * once successfully. A subsequent invocation will raise an exception. + * + * @throws IllegalStateException, if a non-empty notifier was provided in the constructor, and + * this method is called again after a successful previous invocation. + * @throws FeatureCacheUpdateException, if there was an error in updating the + * FinalizedFeatureCache. 
+ * @throws RuntimeException, if there was a failure in reading/deserializing the + * contents of the feature ZK node. + */ +def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { +if (notifier.getCount != 1) { + throw new IllegalStateException( +"Can not notify after updateLatestOrThrow was called more than once successfully.") +} + }) + + debug(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + + // There are 4 cases: + // + // (empty dataBytes, valid version) => The empty dataBytes will fail FeatureZNode deserialization. + // FeatureZNode, when present in ZK, can not have empty contents. + // (non-empty dataBytes, valid version) => This is a valid case, and should pass FeatureZNode deserialization + // if dataBytes contains valid data. + // (empty dataBytes, unknown version) => This is a valid case, and this can happen if the FeatureZNode + // does not exist in ZK. + // (non-empty dataBytes,
[GitHub] [kafka] chia7712 opened a new pull request #8837: KAFKA-10125 The partition which is removing should be considered to b…
chia7712 opened a new pull request #8837: URL: https://github.com/apache/kafka/pull/8837 When a reassignment is still in progress, a replica that is either being removed or being added should be considered to be under reassignment. However, TopicCommand still prints the partitions whose replicas are being removed. issue: https://issues.apache.org/jira/browse/KAFKA-10125 related to d63e0181bb7b9b4f5ed088abc00d7b32aeb0 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
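For reference, the condition the patch cares about can be observed through the Admin API: a partition is still under reassignment while either its adding or its removing replica set is non-empty. A hedged sketch (topic selection is omitted; the bootstrap address is a placeholder):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class UnderReassignmentCheck {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            final Map<TopicPartition, PartitionReassignment> ongoing =
                admin.listPartitionReassignments().reassignments().get();
            ongoing.forEach((tp, reassignment) -> {
                // A partition counts as under reassignment while replicas are still
                // being added or removed.
                final boolean underReassignment = !reassignment.addingReplicas().isEmpty()
                    || !reassignment.removingReplicas().isEmpty();
                System.out.println(tp + " under reassignment: " + underReassignment);
            });
        }
    }
}
```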
[GitHub] [kafka] hachikuji merged pull request #8802: MINOR: Fix fetch session epoch comment in `FetchRequest.json`
hachikuji merged pull request #8802: URL: https://github.com/apache/kafka/pull/8802 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8764: KAFKA-10049: Fixed FKJ bug where wrapped serdes are set incorrectly when using default StreamsConfig serdes
vvcephei commented on pull request #8764: URL: https://github.com/apache/kafka/pull/8764#issuecomment-640686536 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Lucent-Wong commented on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…
Lucent-Wong commented on pull request #8453: URL: https://github.com/apache/kafka/pull/8453#issuecomment-641192263 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #8823: MINOR: fix HTML markup
bbejeck commented on pull request #8823: URL: https://github.com/apache/kafka/pull/8823#issuecomment-640729994 Merged #8823 into trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sneakyburro commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs
sneakyburro commented on pull request #8752: URL: https://github.com/apache/kafka/pull/8752#issuecomment-640936812 > @sneakyburro -- Just a heads up: code freeze for 2.6 release is on Wednesday. If you want to get the PR into the release, please address the review comments soon. Otherwise, this will slip and go into 2.7. I'll update the PR tonight. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective
ableegoldman commented on a change in pull request #8818: URL: https://github.com/apache/kafka/pull/8818#discussion_r436825265 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -86,7 +87,7 @@ private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance -private Set lockedTaskDirectories = new HashSet<>(); +private final Set lockedTaskDirectories = new HashSet<>(); Review comment: Has checkstyle just been dropping the ball lately? Could we be unknowingly suppressing this...? ## File path: streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; +import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.util.Collection; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkObjectProperties; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@Category(IntegrationTest.class) +public class HighAvailabilityTaskAssignorIntegrationTest { Review comment: Cool, I was kind of hoping you would put this in a separate integration test class This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8752: KAFKA-10036 early check singleton ProcessorSupplier and improve docs
mjsax commented on pull request #8752: URL: https://github.com/apache/kafka/pull/8752#issuecomment-640935840 @sneakyburro -- Just a heads up: code freeze for 2.6 release is on Wednesday. If you want to get the PR into the release, please address the review comments soon. Otherwise, this will slip and go into 2.7. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rgroothuijsen commented on a change in pull request #8664: KAFKA-9716: Clarify meaning of compression rate metrics
rgroothuijsen commented on a change in pull request #8664: URL: https://github.com/apache/kafka/pull/8664#discussion_r436973523 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/SenderMetricsRegistry.java ## @@ -84,7 +84,7 @@ public SenderMetricsRegistry(Metrics metrics) { this.batchSizeMax = createMetricName("batch-size-max", "The max number of bytes sent per partition per-request."); this.compressionRateAvg = createMetricName("compression-rate-avg", -"The average compression rate of record batches."); +"The average compressed-to-uncompressed size ratio of record batches."); Review comment: I was in fact going for brevity with my approach, but if more detail will mean increased clarity for the user then I'm all for it. With the way you've defined it, I think there can be little question regarding the meaning. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
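A small illustration of what a compressed-to-uncompressed size ratio means for a single payload; this is only the arithmetic behind the reworded metric description, not the producer's actual sender metrics code.
```
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionRatioExample {
    public static void main(String[] args) throws Exception {
        // Build a repetitive payload so gzip has something to compress.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            sb.append("some repetitive payload ");
        }
        byte[] uncompressed = sb.toString().getBytes(StandardCharsets.UTF_8);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(uncompressed);
        }

        // A value below 1.0 means the compressed bytes are smaller than the originals.
        double compressionRate = (double) out.size() / uncompressed.length;
        System.out.printf("compression rate (compressed/uncompressed) = %.3f%n", compressionRate);
    }
}
```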
[GitHub] [kafka] abbccdda commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance
abbccdda commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r437110527 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -604,6 +605,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int assertEquals(newGen, coordinator.generation()); } +@Test +public void testHeartbeatSentWhenRebalancing() throws Exception { +setupCoordinator(); +joinGroup(); + +final AbstractCoordinator.Generation currGen = coordinator.generation(); + +coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING); + +// the heartbeat thread should be sent out during a rebalance Review comment: remove `thread` ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -652,9 +651,10 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut } else if (error == Errors.MEMBER_ID_REQUIRED) { // Broker requires a concrete member id to be allowed to join the group. Update member id // and send another join group request in next cycle. +String memberId = joinResponse.data().memberId(); +log.debug("Attempt to join group returned {} error. Will set the member id as {} and then rejoin", error, memberId); Review comment: There might be slight performance gain if we just say "Attempt to join group and receive member id required error." instead of passing in the error. ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -287,7 +287,7 @@ class GroupCoordinator(val brokerId: Int, group.currentState match { case PreparingRebalance => - updateMemberAndRebalance(group, member, protocols, responseCallback) + updateMemberAndRebalance(group, member, protocols, s"Member ${member.memberId} joining group during ${group.currentState}", responseCallback) Review comment: Just to confirm, this file only has logging changes right? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation generation) { public void handle(HeartbeatResponse heartbeatResponse, RequestFuture future) { sensors.heartbeatSensor.record(response.requestLatencyMs()); Errors error = heartbeatResponse.error(); + +if (state != MemberState.STABLE) { Review comment: We should still handle fatal exception IMHO, such as FencedInstanceIdException ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) { } private void recordRebalanceFailure() { -state = MemberState.UNJOINED; Review comment: Comment here as no better place: on L485 we have this logic: ``` if (joinFuture == null) { // fence off the heartbeat thread explicitly so that it cannot interfere with the join group. // Note that this must come after the call to onJoinPrepare since we must be able to continue // sending heartbeats if that callback takes some time. disableHeartbeatThread(); ``` As we are ensuring the heartbeat thread working during rebalance, will this case be a bit dangerous for heartbeat disabling? Maybe we could also do a check of member status here to decide whether to disable. 
## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -604,6 +605,25 @@ public void testSyncGroupIllegalGenerationResponseWithOldGeneration() throws Int assertEquals(newGen, coordinator.generation()); } +@Test +public void testHeartbeatSentWhenRebalancing() throws Exception { +setupCoordinator(); +joinGroup(); + +final AbstractCoordinator.Generation currGen = coordinator.generation(); + +coordinator.setNewState(AbstractCoordinator.MemberState.REBALANCING); + +// the heartbeat thread should be sent out during a rebalance +mockTime.sleep(HEARTBEAT_INTERVAL_MS); +TestUtils.waitForCondition(() -> !mockClient.requests().isEmpty(), 2000, +"The heartbeat request was not sent"); +assertTrue(coordinator.heartbeat().hasInflight()); + +mockClient.respond(heartbeatResponse(Errors.REBALANCE_IN_PROGRESS)); Review comment: Why do we need to respond? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
[GitHub] [kafka] chia7712 opened a new pull request #8838: MINOR: fix flaky TopicCommandWithAdminClientTest.testDescribeUnderRep…
chia7712 opened a new pull request #8838: URL: https://github.com/apache/kafka/pull/8838 ```TopicCommandWithAdminClientTest.testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress``` fails frequently on my local. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy merged pull request #8800: MINOR: Fix incorrect GC log size with JDK9+
omkreddy merged pull request #8800: URL: https://github.com/apache/kafka/pull/8800 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Lucent-Wong edited a comment on pull request #8453: KAFKA-9841: Connector and Task duplicated when a worker join with old…
Lucent-Wong edited a comment on pull request #8453: URL: https://github.com/apache/kafka/pull/8453#issuecomment-641192263 Looks like the build failure is not related to my changes.
> Task :streams:test-utils:integrationTest
> Task :streams:upgrade-system-tests-0100:integrationTest
> Task :streams:upgrade-system-tests-0101:integrationTest
> Task :streams:upgrade-system-tests-0102:integrationTest
> Task :streams:upgrade-system-tests-0110:integrationTest
> Task :streams:upgrade-system-tests-10:integrationTest
> Task :streams:upgrade-system-tests-11:integrationTest
> Task :streams:upgrade-system-tests-20:integrationTest
> Task :streams:upgrade-system-tests-21:integrationTest
> Task :streams:upgrade-system-tests-22:integrationTest
> Task :streams:upgrade-system-tests-23:integrationTest
> Task :streams:upgrade-system-tests-24:integrationTest
> Task :streams:upgrade-system-tests-25:integrationTest
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':streams:unitTest'.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group
kkonstantine commented on a change in pull request #8805: URL: https://github.com/apache/kafka/pull/8805#discussion_r436950305 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that // can be used to calculate derived sets log.debug("Previous assignments: {}", previousAssignment); +int lastCompletedGenerationId = coordinator.lastCompletedGenerationId(); +if (previousGenerationId != lastCompletedGenerationId) { +log.debug("Emptying previous assignments due to generation mismatch between previous " ++ "generation ID {} and last completed generation ID {} since the last assignment: {}", +previousGenerationId, lastCompletedGenerationId, previousAssignment); Review comment: This is related to the leader's internal bookkeeping when it calculates a new assignment. It's not related to the tasks that a worker (even the leader) is actually running. Emptying/clearing the previous assignment might result in some tasks shuffling around, because the leader will calculate an assignment from scratch, but it doesn't affect running tasks. The new computed assignment will send assignment and/or revocations as needed based on a) what tasks each worker has reported running in this round and which tasks are configured in the config topic. Another way to say this is that the leader won't bother detecting lost tasks in this round. Every unassigned task will be treated as a new task. You are right on the log message not conveying that meaning exactly. How about: ``` log.debug("Clearing the view of previous assignments due to generation mismatch between " + "previous generation ID {} and last completed generation ID {}. ", previousGenerationId, lastCompletedGenerationId); log.debug("This can happen if the leader fails to sync the assignment within a " + "rebalancing round. The following view of previous assignments might be " + "outdated and will be ignored by the leader in the current computation of " + "new assignments. Possibly outdated previous assignments: {}", previousAssignment); ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that // can be used to calculate derived sets log.debug("Previous assignments: {}", previousAssignment); +int lastCompletedGenerationId = coordinator.lastCompletedGenerationId(); +if (previousGenerationId != lastCompletedGenerationId) { +log.debug("Emptying previous assignments due to generation mismatch between previous " ++ "generation ID {} and last completed generation ID {} since the last assignment: {}", +previousGenerationId, lastCompletedGenerationId, previousAssignment); Review comment: Also, given that the previous assignments are printed in `debug`, I think it makes sense to keep these log messages in debug as well. 
## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, log.debug("Found the following connectors and tasks missing from previous assignments: " + lostAssignments); +if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) { +log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup " ++ "responses. Treating lost tasks as new tasks"); Review comment: How about: ``` log.debug("The number of workers remained the same between rebalances. The missing " + "assignments that the leader is detecting are probably due to some workers " + "failing to receive the new assignments in the previous rebalance. Will " + "reassign missing tasks as new tasks"); ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Base set: The pr
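A standalone sketch of the bookkeeping described in the review discussion above, using hypothetical types rather than Connect's IncrementalCooperativeAssignor: when the last completed generation does not match the generation recorded with the leader's snapshot of previous assignments, the snapshot is treated as possibly outdated and ignored, so unassigned tasks are handled as new tasks instead of "lost" ones.
```
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical leader-side snapshot; not Connect's actual assignor classes.
class PreviousAssignmentView {
    private final Set<String> previousAssignment = new HashSet<>();
    private int previousGenerationId = -1;

    // Record what was assigned in a given generation.
    void record(Set<String> assignment, int generationId) {
        previousAssignment.clear();
        previousAssignment.addAll(assignment);
        previousGenerationId = generationId;
    }

    // If the group moved on to a different completed generation without this snapshot
    // being synced, ignore it: the next assignment is computed from scratch and
    // unassigned tasks are treated as new rather than "lost".
    Set<String> viewFor(int lastCompletedGenerationId) {
        if (previousGenerationId != lastCompletedGenerationId) {
            return Collections.emptySet();
        }
        return Collections.unmodifiableSet(previousAssignment);
    }
}
```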
[GitHub] [kafka] guozhangwang merged pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback
guozhangwang merged pull request #8676: URL: https://github.com/apache/kafka/pull/8676 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #8783: KAFKA-10063 UnsupportedOperation when querying cleaner metrics after …
hachikuji merged pull request #8783: URL: https://github.com/apache/kafka/pull/8783 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback
ableegoldman commented on a change in pull request #8676: URL: https://github.com/apache/kafka/pull/8676#discussion_r436858916 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java ## @@ -34,7 +34,6 @@ /** * Unregisters and removes the passed in partitions from the set of changelogs * @param removedPartitions the set of partitions to remove - * @param triggerOnRestoreEnd whether to trigger the onRestoreEnd callback */ -void unregister(final Collection removedPartitions, final boolean triggerOnRestoreEnd); +void unregister(final Collection removedPartitions); Review comment: Nice, this was kind of an ugly hack to begin with, so I'm happy to see it go This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8821: [DO NOT MERGE] Reenable flaky EosBetaUpgradeIntegrationTest
mjsax commented on pull request #8821: URL: https://github.com/apache/kafka/pull/8821#issuecomment-641022736 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang opened a new pull request #8834: MINOR: Do not disable heartbeat during Rebalance
guozhangwang opened a new pull request #8834: URL: https://github.com/apache/kafka/pull/8834 1. Allow the heartbeat thread to send heartbeat requests during a rebalance; in turn, when handling responses, ignore the error if the state is not STABLE. 2. Piggy-back a log4j improvement on the broker-side group coordinator to log the reason for triggering a rebalance. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
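A minimal standalone sketch of the first point, with hypothetical types rather than AbstractCoordinator itself: heartbeats keep flowing during a rebalance, and heartbeat errors seen while the member is not yet STABLE are swallowed instead of being treated as failures.
```
// Hypothetical member-state model; Kafka's real coordinator has more states and error handling.
enum MemberState { UNJOINED, REBALANCING, STABLE }

class HeartbeatResponseSketch {
    private MemberState state = MemberState.REBALANCING;

    void setState(MemberState newState) {
        state = newState;
    }

    // Returns true if the heartbeat error should be propagated as a failure.
    boolean handleError(String error) {
        if (state != MemberState.STABLE) {
            // Errors such as REBALANCE_IN_PROGRESS are expected while re-joining;
            // ignore them so the heartbeat thread keeps running through the rebalance.
            System.out.println("Ignoring heartbeat error during rebalance: " + error);
            return false;
        }
        return true;
    }
}
```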
[GitHub] [kafka] bob-barrett commented on a change in pull request #8239: KAFKA-9666: Don't increase transactional epoch when trying to fence if the log append fails
bob-barrett commented on a change in pull request #8239: URL: https://github.com/apache/kafka/pull/8239#discussion_r437077452 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ## @@ -487,6 +487,33 @@ class TransactionCoordinator(brokerId: Int, info(s"Aborting sending of transaction markers and returning $error error to client for $transactionalId's EndTransaction request of $txnMarkerResult, " + s"since appending $newMetadata to transaction log with coordinator epoch $coordinatorEpoch failed") + txnManager.getTransactionState(transactionalId).right.foreach { Review comment: Thanks for the suggestion, @hachikuji! I wound up taking that approach. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 opened a new pull request #8832: KAFKA-9377: Refactor StreamsPartitionAssignor Repartition Count Logic
feyman2016 opened a new pull request #8832: URL: https://github.com/apache/kafka/pull/8832 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org