[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2017-03-13 Thread Wrikken (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906972#comment-15906972
 ] 

Wrikken commented on KAFKA-4277:


[~junrao]: it was not there several hours later, but I haven't been able to 
catch it red-handed yet. I would assume while Kafka thinks the session is 
invalid, zookeeper thinks it's active for at least a few fractions of a second 
longer. I'm not terribly good at reading the source: does this start a _new_ 
session rather than trying to continue the existing session?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>
> I use zookeeper 3.4.6.
> The Zookeeper session timed out and zkClient failed to reconnect. It then 
> re-established the session and re-registered the broker info in ZK, which threw a 
> NODEEXISTS exception.
>  I think this is because the ephemeral node created by the old session had 
> not yet been removed. 
> I read 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1: createEphemeralPathExpectConflictHandleZKBug tries to create the node in a 
> while loop until creation succeeds, which would solve this issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.10.0.1 that function has been removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}
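
For reference, a minimal sketch (not the actual Kafka code) of the retry approach 
described above, using the plain ZooKeeper client: on NODEEXISTS it checks whether 
the node is owned by the current session and otherwise waits for the stale 
ephemeral node to expire before retrying. The class name and sleep interval are 
illustrative assumptions.
{code}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class EphemeralRegistrationSketch {

    // Retry creating the ephemeral node until the node left behind by the old
    // (expired) session is gone, similar in spirit to the 0.8.1 helper.
    static void registerBroker(ZooKeeper zk, String path, byte[] data) throws Exception {
        while (true) {
            try {
                zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return;  // created successfully
            } catch (KeeperException.NodeExistsException e) {
                Stat stat = zk.exists(path, false);
                if (stat != null && stat.getEphemeralOwner() == zk.getSessionId()) {
                    return;  // the node already belongs to this session
                }
                // The node is owned by the previous (expired) session; wait for
                // ZooKeeper to remove it and retry. The interval is arbitrary.
                Thread.sleep(1000);
            }
        }
    }
}
{code}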



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4277) creating ephemeral node already exist

2017-03-13 Thread Wrikken (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906972#comment-15906972
 ] 

Wrikken edited comment on KAFKA-4277 at 3/13/17 7:53 AM:
-

[~junrao]: it was not there several hours later, but I haven't been able to 
catch it red-handed yet. I would assume while Kafka thinks the session is 
invalid, zookeeper thinks it's active for at least a few fractions of a second 
longer. I'm not terribly good at reading the source: does this start a _new_ 
session rather than trying to continue the existing session?


was (Author: wrikken):
[~junrao]: it was not there several hours later, but I haven't been able to 
catch it red-handed yet. I would assume while Kafka things the session is 
invalid, zookeeper thinks it's active for at least a few fractions of a second 
longer. I'm not terribly good at reading the source: does this start a _new_ 
session rather than trying to continue the existing session?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>
> I use zookeeper 3.4.6.
> The Zookeeper session timed out and zkClient failed to reconnect. It then 
> re-established the session and re-registered the broker info in ZK, which threw a 
> NODEEXISTS exception.
>  I think this is because the ephemeral node created by the old session had 
> not yet been removed. 
> I read 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1: createEphemeralPathExpectConflictHandleZKBug tries to create the node in a 
> while loop until creation succeeds, which would solve this issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.10.0.1 that function has been removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4884) __consumer_offsets topic processing consuming all resources

2017-03-13 Thread Tom Coupland (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Coupland resolved KAFKA-4884.
-
Resolution: Invalid

> __consumer_offsets topic processing consuming all resources
> ---
>
> Key: KAFKA-4884
> URL: https://issues.apache.org/jira/browse/KAFKA-4884
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: Mesos cluster, coreos
>Reporter: Tom Coupland
>
> Since this morning it appears that the processing for the __consumer_offsets 
> topic is consuming all the resources in our test cluster. There are no other 
> messages being dispatched through other topics, yet the brokers are using all 
> their cpu and a lot of network.
> A clear sign that the problem is with this special topic is that when I deleted 
> some test topics (leaving three topics remaining, including 
> __consumer_offsets) the network load decreased somewhat; not enough to be 
> fixed, but enough to point the finger firmly in this direction.
> The rate of offsets for the consumer-offsets topic seems overly high. I'm 
> summing the total offset across all 50 partitions and it's on the order of 
> 22000 every ten seconds, dropping to 17000 when I deleted the spare test 
> topics.
> These are time-stamps and total offsets for all partitions summed from before 
> test topic deletion:
> Fri 10 Mar 18:57:38 GMT 2017 
> 114700933 
> Fri 10 Mar 18:57:56 GMT 2017 
> 114727290 
> Fri 10 Mar 18:58:12 GMT 2017 
> 114750030 
> Fri 10 Mar 18:58:31 GMT 2017 
> 114776560
> There is nothing in the broker logs pointing to any errors; in fact, there is 
> little to go on. Attempting to attach a consumer to the topic just results in a 
> hanging process.
> It feels like the topic is being looped back on itself, creating offset 
> updates for its own updates or something like that. I'm leaving the cluster 
> up for the weekend so we can continue to diagnose, but it seems like there 
> must be a bug at the root of this.
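
For reference, one way to sample the total end offset across the 
__consumer_offsets partitions with the Java consumer (0.10.1+); the bootstrap 
servers and group id below are placeholders:
{code}
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class OffsetsTopicProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "offsets-topic-probe");      // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("__consumer_offsets")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            // endOffsets() returns the current log end offset of each partition
            long total = consumer.endOffsets(partitions).values()
                    .stream().mapToLong(Long::longValue).sum();
            System.out.println(new Date() + " total end offset: " + total);
        }
    }
}
{code}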



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4829) Improve logging of StreamTask commits

2017-03-13 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15907078#comment-15907078
 ] 

Nicolas Fouché commented on KAFKA-4829:
---

Some comments on how we felt that some logs were missing at INFO level: 
http://mail-archives.apache.org/mod_mbox/kafka-users/201703.mbox/%3ccabqkjkj2ozswjaidar_sggztmpwgguo07iaqp9je5-5yzmr...@mail.gmail.com%3e

> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: user-experience
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly or 
> logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}
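
A rough sketch of the proposed summary-style logging, using SLF4J; the task ids, 
threshold and commit() helper below are placeholders rather than Kafka Streams 
internals:
{code}
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitSummaryLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(CommitSummaryLoggingSketch.class);
    private static final long SLOW_COMMIT_THRESHOLD_MS = 100;  // assumed threshold

    public static void main(String[] args) {
        List<String> taskIds = Arrays.asList("1_31", "2_31");  // stand-ins for StreamTask ids
        long start = System.currentTimeMillis();
        for (String taskId : taskIds) {
            long taskStart = System.currentTimeMillis();
            log.trace("Committing task StreamTask {}", taskId);  // per-task detail stays at TRACE
            commit(taskId);                                      // placeholder for the real commit
            long elapsed = System.currentTimeMillis() - taskStart;
            if (elapsed > SLOW_COMMIT_THRESHOLD_MS) {
                log.error("StreamTask {} did not commit in {}ms", taskId, SLOW_COMMIT_THRESHOLD_MS);
            }
        }
        log.info("{} stream tasks committed in {}ms",
                taskIds.size(), System.currentTimeMillis() - start);
    }

    private static void commit(String taskId) { /* placeholder */ }
}
{code}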



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Doubts regarding KafkaProducer implementation

2017-03-13 Thread Madhukar Bharti
Hi,

We have three brokers in a cluster with a replication factor of 3. We are
using Kafka-0.10.0.1. We see some failures with metadata timeout exceptions
while producing.
We have configured retries=3 and max in flight requests=1.
After comparing with the old Scala producer code, we found that in the new
producer "retries" does not apply if there is a failure on the metadata
refresh, so an exception is thrown to the caller API.

But the older producer retried based on the configured values and threw an
exception only after all retries completed. Can we achieve this in the new
producer? We don't see any exception like *failed to produce after 3
retries*.



Regards,
Madhukar Bharti


[jira] [Created] (KAFKA-4889) 2G8lc

2017-03-13 Thread Vamsi Jakkula (JIRA)
Vamsi Jakkula created KAFKA-4889:


 Summary: 2G8lc
 Key: KAFKA-4889
 URL: https://issues.apache.org/jira/browse/KAFKA-4889
 Project: Kafka
  Issue Type: Task
Reporter: Vamsi Jakkula


Creating an issue using project keys and issue type names via the REST API



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Doubts regarding KafkaProducer implementation

2017-03-13 Thread Manikumar
Hi,

The Java producer waits for the metadata of the given topic to become
available. We get a
TimeoutException if the metadata does not arrive within max.block.ms. As you
observed, this
behavior is different from the old producer: the "retries" config does not
apply to the metadata fetch.
You can increase max.block.ms to avoid the metadata TimeoutException.
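
A minimal sketch of the relevant producer settings (the broker list, topic and
the enlarged max.block.ms value below are placeholder assumptions):

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

public class ProducerTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder brokers
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("retries", "3");             // applies to send failures, not the metadata fetch
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("max.block.ms", "120000");   // give the metadata fetch more time (assumed value)

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
        } catch (TimeoutException e) {
            // metadata was not available within max.block.ms
            System.err.println("Metadata timeout: " + e.getMessage());
        } catch (ExecutionException e) {
            System.err.println("Send failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}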


On Mon, Mar 13, 2017 at 4:04 PM, Madhukar Bharti 
wrote:

> Hi,
>
> We have three brokers in a cluster with a replication factor of 3. We are
> using Kafka-0.10.0.1. We see some failures with metadata timeout exceptions
> while producing.
> We have configured retries=3 and max in flight requests=1.
> After comparing with the old Scala producer code, we found that in the new
> producer "retries" does not apply if there is a failure on the metadata
> refresh, so an exception is thrown to the caller API.
>
> But the older producer retried based on the configured values and threw an
> exception only after all retries completed. Can we achieve this in the new
> producer? We don't see any exception like *failed to produce after 3
> retries*.
>
>
>
> Regards,
> Madhukar Bharti
>


Re: Add to contributor list

2017-03-13 Thread Raja Guru Thirumavalavan
Thank you
Raja Guru T

On Mon, Mar 13, 2017 at 3:24 AM, Guozhang Wang  wrote:

> Done.
>
> Guozhang
>
> On Sun, Mar 12, 2017 at 11:19 AM, Raja Guru Thirumavalavan <
> geeky.rajag...@gmail.com> wrote:
>
> > Hi,,Can you please add me to contributor list.
> > or is there any other way to request.
> > reminder please.
> >
> > Thanks
> > Raja Guru T
> >
> > On Mon, Mar 6, 2017 at 1:13 AM, Raja Guru Thirumavalavan <
> > geeky.rajag...@gmail.com> wrote:
> >
> > > Hi,
> > > Please add me to contributor list.
> > > JIRA user name - geeky.rajaguru
> > >
> > > Thanks.
> > >
> > > Regards,
> > > Raja Guru T
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-13 Thread Michal Borowiecki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15907523#comment-15907523
 ] 

Michal Borowiecki commented on KAFKA-4835:
--

My point above was that if the customerRef (which is what we partition by) was 
part of the key (not the whole key), then we'd still need to modify the key for 
the purpose of the join operation.
We'd need to do that for both streams, even though they would both be 
partitioned by the same part of the key, hence the re-partitioning (forced 
automatically by kafka streams) would be totally unnecessary.

In more generic terms, I think this can be a common use case. Let's consider it 
using DDD concepts. We have an aggregate comprised of multiple entities. We 
send messages for each entity (not the whole aggregate), but to ensure 
sequential processing for entities belonging to the same aggregate, the 
messages are partitioned by the aggregate id. The entity id is still important, 
especially for compacted topics, where it is needed for deletion markers, as the 
key is all there is in that case. Hence it comes naturally to compose the 
message key from the aggregate id and the entity id.
Then, if you want to join two such streams by aggregate id, you should be able 
to do it without repartitioning (since both are partitioned by the aggregate-id 
part of the msg key). However, since joins are only supported on the whole msg 
key, you're forced to re-map the key to just the aggregate id prior to the join, 
which in turn currently forces repartitioning.
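
To illustrate the pattern described above, a rough Kafka Streams sketch against 
the 0.10.2 API; the topic names, store name and the "aggregateId:entityId" key 
format are purely illustrative assumptions. Selecting a new key marks the streams 
as requiring repartitioning, so the aggregation and join below create internal 
repartition topics even though the data is already co-partitioned by aggregate id:
{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class AggregateIdJoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();

        // Both topics are assumed to be keyed "aggregateId:entityId" and already
        // partitioned by the aggregate-id part of the key.
        KStream<String, String> entityEvents = builder.stream("entity-events");
        KStream<String, String> otherEvents = builder.stream("other-events");

        // Re-map the key to just the aggregate id so the join can be expressed...
        KStream<String, String> byAggregate =
                entityEvents.selectKey((key, value) -> key.split(":")[0]);

        // ...but because the key changed, the aggregation and the join below force
        // repartitioning even though the data is already co-partitioned.
        KTable<String, String> otherByAggregate = otherEvents
                .selectKey((key, value) -> key.split(":")[0])
                .groupByKey()
                .reduce((first, second) -> second, "other-by-aggregate-store");

        byAggregate
                .join(otherByAggregate, (left, right) -> left + "|" + right)
                .to("joined-output");

        new KafkaStreams(builder, props).start();
    }
}
{code}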

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[DISCUSS] KIP-132: Augment KStream.print to allow extra parameters in the printed string

2017-03-13 Thread Marc Juchli
Dear all,

The following describes KIP-132, which I just created. See:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-132+-+Augment+KStream.print+to+allow+extra+parameters+in+the+printed+string

Motivation

As of now, KStream#print produces a predefined output where key and value are
printed with comma separation.
KAFKA-4830  suggests
extending print so that it takes a KeyValueMapper as a parameter.
This will allow users to change the output according to their needs.
Public Interfaces

The affected interface is KStream, which needs to be extended with another
overloaded version of print:

void print(final Serde keySerde,
   final Serde valSerde,
   final String streamName,
   final KeyValueMapper mapper);

Proposed Changes

See pull request GH-2669 .
This PR contains a discussion regarding KAFKA-4830
 as well as KAFKA-4772
.
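
For illustration, a hypothetical call to the proposed overload (the generic
signature is an assumption, since this API does not exist yet):

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> wordCounts =
        builder.stream(Serdes.String(), Serdes.Long(), "word-counts");

wordCounts.print(
        Serdes.String(),                        // keySerde
        Serdes.Long(),                          // valSerde
        "word-counts",                          // streamName used as the output prefix
        (key, value) -> key + " -> " + value);  // mapper controlling the printed string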

Compatibility, Deprecation, and Migration Plan

The extension of print will not introduce compatibility issues – we can
maintain the current behavior by keeping the current output format as the
default (if the mapper is not set):

if (mapper == null) {
    printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);
} else {
    printStream.println("[" + streamName + "]: " + mapper.apply(keyToPrint, valueToPrint));
}



Kind regards,
Marc


[jira] [Updated] (KAFKA-4128) Kafka broker loses messages when zookeeper session times out

2017-03-13 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4128:
---
Labels: reliability  (was: )

> Kafka broker loses messages when zookeeper session times out
> -
>
> Key: KAFKA-4128
> URL: https://issues.apache.org/jira/browse/KAFKA-4128
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1, 0.9.0.1
>Reporter: Mazhar Shaikh
>Priority: Critical
>  Labels: reliability
> Attachments: log.tar.gz
>
>
> When pumping 30k msgs/second, after some 6-8 hrs of running the below logs are 
> printed and the messages are lost.
> [More than 5k messages are lost on every partition]
> Below are few logs:
> [2016-09-06 05:00:42,595] INFO Client session timed out, have not heard from 
> server in 20903ms for sessionid 0x256fabec47c0003, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:42,696] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:42,753] INFO Partition [topic,62] on broker 4: Shrinking 
> ISR for partition [topic,62] from 4,2 to 4 (kafka.cluster.Partition)
> [2016-09-06 05:00:43,585] INFO Opening socket connection to server 
> b0/169.254.2.1:2182. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:43,586] INFO Socket connection established to 
> b0/169.254.2.1:2182, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:43,587] INFO Unable to read additional data from server 
> sessionid 0x256fabec47c0003, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,644] INFO Opening socket connection to server 
> b1/169.254.2.116:2181. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,651] INFO Socket connection established to 
> b1/169.254.2.116:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,658] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:44,659] INFO Initiating client connection, 
> connectString=b2.broker.com:2181,b1.broker.com:2181,zoo3.broker.com:2182 
> sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@37b8e86a 
> (org.apache.zookeeper.ZooKeeper)
> [2016-09-06 05:00:44,659] INFO Unable to reconnect to ZooKeeper service, 
> session 0x256fabec47c0003 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,661] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,662] INFO Opening socket connection to server 
> b2/169.254.2.216:2181. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,662] INFO Socket connection established to 
> b2/169.254.2.216:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,665] ERROR Error handling event ZkEvent[New session 
> event sent to 
> kafka.controller.KafkaController$SessionExpirationListener@33b7dedc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.IllegalStateException: Kafka scheduler has not been started
> at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
> at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> [2016-09-06 05:00:44,666] INFO re-registering broker info in ZK for broker 4 
> (kafka.server.KafkaHealthcheck)
> [2016-09-06 05:00:44,801] INFO Session establishment complete on server 
> b2/169.254.2.216:2181, sessionid = 0x256fabec47c0005, negotiated timeout = 
> 15000 (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,802] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:44,812] INFO Registered broker 4 at path /brokers/ids/4 
> with address b5.broker.com:9

[jira] [Created] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread Damian Guy (JIRA)
Damian Guy created KAFKA-4890:
-

 Summary: State directory being deleted when another thread holds 
the lock
 Key: KAFKA-4890
 URL: https://issues.apache.org/jira/browse/KAFKA-4890
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Damian Guy
 Attachments: logs.tar.gz

Looks like a state directory is being cleaned up when another thread already 
has the lock:

{{2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager 
- task [0_6] Registering state store perGameScoreStore to its state manager

2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6

2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
fireflyProd failed on partition assignment
org.apache.kafka.streams.errors.ProcessorStateException: Error while executing 
put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and 
value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
at 
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
at 
org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at 
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.rocksdb.RocksDBException: `
at org.rocksdb.RocksDB.put(Native Method)
at org.rocksdb.RocksDB.put(RocksDB.java:488)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
... 27 common frames omitted}}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-13 Thread Jun Rao
Hi, Dong,

Thanks for the updated KIP. Some more comments below.

10. For the .move log, do we perform any segment deletion (based on
retention) or log cleaning (if a compacted topic)? Or do we only enable
that after the swap?

11. kafka-reassign-partitions.sh
11.1 If all reassigned replicas are in the current broker and only the log
directories have changed, we can probably optimize the tool to not trigger
partition reassignment through the controller and only
send ChangeReplicaDirRequest.
11.2 If ChangeReplicaDirRequest specifies a replica that's not created yet,
could the broker just remember that in memory and create the replica when
the creation is requested? This way, when doing cluster expansion, we can
make sure that the new replicas on the new brokers are created in the right
log directory in the first place. We can also avoid the tool having to keep
issuing ChangeReplicaDirRequest in response to ReplicaNotAvailableException.
11.3 Do we need an option in the tool to specify intra.broker.
throttled.rate?

12. DescribeDirsRequest
12.1 In other requests like CreateTopicRequest, we return an empty list in
the response for an empty input list. If the input list is null, we return
everything. We should probably follow the same convention here.
12.2 Do we need the topics field? Since the request is about log dirs, it
makes sense to specify the log dirs. But it's weird to specify topics.
12.3 DescribeDirsResponsePartition: Should we include firstOffset and
nextOffset in the response? That could be useful to track the progress of
the movement.

13. ChangeReplicaDirResponse: Do we need error code at both levels?

14. num.replica.move.threads: Does it default to # log dirs?

Thanks,

Jun

On Thu, Mar 9, 2017 at 7:04 PM, Dong Lin  wrote:

> I just made one correction in the KIP. If broker receives
> ChangeReplicaDirRequest and the replica hasn't been created there, the
> broker will respond with ReplicaNotAvailableException.
> The kafka-reassign-partitions.sh tool will need to re-send
> ChangeReplicaDirRequest in this case in order to wait for controller to
> send LeaderAndIsrRequest to broker. The previous approach of creating an
> empty directory seems hacky.
>
>
>
>
> On Thu, Mar 9, 2017 at 6:33 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for your comments! I have updated the KIP to address your
> comments.
> > Please see my reply inline.
> >
> > Can you let me know if the latest KIP has addressed your comments?
> >
> > On Wed, Mar 8, 2017 at 9:56 PM, Jun Rao  wrote:
> >
> >> Hi, Dong,
> >>
> >> Thanks for the reply.
> >>
> >> 1.3 So the thread gets the lock, checks if caught up and releases the
> lock
> >> if not? Then, in the case when there is continuous incoming data, the
> >> thread may never get a chance to swap. One way to address this is when
> the
> >> thread is getting really close in catching up, just hold onto the lock
> >> until the thread fully catches up.
> >>
> >
> > Yes, that was my original solution. I see your point that the lock may
> not
> > be fairly assigned to ReplicaMoveThread and RequestHandlerThread when
> there
> > is frequent incoming requests. Your solution should address the problem
> and I
> > have updated the KIP to use it.
> >
> >
> >>
> >> 2.3 So, you are saying that the partition reassignment tool can first
> send
> >> a ChangeReplicaDirRequest to relevant brokers to establish the log dir
> for
> >> replicas not created yet, then trigger the partition movement across
> >> brokers through the controller? That's actually a good idea. Then, we
> can
> >> just leave LeaderAndIsrRequest as it is.
> >
> >
> > Yes, that is what I plan to do. If broker receives a
> > ChangeReplicaDirRequest while it is not leader or follower of the
> > partition, the broker will create an empty Log instance (i.e. a directory
> > named topicPartition) in the destination log directory so that the
> replica
> > will be placed there when the broker receives LeaderAndIsrRequest from the
> > controller. The broker should clean up those empty Log instances on startup
> > just in case a ChangeReplicaDirRequest was mistakenly sent to a broker
> that
> > was not meant to be follower/leader of the partition.
> >
> >
> >> Another thing related to
> >> ChangeReplicaDirRequest.
> >> Since this request may take long to complete, I am not sure if we should
> >> wait for the movement to complete before respond. While waiting for the
> >> movement to complete, the idle connection may be killed or the client
> may
> >> be gone already. An alternative is to return immediately and add a new
> >> request like CheckReplicaDirRequest to see if the movement has
> completed.
> >> The tool can take advantage of that to check the status.
> >>
> >
> > I agree with your concern and solution. We need a request to query the
> > partition -> log_directory mapping on the broker. I have updated the KIP
> to
> > remove need for ChangeReplicaDirRequestPurgatory.
> > Instead, kafka-reassign-partitions.sh will send DescribeDirsRequest
> > 

[jira] [Updated] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4890:
--
Description: 
Looks like a state directory is being cleaned up when another thread already 
has the lock:

{code}
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - 
task [0_6] Registering state store perGameScoreStore to its state manager

2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6

2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
User provided listener 
org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
fireflyProd failed on partition assignment
org.apache.kafka.streams.errors.ProcessorStateException: Error while executing 
put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and 
value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
at 
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
at 
org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at 
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
at 
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
 at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.rocksdb.RocksDBException: `
at org.rocksdb.RocksDB.put(Native Method)
at org.rocksdb.RocksDB.put(RocksDB.java:488)
at 
org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
... 27 common frames omitted
{code}

Also 
{code}
2017-03-12 20:46:58 [StreamThread-4] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2

...
2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - 
stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to 
flush state store lifetimeScoreStore
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
at 
org.apache.kafka.streams.processor.internals.

[jira] [Created] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4891:
---

 Summary: kafka.request.logger TRACE regression
 Key: KAFKA-4891
 URL: https://issues.apache.org/jira/browse/KAFKA-4891
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


Here's what kafka-request.log shows with {{kafka.request.logger}} set to TRACE:
{code}
[2017-03-13 10:06:24,402] TRACE Completed 
request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:06:24,406] TRACE Completed 
request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from connection 
127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:06:24,943] TRACE Completed 
request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
{code}

Both the headers and requests have regressed to just show object ids instead of 
their contents from their underlying structs. I'm guessing this regression came 
from commit 
[fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]

The logs should look something like this:
{code}
[2017-03-13 10:14:36,754] TRACE Completed 
request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
{controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
 from connection 
127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:14:36,758] TRACE Completed 
request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
{controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
 from connection 
127.0.0.1:9090-127.0.0.1:51349;totalTime:3,requestQueueTime:1,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
[2017-03-13 10:14:37,297] TRACE Completed 
request:{api_key=1,api_version=3,correlation_id=0,client_id=ReplicaFetcherThread-0-0}
 -- 
{replica_id=1,max_wait_time=500,min_bytes=1,max_bytes=10485760,topics=[{topic=t,partitions=[{parti

[GitHub] kafka pull request #2677: MINOR: set trace logging for zookeeper upgrade tes...

2017-03-13 Thread apurvam
GitHub user apurvam opened a pull request:

https://github.com/apache/kafka/pull/2677

MINOR: set trace logging for zookeeper upgrade test

This adds logging which will hopefully help root cause 
https://issues.apache.org/jira/browse/KAFKA-4574.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apurvam/kafka 
minor-set-trace-logging-for-zookeeper-upgrade-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2677


commit f643d8ddc2b9c9952fa2cd62ba5e3dadac092562
Author: Apurva Mehta 
Date:   2017-03-11T23:43:48Z

MINOR: configure TRACE logging for zookeeper upgrade test

We need trace logging for the controller and state change log to debug
KAFKA-4574.

commit f9c8c51cfdf7eae1842c441ece56d65738cfdfc8
Author: Apurva Mehta 
Date:   2017-03-12T00:22:52Z

More experiments

commit 88314b97a774d1c629409ca0e46f596e8f4e7a11
Author: Apurva Mehta 
Date:   2017-03-12T00:29:25Z

Add the trace directory for collection




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4574) Transient failure in ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade with security_protocol = SASL_PLAINTEXT, SSL

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15907916#comment-15907916
 ] 

ASF GitHub Bot commented on KAFKA-4574:
---

GitHub user apurvam opened a pull request:

https://github.com/apache/kafka/pull/2677

MINOR: set trace logging for zookeeper upgrade test

This adds logging which will hopefully help root cause 
https://issues.apache.org/jira/browse/KAFKA-4574.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apurvam/kafka 
minor-set-trace-logging-for-zookeeper-upgrade-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2677


commit f643d8ddc2b9c9952fa2cd62ba5e3dadac092562
Author: Apurva Mehta 
Date:   2017-03-11T23:43:48Z

MINOR: configure TRACE logging for zookeeper upgrade test

We need trace logging for the controller and state change log to debug
KAFKA-4574.

commit f9c8c51cfdf7eae1842c441ece56d65738cfdfc8
Author: Apurva Mehta 
Date:   2017-03-12T00:22:52Z

More experiments

commit 88314b97a774d1c629409ca0e46f596e8f4e7a11
Author: Apurva Mehta 
Date:   2017-03-12T00:29:25Z

Add the trace directory for collection




> Transient failure in ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade 
> with security_protocol = SASL_PLAINTEXT, SSL
> ---
>
> Key: KAFKA-4574
> URL: https://issues.apache.org/jira/browse/KAFKA-4574
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Reporter: Shikhar Bhushan
>Assignee: Apurva Mehta
>
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-12-29--001.1483003056--apache--trunk--dc55025/report.html
> {{ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade}} failed with these 
> {{security_protocol}} parameters 
> {noformat}
> 
> test_id:
> kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SASL_PLAINTEXT
> status: FAIL
> run time:   3 minutes 44.094 seconds
> 1 acked message did not make it to the Consumer. They are: [5076]. We 
> validated that the first 1 of these missing messages correctly made it into 
> Kafka's data files. This suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 123, in run
> data = self.run_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
>  line 176, in run_test
> return self.test_context.function(self.test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/mark/_mark.py",
>  line 321, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 117, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 101, in run_produce_consume_validate
> self.validate()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 163, in validate
> assert success, msg
> AssertionError: 1 acked message did not make it to the Consumer. They are: 
> [5076]. We validated that the first 1 of these missing messages correctly 
> made it into Kafka's data files. This suggests they were lost on their way to 
> the consumer.
> {noformat}
> {noformat}
> 
> test_id:
> kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status: FAIL
> run time:   3 minutes 50.578 seconds
> 1 acked message did not make it to the Consumer. They are: [3559]. We 
> validated that the first 1 of these missing messages correctly made it into 
> Kafka's data files. This suggests they were lost on their way to the consumer.
> Traceback (most rece

[GitHub] kafka pull request #2678: KAFKA-4891 kafka.request.logger TRACE regression

2017-03-13 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2678

KAFKA-4891 kafka.request.logger TRACE regression

Both the headers and requests have regressed to just show object ids 
instead of their contents from their underlying structs. I'm guessing this 
regression came from commit 
[fc1cfe475e8ae8458d8ddf119ce18d0c64653a70](https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4891

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2678.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2678


commit f0303949e8a104813272cc911abd3231b0398368
Author: Onur Karaman 
Date:   2017-03-13T17:46:32Z

KAFKA-4891 kafka.request.logger TRACE regression




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15911762#comment-15911762
 ] 

ASF GitHub Bot commented on KAFKA-4891:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2678

KAFKA-4891 kafka.request.logger TRACE regression

Both the headers and requests have regressed to just show object ids 
instead of their contents from their underlying structs. I'm guessing this 
regression came from commit 
[fc1cfe475e8ae8458d8ddf119ce18d0c64653a70](https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4891

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2678.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2678


commit f0303949e8a104813272cc911abd3231b0398368
Author: Onur Karaman 
Date:   2017-03-13T17:46:32Z

KAFKA-4891 kafka.request.logger TRACE regression




> kafka.request.logger TRACE regression
> -
>
> Key: KAFKA-4891
> URL: https://issues.apache.org/jira/browse/KAFKA-4891
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> Here's what kafka-request.log shows with {{kafka.request.logger}} set to 
> TRACE:
> {code}
> [2017-03-13 10:06:24,402] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
> org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,406] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
> org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from 
> connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,943] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
> org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
> 127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> {code}
> Both the headers and requests have regressed to just show object ids instead 
> of their contents from their underlying structs. I'm guessing this regression 
> came from commit 
> [fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]
> The logs should look something like this:
> {code}
> [2017-03-13 10:14:36,754] TRACE Completed 
> request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:14:36,758] TRACE Completed 
> request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{

Re: [DISCUSS] KIP-82 - Add Record Headers

2017-03-13 Thread radai
the common "stack" we envision at linkedin would consist of (at least) the
following components that add headers to every outgoing request:

1. auditing/"lineage" - appends a header containing "node" (hostname etc),
time (UTC time) and destination (cluster/topic). these accumulate as
requests get mirrored between clusters
2. serialization - sets a header containing a schema identifier to allow
deserialization
3. client-side encryption - would probably set a header identifying the
key/scheme used
4. internal "billing"

there are also several other teams at linkedin that would use headers
(although it's unclear yet if via interceptors or by directly manipulating
requests)

if headers are made completely immutable (as the entire request object
currently is) we would end up copying (parts of) every msg 4 times. I
haven't benchmarked, but this seems like it would have an impact to me.

looking elsewhere, RabbitMQ and HttpComponents both use mutable request
objects (RabbitMQ's BasicProperties object, HttpComponents' addHeader
method).

how common is it right now for instances of ProducerRecord to actually be
reused?
do people really have things like public static final ProducerRecord
MY_FAVORITE_REQUEST = ... ?
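
to make the copy cost concrete, here is a purely hypothetical sketch (none of
these types exist in the clients; it only illustrates the "4 copies per message"
concern above):

    // hypothetical record type, for illustration only -- not a Kafka class
    final class ImmutableRecord {
        final byte[] key;
        final byte[] value;
        final java.util.Map<String, byte[]> headers;

        ImmutableRecord(byte[] key, byte[] value, java.util.Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = java.util.Collections.unmodifiableMap(headers);
        }

        // each of the 4 components above (audit, serde id, encryption, billing) would
        // call this once per outgoing message, allocating a new map and record each time
        ImmutableRecord withHeader(String name, byte[] headerValue) {
            java.util.Map<String, byte[]> copy = new java.util.HashMap<>(this.headers);
            copy.put(name, headerValue);
            return new ImmutableRecord(this.key, this.value, copy);
        }
    }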


[GitHub] kafka pull request #2554: KAFKA-4769: Add Float serializer, deserializer, se...

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2554


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4769) Add Float serializer, deserializer, serde

2017-03-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4769:
-
   Resolution: Fixed
Fix Version/s: 0.11.0.0
   Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2554
[https://github.com/apache/kafka/pull/2554]

> Add Float serializer, deserializer, serde
> -
>
> Key: KAFKA-4769
> URL: https://issues.apache.org/jira/browse/KAFKA-4769
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Michael Noll
>Assignee: Michael Noll
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> We currently provide serializers/deserializers/serdes for a few data types 
> such as String, Long, Double, but not yet for Float.
> Adding built-in support for Float is helpful when, e.g., you are using Kafka 
> Connect to write data from a MySQL database, where the field was defined as a 
> FLOAT, so the schema was generated as FLOAT, and you'd like to subsequently 
> process the data with Kafka Streams.
> Possible workaround:
> Instead of adding Float support, users can manually convert from float to 
> double.  The point of this ticket however is to save the user from being 
> forced to convert manually, thus providing more convenience and slightly 
> better Connect-Streams interoperability in a scenario such as above.
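
For illustration, a rough sketch of that manual workaround in a Streams topology
(the custom float serde and the topic names are assumptions for this sketch, not
existing Kafka classes):

{code}
// widen float to double inside the topology so the built-in Double serde
// can be used downstream; "myFloatSerde" stands in for a hand-written serde
KStream<String, Float> floats =
    builder.stream(Serdes.String(), myFloatSerde, "mysql-float-topic");
KStream<String, Double> doubles = floats.mapValues(Float::doubleValue);
doubles.to(Serdes.String(), Serdes.Double(), "mysql-float-as-double-topic");
{code}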



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4769) Add Float serializer, deserializer, serde

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922722#comment-15922722
 ] 

ASF GitHub Bot commented on KAFKA-4769:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2554


> Add Float serializer, deserializer, serde
> -
>
> Key: KAFKA-4769
> URL: https://issues.apache.org/jira/browse/KAFKA-4769
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Michael Noll
>Assignee: Michael Noll
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> We currently provide serializers/deserializers/serdes for a few data types 
> such as String, Long, Double, but not yet for Float.
> Adding built-in support for Float is helpful when, e.g., you are using Kafka 
> Connect to write data from a MySQL database, where the field was defined as a 
> FLOAT, so the schema was generated as FLOAT, and you'd like to subsequently 
> process the data with Kafka Streams.
> Possible workaround:
> Instead of adding Float support, users can manually convert from float to 
> double.  The point of this ticket however is to save the user from being 
> forced to convert manually, thus providing more convenience and slightly 
> better Connect-Streams interoperability in a scenario such as above.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Jay Kreps
Two things:

   1. This is a minor thing but the proposed new name for KStreamBuilder
   is StreamsTopologyBuilder. I actually think we should not put topology in
   the name as topology is not a concept you need to understand at the
   kstreams layer right now. I'd think of three categories of concepts: (1)
   concepts you need to understand to get going even for a simple example, (2)
   concepts you need to understand to operate and debug a real production app,
   (3) concepts we truly abstract and you don't need to ever understand. I
   think in the kstream layer topologies are currently category (2), and this
   is where they belong. By introducing the name in even the simplest example
   it means the user has to go read about topologies to really understand even
   this simple snippet. What if instead we called it KStreamsBuilder?
   2. For the processor api, I think this api is mostly not for end users.
   However there are a couple of cases where it might make sense to expose it. I
   think users coming from Samza, or JMS's MessageListener (
   https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
   understand a simple callback interface for message processing. In fact,
   people often ask why Kafka's consumer doesn't provide such an interface.
   I'd argue we do, it's KafkaStreams. The only issue is that the processor
   API documentation is a bit scary for a person implementing this type of
   api. My observation is that people using this style of API don't do a lot
   of cross-message operations; they just do single-message operations and use
   a database for anything that spans messages. They also don't factor their
   code into many MessageListeners and compose them, they just have one
   listener that has the complete handling logic. Say I am a user who wants to
   implement a single Processor in this style. Do we have an easy way to do
   that today (either with the .transform/.process methods in kstreams or with
   the topology apis)? Is there anything we can do in the way of trivial
   helper code to make this better? Also, how can we explain that pattern to
   people? I think currently we have pretty in-depth docs on our apis but I
   suspect a person trying to figure out how to implement a simple callback
   might get a bit lost trying to figure out how to wire it up. A simple five
   line example in the docs would probably help a lot. Not sure if this is
   best addressed in this KIP or is a side comment.
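
For what it's worth, a rough sketch of that "one listener" style against the
0.10.2 API (topic name, serdes, and config values below are made up for the
example, and exception handling is elided):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "callback-style-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream(Serdes.String(), Serdes.String(), "input-topic")
           .process(new ProcessorSupplier<String, String>() {
               @Override
               public Processor<String, String> get() {
                   return new AbstractProcessor<String, String>() {
                       @Override
                       public void process(String key, String value) {
                           // the complete per-message handling logic lives here
                       }
                   };
               }
           });

    new KafkaStreams(builder, props).start();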

Cheers,

-Jay

On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax 
wrote:

> Hi All,
>
> I did prepare a KIP to do some cleanup some of Kafka's Streaming API.
>
> Please have a look here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 120%3A+Cleanup+Kafka+Streams+builder+API
>
> Looking forward to your feedback!
>
>
> -Matthias
>
>


[GitHub] kafka pull request #2672: KAFKA-4657: Improve test coverage of CompositeRead...

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2672


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922762#comment-15922762
 ] 

Guozhang Wang commented on KAFKA-4848:
--

[~sjmittal] BTW I have added you to the contributor list.

> Stream thread getting into deadlock state while trying to get rocksdb lock in 
> retryWithBackoff
> --
>
> Key: KAFKA-4848
> URL: https://issues.apache.org/jira/browse/KAFKA-4848
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Sachin Mittal
>Assignee: Sachin Mittal
> Attachments: thr-1
>
>
> We see a deadlock state when a stream thread takes longer than 
> MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's 
> partitions are reassigned to some other thread, including the rocksdb lock. 
> When it tries to process the next task it cannot get the rocksdb lock and 
> simply keeps waiting for that lock forever.
> In retryWithBackoff for AbstractTaskCreator we have a backoffTimeMs = 50L.
> If it does not get the lock we simply increase the time by 10x and keep 
> retrying inside the while(true) loop.
> We need an upper bound for this backoffTimeMs. If the time is greater 
> than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock, it means this 
> thread's partitions have moved somewhere else and it may not get the lock 
> again.
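
For illustration, a minimal sketch of the bounded retry being suggested (this is
not the actual Kafka code; the lock call is a stand-in, the bound would come from
max.poll.interval.ms, and InterruptedException handling is elided):

{code}
long backoffTimeMs = 50L;
long waitedMs = 0L;
while (!tryAcquireStateLock(task)) {          // stand-in for the rocksdb/state dir lock
    if (waitedMs >= maxPollIntervalMs) {
        // partitions were most likely migrated to another thread; stop retrying
        throw new StreamsException("Could not acquire state lock for " + task);
    }
    Thread.sleep(backoffTimeMs);
    waitedMs += backoffTimeMs;
    backoffTimeMs *= 10;                      // keep the current 10x growth, but bounded overall
}
{code}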



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4848) Stream thread getting into deadlock state while trying to get rocksdb lock in retryWithBackoff

2017-03-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-4848:
--

Assignee: Sachin Mittal

> Stream thread getting into deadlock state while trying to get rocksdb lock in 
> retryWithBackoff
> --
>
> Key: KAFKA-4848
> URL: https://issues.apache.org/jira/browse/KAFKA-4848
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Sachin Mittal
>Assignee: Sachin Mittal
> Attachments: thr-1
>
>
> We see a deadlock state when a stream thread takes longer than 
> MAX_POLL_INTERVAL_MS_CONFIG to process a task. In this case the thread's 
> partitions are reassigned to some other thread, including the rocksdb lock. 
> When it tries to process the next task it cannot get the rocksdb lock and 
> simply keeps waiting for that lock forever.
> In retryWithBackoff for AbstractTaskCreator we have a backoffTimeMs = 50L.
> If it does not get the lock we simply increase the time by 10x and keep 
> retrying inside the while(true) loop.
> We need an upper bound for this backoffTimeMs. If the time is greater 
> than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock, it means this 
> thread's partitions have moved somewhere else and it may not get the lock 
> again.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4657) Improve test coverage of CompositeReadOnlyWindowStore

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922752#comment-15922752
 ] 

ASF GitHub Bot commented on KAFKA-4657:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2672


> Improve test coverage of CompositeReadOnlyWindowStore
> -
>
> Key: KAFKA-4657
> URL: https://issues.apache.org/jira/browse/KAFKA-4657
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> There are no tests that cover what happens when {{windowStore.fetch(...)}} throws an 
> {{InvalidStateStoreException}}, i.e., we should check that we get an 
> {{InvalidStateStoreException}} with the modified message.
> There are no tests that cover the {{WindowStoreIterator}} that is returned 
> when there are no results. In this case we should at least add tests that 
> show the expected behaviour for {{peekNextKey}}, {{hasNext}}, and {{next}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4657) Improve test coverage of CompositeReadOnlyWindowStore

2017-03-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4657.
--
Resolution: Fixed

Issue resolved by pull request 2672
[https://github.com/apache/kafka/pull/2672]

> Improve test coverage of CompositeReadOnlyWindowStore
> -
>
> Key: KAFKA-4657
> URL: https://issues.apache.org/jira/browse/KAFKA-4657
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> There are no tests that cover what happens when {{windowStore.fetch(...)}} throws an 
> {{InvalidStateStoreException}}, i.e., we should check that we get an 
> {{InvalidStateStoreException}} with the modified message.
> There are no tests that cover the {{WindowStoreIterator}} that is returned 
> when there are no results. In this case we should at least add tests that 
> show the expected behaviour for {{peekNextKey}}, {{hasNext}}, and {{next}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922772#comment-15922772
 ] 

Guozhang Wang commented on KAFKA-4859:
--

[~original-brownbear] Thanks a lot for picking this up! Any good news on this 
investigation so far? It seems to be a highly frequent transient failure in 
Jenkins and hence we'd like to fix it sooner rather than later.

> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922789#comment-15922789
 ] 

Guozhang Wang commented on KAFKA-4887:
--

Thanks Elias, this does sound like a bug to me.

Just curious: what is your scenario in which you'd like to insert duplicate records 
with caching turned on? Or would you actually prefer not to have caching, since it 
seems you do not want any dedupping anyway but cannot disable it?

> Enabling caching on a persistent window store breaks support for duplicate 
> insertion
> 
>
> Key: KAFKA-4887
> URL: https://issues.apache.org/jira/browse/KAFKA-4887
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when 
> duplicate insertion support is enabled by passing {{true}} as the fourth 
> argument to {{windowed}} in the state store supplier.
> When the feature is enabled, {{RocksDBWindowStore}} correctly handles 
> duplicates by assigning a unique sequence number to each element on insertion 
> and using the number within the key.
> When caching is enabled by calling {{enableCaching}} on the supplier, 
> {{CachingWindowStore}} fails to do the same.  Thus, of multiple values 
> inserted with the same key, only the last one survives.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-13 Thread radai
looking at the KIP as it is now, it looks like all *Options objects have a
common timeout property. could it be extracted to a common
AdminRequestOptions or something?
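
something like the following, purely as a sketch of that idea (class and method
names here are made up, not taken from the KIP):

    public abstract class AbstractAdminRequestOptions<T extends AbstractAdminRequestOptions<T>> {
        private Integer timeoutMs = null;           // null = fall back to the client-level default

        @SuppressWarnings("unchecked")
        public T timeoutMs(Integer timeoutMs) {
            this.timeoutMs = timeoutMs;
            return (T) this;                        // keeps fluent setters typed to the subclass
        }

        public Integer timeoutMs() {
            return timeoutMs;
        }
    }

    public class CreateTopicOptions extends AbstractAdminRequestOptions<CreateTopicOptions> {
        private boolean validateOnly = false;       // only the per-request flags remain here

        public CreateTopicOptions setValidateOnly(boolean validateOnly) {
            this.validateOnly = validateOnly;
            return this;
        }
    }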

On Thu, Mar 9, 2017 at 2:14 PM, Colin McCabe  wrote:

> Hi all,
>
> We've been discussing this for a while (about a month) and I think
> people have made some great points that improved the proposal.  In
> particular, adding async and batching was important.  I've also been
> talking with some end-users who would like to make use of this API.
> Once this is in, we can iterate on it before the release, and it will
> also unblock a lot of other admin proposals.  I think it would be good
> to start the vote in a little bit, assuming there are no objections.
>
> best,
> Colin
>
>
> On Wed, Mar 8, 2017, at 17:09, Colin McCabe wrote:
> > On Mon, Mar 6, 2017, at 11:55, Ismael Juma wrote:
> > > Thanks Colin.
> > >
> > > I am familiar with the protocol semantics, but we need to document the
> > > API
> > > for users who don't know the protocol. I still think it would be
> valuable
> > > to have some examples of how the API would be used for common use
> cases.
> >
> > Getting the version of all nodes in the cluster:
> >   Map nodesToVersions =
> > adminClient.listNodes().nodes().thenApply(
> >   nodes -> adminClient.apiVersions(nodes)).all().get();
> >
> > Creating a topic:
> >   adminClient.createTopic(new NewTopic("myNewTopic", 3, (short)
> >   3)).all().get();
> >
> > Validating that a topic can be created (but not creating it):
> >   adminClient.createTopic(new NewTopic("myNewTopic", 3, (short) 3),
> > new CreateTopicOptions().setValidateOnly(true)).all().get();
> >
> > > For example, say someone creates a topic and then produces to it. What
> > > would be the recommended way to do that?
> >
> > Once the future returned by createTopics has successfully completed, it
> > should be possible to produce to the topic.
> >
> > There are a few warts that are definitely worth calling out.  These are
> > things that need to be fixed at the protocol layer, so they're outside
> > the scope of this KIP.  But you made a good point that we need to
> > document this well.  Here's my list (I wonder if anyone has more?):
> >
> > * If auto.create.topics.enable is true on the brokers,
> > AdminClient#describeTopic(topicName) may create a topic named topicName.
> >  There are two workarounds: either use AdminClient#listTopics and ensure
> > that the topic is present before describing, or disable
> > auto.create.topics.enable.
> >
> > * If delete.topic.enable is false on the brokers,
> > AdminClient#deleteTopic(topicName) will mark topicName for deletion, but
> > not actually delete it.  deleteTopic will return success in this case.
> >
> > * It may take several seconds after AdminClient#deleteTopic returns
> > success for all the brokers to become aware that the topic is gone.
> > During this time, AdminClient#listTopics and AdminClient#describeTopic
> > may continue to return information about the deleted topic.
> >
> > best,
> > Colin
> >
> >
> > >
> > > Ismael
> > >
> > > On Mon, Mar 6, 2017 at 7:43 PM, Colin McCabe 
> wrote:
> > >
> > > > On Mon, Mar 6, 2017, at 05:50, Ismael Juma wrote:
> > > > > Thanks Colin. It seems like you replied to me accidentally instead
> of the
> > > > > list, so leaving your reply below for the benefit of others.
> > > >
> > > > Thanks, Ismael.  I actually realized my mistake right after I sent to
> > > > you, and re-posted it to the mailing list instead of sending
> directly.
> > > > Sigh...
> > > >
> > > > >
> > > > > Regarding the disadvantage of having to hunt through the request
> class,
> > > > > don't people have to do that anyway with the Options classes?
> > > >
> > > > A lot of people will simply choose the default options, until they
> have
> > > > a reason to do otherwise (for example, they want a longer or shorter
> > > > timeout, etc.)
> > > >
> > > > >
> > > > > Aside from that, it would be great if the KIP included more
> detailed
> > > > > javadoc for each method including information about potential
> exceptions.
> > > >
> > > > That's a good question.  Because this is an asynchronous API, methods
> > > > never throw exceptions.  Instead, if you call get() / whenComplete()
> /
> > > > isCompletedExceptionally() / etc. on one of the CompletableFuture
> > > > objects, you will get the exception.  This is to allow Node.js-style
> > > > completion chaining.  I will add this explanation to the KIP.
> > > >
> > > > > I'm particularly interested in what a user can expect if a create
> topics
> > > > > succeeds versus what they can expect if a timeout exception is
> thrown. It
> > > > > would be good to consider partial failures as well.
> > > >
> > > > This is spelled out by KIP-4.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+
> > > > Command+line+and+centralized+administrative+operations
> > > >
> > > > Specifically,
> > > >
> > > > > If a timeout error occurs [in CreateTopic], the t

[jira] [Commented] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion

2017-03-13 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922826#comment-15922826
 ] 

Elias Levy commented on KAFKA-4887:
---

It is a bit complicated.  I am abusing the {{RocksDBWindowStore}} as a TTL 
cache (something I've covered in other JIRA tickets).  In this particular case 
the store is used to perform a join between two streams where, in theory, one of 
the streams is keyed by two values, while the other stream is keyed by only one 
of those values.  In practice both streams are keyed only by their shared value, 
so when items from the stream that is in theory keyed by two values are inserted 
with just the single shared key, there may be collisions.  To get around that I 
turn on the feature to allow duplicates, and the join is performed by iterating 
over the duplicates and matching on the second key value.
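
For reference, roughly what that lookup looks like against the window store API
({{Event}} and its {{secondaryKey()}} accessor are made-up names for this sketch):

{code}
// store was built with retainDuplicates = true
WindowStoreIterator<Event> iter = store.fetch(sharedKey, windowStartMs, windowEndMs);
try {
    while (iter.hasNext()) {
        KeyValue<Long, Event> entry = iter.next();          // insertion timestamp -> value
        if (secondKey.equals(entry.value.secondaryKey())) {
            // matched: join this event with the record from the other stream
        }
    }
} finally {
    iter.close();
}
{code}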


> Enabling caching on a persistent window store breaks support for duplicate 
> insertion
> 
>
> Key: KAFKA-4887
> URL: https://issues.apache.org/jira/browse/KAFKA-4887
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>
> {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when 
> duplicate insertion support is enabled by passing {{true}} as the fourth 
> argument to {{windowed}} in the state store supplier.
> When the feature is enabled, {{RocksDBWindowStore}} correctly handles 
> duplicates by assigning a unique sequence number to each element on insertion 
> and using the number within the key.
> When caching is enabled by calling {{enableCaching}} on the supplier, 
> {{CachingWindowStore}} fails to do the same.  Thus, of multiple values 
> inserted with the same key, only the last one survives.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Steven Schlansker

> On Mar 13, 2017, at 12:30 PM, Matthias J. Sax  wrote:
> 
> Jay,
> 
> thanks for your feedback
> 
>> What if instead we called it KStreamsBuilder?
> 
> That's the current name and I personally think it's not the best one.
> The main reason why I don't like KStreamsBuilder is that we have the
> concepts of KStreams and KTables, and the builder creates both. However,
> the name puts the focus on KStream and devalues KTable.
> 
> I understand your argument, and I am personally open to removing the
> "Topology" part and naming it "StreamsBuilder". Not sure what others
> think about this.

If you worry about that, you could consider "KafkaStreamsBuilder"
which highlights that it is about Kafka Streams the project as
opposed to KStream the feature.  A little bit wordier, but you probably
only type it a couple times.





[GitHub] kafka pull request #2678: KAFKA-4891 kafka.request.logger TRACE regression

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2678


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922844#comment-15922844
 ] 

ASF GitHub Bot commented on KAFKA-4891:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2678


> kafka.request.logger TRACE regression
> -
>
> Key: KAFKA-4891
> URL: https://issues.apache.org/jira/browse/KAFKA-4891
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.10.2.1
>
>
> Here's what kafka-request.log shows with {{kafka.request.logger}} set to 
> TRACE:
> {code}
> [2017-03-13 10:06:24,402] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
> org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,406] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
> org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from 
> connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,943] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
> org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
> 127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> {code}
> Both the headers and requests have regressed to just show object ids instead 
> of their contents from their underlying structs. I'm guessing this regression 
> came from commit 
> [fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]
> The logs should look something like this:
> {code}
> [2017-03-13 10:14:36,754] TRACE Completed 
> request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:14:36,758] TRACE Completed 
> request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51

[jira] [Updated] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-4891:

Fix Version/s: 0.10.2.1

> kafka.request.logger TRACE regression
> -
>
> Key: KAFKA-4891
> URL: https://issues.apache.org/jira/browse/KAFKA-4891
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.10.2.1
>
>
> Here's what kafka-request.log shows with {{kafka.request.logger}} set to 
> TRACE:
> {code}
> [2017-03-13 10:06:24,402] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
> org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,406] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
> org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from 
> connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,943] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
> org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
> 127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> {code}
> Both the headers and requests have regressed to just show object ids instead 
> of their contents from their underlying structs. I'm guessing this regression 
> came from commit 
> [fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]
> The logs should look something like this:
> {code}
> [2017-03-13 10:14:36,754] TRACE Completed 
> request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:14:36,758] TRACE Completed 
> request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:3,requestQueueTime:1,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:U

[GitHub] kafka pull request #2679: HOTFIX: Fix apache headers in float serde class fi...

2017-03-13 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2679

HOTFIX: Fix apache headers in float serde class files

@hachikuji 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KHotfix-serde-headers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2679






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-4891:

Fix Version/s: (was: 0.10.2.1)
   0.11.0.0

> kafka.request.logger TRACE regression
> -
>
> Key: KAFKA-4891
> URL: https://issues.apache.org/jira/browse/KAFKA-4891
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.11.0.0
>
>
> Here's what kafka-request.log shows with {{kafka.request.logger}} set to 
> TRACE:
> {code}
> [2017-03-13 10:06:24,402] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
> org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,406] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
> org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from 
> connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,943] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
> org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
> 127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> {code}
> Both the headers and requests have regressed to just show object ids instead 
> of their contents from their underlying structs. I'm guessing this regression 
> came from commit 
> [fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]
> The logs should look something like this:
> {code}
> [2017-03-13 10:14:36,754] TRACE Completed 
> request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:14:36,758] TRACE Completed 
> request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:3,requestQueueTime:1,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:0

Build failed in Jenkins: kafka-trunk-jdk8 #1346

2017-03-13 Thread Apache Jenkins Server
See 

--
Failed to access build log

java.io.IOException: remote file operation failed: 
/home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk8 at 
hudson.remoting.Channel@7b7d2d3b:ubuntu-us1: 
hudson.remoting.ChannelClosedException: channel is already closed
at hudson.FilePath.act(FilePath.java:992)
at hudson.FilePath.act(FilePath.java:974)
at hudson.FilePath.toURI(FilePath.java:1119)
at hudson.tasks.MailSender.createFailureMail(MailSender.java:318)
at hudson.tasks.MailSender.createMail(MailSender.java:179)
at hudson.tasks.MailSender.run(MailSender.java:110)
at hudson.tasks.Mailer.perform(Mailer.java:170)
at 
hudson.tasks.BuildStepCompatibilityLayer.perform(BuildStepCompatibilityLayer.java:78)
at hudson.tasks.BuildStepMonitor$1.perform(BuildStepMonitor.java:20)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.perform(AbstractBuild.java:779)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.performAllBuildSteps(AbstractBuild.java:720)
at hudson.model.Build$BuildExecution.post2(Build.java:185)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.post(AbstractBuild.java:665)
at hudson.model.Run.execute(Run.java:1753)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:98)
at hudson.model.Executor.run(Executor.java:404)
Caused by: hudson.remoting.ChannelClosedException: channel is already closed
at hudson.remoting.Channel.send(Channel.java:604)
at hudson.remoting.Request.call(Request.java:130)
at hudson.remoting.Channel.call(Channel.java:821)
at hudson.FilePath.act(FilePath.java:985)
... 16 more
Caused by: java.io.IOException
at hudson.remoting.Channel.close(Channel.java:1284)
at hudson.slaves.ChannelPinger$1.onDead(ChannelPinger.java:115)
at hudson.remoting.PingThread.ping(PingThread.java:130)
at hudson.remoting.PingThread.run(PingThread.java:86)
Caused by: java.util.concurrent.TimeoutException: Ping started at 1489434262837 
hasn't completed by 1489434502838
... 2 more


[jira] [Resolved] (KAFKA-4891) kafka.request.logger TRACE regression

2017-03-13 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-4891.
-
Resolution: Fixed

> kafka.request.logger TRACE regression
> -
>
> Key: KAFKA-4891
> URL: https://issues.apache.org/jira/browse/KAFKA-4891
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.11.0.0
>
>
> Here's what kafka-request.log shows with {{kafka.request.logger}} set to 
> TRACE:
> {code}
> [2017-03-13 10:06:24,402] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@2f99ef87 -- 
> org.apache.kafka.common.requests.LeaderAndIsrRequest@34c40fc5 from connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:125,requestQueueTime:0,localTime:124,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,406] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@133b1de8 -- 
> org.apache.kafka.common.requests.UpdateMetadataRequest@795826d from 
> connection 
> 127.0.0.1:9090-127.0.0.1:50969;totalTime:3,requestQueueTime:0,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:06:24,943] TRACE Completed 
> request:org.apache.kafka.common.requests.RequestHeader@76a9acaa -- 
> org.apache.kafka.common.requests.FetchRequest@33ab3c1b from connection 
> 127.0.0.1:9090-127.0.0.1:50976;totalTime:522,requestQueueTime:0,localTime:13,remoteTime:506,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> {code}
> Both the headers and requests have regressed to just show object ids instead 
> of their contents from their underlying structs. I'm guessing this regression 
> came from commit 
> [fc1cfe475e8ae8458d8ddf119ce18d0c64653a70|https://github.com/apache/kafka/commit/fc1cfe475e8ae8458d8ddf119ce18d0c64653a70]
> The logs should look something like this:
> {code}
> [2017-03-13 10:14:36,754] TRACE Completed 
> request:{api_key=4,api_version=0,correlation_id=2,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_leaders=[{id=0,host=localhost,port=9090},{id=1,host=localhost,port=9091}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:155,requestQueueTime:0,localTime:155,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
>  (kafka.request.logger)
> [2017-03-13 10:14:36,758] TRACE Completed 
> request:{api_key=6,api_version=3,correlation_id=3,client_id=0} -- 
> {controller_id=0,controller_epoch=1,partition_states=[{topic=t,partition=5,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=4,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=3,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=2,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=1,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=0,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]},{topic=t,partition=7,controller_epoch=1,leader=1,leader_epoch=0,isr=[1,0],zk_version=0,replicas=[0,1]},{topic=t,partition=6,controller_epoch=1,leader=0,leader_epoch=0,isr=[0,1],zk_version=0,replicas=[0,1]}],live_brokers=[{id=1,end_points=[{port=9091,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null},{id=0,end_points=[{port=9090,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  from connection 
> 127.0.0.1:9090-127.0.0.1:51349;totalTime:3,requestQueueTime:1,localTime:2,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Matthias J. Sax
Steven,

thanks for your feedback.

I am not sure about KafkaStreamsBuilder (even if I agree that it is better
than KStreamBuilder), because it sounds like a builder that creates a
KafkaStreams instance. But that's of course not the case. It builds a
Topology -- that was the reason to consider calling it TopologyBuilder.

I suggested StreamsTopologyBuilder (instead of TopologyBuilder) to avoid
any confusion with the current TopologyBuilder (that we are going to
rename to Topology).

We could also go with DslBuilder (or DslTopologyBuilder as suggested by
Michael) -- it should be clear that this does not build a DSL :) in
contrast to KafkaStreamsBuilder.



-Matthias


On 3/13/17 12:46 PM, Steven Schlansker wrote:
> 
>> On Mar 13, 2017, at 12:30 PM, Matthias J. Sax  wrote:
>>
>> Jay,
>>
>> thanks for your feedback
>>
>>> What if instead we called it KStreamsBuilder?
>>
>> That's the current name and I personally think it's not the best one.
>> The main reason why I don't like KStreamsBuilder is that we have the
>> concepts of KStreams and KTables, and the builder creates both. However,
>> the name puts the focus on KStream and devalues KTable.
>>
>> I understand your argument, and I am personally open to removing the
>> "Topology" part and naming it "StreamsBuilder". Not sure what others
>> think about this.
> 
> If you worry about that, you could consider "KafkaStreamsBuilder"
> which highlights that it is about Kafka Streams the project as
> opposed to KStream the feature.  A little bit wordier, but you probably
> only type it a couple times.
> 





Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Matthias J. Sax
Guozhang,

1) I updated the KIP to option (b).

3) Agreed. So we leave this part out, and tackle it within KIP-130


-Matthias


On 3/12/17 3:48 PM, Guozhang Wang wrote:
> Thanks Matthias.
> 
> 1) Given that TopologyDescription is for debugging purposes before
> `KafkaStreams.start()`, I think the simplest option b) may be sufficient.
> Just needs to emphasize its possible value semantics in Java docs.
> 
> 3) You can tell that I was actually thinking about this together with
> KIP-130. To me if we can expose the runtime information, which is dynamic,
> via metrics in KIP-130 then we could remove this function. The main reason
> is that, again, the task migration makes this function's behavior a bit
> difficult to explain. For example:
> 
> streams.start();
> 
> sleep(/* some time */)
> 
> streams.toString();
> 
> ---
> 
> Even with the same configuration, depending on how long you wait
> after starting, the function could return very different string results due
> to rebalances.
> 
> That being said, I was not trying to make the decision in this KIP, as I
> saw it more related to KIP-130. So we could probably still keep it as is in
> KIP-120, and consider removing it in KIP-130. That's why I was just "asking
> your thoughts on this", but not necessary wanting to make an action in this
> KIP.
> 
> 
> Guozhang
> 
> 
> 
> On Sat, Mar 11, 2017 at 11:10 AM, Matthias J. Sax 
> wrote:
> 
>> Thanks for your feedback Guozhang.
>>
>>
>> 1) There are multiple ways to do this. Let me know what you think about
>> all options:
>>
>> (a) I updated the KIP to this:
>>
>>> public final class Source implements Node {
>>> public final String name;
>>> // topicNames and topicPattern are mutually exclusive, i.e.,
>> only one will be not-null
>>> public final List topicNames; // null if #addSource(...,
>> Pattern) was used
>>> public final Pattern topicPattern; // null if #addSource(...,
>> String...) was used
>>> }
>>
>> (b) We could also go with a single variable (as originally proposed).
>> This would have the advantage (compared to (a)), that null checks are
>> not required accessing TopologyDescription#Source class.
>>
>>> String topics; // can be comma separated list of topic names or pattern
>> (as String)
>>
>> However, with an encoded list or an encoded pattern it's required to
>> parse the string again, what we want to avoid in the first place.
>>
>> (c) Use a single variable as in (b)
>>
>>> String topics; // always a pattern (as String)
>>
>> We translate a list of topic names into a pattern
>> "topic1|topic2|topic3". We loose the information if the source was added
>> via list or via pattern.
>>
>>
>>
>> 2) Your understanding is correct. Added a comment to the KIP.
>>
>>
>>
>> 3) I would keep KafkaStreams#toString() -- it's conceptually two
>> different things and runtime information is useful, too. But as its
>> return value is ambiguous to parse (and must be parsed in the first
>> place, which is cumbersome), we could add KafkaStreams#describe() as
>>
>>> public synchronized KafkaStreamsDescription describe();
>>
>> KafkaStreamsDescription class would be similar to TopologyDescription to
>> allow programmatic access to runtime information. I guess we could even
>> reuse (parts of) TopologyDescription within KafkaStreamsDescription to
>> avoid code duplication.
>>
>> If you think this would be useful, I can extend the KIP accordingly.
>>
>>
>>
>> -Matthias
>>
>>
>>
>>
>> On 3/10/17 1:38 PM, Guozhang Wang wrote:
>>> One more question here:
>>>
>>> 3. With TopologyDescription, do we still want to keep the
>>> `KafkaStreams.toString()` function? I think it may still have some
>>> advantage in that it contains task information after `KafkaStreams#start()`
>>> has been called, but much of it is duplicated in the TopologyDescription,
>>> and it is only in the form of a string, hence hard to leverage
>>> programmatically. So I would like to hear your thoughts.
>>>
>>> Guozhang
>>>
>>> On Thu, Mar 9, 2017 at 11:20 PM, Guozhang Wang 
>> wrote:
>>>
 Thanks Matthias, the updated KIP lgtm overall. A couple of minor
>> comments:

 1. With regard to this class:

 public final class Source implements Node {
     public final String name;
     public final String topic; // can be topic name or pattern (as String)
 }

 Note that the source node could contain more than a single topic, i.e. a
 list of topics besides a pattern.

 2. With regard to

   public synchronized TopologyDescription describe();

 My understanding is that whenever the topology is modified, one needs to
>> call
 this function again to get a new description object, as the old one
>> won't
 be updated automatically. Hence the usage pattern would be:

 TopologyDescription description = topology.describe();

 topology.addProcessor(...)

 description = topology.descri

Re: [DISCUSS] KIP-132: Augment KStream.print to allow extra parameters in the printed string

2017-03-13 Thread Matthias J. Sax
Marc,

Thanks for the KIP.

Can you please update the KIP in a way such that it is self-contained?
Right now, you link to all kinds of other places, which makes the KIP hard
to read.

The KIP should be the "center of truth" -- if there is important
information elsewhere, please c&p it into the KIP.


Thanks a lot!


-Matthias



On 3/13/17 1:30 PM, Matthias J. Sax wrote:
> Can you please add the KIP to this table:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion
> 
> Thanks,
> 
>  Matthias
> 
> 
> On 3/13/17 8:08 AM, Marc Juchli wrote:
>> Dear all,
>>
>> The following describes KIP-132, which I just created. See:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-132+-+Augment+KStream.print+to+allow+extra+parameters+in+the+printed+string
>>
>> Motivation
>>
>> As for now, KStream#print leads to a predefined output where key and value
>> are printed with comma separation.
>> KAFKA-4830  suggests extending print in a way that it takes a
>> KeyValueMapper as a parameter.
>> This will allow a user to change the output according to the user's demands.
>> Public Interfaces
>>
>> The affected interface is KStream, which needs to be extended with another
>> overloaded version of print:
>>
>> void print(final Serde<K> keySerde,
>>            final Serde<V> valSerde,
>>            final String streamName,
>>            final KeyValueMapper<? super K, ? super V, String> mapper);
>>
>> Proposed Changes
>>
>> See pull request GH-2669 .
>> This PR contains a discussion regarding KAFKA-4830
>>  as well as KAFKA-4772
>> .
>>
>> Compatibility, Deprecation, and Migration Plan
>>
>> The extension of print will not introduce compatibility issues – we can
>> maintain the current output by keeping the current output format as a
>> default (if mapper was not set):
>>
>> if (mapper == null) {
>>     printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);
>> } else {
>>     printStream.println("[" + streamName + "]: " + mapper.apply(keyToPrint, valueToPrint));
>> }
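
A hypothetical usage sketch of the proposed overload (the topic name, serdes,
and formatting lambda are illustrative only, since the overload does not exist
yet; KStreamBuilder is the current 0.10.x builder class):

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> pageViews =
            builder.stream(Serdes.String(), Serdes.String(), "page-views");

    // One custom, human-readable line per record instead of the default
    // "[streamName]: key , value" format.
    pageViews.print(Serdes.String(), Serdes.String(), "page-views",
            (key, value) -> key + " viewed " + value);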
>>
>>
>>
>> Kind regards,
>> Marc
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Matthias J. Sax
Jay,

thanks for your feedback

> What if instead we called it KStreamsBuilder?

That's the current name, and I personally think it's not the best one.
The main reason why I don't like KStreamsBuilder is that we have the
concepts of KStreams and KTables, and the builder creates both. However,
the name puts the focus on KStream and devalues KTable.

I understand your argument, and I am personally open to removing the
"Topology" part and naming it "StreamsBuilder". Not sure what others
think about this.


About the Processor API: I like the idea in general, but I think it's out
of scope for this KIP. KIP-120 focuses on removing leaking internal APIs
and doing some cleanup of how our API reflects certain concepts.

However, I added your idea to the API discussion wiki page and we can take
it from there:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions



-Matthias


On 3/13/17 11:52 AM, Jay Kreps wrote:
> Two things:
> 
>1. This is a minor thing but the proposed new name for KStreamBuilder
>is StreamsTopologyBuilder. I actually think we should not put topology in
>the name as topology is not a concept you need to understand at the
>kstreams layer right now. I'd think of three categories of concepts: (1)
>concepts you need to understand to get going even for a simple example, (2)
>concepts you need to understand to operate and debug a real production app,
>(3) concepts we truly abstract and you don't need to ever understand. I
>think in the kstream layer topologies are currently category (2), and this
>is where they belong. By introducing the name in even the simplest example
>it means the user has to go read about topologies to really understand even
>this simple snippet. What if instead we called it KStreamsBuilder?
>2. For the processor api, I think this api is mostly not for end users.
>However, there are a couple of cases where it might make sense to expose it. I
>think users coming from Samza, or JMS's MessageListener (
>https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
>understand a simple callback interface for message processing. In fact,
>people often ask why Kafka's consumer doesn't provide such an interface.
>I'd argue we do, it's KafkaStreams. The only issue is that the processor
>API documentation is a bit scary for a person implementing this type of
>api. My observation is that people using this style of API don't do a lot
>of cross-message operations; they just do single-message operations and use
>a database for anything that spans messages. They also don't factor their
>code into many MessageListeners and compose them, they just have one
>listener that has the complete handling logic. Say I am a user who wants to
>implement a single Processor in this style. Do we have an easy way to do
>that today (either with the .transform/.process methods in kstreams or with
>the topology apis)? Is there anything we can do in the way of trivial
>helper code to make this better? Also, how can we explain that pattern to
>people? I think currently we have pretty in-depth docs on our apis but I
>suspect a person trying to figure out how to implement a simple callback
>might get a bit lost trying to figure out how to wire it up. A simple five
>line example in the docs would probably help a lot. Not sure if this is
>best addressed in this KIP or is a side comment.
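
As a rough illustration of that kind of five-line callback, here is a minimal
sketch using the current (pre-KIP-120) KStreamBuilder API; the topic name,
serdes, and handling logic are illustrative assumptions, not a recommendation
settled in this thread:

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;

    public class SingleCallbackExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "single-callback-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            final KStreamBuilder builder = new KStreamBuilder();
            // One MessageListener-style callback: no cross-message state, just
            // per-record handling logic.
            builder.stream(Serdes.String(), Serdes.String(), "input-topic")
                   .process(new ProcessorSupplier<String, String>() {
                       @Override
                       public Processor<String, String> get() {
                           return new AbstractProcessor<String, String>() {
                               @Override
                               public void process(final String key, final String value) {
                                   // the complete handling logic for a single record goes here
                                   System.out.println(key + " -> " + value);
                               }
                           };
                       }
                   });

            new KafkaStreams(builder, props).start();
        }
    }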
> 
> Cheers,
> 
> -Jay
> 
> On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax 
> wrote:
> 
>> Hi All,
>>
>> I did prepare a KIP to clean up some of Kafka's Streams API.
>>
>> Please have a look here:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 120%3A+Cleanup+Kafka+Streams+builder+API
>>
>> Looking forward to your feedback!
>>
>>
>> -Matthias
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Updated] (KAFKA-4840) There are still cases where producer buffer pool will not remove waiters.

2017-03-13 Thread Sean McCauliff (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean McCauliff updated KAFKA-4840:
--
Description: 
There are several problems dealing with errors in BufferPool.allocate(int 
size, long maxTimeToBlockMs):

* The accumulated number of bytes is not put back into the available pool when 
an exception happens while a thread is waiting for bytes to become available.  
This will cause the capacity of the buffer pool to decrease over time whenever 
a timeout is hit within this method.
* If a Throwable other than InterruptedException is thrown out of await() for 
some reason, or if there is an exception thrown in the corresponding finally 
block around the await(), for example if waitTime.record(.) throws an 
exception, then the waiters are not removed from the waiters deque.
* On timeout or other exceptions, waiters could be signaled but are not.  If no 
other buffers are freed, then the next waiting thread will also time out, and so 
on.
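
To illustrate the cleanup pattern the bullets above call for, here is a
simplified, self-contained sketch (illustrative names only, not the actual
BufferPool code): the finally block returns partially accumulated bytes,
always removes the waiter, and signals the next waiter.

{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.common.errors.TimeoutException;

// Simplified sketch of a buffer pool that cleans up on every exit path.
public class SimpleBufferPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long availableMemory;

    public SimpleBufferPool(final long totalMemory) {
        this.availableMemory = totalMemory;
    }

    public ByteBuffer allocate(final int size, final long maxTimeToBlockMs) throws InterruptedException {
        lock.lock();
        int accumulated = 0;
        final Condition moreMemory = lock.newCondition();
        waiters.addLast(moreMemory);
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            while (accumulated < size) {
                final int free = (int) Math.min(size - accumulated, availableMemory);
                availableMemory -= free;
                accumulated += free;
                if (accumulated < size) {
                    if (remainingNs <= 0)
                        throw new TimeoutException("Could not allocate " + size + " bytes in time");
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
            }
            final int allocated = accumulated;
            accumulated = 0; // ownership of these bytes moves to the returned buffer
            return ByteBuffer.allocate(allocated);
        } finally {
            availableMemory += accumulated;    // give partially accumulated bytes back on any exit path
            waiters.remove(moreMemory);        // leave the deque even if an unexpected Throwable escaped
            if (availableMemory > 0 && !waiters.isEmpty())
                waiters.peekFirst().signal();  // wake the next waiter so it does not also time out
            lock.unlock();
        }
    }

    public void deallocate(final ByteBuffer buffer) {
        lock.lock();
        try {
            availableMemory += buffer.capacity();
            if (!waiters.isEmpty())
                waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }
}
{code}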


  was:
In BufferPool.allocate(int size, long maxTimeToBlockMs):
If a Throwable other than InterruptedException is thrown out of await() for 
some reason or if there is an exception thrown in the corresponding finally 
block around the await(), for example if waitTime.record(.) throws an 
exception, then the waiters are not removed from the waiters deque.

The number of available bytes are also not restored when an exception happens.


> There are still cases where producer buffer pool will not remove waiters.
> -
>
> Key: KAFKA-4840
> URL: https://issues.apache.org/jira/browse/KAFKA-4840
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
>Reporter: Sean McCauliff
>
> There are several problems dealing with errors in  BufferPool.allocate(int 
> size, long maxTimeToBlockMs):
> * The accumulated number of bytes are not put back into the available pool 
> when an exception happens and a thread is waiting for bytes to become 
> available.  This will cause the capacity of the buffer pool to decrease over 
> time any time a timeout is hit within this method.
> * If a Throwable other than InterruptedException is thrown out of await() for 
> some reason or if there is an exception thrown in the corresponding finally 
> block around the await(), for example if waitTime.record(.) throws an 
> exception, then the waiters are not removed from the waiters deque.
> * On timeout or other exception waiters could be signaled, but are not.  If 
> no other buffers are freed then the next waiting thread will also timeout and 
> so on.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2679: HOTFIX: Fix apache headers in float serde class fi...

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2679


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

2017-03-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922911#comment-15922911
 ] 

Matthias J. Sax commented on KAFKA-4835:


Agreed. I just wanted to point out that there are different scenarios. This JIRA 
makes a lot of sense for "re-keying that preserves partitioning", because there 
are (conceptually) multiple primary keys in the data and you just want to set a 
different primary key.

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-131 : Add access to OffsetStorageReader from SourceConnector

2017-03-13 Thread Matthias J. Sax
Can you please add the KIP to this table:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion

Thanks,

 Matthias


On 3/7/17 1:24 PM, Florian Hussonnois wrote:
> Hi all,
> 
> I've created a new KIP to add access to OffsetStorageReader from
> SourceConnector
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector
> 
> Thanks.
> 



signature.asc
Description: OpenPGP digital signature


Build failed in Jenkins: kafka-trunk-jdk7 #2011

2017-03-13 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-4769: Add Float serializer, deserializer, serde

[wangguoz] KAFKA-4657: Improve test coverage of CompositeReadOnlyWindowStore

--
[...truncated 322.47 KB...]

kafka.coordinator.GroupCoordinatorResponseTest > testValidLeaveGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupInactiveGroup 
STARTED

kafka.coordinator.GroupCoordinatorResponseTest > testDescribeGroupInactiveGroup 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupNotCoordinator 
STARTED

kafka.coordinator.GroupCoordinatorResponseTest > testSyncGroupNotCoordinator 
PASSED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatUnknownConsumerExistingGroup STARTED

kafka.coordinator.GroupCoordinatorResponseTest > 
testHeartbeatUnknownConsumerExistingGroup PASSED

kafka.coordinator.GroupCoordinatorResponseTest > testValidHeartbeat STARTED

kafka.coordinator.GroupCoordinatorResponseTest > testValidHeartbeat PASSED

kafka.coordinator.GroupMetadataTest > testDeadToAwaitingSyncIllegalTransition 
STARTED

kafka.coordinator.GroupMetadataTest > testDeadToAwaitingSyncIllegalTransition 
PASSED

kafka.coordinator.GroupMetadataTest > testOffsetCommitFailure STARTED

kafka.coordinator.GroupMetadataTest > testOffsetCommitFailure PASSED

kafka.coordinator.GroupMetadataTest > 
testPreparingRebalanceToStableIllegalTransition STARTED

kafka.coordinator.GroupMetadataTest > 
testPreparingRebalanceToStableIllegalTransition PASSED

kafka.coordinator.GroupMetadataTest > testStableToDeadTransition STARTED

kafka.coordinator.GroupMetadataTest > testStableToDeadTransition PASSED

kafka.coordinator.GroupMetadataTest > testInitNextGenerationEmptyGroup STARTED

kafka.coordinator.GroupMetadataTest > testInitNextGenerationEmptyGroup PASSED

kafka.coordinator.GroupMetadataTest > testCannotRebalanceWhenDead STARTED

kafka.coordinator.GroupMetadataTest > testCannotRebalanceWhenDead PASSED

kafka.coordinator.GroupMetadataTest > testInitNextGeneration STARTED

kafka.coordinator.GroupMetadataTest > testInitNextGeneration PASSED

kafka.coordinator.GroupMetadataTest > testPreparingRebalanceToEmptyTransition 
STARTED

kafka.coordinator.GroupMetadataTest > testPreparingRebalanceToEmptyTransition 
PASSED

kafka.coordinator.GroupMetadataTest > testSelectProtocol STARTED

kafka.coordinator.GroupMetadataTest > testSelectProtocol PASSED

kafka.coordinator.GroupMetadataTest > testCannotRebalanceWhenPreparingRebalance 
STARTED

kafka.coordinator.GroupMetadataTest > testCannotRebalanceWhenPreparingRebalance 
PASSED

kafka.coordinator.GroupMetadataTest > 
testDeadToPreparingRebalanceIllegalTransition STARTED

kafka.coordinator.GroupMetadataTest > 
testDeadToPreparingRebalanceIllegalTransition PASSED

kafka.coordinator.GroupMetadataTest > testCanRebalanceWhenAwaitingSync STARTED

kafka.coordinator.GroupMetadataTest > testCanRebalanceWhenAwaitingSync PASSED

kafka.coordinator.GroupMetadataTest > 
testAwaitingSyncToPreparingRebalanceTransition STARTED

kafka.coordinator.GroupMetadataTest > 
testAwaitingSyncToPreparingRebalanceTransition PASSED

kafka.coordinator.GroupMetadataTest > testStableToAwaitingSyncIllegalTransition 
STARTED

kafka.coordinator.GroupMetadataTest > testStableToAwaitingSyncIllegalTransition 
PASSED

kafka.coordinator.GroupMetadataTest > testEmptyToDeadTransition STARTED

kafka.coordinator.GroupMetadataTest > testEmptyToDeadTransition PASSED

kafka.coordinator.GroupMetadataTest > testSelectProtocolRaisesIfNoMembers 
STARTED

kafka.coordinator.GroupMetadataTest > testSelectProtocolRaisesIfNoMembers PASSED

kafka.coordinator.GroupMetadataTest > testStableToPreparingRebalanceTransition 
STARTED

kafka.coordinator.GroupMetadataTest > testStableToPreparingRebalanceTransition 
PASSED

kafka.coordinator.GroupMetadataTest > testPreparingRebalanceToDeadTransition 
STARTED

kafka.coordinator.GroupMetadataTest > testPreparingRebalanceToDeadTransition 
PASSED

kafka.coordinator.GroupMetadataTest > testStableToStableIllegalTransition 
STARTED

kafka.coordinator.GroupMetadataTest > testStableToStableIllegalTransition PASSED

kafka.coordinator.GroupMetadataTest > testAwaitingSyncToStableTransition STARTED

kafka.coordinator.GroupMetadataTest > testAwaitingSyncToStableTransition PASSED

kafka.coordinator.GroupMetadataTest > testOffsetCommitFailureWithAnotherPending 
STARTED

kafka.coordinator.GroupMetadataTest > testOffsetCommitFailureWithAnotherPending 
PASSED

kafka.coordinator.GroupMetadataTest > testDeadToStableIllegalTransition STARTED

kafka.coordinator.GroupMetadataTest > testDeadToStableIllegalTransition PASSED

kafka.coordinator.GroupMetadataTest > testOffsetCommit STARTED

kafka.coordinator.GroupMetadataTest > testOffsetCommit PASSED

kafka.coordinator.GroupMetadataTest > 
testAwaitingSyncToAwaitingSyncIllegalTransition STARTED

kafka.coordinator.GroupMetadataTest > 
testAwai

[jira] [Commented] (KAFKA-4643) Improve test coverage of StreamsKafkaClient

2017-03-13 Thread Andrey Dyachkov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922945#comment-15922945
 ] 

Andrey Dyachkov commented on KAFKA-4643:


 I would like to grab this one. Could you explain more about `getTopicMetadata`, 
since I was not even able to find it in StreamsKafkaClient?

> Improve test coverage of StreamsKafkaClient
> ---
>
> Key: KAFKA-4643
> URL: https://issues.apache.org/jira/browse/KAFKA-4643
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
> Fix For: 0.11.0.0
>
>
> Exception paths not tested.
> {{getTopicMetadata}} not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-13 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922979#comment-15922979
 ] 

Armin Braun commented on KAFKA-4859:


[~guozhang] zeroing in on it :) Should have a PR in either ~ 2h or ~ 12h :)

> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-03-13 Thread Jason Gustafson
Hey Onur,


> Regarding 1: I've been considering something like this for a while now.
> KIP-122 has a similar issue and I brought up some hacks in that discussion
> to work around it (http://markmail.org/message/kk4ng74riejidify). While
> solving this problem would help loosen the requirements for migration, it
> seems beyond the scope of this KIP. It's hard to say whether we should be
> trying to solve that issue here.


I won't press if you don't want to do it here, but the point for this KIP
would be to avoid the awkward requirement to first disable offset commits
in Kafka, which feels like a step backwards. I can imagine it causing some
confusion (and annoyance for any users tracking progress through offset
commits in Kafka), but it's probably fine as long as the documentation is
clear.

Regarding 2: I agree that we should offer a tool somewhere to help with the
> migration and do the toggle. It's not clear to me if we should put it in
> kafka-consumer-groups.sh or in some new migration script.


Either way works for me. Eventually we'll deprecate and remove the
capability, so having a separate tool may make that easier. Probably makes
sense for this tool to be part of the KIP.

As an example, we can get rid of the notion of "coordination.migration.enabled"
> and just have a config called "coordination.migration.mode" whose values
> can be {"off", "manual", "auto"} where:
>

The "auto" option seems useful. I'm tempted to suggest that be the default
setting, but I guess that would be dangerous since the old group may still
be committing offsets to Kafka. Still it seems useful not to always require
the manual step, especially once you've validated the workflow.

Thanks,
Jason


On Fri, Mar 10, 2017 at 12:42 PM, Onur Karaman  wrote:

> I forgot to mention that in that above idea, the
> "coordination.migration.mode" config would default to "auto".
>
> On Fri, Mar 10, 2017 at 1:08 AM, Onur Karaman <
> onurkaraman.apa...@gmail.com>
> wrote:
>
> > Hey Jason.
> >
> > Thanks for the comments!
> >
> > Regarding 1: I've been considering something like this for a while now.
> > KIP-122 has a similar issue and I brought up some hacks in that
> discussion
> > to work around it (http://markmail.org/message/kk4ng74riejidify). While
> > solving this problem would help loosen the requirements for migration, it
> > seems beyond the scope of this KIP. It's hard to say whether we should be
> > trying to solve that issue here.
> >
> > Regarding 2: I agree that we should offer a tool somewhere to help with
> > the migration and do the toggle. It's not clear to me if we should put it
> > in kafka-consumer-groups.sh or in some new migration script.
> >
> > Regarding general migration complexity: something else Joel and I had
> > considered was the ability to optionally create the toggle on startup to
> > skip the step of having to manually set the toggle. There are many ways
> we
> > can do this.
> >
> > As an example, we can get rid of the notion of "coordination.migration.
> enabled"
> > and just have a config called "coordination.migration.mode" whose values
> > can be {"off", "manual", "auto"} where:
> >
> >- "off" would act like "coordination.migration.enabled" set to false.
> >We do not participate in coordination migration.
> >- "manual" would act like "coordination.migration.enabled" set to
> true
> >in the current KIP proposal. Do not attempt to create the toggle on
> >startup, but spin up an EKC and be ready to react to the toggle. This
> mode
> >helps an org gradually migrate to or rollback from kafka-based
> coordination.
> >- "auto" would act like "coordination.migration.enabled" set to true
> >in the current KIP proposal but additionally attempt to create the
> toggle
> >with "kafka" on startup if the znode doesn't already exist. The same
> rules
> >from the KIP apply where if a OZKCC or MDZKCC exists, the value is
> ignored
> >and we just use zookeeper-based coordination. This mode lets us skip
> the
> >step of having to manually set the toggle.
> >
> > Let me know what you think!
> >
> > On Thu, Mar 9, 2017 at 10:30 AM, Jason Gustafson 
> > wrote:
> >
> >> Hey Onur,
> >>
> >> Sorry for the late reply. Thanks for the well-written KIP! I think the
> >> proposal makes sense. The only thing I was wondering is whether the
> >> process
> >> is a bit complex for most users. You'd probably have no trouble at LI
> >> (especially given you're implementing it!), but I'm not so sure about
> the
> >> users who aren't as close to the Kafka internals. That said, I don't see
> >> any great options to simplify the process, and having this approach is
> >> better than having none, so maybe it's fine. Here are a couple minor
> >> suggestions:
> >>
> >> 1. One thought that came to mind is whether it would be worthwhile to
> add
> >> a
> >> broker config to disable the group membership check for offset commits.
> >> This would simplify the process by eliminating the initial step of
> turning

Re: [DISCUSS] KIP-132: Augment KStream.print to allow extra parameters in the printed string

2017-03-13 Thread Matthias J. Sax
Can you please add the KIP to this table:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion

Thanks,

 Matthias


On 3/13/17 8:08 AM, Marc Juchli wrote:
> Dear all,
> 
> The following describes KIP-132, which I just created. See:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-132+-+Augment+KStream.print+to+allow+extra+parameters+in+the+printed+string
> 
> Motivation
> 
> As for now, KStream#print leads to a predefined output where key and value are
> printed with comma separation.
> KAFKA-4830  suggests extending print in a way that it takes a
> KeyValueMapper as a parameter.
> This will allow a user to change the output according to the user's demands.
> Public Interfaces
> 
> The affected interface is KStream, which needs to be extended with another
> overloaded version of print:
> 
> void print(final Serde<K> keySerde,
>            final Serde<V> valSerde,
>            final String streamName,
>            final KeyValueMapper<? super K, ? super V, String> mapper);
> 
> Proposed Changes
> 
> See pull request GH-2669 .
> This PR contains a discussion regarding KAFKA-4830
>  as well as KAFKA-4772
> .
> 
> Compatibility, Deprecation, and Migration Plan
> 
> The extension of print will not introduce compatibility issues – we can
> maintain the current output by keeping the current output format as a
> default (if mapper was not set):
> 
> if (mapper == null) {
>     printStream.println("[" + streamName + "]: " + keyToPrint + " , " + valueToPrint);
> } else {
>     printStream.println("[" + streamName + "]: " + mapper.apply(keyToPrint, valueToPrint));
> }
> 
> 
> 
> Kind regards,
> Marc
> 



signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-03-13 Thread Jason Gustafson
Oh, seems I missed your comment saying the default would be "auto." Hmm...
If that's safe, then it sounds good to me.

-Jason

On Mon, Mar 13, 2017 at 2:32 PM, Jason Gustafson  wrote:

> Hey Onur,
>
>
>> Regarding 1: I've been considering something like this for a while now.
>> KIP-122 has a similar issue and I brought up some hacks in that
>> discussion
>> to work around it (http://markmail.org/message/kk4ng74riejidify). While
>> solving this problem would help loosen the requirements for migration, it
>> seems beyond the scope of this KIP. It's hard to say whether we should be
>> trying to solve that issue here.
>
>
> I won't press if you don't want to do it here, but the point for this KIP
> would be to avoid the awkward requirement to first disable offset commits
> in Kafka, which feels like a step backwards. I can imagine it causing some
> confusion (and annoyance for any users tracking progress through offset
> commits in Kafka), but it's probably fine as long as the documentation is
> clear.
>
> Regarding 2: I agree that we should offer a tool somewhere to help with the
>> migration and do the toggle. It's not clear to me if we should put it in
>> kafka-consumer-groups.sh or in some new migration script.
>
>
> Either way works for me. Eventually we'll deprecate and remove the
> capability, so having a separate tool may make that easier. Probably makes
> sense for this tool to be part of the KIP.
>
> As an example, we can get rid of the notion of "coordination.migration.enable
>> d"
>> and just have a config called "coordination.migration.mode" whose values
>> can be {"off", "manual", "auto"} where:
>>
>
> The "auto" option seems useful. I'm tempted to suggest that be the default
> setting, but I guess that would be dangerous since the old group may still
> be committing offsets to Kafka. Still it seems useful not to always require
> the manual step, especially once you've validated the workflow.
>
> Thanks,
> Jason
>
>
> On Fri, Mar 10, 2017 at 12:42 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com> wrote:
>
>> I forgot to mention that in that above idea, the
>> "coordination.migration.mode" config would default to "auto".
>>
>> On Fri, Mar 10, 2017 at 1:08 AM, Onur Karaman <
>> onurkaraman.apa...@gmail.com>
>> wrote:
>>
>> > Hey Jason.
>> >
>> > Thanks for the comments!
>> >
>> > Regarding 1: I've been considering something like this for a while now.
>> > KIP-122 has a similar issue and I brought up some hacks in that
>> discussion
>> > to work around it (http://markmail.org/message/kk4ng74riejidify). While
>> > solving this problem would help loosen the requirements for migration,
>> it
>> > seems beyond the scope of this KIP. It's hard to say whether we should
>> be
>> > trying to solve that issue here.
>> >
>> > Regarding 2: I agree that we should offer a tool somewhere to help with
>> > the migration and do the toggle. It's not clear to me if we should put
>> it
>> > in kafka-consumer-groups.sh or in some new migration script.
>> >
>> > Regarding general migration complexity: something else Joel and I had
>> > considered was the ability to optionally create the toggle on startup to
>> > skip the step of having to manually set the toggle. There are many ways
>> we
>> > can do this.
>> >
>> > As an example, we can get rid of the notion of
>> "coordination.migration.enabled"
>> > and just have a config called "coordination.migration.mode" whose values
>> > can be {"off", "manual", "auto"} where:
>> >
>> >- "off" would act like "coordination.migration.enabled" set to
>> false.
>> >We do not participate in coordination migration.
>> >- "manual" would act like "coordination.migration.enabled" set to
>> true
>> >in the current KIP proposal. Do not attempt to create the toggle on
>> >startup, but spin up an EKC and be ready to react to the toggle.
>> This mode
>> >helps an org gradually migrate to or rollback from kafka-based
>> coordination.
>> >- "auto" would act like "coordination.migration.enabled" set to true
>> >in the current KIP proposal but additionally attempt to create the
>> toggle
>> >with "kafka" on startup if the znode doesn't already exist. The same
>> rules
>> >from the KIP apply where if a OZKCC or MDZKCC exists, the value is
>> ignored
>> >and we just use zookeeper-based coordination. This mode lets us skip
>> the
>> >step of having to manually set the toggle.
>> >
>> > Let me know what you think!
>> >
>> > On Thu, Mar 9, 2017 at 10:30 AM, Jason Gustafson 
>> > wrote:
>> >
>> >> Hey Onur,
>> >>
>> >> Sorry for the late reply. Thanks for the well-written KIP! I think the
>> >> proposal makes sense. The only thing I was wondering is whether the
>> >> process
>> >> is a bit complex for most users. You'd probably have no trouble at LI
>> >> (especially given you're implementing it!), but I'm not so sure about
>> the
>> >> users who aren't as close to the Kafka internals. That said, I don't
>> see
>> >> any great options to simplify the process, an

Jenkins build is back to normal : kafka-trunk-jdk7 #2012

2017-03-13 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-13 Thread Colin McCabe
Hi Radai,

Thanks for looking at the KIP again.

On Mon, Mar 13, 2017, at 12:33, radai wrote:
> Looking at the KIP as it is now, it looks like all *Options objects have a
> common timeout property. Could it be extracted into a common
> AdminRequestOptions or something?

Perhaps I'm missing something, but I don't think there is any reason to
extract the timeout property.  It doesn't simplify the implementation
(I've already implemented the interface in a branch, so I know this for
sure.)  It doesn't simplify the API exposed to the users, since they
will still want to provide the specific option type corresponding to the
call.  Also, as we discussed previously in the thread (about NewTopic),
having lot of inheritance and base classes makes it difficult to change
classes over time.  It is better to simply use composition.

I think it would be much better to get the AdminClient interface in, and
start iterating on it incrementally as we discover ways it could be
better.  This is similar to how some things in Streams were added as
unstable interfaces and then stabilized over time.

best,
Colin


> 
> On Thu, Mar 9, 2017 at 2:14 PM, Colin McCabe  wrote:
> 
> > Hi all,
> >
> > We've been discussing this for a while (about a month) and I think
> > people have made some great points that improved the proposal.  In
> > particular, adding async and batching was important.  I've also been
> > talking with some end-users who would like to make use of this API.
> > Once this is in, we can iterate on it before the release, and it will
> > also unblock a lot of other admin proposals.  I think it would be good
> > to start the vote in a little bit, assuming there are no objections.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Mar 8, 2017, at 17:09, Colin McCabe wrote:
> > > On Mon, Mar 6, 2017, at 11:55, Ismael Juma wrote:
> > > > Thanks Colin.
> > > >
> > > > I am familiar with the protocol semantics, but we need to document the
> > > > API
> > > > for users who don't know the protocol. I still think it would be
> > valuable
> > > > to have some examples of how the API would be used for common use
> > cases.
> > >
> > > Getting the version of all nodes in the cluster:
> > >   Map nodesToVersions =
> > > adminClient.listNodes().nodes().thenApply(
> > >   nodes -> adminClient.apiVersions(nodes)).all().get();
> > >
> > > Creating a topic:
> > >   adminClient.createTopic(new NewTopic("myNewTopic", 3, (short)
> > >   3)).all().get();
> > >
> > > Validating that a topic can be created (but not creating it):
> > >   adminClient.createTopic(new NewTopic("myNewTopic", 3, (short) 3),
> > > new CreateTopicOptions().setValidateOnly(true)).all().get();
> > >
> > > > For example, say someone creates a topic and then produces to it. What
> > > > would be the recommended way to do that?
> > >
> > > Once the future returned by createTopics has successfully completed, it
> > > should be possible to produce to the topic.
> > >
> > > There are a few warts that are definitely worth calling out.  These are
> > > things that need to be fixed at the protocol layer, so they're outside
> > > the scope of this KIP.  But you made a good point that we need to
> > > document this well.  Here's my list (I wonder if anyone has more?):
> > >
> > > * If auto.create.topics.enable is true on the brokers,
> > > AdminClient#describeTopic(topicName) may create a topic named topicName.
> > >  There are two workarounds: either use AdminClient#listTopics and ensure
> > > that the topic is present before describing, or disable
> > > auto.create.topics.enable.
> > >
> > > * If delete.topic.enable is false on the brokers,
> > > AdminClient#deleteTopic(topicName) will mark topicName for deletion, but
> > > not actually delete it.  deleteTopic will return success in this case.
> > >
> > > * It may take several seconds after AdminClient#deleteTopic returns
> > > success for all the brokers to become aware that the topic is gone.
> > > During this time, AdminClient#listTopics and AdminClient#describeTopic
> > > may continue to return information about the deleted topic.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > >
> > > > Ismael
> > > >
> > > > On Mon, Mar 6, 2017 at 7:43 PM, Colin McCabe 
> > wrote:
> > > >
> > > > > On Mon, Mar 6, 2017, at 05:50, Ismael Juma wrote:
> > > > > > Thanks Colin. It seems like you replied to me accidentally instead
> > of the
> > > > > > list, so leaving your reply below for the benefit of others.
> > > > >
> > > > > Thanks, Ismael.  I actually realized my mistake right after I sent to
> > > > > you, and re-posted it to the mailing list instead of sending
> > directly.
> > > > > Sigh...
> > > > >
> > > > > >
> > > > > > Regarding the disadvantage of having to hunt through the request
> > class,
> > > > > > don't people have to do that anyway with the Options classes?
> > > > >
> > > > > A lot of people will simply choose the default options, until they
> > have
> > > > > a reason to do otherwise (for example, they want a longer o

Build failed in Jenkins: kafka-trunk-jdk8 #1347

2017-03-13 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-4769: Add Float serializer, deserializer, serde

[wangguoz] KAFKA-4657: Improve test coverage of CompositeReadOnlyWindowStore

[becket.qin] KAFKA-4891; kafka.request.logger TRACE regression

--
[...truncated 325.59 KB...]

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo STARTED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker STARTED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.tools.MirrorMakerIntegrationTest > testCommaSeparatedRegex STARTED

kafka.tools.MirrorMakerIntegrationTest > testCommaSeparatedRegex PASSED

kafka.tools.ReplicaVerificationToolTest > testReplicaBufferVerifyChecksum 
STARTED

kafka.tools.ReplicaVerificationToolTest > testReplicaBufferVerifyChecksum PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit STARTED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails STARTED

kafka.tools.ConsoleConsumerTest > shouldStopWhenOutputCheckErrorFails PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithStringOffset PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile STARTED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset STARTED

kafka.tools.ConsoleConsumerTest > 
shouldParseValidNewSimpleConsumerValidConfigWithNumericOffset PASSED

kafka.tools.ConsoleConsumerTest > testDefaultConsumer STARTED

kafka.tools.ConsoleConsumerTest > testDefaultConsumer PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig STARTED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp STARTED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs STARTED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer STARTED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage STARTED

kafka.tools.MirrorMakerTest > 
testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage PASSED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler STARTED

kafka.tools.MirrorMakerTest > testDefaultMirrorMakerMessageHandler PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.common.ConfigTest > testInvalidGroupIds STARTED

kafka.common.ConfigTest > testInvalidGroupIds PASSED

kafka.common.ConfigTest > testInvalidClientIds STARTED

kafka.common.ConfigTest > testInvalidClientIds PASSED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
STARTED

kafka.common.ZkNodeChangeNotificationListenerTest > testProcessNotification 
PASSED

kafka.common.TopicTest > testInvalidTopicNames STARTED

kafka.common.TopicTest > testI

[jira] [Commented] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-13 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923095#comment-15923095
 ] 

Armin Braun commented on KAFKA-4859:


[~guozhang] so basically this can just be resolved by setting a higher timeout 
on the waiting before the last assert at the end of the test.

{code:java}
final List<KeyValue<String, Long>> actualClicksPerRegion =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
        outputTopic, expectedClicksPerRegion.size(),
        2 * IntegrationTestUtils.DEFAULT_TIMEOUT);
assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
{code}

Tested this over 200+ iterations just now, whereas without this adjustment I hit 
the failure within fewer than 30 iterations.
What I find interesting though is that the runtime for this test is always very 
close to 30s -- always roughly between 30.4s and 30.6s. Is this just a coincidence, 
or is that 30s maybe coming from some place I missed, which would allow expressing 
the timeout a little more cleanly (i.e. as some constant + x)?
If not, I'd just put in a PR with the raised timeout.

> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-03-13 Thread Dong Lin
Hey Jun,

Thanks much for your detailed comments. Please see my reply below.

On Mon, Mar 13, 2017 at 9:09 AM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Some more comments below.
>
> 10. For the .move log, do we perform any segment deletion (based on
> retention) or log cleaning (if a compacted topic)? Or do we only enable
> that after the swap?
>
> 11. kafka-reassign-partitions.sh
> 11.1 If all reassigned replicas are in the current broker and only the log
> directories have changed, we can probably optimize the tool to not trigger
> partition reassignment through the controller and only
> send ChangeReplicaDirRequest.
>

Yes, the reassignment script should not create the reassignment znode if no
replicas are to be moved between brokers. This falls under the "How to move
replica between log directories on the same broker" part of the Proposed Changes
section.


> 11.2 If ChangeReplicaDirRequest specifies a replica that's not created yet,
> could the broker just remember that in memory and create the replica when
> the creation is requested? This way, when doing cluster expansion, we can
> make sure that the new replicas on the new brokers are created in the right
> log directory in the first place. We can also avoid the tool having to keep
> issuing ChangeReplicaDirRequest in response to
> ReplicaNotAvailableException.
>

I am concerned that the ChangeReplicaDirRequest would be lost if the broker
restarts after it sends the ChangeReplicaDirResponse but before it receives the
LeaderAndIsrRequest. In this case, the user will receive success when they
initiate the replica reassignment, but the reassignment will never complete when
they verify it later. This would be confusing to the user.

There are three different approaches to this problem if the broker has not yet
created the replica when it receives the ChangeReplicaDirRequest:

1) The broker immediately replies to the user with ReplicaNotAvailableException,
and the user can decide to retry later. The advantage of this solution is
that the broker logic is very simple and the reassignment script logic also
seems straightforward. The disadvantage is that the user's script has to retry.
But that seems fine - we can set the interval between retries to 0.5 sec so
that the broker won't be bombarded by those requests. This is the solution
chosen in the current KIP.

2) The broker can put the ChangeReplicaDirRequest in a purgatory with a timeout
and reply to the user after the replica has been created. I didn't choose this,
in the interest of keeping the broker logic simpler.

3) The broker can remember the request by making a mark on disk, e.g. creating a
topicPartition.tomove directory in the destination log directory. This mark would
be persisted across broker restarts. This was the first idea I had, but I replaced
it with solution 1) in the interest of keeping the broker simple.

It seems that solution 1) is the simplest one that works. But I am OK with
switching to one of the other two solutions if we don't want the retry logic.
What do you think?
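
For illustration only, a minimal sketch of the tool-side retry loop described
in solution 1): the Outcome enum and the Callable stand in for sending the
proposed ChangeReplicaDirRequest and interpreting its response; none of these
names are existing Kafka APIs.

    import java.util.concurrent.Callable;

    public final class ChangeReplicaDirRetry {

        enum Outcome { SUCCESS, REPLICA_NOT_AVAILABLE, OTHER_ERROR }

        static Outcome retryUntilReplicaCreated(final Callable<Outcome> sendChangeReplicaDir,
                                                final long retryIntervalMs,
                                                final long timeoutMs) throws Exception {
            final long deadline = System.currentTimeMillis() + timeoutMs;
            while (true) {
                final Outcome outcome = sendChangeReplicaDir.call();
                if (outcome != Outcome.REPLICA_NOT_AVAILABLE)
                    return outcome;                       // success or a hard error: stop retrying
                if (System.currentTimeMillis() >= deadline)
                    return Outcome.REPLICA_NOT_AVAILABLE; // give up: the replica never appeared
                Thread.sleep(retryIntervalMs);            // e.g. 500 ms between attempts, as above
            }
        }
    }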


11.3 Do we need an option in the tool to specify intra.broker.
> throttled.rate?
>

I don't find it useful to add this option to kafka-reassign-partitions.sh.
The reason we have the "--throttle" option in the script to throttle the
replication rate is that we usually want a higher quota when fixing an offline
replica to get out of URP, but we are OK with a lower quota if we are moving
replicas only to balance the cluster. Thus it is common for SREs to use
different quotas when using kafka-reassign-partitions.sh to move replicas
between brokers.

However, the only reason for moving replicas between log directories of the
same broker is to balance cluster resources. Thus an option to
specify intra.broker.throttled.rate in the tool is not that useful. I am
inclined not to add this option, to keep the tool's usage simpler.


>
> 12. DescribeDirsRequest
> 12.1 In other requests like CreateTopicRequest, we return an empty list in
> the response for an empty input list. If the input list is null, we return
> everything. We should probably follow the same convention here.
>

Thanks. I wasn't aware of this convention. I have changed
DescribeDirsRequest so that "null" indicates "all".


> 12.2 Do we need the topics field? Since the request is about log dirs, it
> makes sense to specify the log dirs. But it's weird to specify topics.
>

The topics field is not necessary. But it is useful for reducing the response
size in case users are only interested in the status of a few topics. For
example, a user may have initiated the reassignment of a given replica from
one log directory to another log directory on the same broker, and the user
only wants to check the status of this given partition by looking
at the DescribeDirsResponse. Thus this field is useful.

I am not sure it is weird to call this request DescribeDirsRequest. The
response is a map from log directory to information about some partitions on
that log directory. Do you think we need to change the name of the request?


> 12.3 DescribeDirsResponsePartition

Build failed in Jenkins: kafka-trunk-jdk8 #1348

2017-03-13 Thread Apache Jenkins Server
See 


Changes:

[jason] HOTFIX: Fix apache headers in float serde class files

--
[...truncated 21.52 KB...]
kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] STARTED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.OffsetIndexTest > lookupExtremeCases STARTED

kafka.log.OffsetIndexTest > lookupExtremeCases PASSED

kafka.log.OffsetIndexTest > appendTooMany STARTED

kafka.log.OffsetIndexTest > appendTooMany PASSED

kafka.log.OffsetIndexTest > randomLookupTest STARTED

kafka.log.OffsetIndexTest > randomLookupTest PASSED

kafka.log.OffsetIndexTest > testReopen STARTED

kafka.log.OffsetIndexTest > testReopen PASSED

kafka.log.OffsetIndexTest > appendOutOfOrder STARTED

kafka.log.OffsetIndexTest > appendOutOfOrder PASSED

kafka.log.OffsetIndexTest > truncate STARTED

kafka.log.OffsetIndexTest > truncate PASSED

kafka.log.TimeIndexTest > testTruncate STARTED

kafka.log.TimeIndexTest > testTruncate PASSED

kafka.log.TimeIndexTest > testAppend STARTED

kafka.log.TimeIndexTest > testAppend PASSED

kafka.log.TimeIndexTest > testLookUp STARTED

kafka.log.TimeIndexTest > testLookUp PASSED

kafka.log.OffsetMapTest > testClear STARTED

kafka.log.OffsetMapTest > testClear PASSED

kafka.log.OffsetMapTest > testGetWhenFull STARTED

kafka.log.OffsetMapTest > testGetWhenFull PASSED

kafka.log.OffsetMapTest > testBasicValidation STARTED

kafka.log.OffsetMapTest > testBasicValidation PASSED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize STARTED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
STARTED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogManagerTest > testGetNonExistentLog STARTED

kafka.log.LogManagerTest > testGetNonExistentLog PASSED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails STARTED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest > testLeastLoadedAssignment STARTED

kafka.log.LogManagerTest > testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest > testCleanupExpiredSegments STARTED

kafka.log.LogManagerTest > testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints STARTED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest > testTimeBasedFlush STARTED

kafka.log.LogManagerTest > testTimeBasedFlush PASSED

kafka.log.LogManagerTest > testCreateLog STARTED

kafka.log.LogManagerTest > testCreateLog PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash STARTED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest > testDoesntCleanLogsWithCompactDeletePolicy STARTED

kafka.log.LogManagerTest > testDoesntCleanLogsWithCompactDeletePolicy PASSED

kafka.log.LogCleanerTest > testCleanCorruptMessageSet STARTED

kafka.log.LogCleanerTest > testCleanCorruptMessageSet PASSED

kafka.log.LogCleanerTest > testBuildOffsetMap STARTED

kafka.log.LogCleanerTest > testBuildOffsetMap PASSED

kafka.log.LogCleanerTest > testBuildOffsetMapFakeLarge STARTED

kafka.log.LogCleanerTest > testBuildOffsetMapFakeLarge PASSED

kafka.log.LogCleanerTest > testSegmentGrouping STARTED

kafka.log.LogCleanerTe

[GitHub] kafka pull request #2680: KAFKA-4859: Raised Timeout

2017-03-13 Thread original-brownbear
GitHub user original-brownbear opened a pull request:

https://github.com/apache/kafka/pull/2680

KAFKA-4859: Raised Timeout

This fixes https://issues.apache.org/jira/browse/KAFKA-4859 for me over 
hundreds of iterations, whereas without the increased timeout I could reproduce 
the failure within roughly 30-40 iterations.

Raising the timeout also looks like a valid approach (at least on my setup), 
since test runtimes are consistently slightly above the 30s default timeout 
coming from 
`org.apache.kafka.streams.integration.utils.IntegrationTestUtils#DEFAULT_TIMEOUT`.


![byregion](https://cloud.githubusercontent.com/assets/6490959/23878174/1eac8154-0846-11e7-82a7-9e04f235630f.png)
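For anyone curious what the bump looks like from a test's perspective, here is a minimal 
sketch (not the actual diff in this PR; the four-argument overload and its parameter order 
are assumptions based on the stack traces in KAFKA-4859, and `consumerConfig` is a placeholder):

```java
// Sketch only: pass an explicit wait time instead of relying on the ~30s
// IntegrationTestUtils#DEFAULT_TIMEOUT. `consumerConfig` is a placeholder for
// the verifying consumer's Properties.
List<KeyValue<String, Long>> received =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        consumerConfig,     // Properties for the verifying consumer
        "output-topic-2",   // topic the assertion reads from
        3,                  // expected number of records
        120_000L);          // raised timeout in ms, well above the 30s default
```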


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/original-brownbear/kafka KAFKA-4859

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2680.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2680


commit 7f2d8854e369ced5c3a6e44da9042a84712601a1
Author: Armin Braun 
Date:   2017-03-13T22:35:06Z

KAFKA-4859 Raised Timeout




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923105#comment-15923105
 ] 

ASF GitHub Bot commented on KAFKA-4859:
---

GitHub user original-brownbear opened a pull request:

https://github.com/apache/kafka/pull/2680

KAFKA-4859: Raised Timeout

This fixes https://issues.apache.org/jira/browse/KAFKA-4859 for me over 
hundreds of iterations, whereas without the increased timeout I could reproduce 
the failure within roughly 30-40 iterations.

Raising the timeout also looks like a valid approach (at least on my setup), 
since test runtimes are consistently slightly above the 30s default timeout 
coming from 
`org.apache.kafka.streams.integration.utils.IntegrationTestUtils#DEFAULT_TIMEOUT`.


![byregion](https://cloud.githubusercontent.com/assets/6490959/23878174/1eac8154-0846-11e7-82a7-9e04f235630f.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/original-brownbear/kafka KAFKA-4859

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2680.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2680


commit 7f2d8854e369ced5c3a6e44da9042a84712601a1
Author: Armin Braun 
Date:   2017-03-13T22:35:06Z

KAFKA-4859 Raised Timeout




> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4892) kafka 0.8.2.1 getOffsetsBefore API returning correct offset given timestamp in unix epoch format

2017-03-13 Thread Chris Bedford (JIRA)
Chris Bedford created KAFKA-4892:


 Summary: kafka 0.8.2.1 getOffsetsBefore API returning correct 
offset given timestamp in unix epoch format
 Key: KAFKA-4892
 URL: https://issues.apache.org/jira/browse/KAFKA-4892
 Project: Kafka
  Issue Type: Bug
  Components: clients, offset manager
Affects Versions: 0.8.2.1
 Environment: ubuntu 16.04
Reporter: Chris Bedford


I am seeing unexpected behavior in the getOffsetsBefore method of the client 
API.

I understand the granularity of 'start-from-offset' via kafka spout is based on 
how many log segments you have.  

I have created a demo program that reproduces this on my 
GitHub account [ 
https://github.com/buildlackey/kafkaOffsetBug/blob/master/README.md ], 
and I have also posted the same question to Stack Overflow with 
a detailed set of steps for reproducing the issue 
(using my test program and scripts).  

See: 
http://stackoverflow.com/questions/42775128/kafka-0-8-2-1-getoffsetsbefore-api-returning-correct-offset-given-timestamp-in-u

Thanks in advance for your help !
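For reference, the 0.8.2.x lookup path being exercised here looks roughly like the sketch 
below (broker host/port, topic, and timestamp are placeholders, not values from the demo 
program):

{code}
import java.util.Collections;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetLookupSketch {
    public static void main(String[] args) {
        SimpleConsumer consumer =
            new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offset-lookup");
        TopicAndPartition tp = new TopicAndPartition("test-topic", 0);
        // Ask for the single latest offset at or before this unix-epoch timestamp (ms).
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            Collections.singletonMap(tp, new PartitionOffsetRequestInfo(1489363200000L, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
        OffsetResponse response = consumer.getOffsetsBefore(request);
        // Note: the result is only as fine-grained as the log segment boundaries,
        // which is why a timestamp may not map to the offset one might expect.
        long[] offsets = response.offsets("test-topic", 0);
        System.out.println(offsets.length > 0 ? offsets[0] : -1);
        consumer.close();
    }
}
{code}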




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923322#comment-15923322
 ] 

Guozhang Wang commented on KAFKA-4890:
--

[~damianguy] I looked at the logs (actually only one of the log files, 
{{firefly_2.log}}, which I believe corresponds to the first trace you posted 
above). Here is what I have found:

1. Before thread-1 hit the error, it did seem to have a long GC while it was 
creating task 6 (found using {{grep -i "threadid" logfile}}):

{code}
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - Seeking to 
beginning of partition fireflyProd-perGameScoreStore-changelog-6
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name topic.fireflyProd-userAwardStore-changelog.bytes-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name topic.fireflyProd-userAwardStore-changelog.records-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name fireflyProd-perGameScoreStore-changelog-6.records-lag
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - 
Unsubscribed all topics or patterns and assigned partitions// 11 seconds 
later
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Removed sensor 
with name fireflyProd-perGameScoreStore-changelog-6.records-lag
{code}

And during this period of time thread-3 has deleted the state directory for 
task 0_6, as you observed:

{code}
2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6
{code}

A deeper look shows that this happened because thread-3 had once successfully 
grabbed the lock for 0_6 (during generation 97), before the task was created on 
thread-1 (during generation 98); within that 2-second window, thread-3 grabbed a 
bunch of locks and deleted the corresponding state directories. At that time the 
task likely did not have any data yet.

{code}
2017-03-12 20:37:04 [StreamThread-3] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-3] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-12] for group fireflyProd
2017-03-12 20:37:04 [StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-14] for group fireflyProd
...
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting ob

[jira] [Comment Edited] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923322#comment-15923322
 ] 

Guozhang Wang edited comment on KAFKA-4890 at 3/14/17 12:41 AM:


[~damianguy] I looked at the logs (actually only one of the log files, 
{{firefly_2.log}}, which I believe corresponds to the first trace you posted 
above). Here is what I have found:

1. Before thread-1 hit the error, it did seem to have a long GC while it was 
creating task 6 (found using {{grep -i "threadid" logfile}}):

{code}
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - Seeking to 
beginning of partition fireflyProd-perGameScoreStore-changelog-6
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name topic.fireflyProd-userAwardStore-changelog.bytes-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name topic.fireflyProd-userAwardStore-changelog.records-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name fireflyProd-perGameScoreStore-changelog-6.records-lag
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - 
Unsubscribed all topics or patterns and assigned partitions// 11 seconds 
later
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Removed sensor 
with name fireflyProd-perGameScoreStore-changelog-6.records-lag
{code}

And during this period of time thread-3 has deleted the state directory for 
task 0_6, as you observed:

{code}
2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6
{code}

A deeper look shows that this happened because thread-3 had once successfully 
grabbed the lock for 0_6 (during generation 97), before the task was created on 
thread-1 (during generation 98); within that 2-second window, thread-3 grabbed a 
bunch of locks and deleted the corresponding state directories. At that time the 
task likely did not have any data yet.

{code}
2017-03-12 20:37:04 [StreamThread-3] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-3] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-12] for group fireflyProd
2017-03-12 20:37:04 [StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-14] for group fireflyProd
...
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThrea

[jira] [Commented] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923328#comment-15923328
 ] 

Guozhang Wang commented on KAFKA-4890:
--

Since I do not have the code sketch, it is hard to reproduce the issue. 
[~damianguy], could you ask the person who reported this issue to upload their 
code as well? Then we can check whether it is reproducible on trunk with 
"state.cleanup.delay.ms" set very low, say 100 ms.

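For anyone trying to reproduce, a minimal sketch of the aggressive-cleanup configuration being 
suggested (bootstrap server is a placeholder; the relevant constant is 
{{StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG}}, i.e. "state.cleanup.delay.ms"):

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fireflyProd");        // application id as seen in the logs
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);                // multiple threads, as in the report
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 100);          // very low delay to provoke cleanup races
{code}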
> State directory being deleted when another thread holds the lock
> 
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
> Attachments: logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already 
> has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager 
> - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
> Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
> User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> executing put key 
> \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value 
> \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> at 
> org.apache.kafka.streams.processor.internals.StreamThrea

[jira] [Commented] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923361#comment-15923361
 ] 

Guozhang Wang commented on KAFKA-4890:
--

I have uploaded a small PR to improve the log4j output for debugging. I compared 
0.10.2 and trunk and did not observe any difference in logic that could cause 
this in 0.10.2, apart from the cleanup delay change you made. If you happen to 
have the code to reproduce, could you try it with the PR and a small cleanup 
delay value? Or you can share the code so I can try reproducing on my end.

> State directory being deleted when another thread holds the lock
> 
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
> Attachments: logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already 
> has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager 
> - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
> Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
> User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> executing put key 
> \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value 
> \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at 
> org.apache.kafka.streams.processor.internals

[GitHub] kafka pull request #2681: KAFKA-4890: Improve log4j for debugging [WIP]

2017-03-13 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2681

KAFKA-4890: Improve log4j for debugging [WIP]



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K4890-state-lock-cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2681.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2681


commit d3be536da07ee816a3c56fa7f38f06d9177df3d8
Author: Guozhang Wang 
Date:   2017-03-14T01:10:10Z

improve log4j




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4890) State directory being deleted when another thread holds the lock

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923367#comment-15923367
 ] 

ASF GitHub Bot commented on KAFKA-4890:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2681

KAFKA-4890: Improve log4j for debugging [WIP]



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K4890-state-lock-cleanup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2681.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2681


commit d3be536da07ee816a3c56fa7f38f06d9177df3d8
Author: Guozhang Wang 
Date:   2017-03-14T01:10:10Z

improve log4j




> State directory being deleted when another thread holds the lock
> 
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
> Attachments: logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already 
> has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager 
> - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
> Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
> User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> executing put key 
> \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value 
> \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:141)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at 
> org.apache.kafka.clients.consumer.

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-03-13 Thread Becket Qin
Bumping up the thread for further comments. If there are no more comments on
the KIP, I will start the voting thread on Wed.

Thanks,

Jiangjie (Becket) Qin

On Tue, Mar 7, 2017 at 9:48 AM, Becket Qin  wrote:

> Hi Dong,
>
> Thanks for the comments.
>
> The patch is mostly for proof of concept in case there is any concern
> about the implementation which is indeed a little tricky.
>
> The new metric has already been mentioned in the Public Interface Change
> section.
>
> I added the reasoning about how the compression ratio
> improving/deteriorating steps are determined in the wiki.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Mar 6, 2017 at 4:42 PM, Dong Lin  wrote:
>
>> Hey Becket,
>>
>> I am wondering if we should first vote for the KIP before reviewing the
>> patch. I have two comments below:
>>
>> - Should we specify the new sensors as part of interface change in the
>> KIP?
>> - The KIP proposes to increase estimated compression ratio by 0.05 for
>> each
>> underestimation and decrement the estimation by 0.005 for each
>> overestimation. Why are these two values chosen? I think there is some
>> tradeoff in selecting the value. Can the KIP be more explicit about the
>> tradeoff and explain how these two values would impact producer's
>> performance?
>>
>> Thanks,
>> Dong
>>
>>
>> On Sat, Mar 4, 2017 at 11:42 AM, Becket Qin  wrote:
>>
>> > I have updated the KIP based on the latest discussion. Please check and
>> let
>> > me know if there is any further concern.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Sat, Mar 4, 2017 at 10:56 AM, Becket Qin 
>> wrote:
>> >
>> > > Actually, on second thought, rate might be better for two reasons:
>> > > 1. Most of the metrics in the producer we already have are using rate
>> > > instead of count.
>> > > 2. If a service is bounced, the count will be reset to 0, but it does
>> not
>> > > affect rate.
>> > >
>> > > I'll make the change.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Sat, Mar 4, 2017 at 10:27 AM, Becket Qin 
>> > wrote:
>> > >
>> > >> Hi Dong,
>> > >>
>> > >> Yes, there is a sensor in the patch about the split occurrence.
>> > >>
>> > >> Currently it is a count instead of rate. In practice, it seems count
>> is
>> > >> easier to use in this case. But I am open to change.
>> > >>
>> > >> Thanks,
>> > >>
>> > >> Jiangjie (Becket) Qin
>> > >>
>> > >> On Fri, Mar 3, 2017 at 7:43 PM, Dong Lin 
>> wrote:
>> > >>
>> > >>> Hey Becket,
>> > >>>
>> > >>> I haven't looked at the patch yet. But since we are going to try the
>> > >>> split-on-oversize solution, should the KIP also add a sensor that
>> shows
>> > >>> the
>> > >>> rate of split per second and the probability of split?
>> > >>>
>> > >>> Thanks,
>> > >>> Dong
>> > >>>
>> > >>>
>> > >>> On Fri, Mar 3, 2017 at 6:39 PM, Becket Qin 
>> > wrote:
>> > >>>
>> > >>> > Just to clarify, the implementation is basically what I mentioned
>> > above
>> > >>> > (split/resend + adjusted estimation evolving algorithm) and
>> changing
>> > >>> the
>> > >>> > compression ratio estimation to be per topic.
>> > >>> >
>> > >>> > Thanks,
>> > >>> >
>> > >>> > Jiangjie (Becket) Qin
>> > >>> >
>> > >>> > On Fri, Mar 3, 2017 at 6:36 PM, Becket Qin 
>> > >>> wrote:
>> > >>> >
>> > >>> > > I went ahead and have a patch submitted here:
>> > >>> > > https://github.com/apache/kafka/pull/2638
>> > >>> > >
>> > >>> > > Per Joel's suggestion, I changed the compression ratio to be per
>> > >>> topic as
>> > >>> > > well. It seems working well. Since there is an important
>> behavior
>> > >>> change
>> > >>> > > and a new sensor is added, I'll keep the KIP and update it
>> > according.
>> > >>> > >
>> > >>> > > Thanks,
>> > >>> > >
>> > >>> > > Jiangjie (Becket) Qin
>> > >>> > >
>> > >>> > > On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <
>> jjkosh...@gmail.com>
>> > >>> wrote:
>> > >>> > >
>> > >>> > >> >
>> > >>> > >> > Let's say we sent the batch over the wire and received a
>> > >>> > >> > RecordTooLargeException, how do we split it as once we add
>> the
>> > >>> message
>> > >>> > >> to
>> > >>> > >> > the batch we lose the message level granularity. We will
>> have
>> > to
>> > >>> > >> > decompress, do deep iteration and split and again compress.
>> > right?
>> > >>> > This
>> > >>> > >> > looks like a performance bottleneck in the case of multi-topic
>> > >>> producers
>> > >>> > >> like
>> > >>> > >> > mirror maker.
>> > >>> > >> >
>> > >>> > >>
>> > >>> > >> Yes, but these should be outliers if we do estimation on a
>> > per-topic
>> > >>> > basis
>> > >>> > >> and if we target a conservative-enough compression ratio. The
>> > >>> producer
>> > >>> > >> should also avoid sending over the wire if it can be made
>> aware of
>> > >>> the
>> > >>> > >> max-message size limit on the broker, and split if it
>> determines
>> > >>> that a
>> > >>> > >> record exceeds the broker's config. Ideally this should be
>> part of
>> > >>> topic
>> > >>> > >> metadata but is not - so it could be off 
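For readers skimming this thread, a rough sketch of the per-topic estimation adjustment under 
discussion; this is illustrative only, not the patch in https://github.com/apache/kafka/pull/2638. 
Only the 0.05/0.005 step sizes come from the messages above; the class and method names are made up:

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: per-topic estimated compression ratio, nudged up quickly on
// underestimation (the batch overflowed and had to be split) and down slowly on overestimation.
class CompressionRatioEstimatorSketch {
    private static final float UNDERESTIMATE_STEP = 0.05f;   // step sizes from the KIP discussion
    private static final float OVERESTIMATE_STEP = 0.005f;
    private static final float MAX_RATIO = 1.0f;
    private static final float MIN_RATIO = 0.0f;

    private final Map<String, Float> ratioByTopic = new ConcurrentHashMap<>();

    float estimation(String topic) {
        return ratioByTopic.getOrDefault(topic, MAX_RATIO);  // start conservative (assume no compression)
    }

    // Called once the observed compression ratio of a completed batch is known.
    void update(String topic, float observedRatio) {
        float current = estimation(topic);
        float updated = observedRatio > current
                ? Math.min(MAX_RATIO, current + UNDERESTIMATE_STEP)   // we underestimated; back off quickly
                : Math.max(MIN_RATIO, current - OVERESTIMATE_STEP);   // we overestimated; tighten slowly
        ratioByTopic.put(topic, updated);
    }
}
{code}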

[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2017-03-13 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923516#comment-15923516
 ] 

Jun Rao commented on KAFKA-4277:


[~Wrikken], the only case when we need to re-register the broker in ZK is when 
the current ZK session has expired. The re-registration will be done in a new 
session, at which time, the ephemeral node in the previous session is supposed 
to be gone. Which version of ZK are you using?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>
> I use zookeeper 3.4.6.
> Zookeeper session time out, zkClient try reconnect failed. Then re-establish 
> the session and re-registering broker info in ZK, throws NODEEXISTS Exception.
>  I think it is because the ephemeral node which created by old session has 
> not removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug try create node in a 
> while loop until create success. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Jay Kreps
Hey Matthias,

Makes sense, I'm more advocating for removing the word topology than any
particular new replacement.

-Jay

On Mon, Mar 13, 2017 at 12:30 PM, Matthias J. Sax 
wrote:

> Jay,
>
> thanks for your feedback
>
> > What if instead we called it KStreamsBuilder?
>
> That's the current name and I personally think it's not the best one.
> The main reason why I don't like KStreamsBuilder is that we have the
> concepts of KStreams and KTables, and the builder creates both. However,
> the name puts the focus on KStream and devalues KTable.
>
> I understand your argument, and I am personally open to removing the
> "Topology" part, and name it "StreamsBuilder". Not sure what others
> think about this.
>
>
> About the Processor API: I like the idea in general, but I think it's out
> of scope for this KIP. KIP-120 has the focus on removing leaking
> internal APIs and doing some cleanup of how our API reflects some concepts.
>
> However, I added your idea to the API discussion Wiki page and we can take it
> from there:
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Discussions
>
>
>
> -Matthias
>
>
> On 3/13/17 11:52 AM, Jay Kreps wrote:
> > Two things:
> >
> >1. This is a minor thing but the proposed new name for KStreamBuilder
> >is StreamsTopologyBuilder. I actually think we should not put
> topology in
> >the name as topology is not a concept you need to understand at the
> >kstreams layer right now. I'd think of three categories of concepts:
> (1)
> >concepts you need to understand to get going even for a simple
> example, (2)
> >concepts you need to understand to operate and debug a real
> production app,
> >(3) concepts we truly abstract and you don't need to ever understand.
> I
> >think in the kstream layer topologies are currently category (2), and
> this
> >is where they belong. By introducing the name in even the simplest
> example
> >it means the user has to go read about topologies to really understand
> even
> >this simple snippet. What if instead we called it KStreamsBuilder?
> >2. For the processor api, I think this api is mostly not for end
> users.
> >However there are a couple of cases where it might make sense to expose
> it. I
> >think users coming from Samza, or JMS's MessageListener (
> >https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
> >understand a simple callback interface for message processing. In
> fact,
> >people often ask why Kafka's consumer doesn't provide such an
> interface.
> >I'd argue we do, it's KafkaStreams. The only issue is that the
> processor
> >API documentation is a bit scary for a person implementing this type
> of
> >api. My observation is that people using this style of API don't do a
> lot
> >of cross-message operations, then just do single message operations
> and use
> >a database for anything that spans messages. They also don't factor
> their
> >code into many MessageListeners and compose them, they just have one
> >listener that has the complete handling logic. Say I am a user who
> wants to
> >implement a single Processor in this style. Do we have an easy way to
> do
> >that today (either with the .transform/.process methods in kstreams
> or with
> >the topology apis)? Is there anything we can do in the way of trivial
> >helper code to make this better? Also, how can we explain that
> pattern to
> >people? I think currently we have pretty in-depth docs on our apis
> but I
> >suspect a person trying to figure out how to implement a simple
> callback
> >might get a bit lost trying to figure out how to wire it up. A simple
> five
> >line example in the docs would probably help a lot. Not sure if this
> is
> >best addressed in this KIP or is a side comment.
> >
> > Cheers,
> >
> > -Jay
> >
> > On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax 
> > wrote:
> >
> >> Hi All,
> >>
> >> I did prepare a KIP to clean up some of Kafka's Streaming API.
> >>
> >> Please have a look here:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 120%3A+Cleanup+Kafka+Streams+builder+API
> >>
> >> Looking forward to your feedback!
> >>
> >>
> >> -Matthias
> >>
> >>
> >
>
>
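A sketch of the kind of 'simple five line example' requested in item 2 of the quoted list, 
against the 0.10.x API (the topic name is a placeholder; this is illustrative, not proposed 
documentation text):

{code}
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;

KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").process(() -> new AbstractProcessor<Object, Object>() {
    @Override
    public void process(Object key, Object value) {
        // the complete single-message handling logic lives here, much like a JMS MessageListener
    }
});
{code}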


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Sriram Subramanian
StreamsBuilder would be my vote.

> On Mar 13, 2017, at 9:42 PM, Jay Kreps  wrote:
> 
> Hey Matthias,
> 
> Makes sense, I'm more advocating for removing the word topology than any
> particular new replacement.
> 
> -Jay
> 
> On Mon, Mar 13, 2017 at 12:30 PM, Matthias J. Sax 
> wrote:
> 
>> Jay,
>> 
>> thanks for your feedback
>> 
>>> What if instead we called it KStreamsBuilder?
>> 
>> That's the current name and I personally think it's not the best one.
>> The main reason why I don't like KStreamsBuilder is that we have the
>> concepts of KStreams and KTables, and the builder creates both. However,
>> the name puts the focus on KStream and devalues KTable.
>> 
>> I understand your argument, and I am personally open to removing the
>> "Topology" part, and name it "StreamsBuilder". Not sure what others
>> think about this.
>> 
>> 
>> About the Processor API: I like the idea in general, but I think it's out
>> of scope for this KIP. KIP-120 has the focus on removing leaking
>> internal APIs and doing some cleanup of how our API reflects some concepts.
>> 
>> However, I added your idea to the API discussion Wiki page and we can take it
>> from there:
>> https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Streams+Discussions
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>>> On 3/13/17 11:52 AM, Jay Kreps wrote:
>>> Two things:
>>> 
>>>   1. This is a minor thing but the proposed new name for KStreamBuilder
>>>   is StreamsTopologyBuilder. I actually think we should not put
>> topology in
>>>   the name as topology is not a concept you need to understand at the
>>>   kstreams layer right now. I'd think of three categories of concepts:
>> (1)
>>>   concepts you need to understand to get going even for a simple
>> example, (2)
>>>   concepts you need to understand to operate and debug a real
>> production app,
>>>   (3) concepts we truly abstract and you don't need to ever understand.
>> I
>>>   think in the kstream layer topologies are currently category (2), and
>> this
>>>   is where they belong. By introducing the name in even the simplest
>> example
>>>   it means the user has to go read about topologies to really understand
>> even
>>>   this simple snippet. What if instead we called it KStreamsBuilder?
>>>   2. For the processor api, I think this api is mostly not for end
>> users.
>>>   However there are a couple of cases where it might make sense to expose
>> it. I
>>>   think users coming from Samza, or JMS's MessageListener (
>>>   https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
>>>   understand a simple callback interface for message processing. In
>> fact,
>>>   people often ask why Kafka's consumer doesn't provide such an
>> interface.
>>>   I'd argue we do, it's KafkaStreams. The only issue is that the
>> processor
>>>   API documentation is a bit scary for a person implementing this type
>> of
>>>   api. My observation is that people using this style of API don't do a
>> lot
>>>   of cross-message operations; they just do single message operations
>> and use
>>>   a database for anything that spans messages. They also don't factor
>> their
>>>   code into many MessageListeners and compose them, they just have one
>>>   listener that has the complete handling logic. Say I am a user who
>> wants to
>>>   implement a single Processor in this style. Do we have an easy way to
>> do
>>>   that today (either with the .transform/.process methods in kstreams
>> or with
>>>   the topology apis)? Is there anything we can do in the way of trivial
>>>   helper code to make this better? Also, how can we explain that
>> pattern to
>>>   people? I think currently we have pretty in-depth docs on our apis
>> but I
>>>   suspect a person trying to figure out how to implement a simple
>> callback
>>>   might get a bit lost trying to figure out how to wire it up. A simple
>> five
>>>   line example in the docs would probably help a lot. Not sure if this
>> is
>>>   best addressed in this KIP or is a side comment.
>>> 
>>> Cheers,
>>> 
>>> -Jay
>>> 
>>> On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax 
>>> wrote:
>>> 
 Hi All,
 
 I did prepare a KIP to clean up some of Kafka's Streaming API.
 
 Please have a look here:
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
 120%3A+Cleanup+Kafka+Streams+builder+API
 
 Looking forward to your feedback!
 
 
 -Matthias
>> 
>> 


[jira] [Comment Edited] (KAFKA-4878) Kafka Connect does not log connector configuration errors

2017-03-13 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923586#comment-15923586
 ] 

Gwen Shapira edited comment on KAFKA-4878 at 3/14/17 5:07 AM:
--

This is even worse in stand-alone mode because if you start it with bad config, 
it won't start and you don't even have a REST API...

So far I ran into 4 users complaining about it, and 0.10.2.0 has only been out 
for a few weeks...


was (Author: gwenshap):
This is even worse in stand-alone mode because if you start it with bad config, 
it won't start and you don't even have a REST API...

So far I ran into 4 users complaining about it, and 3.2 has only been out for a 
week...

> Kafka Connect does not log connector configuration errors
> -
>
> Key: KAFKA-4878
> URL: https://issues.apache.org/jira/browse/KAFKA-4878
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Priority: Blocker
> Fix For: 0.10.2.1
>
>
> Currently, on connector configuration error, Kafka Connect (both distributed 
> and stand alone) logs:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector 
> configuration is invalid (use the endpoint `/{connectorType}/config/validate` 
> to get a full list of errors)
> This is annoying because:
> 1. If I'm using stand-alone mode, I may have configured my connector via 
> configuration file and I don't want to know about the REST API at all.
> 2. The output of validate is rather annoying
> What I'd like to see in the output is:
> 1. number of errors in my configuration
> 2. at least one error, preferably all of them



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4878) Kafka Connect does not log connector configuration errors

2017-03-13 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923586#comment-15923586
 ] 

Gwen Shapira commented on KAFKA-4878:
-

This is even worse in stand-alone mode because if you start it with bad config, 
it won't start and you don't even have a REST API...

So far I ran into 4 users complaining about it, and 3.2 has only been out for a 
week...

> Kafka Connect does not log connector configuration errors
> -
>
> Key: KAFKA-4878
> URL: https://issues.apache.org/jira/browse/KAFKA-4878
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Fix For: 0.10.2.1
>
>
> Currently, on connector configuration error, Kafka Connect (both distributed 
> and stand alone) logs:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector 
> configuration is invalid (use the endpoint `/{connectorType}/config/validate` 
> to get a full list of errors)
> This is annoying because:
> 1. If I'm using stand-alone mode, I may have configured my connector via 
> configuration file and I don't want to know about the REST API at all.
> 2. The output of validate is rather annoying
> What I'd like to see in the output is:
> 1. number of errors in my configuration
> 2. at least one error, preferably all of them



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4878) Kafka Connect does not log connector configuration errors

2017-03-13 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-4878:

Fix Version/s: 0.10.2.1

> Kafka Connect does not log connector configuration errors
> -
>
> Key: KAFKA-4878
> URL: https://issues.apache.org/jira/browse/KAFKA-4878
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Fix For: 0.10.2.1
>
>
> Currently, on connector configuration error, Kafka Connect (both distributed 
> and stand alone) logs:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector 
> configuration is invalid (use the endpoint `/{connectorType}/config/validate` 
> to get a full list of errors)
> This is annoying because:
> 1. If I'm using stand-alone mode, I may have configured my connector via 
> configuration file and I don't want to know about the REST API at all.
> 2. The output of validate is rather annoying
> What I'd like to see in the output is:
> 1. number of errors in my configuration
> 2. at least one error, preferably all of them



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4878) Kafka Connect does not log connector configuration errors

2017-03-13 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-4878:

Priority: Blocker  (was: Major)

> Kafka Connect does not log connector configuration errors
> -
>
> Key: KAFKA-4878
> URL: https://issues.apache.org/jira/browse/KAFKA-4878
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Priority: Blocker
> Fix For: 0.10.2.1
>
>
> Currently, on connector configuration error, Kafka Connect (both distributed 
> and stand alone) logs:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector 
> configuration is invalid (use the endpoint `/{connectorType}/config/validate` 
> to get a full list of errors)
> This is annoying because:
> 1. If I'm using stand-alone mode, I may have configured my connector via 
> configuration file and I don't want to know about the REST API at all.
> 2. The output of validate is rather annoying
> What I'd like to see in the output is:
> 1. number of errors in my configuration
> 2. at least one error, preferably all of them



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-122: Add Reset Consumer Group Offsets tooling

2017-03-13 Thread Gwen Shapira
+1 (binding)

Nice job - this is going to be super useful.

On Thu, Feb 23, 2017 at 4:46 PM, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi All,
>
> It seems that there is no further concern with the KIP-122.
> At this point we would like to start the voting process.
>
> The KIP can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 122%3A+Add+Reset+Consumer+Group+Offsets+tooling
>
>
> Thanks!
>
> Jorge.
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



[jira] [Created] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-13 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4893:
---

 Summary: async topic deletion conflicts with max topic length
 Key: KAFKA-4893
 URL: https://issues.apache.org/jira/browse/KAFKA-4893
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Priority: Minor


As per the 
[documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
topics can be only 249 characters long to line up with typical filesystem 
limitations:
{quote}
Each sharded partition log is placed into its own folder under the Kafka log 
directory. The name of such folders consists of the topic name, appended by a 
dash (\-) and the partition id. Since a typical folder name can not be over 255 
characters long, there will be a limitation on the length of topic names. We 
assume the number of partitions will not ever be above 100,000. Therefore, 
topic names cannot be longer than 249 characters. This leaves just enough room 
in the folder name for a dash and a potentially 5 digit long partition id.
{quote}

{{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
validation.

This limit ends up not being quite right since topic deletion ends up renaming 
the directory to the form {{topic-partition.uniqueId-delete}} as can be seen in 
{{LogManager.asyncDelete}}:
{code}
val dirName = new StringBuilder(removedLog.name)
  .append(".")
  .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
  .append(Log.DeleteDirSuffix)
  .toString()
{code}

So the unique id and "-delete" suffix end up hogging some of the characters. 
Deleting a long-named topic results in a log message such as the following:
{code}
kafka.common.KafkaStorageException: Failed to rename log directory from 
/tmp/kafka-logs0/0-0
 to 
/tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
  at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
  at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
  at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
  at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
  at kafka.cluster.Partition.delete(Partition.scala:137)
  at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
  at 
kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
  at 
kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
  at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
  at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
  at java.lang.Thread.run(Thread.java:745)
{code}

The topic after this point still exists but has Leader set to -1, and the 
controller considers the topic deletion incomplete (the topic znode is 
still in /admin/delete_topics).

I don't believe LinkedIn has any topic names this long, but I'm filing the ticket 
in case anyone runs into this problem.
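
For concreteness, a back-of-the-envelope check of the rename length (just a sketch 
of the arithmetic, assuming the 255-character directory-name limit cited above; 
this is not code from the Kafka tree):
{code}
public class DeleteDirLengthSketch {
    public static void main(String[] args) {
        int maxTopicName = 249;                 // kafka.common.Topic.maxNameLength
        int dashAndPartition = 1 + 5;           // "-" plus an up-to-5-digit partition id
        int dot = 1;                            // "." separator added on delete
        int uuidNoDashes = 32;                  // UUID with dashes stripped
        int deleteSuffix = "-delete".length();  // Log.DeleteDirSuffix

        int liveDirName = maxTopicName + dashAndPartition;
        int renamedDirName = liveDirName + dot + uuidNoDashes + deleteSuffix;

        System.out.println("topic-partition dir name:   " + liveDirName + " chars");    // 255
        System.out.println("renamed '-delete' dir name: " + renamedDirName + " chars"); // 295
        // The rename adds 40 characters, so any topic name longer than roughly
        // 214 minus the partition-id digits will fail on a 255-character filesystem.
    }
}
{code}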



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-117: Add a public AdminClient API for Kafka admin operations

2017-03-13 Thread Gwen Shapira
+1 (binding)

I expressed a few concerns in the discussion thread, but in general this is
super important to get done.

On Fri, Mar 10, 2017 at 10:38 AM, Colin McCabe  wrote:

> Hi all,
>
> I'd like to start voting on KIP-117
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
> ).
>
> The discussion thread can be found here:
> https://www.mail-archive.com/dev@kafka.apache.org/msg65697.html
>
> best,
> Colin
>



-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-03-13 Thread Gwen Shapira
I'm torn between my desire to get this in already and the fact that parts
of the API feel a bit alien to Kafka.

I will resolve my difficulties by giving my feedback here and then going to
vote +1 on the vote thread.
Colin can choose whether to address my concerns now or use his "unstable"
option to wait and see...

My main concern is the RequestOptions objects... It was discussed earlier a
bit, but I'm not sure my particular concerns were addressed. I see the
following issues with it:
* They double the number of methods we have
* They are pretty similar, so it isn't clear why we need all those
different objects
* All our other APIs specify timeouts either in method calls directly or in
the configuration for the entire object.

We also typically don't use methods that start with "set", but this is
minor.

The configs for NewTopic are a plain Map - shouldn't we use the
LogConfig object we already have? That would take care of documentation and
be consistent with ProducerConfig and ConsumerConfig.
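
To make the options concern concrete (the method names below are illustrative
only, not taken from the KIP):

  // KIP style: a per-call *Options object carries the timeout
  admin.createTopic(newTopic, new CreateTopicOptions().setTimeoutMs(30000));

  // style of our other clients: the timeout comes from client configuration
  // (e.g. request.timeout.ms) or is passed directly to the call
  admin.createTopic(newTopic, 30000);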

My concerns aside, thank you for working on this much needed API.

Gwen

On Mon, Mar 13, 2017 at 3:26 PM, Colin McCabe  wrote:

> Hi Radai,
>
> Thanks for looking at the KIP again.
>
> On Mon, Mar 13, 2017, at 12:33, radai wrote:
> > looking at the KIP as it is now, looks like all *Options objects have a
> > common timeout property. could it be extracted to a common
> > AdminRequestOptions or something?
>
> Perhaps I'm missing something, but I don't think there is any reason to
> extract the timeout property.  It doesn't simplify the implementation
> (I've already implemented the interface in a branch, so I know this for
> sure.)  It doesn't simplify the API exposed to the users, since they
> will still want to provide the specific option type corresponding to the
> call.  Also, as we discussed previously in the thread (about NewTopic),
> having a lot of inheritance and base classes makes it difficult to change
> classes over time.  It is better to simply use composition.
>
> I think it would be much better to get the AdminClient interface in, and
> start iterating on it incrementally as we discover ways it could be
> better.  This is similar to how some things in Streams were added as
> unstable interfaces and then stabilized over time.
>
> best,
> Colin
>
>
> >
> > On Thu, Mar 9, 2017 at 2:14 PM, Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > We've been discussing this for a while (about a month) and I think
> > > people have made some great points that improved the proposal.  In
> > > particular, adding async and batching was important.  I've also been
> > > talking with some end-users who would like to make use of this API.
> > > Once this is in, we can iterate on it before the release, and it will
> > > also unblock a lot of other admin proposals.  I think it would be good
> > > to start the vote in a little bit, assuming there are no objections.
> > >
> > > best,
> > > Colin
> > >
> > >
> > > On Wed, Mar 8, 2017, at 17:09, Colin McCabe wrote:
> > > > On Mon, Mar 6, 2017, at 11:55, Ismael Juma wrote:
> > > > > Thanks Colin.
> > > > >
> > > > > I am familiar with the protocol semantics, but we need to document
> the
> > > > > API
> > > > > for users who don't know the protocol. I still think it would be
> > > valuable
> > > > > to have some examples of how the API would be used for common use
> > > cases.
> > > >
> > > > Getting the version of all nodes in the cluster:
> > > >   Map nodesToVersions =
> > > > adminClient.listNodes().nodes().thenApply(
> > > >   nodes -> adminClient.apiVersions(nodes)).all().get();
> > > >
> > > > Creating a topic:
> > > >   adminClient.createTopic(new NewTopic("myNewTopic", 3, (short)
> > > >   3)).all().get();
> > > >
> > > > Validating that a topic can be created (but not creating it):
> > > >   adminClient.createTopic(new NewTopic("myNewTopic", 3, (short) 3),
> > > > new CreateTopicOptions().setValidateOnly(true)).all().get();
> > > >
> > > > > For example, say someone creates a topic and then produces to it.
> What
> > > > > would be the recommended way to do that?
> > > >
> > > > Once the future returned by createTopics has successfully completed,
> it
> > > > should be possible to produce to the topic.
> > > >
> > > > There are a few warts that are definitely worth calling out.  These
> are
> > > > things that need to be fixed at the protocol layer, so they're
> outside
> > > > the scope of this KIP.  But you made a good point that we need to
> > > > document this well.  Here's my list (I wonder if anyone has more?):
> > > >
> > > > * If auto.create.topics.enable is true on the brokers,
> > > > AdminClient#describeTopic(topicName) may create a topic named
> topicName.
> > > >  There are two workarounds: either use AdminClient#listTopics and
> ensure
> > > > that the topic is present before describing, or disable
> > > > auto.create.topics.enable.
> > > >
> > > > * If delete.topic.enable is false on the brokers,
> > > > AdminClient#deleteTopic(topicName) will mark top

[jira] [Assigned] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-4893:
--

Assignee: Vahid Hashemian

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1, and the 
> controller considers the topic deletion incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe LinkedIn has any topic names this long, but I'm filing the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4859) Transient test failure: org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion (again)

2017-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923614#comment-15923614
 ] 

ASF GitHub Bot commented on KAFKA-4859:
---

GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2682

KAFKA-4859: Set shorter commit interval for integration tests with caching



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K4859-cache-commit-interval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2682.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2682


commit 733ef40cbd698f8533cf888629f159a3e3168099
Author: Guozhang Wang 
Date:   2017-03-14T05:24:59Z

use shorter commit interval and also doubling on waiting conditions

commit a486cebdfbbf7b833387b62800671d238fee2521
Author: Guozhang Wang 
Date:   2017-03-14T05:39:29Z

use more conservative wait time
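
For context, a minimal sketch of the kind of test-configuration change the commit 
messages describe (using the standard StreamsConfig keys, but not the actual diff 
in the PR):

{code}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ShortCommitIntervalSketch {
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // With caching enabled, cached records may only reach the output topic on a
        // commit/flush, so a long commit interval can starve the test past its timeout.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
        System.out.println(streamsConfiguration);
    }
}
{code}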




> Transient test failure: 
> org.apache.kafka.streams.integration.JoinIntegrationTest.shouldCountClicksPerRegion
>  (again)
> ---
>
> Key: KAFKA-4859
> URL: https://issues.apache.org/jira/browse/KAFKA-4859
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Armin Braun
>Assignee: Armin Braun
>
> Slightly different than KAFKA-3874 in terms of the way it fails.
> Now we have:
> {code}
> Error Message
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 3. Expecting 3 
> records from topic output-topic-2 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:257)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:206)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:175)
>   at 
> org.apache.kafka.streams.integration.KStreamKTableJoinIntegrationTest.shouldCountClicksPerRegion(KStreamKTableJoinIntegrationTest.java:297)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
> e.g. here https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2032/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2682: KAFKA-4859: Set shorter commit interval for integr...

2017-03-13 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/2682

KAFKA-4859: Set shorter commit interval for integration tests with caching



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K4859-cache-commit-interval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2682.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2682


commit 733ef40cbd698f8533cf888629f159a3e3168099
Author: Guozhang Wang 
Date:   2017-03-14T05:24:59Z

use shorter commit interval and also doubling on waiting conditions

commit a486cebdfbbf7b833387b62800671d238fee2521
Author: Guozhang Wang 
Date:   2017-03-14T05:39:29Z

use more conservative wait time




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4893) async topic deletion conflicts with max topic length

2017-03-13 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923636#comment-15923636
 ] 

Onur Karaman commented on KAFKA-4893:
-

Maybe the right move is to just add more layers to the log directory like so:
{code}
kafkaLogDir/delete/topic-partition/uniqueId/
{code}
and put all the data under that directory.

I think typical filesystems have a path-length limit of 4096 characters (with a 
255-character limit per path component), so we should be good to go. The one 
annoyance is that the old structure is already out in the wild, so we may need to 
recognize and handle both delete-directory structures for some time.
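
To illustrate (the paths below are made up), the proposed layout keeps every 
individual path component within the 255-character limit even for a maximal topic 
name:
{code}
current scheme, one component that can blow past 255 characters:
  kafkaLogDir/longTopicName-partition.32charUuid-delete/

proposed scheme, the long topic-partition name stays its own component,
and the full path stays far below the 4096-character path limit:
  kafkaLogDir/delete/longTopicName-partition/32charUuid/
{code}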

> async topic deletion conflicts with max topic length
> 
>
> Key: KAFKA-4893
> URL: https://issues.apache.org/jira/browse/KAFKA-4893
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Vahid Hashemian
>Priority: Minor
>
> As per the 
> [documentation|http://kafka.apache.org/documentation/#basic_ops_add_topic], 
> topics can be only 249 characters long to line up with typical filesystem 
> limitations:
> {quote}
> Each sharded partition log is placed into its own folder under the Kafka log 
> directory. The name of such folders consists of the topic name, appended by a 
> dash (\-) and the partition id. Since a typical folder name can not be over 
> 255 characters long, there will be a limitation on the length of topic names. 
> We assume the number of partitions will not ever be above 100,000. Therefore, 
> topic names cannot be longer than 249 characters. This leaves just enough 
> room in the folder name for a dash and a potentially 5 digit long partition 
> id.
> {quote}
> {{kafka.common.Topic.maxNameLength}} is set to 249 and is used during 
> validation.
> This limit ends up not being quite right since topic deletion ends up 
> renaming the directory to the form {{topic-partition.uniqueId-delete}} as can 
> be seen in {{LogManager.asyncDelete}}:
> {code}
> val dirName = new StringBuilder(removedLog.name)
>   .append(".")
>   
> .append(java.util.UUID.randomUUID.toString.replaceAll("-",""))
>   .append(Log.DeleteDirSuffix)
>   .toString()
> {code}
> So the unique id and "-delete" suffix end up hogging some of the characters. 
> Deleting a long-named topic results in a log message such as the following:
> {code}
> kafka.common.KafkaStorageException: Failed to rename log directory from 
> /tmp/kafka-logs0/0-0
>  to 
> /tmp/kafka-logs0/0-0.797bba3fb2464729840f87769243edbb-delete
>   at kafka.log.LogManager.asyncDelete(LogManager.scala:439)
>   at 
> kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:142)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:137)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
>   at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:221)
>   at kafka.cluster.Partition.delete(Partition.scala:137)
>   at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:230)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:260)
>   at 
> kafka.server.ReplicaManager$$anonfun$stopReplicas$2.apply(ReplicaManager.scala:259)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:259)
>   at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:174)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:64)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> The topic after this point still exists but has Leader set to -1, and the 
> controller considers the topic deletion incomplete (the topic znode is 
> still in /admin/delete_topics).
> I don't believe LinkedIn has any topic names this long, but I'm filing the 
> ticket in case anyone runs into this problem.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)