[jira] [Updated] (KAFKA-8611) Add KStream#repartition operation
[ https://issues.apache.org/jira/browse/KAFKA-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze updated KAFKA-8611:
-
Summary: Add KStream#repartition operation (was: Make topic optional when using through() operations in DSL)

> Add KStream#repartition operation
> -
>
> Key: KAFKA-8611
> URL: https://issues.apache.org/jira/browse/KAFKA-8611
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Levani Kokhreidze
> Assignee: Levani Kokhreidze
> Priority: Minor
> Labels: kip
>
> When using the DSL in Kafka Streams, data re-partitioning happens only when a
> key-changing operation is followed by a stateful operation. On the other hand,
> in the DSL, stateful computation can also happen using the _transform()_
> operation. The problem with this approach is that, even if an upstream
> operation was key-changing before calling _transform()_, no auto-repartitioning
> is triggered. If repartitioning is required, a call to _through(String)_ should
> be performed before _transform()_. With the current implementation, the burden
> of creating and managing the topic falls on the user and introduces extra
> complexity into managing a Kafka Streams application.
> KIP-221:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
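For illustration, the work-around described above looks roughly like this in the 2.3-era DSL; the topic, store, and transformer names are invented for this sketch, and under KIP-221 the explicit through() call would be replaced by a repartition() operation that creates and manages the topic internally:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.Stores;

public class RepartitionBeforeTransformExample {

    // A trivial stateful transformer; stands in for any transform() logic.
    static class MyTransformer implements Transformer<String, String, KeyValue<String, String>> {
        @Override public void init(ProcessorContext context) { }
        @Override public KeyValue<String, String> transform(String key, String value) {
            return KeyValue.pair(key, value);
        }
        @Override public void close() { }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("my-state-store"), Serdes.String(), Serdes.String()));

        KStream<String, String> source =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        source
            .selectKey((key, value) -> value)   // key-changing operation
            // transform() is stateful, but no auto-repartition is triggered here,
            // so records sharing the new key may sit on different partitions.
            // Work-around: route them through an explicitly pre-created topic first.
            .through("manually-created-repartition-topic",
                     Produced.with(Serdes.String(), Serdes.String()))
            .transform(MyTransformer::new, "my-state-store")
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        builder.build();  // the resulting topology would then be passed to KafkaStreams
    }
}
{code}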
[jira] [Created] (KAFKA-8641) Invalid value ogg_kafka_test_key for configuration value.deserializer: Class ogg_kafka_test_key could not be found.
chen qiang created KAFKA-8641:
-
Summary: Invalid value ogg_kafka_test_key for configuration value.deserializer: Class ogg_kafka_test_key could not be found.
Key: KAFKA-8641
URL: https://issues.apache.org/jira/browse/KAFKA-8641
Project: Kafka
Issue Type: Bug
Reporter: chen qiang

Invalid value ogg_kafka_test_key for configuration value.deserializer: Class ogg_kafka_test_key could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:481)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at cc.ewell.datatools.consumer.ConsumerInitRunnable.<init>(ConsumerInitRunnable.java:59)
at cc.ewell.datatools.consumer.ConsumerGroup.addInitConsumer(ConsumerGroup.java:80)
at cc.ewell.datatools.service.impl.DataPushLinkServiceImpl.init(DataPushLinkServiceImpl.java:618)
at cc.ewell.datatools.controller.DataPushLinkController.init(DataPushLinkController.java:180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)

for (int i = 0; i < 10; i++) {
    CacheUtils.consumerGroup().push("CONSUMER_" + topic + "_" + i, consumerInitThread);
    new Thread(consumerInitThread, "CONSUMER_" + topic + "_" + i).start();
}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8641) Invalid value ogg_kafka_test_key for configuration value.deserializer: Class ogg_kafka_test_key could not be found.
[ https://issues.apache.org/jira/browse/KAFKA-8641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881094#comment-16881094 ] chen qiang commented on KAFKA-8641:
---
I started 10 consumer threads as initialization threads. When this method is called, the error above is thrown. Is there any configuration for multi-threaded consumption?

> Invalid value ogg_kafka_test_key for configuration value.deserializer: Class
> ogg_kafka_test_key could not be found.
> ---
>
> Key: KAFKA-8641
> URL: https://issues.apache.org/jira/browse/KAFKA-8641
> Project: Kafka
> Issue Type: Bug
> Reporter: chen qiang
> Priority: Major
>
> Invalid value ogg_kafka_test_key for configuration value.deserializer: Class
> ogg_kafka_test_key could not be found.
>
> at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
> at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
> at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:481)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
> at cc.ewell.datatools.consumer.ConsumerInitRunnable.<init>(ConsumerInitRunnable.java:59)
> at cc.ewell.datatools.consumer.ConsumerGroup.addInitConsumer(ConsumerGroup.java:80)
> at cc.ewell.datatools.service.impl.DataPushLinkServiceImpl.init(DataPushLinkServiceImpl.java:618)
> at cc.ewell.datatools.controller.DataPushLinkController.init(DataPushLinkController.java:180)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> for (int i = 0; i < 10; i++) {
>     CacheUtils.consumerGroup().push("CONSUMER_" + topic + "_" + i, consumerInitThread);
>     new Thread(consumerInitThread, "CONSUMER_" + topic + "_" + i).start();
> }

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
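For reference on the error and the question above: value.deserializer must name a class implementing Deserializer (the string "ogg_kafka_test_key" looks like an identifier, not a class name), and KafkaConsumer is not thread-safe, so each of the 10 threads needs its own instance. A minimal sketch of that pattern, with invented topic and group names:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerPerThreadExample {
    public static void main(String[] args) {
        final String topic = "ogg_kafka_test";  // placeholder topic name
        for (int i = 0; i < 10; i++) {
            final String clientId = "CONSUMER_" + topic + "_" + i;
            new Thread(() -> {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "ogg-kafka-test-group");
                props.put("client.id", clientId);
                // Both deserializers must name a Deserializer implementation,
                // e.g. StringDeserializer, not a key/topic identifier.
                props.put("key.deserializer", StringDeserializer.class.getName());
                props.put("value.deserializer", StringDeserializer.class.getName());
                // KafkaConsumer is not thread-safe: create one instance per thread.
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList(topic));
                    while (!Thread.currentThread().isInterrupted()) {
                        consumer.poll(Duration.ofMillis(500)).forEach(record ->
                            System.out.printf("%s: %s=%s%n", clientId, record.key(), record.value()));
                    }
                }
            }, clientId).start();
        }
    }
}
{code}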
[jira] [Commented] (KAFKA-8633) Extra </td> in generated documents
[ https://issues.apache.org/jira/browse/KAFKA-8633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881110#comment-16881110 ] Weichu Liu commented on KAFKA-8633:
---
Seems no one is responding, so I will make a PR to fix this.

> Extra </td> in generated documents
> --
>
> Key: KAFKA-8633
> URL: https://issues.apache.org/jira/browse/KAFKA-8633
> Project: Kafka
> Issue Type: Task
> Components: documentation
> Reporter: Weichu Liu
> Priority: Trivial
>
> The auto-generated tables for all configurations (e.g.
> https://kafka.apache.org/documentation/#brokerconfigs) have 2 </td> for
> each cell.
> e.g. the first row for broker configuration.
> {noformat}
> <tr>
> <td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string
> in the form hostname:port where host and port are the host and
> port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes
> when that ZooKeeper machine is down you can also specify multiple hosts in
> the form hostname1:port1,hostname2:port2,hostname3:port3.
> The server can also have a ZooKeeper chroot path as part of its ZooKeeper
> connection string which puts its data under some path in the global ZooKeeper
> namespace. For example to give a chroot path of /chroot/path you
> would give the connection string as
> hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
> {noformat}
> This is due to the {{toHtmlTable}} function in
> {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}}
> appending an extra "</td>" in the code.
> {code:java}
> for (String headerName : headers()) {
>     addColumnValue(b, getConfigValue(key, headerName));
>     b.append("</td>");
> }
> {code}
> (The addColumnValue already wraps the value with <td> and </td>.)
> This is a very minor issue, but it will prevent an HTML parser from properly
> fetching the table data (like what I was trying to do)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8633) Extra </td> in generated documents
[ https://issues.apache.org/jira/browse/KAFKA-8633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881135#comment-16881135 ] ASF GitHub Bot commented on KAFKA-8633:
---
weichuliu commented on pull request #7056: KAFKA-8633: Fix Auto Generated Kafka Configuration Docs
URL: https://github.com/apache/kafka/pull/7056

This PR fixes 2 things for the auto-generated Kafka documents, namely the configuration table part.
1. Remove the duplicated </td> tag.
2. Change angle brackets in some descriptions into square brackets, so browsers can display them properly.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Extra </td> in generated documents
> --
>
> Key: KAFKA-8633
> URL: https://issues.apache.org/jira/browse/KAFKA-8633
> Project: Kafka
> Issue Type: Task
> Components: documentation
> Reporter: Weichu Liu
> Priority: Trivial
>
> The auto-generated tables for all configurations (e.g.
> https://kafka.apache.org/documentation/#brokerconfigs) have 2 </td> for
> each cell.
> e.g. the first row for broker configuration.
> {noformat}
> <tr>
> <td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string
> in the form hostname:port where host and port are the host and
> port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes
> when that ZooKeeper machine is down you can also specify multiple hosts in
> the form hostname1:port1,hostname2:port2,hostname3:port3.
> The server can also have a ZooKeeper chroot path as part of its ZooKeeper
> connection string which puts its data under some path in the global ZooKeeper
> namespace. For example to give a chroot path of /chroot/path you
> would give the connection string as
> hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
> {noformat}
> This is due to the {{toHtmlTable}} function in
> {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}}
> appending an extra "</td>" in the code.
> {code:java}
> for (String headerName : headers()) {
>     addColumnValue(b, getConfigValue(key, headerName));
>     b.append("</td>");
> }
> {code}
> (The addColumnValue already wraps the value with <td> and </td>.)
> This is a very minor issue, but it will prevent an HTML parser from properly
> fetching the table data (like what I was trying to do)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
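Since addColumnValue already emits the closing tag, the corrected loop in ConfigDef.toHtmlTable presumably just drops the duplicated append:

{code:java}
for (String headerName : headers()) {
    // addColumnValue wraps the value in <td>...</td>, so nothing
    // more needs to be appended for each cell.
    addColumnValue(b, getConfigValue(key, headerName));
}
{code}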
[jira] [Updated] (KAFKA-8633) Extra </td> in generated documents
[ https://issues.apache.org/jira/browse/KAFKA-8633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weichu Liu updated KAFKA-8633:
--
Description:
The auto-generated tables for all configurations (e.g. https://kafka.apache.org/documentation/#brokerconfigs) have 2 </td> for each cell.
e.g. the first row for broker configuration.
{noformat}
<tr>
<td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3. The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
{noformat}
This is due to the {{toHtmlTable}} function in {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}} appending an extra "</td>" in the code.
{code:java}
for (String headerName : headers()) {
    addColumnValue(b, getConfigValue(key, headerName));
    b.append("</td>");
}
{code}
(The addColumnValue already wraps the value with <td> and </td>.)
This is a very minor issue, but it will prevent an HTML parser from properly fetching the table data (like what I was trying to do)
--
Update: I also found another glitch in the doc: Some configurations are using '<>' in the string, but they are recognized as HTML tags so the description is not properly displayed. For example, the {{client.id}} of [Kafka Streams Configs|https://kafka.apache.org/documentation/#streamsconfigs] displays
> An ID prefix string used for the client IDs of internal consumer, producer
> and restore-consumer, with pattern '-StreamThread--'.
However it should be with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.
I feel the fastest way is to avoid angle brackets at all.

was:
The auto-generated tables for all configurations (e.g. https://kafka.apache.org/documentation/#brokerconfigs) have 2 </td> for each cell.
e.g. the first row for broker configuration.
{noformat}
<tr>
<td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3. The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
{noformat}
This is due to the {{toHtmlTable}} function in {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}} appending an extra "</td>" in the code.
{code:java}
for (String headerName : headers()) {
    addColumnValue(b, getConfigValue(key, headerName));
    b.append("</td>");
}
{code}
(The addColumnValue already wraps the value with <td> and </td>.)
This is a very minor issue, but it will prevent an HTML parser from properly fetching the table data (like what I was trying to do)

> Extra </td> in generated documents
> --
>
> Key: KAFKA-8633
> URL: https://issues.apache.org/jira/browse/KAFKA-8633
> Project: Kafka
> Issue Type: Task
> Components: documentation
> Reporter: Weichu Liu
> Priority: Trivial
>
> The auto-generated tables for all configurations (e.g.
> https://kafka.apache.org/documentation/#brokerconfigs) have 2 </td> for
> each cell.
> e.g. the first row for broker configuration.
> {noformat}
> <tr>
> <td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string
> in the form hostname:port where host and port are the host and
> port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes
> when that ZooKeeper machine is down you can also specify multiple hosts in
> the form hostname1:port1,hostname2:port2,hostname3:port3.
> The server can also have a ZooKeeper chroot path as part of its ZooKeeper
> connection string which puts its data under some path in the global ZooKeeper
> namespace. For example to give a chroot path of /chroot/path you
> would give the connection string as
> hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
> {noformat}
> This is due to the {{toHtmlTable}} function in
> {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}} is
> appending an extra
[jira] [Updated] (KAFKA-8633) Extra </td> in generated documents
[ https://issues.apache.org/jira/browse/KAFKA-8633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weichu Liu updated KAFKA-8633:
--
Description:
The auto-generated tables for all configurations (e.g. [https://kafka.apache.org/documentation/#brokerconfigs]) have 2 </td> for each cell.
e.g. the first row for broker configuration.
{noformat}
<tr>
<td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3. The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
{noformat}
This is due to the {{toHtmlTable}} function in {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}} appending an extra "</td>" in the code.
{code:java}
for (String headerName : headers()) {
    addColumnValue(b, getConfigValue(key, headerName));
    b.append("</td>");
}
{code}
(The addColumnValue already wraps the value with <td> and </td>.)
This is a very minor issue, but it will prevent an HTML parser from properly fetching the table data (like what I was trying to do)
--
Update: I also found another glitch in the doc: Some configurations are using '<>' in the string, but they are recognized as HTML tags so the description is not properly displayed. For example, the {{client.id}} of [Kafka Streams Configs|https://kafka.apache.org/documentation/#streamsconfigs] displays
{noformat}
An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer, with pattern '-StreamThread--'.
{noformat}
However it should be
{noformat}
with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.
{noformat}
I feel the fastest way is to avoid angle brackets at all.

was:
The auto-generated tables for all configurations (e.g. https://kafka.apache.org/documentation/#brokerconfigs) have 2 </td> for each cell.
e.g. the first row for broker configuration.
{noformat}
<tr>
<td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3. The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.</td></td><td>string</td></td><td>high</td></td><td>read-only</td></td>
{noformat}
This is due to the {{toHtmlTable}} function in {{./clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java}} appending an extra "</td>" in the code.
{code:java}
for (String headerName : headers()) {
    addColumnValue(b, getConfigValue(key, headerName));
    b.append("</td>");
}
{code}
(The addColumnValue already wraps the value with <td> and </td>.)
This is a very minor issue, but it will prevent an HTML parser from properly fetching the table data (like what I was trying to do)
--
Update: I also found another glitch in the doc: Some configurations are using '<>' in the string, but they are recognized as HTML tags so the description is not properly displayed. For example, the {{client.id}} of [Kafka Streams Configs|https://kafka.apache.org/documentation/#streamsconfigs] displays
> An ID prefix string used for the client IDs of internal consumer, producer
> and restore-consumer, with pattern '-StreamThread--'.
However it should be with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.
I feel the fastest way is to avoid angle brackets at all.

> Extra </td> in generated documents
> --
>
> Key: KAFKA-8633
> URL: https://issues.apache.org/jira/browse/KAFKA-8633
> Project: Kafka
> Issue Type: Task
> Components: documentation
> Reporter: Weichu Liu
> Priority: Trivial
>
> The auto-generated tables for all configurations (e.g.
> [https://kafka.apache.org/documentation/#brokerconfigs]) have 2 </td> for
> each cell.
> e.g. the first row for broker configuration.
> {noformat}
> <tr>
> <td>zookeeper.connect</td></td><td>Specifies the ZooKeeper connection string
> in the form hostname:port where host and port are the host and
> port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes
> when that ZooKeeper machine is
[jira] [Assigned] (KAFKA-8615) Change to track partition time breaks TimestampExtractor
[ https://issues.apache.org/jira/browse/KAFKA-8615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-8615:
--
Assignee: Sophie Blee-Goldman (was: Bill Bejeck)

> Change to track partition time breaks TimestampExtractor
> 
>
> Key: KAFKA-8615
> URL: https://issues.apache.org/jira/browse/KAFKA-8615
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Bill Bejeck
> Assignee: Sophie Blee-Goldman
> Priority: Major
>
> From the users mailing list, *UPDATED* by Jonathan Santilli:
> {noformat}
> I am testing the new version 2.3 of Kafka Streams specifically. I have noticed
> that now, the implementation of the extract method from the
> interface org.apache.kafka.streams.processor.TimestampExtractor:
> public class OwnTimeExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
>         // previousTimestamp is always == -1. For version 2.3
>     }
> }
> The previous version, 2.2.1, was returning the correct value for the record's
> partition.
> {noformat}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
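For context, previousTimestamp is the partition time that the 2.3 change stopped supplying. A typical extractor that relies on it, falling back to partition time when a record carries no valid timestamp, might look like this sketch:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OwnTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long recordTimestamp = record.timestamp();
        // Fall back to the partition time passed in by the runtime; under 2.3
        // this fallback is broken because previousTimestamp is always -1.
        return recordTimestamp >= 0 ? recordTimestamp : previousTimestamp;
    }
}
{code}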
[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881421#comment-16881421 ] Jose Armando Garcia Sancio commented on KAFKA-8638:
---
Hi George,

Thanks for the issue. I see that you have tried a reassignment where the assigned replicas stay the same but the order of the replicas is changed. How is managing this more or less tedious than managing this "deprioritized list"?

How do you see the user managing this "deprioritized list"? For example, how do you see the user determining which brokers should be added to and removed from this list?

Thanks!

> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
> Issue Type: Improvement
> Components: config, controller, core
> Affects Versions: 1.1.1, 2.3.0, 2.2.1
> Reporter: GEORGE LI
> Assignee: GEORGE LI
> Priority: Major
>
> Currently, Kafka preferred leader election will pick the broker_id in the
> topic/partition replica assignments in a priority order when the broker is in
> the ISR. The preferred leader is the broker id in the first position of the replica list.
> There are use cases where, even if the first broker in the replica assignment is
> in the ISR, it needs to be moved to the end of the ordering (lowest
> priority) when deciding leadership during preferred leader election.
> Let's use topic/partition replica (1,2,3) as an example. 1 is the preferred
> leader. When preferred leader election is run, it will pick 1 as the leader if
> it's in the ISR; if 1 is not online and in the ISR, then pick 2; if 2 is not in the ISR,
> then pick 3 as the leader. There are use cases where, even if 1 is in the ISR, we
> would like it to be moved to the end of the ordering (lowest priority) when
> deciding leadership during preferred leader election. Below is a list of
> use cases:
> * If broker_id 1 is a swapped failed host brought up with the last segments
> or the latest offset without historical data (there is another effort on this),
> it's better for it not to serve leadership until it has caught up.
> * The cross-data-center cluster has AWS instances which have less computing
> power than the on-prem bare metal machines. We could put the AWS broker_ids
> in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders,
> without changing the reassignment ordering of the replicas.
> * If broker_id 1 is constantly losing leadership after some time:
> "Flapping". We would want to exclude 1 from being a leader unless all other
> brokers of this topic/partition are offline. The "Flapping" effect was seen
> in the past when 2 or more brokers were bad; when they lost leadership
> constantly/quickly, the sets of partition replicas they belong to would see
> leadership constantly changing. The ultimate solution is to swap these bad
> hosts. But for quick mitigation, we can also put the bad hosts in the
> Preferred Leader Blacklist to move the priority of their being elected as
> leaders to the lowest.
> * If the controller is busy serving an extra load of metadata requests and
> other tasks, we would like to move the controller's leaders to other brokers
> to lower its CPU load. Currently, bouncing to lose leadership does not work
> for the controller, because after the bounce, the controller fails over to
> another broker.
> * Avoid bouncing a broker in order to lose its leadership: it would be good if
> we had a way to specify which broker should be excluded from serving
> traffic/leadership (without changing the replica assignment ordering by
> reassignments, even though that's quick), and run preferred leader election.
> A bouncing broker will cause temporary URP, and sometimes other issues. Also,
> a bounced broker (e.g. broker_id 1) can temporarily lose all its
> leadership, but if another broker (e.g. broker_id 2) fails or gets bounced,
> some of its leaderships will likely fail over to broker_id 1 on a replica with
> 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, even with
> broker_id 2 offline, the 3rd broker can take leadership.
> The current work-around for the above is to change the topic/partition's
> replica reassignments to move broker_id 1 from the first position to the
> last position and run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1).
> This changes the replica reassignments, and we need to keep track of the
> original ordering and restore it if things change (e.g. the controller fails over to
> another broker, or the swapped empty broker catches up). That's a rather tedious
> task.
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
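To make the quoted work-around concrete, demoting broker 1 for a single partition looks roughly like this with the stock tooling of that era; the topic name and ZooKeeper connection string are placeholders:

{code}
# move.json: same replica set, broker 1 moved from first to last: (1,2,3) -> (2,3,1)
# {"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[2,3,1]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file move.json --execute

# Then run preferred leader election so broker 2 becomes the leader.
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
{code}

Restoring the original ordering later means repeating both steps with the old replica list, which is exactly the bookkeeping burden the blacklist proposal aims to remove.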
[jira] [Commented] (KAFKA-3333) Alternative Partitioner to Support "Always Round-Robin" partitioning
[ https://issues.apache.org/jira/browse/KAFKA-3333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881468#comment-16881468 ] ASF GitHub Bot commented on KAFKA-3333:
---
cmccabe commented on pull request #6771: KAFKA-3333: Adds RoundRobinPartitioner with tests
URL: https://github.com/apache/kafka/pull/6771

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Alternative Partitioner to Support "Always Round-Robin" partitioning
> 
>
> Key: KAFKA-3333
> URL: https://issues.apache.org/jira/browse/KAFKA-3333
> Project: Kafka
> Issue Type: New Feature
> Components: clients
> Reporter: Stephen Powis
> Assignee: M. Manna
> Priority: Major
> Labels: kip
> Fix For: 2.4.0
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> KIP:
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89070828]
> Please look into KAFKA-7358 for the official description.
> The
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
> typically distributes using the hash of the keybytes, and falls back to
> round robin if there is no key. But there is currently no way to do round-robin
> partitioning if you have keys on your messages without writing your own
> partitioner implementation.
> I think it'd be helpful to have an implementation of straight round-robin
> partitioning included with the library.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
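The idea of the feature, independent of the exact class merged in #6771, is a Partitioner that ignores the key entirely; a minimal sketch:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class AlwaysRoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Rotate over partitions regardless of whether the record has a key.
        List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
        if (partitions.isEmpty()) {
            partitions = cluster.partitionsForTopic(topic);
        }
        int index = Math.abs(counter.getAndIncrement() % partitions.size());
        return partitions.get(index).partition();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
{code}

A producer would opt in by setting partitioner.class to this implementation in its configuration.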
[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881480#comment-16881480 ] GEORGE LI commented on KAFKA-8638:
--
Hi Jose,

Because a broker can have hundreds/thousands of topic/partitions assigned to it, doing reassignments to move it to the end and lower the priority, and then remembering the original ordering to restore later, is much more tedious than simply putting it in a "deprioritized list" for some time and removing it when certain conditions have improved or been met. We have a Rebalance Tool which rebalances the whole cluster; it's better not to keep changing the assignment replica ordering constantly. With the "deprioritized list", it's cleaner.

Let's just take the use case of taking the controller out of being a leader/serving traffic, keeping it just as a follower. We observed that a broker not serving any leaders has less CPU utilization. For clusters with a busy controller doing extra work compared to other brokers, we would like it to not take any leaders. Right now, for a broker to lose leadership, we need to bounce the broker. In this case, if bounced, the controller fails over to another broker. If we change the ordering of the current assignments for the controller, then the next time the controller fails over, we need to do the same again.

For managing the "deprioritized list", the user (e.g. the on-call engineer seeing an issue with a broker that should not serve leadership traffic) should have the ability to add/remove entries. My initial thought on how to store this "deprioritized list" is the 2 approaches below:

* Design #1: Introduce a Preferred Leader Blacklist. e.g. ZK path/node: /preferred_leader_blacklist/<broker_id>. Direct manipulation of ZK should be avoided as Kafka is moving toward RPC-based administration, so a new Request/Response RPC call is needed. No ZK watcher on this ZK node's children is needed to trigger leadership changes for the current design.
* Design #2: Introduce a preferred_leader_blacklist dynamic config which by default is empty. It allows a list of broker IDs separated by commas. E.g. below, broker IDs 1, 10, 65 are being put into the blacklist.
{code}
/usr/lib/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config preferred_leader_blacklist=1,10,65
{code}
Since Kafka dynamic configs already use --bootstrap-server, this does not need to manipulate ZooKeeper directly. The downside of this: when adding/removing one broker from the list, instead of working with one ZK node per broker as in Design #1 above, the dynamic config needs to be updated with a new complete list. E.g. in order to remove broker 10 from the blacklist, update preferred_leader_blacklist=1,65. The dynamic config should not trigger any leadership changes for the current design.

> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
> Issue Type: Improvement
> Components: config, controller, core
> Affects Versions: 1.1.1, 2.3.0, 2.2.1
> Reporter: GEORGE LI
> Assignee: GEORGE LI
> Priority: Major
>
> Currently, Kafka preferred leader election will pick the broker_id in the
> topic/partition replica assignments in a priority order when the broker is in
> the ISR. The preferred leader is the broker id in the first position of the replica list.
> There are use cases where, even if the first broker in the replica assignment is
> in the ISR, it needs to be moved to the end of the ordering (lowest
> priority) when deciding leadership during preferred leader election.
> Let's use topic/partition replica (1,2,3) as an example. 1 is the preferred
> leader. When preferred leader election is run, it will pick 1 as the leader if
> it's in the ISR; if 1 is not online and in the ISR, then pick 2; if 2 is not in the ISR,
> then pick 3 as the leader. There are use cases where, even if 1 is in the ISR, we
> would like it to be moved to the end of the ordering (lowest priority) when
> deciding leadership during preferred leader election. Below is a list of
> use cases:
> * If broker_id 1 is a swapped failed host brought up with the last segments
> or the latest offset without historical data (there is another effort on this),
> it's better for it not to serve leadership until it has caught up.
> * The cross-data-center cluster has AWS instances which have less computing
> power than the on-prem bare metal machines. We could put the AWS broker_ids
> in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders,
> without changing the reassignment ordering of the replicas.
> * If broker_id 1 is constantly losing leadership after some time:
> "Flapping". We would want to exclude 1 from being a leader
[jira] [Commented] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881511#comment-16881511 ] Jose Armando Garcia Sancio commented on KAFKA-8638:
---
Thanks for the feedback and the clear use cases, [~sql_consulting].

> Preferred Leader Blacklist (deprioritized list)
> ---
>
> Key: KAFKA-8638
> URL: https://issues.apache.org/jira/browse/KAFKA-8638
> Project: Kafka
> Issue Type: Improvement
> Components: config, controller, core
> Affects Versions: 1.1.1, 2.3.0, 2.2.1
> Reporter: GEORGE LI
> Assignee: GEORGE LI
> Priority: Major
>
> Currently, Kafka preferred leader election will pick the broker_id in the
> topic/partition replica assignments in a priority order when the broker is in
> the ISR. The preferred leader is the broker id in the first position of the replica list.
> There are use cases where, even if the first broker in the replica assignment is
> in the ISR, it needs to be moved to the end of the ordering (lowest
> priority) when deciding leadership during preferred leader election.
> Let's use topic/partition replica (1,2,3) as an example. 1 is the preferred
> leader. When preferred leader election is run, it will pick 1 as the leader if
> it's in the ISR; if 1 is not online and in the ISR, then pick 2; if 2 is not in the ISR,
> then pick 3 as the leader. There are use cases where, even if 1 is in the ISR, we
> would like it to be moved to the end of the ordering (lowest priority) when
> deciding leadership during preferred leader election. Below is a list of
> use cases:
> * If broker_id 1 is a swapped failed host brought up with the last segments
> or the latest offset without historical data (there is another effort on this),
> it's better for it not to serve leadership until it has caught up.
> * The cross-data-center cluster has AWS instances which have less computing
> power than the on-prem bare metal machines. We could put the AWS broker_ids
> in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders,
> without changing the reassignment ordering of the replicas.
> * If broker_id 1 is constantly losing leadership after some time:
> "Flapping". We would want to exclude 1 from being a leader unless all other
> brokers of this topic/partition are offline. The "Flapping" effect was seen
> in the past when 2 or more brokers were bad; when they lost leadership
> constantly/quickly, the sets of partition replicas they belong to would see
> leadership constantly changing. The ultimate solution is to swap these bad
> hosts. But for quick mitigation, we can also put the bad hosts in the
> Preferred Leader Blacklist to move the priority of their being elected as
> leaders to the lowest.
> * If the controller is busy serving an extra load of metadata requests and
> other tasks, we would like to move the controller's leaders to other brokers
> to lower its CPU load. Currently, bouncing to lose leadership does not work
> for the controller, because after the bounce, the controller fails over to
> another broker.
> * Avoid bouncing a broker in order to lose its leadership: it would be good if
> we had a way to specify which broker should be excluded from serving
> traffic/leadership (without changing the replica assignment ordering by
> reassignments, even though that's quick), and run preferred leader election.
> A bouncing broker will cause temporary URP, and sometimes other issues. Also,
> a bounced broker (e.g. broker_id 1) can temporarily lose all its
> leadership, but if another broker (e.g. broker_id 2) fails or gets bounced,
> some of its leaderships will likely fail over to broker_id 1 on a replica with
> 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, even with
> broker_id 2 offline, the 3rd broker can take leadership.
> The current work-around for the above is to change the topic/partition's
> replica reassignments to move broker_id 1 from the first position to the
> last position and run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1).
> This changes the replica reassignments, and we need to keep track of the
> original ordering and restore it if things change (e.g. the controller fails over to
> another broker, or the swapped empty broker catches up). That's a rather tedious
> task.
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8642) Send LeaveGroupRequest for static members when reaching `max.poll.interval.ms`
Boyang Chen created KAFKA-8642:
--
Summary: Send LeaveGroupRequest for static members when reaching `max.poll.interval.ms`
Key: KAFKA-8642
URL: https://issues.apache.org/jira/browse/KAFKA-8642
Project: Kafka
Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen

Static members don't leave the group explicitly. However, when a static member's progress is low, it might be favorable to let it leave the group, leveraging a rebalance to shuffle the assignment and make it progressive again.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881603#comment-16881603 ] Sophie Blee-Goldman commented on KAFKA-8630:
---
Yep, that's the KIP. There was some ongoing discussion but I think it got dropped on the floor a little while back as people became busy. You might be able to kick-start the discussion again with this as motivation; someone on the mailing list had a similar problem, but wanted to use key-value stores, so for the time being they can get by with just MockProcessorContext. Of course, even though some stores don't need the InternalProcessorContext yet, we are actively adding more metrics, and this may not be true for long. It's worth solving this holistically.

That's probably a lot more work than we need to do here. You're right about StreamsMetrics vs StreamsMetricsImpl: the window store, for example, needs to access the storeLevelSensor method, but we don't want to expose that. I agree it's pretty annoying that all this boils down to is needing to access some internal metrics/sensors, but I do think the right way to solve it is on the test side. From the source code side, everything is working as it should; it's the MockProcessorContext that fails to adequately "mock" the real thing. I think some light refactoring to have the contexts implement some "ProcessorContextMetrics" interface could be accomplished pretty easily.

> Unit testing a streams processor with a WindowStore throws a
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.3.0
> Reporter: Justin Fetherolf
> Priority: Major
>
> I was attempting to write a unit test for a class implementing the
> {{Processor}} interface that contained a {{WindowStore}}, but running the
> test fails with a {{ClassCastException}} coming out of
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
>
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
>
> public class InMemWindowProcessor implements Processor<String, String> {
>
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>
>   @Override
>   public void init(ProcessorContext context) {
>     this.context = context;
>     windowStore = (WindowStore<String, String>) context.getStateStore("my-win-store");
>   }
>
>   @Override
>   public void process(String key, String value) {
>   }
>
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
>
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
>
> public class InMemWindowProcessorTest {
>
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>
>   @Before
>   public void setup() {
>     processor = new InMemWindowProcessor();
>     context = new MockProcessorContext();
>     WindowStore<String, String> store =
>       Stores.windowStoreBuilder(
>         Stores.inMemoryWindowStore(
>           "my-win-store",
>           Duration.ofMinutes(10),
>           Duration.ofSeconds(10),
>           false
>         ),
>         Serdes.String(),
>         Serdes.String()
>       )
>       .withLoggingDisabled()
>       .build();
>     store.init(context, store);
>     context.register(store, null);
>     processor.init(context);
>   }
>
>   @Test
>   public void testThings() {
>     Instant baseTime = Instant.now();
>     context.setTimestamp(baseTime.toEpochMilli());
>     context.setTopic("topic-name");
>     processor.process("key1", "value1");
>   }
> }
> {code}
>
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426;
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family:
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
> T E S T S
> ---
> Running com.cantgetthistowork.InMemWindo
[jira] [Created] (KAFKA-8643) Incompatible MemberDescription constructor change
Boyang Chen created KAFKA-8643:
--
Summary: Incompatible MemberDescription constructor change
Key: KAFKA-8643
URL: https://issues.apache.org/jira/browse/KAFKA-8643
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Boyang Chen
Assignee: Boyang Chen

An existing public constructor in MemberDescription was accidentally deleted. We need to bring back the old constructors for compatibility.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7788) Support null defaults in KAFKA-7609 RPC specifications
[ https://issues.apache.org/jira/browse/KAFKA-7788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe reassigned KAFKA-7788: -- Assignee: Colin P. McCabe > Support null defaults in KAFKA-7609 RPC specifications > -- > > Key: KAFKA-7788 > URL: https://issues.apache.org/jira/browse/KAFKA-7788 > Project: Kafka > Issue Type: Sub-task >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Minor > > It would be nice if we could support null values as defaults in the > KAFKA-7609 RPC specification files. null defaults should be allowed only if > the field is nullable in all supported versions of the field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7788) Support null defaults in KAFKA-7609 RPC specifications
[ https://issues.apache.org/jira/browse/KAFKA-7788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe resolved KAFKA-7788. Resolution: Duplicate > Support null defaults in KAFKA-7609 RPC specifications > -- > > Key: KAFKA-7788 > URL: https://issues.apache.org/jira/browse/KAFKA-7788 > Project: Kafka > Issue Type: Sub-task >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Minor > > It would be nice if we could support null values as defaults in the > KAFKA-7609 RPC specification files. null defaults should be allowed only if > the field is nullable in all supported versions of the field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8644) The Kafka protocol generator should allow null defaults for bytes and array fields
Colin P. McCabe created KAFKA-8644: -- Summary: The Kafka protocol generator should allow null defaults for bytes and array fields Key: KAFKA-8644 URL: https://issues.apache.org/jira/browse/KAFKA-8644 Project: Kafka Issue Type: Improvement Reporter: Colin P. McCabe Assignee: Colin P. McCabe The Kafka protocol generator should allow null defaults for bytes and array fields. Currently, null defaults are only allowed for string fields. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8644) The Kafka protocol generator should allow null defaults for bytes and array fields
[ https://issues.apache.org/jira/browse/KAFKA-8644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881611#comment-16881611 ] ASF GitHub Bot commented on KAFKA-8644: --- cmccabe commented on pull request #7059: KAFKA-8644. The Kafka protocol generator should allow null defaults for bytes and array fields URL: https://github.com/apache/kafka/pull/7059 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The Kafka protocol generator should allow null defaults for bytes and array > fields > -- > > Key: KAFKA-8644 > URL: https://issues.apache.org/jira/browse/KAFKA-8644 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > The Kafka protocol generator should allow null defaults for bytes and array > fields. Currently, null defaults are only allowed for string fields. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
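For illustration only (the field below is invented), a message spec under clients/src/main/resources/common/message/ would declare a null default for a bytes field along these lines once the change lands; before it, "default": "null" was accepted only for string fields:

{code}
{ "name": "SerializedAssignment", "type": "bytes", "versions": "0+",
  "nullableVersions": "0+", "default": "null",
  "about": "The member assignment, or null if no assignment has been computed." }
{code}

Consistent with KAFKA-7788, a null default would only be legal when nullableVersions covers every supported version of the field.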
[jira] [Commented] (KAFKA-8643) Incompatible MemberDescription constructor change
[ https://issues.apache.org/jira/browse/KAFKA-8643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881613#comment-16881613 ] ASF GitHub Bot commented on KAFKA-8643:
---
abbccdda commented on pull request #7060: KAFKA-8643: bring back public MemberDescription constructor
URL: https://github.com/apache/kafka/pull/7060

A compatibility fix aiming to avoid breaking users' code by accidentally removing a public constructor.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Incompatible MemberDescription constructor change
> -
>
> Key: KAFKA-8643
> URL: https://issues.apache.org/jira/browse/KAFKA-8643
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> An existing public constructor in MemberDescription was accidentally deleted.
> We need to bring back the old constructors for compatibility.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8642) Send LeaveGroupRequest for static members when reaching `max.poll.interval.ms`
[ https://issues.apache.org/jira/browse/KAFKA-8642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8642.
Resolution: Invalid

Synced with [~hachikuji]: the session timeout should always be smaller than max.poll.interval.ms, because if we can tolerate a long unavailability for a consumer, such as 10 minutes, then it makes no sense to expect it to make progress every 5 minutes.

> Send LeaveGroupRequest for static members when reaching `max.poll.interval.ms`
> --
>
> Key: KAFKA-8642
> URL: https://issues.apache.org/jira/browse/KAFKA-8642
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> Static members don't leave the group explicitly. However, when a static member's
> progress is low, it might be favorable to let it leave the group, leveraging a
> rebalance to shuffle the assignment and make it progressive again.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
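Expressed as consumer configuration, the invariant from that discussion is simply session.timeout.ms < max.poll.interval.ms; the values below are illustrative only:

{code}
# Static membership (KIP-345)
group.instance.id=my-static-member-1
# The member is evicted after 5 minutes without heartbeats ...
session.timeout.ms=300000
# ... and must call poll() at least every 10 minutes. The session timeout
# stays below max.poll.interval.ms: tolerating 10 minutes of silence while
# expecting progress every 5 would make no sense.
max.poll.interval.ms=600000
{code}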
[jira] [Commented] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16881660#comment-16881660 ] ASF GitHub Bot commented on KAFKA-8640: --- abbccdda commented on pull request #7062: KAFKA-8640: Replace OffsetFetch request with automated protocol URL: https://github.com/apache/kafka/pull/7062 As title. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replace OffsetFetch request/response with automated protocol > > > Key: KAFKA-8640 > URL: https://issues.apache.org/jira/browse/KAFKA-8640 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8645) Flakey test SaslSslAdminClientIntegrationTest#testElectUncleanLeadersAndNoop
Boyang Chen created KAFKA-8645:
--
Summary: Flakey test SaslSslAdminClientIntegrationTest#testElectUncleanLeadersAndNoop
Key: KAFKA-8645
URL: https://issues.apache.org/jira/browse/KAFKA-8645
Project: Kafka
Issue Type: Bug
Reporter: Boyang Chen

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/155/console

21:02:21 kafka.api.SaslSslAdminClientIntegrationTest > testElectUncleanLeadersAndNoop STARTED
21:03:14 kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersAndNoop failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/core/build/reports/testOutput/kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersAndNoop.test.stdout
21:03:14
21:03:14 kafka.api.SaslSslAdminClientIntegrationTest > testElectUncleanLeadersAndNoop FAILED
21:03:14 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
21:03:14
21:03:14 Caused by:
21:03:14 org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)