Writing a Producer from Scratch

2016-03-04 Thread Hopson, Stephen
Hi,
I am new to Kafka (but not new to computers).

I want to write a kafka producer client for a Unisys OS 2200 mainframe. I need 
to write it in C, and since I have no access to Windows / Unix / Linux 
libraries, I have to develop the interface at the lowest level.

So far, I have downloaded a kafka server with its associated zookeeper 
(kafka_2.10-0.8.2.2). Note I have downloaded the Windows version and have it 
running on my laptop, successfully tested on the same laptop with the provided 
producer and consumer clients.

I have developed code to open a TCP session to the kafka server, which appears 
to work, and I have attempted to send a metadata request, which does not appear 
to work. When I say it does not appear to work, I mean that I send the message 
and then sit on a retrieve, which eventually times out (I do seem to get one 
character in the receive buffer, 0235 octal). The message format I am using 
is the one described in the excellent document by Jay Kreps / Gwen Shapira at 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  However, it is not clear which versions of Kafka these message formats 
apply to.
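
For reference, the v0 metadata request that guide describes is framed as a
4-byte big-endian size followed by the request header and body. Below is a
minimal sketch (in Java, purely to illustrate the byte layout; the host, port,
client id and correlation id are illustrative):

import java.io.DataOutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MetadataRequestSketch {
    public static void main(String[] args) throws Exception {
        byte[] clientId = "test-client".getBytes(StandardCharsets.UTF_8);
        ByteBuffer body = ByteBuffer.allocate(2 + 2 + 4 + 2 + clientId.length + 4);
        body.putShort((short) 3);               // api_key: TopicMetadataRequest
        body.putShort((short) 0);               // api_version
        body.putInt(42);                        // correlation_id, echoed back in the response
        body.putShort((short) clientId.length); // client_id is a length-prefixed string
        body.put(clientId);
        body.putInt(0);                         // empty topic array => metadata for all topics
        body.flip();

        try (Socket socket = new Socket("localhost", 9092)) {
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            out.writeInt(body.remaining());     // 4-byte size prefix; does not count itself
            out.write(body.array(), 0, body.remaining());
            out.flush();
            // The response is framed the same way: a 4-byte size, then that many
            // bytes, beginning with the echoed correlation_id.
        }
    }
}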

Can anybody offer me any advice or suggestions as to how to progress?

PS: Is the CRC mandatory in the producer messages?
Many thanks in advance.


Stephen Hopson | Infrastructure Architect | Enterprise Solutions
Unisys | +44 (0)1908 805010 | +44 (0)7557 303321 | 
stephen.hop...@gb.unisys.com




[jira] [Created] (KAFKA-3332) Consumer can't consume messages from zookeeper chroot

2016-03-04 Thread Sergey Vergun (JIRA)
Sergey Vergun created KAFKA-3332:


 Summary: Consumer can't consume messages from zookeeper chroot
 Key: KAFKA-3332
 URL: https://issues.apache.org/jira/browse/KAFKA-3332
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.9.0.1, 0.8.2.2
 Environment: RHEL 6.X, OS X
Reporter: Sergey Vergun
Assignee: Neha Narkhede


I have faced an issue where the consumer can't consume messages from a 
zookeeper chroot. This was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1.

My zookeeper options in server.properties:
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

I can successfully create a new topic
$kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
__TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
Created topic "__TEST-Topic_1".

and produce messages into it
$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

In Kafka Manager I see that the messages were delivered:
Sum of partition offsets: 5

But I can't consume the messages via kafka-console-consumer
$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning

If I configure the kafka cluster with the "/" zookeeper chroot, then everything is OK.
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper localhost:2181 
--from-beginning
1
2
3
4
5

Is it a bug or my mistake?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3332) Consumer can't consume messages from zookeeper chroot

2016-03-04 Thread Sergey Vergun (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Vergun updated KAFKA-3332:
-
Description: 
I have faced an issue where the consumer can't consume messages from a 
zookeeper chroot. This was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1.

My zookeeper options in server.properties:
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

I can successfully create a new topic
$kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
__TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
Created topic "__TEST-Topic_1".

and produce messages into it
$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

In Kafka Manager I see that the messages were delivered:
Sum of partition offsets: 5

But I can't consume the messages via kafka-console-consumer
$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning

If I configure the kafka cluster with the zookeeper chroot "/", then everything is OK.
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper localhost:2181 
--from-beginning
1
2
3
4
5

Is it a bug or my mistake?

  was:
I have faced issue when consumer can't consume messages from zookeeper chroot. 
It was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1

My zookeeper options into server.properties:
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

I can create successfully a new topic
$kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
__TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
Created topic "__TEST-Topic_1".

and produce messages into it
$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

In Kafka Manager I see that messages was delivered:
Sum of partition offsets5

But I can't consume the messages via kafka-console-consumer
$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning

If I configure kafka cluster with "/" zookeeper chroot then everything is ok.
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper localhost:2181 
--from-beginning
1
2
3
4
5

Is it bug or my mistake?


> Consumer can't consume messages from zookeeper chroot
> -
>
> Key: KAFKA-3332
> URL: https://issues.apache.org/jira/browse/KAFKA-3332
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: RHEL 6.X, OS X
>Reporter: Sergey Vergun
>Assignee: Neha Narkhede
>
> I have faced issue when consumer can't consume messages from zookeeper 
> chroot. It was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1
> My zookeeper options into server.properties:
> $cat config/server.properties | grep zookeeper
> zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
> zookeeper.session.timeout.ms=1
> zookeeper.connection.timeout.ms=1
> zookeeper.sync.time.ms=2000
> I can create successfully a new topic
> $kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
> __TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
> Created topic "__TEST-Topic_1".
> and produce messages into it
> $kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
> 1
> 2
> 3
> 4
> 5
> In Kafka Manager I see that messages was delivered:
> Sum of partition offsets  5
> But I can't consume the messages via kafka-console-consumer
> $kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
> localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning
> If I configure kafka cluster with zookeeper chroot "/" then everything is ok.
> $cat config/server.properties | grep zookeeper
> zookeeper.connect=localhost:2181
> zookeeper.session.timeout.ms=1
> zookeeper.connection.timeout.ms=1
> zookeeper.sync.time.ms=2000
> $kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
> 1
> 2
> 3
> 4
> 5
> $kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper localhost:218

[jira] [Updated] (KAFKA-3332) Consumer can't consume messages from zookeeper chroot

2016-03-04 Thread Sergey Vergun (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Vergun updated KAFKA-3332:
-
Description: 
I have faced an issue where the consumer can't consume messages from a 
zookeeper chroot. This was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1.

My zookeeper options in server.properties:
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

I can successfully create a new topic
$kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
__TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
Created topic "__TEST-Topic_1".

and produce messages into it
$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

In Kafka Manager I see that the messages were delivered:
Sum of partition offsets: 5

But I can't consume the messages via kafka-console-consumer
$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning

The consumer is present in zookeeper
[zk: localhost:2181(CONNECTED) 10] ls /kafka-cluster/kafka-0.9.0.1/consumers
[console-consumer-62895] 
[zk: localhost:2181(CONNECTED) 12] ls 
/kafka-cluster/kafka-0.9.0.1/consumers/console-consumer-62895/ids
[console-consumer-62895_SV-Macbook-1457097451996-64640cc1] 


If I reconfigure the kafka cluster with the zookeeper chroot "/", then everything is OK.
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper localhost:2181 
--from-beginning
1
2
3
4
5

Is it a bug or my mistake?

  was:
I have faced issue when consumer can't consume messages from zookeeper chroot. 
It was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1

My zookeeper options into server.properties:
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

I can create successfully a new topic
$kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
__TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
Created topic "__TEST-Topic_1".

and produce messages into it
$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

In Kafka Manager I see that messages was delivered:
Sum of partition offsets5

But I can't consume the messages via kafka-console-consumer
$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning

If I configure kafka cluster with zookeeper chroot "/" then everything is ok.
$cat config/server.properties | grep zookeeper
zookeeper.connect=localhost:2181
zookeeper.session.timeout.ms=1
zookeeper.connection.timeout.ms=1
zookeeper.sync.time.ms=2000

$kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
1
2
3
4
5

$kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper localhost:2181 
--from-beginning
1
2
3
4
5

Is it bug or my mistake?


> Consumer can't consume messages from zookeeper chroot
> -
>
> Key: KAFKA-3332
> URL: https://issues.apache.org/jira/browse/KAFKA-3332
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: RHEL 6.X, OS X
>Reporter: Sergey Vergun
>Assignee: Neha Narkhede
>
> I have faced issue when consumer can't consume messages from zookeeper 
> chroot. It was tested on Kafka 0.8.2.2 and Kafka 0.9.0.1
> My zookeeper options into server.properties:
> $cat config/server.properties | grep zookeeper
> zookeeper.connect=localhost:2181/kafka-cluster/kafka-0.9.0.1
> zookeeper.session.timeout.ms=1
> zookeeper.connection.timeout.ms=1
> zookeeper.sync.time.ms=2000
> I can create successfully a new topic
> $kafka-topics.sh --create --partition 3 --replication-factor 1 --topic 
> __TEST-Topic_1 --zookeeper localhost:2181/kafka-cluster/kafka-0.9.0.1
> Created topic "__TEST-Topic_1".
> and produce messages into it
> $kafka-console-producer.sh --topic __TEST-Topic_1 --broker-list localhost:9092
> 1
> 2
> 3
> 4
> 5
> In Kafka Manager I see that messages was delivered:
> Sum of partition offsets  5
> But I can't consume the messages via kafka-console-consumer
> $kafka-console-consumer.sh --topic TEST-Topic_1 --zookeeper 
> localhost:2181/kafka-cluster/kafka-0.9.0.1 --from-beginning
> The consumer is present in zookeeper
> [zk: lo

[jira] [Created] (KAFKA-3333) Client Partitioner - Round Robin

2016-03-04 Thread Stephen Powis (JIRA)
Stephen Powis created KAFKA-:


 Summary: Client Partitioner - Round Robin
 Key: KAFKA-
 URL: https://issues.apache.org/jira/browse/KAFKA-
 Project: Kafka
  Issue Type: New Feature
  Components: clients
Reporter: Stephen Powis


The 
[DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
 typically distributes using the hash of the keybytes, and falls back to round 
robin if there is no key.  But there is currently no way to do Round Robin 
partitioning if you have keys on your messages without writing your own 
partitioning implementation.

I think it'd be helpful to have an implementation of straight Round Robin 
partitioning included with the library.  
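
For context, here is a minimal sketch of the kind of partitioner this describes, 
written against the public Partitioner interface (an illustration only, not the 
patch attached to this ticket):

{code}
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Illustrative round-robin partitioner: ignore the key entirely and cycle
// through the topic's partitions in order.
public class RoundRobinPartitionerSketch implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int next = counter.getAndIncrement() & Integer.MAX_VALUE; // stay non-negative on overflow
        return partitions.get(next % partitions.size()).partition();
    }

    @Override
    public void close() { }
}
{code}

A producer would opt in via the partitioner.class configuration setting.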



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3333) Client Partitioner - Round Robin

2016-03-04 Thread Stephen Powis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179984#comment-15179984
 ] 

Stephen Powis commented on KAFKA-:
--

Will link to a github pull request shortly.

> Client Partitioner - Round Robin
> 
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Stephen Powis
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: [KAFKA-3333] - Add RoundRobinPartitioner

2016-03-04 Thread Crim
GitHub user Crim opened a pull request:

https://github.com/apache/kafka/pull/1012

[KAFKA-] - Add RoundRobinPartitioner

https://issues.apache.org/jira/browse/KAFKA-


The DefaultPartitioner typically distributes using the hash of the 
keybytes, and falls back to round robin if there is no key. But there is 
currently no way to do Round Robin partitioning if you have keys on your 
messages without writing your own partitioning implementation.

I think it'd be helpful to have an implementation of straight Round Robin 
partitioning included with the library.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Crim/kafka KAFKA-

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1012


commit 7a6781dfdfd1cbb34976a8dc37f0be72c644257b
Author: stephen powis 
Date:   2016-03-04T15:14:42Z

[KAFKA-] - Add RoundRobinPartitioner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3333) Client Partitioner - Round Robin

2016-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179998#comment-15179998
 ] 

ASF GitHub Bot commented on KAFKA-:
---

GitHub user Crim opened a pull request:

https://github.com/apache/kafka/pull/1012

[KAFKA-] - Add RoundRobinPartitioner

https://issues.apache.org/jira/browse/KAFKA-


The DefaultPartitioner typically distributes using the hash of the 
keybytes, and falls back to round robin if there is no key. But there is 
currently no way to do Round Robin partitioning if you have keys on your 
messages without writing your own partitioning implementation.

I think it'd be helpful to have an implementation of straight Round Robin 
partitioning included with the library.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Crim/kafka KAFKA-

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1012


commit 7a6781dfdfd1cbb34976a8dc37f0be72c644257b
Author: stephen powis 
Date:   2016-03-04T15:14:42Z

[KAFKA-] - Add RoundRobinPartitioner




> Client Partitioner - Round Robin
> 
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Stephen Powis
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3333) Client Partitioner - Round Robin

2016-03-04 Thread Stephen Powis (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen Powis updated KAFKA-:
-
Fix Version/s: 0.10.0.0
Affects Version/s: 0.10.0.0
   Status: Patch Available  (was: Open)

https://github.com/apache/kafka/pull/1012

> Client Partitioner - Round Robin
> 
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 0.10.0.0
>Reporter: Stephen Powis
> Fix For: 0.10.0.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The 
> [DefaultPartitioner|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java]
>  typically distributes using the hash of the keybytes, and falls back to 
> round robin if there is no key.  But there is currently no way to do Round 
> Robin partitioning if you have keys on your messages without writing your own 
> partitioning implementation.
> I think it'd be helpful to have an implementation of straight Round Robin 
> partitioning included with the library.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread Aleksandar Stojadinovic (JIRA)
Aleksandar Stojadinovic created KAFKA-3334:
--

 Summary: First message on new topic not actually being sent, no 
exception thrown
 Key: KAFKA-3334
 URL: https://issues.apache.org/jira/browse/KAFKA-3334
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.0
 Environment: Linux, Java
Reporter: Aleksandar Stojadinovic


Although I've seen this issue pop up around the internet in a few forms, I'm 
not sure it has yet been properly fixed.

When publishing to a new topic with auto-create enabled, the java client 
(0.9.0) logs this WARN message, and the message is obviously not sent:

org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}

In the meantime I see in the console a message that a log for the partition is 
created. The next messages go through normally, but the first one is never 
sent. No exception is ever thrown, either by calling get on the future or with 
the async usage, as if everything were perfect.

I notice that when I leave my application blocked on the get call in the 
debugger, the message may be processed, but with significant delay. This is 
consistent with another issue I found for the python client. Also, if I call 
partitionsFor beforehand, the topic is created and the message is sent. But it 
seems silly to call it every time just to mitigate this issue.

{code}
Future<RecordMetadata> recordMetadataFuture = producer.send(new 
ProducerRecord<>(topic, key, file));
RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
TimeUnit.SECONDS);
{code}

I hope I'm clear enough.
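
For reference, the partitionsFor workaround mentioned above looks roughly like 
this (a sketch; producer, topic, key and file are the same placeholders as in 
the snippet above):

{code}
// Fetch topic metadata first, which triggers auto-creation, before the first send.
producer.partitionsFor(topic);
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, file));
RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
{code}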

Related similar (but not same) issues:
https://issues.apache.org/jira/browse/KAFKA-1124
https://github.com/dpkp/kafka-python/issues/150
http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180235#comment-15180235
 ] 

Ashish K Singh commented on KAFKA-3334:
---

Typically one should create a topic before producing to it. The auto-create 
functionality has often been frowned upon, as it relies on the broker creating 
a topic whenever it receives a request for an unknown topic. Auto-create can 
lead to the issue you ran into, among other problems, such as creating topics 
by mistake.

When you send the first message for a non-existent topic, the broker starts 
creating the topic, which takes some time. During this period your send request 
can time out and potentially lead to message loss. However, once the topic is 
created, things fall into place.

Is it not possible for you to create your topics prior to producing to them? 
There is no standard public API to create a topic as of now; however, you can 
use the {{kafka-topics.sh}} tool. KIP-4 is going to add a client-side 
implementation to create topics, among various other admin APIs, and is planned 
for the 0.10.0.0 release.

I think the producer configs retries and retry.backoff.ms can probably help you 
make sure that even the first message is not lost.
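
For illustration, those producer settings would look roughly like this (a 
sketch; the broker address and the retry values are placeholders):

{code}
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("retries", "5");             // retry sends that fail with retriable errors such as LEADER_NOT_AVAILABLE
props.put("retry.backoff.ms", "500");  // pause between retries, giving topic auto-creation time to finish
Producer<String, String> producer = new KafkaProducer<>(props);
{code}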

> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>
> Although I've seen this issue pop around the internet in a few forms, I'm not 
> sure it is yet properly fixed. 
> When publishing to a new topic, with auto create-enabled, the java client 
> (0.9.0) shows this WARN message in the log, and the message is not sent 
> obviously:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see in the console the message that a log for partition is 
> created. The next messages are patched through normally, but the first one is 
> never sent. No exception is ever thrown, either by calling get on the future, 
> or with the async usage, like everything is perfect.
> I notice when I leave my application blocked on the get call, in the 
> debugger, then the message may be processed, but with significant delay. This 
> is consistent with another issue I found for the python client. Also, if I 
> call partitionsFor previously, the topic is created and the message is sent. 
> But it seems silly to call it every time, just to mitigate this issue.
> {code}
> Future recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-04 Thread Jay Kreps
Yeah here is my summary of my take:

1. Negotiating a per-connection protocol actually does add a lot of
complexity to clients (many more failure states to get right).

2. Having the client configure the protocol version manually is doable now
but probably a worse state. I suspect this will lead to more not less
confusion.

3. I don't think the current state is actually that bad. Integrators pick a
conservative version and build against that. There is a tradeoff between
getting the new features and being compatible with old Kafka versions. But
a large part of this tradeoff is essential since new features aren't going
to magically appear on old servers, so even if you upgrade your client you
likely aren't going to get the new stuff (since we will end up dynamically
turning it off). Having client features that are there but don't work
because you're on an old cluster may actually be a worse experience if not
handled very carefully.

4. The problems Dana brought up are totally orthogonal to the problem of
having per-api versions or overall versions. The problem was that we
changed behavior subtly without changing the version. This will be an issue
regardless of whether the version is global or not.

5. Using the broker release as the version is strictly worse than using a
global protocol version (0, 1, 2, ...) that increments any time any api
changes but doesn't increment just because non-protocol code is changed.
The problem with using the broker release version is we want to be able to
keep Kafka releasable from any commit which means there isn't as clear a
sequencing of releases as you would think.

6. We need to consider the case of mixed version clusters during the time
period when you are upgrading Kafka.

So overall I think this is not a critical thing to do right now, but if we
are going to do it we should do it in a way that actually improves things.

Here would be one proposal for that:
a. Add a global protocol version that increments with any api version
update. Move the documentation so that the docs are by version. This is
basically just a short-hand for a complete set of supported api versions.
b. Include a field in the metadata response for each broker that adds the
protocol version.
c. To maintain the protocol version this information will have to get
propagated with the rest of the broker metadata like host, port, id, etc.

The instructions to clients would be:
- By default you build against a single conservative Kafka protocol version
and we carry that support forward, as today
- If you want to get fancy you can use the protocol version field in the
metadata request to more dynamically chose what features are available and
select api versions appropriately. This is purely optional.

-Jay

On Thu, Mar 3, 2016 at 9:38 PM, Jason Gustafson  wrote:

> I talked with Jay about this KIP briefly this morning, so let me try to
> summarize the discussion (I'm sure he'll jump in if I get anything wrong).
> Apologies in advance for the length.
>
> I think we both share some skepticism that a request with all the supported
> versions of all the request APIs is going to be a useful primitive to try
> and build client compatibility around. In practice I think people would end
> up checking for particular request versions in order to determine if the
> broker is 0.8 or 0.9 or whatever, and then change behavior accordingly. I'm
> wondering if there's a reasonable way to handle the version responses that
> doesn't amount to that. Maybe you could try to capture feature
> compatibility by checking the versions for a subset of request types? For
> example, to ensure that you can use the new consumer API, you check that
> the group coordinator request is present, the offset commit request version
> is greater than 2, the offset fetch request is greater than 1, and the join
> group request is present. And to ensure compatibility with KIP-32, maybe
> you only need to check the appropriate versions of the fetch and produce
> requests. That sounds kind of complicated to keep track of and you probably
> end up trying to handle combinations which aren't even possible in
> practice.
>
> The alternative is to use a single API version. It could be the Kafka
> release version, but then you need to figure out how to handle users who
> are running off of trunk since multiple API versions will typically change
> between releases. Perhaps it makes more sense to keep a separate API
> version number which is incremented every time any one of the API versions
> increases? This also decouples the protocol from the Kafka distribution.
>
> As far as whether there should be a separate request or not, I get Becket's
> point that you would only need to do the version check once when a
> connection is established, but another round trip still complicates the
> picture quite a bit. Before you just need to send a metadata request to
> bootstrap yourself to the cluster, but now you need to do version
> negotiation before you can even do th

[jira] [Commented] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread Aleksandar Stojadinovic (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180267#comment-15180267
 ] 

Aleksandar Stojadinovic commented on KAFKA-3334:


Understood. To be honest, I could manage manual creation; the fact that this
happens pretty much silently makes me fear that there are more situations like
this. To be fair, I have not seen any until now.

Maybe document this somehow?




> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>
> Although I've seen this issue pop around the internet in a few forms, I'm not 
> sure it is yet properly fixed. 
> When publishing to a new topic, with auto create-enabled, the java client 
> (0.9.0) shows this WARN message in the log, and the message is not sent 
> obviously:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see in the console the message that a log for partition is 
> created. The next messages are patched through normally, but the first one is 
> never sent. No exception is ever thrown, either by calling get on the future, 
> or with the async usage, like everything is perfect.
> I notice when I leave my application blocked on the get call, in the 
> debugger, then the message may be processed, but with significant delay. This 
> is consistent with another issue I found for the python client. Also, if I 
> call partitionsFor previously, the topic is created and the message is sent. 
> But it seems silly to call it every time, just to mitigate this issue.
> {code}
> Future recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread Ashish K Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish K Singh reassigned KAFKA-3334:
-

Assignee: Ashish K Singh

> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>Assignee: Ashish K Singh
>
> Although I've seen this issue pop around the internet in a few forms, I'm not 
> sure it is yet properly fixed. 
> When publishing to a new topic, with auto create-enabled, the java client 
> (0.9.0) shows this WARN message in the log, and the message is not sent 
> obviously:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see in the console the message that a log for partition is 
> created. The next messages are patched through normally, but the first one is 
> never sent. No exception is ever thrown, either by calling get on the future, 
> or with the async usage, like everything is perfect.
> I notice when I leave my application blocked on the get call, in the 
> debugger, then the message may be processed, but with significant delay. This 
> is consistent with another issue I found for the python client. Also, if I 
> call partitionsFor previously, the topic is created and the message is sent. 
> But it seems silly to call it every time, just to mitigate this issue.
> {code}
> Future recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: Generate javadocs for all Streams packages wit...

2016-03-04 Thread miguno
GitHub user miguno opened a pull request:

https://github.com/apache/kafka/pull/1013

Generate javadocs for all Streams packages with the exception of internals



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/miguno/kafka trunk-streams-javadocs-fixes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1013.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1013


commit e9b3f8ffcfd47fb950fda52fa975930b0d9010e3
Author: Michael G. Noll 
Date:   2016-03-04T18:03:56Z

Generate javadocs for all Streams packages with the exception of 
`*.internals.*`




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread Ashish K Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180279#comment-15180279
 ] 

Ashish K Singh commented on KAFKA-3334:
---

Documentation should help. Will post a PR soon and would love you to review.

> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>Assignee: Ashish K Singh
>
> Although I've seen this issue pop around the internet in a few forms, I'm not 
> sure it is yet properly fixed. 
> When publishing to a new topic, with auto create-enabled, the java client 
> (0.9.0) shows this WARN message in the log, and the message is not sent 
> obviously:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see in the console the message that a log for partition is 
> created. The next messages are patched through normally, but the first one is 
> never sent. No exception is ever thrown, either by calling get on the future, 
> or with the async usage, like everything is perfect.
> I notice when I leave my application blocked on the get call, in the 
> debugger, then the message may be processed, but with significant delay. This 
> is consistent with another issue I found for the python client. Also, if I 
> call partitionsFor previously, the topic is created and the message is sent. 
> But it seems silly to call it every time, just to mitigate this issue.
> {code}
> Future recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request: HOTFIX: Generate javadocs for all Streams pack...

2016-03-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1013


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180332#comment-15180332
 ] 

James Cheng commented on KAFKA-3334:


I believe part of the explanation for this is the value of auto.offset.reset, 
which defaults to largest. When the consumer is starting up, it tries to access 
the topic and fails. By the time it retries, the topic has been created but 
also has data in it, so the consumer tries to connect at "largest", which is 
the offset of the most recent bit of data, and so it never receives it.

I believe you may be able to avoid/mitigate this by using 
auto.offset.reset=smallest.

Btw, this has impact on mirrormaker. If you have a mirror whitelist and mirror 
new topics as they are created (whether manually or automatically), and if 
mirrormaker has auto.offset.reset=largest, then mirrormaker may miss messages 
when new topics are created.
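
For illustration, the mitigation above in (old-consumer) properties form, e.g. 
in a consumer properties file passed to mirrormaker via --consumer.config 
(values are placeholders):

{code}
zookeeper.connect=localhost:2181
group.id=example-group
auto.offset.reset=smallest
{code}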


> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>Assignee: Ashish K Singh
>
> Although I've seen this issue pop around the internet in a few forms, I'm not 
> sure it is yet properly fixed. 
> When publishing to a new topic, with auto create-enabled, the java client 
> (0.9.0) shows this WARN message in the log, and the message is not sent 
> obviously:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see in the console the message that a log for partition is 
> created. The next messages are patched through normally, but the first one is 
> never sent. No exception is ever thrown, either by calling get on the future, 
> or with the async usage, like everything is perfect.
> I notice when I leave my application blocked on the get call, in the 
> debugger, then the message may be processed, but with significant delay. This 
> is consistent with another issue I found for the python client. Also, if I 
> call partitionsFor previously, the topic is created and the message is sent. 
> But it seems silly to call it every time, just to mitigate this issue.
> {code}
> Future recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3334) First message on new topic not actually being sent, no exception thrown

2016-03-04 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180332#comment-15180332
 ] 

James Cheng edited comment on KAFKA-3334 at 3/4/16 6:37 PM:


I believe part of the explanation for this is the value of auto.offset.reset, 
which defaults to largest. When the consumer is starting up, it tries to access 
the topic and fails. By the time it retries, the topic has been created but 
also has data in it, so the consumer tries to connect at "largest", which is 
the offset of the most recent message, and so the consumer never receives the 
initial message.

I believe you may be able to avoid/mitigate this by using 
auto.offset.reset=smallest.

Btw, this has impact on mirrormaker. If you have a mirror whitelist and mirror 
new topics as they are created (whether manually or automatically), and if 
mirrormaker has auto.offset.reset=largest, then mirrormaker may miss messages 
when new topics are created.



was (Author: wushujames):
I believe part of the explanation for this is the value of auto.offset.reset, 
which defaults to largest. When the consumer is starting up, it tries to access 
the topic and fails. By the time it retries, the topic has been created but 
also has data in it, so the consumer tries to connect at "largest", which is 
the offset of the most recent bit of data, and so it never receives it.

I believe you may be able to avoid/mitigate this by using 
auto.offset.reset=smallest.

Btw, this has impact on mirrormaker. If you have a mirror whitelist and mirror 
new topics as they are created (whether manually or automatically), and if 
mirrormaker has auto.offset.reset=largest, then mirrormaker may miss messages 
when new topics are created.


> First message on new topic not actually being sent, no exception thrown
> ---
>
> Key: KAFKA-3334
> URL: https://issues.apache.org/jira/browse/KAFKA-3334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
> Environment: Linux, Java
>Reporter: Aleksandar Stojadinovic
>Assignee: Ashish K Singh
>
> Although I've seen this issue pop around the internet in a few forms, I'm not 
> sure it is yet properly fixed. 
> When publishing to a new topic, with auto create-enabled, the java client 
> (0.9.0) shows this WARN message in the log, and the message is not sent 
> obviously:
> org.apache.kafka.clients.NetworkClient - Error while fetching metadata with 
> correlation id 0 : {file.topic=LEADER_NOT_AVAILABLE}
> In the meantime I see in the console the message that a log for partition is 
> created. The next messages are patched through normally, but the first one is 
> never sent. No exception is ever thrown, either by calling get on the future, 
> or with the async usage, like everything is perfect.
> I notice when I leave my application blocked on the get call, in the 
> debugger, then the message may be processed, but with significant delay. This 
> is consistent with another issue I found for the python client. Also, if I 
> call partitionsFor previously, the topic is created and the message is sent. 
> But it seems silly to call it every time, just to mitigate this issue.
> {code}
> Future recordMetadataFuture = producer.send(new 
> ProducerRecord<>(topic, key, file));
> RecordMetadata recordMetadata = recordMetadataFuture.get(30, 
> TimeUnit.SECONDS);
> {code}
> I hope I'm clear enough.
> Related similar (but not same) issues:
> https://issues.apache.org/jira/browse/KAFKA-1124
> https://github.com/dpkp/kafka-python/issues/150
> http://stackoverflow.com/questions/35187933/how-to-resolve-leader-not-available-kafka-error-when-trying-to-consume



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3312) Add a offsets methods to ZkUtils and replace relevant usages

2016-03-04 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180394#comment-15180394
 ] 

Vahid Hashemian commented on KAFKA-3312:


[~granthenke] Just to clarify, do you mean have some helper function in ZkUtils 
to build paths out of zookeeper node names? Thanks.

> Add a offsets methods to ZkUtils and replace relevant usages
> 
>
> Key: KAFKA-3312
> URL: https://issues.apache.org/jira/browse/KAFKA-3312
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Grant Henke
>Assignee: Vahid Hashemian
>
> There are many places in the code that manually build a zookeeper path and 
> get or update offsets. Moving this logic to a common location in ZkUtils 
> would be nice. 
> Ex:
> {code}
> zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
> {code}
> {code}
>  zkUtils.readData(topicDirs.consumerOffsetDir + "/" + 
> topicAndPartition.partition)._1.toLong
> {code}
> {code}
> zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}",
>  partitionData.offset.toString)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3312) Add a offsets methods to ZkUtils and replace relevant usages

2016-03-04 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180394#comment-15180394
 ] 

Vahid Hashemian edited comment on KAFKA-3312 at 3/4/16 7:12 PM:


[~granthenke] Just to clarify, did you mean adding some helper function in 
ZkUtils to build paths out of zookeeper node names? Thanks.


was (Author: vahid):
[~granthenke] Just to clarify, do you mean have some helper function in ZkUtils 
to build paths out of zookeeper node names? Thanks.

> Add a offsets methods to ZkUtils and replace relevant usages
> 
>
> Key: KAFKA-3312
> URL: https://issues.apache.org/jira/browse/KAFKA-3312
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Grant Henke
>Assignee: Vahid Hashemian
>
> There are many places in the code that manually build a zookeeper path and 
> get or update offsets. Moving this logic to a common location in ZkUtils 
> would be nice. 
> Ex:
> {code}
> zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
> {code}
> {code}
>  zkUtils.readData(topicDirs.consumerOffsetDir + "/" + 
> topicAndPartition.partition)._1.toLong
> {code}
> {code}
> zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}",
>  partitionData.offset.toString)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3312) Add a offsets methods to ZkUtils and replace relevant usages

2016-03-04 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180398#comment-15180398
 ] 

Grant Henke commented on KAFKA-3312:


[~vahid] Yes, but I was thinking of not only building the path, but actually 
adding methods like getOffset and updateOffset that take care of calling 
zookeeper altogether.

> Add a offsets methods to ZkUtils and replace relevant usages
> 
>
> Key: KAFKA-3312
> URL: https://issues.apache.org/jira/browse/KAFKA-3312
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Grant Henke
>Assignee: Vahid Hashemian
>
> There are many places in the code that manually build a zookeeper path and 
> get or update offsets. Moving this logic to a common location in ZkUtils 
> would be nice. 
> Ex:
> {code}
> zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
> {code}
> {code}
>  zkUtils.readData(topicDirs.consumerOffsetDir + "/" + 
> topicAndPartition.partition)._1.toLong
> {code}
> {code}
> zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}",
>  partitionData.offset.toString)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk7 #1092

2016-03-04 Thread Apache Jenkins Server
See 

Changes:

[cshapi] HOTFIX: Generate javadocs for all Streams packages with the exception 
of

--
[...truncated 1517 lines...]

kafka.consumer.ZookeeperConsumerConnectorTest > testConsumerRebalanceListener 
PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testCompression PASSED

kafka.controller.ControllerFailoverTest > testMetadataUpdate PASSED

kafka.producer.ProducerTest > testSendToNewTopic PASSED

kafka.producer.ProducerTest > testAsyncSendCanCorrectlyFailWithTimeout PASSED

kafka.producer.ProducerTest > testSendNullMessage PASSED

kafka.producer.ProducerTest > testUpdateBrokerPartitionInfo PASSED

kafka.producer.ProducerTest > testSendWithDeadBroker PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder PASSED

kafka.producer.SyncProducerTest > testReachableServer PASSED

kafka.producer.SyncProducerTest > testMessageSizeTooLarge PASSED

kafka.producer.SyncProducerTest > testNotEnoughReplicas PASSED

kafka.producer.SyncProducerTest > testMessageSizeTooLargeWithAckZero PASSED

kafka.producer.SyncProducerTest > testProducerCanTimeout PASSED

kafka.producer.SyncProducerTest > testProduceRequestWithNoResponse PASSED

kafka.producer.SyncProducerTest > testEmptyProduceRequest PASSED

kafka.producer.SyncProducerTest > testProduceCorrectlyReceivesResponse PASSED

kafka.tools.ConsoleProducerTest > testParseKeyProp PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsOldProducer PASSED

kafka.tools.ConsoleProducerTest > testInvalidConfigs PASSED

kafka.tools.ConsoleProducerTest > testValidConfigsNewProducer PASSED

kafka.tools.ConsoleConsumerTest > shouldLimitReadsToMaxMessageLimit PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidNewConsumerValidConfig PASSED

kafka.tools.ConsoleConsumerTest > shouldParseConfigsFromFile PASSED

kafka.tools.ConsoleConsumerTest > shouldParseValidOldConsumerValidConfig PASSED

kafka.security.auth.PermissionTypeTest > testFromString PASSED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testFromString PASSED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testTopicMetadataRequest 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProdu

[GitHub] kafka pull request: KAFKA-3331: Refactor TopicCommand to make desc...

2016-03-04 Thread SinghAsDev
GitHub user SinghAsDev opened a pull request:

https://github.com/apache/kafka/pull/1014

KAFKA-3331: Refactor TopicCommand to make describeTopic testable and …

…add unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SinghAsDev/kafka KAFKA-3331

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1014.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1014


commit 835a54c5abffcfb31949a876cbcddeb077141c15
Author: Ashish Singh 
Date:   2016-03-04T19:34:21Z

KAFKA-3331: Refactor TopicCommand to make describeTopic testable and add 
unit tests.






[jira] [Commented] (KAFKA-3331) Refactor TopicCommand to make it testable and add unit tests

2016-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180418#comment-15180418
 ] 

ASF GitHub Bot commented on KAFKA-3331:
---

GitHub user SinghAsDev opened a pull request:

https://github.com/apache/kafka/pull/1014

KAFKA-3331: Refactor TopicCommand to make describeTopic testable and …

…add unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/SinghAsDev/kafka KAFKA-3331

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1014.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1014


commit 835a54c5abffcfb31949a876cbcddeb077141c15
Author: Ashish Singh 
Date:   2016-03-04T19:34:21Z

KAFKA-3331: Refactor TopicCommand to make describeTopic testable and add 
unit tests.




> Refactor TopicCommand to make it testable and add unit tests
> 
>
> Key: KAFKA-3331
> URL: https://issues.apache.org/jira/browse/KAFKA-3331
> Project: Kafka
>  Issue Type: Wish
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> TopicCommand has become a functionality-packed, hard-to-read class. Adding 
> or changing it with confidence requires some unit tests around it.





[jira] [Created] (KAFKA-3335) Kafka Connect hangs in shutdown hook

2016-03-04 Thread Ben Kirwin (JIRA)
Ben Kirwin created KAFKA-3335:
-

 Summary: Kafka Connect hangs in shutdown hook
 Key: KAFKA-3335
 URL: https://issues.apache.org/jira/browse/KAFKA-3335
 Project: Kafka
  Issue Type: Bug
Reporter: Ben Kirwin


The `Connect` class can run into issues during start, such as:

{noformat}
Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: 
Could not look up partition metadata for offset backing store topic in allotted 
period. This could indicate a connectivity issue, unavailable topic partitions, 
or if this is your first use of the topic it may have taken too long to create.
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:130)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:85)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:108)
at org.apache.kafka.connect.runtime.Connect.start(Connect.java:56)
at 
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:62)
{noformat}

This exception halts the startup process. It also triggers the shutdown hook... 
which blocks waiting for the service to start up before calling stop. This 
causes the process to hang forever.

There are a few things that could be done here, but it would be nice to bound 
the amount of time the process spends trying to exit gracefully.
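
As a purely illustrative sketch (hypothetical class and names, not the actual 
Connect code), the shutdown hook could wait on a startup latch for a bounded 
time instead of indefinitely:

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BoundedShutdownExample {
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    private volatile boolean started = false;

    public void start() {
        try {
            // ... start worker, herder, REST server ...
            started = true;
        } finally {
            startupLatch.countDown(); // release the hook even if startup failed
        }
    }

    public void stop() {
        if (started) {
            // ... stop components ...
        }
    }

    public void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Bounded wait: if startup never completes, give up so the JVM can exit.
                    if (!startupLatch.await(30, TimeUnit.SECONDS))
                        return;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                stop();
            }
        }));
    }
}
{code}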





Additional Kafka Connect schema logical types?

2016-03-04 Thread Randall Hauch
I’m working on a Kafka Connect connector that reads a MySQL binlog to provide 
near real-time change data capture, and I also plan connectors for other 
DBMSes. The problem is that I’m not able to map all of the MySQL data types — 
or even all of the standard JDBC types — to Kafka Connect Schemas without 
resorting to complex Schemas that radically increase the footprint of messages.

Specifically, I’d like my connectors to be able to use the following “logical” 
types:

- Bits: A set of bits of arbitrary length, corresponding to java.util.BitSet. 
See [1] for code.
- IsoTime: An ISO8601 time that includes the time zone and corresponding to 
Java 8’s java.time.OffsetTime that represents a time with the offset from 
UTC/Greenwich, and that has a well-defined ordering and thus is more suitable 
for persistent storage. See [2] for code.
- IsoTimestamp: An ISO8601 timestamp that includes the time zone and 
corresponding to Java 8’s java.time.OffsetDateTime that represents an instant 
with the offset from UTC/Greenwich, and that has a well-defined ordering and 
thus is more suitable for persistent storage. See [3] for code.

These are very similar to the 4 built-in logical types (Decimal, Date, Time, 
and Timestamp). These logical types are akin to aliases for a primitive type 
(typically BYTES), and their use within a Schema carries semantics that would 
not be there by just using the corresponding primitive.
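
For illustration, here is a minimal sketch of what such a logical type looks 
like, modeled on the built-in Decimal/Date pattern and on the Bits class 
referenced in [1] (the class and logical name below are assumptions, not an 
agreed API). The logical type is just a named BYTES schema plus static 
conversion helpers:

{code}
import java.util.BitSet;
import org.apache.kafka.connect.data.SchemaBuilder;

public class Bits {
    public static final String LOGICAL_NAME = "io.example.data.Bits";

    // The schema is a plain BYTES schema whose name marks the logical type.
    public static SchemaBuilder builder() {
        return SchemaBuilder.bytes().name(LOGICAL_NAME).version(1);
    }

    // Convert the logical value into the primitive form stored in the record.
    public static byte[] fromBitSet(BitSet bits) {
        return bits.toByteArray();
    }

    // Convert the stored primitive form back into the logical value.
    public static BitSet toBitSet(byte[] raw) {
        return BitSet.valueOf(raw);
    }
}
{code}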

Unfortunately, Kafka Connect is not currently able to support custom logical 
types. Sure, you can create them, but neither the JsonConverter nor any of the 
other Converters will know how to serialize or deserialize them.

One option is for Kafka Connect to add these, but this is sort of a 
never-ending battle. And, since Kafka is not yet on Java 8, supporting 
OffsetTime and OffsetDateTime would be problematic.

Perhaps a better option is to support custom logical types, where each logical 
type must be based upon a single primitive type and must define a class that 
knows how to serialize and deserialize the logical type from the primitive 
type. The Converters, once modified, could look for the referenced class and 
use its serdes logic as needed.

A couple of points:

1) Any source connector that is producing a record with these logical types 
would obviously have to have the logical type’s class available on the 
classpath. That doesn’t seem a difficult requirement to satisfy.

2) Any consumer or sink connector that is consuming records with these values 
needs the logical type's class in order to work with the values. This doesn't 
seem too horrible, especially if the logical type class(es) are nicely 
separated into separate JARs. However, if the consumer doesn't have the 
logical type class, then its local Converter would just deserialize to the 
corresponding primitive value (e.g., byte[], int, long, float, String, etc.); 
is this sufficient if the consumer or sink connector is simply passing the 
value along?

3) There are a couple of ways the logical type’s Schema object could reference 
its class. The 4 built-ins use the convention that the name corresponds to the 
name of the class, though I suspect this is largely just a technique to 
guarantee a unique name. However, at this time there is no interface or base 
class for logical types, so something would have to be changed to allow for 
easy invocation of the serdes methods. An alternative might be to add to 
“Schema” an optional “serdes” field that references the name of the class that 
implements a serdes interface; this is probably cleaner, though it does 
increase the verbosity of the Schema object.
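
To make that last option concrete, one possible shape for such a serdes 
interface might be the following; this is purely hypothetical and not an 
existing Connect API:

{code}
import org.apache.kafka.connect.data.Schema;

// Hypothetical contract a Converter could look up (by class name referenced
// from the Schema) to turn a logical value into its primitive form and back.
public interface LogicalTypeSerde<L, P> {
    Schema schema();          // the named, primitive-based schema for this logical type
    P toPrimitive(L value);   // e.g. BitSet -> byte[]
    L fromPrimitive(P raw);   // e.g. byte[] -> BitSet
}
{code}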


Thoughts?

Randall Hauch

[1] 
https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/Bits.java
[2] 
https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTime.java
[3] 
https://github.com/debezium/debezium/blob/74c5adcc8d30afaa221bbdbecad3bb6f6febbaa5/debezium-core/src/main/java/io/debezium/data/IsoTimestamp.java
 




[GitHub] kafka pull request: KAFKA-3299: Ensure that reading config log on ...

2016-03-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/981




[jira] [Resolved] (KAFKA-3299) KafkaConnect: DistributedHerder shouldn't wait forever to read configs after rebalance

2016-03-04 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-3299.
--
   Resolution: Fixed
Fix Version/s: 0.10.0.0

Issue resolved by pull request 981
[https://github.com/apache/kafka/pull/981]

> KafkaConnect: DistributedHerder shouldn't wait forever to read configs after 
> rebalance
> --
>
> Key: KAFKA-3299
> URL: https://issues.apache.org/jira/browse/KAFKA-3299
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Fix For: 0.10.0.0
>
>
> Right now, the handleRebalance code calls readConfigToEnd with timeout of 
> MAX_INT if it isn't the leader.
> The normal workerSyncTimeoutMs is probably sufficient.
> At least this allows a worker to time-out, get back to the tick() loop and 
> check the "stopping" flag to see if it should shut down, to prevent it from 
> hanging forever.
> It doesn't resolve the question of what we should do with a worker that 
> repeatedly fails to read configuration.
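
A minimal sketch of the bounded-read idea described above (hypothetical names 
only, not the actual DistributedHerder code): the read uses the normal worker 
sync timeout and the loop re-checks a stopping flag instead of blocking forever.

{code}
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class BoundedRebalanceRead {
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final long workerSyncTimeoutMs = 3000L;

    protected abstract void readConfigToEnd(long timeoutMs) throws TimeoutException;

    // Called from the worker's tick() loop after a rebalance.
    public void handleRebalance() {
        while (!stopping.get()) {
            try {
                // Bounded wait instead of MAX_INT, so shutdown is never blocked.
                readConfigToEnd(workerSyncTimeoutMs);
                return;
            } catch (TimeoutException e) {
                // Timed out: loop again, which re-checks the stopping flag first.
            }
        }
    }

    public void stop() {
        stopping.set(true);
    }
}
{code}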





[jira] [Updated] (KAFKA-3299) KafkaConnect: DistributedHerder shouldn't wait forever to read configs after rebalance

2016-03-04 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-3299:
-
Assignee: Gwen Shapira

> KafkaConnect: DistributedHerder shouldn't wait forever to read configs after 
> rebalance
> --
>
> Key: KAFKA-3299
> URL: https://issues.apache.org/jira/browse/KAFKA-3299
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Fix For: 0.10.0.0
>
>
> Right now, the handleRebalance code calls readConfigToEnd with timeout of 
> MAX_INT if it isn't the leader.
> The normal workerSyncTimeoutMs is probably sufficient.
> At least this allows a worker to time-out, get back to the tick() loop and 
> check the "stopping" flag to see if it should shut down, to prevent it from 
> hanging forever.
> It doesn't resolve the question of what we should do with a worker that 
> repeatedly fails to read configuration.





[jira] [Commented] (KAFKA-3299) KafkaConnect: DistributedHerder shouldn't wait forever to read configs after rebalance

2016-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180642#comment-15180642
 ] 

ASF GitHub Bot commented on KAFKA-3299:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/981


> KafkaConnect: DistributedHerder shouldn't wait forever to read configs after 
> rebalance
> --
>
> Key: KAFKA-3299
> URL: https://issues.apache.org/jira/browse/KAFKA-3299
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
> Fix For: 0.10.0.0
>
>
> Right now, the handleRebalance code calls readConfigToEnd with timeout of 
> MAX_INT if it isn't the leader.
> The normal workerSyncTimeoutMs is probably sufficient.
> At least this allows a worker to time-out, get back to the tick() loop and 
> check the "stopping" flag to see if it should shut down, to prevent it from 
> hanging forever.
> It doesn't resolve the question of what we should do with a worker that 
> repeatedly fails to read configuration.





Build failed in Jenkins: kafka-trunk-jdk7 #1093

2016-03-04 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3299: Ensure that reading config log on rebalance doesn't hang

--
[...truncated 1494 lines...]

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooLow 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToEarliestWhenOffsetTooHigh 
PASSED

kafka.integration.AutoOffsetResetTest > testResetToLatestWhenOffsetTooHigh 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testTopicMetadataRequest 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsWrongSetValue PASSED

kafka.KafkaTest > testKafkaSslPasswords PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgs PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsAtTheEnd PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsOnly PASSED

kafka.KafkaTest > testGetKafkaConfigFromArgsNonArgsAtTheBegging PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.ByteBoundedBlockingQueueTest > testByteBoundedBlockingQueue PASSED

kafka.utils.timer.TimerTaskListTest > testAll PASSED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask PASSED

kafka.utils.timer.TimerTest > testTaskExpiration PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.message.MessageCompressionTest > testCompressSize PASSED

kafka.message.MessageCompressionTest > testSimpleCompressDecompress PASSED

kafka.message.MessageWriterTest > testWithNoCompressionAttribute PASSED

kafka.message.MessageWriterTest > testWithCompressionAttribute PASSED

kafka.message.MessageWriterTest > testBufferingOutputStream PASSED

kafka.message.MessageWriterTest > testWithKey PASSED

kafka.message.MessageTest > testChecksum PASSED

kafka.message.MessageTest > testInvalidTimestamp PASSED

kafka.message.MessageTest > testIsHashable PASSED

kafka.message.MessageTest > testInvalidTimestampAndMagicValueCombination PASSED

kafka.message.MessageTest > testExceptionMapping PASSED

kafka.message.MessageTest > testFieldValues PASSED

kafka.message.MessageTest > testInvalidMagicByte PASSED

kafka.message.MessageTest > testEquality PASSED

kafka.message.MessageTest > testMessageFormatConversion PASSED

kafka.message.ByteBufferMessageSetTest > testMessageWithProvidedOffsetSeq PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytes PASSED

kafka.message.ByteBufferMessageSetTest > testValidBytesWithCompression PASSED

kafka.message.ByteBufferMessageSetTest > 
testOffsetAssignmentAfterMessageFormatConversion PASSED

kafka.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.message.ByteBufferMessageSetTest > testAbsoluteOffsetAssignment PASSED

kafka.message.ByteBufferMessageSetTest > testCreateTime PASSED

kafka.message.ByteBufferMessageSetTest > testInvalidCreateTime PASSED

kafka.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.message.ByteBufferMessageSetTest > testLogAppendTime PASSED

kafka.message.ByteBufferMessageSetTest > testWriteTo PASSED

ka

Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-04 Thread Ashish Singh
Hello Jay,

The overall approach sounds good. I do realize that this discussion has
gotten too lengthy and is starting to shoot off on tangents. Maybe a KIP call
will help us get to a decision faster. I do have a few questions, though.

On Fri, Mar 4, 2016 at 9:52 AM, Jay Kreps  wrote:

> Yeah here is my summary of my take:
>
> 1. Negotiating a per-connection protocol actually does add a lot of
> complexity to clients (many more failure states to get right).
>
> 2. Having the client configure the protocol version manually is doable now
> but probably a worse state. I suspect this will lead to more not less
> confusion.
>
> 3. I don't think the current state is actually that bad. Integrators pick a
> conservative version and build against that. There is a tradeoff between
> getting the new features and being compatible with old Kafka versions. But
> a large part of this tradeoff is essential since new features aren't going
> to magically appear on old servers, so even if you upgrade your client you
> likely aren't going to get the new stuff (since we will end up dynamically
> turning it off). Having client features that are there but don't work
> because you're on an old cluster may actually be a worse experience if not
> handled very carefully..
>
> 4. The problems Dana brought up are totally orthogonal to the problem of
> having per-api versions or overall versions. The problem was that we
> changed behavior subtly without changing the version. This will be an issue
> regardless of whether the version is global or not.
>
> 5. Using the broker release as the version is strictly worse than using a
> global protocol version (0, 1, 2, ...) that increments any time any api
> changes but doesn't increment just because non-protocol code is changed.
> The problem with using the broker release version is we want to be able to
> keep Kafka releasable from any commit which means there isn't as clear a
> sequencing of releases as you would think.
>
> 6. We need to consider the case of mixed version clusters during the time
> period when you are upgrading Kafka.
>
> So overall I think this is not a critical thing to do right now, but if we
> are going to do it we should do it in a way that actually improves things.
>
> Here would be one proposal for that:
> a. Add a global protocol version that increments with any api version
> update. Move the documentation so that the docs are by version. This is
> basically just a short-hand for a complete set of supported api versions.
> b. Include a field in the metadata response for each broker that adds the
> protocol version.
>
There might be an issue here where the metadata request version sent by the
client is not supported by the broker (an older broker). However, if we are
clearly stating that a client is not guaranteed to work with an older broker,
then this becomes expected. This will potentially limit us in terms of
supporting downgrades, though, if we ever want to.

> c. To maintain the protocol version this information will have to get
> propagated with the rest of the broker metadata like host, port, id, etc.
>
> The instructions to clients would be:
> - By default you build against a single conservative Kafka protocol version
> and we carry that support forward, as today
>
If I am getting this correct, this will mean we will never deprecate/remove
any protocol version in the future. Having some way to deprecate/remove older
protocol versions will probably be a good idea. It is possible with the
global protocol version approach; it could be as simple as marking a protocol
version deprecated in the protocol doc before removing it. I just want to
make sure deprecation is still on the table.

> - If you want to get fancy you can use the protocol version field in the
> metadata request to more dynamically chose what features are available and
> select api versions appropriately. This is purely optional.
>
> -Jay
>
> On Thu, Mar 3, 2016 at 9:38 PM, Jason Gustafson 
> wrote:
>
> > I talked with Jay about this KIP briefly this morning, so let me try to
> > summarize the discussion (I'm sure he'll jump in if I get anything
> wrong).
> > Apologies in advance for the length.
> >
> > I think we both share some skepticism that a request with all the
> supported
> > versions of all the request APIs is going to be a useful primitive to try
> > and build client compatibility around. In practice I think people would
> end
> > up checking for particular request versions in order to determine if the
> > broker is 0.8 or 0.9 or whatever, and then change behavior accordingly.
> I'm
> > wondering if there's a reasonable way to handle the version responses
> that
> > doesn't amount to that. Maybe you could try to capture feature
> > compatibility by checking the versions for a subset of request types? For
> > example, to ensure that you can use the new consumer API, you check that
> > the group coordinator request is present, the offset commit request
> version
> > is greater than 2, the offset fetch request is greater t

[jira] [Commented] (KAFKA-3303) Pass partial record metadata to Interceptor onAcknowledgement in case of errors

2016-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181383#comment-15181383
 ] 

ASF GitHub Bot commented on KAFKA-3303:
---

GitHub user apovzner opened a pull request:

https://github.com/apache/kafka/pull/1015

KAFKA-3303: Pass partial record metadata to 
ProducerInterceptor.onAcknowledgement on error

This is a KIP-42 followup. 

Currently, if sending the record fails before it gets to the server, 
ProducerInterceptor.onAcknowledgement() is called with metadata == null and a 
non-null exception. However, it is useful to pass the topic and partition, if 
known, to ProducerInterceptor.onAcknowledgement() as well. This patch ensures 
that ProducerInterceptor.onAcknowledgement() gets record metadata with the 
topic and, when known, the partition. If the partition is not set in 'record' 
and KafkaProducer.send() fails before the partition gets assigned, then 
ProducerInterceptor.onAcknowledgement() gets RecordMetadata with partition == 
-1. The only time ProducerInterceptor.onAcknowledgement() gets null record 
metadata is when the client passes a null record to KafkaProducer.send().
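
For reference, a minimal interceptor sketch that consumes the partial metadata 
described above (the class name and logging are illustrative only):

{code}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// With this patch, on failure 'metadata' still carries the topic (and the
// partition when it was known), with partition == -1 otherwise.
public class LoggingInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null)
            return;
        if (metadata == null) {
            System.err.println("send failed before a record existed: " + exception);
        } else if (metadata.partition() == -1) {
            System.err.println("send to topic " + metadata.topic() + " failed: " + exception);
        } else {
            System.err.println("send to " + metadata.topic() + "-" + metadata.partition()
                    + " failed: " + exception);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
{code}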

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apovzner/kafka kip42-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1015.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1015


commit 169085a6b502d9458f477030cd6045f20b0100a7
Author: Anna Povzner 
Date:   2016-03-05T01:05:56Z

KAFKA-3303: Pass partial record metadata to Interceptor onAcknowledgement 
in case of errors




> Pass partial record metadata to Interceptor onAcknowledgement in case of 
> errors
> ---
>
> Key: KAFKA-3303
> URL: https://issues.apache.org/jira/browse/KAFKA-3303
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> Currently Interceptor.onAcknowledgement behaves similarly to Callback. If an 
> exception occurred and is passed to onAcknowledgement, the metadata param is 
> set to null.
> However, it would be useful to pass the topic, and the partition if 
> available, to the interceptor so that it knows which topic/partition got an 
> error.
> This is part of KIP-42.





[GitHub] kafka pull request: KAFKA-3303: Pass partial record metadata to Pr...

2016-03-04 Thread apovzner
GitHub user apovzner opened a pull request:

https://github.com/apache/kafka/pull/1015

KAFKA-3303: Pass partial record metadata to 
ProducerInterceptor.onAcknowledgement on error

This is a KIP-42 followup. 

Currently, if sending the record fails before it gets to the server, 
ProducerInterceptor.onAcknowledgement() is called with metadata == null and a 
non-null exception. However, it is useful to pass the topic and partition, if 
known, to ProducerInterceptor.onAcknowledgement() as well. This patch ensures 
that ProducerInterceptor.onAcknowledgement() gets record metadata with the 
topic and, when known, the partition. If the partition is not set in 'record' 
and KafkaProducer.send() fails before the partition gets assigned, then 
ProducerInterceptor.onAcknowledgement() gets RecordMetadata with partition == 
-1. The only time ProducerInterceptor.onAcknowledgement() gets null record 
metadata is when the client passes a null record to KafkaProducer.send().

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apovzner/kafka kip42-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1015.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1015


commit 169085a6b502d9458f477030cd6045f20b0100a7
Author: Anna Povzner 
Date:   2016-03-05T01:05:56Z

KAFKA-3303: Pass partial record metadata to Interceptor onAcknowledgement 
in case of errors






Build failed in Jenkins: kafka-trunk-jdk8 #421

2016-03-04 Thread Apache Jenkins Server
See 

Changes:

[me] KAFKA-3299: Ensure that reading config log on rebalance doesn't hang

--
[...truncated 3533 lines...]

org.apache.kafka.clients.producer.RecordSendTest > testBlocking PASSED

org.apache.kafka.clients.ClientUtilsTest > testParseAndValidateAddresses PASSED

org.apache.kafka.clients.ClientUtilsTest > testNoPort PASSED

org.apache.kafka.clients.NetworkClientTest > testSimpleRequestResponse PASSED

org.apache.kafka.clients.NetworkClientTest > testClose PASSED

org.apache.kafka.clients.NetworkClientTest > testLeastLoadedNode PASSED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelayConnected PASSED

org.apache.kafka.clients.NetworkClientTest > testRequestTimeout PASSED

org.apache.kafka.clients.NetworkClientTest > 
testSimpleRequestResponseWithStaticNodes PASSED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelay PASSED

org.apache.kafka.clients.NetworkClientTest > testSendToUnreadyNode PASSED

org.apache.kafka.clients.NetworkClientTest > testConnectionDelayDisconnected 
PASSED

org.apache.kafka.clients.MetadataTest > testListenerCanUnregister PASSED

org.apache.kafka.clients.MetadataTest > testFailedUpdate PASSED

org.apache.kafka.clients.MetadataTest > testMetadataUpdateWaitTime PASSED

org.apache.kafka.clients.MetadataTest > testUpdateWithNeedMetadataForAllTopics 
PASSED

org.apache.kafka.clients.MetadataTest > testMetadata PASSED

org.apache.kafka.clients.MetadataTest > testListenerGetsNotifiedOfUpdate PASSED

org.apache.kafka.common.SerializeCompatibilityTopicPartitionTest > 
testSerializationRoundtrip PASSED

org.apache.kafka.common.SerializeCompatibilityTopicPartitionTest > 
testTopiPartitionSerializationCompatibility PASSED

org.apache.kafka.common.serialization.SerializationTest > testStringSerializer 
PASSED

org.apache.kafka.common.serialization.SerializationTest > testIntegerSerializer 
PASSED

org.apache.kafka.common.serialization.SerializationTest > 
testByteBufferSerializer PASSED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testBasicTypes PASSED

org.apache.kafka.common.config.ConfigDefTest > testNullDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultRange PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidators PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefaultString PASSED

org.apache.kafka.common.config.ConfigDefTest > testSslPasswords PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired PASSED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice PASSED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs PASSED

org.apache.kafka.common.protocol.ErrorsTest > testForExceptionDefault PASSED

org.apache.kafka.common.protocol.ErrorsTest > testUniqueExceptions PASSED

org.apache.kafka.common.protocol.ErrorsTest > testForExceptionInheritance PASSED

org.apache.kafka.common.protocol.ErrorsTest > testNoneException PASSED

org.apache.kafka.common.protocol.ErrorsTest > testUniqueErrorCodes PASSED

org.apache.kafka.common.protocol.ErrorsTest > testExceptionsAreNotGeneric PASSED

org.apache.kafka.common.protocol.ProtoUtilsTest > schemaVersionOutOfRange PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testArray 
PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testNulls 
PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > 
testNullableDefault PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testDefault 
PASSED

org.apache.kafka.common.protocol.types.ProtocolSerializationTest > testSimple 
PASSED

org.apache.kafka.common.requests.RequestResponseTest > testSerialization PASSED

org.apache.kafka.common.requests.RequestResponseTest > fetchResponseVersionTest 
PASSED

org.apache.kafka.common.requests.RequestResponseTest > 
produceResponseVersionTest PASSED

org.apache.kafka.common.requests.RequestResponseTest > 
testControlledShutdownResponse PASSED

org.apache.kafka.common.requests.RequestResponseTest > 
testRequestHeaderWithNullClientId PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.metrics.MetricsTest > testSimpleStats PASSED

org.apache.kafka.common.metrics.MetricsTest > testOldDataHasNoEffect 

[jira] [Updated] (KAFKA-3303) Pass partial record metadata to Interceptor onAcknowledgement in case of errors

2016-03-04 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3303:

Status: Patch Available  (was: In Progress)

> Pass partial record metadata to Interceptor onAcknowledgement in case of 
> errors
> ---
>
> Key: KAFKA-3303
> URL: https://issues.apache.org/jira/browse/KAFKA-3303
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> Currently Interceptor.onAcknowledgement behaves similarly to Callback. If an 
> exception occurred and is passed to onAcknowledgement, the metadata param is 
> set to null.
> However, it would be useful to pass the topic, and the partition if 
> available, to the interceptor so that it knows which topic/partition got an 
> error.
> This is part of KIP-42.





Re: [DISCUSS] KIP-35 - Retrieve protocol version

2016-03-04 Thread Jay Kreps
Hey Ashish,

Both good points.

I think the issue with the general metadata request is the same as the
issue with a version-specific metadata request from the other
proposal--basically it's a chicken-and-egg problem: to find out anything
about the cluster, you have to be able to communicate something in a format
the server can understand without knowing a priori what version it's on. I
guess the question is how you can continue to evolve the metadata request
(whether it is the existing metadata request or a protocol-version-specific
one), given that you need this information to bootstrap; you have to be more
careful about how that request evolves.

I think deprecation/removal may be okay. Ultimately clients will always use
the highest possible version of the protocol the server supports, so if
we've already deprecated and removed your highest version then you are
screwed and you're going to get an error no matter what, right? Basically,
there is nothing dynamic you can do in that case.
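
As a purely illustrative sketch of that rule (not an agreed API; the advertised
version is assumed to come from the metadata response): a client takes the
highest protocol version both sides support and fails fast if the ranges do
not overlap.

{code}
// Sketch only: 'brokerMaxVersion' stands in for whatever the metadata response
// would advertise; the client's supported range is a build-time constant.
public final class ProtocolVersionChooser {
    private static final int CLIENT_MIN_VERSION = 0;
    private static final int CLIENT_MAX_VERSION = 2;

    public static int choose(int brokerMaxVersion) {
        int chosen = Math.min(CLIENT_MAX_VERSION, brokerMaxVersion);
        if (chosen < CLIENT_MIN_VERSION)
            throw new IllegalStateException("Broker only supports protocol versions up to "
                    + brokerMaxVersion + ", but this client requires at least "
                    + CLIENT_MIN_VERSION);
        return chosen; // always the highest version both sides understand
    }

    private ProtocolVersionChooser() {}
}
{code}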

-Jay

On Fri, Mar 4, 2016 at 4:05 PM, Ashish Singh  wrote:

> Hello Jay,
>
> The overall approach sounds good. I do realize that this discussion has
> gotten too lengthy and is starting to shoot tangents. Maybe a KIP call will
> help us getting to a decision faster. I do have a few questions though.
>
> On Fri, Mar 4, 2016 at 9:52 AM, Jay Kreps  wrote:
>
> > Yeah here is my summary of my take:
> >
> > 1. Negotiating a per-connection protocol actually does add a lot of
> > complexity to clients (many more failure states to get right).
> >
> > 2. Having the client configure the protocol version manually is doable
> now
> > but probably a worse state. I suspect this will lead to more not less
> > confusion.
> >
> > 3. I don't think the current state is actually that bad. Integrators
> pick a
> > conservative version and build against that. There is a tradeoff between
> > getting the new features and being compatible with old Kafka versions.
> But
> > a large part of this tradeoff is essential since new features aren't
> going
> > to magically appear on old servers, so even if you upgrade your client
> you
> > likely aren't going to get the new stuff (since we will end up
> dynamically
> > turning it off). Having client features that are there but don't work
> > because you're on an old cluster may actually be a worse experience if
> not
> > handled very carefully..
> >
> > 4. The problems Dana brought up are totally orthogonal to the problem of
> > having per-api versions or overall versions. The problem was that we
> > changed behavior subtly without changing the version. This will be an
> issue
> > regardless of whether the version is global or not.
> >
> > 5. Using the broker release as the version is strictly worse than using a
> > global protocol version (0, 1, 2, ...) that increments any time any api
> > changes but doesn't increment just because non-protocol code is changed.
> > The problem with using the broker release version is we want to be able
> to
> > keep Kafka releasable from any commit which means there isn't as clear a
> > sequencing of releases as you would think.
> >
> > 6. We need to consider the case of mixed version clusters during the time
> > period when you are upgrading Kafka.
> >
> > So overall I think this is not a critical thing to do right now, but if
> we
> > are going to do it we should do it in a way that actually improves
> things.
> >
> > Here would be one proposal for that:
> > a. Add a global protocol version that increments with any api version
> > update. Move the documentation so that the docs are by version. This is
> > basically just a short-hand for a complete set of supported api versions.
> > b. Include a field in the metadata response for each broker that adds the
> > protocol version.
> >
> There might be an issue here where the metadata request version sent by
> client is not supported by broker, an older broker. However, if we are
> clearly stating that a client is not guaranteed to work with an older
> broker then this becomes expected. This will potentially limit us in terms
> of supporting downgrades though, if we ever want to.
>
> > c. To maintain the protocol version this information will have to get
> > propagated with the rest of the broker metadata like host, port, id, etc.
> >
> > The instructions to clients would be:
> > - By default you build against a single conservative Kafka protocol
> version
> > and we carry that support forward, as today
> >
> If I am getting this correct, this will mean we will never deprecate/remove
> any protocol version in future. Having some way to deprecate/remove older
> protocol versions will probably be a good idea. It is possible with the
> global protocol version approach, it could be as simple as marking a
> protocol deprecated in protocol doc before removing it. Just want to make
> sure deprecation is still on the table.
>
> > - If you want to get fancy you can use the protocol version field in the
> > metadata request to more dynamic

[jira] [Updated] (KAFKA-3185) Allow users to cleanup internal data

2016-03-04 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3185:
-
Priority: Blocker  (was: Major)

> Allow users to cleanup internal data
> 
>
> Key: KAFKA-3185
> URL: https://issues.apache.org/jira/browse/KAFKA-3185
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Blocker
>
> Currently the internal data is managed completely by the Kafka Streams 
> framework and users cannot clean it up actively. This results in a bad 
> out-of-the-box user experience, especially for running demo programs, since 
> it leaves behind internal data (changelog topics, RocksDB files, etc.) that 
> needs to be cleaned up manually. It would be better to add a
> {code}
> KafkaStreams.cleanup()
> {code}
> function call to clean up these internal data programmatically.





[jira] [Created] (KAFKA-3336) Unify ser/de pair classes into one serde class

2016-03-04 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-3336:


 Summary: Unify ser/de pair classes into one serde class
 Key: KAFKA-3336
 URL: https://issues.apache.org/jira/browse/KAFKA-3336
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Priority: Blocker
 Fix For: 0.10.0.0


Right now users must provide two separate classes for serializers and 
deserializers, respectively.  This means the current API functions have at 
least 2 * numberOfTypes parameters.

*Example (current, bad): "foo(..., longSerializer, longDeserializer)".*

Because the serde aspect of the API is already one of the biggest UX issues, we 
should unify the serde functionality into a single serde class, i.e. one class 
that provides both serialization and deserialization functionality.  This will 
reduce the number of required serde parameters in the API by 50%.

*Example (suggested, better): "foo(..., longSerializerDeserializer)"*. 
* Note: This parameter name is horrible and only used to highlight the 
difference to the "current" example above.
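
One possible shape for such a unified serde, sketched against the existing 
Serializer/Deserializer interfaces (the Serde name and the example class below 
illustrate the proposal and are not an existing API):

{code}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Proposed: one object hands out both halves, so DSL methods can take a single
// serde parameter per type, e.g. foo(..., stringSerde) instead of
// foo(..., stringSerializer, stringDeserializer).
public interface Serde<T> {
    Serializer<T> serializer();
    Deserializer<T> deserializer();
}

// Example: a String serde built from the existing serializer/deserializer pair.
class StringSerde implements Serde<String> {
    public Serializer<String> serializer() { return new StringSerializer(); }
    public Deserializer<String> deserializer() { return new StringDeserializer(); }
}
{code}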





[jira] [Updated] (KAFKA-3336) Unify ser/de pair classes into one serde class

2016-03-04 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3336:
-
Description: 
Right now users must provide two separate classes for serializers and 
deserializers, respectively.  This means the current API functions have at 
least 2 * numberOfTypes parameters.

*Example (current, bad): "foo(..., longSerializer, longDeserializer)".*

Because the serde aspect of the API is already one of the biggest UX issues, we 
should unify the serde functionality into a single serde class, i.e. one class 
that provides both serialization and deserialization functionality.  This will 
reduce the number of required serde parameters in the API by 50%.

*Example (suggested, better): "foo(..., longSerializerDeserializer)"*. 
* Note: This parameter name is horrible and only used to highlight the 
difference to the "current" example above.

We also want to 1) add a pairing function for each operator that does not 
require serialization, and 2) provide default serdes for common data types.

  was:
Right now users must provide two separate classes for serializers and 
deserializers, respectively.  This means the current API functions have at 
least 2 * numberOfTypes parameters.

*Example (current, bad): "foo(..., longSerializer, longDeserializer)".*

Because the serde aspect of the API is already one of the biggest UX issues, we 
should unify the serde functionality into a single serde class, i.e. one class 
that provides both serialization and deserialization functionality.  This will 
reduce the number of required serde parameters in the API by 50%.

*Example (suggested, better): "foo(..., longSerializerDeserializer)"*. 
* Note: This parameter name is horrible and only used to highlight the 
difference to the "current" example above.


> Unify ser/de pair classes into one serde class
> --
>
> Key: KAFKA-3336
> URL: https://issues.apache.org/jira/browse/KAFKA-3336
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> Right now users must provide two separate classes for serializers and 
> deserializers, respectively.  This means the current API functions have at 
> least 2 * numberOfTypes parameters.
> *Example (current, bad): "foo(..., longSerializer, longDeserializer)".*
> Because the serde aspect of the API is already one of the biggest UX issues, 
> we should unify the serde functionality into a single serde class, i.e. one 
> class that provides both serialization and deserialization functionality.  
> This will reduce the number of required serde parameters in the API by 50%.
> *Example (suggested, better): "foo(..., longSerializerDeserializer)"*. 
> * Note: This parameter name is horrible and only used to highlight the 
> difference to the "current" example above.
> We also want to 1) add a pairing function for each operator that does not 
> require serialization, and 2) provide default serdes for common data types.





[jira] [Created] (KAFKA-3337) Extract selector as a separate groupBy operator for KTable aggregations

2016-03-04 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-3337:


 Summary: Extract selector as a separate groupBy operator for 
KTable aggregations
 Key: KAFKA-3337
 URL: https://issues.apache.org/jira/browse/KAFKA-3337
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Priority: Blocker
 Fix For: 0.10.0.0


Currently KTable aggregation takes a selector and an aggregator, which makes 
the function a little bit "heavy". It would be better to extract the selector 
into a separate groupBy function, so that

{code}
table.groupBy(selector).aggregate(aggregator);
{code}





[jira] [Created] (KAFKA-3338) Add print / writeAsText / etc functions to the DSL

2016-03-04 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-3338:


 Summary: Add print / writeAsText / etc functions to the DSL
 Key: KAFKA-3338
 URL: https://issues.apache.org/jira/browse/KAFKA-3338
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
Priority: Blocker
 Fix For: 0.10.0.0


We want to provide a REPL-like pattern for users for better debuggability; 
this would be like finer-grained trace-level logging. Example APIs can be 
found in Flink (search for the DataSet class):

https://ci.apache.org/projects/flink/flink-docs-master/api/java/
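
A sketch of what such debug helpers could look like on the DSL (the operator 
names below mirror the Flink examples and are part of the proposal, not the 
current KStream API):

{code}
// Hypothetical debug operators, mirroring Flink's DataSet helpers.
public interface DebuggableStream<K, V> {
    void print();                       // write each record to stdout as it flows through
    void writeAsText(String filePath);  // append each record to a text file for inspection
}
{code}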


