[jira] [Updated] (KAFKA-6625) kafka offset reset when I upgrade kafka from 0.11.0 to 1.0.0 version

2018-03-08 Thread barry010 (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

barry010 updated KAFKA-6625:

Description: 
While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
signal, updated the code, and started the new broker01. During this window I 
found that the consumption rate of one of my consumer groups, "flume.mpns", 
rose to 10M/s.

From the __consumer_offsets-63 partition I found that most of the partition's 
offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This 
timestamp falls between shutting down broker01 and starting the new broker01.

I have not set auto.offset.reset, and this topic has only one consumer group, 
consumed by Flume.

I have attached three logs: the consumer log, the coordinator log, and one 
__consumer_offsets dump.

 

  was:
While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
signal, updated the code, and started the new broker01. During this window I 
found that the consumption rate of one of my consumer groups rose to 10M/s.

From the __consumer_offsets-63 partition I found that most of the partition's 
offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This 
timestamp falls between shutting down broker01 and starting the new broker01.

I have not set auto.offset.reset, and this topic has only one consumer group, 
consumed by Flume.

I have attached three logs: the consumer log, the coordinator log, and one 
__consumer_offsets dump.

 


> kafka offset reset when I upgrade kafka from 0.11.0 to 1.0.0 version
> ---------------------------------------------------------------------
>
> Key: KAFKA-6625
> URL: https://issues.apache.org/jira/browse/KAFKA-6625
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: flume 1.8 with Kafka client version : 0.9.0.1
> Kafka version : 0.11.0
>Reporter: barry010
>Priority: Critical
> Attachments: consumer_offset-36.txt, flume.log.txt, server.log.txt
>
>
> While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
> signal, updated the code, and started the new broker01. During this window I 
> found that the consumption rate of one of my consumer groups, "flume.mpns", 
> rose to 10M/s.
> From the __consumer_offsets-63 partition I found that most of the partition's 
> offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This 
> timestamp falls between shutting down broker01 and starting the new broker01.
> I have not set auto.offset.reset, and this topic has only one consumer group, 
> consumed by Flume.
> I have attached three logs: the consumer log, the coordinator log, and one 
> __consumer_offsets dump.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords

2018-03-08 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciej Bryński updated KAFKA-6626:
--
Description: 
Kafka Connect is using IdentityHashMap for storing records.

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]

Unfortunately this solution is very slow (2-4 times slower than normal HashMap 
/ HashSet).

Benchmark result (code in attachment). 
{code:java}
Identity 4220
Set 2115
Map 1941
Fast Set 2121
{code}
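For reference, a minimal harness of the same shape might look like the sketch 
below (this is not the attached MapPerf.java; the class name, key count, and 
Integer key type are illustrative only):
{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class MapPerfSketch {
    // Insert n distinct keys and report elapsed milliseconds.
    static long timePuts(Map<Integer, Boolean> map, int n) {
        final long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            map.put(i, Boolean.TRUE); // boxed keys above 127 are distinct instances
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        final int n = 1_000_000;
        System.out.println("Identity " + timePuts(new IdentityHashMap<>(), n));
        System.out.println("Map      " + timePuts(new HashMap<>(), n));
    }
}
{code}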
Things are even worse when using default GC configuration 
(-server -XX:+UseG1GC -XX:MaxGCPauseMillis=100 
-XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
{code:java}
Identity 7885
Set 2364
Map 1548
Fast Set 1520
{code}
Java version
{code:java}
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
{code}
This problem is greatly slowing Kafka Connect.

!image-2018-03-08-08-35-19-247.png!

 

  was:
Kafka Connect is using IdentityHashMap for storing records.

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]

Unfortunately this solution is very slow (2 times slower than normal HashMap / 
HashSet).

Benchmark result (code in attachment).
{code:java}
Identity 4220
Set 2115
Map 1941
Fast Set 2121
{code}
This problem is greatly slowing Kafka Connect.

!image-2018-03-08-08-35-19-247.png!

 


> Performance bottleneck in Kafka Connect sendRecords
> ---
>
> Key: KAFKA-6626
> URL: https://issues.apache.org/jira/browse/KAFKA-6626
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
> Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png
>
>
> Kafka Connect is using IdentityHashMap for storing records.
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]
> Unfortunately this solution is very slow (2-4 times slower than normal 
> HashMap / HashSet).
> Benchmark result (code in attachment). 
> {code:java}
> Identity 4220
> Set 2115
> Map 1941
> Fast Set 2121
> {code}
> Things are even worse when using default GC configuration 
> (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=100 
> -XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
> {code:java}
> Identity 7885
> Set 2364
> Map 1548
> Fast Set 1520
> {code}
> Java version
> {code:java}
> java version "1.8.0_152"
> Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
> {code}
> This problem is greatly slowing Kafka Connect.
> !image-2018-03-08-08-35-19-247.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6195) DNS alias support for secured connections

2018-03-08 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390947#comment-16390947
 ] 

Rajini Sivaram commented on KAFKA-6195:
---

In general, it is good to avoid configuration options if we can determine the 
behaviour automatically. But in this case, my preference is for a config option 
because a DNS lookup can add delays in some environments. It will be good to 
see if there are other opinions.

SSL certificates can contain hostnames that are wildcarded, making it easy to 
add a single certificate for the cluster. If you are using IP addresses instead 
of hostnames, I believe you need to specify the full IP address, but you can 
have multiple addresses (or hostnames) in a single certificate.

 

> DNS alias support for secured connections
> -
>
> Key: KAFKA-6195
> URL: https://issues.apache.org/jira/browse/KAFKA-6195
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jonathan Skrzypek
>Priority: Major
>
> It seems clients can't use a DNS alias in front of a secured Kafka cluster,
> so applications can only specify a list of hosts or IPs in bootstrap.servers 
> instead of an alias encompassing all cluster nodes.
> Using an alias in bootstrap.servers results in the following error: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Fail to create credential. (63) - No service creds)]) 
> occurred when evaluating SASL token received from the Kafka Broker. Kafka 
> Client will go to AUTH_FAILED state. [Caused by 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Fail to create 
> credential. (63) - No service creds)]]
> When using SASL/Kerberos authentication, the kafka server principal is of the 
> form kafka@kafka/broker1.hostname@example.com
> Kerberos requires that the hosts can be resolved by their FQDNs.
> During the SASL handshake, the client will create a SASL token and then send 
> it to Kafka for auth. But to create a SASL token, the client first needs to 
> be able to validate that the broker's Kerberos principal is a valid one.
> There are 3 potential options:
> 1. Create a single Kerberos principal linked not to a host but to an alias, 
> and reference it in the broker JAAS file. But I think the Kerberos 
> infrastructure would refuse to validate it, so the SASL handshake would 
> still fail.
> 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers 
> contains a DNS alias. If it does, resolve and expand the alias to retrieve 
> all hostnames behind it and add them to the list of nodes. This could be 
> done by modifying parseAndValidateAddresses() in ClientUtils (see the sketch 
> below).
> 3. Add a cluster.alias parameter that would be handled by the logic above. 
> Having a separate parameter avoids confusion about how bootstrap.servers 
> works behind the scenes.
> Thoughts? I would be happy to contribute the change for any of the options.
> I believe the ability to use a DNS alias instead of static lists of brokers 
> would bring good deployment flexibility.
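A minimal sketch of option 2, shown outside of ClientUtils for clarity. The 
class name and the alias/port values are hypothetical; it assumes reverse DNS 
is configured so that getCanonicalHostName() returns the FQDNs Kerberos needs:
{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class BootstrapAliasSketch {
    // Expand a DNS alias into one host:port entry per address behind it.
    static List<String> expand(final String alias, final int port) throws UnknownHostException {
        final List<String> endpoints = new ArrayList<>();
        for (final InetAddress addr : InetAddress.getAllByName(alias)) {
            // Falls back to the literal IP if reverse DNS is not configured.
            endpoints.add(addr.getCanonicalHostName() + ":" + port);
        }
        return endpoints;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(expand("kafka.example.com", 9092)); // hypothetical alias
    }
}
{code}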



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6625) kafka offset reset when I upgrade kafka from 0.11.0 to 1.0.0 version

2018-03-08 Thread barry010 (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

barry010 updated KAFKA-6625:

Description: 
While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
signal, updated the code, and started the new broker01. During this window I 
found that the consumption rate of one of my consumer groups, "flume.mpns", 
had risen to 10M/s.

From the __consumer_offsets-63 partition I found that most of the partition's 
offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This 
timestamp falls between shutting down broker01 and starting the new broker01.

I have not set auto.offset.reset, and this topic has only one consumer group, 
consumed by Flume.

I have attached three logs: the consumer log, the coordinator log, and one 
__consumer_offsets dump.

 

  was:
While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
signal, updated the code, and started the new broker01. During this window I 
found that the consumption rate of one of my consumer groups, "flume.mpns", 
rose to 10M/s.

From the __consumer_offsets-63 partition I found that most of the partition's 
offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This 
timestamp falls between shutting down broker01 and starting the new broker01.

I have not set auto.offset.reset, and this topic has only one consumer group, 
consumed by Flume.

I have attached three logs: the consumer log, the coordinator log, and one 
__consumer_offsets dump.

 


> kafka offset reset when I upgrade kafka from 0.11.0 to 1.0.0 version
> ---------------------------------------------------------------------
>
> Key: KAFKA-6625
> URL: https://issues.apache.org/jira/browse/KAFKA-6625
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: flume 1.8 with Kafka client version : 0.9.0.1
> Kafka version : 0.11.0
>Reporter: barry010
>Priority: Critical
> Attachments: consumer_offset-36.txt, flume.log.txt, server.log.txt
>
>
> While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
> signal, updated the code, and started the new broker01. During this window I 
> found that the consumption rate of one of my consumer groups, "flume.mpns", 
> had risen to 10M/s.
> From the __consumer_offsets-63 partition I found that most of the partition's 
> offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). This 
> timestamp falls between shutting down broker01 and starting the new broker01.
> I have not set auto.offset.reset, and this topic has only one consumer group, 
> consumed by Flume.
> I have attached three logs: the consumer log, the coordinator log, and one 
> __consumer_offsets dump.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391063#comment-16391063
 ] 

ASF GitHub Bot commented on KAFKA-6560:
---

dguy opened a new pull request #4665: KAFKA-6560: [FOLLOW-UP] don't deserialize 
null byte array in window store fetch
URL: https://github.com/apache/kafka/pull/4665
 
 
   If a fetch from a Window Store returns a null byte array, we should return 
null rather than passing it to the serde to deserialize.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use single-point queries than range queries for windowed aggregation operators
> --
>
> Key: KAFKA-6560
> URL: https://issues.apache.org/jira/browse/KAFKA-6560
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Today, for windowed aggregations in the Streams DSL, the underlying 
> implementation leverages the fetch(key, from, to) API to get all the related 
> windows for a single record to update. However, this is a very inefficient 
> operation, spending a significant amount of CPU time iterating over window 
> stores. On the other hand, since the operator implementation itself has full 
> knowledge of the window specs, it can translate this operation into multiple 
> single-point queries with the accurate window start timestamps, which would 
> largely reduce the overhead.
> The proposed approach is to add a single-point fetch API to the WindowedStore 
> and use it in the KStreamWindowedAggregate / KStreamWindowedReduce operators.
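As an illustration of the proposed translation, here is a minimal sketch (not 
the actual Streams code) that computes the start timestamps of the hopping 
windows containing a record's timestamp; each start could then be read with a 
single-point fetch(key, start) instead of one fetch(key, from, to) range scan. 
The window size and advance values are made up:
{code:java}
import java.util.ArrayList;
import java.util.List;

public class WindowStartsSketch {
    // Start timestamps of all hopping windows [start, start + size) that contain ts.
    static List<Long> windowStartsFor(final long ts, final long size, final long advance) {
        final List<Long> starts = new ArrayList<>();
        long start = ts - (ts % advance); // newest window containing ts
        while (start > ts - size) {
            if (start >= 0) {
                starts.add(start);
            }
            start -= advance;
        }
        return starts;
    }

    public static void main(String[] args) {
        // size=1000, advance=500: a record at ts=1600 falls into the windows
        // starting at 1500 and 1000.
        System.out.println(windowStartsFor(1600L, 1000L, 500L)); // [1500, 1000]
    }
}
{code}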



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3530) Making the broker-list option consistent across all tools

2018-03-08 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391260#comment-16391260
 ] 

Chia-Ping Tsai commented on KAFKA-3530:
---

Any updates? Many people are confused by the difference between broker-list 
(in the producer) and bootstrap-server (in the consumer).

> Making the broker-list option consistent across all tools
> -
>
> Key: KAFKA-3530
> URL: https://issues.apache.org/jira/browse/KAFKA-3530
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Jun Rao
>Assignee: Liquan Pei
>Priority: Major
>
> Currently, console-producer uses --broker-list and console-consumer uses 
> --bootstrap-server. This can be confusing to the users. We should standardize 
> the name on all tools using broker list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6598) Kafka to support using ETCD beside Zookeeper

2018-03-08 Thread Tom Bentley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391312#comment-16391312
 ] 

Tom Bentley commented on KAFKA-6598:


[~cmccabe] any more info about that (such as when the KIP might be published)?

> Kafka to support using ETCD beside Zookeeper
> --------------------------------------------
>
> Key: KAFKA-6598
> URL: https://issues.apache.org/jira/browse/KAFKA-6598
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Sebastian Toader
>Priority: Major
>
> The current Kafka implementation is bound to {{Zookeeper}} to store its 
> metadata for forming a cluster of nodes (producer/consumer/broker). 
> As Kafka is becoming popular for streaming in various environments where 
> {{Zookeeper}} is either not easy to deploy/manage or there are better 
> alternatives to it, there is a need to run Kafka with a metastore 
> implementation other than {{Zookeeper}}.
> {{etcd}} can provide the same semantics as {{Zookeeper}} for Kafka, and since 
> {{etcd}} is the favorable choice in certain environments (e.g. Kubernetes), 
> Kafka should be able to run with {{etcd}}.
> From the user's point of view, it should be straightforward to configure 
> Kafka to use {{etcd}} by simply specifying a connection string that points 
> to the {{etcd}} cluster.
> To avoid introducing instability, the original interfaces should be kept and 
> only the low-level {{Zookeeper}} API calls should be replaced with {{etcd}} 
> API calls in case Kafka is configured to use {{etcd}}.
> In the long run (which is out of scope of this jira), there should be an 
> abstraction layer in Kafka which various metastore implementations would 
> then implement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-08 Thread Igor Calabria (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391479#comment-16391479
 ] 

Igor Calabria commented on KAFKA-6603:
--

Yes, I was accessing the store directly and replacing the iterator with 
individual fetch calls really improved performance. 

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store (RocksDB). The pipeline works fine, except that the off-heap memory 
> usage is much higher than expected. Following the 
> [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
> has some effect (memory usage is reduced), but the values don't match at 
> all. The Java process is set to run with just `-Xmx300m -Xms300m` and the 
> RocksDB config looks like this
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); // 1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each (it's a windowed 
> store), off-heap memory usage should be about 76MB. What I'm seeing in 
> production is upwards of 300MB; even taking into consideration the extra 
> overhead from RocksDB compaction threads, this seems a bit high (especially 
> when the disk usage for all files is just 1GB).
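For the record, plugging the reported configuration into that formula does 
work out to the expected figure:
{noformat}
(1048576 + 8192 * 2) * 3 segments * 25 partitions
= 1064960 * 75
= 79872000 bytes ~ 76 MB
{noformat}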



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391509#comment-16391509
 ] 

Guozhang Wang commented on KAFKA-6603:
--

Awesome, thanks for confirming Igor.

In the meantime, we are still benchmarking and investigating RocksDB for 
possible optimizations. Stay tuned.

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store (RocksDB). The pipeline works fine, except that the off-heap memory 
> usage is much higher than expected. Following the 
> [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
> has some effect (memory usage is reduced), but the values don't match at 
> all. The Java process is set to run with just `-Xmx300m -Xms300m` and the 
> RocksDB config looks like this
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); // 1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each (it's a windowed 
> store), off-heap memory usage should be about 76MB. What I'm seeing in 
> production is upwards of 300MB; even taking into consideration the extra 
> overhead from RocksDB compaction threads, this seems a bit high (especially 
> when the disk usage for all files is just 1GB).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6627) Console producer default config values override explicitly provided properties

2018-03-08 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6627:
--

 Summary: Console producer default config values override 
explicitly provided properties
 Key: KAFKA-6627
 URL: https://issues.apache.org/jira/browse/KAFKA-6627
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Jason Gustafson


Some producer properties can be provided through custom parameters (e.g. 
{{\-\-request-required-acks}}) and explicitly through 
{{\-\-producer-property}}. At the moment, some of the custom parameters have 
default values which actually override explicitly provided properties. For 
example, if you set {{\-\-producer-property acks=all}} when starting the 
console producer, the argument will be ignored since 
{{\-\-request-required-acks}} has a default value. 
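The failure mode reduces to a plain property-ordering bug; a minimal sketch of 
the pattern follows (the default value "1" is assumed here for illustration 
and is not taken from the tool's source):
{code:java}
import java.util.Properties;

public class DefaultOverridesExplicitSketch {
    public static void main(String[] args) {
        final Properties producerProps = new Properties();

        // What the user passed via --producer-property acks=all:
        producerProps.put("acks", "all");

        // Buggy pattern: a parameter that always carries a default value is
        // applied unconditionally afterwards, clobbering the explicit setting.
        final String requestRequiredAcksDefault = "1"; // assumed default, for illustration
        producerProps.put("acks", requestRequiredAcksDefault);

        System.out.println(producerProps.getProperty("acks")); // prints "1", not "all"
    }
}
{code}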



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords

2018-03-08 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciej Bryński updated KAFKA-6626:
--
Description: 
Kafka Connect is using IdentityHashMap for storing records.

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]

Unfortunately this solution is very slow (2-4 times slower than normal HashMap 
/ HashSet).

Benchmark result (code in attachment). 
{code:java}
Identity 4220
Set 2115
Map 1941
Fast Set 2121
{code}
Things are even worse when using default GC configuration 
 (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
{code:java}
Identity 7885
Set 2364
Map 1548
Fast Set 1520
{code}
Java version
{code:java}
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
{code}
This problem is greatly slowing Kafka Connect.

!image-2018-03-08-08-35-19-247.png!

 

  was:
Kafka Connect is using IdentityHashMap for storing records.

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]

Unfortunately this solution is very slow (2-4 times slower than normal HashMap 
/ HashSet).

Benchmark result (code in attachment). 
{code:java}
Identity 4220
Set 2115
Map 1941
Fast Set 2121
{code}
Things are even worse when using default GC configuration 
(-server -XX:+UseG1GC -XX:MaxGCPauseMillis=100 
-XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
{code:java}
Identity 7885
Set 2364
Map 1548
Fast Set 1520
{code}
Java version
{code:java}
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
{code}
This problem is greatly slowing Kafka Connect.

!image-2018-03-08-08-35-19-247.png!

 


> Performance bottleneck in Kafka Connect sendRecords
> ---
>
> Key: KAFKA-6626
> URL: https://issues.apache.org/jira/browse/KAFKA-6626
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
> Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png
>
>
> Kafka Connect is using IdentityHashMap for storing records.
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]
> Unfortunately this solution is very slow (2-4 times slower than normal 
> HashMap / HashSet).
> Benchmark result (code in attachment). 
> {code:java}
> Identity 4220
> Set 2115
> Map 1941
> Fast Set 2121
> {code}
> Things are even worse when using default GC configuration 
>  (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
> {code:java}
> Identity 7885
> Set 2364
> Map 1548
> Fast Set 1520
> {code}
> Java version
> {code:java}
> java version "1.8.0_152"
> Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
> {code}
> This problem is greatly slowing Kafka Connect.
> !image-2018-03-08-08-35-19-247.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-08 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391698#comment-16391698
 ] 

Dong Lin commented on KAFKA-3978:
-

I also encountered this issue and have been looking at it all day yesterday. 
Still no clue how the leader can send a FetchResponse with error=None and 
hw=-1.

> Cannot truncate to a negative offset (-1) exception at broker startup
> -
>
> Key: KAFKA-3978
> URL: https://issues.apache.org/jira/browse/KAFKA-3978
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: 3.13.0-87-generic 
>Reporter: Juho Mäkinen
>Priority: Critical
>  Labels: reliability, startup
>
> During broker startup sequence the broker server.log has this exception. 
> Problem persists after multiple restarts and also on another broker in the 
> cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT 
> SNIPPED AWAY..], 
> live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]}
>  (kafka.server.KafkaApis)
> ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative 
> offset (-1).
> at kafka.log.Log.truncateTo(Log.scala:731)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>

[jira] [Commented] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-03-08 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391741#comment-16391741
 ] 

John Roesler commented on KAFKA-6473:
-

I have created 
[KIP-267|https://cwiki.apache.org/confluence/display/KAFKA/KIP-267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils]
 to propose a solution to this.

> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
>
> With KIP-247, we added the public test-utils artifact with a 
> TopologyTestDriver class. Using the test driver for a single 
> Processor/Transformer/ValueTransformer requires specifying a whole topology 
> with a source and a sink, plus the Processor/Transformer/ValueTransformer in 
> between.
> For unit testing, it might be more convenient to have a MockProcessorContext 
> that can be used to test the Processor/Transformer/ValueTransformer in 
> isolation. I.e., the test itself creates a new 
> Processor/Transformer/ValueTransformer object and calls init() manually, 
> passing in the MockProcessorContext.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
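To make the intent concrete, a sketch of the kind of test this would enable, 
assuming the mock eventually lands roughly as proposed in the KIP (the 
MockProcessorContext class and its forwarded() capture API are the proposal's, 
not a released API; the processor under test is hypothetical):
{code:java}
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class UppercaseProcessorTest {
    public static void main(String[] args) {
        // Hypothetical processor under test.
        final Processor<String, String> processor = new Processor<String, String>() {
            private ProcessorContext context;
            @Override public void init(final ProcessorContext context) { this.context = context; }
            @Override public void process(final String key, final String value) {
                context.forward(key, value.toUpperCase());
            }
            @Override public void close() {}
        };

        final MockProcessorContext context = new MockProcessorContext();
        processor.init(context); // no topology, no test driver needed
        processor.process("k", "v");
        System.out.println(context.forwarded().size()); // 1 captured forward
    }
}
{code}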



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords

2018-03-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391818#comment-16391818
 ] 

Ted Yu commented on KAFKA-6626:
---

There are alternatives in https://github.com/carrotsearch/hppc and 
http://leventov.github.io/Koloboke/api/1.0/java8/index.html, which provide 
object-to-object maps.

They're Apache 2 licensed.
I wonder if we can utilize any of those.
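Usage-wise, the hppc variant would be close to a drop-in, e.g. (a sketch 
assuming the com.carrotsearch.hppc artifact is on the classpath; note that 
hppc compares keys by equals(), not by identity, which matters for the 
identity-semantics question raised in a later comment):
{code:java}
import com.carrotsearch.hppc.ObjectObjectHashMap;

public class HppcMapSketch {
    public static void main(String[] args) {
        final ObjectObjectHashMap<String, Integer> map = new ObjectObjectHashMap<>();
        map.put("record-1", 42);
        System.out.println(map.get("record-1")); // 42; get() returns null when absent
    }
}
{code}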

> Performance bottleneck in Kafka Connect sendRecords
> ---
>
> Key: KAFKA-6626
> URL: https://issues.apache.org/jira/browse/KAFKA-6626
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
> Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png
>
>
> Kafka Connect is using IdentityHashMap for storing records.
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]
> Unfortunately this solution is very slow (2-4 times slower than normal 
> HashMap / HashSet).
> Benchmark result (code in attachment). 
> {code:java}
> Identity 4220
> Set 2115
> Map 1941
> Fast Set 2121
> {code}
> Things are even worse when using default GC configuration 
>  (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
> {code:java}
> Identity 7885
> Set 2364
> Map 1548
> Fast Set 1520
> {code}
> Java version
> {code:java}
> java version "1.8.0_152"
> Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
> {code}
> This problem is greatly slowing Kafka Connect.
> !image-2018-03-08-08-35-19-247.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords

2018-03-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391839#comment-16391839
 ] 

Maciej Bryński commented on KAFKA-6626:
---

I think even a standard Java HashMap could be used.

The only thing I wonder is whether we need IdentityHashMap at all (comparing 
records by instance rather than by equals).

[~ewencp] ?
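The semantic difference in question, as a self-contained sketch:
{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityVsEqualsSketch {
    public static void main(String[] args) {
        // Two distinct instances that are equal by equals()/hashCode().
        final String a = new String("key");
        final String b = new String("key");

        final Map<String, Integer> byEquals = new HashMap<>();
        byEquals.put(a, 1);
        byEquals.put(b, 2); // overwrites: a.equals(b)
        System.out.println(byEquals.size()); // 1

        final Map<String, Integer> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, 1);
        byIdentity.put(b, 2); // kept separate: a != b
        System.out.println(byIdentity.size()); // 2
    }
}
{code}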

> Performance bottleneck in Kafka Connect sendRecords
> ---
>
> Key: KAFKA-6626
> URL: https://issues.apache.org/jira/browse/KAFKA-6626
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
> Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png
>
>
> Kafka Connect is using IdentityHashMap for storing records.
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]
> Unfortunately this solution is very slow (2-4 times slower than normal 
> HashMap / HashSet).
> Benchmark result (code in attachment). 
> {code:java}
> Identity 4220
> Set 2115
> Map 1941
> Fast Set 2121
> {code}
> Things are even worse when using default GC configuration 
>  (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
> {code:java}
> Identity 7885
> Set 2364
> Map 1548
> Fast Set 1520
> {code}
> Java version
> {code:java}
> java version "1.8.0_152"
> Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
> {code}
> This problem is greatly slowing Kafka Connect.
> !image-2018-03-08-08-35-19-247.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-08 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391863#comment-16391863
 ] 

Dong Lin commented on KAFKA-3978:
-

[~junrao] [~hachikuji] [~ijuma] I have a good theory now. I will submit a patch 
to fix it.

- Broker A is the leader for partition P1, and broker B is a follower for P1.

- Broker C receives a LeaderAndIsrRequest to become a follower for P1. In 
broker C's memory, P1's highWatermarkMetadata = LogOffsetMetadata(hw=0, 
segmentBaseOffset=-1). The local replica's LEO for P1 is 0.

- Broker C's replica fetcher sends a leader epoch request to broker A and then 
truncates its local replica's LEO to 100.

- Broker C receives a LeaderAndIsrRequest to become the leader for P1 with 
ISR=(A,B,C). In broker C's memory, P1's highWatermarkMetadata = 
LogOffsetMetadata(hw=0, segmentBaseOffset=-1), and according to 
partition.makeLeader(), replica A's logEndOffsetMetadata is initialized to 
LogOffsetMetadata.UnknownOffsetMetadata, which has hw=-1 and 
segmentBaseOffset=0.

- Broker C receives a FetchRequest from broker B. In 
Partition.maybeIncrementLeaderHW(), the new high watermark will be derived as 
min(logEndOffset metadata of all replicas), which will be 
LogOffsetMetadata(hw=-1, segmentBaseOffset=0) because A's logEndOffset metadata 
is the smallest. Because the new hw's segmentBaseOffset > the old hw's 
segmentBaseOffset, the high watermark is updated to LogOffsetMetadata(hw=-1, 
segmentBaseOffset=100L).

- Now we have a partition whose HW is negative, which causes problems for 
broker B when it fetches from broker C.
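The min() step above can be illustrated with a toy model (these are stand-in 
types, not Kafka's actual LogOffsetMetadata/Partition classes):
{code:java}
import java.util.Comparator;
import java.util.List;

public class NegativeHwSketch {
    // Simplified stand-in for the leader's per-replica log end offset metadata.
    record OffsetMeta(long messageOffset, long segmentBaseOffset) {}

    public static void main(String[] args) {
        // Leader C's view right after becoming leader: replica A is still at
        // the "unknown" sentinel because it has never fetched.
        final OffsetMeta unknown = new OffsetMeta(-1L, 0L);
        final List<OffsetMeta> logEndOffsets = List.of(
                new OffsetMeta(100L, 100L), // local replica C
                new OffsetMeta(100L, 100L), // follower B
                unknown);                   // follower A, never fetched

        // min over all replicas' offsets, as the HW update conceptually does:
        final OffsetMeta newHw = logEndOffsets.stream()
                .min(Comparator.comparingLong(OffsetMeta::messageOffset))
                .orElseThrow();
        System.out.println("new HW = " + newHw.messageOffset()); // -1: negative high watermark
    }
}
{code}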

> Cannot truncate to a negative offset (-1) exception at broker startup
> -
>
> Key: KAFKA-3978
> URL: https://issues.apache.org/jira/browse/KAFKA-3978
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: 3.13.0-87-generic 
>Reporter: Juho Mäkinen
>Priority: Critical
>  Labels: reliability, startup
>
> During broker startup sequence the broker server.log has this exception. 
> Problem persists after multiple restarts and also on another broker in the 
> cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKChecked

[jira] [Commented] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391906#comment-16391906
 ] 

ASF GitHub Bot commented on KAFKA-6612:
---

gitlw closed pull request #4649: KAFKA-6612: Added logic to prevent increasing 
partition counts during topic deletion
URL: https://github.com/apache/kafka/pull/4649
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Added logic to prevent increasing partition counts during topic deletion
> -------------------------------------------------------------------------
>
> Key: KAFKA-6612
> URL: https://issues.apache.org/jira/browse/KAFKA-6612
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> Problem: trying to increase the partition count of a topic while the topic 
> deletion is in progress can cause the topic to never be deleted.
> In the current code base, if a topic deletion is still in progress and the 
> partition count is increased, the new partition and its replica assignment 
> will be created on zookeeper as data of the path /brokers/topics/.
> Upon detecting the change, the controller sees the topic is being deleted, 
> and therefore ignores the partition change. Therefore the zk path 
> /brokers/topics//partitions/ will NOT be created.
> If a controller switch happens next, the added partition will be detected by 
> the new controller and stored in 
> controllerContext.partitionReplicaAssignment. The new controller then tries 
> to delete the topic by first transitioning its replicas to OfflineReplica. 
> However, the transition to the OfflineReplica state will NOT succeed since 
> there is no leader for the partition. Since the only state change path for a 
> replica to be successfully deleted is OfflineReplica -> 
> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter 
> the OfflineReplica state means the replica can never be successfully deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391913#comment-16391913
 ] 

ASF GitHub Bot commented on KAFKA-6612:
---

gitlw opened a new pull request #4666: KAFKA-6612: Added logic to prevent 
increasing partition counts during topic deletion
URL: https://github.com/apache/kafka/pull/4666
 
 
   This patch adds logic to the handling of the PartitionModifications event, 
so that if the partition count is increased while a topic deletion is still in 
progress, the controller will restore the data of the path 
/brokers/topics/"topic" to remove the added partitions.
   
   Testing done:
   Added a new test method to cover the bug
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Added logic to prevent increasing partition counts during topic deletion
> -------------------------------------------------------------------------
>
> Key: KAFKA-6612
> URL: https://issues.apache.org/jira/browse/KAFKA-6612
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> Problem: trying to increase the partition count of a topic while the topic 
> deletion is in progress can cause the topic to never be deleted.
> In the current code base, if a topic deletion is still in progress and the 
> partition count is increased, the new partition and its replica assignment 
> will be created on zookeeper as data of the path /brokers/topics/.
> Upon detecting the change, the controller sees the topic is being deleted, 
> and therefore ignores the partition change. Therefore the zk path 
> /brokers/topics//partitions/ will NOT be created.
> If a controller switch happens next, the added partition will be detected by 
> the new controller and stored in 
> controllerContext.partitionReplicaAssignment. The new controller then tries 
> to delete the topic by first transitioning its replicas to OfflineReplica. 
> However, the transition to the OfflineReplica state will NOT succeed since 
> there is no leader for the partition. Since the only state change path for a 
> replica to be successfully deleted is OfflineReplica -> 
> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter 
> the OfflineReplica state means the replica can never be successfully deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391932#comment-16391932
 ] 

ASF GitHub Bot commented on KAFKA-6560:
---

guozhangwang closed pull request #4665: KAFKA-6560: [FOLLOW-UP] don't 
deserialize null byte array in window store fetch
URL: https://github.com/apache/kafka/pull/4665
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 15961e7c721..b131db59973 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -106,15 +106,15 @@ private Bytes keyBytes(final K key) {
 @Override
 public V fetch(final K key, final long timestamp) {
 final long startNs = time.nanoseconds();
-V ret;
 try {
 final byte[] result = inner.fetch(keyBytes(key), timestamp);
-ret = serdes.valueFrom(result);
+if (result == null) {
+return null;
+}
+return serdes.valueFrom(result);
 } finally {
 metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
 }
-
-return ret;
 }
 
 @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 58c345a7867..732f3d62a42 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -110,6 +110,9 @@ public void put(final K key, final V value, final long 
timestamp) {
 @Override
 public V fetch(final K key, final long timestamp) {
 final byte[] bytesValue = 
bytesStore.get(WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes));
+if (bytesValue == null) {
+return null;
+}
 return serdes.valueFrom(bytesValue);
 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 3ff343a7155..0e3b4e8223e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -81,6 +81,7 @@ public void setUp() {
 
 @After
 public void after() {
+super.after();
 context.close();
 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index bda385ebcb9..16ef47c1c1b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -28,7 +28,6 @@
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
@@ -76,7 +75,7 @@ public void setUp() {
  
Segments.segmentInterval(retention, numSegments)
  );
 cache = new ThreadCache(new LogContext("testCache "), 
MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
-context = new MockProcessorContext(TestUtils.tempDirectory(), null, 
null, (RecordCollector) null, cache);
+context = new MockProcessorContext(TestUtils.tempDirectory(), null, 
null, null, cache);
 context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 
0, 0, "topic"));
 cachingStore.init(context, cachingStore);
 }
@@ -87,10 +86,6 @@ public void close() {
 cachingStore.close();
 }
 
-private Bytes bytesKey(final String key) {
-return Bytes.wrap(key.getBytes());
-}
-
 @Test
 public void shouldPutFetchFromCache() {
 cachingStore.put(new 

[jira] [Created] (KAFKA-6628) RocksDBSegmentedBytesStoreTest does not cover time window serdes

2018-03-08 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6628:


 Summary: RocksDBSegmentedBytesStoreTest does not cover time window 
serdes
 Key: KAFKA-6628
 URL: https://issues.apache.org/jira/browse/KAFKA-6628
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


The RocksDBSegmentedBytesStoreTest.java only covers session window serdes, but 
not time window serdes. We should fill in this coverage gap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes

2018-03-08 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6629:


 Summary: SegmentedCacheFunctionTest does not cover session window 
serdes
 Key: KAFKA-6629
 URL: https://issues.apache.org/jira/browse/KAFKA-6629
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


The SegmentedCacheFunctionTest.java only covers time window serdes, but not 
session window serdes. We should fill in this coverage gap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6630) Speed up the processing of StopReplicaResponse events on the controller

2018-03-08 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6630:
-

 Summary: Speed up the processing of StopReplicaResponse events on 
the controller
 Key: KAFKA-6630
 URL: https://issues.apache.org/jira/browse/KAFKA-6630
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Wang
Assignee: Lucas Wang


Problem Statement:
We find that in a large cluster with many partition replicas, it takes a long 
time to successfully delete a topic. 

Root cause:
Further analysis shows that for a topic with N replicas, the controller 
receives all N StopReplicaResponses from brokers within a short time; however, 
sequentially handling all N TopicDeletionStopReplicaResponseReceived events 
one by one takes a long time.

Specifically, the functions triggered while handling every single 
TopicDeletionStopReplicaResponseReceived event include:
TopicDeletionStopReplicaResponseReceived.process calls 
TopicDeletionManager.completeReplicaDeletion, which calls 
TopicDeletionManager.resumeDeletions, which calls several inefficient functions.

The inefficient functions called inside TopicDeletionManager.resumeDeletions 
include:
ReplicaStateMachine.areAllReplicasForTopicDeleted
ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
ReplicaStateMachine.replicasInState

Each of the 3 inefficient functions above iterates through all the replicas in 
the cluster and filters out the replicas belonging to a topic. In a large 
cluster with many replicas, these functions can be quite slow. 

Total deletion time for a topic becomes long in the single-threaded controller 
processing model:
Since the controller needs to sequentially process the queued 
TopicDeletionStopReplicaResponseReceived events, if the time cost to process 
one event is t, the total time to process all events for all replicas of a 
topic is N * t.
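The eventual fix direction (see the PR referenced later in this thread) is to 
index the assignment by topic once, instead of filtering the cluster-wide 
collection on every call. The idea in miniature, using stand-in types rather 
than the controller's actual code:
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ReplicasByTopicSketch {
    record Replica(String topic, int partition, int brokerId) {}

    public static void main(String[] args) {
        // Flat cluster-wide view: every per-topic query filters everything, O(cluster size).
        final Set<Replica> allReplicas = Set.of(
                new Replica("t1", 0, 1), new Replica("t1", 0, 2),
                new Replica("t2", 0, 1));

        // Indexed once by topic: per-topic queries touch only that topic's replicas.
        final Map<String, Set<Replica>> byTopic = new HashMap<>();
        for (final Replica r : allReplicas) {
            byTopic.computeIfAbsent(r.topic(), t -> new HashSet<>()).add(r);
        }
        System.out.println(byTopic.get("t1").size()); // 2
    }
}
{code}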




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6630) Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller

2018-03-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6630:
--
Summary: Speed up the processing of 
TopicDeletionStopReplicaResponseReceived events on the controller  (was: Speed 
up the processing of StopReplicaResponse events on the controller)

> Speed up the processing of TopicDeletionStopReplicaResponseReceived events on 
> the controller
> ------------------------------------------------------------------------------
>
> Key: KAFKA-6630
> URL: https://issues.apache.org/jira/browse/KAFKA-6630
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> We find that in a large cluster with many partition replicas, it takes a 
> long time to successfully delete a topic. 
> Root cause:
> Further analysis shows that for a topic with N replicas, the controller 
> receives all N StopReplicaResponses from brokers within a short time; 
> however, sequentially handling all N 
> TopicDeletionStopReplicaResponseReceived events one by one takes a long time.
> Specifically, the functions triggered while handling every single 
> TopicDeletionStopReplicaResponseReceived event include:
> TopicDeletionStopReplicaResponseReceived.process calls 
> TopicDeletionManager.completeReplicaDeletion, which calls 
> TopicDeletionManager.resumeDeletions, which calls several inefficient 
> functions.
> The inefficient functions called inside TopicDeletionManager.resumeDeletions 
> include:
> ReplicaStateMachine.areAllReplicasForTopicDeleted
> ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
> ReplicaStateMachine.replicasInState
> Each of the 3 inefficient functions above iterates through all the replicas 
> in the cluster and filters out the replicas belonging to a topic. In a large 
> cluster with many replicas, these functions can be quite slow. 
> Total deletion time for a topic becomes long in the single-threaded 
> controller processing model:
> Since the controller needs to sequentially process the queued 
> TopicDeletionStopReplicaResponseReceived events, if the time cost to process 
> one event is t, the total time to process all events for all replicas of a 
> topic is N * t.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6630) Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller

2018-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392070#comment-16392070
 ] 

ASF GitHub Bot commented on KAFKA-6630:
---

gitlw opened a new pull request #4668: KAFKA-6630: Speed up the processing of 
TopicDeletionStopReplicaResponseReceived events on the controller
URL: https://github.com/apache/kafka/pull/4668
 
 
   This patch speeds up the inefficient functions identified in KAFKA-6630 by 
grouping the partitions in the ControllerContext.partitionReplicaAssignment 
variable by topic. As a result, finding all replicas for a topic no longer 
requires going through all the replicas in the cluster.
   
   Passed all tests using "gradle testAll"
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up the processing of TopicDeletionStopReplicaResponseReceived events on 
> the controller
> 
>
> Key: KAFKA-6630
> URL: https://issues.apache.org/jira/browse/KAFKA-6630
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> We find that in a large cluster with many partition replicas, it takes a long 
> time to successfully delete a topic. 
> Root cause:
> Further analysis shows that for a topic with N replicas, the controller 
> receives all N StopReplicaResponses from the brokers within a short time; 
> however, sequentially handling the N 
> TopicDeletionStopReplicaResponseReceived events one by one takes a long time.
> Specifically, the functions triggered while handling every single 
> TopicDeletionStopReplicaResponseReceived event include:
> TopicDeletionStopReplicaResponseReceived.process calls 
> TopicDeletionManager.completeReplicaDeletion, which calls 
> TopicDeletionManager.resumeDeletions, which calls several inefficient 
> functions.
> The inefficient functions called inside TopicDeletionManager.resumeDeletions 
> include:
> ReplicaStateMachine.areAllReplicasForTopicDeleted
> ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
> ReplicaStateMachine.replicasInState
> Each of the three inefficient functions above iterates through all the 
> replicas in the cluster and filters them down to the replicas belonging to a 
> single topic. In a large cluster with many replicas, these functions can be 
> quite slow. 
> Total deletion time for a topic becomes long under the single-threaded 
> controller processing model:
> Since the controller processes the queued 
> TopicDeletionStopReplicaResponseReceived events sequentially, if the time 
> cost to process one event is t, the total time to process the events for all 
> N replicas of a topic is N * t.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6400) Consider setting default cache size to zero in Kafka Streams

2018-03-08 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392078#comment-16392078
 ] 

Matthias J. Sax commented on KAFKA-6400:


Yes. CACHE_MAX_BYTES_BUFFERING_CONFIG refers to the config. Note that some 
people were also confused that they did not see a single result for windowed 
aggregations when caching was disabled. Thus, it might also be "bad" to set the 
cache size back to zero by default. Before we make any change, we need to 
discuss the impact in detail.
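
For reference, disabling the cache today looks roughly like the following in 
an application config. This is a minimal sketch: the application id and 
bootstrap servers are placeholder values, while CACHE_MAX_BYTES_BUFFERING_CONFIG 
is the actual Streams config key.
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class DisableCacheExample {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-demo");    // example id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example host
        // A cache size of 0 disables record caching, so every update is
        // forwarded downstream -- the behavior first-time users expect.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }
}
{code}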

> Consider setting default cache size to zero in Kafka Streams
> 
>
> Key: KAFKA-6400
> URL: https://issues.apache.org/jira/browse/KAFKA-6400
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Since the introduction of record caching in the Kafka Streams DSL, we see 
> regular reports/questions from first-time users about "Kafka Streams does not 
> emit anything" or "Kafka Streams loses messages". Those reports are due to 
> record caching, not bugs, and they indicate bad user experience.
> We might consider setting the default cache size to zero to avoid those 
> issues and improve the experience for first-time users. This holds especially 
> for simple word-count demos (note that many people don't copy the example 
> word-count but build their own first demo app).
> Remark: before we had caching, many users got confused about our update 
> semantics, i.e., that we emit an output record for each input record for 
> windowed aggregations ("please give me the 'final' result"). Thus, we need to 
> consider this and judge with care so as not to go back and forth with the 
> default user experience -- we have had fewer questions about this behavior 
> lately.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2018-03-08 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392079#comment-16392079
 ] 

Matthias J. Sax commented on KAFKA-6399:


\cc [~vvcephei] [~bbejeck] [~guozhang] [~damianguy] What is your take on this?

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Khaireddine Rezgui
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of a consumer group even when 
> healthy because they didn't call {{poll()}} during the state restore phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now calls {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications 
> (i.e., targeting user code) that no longer make progress during regular 
> operations.
> The open question is what a good default might be. Maybe the actual consumer 
> default of 30 seconds is sufficient. During one {{poll()}} roundtrip, we 
> would only call {{restoreConsumer.poll()}} once and restore a single batch of 
> records. This should take far less time than 30 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0

2018-03-08 Thread Alexander Ivanichev (JIRA)
Alexander Ivanichev created KAFKA-6631:
--

 Summary: Kafka Streams - Rebalancing exception in Kafka 1.0.0
 Key: KAFKA-6631
 URL: https://issues.apache.org/jira/browse/KAFKA-6631
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: Container Linux by CoreOS 1576.5.0
Reporter: Alexander Ivanichev


 
In Kafka Streams 1.0.0, we saw a strange rebalance error. Our stream app 
performs window-based aggregations. Sometimes on startup, when all stream 
workers join, the app just crashes; if we enable only one worker it works 
fine, and sometimes two workers work fine as well, but when a third joins, the 
app crashes again. This looks like a critical issue with rebalance.
{code:java}
2018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: 
Unexpected error from SyncGroup: The server experienced an unexpected error 
when processing the request
2018-03-08T18:51:01.226557000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
2018-03-08T18:51:01.22686Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
2018-03-08T18:51:01.227328000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
2018-03-08T18:51:01.22763Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
2018-03-08T18:51:01.228152000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
2018-03-08T18:51:01.228449000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
2018-03-08T18:51:01.228897000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
2018-03-08T18:51:01.229196000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
2018-03-08T18:51:01.229673000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
2018-03-08T18:51:01.229971000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
2018-03-08T18:51:01.230436000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
2018-03-08T18:51:01.230749000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
2018-03-08T18:51:01.231065000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
2018-03-08T18:51:01.231584000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
2018-03-08T18:51:01.231911000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
2018-03-08T18:51:01.23219Z at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
2018-03-08T18:51:01.232643000Z at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
2018-03-08T18:51:01.233121000Z at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
2018-03-08T18:51:01.233409000Z at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
2018-03-08T18:51:01.23372Z at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
2018-03-08T18:51:01.234196000Z at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
2018-03-08T18:51:01.234655000Z org.apache.kafka.common.KafkaException: 
Unexpected error from SyncGroup: The server experienced an unexpected error 
when processing the request
2018-03-08T18:51:01.234972000Z exception in thread, closing process
2018-03-08T18:51:01.23550Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
2018-03-08T18:51:01.235839000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
2018-03-08T18:51:01.236336000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
2018-03-08T18:51:01.236603000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
2018-03-08T18:51:01.236889000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
2018

[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2018-03-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392105#comment-16392105
 ] 

Guozhang Wang commented on KAFKA-6399:
--

I'm +1 on changing this config's default from Integer.MAX_VALUE, but I'm on 
the fence about setting the default to 30 seconds. The reason is that in 
Streams it is quite common to have stateful processing where each record may 
take some time to process. I'd prefer setting the default to a larger value, 
say 5 minutes rather than 30 seconds.
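
For illustration, overriding the default with the 5-minute value suggested 
here would look roughly like this. The config keys are the real ones; the 
application id and bootstrap servers are placeholder values.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollIntervalExample {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // example id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // example host
        // Streams passes consumer configs through to its internal consumers,
        // so this overrides the Streams default for max.poll.interval.ms
        // (Integer.MAX_VALUE since 0.10.2.1) with the proposed 5 minutes.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 60 * 1000);
        return props;
    }
}
{code}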

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Khaireddine Rezgui
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of a consumer group even when 
> healthy because they didn't call {{poll()}} during the state restore phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now calls {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications 
> (i.e., targeting user code) that no longer make progress during regular 
> operations.
> The open question is what a good default might be. Maybe the actual consumer 
> default of 30 seconds is sufficient. During one {{poll()}} roundtrip, we 
> would only call {{restoreConsumer.poll()}} once and restore a single batch of 
> records. This should take far less time than 30 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6539) KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers alive

2018-03-08 Thread Simon Trigona (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392181#comment-16392181
 ] 

Simon Trigona commented on KAFKA-6539:
--

I don't currently have any technical details to add, but I experienced a 
similar issue with a producer endlessly trying to produce to a broker that was 
offline despite the other 2 brokers being online.
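
For anyone trying to reproduce this, the consumer setup under discussion is 
roughly the following. This is a hedged sketch: the broker hostnames, group 
id, and topic name are placeholders, not values from the report.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FailoverRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        // All three brokers are listed, so the client knows every candidate
        // for metadata refresh and failover.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "broker1:9092,broker2:9092,broker3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "failover-test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            // Per the report, after killing two brokers the consumer keeps
            // retrying only the dead broker instead of the live ones.
            consumer.poll(1000);
        }
    }
}
{code}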

> KafkaConsumer endlessly try to connect to a dead broker, ignoring brokers 
> alive
> ---
>
> Key: KAFKA-6539
> URL: https://issues.apache.org/jira/browse/KAFKA-6539
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
> Environment: Java 8
> Brokers on CentOS 7.4
> Consumers on Windows 10
>Reporter: Song Younghwan
>Priority: Major
>  Labels: windows
> Attachments: consumer.log
>
>
> I am considering using Kafka at my company, so I am currently doing failover 
> testing.
> Conditions:
>  * org.apache.kafka:kafka-clients:1.0.0
>  * New consumer using bootstrap.servers, a consumer group and a group 
> coordinator
>  * num. brokers = 3 (id #1, #2, #3)
>  * Topic num. partitions = 3, replication factor = 3
>  * offsets.topic.replication.factor = 3
> Reproduction Step:
>  # Run consumers in the same consumer group, each of them subscribing to a 
> topic
>  # Kill (kill -9) #1, #2 broker simultaneously (only #3 online)
>  # Consumers eventually connect to #3 broker
>  # Start #1, #2 broker again after a while (#1, #2, #3 online)
>  # Kill (kill -9) #2, #3 broker simultaneously (only #1 online)
>  # *{color:#FF}Now consumers endlessly try to connect to #3 broker 
> only{color}*
>  # Start #2 broker again after a while (#1, #2 online)
>  # *{color:#FF}Consumers still blindly try to connect to #3 broker{color}*
> Expectation:
> Consumers successfully connect to #1 broker after step 5.
> Record:
> I attached a consumer log file with TRACE log level. Related events below:
>  * 12:03:13 kills #1, #2 broker simultaneously
>  * 12:03:42 starts #1, #2 broker again
>  * 12:04:01 kills #2, #3 broker simultaneously
>  * 12:04:42 starts #2 broker again
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6627) Console producer default config values override explicitly provided properties

2018-03-08 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392272#comment-16392272
 ] 

huxihx commented on KAFKA-6627:
---

Seems this is similar to what's reported in 
[KAFKA-2526|https://issues.apache.org/jira/browse/KAFKA-2526].

> Console producer default config values override explicitly provided properties
> --
>
> Key: KAFKA-6627
> URL: https://issues.apache.org/jira/browse/KAFKA-6627
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Jason Gustafson
>Priority: Minor
>  Labels: newbie
>
> Some producer properties can be provided through custom parameters (e.g. 
> {{\-\-request-required-acks}}) and explicitly through 
> {{\-\-producer-property}}. At the moment, some of the custom parameters have 
> default values which actually override explicitly provided properties. For 
> example, if you set {{\-\-producer-property acks=all}} when starting the 
> console producer, the argument will be ignored since 
> {{\-\-request-required-acks}} has a default value. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-08 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-3978:
---

Assignee: Dong Lin

> Cannot truncate to a negative offset (-1) exception at broker startup
> -
>
> Key: KAFKA-3978
> URL: https://issues.apache.org/jira/browse/KAFKA-3978
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: 3.13.0-87-generic 
>Reporter: Juho Mäkinen
>Assignee: Dong Lin
>Priority: Critical
>  Labels: reliability, startup
>
> During the broker startup sequence, the broker's server.log shows this 
> exception. The problem persists after multiple restarts and also on another 
> broker in the cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT 
> SNIPPED AWAY..], 
> live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]}
>  (kafka.server.KafkaApis)
> ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative 
> offset (-1).
> at kafka.log.Log.truncateTo(Log.scala:731)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(Ha

[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2018-03-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392410#comment-16392410
 ] 

Ted Yu commented on KAFKA-6335:
---

Sigh - the above link is no longer accessible.

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, User:11 has Allow permission 
> for operations: Read from hosts: *, User:38 has Allow permission for 
> operations: Read from hosts: *, User:8 has Allow permission for operations: 
> Read from hosts: *, User

[jira] [Commented] (KAFKA-5946) Give connector method parameter better name

2018-03-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392411#comment-16392411
 ] 

Ted Yu commented on KAFKA-5946:
---

Thanks for taking it.

> Give connector method parameter better name
> ---
>
> Key: KAFKA-5946
> URL: https://issues.apache.org/jira/browse/KAFKA-5946
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Tanvi Jaywant
>Priority: Major
>  Labels: connector, newbie
>
> During the development of KAFKA-5657, there were several iterations where 
> the method call didn't match what the connector parameter actually 
> represented.
> [~ewencp] had used connType as equivalent to connClass because the type 
> wasn't used to differentiate source vs. sink.
> [~ewencp] proposed the following:
> {code}
> It would help to convert all the uses of connType to connClass first, then 
> standardize on class == java class, type == source/sink, name == 
> user-specified name.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5943) Reduce dependency on mock in connector tests

2018-03-08 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5943:
--
Labels: connector  (was: connector mock)

> Reduce dependency on mock in connector tests
> 
>
> Key: KAFKA-5943
> URL: https://issues.apache.org/jira/browse/KAFKA-5943
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Minor
>  Labels: connector
>
> Currently, connector tests make heavy use of mocks (EasyMock, PowerMock).
> This may hide the real logic behind operations and makes finding bugs 
> difficult.
> We should reduce the use of mocks so that developers can debug connector code 
> using unit tests.
> This would shorten the development cycle for connectors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2018-03-08 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392493#comment-16392493
 ] 

John Roesler commented on KAFKA-6399:
-

I'm not sure, since I haven't had a lot of time so far to build up 
expectations, but here are a couple of thoughts...

I'm generally a fan of exercising your expectations, so if you think the loop 
should be faster than 30s, I'd say go ahead and set it. If it turns out to be 
wrong, we'll learn something new.

The con to this viewpoint in this case is that potentially a lot of topologies 
are running with the default, and if 30s is too short, it could cause a lot of 
rebalancing. Then each affected person would have to investigate it, find out 
they need to set this config higher, and then tell us so we can adjust the 
default, so the OODA loop isn't very tight.

Plus, the reason to set it lower is to catch runaway applications and attempt 
to recover. So it seems reasonable to ask on what time scale you would be 
happy to see a long-running application detect and recover from runaway code. 
I think in general 5 minutes of backup won't cause too many problems.

So I guess I'm falling more into the 5-minute camp, since it seems to me that 
it's likely to still help the 80% for whom 5 minutes is fine, without risking 
a lot of shenanigans in case the poll loop takes a little longer than we 
expect.

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Khaireddine Rezgui
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of a consumer group even when 
> healthy because they didn't call {{poll()}} during the state restore phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now calls {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications 
> (i.e., targeting user code) that no longer make progress during regular 
> operations.
> The open question is what a good default might be. Maybe the actual consumer 
> default of 30 seconds is sufficient. During one {{poll()}} roundtrip, we 
> would only call {{restoreConsumer.poll()}} once and restore a single batch of 
> records. This should take far less time than 30 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2018-03-08 Thread Khaireddine Rezgui (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khaireddine Rezgui reassigned KAFKA-6399:
-

Assignee: (was: Khaireddine Rezgui)

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of a consumer group even when 
> healthy because they didn't call {{poll()}} during the state restore phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now calls {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications 
> (i.e., targeting user code) that no longer make progress during regular 
> operations.
> The open question is what a good default might be. Maybe the actual consumer 
> default of 30 seconds is sufficient. During one {{poll()}} roundtrip, we 
> would only call {{restoreConsumer.poll()}} once and restore a single batch of 
> records. This should take far less time than 30 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)