[jira] [Updated] (KAFKA-6123) Give MetricsReporter auto-generated client.id

2018-03-02 Thread Kevin Lu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Lu updated KAFKA-6123:

Affects Version/s: (was: 0.11.0.0)
  Description: 
The KAFKA-4756 bugfix resolved the broker's KafkaMetricsReporter missing the 
auto-generated broker id, but the same problem was not fixed on the client side.

 

Metric reporters configured for clients should also be given the auto-generated 
client id in the `configure` method. The interceptors do receive the 
auto-generated client id.

  was:
When a {{MetricsReporter}} is configured for a client, it will receive the 
user-specified configurations via {{Configurable.configure(Map<String, ?> 
configs)}}. Likewise, {{ProducerInterceptor}} and {{ConsumerInterceptor}} 
receive the user-specified configurations in their {{configure}} methods. 

The difference is that when a user does not specify the {{client.id}} field, Kafka 
will auto-generate client ids (producer-1, producer-2, consumer-1, consumer-2, 
etc.). This auto-generated {{client.id}} is passed into the interceptors' 
configure method, but it is not passed to the {{MetricsReporter}} configure 
method.

This makes it harder to directly correlate a {{MetricsReporter}} with the interceptors 
for the same client when users do not specify the {{client.id}} field. The 
{{client.id}} can be determined by finding a metric that carries the 
{{client.id}} tag, but this is hacky and requires traversing the metrics. 

It would be useful to have auto-generated {{client.id}} field also passed to 
the {{MetricsReporter}}.

   Issue Type: Bug  (was: Improvement)
  Summary: Give MetricsReporter auto-generated client.id  (was: 
MetricsReporter does not get auto-generated client.id)
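
For illustration, a minimal reporter sketch (the class below is hypothetical) that reads {{client.id}} in {{configure()}}; with the proposed change, the key would be populated even when Kafka auto-generates the id:

{code:java}
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Hypothetical reporter, used only to illustrate the proposal.
public class ClientIdAwareReporter implements MetricsReporter {
    private String clientId;

    @Override
    public void configure(Map<String, ?> configs) {
        // Today this key is only present when the user sets client.id explicitly;
        // the request is for auto-generated ids (e.g. "producer-1") to appear here too.
        Object id = configs.get("client.id");
        this.clientId = id == null ? "<unknown>" : id.toString();
    }

    @Override
    public void init(List<KafkaMetric> metrics) { }

    @Override
    public void metricChange(KafkaMetric metric) { }

    @Override
    public void metricRemoval(KafkaMetric metric) { }

    @Override
    public void close() { }
}
{code}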

> Give MetricsReporter auto-generated client.id
> -
>
> Key: KAFKA-6123
> URL: https://issues.apache.org/jira/browse/KAFKA-6123
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, metrics
>Reporter: Kevin Lu
>Priority: Minor
>  Labels: clients, metrics, newbie++
>
> The KAFKA-4756 bugfix resolved the broker's KafkaMetricsReporter missing the 
> auto-generated broker id, but the same problem was not fixed on the client side.
>  
> Metric reporters configured for clients should also be given the 
> auto-generated client id in the `configure` method. The interceptors do 
> receive the auto-generated client id.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-02 Thread Igor Calabria (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383572#comment-16383572
 ] 

Igor Calabria commented on KAFKA-6603:
--

Hey, thanks for the quick reply. I just tested your code in production and it 
didn't reduce memory usage for simple aggregations. What really helped me was 
your pull request: I had some code that used the same RocksDB iterator and 
replaced it with something similar to what you did in the aggregations code. 
The improvement was significant (especially for throughput). I'm attributing 
the extra memory usage to this inefficient iterator, but I still need to do 
more testing.

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store (RocksDB). The pipeline works fine except that the off-heap memory usage 
> is way higher than expected. Following the 
> [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
> has some effect (memory usage is reduced) but the values don't match at all. 
> The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB 
> config looks like this
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); //1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula  
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each (it's a windowed store), 
> off-heap memory usage should be about 76MB. What I'm seeing in production is 
> upwards of 300MB. Even taking into consideration extra overhead from RocksDB 
> compaction threads, this seems a bit high (especially when the disk usage for 
> all files is just 1GB).
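
For context, settings like the ones quoted above are typically applied through a {{RocksDBConfigSetter}} registered via {{StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG}}; a minimal sketch using the same values as in the issue (class name hypothetical):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Hypothetical config setter mirroring the values quoted in the issue description.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setBlockCacheSize(1048576);   // 1 MB block cache per store instance
        tableConfig.setBlockSize(16 * 1024);      // 16 KB blocks
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
        options.setWriteBufferSize(8 * 1024);     // 8 KB memtables
    }
}
{code}

Plugging the quoted values into the estimate formula gives roughly (1,048,576 + 2 x 8,192) bytes x 3 segments x 25 partitions ≈ 80 MB, i.e. the ~76 MiB figure mentioned in the report.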



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383778#comment-16383778
 ] 

Guozhang Wang commented on KAFKA-6603:
--

Hi Igor, what's the difference between "my code" and "my pull request"? My pull 
request has been merged to trunk so if you compiled from trunk directly, that 
"code" should have the same effect as my pull request?

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store (RocksDB). The pipeline works fine except that the off-heap memory usage 
> is way higher than expected. Following the 
> [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
> has some effect (memory usage is reduced) but the values don't match at all. 
> The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB 
> config looks like this
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); //1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula  
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each (it's a windowed store), 
> off-heap memory usage should be about 76MB. What I'm seeing in production is 
> upwards of 300MB. Even taking into consideration extra overhead from RocksDB 
> compaction threads, this seems a bit high (especially when the disk usage for 
> all files is just 1GB).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383778#comment-16383778
 ] 

Guozhang Wang edited comment on KAFKA-6603 at 3/2/18 4:30 PM:
--

Hi Igor, what's the difference between "my code" and "my pull request"? My pull 
request has been merged to trunk so if you compiled from trunk directly, that 
"code" should have the same effect as my pull request?

EDIT: never mind, I think I understand your explanation now. You were not using 
the DSL but were writing on the PAPI layer and accessing the underlying store 
directly, right? In that case, yes, the pull request would not help you, and you 
would have to make a similar change manually, as in the PR.


was (Author: guozhang):
Hi Igor, what's the difference between "my code" and "my pull request"? My pull 
request has been merged to trunk so if you compiled from trunk directly, that 
"code" should have the same effect as my pull request?

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store (RocksDB). The pipeline works fine except that the off-heap memory usage 
> is way higher than expected. Following the 
> [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
> has some effect (memory usage is reduced) but the values don't match at all. 
> The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB 
> config looks like this
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); //1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula  
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each (it's a windowed store), 
> off-heap memory usage should be about 76MB. What I'm seeing in production is 
> upwards of 300MB. Even taking into consideration extra overhead from RocksDB 
> compaction threads, this seems a bit high (especially when the disk usage for 
> all files is just 1GB).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-03-02 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-6473:
---

Assignee: John Roesler

> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip, user-experience
>
> With KIP-247, we added a public test-utils artifact with a TopologyTestDriver 
> class. Using the test driver for a single 
> Processor/Transformer/ValueTransformer requires specifying a whole 
> topology with source and sink plus the 
> Processor/Transformer/ValueTransformer in it.
> For unit testing, it might be more convenient to have a MockProcessorContext 
> that can be used to test the Processor/Transformer/ValueTransformer in 
> isolation. I.e., the test itself creates a new 
> Processor/Transformer/ValueTransformer object and calls init() manually, 
> passing in the MockProcessorContext.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
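
For illustration, the intended test pattern might look roughly like this; the names are hypothetical and the actual API is subject to the KIP (MockProcessorContext does not exist in test-utils yet):

{code:java}
// Hypothetical usage sketch, pending the KIP.
MyProcessor processor = new MyProcessor();
MockProcessorContext context = new MockProcessorContext();
processor.init(context);              // no source/sink topology or TopologyTestDriver required
processor.process("key", "value");
// assert against whatever the mock context captured (e.g. forwarded records)
processor.close();
{code}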



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records

2018-03-02 Thread Blake Miller (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384010#comment-16384010
 ] 

Blake Miller commented on KAFKA-4936:
-

{quote}this implies that all output topics must be known in advance because users 
need to create them. And thus, users can also use the branch() operator and 
don't need "auto routing" in the first place
{quote}
I think that is not quite correct ^

The set of output topics might be open (not known at topology-creation time), 
but still created explicitly.

They may be created explicitly outside of the Kafka Streams app, while the 
Kafka Streams app is running, for example.

That's precisely what drew my interest to this potential feature: I had planned 
to use Kafka Streams for a simple service I was building that has that 
property ^, and found that this limitation prevented me from doing so. So I 
fell back on a plain Producer/Consumer implementation, which is fine, but I'm 
pretty convinced there is utility to this.

I will organize my thoughts on it and start a discussion on the mailing list. 
Thanks for helping me get started!
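
For reference, the plain-producer fallback mentioned above boils down to choosing the topic per record at send time; a minimal sketch (the routing function is application-specific and hypothetical here):

{code:java}
// Plain-producer fallback: the output topic is computed per record at send time,
// so it does not need to be known when the topology/application is assembled.
// chooseTopic(...) is a hypothetical, application-specific routing function.
String outputTopic = chooseTopic(key, value);
producer.send(new ProducerRecord<>(outputTopic, key, value));
{code}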

> Allow dynamic routing of output records
> ---
>
> Key: KAFKA-4936
> URL: https://issues.apache.org/jira/browse/KAFKA-4936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, all used output topics must be known beforehand, and thus, it's not 
> possible to send output records to topics in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions) etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-03-02 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384159#comment-16384159
 ] 

Ted Yu commented on KAFKA-6566:
---

[~rhauch]:
What would be the next step for this JIRA?

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it would allow the task to clean up resources, such as 
> releasing database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6606) Regression in consumer auto-commit backoff behavior

2018-03-02 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6606:
--

 Summary: Regression in consumer auto-commit backoff behavior
 Key: KAFKA-6606
 URL: https://issues.apache.org/jira/browse/KAFKA-6606
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jason Gustafson
 Fix For: 1.1.0


We introduced a regression in the auto-commit behavior in KAFKA-6362. After 
initiating a send, the consumer does not reset its next commit deadline, so it 
will send auto-commits as fast as the user can call poll() until the first 
offset commit returns.
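
In pseudocode terms, the intended behavior is that the next deadline is pushed out as soon as a commit is initiated, not only when its response arrives; an illustrative sketch, not the actual ConsumerCoordinator code:

{code:java}
// Illustrative sketch of the intended backoff (field and method names assumed).
void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
        // Reset the deadline when the send is initiated, so repeated poll() calls
        // do not fire additional commits while the first one is still in flight.
        nextAutoCommitDeadline = now + autoCommitIntervalMs;
        commitOffsetsAsync(subscriptions.allConsumed(), autoCommitCallback);
    }
}
{code}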



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6606) Regression in consumer auto-commit backoff behavior

2018-03-02 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-6606:
--

Assignee: Jason Gustafson

> Regression in consumer auto-commit backoff behavior
> ---
>
> Key: KAFKA-6606
> URL: https://issues.apache.org/jira/browse/KAFKA-6606
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 1.1.0
>
>
> We introduced a regression in the auto-commit behavior in KAFKA-6362. After 
> initiating a send, the consumer does not reset its next commit deadline, so 
> it will send auto-commits as fast as the user can call poll() until the first 
> offset commit returns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records

2018-03-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384283#comment-16384283
 ] 

Matthias J. Sax commented on KAFKA-4936:


Cool. The scenario you describe makes sense to me. Looking forward to discussing 
your proposal on the mailing list. If you feel confident, you can start with a 
KIP directly. Up to you.

> Allow dynamic routing of output records
> ---
>
> Key: KAFKA-4936
> URL: https://issues.apache.org/jira/browse/KAFKA-4936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, all used output topics must be known beforehand, and thus, it's not 
> possible to send output records to topics in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions) etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional

2018-03-02 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6607:
--

 Summary: Kafka Streams lag not zero when input topic transactional
 Key: KAFKA-6607
 URL: https://issues.apache.org/jira/browse/KAFKA-6607
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.0
Reporter: Matthias J. Sax


When an input topic for a Kafka Streams application is written using 
transactions, Kafka Streams does not commit "endOffset" but "endOffset - 1" when 
it reaches the end of the topic. The reason is the commit marker that is the last 
"message" in the topic; Streams commits "offset of last processed message plus 
1" and does not take commit markers into account.

This is not a correctness issue, but when one inspects the consumer lag via 
{{bin/kafka-consumer-groups.sh}}, the lag is shown as 1 instead of 0, which is 
correct from the consumer-group tool's point of view.
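
A small worked example of the offsets involved (numbers assumed purely for illustration):

{noformat}
last data record at offset 41, transaction commit marker at offset 42,
log end offset = 43. Streams commits 41 + 1 = 42, so the consumer-group tool
reports lag = 43 - 42 = 1 even though every data record has been processed.
{noformat}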



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6606) Regression in consumer auto-commit backoff behavior

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384401#comment-16384401
 ] 

ASF GitHub Bot commented on KAFKA-6606:
---

hachikuji opened a new pull request #4641: KAFKA-6606; Ensure consumer awaits 
auto-commit interval after sending…
URL: https://github.com/apache/kafka/pull/4641

We need to reset the auto-commit deadline after sending the offset commit 
request so that we do not resend it while the request is still inflight.

Added unit tests ensuring this behavior and proper backoff in the case of a 
failure.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in consumer auto-commit backoff behavior
> ---
>
> Key: KAFKA-6606
> URL: https://issues.apache.org/jira/browse/KAFKA-6606
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 1.1.0
>
>
> We introduced a regression in the auto-commit behavior in KAFKA-6362. After 
> initiating a send, the consumer does not reset its next commit deadline, so 
> it will send auto-commits as fast as the user can call poll() until the first 
> offset commit returns.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-03-02 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():

{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2018-03-02 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384490#comment-16384490
 ] 

Ted Yu commented on KAFKA-4879:
---

What about #2 listed by Jason above?
Should a KIP be drafted?

Thanks

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for 
> UnknownTopicOrPartitionException.
> Here is a reproducer
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
> // Make sure "delete.topic.enable" is set to true.
> // Please create the topic test with "3" partitions manually.
> // The issue is gone when there is only one partition.
> String topic = "test";
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");
> props.put("group.id", "testgroup");
> props.put("value.deserializer", StringDeserializer.class.getName());
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("enable.auto.commit", "false");
> KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
> kc.subscribe(Collections.singletonList(topic));
> kc.poll(0);
> Set<TopicPartition> partitions = kc.assignment();
> System.out.println("partitions: " + partitions);
> kc.pause(partitions);
> kc.seekToEnd(partitions);
> System.out.println("please delete the topic in 30 seconds");
> try {
>   // Sleep 30 seconds to give us enough time to delete the topic.
>   Thread.sleep(30000);
> } catch (InterruptedException e) {
>   e.printStackTrace();
> }
> System.out.println("sleep end");
> for (TopicPartition p : partitions) {
>   System.out.println(p + " offset: " + kc.position(p));
> }
> System.out.println("cannot reach here");
> kc.close();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1954) Speed Up The Unit Tests

2018-03-02 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384501#comment-16384501
 ] 

Ewen Cheslack-Postava commented on KAFKA-1954:
--

The full test suite is still very slow (now > 30 min on my laptop), but the unit 
tests have mostly been brought under control by annotations distinguishing unit vs 
integration tests. There's one that we can't yet label properly due to a JUnit 
issue (waiting on the 4.13 release, https://github.com/apache/kafka/pull/).

I think the 25+ min integration tests are probably still an issue and the focus 
of this bug, but we should consider relabeling now that we make a better 
distinction between the two classes of tests. Even better would be if we had 
integration tests that tested multiple classes working together without 
requiring a full ZK + Kafka set of services to be pulled up...

> Speed Up The Unit Tests
> ---
>
> Key: KAFKA-1954
> URL: https://issues.apache.org/jira/browse/KAFKA-1954
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Balint Molnar
>Priority: Major
>  Labels: newbie++
> Attachments: KAFKA-1954.patch
>
>
> The server unit tests are pretty slow. They take about 8m40s on my machine. 
> Combined with slow scala compile time this is kind of painful.
> Almost all of this time comes from the integration tests which start one or 
> more brokers and then shut them down.
> Our finding has been that these integration tests are actually quite useful 
> so we probably can't just get rid of them.
> Here are some times:
> Zk startup: 100ms
> Kafka server startup: 600ms
> Kafka server shutdown: 500ms
>  
> So you can see that an integration test suite with 10 tests that starts and 
> stops a 3 node cluster for each test will take ~34 seconds even if the tests 
> themselves are instantaneous.
> I think the best solution to this is to get the test harness classes in shape 
> and then performance tune them a bit as this would potentially speed 
> everything up. There are several test harness classes:
> - ZooKeeperTestHarness
> - KafkaServerTestHarness
> - ProducerConsumerTestHarness
> - IntegrationTestHarness (similar to ProducerConsumerTestHarness but using 
> new clients)
> Unfortunately, tests often don't use the right harness; they use a 
> lower-level harness than they should and manually create stuff. Usually the 
> cause of this is that the harness is missing some feature.
> I think the right thing to do here is
> 1. Get the tests converted to the best possible harness. If you are testing 
> producers and consumers then you should use the harness that creates all that 
> and shuts it down for you.
> 2. Optimize the harnesses to be faster.
> How can we optimize the harnesses? I'm not sure, I would solicit ideas. Here 
> are a few:
> 1. It's worth analyzing the logging to see what is taking up time in the 
> startup and shutdown.
> 2. There may be things like controlled shutdown that we can disable (since we 
> are anyway going to discard the brokers after shutdown).
> 3. The harnesses could probably start all the servers and all the clients in 
> parallel.
> 4. We may be able to tune down the resource usage in the server config for 
> test cases a bit.
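
The ~34 second figure follows directly from the timings quoted above (assuming ZooKeeper is also started once per test):

{noformat}
per test:  100 ms (ZK startup) + 3 brokers x (600 ms startup + 500 ms shutdown) = 3,400 ms
10 tests:  10 x 3,400 ms = 34,000 ms ≈ 34 s
{noformat}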



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1368) Upgrade log4j

2018-03-02 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384512#comment-16384512
 ] 

Ewen Cheslack-Postava commented on KAFKA-1368:
--

Per original ticket, it looks like we've had log4j at 1.2.17 for almost 2 years 
already. Is this ticket now really about upgrading to log4j 2.x, and should we 
update the title accordingly?

> Upgrade log4j
> -
>
> Key: KAFKA-1368
> URL: https://issues.apache.org/jira/browse/KAFKA-1368
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.0
>Reporter: Vladislav Pernin
>Assignee: Mickael Maison
>Priority: Major
>
> Upgrade log4j to at least 1.2.16 or 1.2.17.
> Using EnhancedPatternLayout will then be possible.
> It allows setting delimiters around the full log event, stacktrace included, making 
> log message collection easier with tools like Logstash.
> Example: <[%d{}]...[%t] %m%throwable>%n
> <[2014-04-08 11:07:20,360] ERROR [KafkaApi-1] Error when processing fetch 
> request for partition [X,6] offset 700 from consumer with correlation id 
> 0 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 700 but we only 
> have log segments in the range 16021 to 16021.
> at kafka.log.Log.read(Log.scala:429)
> ...
> at java.lang.Thread.run(Thread.java:744)>
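
For reference, a minimal log4j.properties sketch of what this could look like once EnhancedPatternLayout is in use (appender name and pattern are illustrative):

{code}
# Minimal sketch; assumes log4j >= 1.2.16 on the classpath.
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.EnhancedPatternLayout
# Wrap each event, stacktrace included, in <...> delimiters so tools like Logstash can split on them.
log4j.appender.stdout.layout.ConversionPattern=<[%d{ISO8601}] %p [%t] %m%throwable>%n
{code}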



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4854) Producer RecordBatch executes callbacks with `null` provided for metadata if an exception is encountered

2018-03-02 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-4854.
--
Resolution: Not A Bug
  Assignee: Ewen Cheslack-Postava

This behavior is intended. The idea is to have *either* valid metadata about 
the produced messages based on the successful reply from the broker *or* an 
exception indicating why production failed. Metadata about produced messages 
doesn't make sense in the case of an exception since the exception implies the 
messages were not successfully added to the log.
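
Concretely, a callback is expected to branch on which of the two arguments is non-null; a minimal sketch (record and logger names are illustrative):

{code:java}
// Exactly one of (metadata, exception) is non-null when onCompletion fires.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.error("Send failed for record with key {}", record.key(), exception);
        } else {
            log.info("Record stored in {}-{} at offset {}",
                     metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});
{code}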

> Producer RecordBatch executes callbacks with `null` provided for metadata if 
> an exception is encountered
> 
>
> Key: KAFKA-4854
> URL: https://issues.apache.org/jira/browse/KAFKA-4854
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.1.1
>Reporter: Robert Quinlivan
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
>
> When using a user-provided callback with the producer, the `RecordBatch` 
> executes the callbacks with a null metadata argument if an exception was 
> encountered. For monitoring and debugging purposes, I would prefer if the 
> metadata were provided, perhaps optionally. For example, it would be useful 
> to know the size of the serialized payload and the offset so these values 
> could appear in application logs.
> To be entirely clear, the piece of code I am considering is in 
> `org.apache.kafka.clients.producer.internals.RecordBatch#done`:
> ```java
> // execute callbacks
> for (Thunk thunk : thunks) {
>     try {
>         if (exception == null) {
>             RecordMetadata metadata = thunk.future.value();
>             thunk.callback.onCompletion(metadata, null);
>         } else {
>             thunk.callback.onCompletion(null, exception);
>         }
>     } catch (Exception e) {
>         log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
>     }
> }
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)