[jira] [Updated] (KAFKA-6625) Kafka offsets reset during upgrade from 0.11.0 to 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] barry010 updated KAFKA-6625: Description: When I upgraded Kafka from 0.11.0 to 1.0.0, during the window in which I shut down broker01 with a kill signal, updated the code, and started the new broker01, I found that the consumption rate of one of my consumer groups, "flume.mpns", rose to 10M/s. From the __consumer_offsets-63 partition I found that most of the partitions' offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This timestamp falls between the shutdown of broker01 and the start of the new broker01. I have not set auto.offset.reset, and this topic has only one consumer group, which uses Flume. I have attached three logs: the consumer log, the coordinator log, and one __consumer_offsets dump. was: When I upgraded Kafka from 0.11.0 to 1.0.0, during the window in which I shut down broker01 with a kill signal, updated the code, and started the new broker01, I found that the consumption rate of one of my consumer groups rose to 10M/s. From the __consumer_offsets-63 partition I found that most of the partitions' offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This timestamp falls between the shutdown of broker01 and the start of the new broker01. I have not set auto.offset.reset, and this topic has only one consumer group, which uses Flume. I have attached three logs: the consumer log, the coordinator log, and one __consumer_offsets dump. > Kafka offsets reset during upgrade from 0.11.0 to 1.0.0 > > > Key: KAFKA-6625 > URL: https://issues.apache.org/jira/browse/KAFKA-6625 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 > Environment: flume 1.8 with Kafka client version : 0.9.0.1 > Kafka version : 0.11.0 >Reporter: barry010 >Priority: Critical > Attachments: consumer_offset-36.txt, flume.log.txt, server.log.txt > > > When I upgraded Kafka from 0.11.0 to 1.0.0, during the window in which I shut > down broker01 with a kill signal, updated the code, and started the new > broker01, I found that the consumption rate of one of my consumer groups, > "flume.mpns", rose to 10M/s. > From the __consumer_offsets-63 partition I found that most of the partitions' > offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This > timestamp falls between the shutdown of broker01 and the start of the new > broker01. > I have not set auto.offset.reset, and this topic has only one consumer group, > which uses Flume. > I have attached three logs: the consumer log, the coordinator log, and one > __consumer_offsets dump. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords
[ https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maciej Bryński updated KAFKA-6626: -- Description: Kafka Connect is using IdentityHashMap for storing records. [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] Unfortunately this solution is very slow (2-4 times slower than normal HashMap / HashSet). Benchmark result (code in attachment). {code:java} Identity 4220 Set 2115 Map 1941 Fast Set 2121 {code} Things are even worse when using default GC configuration (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) {code:java} Identity 7885 Set 2364 Map 1548 Fast Set 1520 {code} Java version {code:java} java version "1.8.0_152" Java(TM) SE Runtime Environment (build 1.8.0_152-b16) Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) {code} This problem is greatly slowing Kafka Connect. !image-2018-03-08-08-35-19-247.png! was: Kafka Connect is using IdentityHashMap for storing records. [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] Unfortunately this solution is very slow (2 times slower than normal HashMap / HashSet). Benchmark result (code in attachment). {code:java} Identity 4220 Set 2115 Map 1941 Fast Set 2121 {code} This problem is greatly slowing Kafka Connect. !image-2018-03-08-08-35-19-247.png! > Performance bottleneck in Kafka Connect sendRecords > --- > > Key: KAFKA-6626 > URL: https://issues.apache.org/jira/browse/KAFKA-6626 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png > > > Kafka Connect is using IdentityHashMap for storing records. > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] > Unfortunately this solution is very slow (2-4 times slower than normal > HashMap / HashSet). > Benchmark result (code in attachment). > {code:java} > Identity 4220 > Set 2115 > Map 1941 > Fast Set 2121 > {code} > Things are even worse when using default GC configuration > (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=100 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) > {code:java} > Identity 7885 > Set 2364 > Map 1548 > Fast Set 1520 > {code} > Java version > {code:java} > java version "1.8.0_152" > Java(TM) SE Runtime Environment (build 1.8.0_152-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) > {code} > This problem is greatly slowing Kafka Connect. > !image-2018-03-08-08-35-19-247.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
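The attached MapPerf.java is not reproduced here, but a micro-benchmark of roughly the following shape exercises what the numbers above measure; the record type, operation count, and loop structure are assumptions, not the attachment's actual code.

{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical stand-in for the attached MapPerf.java: time put/remove cycles
// against an IdentityHashMap vs. a plain HashMap, as sendRecords() would see.
public class MapPerfSketch {
    static final int OPS = 10_000_000;

    static long timeMs(Map<Object, Boolean> map) {
        long start = System.nanoTime();
        for (int i = 0; i < OPS; i++) {
            Object record = new Object();   // stands in for a SourceRecord in flight
            map.put(record, Boolean.TRUE);  // track the outstanding record
            map.remove(record);             // drop it once acknowledged
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("Identity " + timeMs(new IdentityHashMap<>()));
        System.out.println("Map      " + timeMs(new HashMap<>()));
    }
}
{code}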
[jira] [Commented] (KAFKA-6195) DNS alias support for secured connections
[ https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390947#comment-16390947 ] Rajini Sivaram commented on KAFKA-6195: --- In general, it is good to avoid configuration options if we can determine the behaviour automatically. But in this case, my preference is for a config option because a DNS lookup can add delays in some environments. It will be good to see if there are other opinions. SSL certificates can contain hostnames that are wildcarded, making it easy to add a single certificate for the cluster. If you are using IP addresses instead of hostnames, I believe you need to specify the full IP address, but you can have multiple addresses (or hostnames) in a single certificate. > DNS alias support for secured connections > - > > Key: KAFKA-6195 > URL: https://issues.apache.org/jira/browse/KAFKA-6195 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Jonathan Skrzypek >Priority: Major > > It seems clients can't use a DNS alias in front of a secured Kafka cluster. > So applications can only specify a list of hosts or IPs in bootstrap.servers > instead of an alias encompassing all cluster nodes. > Using an alias in bootstrap.servers results in the following error: > javax.security.sasl.SaslException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Fail to create credential. (63) - No service creds)]) > occurred when evaluating SASL token received from the Kafka Broker. Kafka > Client will go to AUTH_FAILED state. [Caused by > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Fail to create > credential. (63) - No service creds)]] > When using SASL/Kerberos authentication, the Kafka server principal is of the > form kafka@kafka/broker1.hostname@example.com > Kerberos requires that the hosts can be resolved by their FQDNs. > During the SASL handshake, the client will create a SASL token and then send > it to Kafka for auth. > But to create a SASL token the client first needs to be able to validate that > the broker's kerberos principal is a valid one. > There are 3 potential options: > 1. Create a single kerberos principal not linked to a host but to an alias, > and reference it in the broker jaas file. > But I think the kerberos infrastructure would refuse to validate it, so the > SASL handshake would still fail. > 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers > contains a DNS alias. If it does, resolve and expand the alias to retrieve > all hostnames behind it and add them to the list of nodes. > This could be done by modifying parseAndValidateAddresses() in ClientUtils. > 3. Add a cluster.alias parameter that would be handled by the logic above. > Having a separate parameter avoids confusion about how bootstrap.servers > works behind the scenes. > Thoughts? > I would be happy to contribute the change for any of the options. > I believe the ability to use a DNS alias instead of static lists of brokers > would bring good deployment flexibility. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
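For option 2 above, a minimal sketch of the alias expansion; this is not the actual ClientUtils.parseAndValidateAddresses() code, and the class and method names here are illustrative only.

{code:java}
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Expand a DNS alias into one bootstrap address per A record behind it.
public class AliasExpansionSketch {
    static List<InetSocketAddress> expand(String alias, int port) throws UnknownHostException {
        List<InetSocketAddress> nodes = new ArrayList<>();
        for (InetAddress address : InetAddress.getAllByName(alias)) {
            // Resolve back to the FQDN so Kerberos can match the per-host
            // broker principal during the SASL handshake.
            nodes.add(new InetSocketAddress(address.getCanonicalHostName(), port));
        }
        return nodes;
    }
}
{code}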
[jira] [Updated] (KAFKA-6625) Kafka offsets reset during upgrade from 0.11.0 to 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] barry010 updated KAFKA-6625: Description: When I upgraded Kafka from 0.11.0 to 1.0.0, during the window in which I shut down broker01 with a kill signal, updated the code, and started the new broker01, I found that the consumption rate of one of my consumer groups, "flume.mpns", had risen to 10M/s. From the __consumer_offsets-63 partition I found that most of the partitions' offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This timestamp falls between the shutdown of broker01 and the start of the new broker01. I have not set auto.offset.reset, and this topic has only one consumer group, which uses Flume. I have attached three logs: the consumer log, the coordinator log, and one __consumer_offsets dump. was: When I upgraded Kafka from 0.11.0 to 1.0.0, during the window in which I shut down broker01 with a kill signal, updated the code, and started the new broker01, I found that the consumption rate of one of my consumer groups, "flume.mpns", rose to 10M/s. From the __consumer_offsets-63 partition I found that most of the partitions' offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This timestamp falls between the shutdown of broker01 and the start of the new broker01. I have not set auto.offset.reset, and this topic has only one consumer group, which uses Flume. I have attached three logs: the consumer log, the coordinator log, and one __consumer_offsets dump. > Kafka offsets reset during upgrade from 0.11.0 to 1.0.0 > > > Key: KAFKA-6625 > URL: https://issues.apache.org/jira/browse/KAFKA-6625 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 > Environment: flume 1.8 with Kafka client version : 0.9.0.1 > Kafka version : 0.11.0 >Reporter: barry010 >Priority: Critical > Attachments: consumer_offset-36.txt, flume.log.txt, server.log.txt > > > When I upgraded Kafka from 0.11.0 to 1.0.0, during the window in which I shut > down broker01 with a kill signal, updated the code, and started the new > broker01, I found that the consumption rate of one of my consumer groups, > "flume.mpns", had risen to 10M/s. > From the __consumer_offsets-63 partition I found that most of the partitions' > offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This > timestamp falls between the shutdown of broker01 and the start of the new > broker01. > I have not set auto.offset.reset, and this topic has only one consumer group, > which uses Flume. > I have attached three logs: the consumer log, the coordinator log, and one > __consumer_offsets dump. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6560) Use single-point queries rather than range queries for windowed aggregation operators
[ https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391063#comment-16391063 ] ASF GitHub Bot commented on KAFKA-6560: --- dguy opened a new pull request #4665: KAFKA-6560: [FOLLOW-UP] don't deserialize null byte array in window store fetch URL: https://github.com/apache/kafka/pull/4665 If a fetch from a Window Store returns a null byte array, we should return null rather than passing it to the serde to deserialize. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use single-point queries rather than range queries for windowed aggregation operators > -- > > Key: KAFKA-6560 > URL: https://issues.apache.org/jira/browse/KAFKA-6560 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Critical > Labels: needs-kip > Fix For: 1.2.0 > > > Today for windowed aggregations in the Streams DSL, the underlying implementation > leverages the fetch(key, from, to) API to get all the related windows for > a single record to update. However, this is a very inefficient operation, with > a significant amount of CPU time spent iterating over window stores. On the other > hand, since the operator implementation itself has full knowledge of the > window specs, it can actually translate this operation into multiple > single-point queries with the accurate window start timestamp, which would > largely reduce the overhead. > The proposed approach is to add a single-point fetch API to the WindowedStore and > use that in the KStreamWindowedAggregate / KStreamWindowedReduce operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
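The gist of the follow-up fix, per the PR description above (a simplified excerpt-style sketch; the actual diff appears in a later comment on this issue):

{code:java}
// Guard the window-store fetch so a missing value is returned as null
// instead of being handed to the serde for deserialization.
@Override
public V fetch(final K key, final long timestamp) {
    final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes));
    if (bytesValue == null) {
        return null;   // no entry for this window: nothing to deserialize
    }
    return serdes.valueFrom(bytesValue);
}
{code}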
[jira] [Commented] (KAFKA-3530) Making the broker-list option consistent across all tools
[ https://issues.apache.org/jira/browse/KAFKA-3530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391260#comment-16391260 ] Chia-Ping Tsai commented on KAFKA-3530: --- Any updates? Many people are confused by the difference between broker-list (in the producer) and bootstrap-server (in the consumer) > Making the broker-list option consistent across all tools > - > > Key: KAFKA-3530 > URL: https://issues.apache.org/jira/browse/KAFKA-3530 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 >Reporter: Jun Rao >Assignee: Liquan Pei >Priority: Major > > Currently, console-producer uses --broker-list and console-consumer uses > --bootstrap-server. This can be confusing to the users. We should standardize > the name across all tools. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6598) Kafka to support using ETCD beside Zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-6598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391312#comment-16391312 ] Tom Bentley commented on KAFKA-6598: [~cmccabe] any more info about that (such as when the KIP might be published)? > Kafka to support using ETCD beside Zookeeper > > > Key: KAFKA-6598 > URL: https://issues.apache.org/jira/browse/KAFKA-6598 > Project: Kafka > Issue Type: New Feature > Components: clients, core >Reporter: Sebastian Toader >Priority: Major > > The current Kafka implementation is bound to {{Zookeeper}} to store its > metadata for forming a cluster of nodes (producer/consumer/broker). > As Kafka is becoming popular for streaming in various environments where > {{Zookeeper}} is either not easy to deploy/manage or there are better > alternatives to it, there is a need to run Kafka with a metastore > implementation other than {{Zookeeper}}. > {{etcd}} can provide the same semantics as {{Zookeeper}} for Kafka, and since > {{etcd}} is the favorable choice in certain environments (e.g. Kubernetes), > Kafka should be able to run with {{etcd}}. > From the user's point of view, it should be straightforward to configure Kafka > to use {{etcd}} by simply specifying a connection string that points to the > {{etcd}} cluster. > To avoid introducing instability, the original interfaces should be kept and > only the low-level {{Zookeeper}} API calls should be replaced with {{etcd}} > API calls in case Kafka is configured to use {{etcd}}. > In the long run (which is out of scope of this jira) there should be an > abstraction layer in Kafka which the various metastore implementations would > then implement. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration
[ https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391479#comment-16391479 ] Igor Calabria commented on KAFKA-6603: -- Yes, I was accessing the store directly, and replacing the iterator with individual fetch calls really improved performance. > Kafka streams off heap memory usage does not match expected values from > configuration > - > > Key: KAFKA-6603 > URL: https://issues.apache.org/jira/browse/KAFKA-6603 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Igor Calabria >Priority: Minor > > Hi, I have a simple aggregation pipeline that's backed by the default state > store (rocksdb). The pipeline works fine except that the off-heap memory usage > is way higher than expected. Following the > [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > has some effect (memory usage is reduced), but the values don't match at all. > The Java process is set to run with just `-Xmx300m -Xms300m` and the rocksdb > config looks like this > {code:java} > tableConfig.setCacheIndexAndFilterBlocks(true); > tableConfig.setBlockCacheSize(1048576); //1MB > tableConfig.setBlockSize(16 * 1024); // 16KB > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(2); > options.setWriteBufferSize(8 * 1024); // 8KB{code} > To estimate memory usage, I'm using this formula > {noformat} > (block_cache_size + write_buffer_size * write_buffer_number) * segments * > partitions{noformat} > Since my topic has 25 partitions with 3 segments each (it's a windowed store), > off-heap memory usage should be about 76MB. What I'm seeing in production is > upwards of 300MB; even taking into consideration extra overhead from rocksdb > compaction threads, this seems a bit high (especially when the disk usage for > all files is just 1GB) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
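For reference, plugging the reported settings into the reporter's own formula does give roughly the expected figure; this is just an arithmetic check, not Kafka code.

{code:java}
// (block_cache_size + write_buffer_size * write_buffer_number) * segments * partitions
public class RocksDbMemoryEstimate {
    public static void main(String[] args) {
        long blockCache  = 1_048_576;  // 1 MB block cache
        long writeBuffer = 8 * 1024;   // 8 KB write buffer
        int bufferCount  = 2;          // max write buffer number
        int segments     = 3;          // segments per windowed store
        int partitions   = 25;         // topic partitions

        long total = (blockCache + writeBuffer * bufferCount) * segments * partitions;
        // 1,064,960 * 75 = 79,872,000 bytes, i.e. about 76 MB
        System.out.printf("expected off-heap: %.1f MB%n", total / (1024.0 * 1024.0));
    }
}
{code}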
[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration
[ https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391509#comment-16391509 ] Guozhang Wang commented on KAFKA-6603: -- Awesome, thanks for confirming, Igor. In the meantime, we are still benchmarking and investigating RocksDB for possible optimizations. Stay tuned. > Kafka streams off heap memory usage does not match expected values from > configuration > - > > Key: KAFKA-6603 > URL: https://issues.apache.org/jira/browse/KAFKA-6603 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Igor Calabria >Priority: Minor > > Hi, I have a simple aggregation pipeline that's backed by the default state > store (rocksdb). The pipeline works fine except that the off-heap memory usage > is way higher than expected. Following the > [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > has some effect (memory usage is reduced), but the values don't match at all. > The Java process is set to run with just `-Xmx300m -Xms300m` and the rocksdb > config looks like this > {code:java} > tableConfig.setCacheIndexAndFilterBlocks(true); > tableConfig.setBlockCacheSize(1048576); //1MB > tableConfig.setBlockSize(16 * 1024); // 16KB > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(2); > options.setWriteBufferSize(8 * 1024); // 8KB{code} > To estimate memory usage, I'm using this formula > {noformat} > (block_cache_size + write_buffer_size * write_buffer_number) * segments * > partitions{noformat} > Since my topic has 25 partitions with 3 segments each (it's a windowed store), > off-heap memory usage should be about 76MB. What I'm seeing in production is > upwards of 300MB; even taking into consideration extra overhead from rocksdb > compaction threads, this seems a bit high (especially when the disk usage for > all files is just 1GB) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6627) Console producer default config values override explicitly provided properties
Jason Gustafson created KAFKA-6627: -- Summary: Console producer default config values override explicitly provided properties Key: KAFKA-6627 URL: https://issues.apache.org/jira/browse/KAFKA-6627 Project: Kafka Issue Type: Bug Components: tools Reporter: Jason Gustafson Some producer properties can be provided through custom parameters (e.g. {{--request-required-acks}}) and explicitly through {{--producer-property}}. At the moment, some of the custom parameters have default values which actually override explicitly provided properties. For example, if you set {{--producer-property acks=all}} when starting the console producer, the argument will be ignored since {{--request-required-acks}} has a default value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
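One plausible shape of a fix (an assumption, not the actual ConsoleProducer code): seed the properties with the tool's defaults first, then overlay anything passed via --producer-property so the explicit value wins.

{code:java}
import java.util.Properties;

// Hypothetical merge order: tool defaults first, explicit overrides last.
public class ProducerPropsMerge {
    static Properties merge(Properties explicitProps) {
        Properties props = new Properties();
        props.put("acks", "1");       // default that backs --request-required-acks
        props.putAll(explicitProps);  // --producer-property acks=all now wins
        return props;
    }
}
{code}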
[jira] [Updated] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords
[ https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maciej Bryński updated KAFKA-6626: -- Description: Kafka Connect is using IdentityHashMap for storing records. [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] Unfortunately this solution is very slow (2-4 times slower than normal HashMap / HashSet). Benchmark result (code in attachment). {code:java} Identity 4220 Set 2115 Map 1941 Fast Set 2121 {code} Things are even worse when using default GC configuration (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) {code:java} Identity 7885 Set 2364 Map 1548 Fast Set 1520 {code} Java version {code:java} java version "1.8.0_152" Java(TM) SE Runtime Environment (build 1.8.0_152-b16) Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) {code} This problem is greatly slowing Kafka Connect. !image-2018-03-08-08-35-19-247.png! was: Kafka Connect is using IdentityHashMap for storing records. [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] Unfortunately this solution is very slow (2-4 times slower than normal HashMap / HashSet). Benchmark result (code in attachment). {code:java} Identity 4220 Set 2115 Map 1941 Fast Set 2121 {code} Things are even worse when using default GC configuration (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) {code:java} Identity 7885 Set 2364 Map 1548 Fast Set 1520 {code} Java version {code:java} java version "1.8.0_152" Java(TM) SE Runtime Environment (build 1.8.0_152-b16) Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) {code} This problem is greatly slowing Kafka Connect. !image-2018-03-08-08-35-19-247.png! > Performance bottleneck in Kafka Connect sendRecords > --- > > Key: KAFKA-6626 > URL: https://issues.apache.org/jira/browse/KAFKA-6626 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png > > > Kafka Connect is using IdentityHashMap for storing records. > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] > Unfortunately this solution is very slow (2-4 times slower than normal > HashMap / HashSet). > Benchmark result (code in attachment). > {code:java} > Identity 4220 > Set 2115 > Map 1941 > Fast Set 2121 > {code} > Things are even worse when using default GC configuration > (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) > {code:java} > Identity 7885 > Set 2364 > Map 1548 > Fast Set 1520 > {code} > Java version > {code:java} > java version "1.8.0_152" > Java(TM) SE Runtime Environment (build 1.8.0_152-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) > {code} > This problem is greatly slowing Kafka Connect. > !image-2018-03-08-08-35-19-247.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup
[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391698#comment-16391698 ] Dong Lin commented on KAFKA-3978: - I also encountered this issue and I have been looking at this all day yesterday. Still no clue how the leader can send a FetchResponse with error=None and hw=-1 > Cannot truncate to a negative offset (-1) exception at broker startup > - > > Key: KAFKA-3978 > URL: https://issues.apache.org/jira/browse/KAFKA-3978 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0 > Environment: 3.13.0-87-generic >Reporter: Juho Mäkinen >Priority: Critical > Labels: reliability, startup > > During the broker startup sequence the broker server.log has this exception. > The problem persists after multiple restarts and also on another broker in the > cluster. > {code} > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser) > INFO Kafka commitId : b8642491e78c5a13 > (org.apache.kafka.common.utils.AppInfoParser) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > Error when handling request > {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT > SNIPPED AWAY..], > live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]} > (kafka.server.KafkaApis) > ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative > offset (-1). > at kafka.log.Log.truncateTo(Log.scala:731) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) >
[jira] [Commented] (KAFKA-6473) Add MockProcessorContext to public test-utils
[ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391741#comment-16391741 ] John Roesler commented on KAFKA-6473: - I have created [KIP-267|https://cwiki.apache.org/confluence/display/KAFKA/KIP-267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils] to propose a solution to this. > Add MockProcessorContext to public test-utils > - > > Key: KAFKA-6473 > URL: https://issues.apache.org/jira/browse/KAFKA-6473 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Major > Labels: needs-kip, user-experience > > With KIP-247, we added public test-utils artifact with a TopologyTestDriver > class. Using the test driver for a single > Processor/Transformer/ValueTransformer it's required to specify a whole > topology with source and sink and plus the > Processor/Transformer/ValueTransformer into it. > For unit testing, it might be more convenient to have a MockProcessorContext, > that can be used to test the Processor/Transformer/ValueTransformer in > isolation. Ie, the test itself creates new > Processor/Transformer/ValueTransformer object and calls init() manually > passing in the MockProcessorContext. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals -- This message was sent by Atlassian JIRA (v7.6.3#76005)
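Roughly the kind of test KIP-267 would enable; MockProcessorContext and the calls below are the *proposed* API, so all names here are assumptions until the KIP is finalized.

{code:java}
// Hypothetical usage of the proposed mock (not a finalized API):
// drive a Processor directly, with no topology or test driver around it.
final Processor<String, String> processor = new MyProcessor(); // user's processor under test
final MockProcessorContext context = new MockProcessorContext();
processor.init(context);             // hand the mock in via the normal lifecycle
processor.process("key", "value");   // invoke the logic under test directly
// ...then assert on whatever the mock captured (forwarded records, state, etc.)
{code}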
[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords
[ https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391818#comment-16391818 ] Ted Yu commented on KAFKA-6626: --- There are alternatives in https://github.com/carrotsearch/hppc and http://leventov.github.io/Koloboke/api/1.0/java8/index.html for their object-to-object maps. They're Apache 2 licensed. I wonder if we can utilize any of those. > Performance bottleneck in Kafka Connect sendRecords > --- > > Key: KAFKA-6626 > URL: https://issues.apache.org/jira/browse/KAFKA-6626 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png > > > Kafka Connect is using IdentityHashMap for storing records. > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] > Unfortunately this solution is very slow (2-4 times slower than normal > HashMap / HashSet). > Benchmark result (code in attachment). > {code:java} > Identity 4220 > Set 2115 > Map 1941 > Fast Set 2121 > {code} > Things are even worse when using default GC configuration > (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) > {code:java} > Identity 7885 > Set 2364 > Map 1548 > Fast Set 1520 > {code} > Java version > {code:java} > java version "1.8.0_152" > Java(TM) SE Runtime Environment (build 1.8.0_152-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) > {code} > This problem is greatly slowing Kafka Connect. > !image-2018-03-08-08-35-19-247.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords
[ https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391839#comment-16391839 ] Maciej Bryński commented on KAFKA-6626: --- I think even standard java HashMap can be used. The only thing I wonder is if we need to use IdentityHashMap (comparing records by instance, not by equals) [~ewencp] ? > Performance bottleneck in Kafka Connect sendRecords > --- > > Key: KAFKA-6626 > URL: https://issues.apache.org/jira/browse/KAFKA-6626 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png > > > Kafka Connect is using IdentityHashMap for storing records. > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] > Unfortunately this solution is very slow (2-4 times slower than normal > HashMap / HashSet). > Benchmark result (code in attachment). > {code:java} > Identity 4220 > Set 2115 > Map 1941 > Fast Set 2121 > {code} > Things are even worse when using default GC configuration > (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) > {code:java} > Identity 7885 > Set 2364 > Map 1548 > Fast Set 1520 > {code} > Java version > {code:java} > java version "1.8.0_152" > Java(TM) SE Runtime Environment (build 1.8.0_152-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) > {code} > This problem is greatly slowing Kafka Connect. > !image-2018-03-08-08-35-19-247.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
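The semantic difference being asked about, in miniature (plain Java, unrelated to the Connect code itself): a HashMap collapses equal-but-distinct instances, while an IdentityHashMap keeps them separate. Whether sendRecords() can ever hold two equal in-flight records is exactly what would need checking before swapping the map type.

{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityVsEquals {
    public static void main(String[] args) {
        String a = new String("record");
        String b = new String("record"); // equals(a), but a different instance

        Map<String, Integer> byEquals = new HashMap<>();
        byEquals.put(a, 1);
        byEquals.put(b, 2);              // overwrites: size() == 1

        Map<String, Integer> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, 1);
        byIdentity.put(b, 2);            // kept separate: size() == 2

        System.out.println(byEquals.size() + " vs " + byIdentity.size()); // 1 vs 2
    }
}
{code}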
[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup
[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391863#comment-16391863 ] Dong Lin commented on KAFKA-3978: - [~junrao] [~hachikuji] [~ijuma] I have a good theory now. I will submit a patch to fix it. - Broker A is leader for partition P1 and broker B is follower for partition P1. - Broker C receives a LeaderAndIsrRequest to become follower for P1. In broker C's memory, P1's highWatermarkMetadata = LogOffsetMetadata(hw=0, segmentBaseOffset=-1). The local replica's LEO for P1 is 0. - Broker C's replica fetcher sends a leader epoch request to broker A and then truncates its local replica's LEO to 100. - Broker C receives a LeaderAndIsrRequest to become leader for P1 with ISR=(A,B,C). In broker C's memory, P1's highWatermarkMetadata = LogOffsetMetadata(hw=0, segmentBaseOffset=-1). And in broker C's memory, according to partition.makeLeader(), replica A's logEndOffsetMetadata is initialized to LogOffsetMetadata.UnknownOffsetMetadata, which has HW=-1 and segmentBaseOffset=0. - Broker C receives a FetchRequest from broker B. In Partition.maybeIncrementLeaderHW(), the new highWaterMark will be derived with min(logEndOffset metadata of all replicas), which will be LogOffsetMetadata(HW=-1, segmentBaseOffset=0) because A's logEndOffset metadata is smallest. Because the new hw's segmentBaseOffset > the old hw's segmentBaseOffset, the high watermark is updated to LogOffsetMetadata(hw=-1, segmentBaseOffset=100L). - Now we have a partition whose HW is negative, which causes problems for broker B when it fetches from broker C. > Cannot truncate to a negative offset (-1) exception at broker startup > - > > Key: KAFKA-3978 > URL: https://issues.apache.org/jira/browse/KAFKA-3978 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0 > Environment: 3.13.0-87-generic >Reporter: Juho Mäkinen >Priority: Critical > Labels: reliability, startup > > During the broker startup sequence the broker server.log has this exception. > The problem persists after multiple restarts and also on another broker in the > cluster. > {code} > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKChecked
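The failure mode in the theory above, boiled down; this is an illustration in plain Java, while the real logic is Scala in Partition.maybeIncrementLeaderHW and operates on LogOffsetMetadata, not bare longs.

{code:java}
// A replica the leader has not yet heard from carries a sentinel LEO of -1.
// Taking min() across all replicas then yields a negative high watermark,
// which a follower later tries to truncate to -- the startup exception above.
public class HighWatermarkSketch {
    public static void main(String[] args) {
        long[] replicaLogEndOffsets = {-1L /* unknown replica A */, 100L /* B */, 100L /* C */};
        long newHighWatermark = Long.MAX_VALUE;
        for (long leo : replicaLogEndOffsets) {
            newHighWatermark = Math.min(newHighWatermark, leo); // picks up the -1 sentinel
        }
        System.out.println(newHighWatermark); // prints -1
    }
}
{code}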
[jira] [Commented] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391906#comment-16391906 ] ASF GitHub Bot commented on KAFKA-6612: --- gitlw closed pull request #4649: KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion URL: https://github.com/apache/kafka/pull/4649 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Added logic to prevent increasing partition counts during topic deletion > > > Key: KAFKA-6612 > URL: https://issues.apache.org/jira/browse/KAFKA-6612 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Problem: trying to increase the partition count of a topic while the topic > deletion is in progress can cause the topic to never be deleted. > In the current code base, if a topic deletion is still in progress and the > partition count is increased, the new partition and its replica assignment > will be created in zookeeper as data of the path /brokers/topics/. > Upon detecting the change, the controller sees the topic is being deleted, > and therefore ignores the partition change. Therefore the zk path > /brokers/topics//partitions/ will NOT be created. > If a controller switch happens next, the added partition will be detected by > the new controller and stored in the > controllerContext.partitionReplicaAssignment. The new controller then tries > to delete the topic by first transitioning its replicas to OfflineReplica. > However the transition to the OfflineReplica state will NOT succeed since there > is no leader for the partition. Since the only state change path for a > replica to be successfully deleted is OfflineReplica -> > ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter > the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391913#comment-16391913 ] ASF GitHub Bot commented on KAFKA-6612: --- gitlw opened a new pull request #4666: KAFKA-6612: Added logic to prevent increasing partition counts during topic deletion URL: https://github.com/apache/kafka/pull/4666 This patch adds logic in handling the PartitionModifications event, so that if the partition count is increased while a topic deletion is still in progress, the controller will restore the data of the path /brokers/topics/"topic" to remove the added partitions. Testing done: Added a new test method to cover the bug ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Added logic to prevent increasing partition counts during topic deletion > > > Key: KAFKA-6612 > URL: https://issues.apache.org/jira/browse/KAFKA-6612 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Problem: trying to increase the partition count of a topic while the topic > deletion is in progress can cause the topic to never be deleted. > In the current code base, if a topic deletion is still in progress and the > partition count is increased, the new partition and its replica assignment > will be created in zookeeper as data of the path /brokers/topics/. > Upon detecting the change, the controller sees the topic is being deleted, > and therefore ignores the partition change. Therefore the zk path > /brokers/topics//partitions/ will NOT be created. > If a controller switch happens next, the added partition will be detected by > the new controller and stored in the > controllerContext.partitionReplicaAssignment. The new controller then tries > to delete the topic by first transitioning its replicas to OfflineReplica. > However the transition to the OfflineReplica state will NOT succeed since there > is no leader for the partition. Since the only state change path for a > replica to be successfully deleted is OfflineReplica -> > ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter > the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
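The gist of the guard the patch describes, sketched below; the controller is actually written in Scala, and the helper names here are hypothetical simplifications, not the patch's real methods.

{code:java}
// On a PartitionModifications event: if the topic is queued for deletion,
// roll the znode back instead of accepting the new partitions.
void onPartitionModifications(String topic) {
    if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {
        // Restore /brokers/topics/<topic> to the pre-change assignment so the
        // half-added partitions cannot strand the deletion. (hypothetical helper)
        restoreTopicAssignmentInZk(topic);
        return;
    }
    processPartitionChange(topic);   // normal path (hypothetical helper)
}
{code}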
[jira] [Commented] (KAFKA-6560) Use single-point queries rather than range queries for windowed aggregation operators
[ https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391932#comment-16391932 ] ASF GitHub Bot commented on KAFKA-6560: --- guozhangwang closed pull request #4665: KAFKA-6560: [FOLLOW-UP] don't deserialize null byte array in window store fetch URL: https://github.com/apache/kafka/pull/4665 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 15961e7c721..b131db59973 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -106,15 +106,15 @@ private Bytes keyBytes(final K key) { @Override public V fetch(final K key, final long timestamp) { final long startNs = time.nanoseconds(); -V ret; try { final byte[] result = inner.fetch(keyBytes(key), timestamp); -ret = serdes.valueFrom(result); +if (result == null) { +return null; +} +return serdes.valueFrom(result); } finally { metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds()); } - -return ret; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 58c345a7867..732f3d62a42 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -110,6 +110,9 @@ public void put(final K key, final V value, final long timestamp) { @Override public V fetch(final K key, final long timestamp) { final byte[] bytesValue = bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes)); +if (bytesValue == null) { +return null; +} return serdes.valueFrom(bytesValue); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 3ff343a7155..0e3b4e8223e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -81,6 +81,7 @@ public void setUp() { @After public void after() { +super.after(); context.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index bda385ebcb9..16ef47c1c1b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; @@ -76,7 +75,7 @@ public void setUp() { Segments.segmentInterval(retention, numSegments) ); cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); -context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, (RecordCollector) null, cache); +context = new MockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic")); cachingStore.init(context, cachingStore); } @@ -87,10 +86,6 @@ public void close() { cachingStore.close(); } -private Bytes bytesKey(final String key) { -return Bytes.wrap(key.getBytes()); -} - @Test public void shouldPutFetchFromCache() { cachingStore.put(new
[jira] [Created] (KAFKA-6628) RocksDBSegmentedBytesStoreTest does not cover time window serdes
Guozhang Wang created KAFKA-6628: Summary: RocksDBSegmentedBytesStoreTest does not cover time window serdes Key: KAFKA-6628 URL: https://issues.apache.org/jira/browse/KAFKA-6628 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang The RocksDBSegmentedBytesStoreTest.java only covers session window serdes, but not time window serdes. We should fill in this coverage gap. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes
Guozhang Wang created KAFKA-6629: Summary: SegmentedCacheFunctionTest does not cover session window serdes Key: KAFKA-6629 URL: https://issues.apache.org/jira/browse/KAFKA-6629 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang The SegmentedCacheFunctionTest.java only covers time window serdes, but not session window serdes. We should fill in this coverage gap. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6630) Speed up the processing of StopReplicaResponse events on the controller
Lucas Wang created KAFKA-6630: - Summary: Speed up the processing of StopReplicaResponse events on the controller Key: KAFKA-6630 URL: https://issues.apache.org/jira/browse/KAFKA-6630 Project: Kafka Issue Type: Improvement Components: core Reporter: Lucas Wang Assignee: Lucas Wang Problem Statement: We find in a large cluster with many partition replicas, it takes a long time to successfully delete a topic. Root cause: Further analysis shows that for a topic with N replicas, the controller receives all the N StopReplicaResponses from brokers within a short time, however sequentially handling all the N TopicDeletionStopReplicaResponseReceived events one by one takes a long time. Specifically the functions triggered while handling every single TopicDeletionStopReplicaResponseReceived event include: TopicDeletionStopReplicaResponseReceived.process calls TopicDeletionManager.completeReplicaDeletion, which calls TopicDeletionManager.resumeDeletions, which calls several inefficient functions. The inefficient functions called inside TopicDeletionManager.resumeDeletions include ReplicaStateMachine.areAllReplicasForTopicDeleted ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState ReplicaStateMachine.replicasInState Each of the 3 inefficient functions above will iterate through all the replicas in the cluster, and filter out the replicas belonging to a topic. In a large cluster with many replicas, these functions can be quite slow. Total deletion time for a topic becomes long in single threaded controller processing model: Since the controller needs to sequentially process the queued TopicDeletionStopReplicaResponseReceived events, if the time cost to process one event is t, the total time to process all events for all replicas of a topic is N * t. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6630) Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6630: -- Summary: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (was: Speed up the processing of StopReplicaResponse events on the controller) > Speed up the processing of TopicDeletionStopReplicaResponseReceived events on > the controller > > > Key: KAFKA-6630 > URL: https://issues.apache.org/jira/browse/KAFKA-6630 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > Problem Statement: > We find in a large cluster with many partition replicas, it takes a long time > to successfully delete a topic. > Root cause: > Further analysis shows that for a topic with N replicas, the controller > receives all the N StopReplicaResponses from brokers within a short time, > however sequentially handling all the N > TopicDeletionStopReplicaResponseReceived events one by one takes a long time. > Specifically the functions triggered while handling every single > TopicDeletionStopReplicaResponseReceived event include: > TopicDeletionStopReplicaResponseReceived.process calls > TopicDeletionManager.completeReplicaDeletion, which calls > TopicDeletionManager.resumeDeletions, which calls several inefficient > functions. > The inefficient functions called inside TopicDeletionManager.resumeDeletions > include > ReplicaStateMachine.areAllReplicasForTopicDeleted > ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState > ReplicaStateMachine.replicasInState > Each of the 3 inefficient functions above will iterate through all the > replicas in the cluster, and filter out the replicas belonging to a topic. In > a large cluster with many replicas, these functions can be quite slow. > Total deletion time for a topic becomes long in single threaded controller > processing model: > Since the controller needs to sequentially process the queued > TopicDeletionStopReplicaResponseReceived events, if the time cost to process > one event is t, the total time to process all events for all replicas of a > topic is N * t. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6630) Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392070#comment-16392070 ] ASF GitHub Bot commented on KAFKA-6630: --- gitlw opened a new pull request #4668: KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller URL: https://github.com/apache/kafka/pull/4668 This patch tries to speed up the inefficient functions identified in Kafka-6630 by grouping partitions in the ControllerContext.partitionReplicaAssignment variable by topics. Hence trying to find all replicas for a topic won't need to go through all the replicas in the cluster. Passed all tests using "gradle testAll" ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up the processing of TopicDeletionStopReplicaResponseReceived events on > the controller > > > Key: KAFKA-6630 > URL: https://issues.apache.org/jira/browse/KAFKA-6630 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > Problem Statement: > We find in a large cluster with many partition replicas, it takes a long time > to successfully delete a topic. > Root cause: > Further analysis shows that for a topic with N replicas, the controller > receives all the N StopReplicaResponses from brokers within a short time, > however sequentially handling all the N > TopicDeletionStopReplicaResponseReceived events one by one takes a long time. > Specifically the functions triggered while handling every single > TopicDeletionStopReplicaResponseReceived event include: > TopicDeletionStopReplicaResponseReceived.process calls > TopicDeletionManager.completeReplicaDeletion, which calls > TopicDeletionManager.resumeDeletions, which calls several inefficient > functions. > The inefficient functions called inside TopicDeletionManager.resumeDeletions > include > ReplicaStateMachine.areAllReplicasForTopicDeleted > ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState > ReplicaStateMachine.replicasInState > Each of the 3 inefficient functions above will iterate through all the > replicas in the cluster, and filter out the replicas belonging to a topic. In > a large cluster with many replicas, these functions can be quite slow. > Total deletion time for a topic becomes long in single threaded controller > processing model: > Since the controller needs to sequentially process the queued > TopicDeletionStopReplicaResponseReceived events, if the time cost to process > one event is t, the total time to process all events for all replicas of a > topic is N * t. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
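The data-structure change described in the PR, sketched below; the controller code is Scala, so this Java map keyed by topic is an illustrative simplification of grouping partitionReplicaAssignment by topic.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Index replica assignments by topic so per-topic queries touch only that
// topic's replicas instead of filtering every replica in the cluster.
public class ReplicasByTopic {
    private final Map<String, Set<Integer>> replicasByTopic = new HashMap<>();

    void addReplica(String topic, int brokerId) {
        replicasByTopic.computeIfAbsent(topic, t -> new HashSet<>()).add(brokerId);
    }

    Set<Integer> replicasForTopic(String topic) {
        // O(replicas of this topic) rather than O(all replicas in the cluster)
        return replicasByTopic.getOrDefault(topic, Collections.emptySet());
    }
}
{code}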
[jira] [Commented] (KAFKA-6400) Consider setting default cache size to zero in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392078#comment-16392078 ] Matthias J. Sax commented on KAFKA-6400: Yes. CACHE_MAX_BYTES_BUFFERING_CONFIG refers to the config. Note that some people were also confused that they did not see a single result for windowed aggregations when caching is disabled. Thus, it might also be "bad" to set the cache size back to zero by default. Before we make any change, we need to discuss the impact in detail. > Consider setting default cache size to zero in Kafka Streams > > > Key: KAFKA-6400 > URL: https://issues.apache.org/jira/browse/KAFKA-6400 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Minor > > Since the introduction of record caching in the Kafka Streams DSL, we see regular > reports/questions from first-time users about "Kafka Streams does not emit > anything" or "Kafka Streams loses messages". Those reports are caused by record > caching, not by bugs, and they indicate a bad user experience. > We might consider setting the default cache size to zero to avoid those > issues and improve the experience for first-time users. This holds especially > for simple word-count demos (note: many people don't copy our word-count > example but build their own first demo app). > Remark: before we had caching, many users got confused about our update > semantics and that we emit an output record for each input record for > windowed aggregations (i.e., "please give me the 'final' result"). Thus, we need > to consider this and judge with care so we don't go "back and forth" with the > default user experience -- we have had fewer questions about this behavior lately. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
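For reference, this is what the explicit opt-out looks like today: a minimal sketch assuming the 1.0 Streams API, with placeholder application id and bootstrap servers.
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DisableCacheExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // 0 bytes disables record caching: every update is forwarded downstream
        // immediately, at the cost of more (non-deduplicated) output records.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }
}
{code}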
[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392079#comment-16392079 ] Matthias J. Sax commented on KAFKA-6399: \cc [~vvcephei] [~bbejeck] [~guozhang] [~damianguy] What is your take on this? > Consider reducing "max.poll.interval.ms" default for Kafka Streams > -- > > Key: KAFKA-6399 > URL: https://issues.apache.org/jira/browse/KAFKA-6399 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Khaireddine Rezgui >Priority: Minor > > In Kafka {{0.10.2.1}} we changed the default value of > {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The > reason was that long state restore phases during rebalance could yield > "rebalance storms", as consumers dropped out of a consumer group even though > they were healthy, because they didn't call {{poll()}} during the state > restore phase. > In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, > and Kafka Streams now does call {{poll()}} even during the restore phase. > Therefore, we might consider setting a smaller timeout for > {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications > (i.e., targeting user code) that no longer make progress during regular > operations. > The open question is what a good default might be. Maybe the actual > consumer default of 30 seconds might be sufficient. During one {{poll()}} > roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a > single batch of records. This should take way less time than 30 seconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0
Alexander Ivanichev created KAFKA-6631: -- Summary: Kafka Streams - Rebalancing exception in Kafka 1.0.0 Key: KAFKA-6631 URL: https://issues.apache.org/jira/browse/KAFKA-6631 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Environment: Container Linux by CoreOS 1576.5.0 Reporter: Alexander Ivanichev In Kafka Streams 1.0.0, we saw a strange rebalance error. Our stream app performs window-based aggregations. Sometimes on start, when all stream workers join, the app just crashes; however, if we enable only one worker, it works fine. Sometimes 2 workers work just fine, but when a third joins, the app crashes again: some critical issue with rebalance. {code:java} 2018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request 2018-03-08T18:51:01.226557000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566) 2018-03-08T18:51:01.22686Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539) 2018-03-08T18:51:01.227328000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) 2018-03-08T18:51:01.22763Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) 2018-03-08T18:51:01.228152000Z at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 2018-03-08T18:51:01.228449000Z at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) 2018-03-08T18:51:01.228897000Z at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) 2018-03-08T18:51:01.229196000Z at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506) 2018-03-08T18:51:01.229673000Z at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) 2018-03-08T18:51:01.229971000Z at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268) 2018-03-08T18:51:01.230436000Z at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) 2018-03-08T18:51:01.230749000Z at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) 2018-03-08T18:51:01.231065000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) 2018-03-08T18:51:01.231584000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 2018-03-08T18:51:01.231911000Z at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) 2018-03-08T18:51:01.23219Z at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138) 2018-03-08T18:51:01.232643000Z at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) 2018-03-08T18:51:01.233121000Z at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) 2018-03-08T18:51:01.233409000Z at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) 
2018-03-08T18:51:01.23372Z at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) 2018-03-08T18:51:01.234196000Z at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) 2018-03-08T18:51:01.234655000Z org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The server experienced an unexpected error when processing the request 2018-03-08T18:51:01.234972000Z exception in thread, closing process 2018-03-08T18:51:01.23550Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566) 2018-03-08T18:51:01.235839000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539) 2018-03-08T18:51:01.236336000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) 2018-03-08T18:51:01.236603000Z at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) 2018-03-08T18:51:01.236889000Z at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) 2018
[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392105#comment-16392105 ] Guozhang Wang commented on KAFKA-6399: -- I'm +1 on changing this config's default from Integer.MAX_VALUE, but I'm on the fence about setting the default to 30 seconds. The reason is that in Streams, it is quite common to have stateful processing such that each record may take some time to process. I'd prefer setting the default to a larger value, say 5 minutes rather than 30 seconds. > Consider reducing "max.poll.interval.ms" default for Kafka Streams > -- > > Key: KAFKA-6399 > URL: https://issues.apache.org/jira/browse/KAFKA-6399 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Khaireddine Rezgui >Priority: Minor > > In Kafka {{0.10.2.1}} we changed the default value of > {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The > reason was that long state restore phases during rebalance could yield > "rebalance storms", as consumers dropped out of a consumer group even though > they were healthy, because they didn't call {{poll()}} during the state > restore phase. > In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, > and Kafka Streams now does call {{poll()}} even during the restore phase. > Therefore, we might consider setting a smaller timeout for > {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications > (i.e., targeting user code) that no longer make progress during regular > operations. > The open question is what a good default might be. Maybe the actual > consumer default of 30 seconds might be sufficient. During one {{poll()}} > roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a > single batch of records. This should take way less time than 30 seconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
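For illustration, overriding the default along the lines Guozhang suggests would look roughly like this; a minimal sketch assuming the 1.0 Streams and consumer client APIs, with placeholder application id and bootstrap servers.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollIntervalExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Replace the Streams override of Integer.MAX_VALUE with a 5-minute
        // bound, so a stuck poll loop is eventually detected by the group.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                  5 * 60 * 1000);
        return props;
    }
}
{code}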
[jira] [Commented] (KAFKA-6539) KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers alive
[ https://issues.apache.org/jira/browse/KAFKA-6539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392181#comment-16392181 ] Simon Trigona commented on KAFKA-6539: -- I don't currently have any technical details to add, but I experienced a similar issue with a producer endlessly trying to produce to a broker that was offline despite the other 2 brokers being online. > KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers > alive > --- > > Key: KAFKA-6539 > URL: https://issues.apache.org/jira/browse/KAFKA-6539 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 > Environment: Java 8 > Brokers on CentOS 7.4 > Consumers on Windows 10 >Reporter: Song Younghwan >Priority: Major > Labels: windows > Attachments: consumer.log > > > I am considering using Kafka at my company, so I am currently doing failover > testing. > Conditions: > * org.apache.kafka:kafka-clients:1.0.0 > * New consumer using bootstrap.servers, a consumer group and a group > coordinator > * num. brokers = 3 (id #1, #2, #3) > * Topic num. partitions = 3, replication factor = 3 > * offsets.topic.replication.factor = 3 > Reproduction Steps: > # Run consumers in the same consumer group, each of them subscribing to a topic > # Kill (kill -9) #1, #2 brokers simultaneously (only #3 online) > # Consumers eventually connect to #3 broker > # Start #1, #2 brokers again after a while (#1, #2, #3 online) > # Kill (kill -9) #2, #3 brokers simultaneously (only #1 online) > # *{color:#FF}Now consumers endlessly try to connect to #3 broker > only{color}* > # Start #2 broker again after a while (#1, #2 online) > # *{color:#FF}Consumers still blindly try to connect to #3 broker{color}* > Expectation: > Consumers successfully connect to #1 broker after step 5. > Record: > I attached a consumer log file at TRACE log level. Related events below: > * 12:03:13 kill #1, #2 brokers simultaneously > * 12:03:42 start #1, #2 brokers again > * 12:04:01 kill #2, #3 brokers simultaneously > * 12:04:42 start #2 broker again > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6627) Console producer default config values override explicitly provided properties
[ https://issues.apache.org/jira/browse/KAFKA-6627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392272#comment-16392272 ] huxihx commented on KAFKA-6627: --- Seems this is similar to what's reported in [KAFKA-2526|https://issues.apache.org/jira/browse/KAFKA-2526]. > Console producer default config values override explicitly provided properties > -- > > Key: KAFKA-6627 > URL: https://issues.apache.org/jira/browse/KAFKA-6627 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Jason Gustafson >Priority: Minor > Labels: newbie > > Some producer properties can be provided through custom parameters (e.g. > {{\-\-request-required-acks}}) and explicitly through > {{\-\-producer-property}}. At the moment, some of the custom parameters have > default values which actually override explicitly provided properties. For > example, if you set {{\-\-producer-property acks=all}} when starting the > console producer, the argument will be ignored since > {{\-\-request-required-acks}} has a default value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
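A minimal Java sketch of the precedence bug being described (purely illustrative: the "acks" property name is real, but the parsing order and the default value "1" are assumptions, not the console producer's actual code): when a flag's default is applied after the user's explicit --producer-property, the default silently wins.
{code:java}
import java.util.Properties;

public class FlagPrecedenceSketch {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        // 1. The user explicitly passes --producer-property acks=all.
        producerProps.put("acks", "all");
        // 2. Buggy order: the default of --request-required-acks (assume "1"
        //    here) is applied afterwards and clobbers the explicit setting.
        producerProps.put("acks", "1");
        System.out.println(producerProps.getProperty("acks")); // prints "1", not "all"
    }
}
{code}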
[jira] [Assigned] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup
[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin reassigned KAFKA-3978: --- Assignee: Dong Lin > Cannot truncate to a negative offset (-1) exception at broker startup > - > > Key: KAFKA-3978 > URL: https://issues.apache.org/jira/browse/KAFKA-3978 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.0.0 > Environment: 3.13.0-87-generic >Reporter: Juho Mäkinen >Assignee: Dong Lin >Priority: Critical > Labels: reliability, startup > > During the broker startup sequence, the broker's server.log has this exception. > The problem persists after multiple restarts and also on another broker in the > cluster. > {code} > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [Socket Server on Broker 1002], Started 1 acceptor threads > (kafka.network.SocketServer) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [ExpirationReaper-1002], Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Starting up. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [GroupCoordinator 1002]: Startup complete. > (kafka.coordinator.GroupCoordinator) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 > milliseconds. (kafka.coordinator.GroupMetadataManager) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Produce], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO [ThrottledRequestReaper-Fetch], Starting > (kafka.server.ClientQuotaManager$ThrottledRequestReaper) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Will not load MX4J, mx4j-tools.jar is not in the classpath > (kafka.utils.Mx4jLoader$) > INFO Creating /brokers/ids/1002 (is it secure? false) > (kafka.utils.ZKCheckedEphemeral) > INFO Creating /brokers/ids/1002 (is it secure? 
false) > (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: > PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils) > INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser) > INFO Kafka commitId : b8642491e78c5a13 > (org.apache.kafka.common.utils.AppInfoParser) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > INFO [Kafka Server 1002], started (kafka.server.KafkaServer) > Error when handling request > {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT > SNIPPED AWAY..], > live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]} > (kafka.server.KafkaApis) > ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative > offset (-1). > at kafka.log.Log.truncateTo(Log.scala:731) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288) > at > kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(Ha
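For context, a minimal sketch of the guard being tripped (an illustrative Java paraphrase of the contract, not the actual Scala implementation in kafka.log.Log): -1 is commonly used in Kafka as an "offset unknown" sentinel, so a truncation target of -1 reaching this check fails the request handling seen in the log above.
{code:java}
// Illustrative Java paraphrase of the failing guard; the real code lives in
// kafka.log.Log.truncateTo (Scala).
public class TruncateSketch {
    static void truncateTo(long targetOffset) {
        if (targetOffset < 0) {
            // This is the exception seen in the server.log above.
            throw new IllegalArgumentException(
                "Cannot truncate to a negative offset (" + targetOffset + ").");
        }
        // ... otherwise: drop log entries beyond targetOffset ...
    }

    public static void main(String[] args) {
        try {
            // -1 is a common "offset unknown" sentinel, e.g. an uninitialized
            // high watermark, which appears to be what reaches the guard here.
            truncateTo(-1L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // matches the logged error
        }
    }
}
{code}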
[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392410#comment-16392410 ] Ted Yu commented on KAFKA-6335: --- Sigh - the above link is no longer accessible. > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar >Priority: Major > Fix For: 1.2.0 > > > From > https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/ > : > {code} > java.lang.AssertionError: expected acls Set(User:36 has Allow permission for > operations: Read from hosts: *, User:7 has Allow permission for operations: > Read from hosts: *, User:21 has Allow permission for operations: Read from > hosts: *, User:39 has Allow permission for operations: Read from hosts: *, > User:43 has Allow permission for operations: Read from hosts: *, User:3 has > Allow permission for operations: Read from hosts: *, User:35 has Allow > permission for operations: Read from hosts: *, User:15 has Allow permission > for operations: Read from hosts: *, User:16 has Allow permission for > operations: Read from hosts: *, User:22 has Allow permission for operations: > Read from hosts: *, User:26 has Allow permission for operations: Read from > hosts: *, User:11 has Allow permission for operations: Read from hosts: *, > User:38 has Allow permission for operations: Read from hosts: *, User:8 has > Allow permission for operations: Read from hosts: *, User:28 has Allow > permission for operations: Read from hosts: *, User:32 has Allow permission > for operations: Read from hosts: *, User:25 has Allow permission for > operations: Read from hosts: *, User:41 has Allow permission for operations: > Read from hosts: *, User:44 has Allow permission for operations: Read from > hosts: *, User:48 has Allow permission for operations: Read from hosts: *, > User:2 has Allow permission for operations: Read from hosts: *, User:9 has > Allow permission for operations: Read from hosts: *, User:14 has Allow > permission for operations: Read from hosts: *, User:46 has Allow permission > for operations: Read from hosts: *, User:13 has Allow permission for > operations: Read from hosts: *, User:5 has Allow permission for operations: > Read from hosts: *, User:29 has Allow permission for operations: Read from > hosts: *, User:45 has Allow permission for operations: Read from hosts: *, > User:6 has Allow permission for operations: Read from hosts: *, User:37 has > Allow permission for operations: Read from hosts: *, User:23 has Allow > permission for operations: Read from hosts: *, User:19 has Allow permission > for operations: Read from hosts: *, User:24 has Allow permission for > operations: Read from hosts: *, User:17 has Allow permission for operations: > Read from hosts: *, User:34 has Allow permission for operations: Read from > hosts: *, User:12 has Allow permission for operations: Read from hosts: *, > User:42 has Allow permission for operations: Read from hosts: *, User:4 has > Allow permission for operations: Read from hosts: *, User:47 has Allow > permission for operations: Read from hosts: *, User:18 has Allow permission > for operations: Read from hosts: *, User:31 has Allow permission for > operations: Read from hosts: *, User:49 has Allow permission for operations: > Read 
from hosts: *, User:33 has Allow permission for operations: Read from > hosts: *, User:1 has Allow permission for operations: Read from hosts: *, > User:27 has Allow permission for operations: Read from hosts: *) but got > Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 > has Allow permission for operations: Read from hosts: *, User:21 has Allow > permission for operations: Read from hosts: *, User:39 has Allow permission > for operations: Read from hosts: *, User:43 has Allow permission for > operations: Read from hosts: *, User:3 has Allow permission for operations: > Read from hosts: *, User:35 has Allow permission for operations: Read from > hosts: *, User:15 has Allow permission for operations: Read from hosts: *, > User:16 has Allow permission for operations: Read from hosts: *, User:22 has > Allow permission for operations: Read from hosts: *, User:26 has Allow > permission for operations: Read from hosts: *, User:11 has Allow permission > for operations: Read from hosts: *, User:38 has Allow permission for > operations: Read from hosts: *, User:8 has Allow permission for operations: > Read from hosts: *, User
[jira] [Commented] (KAFKA-5946) Give connector method parameter better name
[ https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392411#comment-16392411 ] Ted Yu commented on KAFKA-5946: --- Thanks for taking it. > Give connector method parameter better name > --- > > Key: KAFKA-5946 > URL: https://issues.apache.org/jira/browse/KAFKA-5946 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Tanvi Jaywant >Priority: Major > Labels: connector, newbie > > During the development of KAFKA-5657, there were several iterations where the > method call didn't match what the connector parameter actually represents. > [~ewencp] had used connType as equivalent to connClass, because Type wasn't > used to differentiate source vs. sink. > [~ewencp] proposed the following: > {code} > It would help to convert all the uses of connType to connClass first, then > standardize on class == java class, type == source/sink, name == > user-specified name. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5943) Reduce dependency on mock in connector tests
[ https://issues.apache.org/jira/browse/KAFKA-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5943: -- Labels: connector (was: connector mock) > Reduce dependency on mock in connector tests > > > Key: KAFKA-5943 > URL: https://issues.apache.org/jira/browse/KAFKA-5943 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Priority: Minor > Labels: connector > > Currently connector tests make heavy use of mocks (EasyMock, PowerMock). > This may hide the real logic behind operations and make finding bugs > difficult. > We should reduce the use of mocks so that developers can debug connector code > using unit tests. > This would shorten the development cycle for connectors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392493#comment-16392493 ] John Roesler commented on KAFKA-6399: - I'm not sure, since I haven't had a lot of time so far to build up expectations, but here are a couple of thoughts... I'm generally a fan of exercising your expectations, thus if you think the loop should be faster than 30s, then I'd say to go ahead and set it. If it turns out to be wrong, we'll learn something new. The con to this viewpoint in this case is that potentially a lot of topologies are running with the default, and if 30s is too short, it could cause a lot of rebalancing. Then each affected person would have to investigate it and find out they need to set this config higher, and then tell us so we can adjust the default, so the OODA loop isn't very tight. Plus, the reason to set it lower is to catch runaway applications and attempt to recover. So it seems reasonable to ask on what time scale you would be happy to see a long-running application detect and recover from runaway code. I think in general 5 minutes of backup won't cause too many problems. So I guess I'm falling more into the 5-minute camp, since it seems to me that it's likely to still help the 80% for whom 5 minutes is fine, without risking a lot of shenanigans in case the poll loop takes a little longer than we expect. > Consider reducing "max.poll.interval.ms" default for Kafka Streams > -- > > Key: KAFKA-6399 > URL: https://issues.apache.org/jira/browse/KAFKA-6399 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Khaireddine Rezgui >Priority: Minor > > In Kafka {{0.10.2.1}} we changed the default value of > {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The > reason was that long state restore phases during rebalance could yield > "rebalance storms", as consumers dropped out of a consumer group even though > they were healthy, because they didn't call {{poll()}} during the state > restore phase. > In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, > and Kafka Streams now does call {{poll()}} even during the restore phase. > Therefore, we might consider setting a smaller timeout for > {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications > (i.e., targeting user code) that no longer make progress during regular > operations. > The open question is what a good default might be. Maybe the actual > consumer default of 30 seconds might be sufficient. During one {{poll()}} > roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a > single batch of records. This should take way less time than 30 seconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khaireddine Rezgui reassigned KAFKA-6399: - Assignee: (was: Khaireddine Rezgui) > Consider reducing "max.poll.interval.ms" default for Kafka Streams > -- > > Key: KAFKA-6399 > URL: https://issues.apache.org/jira/browse/KAFKA-6399 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Minor > > In Kafka {{0.10.2.1}} we changed the default value of > {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The > reason was that long state restore phases during rebalance could yield > "rebalance storms", as consumers dropped out of a consumer group even though > they were healthy, because they didn't call {{poll()}} during the state > restore phase. > In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, > and Kafka Streams now does call {{poll()}} even during the restore phase. > Therefore, we might consider setting a smaller timeout for > {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications > (i.e., targeting user code) that no longer make progress during regular > operations. > The open question is what a good default might be. Maybe the actual > consumer default of 30 seconds might be sufficient. During one {{poll()}} > roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a > single batch of records. This should take way less time than 30 seconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)