[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-11-19 Thread Birger Brunswiek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691514#comment-16691514
 ] 

Birger Brunswiek commented on KAFKA-4277:
-

I am not sure if the issue I experienced is solved. I ran into the problem not 
because of a reconnect to Zookeeper. Instead, it was due to a restart which was 
faster than it takes Zookeeper to delete ephemeral nodes. To me it looks as if 
the solution to KAFKA-7165 only handles reconnects but not the initial 
connection. Kafka's code has changed a lot since version 0.10 where I saw the 
issue so I find it difficult to say if it is still a problem in 2.2.0.

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> Zookeeper session time out, zkClient try reconnect failed. Then re-establish 
> the session and re-registering broker info in ZK, throws NODEEXISTS Exception.
>  I think it is because the ephemeral node which created by old session has 
> not removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug try create node in a 
> while loop until create success. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7654) Relax requirements on serializing-only methods.

2018-11-19 Thread Bruno Bieth (JIRA)
Bruno Bieth created KAFKA-7654:
--

 Summary: Relax requirements on serializing-only methods.
 Key: KAFKA-7654
 URL: https://issues.apache.org/jira/browse/KAFKA-7654
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Bieth


Methods such as KStream#to shouldn't require a Produced as only the serializing 
part is ever used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4493) Connections to Kafka brokers should be validated

2018-11-19 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691676#comment-16691676
 ] 

Stanislav Kozlovski commented on KAFKA-4493:


[~taku-k] , I'd like to fix this issue. Are you still working on it or can I 
take it?

> Connections to Kafka brokers should be validated
> 
>
> Key: KAFKA-4493
> URL: https://issues.apache.org/jira/browse/KAFKA-4493
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Ismael Juma
>Priority: Major
>
> There have been a few reports of Kafka clients throwing an OOM because they 
> read 4 bytes from the stream and then use that to allocate a ByteBuffer 
> without validating that they are using the right security protocol or even 
> communicating with a Kafka broker.
> It would be good to perform some validation in order to show a useful error 
> message to the user instead of the OOM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-4493) Connections to Kafka brokers should be validated

2018-11-19 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691676#comment-16691676
 ] 

Stanislav Kozlovski edited comment on KAFKA-4493 at 11/19/18 1:07 PM:
--

Hey [~taku-k] , I'd like to fix this issue.

Are you still working on it or can I take it?


was (Author: enether):
[~taku-k] , I'd like to fix this issue. Are you still working on it or can I 
take it?

> Connections to Kafka brokers should be validated
> 
>
> Key: KAFKA-4493
> URL: https://issues.apache.org/jira/browse/KAFKA-4493
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Ismael Juma
>Priority: Major
>
> There have been a few reports of Kafka clients throwing an OOM because they 
> read 4 bytes from the stream and then use that to allocate a ByteBuffer 
> without validating that they are using the right security protocol or even 
> communicating with a Kafka broker.
> It would be good to perform some validation in order to show a useful error 
> message to the user instead of the OOM.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7397) Ability to apply DSL stateless transformation on a global table

2018-11-19 Thread Frederic Tardif (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691751#comment-16691751
 ] 

Frederic Tardif commented on KAFKA-7397:


Hi [~vvcephei], BTW, the work around code does not quite work because of a 
unexpected handling of the global table when the local store gets deleted:
{quote}I have implemented a {{addGlobalStore}} with a custom processor 
responsible to transform a K,V record from the input stream into a V,K records. 
It works fine and my {{store.all()}} does print the correct persisted V,K 
records. However, if I clean the local store and restart the stream app, the 
global table is reloaded but without going through the custom processor; 
instead, it calls {{GlobalStateManagerImp#restoreState}} which simply stores 
the input topic K,V records into rocksDB (hence bypassing the mapping function 
of my custom processor). I believe this must not be the expected result?
{quote}
Any update on the KIP?

> Ability to apply DSL stateless transformation on a global table
> ---
>
> Key: KAFKA-7397
> URL: https://issues.apache.org/jira/browse/KAFKA-7397
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Frederic Tardif
>Priority: Major
>  Labels: needs-kip
> Attachments: kafka.zip
>
>
> When consuming a globalKTable (with the expectation of caching all the data 
> of a topic in a consumer store), we can't apply any stateless transformation 
> (filter, map), prior to materializing. To achieve this, while ensuring to 
> consume the records of all the partitions, we must first run a stream app 
> that does preprocessing on the ingress topic into an exact K1,V1 egress topic 
> as we want to store in our GlobalKTable. This looks unnecessarily complex, 
> and causes to double the storage of the topic, while the only goal is to 
> adapt statelessly the data prior to storing (rockDB) at the receiving end.
> See discussion on 
> :[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic]
> As a workaround, I have used `new Builder().addGlobalStore()` with a 
> Custom Processor able to filter and map prior to store (see attached). 
> Although this seem to work, I believe this functionality should be part of 
> the basic dsl api when working with a globalTable (`new 
> StreamsBuilder().globalTable().filter(...).map()... `).
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-19 Thread Daren Thomas (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691821#comment-16691821
 ] 

Daren Thomas commented on KAFKA-7577:
-

[~guozhang] I have unit tests for the issue, but not ones I could easily break 
out in order to provide them.  Unfortunately due to schedule pressure I've had 
to just work around the issue.

My best guess for the root cause of the issue resides with the aggregate I'm 
performing on a stream to create the tables used in the join.  I don't think 
it's passing on the null message.  Since I'm just trying to pass messages along 
after changing the key (not adding or subtracting values as shown in the DSL 
documentation), I have the following:
{code:java}
.aggregate(() -> myMessageValue.newBuilder().build(), (aggKey, newValue, 
aggValue) -> newValue, ...{code}
Perhaps there is something wrong with that and not the join?

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).]
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic like expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7655) Metadata spamming requests from Kafka Streams under some circumstances, potential DOS

2018-11-19 Thread Pasquale Vazzana (JIRA)
Pasquale Vazzana created KAFKA-7655:
---

 Summary: Metadata spamming requests from Kafka Streams under some 
circumstances, potential DOS
 Key: KAFKA-7655
 URL: https://issues.apache.org/jira/browse/KAFKA-7655
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.1
Reporter: Pasquale Vazzana


There is a bug in the InternalTopicManager that makes the client believe that a 
topic exists even though it doesn't, it occurs mostly in those few seconds 
between when a topic is marked for deletion and when it is actually deleted. In 
that timespan, the Broker gives inconsistent information, first it hides the 
topic but then it refuses to create a new one therefore the client believes the 
topic was existing already and it starts polling for metadata.

The consequence is that the client goes into a loop where it polls for topic 
metadata and if this is done by many threads it can take down a small cluster 
or degrade greatly its performances.

The real life scenario is probably a reset gone wrong. Reproducing the issue is 
fairly simple, these are the steps:
 * Stop a Kafka streams application
 * Delete one of its changelog and the local store
 * Restart the application immediately after the topic delete
 * You will see the Kafka streams application hanging after the bootstrap 
saying something like: INFO  Metadata - Cluster ID: 

 

I am attaching a patch that fixes the issue client side but my personal opinion 
is that this should be tackled on the broker as well, metadata requests seem 
expensive and it would be easy to craft a DDOS that can potentially take down 
an entire cluster in seconds just by flooding the brokers with metadata 
requests.

The patch kicks in only when a topic that wasn't existing in the first call to 
getNumPartitions triggers a TopicExistsException. When this happens it forces 
the re-validation of the topic and if it still looks like doesn't exists plan a 
retry with some delay, to give the broker the necessary time to sort it out.

I think this patch makes sense beside the above mentioned use case where a 
topic it's not existing, because, even if the topic was actually created, the 
client should not blindly trust it and should still re-validate it by checking 
the number of partitions. IE: a topic can be created automatically by the first 
request and then it would have the default partitions rather than the expected 
ones.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7655) Metadata spamming requests from Kafka Streams under some circumstances, potential DOS

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691875#comment-16691875
 ] 

ASF GitHub Bot commented on KAFKA-7655:
---

Pasvaz opened a new pull request #5929: KAFKA-7655 Metadata spamming requests 
from Kafka Streams under some circumstances, potential DOS
URL: https://github.com/apache/kafka/pull/5929
 
 
   Re-validate and make sure the topic either exists or it's gone by using a 
delay.
   
   There is a bug in the InternalTopicManager that makes the client believe 
that a topic exists even though it doesn't, it occurs mostly in those few 
seconds between when a topic is marked for deletion and when it is actually 
deleted. In that timespan, the Broker gives inconsistent information, first it 
hides the topic but then it refuses to create a new one therefore the client 
believes the topic was existing already and it starts polling for metadata.
   
   The consequence is that the client goes into a loop where it polls for topic 
metadata and if this is done by many threads it can take down a small cluster 
or degrade greatly its performances.
   
   The real life scenario is probably a reset gone wrong. Reproducing the issue 
is fairly simple, these are the steps:
   
   Stop a Kafka streams application
   Delete one of its changelog and the local store
   Restart the application immediately after the topic delete
   You will see the Kafka streams application hanging after the bootstrap 
saying something like: INFO  Metadata - Cluster ID: 

   
   I am attaching a patch that fixes the issue client side but my personal 
opinion is that this should be tackled on the broker as well, metadata requests 
seem expensive and it would be easy to craft a DDOS that can potentially take 
down an entire cluster in seconds just by flooding the brokers with metadata 
requests.
   
   The patch kicks in only when a topic that wasn't existing in the first call 
to getNumPartitions triggers a TopicExistsException. When this happens it 
forces the re-validation of the topic and if it still looks like doesn't exists 
plan a retry with some delay, to give the broker the necessary time to sort it 
out.
   
   I think this patch makes sense beside the above mentioned use case where a 
topic it's not existing, because, even if the topic was actually created, the 
client should not blindly trust it and should still re-validate it by checking 
the number of partitions. IE: a topic can be created automatically by the first 
request and then it would have the default partitions rather than the expected 
ones.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Metadata spamming requests from Kafka Streams under some circumstances, 
> potential DOS
> -
>
> Key: KAFKA-7655
> URL: https://issues.apache.org/jira/browse/KAFKA-7655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Pasquale Vazzana
>Priority: Major
>  Labels: performance, pull-request-available, security
>
> There is a bug in the InternalTopicManager that makes the client believe that 
> a topic exists even though it doesn't, it occurs mostly in those few seconds 
> between when a topic is marked for deletion and when it is actually deleted. 
> In that timespan, the Broker gives inconsistent information, first it hides 
> the topic but then it refuses to create a new one therefore the client 
> believes the topic was existing already and it starts polling for metadata.
> The consequence is that the client goes into a loop where it polls for topic 
> metadata and if this is done by many threads it can take down a small cluster 
> or degrade greatly its performances.
> The real life scenario is probably a reset gone wrong. Reproducing the issue 
> is fairly simple, these are the steps:
>  * Stop a Kafka streams application
>  * Delete one of its changelog and the local store
>  * Restart the application immediately after the topic delete
>  * You will see the Kafka streams application hanging after the bootstrap 
> saying something like: INFO  Metadata - Cluster ID: 
>  
> I am attaching a patch that fixes the issue client side but my personal 
> opinion is that this should be tackled on the broker as well, metadata 
> requests 

[jira] [Resolved] (KAFKA-7554) zookeeper.session.timeout.ms Value

2018-11-19 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7554.
--
Resolution: Not A Problem

Default Zookeeper session timeout value 6 seconds. If the server fails to 
heartbeat to zookeeper within this period of time it is considered dead. If you 
set this too low the server may be falsely considered dead; if you set it too 
high it may take too long to recognize a truly dead server.
We can tune this parameter as per requirement. 

> zookeeper.session.timeout.ms Value
> --
>
> Key: KAFKA-7554
> URL: https://issues.apache.org/jira/browse/KAFKA-7554
> Project: Kafka
>  Issue Type: Improvement
>  Components: zkclient
>Reporter: BELUGA BEHR
>Priority: Major
>
> {quote}
> zookeeper.session.timeout.ms = 6000 (6s)
> zookeeper.connection.timeout.ms = 6000 (6s)
> {quote}
> - https://kafka.apache.org/documentation/#configuration
> Kind of an odd value?  Was it supposed to be 6 (60s) ?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7323) add replication factor doesn't work

2018-11-19 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7323.
--
Resolution: Not A Problem

Closing as per above comment.  probably heap memory is not sufficient. Please 
reopen if you think the issue still exists

> add replication factor doesn't work
> ---
>
> Key: KAFKA-7323
> URL: https://issues.apache.org/jira/browse/KAFKA-7323
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.11.0.2
>Reporter: superheizai
>Priority: Major
>
> I have topic with 256 parititons.
> Firstly, I generate the  topic partitions with their brokerIds with 
> kafka-reassign-partitions generate.
> Seconld, I add a brokerId for each partition.
> Then, I run kafka-reassign-partitions, some partitions increased their 
> replication factor, but the others stoped.
> When I read log controller.log,  some partitions' replication factors 
> increased. Then I remove these paritions which replication factor base been 
> increased and run kafka-reassign-partitions again, but no log in 
> controller.log, all paritions are "still in progress", no network flow 
> changed when watch zabbix network.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7654) Relax requirements on serializing-only methods.

2018-11-19 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692023#comment-16692023
 ] 

Bill Bejeck commented on KAFKA-7654:


Hi Bruno Bieth, 

I understand your point, but by using KStream#to(Produced) leaves the door open 
to providing a seamless way of delivering additional parameters to the 
KStream#to operation without having to add an overloaded method (although I 
don't have any ideas of what those might be ATM). 

 

Thanks,

Bill

> Relax requirements on serializing-only methods.
> ---
>
> Key: KAFKA-7654
> URL: https://issues.apache.org/jira/browse/KAFKA-7654
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Bieth
>Priority: Major
>
> Methods such as KStream#to shouldn't require a Produced as only the 
> serializing part is ever used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7361) Kafka wont reconnect after NoRouteToHostException

2018-11-19 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692035#comment-16692035
 ] 

Manikumar commented on KAFKA-7361:
--

This must be fixed in https://issues.apache.org/jira/browse/KAFKA-4041

> Kafka wont reconnect after NoRouteToHostException
> -
>
> Key: KAFKA-7361
> URL: https://issues.apache.org/jira/browse/KAFKA-7361
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 1.1.0
> Environment: kubernetes cluster
>Reporter: C Schabert
>Priority: Major
>
> After Zookeeper died and came back up kafka could not reconnect to zookeeper.
> In this Setup zookeeper ist behind a dns and came up with a different ip.
>  
> Here is the kafka log output:
>  
> {code:java}
> [2018-08-30 14:50:23,846] INFO Opening socket connection to server 
> zookeeper-0.zookeeper./10.42.0.123:2181. Will not attempt to authenticate 
> using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> 8/30/2018 4:50:26 PM [2018-08-30 14:50:26,916] WARN Session 0x1658b2f0f4e0002 
> for server null, unexpected error, closing socket connection and attempting 
> reconnect (org.apache.zookeeper.ClientCnxn)
> 8/30/2018 4:50:26 PM java.net.NoRouteToHostException: No route to host
> 8/30/2018 4:50:26 PM at sun.nio.ch.SocketChannelImpl.checkConnect(Native 
> Method)
> 8/30/2018 4:50:26 PM at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 8/30/2018 4:50:26 PM at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 8/30/2018 4:50:26 PM at 
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-11-19 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692047#comment-16692047
 ] 

Jun Rao commented on KAFKA-4277:


[~birger] : Hmm, according to 
[http://mail-archives.apache.org/mod_mbox/zookeeper-user/201701.mbox/%3CB512F6DE-C0BF-45CE-8102-6F242988268E%40apache.org%3E,]
 what you described shouldn't have happened in ZK. Was the restart after a hard 
broker failure? If so, did you wait longer than zookeeper.session.timeout.ms 
before restart the failed broker?

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> Zookeeper session time out, zkClient try reconnect failed. Then re-establish 
> the session and re-registering broker info in ZK, throws NODEEXISTS Exception.
>  I think it is because the ephemeral node which created by old session has 
> not removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug try create node in a 
> while loop until create success. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7654) Relax requirements on serializing-only methods.

2018-11-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692065#comment-16692065
 ] 

Matthias J. Sax commented on KAFKA-7654:


`Produced` also allows to hand in an `StreamsPartitioner` to customize data 
partitioning if required.

What is the concrete issue you are facing with the current API and what 
improved API you suggest? Can you give an example?

> Relax requirements on serializing-only methods.
> ---
>
> Key: KAFKA-7654
> URL: https://issues.apache.org/jira/browse/KAFKA-7654
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Bieth
>Priority: Major
>
> Methods such as KStream#to shouldn't require a Produced as only the 
> serializing part is ever used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692066#comment-16692066
 ] 

Matthias J. Sax commented on KAFKA-7577:


`aggregate()` will drop any `null`-value input record as pointed out in the 
JavaDocs. Can this be issue you are facing? If yes, it's not a bug, but by 
design.

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).]
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic like expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692077#comment-16692077
 ] 

ASF GitHub Bot commented on KAFKA-7446:
---

mrsrinivas opened a new pull request #5930: [WIP] KAFKA-7446: Better error 
messages on `WindowDuration` and `AdvanceInterval`
URL: https://github.com/apache/kafka/pull/5930
 
 
   
   Changes made as part of this PR
- Added new `Exception` classes for `WindowDuration` and `AdvanceInterval`
- Improved error message for better readability
- Corrected java documentation on `AdvanceInterval` check.
- Improved JUnit tests with `expected` attribute in `@Test`
   
   Core and Streams modules' JUnit tests are successful in local but 
   tests related to file system is failed since I am running on Windows. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Better error message to explain the upper limit of TimeWindow
> -
>
> Key: KAFKA-7446
> URL: https://issues.apache.org/jira/browse/KAFKA-7446
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Jacek Laskowski
>Assignee: Srinivas Reddy
>Priority: Trivial
>  Labels: newbie++
>
> The following code throws a {{IllegalArgumentException}}.
> {code:java}
> import org.apache.kafka.streams.kstream.TimeWindows
> import scala.concurrent.duration._
> val timeWindow = TimeWindows
> .of(1.minute.toMillis)
> .advanceBy(2.minutes.toMillis)
> {code}
> The exception is as follows and it's not clear why {{6}} is the upper 
> limit (not to mention that {{AdvanceMs}} with the uppercase {{A}} did also 
> confuse me).
> {code:java}
> java.lang.IllegalArgumentException: AdvanceMs must lie within interval (0, 
> 6].
> at 
> org.apache.kafka.streams.kstream.TimeWindows.advanceBy(TimeWindows.java:100)
> ... 44 elided{code}
> I think that the message should be more developer-friendly and explain the 
> boundaries, perhaps with an example (and a link to docs)?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-19 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690159#comment-16690159
 ] 

Eugen Feller edited comment on KAFKA-7634 at 11/19/18 6:39 PM:
---

Sure, let me try what I can do. Code looks like this:

 
{code:java}
val table =
  bldr
.table[Key, Value1](
  keySerde,
  valueSerde1,
  topicA,
  stateStoreName
)

val stream1 =
  bldr
.stream[Key, Value2](
  keySerde,
  valueSerde2,
  topicB
)
.filterNot((k: Key, s: Value2) => s == null)

val enrichedStream = stream1
  .leftJoin[Value1, Value3](
table,
joiner,
keySerde,
valueSerde2
  )

val explodedStream =
  bldr
.stream[Key, Value4](
  keySerde,
  valueSerde4,
  topicC
)
.flatMapValues[Value3]()

val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream)
mergedStream.transform[Key, Value3](transformer).to(keySerde, valueSerde3, 
outputTopic){code}
 


was (Author: efeller):
Sure, let me try what I can do. Code looks like this:

 
{code:java}
val table =
  bldr
.table[Key, Value1](
  keySerde,
  valueSerde1,
  topicA,
  stateStoreName
)

val stream1 =
  bldr
.stream[Key, Value2](
  keySerde,
  valueSerde2,
  topicB
)
.filterNot((k: Key, s: Value2) => s == null)

val enrichedStream = stream1
  .leftJoin[Value1, Value3](
table,
joiner,
keySerde,
valueSerde2
  )

val explodedStream =
  bldr
.stream[Mac, Value4](
  keySerde,
  valueSerde4,
  topicC
)
.flatMapValues[Value3]()

val mergedStream = bldr.merge[Key, Value3](enrichedStream, explodedStream)
mergedStream.transform[Key, Value3](transformer).to(keySerde, valueSerde3, 
outputTopic){code}
 

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble to get Kafka streams 
> v0.11.0.3 call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I am having a topology where I am doing flatMapValues() -> 
> merge() and/or outerJoin -> transform(). If I dont call merge() and/or 
> outerJoin() before transform(), punctuate is being called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-19 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692099#comment-16692099
 ] 

Eugen Feller commented on KAFKA-7634:
-

Sure. Punctuate call looks like this:.
{code:java}
override def punctuate(timestamp: Long): KeyValue[Key, Value3] = {
  val iterator: KeyValueIterator[Key, Value3] = myStore.all()

  while (iterator.hasNext) {
val kv = iterator.next()
val key = kv.key
val value = kv.value
// Some small time based updates for the value
ctx.forward(key, value))
myStore.delete(key)
  }

  ctx.commit()
  iterator.close()
  return null
}
{code}

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble to get Kafka streams 
> v0.11.0.3 call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I am having a topology where I am doing flatMapValues() -> 
> merge() and/or outerJoin -> transform(). If I dont call merge() and/or 
> outerJoin() before transform(), punctuate is being called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-19 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692099#comment-16692099
 ] 

Eugen Feller edited comment on KAFKA-7634 at 11/19/18 6:40 PM:
---

Sure. Punctuate call looks like this:
{code:java}
override def punctuate(timestamp: Long): KeyValue[Key, Value3] = {
  val iterator: KeyValueIterator[Key, Value3] = myStore.all()

  while (iterator.hasNext) {
val kv = iterator.next()
val key = kv.key
val value = kv.value
// Some small time based updates for the value
ctx.forward(key, value)
myStore.delete(key)
  }

  ctx.commit()
  iterator.close()
  return null
}
{code}


was (Author: efeller):
Sure. Punctuate call looks like this:.
{code:java}
override def punctuate(timestamp: Long): KeyValue[Key, Value3] = {
  val iterator: KeyValueIterator[Key, Value3] = myStore.all()

  while (iterator.hasNext) {
val kv = iterator.next()
val key = kv.key
val value = kv.value
// Some small time based updates for the value
ctx.forward(key, value)
myStore.delete(key)
  }

  ctx.commit()
  iterator.close()
  return null
}
{code}

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble to get Kafka streams 
> v0.11.0.3 call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I am having a topology where I am doing flatMapValues() -> 
> merge() and/or outerJoin -> transform(). If I dont call merge() and/or 
> outerJoin() before transform(), punctuate is being called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-764) Race Condition in Broker Registration after ZooKeeper disconnect

2018-11-19 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-764.
-
Resolution: Duplicate

This old issue is similar to KAFKA-7165. Closing this as duplicate KAFKA-7165

> Race Condition in Broker Registration after ZooKeeper disconnect
> 
>
> Key: KAFKA-764
> URL: https://issues.apache.org/jira/browse/KAFKA-764
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.7.1
>Reporter: Bob Cotton
>Priority: Major
> Attachments: BPPF_2900-Broker_Logs.tbz2
>
>
> When running our ZooKeepers in VMware, occasionally all the keepers 
> simultaneously pause long enough for the Kafka clients to time out and then 
> the keepers simultaneously un-pause.
> When this happens, the zk clients disconnect from ZooKeeper. When ZooKeeper 
> comes back ZkUtils.createEphemeralPathExpectConflict discovers the node id of 
> itself and does not re-register the broker id node and the function call 
> succeeds. Then ZooKeeper figures out the broker disconnected from the keeper 
> and deletes the ephemeral node *after* allowing the consumer to read the data 
> in the /brokers/ids/x node.  The broker then goes on to register all the 
> topics, etc.  When consumers connect, they see topic nodes associated with 
> the broker but thy can't find the broker node to get connection information 
> for the broker, sending them into a rebalance loop until they reach 
> rebalance.retries.max and fail.
> This might also be a ZooKeeper issue, but the desired behavior for a 
> disconnect case might be, if the broker node is found to explicitly delete 
> and recreate it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7634) Punctuate not being called with merge() and/or outerJoin()

2018-11-19 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692099#comment-16692099
 ] 

Eugen Feller edited comment on KAFKA-7634 at 11/19/18 6:40 PM:
---

Sure. Punctuate call looks like this:.
{code:java}
override def punctuate(timestamp: Long): KeyValue[Key, Value3] = {
  val iterator: KeyValueIterator[Key, Value3] = myStore.all()

  while (iterator.hasNext) {
val kv = iterator.next()
val key = kv.key
val value = kv.value
// Some small time based updates for the value
ctx.forward(key, value)
myStore.delete(key)
  }

  ctx.commit()
  iterator.close()
  return null
}
{code}


was (Author: efeller):
Sure. Punctuate call looks like this:.
{code:java}
override def punctuate(timestamp: Long): KeyValue[Key, Value3] = {
  val iterator: KeyValueIterator[Key, Value3] = myStore.all()

  while (iterator.hasNext) {
val kv = iterator.next()
val key = kv.key
val value = kv.value
// Some small time based updates for the value
ctx.forward(key, value))
myStore.delete(key)
  }

  ctx.commit()
  iterator.close()
  return null
}
{code}

> Punctuate not being called with merge() and/or outerJoin()
> --
>
> Key: KAFKA-7634
> URL: https://issues.apache.org/jira/browse/KAFKA-7634
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Eugen Feller
>Priority: Major
>
> Hi all,
> I am using the Processor API and having trouble to get Kafka streams 
> v0.11.0.3 call the punctuate() function after a merge() and/or outerJoin(). 
> Specifically, I am having a topology where I am doing flatMapValues() -> 
> merge() and/or outerJoin -> transform(). If I dont call merge() and/or 
> outerJoin() before transform(), punctuate is being called as expected.
> Thank you very much in advance for your help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-19 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692108#comment-16692108
 ] 

Jason Gustafson edited comment on KAFKA-7610 at 11/19/18 6:49 PM:
--

[~guozhang] Here is a quick sketch of the approach I had in mind: 
https://github.com/hachikuji/kafka/commit/315a8ff11f5d7412394b22f4b85cec22607bc30a.
 It needs some polish of course, but the additional complexity seems tolerable. 
What do you think?


was (Author: hachikuji):
[~guozhang] Here is a quick sketch of the approach I had in mind: 
https://github.com/hachikuji/kafka/commit/ff57bf569a974d982f124a37a38ddb046f0c596d.
 It needs some polish of course, but the additional complexity seems tolerable. 
What do you think?

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-11-19 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692108#comment-16692108
 ] 

Jason Gustafson commented on KAFKA-7610:


[~guozhang] Here is a quick sketch of the approach I had in mind: 
https://github.com/hachikuji/kafka/commit/ff57bf569a974d982f124a37a38ddb046f0c596d.
 It needs some polish of course, but the additional complexity seems tolerable. 
What do you think?

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-11-19 Thread Samuel Hawker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692118#comment-16692118
 ] 

Samuel Hawker commented on KAFKA-6585:
--

[~adityavivek94] are you still working on this? It has been quite some time 
since there has been an update to this issue.
If you no longer want it I will happily pick it up.

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Aditya Vivek
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and streams reset tool today shares lot of common 
> logics such as resetting to a datetime etc. We can consolidate them into a 
> common class which directly depend on admin client at simply let these tools 
> to use the class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6585) Consolidate duplicated logic on reset tools

2018-11-19 Thread Samuel Hawker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692129#comment-16692129
 ] 

Samuel Hawker commented on KAFKA-6585:
--

Assuming I can pick it up I have a question.

The stream resetter tool is the class with path: 
core/src/main/scala/kafka/tools/StreamsResetter.java

Where in the consumer reset tool? I cannot find it.

> Consolidate duplicated logic on reset tools
> ---
>
> Key: KAFKA-6585
> URL: https://issues.apache.org/jira/browse/KAFKA-6585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Aditya Vivek
>Priority: Minor
>  Labels: newbie
>
> The consumer reset tool and streams reset tool today shares lot of common 
> logics such as resetting to a datetime etc. We can consolidate them into a 
> common class which directly depend on admin client at simply let these tools 
> to use the class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-19 Thread Daren Thomas (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692136#comment-16692136
 ] 

Daren Thomas commented on KAFKA-7577:
-

What is the reasoning behind that being the design?  

I'm using Kafka Connect to stream database updates to a Kafka topic.  I then 
rekey and repartition the streams so I can perform various table joins.  If 
aggregate() drops any `null`-value input records, how are deletions suppose to 
propagate into the system and remove entries from the joins?  Right now I'm 
catching those deletions and creating my own messages that are specifically 
constructed to generate the desired output from the joins.  

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).]
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic like expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.

2018-11-19 Thread Michael Noll (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692148#comment-16692148
 ] 

Michael Noll commented on KAFKA-7653:
-

Thanks for raising this, [~mark.tranter]!  The issue makes sense to me.

Question for clarification:  The workaround today would be to explicitly 
specify the serdes you want (this way one can differentiate between key serdes 
vs. value serdes), but this would by definition negate the convenience in the 
Scala API where you normally do not need to do that (because of implicit 
serdes).  Right?

> Streams-Scala: Add type level differentiation for Key and Value serdes.
> ---
>
> Key: KAFKA-7653
> URL: https://issues.apache.org/jira/browse/KAFKA-7653
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mark Tranter
>Priority: Minor
>
> Implicit resolution/conversion of Serdes/Consumed etc is a big improvement 
> for the Scala Streams API. However in cases where a user needs to 
> differentiate between Key and Value serializer functionality (i.e. using the 
> Schema Registry), implicit resolution doesn't help and could cause issues. 
> e.g.
> {code:java}
> case class MouseClickEvent(pageId: Long, userId: String)
> builder
>   // Long serde taken from implicit scope configured with
>   // `isKey` = true
>   .stream[Long, MouseClickEvent]("mouse-clicks")
>   .selectKey((_,v) => v.userId)
>   .groupByKey
>   .aggregate(() => 0L, (_: String, mce: MouseClickEvent, count: Long) => 
> count + 1)
>   .toStream
>   // Same Long serde taken from implicit scope configured with
>   // `isKey` = true, even thought the `Long` value in this case
>   // will be the Value
>   .to("mouse-clicks-by-user")
> {code}
> It would be ideal if Key and Value Serde/SerdeWrapper types/type classes 
> could be introduced to overcome this limitation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-19 Thread Daren Thomas (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692143#comment-16692143
 ] 

Daren Thomas commented on KAFKA-7577:
-

This is the detailed behavior of KGroupedTable:
 * When a tombstone record – i.e. a record with a {{null}} value – is received 
for a key (e.g., DELETE), then only the subtractor is called. Note that, 
whenever the subtractor returns a {{null}} value itself, then the corresponding 
key is removed from the resulting {{KTable}}. If that happens, any next input 
record for that key will trigger the initializer again.

That is the behavior I hoped to see.  It seems that my aggregation is following 
the semantics of KGroupedStream instead.

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).]
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic like expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7654) Relax requirements on serializing-only methods.

2018-11-19 Thread Bruno Bieth (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692168#comment-16692168
 ] 

Bruno Bieth commented on KAFKA-7654:


Hi,

[~bbejeck] this is future proofing and in the meantime the API forces us to 
provide things that aren't used.

[~mjsax] if you (really) need a `StreamsPartitioner` then you can create 
another class that wraps a `StreamsPartitioner` and a pair of `Serializer`s.

The concrete issue that I'm facing is that I'm forced to provide something that 
isn't used.
So it's two-fold, first the API is puzzling (a `Produced` requiring a 
`Deserializer` is a smell to me), then coming up with a deserializer for a 
non-trivial format is way too much work for something that's not going to be 
used. Any good serialization library actually separate the 2 concerns 
(serialization/deserialization) and acknowledge the fact that you don't always 
need both.

I would suggest 2 possible changes:
1) a stop-gap overload: `KStream#to(String, Serializer, Serializer)`

2) fix the API in backward incompatible ways - change `Produced` and `Consumed` 
so that they respectively need `Serializer`s and `Deserializer`s only. For the 
cases where you need a `Serde` (say for a `Materialized`) then you'll have to 
change the API a bit further. Let's take a look at this:

{code:java}
public synchronized  KTable table(final String topic,
  final Consumed consumed,
  final Materialized> materialized) {
// ...

materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
 // < HERE 
// ...
return internalStreamsBuilder.table(topic, consumedInternal, 
materializedInternal);
}
{code}

The materialized is mutated to get the serdes from the consumed, which is 
counter-intuitive (I guess that's why it's explained in the documentation). I 
don't understand why the materialized need to have the exact same serde as the 
consumed used to read `topic`, but assuming there's a good reason there are 
ways to enforce this at the type level rather than overriding pre-existing 
values. Once this is done then consumed is only used for it's deserializers.
I believe the same reasoning can be applied throughout the whole API (`stream`, 
`through`, `to`) where serializers and deserializers are conflated. I can help 
more if you're willing to go down that road :)

> Relax requirements on serializing-only methods.
> ---
>
> Key: KAFKA-7654
> URL: https://issues.apache.org/jira/browse/KAFKA-7654
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Bieth
>Priority: Major
>
> Methods such as KStream#to shouldn't require a Produced as only the 
> serializing part is ever used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2018-11-19 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692211#comment-16692211
 ] 

Jonathan Santilli commented on KAFKA-4277:
--

Hello [~birger],  KAFKA-7165 will solve the NODEEXISTS issue in case the 
session is regenerated (expired) and the Broker was the author of creating the 
ephemeral znode into zookeeper.

If I understood correctly, I think is what happened according to part of the 
description:

 
{noformat}
...
[2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
...
{noformat}
 

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>Priority: Major
>
> I use zookeeper 3.4.6.
> Zookeeper session time out, zkClient try reconnect failed. Then re-establish 
> the session and re-registering broker info in ZK, throws NODEEXISTS Exception.
>  I think it is because the ephemeral node which created by old session has 
> not removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug try create node in a 
> while loop until create success. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> kafka.server.KafkaHealthcheck$SessionExpireListener.handleNewSession(KafkaHealthcheck.scala:104)
> at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2018-11-19 Thread Patrick Haas (JIRA)
Patrick Haas created KAFKA-7656:
---

 Summary: ReplicaManager fetch fails on leader due to long/integer 
overflow
 Key: KAFKA-7656
 URL: https://issues.apache.org/jira/browse/KAFKA-7656
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.1
 Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 EDT 
2017 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Patrick Haas


(Note: From 2.0.1-cp1 from confluent distribution)

This may not be the root cause, but the immediate issue is that 
[Log#read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1179]
 computes *maxPosition* as a long, which is then passed to

{{ val fetchInfo = 
[segment.read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1192](startOffset,
 maxOffset, maxLength, *{color:#FF}maxPosition{color}*, minOneMessage)}}

This results in Integer.MIN_VALUE getting passed, which triggers an assertion 
error.

 

{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2018-11-19 Thread Patrick Haas (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Haas updated KAFKA-7656:

Description: 
(Note: From 2.0.1-cp1 from confluent distribution)

This may not be the root cause, but the immediate issue is that 
[Log#read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1179]
 computes *maxPosition* as a long, which is then passed to

val fetchInfo = 
[segment.read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1192](startOffset,
 maxOffset, maxLength, *{color:#FF}maxPosition{color}*, minOneMessage)

This results in Integer.MIN_VALUE getting passed, which triggers an assertion 
error.
 

{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}

  was:
(Note: From 2.0.1-cp1 from confluent distribution)

This may not be the root cause, but the immediate issue is that 
[Log#read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1179]
 computes *maxPosition* as a long, which is then passed to

{{ val fetchInfo = 
[segment.read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1192](startOffset,
 maxOffset, maxLength, *{color:#FF}maxPosition{color}*, minOneMessage)}}

This results in Integer.MIN_VALUE getting passed, which triggers an assertion 
error.

 

{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}


> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Pr

[jira] [Updated] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2018-11-19 Thread Patrick Haas (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Haas updated KAFKA-7656:

Description: 
(Note: From 2.0.1-cp1 from confluent distribution)


{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}

  was:
(Note: From 2.0.1-cp1 from confluent distribution)

Edit: Misread the code, issue is with maxLength..
-This may not be the root cause, but the immediate issue is that 
[Log#read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1179]
 computes *maxPosition* as a long, which is then passed to

val fetchInfo = 
[segment.read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1192](startOffset,
 maxOffset, maxLength, *{color:#FF}maxPosition{color}*, minOneMessage)-

This results in Integer.MIN_VALUE getting passed, which triggers an assertion 
error.
 

{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}


> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentE

[jira] [Updated] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2018-11-19 Thread Patrick Haas (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrick Haas updated KAFKA-7656:

Description: 
(Note: From 2.0.1-cp1 from confluent distribution)

Edit: Misread the code, issue is with maxLength..
-This may not be the root cause, but the immediate issue is that 
[Log#read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1179]
 computes *maxPosition* as a long, which is then passed to

val fetchInfo = 
[segment.read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1192](startOffset,
 maxOffset, maxLength, *{color:#FF}maxPosition{color}*, minOneMessage)-

This results in Integer.MIN_VALUE getting passed, which triggers an assertion 
error.
 

{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}

  was:
(Note: From 2.0.1-cp1 from confluent distribution)

This may not be the root cause, but the immediate issue is that 
[Log#read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1179]
 computes *maxPosition* as a long, which is then passed to

val fetchInfo = 
[segment.read|https://github.com/apache/kafka/blob/9646602d6832ad0a5f2e9b65af5df1a80a571691/core/src/main/scala/kafka/log/Log.scala#L1192](startOffset,
 maxOffset, maxLength, *{color:#FF}maxPosition{color}*, minOneMessage)

This results in Integer.MIN_VALUE getting passed, which triggers an assertion 
error.
 

{{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error processing 
fetch operation on partition __consumer_offsets-20, offset 0 
(kafka.server.ReplicaManager)}}
{{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read 
from segment FileRecords(file= 
/prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
start=0, end=2147483647)}}
{{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
{{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
{{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
{{ at kafka.log.Log.read(Log.scala:1114)}}
{{ at 
kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
{{ at 
kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
{{ at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
{{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
{{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
{{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
{{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
{{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
{{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
{{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
{{ at java.lang.Thread.run(Thread.java:748)}}


> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apac

[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-19 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692316#comment-16692316
 ] 

Jason Gustafson commented on KAFKA-7531:


[~spuzon] Thanks for the additional info. I spent some time looking at this and 
I'm also having a hard time explaining the NPE. Which jdk version are you 
using? As Guozhang suggests, since you can reproduce the issue, would you be 
willing to add some additional logging? I can give you a patch if that is 
easier.

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe NPE NullPointerException at coordinator broker which causes all 
> application stream threads shutdown, here's stack from broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:13

[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692347#comment-16692347
 ] 

Sebastian Puzoń commented on KAFKA-7531:


[~guozhang] Your're right, kafka documentation does not specify 
session.timeout.ms for broker, just two you mentioned. not sure where did I got 
this setting for broker, probably StackOverflow :)

I didn't build kafka myself, I'm using binaries from 
[https://kafka.apache.org/downloads]

I can try run from source code and apply patch or add extra 
logs/instrumentation, if you can point me where can I find source code.

I'm using:
{code:java}
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)
{code}
{code:java}
Red Hat Enterprise Linux Server release 6.10 (Santiago)
Patch level   2018.3.0

{code}
For past 6 days Kafka brokers are rock steady, no single instance of NPE, I 
enabled debug logging for most log4j loggers except few. I pull data from 
certain broker mbeans and all consumer mbeans and store in ELK, same with 
broker logs.

 

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe NPE NullPointerException at coordinator broker which causes all 
> application stream threads shutdown, here's stack from broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.

[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692365#comment-16692365
 ] 

Sebastian Puzoń commented on KAFKA-7531:


ok I'll check this : [https://github.com/apache/kafka]

 

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams Application 4 instances, each has 5 Streams threads, total 20 stream 
> threads.
> I observe NPE NullPointerException at coordinator broker which causes all 
> application stream threads shutdown, here's stack from broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoc

[jira] [Commented] (KAFKA-7638) Trogdor - Support mass task creation endpoint

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692366#comment-16692366
 ] 

ASF GitHub Bot commented on KAFKA-7638:
---

stanislavkozlovski opened a new pull request #5932: KAFKA-7638: Add support for 
multiple task creation in one request
URL: https://github.com/apache/kafka/pull/5932
 
 
   [JIRA](https://issues.apache.org/jira/browse/KAFKA-7638)
   Trogdor Coordinator now supports creation of multiple tasks in one request 
call.
   This is exposed under `/coordinator/tasks/mass_create`
   Sample body: 
   ```json
   {"tasks": [{...}, {...}]}
   ```
   
   Additional changes:
   The /tasks/create endpoint (and now mass_create) return 400 with an error 
message whenever an error is encountered during the task creation. Previously, 
Trogdor would create the task as "DONE" and return a successful status code, 
making users manually inspect the tasks' status.
   This, unfortunately, has the nasty downside of returning 400 when an 
unexpected internal server error occurs. 
   I think that we can live with this for now and open up another KIP that 
could address error handling in the Trogdor Coordinator REST API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Support mass task creation endpoint
> -
>
> Key: KAFKA-7638
> URL: https://issues.apache.org/jira/browse/KAFKA-7638
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> Trogdor supports the creation of tasks via the `coordinator/tasks/create` 
> endpoint - it currently accepts only one task.
> Since Trogdor support scheduling multiple jobs to execute at a certain time 
> (via the `startTime` task parameter leveraged by all tasks), it makes sense 
> to support creating multiple tasks in a single endpoint. 
> Users might want to leverage the scheduler to, say, create 100 tasks. In the 
> current model, they would need to issue 100 requests - which is inefficient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692414#comment-16692414
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax closed pull request #5767: KAFKA-7192: Wipe out state store if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5767
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
index a99e45147b9..40ce79c289e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java
@@ -24,4 +24,8 @@
 super(logContext, "standby task");
 }
 
+void addToRestoring(final StandbyTask task) {
+throw new UnsupportedOperationException("Standby tasks cannot be 
restored actively.");
+}
+
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f6488e7..98c2bbd2563 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -23,12 +23,21 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedStreamsTasks extends AssignedTasks implements 
RestoringTasks {
 private final Logger log;
+private final Map restoring = new HashMap<>();
+private final Set restoredPartitions = new HashSet<>();
+private final Map restoringByPartition = new 
HashMap<>();
 private final TaskAction maybeCommitAction;
 private int committed = 0;
 
@@ -59,6 +68,52 @@ public StreamTask restoringTaskFor(final TopicPartition 
partition) {
 return restoringByPartition.get(partition);
 }
 
+void updateRestored(final Collection restored) {
+if (restored.isEmpty()) {
+return;
+}
+log.trace("Stream task changelog partitions that have completed 
restoring so far: {}", restored);
+restoredPartitions.addAll(restored);
+for (final Iterator> it = 
restoring.entrySet().iterator(); it.hasNext(); ) {
+final Map.Entry entry = it.next();
+final StreamTask task = entry.getValue();
+if (restoredPartitions.containsAll(task.changelogPartitions())) {
+transitionToRunning(task);
+it.remove();
+log.trace("Stream task {} completed restoration as all its 
changelog partitions {} have been applied to restore state",
+task.id(),
+task.changelogPartitions());
+} else {
+if (log.isTraceEnabled()) {
+final HashSet outstandingPartitions = new 
HashSet<>(task.changelogPartitions());
+outstandingPartitions.removeAll(restoredPartitions);
+log.trace("Stream task {} cannot resume processing yet 
since some of its changelog partitions have not completed restoring: {}",
+task.id(),
+outstandingPartitions);
+}
+}
+}
+if (allTasksRunning()) {
+restoredPartitions.clear();
+}
+}
+
+void addToRestoring(final StreamTask task) {
+restoring.put(task.id(), task);
+for (final TopicPartition topicPartition : task.partitions()) {
+restoringByPartition.put(topicPartition, task);
+}
+for (final TopicPartition topicPartition : task.changelogPartitions()) 
{
+restoringByPartition.put(topicPartition, task);
+}
+}
+
+boolean allTasksRunning() {
+return created.isEmpty()
+&& suspended.isEmpty()
+&& restoring.isEmpty();
+}
+
 /**
  * @throws TaskMigratedException if committing offsets failed (non-EOS)
  *   or if the task producer got fenced (EOS)
@@ -139,4 +194,43 @@ int punctuate() {
 return punctuated;
 }
 
+RuntimeException suspend() {
+fi

[jira] [Updated] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7192:
---
Fix Version/s: 1.1.2

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692417#comment-16692417
 ] 

Matthias J. Sax commented on KAFKA-7192:


[~tobiajo] Back ported to 1.1 branch :)

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2018-11-19 Thread Adam Bellemare (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692424#comment-16692424
 ] 

Adam Bellemare commented on KAFKA-4212:
---

We have some business requirements where we need to store a large amount of 
data in a KTable for interactive querying. After a period of time, we want to 
time it out via TTL.

Currently we have an implementation of 2.0 with RocksDB using TTL. In addition, 
we prevent records from loading from the changelog if recordTime + TTL < 
referenceTimeStamp (default = System.currentTimeInMillis() ). We do not set any 
retention properties besides the default on the changelog (such that if we 
needed to change the retention period, we would not have to recompute 
everything).

I believe that the ask of this ticket and the solution my company is using 
intersect. As far as I can see, there is no KIP to address this specific issue. 
Is there anything obvious preventing my solution from being a good candidate? I 
will create a KIP and submit a PR if there are no obvious blockers, if this 
ticket isn't out of date or overtaken by events.

 

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs needs to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7653) Streams-Scala: Add type level differentiation for Key and Value serdes.

2018-11-19 Thread Mark Tranter (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692440#comment-16692440
 ] 

Mark Tranter commented on KAFKA-7653:
-

Thanks [~miguno]

Yes that is one workaround. And as you say, negates the use of implicit serdes.

Another option might be to wrap the types to be serialized
{code:java}
case class StringKey(k: String)
case class Value(v: String){code}
Then use these types within the Streams Builder and write basic wrapper 
serializers for these wrapper types. Again not ideal.

 

 

I'm more than happy to help out with a fix for this if we can decide on an 
appropriate abstraction to use.

I cant see a nice way of doing it currently that wouldn't cause breaking 
changes to existing users. 

 

Thanks for your reply!

> Streams-Scala: Add type level differentiation for Key and Value serdes.
> ---
>
> Key: KAFKA-7653
> URL: https://issues.apache.org/jira/browse/KAFKA-7653
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mark Tranter
>Priority: Minor
>
> Implicit resolution/conversion of Serdes/Consumed etc is a big improvement 
> for the Scala Streams API. However in cases where a user needs to 
> differentiate between Key and Value serializer functionality (i.e. using the 
> Schema Registry), implicit resolution doesn't help and could cause issues. 
> e.g.
> {code:java}
> case class MouseClickEvent(pageId: Long, userId: String)
> builder
>   // Long serde taken from implicit scope configured with
>   // `isKey` = true
>   .stream[Long, MouseClickEvent]("mouse-clicks")
>   .selectKey((_,v) => v.userId)
>   .groupByKey
>   .aggregate(() => 0L, (_: String, mce: MouseClickEvent, count: Long) => 
> count + 1)
>   .toStream
>   // Same Long serde taken from implicit scope configured with
>   // `isKey` = true, even thought the `Long` value in this case
>   // will be the Value
>   .to("mouse-clicks-by-user")
> {code}
> It would be ideal if Key and Value Serde/SerdeWrapper types/type classes 
> could be introduced to overcome this limitation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-19 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692496#comment-16692496
 ] 

Boyang Chen commented on KAFKA-7641:


[Stanislav 
Kozlovski|x-note://3AF09C0A-0D9D-4EFC-8F4D-4C82DC12DCE6/jira/secure/ViewProfile.jspa?name=enether]
 So sorry Stanislav for the confusion on my wording! Stanislav and I synced 
offline that I would be working on this Jira while he will take the member id 
change. Thanks a lot for your understanding here!

 

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-19 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692496#comment-16692496
 ] 

Boyang Chen edited comment on KAFKA-7641 at 11/20/18 12:52 AM:
---

[~enether] So sorry Stanislav for the confusion on my wording! Stanislav and I 
synced offline that I would be working on this Jira while he will take the 
member id change. Thanks a lot for your understanding here!

 


was (Author: bchen225242):
[Stanislav 
Kozlovski|x-note://3AF09C0A-0D9D-4EFC-8F4D-4C82DC12DCE6/jira/secure/ViewProfile.jspa?name=enether]
 So sorry Stanislav for the confusion on my wording! Stanislav and I synced 
offline that I would be working on this Jira while he will take the member id 
change. Thanks a lot for your understanding here!

 

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on broker side to avoid one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config since most times this value is only dealing with error 
> scenarios, client users shouldn't worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only

2018-11-19 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692501#comment-16692501
 ] 

Boyang Chen commented on KAFKA-7566:


[~guozhang] Yep, for example our KStream application publishes data snapshots 
every 10 minute to S3. Periodically we want to gc the outdated ones which could 
be done on a separate thread. The caveat is that the gc job will be scheduled 
on all instances and sometimes cause race conditions. We could use zk to 
mitigate the issue, but just want to reach out to see if there would be some 
similar request on stream side, which is a leader-only, non-blocking main 
processing feature.

> Add sidecar job to leader (or a random single follower) only
> 
>
> Key: KAFKA-7566
> URL: https://issues.apache.org/jira/browse/KAFKA-7566
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>
> Hey there,
> recently we need to add an archive job to a streaming application. The caveat 
> is that we need to make sure only one instance is doing this task to avoid 
> potential race condition, and we also don't want to schedule it as a regular 
> stream task so that we will be blocking normal streaming operation. 
> Although we could do so by doing a zk lease, I'm raising the case here since 
> this could be some potential use case for streaming job also. For example, 
> there are some `leader specific` operation we could schedule in DSL instead 
> of adhoc manner.
> Let me know if you think this makes sense to you, thank you!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692535#comment-16692535
 ] 

Matthias J. Sax commented on KAFKA-7577:


An `aggregation()` is designed like in SQL, that also drops `null`s.

Note, that while an `aggregation` *returns* a KTable, the *input* is a KStream 
that represents fact, but not updates... That is the main difference: if you 
read a topic directly into a `KTable`, you tell KafkaStreams to use update 
semantics including tombstone semantics. However, when you aggregate a 
`KStream` there is no update/tombstone semantics for the *input* records.
{quote}how are deletions suppose to propagate 
{quote}
There is no reason to propagate them, as this concept does not exist in a 
KStream. Also note, that `KStream#groupByKey()` returns a `KGrouped*Stream*` 
but not a `KGroupedTable` (that would be result of `KTable#groupBy()`). It 
might seem subtle, but from my point of view an `aggregate()` is not the same 
as an UPSERT (what you want).

If you want to treat the data as table updates, you should read the topic as a 
table directly (from a semantic point of view) and create a derived table with 
`KTable#groupBy()` to repartition accordingly. If you want to avoid the 
materialization overhead for the first table, you can also do 
`stream.map(/*select new key*/).to("my-new-changelog-topic"); KTable table = 
builder.table("my-new-changelog-topic")` instead.

We are working on an optimization atm, that will allow to read a topic as 
`KTable` directly, while the KTable will only be materialized if necessary. 
This should address your use case, as we would only materialize the second 
KTable. For now, you can use your own workaround (even if I think you miss use 
the API, as I believe an aggregation is not the right operator to implement an 
UPSERT—my personal opinion only), or with the intermediate topic as suggested. 
I also want to point out, that having an UPSERT operator might actually be 
useful. But instead of changing `aggregate()` I would rather add 
`KStream#toTable()` operator (similar to, or reverse of, `KTable#toStream()`) 
for this use case. Because it's semantically two different things, there should 
be two different operators for both IMHO.

Does this make sense?

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).]
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic like expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7654) Relax requirements on serializing-only methods.

2018-11-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692570#comment-16692570
 ] 

Matthias J. Sax commented on KAFKA-7654:


Thanks for sharing your thoughts. I think it basically makes sense, however, 
there are couple of things to consider:
{quote}then coming up with a deserializer for a non-trivial format is way too 
much work for something that's not going to be used
{quote}
For most cases, any type you use in KafkaStreams, most likely needs to be both, 
serialized and deserialized. I think it's only corner cases, for which one 
reads or writes a type only. Because of this assumption, the believe is, that 
all types will have Serdes anyway, and it should not be any extra work to 
create the serdes for `Consumed` (of course, there are always exception).
{quote}So it's two-fold, first the API is puzzling (a `Produced` requiring a 
`Deserializer` is a smell to me)

Any good serialization library actually separate the 2 concerns 
(serialization/deserialization) and acknowledge the fact that you don't always 
need both.
{quote}
I see your point here, and even with my argument from above (ie, not expected 
overhead as a serde is created anyway most likely), a source or sink could 
still just accept a serializer or deserializer.

However, I want to point out, that in upcoming `2.1` we actually pass `Serdes` 
downstream. For example, if you read a topic as KStream and call 
`stream.groupByKey().reduce().toStream().to()`, you don't need to specify the 
serde on reduce() a second time, or on `to()` a third time, because the type 
did no change and the `builder.stream()` provided serde will be reused. Thus, 
if we change the API and only get a deserializer in `builder.stream()`, we 
cannot do this any longer what seems to be a step backward.
{quote}The materialized is mutated to get the serdes from the consumed, which 
is counter-intuitive (I guess that's why it's explained in the documentation). 
I don't understand why the materialized need to have the exact same serde as 
the consumed used to read `topic`, but assuming there's a good reason there are 
ways to enforce this at the type level rather than overriding pre-existing 
values.
{quote}
The idea is, to avoid specifying the Serde twice (what would be redundant)—yes, 
this must be documented because the API allows two ways to specify serdes 
(admittedly not optimal). The data from the input topic is of the same type as 
the data stored in the table and thus, both Serdes (Consumed and Materialized) 
must be of the same type. The type system does enforce this, but if we would 
required that users specify the same Serde twice (on Consumed to read data from 
the topic and on Materialized to read/write data to/from the store) it would be 
error prone (assume the user only specified it once, the other one would fall 
back to the Serde from the StreamsConfig that is of a different type—otherwise, 
it would not be required to overwrite it in the first place).

I agree, that for the `table()` example, the API offers two ways to specify 
Serde what is not optimal. However, because `Materialized` is used as parameter 
for other methods, too, it needs the ability to accept serdes as Materialized 
might be the only config parameter specified (we could have introduced a new 
class for both cases, but decided to accept the "hit" to avoid too many 
different config types—we have already many... — it's debatable if this was the 
best decision).

For your suggestion about `KStream#to(String, Serializer, Serializer)`—this 
was the API in older releases, but had the major drawback, that methods with a 
lot of optional parameters, resulted in a lot of overloads what confused a lot 
of users and was not easy to use. Thus, with 
[KIP-182|https://cwiki.apache.org/confluence/display/KAFKA/KIP-182%3A+Reduce+Streams+DSL+overloads+and+allow+easier+use+of+custom+storage+engines],
 we change the API to reduce the number of overloads: one with all mandatory 
arguments, and a second one with all mandatory argument plus a config parameter 
that takes all optional arguments and that follows a builder-like pattern (some 
method still have more then two overloads unfortunately: we tried to reuse 
config classes instead of creating more and more—again a design decision that 
is debatable). To have a unique API experience, we decided to use this pattern 
for all methods (also for those, with fewer optional parameters for which the 
change is not necessarily an improvement). A unique API experience seemed to be 
more valuable.

Thus, while I see your point, there are many trade-offs to consider and I am 
not convinced atm, that your suggestion would improve the API from a holistic 
point of view. Let me know what you think about this. I just wanted to give you 
some context on why the API is design in the current way, and that changing 
something (even if it seems 

[jira] [Commented] (KAFKA-6567) KStreamWindowReduce can be replaced by KStreamWindowAggregate

2018-11-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692697#comment-16692697
 ] 

ASF GitHub Bot commented on KAFKA-6567:
---

guozhangwang closed pull request #5922: KAFKA-6567: Remove KStreamWindowReducer
URL: https://github.com/apache/kafka/pull/5922
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
deleted file mode 100644
index babe3ebd836..000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Reducer;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windows;
-
-class KStreamWindowReduce extends 
KStreamWindowAggregate {
-KStreamWindowReduce(final Windows windows,
-final String storeName,
-final Reducer reducer) {
-super(
-windows,
-storeName,
-() -> null,
-(key, newValue, oldValue) -> oldValue == null ? newValue : 
reducer.apply(oldValue, newValue)
-);
-}
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index ecfe1554815..dfead3e6336 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -147,7 +147,7 @@
 return aggregateBuilder.build(
 REDUCE_NAME,
 materialize(materializedInternal),
-new KStreamWindowReduce<>(windows, 
materializedInternal.storeName(), reducer),
+new KStreamWindowAggregate<>(windows, 
materializedInternal.storeName(), aggregateBuilder.reduceInitializer, 
aggregatorForReducer(reducer)),
 materializedInternal.isQueryable(),
 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
 materializedInternal.valueSerde());
@@ -216,4 +216,8 @@
 }
 return builder;
 }
+
+private Aggregator aggregatorForReducer(final Reducer reducer) 
{
+return (aggKey, value, aggregate) -> aggregate == null ? value : 
reducer.apply(aggregate, value);
+}
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
deleted file mode 100644
index 634cb2f35a5..000
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations unde

[jira] [Created] (KAFKA-7657) Invalid reporting of StreamState in KafkaStreams application

2018-11-19 Thread Thomas Crowley (JIRA)
Thomas Crowley created KAFKA-7657:
-

 Summary: Invalid reporting of StreamState in KafkaStreams 
application
 Key: KAFKA-7657
 URL: https://issues.apache.org/jira/browse/KAFKA-7657
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.1
Reporter: Thomas Crowley


We have a streams application with 3 instances running, two of which are 
reporting the state of `REBALANCING` even after they have been running for 
days. Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the `kafka-consumer-groups` CLI tool reports no offset lag in any of the 
partitions assigned to the `REBALANCING` consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the `state.dir` on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to `DEBUG`) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck `REBALANCING`

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have `processing.guarantee` set to `exactly_once`
 * We are using a `ValueTransformer` which fetches from and puts data on a 
windowed state store

The `REBALANCING` state is returned from both polling the `state` method of our 
`KafkaStreams` instance, and our custom metric which is derived from some logic 
in a `KafkaStreams.StateListener` class attached via the `setStateListener` 
method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a `RUNNING` state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-11-19 Thread Thomas Crowley (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Crowley updated KAFKA-7657:
--
Summary: Invalid reporting of stream state in Kafka streams application  
(was: Invalid reporting of StreamState in KafkaStreams application)

> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Minor
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of `REBALANCING` even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the `kafka-consumer-groups` CLI tool reports no offset lag in any of the 
> partitions assigned to the `REBALANCING` consumers. Each partition seems to 
> be processing an equal amount of records too.
> Inspecting the `state.dir` on disk, it looks like the RocksDB state has been 
> built and hovers at the expected size on disk.
> This problem has persisted for us after we rebuilt our Kafka cluster and 
> reset topics + consumer groups in our dev environment.
> There is nothing in the logs (with level set to `DEBUG`) in both the broker 
> or the application that suggests something exceptional has happened causing 
> the application to be stuck `REBALANCING`
> We are also running multiple streaming applications where this problem does 
> not exist.
> Two differences between this application and our other streaming applications 
> are:
>  * We have `processing.guarantee` set to `exactly_once`
>  * We are using a `ValueTransformer` which fetches from and puts data on a 
> windowed state store
> The `REBALANCING` state is returned from both polling the `state` method of 
> our `KafkaStreams` instance, and our custom metric which is derived from some 
> logic in a `KafkaStreams.StateListener` class attached via the 
> `setStateListener` method.
>  
> While I have provided a bit of context, before I reply with some reproducible 
> code - is there a simple way in which I can determine that my streams 
> application is in a `RUNNING` state without relying on the same mechanisms as 
> used above?
> Further, given that it seems like my application is actually running - could 
> this perhaps be a bug to do with how the stream state is being reported (in 
> the context of a transactional stream using the processor API)?
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-11-19 Thread Thomas Crowley (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Crowley updated KAFKA-7657:
--
Description: 
We have a streams application with 3 instances running, two of which are 
reporting the state of REBALANCING` even after they have been running for days. 
Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the `kafka-consumer-groups` CLI tool reports no offset lag in any of the 
partitions assigned to the `REBALANCING` consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the `state.dir` on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to `DEBUG`) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck `REBALANCING`

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have `processing.guarantee` set to `exactly_once`
 * We are using a `ValueTransformer` which fetches from and puts data on a 
windowed state store

The `REBALANCING` state is returned from both polling the `state` method of our 
`KafkaStreams` instance, and our custom metric which is derived from some logic 
in a `KafkaStreams.StateListener` class attached via the `setStateListener` 
method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a `RUNNING` state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 

  was:
We have a streams application with 3 instances running, two of which are 
reporting the state of `REBALANCING` even after they have been running for 
days. Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the `kafka-consumer-groups` CLI tool reports no offset lag in any of the 
partitions assigned to the `REBALANCING` consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the `state.dir` on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to `DEBUG`) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck `REBALANCING`

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have `processing.guarantee` set to `exactly_once`
 * We are using a `ValueTransformer` which fetches from and puts data on a 
windowed state store

The `REBALANCING` state is returned from both polling the `state` method of our 
`KafkaStreams` instance, and our custom metric which is derived from some logic 
in a `KafkaStreams.StateListener` class attached via the `setStateListener` 
method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a `RUNNING` state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 


> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Minor
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING` even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the `kafka-consumer-groups` CLI tool r

[jira] [Updated] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-11-19 Thread Thomas Crowley (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Crowley updated KAFKA-7657:
--
Description: 
We have a streams application with 3 instances running, two of which are 
reporting the state of REBALANCING even after they have been running for days. 
Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the kafka-consumer-groups CLI tool reports no offset lag in any of the 
partitions assigned to the REBALANCING consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the state.dir on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to DEBUG) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck REBALANCING.

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have processing.guarantee set to exactly_once
 * We are using a ValueTransformer which fetches from and puts data on a 
windowed state store

The REBALANCING state is returned from both polling the state method of our 
KafkaStreams instance, and our custom metric which is derived from some logic 
in a KafkaStreams.StateListener class attached via the setStateListener method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a RUNNING state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 

  was:
We have a streams application with 3 instances running, two of which are 
reporting the state of REBALANCING` even after they have been running for days. 
Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the `kafka-consumer-groups` CLI tool reports no offset lag in any of the 
partitions assigned to the `REBALANCING` consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the `state.dir` on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to `DEBUG`) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck `REBALANCING`

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have `processing.guarantee` set to `exactly_once`
 * We are using a `ValueTransformer` which fetches from and puts data on a 
windowed state store

The `REBALANCING` state is returned from both polling the `state` method of our 
`KafkaStreams` instance, and our custom metric which is derived from some logic 
in a `KafkaStreams.StateListener` class attached via the `setStateListener` 
method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a `RUNNING` state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 


> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Minor
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the kafka-consumer-groups CLI tool reports no offset lag in any of th

[jira] [Updated] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-11-19 Thread Thomas Crowley (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Crowley updated KAFKA-7657:
--
Description: 
We have a streams application with 3 instances running, two of which are 
reporting the state of REBALANCING even after they have been running for days. 
Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the kafka-consumer-groups CLI tool reports hardly any offset lag in any of the 
partitions assigned to the REBALANCING consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the state.dir on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to DEBUG) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck REBALANCING.

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have processing.guarantee set to exactly_once
 * We are using a ValueTransformer which fetches from and puts data on a 
windowed state store

The REBALANCING state is returned from both polling the state method of our 
KafkaStreams instance, and our custom metric which is derived from some logic 
in a KafkaStreams.StateListener class attached via the setStateListener method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a RUNNING state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 

  was:
We have a streams application with 3 instances running, two of which are 
reporting the state of REBALANCING even after they have been running for days. 
Restarting the application has no effect on the stream state.

This seems suspect because each instance appears to be processing messages, and 
the kafka-consumer-groups CLI tool reports no offset lag in any of the 
partitions assigned to the REBALANCING consumers. Each partition seems to be 
processing an equal amount of records too.

Inspecting the state.dir on disk, it looks like the RocksDB state has been 
built and hovers at the expected size on disk.

This problem has persisted for us after we rebuilt our Kafka cluster and reset 
topics + consumer groups in our dev environment.

There is nothing in the logs (with level set to DEBUG) in both the broker or 
the application that suggests something exceptional has happened causing the 
application to be stuck REBALANCING.

We are also running multiple streaming applications where this problem does not 
exist.

Two differences between this application and our other streaming applications 
are:
 * We have processing.guarantee set to exactly_once
 * We are using a ValueTransformer which fetches from and puts data on a 
windowed state store

The REBALANCING state is returned from both polling the state method of our 
KafkaStreams instance, and our custom metric which is derived from some logic 
in a KafkaStreams.StateListener class attached via the setStateListener method.

 

While I have provided a bit of context, before I reply with some reproducible 
code - is there a simple way in which I can determine that my streams 
application is in a RUNNING state without relying on the same mechanisms as 
used above?

Further, given that it seems like my application is actually running - could 
this perhaps be a bug to do with how the stream state is being reported (in the 
context of a transactional stream using the processor API)?

 

 

 

 


> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Minor
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the kafka-consumer-groups CLI tool reports hardly any offset lag in any 
> of the partitio

[jira] [Created] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2018-11-19 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7658:


 Summary: Add KStream#toTable to the Streams DSL
 Key: KAFKA-7658
 URL: https://issues.apache.org/jira/browse/KAFKA-7658
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2018-11-19 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7658:
-
Description: 
We'd like to add a new API to the KStream object of the Streams DSL:

{code}
KTable KStream.toTable()

KTable KStream.toTable(Materialized)
{code}

The function re-interpret the event stream {{KStream}} as a changelog stream 
{{KTable}}. Note that this should NOT be treated as a syntax-sugar as a dummy 
{{KStream.reduce()}} function which always take the new value, as it has the 
following difference: 

1) an aggregation operator of {{KStream}} is for aggregating a event stream 
into an evolving table, which will drop null-values from the input event 
stream; whereas a {{toTable}} function will completely change the semantics of 
the input stream from event stream to changelog stream, and null-values will 
still be serialized, and if the resulted bytes are also null they will be 
interpreted as "deletes" to the materialized KTable (i.e. tombstones in the 
changelog stream).

2) the aggregation result {{KTable}} will always be materialized, whereas 
{{toTable}} resulted KTable may only be materialized if the overloaded function 
with Materialized is used (and if optimization is turned on it may still be 
only logically materialized if the queryable name is not set).

Therefore, for users who want to take a event stream into a changelog stream 
(no matter why they cannot read from the source topic as a changelog stream 
{{KTable}} at the beginning), they should be using this new API instead of the 
dummy reduction function.

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interpret the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as a syntax-sugar as a dummy 
> {{KStream.reduce()}} function which always take the new value, as it has the 
> following difference: 
> 1) an aggregation operator of {{KStream}} is for aggregating a event stream 
> into an evolving table, which will drop null-values from the input event 
> stream; whereas a {{toTable}} function will completely change the semantics 
> of the input stream from event stream to changelog stream, and null-values 
> will still be serialized, and if the resulted bytes are also null they will 
> be interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas 
> {{toTable}} resulted KTable may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, for users who want to take a event stream into a changelog stream 
> (no matter why they cannot read from the source topic as a changelog stream 
> {{KTable}} at the beginning), they should be using this new API instead of 
> the dummy reduction function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2018-11-19 Thread Jan Filipiak (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692791#comment-16692791
 ] 

Jan Filipiak commented on KAFKA-7658:
-

[~guozhang] I remember our conversations when DSL 1.0 was designed and you were 
heavily against this. Why the change now?

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interpret the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as a syntax-sugar as a dummy 
> {{KStream.reduce()}} function which always take the new value, as it has the 
> following difference: 
> 1) an aggregation operator of {{KStream}} is for aggregating a event stream 
> into an evolving table, which will drop null-values from the input event 
> stream; whereas a {{toTable}} function will completely change the semantics 
> of the input stream from event stream to changelog stream, and null-values 
> will still be serialized, and if the resulted bytes are also null they will 
> be interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas 
> {{toTable}} resulted KTable may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, for users who want to take a event stream into a changelog stream 
> (no matter why they cannot read from the source topic as a changelog stream 
> {{KTable}} at the beginning), they should be using this new API instead of 
> the dummy reduction function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7658) Add KStream#toTable to the Streams DSL

2018-11-19 Thread Jan Filipiak (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16692791#comment-16692791
 ] 

Jan Filipiak edited comment on KAFKA-7658 at 11/20/18 7:46 AM:
---

[~guozhang] I remember our conversations when DSL 1.0 was designed and you were 
heavily against this. Why the change now? 

 

Will materialization still be determined lazily in case of the no parameter 
Method? Will it maintain 2 statestores like all the methods having a 
materialized parameter?


was (Author: jfilipiak):
[~guozhang] I remember our conversations when DSL 1.0 was designed and you were 
heavily against this. Why the change now?

> Add KStream#toTable to the Streams DSL
> --
>
> Key: KAFKA-7658
> URL: https://issues.apache.org/jira/browse/KAFKA-7658
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> We'd like to add a new API to the KStream object of the Streams DSL:
> {code}
> KTable KStream.toTable()
> KTable KStream.toTable(Materialized)
> {code}
> The function re-interpret the event stream {{KStream}} as a changelog stream 
> {{KTable}}. Note that this should NOT be treated as a syntax-sugar as a dummy 
> {{KStream.reduce()}} function which always take the new value, as it has the 
> following difference: 
> 1) an aggregation operator of {{KStream}} is for aggregating a event stream 
> into an evolving table, which will drop null-values from the input event 
> stream; whereas a {{toTable}} function will completely change the semantics 
> of the input stream from event stream to changelog stream, and null-values 
> will still be serialized, and if the resulted bytes are also null they will 
> be interpreted as "deletes" to the materialized KTable (i.e. tombstones in 
> the changelog stream).
> 2) the aggregation result {{KTable}} will always be materialized, whereas 
> {{toTable}} resulted KTable may only be materialized if the overloaded 
> function with Materialized is used (and if optimization is turned on it may 
> still be only logically materialized if the queryable name is not set).
> Therefore, for users who want to take a event stream into a changelog stream 
> (no matter why they cannot read from the source topic as a changelog stream 
> {{KTable}} at the beginning), they should be using this new API instead of 
> the dummy reduction function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)