[jira] [Updated] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki updated KAFKA-6777:
--
Description: 
Dears,

We have already encountered problems related to Out Of Memory situations in the 
Kafka Broker and streaming clients many times.

The scenario is the following.

When the Kafka Broker (or Streaming Client) is under load and has too little 
memory, there are no errors in the server logs. One can see some cryptic entries 
in the GC logs, but they are definitely not self-explanatory.

The Kafka Broker (and Streaming Clients) keeps running. Later we see in JMX 
monitoring that the JVM spends more and more time in GC. In our case the share 
of CPU time used by GC grows from e.g. 1% to 80%-90%.

Next, the software collapses into zombie mode: the process does not end. In such 
a case I would expect the process to crash (e.g. receive SIGSEGV). Even worse, 
Kafka treats such a zombie process as normal and still somehow sends messages, 
which are in fact getting lost, and the cluster does not exclude the broken 
nodes. The question is how to configure Kafka to really terminate the JVM 
instead of remaining in zombie mode, to give other nodes a chance to know that 
something is dead.

I would expect that in an Out Of Memory situation the JVM is ended: if not 
gracefully, then at least by crashing the process.

  was:
Dears,

We already encountered many times problems related to Out Of Memory situation 
in Kafka Broker and streaming clients.

The scenario is the following.

When Kafka Broker (or Streaming Client) is under load and has too less memory, 
there are no errors in server logs. One can see some cryptic entries in GC 
logs, but they are definitely not self-explaining.

Kafka Broker (and Streaming Clients) works further. Later we see in JMX 
monitoring, that JVM uses more and more time in GC. In our case it grows from 
e. 1% to 80%-90% of CPU time is used by GC.

Next software collapses into zombie mode – process in not ending. In such a 
case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse 
Kafka treats such a zombie process normal and somewhat sends messages, which 
are in fact getting lost, also the cluster is not excluding broken nodes. The 
question is how to configure Kafka to really terminate the JVM and not remain 
in zombie mode, to give a chance to other nodes to know, that something is dead.

I would expect that in Out Of Memory situation JVM is ended if not graceful 
than at least process is crashed.


> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>





[jira] [Updated] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki updated KAFKA-6777:
--
Description: 
Dears,

We have already encountered problems related to Out Of Memory situations in the 
Kafka Broker and streaming clients many times.

The scenario is the following.

When the Kafka Broker (or Streaming Client) is under load and has too little 
memory, there are no errors in the server logs. One can see some cryptic entries 
in the GC logs, but they are definitely not self-explanatory.

The Kafka Broker (and Streaming Clients) keeps running. Later we see in JMX 
monitoring that the JVM spends more and more time in GC. In our case the share 
of CPU time used by GC grows from e.g. 1% to 80%-90%.

Next, the software collapses into zombie mode: the process does not end. In such 
a case I would expect the process to crash (e.g. receive SIGSEGV). Even worse, 
Kafka treats such a zombie process as normal and still somehow sends messages, 
which are in fact getting lost, and the cluster does not exclude the broken 
nodes. The question is how to configure Kafka to really terminate the JVM 
instead of remaining in zombie mode, to give other nodes a chance to know that 
something is dead.

I would expect that in an Out Of Memory situation the JVM is ended: if not 
gracefully, then at least by crashing the process.

  was:
Dears,

We already encountered many times problems related to Out Of Memory situation 
in Kafka Broker and streaming clients.

The scenario is the following.

When Kafka Broker (or Streaming Client) is under load and has too less memory, 
there are no errors in server logs. One can see some cryptic entries in GC 
logs, but they are definitely not self-explaining.

Kafka Broker (and Streaming Clients) works further. Later we see in JMX 
monitoring, that JVM uses more and more time in GC. In our case it grows from 
e.g. 1% to 80%-90% of CPU time is used by GC.

Next software collapses into zombie mode – process in not ending. In such a 
case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse 
Kafka treats such a zombie process normal and somewhat sends messages, which 
are in fact getting lost, also the cluster is not excluding broken nodes. The 
question is how to configure Kafka to really terminate the JVM and not remain 
in zombie mode, to give a chance to other nodes to know, that something is dead.

I would expect that in Out Of Memory situation JVM is ended if not graceful 
than at least process is crashed.


> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>





[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed

2018-04-11 Thread Veera (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430366#comment-16430366
 ] 

Veera edited comment on KAFKA-6052 at 4/11/18 7:28 AM:
---

Hi Jason

Could you please confirm the release date of either 1.1.1 or 1.2.0? We will 
wait if it happens this month; otherwise we will take the fix and apply it to 
version 1.0.0.

Thanks

Veeru


was (Author: vmallavarapu):
Hi Jason

Could you please confirm the release date of either 1.1.1. or 1.2.0 ? Many 
Thanks.

> Windows: Consumers not polling when isolation.level=read_committed 
> ---
>
> Key: KAFKA-6052
> URL: https://issues.apache.org/jira/browse/KAFKA-6052
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, producer 
>Affects Versions: 0.11.0.0, 1.0.1
> Environment: Windows 10. All processes running in embedded mode.
>Reporter: Ansel Zandegran
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: transactions, windows
> Fix For: 1.2.0, 1.1.1
>
> Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly-once semantics. These are my producer, consumer and 
> broker setups.
> public void sendWithTTemp(String topic, EHEvent event) {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>   "localhost:9092,localhost:9093,localhost:9094");
> //props.put("bootstrap.servers", 
> "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put("transactional.id", "TID" + transactionId.incrementAndGet());
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
> Producer<String, String> producer =
> new KafkaProducer<>(props,
> new StringSerializer(),
> new StringSerializer());
> Logger.log(this, "Initializing transaction...");
> producer.initTransactions();
> Logger.log(this, "Initializing done.");
> try {
>   Logger.log(this, "Begin transaction...");
>   producer.beginTransaction();
>   Logger.log(this, "Begin transaction done.");
>   Logger.log(this, "Sending events...");
>   producer.send(new ProducerRecord<>(topic,
>  event.getKey().toString(),
>  event.getValue().toString()));
>   Logger.log(this, "Sending events done.");
>   Logger.log(this, "Committing...");
>   producer.commitTransaction();
>   Logger.log(this, "Committing done.");
> } catch (ProducerFencedException | OutOfOrderSequenceException
> | AuthorizationException e) {
>   producer.close();
>   e.printStackTrace();
> } catch (KafkaException e) {
>   producer.abortTransaction();
>   e.printStackTrace();
> }
> producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>   Properties props = new Properties();
>   props.setProperty("broker.id", "" + i);
>   props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>   props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + 
> i);
>   props.setProperty("num.partitions", "1");
>   props.setProperty("zookeeper.connect", "localhost:2181");
>   props.setProperty("zookeeper.connection.timeout.ms", "6000");
>   props.setProperty("min.insync.replicas", "2");
>   props.setProperty("offsets.topic.replication.factor", "2");
>   props.setProperty("offsets.topic.num.partitions", "1");
>   props.setProperty("transaction.state.log.num.partitions", "2");
>   props.setProperty("transaction.state.log.replication.factor", "2");
>   props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting committed. What could be the problem? Log attached.





[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1

2018-04-11 Thread Jagadesh Adireddi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433723#comment-16433723
 ] 

Jagadesh Adireddi commented on KAFKA-6677:
--

Hi [~mjsax],
I am thinking of contributing to this issue. As per the ticket description, will 
removing 
`tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);` 
from the class StreamsConfig resolve the issue, or do I need to dig deeper to 
fix this? Kindly provide your input.
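For orientation, a rough sketch of the overrides block in StreamsConfig that the 
ticket refers to (the quoted put() call is from the ticket; the surrounding 
structure is an assumption, not verbatim from the code base):
{code:java}
// Sketch only: field and map names reconstructed around the line quoted in the ticket.
private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
static {
    final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
    // The line this ticket proposes to drop; since KAFKA-5494 idempotence no longer requires it:
    tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
}
{code}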

> Remove EOS producer config max.in.flight.request.per.connection=1
> -
>
> Key: KAFKA-6677
> URL: https://issues.apache.org/jira/browse/KAFKA-6677
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> When EOS was introduced in 0.11, it was required to set the producer config 
> max.in.flight.requests.per.connection=1 for the idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.





[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Mickael Maison (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433726#comment-16433726
 ] 

Mickael Maison commented on KAFKA-6777:
---

You usually do that via JVM options.

For example on Oracle's JVM, you can use: -XX:OnOutOfMemoryError="<cmd args>; <cmd args>"
[http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html]

If you set it in KAFKA_OPTS, it will be automatically picked up by the tools 
under /bin
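For illustration, the relevant HotSpot options look roughly like this (the kill 
command is just one possible action; the Exit/Crash variants need JDK 8u92 or 
newer):
{code}
-XX:OnOutOfMemoryError="kill -9 %p"   # run a command when the first OutOfMemoryError is thrown (%p = PID)
-XX:+ExitOnOutOfMemoryError           # exit the JVM immediately on OOM
-XX:+CrashOnOutOfMemoryError          # crash and produce an error report on OOM
{code}
Putting one of these into KAFKA_OPTS before calling bin/kafka-server-start.sh 
applies it to the broker.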

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>





[jira] [Created] (KAFKA-6778) DescribeConfigs does not return error for non-existent topic

2018-04-11 Thread Magnus Edenhill (JIRA)
Magnus Edenhill created KAFKA-6778:
--

 Summary: DescribeConfigs does not return error for non-existent 
topic
 Key: KAFKA-6778
 URL: https://issues.apache.org/jira/browse/KAFKA-6778
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.1.0
Reporter: Magnus Edenhill


Sending a DescribeConfigsRequest with a ConfigResource(TOPIC, 
"non-existent-topic") returns a fully populated ConfigResource back in the 
response with 24 configuration entries.

A resource-level error_code of UnknownTopic.. would be expected instead.

 
{code:java}
[0081_admin / 1.143s] ConfigResource #0: type TOPIC (2), 
"rdkafkatest_rnd3df408bf5d94d696_DescribeConfigs_notexist": 24 ConfigEntries, 
error NO_ERROR ()
[0081_admin / 1.144s] #0/24: Source UNKNOWN (5): "compression.type"="producer" 
[is read-only=n, default=n, sensitive=n, synonym=n] with 1 synonym(s)

{code}
But the topic does not exist:
{code:java}
$ $KAFKA_PATH/bin/kafka-topics.sh --zookeeper $ZK_ADDRESS --list | grep 
rdkafkatest_rnd3df408bf5d94d696_DescribeConfigs_notexist ; echo $?
1

{code}
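For reference, a minimal Java AdminClient sketch of the same call (bootstrap 
address and topic name are placeholders); the expectation is that get() fails 
with an unknown-topic error instead of returning default config entries:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeMissingTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "no-such-topic");
            // Expected: the future fails with an UnknownTopicOrPartition error.
            // Observed (per this report): a Config with ~24 default entries is returned.
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .values().get(topic).get();
            System.out.println("Returned " + config.entries().size() + " config entries");
        }
    }
}
{code}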





[jira] [Updated] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki updated KAFKA-6777:
--
Attachment: screenshot-1.png

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>





[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433799#comment-16433799
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-6777:
---

Thanks for the comment.

The problem is that either the OutOfMemoryError is never thrown, because the 
algorithms try to do their best and keep the GC loaded, so no message 
processing can happen any more.

Or the OutOfMemoryError is thrown, but caught in code like catch (Throwable t) {}.

The observed behaviour is that in the INFO-level logs there is no explicit error 
like OutOfMemoryError.
I had seen in the JMX metrics that the heap is exhausted and the GC is endlessly 
busy, until eventually nothing is reported to JMX at all.

I mean, I could write a tool that reboots a Kafka node when the GC load on the 
CPU is higher than 40% or so, but such a tool is a workaround and not a solution 
to the problem.
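For illustration only, a minimal sketch of such a watchdog (the JMX port, the 
40% threshold, the 10 s interval and the restart action are all assumptions):
{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class GcWatchdog {
    public static void main(String[] args) throws Exception {
        // Hypothetical remote JMX endpoint of the broker (e.g. started with JMX_PORT=9999).
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        MBeanServerConnection conn = connector.getMBeanServerConnection();
        ObjectName gcBeans = new ObjectName("java.lang:type=GarbageCollector,*");

        final long intervalMs = 10_000;
        long lastGcMs = totalGcTimeMs(conn, gcBeans);
        while (true) {
            Thread.sleep(intervalMs);
            long gcMs = totalGcTimeMs(conn, gcBeans);
            double gcShare = (gcMs - lastGcMs) / (double) intervalMs;  // fraction of wall-clock time spent in GC
            lastGcMs = gcMs;
            if (gcShare > 0.40) {
                System.err.println("GC took " + (int) (gcShare * 100) + "% of the last interval");
                // restart / alerting logic would go here
            }
        }
    }

    // Sum of accumulated collection time over all collectors, in milliseconds.
    private static long totalGcTimeMs(MBeanServerConnection conn, ObjectName pattern) throws Exception {
        long sum = 0;
        for (ObjectName name : conn.queryNames(pattern, null))
            sum += (Long) conn.getAttribute(name, "CollectionTime");
        return sum;
    }
}
{code}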

I am attaching graphs to highlight what had happened.

 !screenshot-1.png! 

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>





[jira] [Comment Edited] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433799#comment-16433799
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-6777 at 4/11/18 12:14 PM:
-

Thanks for the comment.

The problem is that either the OutOfMemoryError is never thrown, because the 
algorithms try to do their best and keep the GC loaded, so no message 
processing can happen any more.

Or the OutOfMemoryError is thrown, but caught in code like catch (Throwable t) {}.

The observed behaviour is that in the INFO-level logs there is no explicit error 
like OutOfMemoryError.
I had seen in the JMX metrics that the heap is exhausted and the GC is endlessly 
busy, until eventually nothing is reported to JMX at all.

I mean, I could write a tool that reboots a Kafka node when the GC load on the 
CPU is higher than 40% or so, but such a tool is a workaround and not a solution 
to the problem.

I am attaching graphs to highlight what had happened.

On the image below there are metrics from two Kafka nodes. The green one was 
dead/zombie when GC time reached 80%. This "drop" of the value is only a 
presentation artifact.

 !screenshot-1.png! 


was (Author: habdank):
Thanks for comment.

The problem is, that either the OnOutOfMemoryError is never thrown, as the 
algorithms trying to do their best and they are loading GC, and then no message 
processing may happen.

Or the OnOutOfMemoryError is thrown, but caught in code like catch(Throwable) {}

The observed bahaviour is that at INFO level logs there is no explicit error 
like: OnOutOfMemoryError. 
I had seen in JMX metrics and there heap is out and GC is endless busy, till 
nothing is also to JMX reported.

I mean I can write a tool to reboot Kafka node, when GC load on CPU is higher 
than 40% or so, but this kind of tool is workaround and not a solution for the 
problem.

I am attaching graphs to highlight wat had happend.

 !screenshot-1.png! 

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>





[jira] [Comment Edited] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433799#comment-16433799
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-6777 at 4/11/18 12:17 PM:
-

Thanks for the comment.

The problem is that either the OutOfMemoryError is never thrown, because the 
algorithms try to do their best and keep the GC loaded, so later no message 
processing can happen, as most of the CPU is used by GC.

Or the OutOfMemoryError is thrown, but caught in code like catch (Throwable t) {}.

The observed behaviour is that in the INFO-level logs there is no explicit error 
like OutOfMemoryError.
I had seen in the JMX metrics that the heap is exhausted and the GC is endlessly 
busy, until eventually nothing is reported to JMX at all.

I mean, I could write a tool that reboots a Kafka node when the GC load on the 
CPU is higher than 40% or so, but such a tool is a workaround and not a solution 
to the problem.

I am attaching graphs to highlight what had happened.

On the image below there are metrics from two Kafka nodes. The green one was 
dead/zombie when GC time reached 80%. This "drop" of the value is only a 
presentation artifact.

!screenshot-1.png! 


was (Author: habdank):
Thanks for comment.

The problem is, that either the OnOutOfMemoryError is never thrown, as the 
algorithms trying to do their best and they are loading GC, and then no message 
processing may happen.

Or the OnOutOfMemoryError is thrown, but caught in code like catch(Throwable) {}

The observed bahaviour is that at INFO level logs there is no explicit error 
like: OnOutOfMemoryError. 
I had seen in JMX metrics and there heap is out and GC is endless busy, till 
nothing is also to JMX reported.

I mean I can write a tool to reboot Kafka node, when GC load on CPU is higher 
than 40% or so, but this kind of tool is workaround and not a solution for the 
problem.

I am attaching graphs to highlight wat had happend.

On the image blow there are metrics from 2 Kafka nodes. The green one was 
dead/zombie when GC time reached 80%. This "drop" of value is only a 
presentation matter.

 !screenshot-1.png! 

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>





[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433863#comment-16433863
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-6777:
---

One more comment. I see quite often in the Kafka code that a Throwable is 
converted to a RuntimeException.
This kind of code may lead to a situation where the OOM never surfaces.

I made a simple example:
{code:java}
public class Main {
public static void main ( String[] args ) {
try {
try {
throw new OutOfMemoryError();
// very often in Kafka code:
} catch ( Throwable t ) {
throw ( RuntimeException ) t;
}
// end of very often
} catch ( Exception ignore ) {
}
}
}
{code}
Executed with:
{code:java}
-XX:OnOutOfMemoryError="echo OOM"
{code}
leads to:
{code:java}
Process finished with exit code 0
{code}
I see no *OOM* string, and no _OutOfMemoryError_ is reported by any stack trace.
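For contrast, a minimal sketch of the same example with Errors rethrown instead 
of cast to RuntimeException (an illustration of the safer pattern, not code from 
the Kafka code base):
{code:java}
public class Main {
    public static void main(String[] args) {
        try {
            try {
                throw new OutOfMemoryError();
            } catch (Throwable t) {
                if (t instanceof Error) {
                    throw (Error) t;            // never swallow OutOfMemoryError and friends
                }
                throw new RuntimeException(t);  // wrap only ordinary exceptions
            }
        } catch (Exception ignore) {
            // An Error is not an Exception, so it is not caught here:
            // the JVM terminates with a stack trace and a non-zero exit code.
        }
    }
}
{code}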

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>





[jira] [Comment Edited] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2018-04-11 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433863#comment-16433863
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-6777 at 4/11/18 12:58 PM:
-

One more comment. I see quite often in the Kafka code that a Throwable is 
converted to a RuntimeException.
This kind of code may lead to a situation where the OOM never surfaces.

I made a simple example:
{code:java}
public class Main {
public static void main ( String[] args ) {
try {
try {
throw new OutOfMemoryError();
// very often in Kafka code:
} catch ( Throwable t ) {
throw ( RuntimeException ) t;
}
// end of very often
} catch ( Exception ignore ) {
}
}
}
{code}
Executed with:
{code:java}
-XX:OnOutOfMemoryError="echo OOM"
{code}
leads to:
{code:java}
Process finished with exit code 0
{code}
I see no *OOM* string, and no _OutOfMemoryError_ is reported by any stack trace.

Generally all Errors derived from {{java.lang.Error}} are swallowed, including:
* InternalError, 
* OutOfMemoryError, 
* StackOverflowError, 
* UnknownError, 
* ThreadDeath,
* IOError
 


was (Author: habdank):
One more comment. I see quite often in Kafka that Throwable is converted to 
RuntimeException.
 This kind of code may lead to situation when OOM will never appear.

I had made simple example:
{code:java}
public class Main {
public static void main ( String[] args ) {
try {
try {
throw new OutOfMemoryError();
// very often in Kafka code:
} catch ( Throwable t ) {
throw ( RuntimeException ) t;
}
// end of very often
} catch ( Exception ignore ) {
}
}
}
{code}
Executed with:
{code:java}
-XX:OnOutOfMemoryError="echo OOM"
{code}
leads to:
{code:java}
Process finished with exit code 0
{code}
I see no *OOM* string, also no _OutOfMemoryError_ is noticed, by any stactrace.

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>





[jira] [Assigned] (KAFKA-6760) responses not logged properly in controller

2018-04-11 Thread Mickael Maison (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-6760:
-

Assignee: Mickael Maison

> responses not logged properly in controller
> ---
>
> Key: KAFKA-6760
> URL: https://issues.apache.org/jira/browse/KAFKA-6760
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Mickael Maison
>Priority: Major
>  Labels: newbie
>
> Saw the following logging in controller.log. We need to log the 
> StopReplicaResponse properly in KafkaController.
> [2018-04-05 14:38:41,878] DEBUG [Controller id=0] Delete topic callback 
> invoked for org.apache.kafka.common.requests.StopReplicaResponse@263d40c 
> (kafka.controller.KafkaController)
> It seems that the same issue exists for LeaderAndIsrResponse as well.





[jira] [Commented] (KAFKA-6752) Unclean leader election metric no longer working

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434271#comment-16434271
 ] 

ASF GitHub Bot commented on KAFKA-6752:
---

junrao closed pull request #4838: KAFKA-6752: Enable unclean leader election 
metric
URL: https://github.com/apache/kafka/pull/4838
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 74bc59faee2..6805e321393 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig,
   if (leaderIsrAndControllerEpochOpt.nonEmpty) {
 val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
 val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
-val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, 
isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
+val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, 
isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
 val newLeaderAndIsrOpt = leaderOpt.map { leader =>
   val newIsr = if (isr.contains(leader)) isr.filter(replica => 
controllerContext.isReplicaOnline(replica, partition))
   else List(leader)
@@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig,
 }
 
 object PartitionLeaderElectionAlgorithms {
-  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], 
liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
+  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], 
liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, 
controllerContext: ControllerContext): Option[Int] = {
 assignment.find(id => liveReplicas.contains(id) && 
isr.contains(id)).orElse {
   if (uncleanLeaderElectionEnabled) {
-assignment.find(liveReplicas.contains)
+val leaderOpt = assignment.find(liveReplicas.contains)
+if (!leaderOpt.isEmpty)
+  controllerContext.stats.uncleanLeaderElectionRate.mark()
+leaderOpt
   } else {
 None
   }
diff --git 
a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
 
b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
index f149fc93a49..113a39d5430 100644
--- 
a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
+++ 
b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -17,10 +17,17 @@
 package kafka.controller
 
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
 
 class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
+  private var controllerContext: ControllerContext = null
+
+  @Before
+  def setUp(): Unit = {
+controllerContext = new ControllerContext
+controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec")
+  }
 
   @Test
   def testOfflinePartitionLeaderElection(): Unit = {
@@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
 val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
   isr,
   liveReplicas,
-  uncleanLeaderElectionEnabled = false)
+  uncleanLeaderElectionEnabled = false,
+  controllerContext)
 assertEquals(Option(4), leaderOpt)
   }
 
@@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
 val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
   isr,
   liveReplicas,
-  uncleanLeaderElectionEnabled = false)
+  uncleanLeaderElectionEnabled = false,
+  controllerContext)
 assertEquals(None, leaderOpt)
+assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count())
   }
+
   @Test
   def 
testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): 
Unit = {
 val assignment = Seq(2, 4)
@@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest  extends 
JUnitSuite {
 val leaderOpt = 
PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
   isr,
   liveReplicas,
-  uncleanLeaderElectionEnabled = true)
+  uncleanLeaderElectionEnabled = true,
+  controllerContext)
 assertEquals(Option(4), leaderOpt)
+assertEquals(1, controllerC

[jira] [Resolved] (KAFKA-6752) Unclean leader election metric no longer working

2018-04-11 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6752.

   Resolution: Fixed
Fix Version/s: 1.1.1
   2.0.0

Merged to trunk and 1.1.

> Unclean leader election metric no longer working
> 
>
> Key: KAFKA-6752
> URL: https://issues.apache.org/jira/browse/KAFKA-6752
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.0
>Reporter: Jason Gustafson
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> Happened to notice that the unclean leader election meter is no longer being 
> updated. This was probably lost during the controller overhaul.





[jira] [Commented] (KAFKA-6447) Add Delegation Token Operations to KafkaAdminClient

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434307#comment-16434307
 ] 

ASF GitHub Bot commented on KAFKA-6447:
---

junrao closed pull request #4427: KAFKA-6447: Add Delegation Token Operations 
to KafkaAdminClient (KIP-249)
URL: https://github.com/apache/kafka/pull/4427
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index f83698050ed..69f560ec38f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -858,6 +858,7 @@ project(':clients') {
 include "**/org/apache/kafka/common/config/*"
 include "**/org/apache/kafka/common/security/auth/*"
 include "**/org/apache/kafka/server/policy/*"
+include "**/org/apache/kafka/common/security/token/delegation/*"
   }
 }
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 0fec810a95b..2767132886d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -10,7 +10,7 @@
 
 
 
+  
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/>
 
  re
  */
 public abstract DeleteRecordsResult deleteRecords(Map recordsToDelete,
   DeleteRecordsOptions 
options);
+
+/**
+ * Create a Delegation Token.
+ *
+ * This is a convenience method for {@link 
#createDelegationToken(CreateDelegationTokenOptions)} with default options.
+ * See the overload for more details.
+ *
+ * @return  The CreateDelegationTokenResult.
+ */
+public CreateDelegationTokenResult createDelegationToken() {
+return createDelegationToken(new CreateDelegationTokenOptions());
+}
+
+
+/**
+ * Create a Delegation Token.
+ *
+ * This operation is supported by brokers with version 1.1.0 or 
higher.
+ *
+ * The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+ * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} 
method of the returned {@code CreateDelegationTokenResult}
+ * 
+ * {@link 
org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation 
token authenticated channels.
+ * {@link 
org.apache.kafka.common.errors.InvalidPrincipalTypeException}
+ * if the renewers principal type is not supported.
+ * {@link 
org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.
+ * {@link org.apache.kafka.common.errors.TimeoutException}
+ * if the request was not completed in within the given {@link 
CreateDelegationTokenOptions#timeoutMs()}.
+ * 
+ *
+ * @param options   The options to use when creating 
delegation token.
+ * @return  The DeleteRecordsResult.
+ */
+public abstract CreateDelegationTokenResult 
createDelegationToken(CreateDelegationTokenOptions options);
+
+
+/**
+ * Renew a Delegation Token.
+ *
+ * This is a convenience method for {@link 
#renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default 
options.
+ * See the overload for more details.
+ *
+ *
+ * @param hmac  HMAC of the Delegation token
+ * @return  The RenewDelegationTokenResult.
+ */
+public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) {
+return renewDelegationToken(hmac, new RenewDelegationTokenOptions());
+}
+
+/**
+ *  Renew a Delegation Token.
+ *
+ * This operation is supported by brokers with version 1.1.0 or 
higher.
+ *
+ * The following exceptions can be anticipated when calling {@code 
get()} on the futures obtained from the
+ * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} 
method of the returned {@code RenewDelegationTokenResult}
+ * 
+ * {@link 
org.apache.kafka.common.errors.UnsupportedByAuthenticationException}
+ * If the request sent on PLAINTEXT/1-way SSL channels or delegation 
token authenticated channels.
+ * {@link 
org.apache.kafka.common.errors.DelegationTokenDisabledException}
+ * if the delegation token feature is disabled.
+ * {@link 
org.apache.kafka.common.errors.DelegationTokenNotFoundException}
+ * if the delegation token is not found on server.
+ * {@link 
org.apache.kafka.common.errors.DelegationTokenOwnerMismatchExcep

[jira] [Resolved] (KAFKA-6447) Add Delegation Token Operations to KafkaAdminClient

2018-04-11 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6447.

   Resolution: Fixed
Fix Version/s: 2.0.0

Merged the PR to trunk.

> Add Delegation Token Operations to KafkaAdminClient
> ---
>
> Key: KAFKA-6447
> URL: https://issues.apache.org/jira/browse/KAFKA-6447
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.0
>
>
> This JIRA is about adding delegation token operations to the new Admin Client 
> API.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient





[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434325#comment-16434325
 ] 

ASF GitHub Bot commented on KAFKA-6611:
---

guozhangwang opened a new pull request #4854: KAFKA-6611, PART II: Improve 
Streams SimpleBenchmark
URL: https://github.com/apache/kafka/pull/4854
 
 
   1. SimpleBenchmark:
   
   1.a Do not rely on manual num.records / bytes collection on atomic 
integers.
   1.b Rely on config files for num.threads, bootstrap.servers, etc.
   1.c Add parameters for key skewness and value size.
   1.d Refactor the tests for loading phase, adding tumbling-windowed count.
   1.e For consumer / consumeproduce, collect metrics on consumer instead.
   1.f Force stop the test after 3 minutes, this is based on empirical 
numbers of 10M records.
   
   2. Other tests: use config for kafka bootstrap servers.
   
   3. streams_simple_benchmark.py: only use scale 1 for system test, remove 
yahoo from benchmark tests.
   
   Note that the JMX-based metrics are more accurate than the manually collected 
metrics. For example:
   
   ```
   Module: kafkatest.benchmarks.streams.streams_simple_benchmark_test
   Class:  StreamsSimpleBenchmarkTest
   Method: test_simple_benchmark
   Arguments:
   {
 "scale": 3,
 "test": "streamcount"
   }
   
   
   {
 "Streams Count Performance [records/latency/rec-sec/MB-sec counted]0": " 
3691724/180042/20504.79332600171/20.64996886468678\n",
 "Streams Count Performance [records/latency/rec-sec/MB-sec counted]1": " 
3337273/180037/18536.595255419717/18.667835797999594\n",
 "Streams Count Performance [records/latency/rec-sec/MB-sec counted]2": " 
2971003/180029/16502.913419504635/16.61975533580484\n",
 "jmx-avg0": {
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg":
 473.9851851851854,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate":
 0.03802973633453574,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg":
 1.6337178935425791,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate":
 41.49027662118575,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg":
 0.0021335594038200878,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate":
 21042.80038005481
 },
 "jmx-avg1": {
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg":
 354.08056,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate":
 0.030462783608341253,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg":
 2.2233941132541286,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate":
 35.82136735561879,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg":
 0.002402570337120884,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate":
 19100.53692641491
 },
 "jmx-avg2": {
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg":
 365.85,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate":
 0.03052173041795845,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg":
 1.1700956516025365,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate":
 28.06739395694291,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg":
 0.0016847557478484937,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate":
 15594.917035789254
 },
 "jmx-max0": {
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg":
 547.0,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate":
 0.06657567990413102,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg":
 13.04,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate":
 196.4955857339561,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg":
 0.04672288495817908,
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate":
 99520.53241178217
 },
 "jmx-max1": {
   
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg":
 65

[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434326#comment-16434326
 ] 

ASF GitHub Bot commented on KAFKA-6611:
---

guozhangwang closed pull request #4744: KAFKA-6611, PART II: Improve Streams 
SimpleBenchmark
URL: https://github.com/apache/kafka/pull/4744
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a9cbc4d664..fc4745d8c6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -832,6 +832,7 @@ public long maybeUpdate(long now) {
 long waitForMetadataFetch = this.metadataFetchInProgress ? 
requestTimeoutMs : 0;
 
 long metadataTimeout = Math.max(timeToNextMetadataUpdate, 
waitForMetadataFetch);
+
 if (metadataTimeout > 0) {
 return metadataTimeout;
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 0514c995635..6a108269f9a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -252,6 +252,9 @@ private long sendProducerData(long now) {
 // and request metadata update, since there are messages to send 
to the topic.
 for (String topic : result.unknownLeaderTopics)
 this.metadata.add(topic);
+
+log.debug("Requesting metadata update due to unknown leader topics 
from the batched records: {}", result.unknownLeaderTopics);
+
 this.metadata.requestUpdate();
 }
 
@@ -557,9 +560,13 @@ else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
 failBatch(batch, response, exception, batch.attempts() < 
this.retries);
 }
 if (error.exception() instanceof InvalidMetadataException) {
-if (error.exception() instanceof 
UnknownTopicOrPartitionException)
+if (error.exception() instanceof 
UnknownTopicOrPartitionException) {
 log.warn("Received unknown topic or partition error in 
produce request on partition {}. The " +
 "topic/partition may not exist or the user may not 
have Describe access to it", batch.topicPartition);
+} else {
+log.warn("Received invalid metadata error in produce 
request on partition {} due to {}. Going " +
+"to request metadata update now", 
batch.topicPartition, error.exception().toString());
+}
 metadata.requestUpdate();
 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java 
b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index c66d78b7310..423184c5bdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -40,6 +40,7 @@
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -48,9 +49,7 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.TestUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -59,7 +58,6 @@
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Class that provides support for a series of benchmarks. It is usually 
driven by
@@ -77,114 +75,125 @@
  * is still running "consume"
  */
 public class SimpleBenchmark {
+private static final String LOADING_PRODUCER_CLIENT_ID = 
"simple-benchmark-loading-producer";
 
-final String kafka;
-final Boolean loadPhase;
-final String testName;
-final int numThreads;
-final Properties props;
-static final String ALL_TESTS = "all";
-private static final String SOURCE_TOPIC = "simpleBenchmarkSour

[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1

2018-04-11 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434331#comment-16434331
 ] 

Matthias J. Sax commented on KAFKA-6677:


Thanks for picking this up. I think what you suggest should be sufficient.

> Remove EOS producer config max.in.flight.request.per.connection=1
> -
>
> Key: KAFKA-6677
> URL: https://issues.apache.org/jira/browse/KAFKA-6677
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> When EOS was introduced in 0.11, it was required to set producer config 
> max.in.flight.requests.per.connection=1 for the idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1

2018-04-11 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434338#comment-16434338
 ] 

Ismael Juma commented on KAFKA-6677:


There is a limit of 5 now btw.

> Remove EOS producer config max.in.flight.request.per.connection=1
> -
>
> Key: KAFKA-6677
> URL: https://issues.apache.org/jira/browse/KAFKA-6677
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> When EOS was introduced in 0.11, it was required to set producer config 
> max.in.flight.requests.per.connection=1 for the idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6744) MockProducer with transaction enabled doesn't fail on commit if a record was failed

2018-04-11 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434357#comment-16434357
 ] 

Pascal Gélinas commented on KAFKA-6744:
---

Yeah I could, it's been on the back of my mind but I've been busy with trying 
to understand/implement transactions on our current setup. When I get back to 
this test, probably next week, I'll be able to provide a quick PR for this.
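
In the meantime, here is a minimal sketch of the behaviour the report asks for; the class and the {{lastSendException}} bookkeeping are hypothetical, not the actual MockProducer internals: remember the exception injected via errorNext() and rethrow it from commitTransaction().

{code:java}
// Hypothetical sketch only (class and field names are not the real MockProducer
// internals): remember the failure injected via errorNext() and surface it when
// the transaction is committed, mirroring KafkaProducer's documented behaviour.
import org.apache.kafka.common.KafkaException;

public class FailedSendFailsCommitSketch {

    private RuntimeException lastSendException; // assumed bookkeeping field

    public void errorNext(RuntimeException e) {
        // In MockProducer, errorNext() completes the oldest in-flight send exceptionally;
        // this sketch additionally records it for the open transaction.
        this.lastSendException = e;
    }

    public void commitTransaction() {
        if (lastSendException != null) {
            RuntimeException cause = lastSendException;
            lastSendException = null;
            throw new KafkaException("Cannot commit transaction: a send failed", cause);
        }
        // otherwise the commit succeeds
    }

    public static void main(String[] args) {
        FailedSendFailsCommitSketch producer = new FailedSendFailsCommitSketch();
        producer.errorNext(new RuntimeException("boom"));
        try {
            producer.commitTransaction(); // now throws, matching the expectation in the report
        } catch (KafkaException expected) {
            System.out.println("commit failed as expected: " + expected.getCause());
        }
    }
}
{code}

With something like that in place, the snippet from the description would see commitTransaction() throw instead of returning successfully.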

> MockProducer with transaction enabled doesn't fail on commit if a record was 
> failed
> ---
>
> Key: KAFKA-6744
> URL: https://issues.apache.org/jira/browse/KAFKA-6744
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.0.0
>Reporter: Pascal Gélinas
>Priority: Minor
>
> The KafkaProducer#send documentation states the following:
> When used as part of a transaction, it is not necessary to define a callback 
> or check the result of the future in order to detect errors from send. If any 
> of the send calls failed with an irrecoverable error, the final 
> commitTransaction() call will fail and throw the exception from the last 
> failed send.
> So I was expecting the following to throw an exception:
> {{*MockProducer* producer = new MockProducer<>(false,}}
> {{ new StringSerializer(), new ByteArraySerializer());}}
> {{producer.initTransactions();}}
> {{producer.beginTransaction();}}
> {{producer.send(new ProducerRecord<>("foo", new byte[]{}));}}
> {{producer.errorNext(new RuntimeException());}}
> {{producer.commitTransaction(); // Expecting this to throw}}
> Unfortunately, the commitTransaction() call returns successfully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init

2018-04-11 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6775:
---
Component/s: streams

> AbstractProcessor created in SimpleBenchmark should call super#init
> ---
>
> Key: KAFKA-6775
> URL: https://issues.apache.org/jira/browse/KAFKA-6775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Priority: Minor
>  Labels: easy-fix, newbie
>
> Around line 610:
> {code}
> return new AbstractProcessor() {
> @Override
> public void init(ProcessorContext context) {
> }
> {code}
> super.init should be called above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init

2018-04-11 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6775:
---
Labels: easy-fix newbie  (was: )

> AbstractProcessor created in SimpleBenchmark should call super#init
> ---
>
> Key: KAFKA-6775
> URL: https://issues.apache.org/jira/browse/KAFKA-6775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Priority: Minor
>  Labels: easy-fix, newbie
>
> Around line 610:
> {code}
> return new AbstractProcessor() {
> @Override
> public void init(ProcessorContext context) {
> }
> {code}
> super.init should be called above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1

2018-04-11 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434363#comment-16434363
 ] 

Matthias J. Sax commented on KAFKA-6677:


Thanks for pointing that out! So we might actually set the default to 5, allow 
users to configure a smaller value if they wish, but throw an exception if they 
configure a larger value.
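
A rough sketch of that validation as it might look in the Streams producer-config setup; the class, the method name, and the constant are assumptions, not the eventual implementation:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.errors.StreamsException;

public class EosInFlightCheckSketch {
    // Assumed ceiling for idempotent/EOS producers after KAFKA-5494.
    private static final int MAX_IN_FLIGHT_FOR_EOS = 5;

    // Sketch: fill in a default of 5 when EOS is enabled, accept smaller user
    // overrides, and reject anything larger.
    static void checkMaxInFlight(final Map<String, Object> producerConfigs) {
        final Object value = producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
        if (value == null) {
            producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, MAX_IN_FLIGHT_FOR_EOS);
        } else if (Integer.parseInt(value.toString()) > MAX_IN_FLIGHT_FOR_EOS) {
            throw new StreamsException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
                + " must be <= " + MAX_IN_FLIGHT_FOR_EOS + " when exactly-once processing is enabled");
        }
    }
}
{code}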

> Remove EOS producer config max.in.flight.request.per.connection=1
> -
>
> Key: KAFKA-6677
> URL: https://issues.apache.org/jira/browse/KAFKA-6677
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> When EOS was introduced in 0.11, it was required to set producer config 
> max.in.flight.requests.per.connection=1 for the idempotent producer.
> This limitation was fixed in the 1.0 release via KAFKA-5494.
> Thus, we should remove this setting in Kafka Streams if EOS gets enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-04-11 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434478#comment-16434478
 ] 

John Roesler commented on KAFKA-5697:
-

[~guozhang] From your problem description, it seems like the solution isn't to 
make shutdown more aggressive, but to make consumer.poll not block forever.

 

WDYT?

> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> --
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
> hoping the stream thread may eventually check it and shutdown itself. 
> However, under certain scenarios the thread may get blocked within a single 
> loop and hence will never check on this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block 
> until the coordinator can be found. If the coordinator broker is never up and 
> running then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker, and then it will get stuck in a single loop 
> and even `ctrl-C` will not stop it since its set state will never be read by 
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6779) Kafka connect JMX metrics returns inconsistent metrics while running multiple connectors

2018-04-11 Thread Sairam Polavarapu (JIRA)
Sairam Polavarapu created KAFKA-6779:


 Summary: Kafka connect JMX metrics returns inconsistent metrics 
while running multiple connectors
 Key: KAFKA-6779
 URL: https://issues.apache.org/jira/browse/KAFKA-6779
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.0.1, 1.1.0, 1.0.0
 Environment: Linux
Reporter: Sairam Polavarapu


Kafka Connect JMX metrics return different values for different runs. We run 
Kafka Connect in distributed mode on a 3-node cluster, which lets us submit 
connectors via REST API calls. In one use case we run the MySQL Source Connector 
(Confluent) and the Amazon S3 Sink Connector (Confluent) as a pipeline and need 
the counts of data received by the source and delivered to the sink. The 
connectors are submitted one after the other, and what we observe is that when a 
new connector is submitted via the REST call, the existing connector's metrics 
are reset to zero and restarted. That is expected when the connectors themselves 
are restarted, but in our case the source connector was not restarted. We see 
the same behavior with Kafka 1.0.0 and 1.0.1, and the issue persists even with 
the fixes released in Kafka 1.1.0. As a side note, when the metrics were tested 
on a local machine with Kafka Connect in distributed mode on a single instance, 
the counts matched between source and sink. The issue is only in how JMX reports 
the data; the actual data handled by the connectors is not affected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-04-11 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6780:
--

 Summary: log cleaner shouldn't clean messages beyond high watermark
 Key: KAFKA-6780
 URL: https://issues.apache.org/jira/browse/KAFKA-6780
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Jun Rao


Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
bounded by the first offset in the active segment. It's possible for the high 
watermark to be smaller than that. This may cause a committed record to be 
removed because of an uncommitted record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-04-11 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434646#comment-16434646
 ] 

Jun Rao commented on KAFKA-6780:


The fix is probably to further bound firstUncleanableDirtyOffset by the high 
watermark.
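
Conceptually the change amounts to something like the sketch below (illustrative only; the real logic lives in kafka.log.LogCleanerManager, and other bounds such as the first unstable offset still apply):

{code:java}
// Sketch only; the real logic lives in kafka.log.LogCleanerManager (Scala).
public class FirstUncleanableOffsetSketch {

    // Bound the uncleanable offset by the high watermark in addition to the
    // active segment's base offset, so uncommitted records can never compact
    // away committed ones.
    static long firstUncleanableDirtyOffset(long activeSegmentBaseOffset, long highWatermark) {
        return Math.min(activeSegmentBaseOffset, highWatermark);
    }

    public static void main(String[] args) {
        // High watermark (40) below the active segment's base offset (100):
        // cleaning must stop at offset 40.
        System.out.println(firstUncleanableDirtyOffset(100L, 40L)); // prints 40
    }
}
{code}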

> log cleaner shouldn't clean messages beyond high watermark
> --
>
> Key: KAFKA-6780
> URL: https://issues.apache.org/jira/browse/KAFKA-6780
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
> bounded by the first offset in the active segment. It's possible for the high 
> watermark to be smaller than that. This may cause a committed record to be 
> removed because of an uncommitted record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-04-11 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6780:
---
Fix Version/s: 1.1.1
   1.2.0

> log cleaner shouldn't clean messages beyond high watermark
> --
>
> Key: KAFKA-6780
> URL: https://issues.apache.org/jira/browse/KAFKA-6780
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Priority: Major
> Fix For: 1.2.0, 1.1.1
>
>
> Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
> bounded by the first offset in the active segment. It's possible for the high 
> watermark to be smaller than that. This may cause a committed record to be 
> removed because of an uncommitted record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6773) Kafka consumer without group.id crashes when requesting offset on a topic-partition

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434735#comment-16434735
 ] 

ASF GitHub Bot commented on KAFKA-6773:
---

hachikuji closed pull request #4851: KAFKA-6773; Allow offset 
commit/fetch/describe with empty groupId
URL: https://github.com/apache/kafka/pull/4851
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 225b7090761..cbbd91396b2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,7 +27,7 @@ import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, 
NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
@@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int,
   protocolType: String,
   protocols: List[(String, Array[Byte])],
   responseCallback: JoinCallback): Unit = {
-validateGroup(groupId).foreach { error =>
+validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
   responseCallback(joinError(memberId, error))
   return
 }
@@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int,
   memberId: String,
   groupAssignment: Map[String, Array[Byte]],
   responseCallback: SyncCallback): Unit = {
-validateGroup(groupId) match {
+validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
   case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
 // The coordinator is loading, which means we've lost the state of the 
active rebalance and the
 // group will need to start over at JoinGroup. By returning rebalance 
in progress, the consumer
@@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: 
Errors => Unit): Unit = {
-validateGroup(groupId).foreach { error =>
+validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
   responseCallback(error)
   return
 }
@@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
 var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
 
 groupIds.foreach { groupId =>
-  validateGroup(groupId) match {
+  validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
 case Some(error) =>
   groupErrors += groupId -> error
 
@@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int,
   memberId: String,
   generationId: Int,
   responseCallback: Errors => Unit) {
-validateGroup(groupId).foreach { error =>
+validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
   if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
 // the group is still loading, so respond just blindly
 responseCallback(Errors.NONE)
@@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int,
  producerEpoch: Short,
  offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit): Unit = {
-validateGroup(groupId) match {
+validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
   case Some(error) => responseCallback(offsetMetadata.mapValues(_ => 
error))
   case None =>
 val group = groupManager.getGroup(groupId).getOrElse {
@@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int,
   generationId: Int,
   offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
   responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit) {
-validateGroup(groupId) match {
+validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
   case Some(error) => responseCallback(offsetMetadata.mapValues(_ => 
error))
   case None =>
 groupManager.getGroup(groupId) match {
@@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String, partitions: 
Option[Seq[TopicPartition]] = None):
   (Errors, Map[Top

[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434745#comment-16434745
 ] 

ASF GitHub Bot commented on KAFKA-5697:
---

vvcephei opened a new pull request #4855: KAFKA-5697: prevent poll() from 
blocking forever
URL: https://github.com/apache/kafka/pull/4855
 
 
   As the name says, currently `KafkaConsumer#poll` may block forever
   despite having a timeout parameter, since that timeout only applies to
   the `ConsumerNetworkClient#poll` and not the `ConsumerCoordinator#poll`.
   
   This change applies the timeout to the entire `poll` operation, allowing us 
to
   ensure threads won't hang forever in the event, e.g., that we can't talk to 
the 
   coordinator. I've tried to make the change in a 'private' fashion so as not 
to 
   change any public APIs, but let me know if you think it still needs a KIP.
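   
   To illustrate the approach, a minimal sketch of threading one deadline through 
   the coordinator and network polls; the interfaces and method names are 
   stand-ins, not the actual consumer internals:

```java
// Illustrative only: a single deadline bounds both the coordinator work and the
// network poll, so poll(timeout) cannot block past the caller's budget.
// The interfaces below are stand-ins, not the real consumer internals.
public class PollDeadlineSketch {

    interface Coordinator { boolean ensureActiveGroup(long remainingMs); }
    interface Network { void poll(long remainingMs); }

    static void poll(Coordinator coordinator, Network network, long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        long remaining = timeoutMs;
        // Previously the coordinator step could block indefinitely; here it only
        // ever gets what is left of the caller's budget.
        while (remaining > 0 && !coordinator.ensureActiveGroup(remaining)) {
            remaining = deadline - System.currentTimeMillis();
        }
        // Whatever time remains goes to the network poll; zero means return at once.
        network.poll(Math.max(0, deadline - System.currentTimeMillis()));
    }
}
```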
   
   Several tests depended on being able to send a timeout of 0, but still
   have the coordinator poll take non-zero time to do its work. I updated
   them to send a long enough timeout for the coordinator to do the
   required work.
   
   I believe our testing suite should be sufficient to validate this change.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> --
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
> hoping the stream thread may eventually check it and shutdown itself. 
> However, under certain scenarios the thread may get blocked within a single 
> loop and hence will never check on this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block 
> until the coordinator can be found. If the coordinator broker is never up and 
> running then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker, and then it will get stuck in a single loop 
> and even `ctrl-C` will not stop it since its set state will never be read by 
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2018-04-11 Thread Jeremy Custenborder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434752#comment-16434752
 ] 

Jeremy Custenborder commented on KAFKA-4107:


This should be implemented in the REST API. Maybe a combination of setting the 
offset and restarting the task?

> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Priority: Major
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6678) Upgrade dependencies with later release versions

2018-04-11 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434768#comment-16434768
 ] 

Ted Yu commented on KAFKA-6678:
---

Which release would drop support for Java 7?

thanks

> Upgrade dependencies with later release versions
> 
>
> Key: KAFKA-6678
> URL: https://issues.apache.org/jira/browse/KAFKA-6678
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ted Yu
>Priority: Major
> Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
>  - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
>  - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
>  - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
>  - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
>  - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
>  - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
>  - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
>  - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
>  - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
>  - org.lz4:lz4-java [1.4 -> 1.4.1]
>  - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
>  - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
>  - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
>  - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
>  - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
>  - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
>  - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
>  - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient

2018-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434791#comment-16434791
 ] 

ASF GitHub Bot commented on KAFKA-6058:
---

guozhangwang opened a new pull request #4856: KAFKA-6058: Refactor consumer API 
result return types
URL: https://github.com/apache/kafka/pull/4856
 
 
   Refactored the return types of the consumer group APIs in the following way:
   
   ```
   Map<String, KafkaFuture<Void>> DeleteConsumerGroupsResult#deletedGroups()
   
   Map<String, KafkaFuture<ConsumerGroupDescription>> DescribeConsumerGroupsResult#describedGroups()
   
   KafkaFuture<Collection<ConsumerGroupListing>> ListConsumerGroupsResult#listings()
   
   KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> ListConsumerGroupOffsetsResult#partitionsToOffsetAndMetadata()
   ```
   
   1. For DeleteConsumerGroupsResult and DescribeConsumerGroupsResult, for each 
group id we have two round-trips to get the coordinator, and then send the 
delete / describe request; I leave the potential optimization of batching 
requests for future work.
   
   2. For ListConsumerGroupOffsetsResult, it is a simple single round-trip and 
hence the whole map is wrapped as a Future.
   
   3. ListConsumerGroupsResult is the trickiest one: we would only know how many 
futures we should wait for after the first listNode returns, and hence I 
constructed a flattened future in the middle, wrapping the underlying map of 
futures.
   
   3.a Another big change I made: we do not fail the flattened future if only a 
few of the nodes return an ERROR code, and instead just return the rest of the 
listings; to support that I added a new `anyOf` function in `KafkaFuture`.
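   
   For illustration, a hedged usage sketch against the describe path, assuming 
   the describedGroups() signature listed above; the bootstrap address and group 
   id are placeholders:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.KafkaFuture;

public class DescribeGroupsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address and group id.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One future per group id, matching the Map<String, KafkaFuture<...>> shape above.
            Map<String, KafkaFuture<ConsumerGroupDescription>> described =
                admin.describeConsumerGroups(Collections.singleton("my-group")).describedGroups();
            for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> entry : described.entrySet()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue().get());
            }
        }
    }
}
```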
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-222: Add "describe consumer groups" and "list consumer groups" to 
> KafkaAdminClient
> --
>
> Key: KAFKA-6058
> URL: https://issues.apache.org/jira/browse/KAFKA-6058
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Major
>  Labels: kip-222
> Fix For: 1.2.0
>
>
> {{KafkaAdminClient}} does not allow to get information about consumer groups. 
> This feature is supported by old {{kafka.admin.AdminClient}} though.
> We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
> {{KafkaAdminClient#listConsumerGroup()}}.
> Associated KIP: KIP-222



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6773) Kafka consumer without group.id crashes when requesting offset on a topic-partition

2018-04-11 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6773.

   Resolution: Fixed
Fix Version/s: (was: 1.1.1)
   (was: 1.2.0)

> Kafka consumer without group.id crashes when requesting offset on a 
> topic-partition
> ---
>
> Key: KAFKA-6773
> URL: https://issues.apache.org/jira/browse/KAFKA-6773
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Konstantine Karantasis
>Assignee: Jason Gustafson
>Priority: Blocker
>
> After recent changes, when a KafkaConsumer requests the current offset on a 
> topic partition (e.g. via a call to {{position}}), the following exception is 
> thrown if the consumer doesn't belong to a consumer group ( {{group.id}} is 
> unset): 
> {code:java}
> org.apache.kafka.common.KafkaException: Unexpected error in fetch offset 
> response: The configured groupId is invalid         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetFetchResponseHandler.handle(ConsumerCoordinator.java:835)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetFetchResponseHandler.handle(ConsumerCoordinator.java:818)
>          at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
>          at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
>          at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>          at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>          at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:292)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:469)
>          at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:446)
>          at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1788)
>          at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1429)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6781) Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets

2018-04-11 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6781:
--

 Summary: Transient failure in 
KafkaAdminClientTest.testDescribeConsumerGroupOffsets
 Key: KAFKA-6781
 URL: https://issues.apache.org/jira/browse/KAFKA-6781
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


{code}
java.lang.AssertionError: expected:<10> but was:<0>
Stacktrace
java.lang.AssertionError: expected:<10> but was:<0>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testDescribeConsumerGroupOffsets(KafkaAdminClientTest.java:794)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6781) Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets

2018-04-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434807#comment-16434807
 ] 

Guozhang Wang commented on KAFKA-6781:
--

This transient error is fixed along with 
https://github.com/apache/kafka/pull/4856/files

> Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets
> --
>
> Key: KAFKA-6781
> URL: https://issues.apache.org/jira/browse/KAFKA-6781
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> {code}
> java.lang.AssertionError: expected:<10> but was:<0>
> Stacktrace
> java.lang.AssertionError: expected:<10> but was:<0>
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:645)
> at org.junit.Assert.assertEquals(Assert.java:631)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClientTest.testDescribeConsumerGroupOffsets(KafkaAdminClientTest.java:794)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init

2018-04-11 Thread Jimin Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434864#comment-16434864
 ] 

Jimin Hsieh commented on KAFKA-6775:


If I'm not misunderstanding, SmokeTestUtil#printProcessorSupplier has the same 
issue.
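
For clarity, a minimal sketch of the fix being discussed; the generic types are illustrative:

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch of the corrected override; the generic types are illustrative. Without
// super.init(context), AbstractProcessor#context() stays null and anything that
// relies on it (forwarding, punctuation scheduling) breaks.
public class SuperInitSketch {
    static AbstractProcessor<Integer, byte[]> processor() {
        return new AbstractProcessor<Integer, byte[]>() {
            @Override
            public void init(final ProcessorContext context) {
                super.init(context); // the missing call KAFKA-6775 points out
            }

            @Override
            public void process(final Integer key, final byte[] value) {
                // the benchmark processors typically just count or ignore records here
            }
        };
    }
}
{code}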

> AbstractProcessor created in SimpleBenchmark should call super#init
> ---
>
> Key: KAFKA-6775
> URL: https://issues.apache.org/jira/browse/KAFKA-6775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Priority: Minor
>  Labels: easy-fix, newbie
>
> Around line 610:
> {code}
> return new AbstractProcessor() {
> @Override
> public void init(ProcessorContext context) {
> }
> {code}
> super.init should be called above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init

2018-04-11 Thread Jimin Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jimin Hsieh reassigned KAFKA-6775:
--

Assignee: Jimin Hsieh

> AbstractProcessor created in SimpleBenchmark should call super#init
> ---
>
> Key: KAFKA-6775
> URL: https://issues.apache.org/jira/browse/KAFKA-6775
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ted Yu
>Assignee: Jimin Hsieh
>Priority: Minor
>  Labels: easy-fix, newbie
>
> Around line 610:
> {code}
> return new AbstractProcessor() {
> @Override
> public void init(ProcessorContext context) {
> }
> {code}
> super.init should be called above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6705) producer.send() should not block due to metadata not available

2018-04-11 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434896#comment-16434896
 ] 

Dong Lin commented on KAFKA-6705:
-

[~omkreddy] [~ijuma] Thanks for the reference to the previous discussion. It is 
very useful.

After reading through the previous email discussion, it seems it is not against 
the proposal here, i.e. producer.send() does not need to block on metadata 
updates. I have created KIP 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update]
 and opened a thread for discussion. Can you provide comments? Thanks!
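
For illustration, a rough sketch of the non-blocking idea (enqueue into a shared queue and let the sender thread complete the returned future); all names here are illustrative, not the actual producer internals:

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Rough sketch: send() only enqueues and returns a future, and a background
// sender thread drains the queue once metadata is available.
// Names are illustrative, not the actual producer internals.
public class NonBlockingSendSketch {

    static final class PendingSend {
        final String topic;
        final byte[] value;
        final CompletableFuture<Long> offset = new CompletableFuture<>();
        PendingSend(final String topic, final byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    private final BlockingQueue<PendingSend> queue = new LinkedBlockingQueue<>();

    // Never blocks on metadata: the record waits in the queue until the sender can route it.
    public CompletableFuture<Long> send(final String topic, final byte[] value) {
        final PendingSend pending = new PendingSend(topic, value);
        queue.add(pending);
        return pending.offset;
    }

    // One iteration of the sender-thread loop (sketch): wait for a record, then
    // (after metadata for its topic is available) send it and complete the future.
    void runSenderOnce(final long assignedOffset) throws InterruptedException {
        final PendingSend pending = queue.take();
        // ... fetch metadata for pending.topic here, bounded by the delivery timeout ...
        pending.offset.complete(assignedOffset);
    }
}
{code}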

> producer.send() should not block due to metadata not available
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This 
> behavior is well documented but it is a bit sub-optimal. Since we return a 
> future we should be able to make producer.send() completely non-blocking. One 
> idea is to simply insert the record into a global queue shared across all 
> partitions, and let the sender thread fetch record from this queue and send 
> to broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed

2018-04-11 Thread Veera (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434927#comment-16434927
 ] 

Veera commented on KAFKA-6052:
--

Hi Jason / Vahid

 

Can you please update about the release date of 1.1.1 or 1.2.0 ?

 

Regards

Veeru

> Windows: Consumers not polling when isolation.level=read_committed 
> ---
>
> Key: KAFKA-6052
> URL: https://issues.apache.org/jira/browse/KAFKA-6052
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, producer 
>Affects Versions: 0.11.0.0, 1.0.1
> Environment: Windows 10. All processes running in embedded mode.
>Reporter: Ansel Zandegran
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: transactions, windows
> Fix For: 1.2.0, 1.1.1
>
> Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly-once semantics. These are my producer, consumer and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>   "localhost:9092,localhost:9093,localhost:9094");
> //props.put("bootstrap.servers", 
> "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put("transactional.id", "TID" + transactionId.incrementAndGet());
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
> Producer producer =
> new KafkaProducer<>(props,
> new StringSerializer(),
> new StringSerializer());
> Logger.log(this, "Initializing transaction...");
> producer.initTransactions();
> Logger.log(this, "Initializing done.");
> try {
>   Logger.log(this, "Begin transaction...");
>   producer.beginTransaction();
>   Logger.log(this, "Begin transaction done.");
>   Logger.log(this, "Sending events...");
>   producer.send(new ProducerRecord<>(topic,
>  event.getKey().toString(),
>  event.getValue().toString()));
>   Logger.log(this, "Sending events done.");
>   Logger.log(this, "Committing...");
>   producer.commitTransaction();
>   Logger.log(this, "Committing done.");
> } catch (ProducerFencedException | OutOfOrderSequenceException
> | AuthorizationException e) {
>   producer.close();
>   e.printStackTrace();
> } catch (KafkaException e) {
>   producer.abortTransaction();
>   e.printStackTrace();
> }
> producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>   Properties props = new Properties();
>   props.setProperty("broker.id", "" + i);
>   props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>   props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + 
> i);
>   props.setProperty("num.partitions", "1");
>   props.setProperty("zookeeper.connect", "localhost:2181");
>   props.setProperty("zookeeper.connection.timeout.ms", "6000");
>   props.setProperty("min.insync.replicas", "2");
>   props.setProperty("offsets.topic.replication.factor", "2");
>   props.setProperty("offsets.topic.num.partitions", "1");
>   props.setProperty("transaction.state.log.num.partitions", "2");
>   props.setProperty("transaction.state.log.replication.factor", "2");
>   props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting committed. What could be the problem? Log attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed

2018-04-11 Thread Veera (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434927#comment-16434927
 ] 

Veera edited comment on KAFKA-6052 at 4/12/18 4:58 AM:
---

Hi Jason / Vahid 

Can you please update about the release date of 1.1.1 or 1.2.0 ? 

Regards

Veeru


was (Author: vmallavarapu):
Hi Jason / Vahid

 

Can you please update about the release date of 1.1.1 or 1.2.0 ?

 

Regards

Veeru

> Windows: Consumers not polling when isolation.level=read_committed 
> ---
>
> Key: KAFKA-6052
> URL: https://issues.apache.org/jira/browse/KAFKA-6052
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, producer 
>Affects Versions: 0.11.0.0, 1.0.1
> Environment: Windows 10. All processes running in embedded mode.
>Reporter: Ansel Zandegran
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: transactions, windows
> Fix For: 1.2.0, 1.1.1
>
> Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly-once semantics. These are my producer, consumer and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>   "localhost:9092,localhost:9093,localhost:9094");
> //props.put("bootstrap.servers", 
> "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put("transactional.id", "TID" + transactionId.incrementAndGet());
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
> Producer producer =
> new KafkaProducer<>(props,
> new StringSerializer(),
> new StringSerializer());
> Logger.log(this, "Initializing transaction...");
> producer.initTransactions();
> Logger.log(this, "Initializing done.");
> try {
>   Logger.log(this, "Begin transaction...");
>   producer.beginTransaction();
>   Logger.log(this, "Begin transaction done.");
>   Logger.log(this, "Sending events...");
>   producer.send(new ProducerRecord<>(topic,
>  event.getKey().toString(),
>  event.getValue().toString()));
>   Logger.log(this, "Sending events done.");
>   Logger.log(this, "Committing...");
>   producer.commitTransaction();
>   Logger.log(this, "Committing done.");
> } catch (ProducerFencedException | OutOfOrderSequenceException
> | AuthorizationException e) {
>   producer.close();
>   e.printStackTrace();
> } catch (KafkaException e) {
>   producer.abortTransaction();
>   e.printStackTrace();
> }
> producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>   Properties props = new Properties();
>   props.setProperty("broker.id", "" + i);
>   props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>   props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + 
> i);
>   props.setProperty("num.partitions", "1");
>   props.setProperty("zookeeper.connect", "localhost:2181");
>   props.setProperty("zookeeper.connection.timeout.ms", "6000");
>   props.setProperty("min.insync.replicas", "2");
>   props.setProperty("offsets.topic.replication.factor", "2");
>   props.setProperty("offsets.topic.num.partitions", "1");
>   props.setProperty("transaction.state.log.num.partitions", "2");
>   props.setProperty("transaction.state.log.replication.factor", "2");
>   props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting committed. What could be the problem? Log attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)